Fix the logical replication timeout during large transactions.
authorAmit Kapila <[email protected]>
Wed, 11 May 2022 04:31:35 +0000 (10:01 +0530)
committerAmit Kapila <[email protected]>
Wed, 11 May 2022 04:31:35 +0000 (10:01 +0530)
The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after
processing certain threshold of changes.

Reported-by: Fabrice Chapuis
Author: Wang wei and Amit Kapila
Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda
Backpatch-through: 10
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com

src/backend/replication/logical/logical.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/replication/walsender.c
src/include/replication/logical.h

index 1851a626f98f147fe8593c15a8409df14935bcaf..59d133af0184f08ee1d9b8fe199ef42be43da2b1 100644 (file)
@@ -589,6 +589,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 
    /* set output state */
    ctx->accept_writes = false;
+   ctx->end_xact = false;
 
    /* do the actual work: call callback */
    ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -614,6 +615,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 
    /* set output state */
    ctx->accept_writes = false;
+   ctx->end_xact = false;
 
    /* do the actual work: call callback */
    ctx->callbacks.shutdown_cb(ctx);
@@ -647,6 +649,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
    ctx->accept_writes = true;
    ctx->write_xid = txn->xid;
    ctx->write_location = txn->first_lsn;
+   ctx->end_xact = false;
 
    /* do the actual work: call callback */
    ctx->callbacks.begin_cb(ctx, txn);
@@ -676,6 +679,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    ctx->accept_writes = true;
    ctx->write_xid = txn->xid;
    ctx->write_location = txn->end_lsn; /* points to the end of the record */
+   ctx->end_xact = true;
 
    /* do the actual work: call callback */
    ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -713,6 +717,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     */
    ctx->write_location = change->lsn;
 
+   ctx->end_xact = false;
+
    ctx->callbacks.change_cb(ctx, txn, relation, change);
 
    /* Pop the error context stack */
@@ -737,6 +743,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 
    /* set output state */
    ctx->accept_writes = false;
+   ctx->end_xact = false;
 
    /* do the actual work: call callback */
    ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -772,6 +779,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    ctx->accept_writes = true;
    ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    ctx->write_location = message_lsn;
+   ctx->end_xact = false;
 
    /* do the actual work: call callback */
    ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
index 529a5c0b4861b7fb557e5fadd25f7524af9508be..f96fde3df0d9a52874cab7980308ab29ee8c7e3c 100644 (file)
@@ -48,6 +48,7 @@ static bool publications_valid;
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                            uint32 hashvalue);
+static void update_replication_progress(LogicalDecodingContext *ctx);
 
 /* Entry in the map used to remember which relation schemas we sent. */
 typedef struct RelationSyncEntry
@@ -246,7 +247,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                    XLogRecPtr commit_lsn)
 {
-   OutputPluginUpdateProgress(ctx);
+   update_replication_progress(ctx);
 
    OutputPluginPrepareWrite(ctx, true);
    logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -264,6 +265,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    MemoryContext old;
    RelationSyncEntry *relentry;
 
+   update_replication_progress(ctx);
+
    if (!is_publishable_relation(relation))
        return;
 
@@ -628,3 +631,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
        entry->replicate_valid = false;
 }
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are not published.
+ */
+static void
+update_replication_progress(LogicalDecodingContext *ctx)
+{
+   static int  changes_count = 0;
+
+   /*
+    * We don't want to try sending a keepalive message after processing each
+    * change as that can have overhead. Tests revealed that there is no
+    * noticeable overhead in doing it after continuously processing 100 or so
+    * changes.
+    */
+#define CHANGES_THRESHOLD 100
+
+   /*
+    * If we are at the end of transaction LSN, update progress tracking.
+    * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
+    * try to send a keepalive message if required.
+    */
+   if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+   {
+       OutputPluginUpdateProgress(ctx);
+       changes_count = 0;
+   }
+}
index 3c9b9f5376cb0fef875c455677b8c92eb65aab6f..4aac92749c3d70592ee340ef408ba6f203638cd9 100644 (file)
@@ -243,6 +243,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void ProcessPendingWrites(void);
 static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
@@ -1190,6 +1191,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
    }
 
    /* If we have pending write here, go to slow path */
+   ProcessPendingWrites();
+}
+
+/*
+ * Wait until there is no pending write. Also process replies from the other
+ * side and check timeouts during that.
+ */
+static void
+ProcessPendingWrites(void)
+{
    for (;;)
    {
        int         wakeEvents;
@@ -1256,18 +1267,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 {
    static TimestampTz sendTime = 0;
    TimestampTz now = GetCurrentTimestamp();
+   bool        end_xact = ctx->end_xact;
 
    /*
     * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
     * avoid flooding the lag tracker when we commit frequently.
+    *
+    * We don't have a mechanism to get the ack for any LSN other than end
+    * xact LSN from the downstream. So, we track lag only for end of
+    * transaction LSN.
     */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
-   if (!TimestampDifferenceExceeds(sendTime, now,
-                                   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
-       return;
+   if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+                                              WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+   {
+       LagTrackerWrite(lsn, now);
+       sendTime = now;
+   }
 
-   LagTrackerWrite(lsn, now);
-   sendTime = now;
+   /*
+    * Try to send a keepalive if required. We don't need to try sending keep
+    * alive messages at the transaction end as that will be done at a later
+    * point in time. This is required only for large transactions where we
+    * don't send any changes to the downstream and the receiver can timeout
+    * due to that.
+    */
+   if (!end_xact &&
+       now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                          wal_sender_timeout / 2))
+       ProcessPendingWrites();
 }
 
 /*
index 7f0e0fa881a79f40476ef82091e1d339726eb548..b25a935bc08cbeb6a91c7eefd5394d8253c24fcc 100644 (file)
@@ -80,6 +80,7 @@ typedef struct LogicalDecodingContext
     */
    bool        accept_writes;
    bool        prepared_write;
+   bool        end_xact;
    XLogRecPtr  write_location;
    TransactionId write_xid;
 } LogicalDecodingContext;