7676
7777
7878#include "multimaster.h"
79+ #include "state.h"
7980
8081#define MAX_ROUTES 16
8182#define INIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189190 MtmUnregisterSocket (sockets [node ]);
190191 pg_closesocket (sockets [node ], MtmUseRDMA );
191192 sockets [node ] = -1 ;
192- MtmOnNodeDisconnect (node + 1 );
193193}
194194
195195static int MtmWaitSocket (int sd , bool forWrite , timestamp_t timeoutMsec )
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316 } else {
317317 BIT_CLEAR (Mtm -> currentLockNodeMask , resp -> node - 1 );
318318 }
319- if (
320- ( BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) || Mtm -> status == MTM_IN_MINORITY )
321- && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
322- && Mtm -> status != MTM_RECOVERY
323- && Mtm -> status != MTM_RECOVERED
324- && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay ) < MtmGetSystemTime ())
325- {
326- MTM_ELOG (WARNING , "Node %d thinks that I'm dead, while I'm %s (message %s)" , resp -> node , MtmNodeStatusMnem [Mtm -> status ], MtmMessageKindMnem [resp -> code ]);
327- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
328- Mtm -> nConfigChanges += 1 ;
329- MtmSwitchClusterMode (MTM_RECOVERY );
330- } else if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) && sockets [resp -> node - 1 ] < 0 ) {
331- /* We receive heartbeat from disabled node.
319+
320+ // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321+ // {
322+ // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323+ // }
324+
325+ if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) &&
326+ sockets [resp -> node - 1 ] < 0 )
327+ {
328+ /* We've received heartbeat from disabled node.
332329 * Looks like it is restarted.
333330 * Try to reconnect to it.
334331 */
335332 MTM_ELOG (WARNING , "Receive heartbeat from disabled node %d" , resp -> node );
336333 BIT_SET (Mtm -> reconnectMask , resp -> node - 1 );
337- }
334+ }
338335}
339336
340337static void MtmScheduleHeartbeat ()
@@ -543,17 +540,9 @@ static void MtmOpenConnections()
543540 for (i = 0 ; i < nNodes ; i ++ ) {
544541 if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
545542 sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
546- if (sockets [i ] < 0 ) {
547- MtmOnNodeDisconnect (i + 1 );
548- }
549543 }
550544 }
551- if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
552- MTM_ELOG (WARNING , "Node is out of quorum: only %d nodes of %d are accessible" , Mtm -> nLiveNodes , Mtm -> nAllNodes );
553- MtmSwitchClusterMode (MTM_IN_MINORITY );
554- } else if (Mtm -> status == MTM_INITIALIZATION ) {
555- MtmSwitchClusterMode (MTM_CONNECTED );
556- }
545+ MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
557546}
558547
559548
@@ -586,7 +575,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586575 }
587576 sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
588577 if (sockets [node ] < 0 ) {
589- MtmOnNodeDisconnect (node + 1 );
590578 result = false;
591579 break ;
592580 }
@@ -716,16 +704,18 @@ static void MtmSender(Datum arg)
716704{
717705 int nNodes = MtmMaxNodes ;
718706 int i ;
707+ MtmBuffer * txBuffer ;
719708
720709 MtmBackgroundWorker = true;
721710
722- MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
711+ txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
723712 MTM_ELOG (LOG , "Start arbiter sender %d" , MyProcPid );
724713 InitializeTimeouts ();
725714
726715 pqsignal (SIGINT , SetStop );
727716 pqsignal (SIGQUIT , SetStop );
728717 pqsignal (SIGTERM , SetStop );
718+ pqsignal (SIGHUP , PostgresSigHupHandler );
729719
730720 /* We're now ready to receive signals */
731721 BackgroundWorkerUnblockSignals ();
@@ -744,6 +734,12 @@ static void MtmSender(Datum arg)
744734 PGSemaphoreLock (& Mtm -> sendSemaphore );
745735 CHECK_FOR_INTERRUPTS ();
746736
737+ if (ConfigReloadPending )
738+ {
739+ ConfigReloadPending = false;
740+ ProcessConfigFile (PGC_SIGHUP );
741+ }
742+
747743 MtmCheckHeartbeat ();
748744 /*
749745 * Use shared lock to improve locality,
@@ -805,6 +801,7 @@ static void MtmMonitor(Datum arg)
805801 pqsignal (SIGINT , SetStop );
806802 pqsignal (SIGQUIT , SetStop );
807803 pqsignal (SIGTERM , SetStop );
804+ pqsignal (SIGHUP , PostgresSigHupHandler );
808805
809806 MtmBackgroundWorker = true;
810807
@@ -819,6 +816,13 @@ static void MtmMonitor(Datum arg)
819816 if (rc & WL_POSTMASTER_DEATH ) {
820817 break ;
821818 }
819+
820+ if (ConfigReloadPending )
821+ {
822+ ConfigReloadPending = false;
823+ ProcessConfigFile (PGC_SIGHUP );
824+ }
825+
822826 MtmRefreshClusterStatus ();
823827 }
824828}
@@ -844,6 +848,7 @@ static void MtmReceiver(Datum arg)
844848 pqsignal (SIGINT , SetStop );
845849 pqsignal (SIGQUIT , SetStop );
846850 pqsignal (SIGTERM , SetStop );
851+ pqsignal (SIGHUP , PostgresSigHupHandler );
847852
848853 MtmBackgroundWorker = true;
849854
@@ -879,7 +884,14 @@ static void MtmReceiver(Datum arg)
879884 for (j = 0 ; j < n ; j ++ ) {
880885 if (events [j ].events & EPOLLIN )
881886#else
882- fd_set events ;
887+ fd_set events ;
888+
889+ if (ConfigReloadPending )
890+ {
891+ ConfigReloadPending = false;
892+ ProcessConfigFile (PGC_SIGHUP );
893+ }
894+
883895 do {
884896 struct timeval tv ;
885897 events = inset ;
@@ -1006,7 +1018,7 @@ static void MtmReceiver(Datum arg)
10061018 default :
10071019 break ;
10081020 }
1009- if (BIT_CHECK (msg -> disabledNodeMask , node - 1 )) {
1021+ if (BIT_CHECK (msg -> disabledNodeMask , node - 1 ) || BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
10101022 MTM_ELOG (WARNING , "Ignore message from dead node %d\n" , node );
10111023 continue ;
10121024 }
@@ -1084,7 +1096,7 @@ static void MtmReceiver(Datum arg)
10841096 if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
10851097 MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" , ts -> gid , (long64 )ts -> xid , node );
10861098 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1087- ts -> aborted_by_node = node ;
1099+ ts -> abortedByNode = node ;
10881100 MtmAbortTransaction (ts );
10891101 }
10901102 if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
@@ -1161,4 +1173,3 @@ static void MtmReceiver(Datum arg)
11611173 }
11621174 proc_exit (1 ); /* force restart of this bgwroker */
11631175}
1164-
0 commit comments