Fix possibility of logical decoding partial transaction changes.
authorMasahiko Sawada <[email protected]>
Thu, 11 Jul 2024 13:48:23 +0000 (22:48 +0900)
committerMasahiko Sawada <[email protected]>
Thu, 11 Jul 2024 13:48:23 +0000 (22:48 +0900)
When creating and initializing a logical slot, the restart_lsn is set
to the latest WAL insertion point (or the latest replay point on
standbys). Subsequently, WAL records are decoded from that point to
find the start point for extracting changes in the
DecodingContextFindStartpoint() function. Since the initial
restart_lsn could be in the middle of a transaction, the start point
must be a consistent point where we won't see the data for partial
transactions.

Previously, when not building a full snapshot, serialized snapshots
were restored, and the SnapBuild jumps to the consistent state even
while finding the start point. Consequently, the slot's restart_lsn
and confirmed_flush could be set to the middle of a transaction. This
could lead to various unexpected consequences. Specifically, there
were reports of partial transactions being decoded, and
assertion failures occurred because only subtransactions were decoded
without decoding their top-level transaction until decoding the commit
record.

To resolve this issue, the changes prevent restoring the serialized
snapshot and jumping to the consistent state while finding the start
point.

On v17 and HEAD, a flag indicating whether snapshot restores should be
skipped has been added to the SnapBuild struct, and SNAPBUILD_VERSION
has been bumped.

On backbranches, the flag is stored in the LogicalDecodingContext
instead, preserving on-disk compatibility.

Backpatch to all supported versions.

Reported-by: Drew Callahan
Reviewed-by: Amit Kapila, Hayato Kuroda
Discussion: https://postgr.es/m/2444AA15-D21B-4CCE-8052-52C7C2DAFE5C%40amazon.com
Backpatch-through: 12

contrib/test_decoding/Makefile
contrib/test_decoding/expected/skip_snapshot_restore.out [new file with mode: 0644]
contrib/test_decoding/meson.build
contrib/test_decoding/specs/skip_snapshot_restore.spec [new file with mode: 0644]
src/backend/replication/logical/logical.c
src/backend/replication/logical/snapbuild.c
src/include/replication/snapbuild.h

index c7ce6037064d848adb0c6fed4c6b40f5e6eaee90..a4ba1a509aec2e1ce821f56dfebac2bc4858590f 100644 (file)
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
    spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
    oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-   twophase_snapshot slot_creation_error catalog_change_snapshot
+   twophase_snapshot slot_creation_error catalog_change_snapshot \
+   skip_snapshot_restore
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out
new file mode 100644 (file)
index 0000000..c64dbd9
--- /dev/null
@@ -0,0 +1,45 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_insert1: INSERT INTO tbl VALUES (1);
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); <waiting ...>
+step s2_checkpoint: CHECKPOINT;
+step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO tbl VALUES (2);
+step s0_commit: COMMIT;
+step s1_init: <... completed>
+?column?
+--------
+init    
+(1 row)
+
+step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                     
+-----------------------------------------
+BEGIN                                    
+table public.tbl: INSERT: val1[integer]:1
+table public.tbl: INSERT: val1[integer]:2
+COMMIT                                   
+(4 rows)
+
+step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
index f1548c0fafc6642a82a89717f7e1c1aaae281ac1..f643dc81a2c84c428dd50a642234043c2a343717 100644 (file)
@@ -62,6 +62,7 @@ tests += {
       'concurrent_stream',
       'twophase_snapshot',
       'slot_creation_error',
+      'skip_snapshot_restore',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec
new file mode 100644 (file)
index 0000000..3f1fb6f
--- /dev/null
@@ -0,0 +1,46 @@
+# Test that slot creation skips restoring a serialized snapshot to reach
+# the consistent state.
+
+setup
+{
+    DROP TABLE IF EXISTS tbl;
+    CREATE TABLE tbl (val1 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl;
+    SELECT 'stop' FROM pg_drop_replication_slot('slot0');
+    SELECT 'stop' FROM pg_drop_replication_slot('slot1');
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_insert1" { INSERT INTO tbl VALUES (1); }
+step "s0_insert2" { INSERT INTO tbl VALUES (2); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); }
+step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+session "s2"
+setup { SET synchronous_commit = on ;}
+step "s2_checkpoint" { CHECKPOINT; }
+step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+
+# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the
+# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot0"
+# serializes consistent snapshots to the disk at LSNs that are before
+# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but
+# must not restore any serialized snapshots and will reach the consistent state
+# when decoding a RUNNING_XACTS record generated after s0-transaction's commit.
+# We check that get_changes on 'slot1' does not return any of s0-transaction's
+# changes, as its confirmed_flush_lsn will be after s0-transaction's commit
+# record.
+permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1"
index 99f31849bb171dac0d25bea0f7fbfc2b86f1c65c..f8ef5d56d2693b1cabacdc519ff478a337e91851 100644 (file)
@@ -152,6 +152,7 @@ StartupDecodingContext(List *output_plugin_options,
                       TransactionId xmin_horizon,
                       bool need_full_snapshot,
                       bool fast_forward,
+                      bool in_create,
                       XLogReaderRoutine *xl_routine,
                       LogicalOutputPluginWriterPrepareWrite prepare_write,
                       LogicalOutputPluginWriterWrite do_write,
@@ -212,7 +213,7 @@ StartupDecodingContext(List *output_plugin_options,
    ctx->reorder = ReorderBufferAllocate();
    ctx->snapshot_builder =
        AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-                               need_full_snapshot, slot->data.two_phase_at);
+                               need_full_snapshot, in_create, slot->data.two_phase_at);
 
    ctx->reorder->private_data = ctx;
 
@@ -438,7 +439,7 @@ CreateInitDecodingContext(const char *plugin,
    ReplicationSlotSave();
 
    ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
-                                need_full_snapshot, false,
+                                need_full_snapshot, false, true,
                                 xl_routine, prepare_write, do_write,
                                 update_progress);
 
@@ -592,7 +593,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
    ctx = StartupDecodingContext(output_plugin_options,
                                 start_lsn, InvalidTransactionId, false,
-                                fast_forward, xl_routine, prepare_write,
+                                fast_forward, false, xl_routine, prepare_write,
                                 do_write, update_progress);
 
    /* call output plugin initialization callback */
index e37e22f4417c557e7457347250f95f6d1872d77d..ae676145e602e16b0d0a998900b13b0b4ff1d7f8 100644 (file)
@@ -189,6 +189,14 @@ struct SnapBuild
    /* Indicates if we are building full snapshot or just catalog one. */
    bool        building_full_snapshot;
 
+   /*
+    * Indicates if we are using the snapshot builder for the creation of a
+    * logical replication slot. If it's true, the start point for decoding
+    * changes is not determined yet. So we skip snapshot restores to properly
+    * find the start point. See SnapBuildFindSnapshot() for details.
+    */
+   bool        in_slot_creation;
+
    /*
     * Snapshot that's valid to see the catalog state seen at this moment.
     */
@@ -317,6 +325,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
                        TransactionId xmin_horizon,
                        XLogRecPtr start_lsn,
                        bool need_full_snapshot,
+                       bool in_slot_creation,
                        XLogRecPtr two_phase_at)
 {
    MemoryContext context;
@@ -347,6 +356,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 
    builder->initial_xmin_horizon = xmin_horizon;
    builder->start_decoding_at = start_lsn;
+   builder->in_slot_creation = in_slot_creation;
    builder->building_full_snapshot = need_full_snapshot;
    builder->two_phase_at = two_phase_at;
 
@@ -1327,10 +1337,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
     *    state while waiting on c)'s sub-states.
     *
     * b) This (in a previous run) or another decoding slot serialized a
-    *    snapshot to disk that we can use.  Can't use this method for the
-    *    initial snapshot when slot is being created and needs full snapshot
-    *    for export or direct use, as that snapshot will only contain catalog
-    *    modifying transactions.
+    *    snapshot to disk that we can use. Can't use this method while finding
+    *    the start point for decoding changes as the restart LSN would be an
+    *    arbitrary LSN but we need to find the start point to extract changes
+    *    where we won't see the data for partial transactions. Also, we cannot
+    *    use this method when a slot needs a full snapshot for export or direct
+    *    use, as that snapshot will only contain catalog modifying transactions.
     *
     * c) First incrementally build a snapshot for catalog tuples
     *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1395,8 +1407,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 
        return false;
    }
-   /* b) valid on disk state and not building full snapshot */
+
+   /*
+    * b) valid on disk state and while neither building full snapshot nor
+    * creating a slot.
+    */
    else if (!builder->building_full_snapshot &&
+            !builder->in_slot_creation &&
             SnapBuildRestore(builder, lsn))
    {
        /* there won't be any state to cleanup */
@@ -1580,7 +1597,7 @@ typedef struct SnapBuildOnDisk
    offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 5
+#define SNAPBUILD_VERSION 6
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
index a3360a1c5ea7a8d07021227d5c8c198426fc952b..caa5113ff81a8da3827c7e1c272079d533763969 100644 (file)
@@ -62,6 +62,7 @@ extern void CheckPointSnapBuild(void);
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
                                          TransactionId xmin_horizon, XLogRecPtr start_lsn,
                                          bool need_full_snapshot,
+                                         bool in_slot_creation,
                                          XLogRecPtr two_phase_at);
 extern void FreeSnapshotBuilder(SnapBuild *builder);