Add a slot synchronization function.
authorAmit Kapila <[email protected]>
Wed, 14 Feb 2024 04:15:36 +0000 (09:45 +0530)
committerAmit Kapila <[email protected]>
Wed, 14 Feb 2024 04:15:36 +0000 (09:45 +0530)
This commit introduces a new SQL function pg_sync_replication_slots()
which is used to synchronize the logical replication slots from the
primary server to the physical standby so that logical replication can be
resumed after a failover or planned switchover.

A new 'synced' flag is introduced in pg_replication_slots view, indicating
whether the slot has been synchronized from the primary server. On a
standby, synced slots cannot be dropped or consumed, and any attempt to
perform logical decoding on them will result in an error.

The logical replication slots on the primary can be synchronized to the
hot standby by using the 'failover' parameter of
pg-create-logical-replication-slot(), or by using the 'failover' option of
CREATE SUBSCRIPTION during slot creation, and then calling
pg_sync_replication_slots() on standby. For the synchronization to work,
it is mandatory to have a physical replication slot between the primary
and the standby aka 'primary_slot_name' should be configured on the
standby, and 'hot_standby_feedback' must be enabled on the standby. It is
also necessary to specify a valid 'dbname' in the 'primary_conninfo'.

If a logical slot is invalidated on the primary, then that slot on the
standby is also invalidated.

If a logical slot on the primary is valid but is invalidated on the
standby, then that slot is dropped but will be recreated on the standby in
the next pg_sync_replication_slots() call provided the slot still exists
on the primary server. It is okay to recreate such slots as long as these
are not consumable on standby (which is the case currently). This
situation may occur due to the following reasons:
- The 'max_slot_wal_keep_size' on the standby is insufficient to retain
WAL records from the restart_lsn of the slot.
- 'primary_slot_name' is temporarily reset to null and the physical slot
is removed.

The slot synchronization status on the standby can be monitored using the
'synced' column of pg_replication_slots view.

A functionality to automatically synchronize slots by a background worker
and allow logical walsenders to wait for the physical will be done in
subsequent commits.

Author: Hou Zhijie, Shveta Malik, Ajin Cherian based on an earlier version by Peter Eisentraut
Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com

26 files changed:
contrib/test_decoding/expected/permissions.out
contrib/test_decoding/expected/slot.out
contrib/test_decoding/sql/permissions.sql
contrib/test_decoding/sql/slot.sql
doc/src/sgml/config.sgml
doc/src/sgml/func.sgml
doc/src/sgml/logicaldecoding.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/system-views.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/Makefile
src/backend/replication/logical/logical.c
src/backend/replication/logical/meson.build
src/backend/replication/logical/slotsync.c [new file with mode: 0644]
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/backend/storage/ipc/ipci.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/slot.h
src/include/replication/slotsync.h [new file with mode: 0644]
src/include/replication/walsender.h
src/test/recovery/t/040_standby_failover_slots_sync.pl
src/test/regress/expected/rules.out
src/tools/pgindent/typedefs.list

index d6eaba8c55d68a26975f30a7651bd06902f7dd62..8d100646ce642d971fd22b398cf54112ffd803e6 100644 (file)
@@ -64,6 +64,9 @@ DETAIL:  Only roles with the REPLICATION attribute may use replication slots.
 SELECT pg_drop_replication_slot('regression_slot');
 ERROR:  permission denied to use replication slots
 DETAIL:  Only roles with the REPLICATION attribute may use replication slots.
+SELECT pg_sync_replication_slots();
+ERROR:  permission denied to use replication slots
+DETAIL:  Only roles with the REPLICATION attribute may use replication slots.
 RESET ROLE;
 -- replication users can drop superuser created slots
 SET ROLE regress_lr_superuser;
index 261d8886d3b8eefae0a5bbebf465ee30f530b98e..349ab2d38092f657c15126f9a43748fb8122f240 100644 (file)
@@ -425,6 +425,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', '
  init
 (1 row)
 
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
+ERROR:  cannot enable failover for a temporary replication slot
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
  ?column? 
 ----------
index 312b5145937440a21ef0e0e01d125abb59dad3ff..94db936aee2537c817138498f31ac94ea7dc3ece 100644 (file)
@@ -29,6 +29,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 INSERT INTO lr_test VALUES('lr_superuser_init');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_sync_replication_slots();
 RESET ROLE;
 
 -- replication users can drop superuser created slots
index 45aeae7fd5a5add7fa51dce13f4ea8e14684f2ee..580e3ae3befad703c219310c113a4521644c2f19 100644 (file)
@@ -181,6 +181,7 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
 
 SELECT slot_name, slot_type, failover FROM pg_replication_slots;
index 61038472c5a57fba3fd763e58f862564bc767316..037a3b8a64c8b3de324481793649b7327d4752ae 100644 (file)
@@ -4612,8 +4612,13 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
           <varname>primary_conninfo</varname> string, or in a separate
           <filename>~/.pgpass</filename> file on the standby server (use
           <literal>replication</literal> as the database name).
-          Do not specify a database name in the
-          <varname>primary_conninfo</varname> string.
+         </para>
+         <para>
+          For replication slot synchronization (see
+          <xref linkend="logicaldecoding-replication-slots-synchronization"/>),
+          it is also necessary to specify a valid <literal>dbname</literal>
+          in the <varname>primary_conninfo</varname> string. This will only be
+          used for slot synchronization. It is ignored for streaming.
          </para>
          <para>
           This parameter can only be set in the <filename>postgresql.conf</filename>
index 11d537b341c9b1320235db06d4745e2215199f7a..8f147a2417fab0f558331244df4f526785e89b98 100644 (file)
@@ -28075,7 +28075,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
       </row>
 
       <row>
-       <entry role="func_table_entry"><para role="func_signature">
+       <entry id="pg-create-logical-replication-slot" role="func_table_entry"><para role="func_signature">
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
@@ -28444,6 +28444,39 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         record is flushed along with its transaction.
        </para></entry>
       </row>
+
+      <row>
+       <entry id="pg-sync-replication-slots" role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_sync_replication_slots</primary>
+        </indexterm>
+        <function>pg_sync_replication_slots</function> ()
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Synchronize the logical failover replication slots from the primary
+        server to the standby server. This function can only be executed on the
+        standby server. Temporary synced slots, if any, cannot be used for
+        logical decoding and must be dropped after promotion. See
+        <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+       </para>
+
+       <caution>
+        <para>
+          If, after executing the function,
+          <link linkend="guc-hot-standby-feedback">
+          <varname>hot_standby_feedback</varname></link> is disabled on
+          the standby or the physical slot configured in
+          <link linkend="guc-primary-slot-name">
+          <varname>primary_slot_name</varname></link> is
+          removed, then it is possible that the necessary rows of the
+          synchronized slot will be removed by the VACUUM process on the primary
+          server, resulting in the synchronized slot becoming invalidated.
+        </para>
+       </caution>
+      </entry>
+      </row>
+
      </tbody>
     </tgroup>
    </table>
index cd152d4ced987c967815d8cbd4ca65e933c290b3..eceaaaa2735e66186f506ea4e38354b5c931a4c2 100644 (file)
@@ -358,6 +358,62 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
       So if a slot is no longer required it should be dropped.
      </para>
     </caution>
+
+   </sect2>
+
+   <sect2 id="logicaldecoding-replication-slots-synchronization">
+    <title>Replication Slot Synchronization</title>
+    <para>
+     The logical replication slots on the primary can be synchronized to
+     the hot standby by using the <literal>failover</literal> parameter of
+     <link linkend="pg-create-logical-replication-slot">
+     <function>pg_create_logical_replication_slot</function></link>, or by
+     using the <link linkend="sql-createsubscription-params-with-failover">
+     <literal>failover</literal></link> option of
+     <command>CREATE SUBSCRIPTION</command> during slot creation, and then calling
+     <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby. For the synchronization to work, it is mandatory to
+     have a physical replication slot between the primary and the standby aka
+     <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
+     should be configured on the standby, and
+     <link linkend="guc-hot-standby-feedback"><varname>hot_standby_feedback</varname></link>
+     must be enabled on the standby. It is also necessary to specify a valid
+     <literal>dbname</literal> in the
+     <link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
+    </para>
+
+    <para>
+     The ability to resume logical replication after failover depends upon the
+     <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+     value for the synchronized slots on the standby at the time of failover.
+     Only persistent slots that have attained synced state as true on the standby
+     before failover can be used for logical replication after failover.
+     Temporary synced slots cannot be used for logical decoding, therefore
+     logical replication for those slots cannot be resumed. For example, if the
+     synchronized slot could not become persistent on the standby due to a
+     disabled subscription, then the subscription cannot be resumed after
+     failover even when it is enabled.
+    </para>
+
+    <para>
+     To resume logical replication after failover from the synced logical
+     slots, the subscription's 'conninfo' must be altered to point to the
+     new primary server. This is done using
+     <link linkend="sql-altersubscription-params-connection"><command>ALTER SUBSCRIPTION ... CONNECTION</command></link>.
+     It is recommended that subscriptions are first disabled before promoting
+     the standby and are re-enabled after altering the connection string.
+    </para>
+    <caution>
+     <para>
+      There is a chance that the old primary is up again during the promotion
+      and if subscriptions are not disabled, the logical subscribers may
+      continue to receive data from the old primary server even after promotion
+      until the connection string is altered. This might result in data
+      inconsistency issues, preventing the logical subscribers from being
+      able to continue replication from the new primary server.
+     </para>
+    </caution>
    </sect2>
 
    <sect2 id="logicaldecoding-explanation-output-plugins">
index 05d6cc42da34924b4469380b646e1fc2f6eadab1..a5cb19357f5c7311b46137fa7afe996a68835f14 100644 (file)
@@ -2062,7 +2062,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
         <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
         <listitem>
          <para>
-          If true, the slot is enabled to be synced to the standbys.
+          If true, the slot is enabled to be synced to the standbys
+          so that logical replication can be resumed after failover.
           The default is false.
          </para>
         </listitem>
@@ -2162,7 +2163,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
         <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
         <listitem>
          <para>
-          If true, the slot is enabled to be synced to the standbys.
+          If true, the slot is enabled to be synced to the standbys
+          so that logical replication can be resumed after failover.
          </para>
         </listitem>
        </varlistentry>
index dd468b31ea775d37a2ea8a6db0a6ad6b5a8f78d4..be90edd0e2022c703a2738297780503c629453fe 100644 (file)
@@ -2561,10 +2561,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        <structfield>failover</structfield> <type>bool</type>
       </para>
       <para>
-       True if this is a logical slot enabled to be synced to the standbys.
-       Always false for physical slots.
+       True if this is a logical slot enabled to be synced to the standbys
+       so that logical replication can be resumed from the new primary
+       after failover. Always false for physical slots.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>synced</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if this is a logical slot that was synced from a primary server.
+       On a hot standby, the slots with the synced column marked as true can
+       neither be used for logical decoding nor dropped manually. The value
+       of this column has no meaning on the primary server; the column value on
+       the primary is default false for all slots but may (if leftover from a
+       promoted standby) also be true.
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
index 6791bff9dd2b6a5ccd9251399f20e5d2d3a2e4cc..04227a72d100d873495687d918854ab30cb98c27 100644 (file)
@@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS
             L.safe_wal_size,
             L.two_phase,
             L.conflict_reason,
-            L.failover
+            L.failover,
+            L.synced
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
index 2dc25e37bb89a5121cb843247f515e292a36865a..ba03eeff1c6ee65a80702f87fe7509a5b0b8162d 100644 (file)
@@ -25,6 +25,7 @@ OBJS = \
    proto.o \
    relation.o \
    reorderbuffer.o \
+   slotsync.o \
    snapbuild.o \
    tablesync.o \
    worker.o
index ca09c683f119255ec8b945a26625c7223d9d23ac..a53815f2ed5ab10579e49dcab485b3c852d38f30 100644 (file)
@@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                 errmsg("replication slot \"%s\" was not created in this database",
                        NameStr(slot->data.name))));
 
+   /*
+    * Do not allow consumption of a "synchronized" slot until the standby
+    * gets promoted.
+    */
+   if (RecoveryInProgress() && slot->data.synced)
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("cannot use replication slot \"%s\" for logical decoding",
+                      NameStr(slot->data.name)),
+               errdetail("This slot is being synchronized from the primary server."),
+               errhint("Specify another replication slot."));
+
    /*
     * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
     * "cannot get changes" wording in this errmsg because that'd be
index 1050eb2c09b012e9fa0b28a604f09c2834f1660e..3dec36a6de5f5ebcdb76567f2806c178b315666d 100644 (file)
@@ -11,6 +11,7 @@ backend_sources += files(
   'proto.c',
   'relation.c',
   'reorderbuffer.c',
+  'slotsync.c',
   'snapbuild.c',
   'tablesync.c',
   'worker.c',
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
new file mode 100644 (file)
index 0000000..0aa1bf1
--- /dev/null
@@ -0,0 +1,906 @@
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ *    Functionality for synchronizing slots to a standby server from the
+ *         primary server.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *   src/backend/replication/logical/slotsync.c
+ *
+ * This file contains the code for slot synchronization on a physical standby
+ * to fetch logical failover slots information from the primary server, create
+ * the slots on the standby and synchronize them. This is done by a call to SQL
+ * function pg_sync_replication_slots.
+ *
+ * If on physical standby, the WAL corresponding to the remote's restart_lsn
+ * is not available or the remote's catalog_xmin precedes the oldest xid for which
+ * it is guaranteed that rows wouldn't have been removed then we cannot create
+ * the local standby slot because that would mean moving the local slot
+ * backward and decoding won't be possible via such a slot. In this case, the
+ * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
+ * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
+ * which we can call pg_sync_replication_slots() periodically to perform
+ * syncs.
+ *
+ * Any standby synchronized slots will be dropped if they no longer need
+ * to be synchronized. See comment atop drop_local_obsolete_slots() for more
+ * details.
+ *---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_database.h"
+#include "commands/dbcommands.h"
+#include "replication/logical.h"
+#include "replication/slotsync.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+
+/* Struct for sharing information to control slot synchronization. */
+typedef struct SlotSyncCtxStruct
+{
+   /* prevents concurrent slot syncs to avoid slot overwrites */
+   bool        syncing;
+   slock_t     mutex;
+} SlotSyncCtxStruct;
+
+SlotSyncCtxStruct *SlotSyncCtx = NULL;
+
+/*
+ * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
+ * in SlotSyncCtxStruct, this flag is true only if the current process is
+ * performing slot synchronization.
+ */
+static bool syncing_slots = false;
+
+/*
+ * Structure to hold information fetched from the primary server about a logical
+ * replication slot.
+ */
+typedef struct RemoteSlot
+{
+   char       *name;
+   char       *plugin;
+   char       *database;
+   bool        two_phase;
+   bool        failover;
+   XLogRecPtr  restart_lsn;
+   XLogRecPtr  confirmed_lsn;
+   TransactionId catalog_xmin;
+
+   /* RS_INVAL_NONE if valid, or the reason of invalidation */
+   ReplicationSlotInvalidationCause invalidated;
+} RemoteSlot;
+
+/*
+ * If necessary, update the local synced slot's metadata based on the data
+ * from the remote slot.
+ *
+ * If no update was needed (the data of the remote slot is the same as the
+ * local slot) return false, otherwise true.
+ */
+static bool
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+   ReplicationSlot *slot = MyReplicationSlot;
+   bool        xmin_changed;
+   bool        restart_lsn_changed;
+   NameData    plugin_name;
+
+   Assert(slot->data.invalidated == RS_INVAL_NONE);
+
+   xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
+   restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+
+   if (!xmin_changed &&
+       !restart_lsn_changed &&
+       remote_dbid == slot->data.database &&
+       remote_slot->two_phase == slot->data.two_phase &&
+       remote_slot->failover == slot->data.failover &&
+       remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
+       strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
+       return false;
+
+   /* Avoid expensive operations while holding a spinlock. */
+   namestrcpy(&plugin_name, remote_slot->plugin);
+
+   SpinLockAcquire(&slot->mutex);
+   slot->data.plugin = plugin_name;
+   slot->data.database = remote_dbid;
+   slot->data.two_phase = remote_slot->two_phase;
+   slot->data.failover = remote_slot->failover;
+   slot->data.restart_lsn = remote_slot->restart_lsn;
+   slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+   slot->data.catalog_xmin = remote_slot->catalog_xmin;
+   slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+   SpinLockRelease(&slot->mutex);
+
+   if (xmin_changed)
+       ReplicationSlotsComputeRequiredXmin(false);
+
+   if (restart_lsn_changed)
+       ReplicationSlotsComputeRequiredLSN();
+
+   return true;
+}
+
+/*
+ * Get the list of local logical slots that are synchronized from the
+ * primary server.
+ */
+static List *
+get_local_synced_slots(void)
+{
+   List       *local_slots = NIL;
+
+   LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+   for (int i = 0; i < max_replication_slots; i++)
+   {
+       ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+       /* Check if it is a synchronized slot */
+       if (s->in_use && s->data.synced)
+       {
+           Assert(SlotIsLogical(s));
+           local_slots = lappend(local_slots, s);
+       }
+   }
+
+   LWLockRelease(ReplicationSlotControlLock);
+
+   return local_slots;
+}
+
+/*
+ * Helper function to check if local_slot is required to be retained.
+ *
+ * Return false either if local_slot does not exist in the remote_slots list
+ * or is invalidated while the corresponding remote slot is still valid,
+ * otherwise true.
+ */
+static bool
+local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
+{
+   bool        remote_exists = false;
+   bool        locally_invalidated = false;
+
+   foreach_ptr(RemoteSlot, remote_slot, remote_slots)
+   {
+       if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
+       {
+           remote_exists = true;
+
+           /*
+            * If remote slot is not invalidated but local slot is marked as
+            * invalidated, then set locally_invalidated flag.
+            */
+           SpinLockAcquire(&local_slot->mutex);
+           locally_invalidated =
+               (remote_slot->invalidated == RS_INVAL_NONE) &&
+               (local_slot->data.invalidated != RS_INVAL_NONE);
+           SpinLockRelease(&local_slot->mutex);
+
+           break;
+       }
+   }
+
+   return (remote_exists && !locally_invalidated);
+}
+
+/*
+ * Drop local obsolete slots.
+ *
+ * Drop the local slots that no longer need to be synced i.e. these either do
+ * not exist on the primary or are no longer enabled for failover.
+ *
+ * Additionally, drop any slots that are valid on the primary but got
+ * invalidated on the standby. This situation may occur due to the following
+ * reasons:
+ * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
+ *   records from the restart_lsn of the slot.
+ * - 'primary_slot_name' is temporarily reset to null and the physical slot is
+ *   removed.
+ * These dropped slots will get recreated in next sync-cycle and it is okay to
+ * drop and recreate such slots as long as these are not consumable on the
+ * standby (which is the case currently).
+ *
+ * Note: Change of 'wal_level' on the primary server to a level lower than
+ * logical may also result in slot invalidation and removal on the standby.
+ * This is because such 'wal_level' change is only possible if the logical
+ * slots are removed on the primary server, so it's expected to see the
+ * slots being invalidated and removed on the standby too (and re-created
+ * if they are re-created on the primary server).
+ */
+static void
+drop_local_obsolete_slots(List *remote_slot_list)
+{
+   List       *local_slots = get_local_synced_slots();
+
+   foreach_ptr(ReplicationSlot, local_slot, local_slots)
+   {
+       /* Drop the local slot if it is not required to be retained. */
+       if (!local_sync_slot_required(local_slot, remote_slot_list))
+       {
+           bool        synced_slot;
+
+           /*
+            * Use shared lock to prevent a conflict with
+            * ReplicationSlotsDropDBSlots(), trying to drop the same slot
+            * during a drop-database operation.
+            */
+           LockSharedObject(DatabaseRelationId, local_slot->data.database,
+                            0, AccessShareLock);
+
+           /*
+            * In the small window between getting the slot to drop and
+            * locking the database, there is a possibility of a parallel
+            * database drop by the startup process and the creation of a new
+            * slot by the user. This new user-created slot may end up using
+            * the same shared memory as that of 'local_slot'. Thus check if
+            * local_slot is still the synced one before performing actual
+            * drop.
+            */
+           SpinLockAcquire(&local_slot->mutex);
+           synced_slot = local_slot->in_use && local_slot->data.synced;
+           SpinLockRelease(&local_slot->mutex);
+
+           if (synced_slot)
+           {
+               ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+               ReplicationSlotDropAcquired();
+           }
+
+           UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
+                              0, AccessShareLock);
+
+           ereport(LOG,
+                   errmsg("dropped replication slot \"%s\" of dbid %d",
+                          NameStr(local_slot->data.name),
+                          local_slot->data.database));
+       }
+   }
+}
+
+/*
+ * Reserve WAL for the currently active local slot using the specified WAL
+ * location (restart_lsn).
+ *
+ * If the given WAL location has been removed, reserve WAL using the oldest
+ * existing WAL segment.
+ */
+static void
+reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
+{
+   XLogSegNo   oldest_segno;
+   XLogSegNo   segno;
+   ReplicationSlot *slot = MyReplicationSlot;
+
+   Assert(slot != NULL);
+   Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
+
+   while (true)
+   {
+       SpinLockAcquire(&slot->mutex);
+       slot->data.restart_lsn = restart_lsn;
+       SpinLockRelease(&slot->mutex);
+
+       /* Prevent WAL removal as fast as possible */
+       ReplicationSlotsComputeRequiredLSN();
+
+       XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+
+       /*
+        * Find the oldest existing WAL segment file.
+        *
+        * Normally, we can determine it by using the last removed segment
+        * number. However, if no WAL segment files have been removed by a
+        * checkpoint since startup, we need to search for the oldest segment
+        * file from the current timeline existing in XLOGDIR.
+        *
+        * XXX: Currently, we are searching for the oldest segment in the
+        * current timeline as there is less chance of the slot's restart_lsn
+        * from being some prior timeline, and even if it happens, in the
+        * worst case, we will wait to sync till the slot's restart_lsn moved
+        * to the current timeline.
+        */
+       oldest_segno = XLogGetLastRemovedSegno() + 1;
+
+       if (oldest_segno == 1)
+       {
+           TimeLineID  cur_timeline;
+
+           GetWalRcvFlushRecPtr(NULL, &cur_timeline);
+           oldest_segno = XLogGetOldestSegno(cur_timeline);
+       }
+
+       /*
+        * If all required WAL is still there, great, otherwise retry. The
+        * slot should prevent further removal of WAL, unless there's a
+        * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+        * the new restart_lsn above, so normally we should never need to loop
+        * more than twice.
+        */
+       if (segno >= oldest_segno)
+           break;
+
+       /* Retry using the location of the oldest wal segment */
+       XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
+   }
+}
+
+/*
+ * If the remote restart_lsn and catalog_xmin have caught up with the
+ * local ones, then update the LSNs and persist the local synced slot for
+ * future synchronization; otherwise, do nothing.
+ */
+static void
+update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+   ReplicationSlot *slot = MyReplicationSlot;
+
+   /*
+    * Check if the primary server has caught up. Refer to the comment atop
+    * the file for details on this check.
+    */
+   if (remote_slot->restart_lsn < slot->data.restart_lsn ||
+       TransactionIdPrecedes(remote_slot->catalog_xmin,
+                             slot->data.catalog_xmin))
+   {
+       /*
+        * The remote slot didn't catch up to locally reserved position.
+        *
+        * We do not drop the slot because the restart_lsn can be ahead of the
+        * current location when recreating the slot in the next cycle. It may
+        * take more time to create such a slot. Therefore, we keep this slot
+        * and attempt the synchronization in the next cycle.
+        */
+       return;
+   }
+
+   /* First time slot update, the function must return true */
+   if (!update_local_synced_slot(remote_slot, remote_dbid))
+       elog(ERROR, "failed to update slot");
+
+   ReplicationSlotPersist();
+
+   ereport(LOG,
+           errmsg("newly created slot \"%s\" is sync-ready now",
+                  remote_slot->name));
+}
+
+/*
+ * Synchronize a single slot to the given position.
+ *
+ * This creates a new slot if there is no existing one and updates the
+ * metadata of the slot as per the data received from the primary server.
+ *
+ * The slot is created as a temporary slot and stays in the same state until the
+ * the remote_slot catches up with locally reserved position and local slot is
+ * updated. The slot is then persisted and is considered as sync-ready for
+ * periodic syncs.
+ */
+static void
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+{
+   ReplicationSlot *slot;
+   XLogRecPtr  latestFlushPtr;
+
+   /*
+    * Make sure that concerned WAL is received and flushed before syncing
+    * slot to target lsn received from the primary server.
+    */
+   latestFlushPtr = GetStandbyFlushRecPtr(NULL);
+   if (remote_slot->confirmed_lsn > latestFlushPtr)
+       elog(ERROR,
+            "skipping slot synchronization as the received slot sync"
+            " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+            remote_slot->name,
+            LSN_FORMAT_ARGS(latestFlushPtr));
+
+   /* Search for the named slot */
+   if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
+   {
+       bool        synced;
+
+       SpinLockAcquire(&slot->mutex);
+       synced = slot->data.synced;
+       SpinLockRelease(&slot->mutex);
+
+       /* User-created slot with the same name exists, raise ERROR. */
+       if (!synced)
+           ereport(ERROR,
+                   errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                   errmsg("exiting from slot synchronization because same"
+                          " name slot \"%s\" already exists on the standby",
+                          remote_slot->name));
+
+       /*
+        * The slot has been synchronized before.
+        *
+        * It is important to acquire the slot here before checking
+        * invalidation. If we don't acquire the slot first, there could be a
+        * race condition that the local slot could be invalidated just after
+        * checking the 'invalidated' flag here and we could end up
+        * overwriting 'invalidated' flag to remote_slot's value. See
+        * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
+        * if the slot is not acquired by other processes.
+        */
+       ReplicationSlotAcquire(remote_slot->name, true);
+
+       Assert(slot == MyReplicationSlot);
+
+       /*
+        * Copy the invalidation cause from remote only if local slot is not
+        * invalidated locally, we don't want to overwrite existing one.
+        */
+       if (slot->data.invalidated == RS_INVAL_NONE &&
+           remote_slot->invalidated != RS_INVAL_NONE)
+       {
+           SpinLockAcquire(&slot->mutex);
+           slot->data.invalidated = remote_slot->invalidated;
+           SpinLockRelease(&slot->mutex);
+
+           /* Make sure the invalidated state persists across server restart */
+           ReplicationSlotMarkDirty();
+           ReplicationSlotSave();
+       }
+
+       /* Skip the sync of an invalidated slot */
+       if (slot->data.invalidated != RS_INVAL_NONE)
+       {
+           ReplicationSlotRelease();
+           return;
+       }
+
+       /* Slot not ready yet, let's attempt to make it sync-ready now. */
+       if (slot->data.persistency == RS_TEMPORARY)
+       {
+           update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+       }
+
+       /* Slot ready for sync, so sync it. */
+       else
+       {
+           /*
+            * Sanity check: As long as the invalidations are handled
+            * appropriately as above, this should never happen.
+            */
+           if (remote_slot->restart_lsn < slot->data.restart_lsn)
+               elog(ERROR,
+                    "cannot synchronize local slot \"%s\" LSN(%X/%X)"
+                    " to remote slot's LSN(%X/%X) as synchronization"
+                    " would move it backwards", remote_slot->name,
+                    LSN_FORMAT_ARGS(slot->data.restart_lsn),
+                    LSN_FORMAT_ARGS(remote_slot->restart_lsn));
+
+           /* Make sure the slot changes persist across server restart */
+           if (update_local_synced_slot(remote_slot, remote_dbid))
+           {
+               ReplicationSlotMarkDirty();
+               ReplicationSlotSave();
+           }
+       }
+   }
+   /* Otherwise create the slot first. */
+   else
+   {
+       NameData    plugin_name;
+       TransactionId xmin_horizon = InvalidTransactionId;
+
+       /* Skip creating the local slot if remote_slot is invalidated already */
+       if (remote_slot->invalidated != RS_INVAL_NONE)
+           return;
+
+       /*
+        * We create temporary slots instead of ephemeral slots here because
+        * we want the slots to survive after releasing them. This is done to
+        * avoid dropping and re-creating the slots in each synchronization
+        * cycle if the restart_lsn or catalog_xmin of the remote slot has not
+        * caught up.
+        */
+       ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
+                             remote_slot->two_phase,
+                             remote_slot->failover,
+                             true);
+
+       /* For shorter lines. */
+       slot = MyReplicationSlot;
+
+       /* Avoid expensive operations while holding a spinlock. */
+       namestrcpy(&plugin_name, remote_slot->plugin);
+
+       SpinLockAcquire(&slot->mutex);
+       slot->data.database = remote_dbid;
+       slot->data.plugin = plugin_name;
+       SpinLockRelease(&slot->mutex);
+
+       reserve_wal_for_local_slot(remote_slot->restart_lsn);
+
+       LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+       xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+       SpinLockAcquire(&slot->mutex);
+       slot->effective_catalog_xmin = xmin_horizon;
+       slot->data.catalog_xmin = xmin_horizon;
+       SpinLockRelease(&slot->mutex);
+       ReplicationSlotsComputeRequiredXmin(true);
+       LWLockRelease(ProcArrayLock);
+
+       update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+   }
+
+   ReplicationSlotRelease();
+}
+
+/*
+ * Synchronize slots.
+ *
+ * Gets the failover logical slots info from the primary server and updates
+ * the slots locally. Creates the slots if not present on the standby.
+ */
+static void
+synchronize_slots(WalReceiverConn *wrconn)
+{
+#define SLOTSYNC_COLUMN_COUNT 9
+   Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
+   LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+
+   WalRcvExecResult *res;
+   TupleTableSlot *tupslot;
+   StringInfoData s;
+   List       *remote_slot_list = NIL;
+
+   SpinLockAcquire(&SlotSyncCtx->mutex);
+   if (SlotSyncCtx->syncing)
+   {
+       SpinLockRelease(&SlotSyncCtx->mutex);
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("cannot synchronize replication slots concurrently"));
+   }
+
+   SlotSyncCtx->syncing = true;
+   SpinLockRelease(&SlotSyncCtx->mutex);
+
+   syncing_slots = true;
+
+   initStringInfo(&s);
+
+   /* Construct query to fetch slots with failover enabled. */
+   appendStringInfo(&s,
+                    "SELECT slot_name, plugin, confirmed_flush_lsn,"
+                    " restart_lsn, catalog_xmin, two_phase, failover,"
+                    " database, conflict_reason"
+                    " FROM pg_catalog.pg_replication_slots"
+                    " WHERE failover and NOT temporary");
+
+   /* Execute the query */
+   res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);
+   pfree(s.data);
+
+   if (res->status != WALRCV_OK_TUPLES)
+       ereport(ERROR,
+               errmsg("could not fetch failover logical slots info from the primary server: %s",
+                      res->err));
+
+   /* Construct the remote_slot tuple and synchronize each slot locally */
+   tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+   while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+   {
+       bool        isnull;
+       RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
+       Datum       d;
+       int         col = 0;
+
+       remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
+                                                            &isnull));
+       Assert(!isnull);
+
+       remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
+                                                              &isnull));
+       Assert(!isnull);
+
+       /*
+        * It is possible to get null values for LSN and Xmin if slot is
+        * invalidated on the primary server, so handle accordingly.
+        */
+       d = slot_getattr(tupslot, ++col, &isnull);
+       remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
+           DatumGetLSN(d);
+
+       d = slot_getattr(tupslot, ++col, &isnull);
+       remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+       d = slot_getattr(tupslot, ++col, &isnull);
+       remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
+           DatumGetTransactionId(d);
+
+       remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
+                                                          &isnull));
+       Assert(!isnull);
+
+       remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
+                                                         &isnull));
+       Assert(!isnull);
+
+       remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
+                                                                ++col, &isnull));
+       Assert(!isnull);
+
+       d = slot_getattr(tupslot, ++col, &isnull);
+       remote_slot->invalidated = isnull ? RS_INVAL_NONE :
+           GetSlotInvalidationCause(TextDatumGetCString(d));
+
+       /* Sanity check */
+       Assert(col == SLOTSYNC_COLUMN_COUNT);
+
+       /*
+        * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
+        * slot is valid, that means we have fetched the remote_slot in its
+        * RS_EPHEMERAL state. In such a case, don't sync it; we can always
+        * sync it in the next sync cycle when the remote_slot is persisted
+        * and has valid lsn(s) and xmin values.
+        *
+        * XXX: In future, if we plan to expose 'slot->data.persistency' in
+        * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
+        * slots in the first place.
+        */
+       if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
+            XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
+            !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
+           remote_slot->invalidated == RS_INVAL_NONE)
+           pfree(remote_slot);
+       else
+           /* Create list of remote slots */
+           remote_slot_list = lappend(remote_slot_list, remote_slot);
+
+       ExecClearTuple(tupslot);
+   }
+
+   /* Drop local slots that no longer need to be synced. */
+   drop_local_obsolete_slots(remote_slot_list);
+
+   /* Now sync the slots locally */
+   foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
+   {
+       Oid         remote_dbid = get_database_oid(remote_slot->database, false);
+
+       /*
+        * Use shared lock to prevent a conflict with
+        * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
+        * a drop-database operation.
+        */
+       LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
+
+       synchronize_one_slot(remote_slot, remote_dbid);
+
+       UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
+   }
+
+   /* We are done, free remote_slot_list elements */
+   list_free_deep(remote_slot_list);
+
+   walrcv_clear_result(res);
+
+   SpinLockAcquire(&SlotSyncCtx->mutex);
+   SlotSyncCtx->syncing = false;
+   SpinLockRelease(&SlotSyncCtx->mutex);
+
+   syncing_slots = false;
+}
+
+/*
+ * Checks the remote server info.
+ *
+ * We ensure that the 'primary_slot_name' exists on the remote server and the
+ * remote server is not a standby node.
+ */
+static void
+validate_remote_info(WalReceiverConn *wrconn)
+{
+#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
+   WalRcvExecResult *res;
+   Oid         slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
+   StringInfoData cmd;
+   bool        isnull;
+   TupleTableSlot *tupslot;
+   bool        remote_in_recovery;
+   bool        primary_slot_valid;
+
+   initStringInfo(&cmd);
+   appendStringInfo(&cmd,
+                    "SELECT pg_is_in_recovery(), count(*) = 1"
+                    " FROM pg_catalog.pg_replication_slots"
+                    " WHERE slot_type='physical' AND slot_name=%s",
+                    quote_literal_cstr(PrimarySlotName));
+
+   res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
+   pfree(cmd.data);
+
+   if (res->status != WALRCV_OK_TUPLES)
+       ereport(ERROR,
+               errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
+                      PrimarySlotName, res->err),
+               errhint("Check if \"primary_slot_name\" is configured correctly."));
+
+   tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+   if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+       elog(ERROR,
+            "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
+
+   remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
+   Assert(!isnull);
+
+   if (remote_in_recovery)
+       ereport(ERROR,
+               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+               errmsg("cannot synchronize replication slots from a standby server"));
+
+   primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
+   Assert(!isnull);
+
+   if (!primary_slot_valid)
+       ereport(ERROR,
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("bad configuration for slot synchronization"),
+       /* translator: second %s is a GUC variable name */
+               errdetail("The replication slot \"%s\" specified by \"%s\" does not exist on the primary server.",
+                         PrimarySlotName, "primary_slot_name"));
+
+   ExecClearTuple(tupslot);
+   walrcv_clear_result(res);
+}
+
+/*
+ * Check all necessary GUCs for slot synchronization are set
+ * appropriately, otherwise, raise ERROR.
+ */
+void
+ValidateSlotSyncParams(void)
+{
+   char       *dbname;
+
+   /*
+    * A physical replication slot(primary_slot_name) is required on the
+    * primary to ensure that the rows needed by the standby are not removed
+    * after restarting, so that the synchronized slot on the standby will not
+    * be invalidated.
+    */
+   if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
+       ereport(ERROR,
+       /* translator: %s is a GUC variable name */
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("bad configuration for slot synchronization"),
+               errhint("\"%s\" must be defined.", "primary_slot_name"));
+
+   /*
+    * hot_standby_feedback must be enabled to cooperate with the physical
+    * replication slot, which allows informing the primary about the xmin and
+    * catalog_xmin values on the standby.
+    */
+   if (!hot_standby_feedback)
+       ereport(ERROR,
+       /* translator: %s is a GUC variable name */
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("bad configuration for slot synchronization"),
+               errhint("\"%s\" must be enabled.", "hot_standby_feedback"));
+
+   /* Logical slot sync/creation requires wal_level >= logical. */
+   if (wal_level < WAL_LEVEL_LOGICAL)
+       ereport(ERROR,
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("bad configuration for slot synchronization"),
+               errhint("\"wal_level\" must be >= logical."));
+
+   /*
+    * The primary_conninfo is required to make connection to primary for
+    * getting slots information.
+    */
+   if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
+       ereport(ERROR,
+       /* translator: %s is a GUC variable name */
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("bad configuration for slot synchronization"),
+               errhint("\"%s\" must be defined.", "primary_conninfo"));
+
+   /*
+    * The slot synchronization needs a database connection for walrcv_exec to
+    * work.
+    */
+   dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+   if (dbname == NULL)
+       ereport(ERROR,
+
+       /*
+        * translator: 'dbname' is a specific option; %s is a GUC variable
+        * name
+        */
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("bad configuration for slot synchronization"),
+               errhint("'dbname' must be specified in \"%s\".", "primary_conninfo"));
+}
+
+/*
+ * Is current process syncing replication slots ?
+ */
+bool
+IsSyncingReplicationSlots(void)
+{
+   return syncing_slots;
+}
+
+/*
+ * Amount of shared memory required for slot synchronization.
+ */
+Size
+SlotSyncShmemSize(void)
+{
+   return sizeof(SlotSyncCtxStruct);
+}
+
+/*
+ * Allocate and initialize the shared memory of slot synchronization.
+ */
+void
+SlotSyncShmemInit(void)
+{
+   bool        found;
+
+   SlotSyncCtx = (SlotSyncCtxStruct *)
+       ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
+
+   if (!found)
+   {
+       SlotSyncCtx->syncing = false;
+       SpinLockInit(&SlotSyncCtx->mutex);
+   }
+}
+
+/*
+ * Error cleanup callback for slot synchronization.
+ */
+static void
+slotsync_failure_callback(int code, Datum arg)
+{
+   WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+   if (syncing_slots)
+   {
+       /*
+        * If syncing_slots is true, it indicates that the process errored out
+        * without resetting the flag. So, we need to clean up shared memory
+        * and reset the flag here.
+        */
+       SpinLockAcquire(&SlotSyncCtx->mutex);
+       SlotSyncCtx->syncing = false;
+       SpinLockRelease(&SlotSyncCtx->mutex);
+
+       syncing_slots = false;
+   }
+
+   walrcv_disconnect(wrconn);
+}
+
+/*
+ * Synchronize the failover enabled replication slots using the specified
+ * primary server connection.
+ */
+void
+SyncReplicationSlots(WalReceiverConn *wrconn)
+{
+   PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+   {
+       validate_remote_info(wrconn);
+
+       synchronize_slots(wrconn);
+   }
+   PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
+}
index fd4e96c9d69e4ceeda6bba93d2ad0c5ad0abdc13..2180a380632d9fcc563f7b66cd511f8a99ca05ea 100644 (file)
@@ -46,6 +46,7 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -90,7 +91,7 @@ typedef struct ReplicationSlotOnDisk
    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC     0x1051CA1   /* format identifier */
-#define SLOT_VERSION   4       /* version for new files */
+#define SLOT_VERSION   5       /* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -103,7 +104,6 @@ int         max_replication_slots = 10; /* the maximum number of replication
                                         * slots */
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
-static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
 /* internal persistency functions */
@@ -250,11 +250,12 @@ ReplicationSlotValidateName(const char *name, int elevel)
  *     user will only get commit prepared.
  * failover: If enabled, allows the slot to be synced to standbys so
  *     that logical replication can be resumed after failover.
+ * synced: True if the slot is synchronized from the primary server.
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
                      ReplicationSlotPersistency persistency,
-                     bool two_phase, bool failover)
+                     bool two_phase, bool failover, bool synced)
 {
    ReplicationSlot *slot = NULL;
    int         i;
@@ -263,6 +264,34 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
    ReplicationSlotValidateName(name, ERROR);
 
+   if (failover)
+   {
+       /*
+        * Do not allow users to create the failover enabled slots on the
+        * standby as we do not support sync to the cascading standby.
+        *
+        * However, failover enabled slots can be created during slot
+        * synchronization because we need to retain the same values as the
+        * remote slot.
+        */
+       if (RecoveryInProgress() && !IsSyncingReplicationSlots())
+           ereport(ERROR,
+                   errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                   errmsg("cannot enable failover for a replication slot created on the standby"));
+
+       /*
+        * Do not allow users to create failover enabled temporary slots,
+        * because temporary slots will not be synced to the standby.
+        *
+        * However, failover enabled temporary slots can be created during
+        * slot synchronization. See the comments atop slotsync.c for details.
+        */
+       if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
+           ereport(ERROR,
+                   errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                   errmsg("cannot enable failover for a temporary replication slot"));
+   }
+
    /*
     * If some other backend ran this code concurrently with us, we'd likely
     * both allocate the same slot, and that would be bad.  We'd also be at
@@ -315,6 +344,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
    slot->data.two_phase = two_phase;
    slot->data.two_phase_at = InvalidXLogRecPtr;
    slot->data.failover = failover;
+   slot->data.synced = synced;
 
    /* and then data only present in shared memory */
    slot->just_dirtied = false;
@@ -677,6 +707,16 @@ ReplicationSlotDrop(const char *name, bool nowait)
 
    ReplicationSlotAcquire(name, nowait);
 
+   /*
+    * Do not allow users to drop the slots which are currently being synced
+    * from the primary to the standby.
+    */
+   if (RecoveryInProgress() && MyReplicationSlot->data.synced)
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("cannot drop replication slot \"%s\"", name),
+               errdetail("This slot is being synced from the primary server."));
+
    ReplicationSlotDropAcquired();
 }
 
@@ -696,6 +736,38 @@ ReplicationSlotAlter(const char *name, bool failover)
                errmsg("cannot use %s with a physical replication slot",
                       "ALTER_REPLICATION_SLOT"));
 
+   if (RecoveryInProgress())
+   {
+       /*
+        * Do not allow users to alter the slots which are currently being
+        * synced from the primary to the standby.
+        */
+       if (MyReplicationSlot->data.synced)
+           ereport(ERROR,
+                   errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                   errmsg("cannot alter replication slot \"%s\"", name),
+                   errdetail("This slot is being synced from the primary server."));
+
+       /*
+        * Do not allow users to enable failover on the standby as we do not
+        * support sync to the cascading standby.
+        */
+       if (failover)
+           ereport(ERROR,
+                   errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                   errmsg("cannot enable failover for a replication slot"
+                          " on the standby"));
+   }
+
+   /*
+    * Do not allow users to enable failover for temporary slots as we do not
+    * support syncing temporary slots to the standby.
+    */
+   if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+       ereport(ERROR,
+               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+               errmsg("cannot enable failover for a temporary replication slot"));
+
    if (MyReplicationSlot->data.failover != failover)
    {
        SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -712,7 +784,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 /*
  * Permanently drop the currently acquired replication slot.
  */
-static void
+void
 ReplicationSlotDropAcquired(void)
 {
    ReplicationSlot *slot = MyReplicationSlot;
@@ -868,8 +940,8 @@ ReplicationSlotMarkDirty(void)
 }
 
 /*
- * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
- * guaranteeing it will be there after an eventual crash.
+ * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
+ * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
  */
 void
 ReplicationSlotPersist(void)
@@ -2189,3 +2261,25 @@ RestoreSlotFromDisk(const char *name)
                (errmsg("too many replication slots active before shutdown"),
                 errhint("Increase max_replication_slots and try again.")));
 }
+
+/*
+ * Maps the pg_replication_slots.conflict_reason text value to
+ * ReplicationSlotInvalidationCause enum value
+ */
+ReplicationSlotInvalidationCause
+GetSlotInvalidationCause(char *conflict_reason)
+{
+   Assert(conflict_reason);
+
+   if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0)
+       return RS_INVAL_WAL_REMOVED;
+   else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0)
+       return RS_INVAL_HORIZON;
+   else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0)
+       return RS_INVAL_WAL_LEVEL;
+   else
+       Assert(0);
+
+   /* Keep compiler quiet */
+   return RS_INVAL_NONE;
+}
index eb685089b36e3e274ab5422547b81e01e67a6a6c..d2fa5e669a32f19989b0d987d3c7329851a1272e 100644 (file)
@@ -21,7 +21,9 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
+#include "replication/slotsync.h"
 #include "utils/builtins.h"
+#include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/pg_lsn.h"
 #include "utils/resowner.h"
@@ -43,7 +45,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
    /* acquire replication slot, this will check for conflicting names */
    ReplicationSlotCreate(name, false,
                          temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-                         false);
+                         false, false);
 
    if (immediately_reserve)
    {
@@ -136,7 +138,7 @@ create_logical_replication_slot(char *name, char *plugin,
     */
    ReplicationSlotCreate(name, true,
                          temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-                         failover);
+                         failover, false);
 
    /*
     * Create logical decoding context to find start point or, if we don't
@@ -237,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 16
+#define PG_GET_REPLICATION_SLOTS_COLS 17
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    XLogRecPtr  currlsn;
    int         slotno;
@@ -418,21 +420,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                    break;
 
                case RS_INVAL_WAL_REMOVED:
-                   values[i++] = CStringGetTextDatum("wal_removed");
+                   values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT);
                    break;
 
                case RS_INVAL_HORIZON:
-                   values[i++] = CStringGetTextDatum("rows_removed");
+                   values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT);
                    break;
 
                case RS_INVAL_WAL_LEVEL:
-                   values[i++] = CStringGetTextDatum("wal_level_insufficient");
+                   values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT);
                    break;
            }
        }
 
        values[i++] = BoolGetDatum(slot_contents.data.failover);
 
+       values[i++] = BoolGetDatum(slot_contents.data.synced);
+
        Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@@ -700,7 +704,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
    XLogRecPtr  src_restart_lsn;
    bool        src_islogical;
    bool        temporary;
-   bool        failover;
    char       *plugin;
    Datum       values[2];
    bool        nulls[2];
@@ -756,7 +759,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
    src_islogical = SlotIsLogical(&first_slot_contents);
    src_restart_lsn = first_slot_contents.data.restart_lsn;
    temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
-   failover = first_slot_contents.data.failover;
    plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
 
    /* Check type of replication slot */
@@ -791,12 +793,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         * We must not try to read WAL, since we haven't reserved it yet --
         * hence pass find_startpoint false.  confirmed_flush will be set
         * below, by copying from the source slot.
+        *
+        * To avoid potential issues with the slot synchronization where the
+        * restart_lsn of a replication slot can go backward, we set the
+        * failover option to false here.  This situation occurs when a slot
+        * on the primary server is dropped and immediately replaced with a
+        * new slot of the same name, created by copying from another existing
+        * slot.  However, the slot synchronization will only observe the
+        * restart_lsn of the same slot going backward.
         */
        create_logical_replication_slot(NameStr(*dst_name),
                                        plugin,
                                        temporary,
                                        false,
-                                       failover,
+                                       false,
                                        src_restart_lsn,
                                        false);
    }
@@ -943,3 +953,49 @@ pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
 {
    return copy_replication_slot(fcinfo, false);
 }
+
+/*
+ * Synchronize failover enabled replication slots to a standby server
+ * from the primary server.
+ */
+Datum
+pg_sync_replication_slots(PG_FUNCTION_ARGS)
+{
+   WalReceiverConn *wrconn;
+   char       *err;
+   StringInfoData app_name;
+
+   CheckSlotPermissions();
+
+   if (!RecoveryInProgress())
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("replication slots can only be synchronized to a standby server"));
+
+   /* Load the libpq-specific functions */
+   load_file("libpqwalreceiver", false);
+
+   ValidateSlotSyncParams();
+
+   initStringInfo(&app_name);
+   if (cluster_name[0])
+       appendStringInfo(&app_name, "%s_slotsync", cluster_name);
+   else
+       appendStringInfoString(&app_name, "slotsync");
+
+   /* Connect to the primary server. */
+   wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
+                           app_name.data, &err);
+   pfree(app_name.data);
+
+   if (!wrconn)
+       ereport(ERROR,
+               errcode(ERRCODE_CONNECTION_FAILURE),
+               errmsg("could not connect to the primary server: %s", err));
+
+   SyncReplicationSlots(wrconn);
+
+   walrcv_disconnect(wrconn);
+
+   PG_RETURN_VOID();
+}
index 146826d5db936598a3c91de18b4360d71e57c46b..4e54779a9ebb50ebd21317e7050d9d2b3bd08b55 100644 (file)
@@ -72,6 +72,7 @@
 #include "postmaster/interrupt.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -243,7 +244,6 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
 static void XLogSendPhysical(void);
 static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
-static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 static void IdentifySystem(void);
 static void UploadManifest(void);
 static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset,
@@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
    {
        ReplicationSlotCreate(cmd->slotname, false,
                              cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-                             false, false);
+                             false, false, false);
 
        if (reserve_wal)
        {
@@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
         */
        ReplicationSlotCreate(cmd->slotname, true,
                              cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-                             two_phase, failover);
+                             two_phase, failover, false);
 
        /*
         * Do options check early so that we can bail before calling the
@@ -3385,14 +3385,17 @@ WalSndDone(WalSndSendDataCallback send_data)
 }
 
 /*
- * Returns the latest point in WAL that has been safely flushed to disk, and
- * can be sent to the standby. This should only be called when in recovery,
- * ie. we're streaming to a cascaded standby.
+ * Returns the latest point in WAL that has been safely flushed to disk.
+ * This should only be called when in recovery.
+ *
+ * This is called either by cascading walsender to find WAL postion to be sent
+ * to a cascaded standby or by slot synchronization function to validate remote
+ * slot's lsn before syncing it locally.
  *
  * As a side-effect, *tli is updated to the TLI of the last
  * replayed WAL record.
  */
-static XLogRecPtr
+XLogRecPtr
 GetStandbyFlushRecPtr(TimeLineID *tli)
 {
    XLogRecPtr  replayPtr;
@@ -3401,6 +3404,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
    TimeLineID  receiveTLI;
    XLogRecPtr  result;
 
+   Assert(am_cascading_walsender || IsSyncingReplicationSlots());
+
    /*
     * We can safely send what's already been replayed. Also, if walreceiver
     * is streaming WAL from the same timeline, we can send anything that it
index 7084e18861b52ff17d238bb0b0e907ff36389859..7e7941d62595eb4e378995d860b61bdf57530363 100644 (file)
@@ -36,6 +36,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
+#include "replication/slotsync.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -153,6 +154,7 @@ CalculateShmemSize(int *num_semaphores)
    size = add_size(size, StatsShmemSize());
    size = add_size(size, WaitEventExtensionShmemSize());
    size = add_size(size, InjectionPointShmemSize());
+   size = add_size(size, SlotSyncShmemSize());
 #ifdef EXEC_BACKEND
    size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -347,6 +349,7 @@ CreateOrAttachShmemStructs(void)
    WalSummarizerShmemInit();
    PgArchShmemInit();
    ApplyLauncherShmemInit();
+   SlotSyncShmemInit();
 
    /*
     * Set up other modules that need some shared memory space
index 9fc8ac92905532125a3b5bf317219ef3d890242c..75e1fc8433dd1c88dc67a13f4471af6fbad201fe 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202401301
+#define CATALOG_VERSION_NO 202402141
 
 #endif
index 29af4ce65d5c77f64547262939800d03e91c712d..9c120fc2b7fa1e09b919de69f0dc1a7a18ef61e4 100644 (file)
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
   prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
   prosrc => 'pg_logical_emit_message_bytea' },
+{ oid => '9929', descr => 'sync replication slots from the primary to the standby',
+  proname => 'pg_sync_replication_slots', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_sync_replication_slots' },
 
 # event triggers
 { oid => '3566', descr => 'list objects dropped by the current command',
index da4c7764921480a6203998b592f2aa2a04d6de1d..e706ca834c092567d8ee8e1da50fb717152d78ea 100644 (file)
@@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause
    RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
+/*
+ * The possible values for 'conflict_reason' returned in
+ * pg_get_replication_slots.
+ */
+#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
+#define SLOT_INVAL_HORIZON_TEXT     "rows_removed"
+#define SLOT_INVAL_WAL_LEVEL_TEXT   "wal_level_insufficient"
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -112,6 +120,11 @@ typedef struct ReplicationSlotPersistentData
    /* plugin name */
    NameData    plugin;
 
+   /*
+    * Was this slot synchronized from the primary server?
+    */
+   char        synced;
+
    /*
     * Is this a failover slot (sync candidate for standbys)? Only relevant
     * for logical slots on the primary server.
@@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void);
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
                                  ReplicationSlotPersistency persistency,
-                                 bool two_phase, bool failover);
+                                 bool two_phase, bool failover,
+                                 bool synced);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
@@ -259,5 +274,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown);
 
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
+extern ReplicationSlotInvalidationCause
+           GetSlotInvalidationCause(char *conflict_reason);
 
 #endif                         /* SLOT_H */
diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h
new file mode 100644 (file)
index 0000000..e86d8a4
--- /dev/null
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * slotsync.h
+ *   Exports for slot synchronization.
+ *
+ * Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slotsync.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOTSYNC_H
+#define SLOTSYNC_H
+
+#include "replication/walreceiver.h"
+
+extern void ValidateSlotSyncParams(void);
+extern bool IsSyncingReplicationSlots(void);
+extern Size SlotSyncShmemSize(void);
+extern void SlotSyncShmemInit(void);
+extern void SyncReplicationSlots(WalReceiverConn *wrconn);
+
+#endif                         /* SLOTSYNC_H */
index 1b58d50b3b9e2ce4029ff74edec71ef3efb641dc..0c3996e926333564fae2f476eea5bf78d588b2d4 100644 (file)
@@ -12,6 +12,8 @@
 #ifndef _WALSENDER_H
 #define _WALSENDER_H
 
+#include "access/xlogdefs.h"
+
 /*
  * What to do with a snapshot in create replication slot command.
  */
@@ -37,6 +39,7 @@ extern void InitWalSender(void);
 extern bool exec_replication_command(const char *cmd_string);
 extern void WalSndErrorCleanup(void);
 extern void WalSndResourceCleanup(bool isCommit);
+extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
index bc58ff4cab2494d3fe3ec35fcda3cedf8af0adc6..c96515d178ba7c880627d615f9e9713d20408770 100644 (file)
@@ -97,4 +97,241 @@ my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
 ok( $stderr =~ /ERROR:  cannot set failover for enabled subscription/,
    "altering failover is not allowed for enabled subscription");
 
+##################################################
+# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
+##################################################
+
+($result, $stdout, $stderr) =
+  $publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+     /ERROR:  replication slots can only be synchronized to a standby server/,
+   "cannot sync slots on a non-standby server");
+
+##################################################
+# Test logical failover slots on the standby
+# Configure standby1 to replicate and synchronize logical slots configured
+# for failover on the primary
+#
+#              failover slot lsub1_slot ->| ----> subscriber1 (connected via logical replication)
+#              failover slot lsub2_slot   |       inactive
+# primary --->                            |
+#              physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
+#                                         |                 lsub1_slot, lsub2_slot (synced_slot)
+##################################################
+
+my $primary = $publisher;
+my $backup_name = 'backup';
+$primary->backup($backup_name);
+
+# Create a standby
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+   $primary, $backup_name,
+   has_streaming => 1,
+   has_restoring => 1);
+
+my $connstr_1 = $primary->connstr;
+$standby1->append_conf(
+   'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr_1 dbname=postgres'
+));
+
+$primary->psql('postgres',
+   q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
+);
+
+$primary->psql('postgres',
+   q{SELECT pg_create_physical_replication_slot('sb1_slot');});
+
+# Start the standby so that slot syncing can begin
+$standby1->start;
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres',
+   "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+   'postgres',
+   "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+   1);
+
+# Wait for the standby to catch up so that the standby is not lagging behind
+# the subscriber.
+$primary->wait_for_replay_catchup($standby1);
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the logical failover slots are created on the standby and are
+# flagged as 'synced'
+is( $standby1->safe_psql(
+       'postgres',
+       q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced;}
+   ),
+   "t",
+   'logical slots have synced as true on standby');
+
+##################################################
+# Test that the synchronized slot will be dropped if the corresponding remote
+# slot on the primary server has been dropped.
+##################################################
+
+$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
+
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+is( $standby1->safe_psql(
+       'postgres',
+       q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
+   ),
+   "t",
+   'synchronized slot has been dropped');
+
+##################################################
+# Test that if the synchronized slot is invalidated while the remote slot is
+# still valid, the slot will be dropped and re-created on the standby by
+# executing pg_sync_replication_slots() again.
+##################################################
+
+# Configure the max_slot_wal_keep_size so that the synced slot can be
+# invalidated due to wal removal.
+$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
+$standby1->reload;
+
+# Generate some activity and switch WAL file on the primary
+$primary->advance_wal(1);
+$primary->psql('postgres', "CHECKPOINT");
+$primary->wait_for_replay_catchup($standby1);
+
+# Request a checkpoint on the standby to trigger the WAL file(s) removal
+$standby1->safe_psql('postgres', "CHECKPOINT");
+
+# Check if the synced slot is invalidated
+is( $standby1->safe_psql(
+       'postgres',
+       q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+   ),
+   "t",
+   'synchronized slot has been invalidated');
+
+# Reset max_slot_wal_keep_size to avoid further wal removal
+$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
+$standby1->reload;
+
+# Enable the subscription to let it catch up to the latest wal position
+$subscriber1->safe_psql('postgres',
+   "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres',
+   "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+   'postgres',
+   "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
+   1);
+
+# Wait for the standby to catch up so that the standby is not lagging behind
+# the subscriber.
+$primary->wait_for_replay_catchup($standby1);
+
+my $log_offset = -s $standby1->logfile;
+
+# Synchronize the primary server slots to the standby.
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Confirm that the invalidated slot has been dropped.
+$standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/,
+   $log_offset);
+
+# Confirm that the logical slot has been re-created on the standby and is
+# flagged as 'synced'
+is( $standby1->safe_psql(
+       'postgres',
+       q{SELECT conflict_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+   ),
+   "t",
+   'logical slot is re-synced');
+
+##################################################
+# Test that a synchronized slot can not be decoded, altered or dropped by the
+# user
+##################################################
+
+# Attempting to perform logical decoding on a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+   "select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
+ok( $stderr =~
+     /ERROR:  cannot use replication slot "lsub1_slot" for logical decoding/,
+   "logical decoding is not allowed on synced slot");
+
+# Attempting to alter a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql(
+   'postgres',
+   qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
+   replication => 'database');
+ok($stderr =~ /ERROR:  cannot alter replication slot "lsub1_slot"/,
+   "synced slot on standby cannot be altered");
+
+# Attempting to drop a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+   "SELECT pg_drop_replication_slot('lsub1_slot');");
+ok($stderr =~ /ERROR:  cannot drop replication slot "lsub1_slot"/,
+   "synced slot on standby cannot be dropped");
+
+##################################################
+# Test that we cannot synchronize slots if dbname is not specified in the
+# primary_conninfo.
+##################################################
+
+$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
+$standby1->reload;
+
+($result, $stdout, $stderr) =
+  $standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+     /HINT:  'dbname' must be specified in "primary_conninfo"/,
+   "cannot sync slots if dbname is not specified in primary_conninfo");
+
+##################################################
+# Test that we cannot synchronize slots to a cascading standby server.
+##################################################
+
+# Create a cascading standby
+$backup_name = 'backup2';
+$standby1->backup($backup_name);
+
+my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+$cascading_standby->init_from_backup(
+   $standby1, $backup_name,
+   has_streaming => 1,
+   has_restoring => 1);
+
+my $cascading_connstr = $standby1->connstr;
+$cascading_standby->append_conf(
+   'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'cascading_sb_slot'
+primary_conninfo = '$cascading_connstr dbname=postgres'
+));
+
+$standby1->psql('postgres',
+   q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});
+
+$cascading_standby->start;
+
+($result, $stdout, $stderr) =
+  $cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
+ok( $stderr =~
+     /ERROR:  cannot synchronize replication slots from a standby server/,
+   "cannot sync slots to a cascading standby server");
+
 done_testing();
index abc944e8b825b81a286c008cf7054cb031f80b4c..b7488d760e5ea74ad8302fd955fd40721d64d214 100644 (file)
@@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.safe_wal_size,
     l.two_phase,
     l.conflict_reason,
-    l.failover
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
+    l.failover,
+    l.synced
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
index 91433d439b7d9913aebce8a3e0edf06f00afbc70..d808aad8b05bb9abb2fdb5d1f2c8d5b61cc81e96 100644 (file)
@@ -2325,6 +2325,7 @@ RelocationBufferInfo
 RelptrFreePageBtree
 RelptrFreePageManager
 RelptrFreePageSpanLeader
+RemoteSlot
 RenameStmt
 ReopenPtrType
 ReorderBuffer
@@ -2584,6 +2585,7 @@ SlabBlock
 SlabContext
 SlabSlot
 SlotNumber
+SlotSyncCtxStruct
 SlruCtl
 SlruCtlData
 SlruErrorCause