Allow pg_receivewal to stream from a slot's restart LSN
author Michael Paquier <[email protected]>
Tue, 26 Oct 2021 00:30:37 +0000 (09:30 +0900)
committer Michael Paquier <[email protected]>
Tue, 26 Oct 2021 00:30:37 +0000 (09:30 +0900)
Prior to this patch, when running pg_receivewal, the streaming start
point would be the current location of the archives if anything is
found in the local directory where WAL segments are written, and
pg_receivewal would fall back to the current WAL flush location if there
are no archives, as reported by an IDENTIFY_SYSTEM command.

If for some reason the WAL files from pg_receivewal were moved, it is
better to try to restart from where we left off, which is the replication
slot's restart_lsn, instead of skipping right to the current flush
location, to avoid holes in the WAL backed up.  This commit changes
pg_receivewal to use the following sequence of methods to determine the
starting streaming LSN:
- Scan the local archives.
- Use the slot's restart_lsn, if supported by the backend and if a slot
is defined.
- Fall back to the current flush LSN as reported by IDENTIFY_SYSTEM.

To keep compatibility with older server versions, we only attempt to use
READ_REPLICATION_SLOT if the backend version is at least 15, and
fall back to the older behavior of streaming from the current flush
LSN if the command is not supported.

Some TAP tests are added to cover this feature.

Author: Ronan Dunklau
Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy
Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan

doc/src/sgml/ref/pg_receivewal.sgml
src/bin/pg_basebackup/pg_receivewal.c
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h
src/bin/pg_basebackup/t/020_pg_receivewal.pl

index 6da8b2be8c09182b9586b9365b8e221bfb5236d6..d3c74882937538916230700c4ba9e85f3b43cca0 100644 (file)
@@ -88,6 +88,17 @@ PostgreSQL documentation
      </para>
     </listitem>
 
+    <listitem>
+     <para>
+      If a starting point cannot be calculated with the previous method,
+      and if a replication slot is used, an extra
+      <command>READ_REPLICATION_SLOT</command> command is issued to retrieve
+      the slot's <literal>restart_lsn</literal> to use as starting point.
+      This option is only available when streaming write-ahead logs from
+      <productname>PostgreSQL</productname> 15 and up.
+     </para>
+    </listitem>
+
     <listitem>
      <para>
       If a starting point cannot be calculated with the previous method,
index d5140a79fea9cab59d4afa5ad35e8609e0c6b27b..04ba20b1974168e9f48cd8680de14f87de69fd79 100644 (file)
@@ -404,15 +404,40 @@ StreamLog(void)
                exit(1);
 
        /*
-        * Figure out where to start streaming.
+        * Figure out where to start streaming.  First scan the local directory.
         */
        stream.startpos = FindStreamingStart(&stream.timeline);
        if (stream.startpos == InvalidXLogRecPtr)
        {
-               stream.startpos = serverpos;
-               stream.timeline = servertli;
+               /*
+                * Try to get the starting point from the slot if any.  This is
+                * supported in PostgreSQL 15 and newer.
+                */
+               if (replication_slot != NULL &&
+                       PQserverVersion(conn) >= 150000)
+               {
+                       if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
+                                                                       &stream.timeline))
+                       {
+                               /* Error is logged by GetSlotInformation() */
+                               return;
+                       }
+               }
+
+               /*
+                * If the starting point is still not known, use the current WAL
+                * flush value as last resort.
+                */
+               if (stream.startpos == InvalidXLogRecPtr)
+               {
+                       stream.startpos = serverpos;
+                       stream.timeline = servertli;
+               }
        }
 
+       Assert(stream.startpos != InvalidXLogRecPtr &&
+                  stream.timeline != 0);
+
        /*
         * Always start streaming at the beginning of a segment
         */
index a9bc1ce2149363c687f881e711d84b6dbd8218d7..2a3e0c688fd5552925c0282ea5a134c7bf293338 100644 (file)
@@ -479,6 +479,103 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
        return true;
 }
 
+/*
+ * Run READ_REPLICATION_SLOT through a given connection and give back to
+ * caller some result information if requested for this slot:
+ * - Start LSN position, InvalidXLogRecPtr if unknown.
+ * - Current timeline ID, 0 if unknown.
+ * Returns false on failure, and true otherwise.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name,
+                                  XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
+{
+       PGresult   *res;
+       PQExpBuffer query;
+       XLogRecPtr      lsn_loc = InvalidXLogRecPtr;
+       TimeLineID      tli_loc = 0;
+
+       if (restart_lsn)
+               *restart_lsn = lsn_loc;
+       if (restart_tli)
+               *restart_tli = tli_loc;
+
+       query = createPQExpBuffer();
+       appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+       res = PQexec(conn, query->data);
+       destroyPQExpBuffer(query);
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               pg_log_error("could not send replication command \"%s\": %s",
+                                        "READ_REPLICATION_SLOT", PQerrorMessage(conn));
+               PQclear(res);
+               return false;
+       }
+
+       /* The command should always return precisely one tuple and three fields */
+       if (PQntuples(res) != 1 || PQnfields(res) != 3)
+       {
+               pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
+                                        slot_name, PQntuples(res), PQnfields(res), 1, 3);
+               PQclear(res);
+               return false;
+       }
+
+       /*
+        * When the slot doesn't exist, the command returns a tuple with NULL
+        * values.  This checks only the slot type field.
+        */
+       if (PQgetisnull(res, 0, 0))
+       {
+               pg_log_error("could not find replication slot \"%s\"", slot_name);
+               PQclear(res);
+               return false;
+       }
+
+       /*
+        * Note that this cannot happen as READ_REPLICATION_SLOT supports only
+        * physical slots, but play it safe.
+        */
+       if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
+       {
+               pg_log_error("expected a physical replication slot, got type \"%s\" instead",
+                                        PQgetvalue(res, 0, 0));
+               PQclear(res);
+               return false;
+       }
+
+       /* restart LSN */
+       if (!PQgetisnull(res, 0, 1))
+       {
+               uint32          hi,
+                                       lo;
+
+               if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+               {
+                       pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
+                                                PQgetvalue(res, 0, 1), slot_name);
+                       PQclear(res);
+                       return false;
+               }
+               lsn_loc = ((uint64) hi) << 32 | lo;
+       }
+
+       /* current TLI */
+       if (!PQgetisnull(res, 0, 2))
+               tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
+
+       PQclear(res);
+
+       /* Assign results if requested */
+       if (restart_lsn)
+               *restart_lsn = lsn_loc;
+       if (restart_tli)
+               *restart_tli = tli_loc;
+
+       return true;
+}
+
 /*
  * Create a replication slot for the given connection. This function
  * returns true in case of success.
index 65135c79e080edc183b0798e5d17858a6f757993..7918935cb3153e2d403f0fc30abe32018fc5b735 100644 (file)
@@ -52,6 +52,9 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
                                                                           bool use_new_option_syntax,
                                                                           char *option_name, int32 option_value);
 
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name,
+                                                          XLogRecPtr *restart_lsn,
+                                                          TimeLineID *restart_tli);
 extern bool RetrieveWalSegSize(PGconn *conn);
 extern TimestampTz feGetCurrentTimestamp(void);
 extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
index b93493b5e9b24002985da35ead7002b86a8b29d4..092c9b6f2582c5c3a974d8faa3e6bc95aab60786 100644 (file)
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Utils;
 use PostgreSQL::Test::Cluster;
-use Test::More tests => 27;
+use Test::More tests => 31;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -72,6 +72,8 @@ $primary->command_ok(
 my @partial_wals = glob "$stream_dir/*\.partial";
 is(scalar(@partial_wals), 1, "one partial WAL segment was created");
 
+note "Testing pg_receivewal with compression methods";
+
 # Check ZLIB compression if available.
 SKIP:
 {
@@ -155,3 +157,52 @@ SKIP:
        ok(check_mode_recursive($stream_dir, 0700, 0600),
                "check stream dir permissions");
 }
+
+note "Testing pg_receivewal with slot as starting streaming point";
+
+# When using a replication slot, archiving should be resumed from the slot's
+# restart LSN.  Use a new archive location and new slot for this test.
+my $slot_dir = $primary->basedir . '/slot_wal';
+mkdir($slot_dir);
+$slot_name = 'archive_slot';
+
+# Setup the slot, reserving WAL at creation (corresponding to the
+# last redo LSN here, actually).
+$primary->psql('postgres',
+       "SELECT pg_create_physical_replication_slot('$slot_name', true);");
+
+# Get the segment name associated with the slot's restart LSN, which should
+# be archived.
+my $walfile_streamed = $primary->safe_psql(
+       'postgres',
+       "SELECT pg_walfile_name(restart_lsn)
+  FROM pg_replication_slots
+  WHERE slot_name = '$slot_name';");
+
+# Switch to a new segment, to make sure that the segment retained by the
+# slot is still streamed.  This may not be necessary, but play it safe.
+$primary->psql('postgres',
+       'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+
+# Check case where the slot does not exist.
+$primary->command_fails_like(
+       [
+               'pg_receivewal',   '-D', $slot_dir,   '--slot',
+               'nonexistentslot', '-n', '--no-sync', '--verbose',
+               '--endpos',        $nextlsn
+       ],
+       qr/pg_receivewal: error: could not find replication slot "nonexistentslot"/,
+       'pg_receivewal fails with non-existing slot');
+$primary->command_ok(
+       [
+               'pg_receivewal', '-D', $slot_dir,   '--slot',
+               $slot_name,      '-n', '--no-sync', '--verbose',
+               '--endpos',      $nextlsn
+       ],
+       "WAL streamed from the slot's restart_lsn");
+ok(-e "$slot_dir/$walfile_streamed",
+       "WAL from the slot's restart_lsn has been archived");