Allow altering of two_phase option of a SUBSCRIPTION.
authorAmit Kapila <[email protected]>
Wed, 24 Jul 2024 04:43:36 +0000 (10:13 +0530)
committerAmit Kapila <[email protected]>
Wed, 24 Jul 2024 04:43:36 +0000 (10:13 +0530)
The two_phase option is controlled by both the publisher (as a slot
option) and the subscriber (as a subscription option), so the slot option
must also be modified.

Changing the 'two_phase' option for a subscription from 'true' to 'false'
is permitted only when there are no pending prepared transactions
corresponding to that subscription. Otherwise, the changes of already
prepared transactions can be replicated again along with their corresponding
commit leading to duplicate data or errors.

To avoid data loss, the 'two_phase' option for a subscription can only be
changed from 'false' to 'true' once the initial data synchronization is
completed. Therefore this is performed later by the logical replication worker.

Author: Hayato Kuroda, Ajin Cherian, Amit Kapila
Reviewed-by: Peter Smith, Hou Zhijie, Amit Kapila, Vitaly Davydov, Vignesh C
Discussion: https://postgr.es/m/8fab8-65d74c80-1-2f28e880@39088166

17 files changed:
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/alter_subscription.sgml
src/backend/access/transam/twophase.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/worker.c
src/backend/replication/slot.c
src/backend/replication/walsender.c
src/bin/psql/tab-complete.c
src/include/access/twophase.h
src/include/replication/slot.h
src/include/replication/walreceiver.h
src/include/replication/worker_internal.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/021_twophase.pl

index 1b27d0a547998ca4304a2dbbbc263a551d33336e..79cd59969264e9ed2d6fc47a47153f43abf6d8aa 100644 (file)
@@ -2192,7 +2192,23 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
        </varlistentry>
       </variablelist>
 
-      <para>The following option is supported:</para>
+      <para>The following options are supported:</para>
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If true, this logical replication slot supports decoding of two-phase
+          commit. With this option, commands related to two-phase commit such as
+          <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+          and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+          The transaction will be decoded and transmitted at
+          <literal>PREPARE TRANSACTION</literal> time.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
 
       <variablelist>
        <varlistentry>
index 476f1956223806cbcde14146723a67361ce5e131..6af6d0d2c8d3fa23bb023a9063cf83a70d54f06c 100644 (file)
@@ -68,8 +68,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
   <para>
    Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command>,
    <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command>
-   with <literal>refresh</literal> option as <literal>true</literal> and
-   <command>ALTER SUBSCRIPTION ... SET (failover = true|false)</command>
+   with <literal>refresh</literal> option as <literal>true</literal>,
+   <command>ALTER SUBSCRIPTION ... SET (failover = true|false)</command> and
+   <command>ALTER SUBSCRIPTION ... SET (two_phase = false)</command>
    cannot be executed inside a transaction block.
 
    These commands also cannot be executed when the subscription has
@@ -228,8 +229,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
-      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
+      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
      </para>
 
@@ -252,6 +254,32 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
       option is enabled.
      </para>
+
+     <para>
+      The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+      and <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+      parameters can only be altered when the subscription is disabled.
+     </para>
+
+     <para>
+      When altering <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+      from <literal>true</literal> to <literal>false</literal>, the backend
+      process reports an error if any prepared transactions done by the
+      logical replication worker (from when <literal>two_phase</literal>
+      parameter was still <literal>true</literal>) are found. You can resolve
+      prepared transactions on the publisher node, or manually roll back them
+      on the subscriber, and then try again. The transactions prepared by
+      logical replication worker corresponding to a particular subscription have
+      the following pattern: <quote><literal>pg_gid_%u_%u</literal></quote>
+      (parameters: subscription <parameter>oid</parameter>, remote transaction id <parameter>xid</parameter>).
+      To resolve such transactions manually, you need to roll back all
+      the prepared transactions with corresponding subscription IDs in their
+      names. Applications can check
+      <link linkend="view-pg-prepared-xacts"><structname>pg_prepared_xacts</structname></link>
+      to find the required prepared transactions. After the <literal>two_phase</literal>
+      option is changed from <literal>true</literal> to <literal>false</literal>,
+      the publisher will replicate the transactions again when they are committed.
+     </para>
     </listitem>
    </varlistentry>
 
index 9a8257fcafbb1a8dab57614ca1cdc79723bda5f5..e98286d768b0788e94a3c8a0d4bb0a2198b81de7 100644 (file)
@@ -2681,3 +2681,82 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
    LWLockRelease(TwoPhaseStateLock);
    return found;
 }
+
+/*
+ * TwoPhaseTransactionGid
+ *     Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
+{
+   Assert(OidIsValid(subid));
+
+   if (!TransactionIdIsValid(xid))
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("invalid two-phase transaction ID")));
+
+   snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
+/*
+ * IsTwoPhaseTransactionGidForSubid
+ *     Check whether the given GID (as formed by TwoPhaseTransactionGid) is
+ *     for the specified 'subid'.
+ */
+static bool
+IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
+{
+   int         ret;
+   Oid         subid_from_gid;
+   TransactionId xid_from_gid;
+   char        gid_tmp[GIDSIZE];
+
+   /* Extract the subid and xid from the given GID */
+   ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
+
+   /*
+    * Check that the given GID has expected format, and at least the subid
+    * matches.
+    */
+   if (ret != 2 || subid != subid_from_gid)
+       return false;
+
+   /*
+    * Reconstruct a temporary GID based on the subid and xid extracted from
+    * the given GID and check whether the temporary GID and the given GID
+    * match.
+    */
+   TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
+
+   return strcmp(gid, gid_tmp) == 0;
+}
+
+/*
+ * LookupGXactBySubid
+ *     Check if the prepared transaction done by apply worker exists.
+ */
+bool
+LookupGXactBySubid(Oid subid)
+{
+   bool        found = false;
+
+   LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+   for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+   {
+       GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+       /* Ignore not-yet-valid GIDs. */
+       if (gxact->valid &&
+           IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
+       {
+           found = true;
+           break;
+       }
+   }
+   LWLockRelease(TwoPhaseStateLock);
+
+   return found;
+}
index 16d83b32539a677f67a29368c729d75af61f2303..d124bfe55caaaacef01f53bc5d93715cbf6ff6da 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/table.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static void CheckAlterSubOption(Subscription *sub, const char *option,
+                               bool slot_needs_update, bool isTopLevel);
 
 
 /*
@@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
            opts->specified_opts |= SUBOPT_STREAMING;
            opts->streaming = defGetStreamingMode(defel);
        }
-       else if (strcmp(defel->defname, "two_phase") == 0)
+       else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+                strcmp(defel->defname, "two_phase") == 0)
        {
-           /*
-            * Do not allow toggling of two_phase option. Doing so could cause
-            * missing of transactions and lead to an inconsistent replica.
-            * See comments atop worker.c
-            *
-            * Note: Unsupported twophase indicates that this call originated
-            * from AlterSubscription.
-            */
-           if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
-               ereport(ERROR,
-                       (errcode(ERRCODE_SYNTAX_ERROR),
-                        errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
-
            if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
                errorConflictingDefElem(defel, pstate);
 
@@ -1079,6 +1070,60 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
        table_close(rel, NoLock);
 }
 
+/*
+ * Common checks for altering failover and two_phase options.
+ */
+static void
+CheckAlterSubOption(Subscription *sub, const char *option,
+                   bool slot_needs_update, bool isTopLevel)
+{
+   /*
+    * The checks in this function are required only for failover and
+    * two_phase options.
+    */
+   Assert(strcmp(option, "failover") == 0 ||
+          strcmp(option, "two_phase") == 0);
+
+   /*
+    * Do not allow changing the option if the subscription is enabled. This
+    * is because both failover and two_phase options of the slot on the
+    * publisher cannot be modified if the slot is currently acquired by the
+    * existing walsender.
+    *
+    * Note that two_phase is enabled (aka changed from 'false' to 'true') on
+    * the publisher by the existing walsender, so we could have allowed that
+    * even when the subscription is enabled. But we kept this restriction for
+    * the sake of consistency and simplicity.
+    */
+   if (sub->enabled)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("cannot set %s for enabled subscription",
+                       option)));
+
+   if (slot_needs_update)
+   {
+       StringInfoData cmd;
+
+       /*
+        * A valid slot must be associated with the subscription for us to
+        * modify any of the slot's properties.
+        */
+       if (!sub->slotname)
+           ereport(ERROR,
+                   (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                    errmsg("cannot set %s for a subscription that does not have a slot name",
+                           option)));
+
+       /* The changed option of the slot can't be rolled back. */
+       initStringInfo(&cmd);
+       appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
+
+       PreventInTransactionBlock(isTopLevel, cmd.data);
+       pfree(cmd.data);
+   }
+}
+
 /*
  * Alter the existing subscription.
  */
@@ -1094,6 +1139,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    HeapTuple   tup;
    Oid         subid;
    bool        update_tuple = false;
+   bool        update_failover = false;
+   bool        update_two_phase = false;
    Subscription *sub;
    Form_pg_subscription form;
    bits32      supported_opts;
@@ -1145,7 +1192,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
            {
                supported_opts = (SUBOPT_SLOT_NAME |
                                  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-                                 SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+                                 SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+                                 SUBOPT_DISABLE_ON_ERR |
                                  SUBOPT_PASSWORD_REQUIRED |
                                  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
                                  SUBOPT_ORIGIN);
@@ -1227,31 +1275,81 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                    replaces[Anum_pg_subscription_subrunasowner - 1] = true;
                }
 
-               if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+               if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
                {
-                   if (!sub->slotname)
+                   /*
+                    * We need to update both the slot and the subscription
+                    * for the two_phase option. We can enable the two_phase
+                    * option for a slot only once the initial data
+                    * synchronization is done. This is to avoid missing some
+                    * data as explained in comments atop worker.c.
+                    */
+                   update_two_phase = !opts.twophase;
+
+                   CheckAlterSubOption(sub, "two_phase", update_two_phase,
+                                       isTopLevel);
+
+                   /*
+                    * Modifying the two_phase slot option requires a slot
+                    * lookup by slot name, so changing the slot name at the
+                    * same time is not allowed.
+                    */
+                   if (update_two_phase &&
+                       IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
+                       ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                errmsg("slot_name and two_phase cannot be altered at the same time")));
+
+                   /*
+                    * Note that workers may still survive even if the
+                    * subscription has been disabled.
+                    *
+                    * Ensure workers have already been exited to avoid
+                    * getting prepared transactions while we are disabling
+                    * the two_phase option. Otherwise, the changes of an
+                    * already prepared transaction can be replicated again
+                    * along with its corresponding commit, leading to
+                    * duplicate data or errors.
+                    */
+                   if (logicalrep_workers_find(subid, true, true))
                        ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("cannot set %s for a subscription that does not have a slot name",
-                                       "failover")));
+                                errmsg("cannot alter two_phase when logical replication worker is still running"),
+                                errhint("Try again after some time.")));
 
                    /*
-                    * Do not allow changing the failover state if the
-                    * subscription is enabled. This is because the failover
-                    * state of the slot on the publisher cannot be modified
-                    * if the slot is currently acquired by the apply worker.
+                    * two_phase cannot be disabled if there are any
+                    * uncommitted prepared transactions present otherwise it
+                    * can lead to duplicate data or errors as explained in
+                    * the comment above.
                     */
-                   if (sub->enabled)
+                   if (update_two_phase &&
+                       sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+                       LookupGXactBySubid(subid))
                        ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("cannot set %s for enabled subscription",
-                                       "failover")));
+                                errmsg("cannot disable two_phase when prepared transactions are present"),
+                                errhint("Resolve these transactions and try again.")));
+
+                   /* Change system catalog accordingly */
+                   values[Anum_pg_subscription_subtwophasestate - 1] =
+                       CharGetDatum(opts.twophase ?
+                                    LOGICALREP_TWOPHASE_STATE_PENDING :
+                                    LOGICALREP_TWOPHASE_STATE_DISABLED);
+                   replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+               }
 
+               if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+               {
                    /*
-                    * The changed failover option of the slot can't be rolled
-                    * back.
+                    * Similar to the two_phase case above, we need to update
+                    * the failover option for both the slot and the
+                    * subscription.
                     */
-                   PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)");
+                   update_failover = true;
+
+                   CheckAlterSubOption(sub, "failover", update_failover,
+                                       isTopLevel);
 
                    values[Anum_pg_subscription_subfailover - 1] =
                        BoolGetDatum(opts.failover);
@@ -1501,13 +1599,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
    }
 
    /*
-    * Try to acquire the connection necessary for altering slot.
+    * Try to acquire the connection necessary for altering the slot, if
+    * needed.
     *
     * This has to be at the end because otherwise if there is an error while
     * doing the database operations we won't be able to rollback altered
     * slot.
     */
-   if (replaces[Anum_pg_subscription_subfailover - 1])
+   if (update_failover || update_two_phase)
    {
        bool        must_use_password;
        char       *err;
@@ -1528,7 +1627,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
        PG_TRY();
        {
-           walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+           walrcv_alter_slot(wrconn, sub->slotname,
+                             update_failover ? &opts.failover : NULL,
+                             update_two_phase ? &opts.twophase : NULL);
        }
        PG_FINALLY();
        {
@@ -1675,9 +1776,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
     * New workers won't be started because we hold an exclusive lock on the
     * subscription till the end of the transaction.
     */
-   LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-   subworkers = logicalrep_workers_find(subid, false);
-   LWLockRelease(LogicalRepWorkerLock);
+   subworkers = logicalrep_workers_find(subid, false, true);
    foreach(lc, subworkers)
    {
        LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
index 6c42c209d2977c49c40890fda3a11ec8954d25b6..97f957cd87b48424171ec792a1a84e078dd5ff9e 100644 (file)
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
                                  CRSSnapshotAction snapshot_action,
                                  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-                               bool failover);
+                               const bool *failover, const bool *two_phase);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
                                       const char *query,
@@ -1121,15 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-                   bool failover)
+                   const bool *failover, const bool *two_phase)
 {
    StringInfoData cmd;
    PGresult   *res;
 
    initStringInfo(&cmd);
-   appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
-                    quote_identifier(slotname),
-                    failover ? "true" : "false");
+   appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
+                    quote_identifier(slotname));
+
+   if (failover)
+       appendStringInfo(&cmd, "FAILOVER %s",
+                        *failover ? "true" : "false");
+
+   if (failover && two_phase)
+       appendStringInfo(&cmd, ", ");
+
+   if (two_phase)
+       appendStringInfo(&cmd, "TWO_PHASE %s",
+                        *two_phase ? "true" : "false");
+
+   appendStringInfoString(&cmd, " );");
 
    res = libpqrcv_PQexec(conn->streamConn, cmd.data);
    pfree(cmd.data);
index 27c3a91fb75ea6c769d12ab2d2b737873f9fd51c..c566d50a072b92bd07f4179100275d0d0b1f4c7c 100644 (file)
@@ -272,11 +272,14 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
  * the subscription, instead of just one.
  */
 List *
-logicalrep_workers_find(Oid subid, bool only_running)
+logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
 {
    int         i;
    List       *res = NIL;
 
+   if (acquire_lock)
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
    Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
    /* Search for attached worker for a given subscription id. */
@@ -288,6 +291,9 @@ logicalrep_workers_find(Oid subid, bool only_running)
            res = lappend(res, w);
    }
 
+   if (acquire_lock)
+       LWLockRelease(LogicalRepWorkerLock);
+
    return res;
 }
 
@@ -759,7 +765,7 @@ logicalrep_worker_detach(void)
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-       workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+       workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
        foreach(lc, workers)
        {
            LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
index c0bda6269bd96cce6135a802bb4360cce94f4d41..ec96b5fe85e6d44173327142cd831745cb7884af 100644 (file)
@@ -401,9 +401,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                       LogicalRepTupleData *newtup,
                                       CmdType operation);
 
-/* Compute GID for two_phase transactions */
-static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
-
 /* Functions for skipping changes */
 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
 static void stop_skipping_changes(void);
@@ -3911,7 +3908,7 @@ maybe_reread_subscription(void)
    /* !slotname should never happen when enabled is true. */
    Assert(newsub->slotname);
 
-   /* two-phase should not be altered */
+   /* two-phase cannot be altered while the worker is running */
    Assert(newsub->twophasestate == MySubscription->twophasestate);
 
    /*
@@ -4396,24 +4393,6 @@ cleanup_subxact_info()
    subxact_data.nsubxacts_max = 0;
 }
 
-/*
- * Form the prepared transaction GID for two_phase transactions.
- *
- * Return the GID in the supplied buffer.
- */
-static void
-TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
-{
-   Assert(subid != InvalidRepOriginId);
-
-   if (!TransactionIdIsValid(xid))
-       ereport(ERROR,
-               (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg_internal("invalid two-phase transaction ID")));
-
-   snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
-}
-
 /*
  * Common function to run the apply loop with error handling. Disable the
  * subscription, if necessary.
@@ -5014,7 +4993,7 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
            List       *workers;
            ListCell   *lc2;
 
-           workers = logicalrep_workers_find(subid, true);
+           workers = logicalrep_workers_find(subid, true, false);
            foreach(lc2, workers)
            {
                LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
index baf9b89dc4297632143ddd75e14cef766a40382c..c290339af5f05f33ec9d8cfbc2fb3c713099d2ea 100644 (file)
@@ -804,9 +804,13 @@ ReplicationSlotDrop(const char *name, bool nowait)
  * Change the definition of the slot identified by the specified name.
  */
 void
-ReplicationSlotAlter(const char *name, bool failover)
+ReplicationSlotAlter(const char *name, const bool *failover,
+                    const bool *two_phase)
 {
+   bool        update_slot = false;
+
    Assert(MyReplicationSlot == NULL);
+   Assert(failover || two_phase);
 
    ReplicationSlotAcquire(name, false);
 
@@ -832,28 +836,45 @@ ReplicationSlotAlter(const char *name, bool failover)
         * Do not allow users to enable failover on the standby as we do not
         * support sync to the cascading standby.
         */
-       if (failover)
+       if (failover && *failover)
            ereport(ERROR,
                    errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("cannot enable failover for a replication slot"
                           " on the standby"));
    }
 
-   /*
-    * Do not allow users to enable failover for temporary slots as we do not
-    * support syncing temporary slots to the standby.
-    */
-   if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
-       ereport(ERROR,
-               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-               errmsg("cannot enable failover for a temporary replication slot"));
+   if (failover)
+   {
+       /*
+        * Do not allow users to enable failover for temporary slots as we do
+        * not support syncing temporary slots to the standby.
+        */
+       if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+           ereport(ERROR,
+                   errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                   errmsg("cannot enable failover for a temporary replication slot"));
+
+       if (MyReplicationSlot->data.failover != *failover)
+       {
+           SpinLockAcquire(&MyReplicationSlot->mutex);
+           MyReplicationSlot->data.failover = *failover;
+           SpinLockRelease(&MyReplicationSlot->mutex);
+
+           update_slot = true;
+       }
+   }
 
-   if (MyReplicationSlot->data.failover != failover)
+   if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
    {
        SpinLockAcquire(&MyReplicationSlot->mutex);
-       MyReplicationSlot->data.failover = failover;
+       MyReplicationSlot->data.two_phase = *two_phase;
        SpinLockRelease(&MyReplicationSlot->mutex);
 
+       update_slot = true;
+   }
+
+   if (update_slot)
+   {
        ReplicationSlotMarkDirty();
        ReplicationSlotSave();
    }
index ca205594bd09c9065ce781574729ebaa77cd0eeb..c5f1009f3706ed0153aba4d276af3d50a920e2f3 100644 (file)
@@ -1407,12 +1407,15 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 }
 
 /*
- * Process extra options given to ALTER_REPLICATION_SLOT.
+ * Change the definition of a replication slot.
  */
 static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
 {
    bool        failover_given = false;
+   bool        two_phase_given = false;
+   bool        failover;
+   bool        two_phase;
 
    /* Parse options */
    foreach_ptr(DefElem, defel, cmd->options)
@@ -1424,23 +1427,24 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            failover_given = true;
-           *failover = defGetBoolean(defel);
+           failover = defGetBoolean(defel);
+       }
+       else if (strcmp(defel->defname, "two_phase") == 0)
+       {
+           if (two_phase_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           two_phase_given = true;
+           two_phase = defGetBoolean(defel);
        }
        else
            elog(ERROR, "unrecognized option: %s", defel->defname);
    }
-}
-
-/*
- * Change the definition of a replication slot.
- */
-static void
-AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
-{
-   bool        failover = false;
 
-   ParseAlterReplSlotOptions(cmd, &failover);
-   ReplicationSlotAlter(cmd->slotname, failover);
+   ReplicationSlotAlter(cmd->slotname,
+                        failover_given ? &failover : NULL,
+                        two_phase_given ? &two_phase : NULL);
 }
 
 /*
index d453e224d93301a696b79cbdb6ca3d307093332e..891face1b654b3ed30e22684f983bca98f65e9f7 100644 (file)
@@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end)
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
        COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
                      "password_required", "run_as_owner", "slot_name",
-                     "streaming", "synchronous_commit");
+                     "streaming", "synchronous_commit", "two_phase");
    /* ALTER SUBSCRIPTION <name> SKIP ( */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
        COMPLETE_WITH("lsn");
index 56248c0006317d38a45f664557e4a00774badcf0..b85b65c604e257d201d0a3d4a506eeb3c9868cdb 100644 (file)
@@ -62,4 +62,9 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
                        TimestampTz origin_prepare_timestamp);
+
+extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res,
+                                  int szgid);
+extern bool LookupGXactBySubid(Oid subid);
+
 #endif                         /* TWOPHASE_H */
index c9675ee87cc9c708a6fd9ed416f81908fc739edf..c2ee149fd666819bc9f7366f62495b345dfb5ebc 100644 (file)
@@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
-extern void ReplicationSlotAlter(const char *name, bool failover);
+extern void ReplicationSlotAlter(const char *name, const bool *failover,
+                                const bool *two_phase);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
index 12f71fa99b0caf9b4617925e4b1ee0adadfc0f50..132e789948bb924de0c0b6db7b02b3b7279bdf55 100644 (file)
@@ -372,12 +372,14 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 /*
  * walrcv_alter_slot_fn
  *
- * Change the definition of a replication slot. Currently, it only supports
- * changing the failover property of the slot.
+ * Change the definition of a replication slot. Currently, it supports
+ * changing the failover and two_phase properties of the slot.
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
                                      const char *slotname,
-                                     bool failover);
+                                     const bool *failover,
+                                     const bool *two_phase);
+
 
 /*
  * walrcv_get_backend_pid_fn
@@ -455,8 +457,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
    WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
 #define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
    WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
-#define walrcv_alter_slot(conn, slotname, failover) \
-   WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
+#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
+   WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
 #define walrcv_get_backend_pid(conn) \
    WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
index 515aefd519125d39550d8d00feb342cea6894026..9646261d7e99913793861f073b1af3d07693ad36 100644 (file)
@@ -240,7 +240,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                bool only_running);
-extern List *logicalrep_workers_find(Oid subid, bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running,
+                                    bool acquire_lock);
 extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                     Oid dbid, Oid subid, const char *subname,
                                     Oid userid, Oid relid,
index 5c2f1ee51718b21206b5b9728ecc4ccbbe28d499..17d48b16857d044a3aea9b423a14ea0ab4928c14 100644 (file)
@@ -377,10 +377,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-ERROR:  unrecognized subscription parameter: "two_phase"
--- but can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
                                                                                                                 List of subscriptions
index 3e5ba4cb8c6c3630023e07aa4f84bd1aa55a4d32..007c9e7037463e7fab9ea42b0359aa67c7d2dd07 100644 (file)
@@ -256,10 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
 \dRs+
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-
--- but can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
 \dRs+
index 9437cd4c3b71308a51eeae070b32e24a03568b6d..a47d3b7dd6e1589989977721e6516426c0f09f1d 100644 (file)
@@ -367,6 +367,99 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
 is($result, qq(2), 'replicated data in subscriber table');
 
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+###############################
+# Alter the subscription to set two_phase to false.
+# Verify that the altered subscription reflects the new two_phase option.
+###############################
+
+# Confirm that the two-phase slot option is enabled before altering
+$result = $node_publisher->safe_psql('postgres',
+   "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';"
+);
+is($result, qq(t), 'two-phase is enabled');
+
+# Alter subscription two_phase to false
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+   "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+   'postgres', "
+    ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = false);
+    ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
+# Wait for subscription startup
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
+
+# Make sure that the two-phase is disabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(d), 'two-phase subscription option should be disabled');
+
+# Make sure that the two-phase slot option is also disabled
+$result = $node_publisher->safe_psql('postgres',
+   "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';"
+);
+is($result, qq(f), 'two-phase slot option should be disabled');
+
+###############################
+# Now do a prepare on the publisher and verify that it is not replicated.
+###############################
+$node_publisher->safe_psql(
+   'postgres', qq{
+    BEGIN;
+    INSERT INTO tab_copy VALUES (100);
+    PREPARE TRANSACTION 'newgid';
+   });
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure there are no prepared transactions on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'should be no prepared transactions on subscriber');
+
+###############################
+# Set two_phase to "true" and failover to "true" before the COMMIT PREPARED.
+#
+# This tests the scenario where both two_phase and failover are altered
+# simultaneously.
+###############################
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+   "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+   'postgres', "
+    ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true, failover = true);
+    ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
+###############################
+# Now commit the insert and verify that it is replicated.
+###############################
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure that the committed transaction is replicated.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(3), 'replicated data in subscriber table');
+
+# Make sure that the two-phase is enabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(e), 'two-phase should be enabled');
+
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 
@@ -374,8 +467,6 @@ $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 # check all the cleanup
 ###############################
 
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
-
 $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*) FROM pg_subscription");
 is($result, qq(0), 'check subscription was dropped on subscriber');