Allow enabling two-phase option via replication protocol.
author Amit Kapila <[email protected]>
Wed, 30 Jun 2021 03:15:47 +0000 (08:45 +0530)
committer Amit Kapila <[email protected]>
Wed, 30 Jun 2021 03:15:47 +0000 (08:45 +0530)
Extend the replication command CREATE_REPLICATION_SLOT to support the
TWO_PHASE option. This will allow decoding commands like PREPARE
TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED for slots created with
this option. The transaction is decoded at PREPARE TRANSACTION time.

This patch also adds support for two-phase decoding to pg_recvlogical via a
new option --two-phase.

This option will also be used by future patches that allow streaming of
transactions at prepare time for built-in logical replication. With this,
the out-of-core logical replication solutions can enable replication of
two-phase transactions via replication protocol.

Author: Ajin Cherian
Reviewed-By: Jeff Davis, Vignesh C, Amit Kapila
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/64b9f783c6e125f18f88fbc0c0234e34e71d8639[email protected]

12 files changed:
doc/src/sgml/logicaldecoding.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/pg_recvlogical.sgml
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/walsender.c
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivewal.c
src/bin/pg_basebackup/pg_recvlogical.c
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h
src/bin/pg_basebackup/t/030_pg_recvlogical.pl

index 5b8065901a4f250ae8e3a0b0aefd70f4a6b4cccf..985db5ca11e72a4e953448c044f3ee103d66bc38 100644 (file)
@@ -144,16 +144,19 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot');
 </programlisting>
 
    <para>
-    The following example shows how logical decoding is controlled over the
+    The following examples show how logical decoding is controlled over the
     streaming replication protocol, using the
     program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
     distribution.  This requires that client authentication is set up to allow
     replication connections
     (see <xref linkend="streaming-replication-authentication"/>) and
     that <varname>max_wal_senders</varname> is set sufficiently high to allow
-    an additional connection.
+    an additional connection.  The second example shows how to stream two-phase
+    transactions.  Before you use two-phase commands, you must set
+    <xref linkend="guc-max-prepared-transactions"/> to at least 1.
    </para>
 <programlisting>
+Example 1:
 $ pg_recvlogical -d postgres --slot=test --create-slot
 $ pg_recvlogical -d postgres --slot=test --start -f -
 <keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
@@ -164,6 +167,22 @@ table public.data: INSERT: id[integer]:4 data[text]:'4'
 COMMIT 693
 <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
 $ pg_recvlogical -d postgres --slot=test --drop-slot
+
+Example 2:
+$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
+$ pg_recvlogical -d postgres --slot=test --start -f -
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
+$ fg
+BEGIN 694
+table public.data: INSERT: id[integer]:5 data[text]:'5'
+PREPARE TRANSACTION 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "COMMIT PREPARED 'test';"
+$ fg
+COMMIT PREPARED 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
+$ pg_recvlogical -d postgres --slot=test --drop-slot
 </programlisting>
 
   <para>
index 01e87617f4051da1e2576d95747633f77b6aaf63..a3562f3d0891d77d9f40522e5fc6ebe5aba2eff7 100644 (file)
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
   </varlistentry>
 
   <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> ] }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1955,6 +1955,20 @@ The commands accepted in replication mode are:
        </listitem>
       </varlistentry>
 
+      <varlistentry>
+       <term><literal>TWO_PHASE</literal></term>
+       <listitem>
+        <para>
+         Specify that this logical replication slot supports decoding of two-phase
+         transactions. With this option, two-phase commands like
+         <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+         and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+         The transaction will be decoded and transmitted at
+         <literal>PREPARE TRANSACTION</literal> time.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry>
        <term><literal>RESERVE_WAL</literal></term>
        <listitem>
index 6b1d98d06ef1f69c06840aba465e9b79ba946ecd..1a882254095c4790fc4aabd5b424535634969db1 100644 (file)
@@ -65,6 +65,11 @@ PostgreSQL documentation
         <option>--plugin</option>, for the database specified
         by <option>--dbname</option>.
        </para>
+
+       <para>
+        The <option>--two-phase</option> option can be specified with
+        <option>--create-slot</option> to enable two-phase decoding.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -256,6 +261,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+       <term><option>-t</option></term>
+       <term><option>--two-phase</option></term>
+       <listitem>
+       <para>
+        Enables two-phase decoding. This option should only be specified with
+        <option>--create-slot</option>.
+       </para>
+       </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
index eb283a86327c824bf45a207bd3f9f8c852ec614b..e1e8ec29cc4a5ec4fd87a290f075d204a54448ad 100644 (file)
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_TWO_PHASE
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
@@ -283,6 +284,11 @@ create_slot_opt:
                  $$ = makeDefElem("reserve_wal",
                                   (Node *)makeInteger(true), -1);
                }
+           | K_TWO_PHASE
+               {
+                 $$ = makeDefElem("two_phase",
+                                  (Node *)makeInteger(true), -1);
+               }
            ;
 
 /* DROP_REPLICATION_SLOT slot */
index dcc3c3fc515cb66960e94299211b359ce70c496f..c038a636c3839baf25f8fc2ad6586b74b47b3fff 100644 (file)
@@ -103,6 +103,7 @@ RESERVE_WAL         { return K_RESERVE_WAL; }
 LOGICAL                { return K_LOGICAL; }
 SLOT               { return K_SLOT; }
 TEMPORARY          { return K_TEMPORARY; }
+TWO_PHASE          { return K_TWO_PHASE; }
 EXPORT_SNAPSHOT        { return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT  { return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT       { return K_USE_SNAPSHOT; }
index 322453635613c83553bd34c1858e449d6e00d662..92c755f346eba61fd61679944eeff3fb19d07e97 100644 (file)
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
                           bool *reserve_wal,
-                          CRSSnapshotAction *snapshot_action)
+                          CRSSnapshotAction *snapshot_action,
+                          bool *two_phase)
 {
    ListCell   *lc;
    bool        snapshot_action_given = false;
    bool        reserve_wal_given = false;
+   bool        two_phase_given = false;
 
    /* Parse options */
    foreach(lc, cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
            reserve_wal_given = true;
            *reserve_wal = true;
        }
+       else if (strcmp(defel->defname, "two_phase") == 0)
+       {
+           if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           two_phase_given = true;
+           *two_phase = true;
+       }
        else
            elog(ERROR, "unrecognized option: %s", defel->defname);
    }
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
    char        xloc[MAXFNAMELEN];
    char       *slot_name;
    bool        reserve_wal = false;
+   bool        two_phase = false;
    CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
    DestReceiver *dest;
    TupOutputState *tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
    Assert(!MyReplicationSlot);
 
-   parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+   parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
 
    /* setup state for WalSndSegmentOpen */
    sendTimeLineIsHistoric = false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
         */
        ReplicationSlotCreate(cmd->slotname, true,
                              cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-                             false);
+                             two_phase);
    }
 
    if (cmd->kind == REPLICATION_KIND_LOGICAL)
index 16d8929b238fd18276e30c4a0090a41b83f65162..8bb0acf498eb7095cc993e5a6f1301c1c7b7ccf9 100644 (file)
@@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
    if (temp_replication_slot || create_slot)
    {
        if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
-                                  temp_replication_slot, true, true, false))
+                                  temp_replication_slot, true, true, false, false))
            exit(1);
 
        if (verbose)
index 0d15012c295f62340e1e77575e53ce221cdfa708..c1334fad357105c130f1fc433620999bf4662e48 100644 (file)
@@ -741,7 +741,7 @@ main(int argc, char **argv)
            pg_log_info("creating replication slot \"%s\"", replication_slot);
 
        if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
-                                  slot_exists_ok))
+                                  slot_exists_ok, false))
            exit(1);
        exit(0);
    }
index 5efec160e884759e15fc948751b2e81b91797269..76bd153fac2413f897c3ab35c9209d6d3566193e 100644 (file)
@@ -35,6 +35,7 @@
 /* Global Options */
 static char *outfile = NULL;
 static int verbose = 0;
+static bool two_phase = false;
 static int noloop = 0;
 static int standby_message_timeout = 10 * 1000;    /* 10 sec = default */
 static int fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -93,6 +94,7 @@ usage(void)
    printf(_("  -s, --status-interval=SECS\n"
             "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
    printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
+   printf(_("  -t, --two-phase        enable two-phase decoding when creating a slot\n"));
    printf(_("  -v, --verbose          output verbose messages\n"));
    printf(_("  -V, --version          output version information, then exit\n"));
    printf(_("  -?, --help             show this help, then exit\n"));
@@ -678,6 +680,7 @@ main(int argc, char **argv)
        {"fsync-interval", required_argument, NULL, 'F'},
        {"no-loop", no_argument, NULL, 'n'},
        {"verbose", no_argument, NULL, 'v'},
+       {"two-phase", no_argument, NULL, 't'},
        {"version", no_argument, NULL, 'V'},
        {"help", no_argument, NULL, '?'},
 /* connection options */
@@ -726,7 +729,7 @@ main(int argc, char **argv)
        }
    }
 
-   while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
+   while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
                            long_options, &option_index)) != -1)
    {
        switch (c)
@@ -749,6 +752,9 @@ main(int argc, char **argv)
            case 'v':
                verbose++;
                break;
+           case 't':
+               two_phase = true;
+               break;
 /* connection options */
            case 'd':
                dbname = pg_strdup(optarg);
@@ -920,6 +926,15 @@ main(int argc, char **argv)
        exit(1);
    }
 
+   if (two_phase && !do_create_slot)
+   {
+       pg_log_error("--two-phase may only be specified with --create-slot");
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+
 #ifndef WIN32
    pqsignal(SIGINT, sigint_handler);
    pqsignal(SIGHUP, sighup_handler);
@@ -976,7 +991,7 @@ main(int argc, char **argv)
            pg_log_info("creating replication slot \"%s\"", replication_slot);
 
        if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
-                                  false, false, slot_exists_ok))
+                                  false, false, slot_exists_ok, two_phase))
            exit(1);
        startpos = InvalidXLogRecPtr;
    }
index 99daf0e97278d2440e83f772e55852133239f208..f5b3b476e5229047d512d529fc82e55756dba23b 100644 (file)
@@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
                      bool is_temporary, bool is_physical, bool reserve_wal,
-                     bool slot_exists_ok)
+                     bool slot_exists_ok, bool two_phase)
 {
    PQExpBuffer query;
    PGresult   *res;
@@ -495,6 +495,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 
    Assert((is_physical && plugin == NULL) ||
           (!is_physical && plugin != NULL));
+   Assert(!(two_phase && is_physical));
    Assert(slot_name != NULL);
 
    /* Build query */
@@ -510,6 +511,9 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
    else
    {
        appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+       if (two_phase && PQserverVersion(conn) >= 150000)
+           appendPQExpBufferStr(query, " TWO_PHASE");
+
        if (PQserverVersion(conn) >= 100000)
            /* pg_recvlogical doesn't use an exported snapshot, so suppress */
            appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
index 10f87ad0c14b31987176a20486e5ab1c29427b4a..504803b976354e659821c7fcbc24e460e8246ffd 100644 (file)
@@ -34,7 +34,7 @@ extern PGconn *GetConnection(void);
 extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
                                  const char *plugin, bool is_temporary,
                                  bool is_physical, bool reserve_wal,
-                                 bool slot_exists_ok);
+                                 bool slot_exists_ok, bool two_phase);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
                              TimeLineID *starttli,
index 53f41814b0b205207b1b628b90d4e994335885e0..bbbf9e21dba868e9fed5e81bad0110b0635ecb95 100644 (file)
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 15;
+use Test::More tests => 20;
 
 program_help_ok('pg_recvlogical');
 program_version_ok('pg_recvlogical');
@@ -22,6 +22,7 @@ max_replication_slots = 4
 max_wal_senders = 4
 log_min_messages = 'debug1'
 log_error_verbosity = verbose
+max_prepared_transactions = 10
 });
 $node->dump_info;
 $node->start;
@@ -63,3 +64,45 @@ $node->command_ok(
        '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
    ],
    'replayed a transaction');
+
+$node->command_ok(
+   [
+       'pg_recvlogical',           '-S',
+       'test',                     '-d',
+       $node->connstr('postgres'), '--drop-slot'
+   ],
+   'slot dropped');
+
+# Test with the two-phase option enabled
+$node->command_ok(
+   [
+       'pg_recvlogical',           '-S',
+       'test',                     '-d',
+       $node->connstr('postgres'), '--create-slot', '--two-phase'
+   ],
+   'slot with two-phase created');
+
+$slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+$node->safe_psql('postgres',
+   "BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
+$node->safe_psql('postgres',
+   "COMMIT PREPARED 'test'");
+$nextlsn =
+  $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
+chomp($nextlsn);
+
+$node->command_fails(
+   [
+       'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+       '--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-'
+   ],
+   'incorrect usage');
+
+$node->command_ok(
+   [
+       'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+       '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+   ],
+   'replayed a two-phase transaction');