aio: Add io_method=worker
author Andres Freund <[email protected]>
Tue, 18 Mar 2025 14:52:33 +0000 (10:52 -0400)
committer Andres Freund <[email protected]>
Tue, 18 Mar 2025 15:54:01 +0000 (11:54 -0400)
The previous commit introduced the infrastructure to start io_workers. This
commit actually makes the workers execute IOs.

IO workers consume IOs from a shared memory submission queue, run traditional
synchronous system calls, and perform the shared completion handling
immediately.  Client code submits most requests by pushing IOs into the
submission queue, and waits (if necessary) using condition variables.  Some
IOs cannot be performed in another process due to lack of infrastructure for
reopening the file, and must be processed synchronously by the client code
when submitted.
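
For illustration, a minimal standalone sketch of the ring-buffer discipline
the submission queue uses (simplified names; no locking, latches, or shared
memory — the real structures are in src/backend/storage/aio/method_worker.c
below):

#include <stdint.h>
#include <stdio.h>

#define QUEUE_SIZE 8            /* must be a power of two */

static uint32_t ios[QUEUE_SIZE];
static uint32_t head;           /* next slot to fill */
static uint32_t tail;           /* next slot to drain; head == tail is empty */

static int
queue_insert(uint32_t io_id)
{
    uint32_t new_head = (head + 1) & (QUEUE_SIZE - 1);

    if (new_head == tail)
        return 0;               /* full; one slot always stays unused */
    ios[head] = io_id;
    head = new_head;
    return 1;
}

static uint32_t
queue_consume(void)
{
    uint32_t result;

    if (tail == head)
        return UINT32_MAX;      /* empty */
    result = ios[tail];
    tail = (tail + 1) & (QUEUE_SIZE - 1);
    return result;
}

int
main(void)
{
    uint32_t id;

    /* Insert more IOs than fit; overflow falls back to synchronous execution. */
    for (id = 100; id < 110; id++)
        if (!queue_insert(id))
            printf("queue full, IO %u would run synchronously in the submitter\n", id);
    while ((id = queue_consume()) != UINT32_MAX)
        printf("worker would process IO %u\n", id);
    return 0;
}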

For now, the default io_method is changed to "worker". We should re-evaluate
that around beta1; we might want to be careful and set the default back to
"sync" for 18.
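
For example, in postgresql.conf (values are only illustrative; io_workers is
the pre-existing GUC from the previous commit controlling how many worker
processes are started):

    io_method = worker          # or "sync"; change requires restart
    io_workers = 3              # number of IO worker processes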

Reviewed-by: Noah Misch <[email protected]>
Co-authored-by: Thomas Munro <[email protected]>
Co-authored-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344[email protected]
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m

doc/src/sgml/config.sgml
src/backend/storage/aio/aio.c
src/backend/storage/aio/aio_init.c
src/backend/storage/aio/method_worker.c
src/backend/utils/activity/wait_event_names.txt
src/backend/utils/misc/postgresql.conf.sample
src/include/storage/aio.h
src/include/storage/aio_internal.h
src/include/storage/lwlocklist.h
src/tools/pgindent/typedefs.list

doc/src/sgml/config.sgml
index c749bc0631af298d577ca497cb11ae169afe0fad..cd8891427738460217b4986a3d85727bce446102 100644
@@ -2676,6 +2676,11 @@ include_dir 'conf.d'
          Selects the method for executing asynchronous I/O.
          Possible values are:
          <itemizedlist>
+          <listitem>
+           <para>
+            <literal>worker</literal> (execute asynchronous I/O using worker processes)
+           </para>
+          </listitem>
           <listitem>
            <para>
             <literal>sync</literal> (execute asynchronous-eligible I/O synchronously)
src/backend/storage/aio/aio.c
index 4d5439c73fd184e5a52819c5802f77287f89a350..3ed4b1dfdac794c04a2a485bc13b47ea0a59c952 100644
@@ -64,6 +64,7 @@ static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
 /* Options for io_method. */
 const struct config_enum_entry io_method_options[] = {
    {"sync", IOMETHOD_SYNC, false},
+   {"worker", IOMETHOD_WORKER, false},
    {NULL, 0, false}
 };
 
@@ -80,6 +81,7 @@ PgAioBackend *pgaio_my_backend;
 
 static const IoMethodOps *const pgaio_method_ops_table[] = {
    [IOMETHOD_SYNC] = &pgaio_sync_ops,
+   [IOMETHOD_WORKER] = &pgaio_worker_ops,
 };
 
 /* callbacks for the configured io_method, set by assign_io_method */
src/backend/storage/aio/aio_init.c
index 6fe55510faee4997393e3c1f9d3a11f1684b95f4..4e405ce7ca8d840d29aa09316363817ae58f0726 100644
@@ -18,6 +18,7 @@
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/aio_subsys.h"
+#include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -39,6 +40,11 @@ AioCtlShmemSize(void)
 static uint32
 AioProcs(void)
 {
+   /*
+    * While AIO workers don't need their own AIO context, we can't currently
+    * guarantee that nothing gets assigned to a ProcNumber for an IO worker
+    * if we just subtracted MAX_IO_WORKERS.
+    */
    return MaxBackends + NUM_AUXILIARY_PROCS;
 }
 
@@ -223,6 +229,9 @@ pgaio_init_backend(void)
    /* shouldn't be initialized twice */
    Assert(!pgaio_my_backend);
 
+   if (MyBackendType == B_IO_WORKER)
+       return;
+
    if (MyProc == NULL || MyProcNumber >= AioProcs())
        elog(ERROR, "aio requires a normal PGPROC");
 
src/backend/storage/aio/method_worker.c
index 0ef9ef93e2bcaf9fbb4adfef2720efcdb26c0893..b6fbcc68bb10cc3906e77b2558bc5f4658d20a85 100644
@@ -3,6 +3,21 @@
  * method_worker.c
  *    AIO - perform AIO using worker processes
  *
+ * IO workers consume IOs from a shared memory submission queue, run
+ * traditional synchronous system calls, and perform the shared completion
+ * handling immediately.  Client code submits most requests by pushing IOs
+ * into the submission queue, and waits (if necessary) using condition
+ * variables.  Some IOs cannot be performed in another process due to lack of
+ * infrastructure for reopening the file, and must be processed synchronously
+ * by the client code when submitted.
+ *
+ * So that the submitter can make just one system call when submitting a batch
+ * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
+ * could be improved by using futexes instead of latches to wake N waiters.
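+ * For example, with a fanout of two the submitter wakes a single worker;
+ * after dequeueing an IO that worker wakes up to two idle peers (if the
+ * queue is deep enough), each of which can wake two more, so many idle
+ * workers can be reached with a number of wakeup rounds that grows only
+ * logarithmically in the number of workers.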
+ *
+ * This method of AIO is available in all builds on all operating systems, and
+ * is the default.
+ *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
 
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "port/pg_bitutils.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
 #include "storage/aio_subsys.h"
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
+#include "utils/ps_status.h"
 #include "utils/wait_event.h"
 
 
+/* How many workers should each worker wake up if needed? */
+#define IO_WORKER_WAKEUP_FANOUT 2
+
+
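+/*
+ * Ring buffer of IO handle IDs, sized to a power of two.  head == tail means
+ * the queue is empty; it is full when advancing head would make it equal to
+ * tail, so one slot always remains unused.
+ */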
+typedef struct AioWorkerSubmissionQueue
+{
+   uint32      size;
+   uint32      mask;
+   uint32      head;
+   uint32      tail;
+   uint32      ios[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerSubmissionQueue;
+
+typedef struct AioWorkerSlot
+{
+   Latch      *latch;
+   bool        in_use;
+} AioWorkerSlot;
+
+typedef struct AioWorkerControl
+{
+   uint64      idle_worker_mask;
+   AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerControl;
+
+
+static size_t pgaio_worker_shmem_size(void);
+static void pgaio_worker_shmem_init(bool first_time);
+
+static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
+static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+
+
+const IoMethodOps pgaio_worker_ops = {
+   .shmem_size = pgaio_worker_shmem_size,
+   .shmem_init = pgaio_worker_shmem_init,
+
+   .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
+   .submit = pgaio_worker_submit,
+};
+
+
 /* GUCs */
 int            io_workers = 3;
 
 
+static int io_worker_queue_size = 64;
+static int MyIoWorkerId;
+static AioWorkerSubmissionQueue *io_worker_submission_queue;
+static AioWorkerControl *io_worker_control;
+
+
+static size_t
+pgaio_worker_queue_shmem_size(int *queue_size)
+{
+   /* Round size up to next power of two so we can make a mask. */
+   *queue_size = pg_nextpower2_32(io_worker_queue_size);
+
+   return offsetof(AioWorkerSubmissionQueue, ios) +
+       sizeof(uint32) * *queue_size;
+}
+
+static size_t
+pgaio_worker_control_shmem_size(void)
+{
+   return offsetof(AioWorkerControl, workers) +
+       sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+}
+
+static size_t
+pgaio_worker_shmem_size(void)
+{
+   size_t      sz;
+   int         queue_size;
+
+   sz = pgaio_worker_queue_shmem_size(&queue_size);
+   sz = add_size(sz, pgaio_worker_control_shmem_size());
+
+   return sz;
+}
+
+static void
+pgaio_worker_shmem_init(bool first_time)
+{
+   bool        found;
+   int         queue_size;
+
+   io_worker_submission_queue =
+       ShmemInitStruct("AioWorkerSubmissionQueue",
+                       pgaio_worker_queue_shmem_size(&queue_size),
+                       &found);
+   if (!found)
+   {
+       io_worker_submission_queue->size = queue_size;
+       io_worker_submission_queue->head = 0;
+       io_worker_submission_queue->tail = 0;
+   }
+
+   io_worker_control =
+       ShmemInitStruct("AioWorkerControl",
+                       pgaio_worker_control_shmem_size(),
+                       &found);
+   if (!found)
+   {
+       io_worker_control->idle_worker_mask = 0;
+       for (int i = 0; i < MAX_IO_WORKERS; ++i)
+       {
+           io_worker_control->workers[i].latch = NULL;
+           io_worker_control->workers[i].in_use = false;
+       }
+   }
+}
+
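+/*
+ * Pick the lowest-numbered idle worker, clear its bit in idle_worker_mask
+ * and return its index, or -1 if no worker is currently idle.  The caller
+ * must hold AioWorkerSubmissionQueueLock.
+ */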
+static int
+pgaio_choose_idle_worker(void)
+{
+   int         worker;
+
+   if (io_worker_control->idle_worker_mask == 0)
+       return -1;
+
+   /* Find the lowest bit position, and clear it. */
+   worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
+   io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+
+   return worker;
+}
+
+static bool
+pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
+{
+   AioWorkerSubmissionQueue *queue;
+   uint32      new_head;
+
+   queue = io_worker_submission_queue;
+   new_head = (queue->head + 1) & (queue->size - 1);
+   if (new_head == queue->tail)
+   {
+       pgaio_debug(DEBUG3, "io queue is full, at %u elements",
+                   io_worker_submission_queue->size);
+       return false;           /* full */
+   }
+
+   queue->ios[queue->head] = pgaio_io_get_id(ioh);
+   queue->head = new_head;
+
+   return true;
+}
+
+static uint32
+pgaio_worker_submission_queue_consume(void)
+{
+   AioWorkerSubmissionQueue *queue;
+   uint32      result;
+
+   queue = io_worker_submission_queue;
+   if (queue->tail == queue->head)
+       return UINT32_MAX;      /* empty */
+
+   result = queue->ios[queue->tail];
+   queue->tail = (queue->tail + 1) & (queue->size - 1);
+
+   return result;
+}
+
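+/*
+ * Number of IOs currently waiting in the queue.  head may have wrapped
+ * around past tail; e.g. with size = 64, head = 2 and tail = 60, head is
+ * first advanced to 2 + 64 = 66, giving a depth of 66 - 60 = 6.
+ */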
+static uint32
+pgaio_worker_submission_queue_depth(void)
+{
+   uint32      head;
+   uint32      tail;
+
+   head = io_worker_submission_queue->head;
+   tail = io_worker_submission_queue->tail;
+
+   if (tail > head)
+       head += io_worker_submission_queue->size;
+
+   Assert(head >= tail);
+
+   return head - tail;
+}
+
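+/*
+ * An IO has to be executed synchronously by the submitting backend itself
+ * when not running under a postmaster (so no IO workers exist), when the IO
+ * references state local to the issuing backend, or when the target file
+ * cannot be reopened in another process.
+ */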
+static bool
+pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
+{
+   return
+       !IsUnderPostmaster
+       || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
+       || !pgaio_io_can_reopen(ioh);
+}
+
+static void
+pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+{
+   PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+   int         nsync = 0;
+   Latch      *wakeup = NULL;
+   int         worker;
+
+   Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+   LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+   for (int i = 0; i < nios; ++i)
+   {
+       Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
+       if (!pgaio_worker_submission_queue_insert(ios[i]))
+       {
+           /*
+            * We'll do it synchronously, but only after we've sent as many as
+            * we can to workers, to maximize concurrency.
+            */
+           synchronous_ios[nsync++] = ios[i];
+           continue;
+       }
+
+       if (wakeup == NULL)
+       {
+           /* Choose an idle worker to wake up if we haven't already. */
+           worker = pgaio_choose_idle_worker();
+           if (worker >= 0)
+               wakeup = io_worker_control->workers[worker].latch;
+
+           pgaio_debug_io(DEBUG4, ios[i],
+                          "choosing worker %d",
+                          worker);
+       }
+   }
+   LWLockRelease(AioWorkerSubmissionQueueLock);
+
+   if (wakeup)
+       SetLatch(wakeup);
+
+   /* Run whatever is left synchronously. */
+   if (nsync > 0)
+   {
+       for (int i = 0; i < nsync; ++i)
+       {
+           pgaio_io_perform_synchronously(synchronous_ios[i]);
+       }
+   }
+}
+
+static int
+pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+   for (int i = 0; i < num_staged_ios; i++)
+   {
+       PgAioHandle *ioh = staged_ios[i];
+
+       pgaio_io_prepare_submit(ioh);
+   }
+
+   pgaio_worker_submit_internal(num_staged_ios, staged_ios);
+
+   return num_staged_ios;
+}
+
+/*
+ * on_shmem_exit() callback that releases the worker's slot in
+ * io_worker_control.
+ */
+static void
+pgaio_worker_die(int code, Datum arg)
+{
+   LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+   Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+   Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+   io_worker_control->workers[MyIoWorkerId].in_use = false;
+   io_worker_control->workers[MyIoWorkerId].latch = NULL;
+   LWLockRelease(AioWorkerSubmissionQueueLock);
+}
+
+/*
+ * Register the worker in shared memory, assign MyIoWorkerId and register a
+ * shutdown callback to release registration.
+ */
+static void
+pgaio_worker_register(void)
+{
+   MyIoWorkerId = -1;
+
+   /*
+    * XXX: This could do with more fine-grained locking. But it's also not
+    * very common for the number of workers to change at the moment...
+    */
+   LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+
+   for (int i = 0; i < MAX_IO_WORKERS; ++i)
+   {
+       if (!io_worker_control->workers[i].in_use)
+       {
+           Assert(io_worker_control->workers[i].latch == NULL);
+           io_worker_control->workers[i].in_use = true;
+           MyIoWorkerId = i;
+           break;
+       }
+       else
+           Assert(io_worker_control->workers[i].latch != NULL);
+   }
+
+   if (MyIoWorkerId == -1)
+       elog(ERROR, "couldn't find a free worker slot");
+
+   io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+   io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
+   LWLockRelease(AioWorkerSubmissionQueueLock);
+
+   on_shmem_exit(pgaio_worker_die, 0);
+}
+
 void
 IoWorkerMain(const void *startup_data, size_t startup_data_len)
 {
    sigjmp_buf  local_sigjmp_buf;
+   PgAioHandle *volatile error_ioh = NULL;
+   volatile int error_errno = 0;
+   char        cmd[128];
 
    MyBackendType = B_IO_WORKER;
    AuxiliaryProcessMainCommon();
@@ -53,6 +382,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
    pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
 
+   /* also registers a shutdown callback to unregister */
+   pgaio_worker_register();
+
+   sprintf(cmd, "io worker: %d", MyIoWorkerId);
+   set_ps_display(cmd);
+
    /* see PostgresMain() */
    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    {
@@ -61,6 +396,27 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 
        EmitErrorReport();
 
+       /*
+        * In the - very unlikely - case that the IO failed in a way that
+        * raises an error we need to mark the IO as failed.
+        *
+        * Need to do just enough error recovery so that we can mark the IO as
+        * failed and then exit (postmaster will start a new worker).
+        */
+       LWLockReleaseAll();
+
+       if (error_ioh != NULL)
+       {
+           /* should never fail without setting error_errno */
+           Assert(error_errno != 0);
+
+           errno = error_errno;
+
+           START_CRIT_SECTION();
+           pgaio_io_process_completion(error_ioh, -error_errno);
+           END_CRIT_SECTION();
+       }
+
        proc_exit(1);
    }
 
@@ -71,9 +427,89 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 
    while (!ShutdownRequestPending)
    {
-       WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-                 WAIT_EVENT_IO_WORKER_MAIN);
-       ResetLatch(MyLatch);
+       uint32      io_index;
+       Latch      *latches[IO_WORKER_WAKEUP_FANOUT];
+       int         nlatches = 0;
+       int         nwakeups = 0;
+       int         worker;
+
+       /* Try to get a job to do. */
+       LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+       if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+       {
+           /*
+            * Nothing to do.  Mark self idle.
+            *
+            * XXX: Invent some kind of back pressure to reduce useless
+            * wakeups?
+            */
+           io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+       }
+       else
+       {
+           /* Got one.  Clear idle flag. */
+           io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+
+           /* See if we can wake up some peers. */
+           nwakeups = Min(pgaio_worker_submission_queue_depth(),
+                          IO_WORKER_WAKEUP_FANOUT);
+           for (int i = 0; i < nwakeups; ++i)
+           {
+               if ((worker = pgaio_choose_idle_worker()) < 0)
+                   break;
+               latches[nlatches++] = io_worker_control->workers[worker].latch;
+           }
+       }
+       LWLockRelease(AioWorkerSubmissionQueueLock);
+
+       for (int i = 0; i < nlatches; ++i)
+           SetLatch(latches[i]);
+
+       if (io_index != UINT32_MAX)
+       {
+           PgAioHandle *ioh = NULL;
+
+           ioh = &pgaio_ctl->io_handles[io_index];
+           error_ioh = ioh;
+
+           pgaio_debug_io(DEBUG4, ioh,
+                          "worker %d processing IO",
+                          MyIoWorkerId);
+
+           /*
+            * It's very unlikely, but possible, that reopen fails. E.g. due
+            * to memory allocations failing or file permissions changing or
+            * such.  In that case we need to fail the IO.
+            *
+            * There's not really a good errno we can report here.
+            */
+           error_errno = ENOENT;
+           pgaio_io_reopen(ioh);
+
+           /*
+            * To be able to exercise the reopen-fails path, allow injection
+            * points to trigger a failure at this point.
+            */
+           pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
+
+           error_errno = 0;
+           error_ioh = NULL;
+
+           /*
+            * We don't expect this to ever fail with ERROR or FATAL, no need
+            * to keep error_ioh set to the IO.
+            * pgaio_io_perform_synchronously() contains a critical section to
+            * ensure we don't accidentally fail.
+            */
+           pgaio_io_perform_synchronously(ioh);
+       }
+       else
+       {
+           WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+                     WAIT_EVENT_IO_WORKER_MAIN);
+           ResetLatch(MyLatch);
+       }
+
        CHECK_FOR_INTERRUPTS();
    }
 
@@ -83,6 +519,5 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 bool
 pgaio_workers_enabled(void)
 {
-   /* placeholder for future commit */
-   return false;
+   return io_method == IOMETHOD_WORKER;
 }
src/backend/utils/activity/wait_event_names.txt
index 3f6dc3876b45820fdfd95e325a76e4bb86e84d27..9fa12a555e83b0f8d2f1a6e7f06b80879ebf0050 100644
@@ -348,6 +348,7 @@ WALSummarizer   "Waiting to read or update WAL summarization state."
 DSMRegistry    "Waiting to read or update the dynamic shared memory registry."
 InjectionPoint "Waiting to read or update information related to injection points."
 SerialControl  "Waiting to read or update shared <filename>pg_serial</filename> state."
+AioWorkerSubmissionQueue   "Waiting to access AIO worker submission queue."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
src/backend/utils/misc/postgresql.conf.sample
index 7d0bf1dc006e2c0105aa36a694eb97abdadf0efb..4c55d0c13831d193c77a4e79462b986974b2b4df 100644
 #maintenance_io_concurrency = 16   # 1-1000; 0 disables prefetching
 #io_combine_limit = 128kB      # usually 1-32 blocks (depends on OS)
 
-#io_method = sync          # sync (change requires restart)
+#io_method = worker            # worker, sync (change requires restart)
 #io_max_concurrency = -1       # Max number of IOs that one process
                    # can execute simultaneously
                    # -1 sets based on shared_buffers
src/include/storage/aio.h
index f48a496208915473f2da88b0ced5bc216cc228e5..7b6b7d20a85d672a31c6cfe1900a41942168ff67 100644
 typedef enum IoMethod
 {
    IOMETHOD_SYNC = 0,
+   IOMETHOD_WORKER,
 } IoMethod;
 
-/* We'll default to synchronous execution. */
-#define DEFAULT_IO_METHOD IOMETHOD_SYNC
+/* We'll default to worker based execution. */
+#define DEFAULT_IO_METHOD IOMETHOD_WORKER
 
 
 /*
src/include/storage/aio_internal.h
index 0ba3c4f1476b7c75c5d2c1225eea8643771c2f99..108fe61c7b4483507f8b0542756d8986a3aa3a93 100644
@@ -385,6 +385,7 @@ extern PgAioHandle *pgaio_inj_io_get(void);
 
 /* Declarations for the tables of function pointers exposed by each IO method. */
 extern PGDLLIMPORT const IoMethodOps pgaio_sync_ops;
+extern PGDLLIMPORT const IoMethodOps pgaio_worker_ops;
 
 extern PGDLLIMPORT const IoMethodOps *pgaio_method_ops;
 extern PGDLLIMPORT PgAioCtl *pgaio_ctl;
src/include/storage/lwlocklist.h
index cf565452382be392f7db53e4baacb3bd27d133c5..932024b1b0ba5f1c10f5006aa7b999631b88aa53 100644
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, AioWorkerSubmissionQueue)
src/tools/pgindent/typedefs.list
index c04a47cf222b8dfb865ae2b4800842e254dc40c9..bfa276d2d355a6edc01221ff6defde5177ff8da1 100644
@@ -55,6 +55,9 @@ AggStrategy
 AggTransInfo
 Aggref
 AggregateInstrumentation
+AioWorkerControl
+AioWorkerSlot
+AioWorkerSubmissionQueue
 AlenState
 Alias
 AllocBlock