/*-------------------------------------------------------------------------
* tablesync.c
- * PostgreSQL logical replication
+ * PostgreSQL logical replication: initial table data synchronization
*
* Copyright (c) 2012-2020, PostgreSQL Global Development Group
*
* - It allows us to synchronize any tables added after the initial
* synchronization has finished.
*
- * The stream position synchronization works in multiple steps.
- * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
- * state to change in a loop.
- * - Apply periodically checks tables that are synchronizing for SYNCWAIT.
- * When the desired state appears, it will set the worker state to
- * CATCHUP and starts loop-waiting until either the table state is set
- * to SYNCDONE or the sync worker exits.
+ * The stream position synchronization works in multiple steps:
+ * - Apply worker requests a tablesync worker to start, setting the new
+ * table state to INIT.
+ * - Tablesync worker starts; changes table state from INIT to DATASYNC while
+ * copying.
+ * - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
+ * waits for state change.
+ * - Apply worker periodically checks for tables in SYNCWAIT state. When
+ * any appear, it sets the table state to CATCHUP and starts loop-waiting
+ * until either the table state is set to SYNCDONE or the sync worker
+ * exits.
* - After the sync worker has seen the state change to CATCHUP, it will
* read the stream and apply changes (acting like an apply worker) until
* it catches up to the specified stream position. Then it sets the
* state to SYNCDONE. There might be zero changes applied between
* CATCHUP and SYNCDONE, because the sync worker might be ahead of the
* apply worker.
- * - Once the state was set to SYNCDONE, the apply will continue tracking
+ * - Once the state is set to SYNCDONE, the apply will continue tracking
* the table until it reaches the SYNCDONE stream position, at which
* point it sets state to READY and stops tracking. Again, there might
* be zero changes in between.
*
- * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
- * SYNCDONE -> READY.
+ * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
+ * CATCHUP -> SYNCDONE -> READY.
*
* The catalog pg_subscription_rel is used to keep information about
* subscribed tables and their state. Some transient state during data
* -> continue rep
* apply:11
* -> set in catalog READY
- * - Sync in front:
+ *
+ * - Sync is in front:
* sync:10
* -> set in memory SYNCWAIT
* apply:8
}
/*
- * Wait until the relation synchronization state is set in the catalog to the
- * expected one.
+ * Wait until the relation sync state is set in the catalog to the expected
+ * one; return true when it happens.
*
- * Used when transitioning from CATCHUP state to SYNCDONE.
+ * Returns false if the table sync worker or the table itself has
+ * disappeared, or the table state has been reset.
*
- * Returns false if the synchronization worker has disappeared or the table state
- * has been reset.
+ * Currently, this is used in the apply worker when transitioning from
+ * CATCHUP state to SYNCDONE.
*/
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
CHECK_FOR_INTERRUPTS();
- /* XXX use cache invalidation here to improve performance? */
- PushActiveSnapshot(GetLatestSnapshot());
+ InvalidateCatalogSnapshot();
state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
- relid, &statelsn, true);
- PopActiveSnapshot();
+ relid, &statelsn);
if (state == SUBREL_STATE_UNKNOWN)
- return false;
+ break;
if (state == expected_state)
return true;
/* Check if the sync worker is still running and bail if not. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- /* Check if the opposite worker is still running and bail if not. */
- worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- am_tablesync_worker() ? InvalidOid : relid,
+ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
false);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- return false;
+ break;
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
/*
* Start syncing the table in the sync worker.
*
+ * If nothing needs to be done to sync the table, we exit the worker without
+ * any further action.
+ *
* The returned slot name is palloc'ed in current memory context.
*/
char *
char *err;
char relstate;
XLogRecPtr relstate_lsn;
+ Relation rel;
+ WalRcvExecResult *res;
/* Check the state of the table synchronization. */
StartTransactionCommand();
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
- &relstate_lsn, true);
+ &relstate_lsn);
CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate_lsn = relstate_lsn;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ /*
+ * If synchronization is already done or no longer necessary, exit now
+ * that we've updated shared memory state.
+ */
+ switch (relstate)
+ {
+ case SUBREL_STATE_SYNCDONE:
+ case SUBREL_STATE_READY:
+ case SUBREL_STATE_UNKNOWN:
+ finish_sync_worker(); /* doesn't return */
+ }
+
/*
* To build a slot name for the sync work, we are limited to NAMEDATALEN -
* 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
- switch (MyLogicalRepWorker->relstate)
- {
- case SUBREL_STATE_INIT:
- case SUBREL_STATE_DATASYNC:
- {
- Relation rel;
- WalRcvExecResult *res;
+ Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
+ MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
- SpinLockAcquire(&MyLogicalRepWorker->relmutex);
- MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
- MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
- /* Update the state and make it visible to others. */
- StartTransactionCommand();
- UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
- CommitTransactionCommand();
- pgstat_report_stat(false);
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+ MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
- /*
- * We want to do the table data sync in a single transaction.
- */
- StartTransactionCommand();
+ /* Update the state and make it visible to others. */
+ StartTransactionCommand();
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
- /*
- * Use a standard write lock here. It might be better to
- * disallow access to the table while it's being synchronized.
- * But we don't want to block the main apply process from
- * working and it has to open the relation in RowExclusiveLock
- * when remapping remote relation id to local one.
- */
- rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+ /*
+ * We want to do the table data sync in a single transaction.
+ */
+ StartTransactionCommand();
- /*
- * Create a temporary slot for the sync process. We do this
- * inside the transaction so that we can use the snapshot made
- * by the slot to get existing data.
- */
- res = walrcv_exec(wrconn,
- "BEGIN READ ONLY ISOLATION LEVEL "
- "REPEATABLE READ", 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
- (errmsg("table copy could not start transaction on publisher"),
- errdetail("The error was: %s", res->err)));
- walrcv_clear_result(res);
+ /*
+ * Use a standard write lock here. It might be better to disallow access
+ * to the table while it's being synchronized. But we don't want to block
+ * the main apply process from working and it has to open the relation in
+ * RowExclusiveLock when remapping remote relation id to local one.
+ */
+ rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
- /*
- * Create new temporary logical decoding slot.
- *
- * We'll use slot for data copy so make sure the snapshot is
- * used for the transaction; that way the COPY will get data
- * that is consistent with the lsn used by the slot to start
- * decoding.
- */
- walrcv_create_slot(wrconn, slotname, true,
- CRS_USE_SNAPSHOT, origin_startpos);
+ /*
+ * Start a transaction in the remote node in REPEATABLE READ mode. This
+ * ensures that both the replication slot we create (see below) and the
+ * COPY are consistent with each other.
+ */
+ res = walrcv_exec(wrconn,
+ "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
+ 0, NULL);
+ if (res->status != WALRCV_OK_COMMAND)
+ ereport(ERROR,
+ (errmsg("table copy could not start transaction on publisher"),
+ errdetail("The error was: %s", res->err)));
+ walrcv_clear_result(res);
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ /*
+ * Create a new temporary logical decoding slot. This slot will be used
+ * for the catchup phase after COPY is done, so tell it to use the
+ * snapshot to make the final data consistent.
+ */
+ walrcv_create_slot(wrconn, slotname, true,
+ CRS_USE_SNAPSHOT, origin_startpos);
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
- (errmsg("table copy could not finish transaction on publisher"),
- errdetail("The error was: %s", res->err)));
- walrcv_clear_result(res);
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
- table_close(rel, NoLock);
+ res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ if (res->status != WALRCV_OK_COMMAND)
+ ereport(ERROR,
+ (errmsg("table copy could not finish transaction on publisher"),
+ errdetail("The error was: %s", res->err)));
+ walrcv_clear_result(res);
- /* Make the copy visible. */
- CommandCounterIncrement();
+ table_close(rel, NoLock);
- /*
- * We are done with the initial data synchronization, update
- * the state.
- */
- SpinLockAcquire(&MyLogicalRepWorker->relmutex);
- MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
- MyLogicalRepWorker->relstate_lsn = *origin_startpos;
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
- /* Wait for main apply worker to tell us to catchup. */
- wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
-
- /*----------
- * There are now two possible states here:
- * a) Sync is behind the apply. If that's the case we need to
- * catch up with it by consuming the logical replication
- * stream up to the relstate_lsn. For that, we exit this
- * function and continue in ApplyWorkerMain().
- * b) Sync is caught up with the apply. So it can just set
- * the state to SYNCDONE and finish.
- *----------
- */
- if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
- {
- /*
- * Update the new state in catalog. No need to bother
- * with the shmem state as we are exiting for good.
- */
- UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos);
- finish_sync_worker();
- }
- break;
- }
- case SUBREL_STATE_SYNCDONE:
- case SUBREL_STATE_READY:
- case SUBREL_STATE_UNKNOWN:
+ /* Make the copy visible. */
+ CommandCounterIncrement();
- /*
- * Nothing to do here but finish. (UNKNOWN means the relation was
- * removed from pg_subscription_rel before the sync worker could
- * start.)
- */
- finish_sync_worker();
- break;
- default:
- elog(ERROR, "unknown relation state \"%c\"",
- MyLogicalRepWorker->relstate);
- }
+ /*
+ * We are done with the initial data synchronization; update the state.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
+ MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ /*
+ * Finally, wait until the main apply worker tells us to catch up and then
+ * return to let LogicalRepApplyLoop do it.
+ */
+ wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
}