return num_staged_ios;
}
+static void
+pgaio_uring_completion_error_callback(void *arg)
+{
+ ProcNumber owner;
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioHandle *ioh = arg;
+
+ if (!ioh)
+ return;
+
+ /* No need for context if a backend is completing the IO for itself */
+ if (ioh->owner_procno == MyProcNumber)
+ return;
+
+ owner = ioh->owner_procno;
+ owner_proc = GetPGProcByNumber(owner);
+ owner_pid = owner_proc->pid;
+
+ errcontext("completing I/O on behalf of process %d", owner_pid);
+}
+
static void
pgaio_uring_drain_locked(PgAioUringContext *context)
{
int ready;
int orig_ready;
+ ErrorContextCallback errcallback = {0};
Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));
+ errcallback.callback = pgaio_uring_completion_error_callback;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
/*
* Don't drain more events than available right now. Otherwise it's
* plausible that one backend could get stuck, for a while, receiving CQEs
PgAioHandle *ioh;
ioh = io_uring_cqe_get_data(cqe);
+ errcallback.arg = ioh;
io_uring_cqe_seen(&context->io_uring_ring, cqe);
pgaio_io_process_completion(ioh, cqe->res);
+ errcallback.arg = NULL;
}
END_CRIT_SECTION();
"drained %d/%d, now expecting %d",
ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
}
+
+ error_context_stack = errcallback.previous;
}
static void
on_shmem_exit(pgaio_worker_die, 0);
}
+static void
+pgaio_worker_error_callback(void *arg)
+{
+ ProcNumber owner;
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioHandle *ioh = arg;
+
+ if (!ioh)
+ return;
+
+ Assert(ioh->owner_procno != MyProcNumber);
+ Assert(MyBackendType == B_IO_WORKER);
+
+ owner = ioh->owner_procno;
+ owner_proc = GetPGProcByNumber(owner);
+ owner_pid = owner_proc->pid;
+
+ errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
PgAioHandle *volatile error_ioh = NULL;
+ ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
sprintf(cmd, "%d", MyIoWorkerId);
set_ps_display(cmd);
+ errcallback.callback = pgaio_worker_error_callback;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
/* see PostgresMain() */
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
+ errcallback.arg = ioh;
pgaio_debug_io(DEBUG4, ioh,
"worker %d processing IO",
pgaio_io_perform_synchronously(ioh);
RESUME_INTERRUPTS();
+ errcallback.arg = NULL;
}
else
{
CHECK_FOR_INTERRUPTS();
}
+ error_context_stack = errcallback.previous;
proc_exit(0);
}