* synchronization has finished.
*
* The stream position synchronization works in multiple steps.
- * - Sync finishes copy and sets table state as SYNCWAIT and waits
- * for state to change in a loop.
+ * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
+ * state to change in a loop.
* - Apply periodically checks tables that are synchronizing for SYNCWAIT.
- * When the desired state appears it will compare its position in the
- * stream with the SYNCWAIT position and based on that changes the
- * state to based on following rules:
- * - if the apply is in front of the sync in the WAL stream the new
- * state is set to CATCHUP and apply loops until the sync process
- * catches up to the same LSN as apply
- * - if the sync is in front of the apply in the WAL stream the new
- * state is set to SYNCDONE
- * - if both apply and sync are at the same position in the WAL stream
- * the state of the table is set to READY
- * - If the state was set to CATCHUP sync will read the stream and
- * apply changes until it catches up to the specified stream
- * position and then sets state to READY and signals apply that it
- * can stop waiting and exits, if the state was set to something
- * else than CATCHUP the sync process will simply end.
- * - If the state was set to SYNCDONE by apply, the apply will
- * continue tracking the table until it reaches the SYNCDONE stream
- * position at which point it sets state to READY and stops tracking.
+ * When the desired state appears, it will set the worker state to
+ * CATCHUP and starts loop-waiting until either the table state is set
+ * to SYNCDONE or the sync worker exits.
+ * - After the sync worker has seen the state change to CATCHUP, it will
+ * read the stream and apply changes (acting like an apply worker) until
+ * it catches up to the specified stream position. Then it sets the
+ * state to SYNCDONE. There might be zero changes applied between
+ * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
+ * apply worker.
+ * - Once the state was set to SYNCDONE, the apply will continue tracking
+ * the table until it reaches the SYNCDONE stream position, at which
+ * point it sets state to READY and stops tracking. Again, there might
+ * be zero changes in between.
+ *
+ * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
+ * SYNCDONE -> READY.
*
* The catalog pg_subscription_rel is used to keep information about
- * subscribed tables and their state and some transient state during
- * data synchronization is kept in shared memory.
+ * subscribed tables and their state. Some transient state during data
+ * synchronization is kept in shared memory. The states SYNCWAIT and
+ * CATCHUP only appear in memory.
*
* Example flows look like this:
* - Apply is in front:
* sync:8
- * -> set SYNCWAIT
+ * -> set in memory SYNCWAIT
* apply:10
- * -> set CATCHUP
+ * -> set in memory CATCHUP
* -> enter wait-loop
* sync:10
- * -> set READY
+ * -> set in catalog SYNCDONE
* -> exit
* apply:10
* -> exit wait-loop
* -> continue rep
+ * apply:11
+ * -> set in catalog READY
* - Sync in front:
* sync:10
- * -> set SYNCWAIT
+ * -> set in memory SYNCWAIT
* apply:8
- * -> set SYNCDONE
+ * -> set in memory CATCHUP
* -> continue per-table filtering
* sync:10
+ * -> set in catalog SYNCDONE
* -> exit
* apply:10
- * -> set READY
+ * -> set in catalog READY
* -> stop per-table filtering
* -> continue rep
*-------------------------------------------------------------------------
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
+#include "utils/snapmgr.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
- /* Find the main apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
-
StartTransactionCommand();
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
+ /* Find the main apply worker and signal it. */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+
/* Stop gracefully */
proc_exit(0);
}
/*
- * Wait until the table synchronization change.
+ * Wait until the relation synchronization state is set in catalog to the
+ * expected one.
*
- * If called from apply worker, it will wait for the synchronization worker to
- * change table state in shmem. If called from synchronization worker, it
- * will wait for apply worker to change table state in shmem.
+ * Used when transitioning from CATCHUP state to SYNCDONE.
*
- * Returns false if the opposite worker has disappeared or the table state has
- * been reset.
+ * Returns false if the synchronization worker has disappeared or the table state
+ * has been reset.
*/
static bool
-wait_for_sync_status_change(Oid relid, char origstate)
+wait_for_relation_state_change(Oid relid, char expected_state)
{
int rc;
- char state = origstate;
+ char state;
for (;;)
{
LogicalRepWorker *worker;
+ XLogRecPtr statelsn;
CHECK_FOR_INTERRUPTS();
+ /* XXX use cache invalidation here to improve performance? */
+ PushActiveSnapshot(GetLatestSnapshot());
+ state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
+ relid, &statelsn, true);
+ PopActiveSnapshot();
+
+ if (state == SUBREL_STATE_UNKNOWN)
+ return false;
+
+ if (state == expected_state)
+ return true;
+
+ /* Check if the sync worker is still running and bail if not. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
/* Check if the opposite worker is still running and bail if not. */
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
am_tablesync_worker() ? InvalidOid : relid,
false);
+ LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- {
- LWLockRelease(LogicalRepWorkerLock);
return false;
- }
- /*
- * If I'm the synchronization worker, look at my own state. Otherwise
- * look at the state of the synchronization worker we found above.
- */
- if (am_tablesync_worker())
- worker = MyLogicalRepWorker;
+ rc = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
- Assert(worker->relid == relid);
- state = worker->relstate;
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
- LWLockRelease(LogicalRepWorkerLock);
+ ResetLatch(&MyProc->procLatch);
+ }
- if (state == SUBREL_STATE_UNKNOWN)
+ return false;
+}
+
+/*
+ * Wait until the the apply worker changes the state of our synchronization
+ * worker to the expected one.
+ *
+ * Used when transitioning from SYNCWAIT state to CATCHUP.
+ *
+ * Returns false if the apply worker has disappeared or table state has been
+ * reset.
+ */
+static bool
+wait_for_worker_state_change(char expected_state)
+{
+ int rc;
+
+ for (;;)
+ {
+ LogicalRepWorker *worker;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Bail if he apply has died. */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+ InvalidOid, false);
+ LWLockRelease(LogicalRepWorkerLock);
+ if (!worker)
return false;
- if (state != origstate)
+ if (MyLogicalRepWorker->relstate == expected_state)
return true;
rc = WaitLatch(&MyProc->procLatch,
* Handle table synchronization cooperation from the synchronization
* worker.
*
- * If the sync worker is in catch up mode and reached the predetermined
- * synchronization point in the WAL stream, mark the table as READY and
- * finish. If it caught up too far, set to SYNCDONE and finish. Things will
- * then proceed in the "sync in front" scenario.
+ * If the sync worker is in CATCHUP state and reached (or passed) the
+ * predetermined synchronization point in the WAL stream, mark the table as
+ * SYNCDONE and finish.
*/
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
TimeLineID tli;
- MyLogicalRepWorker->relstate =
- (current_lsn == MyLogicalRepWorker->relstate_lsn)
- ? SUBREL_STATE_READY
- : SUBREL_STATE_SYNCDONE;
+ MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
* at least wal_retrieve_retry_interval.
*
* For tables that are being synchronized already, check if sync workers
- * either need action from the apply worker or have finished.
- *
- * The usual scenario is that the apply got ahead of the sync while the sync
- * ran, and then the action needed by apply is to mark a table for CATCHUP and
- * wait for the catchup to happen. In the less common case that sync worker
- * got in front of the apply worker, the table is marked as SYNCDONE but not
- * ready yet, as it needs to be tracked until apply reaches the same position
- * to which it was synced.
+ * either need action from the apply worker or have finished. This is the
+ * SYNCWAIT to CATCHUP transition.
*
- * If the synchronization position is reached, then the table can be marked as
- * READY and is no longer tracked.
+ * If the synchronization position is reached (SYNCDONE), then the table can
+ * be marked as READY and is no longer tracked.
*/
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
last_start_times = NULL;
}
- /* Process all tables that are being synchronized. */
+ /*
+ * Process all tables that are being synchronized.
+ */
foreach(lc, table_states)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT)
{
/*
- * There are three possible synchronization situations here.
- *
- * a) Apply is in front of the table sync: We tell the table
- * sync to CATCHUP.
- *
- * b) Apply is behind the table sync: We tell the table sync
- * to mark the table as SYNCDONE and finish.
- *
- * c) Apply and table sync are at the same position: We tell
- * table sync to mark the table as READY and finish.
- *
- * In any case we'll need to wait for table sync to change the
- * state in catalog and only then continue ourselves.
+ * Tell sync worker it can catchup now. We'll wait for it so
+ * it does not get lost.
*/
- if (current_lsn > rstate->lsn)
- {
- rstate->state = SUBREL_STATE_CATCHUP;
- rstate->lsn = current_lsn;
- }
- else if (current_lsn == rstate->lsn)
- {
- rstate->state = SUBREL_STATE_READY;
- rstate->lsn = current_lsn;
- }
- else
- rstate->state = SUBREL_STATE_SYNCDONE;
-
SpinLockAcquire(&syncworker->relmutex);
- syncworker->relstate = rstate->state;
- syncworker->relstate_lsn = rstate->lsn;
+ syncworker->relstate = SUBREL_STATE_CATCHUP;
+ syncworker->relstate_lsn =
+ Max(syncworker->relstate_lsn, current_lsn);
SpinLockRelease(&syncworker->relmutex);
/* Signal the sync worker, as it may be waiting for us. */
logicalrep_worker_wakeup_ptr(syncworker);
/*
- * Enter busy loop and wait for synchronization status change.
+ * Enter busy loop and wait for synchronization worker to
+ * reach expected state (or die trying).
*/
- wait_for_sync_status_change(rstate->relid, rstate->state);
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+ wait_for_relation_state_change(rstate->relid,
+ SUBREL_STATE_SYNCDONE);
}
/*
MyLogicalRepWorker->relstate_lsn = *origin_startpos;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ /* Wait for main apply worker to tell us to catchup. */
+ wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
+
/*
- * Wait for main apply worker to either tell us to catchup or
- * that we are done.
+ * There are now two possible states here:
+ * a) Sync is behind the apply. If that's the case we need to
+ * catch up with it by consuming the logical replication
+ * stream up to the relstate_lsn. For that, we exit this
+ * function and continue in ApplyWorkerMain().
+ * b) Sync is caught up with the apply. So it can just set
+ * the state to SYNCDONE and finish.
*/
- wait_for_sync_status_change(MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate);
- if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP)
+ if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
{
- /* Update the new state. */
+ /*
+ * Update the new state in catalog. No need to bother
+ * with the shmem state as we are exiting for good.
+ */
SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;