7676
7777
7878#include "multimaster.h"
79+ #include "state.h"
7980
8081#define MAX_ROUTES 16
8182#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189190 MtmUnregisterSocket (sockets [node ]);
190191 pg_closesocket (sockets [node ], MtmUseRDMA );
191192 sockets [node ] = -1 ;
192- MtmOnNodeDisconnect (node + 1 );
193193}
194194
195195static int MtmWaitSocket (int sd , bool forWrite , timestamp_t timeoutMsec )
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316 } else {
317317 BIT_CLEAR (Mtm -> currentLockNodeMask , resp -> node - 1 );
318318 }
319- if (
320- ( BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) || Mtm -> status == MTM_IN_MINORITY )
321- && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
322- && Mtm -> status != MTM_RECOVERY
323- && Mtm -> status != MTM_RECOVERED
324- && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay ) < MtmGetSystemTime ())
325- {
326- MTM_ELOG (WARNING , "Node %d thinks that I'm dead, while I'm %s (message %s)" , resp -> node , MtmNodeStatusMnem [Mtm -> status ], MtmMessageKindMnem [resp -> code ]);
327- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
328- Mtm -> nConfigChanges += 1 ;
329- MtmSwitchClusterMode (MTM_RECOVERY );
330- } else if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) && sockets [resp -> node - 1 ] < 0 ) {
331- /* We receive heartbeat from disabled node.
319+
320+ // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321+ // {
322+ // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323+ // }
324+
325+ if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) &&
326+ sockets [resp -> node - 1 ] < 0 )
327+ {
328+ /* We've received heartbeat from disabled node.
332329 * Looks like it is restarted.
333330 * Try to reconnect to it.
334331 */
335332 MTM_ELOG (WARNING , "Receive heartbeat from disabled node %d" , resp -> node );
336333 BIT_SET (Mtm -> reconnectMask , resp -> node - 1 );
337- }
334+ }
338335}
339336
340337static void MtmScheduleHeartbeat ()
@@ -362,11 +359,17 @@ static void MtmSendHeartbeat()
362359 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
363360 {
364361 if (i + 1 != MtmNodeId ) {
365- if (!BIT_CHECK (busy_mask , i )
366- && (Mtm -> status != MTM_ONLINE
367- || sockets [i ] >= 0
368- || !BIT_CHECK (Mtm -> disabledNodeMask , i )
369- || BIT_CHECK (Mtm -> reconnectMask , i )))
362+ if (!BIT_CHECK (busy_mask , i ))
363+ /*
364+ * Old behaviour here can cause subtle bugs, for example
365+ * it can happened that none of mentioned conditiotions is
366+ * true when disabled node connects to a major node which
367+ * is online. So just send it allways. --sk
368+ */
369+ // && (Mtm->status != MTM_ONLINE
370+ // || sockets[i] >= 0
371+ // || !BIT_CHECK(Mtm->disabledNodeMask, i)
372+ // || BIT_CHECK(Mtm->reconnectMask, i)))
370373 {
371374 if (!MtmSendToNode (i , & msg , sizeof (msg ))) {
372375 MTM_ELOG (LOG , "Arbiter failed to send heartbeat to node %d" , i + 1 );
@@ -543,17 +546,9 @@ static void MtmOpenConnections()
543546 for (i = 0 ; i < nNodes ; i ++ ) {
544547 if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
545548 sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
546- if (sockets [i ] < 0 ) {
547- MtmOnNodeDisconnect (i + 1 );
548- }
549549 }
550550 }
551- if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
552- MTM_ELOG (WARNING , "Node is out of quorum: only %d nodes of %d are accessible" , Mtm -> nLiveNodes , Mtm -> nAllNodes );
553- MtmSwitchClusterMode (MTM_IN_MINORITY );
554- } else if (Mtm -> status == MTM_INITIALIZATION ) {
555- MtmSwitchClusterMode (MTM_CONNECTED );
556- }
551+ MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
557552}
558553
559554
@@ -586,7 +581,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586581 }
587582 sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
588583 if (sockets [node ] < 0 ) {
589- MtmOnNodeDisconnect (node + 1 );
590584 result = false;
591585 break ;
592586 }
@@ -716,16 +710,18 @@ static void MtmSender(Datum arg)
716710{
717711 int nNodes = MtmMaxNodes ;
718712 int i ;
713+ MtmBuffer * txBuffer ;
719714
720715 MtmBackgroundWorker = true;
721716
722- MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
717+ txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
723718 MTM_ELOG (LOG , "Start arbiter sender %d" , MyProcPid );
724719 InitializeTimeouts ();
725720
726721 pqsignal (SIGINT , SetStop );
727722 pqsignal (SIGQUIT , SetStop );
728723 pqsignal (SIGTERM , SetStop );
724+ pqsignal (SIGHUP , PostgresSigHupHandler );
729725
730726 /* We're now ready to receive signals */
731727 BackgroundWorkerUnblockSignals ();
@@ -744,6 +740,12 @@ static void MtmSender(Datum arg)
744740 PGSemaphoreLock (& Mtm -> sendSemaphore );
745741 CHECK_FOR_INTERRUPTS ();
746742
743+ if (ConfigReloadPending )
744+ {
745+ ConfigReloadPending = false;
746+ ProcessConfigFile (PGC_SIGHUP );
747+ }
748+
747749 MtmCheckHeartbeat ();
748750 /*
749751 * Use shared lock to improve locality,
@@ -805,6 +807,7 @@ static void MtmMonitor(Datum arg)
805807 pqsignal (SIGINT , SetStop );
806808 pqsignal (SIGQUIT , SetStop );
807809 pqsignal (SIGTERM , SetStop );
810+ pqsignal (SIGHUP , PostgresSigHupHandler );
808811
809812 MtmBackgroundWorker = true;
810813
@@ -819,6 +822,13 @@ static void MtmMonitor(Datum arg)
819822 if (rc & WL_POSTMASTER_DEATH ) {
820823 break ;
821824 }
825+
826+ if (ConfigReloadPending )
827+ {
828+ ConfigReloadPending = false;
829+ ProcessConfigFile (PGC_SIGHUP );
830+ }
831+
822832 MtmRefreshClusterStatus ();
823833 }
824834}
@@ -844,6 +854,7 @@ static void MtmReceiver(Datum arg)
844854 pqsignal (SIGINT , SetStop );
845855 pqsignal (SIGQUIT , SetStop );
846856 pqsignal (SIGTERM , SetStop );
857+ pqsignal (SIGHUP , PostgresSigHupHandler );
847858
848859 MtmBackgroundWorker = true;
849860
@@ -879,7 +890,14 @@ static void MtmReceiver(Datum arg)
879890 for (j = 0 ; j < n ; j ++ ) {
880891 if (events [j ].events & EPOLLIN )
881892#else
882- fd_set events ;
893+ fd_set events ;
894+
895+ if (ConfigReloadPending )
896+ {
897+ ConfigReloadPending = false;
898+ ProcessConfigFile (PGC_SIGHUP );
899+ }
900+
883901 do {
884902 struct timeval tv ;
885903 events = inset ;
@@ -969,6 +987,7 @@ static void MtmReceiver(Datum arg)
969987 msg -> gid , MtmTxnStatusMnem [msg -> status ], node );
970988
971989 replorigin_session_origin = DoNotReplicateId ;
990+ TXFINISH ("%s ABORT, MSG_POLL_STATUS" , msg -> gid );
972991 MtmFinishPreparedTransaction (ts , false);
973992 replorigin_session_origin = InvalidRepOriginId ;
974993 }
@@ -982,6 +1001,7 @@ static void MtmReceiver(Datum arg)
9821001 MTM_ELOG (LOG , "Commit transaction %s because it is prepared at all live nodes" , msg -> gid );
9831002
9841003 replorigin_session_origin = DoNotReplicateId ;
1004+ TXFINISH ("%s COMMIT, MSG_POLL_STATUS" , msg -> gid );
9851005 MtmFinishPreparedTransaction (ts , true);
9861006 replorigin_session_origin = InvalidRepOriginId ;
9871007 } else {
@@ -1006,7 +1026,7 @@ static void MtmReceiver(Datum arg)
10061026 default :
10071027 break ;
10081028 }
1009- if (BIT_CHECK (msg -> disabledNodeMask , node - 1 )) {
1029+ if (BIT_CHECK (msg -> disabledNodeMask , node - 1 ) || BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
10101030 MTM_ELOG (WARNING , "Ignore message from dead node %d\n" , node );
10111031 continue ;
10121032 }
@@ -1057,17 +1077,10 @@ static void MtmReceiver(Datum arg)
10571077 if (ts -> isTwoPhase ) {
10581078 MtmWakeUpBackend (ts );
10591079 } else if (MtmUseDtm ) {
1060- ts -> votedMask = 0 ;
10611080 MTM_TXTRACE (ts , "MtmTransReceiver send MSG_PRECOMMIT" );
10621081 Assert (replorigin_session_origin == InvalidRepOriginId );
1063- MTM_LOG2 ("SetPreparedTransactionState for %s" , ts -> gid );
1064- MtmUnlock ();
1065- MtmResetTransaction ();
1066- StartTransactionCommand ();
1067- SetPreparedTransactionState (ts -> gid , MULTIMASTER_PRECOMMITTED );
1068- CommitTransactionCommand ();
1069- Assert (!MtmTransIsActive ());
1070- MtmLock (LW_EXCLUSIVE );
1082+ ts -> isPrepared = false;
1083+ SetLatch (& ProcGlobal -> allProcs [ts -> procno ].procLatch );
10711084 } else {
10721085 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
10731086 MtmWakeUpBackend (ts );
@@ -1084,7 +1097,7 @@ static void MtmReceiver(Datum arg)
10841097 if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
10851098 MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" , ts -> gid , (long64 )ts -> xid , node );
10861099 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1087- ts -> aborted_by_node = node ;
1100+ ts -> abortedByNode = node ;
10881101 MtmAbortTransaction (ts );
10891102 }
10901103 if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
@@ -1161,4 +1174,3 @@ static void MtmReceiver(Datum arg)
11611174 }
11621175 proc_exit (1 ); /* force restart of this bgwroker */
11631176}
1164-
0 commit comments