#include "pgstat.h"
#include "port/atomics.h"
#include "postmaster/bgwriter.h"
+#include "postmaster/postmaster.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
*
* The requests array holds fsync requests sent by backends and not yet
* absorbed by the checkpointer.
- *
- * Unlike the checkpoint fields, num_backend_writes, num_backend_fsync, and
- * the requests fields are protected by CheckpointerCommLock.
*----------
*/
typedef struct
{
+ uint32 type;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber segno; /* see md.c for special values */
+ bool contains_fd;
-	/* might add a real request-type field later; not needed yet */
} CheckpointerRequest;
+#define CKPT_REQUEST_RNODE 1
+#define CKPT_REQUEST_SYN 2
+
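/*
 * The fsync_fds[] socketpair used below is assumed to be created during
 * postmaster startup elsewhere in this patch: FSYNC_FD_SUBMIT is the end
 * backends send requests on, FSYNC_FD_PROCESS the end the checkpointer
 * reads from, and both ends are used in nonblocking mode. A minimal sketch
 * of such a setup (illustrative only, not this patch's actual code):
 *
 *	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fsync_fds) < 0)
 *		ereport(FATAL,
 *				(errcode_for_socket_access(),
 *				 errmsg("could not create fsync request socket pair: %m")));
 *	if (!pg_set_noblock(fsync_fds[FSYNC_FD_SUBMIT]) ||
 *		!pg_set_noblock(fsync_fds[FSYNC_FD_PROCESS]))
 *		ereport(FATAL,
 *				(errcode_for_socket_access(),
 *				 errmsg("could not set fsync request sockets nonblocking: %m")));
 *
 * (SOCK_SEQPACKET, which preserves message boundaries, would also be an
 * option; the actual choice is made outside this hunk.)
 */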
typedef struct
{
pid_t checkpointer_pid; /* PID (0 if not started) */
pg_atomic_uint32 num_backend_fsync; /* counts user backend fsync calls */
pg_atomic_uint32 ckpt_cycle; /* cycle */
- int num_requests; /* current # of requests */
- int max_requests; /* allocated array size */
CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;
static pg_time_t last_checkpoint_time;
static pg_time_t last_xlog_switch_time;
+static BlockNumber next_syn_rqst;
+static BlockNumber received_syn_rqst;
+
/* Prototypes for private functions */
static void CheckArchiveTimeout(void);
static bool IsCheckpointOnSchedule(double progress);
static bool ImmediateCheckpointRequested(void);
-static bool CompactCheckpointerRequestQueue(void);
static void UpdateSharedMemoryConfig(void);
+static void SendFsyncRequest(CheckpointerRequest *request, int fd);
+static bool AbsorbFsyncRequest(void);
/* Signal handlers */
cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
}
- rc = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
- cur_timeout * 1000L /* convert to ms */ ,
- WAIT_EVENT_CHECKPOINTER_MAIN);
+ rc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
+ fsync_fds[FSYNC_FD_PROCESS],
+ cur_timeout * 1000L /* convert to ms */ ,
+ WAIT_EVENT_CHECKPOINTER_MAIN);
/*
* Emergency bailout if postmaster has died. This is to avoid the
{
Size size;
- /*
- * Currently, the size of the requests[] array is arbitrarily set equal to
- * NBuffers. This may prove too large or small ...
- */
size = offsetof(CheckpointerShmemStruct, requests);
- size = add_size(size, mul_size(NBuffers, sizeof(CheckpointerRequest)));
return size;
}
if (!found)
{
/*
- * First time through, so initialize. Note that we zero the whole
- * requests array; this is so that CompactCheckpointerRequestQueue can
- * assume that any pad bytes in the request structs are zeroes.
+ * First time through, so initialize.
*/
MemSet(CheckpointerShmem, 0, size);
SpinLockInit(&CheckpointerShmem->ckpt_lck);
- CheckpointerShmem->max_requests = NBuffers;
pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
* the queue is full and contains no duplicate entries. In that case, we
* let the backend know by returning false.
*/
-bool
-ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+void
+ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno,
+ File file)
{
- CheckpointerRequest *request;
- bool too_full;
+ CheckpointerRequest request = {0};
if (!IsUnderPostmaster)
- return false; /* probably shouldn't even get here */
+ elog(ERROR, "ForwardFsyncRequest must not be called in single user mode");
if (AmCheckpointerProcess())
elog(ERROR, "ForwardFsyncRequest must not be called in checkpointer");
- LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
- /*
- * If the checkpointer isn't running or the request queue is full, the
- * backend will have to perform its own fsync request. But before forcing
- * that to happen, we can try to compact the request queue.
- */
- if (CheckpointerShmem->checkpointer_pid == 0 ||
- (CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests &&
- !CompactCheckpointerRequestQueue()))
- {
- /*
- * Count the subset of writes where backends have to do their own
- * fsync
- */
- if (!AmBackgroundWriterProcess())
- pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1);
- LWLockRelease(CheckpointerCommLock);
- return false;
- }
-
- /* OK, insert request */
- request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
- request->rnode = rnode;
- request->forknum = forknum;
- request->segno = segno;
-
- /* If queue is more than half full, nudge the checkpointer to empty it */
- too_full = (CheckpointerShmem->num_requests >=
- CheckpointerShmem->max_requests / 2);
-
- LWLockRelease(CheckpointerCommLock);
+ request.type = CKPT_REQUEST_RNODE;
+ request.rnode = rnode;
+ request.forknum = forknum;
+ request.segno = segno;
+ request.contains_fd = file != -1;
- /* ... but not till after we release the lock */
- if (too_full && ProcGlobal->checkpointerLatch)
- SetLatch(ProcGlobal->checkpointerLatch);
-
- return true;
+ SendFsyncRequest(&request, request.contains_fd ? FileGetRawDesc(file) : -1);
}
/*
- * CompactCheckpointerRequestQueue
- * Remove duplicates from the request queue to avoid backend fsyncs.
- * Returns "true" if any entries were removed.
- *
- * Although a full fsync request queue is not common, it can lead to severe
- * performance problems when it does happen. So far, this situation has
- * only been observed to occur when the system is under heavy write load,
- * and especially during the "sync" phase of a checkpoint. Without this
- * logic, each backend begins doing an fsync for every block written, which
- * gets very expensive and can slow down the whole system.
+ * AbsorbFsyncRequests
+ * Retrieve queued fsync requests and pass them to local smgr. Stop when
+ * resources would be exhausted by absorbing more.
*
- * Trying to do this every time the queue is full could lose if there
- * aren't any removable entries. But that should be vanishingly rare in
- * practice: there's one queue entry per shared buffer.
+ * This is exported because we want to continue accepting requests during
+ * mdsync().
*/
-static bool
-CompactCheckpointerRequestQueue(void)
+void
+AbsorbFsyncRequests(void)
{
- struct CheckpointerSlotMapping
- {
- CheckpointerRequest request;
- int slot;
- };
-
- int n,
- preserve_count;
- int num_skipped = 0;
- HASHCTL ctl;
- HTAB *htab;
- bool *skip_slot;
-
- /* must hold CheckpointerCommLock in exclusive mode */
- Assert(LWLockHeldByMe(CheckpointerCommLock));
-
- /* Initialize skip_slot array */
- skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
-
- /* Initialize temporary hash table */
- MemSet(&ctl, 0, sizeof(ctl));
- ctl.keysize = sizeof(CheckpointerRequest);
- ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
- ctl.hcxt = CurrentMemoryContext;
-
- htab = hash_create("CompactCheckpointerRequestQueue",
- CheckpointerShmem->num_requests,
- &ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
- /*
- * The basic idea here is that a request can be skipped if it's followed
- * by a later, identical request. It might seem more sensible to work
- * backwards from the end of the queue and check whether a request is
- * *preceded* by an earlier, identical request, in the hopes of doing less
- * copying. But that might change the semantics, if there's an
- * intervening FORGET_RELATION_FSYNC or FORGET_DATABASE_FSYNC request, so
- * we do it this way. It would be possible to be even smarter if we made
- * the code below understand the specific semantics of such requests (it
- * could blow away preceding entries that would end up being canceled
- * anyhow), but it's not clear that the extra complexity would buy us
- * anything.
- */
- for (n = 0; n < CheckpointerShmem->num_requests; n++)
- {
- CheckpointerRequest *request;
- struct CheckpointerSlotMapping *slotmap;
- bool found;
-
- /*
- * We use the request struct directly as a hashtable key. This
- * assumes that any padding bytes in the structs are consistently the
- * same, which should be okay because we zeroed them in
- * CheckpointerShmemInit. Note also that RelFileNode had better
- * contain no pad bytes.
- */
- request = &CheckpointerShmem->requests[n];
- slotmap = hash_search(htab, request, HASH_ENTER, &found);
- if (found)
- {
- /* Duplicate, so mark the previous occurrence as skippable */
- skip_slot[slotmap->slot] = true;
- num_skipped++;
- }
- /* Remember slot containing latest occurrence of this request value */
- slotmap->slot = n;
- }
+ if (!AmCheckpointerProcess())
+ return;
- /* Done with the hash table. */
- hash_destroy(htab);
+ /* Transfer stats counts into pending pgstats message */
+ BgWriterStats.m_buf_written_backend +=
+ pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
+ BgWriterStats.m_buf_fsync_backend +=
+ pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
- /* If no duplicates, we're out of luck. */
- if (!num_skipped)
+ while (true)
{
- pfree(skip_slot);
- return false;
- }
+ if (!FlushFsyncRequestQueueIfNecessary())
+ break;
- /* We found some duplicates; remove them. */
- preserve_count = 0;
- for (n = 0; n < CheckpointerShmem->num_requests; n++)
- {
- if (skip_slot[n])
- continue;
- CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+ if (!AbsorbFsyncRequest())
+ break;
}
- ereport(DEBUG1,
- (errmsg("compacted fsync request queue from %d entries to %d entries",
- CheckpointerShmem->num_requests, preserve_count)));
- CheckpointerShmem->num_requests = preserve_count;
-
- /* Cleanup. */
- pfree(skip_slot);
- return true;
}
/*
- * AbsorbFsyncRequests
- * Retrieve queued fsync requests and pass them to local smgr.
+ * AbsorbAllFsyncRequests
+ *		Retrieve all currently pending fsync requests and pass them to local
+ * smgr.
*
* This is exported because it must be called during CreateCheckPoint;
* we have to be sure we have accepted all pending requests just before
* non-checkpointer processes, do nothing if not checkpointer.
*/
void
-AbsorbFsyncRequests(void)
+AbsorbAllFsyncRequests(void)
{
- CheckpointerRequest *requests = NULL;
- CheckpointerRequest *request;
- int n;
+ CheckpointerRequest request = {0};
if (!AmCheckpointerProcess())
return;
- LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
/* Transfer stats counts into pending pgstats message */
BgWriterStats.m_buf_written_backend +=
pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
/*
- * We try to avoid holding the lock for a long time by copying the request
- * array, and processing the requests after releasing the lock.
- *
- * Once we have cleared the requests from shared memory, we have to PANIC
- * if we then fail to absorb them (eg, because our hashtable runs out of
- * memory). This is because the system cannot run safely if we are unable
- * to fsync what we have been told to fsync. Fortunately, the hashtable
- * is so small that the problem is quite unlikely to arise in practice.
+	 * For mdsync()'s guarantees to work, all pending fsync requests need to
+	 * be executed. But we don't want to keep absorbing requests until the
+	 * queue is empty, as that could take a long while. So instead we enqueue
+	 * a CKPT_REQUEST_SYN marker and absorb requests only until that marker
+	 * has come back to us.
*/
- n = CheckpointerShmem->num_requests;
- if (n > 0)
+ request.type = CKPT_REQUEST_SYN;
+ request.segno = ++next_syn_rqst;
+ SendFsyncRequest(&request, -1);
+
+ received_syn_rqst = next_syn_rqst + 1;
+ while (received_syn_rqst != request.segno)
{
- requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
- memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
+ if (!FlushFsyncRequestQueueIfNecessary())
+			elog(FATAL, "unexpected failure while flushing fsync request queue");
+
+ if (!AbsorbFsyncRequest())
+ break;
}
+}
- START_CRIT_SECTION();
+/*
+ * AbsorbFsyncRequest
+ *		Retrieve one queued fsync request and pass it to local smgr.
+ */
+static bool
+AbsorbFsyncRequest(void)
+{
+ CheckpointerRequest req;
+ int fd;
+ int ret;
- CheckpointerShmem->num_requests = 0;
+ ReleaseLruFiles();
- LWLockRelease(CheckpointerCommLock);
+ START_CRIT_SECTION();
+ ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], &req, sizeof(req), &fd);
+ if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+ {
+ END_CRIT_SECTION();
+ return false;
+ }
+ else if (ret < 0)
+ elog(ERROR, "recvmsg failed: %m");
- for (request = requests; n > 0; request++, n--)
- RememberFsyncRequest(request->rnode, request->forknum, request->segno);
+ if (req.contains_fd != (fd != -1))
+ {
+		elog(FATAL, "fsync request's contains_fd flag does not match received message");
+ }
+ if (req.type == CKPT_REQUEST_SYN)
+ {
+ received_syn_rqst = req.segno;
+ Assert(fd == -1);
+ }
+ else
+ {
+ RememberFsyncRequest(req.rnode, req.forknum, req.segno, fd);
+ }
END_CRIT_SECTION();
- if (requests)
- pfree(requests);
+ return true;
}
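/*
 * pg_uds_recv_with_fd() is provided elsewhere in this patch; for reference,
 * a plausible sketch of such a helper, using recvmsg() with SCM_RIGHTS
 * ancillary data from <sys/socket.h>, is shown below. The name suffix and
 * details are assumptions, not the patch's actual implementation. On
 * success, *fd is set to the received descriptor, or -1 if none was passed.
 */
static ssize_t
pg_uds_recv_with_fd_sketch(int sock, void *buf, size_t buflen, int *fd)
{
	struct msghdr msg = {0};
	struct iovec iov;
	struct cmsghdr *cmsg;
	union
	{
		struct cmsghdr align;
		char space[CMSG_SPACE(sizeof(int))];
	} control;
	ssize_t ret;

	iov.iov_base = buf;
	iov.iov_len = buflen;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = control.space;
	msg.msg_controllen = sizeof(control.space);

	ret = recvmsg(sock, &msg, 0);
	if (ret < 0)
		return ret;

	/* extract the passed file descriptor from the ancillary data, if any */
	*fd = -1;
	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg))
	{
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS)
			memcpy(fd, CMSG_DATA(cmsg), sizeof(int));
	}

	return ret;
}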
/*
{
return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1);
}
+
+void
+CountBackendWrite(void)
+{
+ pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1);
+}
+
+static void
+SendFsyncRequest(CheckpointerRequest *request, int fd)
+{
+ ssize_t ret;
+
+ while (true)
+ {
+ ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], request, sizeof(*request),
+ request->contains_fd ? fd : -1);
+
+ if (ret >= 0)
+ {
+ /*
+			 * Short sends shouldn't happen for a message this small in
+			 * realistic implementations, but better make sure that's true...
+ */
+ if (ret != sizeof(*request))
+				elog(FATAL, "unexpected short send of fsync request");
+ break;
+ }
+ else if (errno == EWOULDBLOCK || errno == EAGAIN)
+ {
+			/* blocked on send - wait for the socket to become writable */
+ /* FIXME: postmaster death? Other interrupts? */
+ WaitLatchOrSocket(NULL, WL_SOCKET_WRITEABLE, fsync_fds[FSYNC_FD_SUBMIT], -1, 0);
+ }
+ else
+ {
+			ereport(FATAL, (errmsg("could not send fsync request: %m")));
+ }
+ }
+}
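/*
 * Likewise, pg_uds_send_with_fd() lives elsewhere in this patch; a sketch of
 * the sending side, attaching the descriptor as SCM_RIGHTS ancillary data
 * when one is supplied (again, the name suffix and details here are
 * assumptions, not the patch's actual code):
 */
static ssize_t
pg_uds_send_with_fd_sketch(int sock, void *buf, size_t buflen, int fd)
{
	struct msghdr msg = {0};
	struct iovec iov;
	union
	{
		struct cmsghdr align;
		char space[CMSG_SPACE(sizeof(int))];
	} control;

	iov.iov_base = buf;
	iov.iov_len = buflen;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;

	if (fd >= 0)
	{
		struct cmsghdr *cmsg;

		/* attach the file descriptor to the message */
		msg.msg_control = control.space;
		msg.msg_controllen = sizeof(control.space);
		cmsg = CMSG_FIRSTHDR(&msg);
		cmsg->cmsg_level = SOL_SOCKET;
		cmsg->cmsg_type = SCM_RIGHTS;
		cmsg->cmsg_len = CMSG_LEN(sizeof(int));
		memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
	}

	return sendmsg(sock, &msg, 0);
}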
CycleCtr cycle_ctr; /* sync cycle of oldest request */
/* requests[f] has bit n set if we need to fsync segment n of fork f */
Bitmapset *requests[MAX_FORKNUM + 1];
- /* canceled[f] is true if we canceled fsyncs for fork "recently" */
- bool canceled[MAX_FORKNUM + 1];
+ File *syncfds[MAX_FORKNUM + 1];
+ int syncfd_len[MAX_FORKNUM + 1];
} PendingOperationEntry;
typedef struct
CycleCtr cycle_ctr; /* mdckpt_cycle_ctr when request was made */
} PendingUnlinkEntry;
+static uint32 open_fsync_queue_files = 0;
+static bool mdsync_in_progress = false;
static HTAB *pendingOpsTable = NULL;
static List *pendingUnlinks = NIL;
static MemoryContext pendingOpsCxt; /* context for the above */
BlockNumber blkno, bool skipFsync, int behavior);
static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
MdfdVec *seg);
+static char *mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno);
+static void mdsyncpass(bool include_current);
/*
}
/*
- * mdsync() -- Sync previous writes to stable storage.
+ * Do one pass over the fsync request hashtable and perform the necessary
+ * fsyncs. Increments the mdsync cycle counter.
+ *
+ * If include_current is true, perform all fsyncs (this is done if too many
+ * files are open); otherwise only perform the fsyncs belonging to the cycle
+ * that was current at call time.
*/
-void
-mdsync(void)
+static void
+mdsyncpass(bool include_current)
{
- static bool mdsync_in_progress = false;
-
HASH_SEQ_STATUS hstat;
PendingOperationEntry *entry;
int absorb_counter;
/* Statistics on sync times */
- int processed = 0;
instr_time sync_start,
sync_end,
sync_diff;
uint64 elapsed;
- uint64 longest = 0;
- uint64 total_elapsed = 0;
-
- /*
- * This is only called during checkpoints, and checkpoints should only
- * occur in processes that have created a pendingOpsTable.
- */
- if (!pendingOpsTable)
- elog(ERROR, "cannot sync without a pendingOpsTable");
-
- /*
- * If we are in the checkpointer, the sync had better include all fsync
- * requests that were queued by backends up to this point. The tightest
- * race condition that could occur is that a buffer that must be written
- * and fsync'd for the checkpoint could have been dumped by a backend just
- * before it was visited by BufferSync(). We know the backend will have
- * queued an fsync request before clearing the buffer's dirtybit, so we
- * are safe as long as we do an Absorb after completing BufferSync().
- */
- AbsorbFsyncRequests();
+ int processed = CheckpointStats.ckpt_sync_rels;
+ uint64 longest = CheckpointStats.ckpt_longest_sync;
+ uint64 total_elapsed = CheckpointStats.ckpt_agg_sync_time;
/*
* To avoid excess fsync'ing (in the worst case, maybe a never-terminating
while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
{
ForkNumber forknum;
+ bool has_remaining;
/*
- * If the entry is new then don't process it this time; it might
- * contain multiple fsync-request bits, but they are all new. Note
- * "continue" bypasses the hash-remove call at the bottom of the loop.
+		 * If we are processing fsync requests because too many file handles
+		 * are open, close files regardless of their cycle. Otherwise we might
+		 * not find anything to close, and we want to make room as quickly as
+		 * possible so more requests can be absorbed.
*/
- if (entry->cycle_ctr == GetCheckpointSyncCycle())
- continue;
+ if (!include_current)
+ {
+ /*
+ * If the entry is new then don't process it this time; it might
+ * contain multiple fsync-request bits, but they are all new. Note
+ * "continue" bypasses the hash-remove call at the bottom of the loop.
+ */
+ if (entry->cycle_ctr == GetCheckpointSyncCycle())
+ continue;
- /* Else assert we haven't missed it */
- Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+ /* Else assert we haven't missed it */
+ Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+ }
/*
* Scan over the forks and segments represented by the entry.
*/
for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
- Bitmapset *requests = entry->requests[forknum];
int segno;
- entry->requests[forknum] = NULL;
- entry->canceled[forknum] = false;
-
- while ((segno = bms_first_member(requests)) >= 0)
+ segno = -1;
+ while ((segno = bms_next_member(entry->requests[forknum], segno)) >= 0)
{
- int failures;
+ int returnCode;
/*
- * If fsync is off then we don't have to bother opening the
- * file at all. (We delay checking until this point so that
- * changing fsync on the fly behaves sensibly.)
+ * Temporarily mark as processed. Have to do so before
+				 * absorbing further requests, otherwise we might delete a new
+				 * request belonging to a new cycle.
*/
- if (!enableFsync)
- continue;
+ bms_del_member(entry->requests[forknum], segno);
- /*
- * If in checkpointer, we want to absorb pending requests
- * every so often to prevent overflow of the fsync request
- * queue. It is unspecified whether newly-added entries will
- * be visited by hash_seq_search, but we don't care since we
- * don't need to process them anyway.
- */
- if (--absorb_counter <= 0)
+ if (entry->syncfd_len[forknum] <= segno ||
+ entry->syncfds[forknum][segno] == -1)
{
- AbsorbFsyncRequests();
- absorb_counter = FSYNCS_PER_ABSORB;
+ /*
+					 * This is where we would open the file ourselves, if we
+					 * wanted to also support requests that don't transport an fd.
+ */
+ elog(FATAL, "file not opened");
}
/*
- * The fsync table could contain requests to fsync segments
- * that have been deleted (unlinked) by the time we get to
- * them. Rather than just hoping an ENOENT (or EACCES on
- * Windows) error can be ignored, what we do on error is
- * absorb pending requests and then retry. Since mdunlink()
- * queues a "cancel" message before actually unlinking, the
- * fsync request is guaranteed to be marked canceled after the
- * absorb if it really was this case. DROP DATABASE likewise
- * has to tell us to forget fsync requests before it starts
- * deletions.
+ * If fsync is off then we don't have to bother opening the
+ * file at all. (We delay checking until this point so that
+ * changing fsync on the fly behaves sensibly.)
+ *
+ * XXX: Why is that an important goal? Doesn't give any
+ * interesting guarantees afaict?
*/
- for (failures = 0;; failures++) /* loop exits at "break" */
+ if (enableFsync)
{
- SMgrRelation reln;
- MdfdVec *seg;
- char *path;
- int save_errno;
-
/*
- * Find or create an smgr hash entry for this relation.
- * This may seem a bit unclean -- md calling smgr? But
- * it's really the best solution. It ensures that the
- * open file reference isn't permanently leaked if we get
- * an error here. (You may say "but an unreferenced
- * SMgrRelation is still a leak!" Not really, because the
- * only case in which a checkpoint is done by a process
- * that isn't about to shut down is in the checkpointer,
- * and it will periodically do smgrcloseall(). This fact
- * justifies our not closing the reln in the success path
- * either, which is a good thing since in non-checkpointer
- * cases we couldn't safely do that.)
+ * The fsync table could contain requests to fsync
+ * segments that have been deleted (unlinked) by the time
+ * we get to them. That used to be problematic, but now
+ * we have a filehandle to the deleted file. That means we
+ * might fsync an empty file superfluously, in a
+ * relatively tight window, which is acceptable.
*/
- reln = smgropen(entry->rnode, InvalidBackendId);
-
- /* Attempt to open and fsync the target segment */
- seg = _mdfd_getseg(reln, forknum,
- (BlockNumber) segno * (BlockNumber) RELSEG_SIZE,
- false,
- EXTENSION_RETURN_NULL
- | EXTENSION_DONT_CHECK_SIZE);
INSTR_TIME_SET_CURRENT(sync_start);
- if (seg != NULL &&
- FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) >= 0)
- {
- /* Success; update statistics about sync timing */
- INSTR_TIME_SET_CURRENT(sync_end);
- sync_diff = sync_end;
- INSTR_TIME_SUBTRACT(sync_diff, sync_start);
- elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
- if (elapsed > longest)
- longest = elapsed;
- total_elapsed += elapsed;
- processed++;
- if (log_checkpoints)
- elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f msec",
- processed,
- FilePathName(seg->mdfd_vfd),
- (double) elapsed / 1000);
-
- break; /* out of retry loop */
- }
+ returnCode = FileSync(entry->syncfds[forknum][segno], WAIT_EVENT_DATA_FILE_SYNC);
- /* Compute file name for use in message */
- save_errno = errno;
- path = _mdfd_segpath(reln, forknum, (BlockNumber) segno);
- errno = save_errno;
+ if (returnCode < 0)
+ {
+ /* XXX: decide on policy */
+ bms_add_member(entry->requests[forknum], segno);
- /*
- * It is possible that the relation has been dropped or
- * truncated since the fsync request was entered.
- * Therefore, allow ENOENT, but only if we didn't fail
- * already on this file. This applies both for
- * _mdfd_getseg() and for FileSync, since fd.c might have
- * closed the file behind our back.
- *
- * XXX is there any point in allowing more than one retry?
- * Don't see one at the moment, but easy to change the
- * test here if so.
- */
- if (!FILE_POSSIBLY_DELETED(errno) ||
- failures > 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not fsync file \"%s\": %m",
- path)));
- else
+ FilePathName(entry->syncfds[forknum][segno]))));
+ }
+
+ /* Success; update statistics about sync timing */
+ INSTR_TIME_SET_CURRENT(sync_end);
+ sync_diff = sync_end;
+ INSTR_TIME_SUBTRACT(sync_diff, sync_start);
+ elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
+ if (elapsed > longest)
+ longest = elapsed;
+ total_elapsed += elapsed;
+ processed++;
+ if (log_checkpoints)
ereport(DEBUG1,
- (errcode_for_file_access(),
- errmsg("could not fsync file \"%s\" but retrying: %m",
- path)));
- pfree(path);
+ (errmsg("checkpoint sync: number=%d file=%s time=%.3f msec",
+ processed,
+ FilePathName(entry->syncfds[forknum][segno]),
+ (double) elapsed / 1000),
+ errhidestmt(true),
+ errhidecontext(true)));
+ }
+
+ /*
+ * It shouldn't be possible for a new request to arrive during
+ * the fsync (on error this will not be reached).
+ */
+ Assert(!bms_is_member(segno, entry->requests[forknum]));
+ /*
+ * Close file. XXX: centralize code.
+ */
+ {
+ open_fsync_queue_files--;
+ FileClose(entry->syncfds[forknum][segno]);
+ entry->syncfds[forknum][segno] = -1;
+ }
+
+ /*
+ * If in checkpointer, we want to absorb pending requests every so
+ * often to prevent overflow of the fsync request queue. It is
+ * unspecified whether newly-added entries will be visited by
+ * hash_seq_search, but we don't care since we don't need to process
+ * them anyway.
+ */
+ if (absorb_counter-- <= 0)
+ {
/*
- * Absorb incoming requests and check to see if a cancel
- * arrived for this relation fork.
+ * Don't absorb if too many files are open. This pass will
+ * soon close some, so check again later.
*/
- AbsorbFsyncRequests();
- absorb_counter = FSYNCS_PER_ABSORB; /* might as well... */
-
- if (entry->canceled[forknum])
- break;
- } /* end retry loop */
+ if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+ AbsorbFsyncRequests();
+ absorb_counter = FSYNCS_PER_ABSORB;
+ }
}
- bms_free(requests);
}
/*
- * We've finished everything that was requested before we started to
- * scan the entry. If no new requests have been inserted meanwhile,
- * remove the entry. Otherwise, update its cycle counter, as all the
- * requests now in it must have arrived during this cycle.
+ * We've finished everything for the file that was requested before we
+ * started to scan the entry. If no new requests have been inserted
+ * meanwhile, remove the entry. Otherwise, update its cycle counter,
+ * as all the requests now in it must have arrived during this cycle.
+ *
+ * This needs to be checked separately from the above for-each-fork
+ * loop, as new requests for this relation could have been absorbed.
*/
+ has_remaining = false;
for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
- if (entry->requests[forknum] != NULL)
- break;
+ if (bms_is_empty(entry->requests[forknum]))
+ {
+ if (entry->syncfds[forknum])
+ {
+ pfree(entry->syncfds[forknum]);
+ entry->syncfds[forknum] = NULL;
+ }
+ bms_free(entry->requests[forknum]);
+ entry->requests[forknum] = NULL;
+ }
+ else
+ has_remaining = true;
}
- if (forknum <= MAX_FORKNUM)
+ if (has_remaining)
entry->cycle_ctr = GetCheckpointSyncCycle();
else
{
}
} /* end loop over hashtable entries */
- /* Return sync performance metrics for report at checkpoint end */
+ /* Flag successful completion of mdsync */
+ mdsync_in_progress = false;
+
+ /* Maintain sync performance metrics for report at checkpoint end */
CheckpointStats.ckpt_sync_rels = processed;
CheckpointStats.ckpt_longest_sync = longest;
CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+}
- /* Flag successful completion of mdsync */
- mdsync_in_progress = false;
+/*
+ * mdsync() -- Sync previous writes to stable storage.
+ */
+void
+mdsync(void)
+{
+ /*
+ * This is only called during checkpoints, and checkpoints should only
+ * occur in processes that have created a pendingOpsTable.
+ */
+ if (!pendingOpsTable)
+ elog(ERROR, "cannot sync without a pendingOpsTable");
+
+ /*
+ * If we are in the checkpointer, the sync had better include all fsync
+ * requests that were queued by backends up to this point. The tightest
+ * race condition that could occur is that a buffer that must be written
+ * and fsync'd for the checkpoint could have been dumped by a backend just
+ * before it was visited by BufferSync(). We know the backend will have
+ * queued an fsync request before clearing the buffer's dirtybit, so we
+ * are safe as long as we do an Absorb after completing BufferSync().
+ */
+ AbsorbAllFsyncRequests();
+
+ mdsyncpass(false);
+}
+
+/*
+ * Flush the fsync request queue far enough to guarantee that there is room
+ * (in terms of open file handles) to absorb at least one more request.
+ * Returns false if that is currently not possible because a sync pass is
+ * already in progress.
+ */
+bool
+FlushFsyncRequestQueueIfNecessary(void)
+{
+ if (mdsync_in_progress)
+ return false;
+
+ while (true)
+ {
+ if (open_fsync_queue_files >= ((max_safe_fds * 7) / 10))
+ {
+ elog(DEBUG1,
+ "flush fsync request queue due to %u open files",
+ open_fsync_queue_files);
+ mdsyncpass(true);
+ elog(DEBUG1,
+			 "flushed fsync request queue, now at %u open files",
+ open_fsync_queue_files);
+ }
+ else
+ break;
+ }
+
+ return true;
}
/*
*/
if (--absorb_counter <= 0)
{
- AbsorbFsyncRequests();
+ /* XXX: Centralize this condition */
+ if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+ AbsorbFsyncRequests();
absorb_counter = UNLINKS_PER_ABSORB;
}
}
}
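/*
 * The "(max_safe_fds * 7) / 10" test above also appears in mdsyncpass() and
 * FlushFsyncRequestQueueIfNecessary(); as the XXX comments note, it could be
 * centralized in a small helper along these lines (hypothetical name, not
 * part of this patch):
 */
static inline bool
fsync_queue_has_fd_headroom(void)
{
	/* leave roughly 30% of the fd budget for ordinary fd.c use */
	return open_fsync_queue_files < ((max_safe_fds * 7) / 10);
}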
+
+/*
+ * Return the filename for the specified segment of the relation. The
+ * returned string is palloc'd.
+ */
+static char *
+mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+{
+ char *path,
+ *fullpath;
+
+ path = relpathperm(rnode, forknum);
+
+ if (segno > 0)
+ {
+ fullpath = psprintf("%s.%u", path, segno);
+ pfree(path);
+ }
+ else
+ fullpath = path;
+
+ return fullpath;
+}
+
/*
* register_dirty_segment() -- Mark a relation segment as needing fsync
*
pg_memory_barrier();
cycle = GetCheckpointSyncCycle();
+ /*
+	 * For historical reasons the checkpointer keeps track of the number of
+	 * times backends perform writes themselves.
+ */
+ if (!AmBackgroundWriterProcess())
+ CountBackendWrite();
+
/*
* Don't repeatedly register the same segment as dirty.
*
if (pendingOpsTable)
{
- /* push it into local pending-ops table */
- RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno);
- seg->mdfd_dirtied_cycle = cycle;
- }
- else
- {
- if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno))
- {
- seg->mdfd_dirtied_cycle = cycle;
- return; /* passed it off successfully */
- }
-
- ereport(DEBUG1,
- (errmsg("could not forward fsync request because request queue is full")));
+ int fd;
- if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync file \"%s\": %m",
- FilePathName(seg->mdfd_vfd))));
+ /*
+ * Push it into local pending-ops table.
+ *
+		 * We have to duplicate the fd - we can't have fd.c close it behind
+		 * our back, as that would lose the error reporting guarantees on
+		 * Linux. RememberFsyncRequest() will manage the lifetime.
+ */
+ ReleaseLruFiles();
+ fd = dup(FileGetRawDesc(seg->mdfd_vfd));
+ if (fd < 0)
+			elog(ERROR, "could not duplicate file descriptor: %m");
+ RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, fd);
}
+ else
+ ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, seg->mdfd_vfd);
}
/*
{
/* push it into local pending-ops table */
RememberFsyncRequest(rnode.node, MAIN_FORKNUM,
- UNLINK_RELATION_REQUEST);
+ UNLINK_RELATION_REQUEST,
+ -1);
}
else
{
- /*
- * Notify the checkpointer about it. If we fail to queue the request
- * message, we have to sleep and try again, because we can't simply
- * delete the file now. Ugly, but hopefully won't happen often.
- *
- * XXX should we just leave the file orphaned instead?
- */
+ /* Notify the checkpointer about it. */
Assert(IsUnderPostmaster);
- while (!ForwardFsyncRequest(rnode.node, MAIN_FORKNUM,
- UNLINK_RELATION_REQUEST))
- pg_usleep(10000L); /* 10 msec seems a good number */
+ ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, UNLINK_RELATION_REQUEST, -1);
}
}
* heavyweight operation anyhow, so we'll live with it.)
*/
void
-RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno, int fd)
{
Assert(pendingOpsTable);
/*
* We can't just delete the entry since mdsync could have an
* active hashtable scan. Instead we delete the bitmapsets; this
- * is safe because of the way mdsync is coded. We also set the
- * "canceled" flags so that mdsync can tell that a cancel arrived
- * for the fork(s).
+ * is safe because of the way mdsync is coded.
*/
if (forknum == InvalidForkNumber)
{
/* remove requests for all forks */
for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
+ int segno;
+
bms_free(entry->requests[forknum]);
entry->requests[forknum] = NULL;
- entry->canceled[forknum] = true;
+
+ for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+ {
+ if (entry->syncfds[forknum][segno] != -1)
+ {
+ open_fsync_queue_files--;
+ FileClose(entry->syncfds[forknum][segno]);
+ entry->syncfds[forknum][segno] = -1;
+ }
+ }
+
}
}
else
/* remove requests for single fork */
bms_free(entry->requests[forknum]);
entry->requests[forknum] = NULL;
- entry->canceled[forknum] = true;
+
+ for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+ {
+ if (entry->syncfds[forknum][segno] != -1)
+ {
+ open_fsync_queue_files--;
+ FileClose(entry->syncfds[forknum][segno]);
+ entry->syncfds[forknum][segno] = -1;
+ }
+ }
}
}
}
{
bms_free(entry->requests[forknum]);
entry->requests[forknum] = NULL;
- entry->canceled[forknum] = true;
}
}
}
{
entry->cycle_ctr = GetCheckpointSyncCycle();
MemSet(entry->requests, 0, sizeof(entry->requests));
- MemSet(entry->canceled, 0, sizeof(entry->canceled));
+ MemSet(entry->syncfds, 0, sizeof(entry->syncfds));
+ MemSet(entry->syncfd_len, 0, sizeof(entry->syncfd_len));
}
/*
entry->requests[forknum] = bms_add_member(entry->requests[forknum],
(int) segno);
+ if (fd >= 0)
+ {
+ /* make space for entry */
+ if (entry->syncfds[forknum] == NULL)
+ {
+ int i;
+
+ entry->syncfds[forknum] = palloc(sizeof(File) * (segno + 1));
+ entry->syncfd_len[forknum] = segno + 1;
+
+ for (i = 0; i <= segno; i++)
+ entry->syncfds[forknum][i] = -1;
+ }
+ else if (entry->syncfd_len[forknum] <= segno)
+ {
+ int i;
+
+ entry->syncfds[forknum] = repalloc(entry->syncfds[forknum],
+ sizeof(File) * (segno + 1));
+
+ /* initialize newly created entries */
+ for (i = entry->syncfd_len[forknum]; i <= segno; i++)
+ entry->syncfds[forknum][i] = -1;
+
+ entry->syncfd_len[forknum] = segno + 1;
+ }
+
+ if (entry->syncfds[forknum][segno] == -1)
+ {
+ char *path = mdpath(entry->rnode, forknum, segno);
+ open_fsync_queue_files++;
+ /* caller must have reserved entry */
+ entry->syncfds[forknum][segno] =
+ FileOpenForFd(fd, path);
+ pfree(path);
+ }
+ else
+ {
+ /*
+			 * File is already open. Have to keep the older fd, as errors
+			 * might only be reported against it, so close the one we just
+			 * got.
+			 *
+			 * XXX: check for errors.
+ */
+ close(fd);
+ }
+
+ FlushFsyncRequestQueueIfNecessary();
+ }
+
MemoryContextSwitchTo(oldcxt);
}
}
if (pendingOpsTable)
{
/* standalone backend or startup process: fsync state is local */
- RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC);
+ RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
}
else if (IsUnderPostmaster)
{
- /*
- * Notify the checkpointer about it. If we fail to queue the cancel
- * message, we have to sleep and try again ... ugly, but hopefully
- * won't happen often.
- *
- * XXX should we CHECK_FOR_INTERRUPTS in this loop? Escaping with an
- * error would leave the no-longer-used file still present on disk,
- * which would be bad, so I'm inclined to assume that the checkpointer
- * will always empty the queue soon.
- */
- while (!ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC))
- pg_usleep(10000L); /* 10 msec seems a good number */
+ /* Notify the checkpointer about it. */
+ ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
/*
* Note we don't wait for the checkpointer to actually absorb the
if (pendingOpsTable)
{
/* standalone backend or startup process: fsync state is local */
- RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC);
+ RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
}
else if (IsUnderPostmaster)
{
/* see notes in ForgetRelationFsyncRequests */
- while (!ForwardFsyncRequest(rnode, InvalidForkNumber,
- FORGET_DATABASE_FSYNC))
- pg_usleep(10000L); /* 10 msec seems a good number */
+ ForwardFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
}
}