From 079d0cacf4fefc326bf776ecc2df5dba995f50de Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Sat, 26 Sep 2020 10:08:00 +0530 Subject: [PATCH] Fix the logical replication from HEAD to lower versions. Commit 464824323e changed the logical replication protocol to allow the streaming of in-progress transactions and used the new version of protocol irrespective of the server version. Use the appropriate version of the protocol based on the server version. Reported-by: Ashutosh Sharma Author: Dilip Kumar Reviewed-by: Ashutosh Sharma and Amit Kapila Discussion: https://postgr.es/m/CAE9k0P=9OpXcNrcU5Gsvd5MZ8GFpiN833vNHzX6Uc=8+h1ft1Q@mail.gmail.com --- src/backend/replication/logical/worker.c | 4 +++- src/backend/replication/pgoutput/pgoutput.c | 4 ++-- src/include/replication/logicalproto.h | 8 +++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d239d28c094..9c6fdeeb56c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3087,7 +3087,9 @@ ApplyWorkerMain(Datum main_arg) options.logical = true; options.startpoint = origin_startpos; options.slotname = myslotname; - options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; + options.proto.logical.proto_version = + walrcv_server_version(wrconn) >= 140000 ? + LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index eb1f23004e7..9c997aed836 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -272,11 +272,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, &enable_streaming); /* Check if we support requested protocol */ - if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) + if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("client sent proto_version=%d but we only support protocol %d or lower", - data->protocol_version, LOGICALREP_PROTO_VERSION_NUM))); + data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM))); if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM) ereport(ERROR, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 607a728508b..0c2cda264e1 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -19,8 +19,9 @@ /* * Protocol capabilities * - * LOGICALREP_PROTO_VERSION_NUM is our native protocol and the greatest version - * we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we + * LOGICALREP_PROTO_VERSION_NUM is our native protocol. + * LOGICALREP_PROTO_MAX_VERSION_NUM is the greatest version we can support. + * LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we * have backwards compatibility for. The client requests protocol version at * connect time. * @@ -28,8 +29,9 @@ * support for streaming large transactions. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 +#define LOGICALREP_PROTO_VERSION_NUM 1 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 -#define LOGICALREP_PROTO_VERSION_NUM 2 +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM /* * This struct stores a tuple received via logical replication. -- 2.30.2