Allow multiple xacts during table sync in logical replication.
author    Amit Kapila <[email protected]>  Fri, 12 Feb 2021 02:11:51 +0000 (07:41 +0530)
committer Amit Kapila <[email protected]>  Fri, 12 Feb 2021 02:11:51 +0000 (07:41 +0530)
For the initial table data synchronization in logical replication, we use
a single transaction to copy the entire table and then synchronize the
position in the stream with the main apply worker.

There are multiple downsides to this approach: (a) we have to perform the
entire copy operation again if there is any error (network breakdown,
error in the database operation, etc.) while we synchronize the WAL
position between the tablesync worker and the apply worker, which is
onerous especially for large copies; (b) using a single transaction in the
synchronization phase (where we can receive WAL from multiple
transactions) risks exceeding the CID limit; (c) the slot holds back WAL
until the entire sync is complete, because we never commit until the end.

This patch addresses all of the above downsides by allowing multiple
transactions during the tablesync phase. The initial copy is done in a
single transaction, and after that we commit each transaction as we
receive it. To allow recovery after any error or crash, we use a permanent
slot and origin to track the progress. The slot and origin are removed
once we finish the synchronization of the table. We also remove the slot
and origin of tablesync workers if the user performs DROP SUBSCRIPTION ...
or ALTER SUBSCRIPTION ... REFRESH while some of the table syncs are still
unfinished.
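
As a rough illustration (not part of this commit; any output shape is
hypothetical), the progress tracking is visible through existing catalogs
on the subscriber, using the naming schemes established by this patch
(origins are named pg_<suboid>_<relid>, slots pg_<suboid>_sync_<relid>_<sysid>):

    -- Per-table sync state, including the new 'f' (FINISHEDCOPY) state.
    SELECT srrelid::regclass, srsubstate, srsublsn
      FROM pg_subscription_rel;

    -- Tablesync origins that track the restart position after a crash.
    SELECT roident, roname
      FROM pg_replication_origin
     WHERE roname LIKE 'pg\_%\_%';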

The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and
ALTER SUBSCRIPTION ... SET PUBLICATION ... with the refresh option set to
true can no longer be executed inside a transaction block, because they
can now drop slots, for which we have no provision to roll back.
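
A minimal sketch of the new restriction (the subscription name is
hypothetical; the error text follows the wording this patch passes to
PreventInTransactionBlock, as exercised by the regression tests):

    BEGIN;
    ALTER SUBSCRIPTION mysub REFRESH PUBLICATION;
    ERROR:  ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block
    ROLLBACK;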

This also opens up the path for logical replication of 2PC
transactions on the subscriber side. Previously, we couldn't do that
because of the requirement of maintaining a single transaction in
tablesync workers.

Bump catalog version due to the addition of a new state in the catalog
(pg_subscription_rel).

Author: Peter Smith, Amit Kapila, and Takamichi Osumi
Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com

23 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/drop_subscription.sgml
src/backend/access/transam/xact.c
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/tcop/utility.c
src/include/catalog/catversion.h
src/include/catalog/pg_subscription_rel.h
src/include/commands/subscriptioncmds.h
src/include/replication/logicallauncher.h
src/include/replication/slot.h
src/include/replication/walreceiver.h
src/include/replication/worker_internal.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/004_sync.pl
src/tools/pgindent/typedefs.list

index ea222c04640b123c2cf54f36f60be987bfb354a6..692ad65de2d77cf807792df8111b6c77f373ea68 100644 (file)
@@ -7673,6 +7673,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        State code:
        <literal>i</literal> = initialize,
        <literal>d</literal> = data is being copied,
+       <literal>f</literal> = finished table copy,
        <literal>s</literal> = synchronized,
        <literal>r</literal> = ready (normal replication)
       </para></entry>
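
A hedged example of decoding these state codes in a monitoring query
(a sketch only; not part of this commit):

    SELECT srrelid::regclass AS relation,
           CASE srsubstate
                WHEN 'i' THEN 'initialize'
                WHEN 'd' THEN 'data is being copied'
                WHEN 'f' THEN 'finished table copy'
                WHEN 's' THEN 'synchronized'
                WHEN 'r' THEN 'ready (normal replication)'
           END AS state
      FROM pg_subscription_rel;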
index a560ad69b44bb6b7f8b7bd533a035e1a41f3ba70..d0742f2c526b197b7f068efce321cc015f83afb1 100644 (file)
 
   <para>
    Each subscription will receive changes via one replication slot (see
-   <xref linkend="streaming-replication-slots"/>).  Additional temporary
-   replication slots may be required for the initial data synchronization
-   of pre-existing table data.
+   <xref linkend="streaming-replication-slots"/>).  Additional replication
+   slots may be required for the initial data synchronization of
+   pre-existing table data; those will be dropped at the end of data
+   synchronization.
   </para>
 
   <para>
 
    <para>
     As mentioned earlier, each (active) subscription receives changes from a
-    replication slot on the remote (publishing) side.  Normally, the remote
-    replication slot is created automatically when the subscription is created
-    using <command>CREATE SUBSCRIPTION</command> and it is dropped
-    automatically when the subscription is dropped using <command>DROP
-    SUBSCRIPTION</command>.  In some situations, however, it can be useful or
-    necessary to manipulate the subscription and the underlying replication
-    slot separately.  Here are some scenarios:
+    replication slot on the remote (publishing) side.
+   </para>
+   <para>
+    Additional table synchronization slots are normally transient, created
+    internally to perform initial table synchronization and dropped
+    automatically when they are no longer needed. These table synchronization
+    slots have generated names: <quote><literal>pg_%u_sync_%u_%llu</literal></quote>
+    (parameters: Subscription <parameter>oid</parameter>,
+    Table <parameter>relid</parameter>, system identifier <parameter>sysid</parameter>).
+   </para>
+   <para>
+    Normally, the remote replication slot is created automatically when the
+    subscription is created using <command>CREATE SUBSCRIPTION</command> and it
+    is dropped automatically when the subscription is dropped using
+    <command>DROP SUBSCRIPTION</command>.  In some situations, however, it can
+    be useful or necessary to manipulate the subscription and the underlying
+    replication slot separately.  Here are some scenarios:
 
     <itemizedlist>
      <listitem>
        using <command>ALTER SUBSCRIPTION</command> before attempting to drop
        the subscription.  If the remote database instance no longer exists, no
        further action is then necessary.  If, however, the remote database
-       instance is just unreachable, the replication slot should then be
-       dropped manually; otherwise it would continue to reserve WAL and might
+       instance is just unreachable, the replication slot (and any still
+       remaining table synchronization slots) should then be dropped
+       manually; otherwise they would continue to reserve WAL and might
        eventually cause the disk to fill up.  Such cases should be carefully
        investigated.
       </para>
   <sect2 id="logical-replication-snapshot">
     <title>Initial Snapshot</title>
     <para>
-      The initial data in existing subscribed tables are snapshotted and
-      copied in a parallel instance of a special kind of apply process.
-      This process will create its own temporary replication slot and
-      copy the existing data. Once existing data is copied, the worker
-      enters synchronization mode, which ensures that the table is brought
-      up to a synchronized state with the main apply process by streaming
-      any changes that happened during the initial data copy using standard
-      logical replication. Once the synchronization is done, the control
-      of the replication of the table is given back to the main apply
-      process where the replication continues as normal.
+     The initial data in existing subscribed tables are snapshotted and
+     copied in a parallel instance of a special kind of apply process.
+     This process will create its own replication slot and copy the existing
+     data.  As soon as the copy is finished the table contents will become
+     visible to other backends.  Once existing data is copied, the worker
+     enters synchronization mode, which ensures that the table is brought
+     up to a synchronized state with the main apply process by streaming
+     any changes that happened during the initial data copy using standard
+     logical replication.  During this synchronization phase, the changes
+     are applied and committed in the same order as they happened on the
+     publisher.  Once the synchronization is done, the control of the
+     replication of the table is given back to the main apply process where
+     the replication continues as normal.
     </para>
   </sect2>
  </sect1>
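
For the manual-cleanup scenario described above, a hedged sketch of the
publisher-side commands (the slot name shown is hypothetical, following
the pg_%u_sync_%u_%llu scheme from this patch):

    -- Look for any leftover tablesync slots still reserving WAL.
    SELECT slot_name, active
      FROM pg_replication_slots
     WHERE slot_name LIKE 'pg\_%\_sync\_%';

    -- Drop a remaining tablesync slot by name.
    SELECT pg_drop_replication_slot('pg_16394_sync_16385_6927117142022175838');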
index db5e59f707c6a61cab48fb34c1353f142ccc6edf..bcb0acf28d8f9cf3433efce17435f7e7778acbd3 100644 (file)
@@ -48,6 +48,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
    (Currently, all subscription owners must be superusers, so the owner checks
    will be bypassed in practice.  But this might change in the future.)
   </para>
+  
+  <para>
+   When refreshing a publication we remove the relations that are no longer
+   part of the publication and we also remove the tablesync slots if there are
+   any. It is necessary to remove tablesync slots so that the resources
+   allocated for the subscription on the remote host are released. If, due
+   to a network breakdown or some other error, <productname>PostgreSQL</productname>
+   is unable to remove the slots, an ERROR will be reported. To proceed in
+   this situation, the user either needs to retry the operation or
+   disassociate the slot from the subscription and drop the subscription as explained in
+   <xref linkend="sql-dropsubscription"/>.
+  </para>
+
+  <para>
+   The commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
+   <command>ALTER SUBSCRIPTION ... SET PUBLICATION ...</command> with the
+   refresh option set to true cannot be executed inside a transaction block.
+  </para>
  </refsect1>
 
  <refsect1>
index adbdeafb4e18c222170f8c67b31b41c908a4fe60..aee961554635a8057307baad62004c5ef665f1cd 100644 (file)
@@ -79,7 +79,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
   <para>
    When dropping a subscription that is associated with a replication slot on
    the remote host (the normal state), <command>DROP SUBSCRIPTION</command>
-   will connect to the remote host and try to drop the replication slot as
+   will connect to the remote host and try to drop the replication slot (and
+   any remaining table synchronization slots) as
    part of its operation.  This is necessary so that the resources allocated
    for the subscription on the remote host are released.  If this fails,
    either because the remote host is not reachable or because the remote
@@ -89,7 +90,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
    executing <literal>ALTER SUBSCRIPTION ... SET (slot_name = NONE)</literal>.
    After that, <command>DROP SUBSCRIPTION</command> will no longer attempt any
    actions on a remote host.  Note that if the remote replication slot still
-   exists, it should then be dropped manually; otherwise it will continue to
+   exists, it (and any related table synchronization slots) should then be
+   dropped manually; otherwise they will continue to
    reserve WAL and might eventually cause the disk to fill up.  See
    also <xref linkend="logical-replication-subscription-slot"/>.
   </para>
index a2068e3fd45d83026b97496e1ba0c3139eec4524..3c8b4eb36223c41b66dc52b97e88661f39ee05ba 100644 (file)
@@ -2432,15 +2432,6 @@ PrepareTransaction(void)
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
-   /*
-    * Don't allow PREPARE but for transaction that has/might kill logical
-    * replication workers.
-    */
-   if (XactManipulatesLogicalReplicationWorkers())
-       ereport(ERROR,
-               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
-
    /* Prevent cancel/die interrupt while cleaning up */
    HOLD_INTERRUPTS();
 
@@ -4899,7 +4890,6 @@ CommitSubTransaction(void)
    AtEOSubXact_HashTables(true, s->nestingLevel);
    AtEOSubXact_PgStat(true, s->nestingLevel);
    AtSubCommit_Snapshot(s->nestingLevel);
-   AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
    /*
     * We need to restore the upper transaction's read-only state, in case the
@@ -5059,7 +5049,6 @@ AbortSubTransaction(void)
        AtEOSubXact_HashTables(false, s->nestingLevel);
        AtEOSubXact_PgStat(false, s->nestingLevel);
        AtSubAbort_Snapshot(s->nestingLevel);
-       AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
    }
 
    /*
index 44cb285b686501c5df2bf610f7fd34e950d3a317..c32fc8137d8efad42ffe93b62432aca83ed7f55f 100644 (file)
@@ -29,6 +29,7 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -337,6 +338,13 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
    char        substate;
    bool        isnull;
    Datum       d;
+   Relation    rel;
+
+   /*
+    * This is to avoid a race condition with AlterSubscription, which may
+    * try to remove this relstate concurrently.
+    */
+   rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
    /* Try finding the mapping. */
    tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
@@ -363,6 +371,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
    /* Cleanup */
    ReleaseSysCache(tup);
 
+   table_close(rel, AccessShareLock);
+
    return substate;
 }
 
@@ -403,6 +413,34 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
    scan = table_beginscan_catalog(rel, nkeys, skey);
    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
    {
+       Form_pg_subscription_rel subrel;
+
+       subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+       /*
+        * We don't allow dropping the relation mapping when the table
+        * synchronization is in progress unless the caller updates the
+        * corresponding subscription as well. This is to ensure that we don't
+        * leave tablesync slots or origins in the system when the
+        * corresponding table is dropped.
+        */
+       if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
+       {
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg("could not drop relation mapping for subscription \"%s\"",
+                           get_subscription_name(subrel->srsubid, false)),
+                    errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
+                              get_rel_name(relid), subrel->srsubstate),
+           /*
+            * translator: first %s is a SQL ALTER command and second %s is a
+            * SQL DROP command
+            */
+                    errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
+                            "ALTER SUBSCRIPTION ... ENABLE",
+                            "DROP SUBSCRIPTION ...")));
+       }
+
        CatalogTupleDelete(rel, &tup->t_self);
    }
    table_endscan(scan);
index 5ccbc9dd50f0908dec35cdd42201609c42196f03..5cf874e0b46101223ee2424d6a03364e25dec936 100644 (file)
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
@@ -46,6 +47,8 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -566,107 +569,207 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
    Oid        *pubrel_local_oids;
    ListCell   *lc;
    int         off;
+   int         remove_rel_len;
+   Relation    rel = NULL;
+   typedef struct SubRemoveRels
+   {
+       Oid         relid;
+       char        state;
+   } SubRemoveRels;
+   SubRemoveRels *sub_remove_rels;
 
    /* Load the library providing us libpq calls. */
    load_file("libpqwalreceiver", false);
 
-   /* Try to connect to the publisher. */
-   wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
-   if (!wrconn)
-       ereport(ERROR,
-               (errmsg("could not connect to the publisher: %s", err)));
-
-   /* Get the table list from publisher. */
-   pubrel_names = fetch_table_list(wrconn, sub->publications);
+   PG_TRY();
+   {
+       /* Try to connect to the publisher. */
+       wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+       if (!wrconn)
+           ereport(ERROR,
+                   (errmsg("could not connect to the publisher: %s", err)));
 
-   /* We are done with the remote side, close connection. */
-   walrcv_disconnect(wrconn);
+       /* Get the table list from publisher. */
+       pubrel_names = fetch_table_list(wrconn, sub->publications);
 
-   /* Get local table list. */
-   subrel_states = GetSubscriptionRelations(sub->oid);
+       /* Get local table list. */
+       subrel_states = GetSubscriptionRelations(sub->oid);
 
-   /*
-    * Build qsorted array of local table oids for faster lookup. This can
-    * potentially contain all tables in the database so speed of lookup is
-    * important.
-    */
-   subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
-   off = 0;
-   foreach(lc, subrel_states)
-   {
-       SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+       /*
+        * Build qsorted array of local table oids for faster lookup. This can
+        * potentially contain all tables in the database so speed of lookup
+        * is important.
+        */
+       subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+       off = 0;
+       foreach(lc, subrel_states)
+       {
+           SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
 
-       subrel_local_oids[off++] = relstate->relid;
-   }
-   qsort(subrel_local_oids, list_length(subrel_states),
-         sizeof(Oid), oid_cmp);
+           subrel_local_oids[off++] = relstate->relid;
+       }
+       qsort(subrel_local_oids, list_length(subrel_states),
+             sizeof(Oid), oid_cmp);
+
+       /*
+        * Rels that we want to remove from the subscription; we will also
+        * drop any slots and origins corresponding to them.
+        */
+       sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+       /*
+        * Walk over the remote tables and try to match them to locally known
+        * tables. If the table is not known locally create a new state for
+        * it.
+        *
+        * Also builds array of local oids of remote tables for the next step.
+        */
+       off = 0;
+       pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+       foreach(lc, pubrel_names)
+       {
+           RangeVar   *rv = (RangeVar *) lfirst(lc);
+           Oid         relid;
 
-   /*
-    * Walk over the remote tables and try to match them to locally known
-    * tables. If the table is not known locally create a new state for it.
-    *
-    * Also builds array of local oids of remote tables for the next step.
-    */
-   off = 0;
-   pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+           relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
-   foreach(lc, pubrel_names)
-   {
-       RangeVar   *rv = (RangeVar *) lfirst(lc);
-       Oid         relid;
+           /* Check for supported relkind. */
+           CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                    rv->schemaname, rv->relname);
 
-       relid = RangeVarGetRelid(rv, AccessShareLock, false);
+           pubrel_local_oids[off++] = relid;
 
-       /* Check for supported relkind. */
-       CheckSubscriptionRelkind(get_rel_relkind(relid),
-                                rv->schemaname, rv->relname);
+           if (!bsearch(&relid, subrel_local_oids,
+                        list_length(subrel_states), sizeof(Oid), oid_cmp))
+           {
+               AddSubscriptionRelState(sub->oid, relid,
+                                       copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+                                       InvalidXLogRecPtr);
+               ereport(DEBUG1,
+                       (errmsg("table \"%s.%s\" added to subscription \"%s\"",
+                               rv->schemaname, rv->relname, sub->name)));
+           }
+       }
 
-       pubrel_local_oids[off++] = relid;
+       /*
+        * Next remove state for tables we should not care about anymore using
+        * the data we collected above
+        */
+       qsort(pubrel_local_oids, list_length(pubrel_names),
+             sizeof(Oid), oid_cmp);
 
-       if (!bsearch(&relid, subrel_local_oids,
-                    list_length(subrel_states), sizeof(Oid), oid_cmp))
+       remove_rel_len = 0;
+       for (off = 0; off < list_length(subrel_states); off++)
        {
-           AddSubscriptionRelState(sub->oid, relid,
-                                   copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-                                   InvalidXLogRecPtr);
-           ereport(DEBUG1,
-                   (errmsg("table \"%s.%s\" added to subscription \"%s\"",
-                           rv->schemaname, rv->relname, sub->name)));
-       }
-   }
+           Oid         relid = subrel_local_oids[off];
 
-   /*
-    * Next remove state for tables we should not care about anymore using the
-    * data we collected above
-    */
-   qsort(pubrel_local_oids, list_length(pubrel_names),
-         sizeof(Oid), oid_cmp);
+           if (!bsearch(&relid, pubrel_local_oids,
+                        list_length(pubrel_names), sizeof(Oid), oid_cmp))
+           {
+               char        state;
+               XLogRecPtr  statelsn;
+
+               /*
+                * Lock pg_subscription_rel with AccessExclusiveLock to
+                * prevent any race conditions with the apply worker
+                * re-launching workers at the same time this code is trying
+                * to remove those tables.
+                *
+                * Even if a new worker for this particular rel is restarted,
+                * it won't be able to make any progress, as we hold an
+                * exclusive lock on pg_subscription_rel till the transaction
+                * end. It will simply exit, as there is no corresponding rel entry.
+                *
+                * This locking also ensures that the state of rels won't
+                * change till we are done with this refresh operation.
+                */
+               if (!rel)
+                   rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+               /* Last known rel state. */
+               state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+               sub_remove_rels[remove_rel_len].relid = relid;
+               sub_remove_rels[remove_rel_len++].state = state;
+
+               RemoveSubscriptionRel(sub->oid, relid);
+
+               logicalrep_worker_stop(sub->oid, relid);
+
+               /*
+                * For READY state, we would have already dropped the
+                * tablesync origin.
+                */
+               if (state != SUBREL_STATE_READY)
+               {
+                   char        originname[NAMEDATALEN];
+
+                   /*
+                    * Drop the tablesync's origin tracking if it exists.
+                    *
+                    * It is possible that the origin has not yet been created
+                    * by the tablesync worker; this can happen for the states
+                    * before SUBREL_STATE_FINISHEDCOPY. The apply worker can
+                    * also concurrently try to drop the origin, and by this
+                    * time the origin might already be removed. For these
+                    * reasons, we pass missing_ok = true.
+                    */
+                   ReplicationOriginNameForTablesync(sub->oid, relid, originname);
+                   replorigin_drop_by_name(originname, true, false);
+               }
 
-   for (off = 0; off < list_length(subrel_states); off++)
-   {
-       Oid         relid = subrel_local_oids[off];
+               ereport(DEBUG1,
+                       (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+                               get_namespace_name(get_rel_namespace(relid)),
+                               get_rel_name(relid),
+                               sub->name)));
+           }
+       }
 
-       if (!bsearch(&relid, pubrel_local_oids,
-                    list_length(pubrel_names), sizeof(Oid), oid_cmp))
+       /*
+        * Drop the tablesync slots associated with removed tables. This has
+        * to be done at the end; otherwise, if there is an error during the
+        * database operations, we won't be able to roll back the dropped slots.
+        */
+       for (off = 0; off < remove_rel_len; off++)
        {
-           RemoveSubscriptionRel(sub->oid, relid);
-
-           logicalrep_worker_stop_at_commit(sub->oid, relid);
-
-           ereport(DEBUG1,
-                   (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
-                           get_namespace_name(get_rel_namespace(relid)),
-                           get_rel_name(relid),
-                           sub->name)));
+           if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+               sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+           {
+               char        syncslotname[NAMEDATALEN] = {0};
+
+               /*
+                * For READY/SYNCDONE states we know the tablesync slot has
+                * already been dropped by the tablesync worker.
+                *
+                * For other states, there is no certainty, maybe the slot
+                * does not exist yet. Also, if we fail after removing some of
+                * the slots, next time, it will again try to drop already
+                * dropped slots and fail. For these reasons, we allow
+                * missing_ok = true for the drop.
+                */
+               ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid, syncslotname);
+               ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+           }
        }
    }
+   PG_FINALLY();
+   {
+       if (wrconn)
+           walrcv_disconnect(wrconn);
+   }
+   PG_END_TRY();
+
+   if (rel)
+       table_close(rel, NoLock);
 }
 
 /*
  * Alter the existing subscription.
  */
 ObjectAddress
-AlterSubscription(AlterSubscriptionStmt *stmt)
+AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 {
    Relation    rel;
    ObjectAddress myself;
@@ -848,6 +951,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
                                 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
 
+                   PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
                    /* Make sure refresh sees the new list of publications. */
                    sub->publications = stmt->publication;
 
@@ -877,6 +982,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                           NULL, NULL,  /* no "binary" */
                                           NULL, NULL); /* no "streaming" */
 
+               PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+
                AlterSubscription_refresh(sub, copy_data);
 
                break;
@@ -927,8 +1034,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    char        originname[NAMEDATALEN];
    char       *err = NULL;
    WalReceiverConn *wrconn = NULL;
-   StringInfoData cmd;
    Form_pg_subscription form;
+   List       *rstates;
 
    /*
     * Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1041,6 +1148,36 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    }
    list_free(subworkers);
 
+   /*
+    * Cleanup of tablesync replication origins.
+    *
+    * Clean-up for any READY-state relations has already been done.
+    *
+    * Note that the state can't change because we have already stopped both
+    * the apply and tablesync workers and they can't restart because of
+    * exclusive lock on the subscription.
+    */
+   rstates = GetSubscriptionNotReadyRelations(subid);
+   foreach(lc, rstates)
+   {
+       SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+       Oid         relid = rstate->relid;
+
+       /* Only cleanup resources of tablesync workers */
+       if (!OidIsValid(relid))
+           continue;
+
+       /*
+        * Drop the tablesync's origin tracking if it exists.
+        *
+        * It is possible that the origin has not yet been created by the
+        * tablesync worker, so we pass missing_ok = true. This can happen
+        * for the states before SUBREL_STATE_FINISHEDCOPY.
+        */
+       ReplicationOriginNameForTablesync(subid, relid, originname);
+       replorigin_drop_by_name(originname, true, false);
+   }
+
    /* Clean up dependencies */
    deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
@@ -1055,30 +1192,110 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
     * If there is no slot associated with the subscription, we can finish
     * here.
     */
-   if (!slotname)
+   if (!slotname && rstates == NIL)
    {
        table_close(rel, NoLock);
        return;
    }
 
    /*
-    * Otherwise drop the replication slot at the publisher node using the
-    * replication connection.
+    * Try to acquire the connection necessary for dropping slots.
+    *
+    * Note: If the slotname is NONE/NULL then we allow the command to finish
+    * and users need to manually clean up the apply and tablesync worker
+    * slots later.
+    *
+    * This has to be done at the end because, otherwise, if there is an
+    * error while doing the database operations we won't be able to roll
+    * back the dropped slot.
     */
    load_file("libpqwalreceiver", false);
 
-   initStringInfo(&cmd);
-   appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
-
    wrconn = walrcv_connect(conninfo, true, subname, &err);
    if (wrconn == NULL)
-       ereport(ERROR,
-               (errmsg("could not connect to publisher when attempting to "
-                       "drop the replication slot \"%s\"", slotname),
-                errdetail("The error was: %s", err),
-       /* translator: %s is an SQL ALTER command */
-                errhint("Use %s to disassociate the subscription from the slot.",
-                        "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+   {
+       if (!slotname)
+       {
+           /* be tidy */
+           list_free(rstates);
+           table_close(rel, NoLock);
+           return;
+       }
+       else
+       {
+           ReportSlotConnectionError(rstates, subid, slotname, err);
+       }
+   }
+
+   PG_TRY();
+   {
+       foreach(lc, rstates)
+       {
+           SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+           Oid         relid = rstate->relid;
+
+           /* Only cleanup resources of tablesync workers */
+           if (!OidIsValid(relid))
+               continue;
+
+           /*
+            * Drop the tablesync slots associated with removed tables.
+            *
+            * For SYNCDONE/READY states, the tablesync slot is known to have
+            * already been dropped by the tablesync worker.
+            *
+            * For other states, there is no certainty, maybe the slot does
+            * not exist yet. Also, if we fail after removing some of the
+            * slots, next time, it will again try to drop already dropped
+            * slots and fail. For these reasons, we allow missing_ok = true
+            * for the drop.
+            */
+           if (rstate->state != SUBREL_STATE_SYNCDONE)
+           {
+               char        syncslotname[NAMEDATALEN] = {0};
+
+               ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+               ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+           }
+       }
+
+       list_free(rstates);
+
+       /*
+        * If there is a slot associated with the subscription, then drop the
+        * replication slot at the publisher.
+        */
+       if (slotname)
+           ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+
+   }
+   PG_FINALLY();
+   {
+       walrcv_disconnect(wrconn);
+   }
+   PG_END_TRY();
+
+   table_close(rel, NoLock);
+}
+
+/*
+ * Drop the replication slot at the publisher node using the replication
+ * connection.
+ *
+ * missing_ok - if true then only issue a LOG message if the slot doesn't
+ * exist.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
+{
+   StringInfoData cmd;
+
+   Assert(wrconn);
+
+   load_file("libpqwalreceiver", false);
+
+   initStringInfo(&cmd);
+   appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
 
    PG_TRY();
    {
@@ -1086,27 +1303,39 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
        res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 
-       if (res->status != WALRCV_OK_COMMAND)
-           ereport(ERROR,
+       if (res->status == WALRCV_OK_COMMAND)
+       {
+           /* NOTICE. Success. */
+           ereport(NOTICE,
+                   (errmsg("dropped replication slot \"%s\" on publisher",
+                           slotname)));
+       }
+       else if (res->status == WALRCV_ERROR &&
+                missing_ok &&
+                res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
+       {
+           /* LOG. Error, but missing_ok = true. */
+           ereport(LOG,
                    (errmsg("could not drop the replication slot \"%s\" on publisher",
                            slotname),
                     errdetail("The error was: %s", res->err)));
+       }
        else
-           ereport(NOTICE,
-                   (errmsg("dropped replication slot \"%s\" on publisher",
-                           slotname)));
+       {
+           /* ERROR. */
+           ereport(ERROR,
+                   (errmsg("could not drop the replication slot \"%s\" on publisher",
+                           slotname),
+                    errdetail("The error was: %s", res->err)));
+       }
 
        walrcv_clear_result(res);
    }
    PG_FINALLY();
    {
-       walrcv_disconnect(wrconn);
+       pfree(cmd.data);
    }
    PG_END_TRY();
-
-   pfree(cmd.data);
-
-   table_close(rel, NoLock);
 }
 
 /*
@@ -1275,3 +1504,45 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
    return tablelist;
 }
+
+/*
+ * Report the connection failure encountered while dropping replication
+ * slots. We emit a WARNING for each tablesync slot so that the user can
+ * drop them manually, if required.
+ */
+static void
+ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
+{
+   ListCell   *lc;
+
+   foreach(lc, rstates)
+   {
+       SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+       Oid         relid = rstate->relid;
+
+       /* Only cleanup resources of tablesync workers */
+       if (!OidIsValid(relid))
+           continue;
+
+       /*
+        * Caller needs to ensure that relstate doesn't change underneath us.
+        * See DropSubscription where we get the relstates.
+        */
+       if (rstate->state != SUBREL_STATE_SYNCDONE)
+       {
+           char        syncslotname[NAMEDATALEN] = {0};
+
+           ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+           elog(WARNING, "could not drop tablesync replication slot \"%s\"",
+                syncslotname);
+       }
+   }
+
+   ereport(ERROR,
+           (errmsg("could not connect to publisher when attempting to "
+                   "drop the replication slot \"%s\"", slotname),
+            errdetail("The error was: %s", err),
+   /* translator: %s is an SQL ALTER command */
+            errhint("Use %s to disassociate the subscription from the slot.",
+                    "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+}
index e9582748617f09dd3d3d55fd272e7a09a7c44384..7714696140873d6b415d1e20c634de3d28d7e11a 100644 (file)
@@ -982,6 +982,7 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 {
    PGresult   *pgres = NULL;
    WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
+   char       *diag_sqlstate;
 
    if (MyDatabaseId == InvalidOid)
        ereport(ERROR,
@@ -1025,6 +1026,13 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
        case PGRES_BAD_RESPONSE:
            walres->status = WALRCV_ERROR;
            walres->err = pchomp(PQerrorMessage(conn->streamConn));
+           diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
+           if (diag_sqlstate)
+               walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+                                                diag_sqlstate[1],
+                                                diag_sqlstate[2],
+                                                diag_sqlstate[3],
+                                                diag_sqlstate[4]);
            break;
    }
 
index 186514cd9ed43f4cbfd8d3a782ef12490e060a3b..58082dde186e2807c25302763558250b5f6e1d62 100644 (file)
@@ -73,20 +73,6 @@ typedef struct LogicalRepWorkerId
    Oid         relid;
 } LogicalRepWorkerId;
 
-typedef struct StopWorkersData
-{
-   int         nestDepth;      /* Sub-transaction nest level */
-   List       *workers;        /* List of LogicalRepWorkerId */
-   struct StopWorkersData *parent; /* This need not be an immediate
-                                    * subtransaction parent */
-} StopWorkersData;
-
-/*
- * Stack of StopWorkersData elements. Each stack element contains the workers
- * to be stopped for that subtransaction.
- */
-static StopWorkersData *on_commit_stop_workers = NULL;
-
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -546,51 +532,6 @@ logicalrep_worker_stop(Oid subid, Oid relid)
    LWLockRelease(LogicalRepWorkerLock);
 }
 
-/*
- * Request worker for specified sub/rel to be stopped on commit.
- */
-void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
-{
-   int         nestDepth = GetCurrentTransactionNestLevel();
-   LogicalRepWorkerId *wid;
-   MemoryContext oldctx;
-
-   /* Make sure we store the info in context that survives until commit. */
-   oldctx = MemoryContextSwitchTo(TopTransactionContext);
-
-   /* Check that previous transactions were properly cleaned up. */
-   Assert(on_commit_stop_workers == NULL ||
-          nestDepth >= on_commit_stop_workers->nestDepth);
-
-   /*
-    * Push a new stack element if we don't already have one for the current
-    * nestDepth.
-    */
-   if (on_commit_stop_workers == NULL ||
-       nestDepth > on_commit_stop_workers->nestDepth)
-   {
-       StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
-
-       newdata->nestDepth = nestDepth;
-       newdata->workers = NIL;
-       newdata->parent = on_commit_stop_workers;
-       on_commit_stop_workers = newdata;
-   }
-
-   /*
-    * Finally add a new worker into the worker list of the current
-    * subtransaction.
-    */
-   wid = palloc(sizeof(LogicalRepWorkerId));
-   wid->subid = subid;
-   wid->relid = relid;
-   on_commit_stop_workers->workers =
-       lappend(on_commit_stop_workers->workers, wid);
-
-   MemoryContextSwitchTo(oldctx);
-}
-
 /*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
@@ -819,109 +760,21 @@ ApplyLauncherShmemInit(void)
    }
 }
 
-/*
- * Check whether current transaction has manipulated logical replication
- * workers.
- */
-bool
-XactManipulatesLogicalReplicationWorkers(void)
-{
-   return (on_commit_stop_workers != NULL);
-}
-
 /*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-
-   Assert(on_commit_stop_workers == NULL ||
-          (on_commit_stop_workers->nestDepth == 1 &&
-           on_commit_stop_workers->parent == NULL));
-
    if (isCommit)
    {
-       ListCell   *lc;
-
-       if (on_commit_stop_workers != NULL)
-       {
-           List       *workers = on_commit_stop_workers->workers;
-
-           foreach(lc, workers)
-           {
-               LogicalRepWorkerId *wid = lfirst(lc);
-
-               logicalrep_worker_stop(wid->subid, wid->relid);
-           }
-       }
-
        if (on_commit_launcher_wakeup)
            ApplyLauncherWakeup();
    }
 
-   /*
-    * No need to pfree on_commit_stop_workers.  It was allocated in
-    * transaction memory context, which is going to be cleaned soon.
-    */
-   on_commit_stop_workers = NULL;
    on_commit_launcher_wakeup = false;
 }
 
-/*
- * On commit, merge the current on_commit_stop_workers list into the
- * immediate parent, if present.
- * On rollback, discard the current on_commit_stop_workers list.
- * Pop out the stack.
- */
-void
-AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
-{
-   StopWorkersData *parent;
-
-   /* Exit immediately if there's no work to do at this level. */
-   if (on_commit_stop_workers == NULL ||
-       on_commit_stop_workers->nestDepth < nestDepth)
-       return;
-
-   Assert(on_commit_stop_workers->nestDepth == nestDepth);
-
-   parent = on_commit_stop_workers->parent;
-
-   if (isCommit)
-   {
-       /*
-        * If the upper stack element is not an immediate parent
-        * subtransaction, just decrement the notional nesting depth without
-        * doing any real work.  Else, we need to merge the current workers
-        * list into the parent.
-        */
-       if (!parent || parent->nestDepth < nestDepth - 1)
-       {
-           on_commit_stop_workers->nestDepth--;
-           return;
-       }
-
-       parent->workers =
-           list_concat(parent->workers, on_commit_stop_workers->workers);
-   }
-   else
-   {
-       /*
-        * Abandon everything that was done at this nesting level.  Explicitly
-        * free memory to avoid a transaction-lifespan leak.
-        */
-       list_free_deep(on_commit_stop_workers->workers);
-   }
-
-   /*
-    * We have taken care of the current subtransaction workers list for both
-    * abort or commit. So we are ready to pop the stack.
-    */
-   pfree(on_commit_stop_workers);
-   on_commit_stop_workers = parent;
-}
-
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
index ccbdbcf08f9181382c8a859c327a3ed645702dea..19cc80467867953040c50606e7164217395bdb72 100644 (file)
  *      table state to INIT.
  *    - Tablesync worker starts; changes table state from INIT to DATASYNC while
  *      copying.
- *    - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
- *      waits for state change.
+ *    - Tablesync worker does the initial table copy; there is a FINISHEDCOPY
+ *      (sync-worker-specific) state to indicate when the copy phase has
+ *      completed, so if the worker crashes in this (non-memory) state then the
+ *      copy will not be re-attempted.
+ *    - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
  *    - Apply worker periodically checks for tables in SYNCWAIT state.  When
  *      any appear, it sets the table state to CATCHUP and starts loop-waiting
  *      until either the table state is set to SYNCDONE or the sync worker
@@ -48,8 +51,8 @@
  *      point it sets state to READY and stops tracking.  Again, there might
  *      be zero changes in between.
  *
- *   So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- *   CATCHUP -> SYNCDONE -> READY.
+ *   So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
+ *   -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
  *
  *   The catalog pg_subscription_rel is used to keep information about
  *   subscribed tables and their state.  The catalog holds all states
@@ -58,6 +61,7 @@
  *   Example flows look like this:
  *    - Apply is in front:
  *       sync:8
+ *         -> set in catalog FINISHEDCOPY
  *         -> set in memory SYNCWAIT
  *       apply:10
  *         -> set in memory CATCHUP
@@ -73,6 +77,7 @@
  *
  *    - Sync is in front:
  *       sync:10
+ *         -> set in catalog FINISHEDCOPY
  *         -> set in memory SYNCWAIT
  *       apply:8
  *         -> set in memory CATCHUP
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -269,26 +277,52 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-   Assert(IsTransactionState());
-
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
    if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
        current_lsn >= MyLogicalRepWorker->relstate_lsn)
    {
        TimeLineID  tli;
+       char        syncslotname[NAMEDATALEN] = {0};
 
        MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
        MyLogicalRepWorker->relstate_lsn = current_lsn;
 
        SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+       /*
+        * UpdateSubscriptionRelState must be called within a transaction.
+        * That transaction will be ended within finish_sync_worker().
+        */
+       if (!IsTransactionState())
+           StartTransactionCommand();
+
        UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                   MyLogicalRepWorker->relid,
                                   MyLogicalRepWorker->relstate,
                                   MyLogicalRepWorker->relstate_lsn);
 
+       /* End WAL streaming so that wrconn can be re-used to drop the slot. */
        walrcv_endstreaming(wrconn, &tli);
+
+       /*
+        * Cleanup the tablesync slot.
+        *
+        * This has to be done after updating the state because, otherwise,
+        * if there is an error while doing the database operations we won't
+        * be able to roll back the dropped slot.
+        */
+       ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
+                                       MyLogicalRepWorker->relid,
+                                       syncslotname);
+
+       /*
+        * It is important to give an error if we are unable to drop the
+        * slot; otherwise, it won't be dropped till the corresponding
+        * subscription is dropped. So we pass missing_ok = false.
+        */
+       ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+
        finish_sync_worker();
    }
    else
@@ -403,6 +437,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
             */
            if (current_lsn >= rstate->lsn)
            {
+               char        originname[NAMEDATALEN];
+
                rstate->state = SUBREL_STATE_READY;
                rstate->lsn = current_lsn;
                if (!started_tx)
@@ -411,6 +447,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                    started_tx = true;
                }
 
+               /*
+                * Remove the tablesync origin tracking if it exists.
+                *
+                * In the normal case, the origin drop is done here instead
+                * of in process_syncing_tables_for_sync, because an origin
+                * cannot be dropped while the process that owns it is still
+                * alive.
+                *
+                * There is a chance that the user is concurrently performing
+                * a refresh for the subscription, which removes the table
+                * state and its origin; by this time the origin might
+                * already be removed. So we pass missing_ok = true.
+                */
+               ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+                                                 rstate->relid,
+                                                 originname);
+               replorigin_drop_by_name(originname, true, false);
+
+               /*
+                * Update the state to READY only after the origin cleanup.
+                */
                UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                           rstate->relid, rstate->state,
                                           rstate->lsn);
@@ -805,6 +862,50 @@ copy_table(Relation rel)
    logicalrep_rel_close(relmapentry, NoLock);
 }
 
+/*
+ * Determine the tablesync slot name.
+ *
+ * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
+ * on slot name length. We append system_identifier to avoid slot_name
+ * collision with subscriptions in other clusters. With the current scheme
+ * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20 + '\0'), the
+ * maximum length of slot_name will be 50.
+ *
+ * The returned slot name is either:
+ * - stored in the supplied buffer (syncslotname), or
+ * - palloc'ed in current memory context (if syncslotname = NULL).
+ *
+ * Note: We don't use the subscription slot name as part of the tablesync
+ * slot name because we are responsible for cleaning up these slots and it
+ * could become impossible to recalculate what name to clean up if the
+ * subscription slot name had changed.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+                               char syncslotname[NAMEDATALEN])
+{
+   if (syncslotname)
+       sprintf(syncslotname, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid,
+               GetSystemIdentifier());
+   else
+       syncslotname = psprintf("pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid,
+                               GetSystemIdentifier());
+
+   return syncslotname;
+}
+
+/*
+ * Form the origin name for tablesync.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
+                                 char originname[NAMEDATALEN])
+{
+   snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid);
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -822,6 +923,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    XLogRecPtr  relstate_lsn;
    Relation    rel;
    WalRcvExecResult *res;
+   char        originname[NAMEDATALEN];
+   RepOriginId originid;
 
    /* Check the state of the table synchronization. */
    StartTransactionCommand();
@@ -847,19 +950,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
            finish_sync_worker();   /* doesn't return */
    }
 
-   /*
-    * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-    * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-    * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-    * NAMEDATALEN on the remote that matters, but this scheme will also work
-    * reasonably if that is different.)
-    */
-   StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");   /* for sanity */
-   slotname = psprintf("%.*s_%u_sync_%u",
-                       NAMEDATALEN - 28,
-                       MySubscription->slotname,
-                       MySubscription->oid,
-                       MyLogicalRepWorker->relid);
+   /* Calculate the name of the tablesync slot. */
+   slotname = ReplicationSlotNameForTablesync(MySubscription->oid,
+                                              MyLogicalRepWorker->relid,
+                                              NULL /* use palloc */ );
 
    /*
     * Here we use the slot name instead of the subscription name as the
@@ -872,7 +966,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                (errmsg("could not connect to the publisher: %s", err)));
 
    Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-          MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+          MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+          MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+
+   /* Assign the origin tracking record name. */
+   ReplicationOriginNameForTablesync(MySubscription->oid,
+                                     MyLogicalRepWorker->relid,
+                                     originname);
+
+   if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
+   {
+       /*
+        * We have previously errored out before finishing the copy, so the
+        * replication slot might exist. We want to remove the slot if it
+        * already exists and proceed.
+        *
+        * XXX We could instead have tried to drop the slot at the time of
+        * the earlier failure, but for that we might need to clean up the
+        * copy state, as it might be in the middle of fetching the rows.
+        * Also, if there is a network breakdown then the drop wouldn't have
+        * succeeded anyway, so trying it next time seems like a better bet.
+        */
+       ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+   }
+   else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
+   {
+       /*
+        * The COPY phase was previously done, but tablesync then crashed
+        * before it was able to finish normally.
+        */
+       StartTransactionCommand();
+
+       /*
+        * The origin tracking name must already exist. It was created the
+        * first time this tablesync was launched.
+        */
+       originid = replorigin_by_name(originname, false);
+       replorigin_session_setup(originid);
+       replorigin_session_origin = originid;
+       *origin_startpos = replorigin_session_get_progress(false);
+
+       CommitTransactionCommand();
+
+       goto copy_table_done;
+   }
 
    SpinLockAcquire(&MyLogicalRepWorker->relmutex);
    MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -888,9 +1025,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    CommitTransactionCommand();
    pgstat_report_stat(false);
 
-   /*
-    * We want to do the table data sync in a single transaction.
-    */
    StartTransactionCommand();
 
    /*
@@ -916,13 +1050,46 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    walrcv_clear_result(res);
 
    /*
-    * Create a new temporary logical decoding slot.  This slot will be used
+    * Create a new permanent logical decoding slot. This slot will be used
     * for the catchup phase after COPY is done, so tell it to use the
     * snapshot to make the final data consistent.
     */
-   walrcv_create_slot(wrconn, slotname, true,
+   walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
                       CRS_USE_SNAPSHOT, origin_startpos);
 
+   /*
+    * Set up replication origin tracking. The purpose of doing this before
+    * the copy is to avoid having to redo the copy due to any error in
+    * setting up origin tracking.
+    */
+   originid = replorigin_by_name(originname, true);
+   if (!OidIsValid(originid))
+   {
+       /*
+        * Origin tracking does not exist, so create it now.
+        *
+        * Then advance to the LSN obtained from walrcv_create_slot. This is
+        * WAL-logged for the purpose of recovery. Locks are taken to prevent
+        * the replication origin from vanishing while advancing.
+        */
+       originid = replorigin_create(originname);
+
+       LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+       replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+                          true /* go backward */ , true /* WAL log */ );
+       UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+       replorigin_session_setup(originid);
+       replorigin_session_origin = originid;
+   }
+   else
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_DUPLICATE_OBJECT),
+                errmsg("replication origin \"%s\" already exists",
+                       originname)));
+   }
+
    /* Now do the initial data copy */
    PushActiveSnapshot(GetTransactionSnapshot());
    copy_table(rel);
@@ -940,6 +1107,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
    /* Make the copy visible. */
    CommandCounterIncrement();
 
+   /*
+    * Update the persisted state to indicate the COPY phase is done; make it
+    * visible to others.
+    */
+   UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                              MyLogicalRepWorker->relid,
+                              SUBREL_STATE_FINISHEDCOPY,
+                              MyLogicalRepWorker->relstate_lsn);
+
+   CommitTransactionCommand();
+
+copy_table_done:
+
+   elog(DEBUG1,
+        "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+        originname,
+        (uint32) (*origin_startpos >> 32),
+        (uint32) *origin_startpos);
+
    /*
     * We are done with the initial data synchronization, update the state.
     */
index eb7db89cef7d5fccddca202164fa622da7123e37..cfc924cd8935ac11b5e92bcd397d294b38e1b651 100644 (file)
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
    /* We must be in a valid transaction state */
    Assert(IsTransactionState());
 
-   /* The synchronization worker runs in single transaction. */
-   if (!am_tablesync_worker())
-   {
-       /* Commit the per-stream transaction */
-       CommitTransactionCommand();
-   }
+   /* Commit the per-stream transaction */
+   CommitTransactionCommand();
 
    in_streamed_transaction = false;
 
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
            /* Cleanup the subxact info */
            cleanup_subxact_info();
 
-           /* The synchronization worker runs in single transaction */
-           if (!am_tablesync_worker())
-               CommitTransactionCommand();
+           CommitTransactionCommand();
            return;
        }
 
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
        /* write the updated subxact list */
        subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-       if (!am_tablesync_worker())
-           CommitTransactionCommand();
+       CommitTransactionCommand();
    }
 }
 
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
 {
-   /* The synchronization worker runs in single transaction. */
-   if (IsTransactionState() && !am_tablesync_worker())
+   if (IsTransactionState())
    {
        /*
         * Update origin state so we can restart streaming from correct
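With the single-transaction restriction gone, tablesync workers now take the
same commit path as the apply worker. For orientation, a minimal sketch of how
the remainder of apply_handle_commit_internal() proceeds; the body below is
reconstructed for illustration and is not part of the hunk shown above:

    if (IsTransactionState())
    {
        /*
         * Update origin state so we can restart streaming from the correct
         * position in case of a crash.
         */
        replorigin_session_origin_lsn = commit_data->end_lsn;
        replorigin_session_origin_timestamp = commit_data->committime;

        CommitTransactionCommand();
        pgstat_report_stat(false);

        store_flush_position(commit_data->end_lsn);
    }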
index 1d81071c35724a0c8db91b43bb2f40d4dc491ccd..05bb698cf456bfe6fab62e1791a569df67394d2a 100644 (file)
@@ -1786,7 +1786,8 @@ ProcessUtilitySlow(ParseState *pstate,
                break;
 
            case T_AlterSubscriptionStmt:
-               address = AlterSubscription((AlterSubscriptionStmt *) parsetree);
+               address = AlterSubscription((AlterSubscriptionStmt *) parsetree,
+                                           isTopLevel);
                break;
 
            case T_DropSubscriptionStmt:
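Passing isTopLevel down to AlterSubscription() is what lets the refresh forms
be rejected inside a transaction block: the permanent tablesync slots they may
drop cannot be re-created on rollback. A hedged sketch of the guard, assuming
the usual PreventInTransactionBlock() idiom (its exact placement inside
AlterSubscription() is not part of this diff):

    case ALTER_SUBSCRIPTION_REFRESH:
        /*
         * The refresh may drop permanent tablesync slots and origins, which
         * cannot be restored on rollback, so refuse to run inside an outer
         * transaction block or a function.
         */
        PreventInTransactionBlock(isTopLevel,
                                  "ALTER SUBSCRIPTION ... REFRESH");
        /* ... perform the actual refresh ... */
        break;

PreventInTransactionBlock() is also what produces the "cannot run inside a
transaction block" and "cannot be executed from a function" errors exercised
by the regression tests further down.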
index 638830aaac1aae58180157db3801551f6e0000ad..2efd937e12e554f93e2018a9c6eeb77a8fe61706 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202102021
+#define CATALOG_VERSION_NO 202102121
 
 #endif
index 2bea2c52aa760efa1cde938bd8eff2e87eb744b3..ed94f57baa1376ae38f2b606320fcb30c100d258 100644 (file)
@@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg
 #define SUBREL_STATE_INIT      'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC  'd' /* data is being synchronized (sublsn
                                     * NULL) */
+#define SUBREL_STATE_FINISHEDCOPY 'f'  /* tablesync copy phase is completed
+                                        * (sublsn NULL) */
 #define SUBREL_STATE_SYNCDONE  's' /* synchronization finished in front of
                                     * apply (sublsn set) */
 #define SUBREL_STATE_READY     'r' /* ready (sublsn set) */
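Taken together with the pre-existing states, the persisted per-table lifecycle
now progresses as sketched below (a summary for orientation, not text from the
header; transient in-memory states that are never stored in the catalog are
omitted):

    /*
     * i (INIT) -> d (DATASYNC) -> f (FINISHEDCOPY) -> s (SYNCDONE) -> r (READY)
     *
     * The tablesync worker drives the d -> f -> s transitions: a crash in
     * DATASYNC redoes the copy, while a crash in FINISHEDCOPY resumes from
     * the position recorded by the permanent replication origin. The apply
     * worker performs the final s -> r transition.
     */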
index a81865079d1c7478167229dc1ca930d57e247633..3b926f35d7613b72e819a1e208cedc8d9dc992ad 100644 (file)
@@ -20,7 +20,7 @@
 
 extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt,
                                        bool isTopLevel);
-extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
+extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel);
 extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
index 421ec1580d8065052993e8c571bb4b329897707b..301e494f7ba3c17e10e97bd4cfca5f9493c5a3d9 100644 (file)
@@ -22,9 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
-extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
-extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
index 53f636c56f5b3ad6568b6ac629edd7d0e42e1b9c..5f52335f15f001dcc618e418682e46a340355473 100644 (file)
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
index 4313f516d35625206b68a3b7bb92c016b4fc38ba..a97a59a6a30aaedf74b356f01094cdef4ee3af46 100644 (file)
@@ -210,6 +210,7 @@ typedef enum
 typedef struct WalRcvExecResult
 {
    WalRcvExecStatus status;
+   int         sqlstate;
    char       *err;
    Tuplestorestate *tuplestore;
    TupleDesc   tupledesc;
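The new sqlstate field carries the remote error code back to the caller, so a
failed command can be classified rather than treated as uniformly fatal. A
hedged sketch of how ReplicationSlotDropAtPubNode() can use it to honor
missing_ok (the function body is not shown in this diff, so treat the details
as illustrative):

    WalRcvExecResult *res;

    res = walrcv_exec(wrconn, cmd.data, 0, NULL);

    if (res->status == WALRCV_OK_COMMAND)
        ereport(LOG,
                (errmsg("dropped replication slot \"%s\" on publisher",
                        slotname)));
    else if (missing_ok && res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
    {
        /* The slot is already gone on the publisher; that is fine. */
    }
    else
        ereport(ERROR,
                (errmsg("could not drop replication slot \"%s\" on publisher: %s",
                        slotname, res->err)));

    walrcv_clear_result(res);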
index d046022e49cff0db0e0961f5b375e43d8c0bd19f..4a5adc2fdac2a15fc2e578dedd02c24db7f8992f 100644 (file)
@@ -77,13 +77,14 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                     Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int logicalrep_sync_worker_count(Oid subid);
 
+extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+
 void       process_syncing_tables(XLogRecPtr current_lsn);
 void       invalidate_syncing_table_states(Datum arg, int cacheid,
                                            uint32 hashvalue);
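Both new helpers exist so that the slot and origin names are pure functions of
the subscription and relation OIDs: a restarted tablesync worker, or a later
DROP SUBSCRIPTION / ALTER SUBSCRIPTION ... REFRESH, can recompute the names
and find any leftovers. A minimal sketch of the naming scheme; the exact
format strings are an assumption of this sketch, not spelled out in the diff:

    char *
    ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname)
    {
        if (syncslotname)
            sprintf(syncslotname, "pg_%u_sync_%u", suboid, relid);
        else
            syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);

        return syncslotname;
    }

    void
    ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname)
    {
        snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid);
    }

Deterministic names also imply at most one permanent slot and origin per
(subscription, relation) pair, which keeps the cleanup paths simple.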
index 2fa9bce66a422273316e22812708ee31c0d08a8e..7802279cb2e0714f3c694a0a53d6fe138daed304 100644 (file)
@@ -201,6 +201,27 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub
+       WITH (enabled = true, create_slot = false, copy_data = false);
+-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction
+-- block or function
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true);
+ERROR:  ALTER SUBSCRIPTION with refresh cannot run inside a transaction block
+END;
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
+ERROR:  ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block
+END;
+CREATE FUNCTION func() RETURNS VOID AS
+$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
+SELECT func();
+ERROR:  ALTER SUBSCRIPTION with refresh cannot be executed from a function
+CONTEXT:  SQL function "func" statement 1
+ALTER SUBSCRIPTION regress_testsub DISABLE;
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+DROP FUNCTION func;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
index 14fa0b247e1b24e0f9d55ec9474b82cdeba11e52..ca0d7827429d8852b7a5f00f400b2fafebc407ad 100644 (file)
@@ -147,6 +147,28 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 
 DROP SUBSCRIPTION regress_testsub;
 
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub
+       WITH (enabled = true, create_slot = false, copy_data = false);
+
+-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction
+-- block or function
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true);
+END;
+
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
+END;
+
+CREATE FUNCTION func() RETURNS VOID AS
+$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
+SELECT func();
+
+ALTER SUBSCRIPTION regress_testsub DISABLE;
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+DROP FUNCTION func;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
index e111ab918105fbcdd6d155dde50632c15943b252..c7926681b66c8f179a5133ebee166488d0e62d9c 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 8;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -149,7 +149,26 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(20),
    'changes for table added after subscription initialized replicated');
 
+# clean up
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next");
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
+# Table tab_rep already has the same records on both publisher and subscriber
+# at this time. Recreate the subscription; the initial copy of the table will
+# run again and fail due to a unique constraint violation.
+$node_subscriber->safe_psql('postgres',
+    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub");
+
+$node_subscriber->poll_query_until('postgres', $started_query)
+    or die "Timed out while waiting for subscriber to start sync";
+
+# DROP SUBSCRIPTION must clean up the slots on the publisher side when the
+# subscriber is stuck on the data copy due to a constraint violation.
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
index 1d540fe489fe7ee3efee1d4c90a44d08ccd5185a..bab4f3adb3be55d7cc9d5ffab1120bd7c627966d 100644 (file)
@@ -2397,7 +2397,6 @@ StdAnalyzeData
 StdRdOptions
 Step
 StopList
-StopWorkersData
 StrategyNumber
 StreamCtl
 StreamXidHash
@@ -2408,6 +2407,7 @@ SubLink
 SubLinkType
 SubPlan
 SubPlanState
+SubRemoveRels
 SubTransactionId
 SubXactCallback
 SubXactCallbackItem