} PgAioInProgressFlags;
-/* IO completion callback */
-typedef bool (*PgAioCompletedCB)(PgAioInProgress *io);
-
typedef uint16 PgAioIPFlags;
struct PgAioInProgress
unsigned flags, sigset_t *sig);
/* io completions */
-static bool pgaio_complete_nop(PgAioInProgress *io);
-static bool pgaio_complete_fsync(PgAioInProgress *io);
-static bool pgaio_complete_fsync_wal(PgAioInProgress *io);
-static bool pgaio_complete_flush_range(PgAioInProgress *io);
-static bool pgaio_complete_read_buffer(PgAioInProgress *io);
-static bool pgaio_complete_write_buffer(PgAioInProgress *io);
-static bool pgaio_complete_write_wal(PgAioInProgress *io);
-static bool pgaio_complete_write_generic(PgAioInProgress *io);
-
+static bool pgaio_nop_complete(PgAioInProgress *io);
+static bool pgaio_fsync_complete(PgAioInProgress *io);
+static bool pgaio_fsync_wal_complete(PgAioInProgress *io);
+static bool pgaio_flush_range_complete(PgAioInProgress *io);
+static bool pgaio_read_buffer_complete(PgAioInProgress *io);
+static void pgaio_read_buffer_retry(PgAioInProgress *io);
+static bool pgaio_write_buffer_complete(PgAioInProgress *io);
+static void pgaio_write_buffer_retry(PgAioInProgress *io);
+static bool pgaio_write_wal_complete(PgAioInProgress *io);
+static bool pgaio_write_generic_complete(PgAioInProgress *io);
-static MemoryContext aio_retry_context;
/*
+ * Implementation of different AIO actions.
+ *
* To support EXEC_BACKEND environments, where we cannot rely on callback
- * addresses being equivalent across processes, completion actions are just
- * indices into a process local array of callbacks, indexed by the type of
- * action. Also makes the shared memory entries a bit smaller, but that's not
- * a huge win.
+ * addresses being equivalent across processes, PgAioInProgress does not point
+ * directly to the type's PgAioActionCBs, but contains an index instead.
+ */
+
+
+/*
+ * IO completion callback.
+ */
+typedef bool (*PgAioCompletedCB)(PgAioInProgress *io);
+
+/*
+ * IO retry callback.
*/
-static const PgAioCompletedCB completion_callbacks[] =
-{
- [PGAIO_NOP] = pgaio_complete_nop,
- [PGAIO_FSYNC] = pgaio_complete_fsync,
- [PGAIO_FSYNC_WAL] = pgaio_complete_fsync_wal,
- [PGAIO_FLUSH_RANGE] = pgaio_complete_flush_range,
- [PGAIO_READ_BUFFER] = pgaio_complete_read_buffer,
- [PGAIO_WRITE_BUFFER] = pgaio_complete_write_buffer,
- [PGAIO_WRITE_WAL] = pgaio_complete_write_wal,
- [PGAIO_WRITE_GENERIC] = pgaio_complete_write_generic,
+typedef void (*PgAioRetryCB)(PgAioInProgress *io);
+
+typedef struct PgAioActionCBs
+{
+ PgAioRetryCB retry;
+ PgAioCompletedCB complete;
+} PgAioActionCBs;
+
+static const PgAioActionCBs io_action_cbs[] =
+{
+ [PGAIO_NOP] =
+ {
+ .complete = pgaio_nop_complete
+ },
+
+ [PGAIO_FSYNC] =
+ {
+ .complete = pgaio_fsync_complete
+ },
+
+ [PGAIO_FSYNC_WAL] =
+ {
+ .complete = pgaio_fsync_wal_complete
+ },
+
+ [PGAIO_FLUSH_RANGE] =
+ {
+ .complete = pgaio_flush_range_complete
+ },
+
+ [PGAIO_READ_BUFFER] =
+ {
+ .retry = pgaio_read_buffer_retry,
+ .complete = pgaio_read_buffer_complete
+ },
+
+ [PGAIO_WRITE_BUFFER] =
+ {
+ .retry = pgaio_write_buffer_retry,
+ .complete = pgaio_write_buffer_complete
+ },
+
+ [PGAIO_WRITE_WAL] =
+ {
+ .complete = pgaio_write_wal_complete
+ },
+
+ [PGAIO_WRITE_GENERIC] =
+ {
+ .complete = pgaio_write_generic_complete
+ },
};
dlist_init(&local_recycle_requests);
// XXX: could create a local queue here.
-
- /*
- * Need to be allowed to re-open files during retries. Those can happen,
- * e.g. when fsyncing WAL, within a critical section. Reopening files
- * currently requires memory. So create a context with small reservation
- * that's allowed to be used within a critical section.
- */
- aio_retry_context = AllocSetContextCreate(TopMemoryContext,
- "aio retry context",
- 1024,
- 1024,
- 1024);
- MemoryContextAllowInCriticalSection(aio_retry_context, true);
}
void
if (!(io->flags & PGAIOIP_SHARED_CALLBACK_CALLED))
{
- PgAioCompletedCB cb;
bool finished;
/*
*/
*(volatile PgAioIPFlags*) &io->flags |= PGAIOIP_SHARED_CALLBACK_CALLED;
- cb = completion_callbacks[io->type];
- finished = cb(io);
+ finished = io_action_cbs[io->type].complete(io);
dlist_delete_from(&my_aio->reaped, node);
void
pgaio_io_retry(PgAioInProgress *io)
{
- bool retryable = false;
bool need_retry;
+ PgAioRetryCB retry_cb = NULL;
- switch (io->type)
- {
- case PGAIO_READ_BUFFER:
- retryable = true;
- break;
-
- case PGAIO_WRITE_BUFFER:
- retryable = true;
- break;
-
- default:
- break;
- }
+ retry_cb = io_action_cbs[io->type].retry;
- if (!retryable)
- {
- elog(WARNING, "non-retryable aio being retried");
- return;
- }
+ if (!retry_cb)
+ elog(PANIC, "non-retryable aio being retried");
LWLockAcquire(SharedAIOCtlLock, LW_EXCLUSIVE);
return;
}
- switch (io->type)
- {
- case PGAIO_READ_BUFFER:
- io->d.read_buffer.fd = reopen_buffered(&io->d.read_buffer.tag);
- break;
-
- case PGAIO_WRITE_BUFFER:
- io->d.write_buffer.fd = reopen_buffered(&io->d.write_buffer.tag);
- break;
-
- default:
- break;
- }
+ retry_cb(io);
dlist_push_tail(&my_aio->pending, &io->io_node);
my_aio->pending_count++;
}
static bool
-pgaio_complete_nop(PgAioInProgress *io)
+pgaio_nop_complete(PgAioInProgress *io)
{
#ifdef PGAIO_VERBOSE
elog(DEBUG3, "completed nop");
}
static bool
-pgaio_complete_fsync(PgAioInProgress *io)
+pgaio_fsync_complete(PgAioInProgress *io)
{
#ifdef PGAIO_VERBOSE
elog(DEBUG3, "completed fsync: %zu",
}
static bool
-pgaio_complete_fsync_wal(PgAioInProgress *io)
+pgaio_fsync_wal_complete(PgAioInProgress *io)
{
#ifdef PGAIO_VERBOSE
elog(DEBUG3, "completed fsync_wal: %zu",
}
static bool
-pgaio_complete_flush_range(PgAioInProgress *io)
+pgaio_flush_range_complete(PgAioInProgress *io)
{
#ifdef PGAIO_VERBOSE
elog(DEBUG3, "completed flush_range: %zu, %s",
return true;
}
+/* stringify relfilenode, for debugging use only */
+static char *
+relpath_debug(AioBufferTag *tag)
+{
+ char *path;
+ MemoryContext oldcontext = MemoryContextSwitchTo(ErrorContext);
+
+ path = relpath(tag->rnode,
+ tag->forkNum);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return path;
+}
+
+static void
+pgaio_read_buffer_retry(PgAioInProgress *io)
+{
+ io->d.read_buffer.fd = reopen_buffered(&io->d.read_buffer.tag);
+}
+
static bool
-pgaio_complete_read_buffer(PgAioInProgress *io)
+pgaio_read_buffer_complete(PgAioInProgress *io)
{
Buffer buffer = io->d.read_buffer.buf;
if (io->result != (io->d.read_buffer.nbytes - io->d.read_buffer.already_done))
{
- MemoryContext old_context = MemoryContextSwitchTo(aio_retry_context);
-
failed = true;
//pgaio_io_print(io, NULL);
errcode_for_file_access(),
errmsg("could not read block %u in file \"%s\": %s",
io->d.read_buffer.tag.blockNum,
- relpath(io->d.read_buffer.tag.rnode,
- io->d.read_buffer.tag.forkNum),
+ relpath_debug(&io->d.read_buffer.tag),
strerror(-io->result)));
}
errmsg("aio %zd: could not read block %u in file \"%s\": read only %d of %d bytes (init: %d, cur: %d)",
io - aio_ctl->in_progress_io,
io->d.read_buffer.tag.blockNum,
- relpath(io->d.read_buffer.tag.rnode,
- io->d.read_buffer.tag.forkNum),
+ relpath_debug(&io->d.read_buffer.tag),
io->result, BLCKSZ,
io->owner_id, MyProc ? MyProc->pgprocno : INVALID_PGPROCNO));
}
-
- MemoryContextSwitchTo(old_context);
- MemoryContextReset(aio_retry_context);
}
else
{
return done;
}
+static void
+pgaio_write_buffer_retry(PgAioInProgress *io)
+{
+ io->d.write_buffer.fd = reopen_buffered(&io->d.write_buffer.tag);
+}
+
static bool
-pgaio_complete_write_buffer(PgAioInProgress *io)
+pgaio_write_buffer_complete(PgAioInProgress *io)
{
Buffer buffer = io->d.write_buffer.buf;
if (io->result != (io->d.write_buffer.nbytes - io->d.write_buffer.already_done))
{
- MemoryContext old_context = MemoryContextSwitchTo(aio_retry_context);
-
failed = true;
if (io->result < 0)
errmsg("aio %zd: could not write block %u in file \"%s\": %s",
io - aio_ctl->in_progress_io,
io->d.write_buffer.tag.blockNum,
- relpath(io->d.write_buffer.tag.rnode,
- io->d.write_buffer.tag.forkNum),
+ relpath_debug(&io->d.write_buffer.tag),
strerror(-io->result)),
errhint("Check free disk space."));
}
errmsg("aio %zd: could not write block %u in file \"%s\": wrote only %d of %d bytes (init: %d, cur: %d)",
io - aio_ctl->in_progress_io,
io->d.write_buffer.tag.blockNum,
- relpath(io->d.write_buffer.tag.rnode,
- io->d.write_buffer.tag.forkNum),
+ relpath_debug(&io->d.write_buffer.tag),
io->result, (io->d.write_buffer.nbytes - io->d.write_buffer.already_done),
io->owner_id, MyProc ? MyProc->pgprocno : INVALID_PGPROCNO)));
}
-
- MemoryContextSwitchTo(old_context);
- MemoryContextReset(aio_retry_context);
}
else
{
}
static bool
-pgaio_complete_write_wal(PgAioInProgress *io)
+pgaio_write_wal_complete(PgAioInProgress *io)
{
#ifdef PGAIO_VERBOSE
ereport(DEBUG3,
static bool
-pgaio_complete_write_generic(PgAioInProgress *io)
+pgaio_write_generic_complete(PgAioInProgress *io)
{
#ifdef PGAIO_VERBOSE
ereport(DEBUG3,