Don't retreat slot's confirmed_flush LSN. REL_16_STABLE github/REL_16_STABLE
authorAmit Kapila <[email protected]>
Mon, 19 May 2025 06:11:22 +0000 (11:41 +0530)
committerAmit Kapila <[email protected]>
Mon, 19 May 2025 06:11:22 +0000 (11:41 +0530)
Prevent moving the confirmed_flush backwards, as this could lead to data
duplication issues caused by replicating already replicated changes.

This can happen when a client acknowledges an LSN it doesn't have to do
anything for, and thus didn't store persistently. After a restart, the
client can send the prior LSN that it stored persistently as an
acknowledgement, but we need to ignore such an LSN to avoid retreating
confirm_flush LSN.

Diagnosed-by: Zhijie Hou <[email protected]>
Author: shveta malik <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Tested-by: Nisha Moond <[email protected]>
Backpatch-through: 13
Discussion: https://postgr.es/m/CAJpy0uDZ29P=BYB1JDWMCh-6wXaNqMwG1u1mB4=10Ly0x7HhwQ@mail.gmail.com
Discussion: https://postgr.es/m/OS0PR01MB57164AB5716AF2E477D53F6F9489A@OS0PR01MB5716.jpnprd01.prod.outlook.com

src/backend/replication/logical/logical.c

index 8f1aa3fde9cccd78e1ecfff091657303bf23ce4d..1dbb747465f28032a961c711e6dcd1f7ac9fae9c 100644 (file)
@@ -1830,7 +1830,19 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
        SpinLockAcquire(&MyReplicationSlot->mutex);
 
-       MyReplicationSlot->data.confirmed_flush = lsn;
+       /*
+        * Prevent moving the confirmed_flush backwards, as this could lead to
+        * data duplication issues caused by replicating already replicated
+        * changes.
+        *
+        * This can happen when a client acknowledges an LSN it doesn't have
+        * to do anything for, and thus didn't store persistently. After a
+        * restart, the client can send the prior LSN that it stored
+        * persistently as an acknowledgement, but we need to ignore such an
+        * LSN. See similar case handling in CreateDecodingContext.
+        */
+       if (lsn > MyReplicationSlot->data.confirmed_flush)
+           MyReplicationSlot->data.confirmed_flush = lsn;
 
        /* if we're past the location required for bumping xmin, do so */
        if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -1895,7 +1907,14 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    else
    {
        SpinLockAcquire(&MyReplicationSlot->mutex);
-       MyReplicationSlot->data.confirmed_flush = lsn;
+
+       /*
+        * Prevent moving the confirmed_flush backwards. See comments above
+        * for the details.
+        */
+       if (lsn > MyReplicationSlot->data.confirmed_flush)
+           MyReplicationSlot->data.confirmed_flush = lsn;
+
        SpinLockRelease(&MyReplicationSlot->mutex);
    }
 }