Implement pg_wal_replay_wait() stored procedure
authorAlexander Korotkov <[email protected]>
Tue, 2 Apr 2024 19:48:03 +0000 (22:48 +0300)
committerAlexander Korotkov <[email protected]>
Tue, 2 Apr 2024 19:48:03 +0000 (22:48 +0300)
pg_wal_replay_wait() is to be used on standby and specifies waiting for
the specific WAL location to be replayed before starting the transaction.
This option is useful when the user makes some data changes on primary and
needs a guarantee to see these changes on standby.

The queue of waiters is stored in the shared memory array sorted by LSN.
During replay of WAL waiters whose LSNs are already replayed are deleted from
the shared memory array and woken up by setting of their latches.

pg_wal_replay_wait() needs to wait without any snapshot held.  Otherwise,
the snapshot could prevent the replay of WAL records implying a kind of
self-deadlock.  This is why it is only possible to implement
pg_wal_replay_wait() as a procedure working in a non-atomic context,
not a function.

Catversion is bumped.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveira
16 files changed:
doc/src/sgml/func.sgml
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogrecovery.c
src/backend/catalog/system_functions.sql
src/backend/commands/Makefile
src/backend/commands/meson.build
src/backend/commands/waitlsn.c [new file with mode: 0644]
src/backend/storage/ipc/ipci.c
src/backend/storage/lmgr/proc.c
src/backend/utils/activity/wait_event_names.txt
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/commands/waitlsn.h [new file with mode: 0644]
src/test/recovery/meson.build
src/test/recovery/t/043_wal_replay_wait.pl [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index b694b2883c32997050465b9a2dad0d91a0c41798..192959ebc1122c957c6a551dcb76ddf739b85896 100644 (file)
@@ -28284,6 +28284,119 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
     the pause, the rate of WAL generation and available disk space.
    </para>
 
+   <para>
+    There are also procedures to control the progress of recovery.
+    They are shown in <xref linkend="procedures-recovery-control-table"/>.
+    These procedures may be executed only during recovery.
+   </para>
+
+   <table id="procedures-recovery-control-table">
+    <title>Recovery Control Procedures</title>
+    <tgroup cols="1">
+     <thead>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        Procedure
+       </para>
+       <para>
+        Description
+       </para></entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_wal_replay_wait</primary>
+        </indexterm>
+        <function>pg_wal_replay_wait</function> (
+          <parameter>target_lsn</parameter> <type>pg_lsn</type>,
+          <parameter>timeout</parameter> <type>bigint</type> <literal>DEFAULT</literal> <literal>0</literal>)
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        If <parameter>timeout</parameter> is not specified or zero, this
+        procedure returns once WAL is replayed upto
+        <literal>target_lsn</literal>.
+        If the <parameter>timeout</parameter> is specified (in
+        milliseconds) and greater than zero, the procedure waits until the
+        server actually replays the WAL upto <literal>target_lsn</literal> or
+        until the given time has passed. On timeout, an error is emitted.
+       </para></entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <para>
+    <function>pg_wal_replay_wait</function> waits till
+    <parameter>target_lsn</parameter> to be replayed on standby.
+    That is, after this function execution, the value returned by
+    <function>pg_last_wal_replay_lsn</function> should be greater or equal
+    to the <parameter>target_lsn</parameter> value.  This is useful to achieve
+    read-your-writes-consistency, while using async replica for reads and
+    primary for writes.  In that case <acronym>lsn</acronym> of the last
+    modification should be stored on the client application side or the
+    connection pooler side.
+   </para>
+
+   <para>
+    You can use <function>pg_wal_replay_wait</function> to wait for
+    the <type>pg_lsn</type> value.  For example, an application could update
+    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
+    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+    on primary server to get the <acronym>lsn</acronym> given that
+    <varname>synchronous_commit</varname> could be set to
+    <literal>off</literal>.
+
+   <programlisting>
+postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+UPDATE 100
+postgres=# SELECT pg_current_wal_insert_lsn();
+pg_current_wal_insert_lsn
+--------------------
+0/306EE20
+(1 row)
+   </programlisting>
+
+   Then an application could run <function>pg_wal_replay_wait</function>
+   with the <acronym>lsn</acronym> obtained from primary.  After that the
+   changes made of primary should be guaranteed to be visible on replica.
+
+   <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20');
+CALL
+postgres=# SELECT * FROM movie WHERE genre = 'Drama';
+ genre
+-------
+(0 rows)
+   </programlisting>
+
+   It may also happen that target <acronym>lsn</acronym> is not achieved
+   within the timeout.  In that case the error is thrown.
+
+   <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20', 100);
+ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
+    </programlisting>
+
+   </para>
+
+   <para>
+     <function>pg_wal_replay_wait</function> can't be used within
+     the transaction, another procedure or function.  Any of them holds a
+     snapshot, which could prevent the replay of WAL records.
+
+   <programlisting>
+postgres=# BEGIN;
+BEGIN
+postgres=*# CALL pg_wal_replay_wait('0/306EE20');
+ERROR:  pg_wal_replay_wait() must be only called in non-atomic context
+DETAIL:  Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.
+   </programlisting>
+
+   </para>
   </sect2>
 
   <sect2 id="functions-snapshot-synchronization">
index 20a5f862090533ce3c21e0220824fd7eb722d78d..1446639ea09acb92bfc48760e73f492af0cbe772 100644 (file)
@@ -66,6 +66,7 @@
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
+#include "commands/waitlsn.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
 #include "executor/instrument.h"
@@ -6040,6 +6041,12 @@ StartupXLOG(void)
    UpdateControlFile();
    LWLockRelease(ControlFileLock);
 
+   /*
+    * Wake up all waiters for replay LSN.  They need to report an error that
+    * recovery was ended before achieving the target LSN.
+    */
+   WaitLSNSetLatches(InvalidXLogRecPtr);
+
    /*
     * Shutdown the recovery environment.  This must occur after
     * RecoverPreparedTransactions() (see notes in lock_twophase_recover())
index 29c5bec084771d9b233056f7887eef7d10b799b0..24ab1b2b21364adf6be913022ad13cf5a053ad36 100644 (file)
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,16 @@ PerformWalRecovery(void)
                break;
            }
 
+           /*
+            * If we replayed an LSN that someone was waiting for then walk
+            * over the shared memory array and set latches to notify the
+            * waiters.
+            */
+           if (waitLSN &&
+               (XLogRecoveryCtl->lastReplayedEndRecPtr >=
+                pg_atomic_read_u64(&waitLSN->minLSN)))
+               WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
            /* Else, try to fetch the next WAL record */
            record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
        } while (record != NULL);
index fe2bb50f46d0df88123835a1d33096eba34f424c..a79bb80c951e9f254fb536295930f79475757838 100644 (file)
@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
   json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
   RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100  AS 'json_populate_recordset' PARALLEL SAFE;
 
+CREATE OR REPLACE PROCEDURE pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0)
+  LANGUAGE internal AS 'pg_wal_replay_wait';
+
 CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
     IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
     OUT lsn pg_lsn, OUT xid xid, OUT data text)
index 48f7348f91c72274edda9723f42ccc7f246917d6..cede90c3b98c0282aaef9d9a87bd64d3478a40fd 100644 (file)
@@ -61,6 +61,7 @@ OBJS = \
    vacuum.o \
    vacuumparallel.o \
    variable.o \
-   view.o
+   view.o \
+   waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
index 6dd00a4abdeaf625e65103679d37d932fecc98b4..7549be5dc3b75b8135556da80e07b3585d8d4b2b 100644 (file)
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644 (file)
index 0000000..6679378
--- /dev/null
@@ -0,0 +1,348 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *   Implements waiting for the given LSN, which is used in
+ *   CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8).
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *   src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "executor/spi.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void addLSNWaiter(XLogRecPtr lsn);
+static void deleteLSNWaiter(void);
+
+struct WaitLSNState *waitLSN = NULL;
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Report the amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+   Size        size;
+
+   size = offsetof(WaitLSNState, procInfos);
+   size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+   return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory */
+void
+WaitLSNShmemInit(void)
+{
+   bool        found;
+
+   waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+                                              WaitLSNShmemSize(),
+                                              &found);
+   if (!found)
+   {
+       SpinLockInit(&waitLSN->mutex);
+       waitLSN->numWaitedProcs = 0;
+       pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+   }
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+   WaitLSNProcInfo cur;
+   int         i;
+
+   SpinLockAcquire(&waitLSN->mutex);
+
+   cur.procnum = MyProcNumber;
+   cur.waitLSN = lsn;
+
+   for (i = 0; i < waitLSN->numWaitedProcs; i++)
+   {
+       if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
+       {
+           WaitLSNProcInfo tmp;
+
+           tmp = waitLSN->procInfos[i];
+           waitLSN->procInfos[i] = cur;
+           cur = tmp;
+       }
+   }
+   waitLSN->procInfos[i] = cur;
+   waitLSN->numWaitedProcs++;
+
+   pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+   SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ */
+static void
+deleteLSNWaiter(void)
+{
+   int         i;
+   bool        found = false;
+
+   SpinLockAcquire(&waitLSN->mutex);
+
+   for (i = 0; i < waitLSN->numWaitedProcs; i++)
+   {
+       if (waitLSN->procInfos[i].procnum == MyProcNumber)
+           found = true;
+
+       if (found && i < waitLSN->numWaitedProcs - 1)
+       {
+           waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
+       }
+   }
+
+   if (!found)
+   {
+       SpinLockRelease(&waitLSN->mutex);
+       return;
+   }
+   waitLSN->numWaitedProcs--;
+
+   if (waitLSN->numWaitedProcs != 0)
+       pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+   else
+       pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+   SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Set latches of LSN waiters whose LSN has been replayed.  Set latches of all
+ * LSN waiters when InvalidXLogRecPtr is given.
+ */
+void
+WaitLSNSetLatches(XLogRecPtr currentLSN)
+{
+   int         i;
+   int        *wakeUpProcNums;
+   int         numWakeUpProcs;
+
+   wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
+
+   SpinLockAcquire(&waitLSN->mutex);
+
+   /*
+    * Remember processes, whose waited LSNs are already replayed.  We should
+    * set their latches later after spinlock release.
+    */
+   for (i = 0; i < waitLSN->numWaitedProcs; i++)
+   {
+       if (!XLogRecPtrIsInvalid(currentLSN) &&
+           waitLSN->procInfos[i].waitLSN > currentLSN)
+           break;
+
+       wakeUpProcNums[i] = waitLSN->procInfos[i].procnum;
+   }
+
+   /*
+    * Immediately remove those processes from the shmem array.  Otherwise,
+    * shmem array items will be here till corresponding processes wake up and
+    * delete themselves.
+    */
+   numWakeUpProcs = i;
+   for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
+       waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
+   waitLSN->numWaitedProcs -= numWakeUpProcs;
+
+   if (waitLSN->numWaitedProcs != 0)
+       pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+   else
+       pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+   SpinLockRelease(&waitLSN->mutex);
+
+   /*
+    * Set latches for processes, whose waited LSNs are already replayed. This
+    * involves spinlocks.  So, we shouldn't do this under a spinlock.
+    */
+   for (i = 0; i < numWakeUpProcs; i++)
+   {
+       PGPROC     *backend;
+
+       backend = GetPGProcByNumber(wakeUpProcNums[i]);
+       SetLatch(&backend->procLatch);
+   }
+   pfree(wakeUpProcNums);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+   if (haveShmemItem)
+       deleteLSNWaiter();
+}
+
+/*
+ * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ */
+void
+WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
+{
+   XLogRecPtr  currentLSN;
+   TimestampTz endtime;
+
+   /* Shouldn't be called when shmem isn't initialized */
+   Assert(waitLSN);
+
+   /* Should be only called by a backend */
+   Assert(MyBackendType == B_BACKEND);
+
+   if (!RecoveryInProgress())
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("recovery is not in progress"),
+                errhint("Waiting for LSN can only be executed during recovery.")));
+
+   /* If target LSN is already replayed, exit immediately */
+   if (targetLSN <= GetXLogReplayRecPtr(NULL))
+       return;
+
+   if (timeout > 0)
+       endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+
+   addLSNWaiter(targetLSN);
+   haveShmemItem = true;
+
+   for (;;)
+   {
+       int         rc;
+       int         latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+       long        delay_ms = 0;
+
+       /* Check if the waited LSN has been replayed */
+       currentLSN = GetXLogReplayRecPtr(NULL);
+       if (targetLSN <= currentLSN)
+           break;
+
+       /* Recheck that recovery is still in-progress */
+       if (!RecoveryInProgress())
+           ereport(ERROR,
+                   (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                    errmsg("recovery is not in progress"),
+                    errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
+                              LSN_FORMAT_ARGS(targetLSN),
+                              LSN_FORMAT_ARGS(currentLSN))));
+
+       if (timeout > 0)
+       {
+           delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+           latch_events |= WL_TIMEOUT;
+           if (delay_ms <= 0)
+               break;
+       }
+
+       CHECK_FOR_INTERRUPTS();
+
+       rc = WaitLatch(MyLatch, latch_events, delay_ms,
+                      WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
+
+       if (rc & WL_LATCH_SET)
+           ResetLatch(MyLatch);
+   }
+
+   if (targetLSN > currentLSN)
+   {
+       deleteLSNWaiter();
+       haveShmemItem = false;
+       ereport(ERROR,
+               (errcode(ERRCODE_QUERY_CANCELED),
+                errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
+                       LSN_FORMAT_ARGS(targetLSN),
+                       LSN_FORMAT_ARGS(currentLSN))));
+   }
+   else
+   {
+       haveShmemItem = false;
+   }
+}
+
+Datum
+pg_wal_replay_wait(PG_FUNCTION_ARGS)
+{
+   XLogRecPtr  target_lsn = PG_GETARG_LSN(0);
+   int64       timeout = PG_GETARG_INT64(1);
+   CallContext *context = (CallContext *) fcinfo->context;
+
+   if (timeout < 0)
+       ereport(ERROR,
+               (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+                errmsg("\"timeout\" must not be negative")));
+
+   /*
+    * We are going to wait for the LSN replay.  We should first care that we
+    * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+    * Otherwise, our snapshot could prevent the replay of WAL records
+    * implying a kind of self-deadlock.  This is the reason why
+    * pg_wal_replay_wait() is a procedure, not a function.
+    *
+    * At first, we check that pg_wal_replay_wait() is called in a non-atomic
+    * context.  That is, a procedure call isn't wrapped into a transaction,
+    * another procedure call, or a function call.
+    *
+    * Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic
+    * context, CallStmt is processed with a snapshot.  Thankfully, we can pop
+    * this snapshot, because PortalRunUtility() can tolerate this.
+    */
+   if (context->atomic)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("pg_wal_replay_wait() must be only called in non-atomic context"),
+                errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));
+
+   if (ActiveSnapshotSet())
+       PopActiveSnapshot();
+   Assert(!ActiveSnapshotSet());
+   InvalidateCatalogSnapshot();
+   Assert(MyProc->xmin == InvalidTransactionId);
+
+   (void) WaitForLSN(target_lsn, timeout);
+
+   PG_RETURN_VOID();
+}
index 521ed5418ccc019b2a14f9d30225cafc408a76c7..5aed90c9355ff568137de17644ec1380a40f3f06 100644 (file)
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
    size = add_size(size, WaitEventExtensionShmemSize());
    size = add_size(size, InjectionPointShmemSize());
    size = add_size(size, SlotSyncShmemSize());
+   size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
    size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
    /* Initialize subsystems */
    CreateOrAttachShmemStructs();
 
+   /*
+    * Init array of Latches in shared memory for wait lsn
+    */
+   WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
    /*
index 162b1f919dbc693c5e43bc7ea276cb4a3307331e..4b830dc3c85afcb1793a9dc2e4cf55ba121b6e17 100644 (file)
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
     */
    LWLockReleaseAll();
 
+   /*
+    * Cleanup waiting for LSN if any.
+    */
+   WaitLSNCleanup();
+
    /* Cancel any pending condition variable sleep, too */
    ConditionVariableCancelSleep();
 
index d39d8d7e878f8d3e66f680fc849f7b364c382063..0d288d6b3d83b367e79355696d7bb73532469edf 100644 (file)
@@ -79,6 +79,7 @@ LIBPQWALRECEIVER_CONNECT  "Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE   "Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER    "Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION  "Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_REPLAY    "Waiting for a replay of the particular WAL position on the physical standby."
 WAL_SENDER_WAIT_FOR_WAL    "Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA  "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
index 950f00bed48ab419b6fc7da760adc01bc2f2f60b..f8fab27da94e3711c2d943ed8552c3d93d7e1865 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202404021
+#define CATALOG_VERSION_NO 202404022
 
 #endif
index 134e3b22fd8feb74b0114cf971f0dc8063c98655..153d816a0532554793934317e273a4d228dbee7d 100644 (file)
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+{ oid => '16387', descr => 'wait for LSN with timeout',
+  proname => 'pg_wal_replay_wait', prokind => 'p', prorettype => 'void',
+  proargtypes => 'pg_lsn int8', proargnames => '{target_lsn,timeout}',
+  prosrc => 'pg_wal_replay_wait' },
+
 { oid => '6291', descr => 'arbitrary value from among input values',
   proname => 'any_value', prokind => 'a', proisstrict => 'f',
   prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644 (file)
index 0000000..10ef63f
--- /dev/null
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *   Declarations for LSN waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "postgres.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/* Shared memory structures */
+typedef struct WaitLSNProcInfo
+{
+   int         procnum;
+   XLogRecPtr  waitLSN;
+} WaitLSNProcInfo;
+
+typedef struct WaitLSNState
+{
+   pg_atomic_uint64 minLSN;
+   slock_t     mutex;
+   int         numWaitedProcs;
+   WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+extern void WaitForLSN(XLogRecPtr targetLSN, int64 timeout);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr currentLSN);
+extern void WaitLSNCleanup(void);
+
+#endif                         /* WAIT_LSN_H */
index b1eb77b1ec107e5dcd7258843c0ca77d0770fb1c..712924c2fad52dd5d96b20ac16db12b004d5f4f1 100644 (file)
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_wal_replay_wait.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_wal_replay_wait.pl b/src/test/recovery/t/043_wal_replay_wait.pl
new file mode 100644 (file)
index 0000000..bbd64aa
--- /dev/null
@@ -0,0 +1,97 @@
+# Checks waiting for the lsn replay on standby using
+# pg_wal_replay_wait() procedure.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+   "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+   has_streaming => 1);
+$node_standby->append_conf(
+   'postgresql.conf', qq[
+   recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that pg_wal_replay_wait() works: add new content to
+# primary and memorize primary's insert LSN, then wait for that LSN to be
+# replayed on standby.
+$node_primary->safe_psql('postgres',
+   "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby->safe_psql(
+   'postgres', qq[
+   CALL pg_wal_replay_wait('${lsn1}', 1000000);
+   SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on standby is at least as big as the LSN we
+# observed on primary's before.
+ok($output >= 0,
+   "standby reached the same LSN as primary after pg_wal_replay_wait()");
+
+# Check that new data is visible after calling pg_wal_replay_wait()
+$node_primary->safe_psql('postgres',
+   "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+   'postgres', qq[
+   CALL pg_wal_replay_wait('${lsn2}');
+   SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the current LSN on standby and is the same as primary's LSN
+ok($output eq 30, "standby reached the same LSN as primary");
+
+# Check that waiting for unreachable LSN triggers the timeout.  The
+# unreachable LSN must be well in advance.  So WAL records issued by
+# the concurrent autovacuum could not affect that.
+my $lsn3 =
+  $node_primary->safe_psql('postgres',
+   "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $stderr;
+$node_standby->safe_psql('postgres',
+   "CALL pg_wal_replay_wait('${lsn2}', 10);");
+$node_standby->psql(
+   'postgres',
+   "CALL pg_wal_replay_wait('${lsn3}', 1000);",
+   stderr => \$stderr);
+ok( $stderr =~ /timed out while waiting for target LSN/,
+   "get timeout on waiting for unreachable LSN");
+
+# Check that the standby promotion terminates the wait on LSN.  Start
+# waiting for unreachable LSN then promote.  Check the log for the relevant
+# error message.
+my $psql_session = $node_standby->background_psql('postgres');
+$psql_session->query_until(
+   qr/start/, qq[
+   \\echo start
+   CALL pg_wal_replay_wait('${lsn3}');
+]);
+
+my $log_offset = -s $node_standby->logfile;
+$node_standby->promote;
+$node_standby->wait_for_log('recovery is not in progress', $log_offset);
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
index 8bc8dd6f1c64530ed99a4f91e17e2585c379dff2..d23e78b898a5be3119a3a8f62c3632428eea1335 100644 (file)
@@ -3059,6 +3059,8 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
 WaitPMResult
 WalCloseMethod
 WalCompression