#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL 1000
+
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
*
int wakeEvents;
uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ TimestampTz last_flush = 0;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
{
bool wait_for_standby_at_stop = false;
long sleeptime;
+ TimestampTz now;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
* new WAL to be generated. (But if we have nothing to send, we don't
* want to wake on socket-writable.)
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_SOCKET_READABLE;
Assert(wait_event != 0);
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ last_flush = now;
+ }
+
WalSndWait(wakeEvents, sleeptime, wait_event);
}
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
+ TimestampTz last_flush = 0;
+
/*
* Initialize the last reply timestamp. That enables timeout processing
* from hereon.
* WalSndWaitForWal() handle any other blocking; idle receivers need
* its additional actions. For physical replication, also block if
* caught up; its send_data does not block.
+ *
+ * The IO statistics are reported in WalSndWaitForWal() for the
+ * logical WAL senders.
*/
if ((WalSndCaughtUp && send_data != XLogSendLogical &&
!streamingDoneSending) ||
{
long sleeptime;
int wakeEvents;
+ TimestampTz now;
if (!streamingDoneReceiving)
wakeEvents = WL_SOCKET_READABLE;
* Use fresh timestamp, not last_processing, to reduce the chance
* of reaching wal_sender_timeout before sending a keepalive.
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ last_flush = now;
+ }
+
/* Sleep until something happens or we time out */
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
}