Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command.
authorAmit Kapila <[email protected]>
Fri, 14 Mar 2025 03:27:40 +0000 (08:57 +0530)
committerAmit Kapila <[email protected]>
Fri, 14 Mar 2025 03:27:40 +0000 (08:57 +0530)
The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... will lead
to restarting of apply worker and after the restart, the apply worker will
use the existing slot and replication origin corresponding to the
subscription. Now, it is possible that before the restart, the origin has
not been updated, and the WAL start location points to a location before
where PUBLICATION pointed to by SET PUBLICATION doesn't exist, and that
can lead to an error like: "ERROR:  publication "pub1" does not exist".
Once this error occurs, apply worker will never be able to proceed and
will always return the same error.

We decided to skip loading the publication if the publication does not
exist. The publication is loaded later and updates the relation entry when
the publication gets created.

We decided not to backpatch this as this is a behaviour change, and we don't
see field reports. This problem has been found by intermittent buildfarm
failures.

Author: vignesh C <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Masahiko Sawada <[email protected]>
Discussion: https://postgr.es/m/flat/CALDaNm0-n8FGAorM%2BbTxkzn%2BAOUyx5%3DL_XmnvOP6T24%2B-NcBKg%40mail.gmail.com
Discussion: https://postgr.es/m/CAA4eK1+T-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q@mail.gmail.com

src/backend/replication/pgoutput/pgoutput.c
src/test/subscription/t/024_add_drop_pub.pl

index ed806c543004f9c0e1bb3b629dfcb42b60a78ada..8357bf8b4c0c9efb95d37379fd59140ba9ec2e90 100644 (file)
@@ -1764,6 +1764,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
 
 /*
  * Load publications from the list of publication names.
+ *
+ * Here, we skip the publications that don't exist yet. This will allow us
+ * to silently continue the replication in the absence of a missing publication.
+ * This is required because we allow the users to create publications after they
+ * have specified the required publications at the time of replication start.
  */
 static List *
 LoadPublications(List *pubnames)
@@ -1774,9 +1779,16 @@ LoadPublications(List *pubnames)
    foreach(lc, pubnames)
    {
        char       *pubname = (char *) lfirst(lc);
-       Publication *pub = GetPublicationByName(pubname, false);
+       Publication *pub = GetPublicationByName(pubname, true);
 
-       result = lappend(result, pub);
+       if (pub)
+           result = lappend(result, pub);
+       else
+           ereport(WARNING,
+                   errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                   errmsg("skipped loading publication: %s", pubname),
+                   errdetail("The publication does not exist at this point in the WAL."),
+                   errhint("Create the publication if it does not exist."));
    }
 
    return result;
index 4428a3413dbb65274f2866593bea044151cc913d..b594941c7cb4295b2db7c8ea1d5ce1c086a3592a 100644 (file)
@@ -1,7 +1,9 @@
 
 # Copyright (c) 2021-2025, PostgreSQL Global Development Group
 
-# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
+# ensures that creating a publication associated with a subscription at a later
+# point of time does not break logical replication.
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
@@ -80,6 +82,46 @@ $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), min(a), max(a) FROM tab_1");
 is($result, qq(20|1|10), 'check initial data is copied to subscriber');
 
+# Ensure that setting a missing publication to the subscription does not
+# disrupt existing logical replication. Instead, it should log a warning
+# while allowing replication to continue. Additionally, verify that replication
+# resumes after the missing publication is created for the publication table.
+
+# Create table on publisher and subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+
+# Set the subscription with a missing publication
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3");
+
+my $offset = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)");
+
+# Verify that a warning is logged.
+$node_publisher->wait_for_log(
+   qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset);
+
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3");
+
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tap_sub REFRESH  PUBLICATION");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Verify that the insert operation gets replicated to subscriber after
+# publication is created.
+$result = $node_subscriber->safe_psql('postgres',
+   "SELECT * FROM tab_3");
+is($result, qq(1
+2), 'check that the incremental data is replicated after the publication is created');
+
 # shutdown
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');