From 776e1c8a5d1494e345e5e1b16a5eba5e98aaddca Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 11 Oct 2022 10:37:52 +0530 Subject: [PATCH] Add a common function to generate the origin name. Make a common replication origin name formatting function to replace multiple snprintf() expressions. This also includes logic previously done by ReplicationOriginNameForTablesync(). This makes the code to generate the origin name consistent among apply worker and tablesync worker. Author: Peter Smith Reviewed-By: Aleksander Alekseev Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com --- src/backend/commands/subscriptioncmds.c | 15 +++++---- src/backend/replication/logical/tablesync.c | 36 +++++++-------------- src/backend/replication/logical/worker.c | 35 +++++++++++++++++--- src/include/replication/worker_internal.h | 4 +-- 4 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f3bfcca434c..97594cd9b18 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); /* @@ -946,8 +946,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * origin and by this time the origin might be already * removed. For these reasons, passing missing_ok = true. */ - ReplicationOriginNameForTablesync(sub->oid, relid, originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, char originname[NAMEDATALEN]; XLogRecPtr remote_lsn; - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, false); remote_lsn = replorigin_get_progress(originid, false); @@ -1521,8 +1522,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * worker so passing missing_ok = true. This can happen for the states * before SUBREL_STATE_FINISHEDCOPY. */ - ReplicationOriginNameForTablesync(subid, relid, originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(subid, relid, originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); } @@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) RemoveSubscriptionRel(subid, InvalidOid); /* Remove the origin tracking if exists. */ - snprintf(originname, sizeof(originname), "pg_%u", subid); + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b4a7b4b7f6e..94e813ac53c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -353,10 +353,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ StartTransactionCommand(); - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); /* * Resetting the origin session removes the ownership of the slot. @@ -505,10 +505,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. */ - ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, - rstate->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* @@ -1193,18 +1193,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid, relid, GetSystemIdentifier()); } -/* - * Form the origin name for tablesync. - * - * Return the name in the supplied buffer. - */ -void -ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname) -{ - snprintf(originname, szorgname, "pg_%u_%u", suboid, relid); -} - /* * Start syncing the table in the sync worker. * @@ -1274,10 +1262,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); /* Assign the origin tracking record name. */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) { diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 207a5805ba7..5250ae7f54c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -364,6 +364,30 @@ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +/* + * Form the origin name for the subscription. + * + * This is a common function for tablesync and other workers. Tablesync workers + * must pass a valid relid. Other callers must pass relid = InvalidOid. + * + * Return the name in the supplied buffer. + */ +void +ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname) +{ + if (OidIsValid(relid)) + { + /* Replication origin name for tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid); + } + else + { + /* Replication origin name for non-tablesync workers. */ + snprintf(originname, szoriginname, "pg_%u", suboid); + } +} + /* * Should this worker apply changes for given relation. * @@ -3679,10 +3703,10 @@ ApplyWorkerMain(Datum main_arg) * Allocate the origin name in long-lived context for error context * message. */ - ReplicationOriginNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, - originname, - sizeof(originname)); + ReplicationOriginNameForLogicalRep(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, originname); } @@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg) /* Setup replication origin tracking. */ StartTransactionCommand(); - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) originid = replorigin_create(originname); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f82bc518c32..2b7114ff6d9 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); extern int logicalrep_sync_worker_count(Oid subid); -extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, - char *originname, Size szorgname); +extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, + char *originname, Size szoriginname); extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); extern bool AllTablesyncsReady(void); -- 2.30.2