7676
7777
7878#include "multimaster.h"
79- #include "state.h"
8079
8180#define MAX_ROUTES 16
8281#define INIT_BUFFER_SIZE 1024
@@ -190,6 +189,7 @@ static void MtmDisconnect(int node)
190189 MtmUnregisterSocket (sockets [node ]);
191190 pg_closesocket (sockets [node ], MtmUseRDMA );
192191 sockets [node ] = -1 ;
192+ MtmOnNodeDisconnect (node + 1 );
193193}
194194
195195static int MtmWaitSocket (int sd , bool forWrite , timestamp_t timeoutMsec )
@@ -316,22 +316,25 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316 } else {
317317 BIT_CLEAR (Mtm -> currentLockNodeMask , resp -> node - 1 );
318318 }
319-
320- // if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321- // {
322- // MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323- // }
324-
325- if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) &&
326- sockets [resp -> node - 1 ] < 0 )
327- {
328- /* We've received heartbeat from disabled node.
319+ if (
320+ ( BIT_CHECK (resp -> disabledNodeMask , MtmNodeId - 1 ) || Mtm -> status == MTM_IN_MINORITY )
321+ && !BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 )
322+ && Mtm -> status != MTM_RECOVERY
323+ && Mtm -> status != MTM_RECOVERED
324+ && Mtm -> nodes [MtmNodeId - 1 ].lastStatusChangeTime + MSEC_TO_USEC (MtmNodeDisableDelay ) < MtmGetSystemTime ())
325+ {
326+ MTM_ELOG (WARNING , "Node %d thinks that I'm dead, while I'm %s (message %s)" , resp -> node , MtmNodeStatusMnem [Mtm -> status ], MtmMessageKindMnem [resp -> code ]);
327+ BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
328+ Mtm -> nConfigChanges += 1 ;
329+ MtmSwitchClusterMode (MTM_RECOVERY );
330+ } else if (BIT_CHECK (Mtm -> disabledNodeMask , resp -> node - 1 ) && sockets [resp -> node - 1 ] < 0 ) {
331+ /* We receive heartbeat from disabled node.
329332 * Looks like it is restarted.
330333 * Try to reconnect to it.
331334 */
332335 MTM_ELOG (WARNING , "Receive heartbeat from disabled node %d" , resp -> node );
333336 BIT_SET (Mtm -> reconnectMask , resp -> node - 1 );
334- }
337+ }
335338}
336339
337340static void MtmScheduleHeartbeat ()
@@ -540,9 +543,17 @@ static void MtmOpenConnections()
540543 for (i = 0 ; i < nNodes ; i ++ ) {
541544 if (i + 1 != MtmNodeId && i < Mtm -> nAllNodes ) {
542545 sockets [i ] = MtmConnectSocket (i , Mtm -> nodes [i ].con .arbiterPort );
546+ if (sockets [i ] < 0 ) {
547+ MtmOnNodeDisconnect (i + 1 );
548+ }
543549 }
544550 }
545- MtmStateProcessEvent (MTM_ARBITER_RECEIVER_START );
551+ if (Mtm -> nLiveNodes < Mtm -> nAllNodes /2 + 1 ) { /* no quorum */
552+ MTM_ELOG (WARNING , "Node is out of quorum: only %d nodes of %d are accessible" , Mtm -> nLiveNodes , Mtm -> nAllNodes );
553+ MtmSwitchClusterMode (MTM_IN_MINORITY );
554+ } else if (Mtm -> status == MTM_INITIALIZATION ) {
555+ MtmSwitchClusterMode (MTM_CONNECTED );
556+ }
546557}
547558
548559
@@ -575,6 +586,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
575586 }
576587 sockets [node ] = MtmConnectSocket (node , Mtm -> nodes [node ].con .arbiterPort );
577588 if (sockets [node ] < 0 ) {
589+ MtmOnNodeDisconnect (node + 1 );
578590 result = false;
579591 break ;
580592 }
@@ -704,18 +716,16 @@ static void MtmSender(Datum arg)
704716{
705717 int nNodes = MtmMaxNodes ;
706718 int i ;
707- MtmBuffer * txBuffer ;
708719
709720 MtmBackgroundWorker = true;
710721
711- txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
722+ MtmBuffer * txBuffer = (MtmBuffer * )palloc0 (sizeof (MtmBuffer )* nNodes );
712723 MTM_ELOG (LOG , "Start arbiter sender %d" , MyProcPid );
713724 InitializeTimeouts ();
714725
715726 pqsignal (SIGINT , SetStop );
716727 pqsignal (SIGQUIT , SetStop );
717728 pqsignal (SIGTERM , SetStop );
718- pqsignal (SIGHUP , PostgresSigHupHandler );
719729
720730 /* We're now ready to receive signals */
721731 BackgroundWorkerUnblockSignals ();
@@ -734,12 +744,6 @@ static void MtmSender(Datum arg)
734744 PGSemaphoreLock (& Mtm -> sendSemaphore );
735745 CHECK_FOR_INTERRUPTS ();
736746
737- if (ConfigReloadPending )
738- {
739- ConfigReloadPending = false;
740- ProcessConfigFile (PGC_SIGHUP );
741- }
742-
743747 MtmCheckHeartbeat ();
744748 /*
745749 * Use shared lock to improve locality,
@@ -801,7 +805,6 @@ static void MtmMonitor(Datum arg)
801805 pqsignal (SIGINT , SetStop );
802806 pqsignal (SIGQUIT , SetStop );
803807 pqsignal (SIGTERM , SetStop );
804- pqsignal (SIGHUP , PostgresSigHupHandler );
805808
806809 MtmBackgroundWorker = true;
807810
@@ -816,13 +819,6 @@ static void MtmMonitor(Datum arg)
816819 if (rc & WL_POSTMASTER_DEATH ) {
817820 break ;
818821 }
819-
820- if (ConfigReloadPending )
821- {
822- ConfigReloadPending = false;
823- ProcessConfigFile (PGC_SIGHUP );
824- }
825-
826822 MtmRefreshClusterStatus ();
827823 }
828824}
@@ -848,7 +844,6 @@ static void MtmReceiver(Datum arg)
848844 pqsignal (SIGINT , SetStop );
849845 pqsignal (SIGQUIT , SetStop );
850846 pqsignal (SIGTERM , SetStop );
851- pqsignal (SIGHUP , PostgresSigHupHandler );
852847
853848 MtmBackgroundWorker = true;
854849
@@ -884,14 +879,7 @@ static void MtmReceiver(Datum arg)
884879 for (j = 0 ; j < n ; j ++ ) {
885880 if (events [j ].events & EPOLLIN )
886881#else
887- fd_set events ;
888-
889- if (ConfigReloadPending )
890- {
891- ConfigReloadPending = false;
892- ProcessConfigFile (PGC_SIGHUP );
893- }
894-
882+ fd_set events ;
895883 do {
896884 struct timeval tv ;
897885 events = inset ;
@@ -1018,7 +1006,7 @@ static void MtmReceiver(Datum arg)
10181006 default :
10191007 break ;
10201008 }
1021- if (BIT_CHECK (msg -> disabledNodeMask , node - 1 ) || BIT_CHECK ( Mtm -> disabledNodeMask , node - 1 ) ) {
1009+ if (BIT_CHECK (msg -> disabledNodeMask , node - 1 )) {
10221010 MTM_ELOG (WARNING , "Ignore message from dead node %d\n" , node );
10231011 continue ;
10241012 }
@@ -1096,7 +1084,7 @@ static void MtmReceiver(Datum arg)
10961084 if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
10971085 MTM_LOG1 ("Arbiter receive abort message for transaction %s (%llu) from node %d" , ts -> gid , (long64 )ts -> xid , node );
10981086 Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
1099- ts -> abortedByNode = node ;
1087+ ts -> aborted_by_node = node ;
11001088 MtmAbortTransaction (ts );
11011089 }
11021090 if ((ts -> participantsMask & ~Mtm -> disabledNodeMask & ~ts -> votedMask ) == 0 ) {
@@ -1173,3 +1161,4 @@ static void MtmReceiver(Datum arg)
11731161 }
11741162 proc_exit (1 ); /* force restart of this bgwroker */
11751163}
1164+
0 commit comments