Add option to control snapshot export to CREATE_REPLICATION_SLOT
author: Peter Eisentraut <[email protected]>
Tue, 14 Mar 2017 21:13:56 +0000 (17:13 -0400)
committer: Peter Eisentraut <[email protected]>
Tue, 14 Mar 2017 21:34:22 +0000 (17:34 -0400)
We used to export snapshots unconditionally in CREATE_REPLICATION_SLOT
in the replication protocol, but several upcoming patches want more
control over what happens.

Suppress snapshot export in pg_recvlogical, which neither needs nor can
use the exported snapshot.  Since snapshot exporting can fail, this
improves reliability.

This also paves the way for allowing the creation of replication slots
on standbys, which cannot export snapshots because they cannot allocate
new XIDs.

Author: Petr Jelinek <[email protected]>

doc/src/sgml/logicaldecoding.sgml
doc/src/sgml/protocol.sgml
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/walsender.c
src/bin/pg_basebackup/streamutil.c
src/include/nodes/replnodes.h
src/include/replication/walreceiver.h

index 03c2c691d111f5fcc1cddda2cb2a2ba678331aba..bb22f9ae22d36eecd224bf68a1baf6bb838c391a 100644 (file)
@@ -271,8 +271,9 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
    <sect2>
     <title>Exported Snapshots</title>
     <para>
-     When a new replication slot is created using the streaming replication interface,
-     a snapshot is exported
+     When a new replication slot is created using the streaming replication
+     interface (see <xref linkend="protocol-replication-create-slot">), a
+     snapshot is exported
      (see <xref linkend="functions-snapshot-synchronization">), which will show
      exactly the state of the database after which all changes will be
      included in the change stream. This can be used to create a new replica by
@@ -282,6 +283,12 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
      database's state at that point in time, which afterwards can be updated
      using the slot's contents without losing any changes.
     </para>
+    <para>
+     Creation of a snapshot is not always possible.  In particular, it will
+     fail when connected to a hot standby.  Applications that do not require
+     snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</>
+     option.
+    </para>
    </sect2>
   </sect1>
 
index 67f3f455dbe8e7c859d75460e43a7b6faad5310d..7c82b488459c26450d63931e5cda10f9b3728be6 100644 (file)
@@ -1486,8 +1486,8 @@ The commands accepted in walsender mode are:
     </listitem>
   </varlistentry>
 
-  <varlistentry>
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+  <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> ] }
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1538,6 +1538,21 @@ The commands accepted in walsender mode are:
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry>
+       <term><literal>EXPORT_SNAPSHOT</></term>
+       <term><literal>NOEXPORT_SNAPSHOT</></term>
+       <listitem>
+        <para>
+         Decides what to do with the snapshot created during logical slot
+         initialization. <literal>EXPORT_SNAPSHOT</>, which is the default,
+         will export the snapshot for use in other sessions; it cannot be
+         used inside a transaction.  <literal>NOEXPORT_SNAPSHOT</> will
+         just use the snapshot for logical decoding as normal but won't do
+         anything else with it.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
 
      <para>
index 1868bf5f9ee3e5f8ca027eef4dd631286ab12b70..0198e6d75bad9693c313ac75dc1f1b07c33d1d7d 100644 (file)
@@ -314,7 +314,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
        PG_TRY();
        {
-           walrcv_create_slot(wrconn, slotname, false, &lsn);
+           /*
+            * Create permanent slot for the subscription.  We won't use the
+            * initial snapshot for anything, so no need to export it.
+            */
+           walrcv_create_slot(wrconn, slotname, false, false, &lsn);
            ereport(NOTICE,
                    (errmsg("created replication slot \"%s\" on publisher",
                            slotname)));
index ebadf3680f62708fcdf4abe71d092d6c48a97e35..cd2e57867c062acc8132abc8f91656481aaa35c2 100644 (file)
@@ -68,6 +68,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
 static char *libpqrcv_create_slot(WalReceiverConn *conn,
                                  const char *slotname,
                                  bool temporary,
+                                 bool export_snapshot,
                                  XLogRecPtr *lsn);
 static bool libpqrcv_command(WalReceiverConn *conn,
                             const char *cmd, char **err);
@@ -720,7 +721,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
  */
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
-                    bool temporary, XLogRecPtr *lsn)
+                    bool temporary, bool export_snapshot, XLogRecPtr *lsn)
 {
    PGresult       *res;
    StringInfoData  cmd;
@@ -728,13 +729,19 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 
    initStringInfo(&cmd);
 
-   appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
+   appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
 
    if (temporary)
-       appendStringInfo(&cmd, "TEMPORARY ");
+       appendStringInfo(&cmd, " TEMPORARY");
 
    if (conn->logical)
-       appendStringInfo(&cmd, "LOGICAL pgoutput");
+   {
+       appendStringInfo(&cmd, " LOGICAL pgoutput");
+       if (export_snapshot)
+           appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
+       else
+           appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
+   }
 
    res = libpqrcv_PQexec(conn->streamConn, cmd.data);
    pfree(cmd.data);
index b35d0f0cd1acc6cfbefe1d29b12f047005be63ea..f1e43bc9f3d82bdac1ccb65b814a93f4d4bdea39 100644 (file)
@@ -79,6 +79,8 @@ Node *replication_parse_result;
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_EXPORT_SNAPSHOT
+%token K_NOEXPORT_SNAPSHOT
 
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
@@ -91,7 +93,9 @@ Node *replication_parse_result;
 %type <defelt> plugin_opt_elem
 %type <node>   plugin_opt_arg
 %type <str>        opt_slot var_name
-%type <boolval>    opt_reserve_wal opt_temporary
+%type <boolval>    opt_temporary
+%type <list>   create_slot_opt_list
+%type <defelt> create_slot_opt
 
 %%
 
@@ -202,18 +206,18 @@ base_backup_opt:
 
 create_replication_slot:
            /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
-           K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
+           K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
                {
                    CreateReplicationSlotCmd *cmd;
                    cmd = makeNode(CreateReplicationSlotCmd);
                    cmd->kind = REPLICATION_KIND_PHYSICAL;
                    cmd->slotname = $2;
                    cmd->temporary = $3;
-                   cmd->reserve_wal = $5;
+                   cmd->options = $5;
                    $$ = (Node *) cmd;
                }
            /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-           | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
+           | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
                {
                    CreateReplicationSlotCmd *cmd;
                    cmd = makeNode(CreateReplicationSlotCmd);
@@ -221,10 +225,36 @@ create_replication_slot:
                    cmd->slotname = $2;
                    cmd->temporary = $3;
                    cmd->plugin = $5;
+                   cmd->options = $6;
                    $$ = (Node *) cmd;
                }
            ;
 
+create_slot_opt_list:
+           create_slot_opt_list create_slot_opt
+               { $$ = lappend($1, $2); }
+           | /* EMPTY */
+               { $$ = NIL; }
+           ;
+
+create_slot_opt:
+           K_EXPORT_SNAPSHOT
+               {
+                 $$ = makeDefElem("export_snapshot",
+                                  (Node *)makeInteger(TRUE), -1);
+               }
+           | K_NOEXPORT_SNAPSHOT
+               {
+                 $$ = makeDefElem("export_snapshot",
+                                  (Node *)makeInteger(FALSE), -1);
+               }
+           | K_RESERVE_WAL
+               {
+                 $$ = makeDefElem("reserve_wal",
+                                  (Node *)makeInteger(TRUE), -1);
+               }
+           ;
+
 /* DROP_REPLICATION_SLOT slot */
 drop_replication_slot:
            K_DROP_REPLICATION_SLOT IDENT
@@ -291,11 +321,6 @@ opt_physical:
            | /* EMPTY */
            ;
 
-opt_reserve_wal:
-           K_RESERVE_WAL                   { $$ = true; }
-           | /* EMPTY */                   { $$ = false; }
-           ;
-
 opt_temporary:
            K_TEMPORARY                     { $$ = true; }
            | /* EMPTY */                   { $$ = false; }
index 37f857925e4b73ec13f40238e7bd3d632c169d79..f56d41d59c75bde235bc5943797c26c6fa690672 100644 (file)
@@ -100,6 +100,8 @@ RESERVE_WAL         { return K_RESERVE_WAL; }
 LOGICAL                { return K_LOGICAL; }
 SLOT               { return K_SLOT; }
 TEMPORARY          { return K_TEMPORARY; }
+EXPORT_SNAPSHOT        { return K_EXPORT_SNAPSHOT; }
+NOEXPORT_SNAPSHOT  { return K_NOEXPORT_SNAPSHOT; }
 
 ","                { return ','; }
 ";"                { return ';'; }
index dd3a936fc68ffa5ec651ac85da2fe34f9725d9d7..127efecb27d265395d7eb33727024afbb3bb259e 100644 (file)
@@ -51,6 +51,7 @@
 
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
+#include "commands/defrem.h"
 #include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -737,6 +738,48 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
    return count;
 }
 
+/*
+ * Process extra options given to CREATE_REPLICATION_SLOT.
+ */
+static void
+parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
+                          bool *reserve_wal,
+                          bool *export_snapshot)
+{
+   ListCell   *lc;
+   bool        snapshot_action_given = false;
+   bool        reserve_wal_given = false;
+
+   /* Parse options */
+   foreach (lc, cmd->options)
+   {
+       DefElem    *defel = (DefElem *) lfirst(lc);
+
+       if (strcmp(defel->defname, "export_snapshot") == 0)
+       {
+           if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+
+           snapshot_action_given = true;
+           *export_snapshot = defGetBoolean(defel);
+       }
+       else if (strcmp(defel->defname, "reserve_wal") == 0)
+       {
+           if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+
+           reserve_wal_given = true;
+           *reserve_wal = true;
+       }
+       else
+           elog(ERROR, "unrecognized option: %s", defel->defname);
+   }
+}
+
 /*
  * Create a new replication slot.
  */
@@ -746,6 +789,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
    const char *snapshot_name = NULL;
    char        xpos[MAXFNAMELEN];
    char       *slot_name;
+   bool        reserve_wal = false;
+   bool        export_snapshot = true;
    DestReceiver *dest;
    TupOutputState *tstate;
    TupleDesc   tupdesc;
@@ -754,6 +799,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
    Assert(!MyReplicationSlot);
 
+   parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
+
    /* setup state for XLogReadPage */
    sendTimeLineIsHistoric = false;
    sendTimeLine = ThisTimeLineID;
@@ -799,10 +846,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
        DecodingContextFindStartpoint(ctx);
 
        /*
-        * Export a plain (not of the snapbuild.c type) snapshot to the user
-        * that can be imported into another session.
+        * Export the snapshot if we've been asked to do so.
+        *
+        * NB: exporting converts the snapbuild.c kind of snapshot into a
+        * normal snapshot.
         */
-       snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+       if (export_snapshot)
+           snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
 
        /* don't need the decoding context anymore */
        FreeDecodingContext(ctx);
@@ -810,7 +860,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
        if (!cmd->temporary)
            ReplicationSlotPersist();
    }
-   else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
+   else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
    {
        ReplicationSlotReserveWal();
 
index 1fe42efc215208a438c4c2a98984ad291155cb79..507da5e76d161065ce01948c9d2dd05371a8486d 100644 (file)
@@ -338,8 +338,13 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
        appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
                          slot_name);
    else
+   {
        appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
                          slot_name, plugin);
+       if (PQserverVersion(conn) >= 100000)
+           /* pg_recvlogical doesn't use an exported snapshot, so suppress */
+           appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+   }
 
    res = PQexec(conn, query->data);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
index f27354faaf337077658c563005320350c85e3367..996da3c02ea01bd2f5774cadef2b87dc40956661 100644 (file)
@@ -56,7 +56,7 @@ typedef struct CreateReplicationSlotCmd
    ReplicationKind kind;
    char       *plugin;
    bool        temporary;
-   bool        reserve_wal;
+   List       *options;
 } CreateReplicationSlotCmd;
 
 
index 0857bdc5566f9a646c4517287b36b8fe8d2f6d9d..78e577c89b170527b25ef9d5a4ef9d49ca6332ba 100644 (file)
@@ -183,7 +183,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
                                int nbytes);
 typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
                                        const char *slotname, bool temporary,
-                                       XLogRecPtr *lsn);
+                                       bool export_snapshot, XLogRecPtr *lsn);
 typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
                                   char **err);
 typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
@@ -224,8 +224,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
    WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
    WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, lsn) \
-   WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
+   WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
 #define walrcv_command(conn, cmd, err) \
    WalReceiverFunctions->walrcv_command(conn, cmd, err)
 #define walrcv_disconnect(conn) \