Skip to content

Refactor some old code, add some comments, update included headers #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 63 additions & 107 deletions collector.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,65 @@
* collector.c
* Collector of wait event history and profile.
*
* Copyright (c) 2015-2016, Postgres Professional
* Copyright (c) 2015-2025, Postgres Professional
*
* IDENTIFICATION
* contrib/pg_wait_sampling/pg_wait_sampling.c
*/
#include "postgres.h"

#include "catalog/pg_type.h"
#if PG_VERSION_NUM >= 130000
#include "common/hashfn.h"
#endif
#include "funcapi.h"
#include "miscadmin.h"
#include "pg_wait_sampling.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
#include "storage/procarray.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"
#if PG_VERSION_NUM < 140000
#include "pgstat.h"
#else
#include "utils/wait_event.h"
#endif

#include "compat.h"
#include "pg_wait_sampling.h"
static inline shm_mq_result
shm_mq_send_compat(shm_mq_handle *mqh, Size nbytes, const void *data,
bool nowait, bool force_flush)
{
#if PG_VERSION_NUM >= 150000
return shm_mq_send(mqh, nbytes, data, nowait, force_flush);
#else
return shm_mq_send(mqh, nbytes, data, nowait);
#endif
}

static volatile sig_atomic_t shutdown_requested = false;
#if PG_VERSION_NUM < 170000
#define INIT_PG_LOAD_SESSION_LIBS 0x0001
#define INIT_PG_OVERRIDE_ALLOW_CONNS 0x0002
#endif

static void handle_sigterm(SIGNAL_ARGS);
static inline void
InitPostgresCompat(const char *in_dbname, Oid dboid,
const char *username, Oid useroid,
bits32 flags,
char *out_dbname)
{
#if PG_VERSION_NUM >= 170000
InitPostgres(in_dbname, dboid, username, useroid, flags, out_dbname);
#elif PG_VERSION_NUM >= 150000
InitPostgres(in_dbname, dboid, username, useroid,
flags & INIT_PG_LOAD_SESSION_LIBS,
flags & INIT_PG_OVERRIDE_ALLOW_CONNS, out_dbname);
#else
InitPostgres(in_dbname, dboid, username, useroid, out_dbname,
flags & INIT_PG_OVERRIDE_ALLOW_CONNS);
#endif
}

/*
* Register background worker for collecting waits history.
Expand Down Expand Up @@ -111,16 +139,6 @@ realloc_history(History *observations, int count)
observations->wraparound = false;
}

static void
handle_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
shutdown_requested = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}

/*
* Get next item of history with rotation.
*/
Expand All @@ -129,6 +147,7 @@ get_next_observation(History *observations)
{
HistoryItem *result;

/* Check for wraparound */
if (observations->index >= observations->count)
{
observations->index = 0;
Expand Down Expand Up @@ -215,6 +234,7 @@ send_history(History *observations, shm_mq_handle *mqh)
else
count = observations->index;

/* Send array size first since receive_array expects this */
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
if (mq_result == SHM_MQ_DETACHED)
{
Expand Down Expand Up @@ -251,6 +271,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
Size count = hash_get_num_entries(profile_hash);
shm_mq_result mq_result;

/* Send array size first since receive_array expects this */
mq_result = shm_mq_send_compat(mqh, sizeof(count), &count, false, true);
if (mq_result == SHM_MQ_DETACHED)
{
Expand Down Expand Up @@ -283,32 +304,11 @@ make_profile_hash()
{
HASHCTL hash_ctl;

hash_ctl.hash = tag_hash;
hash_ctl.hcxt = TopMemoryContext;

if (pgws_profileQueries)
hash_ctl.keysize = offsetof(ProfileItem, count);
else
hash_ctl.keysize = offsetof(ProfileItem, queryId);

/* We always include queryId in hash key */
hash_ctl.keysize = offsetof(ProfileItem, count);
hash_ctl.entrysize = sizeof(ProfileItem);
return hash_create("Waits profile hash", 1024, &hash_ctl,
HASH_FUNCTION | HASH_ELEM);
}

/*
* Delta between two timestamps in milliseconds.
*/
static int64
millisecs_diff(TimestampTz tz1, TimestampTz tz2)
{
long secs;
int microsecs;

TimestampDifference(tz1, tz2, &secs, &microsecs);

return secs * 1000 + microsecs / 1000;

HASH_ELEM | HASH_BLOBS);
}

/*
Expand All @@ -319,77 +319,49 @@ pgws_collector_main(Datum main_arg)
{
HTAB *profile_hash = NULL;
History observations;
MemoryContext old_context,
collector_context;
TimestampTz current_ts,
history_ts,
profile_ts;

/*
* Establish signal handlers.
*
* We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
* it would a normal user backend. To make that happen, we establish a
* signal handler that is a stripped-down version of die(). We don't have
* any equivalent of the backend's command-read loop, where interrupts can
* be processed immediately, so make sure ImmediateInterruptOK is turned
* off.
*
* We also want to respond to the ProcSignal notifications. This is done
* in the upstream provided procsignal_sigusr1_handler, which is
* automatically used if a bgworker connects to a database. But since our
* worker doesn't connect to any database even though it calls
* InitPostgres, which will still initializze a new backend and thus
* partitipate to the ProcSignal infrastructure.
*/
pqsignal(SIGTERM, handle_sigterm);
/* Establish signal handlers */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
BackgroundWorkerUnblockSignals();
InitPostgresCompat(NULL, InvalidOid, NULL, InvalidOid, 0, NULL);
SetProcessingMode(NormalProcessing);

/* Make pg_wait_sampling recognisable in pg_stat_activity */
pgstat_report_appname("pg_wait_sampling collector");

profile_hash = make_profile_hash();
pgws_collector_hdr->latch = &MyProc->procLatch;

CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_wait_sampling collector");
collector_context = AllocSetContextCreate(TopMemoryContext,
"pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES);
old_context = MemoryContextSwitchTo(collector_context);
alloc_history(&observations, pgws_historySize);
MemoryContextSwitchTo(old_context);
profile_hash = make_profile_hash();

ereport(LOG, (errmsg("pg_wait_sampling collector started")));
ereport(LOG, errmsg("pg_wait_sampling collector started"));

/* Start counting time for history and profile samples */
profile_ts = history_ts = GetCurrentTimestamp();

while (1)
{
int rc;
shm_mq_handle *mqh;
int64 history_diff,
profile_diff;
bool write_history,
write_profile;

/* We need an explicit call for at least ProcSignal notifications. */
CHECK_FOR_INTERRUPTS();
HandleMainLoopInterrupts();

if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}

/* Wait calculate time to next sample for history or profile */
/* Calculate time to next sample for history or profile */
current_ts = GetCurrentTimestamp();

history_diff = millisecs_diff(history_ts, current_ts);
profile_diff = millisecs_diff(profile_ts, current_ts);
history_diff = TimestampDifferenceMilliseconds(history_ts, current_ts);
profile_diff = TimestampDifferenceMilliseconds(profile_ts, current_ts);

write_history = (history_diff >= (int64)pgws_historyPeriod);
write_profile = (profile_diff >= (int64)pgws_profilePeriod);
Expand All @@ -412,20 +384,15 @@ pgws_collector_main(Datum main_arg)
}
}

/* Shutdown if requested */
if (shutdown_requested)
break;

/*
* Wait until next sample time or request to do something through
* Wait for sample time or until request to do something through
* shared memory.
*/
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
Min(pgws_historyPeriod - (int)history_diff,
pgws_historyPeriod - (int)profile_diff), PG_WAIT_EXTENSION);

if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
Min(pgws_historyPeriod - (int)history_diff,
pgws_profilePeriod - (int)profile_diff),
PG_WAIT_EXTENSION);

ResetLatch(&MyProc->procLatch);

Expand Down Expand Up @@ -484,15 +451,4 @@ pgws_collector_main(Datum main_arg)
LockRelease(&tag, ExclusiveLock, false);
}
}

MemoryContextReset(collector_context);

/*
* We're done. Explicitly detach the shared memory segment so that we
* don't get a resource leak warning at commit time. This will fire any
* on_dsm_detach callbacks we've registered, as well. Once that's done,
* we can go ahead and exit.
*/
ereport(LOG, (errmsg("pg_wait_sampling collector shutting down")));
proc_exit(0);
}
54 changes: 0 additions & 54 deletions compat.h

This file was deleted.

Loading