aio: Add errcontext for processing I/Os for another backend
author Melanie Plageman <[email protected]>
Tue, 1 Apr 2025 23:53:07 +0000 (19:53 -0400)
committer Melanie Plageman <[email protected]>
Tue, 1 Apr 2025 23:53:07 +0000 (19:53 -0400)
Push an ErrorContextCallback adding additional detail about the process
performing the I/O and the owner of the I/O when those are not the same.

For io_method worker, this adds context specifying which process owns
the I/O that the I/O worker is processing.

For io_method io_uring, this adds context only when a backend is
*completing* I/O for another backend. It specifies the pid of the owning
process.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/rdml3fpukrqnas7qc5uimtl2fyytrnu6ymc2vjf2zuflbsjuul%40hyizyjsexwmm

src/backend/storage/aio/method_io_uring.c
src/backend/storage/aio/method_worker.c

index 0bcdab14ae7e3e2ae7784d752a3dd6babc55f2e6..c719ba2727a813aed58563f3da17f723d8a59387 100644 (file)
@@ -302,14 +302,41 @@ pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
    return num_staged_ios;
 }
 
+static void
+pgaio_uring_completion_error_callback(void *arg)
+{
+   ProcNumber  owner;          /* proc number recorded as the IO's owner */
+   PGPROC     *owner_proc;     /* PGPROC entry looked up for that owner */
+   int32       owner_pid;      /* pid printed in the context line */
+   PgAioHandle *ioh = arg;     /* IO being completed; NULL when there is none */
+
+   if (!ioh)
+       return;
+
+   /* No need for context if a backend is completing the IO for itself */
+   if (ioh->owner_procno == MyProcNumber)
+       return;
+
+   owner = ioh->owner_procno;
+   owner_proc = GetPGProcByNumber(owner);
+   owner_pid = owner_proc->pid;
+
+   errcontext("completing I/O on behalf of process %d", owner_pid);
+}
+
 static void
 pgaio_uring_drain_locked(PgAioUringContext *context)
 {
    int         ready;
    int         orig_ready;
+   ErrorContextCallback errcallback = {0};
 
    Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));
 
+   errcallback.callback = pgaio_uring_completion_error_callback;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
    /*
     * Don't drain more events than available right now. Otherwise it's
     * plausible that one backend could get stuck, for a while, receiving CQEs
@@ -337,9 +364,11 @@ pgaio_uring_drain_locked(PgAioUringContext *context)
            PgAioHandle *ioh;
 
            ioh = io_uring_cqe_get_data(cqe);
+           errcallback.arg = ioh;
            io_uring_cqe_seen(&context->io_uring_ring, cqe);
 
            pgaio_io_process_completion(ioh, cqe->res);
+           errcallback.arg = NULL;
        }
 
        END_CRIT_SECTION();
@@ -348,6 +377,8 @@ pgaio_uring_drain_locked(PgAioUringContext *context)
                    "drained %d/%d, now expecting %d",
                    ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
    }
+
+   error_context_stack = errcallback.previous;
 }
 
 static void
index 4a7853d13fac987efb12109cc23b2c8869185ff9..31d94ac82c54039f1d0a205b8e6743003f6d66b6 100644 (file)
@@ -357,11 +357,33 @@ pgaio_worker_register(void)
    on_shmem_exit(pgaio_worker_die, 0);
 }
 
+static void
+pgaio_worker_error_callback(void *arg)
+{
+   ProcNumber  owner;          /* proc number recorded as the IO's owner */
+   PGPROC     *owner_proc;     /* PGPROC entry looked up for that owner */
+   int32       owner_pid;      /* pid printed in the context line */
+   PgAioHandle *ioh = arg;     /* IO being processed; NULL when there is none */
+
+   if (!ioh)
+       return;
+
+   Assert(ioh->owner_procno != MyProcNumber);
+   Assert(MyBackendType == B_IO_WORKER);
+
+   owner = ioh->owner_procno;
+   owner_proc = GetPGProcByNumber(owner);
+   owner_pid = owner_proc->pid;
+
+   errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
+}
+
 void
 IoWorkerMain(const void *startup_data, size_t startup_data_len)
 {
    sigjmp_buf  local_sigjmp_buf;
    PgAioHandle *volatile error_ioh = NULL;
+   ErrorContextCallback errcallback = {0};
    volatile int error_errno = 0;
    char        cmd[128];
 
@@ -388,6 +410,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
    sprintf(cmd, "%d", MyIoWorkerId);
    set_ps_display(cmd);
 
+   errcallback.callback = pgaio_worker_error_callback;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
    /* see PostgresMain() */
    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    {
@@ -471,6 +497,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 
            ioh = &pgaio_ctl->io_handles[io_index];
            error_ioh = ioh;
+           errcallback.arg = ioh;
 
            pgaio_debug_io(DEBUG4, ioh,
                           "worker %d processing IO",
@@ -511,6 +538,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
            pgaio_io_perform_synchronously(ioh);
 
            RESUME_INTERRUPTS();
+           errcallback.arg = NULL;
        }
        else
        {
@@ -522,6 +550,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
        CHECK_FOR_INTERRUPTS();
    }
 
+   error_context_stack = errcallback.previous;
    proc_exit(0);
 }