SELECT pg_drop_replication_slot('regression_slot');
ERROR: permission denied to use replication slots
DETAIL: Only roles with the REPLICATION attribute may use replication slots.
+SELECT pg_sync_replication_slots();
+ERROR: permission denied to use replication slots
+DETAIL: Only roles with the REPLICATION attribute may use replication slots.
RESET ROLE;
-- replication users can drop superuser created slots
SET ROLE regress_lr_superuser;
init
(1 row)
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
+ERROR: cannot enable failover for a temporary replication slot
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
?column?
----------
INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_sync_replication_slots();
RESET ROLE;
-- replication users can drop superuser created slots
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
SELECT slot_name, slot_type, failover FROM pg_replication_slots;
<varname>primary_conninfo</varname> string, or in a separate
<filename>~/.pgpass</filename> file on the standby server (use
<literal>replication</literal> as the database name).
- Do not specify a database name in the
- <varname>primary_conninfo</varname> string.
+ </para>
+ <para>
+ For replication slot synchronization (see
+ <xref linkend="logicaldecoding-replication-slots-synchronization"/>),
+ it is also necessary to specify a valid <literal>dbname</literal>
+ in the <varname>primary_conninfo</varname> string. This will only be
+ used for slot synchronization. It is ignored for streaming.
</para>
<para>
This parameter can only be set in the <filename>postgresql.conf</filename>
</row>
<row>
- <entry role="func_table_entry"><para role="func_signature">
+ <entry id="pg-create-logical-replication-slot" role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_create_logical_replication_slot</primary>
</indexterm>
record is flushed along with its transaction.
</para></entry>
</row>
+
+ <row>
+ <entry id="pg-sync-replication-slots" role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_sync_replication_slots</primary>
+ </indexterm>
+ <function>pg_sync_replication_slots</function> ()
+ <returnvalue>void</returnvalue>
+ </para>
+ <para>
+ Synchronize the logical failover replication slots from the primary
+ server to the standby server. This function can only be executed on the
+ standby server. Temporary synced slots, if any, cannot be used for
+ logical decoding and must be dropped after promotion. See
+ <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+ </para>
+
+ <caution>
+ <para>
+ If, after executing the function,
+ <link linkend="guc-hot-standby-feedback">
+ <varname>hot_standby_feedback</varname></link> is disabled on
+ the standby or the physical slot configured in
+ <link linkend="guc-primary-slot-name">
+ <varname>primary_slot_name</varname></link> is
+ removed, then it is possible that the necessary rows of the
+ synchronized slot will be removed by the VACUUM process on the primary
+ server, resulting in the synchronized slot becoming invalidated.
+ </para>
+ </caution>
+ </entry>
+ </row>
+
</tbody>
</tgroup>
</table>
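A minimal usage sketch: on a standby that meets the configuration requirements described in the synchronization section, the function is invoked directly from SQL (the calling role needs the REPLICATION attribute, or superuser):

    -- Run on the standby server to perform one synchronization cycle.
    SELECT pg_sync_replication_slots();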
So if a slot is no longer required it should be dropped.
</para>
</caution>
+
+ </sect2>
+
+ <sect2 id="logicaldecoding-replication-slots-synchronization">
+ <title>Replication Slot Synchronization</title>
+ <para>
+ The logical replication slots on the primary can be synchronized to
+ the hot standby by using the <literal>failover</literal> parameter of
+ <link linkend="pg-create-logical-replication-slot">
+ <function>pg_create_logical_replication_slot</function></link>, or by
+ using the <link linkend="sql-createsubscription-params-with-failover">
+ <literal>failover</literal></link> option of
+ <command>CREATE SUBSCRIPTION</command> during slot creation, and then calling
+ <link linkend="pg-sync-replication-slots">
+ <function>pg_sync_replication_slots</function></link>
+    on the standby. For the synchronization to work, it is mandatory to
+    have a physical replication slot between the primary and the standby,
+    i.e., <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
+    must be configured on the standby, and
+ <link linkend="guc-hot-standby-feedback"><varname>hot_standby_feedback</varname></link>
+ must be enabled on the standby. It is also necessary to specify a valid
+ <literal>dbname</literal> in the
+ <link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
+ </para>
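As an illustrative sketch of the setup described above (host and slot names here are hypothetical), the pieces could be put in place as follows:

    -- On the primary: a physical slot for the standby, plus a logical slot
    -- created with failover enabled (last argument of the function).
    SELECT pg_create_physical_replication_slot('standby1_slot');
    SELECT pg_create_logical_replication_slot('failover_slot', 'pgoutput', false, false, true);
    -- (A subscriber can instead use CREATE SUBSCRIPTION ... WITH (failover = true).)

    -- On the standby: point primary_conninfo at the primary (note dbname),
    -- name the physical slot, enable hot_standby_feedback, then reload.
    ALTER SYSTEM SET primary_conninfo = 'host=primary.example.com dbname=postgres';
    ALTER SYSTEM SET primary_slot_name = 'standby1_slot';
    ALTER SYSTEM SET hot_standby_feedback = on;
    SELECT pg_reload_conf();

    -- On the standby: perform a synchronization cycle.
    SELECT pg_sync_replication_slots();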
+
+ <para>
+ The ability to resume logical replication after failover depends upon the
+ <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+ value for the synchronized slots on the standby at the time of failover.
+   Only persistent slots whose <structfield>synced</structfield> state is true on the
+   standby before failover can be used for logical replication after failover.
+   Temporary synced slots cannot be used for logical decoding; therefore,
+   logical replication for those slots cannot be resumed. For example, if the
+   synchronized slot could not become persistent on the standby because the
+   subscription was disabled, then the subscription cannot be resumed after
+   failover even after it is re-enabled.
+ </para>
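For example, before failing over one might check on the standby that the slots of interest are sync-ready, i.e., persistent and marked as synced (a sketch using the columns documented for pg_replication_slots):

    SELECT slot_name, failover, synced, temporary, conflict_reason
      FROM pg_replication_slots;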
+
+ <para>
+ To resume logical replication after failover from the synced logical
+ slots, the subscription's 'conninfo' must be altered to point to the
+ new primary server. This is done using
+ <link linkend="sql-altersubscription-params-connection"><command>ALTER SUBSCRIPTION ... CONNECTION</command></link>.
+ It is recommended that subscriptions are first disabled before promoting
+ the standby and are re-enabled after altering the connection string.
+ </para>
+ <caution>
+ <para>
+ There is a chance that the old primary is up again during the promotion
+ and if subscriptions are not disabled, the logical subscribers may
+ continue to receive data from the old primary server even after promotion
+ until the connection string is altered. This might result in data
+ inconsistency issues, preventing the logical subscribers from being
+ able to continue replication from the new primary server.
+ </para>
+ </caution>
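A sketch of the recommended sequence on the subscriber, with a hypothetical subscription name and connection string:

    ALTER SUBSCRIPTION mysub DISABLE;
    -- ... promote the standby to become the new primary ...
    ALTER SUBSCRIPTION mysub CONNECTION 'host=new-primary.example.com dbname=postgres';
    ALTER SUBSCRIPTION mysub ENABLE;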
</sect2>
<sect2 id="logicaldecoding-explanation-output-plugins">
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
- If true, the slot is enabled to be synced to the standbys.
+ If true, the slot is enabled to be synced to the standbys
+ so that logical replication can be resumed after failover.
The default is false.
</para>
</listitem>
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
- If true, the slot is enabled to be synced to the standbys.
+ If true, the slot is enabled to be synced to the standbys
+ so that logical replication can be resumed after failover.
</para>
</listitem>
</varlistentry>
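For illustration, enabling the option on an existing slot over a replication connection looks like this (slot name hypothetical; the same form is exercised in the TAP test further below):

    ALTER_REPLICATION_SLOT myslot ( FAILOVER );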
<structfield>failover</structfield> <type>bool</type>
</para>
<para>
- True if this is a logical slot enabled to be synced to the standbys.
- Always false for physical slots.
+ True if this is a logical slot enabled to be synced to the standbys
+ so that logical replication can be resumed from the new primary
+ after failover. Always false for physical slots.
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>synced</structfield> <type>bool</type>
+ </para>
+ <para>
+ True if this is a logical slot that was synced from a primary server.
+ On a hot standby, the slots with the synced column marked as true can
+       neither be used for logical decoding nor dropped manually. The value
+       of this column has no meaning on the primary server; it is false by
+       default for all slots there but may also be true if the slot is left
+       over from a promoted standby.
+ </para></entry>
+ </row>
+
</tbody>
</tgroup>
</table>
L.safe_wal_size,
L.two_phase,
L.conflict_reason,
- L.failover
+ L.failover,
+ L.synced
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
proto.o \
relation.o \
reorderbuffer.o \
+ slotsync.o \
snapbuild.o \
tablesync.o \
worker.o
errmsg("replication slot \"%s\" was not created in this database",
NameStr(slot->data.name))));
+ /*
+ * Do not allow consumption of a "synchronized" slot until the standby
+ * gets promoted.
+ */
+ if (RecoveryInProgress() && slot->data.synced)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot use replication slot \"%s\" for logical decoding",
+ NameStr(slot->data.name)),
+ errdetail("This slot is being synchronized from the primary server."),
+ errhint("Specify another replication slot."));
+
/*
* Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
* "cannot get changes" wording in this errmsg because that'd be
'proto.c',
'relation.c',
'reorderbuffer.c',
+ 'slotsync.c',
'snapbuild.c',
'tablesync.c',
'worker.c',
--- /dev/null
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ * Functionality for synchronizing slots to a standby server from the
+ * primary server.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/slotsync.c
+ *
+ * This file contains the code for slot synchronization on a physical standby
+ * to fetch logical failover slots information from the primary server, create
+ * the slots on the standby and synchronize them. This is done by a call to SQL
+ * function pg_sync_replication_slots.
+ *
+ * If, on the physical standby, the WAL corresponding to the remote's
+ * restart_lsn is not available, or the remote's catalog_xmin precedes the
+ * oldest xid for which it is guaranteed that rows wouldn't have been removed,
+ * then we cannot create the local standby slot because that would mean moving
+ * the local slot backward and decoding won't be possible via such a slot.
+ * In this case, the
+ * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
+ * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
+ * which we can call pg_sync_replication_slots() periodically to perform
+ * syncs.
+ *
+ * Any standby synchronized slots will be dropped if they no longer need
+ * to be synchronized. See comment atop drop_local_obsolete_slots() for more
+ * details.
+ *---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_database.h"
+#include "commands/dbcommands.h"
+#include "replication/logical.h"
+#include "replication/slotsync.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+/* Struct for sharing information to control slot synchronization. */
+typedef struct SlotSyncCtxStruct
+{
+ /* prevents concurrent slot syncs to avoid slot overwrites */
+ bool syncing;
+ slock_t mutex;
+} SlotSyncCtxStruct;
+
+SlotSyncCtxStruct *SlotSyncCtx = NULL;
+
+/*
+ * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
+ * in SlotSyncCtxStruct, this flag is true only if the current process is
+ * performing slot synchronization.
+ */
+static bool syncing_slots = false;
+
+/*
+ * Structure to hold information fetched from the primary server about a logical
+ * replication slot.
+ */
+typedef struct RemoteSlot
+{
+ char *name;
+ char *plugin;
+ char *database;
+ bool two_phase;
+ bool failover;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
+ TransactionId catalog_xmin;
+
+ /* RS_INVAL_NONE if valid, or the reason of invalidation */
+ ReplicationSlotInvalidationCause invalidated;
+} RemoteSlot;
+
+/*
+ * If necessary, update the local synced slot's metadata based on the data
+ * from the remote slot.
+ *
+ * If no update was needed (the data of the remote slot is the same as the
+ * local slot) return false, otherwise true.
+ */
+static bool
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+ ReplicationSlot *slot = MyReplicationSlot;
+ bool xmin_changed;
+ bool restart_lsn_changed;
+ NameData plugin_name;
+
+ Assert(slot->data.invalidated == RS_INVAL_NONE);
+
+ xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
+ restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+
+ if (!xmin_changed &&
+ !restart_lsn_changed &&
+ remote_dbid == slot->data.database &&
+ remote_slot->two_phase == slot->data.two_phase &&
+ remote_slot->failover == slot->data.failover &&
+ remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
+ strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
+ return false;
+
+ /* Avoid expensive operations while holding a spinlock. */
+ namestrcpy(&plugin_name, remote_slot->plugin);
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.plugin = plugin_name;
+ slot->data.database = remote_dbid;
+ slot->data.two_phase = remote_slot->two_phase;
+ slot->data.failover = remote_slot->failover;
+ slot->data.restart_lsn = remote_slot->restart_lsn;
+ slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+ slot->data.catalog_xmin = remote_slot->catalog_xmin;
+ slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+ SpinLockRelease(&slot->mutex);
+
+ if (xmin_changed)
+ ReplicationSlotsComputeRequiredXmin(false);
+
+ if (restart_lsn_changed)
+ ReplicationSlotsComputeRequiredLSN();
+
+ return true;
+}
+
+/*
+ * Get the list of local logical slots that are synchronized from the
+ * primary server.
+ */
+static List *
+get_local_synced_slots(void)
+{
+ List *local_slots = NIL;
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ /* Check if it is a synchronized slot */
+ if (s->in_use && s->data.synced)
+ {
+ Assert(SlotIsLogical(s));
+ local_slots = lappend(local_slots, s);
+ }
+ }
+
+ LWLockRelease(ReplicationSlotControlLock);
+
+ return local_slots;
+}
+
+/*
+ * Helper function to check if local_slot is required to be retained.
+ *
+ * Return false either if local_slot does not exist in the remote_slots list
+ * or is invalidated while the corresponding remote slot is still valid,
+ * otherwise true.
+ */
+static bool
+local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
+{
+ bool remote_exists = false;
+ bool locally_invalidated = false;
+
+ foreach_ptr(RemoteSlot, remote_slot, remote_slots)
+ {
+ if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
+ {
+ remote_exists = true;
+
+ /*
+ * If remote slot is not invalidated but local slot is marked as
+ * invalidated, then set locally_invalidated flag.
+ */
+ SpinLockAcquire(&local_slot->mutex);
+ locally_invalidated =
+ (remote_slot->invalidated == RS_INVAL_NONE) &&
+ (local_slot->data.invalidated != RS_INVAL_NONE);
+ SpinLockRelease(&local_slot->mutex);
+
+ break;
+ }
+ }
+
+ return (remote_exists && !locally_invalidated);
+}
+
+/*
+ * Drop local obsolete slots.
+ *
+ * Drop the local slots that no longer need to be synced, i.e., they either do
+ * not exist on the primary or are no longer enabled for failover.
+ *
+ * Additionally, drop any slots that are valid on the primary but got
+ * invalidated on the standby. This situation may occur due to the following
+ * reasons:
+ * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
+ * records from the restart_lsn of the slot.
+ * - 'primary_slot_name' is temporarily reset to null and the physical slot is
+ * removed.
+ * These dropped slots will get recreated in the next sync-cycle and it is okay to
+ * drop and recreate such slots as long as these are not consumable on the
+ * standby (which is the case currently).
+ *
+ * Note: Change of 'wal_level' on the primary server to a level lower than
+ * logical may also result in slot invalidation and removal on the standby.
+ * This is because such 'wal_level' change is only possible if the logical
+ * slots are removed on the primary server, so it's expected to see the
+ * slots being invalidated and removed on the standby too (and re-created
+ * if they are re-created on the primary server).
+ */
+static void
+drop_local_obsolete_slots(List *remote_slot_list)
+{
+ List *local_slots = get_local_synced_slots();
+
+ foreach_ptr(ReplicationSlot, local_slot, local_slots)
+ {
+ /* Drop the local slot if it is not required to be retained. */
+ if (!local_sync_slot_required(local_slot, remote_slot_list))
+ {
+ bool synced_slot;
+
+ /*
+ * Use shared lock to prevent a conflict with
+ * ReplicationSlotsDropDBSlots(), trying to drop the same slot
+ * during a drop-database operation.
+ */
+ LockSharedObject(DatabaseRelationId, local_slot->data.database,
+ 0, AccessShareLock);
+
+ /*
+ * In the small window between getting the slot to drop and
+ * locking the database, there is a possibility of a parallel
+ * database drop by the startup process and the creation of a new
+ * slot by the user. This new user-created slot may end up using
+ * the same shared memory as that of 'local_slot'. Thus check if
+ * local_slot is still the synced one before performing actual
+ * drop.
+ */
+ SpinLockAcquire(&local_slot->mutex);
+ synced_slot = local_slot->in_use && local_slot->data.synced;
+ SpinLockRelease(&local_slot->mutex);
+
+ if (synced_slot)
+ {
+ ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+ ReplicationSlotDropAcquired();
+ }
+
+ UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
+ 0, AccessShareLock);
+
+ ereport(LOG,
+ errmsg("dropped replication slot \"%s\" of dbid %d",
+ NameStr(local_slot->data.name),
+ local_slot->data.database));
+ }
+ }
+}
+
+/*
+ * Reserve WAL for the currently active local slot using the specified WAL
+ * location (restart_lsn).
+ *
+ * If the given WAL location has been removed, reserve WAL using the oldest
+ * existing WAL segment.
+ */
+static void
+reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
+{
+ XLogSegNo oldest_segno;
+ XLogSegNo segno;
+ ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(slot != NULL);
+ Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
+
+ while (true)
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->data.restart_lsn = restart_lsn;
+ SpinLockRelease(&slot->mutex);
+
+ /* Prevent WAL removal as fast as possible */
+ ReplicationSlotsComputeRequiredLSN();
+
+ XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+
+ /*
+ * Find the oldest existing WAL segment file.
+ *
+ * Normally, we can determine it by using the last removed segment
+ * number. However, if no WAL segment files have been removed by a
+ * checkpoint since startup, we need to search for the oldest segment
+ * file from the current timeline existing in XLOGDIR.
+ *
+ * XXX: Currently, we are searching for the oldest segment in the
+ * current timeline as there is less chance of the slot's restart_lsn
+   * being from some prior timeline, and even if it happens, in the
+ * worst case, we will wait to sync till the slot's restart_lsn moved
+ * to the current timeline.
+ */
+ oldest_segno = XLogGetLastRemovedSegno() + 1;
+
+ if (oldest_segno == 1)
+ {
+ TimeLineID cur_timeline;
+
+ GetWalRcvFlushRecPtr(NULL, &cur_timeline);
+ oldest_segno = XLogGetOldestSegno(cur_timeline);
+ }
+
+ /*
+ * If all required WAL is still there, great, otherwise retry. The
+ * slot should prevent further removal of WAL, unless there's a
+ * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+ * the new restart_lsn above, so normally we should never need to loop
+ * more than twice.
+ */
+ if (segno >= oldest_segno)
+ break;
+
+ /* Retry using the location of the oldest wal segment */
+ XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
+ }
+}
+
+/*
+ * If the remote restart_lsn and catalog_xmin have caught up with the
+ * local ones, then update the LSNs and persist the local synced slot for
+ * future synchronization; otherwise, do nothing.
+ */
+static void
+update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+ ReplicationSlot *slot = MyReplicationSlot;
+
+ /*
+ * Check if the primary server has caught up. Refer to the comment atop
+ * the file for details on this check.
+ */
+ if (remote_slot->restart_lsn < slot->data.restart_lsn ||
+ TransactionIdPrecedes(remote_slot->catalog_xmin,
+ slot->data.catalog_xmin))
+ {
+ /*
+  * The remote slot didn't catch up to the locally reserved position.
+ *
+ * We do not drop the slot because the restart_lsn can be ahead of the
+ * current location when recreating the slot in the next cycle. It may
+ * take more time to create such a slot. Therefore, we keep this slot
+ * and attempt the synchronization in the next cycle.
+ */
+ return;
+ }
+
+ /* First time slot update, the function must return true */
+ if (!update_local_synced_slot(remote_slot, remote_dbid))
+ elog(ERROR, "failed to update slot");
+
+ ReplicationSlotPersist();
+
+ ereport(LOG,
+ errmsg("newly created slot \"%s\" is sync-ready now",
+ remote_slot->name));
+}
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * This creates a new slot if there is no existing one and updates the
+ * metadata of the slot as per the data received from the primary server.
+ *
+ * The slot is created as a temporary slot and stays in the same state until
+ * the remote_slot catches up with the locally reserved position and the local
+ * slot is updated. The slot is then persisted and is considered as sync-ready for
+ * periodic syncs.
+ */
+static void
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+ ReplicationSlot *slot;
+ XLogRecPtr latestFlushPtr;
+
+ /*
+  * Make sure that the relevant WAL is received and flushed before syncing
+  * the slot to the target LSN received from the primary server.
+ */
+ latestFlushPtr = GetStandbyFlushRecPtr(NULL);
+ if (remote_slot->confirmed_lsn > latestFlushPtr)
+ elog(ERROR,
+ "skipping slot synchronization as the received slot sync"
+ " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr));
+
+ /* Search for the named slot */
+ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
+ {
+ bool synced;
+
+ SpinLockAcquire(&slot->mutex);
+ synced = slot->data.synced;
+ SpinLockRelease(&slot->mutex);
+
+ /* User-created slot with the same name exists, raise ERROR. */
+ if (!synced)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("exiting from slot synchronization because same"
+ " name slot \"%s\" already exists on the standby",
+ remote_slot->name));
+
+ /*
+ * The slot has been synchronized before.
+ *
+ * It is important to acquire the slot here before checking
+   * invalidation. If we don't acquire the slot first, there could be a
+   * race condition in which the local slot is invalidated just after
+   * checking the 'invalidated' flag here and we could end up
+   * overwriting the 'invalidated' flag with remote_slot's value. See
+ * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
+ * if the slot is not acquired by other processes.
+ */
+ ReplicationSlotAcquire(remote_slot->name, true);
+
+ Assert(slot == MyReplicationSlot);
+
+ /*
+   * Copy the invalidation cause from the remote slot only if the local
+   * slot is not invalidated locally; we don't want to overwrite an existing one.
+ */
+ if (slot->data.invalidated == RS_INVAL_NONE &&
+ remote_slot->invalidated != RS_INVAL_NONE)
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->data.invalidated = remote_slot->invalidated;
+ SpinLockRelease(&slot->mutex);
+
+ /* Make sure the invalidated state persists across server restart */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }
+
+ /* Skip the sync of an invalidated slot */
+ if (slot->data.invalidated != RS_INVAL_NONE)
+ {
+ ReplicationSlotRelease();
+ return;
+ }
+
+ /* Slot not ready yet, let's attempt to make it sync-ready now. */
+ if (slot->data.persistency == RS_TEMPORARY)
+ {
+ update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ }
+
+ /* Slot ready for sync, so sync it. */
+ else
+ {
+ /*
+ * Sanity check: As long as the invalidations are handled
+ * appropriately as above, this should never happen.
+ */
+ if (remote_slot->restart_lsn < slot->data.restart_lsn)
+ elog(ERROR,
+ "cannot synchronize local slot \"%s\" LSN(%X/%X)"
+ " to remote slot's LSN(%X/%X) as synchronization"
+ " would move it backwards", remote_slot->name,
+ LSN_FORMAT_ARGS(slot->data.restart_lsn),
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn));
+
+ /* Make sure the slot changes persist across server restart */
+ if (update_local_synced_slot(remote_slot, remote_dbid))
+ {
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }
+ }
+ }
+ /* Otherwise create the slot first. */
+ else
+ {
+ NameData plugin_name;
+ TransactionId xmin_horizon = InvalidTransactionId;
+
+ /* Skip creating the local slot if remote_slot is invalidated already */
+ if (remote_slot->invalidated != RS_INVAL_NONE)
+ return;
+
+ /*
+ * We create temporary slots instead of ephemeral slots here because
+ * we want the slots to survive after releasing them. This is done to
+ * avoid dropping and re-creating the slots in each synchronization
+ * cycle if the restart_lsn or catalog_xmin of the remote slot has not
+ * caught up.
+ */
+ ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
+ remote_slot->two_phase,
+ remote_slot->failover,
+ true);
+
+ /* For shorter lines. */
+ slot = MyReplicationSlot;
+
+ /* Avoid expensive operations while holding a spinlock. */
+ namestrcpy(&plugin_name, remote_slot->plugin);
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.database = remote_dbid;
+ slot->data.plugin = plugin_name;
+ SpinLockRelease(&slot->mutex);
+
+ reserve_wal_for_local_slot(remote_slot->restart_lsn);
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+ SpinLockAcquire(&slot->mutex);
+ slot->effective_catalog_xmin = xmin_horizon;
+ slot->data.catalog_xmin = xmin_horizon;
+ SpinLockRelease(&slot->mutex);
+ ReplicationSlotsComputeRequiredXmin(true);
+ LWLockRelease(ProcArrayLock);
+
+ update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ }
+
+ ReplicationSlotRelease();
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Gets the failover logical slots info from the primary server and updates
+ * the slots locally. Creates the slots if not present on the standby.
+ */
+static void
+synchronize_slots(WalReceiverConn *wrconn)
+{
+#define SLOTSYNC_COLUMN_COUNT 9
+ Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+ LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+
+ WalRcvExecResult *res;
+ TupleTableSlot *tupslot;
+ StringInfoData s;
+ List *remote_slot_list = NIL;
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+
+ initStringInfo(&s);
+
+ /* Construct query to fetch slots with failover enabled. */
+ appendStringInfo(&s,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase, failover,"
+ " database, conflict_reason"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE failover and NOT temporary");
+
+ /* Execute the query */
+ res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);
+ pfree(s.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errmsg("could not fetch failover logical slots info from the primary server: %s",
+ res->err));
+
+ /* Construct the remote_slot tuple and synchronize each slot locally */
+ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+ {
+ bool isnull;
+ RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+ Datum d;
+ int col = 0;
+
+ remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ /*
+   * It is possible to get null values for LSN and Xmin if the slot is
+ * invalidated on the primary server, so handle accordingly.
+ */
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
+ DatumGetLSN(d);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
+ DatumGetTransactionId(d);
+
+ remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
+ &isnull));
+ Assert(!isnull);
+
+ remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
+ ++col, &isnull));
+ Assert(!isnull);
+
+ d = slot_getattr(tupslot, ++col, &isnull);
+ remote_slot->invalidated = isnull ? RS_INVAL_NONE :
+ GetSlotInvalidationCause(TextDatumGetCString(d));
+
+ /* Sanity check */
+ Assert(col == SLOTSYNC_COLUMN_COUNT);
+
+ /*
+ * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
+ * slot is valid, that means we have fetched the remote_slot in its
+ * RS_EPHEMERAL state. In such a case, don't sync it; we can always
+ * sync it in the next sync cycle when the remote_slot is persisted
+ * and has valid lsn(s) and xmin values.
+ *
+ * XXX: In future, if we plan to expose 'slot->data.persistency' in
+ * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
+ * slots in the first place.
+ */
+ if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
+ XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
+ !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
+ remote_slot->invalidated == RS_INVAL_NONE)
+ pfree(remote_slot);
+ else
+ /* Create list of remote slots */
+ remote_slot_list = lappend(remote_slot_list, remote_slot);
+
+ ExecClearTuple(tupslot);
+ }
+
+ /* Drop local slots that no longer need to be synced. */
+ drop_local_obsolete_slots(remote_slot_list);
+
+ /* Now sync the slots locally */
+ foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
+ {
+ Oid remote_dbid = get_database_oid(remote_slot->database, false);
+
+ /*
+ * Use shared lock to prevent a conflict with
+ * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
+ * a drop-database operation.
+ */
+ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
+
+ synchronize_one_slot(remote_slot, remote_dbid);
+
+ UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
+ }
+
+ /* We are done, free remote_slot_list elements */
+ list_free_deep(remote_slot_list);
+
+ walrcv_clear_result(res);
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+}
+
+/*
+ * Checks the remote server info.
+ *
+ * We ensure that the 'primary_slot_name' exists on the remote server and the
+ * remote server is not a standby node.
+ */
+static void
+validate_remote_info(WalReceiverConn *wrconn)
+{
+#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
+ WalRcvExecResult *res;
+ Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
+ StringInfoData cmd;
+ bool isnull;
+ TupleTableSlot *tupslot;
+ bool remote_in_recovery;
+ bool primary_slot_valid;
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT pg_is_in_recovery(), count(*) = 1"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE slot_type='physical' AND slot_name=%s",
+ quote_literal_cstr(PrimarySlotName));
+
+ res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
+ PrimarySlotName, res->err),
+ errhint("Check if \"primary_slot_name\" is configured correctly."));
+
+ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+ elog(ERROR,
+ "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
+
+ remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
+ Assert(!isnull);
+
+ if (remote_in_recovery)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot synchronize replication slots from a standby server"));
+
+ primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
+ Assert(!isnull);
+
+ if (!primary_slot_valid)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("bad configuration for slot synchronization"),
+ /* translator: second %s is a GUC variable name */
+ errdetail("The replication slot \"%s\" specified by \"%s\" does not exist on the primary server.",
+ PrimarySlotName, "primary_slot_name"));
+
+ ExecClearTuple(tupslot);
+ walrcv_clear_result(res);
+}
+
+/*
+ * Check all necessary GUCs for slot synchronization are set
+ * appropriately, otherwise, raise ERROR.
+ */
+void
+ValidateSlotSyncParams(void)
+{
+ char *dbname;
+
+ /*
+  * A physical replication slot (primary_slot_name) is required on the
+ * primary to ensure that the rows needed by the standby are not removed
+ * after restarting, so that the synchronized slot on the standby will not
+ * be invalidated.
+ */
+ if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
+ ereport(ERROR,
+ /* translator: %s is a GUC variable name */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("bad configuration for slot synchronization"),
+ errhint("\"%s\" must be defined.", "primary_slot_name"));
+
+ /*
+ * hot_standby_feedback must be enabled to cooperate with the physical
+ * replication slot, which allows informing the primary about the xmin and
+ * catalog_xmin values on the standby.
+ */
+ if (!hot_standby_feedback)
+ ereport(ERROR,
+ /* translator: %s is a GUC variable name */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("bad configuration for slot synchronization"),
+ errhint("\"%s\" must be enabled.", "hot_standby_feedback"));
+
+ /* Logical slot sync/creation requires wal_level >= logical. */
+ if (wal_level < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("bad configuration for slot synchronization"),
+ errhint("\"wal_level\" must be >= logical."));
+
+ /*
+ * The primary_conninfo is required to make connection to primary for
+ * getting slots information.
+ */
+ if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
+ ereport(ERROR,
+ /* translator: %s is a GUC variable name */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("bad configuration for slot synchronization"),
+ errhint("\"%s\" must be defined.", "primary_conninfo"));
+
+ /*
+ * The slot synchronization needs a database connection for walrcv_exec to
+ * work.
+ */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ ereport(ERROR,
+
+ /*
+ * translator: 'dbname' is a specific option; %s is a GUC variable
+ * name
+ */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("bad configuration for slot synchronization"),
+ errhint("'dbname' must be specified in \"%s\".", "primary_conninfo"));
+}
+
+/*
+ * Is the current process syncing replication slots?
+ */
+bool
+IsSyncingReplicationSlots(void)
+{
+ return syncing_slots;
+}
+
+/*
+ * Amount of shared memory required for slot synchronization.
+ */
+Size
+SlotSyncShmemSize(void)
+{
+ return sizeof(SlotSyncCtxStruct);
+}
+
+/*
+ * Allocate and initialize the shared memory of slot synchronization.
+ */
+void
+SlotSyncShmemInit(void)
+{
+ bool found;
+
+ SlotSyncCtx = (SlotSyncCtxStruct *)
+ ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
+
+ if (!found)
+ {
+ SlotSyncCtx->syncing = false;
+ SpinLockInit(&SlotSyncCtx->mutex);
+ }
+}
+
+/*
+ * Error cleanup callback for slot synchronization.
+ */
+static void
+slotsync_failure_callback(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ if (syncing_slots)
+ {
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory
+ * and reset the flag here.
+ */
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+ }
+
+ walrcv_disconnect(wrconn);
+}
+
+/*
+ * Synchronize the failover enabled replication slots using the specified
+ * primary server connection.
+ */
+void
+SyncReplicationSlots(WalReceiverConn *wrconn)
+{
+ PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+ {
+ validate_remote_info(wrconn);
+
+ synchronize_slots(wrconn);
+ }
+ PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+}
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/slotsync.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/ipc.h"
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 4 /* version for new files */
+#define SLOT_VERSION 5 /* version for new files */
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
* slots */
static void ReplicationSlotShmemExit(int code, Datum arg);
-static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
* user will only get commit prepared.
* failover: If enabled, allows the slot to be synced to standbys so
* that logical replication can be resumed after failover.
+ * synced: True if the slot is synchronized from the primary server.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover)
+ bool two_phase, bool failover, bool synced)
{
ReplicationSlot *slot = NULL;
int i;
ReplicationSlotValidateName(name, ERROR);
+ if (failover)
+ {
+ /*
+ * Do not allow users to create the failover enabled slots on the
+ * standby as we do not support sync to the cascading standby.
+ *
+ * However, failover enabled slots can be created during slot
+ * synchronization because we need to retain the same values as the
+ * remote slot.
+ */
+ if (RecoveryInProgress() && !IsSyncingReplicationSlots())
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable failover for a replication slot created on the standby"));
+
+ /*
+ * Do not allow users to create failover enabled temporary slots,
+ * because temporary slots will not be synced to the standby.
+ *
+ * However, failover enabled temporary slots can be created during
+ * slot synchronization. See the comments atop slotsync.c for details.
+ */
+ if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable failover for a temporary replication slot"));
+ }
+
/*
* If some other backend ran this code concurrently with us, we'd likely
* both allocate the same slot, and that would be bad. We'd also be at
slot->data.two_phase = two_phase;
slot->data.two_phase_at = InvalidXLogRecPtr;
slot->data.failover = failover;
+ slot->data.synced = synced;
/* and then data only present in shared memory */
slot->just_dirtied = false;
ReplicationSlotAcquire(name, nowait);
+ /*
+ * Do not allow users to drop the slots which are currently being synced
+ * from the primary to the standby.
+ */
+ if (RecoveryInProgress() && MyReplicationSlot->data.synced)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot drop replication slot \"%s\"", name),
+ errdetail("This slot is being synced from the primary server."));
+
ReplicationSlotDropAcquired();
}
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
+ if (RecoveryInProgress())
+ {
+ /*
+ * Do not allow users to alter the slots which are currently being
+ * synced from the primary to the standby.
+ */
+ if (MyReplicationSlot->data.synced)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot alter replication slot \"%s\"", name),
+ errdetail("This slot is being synced from the primary server."));
+
+ /*
+ * Do not allow users to enable failover on the standby as we do not
+ * support sync to the cascading standby.
+ */
+ if (failover)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable failover for a replication slot"
+ " on the standby"));
+ }
+
+ /*
+ * Do not allow users to enable failover for temporary slots as we do not
+ * support syncing temporary slots to the standby.
+ */
+ if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable failover for a temporary replication slot"));
+
if (MyReplicationSlot->data.failover != failover)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
/*
* Permanently drop the currently acquired replication slot.
*/
-static void
+void
ReplicationSlotDropAcquired(void)
{
ReplicationSlot *slot = MyReplicationSlot;
}
/*
- * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
- * guaranteeing it will be there after an eventual crash.
+ * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
+ * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
*/
void
ReplicationSlotPersist(void)
(errmsg("too many replication slots active before shutdown"),
errhint("Increase max_replication_slots and try again.")));
}
+
+/*
+ * Maps the pg_replication_slots.conflict_reason text value to
+ * ReplicationSlotInvalidationCause enum value
+ */
+ReplicationSlotInvalidationCause
+GetSlotInvalidationCause(char *conflict_reason)
+{
+ Assert(conflict_reason);
+
+ if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0)
+ return RS_INVAL_WAL_REMOVED;
+ else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0)
+ return RS_INVAL_HORIZON;
+ else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0)
+ return RS_INVAL_WAL_LEVEL;
+ else
+ Assert(0);
+
+ /* Keep compiler quiet */
+ return RS_INVAL_NONE;
+}
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
+#include "replication/slotsync.h"
#include "utils/builtins.h"
+#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
- false);
+ false, false);
if (immediately_reserve)
{
*/
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
- failover);
+ failover, false);
/*
* Create logical decoding context to find start point or, if we don't
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 16
+#define PG_GET_REPLICATION_SLOTS_COLS 17
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
break;
case RS_INVAL_WAL_REMOVED:
- values[i++] = CStringGetTextDatum("wal_removed");
+ values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT);
break;
case RS_INVAL_HORIZON:
- values[i++] = CStringGetTextDatum("rows_removed");
+ values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT);
break;
case RS_INVAL_WAL_LEVEL:
- values[i++] = CStringGetTextDatum("wal_level_insufficient");
+ values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT);
break;
}
}
values[i++] = BoolGetDatum(slot_contents.data.failover);
+ values[i++] = BoolGetDatum(slot_contents.data.synced);
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
- bool failover;
char *plugin;
Datum values[2];
bool nulls[2];
src_islogical = SlotIsLogical(&first_slot_contents);
src_restart_lsn = first_slot_contents.data.restart_lsn;
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
- failover = first_slot_contents.data.failover;
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
/* Check type of replication slot */
* We must not try to read WAL, since we haven't reserved it yet --
* hence pass find_startpoint false. confirmed_flush will be set
* below, by copying from the source slot.
+ *
+ * To avoid potential issues with the slot synchronization where the
+ * restart_lsn of a replication slot can go backward, we set the
+ * failover option to false here. This situation occurs when a slot
+ * on the primary server is dropped and immediately replaced with a
+ * new slot of the same name, created by copying from another existing
+  * slot. The slot synchronization cannot distinguish the new slot from the
+  * dropped one and would only observe the restart_lsn of the same slot
+  * going backward.
*/
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
false,
- failover,
+ false,
src_restart_lsn,
false);
}
{
return copy_replication_slot(fcinfo, false);
}
+
+/*
+ * Synchronize failover enabled replication slots to a standby server
+ * from the primary server.
+ */
+Datum
+pg_sync_replication_slots(PG_FUNCTION_ARGS)
+{
+ WalReceiverConn *wrconn;
+ char *err;
+ StringInfoData app_name;
+
+ CheckSlotPermissions();
+
+ if (!RecoveryInProgress())
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slots can only be synchronized to a standby server"));
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ ValidateSlotSyncParams();
+
+ initStringInfo(&app_name);
+ if (cluster_name[0])
+ appendStringInfo(&app_name, "%s_slotsync", cluster_name);
+ else
+ appendStringInfoString(&app_name, "slotsync");
+
+ /* Connect to the primary server. */
+ wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
+ app_name.data, &err);
+ pfree(app_name.data);
+
+ if (!wrconn)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err));
+
+ SyncReplicationSlots(wrconn);
+
+ walrcv_disconnect(wrconn);
+
+ PG_RETURN_VOID();
+}
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
+#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
static void IdentifySystem(void);
static void UploadManifest(void);
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset,
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
- false, false);
+ false, false, false);
if (reserve_wal)
{
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase, failover);
+ two_phase, failover, false);
/*
* Do options check early so that we can bail before calling the
}
/*
- * Returns the latest point in WAL that has been safely flushed to disk, and
- * can be sent to the standby. This should only be called when in recovery,
- * ie. we're streaming to a cascaded standby.
+ * Returns the latest point in WAL that has been safely flushed to disk.
+ * This should only be called when in recovery.
+ *
+ * This is called either by the cascading walsender to find the WAL position
+ * to be sent to a cascaded standby, or by the slot synchronization function
+ * to validate the remote slot's LSN before syncing it locally.
*
* As a side-effect, *tli is updated to the TLI of the last
* replayed WAL record.
*/
-static XLogRecPtr
+XLogRecPtr
GetStandbyFlushRecPtr(TimeLineID *tli)
{
XLogRecPtr replayPtr;
TimeLineID receiveTLI;
XLogRecPtr result;
+ Assert(am_cascading_walsender || IsSyncingReplicationSlots());
+
/*
* We can safely send what's already been replayed. Also, if walreceiver
* is streaming WAL from the same timeline, we can send anything that it
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/slot.h"
+#include "replication/slotsync.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
size = add_size(size, StatsShmemSize());
size = add_size(size, WaitEventExtensionShmemSize());
size = add_size(size, InjectionPointShmemSize());
+ size = add_size(size, SlotSyncShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
WalSummarizerShmemInit();
PgArchShmemInit();
ApplyLauncherShmemInit();
+ SlotSyncShmemInit();
/*
* Set up other modules that need some shared memory space
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202401301
+#define CATALOG_VERSION_NO 202402141
#endif
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
+ proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '9929', descr => 'sync replication slots from the primary to the standby',
+ proname => 'pg_sync_replication_slots', provolatile => 'v', proparallel => 'u',
+ prorettype => 'void', proargtypes => '',
+ prosrc => 'pg_sync_replication_slots' },
# event triggers
{ oid => '3566', descr => 'list objects dropped by the current command',
RS_INVAL_WAL_LEVEL,
} ReplicationSlotInvalidationCause;
+/*
+ * The possible values for 'conflict_reason' returned in
+ * pg_get_replication_slots.
+ */
+#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
+#define SLOT_INVAL_HORIZON_TEXT "rows_removed"
+#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient"
+
/*
* On-Disk data of a replication slot, preserved across restarts.
*/
/* plugin name */
NameData plugin;
+ /*
+ * Was this slot synchronized from the primary server?
+ */
+ char synced;
+
/*
* Is this a failover slot (sync candidate for standbys)? Only relevant
* for logical slots on the primary server.
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover);
+ bool two_phase, bool failover,
+ bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotDropAcquired(void);
extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
+extern ReplicationSlotInvalidationCause
+ GetSlotInvalidationCause(char *conflict_reason);
#endif /* SLOT_H */
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * slotsync.h
+ * Exports for slot synchronization.
+ *
+ * Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slotsync.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOTSYNC_H
+#define SLOTSYNC_H
+
+#include "replication/walreceiver.h"
+
+extern void ValidateSlotSyncParams(void);
+extern bool IsSyncingReplicationSlots(void);
+extern Size SlotSyncShmemSize(void);
+extern void SlotSyncShmemInit(void);
+extern void SyncReplicationSlots(WalReceiverConn *wrconn);
+
+#endif /* SLOTSYNC_H */
#ifndef _WALSENDER_H
#define _WALSENDER_H
+#include "access/xlogdefs.h"
+
/*
* What to do with a snapshot in create replication slot command.
*/
extern bool exec_replication_command(const char *cmd_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
ok( $stderr =~ /ERROR: cannot set failover for enabled subscription/,
"altering failover is not allowed for enabled subscription");
+##################################################
+# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
+##################################################
+
+($result, $stdout, $stderr) =
+ $publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+ /ERROR: replication slots can only be synchronized to a standby server/,
+ "cannot sync slots on a non-standby server");
+
+##################################################
+# Test logical failover slots on the standby
+# Configure standby1 to replicate and synchronize logical slots configured
+# for failover on the primary
+#
+# failover slot lsub1_slot ->| ----> subscriber1 (connected via logical replication)
+# failover slot lsub2_slot | inactive
+# primary ---> |
+# physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
+# | lsub1_slot, lsub2_slot (synced_slot)
+##################################################
+
+my $primary = $publisher;
+my $backup_name = 'backup';
+$primary->backup($backup_name);
+
+# Create a standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+$primary->psql('postgres',
+ q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->psql('postgres',
+ q{SELECT pg_create_physical_replication_slot('sb1_slot');});
+
+# Start the standby so that slot syncing can begin
+$standby1->start;
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+ 1);
+
+# Wait for the standby to catch up so that the standby is not lagging behind
+# the subscriber.
+$primary->wait_for_replay_catchup($standby1);
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slots are created on the standby and are
+# flagged as 'synced'
+is( $standby1->safe_psql(
+ 'postgres',
+ q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced;}
+ ),
+ "t",
+ 'logical slots have synced as true on standby');
+
+##################################################
+# Test that the synchronized slot will be dropped if the corresponding remote
+# slot on the primary server has been dropped.
+##################################################
+
+$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
+
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+is( $standby1->safe_psql(
+ 'postgres',
+ q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
+ ),
+ "t",
+ 'synchronized slot has been dropped');
+
+##################################################
+# Test that if the synchronized slot is invalidated while the remote slot is
+# still valid, the slot will be dropped and re-created on the standby by
+# executing pg_sync_replication_slots() again.
+##################################################
+
+# Configure the max_slot_wal_keep_size so that the synced slot can be
+# invalidated due to wal removal.
+$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
+$standby1->reload;
+
+# Generate some activity and switch WAL file on the primary
+$primary->advance_wal(1);
+$primary->psql('postgres', "CHECKPOINT");
+$primary->wait_for_replay_catchup($standby1);
+
+# Request a checkpoint on the standby to trigger the WAL file(s) removal
+$standby1->safe_psql('postgres', "CHECKPOINT");
+
+# Check if the synced slot is invalidated
+is( $standby1->safe_psql(
+ 'postgres',
+ q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+ ),
+ "t",
+ 'synchronized slot has been invalidated');
+
+# Reset max_slot_wal_keep_size to avoid further wal removal
+$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
+$standby1->reload;
+
+# Enable the subscription to let it catch up to the latest wal position
+$subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+ 1);
+
+# Wait for the standby to catch up so that the standby is not lagging behind
+# the subscriber.
+$primary->wait_for_replay_catchup($standby1);
+
+my $log_offset = -s $standby1->logfile;
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the invalidated slot has been dropped.
+$standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/,
+ $log_offset);
+
+# Confirm that the logical slot has been re-created on the standby and is
+# flagged as 'synced'
+is( $standby1->safe_psql(
+ 'postgres',
+ q{SELECT conflict_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+ ),
+ "t",
+ 'logical slot is re-synced');
+
+##################################################
+# Test that a synchronized slot can not be decoded, altered or dropped by the
+# user
+##################################################
+
+# Attempting to perform logical decoding on a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+ "select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
+ok( $stderr =~
+ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/,
+ "logical decoding is not allowed on synced slot");
+
+# Attempting to alter a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql(
+ 'postgres',
+ qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
+ replication => 'database');
+ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/,
+ "synced slot on standby cannot be altered");
+
+# Attempting to drop a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+ "SELECT pg_drop_replication_slot('lsub1_slot');");
+ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/,
+ "synced slot on standby cannot be dropped");
+
+##################################################
+# Test that we cannot synchronize slots if dbname is not specified in the
+# primary_conninfo.
+##################################################
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
+$standby1->reload;
+
+($result, $stdout, $stderr) =
+ $standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+ /HINT: 'dbname' must be specified in "primary_conninfo"/,
+ "cannot sync slots if dbname is not specified in primary_conninfo");
+
+##################################################
+# Test that we cannot synchronize slots to a cascading standby server.
+##################################################
+
+# Create a cascading standby
+$backup_name = 'backup2';
+$standby1->backup($backup_name);
+
+my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+$cascading_standby->init_from_backup(
+ $standby1, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+
+my $cascading_connstr = $standby1->connstr;
+$cascading_standby->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'cascading_sb_slot'
+primary_conninfo = '$cascading_connstr dbname=postgres'
+));
+
+$standby1->psql('postgres',
+ q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});
+
+$cascading_standby->start;
+
+($result, $stdout, $stderr) =
+ $cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+ /ERROR: cannot synchronize replication slots from a standby server/,
+ "cannot sync slots to a cascading standby server");
+
done_testing();
l.safe_wal_size,
l.two_phase,
l.conflict_reason,
- l.failover
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
+ l.failover,
+ l.synced
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
RelptrFreePageBtree
RelptrFreePageManager
RelptrFreePageSpanLeader
+RemoteSlot
RenameStmt
ReopenPtrType
ReorderBuffer
SlabContext
SlabSlot
SlotNumber
+SlotSyncCtxStruct
SlruCtl
SlruCtlData
SlruErrorCause