the pause, the rate of WAL generation and available disk space.
</para>
+ <para>
+ There are also procedures to control the progress of recovery.
+ They are shown in <xref linkend="procedures-recovery-control-table"/>.
+ These procedures may be executed only during recovery.
+ </para>
+
+ <table id="procedures-recovery-control-table">
+ <title>Recovery Control Procedures</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ Procedure
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_wal_replay_wait</primary>
+ </indexterm>
+ <function>pg_wal_replay_wait</function> (
+ <parameter>target_lsn</parameter> <type>pg_lsn</type>,
+ <parameter>timeout</parameter> <type>bigint</type> <literal>DEFAULT</literal> <literal>0</literal>)
+ <returnvalue>void</returnvalue>
+ </para>
+ <para>
+ If <parameter>timeout</parameter> is not specified or zero, this
+ procedure returns once WAL is replayed up to
+ <literal>target_lsn</literal>.
+ If the <parameter>timeout</parameter> is specified (in
+ milliseconds) and greater than zero, the procedure waits until the
+ server actually replays the WAL up to <literal>target_lsn</literal> or
+ until the given time has passed. On timeout, an error is emitted.
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ <para>
+ <function>pg_wal_replay_wait</function> waits until
+ <parameter>target_lsn</parameter> has been replayed on the standby.
+ That is, after this procedure returns, the value returned by
+ <function>pg_last_wal_replay_lsn</function> should be greater than or equal
+ to the <parameter>target_lsn</parameter> value. This is useful to achieve
+ read-your-writes consistency while using an asynchronous replica for reads
+ and the primary for writes. In that case, the <acronym>lsn</acronym> of the
+ last modification should be stored on the client application side or the
+ connection pooler side.
+ </para>
+
+ <para>
+ You can use <function>pg_wal_replay_wait</function> to wait for
+ a particular <type>pg_lsn</type> value to be replayed. For example, an
+ application could update the <literal>movie</literal> table and then get
+ the <acronym>lsn</acronym> following the changes just made. This example
+ uses <function>pg_current_wal_insert_lsn</function>
+ on the primary server to get the <acronym>lsn</acronym>, given that
+ <varname>synchronous_commit</varname> could be set to
+ <literal>off</literal>.
+
+ <programlisting>
+postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+UPDATE 100
+postgres=# SELECT pg_current_wal_insert_lsn();
+pg_current_wal_insert_lsn
+--------------------
+0/306EE20
+(1 row)
+ </programlisting>
+
+ Then an application could run <function>pg_wal_replay_wait</function>
+ with the <acronym>lsn</acronym> obtained from the primary. After that, the
+ changes made on the primary should be guaranteed to be visible on the replica.
+
+ <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20');
+CALL
+postgres=# SELECT * FROM movie WHERE genre = 'Drama';
+ genre
+-------
+(0 rows)
+ </programlisting>
+
+ It may also happen that the target <acronym>lsn</acronym> is not reached
+ within the timeout. In that case, an error is raised.
+
+ <programlisting>
+postgres=# CALL pg_wal_replay_wait('0/306EE20', 100);
+ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
+ </programlisting>
+
+ </para>
+
+ <para>
+ <function>pg_wal_replay_wait</function> cannot be used within
+ a transaction, another procedure, or a function. Any of these holds a
+ snapshot, which could prevent the replay of WAL records.
+
+ <programlisting>
+postgres=# BEGIN;
+BEGIN
+postgres=*# CALL pg_wal_replay_wait('0/306EE20');
+ERROR: pg_wal_replay_wait() must be only called in non-atomic context
+DETAIL: Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.
+ </programlisting>
+
+ </para>
</sect2>
<sect2 id="functions-snapshot-synchronization">
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
+#include "commands/waitlsn.h"
#include "common/controldata_utils.h"
#include "common/file_utils.h"
#include "executor/instrument.h"
UpdateControlFile();
LWLockRelease(ControlFileLock);
+ /*
+ * Wake up all waiters for replay LSN. They need to report an error that
+ * recovery was ended before achieving the target LSN.
+ */
+ WaitLSNSetLatches(InvalidXLogRecPtr);
+
/*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
+#include "commands/waitlsn.h"
#include "common/file_utils.h"
#include "miscadmin.h"
#include "pgstat.h"
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for then walk
+ * over the shared memory array and set latches to notify the
+ * waiters.
+ */
+ if (waitLSN &&
+ (XLogRecoveryCtl->lastReplayedEndRecPtr >=
+ pg_atomic_read_u64(&waitLSN->minLSN)))
+ WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100 AS 'json_populate_recordset' PARALLEL SAFE;
+-- Re-declare pg_wal_replay_wait() so that "timeout" gets a default of 0;
+-- the documentation describes a zero or omitted timeout as waiting until
+-- target_lsn is replayed, with no time limit.
+CREATE OR REPLACE PROCEDURE
+  pg_wal_replay_wait(target_lsn pg_lsn, timeout int8 DEFAULT 0)
+  LANGUAGE internal
+  AS 'pg_wal_replay_wait';
+
CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
OUT lsn pg_lsn, OUT xid xid, OUT data text)
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ waitlsn.o
include $(top_srcdir)/src/backend/common.mk
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'waitlsn.c',
)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ * Implements waiting for the given LSN, which is used in
+ *	  CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout int8).
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "executor/spi.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+#include "utils/fmgrprotos.h"
+
+/*
+ * Add to / delete from shared memory array.  Both helpers take
+ * waitLSN->mutex themselves and operate on the calling backend's entry
+ * (identified by MyProcNumber).
+ */
+static void addLSNWaiter(XLogRecPtr lsn);
+static void deleteLSNWaiter(void);
+
+/* Pointer to the shared waiter state; assigned by WaitLSNShmemInit() */
+struct WaitLSNState *waitLSN = NULL;
+
+/*
+ * Backend-local flag: set to true by WaitForLSN() right after it registers
+ * this backend with addLSNWaiter(), and consulted by WaitLSNCleanup() to
+ * decide whether an entry may still need to be removed.
+ */
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Compute how much shared memory WaitLSNState requires: the fixed-size part
+ * of the struct followed by one WaitLSNProcInfo slot per MaxBackends.
+ */
+Size
+WaitLSNShmemSize(void)
+{
+    Size        slots = mul_size(MaxBackends, sizeof(WaitLSNProcInfo));
+
+    return add_size(offsetof(WaitLSNState, procInfos), slots);
+}
+
+/*
+ * Create the WaitLSNState structure in shared memory, or attach to it if it
+ * already exists, and remember its address in the global "waitLSN".
+ */
+void
+WaitLSNShmemInit(void)
+{
+    bool        alreadyInitialized;
+
+    waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+                                               WaitLSNShmemSize(),
+                                               &alreadyInitialized);
+
+    /* The structure was set up earlier; only the pointer was needed */
+    if (alreadyInitialized)
+        return;
+
+    /*
+     * First-time setup: there are no waiters, so minLSN is set to the
+     * largest possible value.
+     */
+    SpinLockInit(&waitLSN->mutex);
+    pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+    waitLSN->numWaitedProcs = 0;
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ *
+ * The array is kept sorted by waitLSN in ascending order.  The new entry is
+ * inserted by walking the array and swapping it with every entry whose LSN
+ * is not smaller, which shifts the tail of the array by one position.
+ *
+ * After the insertion, minLSN is refreshed from the first array element,
+ * which is the smallest waited LSN.  (Using the element at the loop's final
+ * index would publish the largest LSN instead, delaying wake-ups of waiters
+ * with smaller targets.)
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+    WaitLSNProcInfo cur;
+    int         i;
+
+    SpinLockAcquire(&waitLSN->mutex);
+
+    cur.procnum = MyProcNumber;
+    cur.waitLSN = lsn;
+
+    for (i = 0; i < waitLSN->numWaitedProcs; i++)
+    {
+        if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
+        {
+            WaitLSNProcInfo tmp;
+
+            /* Put the carried entry here and carry the displaced one on */
+            tmp = waitLSN->procInfos[i];
+            waitLSN->procInfos[i] = cur;
+            cur = tmp;
+        }
+    }
+
+    /* Whatever is still being carried becomes the new last element */
+    waitLSN->procInfos[i] = cur;
+    waitLSN->numWaitedProcs++;
+
+    /* The array is sorted ascending, so the minimum is the first element */
+    pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+    SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ *
+ * The entry is located by MyProcNumber; all following entries are shifted
+ * one position towards the start, preserving the ascending LSN order.  If
+ * the backend has no entry (for example, because WaitLSNSetLatches() has
+ * already removed it), this is a no-op.
+ *
+ * After the removal, minLSN is refreshed from the first remaining element,
+ * or reset to PG_UINT64_MAX when no waiters remain.  The loop index must not
+ * be used for that: once the loop finishes it points past the valid entries.
+ */
+static void
+deleteLSNWaiter(void)
+{
+    int         i;
+    bool        found = false;
+
+    SpinLockAcquire(&waitLSN->mutex);
+
+    for (i = 0; i < waitLSN->numWaitedProcs; i++)
+    {
+        if (waitLSN->procInfos[i].procnum == MyProcNumber)
+            found = true;
+
+        /* From our entry onwards, pull each successor one slot forward */
+        if (found && i < waitLSN->numWaitedProcs - 1)
+        {
+            waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
+        }
+    }
+
+    if (!found)
+    {
+        SpinLockRelease(&waitLSN->mutex);
+        return;
+    }
+    waitLSN->numWaitedProcs--;
+
+    /* The array is sorted ascending, so the minimum is the first element */
+    if (waitLSN->numWaitedProcs != 0)
+        pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+    else
+        pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+    SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Set latches of LSN waiters whose LSN has been replayed. Set latches of all
+ * LSN waiters when InvalidXLogRecPtr is given.
+ *
+ * The woken entries are removed from the shared array here, and minLSN is
+ * recomputed from the first remaining entry (or reset to PG_UINT64_MAX when
+ * the array becomes empty).
+ */
+void
+WaitLSNSetLatches(XLogRecPtr currentLSN)
+{
+    int         i;
+    int        *wakeUpProcNums;
+    int         numWakeUpProcs;
+
+    /* Allocate the scratch array before taking the spinlock */
+    wakeUpProcNums = palloc(sizeof(int) * MaxBackends);
+
+    SpinLockAcquire(&waitLSN->mutex);
+
+    /*
+     * Remember processes, whose waited LSNs are already replayed. We should
+     * set their latches later after spinlock release.  The array is sorted
+     * by LSN, so we can stop at the first entry that is still in the future.
+     */
+    for (i = 0; i < waitLSN->numWaitedProcs; i++)
+    {
+        if (!XLogRecPtrIsInvalid(currentLSN) &&
+            waitLSN->procInfos[i].waitLSN > currentLSN)
+            break;
+
+        wakeUpProcNums[i] = waitLSN->procInfos[i].procnum;
+    }
+
+    /*
+     * Immediately remove those processes from the shmem array. Otherwise,
+     * shmem array items will be here till corresponding processes wake up and
+     * delete themselves.
+     */
+    numWakeUpProcs = i;
+    for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
+        waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
+    waitLSN->numWaitedProcs -= numWakeUpProcs;
+
+    /*
+     * The smallest remaining LSN is in the first slot.  The loop index is not
+     * usable here: it now equals the new element count, i.e. it points just
+     * past the last valid entry.
+     */
+    if (waitLSN->numWaitedProcs != 0)
+        pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
+    else
+        pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+    SpinLockRelease(&waitLSN->mutex);
+
+    /*
+     * Set latches for processes, whose waited LSNs are already replayed. This
+     * involves spinlocks. So, we shouldn't do this under a spinlock.
+     */
+    for (i = 0; i < numWakeUpProcs; i++)
+    {
+        PGPROC     *backend;
+
+        backend = GetPGProcByNumber(wakeUpProcNums[i]);
+        SetLatch(&backend->procLatch);
+    }
+    pfree(wakeUpProcNums);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ *
+ * The local flag is cleared afterwards so that it keeps matching the state
+ * of the shared array and a repeated call does not take the spinlock again.
+ */
+void
+WaitLSNCleanup(void)
+{
+    if (haveShmemItem)
+    {
+        deleteLSNWaiter();
+        haveShmemItem = false;
+    }
+}
+
+/*
+ * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ *
+ * targetLSN: the LSN whose replay we wait for.
+ * timeout: maximum wait in milliseconds; values <= 0 mean no time limit.
+ *
+ * Raises an error if recovery is not in progress (either on entry or while
+ * waiting), and if the timeout expires before targetLSN is replayed.
+ *
+ * This function makes sure that our entry in the shared waiter array is
+ * removed on every exit path it controls.  deleteLSNWaiter() is a no-op when
+ * the startup process has already removed the entry, so calling it
+ * unconditionally is safe.
+ */
+void
+WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
+{
+    XLogRecPtr  currentLSN;
+    TimestampTz endtime = 0;
+
+    /* Shouldn't be called when shmem isn't initialized */
+    Assert(waitLSN);
+
+    /* Should be only called by a backend */
+    Assert(MyBackendType == B_BACKEND);
+
+    /*
+     * A previous wait in this backend may have been aborted by an error
+     * raised while waiting (e.g. from CHECK_FOR_INTERRUPTS()), leaving a
+     * stale entry behind.  Remove it so that entries of one backend can't
+     * pile up in the fixed-size shared array.
+     */
+    if (haveShmemItem)
+    {
+        deleteLSNWaiter();
+        haveShmemItem = false;
+    }
+
+    if (!RecoveryInProgress())
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("recovery is not in progress"),
+                 errhint("Waiting for LSN can only be executed during recovery.")));
+
+    /* If target LSN is already replayed, exit immediately */
+    if (targetLSN <= GetXLogReplayRecPtr(NULL))
+        return;
+
+    if (timeout > 0)
+        endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+
+    addLSNWaiter(targetLSN);
+    haveShmemItem = true;
+
+    for (;;)
+    {
+        int         rc;
+        int         latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+        long        delay_ms = 0;
+
+        /* Check if the waited LSN has been replayed */
+        currentLSN = GetXLogReplayRecPtr(NULL);
+        if (targetLSN <= currentLSN)
+            break;
+
+        /* Recheck that recovery is still in-progress */
+        if (!RecoveryInProgress())
+        {
+            /* Don't leave our entry behind when bailing out */
+            deleteLSNWaiter();
+            haveShmemItem = false;
+            ereport(ERROR,
+                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                     errmsg("recovery is not in progress"),
+                     errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
+                               LSN_FORMAT_ARGS(targetLSN),
+                               LSN_FORMAT_ARGS(currentLSN))));
+        }
+
+        if (timeout > 0)
+        {
+            /* Timestamps are in microseconds; WaitLatch wants milliseconds */
+            delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+            latch_events |= WL_TIMEOUT;
+            if (delay_ms <= 0)
+                break;
+        }
+
+        CHECK_FOR_INTERRUPTS();
+
+        rc = WaitLatch(MyLatch, latch_events, delay_ms,
+                       WAIT_EVENT_WAIT_FOR_WAL_REPLAY);
+
+        if (rc & WL_LATCH_SET)
+            ResetLatch(MyLatch);
+    }
+
+    /*
+     * Remove our entry in all cases.  On timeout it is certainly still
+     * there; on success the startup process normally has removed it already,
+     * but we may have observed the replay position before it did so.
+     */
+    deleteLSNWaiter();
+    haveShmemItem = false;
+
+    if (targetLSN > currentLSN)
+        ereport(ERROR,
+                (errcode(ERRCODE_QUERY_CANCELED),
+                 errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
+                        LSN_FORMAT_ARGS(targetLSN),
+                        LSN_FORMAT_ARGS(currentLSN))));
+}
+
+/*
+ * SQL-callable procedure: wait until target_lsn (argument 0) is replayed,
+ * optionally limited by a timeout in milliseconds (argument 1).
+ *
+ * Rejects negative timeouts and calls made in an atomic context, drops any
+ * snapshot held on behalf of the CALL statement, and then hands over to
+ * WaitForLSN().
+ */
+Datum
+pg_wal_replay_wait(PG_FUNCTION_ARGS)
+{
+    XLogRecPtr  lsn = PG_GETARG_LSN(0);
+    int64       timeout_ms = PG_GETARG_INT64(1);
+    CallContext *call_ctx = (CallContext *) fcinfo->context;
+
+    if (timeout_ms < 0)
+        ereport(ERROR,
+                (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+                 errmsg("\"timeout\" must not be negative")));
+
+    /*
+     * While waiting for replay we must not hold a snapshot, and hence
+     * MyProc->xmin must be invalid: a snapshot of ours could block the replay
+     * of WAL records, which would amount to a self-deadlock.  That is why
+     * pg_wal_replay_wait() is a procedure rather than a function.
+     *
+     * Step one: insist on a non-atomic context, i.e. the CALL must not be
+     * nested in a transaction block, another procedure, or a function.
+     */
+    if (call_ctx->atomic)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("pg_wal_replay_wait() must be only called in non-atomic context"),
+                 errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));
+
+    /*
+     * Step two: per PlannedStmtRequiresSnapshot(), CallStmt is still
+     * processed with a snapshot.  Popping it here is fine, because
+     * PortalRunUtility() can tolerate this.
+     */
+    if (ActiveSnapshotSet())
+        PopActiveSnapshot();
+    Assert(!ActiveSnapshotSet());
+
+    InvalidateCatalogSnapshot();
+    Assert(MyProc->xmin == InvalidTransactionId);
+
+    WaitForLSN(lsn, timeout_ms);
+
+    PG_RETURN_VOID();
+}
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "commands/async.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
size = add_size(size, WaitEventExtensionShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
+ size = add_size(size, WaitLSNShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
/* Initialize subsystems */
CreateOrAttachShmemStructs();
+ /*
+	 * Initialize the shared memory state used for waiting on replay LSNs
+ */
+ WaitLSNShmemInit();
+
#ifdef EXEC_BACKEND
/*
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_REPLAY "Waiting for a replay of the particular WAL position on the physical standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202404021
+#define CATALOG_VERSION_NO 202404022
#endif
prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
prosrc => 'brin_minmax_multi_summary_send' },
+# Catalog entry for the pg_wal_replay_wait(pg_lsn, int8) procedure
+# (prokind 'p'), implemented by the C function named in prosrc.
+# NOTE(review): OID 16387 lies above the range normally used for
+# hand-assigned catalog OIDs (development patches are expected to pick
+# from 8000-9999) -- confirm and renumber before commit.
+{ oid => '16387', descr => 'wait for LSN with timeout',
+ proname => 'pg_wal_replay_wait', prokind => 'p', prorettype => 'void',
+ proargtypes => 'pg_lsn int8', proargnames => '{target_lsn,timeout}',
+ prosrc => 'pg_wal_replay_wait' },
+
{ oid => '6291', descr => 'arbitrary value from among input values',
proname => 'any_value', prokind => 'a', proisstrict => 'f',
prorettype => 'anyelement', proargtypes => 'anyelement',
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ * Declarations for LSN waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "postgres.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/* Shared memory structures */
+
+/*
+ * One entry of the shared waiter array: which backend is waiting and for
+ * which LSN.
+ */
+typedef struct WaitLSNProcInfo
+{
+ /* Proc number of the waiting backend (MyProcNumber of the waiter) */
+ int procnum;
+ /* LSN the backend waits to be replayed */
+ XLogRecPtr waitLSN;
+} WaitLSNProcInfo;
+
+/*
+ * Shared state for backends waiting for WAL replay.  The procInfos array is
+ * sized for MaxBackends entries by WaitLSNShmemSize().
+ */
+typedef struct WaitLSNState
+{
+ /*
+  * LSN threshold read (without the mutex) by the WAL replay loop to decide
+  * whether WaitLSNSetLatches() needs to be called; initialized to
+  * PG_UINT64_MAX when there are no waiters.
+  */
+ pg_atomic_uint64 minLSN;
+ /* Spinlock taken by the waitlsn.c routines while touching the fields below */
+ slock_t mutex;
+ /* Number of valid entries in procInfos */
+ int numWaitedProcs;
+ /* Waiter entries, kept ordered by waitLSN (ascending) on insertion */
+ WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+/* Pointer to the shared state; NULL until WaitLSNShmemInit() has run */
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+/* Wait until targetLSN is replayed; timeout is in milliseconds, <= 0 means none */
+extern void WaitForLSN(XLogRecPtr targetLSN, int64 timeout);
+/* Shared memory sizing and initialization */
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+/* Wake waiters up to currentLSN, or all of them for InvalidXLogRecPtr */
+extern void WaitLSNSetLatches(XLogRecPtr currentLSN);
+/* Remove the calling backend's waiter entry, if it has one */
+extern void WaitLSNCleanup(void);
+
+#endif /* WAIT_LSN_H */
't/040_standby_failover_slots_sync.pl',
't/041_checkpoint_at_promote.pl',
't/042_low_level_backup.pl',
+ 't/043_wal_replay_wait.pl',
],
},
}
--- /dev/null
+# Checks waiting for the lsn replay on standby using
+# pg_wal_replay_wait() procedure.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+    "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+    has_streaming => 1);
+$node_standby->append_conf(
+    'postgresql.conf', qq[
+    recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that pg_wal_replay_wait() works: add new content to
+# primary and memorize primary's insert LSN, then wait for that LSN to be
+# replayed on standby.
+$node_primary->safe_psql('postgres',
+    "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby->safe_psql(
+    'postgres', qq[
+    CALL pg_wal_replay_wait('${lsn1}', 1000000);
+    SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on standby is at least as big as the LSN we
+# observed on primary before.  pg_lsn_cmp() yields a non-negative number
+# in that case.
+cmp_ok($output, '>=', 0,
+    "standby reached the same LSN as primary after pg_wal_replay_wait()");
+
+# Check that new data is visible after calling pg_wal_replay_wait()
+$node_primary->safe_psql('postgres',
+    "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+    'postgres', qq[
+    CALL pg_wal_replay_wait('${lsn2}');
+    SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the standby has replayed up to primary's LSN: all 30 rows
+# inserted so far must be visible.
+is($output, 30, "standby reached the same LSN as primary");
+
+# Check that waiting for unreachable LSN triggers the timeout. The
+# unreachable LSN must be well in advance. So WAL records issued by
+# the concurrent autovacuum could not affect that.
+my $lsn3 =
+  $node_primary->safe_psql('postgres',
+    "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $stderr;
+
+# Waiting for an already replayed LSN must succeed even with a short
+# timeout; safe_psql() would fail the test on error.
+$node_standby->safe_psql('postgres',
+    "CALL pg_wal_replay_wait('${lsn2}', 10);");
+$node_standby->psql(
+    'postgres',
+    "CALL pg_wal_replay_wait('${lsn3}', 1000);",
+    stderr => \$stderr);
+like(
+    $stderr,
+    qr/timed out while waiting for target LSN/,
+    "get timeout on waiting for unreachable LSN");
+
+# Check that the standby promotion terminates the wait on LSN. Start
+# waiting for unreachable LSN then promote. Check the log for the relevant
+# error message.
+my $psql_session = $node_standby->background_psql('postgres');
+$psql_session->query_until(
+    qr/start/, qq[
+    \\echo start
+    CALL pg_wal_replay_wait('${lsn3}');
+]);
+
+my $log_offset = -s $node_standby->logfile;
+$node_standby->promote;
+$node_standby->wait_for_log('recovery is not in progress', $log_offset);
+
+# wait_for_log() only returns once the message has shown up, so getting
+# here means the waiting session got its error.
+ok(1, "wait on LSN is terminated by standby promotion");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
WaitEventIPC
WaitEventSet
WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
WaitPMResult
WalCloseMethod
WalCompression