Fix incorrect output from pgoutput when using column lists.
authorAmit Kapila <[email protected]>
Fri, 2 Dec 2022 05:22:08 +0000 (10:52 +0530)
committerAmit Kapila <[email protected]>
Fri, 2 Dec 2022 05:22:58 +0000 (10:52 +0530)
For Updates and Deletes, we were not honoring the columns list for old
tuple values while sending tuple data via pgoutput. This results in
pgoutput emitting more columns than expected.

This is not a problem for built-in logical replication as we simply ignore
additional columns based on the relation information sent previously which
didn't have those columns. However, some other users of pgoutput plugin
may expect the columns as per the column list. Also, sending extra columns
unnecessarily consumes network bandwidth defeating the purpose of the
column list feature.

Reported-by: Gunnar Morling
Author: Hou Zhijie
Reviewed-by: Amit Kapila
Backpatch-through: 15
Discussion: https://postgr.es/m/CADGJaX9kiRZ-OH0EpWF5Fkyh1ZZYofoNRCrhapBfdk02tj5EKg@mail.gmail.com

src/backend/replication/logical/proto.c
src/backend/replication/pgoutput/pgoutput.c
src/include/replication/logicalproto.h
src/test/subscription/t/031_column_list.pl

index ff8513e2d2980e0010a1566c06e7435b5193bfee..f5f2bc24d8fb9469ab27306d61bfcaf536dc4c7d 100644 (file)
@@ -478,7 +478,7 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
            pq_sendbyte(out, 'O');  /* old tuple follows */
        else
            pq_sendbyte(out, 'K');  /* old key follows */
-       logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+       logicalrep_write_tuple(out, rel, oldslot, binary, columns);
    }
 
    pq_sendbyte(out, 'N');      /* new tuple follows */
@@ -531,7 +531,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
  */
 void
 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
-                       TupleTableSlot *oldslot, bool binary)
+                       TupleTableSlot *oldslot, bool binary,
+                       Bitmapset *columns)
 {
    Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
           rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -551,7 +552,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
    else
        pq_sendbyte(out, 'K');  /* old key follows */
 
-   logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+   logicalrep_write_tuple(out, rel, oldslot, binary, columns);
 }
 
 /*
index f2128190d899d0ab7d3f88ff1f99fd07636c3026..ca46fba3afe0daa8052c648fb005c632d1b5f205 100644 (file)
@@ -1532,7 +1532,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                    break;
                case REORDER_BUFFER_CHANGE_DELETE:
                    logicalrep_write_delete(ctx->out, xid, targetrel,
-                                           old_slot, data->binary);
+                                           old_slot, data->binary,
+                                           relentry->columns);
                    break;
                default:
                    Assert(false);
@@ -1578,7 +1579,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
                OutputPluginPrepareWrite(ctx, true);
                logicalrep_write_delete(ctx->out, xid, targetrel,
-                                       old_slot, data->binary);
+                                       old_slot, data->binary,
+                                       relentry->columns);
                OutputPluginWrite(ctx, true);
            }
            else
index 7eaa4c97eda9cabf37e633515f153d9f81e81447..1eb7dda529fdad58d9b2f454771ba33cd1592ae9 100644 (file)
@@ -220,7 +220,7 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in,
                                              LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
                                    Relation rel, TupleTableSlot *oldslot,
-                                   bool binary);
+                                   bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
                                              LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
index ae022faa785b2f7091a88d0ac98502f3d58e872c..2ca120f7a47ba9459cb4705a3e28605b7e204eaf 100644 (file)
@@ -1151,6 +1151,39 @@ is( $node_subscriber->safe_psql(
 4||),
    'publication containing both parent and child relation');
 
+# TEST: Only columns in the column list should exist in the old tuple of UPDATE
+# and DELETE.
+
+$node_publisher->safe_psql(
+   'postgres', qq(
+   CREATE TABLE test_oldtuple_col (a int PRIMARY KEY, b int, c int);
+   CREATE PUBLICATION pub_check_oldtuple FOR TABLE test_oldtuple_col (a, b);
+   INSERT INTO test_oldtuple_col VALUES(1, 2, 3);
+   SELECT * FROM pg_create_logical_replication_slot('test_slot', 'pgoutput');
+   UPDATE test_oldtuple_col SET a = 2;
+   DELETE FROM test_oldtuple_col;
+));
+
+
+# Check at 7th byte of binary data for the number of columns in the old tuple.
+#
+# 7 = 1 (count from 1) + 1 byte (message type) + 4 byte (relid) + 1 byte (flag
+# for old key).
+#
+# The message type of UPDATE is 85('U').
+# The message type of DELETE is 68('D').
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT substr(data, 7, 2) = int2send(2::smallint)
+       FROM pg_logical_slot_peek_binary_changes('test_slot', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'pub_check_oldtuple')
+       WHERE get_byte(data, 0) = 85 OR get_byte(data, 0) = 68
+));
+
+is( $result, qq(t
+t), 'check the number of columns in the old tuple');
+
 
 # TEST: With a table included in multiple publications with different column
 # lists, we should catch the error when creating the subscription.