From 0c56c61a2b62e0e868926e94f72ebd9cc394dc1a Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Mon, 3 Feb 2025 15:20:40 +0530 Subject: [PATCH 1/2] Introduce inactive_timeout based replication slot invalidation Tools that create replication slots (e.g., for migrations or upgrades) may fail to remove them if an error occurs, leaving behind unused slots that take up space and resources. Manually cleaning them up can be tedious and error-prone, and without intervention, these lingering slots can cause unnecessary WAL retention and system bloat. Till now, postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, setting an optimal value for this is tricky since the amount of WAL a database generates, and the allocated storage per instance will vary greatly in production. A high value may allow orphaned slots to persist longer than necessary, leading to system bloat by retaining WAL unnecessarily. This commit introduces idle_replication_slot_timeout, a simpler and more intuitive way to manage inactive slots. Instead of relying on WAL size, users can set a time limit (e.g., 1 or 2 or n days), after which slots that remain idle for longer than this amount of time are automatically invalidated during checkpoints. Note that the idle timeout invalidation mechanism is not applicable for slots that do not reserve WAL or for slots on the standby server that are being synced from the primary server (i.e., standby slots having 'synced' field 'true'). Synced slots are always considered to be inactive because they don't perform logical decoding to produce changes. --- doc/src/sgml/config.sgml | 40 +++ doc/src/sgml/logical-replication.sgml | 5 + doc/src/sgml/system-views.sgml | 7 + src/backend/access/transam/xlog.c | 4 +- src/backend/replication/slot.c | 326 ++++++++++++++---- src/backend/replication/slotfuncs.c | 2 +- src/backend/utils/adt/timestamp.c | 18 + src/backend/utils/misc/guc_tables.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/bin/pg_basebackup/pg_createsubscriber.c | 4 + src/bin/pg_upgrade/server.c | 7 + src/include/replication/slot.h | 22 +- src/include/utils/guc_hooks.h | 2 + src/include/utils/timestamp.h | 3 + src/tools/pgindent/typedefs.list | 1 + 15 files changed, 368 insertions(+), 86 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 336630ce417e..9eedcf6f0f41 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4429,6 +4429,46 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + idle_replication_slot_timeout (integer) + + idle_replication_slot_timeout configuration parameter + + + + + Invalidate replication slots that have remained idle longer than this + duration. If this value is specified without units, it is taken as + minutes. A value of zero (the default) disables the idle timeout + invalidation mechanism. This parameter can only be set in the + postgresql.conf file or on the server command + line. + + + + Slot invalidation due to idle timeout occurs during checkpoint. + Because checkpoints happen at checkpoint_timeout + intervals, there can be some lag between when the + idle_replication_slot_timeout was exceeded and when + the slot invalidation is triggered at the next checkpoint. + To avoid such lags, users can force a checkpoint to promptly invalidate + inactive slots. The duration of slot inactivity is calculated using the + slot's pg_replication_slots.inactive_since + value. + + + + Note that the idle timeout invalidation mechanism is not applicable + for slots that do not reserve WAL or for slots on the standby server + that are being synced from the primary server (i.e., standby slots + having pg_replication_slots.synced + value true). Synced slots are always considered to + be inactive because they don't perform logical decoding to produce + changes. + + + + wal_sender_timeout (integer) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 613abcd28b7d..3d18e507bbcd 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2390,6 +2390,11 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER plus some reserve for table synchronization. + + Logical replication slots are also affected by + idle_replication_slot_timeout. + + max_wal_senders should be set to at least the same as diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index ad2903d5ac74..3f5a306247e6 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2619,6 +2619,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx perform logical decoding. It is set only for logical slots. + + + idle_timeout means that the slot has remained + idle longer than the configured + duration. + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 25a5c6054049..f9bf5ba75091 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7337,7 +7337,7 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) { @@ -7792,7 +7792,7 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fe5acd8b1fc7..674998b4e988 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -102,16 +102,24 @@ typedef struct /* * Lookup table for slot invalidation causes. */ -const char *const SlotInvalidationCauses[] = { - [RS_INVAL_NONE] = "none", - [RS_INVAL_WAL_REMOVED] = "wal_removed", - [RS_INVAL_HORIZON] = "rows_removed", - [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", +typedef struct SlotInvalidationCauseMap +{ + ReplicationSlotInvalidationCause cause; + const char *cause_name; +} SlotInvalidationCauseMap; + +static const SlotInvalidationCauseMap SlotInvalidationCauses[] = { + {RS_INVAL_NONE, "none"}, + {RS_INVAL_WAL_REMOVED, "wal_removed"}, + {RS_INVAL_HORIZON, "rows_removed"}, + {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"}, + {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"}, }; -/* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL - +/* + * Ensure that the lookup table is up-to-date with the enums defined in + * ReplicationSlotInvalidationCause. + */ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -141,6 +149,12 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * Invalidate replication slots that have remained idle longer than this + * duration; '0' disables it. + */ +int idle_replication_slot_timeout_mins = 0; + /* * This GUC lists streaming replication standby server slot names that * logical WAL sender processes will wait for. @@ -575,7 +589,7 @@ ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid) errmsg("can no longer access replication slot \"%s\"", NameStr(s->data.name)), errdetail("This replication slot has been invalidated due to \"%s\".", - SlotInvalidationCauses[s->data.invalidated])); + GetSlotInvalidationCauseName(s->data.invalidated))); } /* @@ -592,14 +606,23 @@ ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid) if (!nowait) ConditionVariablePrepareToSleep(&s->active_cv); + /* + * It is important to reset the inactive_since under spinlock here to + * avoid race conditions with slot invalidation. See comments related + * to inactive_since in InvalidatePossiblyObsoleteSlot. + */ SpinLockAcquire(&s->mutex); if (s->active_pid == 0) s->active_pid = MyProcPid; active_pid = s->active_pid; + ReplicationSlotSetInactiveSince(s, 0, false); SpinLockRelease(&s->mutex); } else + { active_pid = MyProcPid; + ReplicationSlotSetInactiveSince(s, 0, true); + } LWLockRelease(ReplicationSlotControlLock); /* @@ -640,11 +663,6 @@ ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid) if (SlotIsLogical(s)) pgstat_acquire_replslot(s); - /* - * Reset the time since the slot has become inactive as the slot is active - * now. - */ - ReplicationSlotSetInactiveSince(s, 0, true); if (am_walsender) { @@ -1512,12 +1530,14 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, - TransactionId snapshotConflictHorizon) + TransactionId snapshotConflictHorizon, + long slot_idle_seconds) { StringInfoData err_detail; - bool hint = false; + StringInfoData err_hint; initStringInfo(&err_detail); + initStringInfo(&err_hint); switch (cause) { @@ -1525,13 +1545,15 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, { unsigned long long ex = oldestLSN - restart_lsn; - hint = true; appendStringInfo(&err_detail, ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.", "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", ex), LSN_FORMAT_ARGS(restart_lsn), ex); + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_hint, _("You might need to increase \"%s\"."), + "max_slot_wal_keep_size"); break; } case RS_INVAL_HORIZON: @@ -1542,6 +1564,21 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server.")); break; + + case RS_INVAL_IDLE_TIMEOUT: + { + int minutes = slot_idle_seconds / SECS_PER_MINUTE; + int secs = slot_idle_seconds % SECS_PER_MINUTE; + + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."), + minutes, secs, "idle_replication_slot_timeout", + idle_replication_slot_timeout_mins); + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_hint, _("You might need to increase \"%s\"."), + "idle_replication_slot_timeout"); + break; + } case RS_INVAL_NONE: pg_unreachable(); } @@ -1553,9 +1590,99 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)), errdetail_internal("%s", err_detail.data), - hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0); + err_hint.len ? errhint("%s", err_hint.data) : 0); pfree(err_detail.data); + pfree(err_hint.data); +} + +/* + * Can we invalidate an idle replication slot? + * + * Idle timeout invalidation is allowed only when: + * + * 1. Idle timeout is set + * 2. Slot has reserved WAL + * 3. Slot is inactive + * 4. The slot is not being synced from the primary while the server is in + * recovery. This is because synced slots are always considered to be + * inactive because they don't perform logical decoding to produce changes. + */ +static inline bool +CanInvalidateIdleSlot(ReplicationSlot *s) +{ + return (idle_replication_slot_timeout_mins != 0 && + !XLogRecPtrIsInvalid(s->data.restart_lsn) && + s->inactive_since > 0 && + !(RecoveryInProgress() && s->data.synced)); +} + +/* + * DetermineSlotInvalidationCause - Determine the cause for which a slot + * becomes invalid among the given possible causes. + * + * This function sequentially checks all possible invalidation causes and + * returns the first one for which the slot is eligible for invalidation. + */ +static ReplicationSlotInvalidationCause +DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, + XLogRecPtr oldestLSN, Oid dboid, + TransactionId snapshotConflictHorizon, + TransactionId initial_effective_xmin, + TransactionId initial_catalog_effective_xmin, + XLogRecPtr initial_restart_lsn, + TimestampTz *inactive_since, TimestampTz now) +{ + Assert(possible_causes != RS_INVAL_NONE); + + if (possible_causes & RS_INVAL_WAL_REMOVED) + { + if (initial_restart_lsn != InvalidXLogRecPtr && + initial_restart_lsn < oldestLSN) + return RS_INVAL_WAL_REMOVED; + } + + if (possible_causes & RS_INVAL_HORIZON) + { + /* invalid DB oid signals a shared relation */ + if (SlotIsLogical(s) && + (dboid == InvalidOid || dboid == s->data.database)) + { + if (TransactionIdIsValid(initial_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_effective_xmin, + snapshotConflictHorizon)) + return RS_INVAL_HORIZON; + else if (TransactionIdIsValid(initial_catalog_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, + snapshotConflictHorizon)) + return RS_INVAL_HORIZON; + } + } + + if (possible_causes & RS_INVAL_WAL_LEVEL) + { + if (SlotIsLogical(s)) + return RS_INVAL_WAL_LEVEL; + } + + if (possible_causes & RS_INVAL_IDLE_TIMEOUT) + { + Assert(now > 0); + + /* + * Check if the slot needs to be invalidated due to + * idle_replication_slot_timeout GUC. + */ + if (CanInvalidateIdleSlot(s) && + TimestampDifferenceExceedsSeconds(s->inactive_since, now, + idle_replication_slot_timeout_mins * SECS_PER_MINUTE)) + { + *inactive_since = s->inactive_since; + return RS_INVAL_IDLE_TIMEOUT; + } + } + + return RS_INVAL_NONE; } /* @@ -1572,7 +1699,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, +InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, @@ -1585,6 +1712,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, TransactionId initial_catalog_effective_xmin = InvalidTransactionId; XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; + TimestampTz inactive_since = 0; for (;;) { @@ -1592,6 +1720,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, NameData slotname; int active_pid = 0; ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; + TimestampTz now = 0; + long slot_idle_secs = 0; Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); @@ -1602,6 +1732,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, break; } + if (possible_causes & RS_INVAL_IDLE_TIMEOUT) + { + /* + * Assign the current time here to avoid system call overhead + * while holding the spinlock in subsequent code. + */ + now = GetCurrentTimestamp(); + } + /* * Check if the slot needs to be invalidated. If it needs to be * invalidated, and is not currently acquired, acquire it and mark it @@ -1621,6 +1760,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * those values change since the process holding the slot has been * terminated (if any), so record them here to ensure that we * would report the correct invalidation cause. + * + * Unlike others, slot's inactive_since can't be changed once it + * is acquired till it gets released or the process owning it gets + * terminated. The slot remains active till some process owns it. + * So, the inactive slot can only be invalidated immediately + * without being terminated. */ if (!terminated) { @@ -1629,35 +1774,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, initial_catalog_effective_xmin = s->effective_catalog_xmin; } - switch (cause) - { - case RS_INVAL_WAL_REMOVED: - if (initial_restart_lsn != InvalidXLogRecPtr && - initial_restart_lsn < oldestLSN) - invalidation_cause = cause; - break; - case RS_INVAL_HORIZON: - if (!SlotIsLogical(s)) - break; - /* invalid DB oid signals a shared relation */ - if (dboid != InvalidOid && dboid != s->data.database) - break; - if (TransactionIdIsValid(initial_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_effective_xmin, - snapshotConflictHorizon)) - invalidation_cause = cause; - else if (TransactionIdIsValid(initial_catalog_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, - snapshotConflictHorizon)) - invalidation_cause = cause; - break; - case RS_INVAL_WAL_LEVEL: - if (SlotIsLogical(s)) - invalidation_cause = cause; - break; - case RS_INVAL_NONE: - pg_unreachable(); - } + invalidation_cause = DetermineSlotInvalidationCause(possible_causes, + s, oldestLSN, + dboid, + snapshotConflictHorizon, + initial_effective_xmin, + initial_catalog_effective_xmin, + initial_restart_lsn, + &inactive_since, + now); } /* @@ -1705,12 +1830,25 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, /* * The logical replication slots shouldn't be invalidated as GUC - * max_slot_wal_keep_size is set to -1 during the binary upgrade. See - * check_old_cluster_for_valid_slots() where we ensure that no - * invalidated before the upgrade. + * max_slot_wal_keep_size is set to -1 and + * idle_replication_slot_timeout is set to 0 during the binary + * upgrade. See check_old_cluster_for_valid_slots() where we ensure + * that no invalidated before the upgrade. */ Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)); + /* + * Calculate the idle time duration of the slot if slot is marked + * invalidated with RS_INVAL_IDLE_TIMEOUT. + */ + if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT) + { + int slot_idle_usecs; + + TimestampDifference(inactive_since, now, &slot_idle_secs, + &slot_idle_usecs); + } + if (active_pid != 0) { /* @@ -1739,7 +1877,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, { ReportSlotInvalidation(invalidation_cause, true, active_pid, slotname, restart_lsn, - oldestLSN, snapshotConflictHorizon); + oldestLSN, snapshotConflictHorizon, + slot_idle_secs); if (MyBackendType == B_STARTUP) (void) SendProcSignal(active_pid, @@ -1785,7 +1924,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, - oldestLSN, snapshotConflictHorizon); + oldestLSN, snapshotConflictHorizon, + slot_idle_secs); /* done with this slot for now */ break; @@ -1802,26 +1942,32 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * * Returns true when any slot have got invalidated. * - * Whether a slot needs to be invalidated depends on the cause. A slot is - * removed if it: + * Whether a slot needs to be invalidated depends on the invalidation cause. + * A slot is invalidated if it: * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations - * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient + * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured + * "idle_replication_slot_timeout" duration. + * + * Note: This function attempts to invalidate the slot for multiple possible + * causes in a single pass, minimizing redundant iterations. The "cause" + * parameter can be a MASK representing one or more of the defined causes. * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool -InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, +InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon) { XLogRecPtr oldestLSN; bool invalidated = false; - Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon)); - Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0); - Assert(cause != RS_INVAL_NONE); + Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon)); + Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); + Assert(possible_causes != RS_INVAL_NONE); if (max_replication_slots == 0) return invalidated; @@ -1837,7 +1983,7 @@ InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid, + if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid, snapshotConflictHorizon, &invalidated)) { @@ -2426,26 +2572,37 @@ RestoreSlotFromDisk(const char *name) * ReplicationSlotInvalidationCause. */ ReplicationSlotInvalidationCause -GetSlotInvalidationCause(const char *invalidation_reason) +GetSlotInvalidationCause(const char *cause_name) { - ReplicationSlotInvalidationCause cause; - ReplicationSlotInvalidationCause result = RS_INVAL_NONE; - bool found PG_USED_FOR_ASSERTS_ONLY = false; + Assert(cause_name); - Assert(invalidation_reason); + /* Search lookup table for the cause having this name */ + for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++) + { + if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0) + return SlotInvalidationCauses[i].cause; + } + + Assert(false); + return RS_INVAL_NONE; /* to keep compiler quiet */ +} - for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++) +/* + * Maps an ReplicationSlotInvalidationCause to the invalidation + * reason for a replication slot. + */ +const char * +GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) +{ + /* Search lookup table for the name of this cause */ + for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++) { - if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0) - { - found = true; - result = cause; - break; - } + if (SlotInvalidationCauses[i].cause == cause) + return SlotInvalidationCauses[i].cause_name; } - Assert(found); - return result; + Assert(false); + return "none"; /* to keep compiler quiet */ } /* @@ -2802,3 +2959,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) ConditionVariableCancelSleep(); } + +/* + * GUC check_hook for idle_replication_slot_timeout + * + * The value of idle_replication_slot_timeout must be set to 0 during + * a binary upgrade. See start_postmaster() in pg_upgrade for more details. + */ +bool +check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source) +{ + if (IsBinaryUpgrade && *newval != 0) + { + GUC_check_errdetail("The value of \"%s\" must be set to 0 during binary upgrade mode.", + "idle_replication_slot_timeout"); + return false; + } + + return true; +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8be4b8c65b56..f652ec8a73e4 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -431,7 +431,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) if (cause == RS_INVAL_NONE) nulls[i++] = true; else - values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause)); values[i++] = BoolGetDatum(slot_contents.data.failover); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index ba9bae05069a..9682f9dbdca1 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1786,6 +1786,24 @@ TimestampDifferenceExceeds(TimestampTz start_time, return (diff >= msec * INT64CONST(1000)); } +/* + * Check if the difference between two timestamps is >= a given + * threshold (expressed in seconds). + */ +bool +TimestampDifferenceExceedsSeconds(TimestampTz start_time, + TimestampTz stop_time, + int threshold_sec) +{ + long secs; + int usecs; + + /* Calculate the difference in seconds */ + TimestampDifference(start_time, stop_time, &secs, &usecs); + + return (secs >= threshold_sec); +} + /* * Convert a time_t to TimestampTz. * diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index cce733146090..f5a81a47dd0e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3068,6 +3068,18 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"idle_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the duration a replication slot can remain idle before " + "it is invalidated."), + NULL, + GUC_UNIT_MIN + }, + &idle_replication_slot_timeout_mins, + 0, 0, INT_MAX / SECS_PER_MINUTE, + check_idle_replication_slot_timeout, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d472987ed46a..415f253096c1 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -329,6 +329,7 @@ # (change requires restart) #wal_keep_size = 0 # in megabytes; 0 disables #max_slot_wal_keep_size = -1 # in megabytes; -1 disables +#idle_replication_slot_timeout = 0 # in minutes; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 37fdf150b413..9fdf15e5ac03 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -1457,6 +1457,10 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_ appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path); appendShellString(pg_ctl_cmd, subscriber_dir); appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\""); + + /* Prevent unintended slot invalidation */ + appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\""); + if (restricted_access) { appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port); diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c index de6971cde6a5..873e5b5117bf 100644 --- a/src/bin/pg_upgrade/server.c +++ b/src/bin/pg_upgrade/server.c @@ -252,6 +252,13 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error) if (GET_MAJOR_VERSION(cluster->major_version) >= 1700) appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1"); + /* + * Use idle_replication_slot_timeout=0 to prevent slot invalidation due to + * idle_timeout by checkpointer process during upgrade. + */ + if (GET_MAJOR_VERSION(cluster->major_version) >= 1800) + appendPQExpBufferStr(&pgoptions, " -c idle_replication_slot_timeout=0"); + /* * Use -b to disable autovacuum and logical replication launcher * (effective in PG17 or later for the latter). diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 000c36d30dd2..f5a24ccfbf2b 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -44,21 +44,25 @@ typedef enum ReplicationSlotPersistency * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the * 'invalidated' field is set to a value other than _NONE. * - * When adding a new invalidation cause here, remember to update - * SlotInvalidationCauses and RS_INVAL_MAX_CAUSES. + * When adding a new invalidation cause here, the value must be powers of 2 + * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update + * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c. */ typedef enum ReplicationSlotInvalidationCause { - RS_INVAL_NONE, + RS_INVAL_NONE = 0, /* required WAL has been removed */ - RS_INVAL_WAL_REMOVED, + RS_INVAL_WAL_REMOVED = (1 << 0), /* required rows have been removed */ - RS_INVAL_HORIZON, + RS_INVAL_HORIZON = (1 << 1), /* wal_level insufficient for slot */ - RS_INVAL_WAL_LEVEL, + RS_INVAL_WAL_LEVEL = (1 << 2), + /* idle slot timeout has occurred */ + RS_INVAL_IDLE_TIMEOUT = (1 << 3), } ReplicationSlotInvalidationCause; -extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; +/* Maximum number of invalidation causes */ +#define RS_INVAL_MAX_CAUSES 4 /* * On-Disk data of a replication slot, preserved across restarts. @@ -254,6 +258,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; extern PGDLLIMPORT char *synchronized_standby_slots; +extern PGDLLIMPORT int idle_replication_slot_timeout_mins; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -286,7 +291,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, +extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon); @@ -303,6 +308,7 @@ extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); extern ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason); +extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause); extern bool SlotExistsInSyncStandbySlots(const char *slot_name); extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel); diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 87999218d687..951451a9765f 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -174,5 +174,7 @@ extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); extern bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source); extern void assign_synchronized_standby_slots(const char *newval, void *extra); +extern bool check_idle_replication_slot_timeout(int *newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index d26f023fb871..9963bddc0ece 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -107,6 +107,9 @@ extern long TimestampDifferenceMilliseconds(TimestampTz start_time, extern bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec); +extern bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, + TimestampTz stop_time, + int threshold_sec); extern TimestampTz time_t_to_timestamptz(pg_time_t tm); extern pg_time_t timestamptz_to_time_t(TimestampTz t); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index bce4214503d6..7286ebd1b138 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2689,6 +2689,7 @@ SkipPages SlabBlock SlabContext SlabSlot +SlotInvalidationCauseMap SlotNumber SlotSyncCtxStruct SlruCtl From 3a48575866490c426da5360442189e80bbc6f033 Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Tue, 11 Feb 2025 17:26:15 +0530 Subject: [PATCH 2/2] Add TAP test for slot invalidation based on inactive timeout. This test uses injection points to bypass the time overhead caused by the idle_replication_slot_timeout GUC, which has a minimum value of one minute. --- src/backend/replication/slot.c | 29 +++-- src/test/recovery/meson.build | 1 + .../t/044_invalidate_inactive_slots.pl | 106 ++++++++++++++++++ 3 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 src/test/recovery/t/044_invalidate_inactive_slots.pl diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 674998b4e988..68d6f006f5c1 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -55,6 +55,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/guc_hooks.h" #include "utils/varlena.h" @@ -1669,16 +1670,26 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, { Assert(now > 0); - /* - * Check if the slot needs to be invalidated due to - * idle_replication_slot_timeout GUC. - */ - if (CanInvalidateIdleSlot(s) && - TimestampDifferenceExceedsSeconds(s->inactive_since, now, - idle_replication_slot_timeout_mins * SECS_PER_MINUTE)) + if (CanInvalidateIdleSlot(s)) { - *inactive_since = s->inactive_since; - return RS_INVAL_IDLE_TIMEOUT; + /* + * Check if the slot needs to be invalidated due to + * idle_replication_slot_timeout GUC. + * + * To test idle timeout slot invalidation, if the + * "slot-timeout-inval" injection point is attached, immediately + * invalidate the slot. + */ + if ( +#ifdef USE_INJECTION_POINTS + IS_INJECTION_POINT_ATTACHED("slot-timeout-inval") || +#endif + TimestampDifferenceExceedsSeconds(s->inactive_since, now, + idle_replication_slot_timeout_mins * SECS_PER_MINUTE)) + { + *inactive_since = s->inactive_since; + return RS_INVAL_IDLE_TIMEOUT; + } } } diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 0428704dbfd5..057bcde1434d 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -52,6 +52,7 @@ tests += { 't/041_checkpoint_at_promote.pl', 't/042_low_level_backup.pl', 't/043_no_contrecord_switch.pl', + 't/044_invalidate_inactive_slots.pl', ], }, } diff --git a/src/test/recovery/t/044_invalidate_inactive_slots.pl b/src/test/recovery/t/044_invalidate_inactive_slots.pl new file mode 100644 index 000000000000..949b0aa7be6e --- /dev/null +++ b/src/test/recovery/t/044_invalidate_inactive_slots.pl @@ -0,0 +1,106 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test for replication slots invalidation due to idle_timeout +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; + +# This test depends on injection point that forces slot invalidation +# due to idle_timeout. +# https://www.postgresql.org/docs/current/xfunc-c.html#XFUNC-ADDIN-INJECTION-POINTS +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# Wait for slot to first become idle and then get invalidated +sub wait_for_slot_invalidation +{ + my ($node, $slot_name, $offset) = @_; + my $node_name = $node->name; + + # The slot's invalidation should be logged + $node->wait_for_log( + qr/invalidating obsolete replication slot \"$slot_name\"/, $offset); + + # Check that the invalidation reason is 'idle_timeout' + $node->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE slot_name = '$slot_name' AND + invalidation_reason = 'idle_timeout'; + ]) + or die + "Timed out while waiting for invalidation reason of slot $slot_name to be set on node $node_name"; +} + +# ======================================================================== +# Testcase start +# +# Test invalidation of physical replication slot and logical replication slot +# due to idle timeout. + +# Initialize the node +my $node = PostgreSQL::Test::Cluster->new('node'); +$node->init(allows_streaming => 'logical'); + +# Avoid unpredictability +$node->append_conf( + 'postgresql.conf', qq{ +checkpoint_timeout = 1h +idle_replication_slot_timeout = 1min +}); +$node->start; + +# Check if the 'injection_points' extension is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. +if (!$node->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Create both physical and logical replication slots +$node->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'physical_slot', immediately_reserve := true); + SELECT pg_create_logical_replication_slot('logical_slot', 'test_decoding'); +]); + +my $log_offset = -s $node->logfile; + +# Register an injection point on the node to forcibly cause a slot +# invalidation due to idle_timeout +$node->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +$node->safe_psql('postgres', + "SELECT injection_points_attach('slot-timeout-inval', 'error');"); + +# Idle timeout slot invalidation occurs during a checkpoint, so run a +# checkpoint to invalidate the slots. +$node->safe_psql('postgres', "CHECKPOINT"); + +# Wait for slots to become inactive. Note that since nobody has acquired the +# slot yet, then if it has been invalidated that can only be due to the idle +# timeout mechanism. +wait_for_slot_invalidation($node, 'physical_slot', $log_offset); +wait_for_slot_invalidation($node, 'logical_slot', $log_offset); + +# Check that the invalidated slot cannot be acquired +my ($result, $stdout, $stderr); +($result, $stdout, $stderr) = $node->psql( + 'postgres', qq[ + SELECT pg_replication_slot_advance('logical_slot', '0/1'); +]); +ok( $stderr =~ /can no longer access replication slot "logical_slot"/, + "detected error upon trying to acquire invalidated slot on node") + or die + "could not detect error upon trying to acquire invalidated slot \"logical_slot\" on node"; + +# Testcase end +# ============================================================================= + +done_testing();