Dirty replication slots when using sql interface
authorSimon Riggs <[email protected]>
Mon, 5 Sep 2016 08:44:38 +0000 (09:44 +0100)
committerSimon Riggs <[email protected]>
Mon, 5 Sep 2016 08:44:38 +0000 (09:44 +0100)
When pg_logical_slot_get_changes(...) sets confirmed_flush_lsn to the point at
which replay stopped, it doesn't dirty the replication slot.  So if the replay
didn't cause restart_lsn or catalog_xmin to change as well, this change will
not get written out to disk. Even on a clean shutdown.

If Pg crashes or restarts, a subsequent pg_logical_slot_get_changes(...) call
will see the same changes already replayed since it uses the slot's
confirmed_flush_lsn as the start point for fetching changes. The caller can't
specify a start LSN when using the SQL interface.

Mark the slot as dirty after reading changes using the SQL interface so that
users won't see repeated changes after a clean shutdown. Repeated changes still
occur when using the walsender interface or after an unclean shutdown.

Craig Ringer

src/backend/replication/logical/logicalfuncs.c
src/test/recovery/Makefile
src/test/recovery/t/006_logical_decoding.pl [new file with mode: 0644]

index 4e4c8cdaeb21427c9e93526f97533f36046530ef..9c7be2dc7b69e068b8b2c66e7625dd7dabf897ef 100644 (file)
@@ -321,7 +321,22 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                 * business..)
                 */
                if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
+               {
                        LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
+                       /*
+                        * If only the confirmed_flush_lsn has changed the slot won't get
+                        * marked as dirty by the above. Callers on the walsender interface
+                        * are expected to keep track of their own progress and don't need
+                        * it written out. But SQL-interface users cannot specify their own
+                        * start positions and it's harder for them to keep track of their
+                        * progress, so we should make more of an effort to save it for them.
+                        *
+                        * Dirty the slot so it's written out at the next checkpoint. We'll
+                        * still lose its position on crash, as documented, but it's better
+                        * than always losing the position even on clean restart.
+                        */
+                       ReplicationSlotMarkDirty();
+               }
 
                /* free context, call shutdown callback */
                FreeDecodingContext(ctx);
index 929071909ad0412f06416f6b5477283cbbaae177..a847952d72260d166efdb2d62dcf30fe94f2a861 100644 (file)
@@ -18,3 +18,5 @@ check:
 
 clean distclean maintainer-clean:
        rm -rf tmp_check
+
+EXTRA_INSTALL = contrib/test_decoding
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
new file mode 100644 (file)
index 0000000..b80a9a9
--- /dev/null
@@ -0,0 +1,40 @@
+# Testing of logical decoding using SQL interface and/or pg_recvlogical
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf(
+               'postgresql.conf', qq(
+max_replication_slots = 4
+wal_level = logical
+));
+$node_master->start;
+my $backup_name = 'master_backup';
+
+$node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+$node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
+
+$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+# Basic decoding works
+my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
+is(scalar(split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
+
+# If we immediately crash the server we might lose the progress we just made
+# and replay the same changes again. But a clean shutdown should never repeat
+# the same changes when we use the SQL decoding interface.
+$node_master->restart('fast');
+
+# There are no new writes, so the result should be empty.
+$result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
+chomp($result);
+is($result, '', 'Decoding after fast restart repeats no rows');
+
+# done with the node
+$node_master->stop;