Raise an error while trying to acquire an invalid slot.
authorAmit Kapila <[email protected]>
Fri, 31 Jan 2025 04:57:35 +0000 (10:27 +0530)
committerAmit Kapila <[email protected]>
Fri, 31 Jan 2025 04:57:35 +0000 (10:27 +0530)
Once a replication slot is invalidated, it cannot be altered or used to
fetch changes. However, a process could still acquire an invalid slot and
fail later.

For example, if a process acquires a logical slot that was invalidated due
to wal_removed, it will eventually fail in CreateDecodingContext() when
attempting to access the removed WAL. Similarly, for physical replication
slots, even if the slot is invalidated and invalidation_reason is set to
wal_removed, the walsender does not currently check for invalidation when
starting physical replication. Instead, replication starts, and an error
is only reported later while trying to access WAL. Similarly, we prohibit
modifying slot properties for invalid slots but give the error for the
same after acquiring the slot.

This patch improves error handling by detecting invalid slots earlier at
the time of slot acquisition which is the first step. This also helped in
unifying different ERROR messages at different places and gave a
consistent message for invalid slots. This means that the message for
invalid slots will change to a generic message.

This will also be helpful for future patches where we are planning to
invalidate slots due to more reasons like idle_timeout because we don't
have to modify multiple places in such cases and avoid the chances of
missing out on a particular place.

Author: Nisha Moond <[email protected]>
Author: Bharath Rupireddy <[email protected]>
Reviewed-by: Vignesh C <[email protected]>
Reviewed-by: Peter Smith <[email protected]>
Reviewed-by: Hayato Kuroda <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Discussion: https://postgr.es/m/CABdArM6pBL5hPnSQ+5nEVMANcF4FCH7LQmgskXyiLY75TMnKpw@mail.gmail.com

src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/logical/slotsync.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/backend/utils/adt/pg_upgrade_support.c
src/include/replication/slot.h
src/test/recovery/t/019_replslot_limit.pl
src/test/recovery/t/035_standby_logical_decoding.pl

index 0b25efafe2b4146a781bea579f01430ce44ea74c..8ea846bfc3b69cbc60670cf3679515832d3d4ad7 100644 (file)
@@ -542,28 +542,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                errdetail("This replication slot is being synchronized from the primary server."),
                errhint("Specify another replication slot."));
 
-   /*
-    * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
-    * "cannot get changes" wording in this errmsg because that'd be
-    * confusingly ambiguous about no changes being available when called from
-    * pg_logical_slot_get_changes_guts().
-    */
-   if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
-       ereport(ERROR,
-               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                errmsg("can no longer get changes from replication slot \"%s\"",
-                       NameStr(MyReplicationSlot->data.name)),
-                errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
-   if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-       ereport(ERROR,
-               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                errmsg("can no longer get changes from replication slot \"%s\"",
-                       NameStr(MyReplicationSlot->data.name)),
-                errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
-   Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
-   Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
+   /* slot must be valid to allow decoding */
+   Assert(slot->data.invalidated == RS_INVAL_NONE);
+   Assert(slot->data.restart_lsn != InvalidXLogRecPtr);
 
    if (start_lsn == InvalidXLogRecPtr)
    {
index 0148ec3678856d4eade84da31753ef5cde69dd3e..ca53caac2f2f5cb0766774631c2666bc4d56a1ac 100644 (file)
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
    else
        end_of_wal = GetXLogReplayRecPtr(NULL);
 
-   ReplicationSlotAcquire(NameStr(*name), true);
+   ReplicationSlotAcquire(NameStr(*name), true, true);
 
    PG_TRY();
    {
index f6945af1d4377c8c9fbbb3aec0b4aa767f0e9319..be6f87f00b28ae5507dd432c54e2a2b53af0b0c8 100644 (file)
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
            if (synced_slot)
            {
-               ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+               ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
                ReplicationSlotDropAcquired();
            }
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
         * pre-check to ensure that at least one of the slot properties is
         * changed before acquiring the slot.
         */
-       ReplicationSlotAcquire(remote_slot->name, true);
+       ReplicationSlotAcquire(remote_slot->name, true, false);
 
        Assert(slot == MyReplicationSlot);
 
index b30e0473e1ce73b0607c248456af73533a8e4b29..c57a13d82089ceeee19bba78c13963b620552af0 100644 (file)
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and don't intend to change it.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
    ReplicationSlot *s;
    int         active_pid;
@@ -561,6 +565,19 @@ retry:
                        name)));
    }
 
+   /* Invalid slots can't be modified or used before accessing the WAL. */
+   if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+   {
+       LWLockRelease(ReplicationSlotControlLock);
+
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("can no longer access replication slot \"%s\"",
+                      NameStr(s->data.name)),
+               errdetail("This replication slot has been invalidated due to \"%s\".",
+                         SlotInvalidationCauses[s->data.invalidated]));
+   }
+
    /*
     * This is the slot we want; check if it's active under some other
     * process.  In single user mode, we don't need this check.
@@ -785,7 +802,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
    Assert(MyReplicationSlot == NULL);
 
-   ReplicationSlotAcquire(name, nowait);
+   ReplicationSlotAcquire(name, nowait, false);
 
    /*
     * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +829,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
    Assert(MyReplicationSlot == NULL);
    Assert(failover || two_phase);
 
-   ReplicationSlotAcquire(name, false);
+   ReplicationSlotAcquire(name, false, true);
 
    if (SlotIsPhysical(MyReplicationSlot))
        ereport(ERROR,
@@ -820,13 +837,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
                errmsg("cannot use %s with a physical replication slot",
                       "ALTER_REPLICATION_SLOT"));
 
-   if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-       ereport(ERROR,
-               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-               errmsg("cannot alter invalid replication slot \"%s\"", name),
-               errdetail("This replication slot has been invalidated due to \"%s\".",
-                         SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
    if (RecoveryInProgress())
    {
        /*
index 977146789fee081963fcb57d1bcec7a225a83915..8be4b8c65b562f736ba406344de76e30802d4458 100644 (file)
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
        moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
    /* Acquire the slot so we "own" it */
-   ReplicationSlotAcquire(NameStr(*slotname), true);
+   ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
    /* A slot whose restart_lsn has never been reserved cannot be advanced */
    if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
index bac504b554e9ec890d4767b40081efb0a11e982b..446d10c1a7d7a2e84fea286dc4a6cd1adec3aa35 100644 (file)
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
    if (cmd->slotname)
    {
-       ReplicationSlotAcquire(cmd->slotname, true);
+       ReplicationSlotAcquire(cmd->slotname, true, true);
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
    Assert(!MyReplicationSlot);
 
-   ReplicationSlotAcquire(cmd->slotname, true);
+   ReplicationSlotAcquire(cmd->slotname, true, true);
 
    /*
     * Force a disconnect, so that the decoding code doesn't need to care
index 9a10907d05b62293b533573d6f9f25d4bc74ed0b..d44f8c262baa207266b6183292285655178777b9 100644 (file)
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
    slot_name = PG_GETARG_NAME(0);
 
    /* Acquire the given slot */
-   ReplicationSlotAcquire(NameStr(*slot_name), true);
+   ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
    Assert(SlotIsLogical(MyReplicationSlot));
 
index bf62b36ad07a56f86b204f52ce690b6700b21352..47ebdaecb6afeddffcd742e69ee31b3d5cd1d0a8 100644 (file)
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
                                 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+                                  bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
index ae2ad5c933ab3b538eaf26469d98761a82b22a79..6468784b83d6fd1175bbd22f070b5b6f349c79ad 100644 (file)
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
    if ($node_standby->log_contains(
-           "requested WAL segment [0-9A-F]+ has already been removed",
+           "This replication slot has been invalidated due to \"wal_removed\".",
            $logstart))
    {
        $failed = 1;
index 7e794c5bea3dc2e0c039003147ea7c1a97051631..505e85d1eb6b9f0fe80ccdab581d9fd290c999da 100644 (file)
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
    qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
    replication => 'database');
 ok( $stderr =~
-     /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+     /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
      && $stderr =~
      /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
    "invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-   "can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+   "can no longer access replication slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-   "can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+   "can no longer access replication slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-   "can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+   "can no longer access replication slot \"shared_row_removal_activeslot\""
 );
 
 ##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-   "can no longer get changes from replication slot \"pruning_activeslot\"");
+   "can no longer access replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
 check_pg_recvlogical_stderr($handle,
-   "can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+   "can no longer access replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drop its slots, including active slots.