Fix the logical replication from HEAD to lower versions.
authorAmit Kapila <[email protected]>
Sat, 26 Sep 2020 04:38:00 +0000 (10:08 +0530)
committerAmit Kapila <[email protected]>
Sat, 26 Sep 2020 04:43:51 +0000 (10:13 +0530)
Commit 464824323e changed the logical replication protocol to allow the
streaming of in-progress transactions and used the new version of protocol
irrespective of the server version. Use the appropriate version of the
protocol based on the server version.

Reported-by: Ashutosh Sharma
Author: Dilip Kumar
Reviewed-by: Ashutosh Sharma and Amit Kapila
Discussion: https://postgr.es/m/CAE9k0P=9OpXcNrcU5Gsvd5MZ8GFpiN833vNHzX6Uc=8+h1ft1Q@mail.gmail.com

src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/include/replication/logicalproto.h

index d239d28c094b90249006b52eb0393ac94319d212..9c6fdeeb56c46f1f97e42619dc87d1c3dafa86e2 100644 (file)
@@ -3087,7 +3087,9 @@ ApplyWorkerMain(Datum main_arg)
    options.logical = true;
    options.startpoint = origin_startpos;
    options.slotname = myslotname;
-   options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
+   options.proto.logical.proto_version =
+       walrcv_server_version(wrconn) >= 140000 ?
+       LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
    options.proto.logical.publication_names = MySubscription->publications;
    options.proto.logical.binary = MySubscription->binary;
    options.proto.logical.streaming = MySubscription->stream;
index eb1f23004e773a74753aba6e6f3b689a0561b92a..9c997aed83676893757b69cc5716d5ffde4882ca 100644 (file)
@@ -272,11 +272,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                &enable_streaming);
 
        /* Check if we support requested protocol */
-       if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
+       if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("client sent proto_version=%d but we only support protocol %d or lower",
-                           data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
+                           data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
 
        if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
            ereport(ERROR,
index 607a728508bbab478460de205d43776864ae053f..0c2cda264e14f81ec3745c2cac6eb25f6133ea69 100644 (file)
@@ -19,8 +19,9 @@
 /*
  * Protocol capabilities
  *
- * LOGICALREP_PROTO_VERSION_NUM is our native protocol and the greatest version
- * we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
+ * LOGICALREP_PROTO_VERSION_NUM is our native protocol.
+ * LOGICALREP_PROTO_MAX_VERSION_NUM is the greatest version we can support.
+ * LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
  * have backwards compatibility for. The client requests protocol version at
  * connect time.
  *
@@ -28,8 +29,9 @@
  * support for streaming large transactions.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
+#define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
-#define LOGICALREP_PROTO_VERSION_NUM 2
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
 
 /*
  * This struct stores a tuple received via logical replication.