aio: callbackify IO action type specific handling, starting with retry.
author Andres Freund <[email protected]>
Tue, 12 Jan 2021 01:14:04 +0000 (17:14 -0800)
committer Andres Freund <[email protected]>
Tue, 12 Jan 2021 01:14:04 +0000 (17:14 -0800)
src/backend/storage/ipc/aio.c

index bdded4399c252c8e5d25bdd36e8a31eeb16d5d36..9b19e92f513ab5ad9b1e99582d1cb8234c23b74c 100644 (file)
@@ -132,9 +132,6 @@ typedef enum PgAioInProgressFlags
 
 } PgAioInProgressFlags;
 
-/* IO completion callback */
-typedef bool (*PgAioCompletedCB)(PgAioInProgress *io);
-
 typedef uint16 PgAioIPFlags;
 
 struct PgAioInProgress
@@ -475,35 +472,86 @@ static int __sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complet
                                unsigned flags, sigset_t *sig);
 
 /* io completions */
-static bool pgaio_complete_nop(PgAioInProgress *io);
-static bool pgaio_complete_fsync(PgAioInProgress *io);
-static bool pgaio_complete_fsync_wal(PgAioInProgress *io);
-static bool pgaio_complete_flush_range(PgAioInProgress *io);
-static bool pgaio_complete_read_buffer(PgAioInProgress *io);
-static bool pgaio_complete_write_buffer(PgAioInProgress *io);
-static bool pgaio_complete_write_wal(PgAioInProgress *io);
-static bool pgaio_complete_write_generic(PgAioInProgress *io);
-
+static bool pgaio_nop_complete(PgAioInProgress *io);
+static bool pgaio_fsync_complete(PgAioInProgress *io);
+static bool pgaio_fsync_wal_complete(PgAioInProgress *io);
+static bool pgaio_flush_range_complete(PgAioInProgress *io);
+static bool pgaio_read_buffer_complete(PgAioInProgress *io);
+static void pgaio_read_buffer_retry(PgAioInProgress *io);
+static bool pgaio_write_buffer_complete(PgAioInProgress *io);
+static void pgaio_write_buffer_retry(PgAioInProgress *io);
+static bool pgaio_write_wal_complete(PgAioInProgress *io);
+static bool pgaio_write_generic_complete(PgAioInProgress *io);
 
-static MemoryContext aio_retry_context;
 
 /*
+ * Implementation of different AIO actions.
+ *
  * To support EXEC_BACKEND environments, where we cannot rely on callback
- * addresses being equivalent across processes, completion actions are just
- * indices into a process local array of callbacks, indexed by the type of
- * action.  Also makes the shared memory entries a bit smaller, but that's not
- * a huge win.
+ * addresses being equivalent across processes, PgAioInProgress does not point
+ * directly to the type's PgAioActionCBs, but contains an index instead.
+ */
+
+
+/*
+ * IO completion callback.
+ */
+typedef bool (*PgAioCompletedCB)(PgAioInProgress *io);
+
+/*
+ * IO retry callback.
  */
-static const PgAioCompletedCB completion_callbacks[] =
-{
-   [PGAIO_NOP] = pgaio_complete_nop,
-   [PGAIO_FSYNC] = pgaio_complete_fsync,
-   [PGAIO_FSYNC_WAL] = pgaio_complete_fsync_wal,
-   [PGAIO_FLUSH_RANGE] = pgaio_complete_flush_range,
-   [PGAIO_READ_BUFFER] = pgaio_complete_read_buffer,
-   [PGAIO_WRITE_BUFFER] = pgaio_complete_write_buffer,
-   [PGAIO_WRITE_WAL] = pgaio_complete_write_wal,
-   [PGAIO_WRITE_GENERIC] = pgaio_complete_write_generic,
+typedef void (*PgAioRetryCB)(PgAioInProgress *io);
+
+typedef struct PgAioActionCBs
+{
+   PgAioRetryCB retry;
+   PgAioCompletedCB complete;
+} PgAioActionCBs;
+
+static const PgAioActionCBs io_action_cbs[] =
+{
+   [PGAIO_NOP] =
+   {
+       .complete = pgaio_nop_complete
+   },
+
+   [PGAIO_FSYNC] =
+   {
+       .complete = pgaio_fsync_complete
+   },
+
+   [PGAIO_FSYNC_WAL] =
+   {
+       .complete = pgaio_fsync_wal_complete
+   },
+
+   [PGAIO_FLUSH_RANGE] =
+   {
+       .complete = pgaio_flush_range_complete
+   },
+
+   [PGAIO_READ_BUFFER] =
+   {
+       .retry = pgaio_read_buffer_retry,
+       .complete = pgaio_read_buffer_complete
+   },
+
+   [PGAIO_WRITE_BUFFER] =
+   {
+       .retry = pgaio_write_buffer_retry,
+       .complete = pgaio_write_buffer_complete
+   },
+
+   [PGAIO_WRITE_WAL] =
+   {
+       .complete = pgaio_write_wal_complete
+   },
+
+   [PGAIO_WRITE_GENERIC] =
+   {
+       .complete = pgaio_write_generic_complete
+   },
 };
 
 
@@ -732,19 +780,6 @@ pgaio_postmaster_init(void)
    dlist_init(&local_recycle_requests);
 
    // XXX: could create a local queue here.
-
-   /*
-    * Need to be allowed to re-open files during retries. Those can happen,
-    * e.g. when fsyncing WAL, within a critical section. Reopening files
-    * currently requires memory. So create a context with small reservation
-    * that's allowed to be used within a critical section.
-    */
-   aio_retry_context = AllocSetContextCreate(TopMemoryContext,
-                                             "aio retry context",
-                                             1024,
-                                             1024,
-                                             1024);
-   MemoryContextAllowInCriticalSection(aio_retry_context, true);
 }
 
 void
@@ -1085,7 +1120,6 @@ pgaio_complete_ios(bool in_error)
 
        if (!(io->flags & PGAIOIP_SHARED_CALLBACK_CALLED))
        {
-           PgAioCompletedCB cb;
            bool finished;
 
            /*
@@ -1094,8 +1128,7 @@ pgaio_complete_ios(bool in_error)
             */
            *(volatile PgAioIPFlags*) &io->flags |= PGAIOIP_SHARED_CALLBACK_CALLED;
 
-           cb = completion_callbacks[io->type];
-           finished = cb(io);
+           finished = io_action_cbs[io->type].complete(io);
 
            dlist_delete_from(&my_aio->reaped, node);
 
@@ -2093,28 +2126,13 @@ reopen_buffered(const AioBufferTag *tag)
 void
 pgaio_io_retry(PgAioInProgress *io)
 {
-   bool retryable = false;
    bool need_retry;
+   PgAioRetryCB retry_cb = NULL;
 
-   switch (io->type)
-   {
-       case PGAIO_READ_BUFFER:
-           retryable = true;
-           break;
-
-       case PGAIO_WRITE_BUFFER:
-           retryable = true;
-           break;
-
-       default:
-           break;
-   }
+   retry_cb = io_action_cbs[io->type].retry;
 
-   if (!retryable)
-   {
-       elog(WARNING, "non-retryable aio being retried");
-       return;
-   }
+   if (!retry_cb)
+       elog(PANIC, "non-retryable aio being retried");
 
    LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
 
@@ -2155,19 +2173,7 @@ pgaio_io_retry(PgAioInProgress *io)
        return;
    }
 
-   switch (io->type)
-   {
-       case PGAIO_READ_BUFFER:
-           io->d.read_buffer.fd = reopen_buffered(&io->d.read_buffer.tag);
-           break;
-
-       case PGAIO_WRITE_BUFFER:
-           io->d.write_buffer.fd = reopen_buffered(&io->d.write_buffer.tag);
-           break;
-
-       default:
-           break;
-   }
+   retry_cb(io);
 
    dlist_push_tail(&my_aio->pending, &io->io_node);
    my_aio->pending_count++;
@@ -3571,7 +3577,7 @@ pgaio_io_start_fsync_wal(PgAioInProgress *io, int fd, bool barrier, bool datasyn
 }
 
 static bool
-pgaio_complete_nop(PgAioInProgress *io)
+pgaio_nop_complete(PgAioInProgress *io)
 {
 #ifdef PGAIO_VERBOSE
    elog(DEBUG3, "completed nop");
@@ -3581,7 +3587,7 @@ pgaio_complete_nop(PgAioInProgress *io)
 }
 
 static bool
-pgaio_complete_fsync(PgAioInProgress *io)
+pgaio_fsync_complete(PgAioInProgress *io)
 {
 #ifdef PGAIO_VERBOSE
    elog(DEBUG3, "completed fsync: %zu",
@@ -3594,7 +3600,7 @@ pgaio_complete_fsync(PgAioInProgress *io)
 }
 
 static bool
-pgaio_complete_fsync_wal(PgAioInProgress *io)
+pgaio_fsync_wal_complete(PgAioInProgress *io)
 {
 #ifdef PGAIO_VERBOSE
    elog(DEBUG3, "completed fsync_wal: %zu",
@@ -3608,7 +3614,7 @@ pgaio_complete_fsync_wal(PgAioInProgress *io)
 }
 
 static bool
-pgaio_complete_flush_range(PgAioInProgress *io)
+pgaio_flush_range_complete(PgAioInProgress *io)
 {
 #ifdef PGAIO_VERBOSE
    elog(DEBUG3, "completed flush_range: %zu, %s",
@@ -3619,8 +3625,29 @@ pgaio_complete_flush_range(PgAioInProgress *io)
    return true;
 }
 
+/* stringify the tag's relation path (allocated in ErrorContext), for debugging use only */
+static char *
+relpath_debug(AioBufferTag *tag)
+{
+   char *path;
+   MemoryContext oldcontext = MemoryContextSwitchTo(ErrorContext);
+
+   path = relpath(tag->rnode,
+                  tag->forkNum);
+
+   MemoryContextSwitchTo(oldcontext);
+
+   return path;
+}
+
+static void
+pgaio_read_buffer_retry(PgAioInProgress *io)
+{
+   io->d.read_buffer.fd = reopen_buffered(&io->d.read_buffer.tag);
+}
+
 static bool
-pgaio_complete_read_buffer(PgAioInProgress *io)
+pgaio_read_buffer_complete(PgAioInProgress *io)
 {
    Buffer      buffer = io->d.read_buffer.buf;
 
@@ -3647,8 +3674,6 @@ pgaio_complete_read_buffer(PgAioInProgress *io)
 
    if (io->result != (io->d.read_buffer.nbytes - io->d.read_buffer.already_done))
    {
-       MemoryContext old_context = MemoryContextSwitchTo(aio_retry_context);
-
        failed = true;
 
        //pgaio_io_print(io, NULL);
@@ -3665,8 +3690,7 @@ pgaio_complete_read_buffer(PgAioInProgress *io)
                        errcode_for_file_access(),
                        errmsg("could not read block %u in file \"%s\": %s",
                               io->d.read_buffer.tag.blockNum,
-                              relpath(io->d.read_buffer.tag.rnode,
-                                      io->d.read_buffer.tag.forkNum),
+                              relpath_debug(&io->d.read_buffer.tag),
                               strerror(-io->result)));
            }
 
@@ -3697,14 +3721,10 @@ pgaio_complete_read_buffer(PgAioInProgress *io)
                    errmsg("aio %zd: could not read block %u in file \"%s\": read only %d of %d bytes (init: %d, cur: %d)",
                           io - aio_ctl->in_progress_io,
                           io->d.read_buffer.tag.blockNum,
-                          relpath(io->d.read_buffer.tag.rnode,
-                                  io->d.read_buffer.tag.forkNum),
+                          relpath_debug(&io->d.read_buffer.tag),
                           io->result, BLCKSZ,
                           io->owner_id, MyProc ? MyProc->pgprocno : INVALID_PGPROCNO));
        }
-
-       MemoryContextSwitchTo(old_context);
-       MemoryContextReset(aio_retry_context);
    }
    else
    {
@@ -3726,8 +3746,14 @@ pgaio_complete_read_buffer(PgAioInProgress *io)
    return done;
 }
 
+static void
+pgaio_write_buffer_retry(PgAioInProgress *io)
+{
+   io->d.write_buffer.fd = reopen_buffered(&io->d.write_buffer.tag);
+}
+
 static bool
-pgaio_complete_write_buffer(PgAioInProgress *io)
+pgaio_write_buffer_complete(PgAioInProgress *io)
 {
    Buffer      buffer = io->d.write_buffer.buf;
 
@@ -3748,8 +3774,6 @@ pgaio_complete_write_buffer(PgAioInProgress *io)
 
    if (io->result != (io->d.write_buffer.nbytes - io->d.write_buffer.already_done))
    {
-       MemoryContext old_context = MemoryContextSwitchTo(aio_retry_context);
-
        failed = true;
 
        if (io->result < 0)
@@ -3783,8 +3807,7 @@ pgaio_complete_write_buffer(PgAioInProgress *io)
                    errmsg("aio %zd: could not write block %u in file \"%s\": %s",
                           io - aio_ctl->in_progress_io,
                           io->d.write_buffer.tag.blockNum,
-                          relpath(io->d.write_buffer.tag.rnode,
-                                  io->d.write_buffer.tag.forkNum),
+                          relpath_debug(&io->d.write_buffer.tag),
                           strerror(-io->result)),
                    errhint("Check free disk space."));
        }
@@ -3801,14 +3824,10 @@ pgaio_complete_write_buffer(PgAioInProgress *io)
                     errmsg("aio %zd: could not write block %u in file \"%s\": wrote only %d of %d bytes (init: %d, cur: %d)",
                            io - aio_ctl->in_progress_io,
                            io->d.write_buffer.tag.blockNum,
-                           relpath(io->d.write_buffer.tag.rnode,
-                                   io->d.write_buffer.tag.forkNum),
+                           relpath_debug(&io->d.write_buffer.tag),
                            io->result, (io->d.write_buffer.nbytes - io->d.write_buffer.already_done),
                            io->owner_id, MyProc ? MyProc->pgprocno : INVALID_PGPROCNO)));
        }
-
-       MemoryContextSwitchTo(old_context);
-       MemoryContextReset(aio_retry_context);
    }
    else
    {
@@ -3827,7 +3846,7 @@ pgaio_complete_write_buffer(PgAioInProgress *io)
 }
 
 static bool
-pgaio_complete_write_wal(PgAioInProgress *io)
+pgaio_write_wal_complete(PgAioInProgress *io)
 {
 #ifdef PGAIO_VERBOSE
    ereport(DEBUG3,
@@ -3867,7 +3886,7 @@ pgaio_complete_write_wal(PgAioInProgress *io)
 
 
 static bool
-pgaio_complete_write_generic(PgAioInProgress *io)
+pgaio_write_generic_complete(PgAioInProgress *io)
 {
 #ifdef PGAIO_VERBOSE
    ereport(DEBUG3,