@@ -212,8 +212,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
212212 *
213213 * Used when transitioning from SYNCWAIT state to CATCHUP.
214214 *
215- * Returns false if the apply worker has disappeared or the table state has been
216- * reset.
215+ * Returns false if the apply worker has disappeared.
217216 */
218217static bool
219218wait_for_worker_state_change (char expected_state )
@@ -226,17 +225,30 @@ wait_for_worker_state_change(char expected_state)
226225
227226 CHECK_FOR_INTERRUPTS ();
228227
229- /* Bail if the apply has died. */
228+ /*
229+ * Done if already in correct state. (We assume this fetch is atomic
230+ * enough to not give a misleading answer if we do it with no lock.)
231+ */
232+ if (MyLogicalRepWorker -> relstate == expected_state )
233+ return true;
234+
235+ /*
236+ * Bail out if the apply worker has died, else signal it we're
237+ * waiting.
238+ */
230239 LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
231240 worker = logicalrep_worker_find (MyLogicalRepWorker -> subid ,
232241 InvalidOid , false);
242+ if (worker && worker -> proc )
243+ logicalrep_worker_wakeup_ptr (worker );
233244 LWLockRelease (LogicalRepWorkerLock );
234245 if (!worker )
235- return false;
236-
237- if (MyLogicalRepWorker -> relstate == expected_state )
238- return true;
246+ break ;
239247
248+ /*
249+ * Wait. We expect to get a latch signal back from the apply worker,
250+ * but use a timeout in case it dies without sending one.
251+ */
240252 rc = WaitLatch (MyLatch ,
241253 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
242254 1000L , WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE );
@@ -245,7 +257,8 @@ wait_for_worker_state_change(char expected_state)
245257 if (rc & WL_POSTMASTER_DEATH )
246258 proc_exit (1 );
247259
248- ResetLatch (MyLatch );
260+ if (rc & WL_LATCH_SET )
261+ ResetLatch (MyLatch );
249262 }
250263
251264 return false;
@@ -422,83 +435,96 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
422435 else
423436 {
424437 LogicalRepWorker * syncworker ;
425- int nsyncworkers = 0 ;
426438
439+ /*
440+ * Look for a sync worker for this relation.
441+ */
427442 LWLockAcquire (LogicalRepWorkerLock , LW_SHARED );
443+
428444 syncworker = logicalrep_worker_find (MyLogicalRepWorker -> subid ,
429445 rstate -> relid , false);
446+
430447 if (syncworker )
431448 {
449+ /* Found one, update our copy of its state */
432450 SpinLockAcquire (& syncworker -> relmutex );
433451 rstate -> state = syncworker -> relstate ;
434452 rstate -> lsn = syncworker -> relstate_lsn ;
453+ if (rstate -> state == SUBREL_STATE_SYNCWAIT )
454+ {
455+ /*
456+ * Sync worker is waiting for apply. Tell sync worker it
456+ * can catch up now.
458+ */
459+ syncworker -> relstate = SUBREL_STATE_CATCHUP ;
460+ syncworker -> relstate_lsn =
461+ Max (syncworker -> relstate_lsn , current_lsn );
462+ }
435463 SpinLockRelease (& syncworker -> relmutex );
464+
465+ /* If we told worker to catch up, wait for it. */
466+ if (rstate -> state == SUBREL_STATE_SYNCWAIT )
467+ {
468+ /* Signal the sync worker, as it may be waiting for us. */
469+ if (syncworker -> proc )
470+ logicalrep_worker_wakeup_ptr (syncworker );
471+
472+ /* Now safe to release the LWLock */
473+ LWLockRelease (LogicalRepWorkerLock );
474+
475+ /*
476+ * Enter busy loop and wait for synchronization worker to
477+ * reach expected state (or die trying).
478+ */
479+ if (!started_tx )
480+ {
481+ StartTransactionCommand ();
482+ started_tx = true;
483+ }
484+
485+ wait_for_relation_state_change (rstate -> relid ,
486+ SUBREL_STATE_SYNCDONE );
487+ }
488+ else
489+ LWLockRelease (LogicalRepWorkerLock );
436490 }
437491 else
438-
492+ {
439493 /*
440494 * If there is no sync worker for this table yet, count
441495 * running sync workers for this subscription, while we have
442- * the lock, for later .
496+ * the lock.
443497 */
444- nsyncworkers = logicalrep_sync_worker_count (MyLogicalRepWorker -> subid );
445- LWLockRelease (LogicalRepWorkerLock );
446-
447- /*
448- * There is a worker synchronizing the relation and waiting for
449- * apply to do something.
450- */
451- if (syncworker && rstate -> state == SUBREL_STATE_SYNCWAIT )
452- {
453- /*
454- * Tell sync worker it can catchup now. We'll wait for it so
455- * it does not get lost.
456- */
457- SpinLockAcquire (& syncworker -> relmutex );
458- syncworker -> relstate = SUBREL_STATE_CATCHUP ;
459- syncworker -> relstate_lsn =
460- Max (syncworker -> relstate_lsn , current_lsn );
461- SpinLockRelease (& syncworker -> relmutex );
498+ int nsyncworkers =
499+ logicalrep_sync_worker_count (MyLogicalRepWorker -> subid );
462500
463- /* Signal the sync worker, as it may be waiting for us. */
464- logicalrep_worker_wakeup_ptr ( syncworker );
501+ /* Now safe to release the LWLock */
502+ LWLockRelease ( LogicalRepWorkerLock );
465503
466504 /*
467- * Enter busy loop and wait for synchronization worker to
468- * reach expected state (or die trying) .
505+ * If there are free sync worker slot(s), start a new sync
506+ * worker for the table .
469507 */
470- if (!started_tx )
471- {
472- StartTransactionCommand ();
473- started_tx = true;
474- }
475- wait_for_relation_state_change (rstate -> relid ,
476- SUBREL_STATE_SYNCDONE );
477- }
478-
479- /*
480- * If there is no sync worker registered for the table and there
481- * is some free sync worker slot, start a new sync worker for the
482- * table.
483- */
484- else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription )
485- {
486- TimestampTz now = GetCurrentTimestamp ();
487- struct tablesync_start_time_mapping * hentry ;
488- bool found ;
489-
490- hentry = hash_search (last_start_times , & rstate -> relid , HASH_ENTER , & found );
491-
492- if (!found ||
493- TimestampDifferenceExceeds (hentry -> last_start_time , now ,
494- wal_retrieve_retry_interval ))
508+ if (nsyncworkers < max_sync_workers_per_subscription )
495509 {
496- logicalrep_worker_launch (MyLogicalRepWorker -> dbid ,
497- MySubscription -> oid ,
498- MySubscription -> name ,
499- MyLogicalRepWorker -> userid ,
500- rstate -> relid );
501- hentry -> last_start_time = now ;
510+ TimestampTz now = GetCurrentTimestamp ();
511+ struct tablesync_start_time_mapping * hentry ;
512+ bool found ;
513+
514+ hentry = hash_search (last_start_times , & rstate -> relid ,
515+ HASH_ENTER , & found );
516+
517+ if (!found ||
518+ TimestampDifferenceExceeds (hentry -> last_start_time , now ,
519+ wal_retrieve_retry_interval ))
520+ {
521+ logicalrep_worker_launch (MyLogicalRepWorker -> dbid ,
522+ MySubscription -> oid ,
523+ MySubscription -> name ,
524+ MyLogicalRepWorker -> userid ,
525+ rstate -> relid );
526+ hentry -> last_start_time = now ;
527+ }
502528 }
503529 }
504530 }
@@ -512,7 +538,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
512538}
513539
514540/*
515- * Process state possible change(s) of tables that are being synchronized.
541+ * Process possible state change(s) of tables that are being synchronized.
516542 */
517543void
518544process_syncing_tables (XLogRecPtr current_lsn )
0 commit comments