Track statistics for streaming of changes from ReorderBuffer.
authorAmit Kapila <[email protected]>
Thu, 29 Oct 2020 03:41:51 +0000 (09:11 +0530)
committerAmit Kapila <[email protected]>
Thu, 29 Oct 2020 03:41:51 +0000 (09:11 +0530)
This adds the statistics about transactions streamed to the decoding
output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular slot.
Users can pass NULL in pg_stat_reset_replication_slot to reset stats of
all the slots.

Commit 9868167500 has added the basic infrastructure to capture the stats
of slot and this commit extends the statistics collector to track
additional information about slots.

Bump the catversion as we have added new columns in the catalog entry.

Author: Ajin Cherian and Amit Kapila
Reviewed-by: Sawada Masahiko and Dilip Kumar
Discussion: https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com

12 files changed:
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/postmaster/pgstat.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/slot.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/include/replication/reorderbuffer.h
src/test/regress/expected/rules.out

index 313e44ed54987993b9de90bb62680b19ce319d5e..98e1995453882809903a2c19200463210f1caa30 100644 (file)
@@ -2632,6 +2632,44 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of in-progress transactions streamed to the decoding output plugin
+        after the memory used by logical decoding of changes from WAL for this
+        slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only
+        works with toplevel transactions (subtransactions can't be streamed
+        independently), so the counter does not get incremented for subtransactions.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_count</structfield><type>bigint</type>
+       </para>
+       <para>
+        Number of times in-progress transactions were streamed to the decoding
+        output plugin while decoding changes from WAL for this slot. Transactions
+        may get streamed repeatedly, and this counter gets incremented on every
+        such invocation.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stream_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded in-progress transaction data streamed to the decoding
+        output plugin while decoding changes from WAL for this slot. This and other
+        streaming counters for this slot can be used to gauge the network I/O which
+        occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
index c6dd084fbccb7da7015df568b4a8650fa344d58c..5171ea05c7ea9b65cf5ada73f1535e328fce6d5d 100644 (file)
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
             s.spill_txns,
             s.spill_count,
             s.spill_bytes,
+            s.stream_txns,
+            s.stream_count,
+            s.stream_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
index 822f0ebc6285a70c8bef1efb301a58b152c08b64..f1dca2f25b75ec601f43e8294beaf1d308a616be 100644 (file)
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
  */
 void
 pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-                      int spillbytes)
+                      int spillbytes, int streamtxns, int streamcount, int streambytes)
 {
    PgStat_MsgReplSlot msg;
 
@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
    msg.m_spill_txns = spilltxns;
    msg.m_spill_count = spillcount;
    msg.m_spill_bytes = spillbytes;
+   msg.m_stream_txns = streamtxns;
+   msg.m_stream_count = streamcount;
+   msg.m_stream_bytes = streambytes;
    pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
        replSlotStats[idx].spill_txns += msg->m_spill_txns;
        replSlotStats[idx].spill_count += msg->m_spill_count;
        replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+       replSlotStats[idx].stream_txns += msg->m_stream_txns;
+       replSlotStats[idx].stream_count += msg->m_stream_count;
+       replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
    }
 }
 
@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
    replSlotStats[i].spill_txns = 0;
    replSlotStats[i].spill_count = 0;
    replSlotStats[i].spill_bytes = 0;
+   replSlotStats[i].stream_txns = 0;
+   replSlotStats[i].stream_count = 0;
+   replSlotStats[i].stream_bytes = 0;
    replSlotStats[i].stat_reset_timestamp = ts;
 }
 
index 8675832f4d6e08f4a35fe5825a50d318fc15259d..d5cfbeaa4affd00e32d754d49079e938592c5a4c 100644 (file)
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
    ReorderBuffer *rb = ctx->reorder;
 
    /*
-    * Nothing to do if we haven't spilled anything since the last time the
-    * stats has been sent.
+    * Nothing to do if we haven't spilled or streamed anything since the last
+    * time the stats has been sent.
     */
-   if (rb->spillBytes <= 0)
+   if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
        return;
 
-   elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
+   elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
         rb,
         (long long) rb->spillTxns,
         (long long) rb->spillCount,
-        (long long) rb->spillBytes);
+        (long long) rb->spillBytes,
+        (long long) rb->streamTxns,
+        (long long) rb->streamCount,
+        (long long) rb->streamBytes);
 
    pgstat_report_replslot(NameStr(ctx->slot->data.name),
-                          rb->spillTxns, rb->spillCount, rb->spillBytes);
+                          rb->spillTxns, rb->spillCount, rb->spillBytes,
+                          rb->streamTxns, rb->streamCount, rb->streamBytes);
    rb->spillTxns = 0;
    rb->spillCount = 0;
    rb->spillBytes = 0;
+   rb->streamTxns = 0;
+   rb->streamCount = 0;
+   rb->streamBytes = 0;
 }
index 7a8bf760791c0db33df330d0636a12135725763e..c1bd68011c5935cfbd68a8ba50829a8c67c785e1 100644 (file)
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
    buffer->spillTxns = 0;
    buffer->spillCount = 0;
    buffer->spillBytes = 0;
+   buffer->streamTxns = 0;
+   buffer->streamCount = 0;
+   buffer->streamBytes = 0;
 
    buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
    Snapshot    snapshot_now;
    CommandId   command_id;
+   Size        stream_bytes;
+   bool        txn_is_streamed;
 
    /* We can never reach here for a subtransaction. */
    Assert(txn->toptxn == NULL);
@@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        txn->snapshot_now = NULL;
    }
 
+   /*
+    * Remember this information to be used later to update stats. We can't
+    * update the stats here as an error while processing the changes would
+    * lead to the accumulation of stats even though we haven't streamed all
+    * the changes.
+    */
+   txn_is_streamed = rbtxn_is_streamed(txn);
+   stream_bytes = txn->total_size;
+
    /* Process and send the changes to output plugin. */
    ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
                            command_id, true);
 
+   rb->streamCount += 1;
+   rb->streamBytes += stream_bytes;
+
+   /* Don't consider already streamed transaction. */
+   rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
    Assert(dlist_is_empty(&txn->changes));
    Assert(txn->nentries == 0);
    Assert(txn->nentries_mem == 0);
index 220b4cd6e99cde2008309a73abcb0dc5511a3f16..09be1d8c4851956ed5ed9affcda01a82140dfd08 100644 (file)
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
     * ReplicationSlotAllocationLock.
     */
    if (SlotIsLogical(slot))
-       pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+       pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
 
    /*
     * Now that the slot has been marked as in_use and active, it's safe to
index 472fa596e1f8ad3a69d11165168c81637b751f33..a210fc93b415691abc7d068bca4589dc79fc6d8b 100644 (file)
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 5
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
        values[1] = Int64GetDatum(s->spill_txns);
        values[2] = Int64GetDatum(s->spill_count);
        values[3] = Int64GetDatum(s->spill_bytes);
+       values[4] = Int64GetDatum(s->stream_txns);
+       values[5] = Int64GetDatum(s->stream_count);
+       values[6] = Int64GetDatum(s->stream_bytes);
 
        if (s->stat_reset_timestamp == 0)
-           nulls[4] = true;
+           nulls[7] = true;
        else
-           values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+           values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }
index 8cf02fc0d852257a4b928a50c0a354bf730e68ef..73650f88e94740ba06d13b6ee6b38e6566d7673a 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202010281
+#define CATALOG_VERSION_NO 202010291
 
 #endif
index 24ec2cfed6a390d3ba576e4ac956ecd20f47f065..d9770bbadd8ba22a432ad81093e228ab53b1de2d 100644 (file)
   proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
index a821ff4f158fee684410ed4512b004ad63961002..257e515bfe75f67d92e9975cc6a8848815d07497 100644 (file)
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
    PgStat_Counter m_spill_txns;
    PgStat_Counter m_spill_count;
    PgStat_Counter m_spill_bytes;
+   PgStat_Counter m_stream_txns;
+   PgStat_Counter m_stream_count;
+   PgStat_Counter m_stream_bytes;
 } PgStat_MsgReplSlot;
 
 
@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
    PgStat_Counter spill_txns;
    PgStat_Counter spill_count;
    PgStat_Counter spill_bytes;
+   PgStat_Counter stream_txns;
+   PgStat_Counter stream_count;
+   PgStat_Counter stream_bytes;
    TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
-                                  int spillbytes);
+                                  int spillbytes, int streamtxns, int streamcount, int streambytes);
 extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
index 1c77819aad25cbc76b25ffb96416113b292e1911..dfdda938b2a9c7a451febbc83b895e49bbb0868b 100644 (file)
@@ -551,6 +551,11 @@ struct ReorderBuffer
    int64       spillTxns;      /* number of transactions spilled to disk */
    int64       spillCount;     /* spill-to-disk invocation counter */
    int64       spillBytes;     /* amount of data spilled to disk */
+
+   /* Statistics about transactions streamed to the decoding output plugin */
+   int64       streamTxns;     /* number of transactions streamed */
+   int64       streamCount;    /* streaming invocation counter */
+   int64       streamBytes;    /* amount of data streamed */
 };
 
 
index 492cdcf74c36abf02b644f1b1501d765b206f3da..097ff5d111f73cdd0ed55576998ed9af78850640 100644 (file)
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.spill_txns,
     s.spill_count,
     s.spill_bytes,
+    s.stream_txns,
+    s.stream_count,
+    s.stream_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,