</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of in-progress transactions streamed to the decoding output plugin
+ after the memory used by logical decoding of changes from WAL for this
+ slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only
+ works with toplevel transactions (subtransactions can't be streamed
+ independently), so the counter does not get incremented for subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_count</structfield><type>bigint</type>
+ </para>
+ <para>
+ Number of times in-progress transactions were streamed to the decoding
+ output plugin while decoding changes from WAL for this slot. Transactions
+ may get streamed repeatedly, and this counter gets incremented on every
+ such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stream_bytes</structfield><type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded in-progress transaction data streamed to the decoding
+ output plugin while decoding changes from WAL for this slot. This and other
+ streaming counters for this slot can be used to gauge the network I/O which
+ occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>.
+ </para>
+ </entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
*/
void
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes)
+ int spillbytes, int streamtxns, int streamcount, int streambytes)
{
PgStat_MsgReplSlot msg;
msg.m_spill_txns = spilltxns;
msg.m_spill_count = spillcount;
msg.m_spill_bytes = spillbytes;
+ msg.m_stream_txns = streamtxns;
+ msg.m_stream_count = streamcount;
+ msg.m_stream_bytes = streambytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
replSlotStats[idx].spill_txns += msg->m_spill_txns;
replSlotStats[idx].spill_count += msg->m_spill_count;
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ replSlotStats[idx].stream_txns += msg->m_stream_txns;
+ replSlotStats[idx].stream_count += msg->m_stream_count;
+ replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
}
}
replSlotStats[i].spill_txns = 0;
replSlotStats[i].spill_count = 0;
replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stream_txns = 0;
+ replSlotStats[i].stream_count = 0;
+ replSlotStats[i].stream_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
ReorderBuffer *rb = ctx->reorder;
/*
- * Nothing to do if we haven't spilled anything since the last time the
- * stats has been sent.
+ * Nothing to do if we haven't spilled or streamed anything since the last
+ * time the stats has been sent.
*/
- if (rb->spillBytes <= 0)
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
- (long long) rb->spillBytes);
+ (long long) rb->spillBytes,
+ (long long) rb->streamTxns,
+ (long long) rb->streamCount,
+ (long long) rb->streamBytes);
pgstat_report_replslot(NameStr(ctx->slot->data.name),
- rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns, rb->spillCount, rb->spillBytes,
+ rb->streamTxns, rb->streamCount, rb->streamBytes);
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
+ rb->streamTxns = 0;
+ rb->streamCount = 0;
+ rb->streamBytes = 0;
}
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
+ buffer->streamTxns = 0;
+ buffer->streamCount = 0;
+ buffer->streamBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
{
Snapshot snapshot_now;
CommandId command_id;
+ Size stream_bytes;
+ bool txn_is_streamed;
/* We can never reach here for a subtransaction. */
Assert(txn->toptxn == NULL);
txn->snapshot_now = NULL;
}
+ /*
+ * Remember this information to be used later to update stats. We can't
+ * update the stats here as an error while processing the changes would
+ * lead to the accumulation of stats even though we haven't streamed all
+ * the changes.
+ */
+ txn_is_streamed = rbtxn_is_streamed(txn);
+ stream_bytes = txn->total_size;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+ rb->streamCount += 1;
+ rb->streamBytes += stream_bytes;
+
+ /* Don't consider already streamed transaction. */
+ rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
- pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
/*
* Now that the slot has been marked as in_use and active, it's safe to
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 5
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
values[1] = Int64GetDatum(s->spill_txns);
values[2] = Int64GetDatum(s->spill_count);
values[3] = Int64GetDatum(s->spill_bytes);
+ values[4] = Int64GetDatum(s->stream_txns);
+ values[5] = Int64GetDatum(s->stream_count);
+ values[6] = Int64GetDatum(s->stream_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[4] = true;
+ nulls[7] = true;
else
- values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202010281
+#define CATALOG_VERSION_NO 202010291
#endif
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
PgStat_Counter m_spill_bytes;
+ PgStat_Counter m_stream_txns;
+ PgStat_Counter m_stream_count;
+ PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot;
PgStat_Counter spill_txns;
PgStat_Counter spill_count;
PgStat_Counter spill_bytes;
+ PgStat_Counter stream_txns;
+ PgStat_Counter stream_count;
+ PgStat_Counter stream_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes);
+ int spillbytes, int streamtxns, int streamcount, int streambytes);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillBytes; /* amount of data spilled to disk */
+
+ /* Statistics about transactions streamed to the decoding output plugin */
+ int64 streamTxns; /* number of transactions streamed */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data streamed */
};
s.spill_txns,
s.spill_count,
s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,