Add replication command READ_REPLICATION_SLOT
authorMichael Paquier <[email protected]>
Sun, 24 Oct 2021 22:40:42 +0000 (07:40 +0900)
committerMichael Paquier <[email protected]>
Sun, 24 Oct 2021 22:40:42 +0000 (07:40 +0900)
The command is supported for physical slots for now, and returns the
type of slot, its restart_lsn and its restart_tli.

This will be useful for an upcoming patch related to pg_receivewal, to
allow the tool to be able to stream from the position of a slot, rather
than the last WAL position flushed by the backend (as reported by
IDENTIFY_SYSTEM) if the archive directory is found as empty, which would
be an advantage in the case of switching to a different archive
locations with the same slot used to avoid holes in WAL segment
archives.

Author: Ronan Dunklau
Reviewed-by: Kyotaro Horiguchi, Michael Paquier, Bharath Rupireddy
Discussion: https://postgr.es/m/18708360.4lzOvYHigE@aivenronan

doc/src/sgml/protocol.sgml
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/walsender.c
src/include/nodes/nodes.h
src/include/nodes/replnodes.h
src/test/recovery/t/001_stream_rep.pl
src/test/recovery/t/006_logical_decoding.pl
src/tools/pgindent/typedefs.list

index b95cc88599a1850a46999c402129d2b661262565..132436c6e6842956d8987297b5a0c62f90f8ee37 100644 (file)
@@ -2067,6 +2067,54 @@ The commands accepted in replication mode are:
     </listitem>
   </varlistentry>
 
+  <varlistentry>
+    <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
+      <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
+    </term>
+    <listitem>
+     <para>
+      Read some information associated to a replication slot. Returns a tuple
+      with <literal>NULL</literal> values if the replication slot does not
+      exist. This command is currently only supported for physical replication
+      slots.
+     </para>
+     <para>
+      In response to this command, the server will return a one-row result set,
+      containing the following fields:
+      <variablelist>
+       <varlistentry>
+        <term><literal>slot_type</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's type, either <literal>physical</literal> or
+          <literal>NULL</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_lsn</literal> (<type>text</type>)</term>
+        <listitem>
+         <para>
+          The replication slot's <literal>restart_lsn</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
+        <term><literal>restart_tli</literal> (<type>int8</type>)</term>
+        <listitem>
+         <para>
+          The timeline ID associated to <literal>restart_lsn</literal>,
+          following the current timeline history.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
   <varlistentry>
     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
      <indexterm><primary>START_REPLICATION</primary></indexterm>
index 126380e2df77d3f29e391c0db0d68f9d38bf5bfe..dcb1108579e0ac931a935b21b95e697d9aee2180 100644 (file)
@@ -64,6 +64,7 @@ static SQLCmd *make_sqlcmd(void);
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_READ_REPLICATION_SLOT
 %token K_SHOW
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
@@ -94,7 +95,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
                                create_replication_slot drop_replication_slot identify_system
-                               timeline_history show sql_cmd
+                               read_replication_slot timeline_history show sql_cmd
 %type <list>   base_backup_legacy_opt_list generic_option_list
 %type <defelt> base_backup_legacy_opt generic_option
 %type <uintval>        opt_timeline
@@ -125,6 +126,7 @@ command:
                        | start_logical_replication
                        | create_replication_slot
                        | drop_replication_slot
+                       | read_replication_slot
                        | timeline_history
                        | show
                        | sql_cmd
@@ -140,6 +142,18 @@ identify_system:
                                }
                        ;
 
+/*
+ * READ_REPLICATION_SLOT %s
+ */
+read_replication_slot:
+                       K_READ_REPLICATION_SLOT var_name
+                               {
+                                       ReadReplicationSlotCmd *n = makeNode(ReadReplicationSlotCmd);
+                                       n->slotname = $2;
+                                       $$ = (Node *) n;
+                               }
+                       ;
+
 /*
  * SHOW setting
  */
index c038a636c3839baf25f8fc2ad6586b74b47b3fff..1b599c255e8fbcb5d2e7439d2adbf081eec7de7a 100644 (file)
@@ -85,6 +85,7 @@ identifier            {ident_start}{ident_cont}*
 BASE_BACKUP                    { return K_BASE_BACKUP; }
 FAST                   { return K_FAST; }
 IDENTIFY_SYSTEM                { return K_IDENTIFY_SYSTEM; }
+READ_REPLICATION_SLOT  { return K_READ_REPLICATION_SLOT; }
 SHOW           { return K_SHOW; }
 LABEL                  { return K_LABEL; }
 NOWAIT                 { return K_NOWAIT; }
index b811a5c0ef2b77e89eb6e5ca91bafb7e4ea42c1e..d8f5f113c02ef341a459cecf02c0d9c018ecd82a 100644 (file)
@@ -232,6 +232,7 @@ static void XLogSendLogical(void);
 static void WalSndDone(WalSndSendDataCallback send_data);
 static XLogRecPtr GetStandbyFlushRecPtr(void);
 static void IdentifySystem(void);
+static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
 static void StartReplication(StartReplicationCmd *cmd);
@@ -457,6 +458,104 @@ IdentifySystem(void)
        end_tup_output(tstate);
 }
 
+/* Handle READ_REPLICATION_SLOT command */
+static void
+ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
+{
+#define READ_REPLICATION_SLOT_COLS 3
+       ReplicationSlot *slot;
+       DestReceiver *dest;
+       TupOutputState *tstate;
+       TupleDesc       tupdesc;
+       Datum           values[READ_REPLICATION_SLOT_COLS];
+       bool            nulls[READ_REPLICATION_SLOT_COLS];
+
+       tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
+                                                         TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
+                                                         TEXTOID, -1, 0);
+       /* TimeLineID is unsigned, so int4 is not wide enough. */
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
+                                                         INT8OID, -1, 0);
+
+       MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
+       MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
+
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+       slot = SearchNamedReplicationSlot(cmd->slotname, false);
+       if (slot == NULL || !slot->in_use)
+       {
+               LWLockRelease(ReplicationSlotControlLock);
+       }
+       else
+       {
+               ReplicationSlot slot_contents;
+               int                     i = 0;
+
+               /* Copy slot contents while holding spinlock */
+               SpinLockAcquire(&slot->mutex);
+               slot_contents = *slot;
+               SpinLockRelease(&slot->mutex);
+               LWLockRelease(ReplicationSlotControlLock);
+
+               if (OidIsValid(slot_contents.data.database))
+                       ereport(ERROR,
+                                       errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                       errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
+                                                  "READ_REPLICATION_SLOT",
+                                                  NameStr(slot_contents.data.name)));
+
+               /* slot type */
+               values[i] = CStringGetTextDatum("physical");
+               nulls[i] = false;
+               i++;
+
+               /* start LSN */
+               if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+               {
+                       char            xloc[64];
+
+                       snprintf(xloc, sizeof(xloc), "%X/%X",
+                                        LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
+                       values[i] = CStringGetTextDatum(xloc);
+                       nulls[i] = false;
+               }
+               i++;
+
+               /* timeline this WAL was produced on */
+               if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+               {
+                       TimeLineID      slots_position_timeline;
+                       TimeLineID      current_timeline;
+                       List       *timeline_history = NIL;
+
+                       /*
+                        * While in recovery, use as timeline the currently-replaying one
+                        * to get the LSN position's history.
+                        */
+                       if (RecoveryInProgress())
+                               (void) GetXLogReplayRecPtr(&current_timeline);
+                       else
+                               current_timeline = ThisTimeLineID;
+
+                       timeline_history = readTimeLineHistory(current_timeline);
+                       slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
+                                                                                                                 timeline_history);
+                       values[i] = Int64GetDatum((int64) slots_position_timeline);
+                       nulls[i] = false;
+               }
+               i++;
+
+               Assert(i == READ_REPLICATION_SLOT_COLS);
+       }
+
+       dest = CreateDestReceiver(DestRemoteSimple);
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+       do_tup_output(tstate, values, nulls);
+       end_tup_output(tstate);
+}
+
 
 /*
  * Handle TIMELINE_HISTORY command.
@@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string)
                        EndReplicationCommand(cmdtag);
                        break;
 
+               case T_ReadReplicationSlotCmd:
+                       cmdtag = "READ_REPLICATION_SLOT";
+                       set_ps_display(cmdtag);
+                       ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
+                       EndReplicationCommand(cmdtag);
+                       break;
+
                case T_BaseBackupCmd:
                        cmdtag = "BASE_BACKUP";
                        set_ps_display(cmdtag);
index e0057daa0616a3cbf35a6aa1afedbbe877e5fd36..541e9861bab388b05de1dc7654822d4c75701282 100644 (file)
@@ -496,6 +496,7 @@ typedef enum NodeTag
        T_BaseBackupCmd,
        T_CreateReplicationSlotCmd,
        T_DropReplicationSlotCmd,
+       T_ReadReplicationSlotCmd,
        T_StartReplicationCmd,
        T_TimeLineHistoryCmd,
        T_SQLCmd,
index faa3a251f26d51b7a0f06ad1217a7cbae64f4198..a746fafc1216c6f853bcfa941b8d79a1f7ffb0b8 100644 (file)
@@ -87,6 +87,17 @@ typedef struct StartReplicationCmd
 } StartReplicationCmd;
 
 
+/* ----------------------
+ *             READ_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct ReadReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;
+} ReadReplicationSlotCmd;
+
+
 /* ----------------------
  *             TIMELINE_HISTORY command
  * ----------------------
index bc62ec66bce9da80d4d5bec87ee9809b92951c38..b3dc68d54bc827ee61d4b1f185e60d0ecf09bd74 100644 (file)
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 49;
+use Test::More tests => 53;
 
 # Initialize primary node
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
@@ -254,6 +254,36 @@ ok( $ret == 0,
        "SHOW with superuser-settable parameter, replication role and logical replication"
 );
 
+note "testing READ_REPLICATION_SLOT command for replication connection";
+
+my $slotname = 'test_read_replication_slot_physical';
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       'READ_REPLICATION_SLOT non_existent_slot;',
+       extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success");
+like($stdout, qr/^||$/,
+       "READ_REPLICATION_SLOT returns NULL values if slot does not exist");
+
+$node_primary->psql(
+       'postgres',
+       "CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
+       extra_params => [ '-d', $connstr_rep ]);
+
+($ret, $stdout, $stderr) = $node_primary->psql(
+       'postgres',
+       "READ_REPLICATION_SLOT $slotname;",
+       extra_params => [ '-d', $connstr_rep ]);
+ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot");
+like($stdout, qr/^physical\|[^|]*\|1$/,
+       "READ_REPLICATION_SLOT returns tuple with slot information");
+
+$node_primary->psql(
+       'postgres',
+       "DROP_REPLICATION_SLOT $slotname;",
+       extra_params => [ '-d', $connstr_rep ]);
+
 note "switching to physical replication slot";
 
 # Switch to using a physical replication slot. We can do this without a new
index 1655298bf5ba1a6d1cce0492ff39c184f45b6c26..46ea5a3866ef69240e416a9a2ddec8543fd36b51 100644 (file)
@@ -10,7 +10,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 14;
+use Test::More tests => 15;
 use Config;
 
 # Initialize primary node
@@ -39,6 +39,15 @@ ok( $stderr =~
          m/replication slot "test_slot" was not created in this database/,
        "Logical decoding correctly fails to start");
 
+($result, $stdout, $stderr) = $node_primary->psql(
+       'template1',
+       qq[READ_REPLICATION_SLOT test_slot;],
+       replication => 'database');
+like(
+       $stderr,
+       qr/cannot use "READ_REPLICATION_SLOT" with logical replication slot "test_slot"/,
+       'READ_REPLICATION_SLOT not supported for logical slots');
+
 # Check case of walsender not using a database connection.  Logical
 # decoding should not be allowed.
 ($result, $stdout, $stderr) = $node_primary->psql(
index 1c7bac0578c7893da9693fd256a09982bae1ef58..40fbcddd2012e664e084521ef31784522fbd33bc 100644 (file)
@@ -2129,6 +2129,7 @@ ReadBufferMode
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
+ReadReplicationSlotCmd
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheEntry