#include "postgres.h"
#include "fmgr.h"
+#include "storage/condition_variable.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
+#include "storage/dsm_registry.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/wait_event.h"
PG_MODULE_MAGIC;
+/* Maximum number of waits usable in injection points at once */
+#define INJ_MAX_WAIT 8
+#define INJ_NAME_MAXLEN 64
+
+/* Shared state information for injection points. */
+typedef struct InjectionPointSharedState
+{
+ /* Protects access to other fields */
+ slock_t lock;
+
+ /* Counters advancing when injection_points_wakeup() is called */
+ uint32 wait_counts[INJ_MAX_WAIT];
+
+ /* Names of injection points attached to wait counters */
+ char name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
+
+ /* Condition variable used for waits and wakeups */
+ ConditionVariable wait_point;
+} InjectionPointSharedState;
+
+/* Pointer to shared-memory state. */
+static InjectionPointSharedState *inj_state = NULL;
+
extern PGDLLEXPORT void injection_error(const char *name);
extern PGDLLEXPORT void injection_notice(const char *name);
+extern PGDLLEXPORT void injection_wait(const char *name);
+
+
+/*
+ * Callback for shared memory area initialization.
+ */
+static void
+injection_point_init_state(void *ptr)
+{
+ InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
+
+ SpinLockInit(&state->lock);
+ memset(state->wait_counts, 0, sizeof(state->wait_counts));
+ memset(state->name, 0, sizeof(state->name));
+ ConditionVariableInit(&state->wait_point);
+}
+
+/*
+ * Initialize shared memory area for this module.
+ */
+static void
+injection_init_shmem(void)
+{
+ bool found;
+ if (inj_state != NULL)
+ return;
+
+ inj_state = GetNamedDSMSegment("injection_points",
+ sizeof(InjectionPointSharedState),
+ injection_point_init_state,
+ &found);
+}
/* Set of callbacks available to be attached to an injection point. */
void
elog(NOTICE, "notice triggered for injection point %s", name);
}
+/* Wait on a condition variable, awaken by injection_points_wakeup() */
+void
+injection_wait(const char *name)
+{
+ uint32 old_wait_counts = 0;
+ int index = -1;
+ uint32 injection_wait_event = 0;
+
+ if (inj_state == NULL)
+ injection_init_shmem();
+
+ /*
+ * Use the injection point name for this custom wait event. Note that
+ * this custom wait event name is not released, but we don't care much for
+ * testing as this should be short-lived.
+ */
+ injection_wait_event = WaitEventExtensionNew(name);
+
+ /*
+ * Find a free slot to wait for, and register this injection point's name.
+ */
+ SpinLockAcquire(&inj_state->lock);
+ for (int i = 0; i < INJ_MAX_WAIT; i++)
+ {
+ if (inj_state->name[i][0] == '\0')
+ {
+ index = i;
+ strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
+ old_wait_counts = inj_state->wait_counts[i];
+ break;
+ }
+ }
+ SpinLockRelease(&inj_state->lock);
+
+ if (index < 0)
+ elog(ERROR, "could not find free slot for wait of injection point %s ",
+ name);
+
+ /* And sleep.. */
+ ConditionVariablePrepareToSleep(&inj_state->wait_point);
+ for (;;)
+ {
+ uint32 new_wait_counts;
+
+ SpinLockAcquire(&inj_state->lock);
+ new_wait_counts = inj_state->wait_counts[index];
+ SpinLockRelease(&inj_state->lock);
+
+ if (old_wait_counts != new_wait_counts)
+ break;
+ ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
+ }
+ ConditionVariableCancelSleep();
+
+ /* Remove this injection point from the waiters. */
+ SpinLockAcquire(&inj_state->lock);
+ inj_state->name[index][0] = '\0';
+ SpinLockRelease(&inj_state->lock);
+}
+
/*
* SQL function for creating an injection point.
*/
function = "injection_error";
else if (strcmp(action, "notice") == 0)
function = "injection_notice";
+ else if (strcmp(action, "wait") == 0)
+ function = "injection_wait";
else
elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
PG_RETURN_VOID();
}
+/*
+ * SQL function for waking up an injection point waiting in injection_wait().
+ */
+PG_FUNCTION_INFO_V1(injection_points_wakeup);
+Datum
+injection_points_wakeup(PG_FUNCTION_ARGS)
+{
+ char *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ int index = -1;
+
+ if (inj_state == NULL)
+ injection_init_shmem();
+
+ /* First bump the wait counter for the injection point to wake up */
+ SpinLockAcquire(&inj_state->lock);
+ for (int i = 0; i < INJ_MAX_WAIT; i++)
+ {
+ if (strcmp(name, inj_state->name[i]) == 0)
+ {
+ index = i;
+ break;
+ }
+ }
+ if (index < 0)
+ {
+ SpinLockRelease(&inj_state->lock);
+ elog(ERROR, "could not find injection point %s to wake up", name);
+ }
+ inj_state->wait_counts[index]++;
+ SpinLockRelease(&inj_state->lock);
+
+ /* And broadcast the change to the waiters */
+ ConditionVariableBroadcast(&inj_state->wait_point);
+ PG_RETURN_VOID();
+}
+
/*
* SQL function for dropping an injection point.
*/