@@ -643,33 +643,36 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
643643{
644644 MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" , MyProcPid , x -> xid , x -> isPrepared , x -> isDistributed , commit ? "commit" : "abort" );
645645 if (x -> isDistributed && (x -> isPrepared || x -> isReplicated )) {
646- MtmTransState * ts ;
646+ MtmTransState * ts = NULL ;
647647 MtmLock (LW_EXCLUSIVE );
648648 if (x -> isPrepared ) {
649649 ts = hash_search (xid2state , & x -> xid , HASH_FIND , NULL );
650650 Assert (ts != NULL );
651651 } else {
652652 MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid , x -> gid , HASH_REMOVE , NULL );
653- Assert (hm != NULL );
654- ts = hm -> state ;
653+ if (hm != NULL ) {
654+ ts = hm -> state ;
655+ }
655656 }
656- if (commit ) {
657- ts -> status = TRANSACTION_STATUS_COMMITTED ;
658- if (x -> csn > ts -> csn ) {
659- ts -> csn = x -> csn ;
660- MtmSyncClock (ts -> csn );
657+ if (ts != NULL ) {
658+ if (commit ) {
659+ ts -> status = TRANSACTION_STATUS_COMMITTED ;
660+ if (x -> csn > ts -> csn ) {
661+ ts -> csn = x -> csn ;
662+ MtmSyncClock (ts -> csn );
663+ }
664+ } else {
665+ ts -> status = TRANSACTION_STATUS_ABORTED ;
666+ if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
667+ /*
668+ * Send notification only of ABORT happens during transaction processing at replicas,
669+ * do not send notification if ABORT is receiver from master
670+ */
671+ MtmSendNotificationMessage (ts ); /* send notification to coordinator */
672+ }
661673 }
662- } else {
663- ts -> status = TRANSACTION_STATUS_ABORTED ;
664- if (x -> isReplicated && TransactionIdIsValid (x -> gtid .xid )) {
665- /*
666- * Send notification only of ABORT happens during transaction processing at replicas,
667- * do not send notification if ABORT is receiver from master
668- */
669- MtmSendNotificationMessage (ts ); /* send notification to coordinator */
670- }
674+ MtmAdjustSubtransactions (ts );
671675 }
672- MtmAdjustSubtransactions (ts );
673676 MtmUnlock ();
674677 }
675678 x -> snapshot = INVALID_CSN ;
@@ -1691,6 +1694,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16911694 DestReceiver * dest , char * completionTag )
16921695{
16931696 bool skipCommand ;
1697+ MTM_TRACE ("%d: Process utility statement %s\n" , MyProcPid , queryString );
16941698 switch (nodeTag (parsetree ))
16951699 {
16961700 case T_TransactionStmt :
@@ -1702,8 +1706,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
17021706 if (dtmTx .isDistributed && dtmTx .containsDML ) {
17031707 char gid [MULTIMASTER_MAX_GID_SIZE ];
17041708 MtmGenerateGid (gid );
1709+ MTM_TRACE ("%d: Start 2PC with GID=%s for %s\n" , MyProcPid , gid , queryString );
17051710 if (!IsTransactionBlock ()) {
17061711 elog (WARNING , "Start transaction block for %d" , dtmTx .xid );
1712+ BeginTransactionBlock ();
17071713 CommitTransactionCommand ();
17081714 StartTransactionCommand ();
17091715 }
0 commit comments