injection_points: Add wait and wakeup of processes
authorMichael Paquier <[email protected]>
Mon, 4 Mar 2024 00:19:13 +0000 (09:19 +0900)
committerMichael Paquier <[email protected]>
Mon, 4 Mar 2024 00:19:13 +0000 (09:19 +0900)
This commit adds two features to the in-core module for injection
points:
- A new callback called "wait" that can be attached to an injection
point to make it wait.
- A new SQL function to update the shared state and broadcast the update
using a condition variable.  This function uses an input an injection
point name.

This offers the possibility to stop a process in flight and wake it up
in a controlled manner, which is useful when implementing tests that aim
to trigger scenarios for race conditions (some tests are planned for
integration).  The logic uses a set of counters with a condition
variable to monitor and broadcast the changes.  Up to 8 waits can be
registered in a single run, which should be plenty enough.  Waits can be
monitored in pg_stat_activity, based on the injection point name which
is registered in a custom wait event under the "Extension" category.

The shared memory state used by the module is registered using the DSM
registry, and is optional, so there is no need to load the module with
shared_preload_libraries to be able to use these features.

Author: Michael Paquier
Reviewed-by: Andrey Borodin, Bertrand Drouvot
Discussion: https://postgr.es/m/[email protected]

src/test/modules/injection_points/injection_points--1.0.sql
src/test/modules/injection_points/injection_points.c
src/tools/pgindent/typedefs.list

index 5944c41716446e29e064f7a5729ae51d1beeac77..0a2e59aba7936a5a05743dfd00e367f4cb4a2f05 100644 (file)
@@ -24,6 +24,16 @@ RETURNS void
 AS 'MODULE_PATHNAME', 'injection_points_run'
 LANGUAGE C STRICT PARALLEL UNSAFE;
 
+--
+-- injection_points_wakeup()
+--
+-- Wakes up a waiting injection point.
+--
+CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'injection_points_wakeup'
+LANGUAGE C STRICT PARALLEL UNSAFE;
+
 --
 -- injection_points_detach()
 --
index e843e6594feb73042c50998d28461a04f850fbdd..7f52d758c5d9eef1341add97ccf21853d3ef55a4 100644 (file)
 #include "postgres.h"
 
 #include "fmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "storage/dsm_registry.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
+/* Maximum number of waits usable in injection points at once */
+#define INJ_MAX_WAIT   8
+#define INJ_NAME_MAXLEN    64
+
+/* Shared state information for injection points. */
+typedef struct InjectionPointSharedState
+{
+   /* Protects access to other fields */
+   slock_t     lock;
+
+   /* Counters advancing when injection_points_wakeup() is called */
+   uint32      wait_counts[INJ_MAX_WAIT];
+
+   /* Names of injection points attached to wait counters */
+   char        name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
+
+   /* Condition variable used for waits and wakeups */
+   ConditionVariable wait_point;
+} InjectionPointSharedState;
+
+/* Pointer to shared-memory state. */
+static InjectionPointSharedState *inj_state = NULL;
+
 extern PGDLLEXPORT void injection_error(const char *name);
 extern PGDLLEXPORT void injection_notice(const char *name);
+extern PGDLLEXPORT void injection_wait(const char *name);
+
+
+/*
+ * Callback for shared memory area initialization.
+ */
+static void
+injection_point_init_state(void *ptr)
+{
+   InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
+
+   SpinLockInit(&state->lock);
+   memset(state->wait_counts, 0, sizeof(state->wait_counts));
+   memset(state->name, 0, sizeof(state->name));
+   ConditionVariableInit(&state->wait_point);
+}
+
+/*
+ * Initialize shared memory area for this module.
+ */
+static void
+injection_init_shmem(void)
+{
+   bool        found;
 
+   if (inj_state != NULL)
+       return;
+
+   inj_state = GetNamedDSMSegment("injection_points",
+                                  sizeof(InjectionPointSharedState),
+                                  injection_point_init_state,
+                                  &found);
+}
 
 /* Set of callbacks available to be attached to an injection point. */
 void
@@ -43,6 +100,66 @@ injection_notice(const char *name)
    elog(NOTICE, "notice triggered for injection point %s", name);
 }
 
+/* Wait on a condition variable, awaken by injection_points_wakeup() */
+void
+injection_wait(const char *name)
+{
+   uint32      old_wait_counts = 0;
+   int         index = -1;
+   uint32      injection_wait_event = 0;
+
+   if (inj_state == NULL)
+       injection_init_shmem();
+
+   /*
+    * Use the injection point name for this custom wait event.  Note that
+    * this custom wait event name is not released, but we don't care much for
+    * testing as this should be short-lived.
+    */
+   injection_wait_event = WaitEventExtensionNew(name);
+
+   /*
+    * Find a free slot to wait for, and register this injection point's name.
+    */
+   SpinLockAcquire(&inj_state->lock);
+   for (int i = 0; i < INJ_MAX_WAIT; i++)
+   {
+       if (inj_state->name[i][0] == '\0')
+       {
+           index = i;
+           strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
+           old_wait_counts = inj_state->wait_counts[i];
+           break;
+       }
+   }
+   SpinLockRelease(&inj_state->lock);
+
+   if (index < 0)
+       elog(ERROR, "could not find free slot for wait of injection point %s ",
+            name);
+
+   /* And sleep.. */
+   ConditionVariablePrepareToSleep(&inj_state->wait_point);
+   for (;;)
+   {
+       uint32      new_wait_counts;
+
+       SpinLockAcquire(&inj_state->lock);
+       new_wait_counts = inj_state->wait_counts[index];
+       SpinLockRelease(&inj_state->lock);
+
+       if (old_wait_counts != new_wait_counts)
+           break;
+       ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
+   }
+   ConditionVariableCancelSleep();
+
+   /* Remove this injection point from the waiters. */
+   SpinLockAcquire(&inj_state->lock);
+   inj_state->name[index][0] = '\0';
+   SpinLockRelease(&inj_state->lock);
+}
+
 /*
  * SQL function for creating an injection point.
  */
@@ -58,6 +175,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
        function = "injection_error";
    else if (strcmp(action, "notice") == 0)
        function = "injection_notice";
+   else if (strcmp(action, "wait") == 0)
+       function = "injection_wait";
    else
        elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
 
@@ -80,6 +199,42 @@ injection_points_run(PG_FUNCTION_ARGS)
    PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for waking up an injection point waiting in injection_wait().
+ */
+PG_FUNCTION_INFO_V1(injection_points_wakeup);
+Datum
+injection_points_wakeup(PG_FUNCTION_ARGS)
+{
+   char       *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+   int         index = -1;
+
+   if (inj_state == NULL)
+       injection_init_shmem();
+
+   /* First bump the wait counter for the injection point to wake up */
+   SpinLockAcquire(&inj_state->lock);
+   for (int i = 0; i < INJ_MAX_WAIT; i++)
+   {
+       if (strcmp(name, inj_state->name[i]) == 0)
+       {
+           index = i;
+           break;
+       }
+   }
+   if (index < 0)
+   {
+       SpinLockRelease(&inj_state->lock);
+       elog(ERROR, "could not find injection point %s to wake up", name);
+   }
+   inj_state->wait_counts[index]++;
+   SpinLockRelease(&inj_state->lock);
+
+   /* And broadcast the change to the waiters */
+   ConditionVariableBroadcast(&inj_state->wait_point);
+   PG_RETURN_VOID();
+}
+
 /*
  * SQL function for dropping an injection point.
  */
index ee40a341d3f92cb92f8f126265e82dd6ac16ba7b..782b7d7b1c81143b3f4b0b45513bd0825033e6a7 100644 (file)
@@ -1210,6 +1210,7 @@ InitializeDSMForeignScan_function
 InitializeWorkerForeignScan_function
 InjectionPointCacheEntry
 InjectionPointEntry
+InjectionPointSharedState
 InlineCodeBlock
 InsertStmt
 Instrumentation