Don't retreat slot's confirmed_flush LSN.
authorAmit Kapila <[email protected]>
Mon, 19 May 2025 06:43:06 +0000 (12:13 +0530)
committerAmit Kapila <[email protected]>
Mon, 19 May 2025 06:43:06 +0000 (12:13 +0530)
Prevent moving the confirmed_flush backwards, as this could lead to data
duplication issues caused by replicating already replicated changes.

This can happen when a client acknowledges an LSN it doesn't have to do
anything for, and thus didn't store persistently. After a restart, the
client can send the prior LSN that it stored persistently as an
acknowledgement, but we need to ignore such an LSN to avoid retreating
confirm_flush LSN.

Diagnosed-by: Zhijie Hou <[email protected]>
Author: shveta malik <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Tested-by: Nisha Moond <[email protected]>
Backpatch-through: 13
Discussion: https://postgr.es/m/CAJpy0uDZ29P=BYB1JDWMCh-6wXaNqMwG1u1mB4=10Ly0x7HhwQ@mail.gmail.com
Discussion: https://postgr.es/m/OS0PR01MB57164AB5716AF2E477D53F6F9489A@OS0PR01MB5716.jpnprd01.prod.outlook.com

src/backend/replication/logical/logical.c

index a8d2e024d344410167b3eb406c2aff02762fec29..1d56d0c4ef31450b30713ac8f4fca30a9c331b2b 100644 (file)
@@ -1828,7 +1828,19 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
        SpinLockAcquire(&MyReplicationSlot->mutex);
 
-       MyReplicationSlot->data.confirmed_flush = lsn;
+       /*
+        * Prevent moving the confirmed_flush backwards, as this could lead to
+        * data duplication issues caused by replicating already replicated
+        * changes.
+        *
+        * This can happen when a client acknowledges an LSN it doesn't have
+        * to do anything for, and thus didn't store persistently. After a
+        * restart, the client can send the prior LSN that it stored
+        * persistently as an acknowledgement, but we need to ignore such an
+        * LSN. See similar case handling in CreateDecodingContext.
+        */
+       if (lsn > MyReplicationSlot->data.confirmed_flush)
+           MyReplicationSlot->data.confirmed_flush = lsn;
 
        /* if we're past the location required for bumping xmin, do so */
        if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -1893,7 +1905,14 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    else
    {
        SpinLockAcquire(&MyReplicationSlot->mutex);
-       MyReplicationSlot->data.confirmed_flush = lsn;
+
+       /*
+        * Prevent moving the confirmed_flush backwards. See comments above
+        * for the details.
+        */
+       if (lsn > MyReplicationSlot->data.confirmed_flush)
+           MyReplicationSlot->data.confirmed_flush = lsn;
+
        SpinLockRelease(&MyReplicationSlot->mutex);
    }
 }