From: Andres Freund
Date: Mon, 21 May 2018 22:43:30 +0000 (-0700)
Subject: Heavily-WIP: Send file descriptors to checkpointer for fsyncing.
X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fclone-fd-checkpointer;p=users%2Fandresfreund%2Fpostgres.git

Heavily-WIP: Send file descriptors to checkpointer for fsyncing.

This addresses the issue that, at least on linux, fsyncs only reliably
see errors that occurred after the file has been opened.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index adbd6a2126..427774152e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8634,8 +8634,10 @@ CreateCheckPoint(int flags)
	 * Note: because it is possible for log_checkpoints to change while a
	 * checkpoint proceeds, we always accumulate stats, even if
	 * log_checkpoints is currently off.
+	 *
+	 * Note #2: this is reset at the end of the checkpoint, not here, because
+	 * we might have to fsync before getting here (see mdsync()).
	 */
-	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
	CheckpointStats.ckpt_start_t = GetCurrentTimestamp();

	/*
@@ -8999,6 +9001,9 @@ CreateCheckPoint(int flags)
			CheckpointStats.ckpt_segs_recycled);

	LWLockRelease(CheckpointLock);
+
+	/* reset stats */
+	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
}

/*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 333eb91c9d..c2be529bca 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -48,6 +48,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/postmaster.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -102,19 +103,21 @@
 *
 * The requests array holds fsync requests sent by backends and not yet
 * absorbed by the checkpointer.
- *
- * Unlike the checkpoint fields, num_backend_writes, num_backend_fsync, and
- * the requests fields are protected by CheckpointerCommLock.
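+ *
+ * XXX: with this patch, requests are transported over the fsync_fds socket
+ * pair (see SendFsyncRequest()) rather than through this array, so they
+ * are no longer protected by CheckpointerCommLock.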
 *----------
 */
typedef struct
{
+	uint32		type;
	RelFileNode rnode;
	ForkNumber	forknum;
	BlockNumber segno;			/* see md.c for special values */
+	bool		contains_fd;	/* is a file descriptor attached to this request? */
} CheckpointerRequest;

+#define CKPT_REQUEST_RNODE	1
+#define CKPT_REQUEST_SYN	2
+
typedef struct
{
	pid_t		checkpointer_pid;	/* PID (0 if not started) */
@@ -131,8 +134,6 @@ typedef struct
	pg_atomic_uint32 num_backend_fsync; /* counts user backend fsync calls */
	pg_atomic_uint32 ckpt_cycle;	/* cycle */

-	int			num_requests;	/* current # of requests */
-	int			max_requests;	/* allocated array size */
	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;

@@ -168,13 +169,17 @@ static double ckpt_cached_elapsed;
static pg_time_t last_checkpoint_time;
static pg_time_t last_xlog_switch_time;

+static BlockNumber next_syn_rqst;
+static BlockNumber received_syn_rqst;
+
/* Prototypes for private functions */

static void CheckArchiveTimeout(void);
static bool IsCheckpointOnSchedule(double progress);
static bool ImmediateCheckpointRequested(void);
-static bool CompactCheckpointerRequestQueue(void);
static void UpdateSharedMemoryConfig(void);
+static void SendFsyncRequest(CheckpointerRequest *request, int fd);
+static bool AbsorbFsyncRequest(void);

/* Signal handlers */

@@ -557,10 +562,11 @@ CheckpointerMain(void)
				cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
		}

-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   cur_timeout * 1000L /* convert to ms */ ,
-					   WAIT_EVENT_CHECKPOINTER_MAIN);
+		rc = WaitLatchOrSocket(MyLatch,
+							   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
+							   fsync_fds[FSYNC_FD_PROCESS],
+							   cur_timeout * 1000L /* convert to ms */ ,
+							   WAIT_EVENT_CHECKPOINTER_MAIN);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -910,12 +916,7 @@ CheckpointerShmemSize(void)
{
	Size		size;

-	/*
-	 * Currently, the size of the requests[] array is arbitrarily set equal to
-	 * NBuffers.  This may prove too large or small ...
-	 */
	size = offsetof(CheckpointerShmemStruct, requests);
-	size = add_size(size, mul_size(NBuffers, sizeof(CheckpointerRequest)));

	return size;
}
@@ -938,13 +939,10 @@ CheckpointerShmemInit(void)
	if (!found)
	{
		/*
-		 * First time through, so initialize.  Note that we zero the whole
-		 * requests array; this is so that CompactCheckpointerRequestQueue can
-		 * assume that any pad bytes in the request structs are zeroes.
+		 * First time through, so initialize.
		 */
		MemSet(CheckpointerShmem, 0, size);
		SpinLockInit(&CheckpointerShmem->ckpt_lck);
-		CheckpointerShmem->max_requests = NBuffers;
		pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
		pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
		pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
@@ -1124,176 +1122,61 @@ RequestCheckpoint(int flags)
 * the queue is full and contains no duplicate entries.  In that case, we
 * let the backend know by returning false.
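+ *
+ * XXX: with the socket transport a request can no longer be rejected;
+ * SendFsyncRequest() blocks until the checkpointer has made room.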
*/ -bool -ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno) +void +ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno, + File file) { - CheckpointerRequest *request; - bool too_full; + CheckpointerRequest request = {0}; if (!IsUnderPostmaster) - return false; /* probably shouldn't even get here */ + elog(ERROR, "ForwardFsyncRequest must not be called in single user mode"); if (AmCheckpointerProcess()) elog(ERROR, "ForwardFsyncRequest must not be called in checkpointer"); - LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE); - - /* - * If the checkpointer isn't running or the request queue is full, the - * backend will have to perform its own fsync request. But before forcing - * that to happen, we can try to compact the request queue. - */ - if (CheckpointerShmem->checkpointer_pid == 0 || - (CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests && - !CompactCheckpointerRequestQueue())) - { - /* - * Count the subset of writes where backends have to do their own - * fsync - */ - if (!AmBackgroundWriterProcess()) - pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1); - LWLockRelease(CheckpointerCommLock); - return false; - } - - /* OK, insert request */ - request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++]; - request->rnode = rnode; - request->forknum = forknum; - request->segno = segno; - - /* If queue is more than half full, nudge the checkpointer to empty it */ - too_full = (CheckpointerShmem->num_requests >= - CheckpointerShmem->max_requests / 2); - - LWLockRelease(CheckpointerCommLock); + request.type = CKPT_REQUEST_RNODE; + request.rnode = rnode; + request.forknum = forknum; + request.segno = segno; + request.contains_fd = file != -1; - /* ... but not till after we release the lock */ - if (too_full && ProcGlobal->checkpointerLatch) - SetLatch(ProcGlobal->checkpointerLatch); - - return true; + SendFsyncRequest(&request, request.contains_fd ? FileGetRawDesc(file) : -1); } /* - * CompactCheckpointerRequestQueue - * Remove duplicates from the request queue to avoid backend fsyncs. - * Returns "true" if any entries were removed. - * - * Although a full fsync request queue is not common, it can lead to severe - * performance problems when it does happen. So far, this situation has - * only been observed to occur when the system is under heavy write load, - * and especially during the "sync" phase of a checkpoint. Without this - * logic, each backend begins doing an fsync for every block written, which - * gets very expensive and can slow down the whole system. + * AbsorbFsyncRequests + * Retrieve queued fsync requests and pass them to local smgr. Stop when + * resources would be exhausted by absorbing more. * - * Trying to do this every time the queue is full could lose if there - * aren't any removable entries. But that should be vanishingly rare in - * practice: there's one queue entry per shared buffer. + * This is exported because we want to continue accepting requests during + * mdsync(). 
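+ *
+ * Stops as soon as the socket has been drained, or right away if a sync
+ * pass is already in progress (see FlushFsyncRequestQueueIfNecessary()).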
*/ -static bool -CompactCheckpointerRequestQueue(void) +void +AbsorbFsyncRequests(void) { - struct CheckpointerSlotMapping - { - CheckpointerRequest request; - int slot; - }; - - int n, - preserve_count; - int num_skipped = 0; - HASHCTL ctl; - HTAB *htab; - bool *skip_slot; - - /* must hold CheckpointerCommLock in exclusive mode */ - Assert(LWLockHeldByMe(CheckpointerCommLock)); - - /* Initialize skip_slot array */ - skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests); - - /* Initialize temporary hash table */ - MemSet(&ctl, 0, sizeof(ctl)); - ctl.keysize = sizeof(CheckpointerRequest); - ctl.entrysize = sizeof(struct CheckpointerSlotMapping); - ctl.hcxt = CurrentMemoryContext; - - htab = hash_create("CompactCheckpointerRequestQueue", - CheckpointerShmem->num_requests, - &ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - /* - * The basic idea here is that a request can be skipped if it's followed - * by a later, identical request. It might seem more sensible to work - * backwards from the end of the queue and check whether a request is - * *preceded* by an earlier, identical request, in the hopes of doing less - * copying. But that might change the semantics, if there's an - * intervening FORGET_RELATION_FSYNC or FORGET_DATABASE_FSYNC request, so - * we do it this way. It would be possible to be even smarter if we made - * the code below understand the specific semantics of such requests (it - * could blow away preceding entries that would end up being canceled - * anyhow), but it's not clear that the extra complexity would buy us - * anything. - */ - for (n = 0; n < CheckpointerShmem->num_requests; n++) - { - CheckpointerRequest *request; - struct CheckpointerSlotMapping *slotmap; - bool found; - - /* - * We use the request struct directly as a hashtable key. This - * assumes that any padding bytes in the structs are consistently the - * same, which should be okay because we zeroed them in - * CheckpointerShmemInit. Note also that RelFileNode had better - * contain no pad bytes. - */ - request = &CheckpointerShmem->requests[n]; - slotmap = hash_search(htab, request, HASH_ENTER, &found); - if (found) - { - /* Duplicate, so mark the previous occurrence as skippable */ - skip_slot[slotmap->slot] = true; - num_skipped++; - } - /* Remember slot containing latest occurrence of this request value */ - slotmap->slot = n; - } + if (!AmCheckpointerProcess()) + return; - /* Done with the hash table. */ - hash_destroy(htab); + /* Transfer stats counts into pending pgstats message */ + BgWriterStats.m_buf_written_backend += + pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0); + BgWriterStats.m_buf_fsync_backend += + pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0); - /* If no duplicates, we're out of luck. */ - if (!num_skipped) + while (true) { - pfree(skip_slot); - return false; - } + if (!FlushFsyncRequestQueueIfNecessary()) + break; - /* We found some duplicates; remove them. */ - preserve_count = 0; - for (n = 0; n < CheckpointerShmem->num_requests; n++) - { - if (skip_slot[n]) - continue; - CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n]; + if (!AbsorbFsyncRequest()) + break; } - ereport(DEBUG1, - (errmsg("compacted fsync request queue from %d entries to %d entries", - CheckpointerShmem->num_requests, preserve_count))); - CheckpointerShmem->num_requests = preserve_count; - - /* Cleanup. */ - pfree(skip_slot); - return true; } /* - * AbsorbFsyncRequests - * Retrieve queued fsync requests and pass them to local smgr. 
+ * AbsorbAllFsyncRequests
+ *		Retrieve all fsync requests that are already pending and pass them
+ *		to local smgr.
 *
 * This is exported because it must be called during CreateCheckPoint;
 * we have to be sure we have accepted all pending requests just before
@@ -1301,17 +1184,13 @@ CompactCheckpointerRequestQueue(void)
 * non-checkpointer processes, do nothing if not checkpointer.
 */
void
-AbsorbFsyncRequests(void)
+AbsorbAllFsyncRequests(void)
{
-	CheckpointerRequest *requests = NULL;
-	CheckpointerRequest *request;
-	int			n;
+	CheckpointerRequest request = {0};

	if (!AmCheckpointerProcess())
		return;

-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
	/* Transfer stats counts into pending pgstats message */
	BgWriterStats.m_buf_written_backend +=
		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
@@ -1319,35 +1198,65 @@ AbsorbFsyncRequests(void)
		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);

	/*
-	 * We try to avoid holding the lock for a long time by copying the request
-	 * array, and processing the requests after releasing the lock.
-	 *
-	 * Once we have cleared the requests from shared memory, we have to PANIC
-	 * if we then fail to absorb them (eg, because our hashtable runs out of
-	 * memory).  This is because the system cannot run safely if we are unable
-	 * to fsync what we have been told to fsync.  Fortunately, the hashtable
-	 * is so small that the problem is quite unlikely to arise in practice.
+	 * For mdsync()'s guarantees to work, all pending fsync requests need to
+	 * be executed. But we don't want to absorb requests till the queue is
+	 * empty, as that could take a long while. So instead we enqueue a sync
+	 * marker request (CKPT_REQUEST_SYN) and absorb only until it arrives
+	 * back.
	 */
-	n = CheckpointerShmem->num_requests;
-	if (n > 0)
+	request.type = CKPT_REQUEST_SYN;
+	request.segno = ++next_syn_rqst;
+	SendFsyncRequest(&request, -1);
+
+	received_syn_rqst = next_syn_rqst + 1;
+	while (received_syn_rqst != request.segno)
	{
-		requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-		memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
+		if (!FlushFsyncRequestQueueIfNecessary())
+			elog(FATAL, "cannot happen");
+
+		if (!AbsorbFsyncRequest())
+			break;
	}
+}

-	START_CRIT_SECTION();
+/*
+ * AbsorbFsyncRequest
+ *		Retrieve one queued fsync request and pass it to local smgr.
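+ *
+ * Returns true if a request was absorbed, false if none was pending.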
+ */ +static bool +AbsorbFsyncRequest(void) +{ + CheckpointerRequest req; + int fd; + int ret; - CheckpointerShmem->num_requests = 0; + ReleaseLruFiles(); - LWLockRelease(CheckpointerCommLock); + START_CRIT_SECTION(); + ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], &req, sizeof(req), &fd); + if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + END_CRIT_SECTION(); + return false; + } + else if (ret < 0) + elog(ERROR, "recvmsg failed: %m"); - for (request = requests; n > 0; request++, n--) - RememberFsyncRequest(request->rnode, request->forknum, request->segno); + if (req.contains_fd != (fd != -1)) + { + elog(FATAL, "message should have fd associated, but doesn't"); + } + if (req.type == CKPT_REQUEST_SYN) + { + received_syn_rqst = req.segno; + Assert(fd == -1); + } + else + { + RememberFsyncRequest(req.rnode, req.forknum, req.segno, fd); + } END_CRIT_SECTION(); - if (requests) - pfree(requests); + return true; } /* @@ -1402,3 +1311,42 @@ IncCheckpointSyncCycle(void) { return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1); } + +void +CountBackendWrite(void) +{ + pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1); +} + +static void +SendFsyncRequest(CheckpointerRequest *request, int fd) +{ + ssize_t ret; + + while (true) + { + ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], request, sizeof(*request), + request->contains_fd ? fd : -1); + + if (ret >= 0) + { + /* + * Don't think short reads will ever happen in realistic + * implementations, but better make sure that's true... + */ + if (ret != sizeof(*request)) + elog(FATAL, "oops, gotta do better"); + break; + } + else if (errno == EWOULDBLOCK || errno == EAGAIN) + { + /* blocked on write - wait for socket to become readable */ + /* FIXME: postmaster death? Other interrupts? */ + WaitLatchOrSocket(NULL, WL_SOCKET_WRITEABLE, fsync_fds[FSYNC_FD_SUBMIT], -1, 0); + } + else + { + ereport(FATAL, (errmsg("could not receive fsync request: %m"))); + } + } +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index a4b53b33cd..135aa29bfe 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -70,6 +70,7 @@ #include #include #include +#include #include #include #include @@ -434,6 +435,7 @@ static pid_t StartChildProcess(AuxProcType type); static void StartAutovacuumWorker(void); static void MaybeStartWalReceiver(void); static void InitPostmasterDeathWatchHandle(void); +static void InitFsyncFdSocketPair(void); /* * Archiver is allowed to start up at the current postmaster state? @@ -568,6 +570,8 @@ int postmaster_alive_fds[2] = {-1, -1}; HANDLE PostmasterHandle; #endif +int fsync_fds[2] = {-1, -1}; + /* * Postmaster main entry point */ @@ -1195,6 +1199,11 @@ PostmasterMain(int argc, char *argv[]) */ InitPostmasterDeathWatchHandle(); + /* + * Initialize socket pair used to transport file descriptors over. + */ + InitFsyncFdSocketPair(); + #ifdef WIN32 /* @@ -6443,3 +6452,32 @@ InitPostmasterDeathWatchHandle(void) GetLastError()))); #endif /* WIN32 */ } + +/* Create socket used for requesting fsyncs by checkpointer */ +static void +InitFsyncFdSocketPair(void) +{ + Assert(MyProcPid == PostmasterPid); + if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fsync_fds) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create fsync sockets: %m"))); + + /* + * Set O_NONBLOCK on both fds. 
+ */ + if (fcntl(fsync_fds[FSYNC_FD_PROCESS], F_SETFL, O_NONBLOCK) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set fsync process socket to nonblocking mode: %m"))); + + if (fcntl(fsync_fds[FSYNC_FD_SUBMIT], F_SETFL, O_NONBLOCK) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set fsync submit socket to nonblocking mode: %m"))); + + /* + * FIXME: do DuplicateHandle dance for windows - can that work + * trivially? + */ +} diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 555774320b..ae3a5bf023 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -142,8 +142,8 @@ typedef struct CycleCtr cycle_ctr; /* sync cycle of oldest request */ /* requests[f] has bit n set if we need to fsync segment n of fork f */ Bitmapset *requests[MAX_FORKNUM + 1]; - /* canceled[f] is true if we canceled fsyncs for fork "recently" */ - bool canceled[MAX_FORKNUM + 1]; + File *syncfds[MAX_FORKNUM + 1]; + int syncfd_len[MAX_FORKNUM + 1]; } PendingOperationEntry; typedef struct @@ -152,6 +152,8 @@ typedef struct CycleCtr cycle_ctr; /* mdckpt_cycle_ctr when request was made */ } PendingUnlinkEntry; +static uint32 open_fsync_queue_files = 0; +static bool mdsync_in_progress = false; static HTAB *pendingOpsTable = NULL; static List *pendingUnlinks = NIL; static MemoryContext pendingOpsCxt; /* context for the above */ @@ -196,6 +198,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno, BlockNumber blkno, bool skipFsync, int behavior); static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg); +static char *mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno); +static void mdsyncpass(bool include_current); /* @@ -1049,43 +1053,28 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum) } /* - * mdsync() -- Sync previous writes to stable storage. + * Do one pass over the the fsync request hashtable and perform the necessary + * fsyncs. Increments the mdsync cycle counter. + * + * If include_current is true perform all fsyncs (this is done if too many + * files are open), otherwise only perform the fsyncs belonging to the cycle + * valid at call time. */ -void -mdsync(void) +static void +mdsyncpass(bool include_current) { - static bool mdsync_in_progress = false; - HASH_SEQ_STATUS hstat; PendingOperationEntry *entry; int absorb_counter; /* Statistics on sync times */ - int processed = 0; instr_time sync_start, sync_end, sync_diff; uint64 elapsed; - uint64 longest = 0; - uint64 total_elapsed = 0; - - /* - * This is only called during checkpoints, and checkpoints should only - * occur in processes that have created a pendingOpsTable. - */ - if (!pendingOpsTable) - elog(ERROR, "cannot sync without a pendingOpsTable"); - - /* - * If we are in the checkpointer, the sync had better include all fsync - * requests that were queued by backends up to this point. The tightest - * race condition that could occur is that a buffer that must be written - * and fsync'd for the checkpoint could have been dumped by a backend just - * before it was visited by BufferSync(). We know the backend will have - * queued an fsync request before clearing the buffer's dirtybit, so we - * are safe as long as we do an Absorb after completing BufferSync(). 
-	 */
-	AbsorbFsyncRequests();
+	int			processed = CheckpointStats.ckpt_sync_rels;
+	uint64		longest = CheckpointStats.ckpt_longest_sync;
+	uint64		total_elapsed = CheckpointStats.ckpt_agg_sync_time;

	/*
	 * To avoid excess fsync'ing (in the worst case, maybe a never-terminating
@@ -1133,17 +1122,27 @@ mdsync(void)
	while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
	{
		ForkNumber	forknum;
+		bool		has_remaining;

		/*
-		 * If the entry is new then don't process it this time; it might
-		 * contain multiple fsync-request bits, but they are all new.  Note
-		 * "continue" bypasses the hash-remove call at the bottom of the loop.
+		 * If processing fsync requests because of too many file handles, close
+		 * regardless of cycle. Otherwise nothing to be closed might be found,
+		 * and we want to make room as quickly as possible so more requests
+		 * can be absorbed.
		 */
-		if (entry->cycle_ctr == GetCheckpointSyncCycle())
-			continue;
+		if (!include_current)
+		{
+			/*
+			 * If the entry is new then don't process it this time; it might
+			 * contain multiple fsync-request bits, but they are all new. Note
+			 * "continue" bypasses the hash-remove call at the bottom of the loop.
+			 */
+			if (entry->cycle_ctr == GetCheckpointSyncCycle())
+				continue;

-		/* Else assert we haven't missed it */
-		Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+			/* Else assert we haven't missed it */
+			Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+		}

		/*
		 * Scan over the forks and segments represented by the entry.
@@ -1158,158 +1157,144 @@ mdsync(void)
		 */
		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
		{
-			Bitmapset  *requests = entry->requests[forknum];
			int			segno;

-			entry->requests[forknum] = NULL;
-			entry->canceled[forknum] = false;
-
-			while ((segno = bms_first_member(requests)) >= 0)
+			segno = -1;
+			while ((segno = bms_next_member(entry->requests[forknum], segno)) >= 0)
			{
-				int			failures;
+				int			returnCode;

				/*
-				 * If fsync is off then we don't have to bother opening the
-				 * file at all.  (We delay checking until this point so that
-				 * changing fsync on the fly behaves sensibly.)
+				 * Temporarily mark as processed. Have to do so before
+				 * absorbing further requests, otherwise we might delete
+				 * new requests from a new cycle.
				 */
-				if (!enableFsync)
-					continue;
+				bms_del_member(entry->requests[forknum], segno);

-				/*
-				 * If in checkpointer, we want to absorb pending requests
-				 * every so often to prevent overflow of the fsync request
-				 * queue.  It is unspecified whether newly-added entries will
-				 * be visited by hash_seq_search, but we don't care since we
-				 * don't need to process them anyway.
-				 */
-				if (--absorb_counter <= 0)
+				if (entry->syncfd_len[forknum] <= segno ||
					entry->syncfds[forknum][segno] == -1)
				{
-					AbsorbFsyncRequests();
-					absorb_counter = FSYNCS_PER_ABSORB;
+					/*
+					 * Here we would have to open the file ourselves, if we
+					 * ever want to support not transporting fds as well.
+					 */
+					elog(FATAL, "file not opened");
				}

				/*
-				 * The fsync table could contain requests to fsync segments
-				 * that have been deleted (unlinked) by the time we get to
-				 * them.  Rather than just hoping an ENOENT (or EACCES on
-				 * Windows) error can be ignored, what we do on error is
-				 * absorb pending requests and then retry.  Since mdunlink()
-				 * queues a "cancel" message before actually unlinking, the
-				 * fsync request is guaranteed to be marked canceled after the
-				 * absorb if it really was this case.  DROP DATABASE likewise
-				 * has to tell us to forget fsync requests before it starts
-				 * deletions.
+ * If fsync is off then we don't have to bother opening the + * file at all. (We delay checking until this point so that + * changing fsync on the fly behaves sensibly.) + * + * XXX: Why is that an important goal? Doesn't give any + * interesting guarantees afaict? */ - for (failures = 0;; failures++) /* loop exits at "break" */ + if (enableFsync) { - SMgrRelation reln; - MdfdVec *seg; - char *path; - int save_errno; - /* - * Find or create an smgr hash entry for this relation. - * This may seem a bit unclean -- md calling smgr? But - * it's really the best solution. It ensures that the - * open file reference isn't permanently leaked if we get - * an error here. (You may say "but an unreferenced - * SMgrRelation is still a leak!" Not really, because the - * only case in which a checkpoint is done by a process - * that isn't about to shut down is in the checkpointer, - * and it will periodically do smgrcloseall(). This fact - * justifies our not closing the reln in the success path - * either, which is a good thing since in non-checkpointer - * cases we couldn't safely do that.) + * The fsync table could contain requests to fsync + * segments that have been deleted (unlinked) by the time + * we get to them. That used to be problematic, but now + * we have a filehandle to the deleted file. That means we + * might fsync an empty file superfluously, in a + * relatively tight window, which is acceptable. */ - reln = smgropen(entry->rnode, InvalidBackendId); - - /* Attempt to open and fsync the target segment */ - seg = _mdfd_getseg(reln, forknum, - (BlockNumber) segno * (BlockNumber) RELSEG_SIZE, - false, - EXTENSION_RETURN_NULL - | EXTENSION_DONT_CHECK_SIZE); INSTR_TIME_SET_CURRENT(sync_start); - if (seg != NULL && - FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) >= 0) - { - /* Success; update statistics about sync timing */ - INSTR_TIME_SET_CURRENT(sync_end); - sync_diff = sync_end; - INSTR_TIME_SUBTRACT(sync_diff, sync_start); - elapsed = INSTR_TIME_GET_MICROSEC(sync_diff); - if (elapsed > longest) - longest = elapsed; - total_elapsed += elapsed; - processed++; - if (log_checkpoints) - elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f msec", - processed, - FilePathName(seg->mdfd_vfd), - (double) elapsed / 1000); - - break; /* out of retry loop */ - } + returnCode = FileSync(entry->syncfds[forknum][segno], WAIT_EVENT_DATA_FILE_SYNC); - /* Compute file name for use in message */ - save_errno = errno; - path = _mdfd_segpath(reln, forknum, (BlockNumber) segno); - errno = save_errno; + if (returnCode < 0) + { + /* XXX: decide on policy */ + bms_add_member(entry->requests[forknum], segno); - /* - * It is possible that the relation has been dropped or - * truncated since the fsync request was entered. - * Therefore, allow ENOENT, but only if we didn't fail - * already on this file. This applies both for - * _mdfd_getseg() and for FileSync, since fd.c might have - * closed the file behind our back. - * - * XXX is there any point in allowing more than one retry? - * Don't see one at the moment, but easy to change the - * test here if so. 
- */ - if (!FILE_POSSIBLY_DELETED(errno) || - failures > 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not fsync file \"%s\": %m", - path))); - else + FilePathName(entry->syncfds[forknum][segno])))); + } + + /* Success; update statistics about sync timing */ + INSTR_TIME_SET_CURRENT(sync_end); + sync_diff = sync_end; + INSTR_TIME_SUBTRACT(sync_diff, sync_start); + elapsed = INSTR_TIME_GET_MICROSEC(sync_diff); + if (elapsed > longest) + longest = elapsed; + total_elapsed += elapsed; + processed++; + if (log_checkpoints) ereport(DEBUG1, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\" but retrying: %m", - path))); - pfree(path); + (errmsg("checkpoint sync: number=%d file=%s time=%.3f msec", + processed, + FilePathName(entry->syncfds[forknum][segno]), + (double) elapsed / 1000), + errhidestmt(true), + errhidecontext(true))); + } + + /* + * It shouldn't be possible for a new request to arrive during + * the fsync (on error this will not be reached). + */ + Assert(!bms_is_member(segno, entry->requests[forknum])); + /* + * Close file. XXX: centralize code. + */ + { + open_fsync_queue_files--; + FileClose(entry->syncfds[forknum][segno]); + entry->syncfds[forknum][segno] = -1; + } + + /* + * If in checkpointer, we want to absorb pending requests every so + * often to prevent overflow of the fsync request queue. It is + * unspecified whether newly-added entries will be visited by + * hash_seq_search, but we don't care since we don't need to process + * them anyway. + */ + if (absorb_counter-- <= 0) + { /* - * Absorb incoming requests and check to see if a cancel - * arrived for this relation fork. + * Don't absorb if too many files are open. This pass will + * soon close some, so check again later. */ - AbsorbFsyncRequests(); - absorb_counter = FSYNCS_PER_ABSORB; /* might as well... */ - - if (entry->canceled[forknum]) - break; - } /* end retry loop */ + if (open_fsync_queue_files < ((max_safe_fds * 7) / 10)) + AbsorbFsyncRequests(); + absorb_counter = FSYNCS_PER_ABSORB; + } } - bms_free(requests); } /* - * We've finished everything that was requested before we started to - * scan the entry. If no new requests have been inserted meanwhile, - * remove the entry. Otherwise, update its cycle counter, as all the - * requests now in it must have arrived during this cycle. + * We've finished everything for the file that was requested before we + * started to scan the entry. If no new requests have been inserted + * meanwhile, remove the entry. Otherwise, update its cycle counter, + * as all the requests now in it must have arrived during this cycle. + * + * This needs to be checked separately from the above for-each-fork + * loop, as new requests for this relation could have been absorbed. 
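+		 * (A fork's syncfds array is freed only once no requests remain
+		 * for that fork.)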
		 */
+		has_remaining = false;
		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
		{
-			if (entry->requests[forknum] != NULL)
-				break;
+			if (bms_is_empty(entry->requests[forknum]))
+			{
+				if (entry->syncfds[forknum])
+				{
+					pfree(entry->syncfds[forknum]);
+					entry->syncfds[forknum] = NULL;
+				}
+				bms_free(entry->requests[forknum]);
+				entry->requests[forknum] = NULL;
+			}
+			else
+				has_remaining = true;
		}
-		if (forknum <= MAX_FORKNUM)
+		if (has_remaining)
			entry->cycle_ctr = GetCheckpointSyncCycle();
		else
		{
@@ -1320,13 +1305,69 @@ mdsync(void)
		}
	}							/* end loop over hashtable entries */

-	/* Return sync performance metrics for report at checkpoint end */
+	/* Flag successful completion of mdsync */
+	mdsync_in_progress = false;
+
+	/* Maintain sync performance metrics for report at checkpoint end */
	CheckpointStats.ckpt_sync_rels = processed;
	CheckpointStats.ckpt_longest_sync = longest;
	CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+}

-	/* Flag successful completion of mdsync */
-	mdsync_in_progress = false;
+/*
+ * mdsync() -- Sync previous writes to stable storage.
+ */
+void
+mdsync(void)
+{
+	/*
+	 * This is only called during checkpoints, and checkpoints should only
+	 * occur in processes that have created a pendingOpsTable.
+	 */
+	if (!pendingOpsTable)
+		elog(ERROR, "cannot sync without a pendingOpsTable");
+
+	/*
+	 * If we are in the checkpointer, the sync had better include all fsync
+	 * requests that were queued by backends up to this point.  The tightest
+	 * race condition that could occur is that a buffer that must be written
+	 * and fsync'd for the checkpoint could have been dumped by a backend just
+	 * before it was visited by BufferSync().  We know the backend will have
+	 * queued an fsync request before clearing the buffer's dirtybit, so we
+	 * are safe as long as we do an Absorb after completing BufferSync().
+	 */
+	AbsorbAllFsyncRequests();
+
+	mdsyncpass(false);
+}
+
+/*
+ * Flush the fsync request queue enough to make sure there's room for at least
+ * one more entry. Returns false if that is not currently possible (i.e. a
+ * sync pass is already in progress).
+ */
+bool
+FlushFsyncRequestQueueIfNecessary(void)
+{
+	if (mdsync_in_progress)
+		return false;
+
+	while (true)
+	{
+		if (open_fsync_queue_files >= ((max_safe_fds * 7) / 10))
+		{
+			elog(DEBUG1,
+				 "flush fsync request queue due to %u open files",
+				 open_fsync_queue_files);
+			mdsyncpass(true);
+			elog(DEBUG1,
+				 "flushed fsync request queue, now at %u open files",
+				 open_fsync_queue_files);
+		}
+		else
+			break;
+	}
+
+	return true;
}

/*
@@ -1411,12 +1452,38 @@ mdpostckpt(void)
		 */
		if (--absorb_counter <= 0)
		{
-			AbsorbFsyncRequests();
+			/* XXX: Centralize this condition */
+			if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+				AbsorbFsyncRequests();
			absorb_counter = UNLINKS_PER_ABSORB;
		}
	}
}

+
+/*
+ * Return the filename for the specified segment of the relation. The
+ * returned string is palloc'd.
+ */
+static char *
+mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+{
+	char	   *path,
+			   *fullpath;
+
+	path = relpathperm(rnode, forknum);
+
+	if (segno > 0)
+	{
+		fullpath = psprintf("%s.%u", path, segno);
+		pfree(path);
+	}
+	else
+		fullpath = path;
+
+	return fullpath;
+}
+
/*
 * register_dirty_segment() -- Mark a relation segment as needing fsync
 *
@@ -1437,6 +1504,13 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
	pg_memory_barrier();
	cycle = GetCheckpointSyncCycle();

+	/*
+	 * For historical reasons checkpointer keeps track of the number of times
+	 * backends perform writes themselves.
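+	 * The count is transferred to BgWriterStats.m_buf_written_backend
+	 * whenever the checkpointer absorbs requests.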
+ */ + if (!AmBackgroundWriterProcess()) + CountBackendWrite(); + /* * Don't repeatedly register the same segment as dirty. * @@ -1449,27 +1523,23 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg) if (pendingOpsTable) { - /* push it into local pending-ops table */ - RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno); - seg->mdfd_dirtied_cycle = cycle; - } - else - { - if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno)) - { - seg->mdfd_dirtied_cycle = cycle; - return; /* passed it off successfully */ - } - - ereport(DEBUG1, - (errmsg("could not forward fsync request because request queue is full"))); + int fd; - if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not fsync file \"%s\": %m", - FilePathName(seg->mdfd_vfd)))); + /* + * Push it into local pending-ops table. + * + * Gotta duplicate the fd - we can't have fd.c close it behind our + * back, as that'd lead to loosing error reporting guarantees on + * linux. RememberFsyncRequest() will manage the lifetime. + */ + ReleaseLruFiles(); + fd = dup(FileGetRawDesc(seg->mdfd_vfd)); + if (fd < 0) + elog(ERROR, "couldn't dup: %m"); + RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, fd); } + else + ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, seg->mdfd_vfd); } /* @@ -1491,21 +1561,14 @@ register_unlink(RelFileNodeBackend rnode) { /* push it into local pending-ops table */ RememberFsyncRequest(rnode.node, MAIN_FORKNUM, - UNLINK_RELATION_REQUEST); + UNLINK_RELATION_REQUEST, + -1); } else { - /* - * Notify the checkpointer about it. If we fail to queue the request - * message, we have to sleep and try again, because we can't simply - * delete the file now. Ugly, but hopefully won't happen often. - * - * XXX should we just leave the file orphaned instead? - */ + /* Notify the checkpointer about it. */ Assert(IsUnderPostmaster); - while (!ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, - UNLINK_RELATION_REQUEST)) - pg_usleep(10000L); /* 10 msec seems a good number */ + ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, UNLINK_RELATION_REQUEST, -1); } } @@ -1531,7 +1594,7 @@ register_unlink(RelFileNodeBackend rnode) * heavyweight operation anyhow, so we'll live with it.) */ void -RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno) +RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno, int fd) { Assert(pendingOpsTable); @@ -1549,18 +1612,28 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno) /* * We can't just delete the entry since mdsync could have an * active hashtable scan. Instead we delete the bitmapsets; this - * is safe because of the way mdsync is coded. We also set the - * "canceled" flags so that mdsync can tell that a cancel arrived - * for the fork(s). + * is safe because of the way mdsync is coded. 
			 */
			if (forknum == InvalidForkNumber)
			{
				/* remove requests for all forks */
				for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
				{
+					int			segno;
+
					bms_free(entry->requests[forknum]);
					entry->requests[forknum] = NULL;
-					entry->canceled[forknum] = true;
+
+					for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+					{
+						if (entry->syncfds[forknum][segno] != -1)
+						{
+							open_fsync_queue_files--;
+							FileClose(entry->syncfds[forknum][segno]);
+							entry->syncfds[forknum][segno] = -1;
+						}
+					}
+
				}
			}
			else
@@ -1568,7 +1641,16 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
				/* remove requests for single fork */
				bms_free(entry->requests[forknum]);
				entry->requests[forknum] = NULL;
-				entry->canceled[forknum] = true;
+
+				for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+				{
+					if (entry->syncfds[forknum][segno] != -1)
+					{
+						open_fsync_queue_files--;
+						FileClose(entry->syncfds[forknum][segno]);
+						entry->syncfds[forknum][segno] = -1;
+					}
+				}
			}
		}
	}
@@ -1592,7 +1674,6 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
		{
			bms_free(entry->requests[forknum]);
			entry->requests[forknum] = NULL;
-			entry->canceled[forknum] = true;
		}
	}
}
@@ -1646,7 +1727,8 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
	{
		entry->cycle_ctr = GetCheckpointSyncCycle();
		MemSet(entry->requests, 0, sizeof(entry->requests));
-		MemSet(entry->canceled, 0, sizeof(entry->canceled));
+		MemSet(entry->syncfds, 0, sizeof(entry->syncfds));
+		MemSet(entry->syncfd_len, 0, sizeof(entry->syncfd_len));
	}

	/*
@@ -1658,6 +1740,57 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
	entry->requests[forknum] = bms_add_member(entry->requests[forknum],
											  (int) segno);

+	if (fd >= 0)
+	{
+		/* make space for entry */
+		if (entry->syncfds[forknum] == NULL)
+		{
+			int			i;
+
+			entry->syncfds[forknum] = palloc(sizeof(File) * (segno + 1));
+			entry->syncfd_len[forknum] = segno + 1;
+
+			for (i = 0; i <= segno; i++)
+				entry->syncfds[forknum][i] = -1;
+		}
+		else if (entry->syncfd_len[forknum] <= segno)
+		{
+			int			i;
+
+			entry->syncfds[forknum] = repalloc(entry->syncfds[forknum],
											   sizeof(File) * (segno + 1));
+
+			/* initialize newly created entries */
+			for (i = entry->syncfd_len[forknum]; i <= segno; i++)
+				entry->syncfds[forknum][i] = -1;
+
+			entry->syncfd_len[forknum] = segno + 1;
+		}
+
+		if (entry->syncfds[forknum][segno] == -1)
+		{
+			char	   *path = mdpath(entry->rnode, forknum, segno);
+
+			open_fsync_queue_files++;
+			/* caller must have reserved entry */
+			entry->syncfds[forknum][segno] =
+				FileOpenForFd(fd, path);
+			pfree(path);
+		}
+		else
+		{
+			/*
+			 * File is already open. Have to keep the older fd, as errors
+			 * might only be reported to it; thus close the one we just
+			 * got.
+			 *
+			 * XXX: check for errors.
+			 */
+			close(fd);
+		}
+
+		FlushFsyncRequestQueueIfNecessary();
+	}
+
	MemoryContextSwitchTo(oldcxt);
	}
}
@@ -1674,22 +1807,12 @@ ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum)
	if (pendingOpsTable)
	{
		/* standalone backend or startup process: fsync state is local */
-		RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC);
+		RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
	}
	else if (IsUnderPostmaster)
	{
-		/*
-		 * Notify the checkpointer about it.  If we fail to queue the cancel
-		 * message, we have to sleep and try again ... ugly, but hopefully
-		 * won't happen often.
-		 *
-		 * XXX should we CHECK_FOR_INTERRUPTS in this loop?
Escaping with an - * error would leave the no-longer-used file still present on disk, - * which would be bad, so I'm inclined to assume that the checkpointer - * will always empty the queue soon. - */ - while (!ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC)) - pg_usleep(10000L); /* 10 msec seems a good number */ + /* Notify the checkpointer about it. */ + ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1); /* * Note we don't wait for the checkpointer to actually absorb the @@ -1713,14 +1836,12 @@ ForgetDatabaseFsyncRequests(Oid dbid) if (pendingOpsTable) { /* standalone backend or startup process: fsync state is local */ - RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC); + RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1); } else if (IsUnderPostmaster) { /* see notes in ForgetRelationFsyncRequests */ - while (!ForwardFsyncRequest(rnode, InvalidForkNumber, - FORGET_DATABASE_FSYNC)) - pg_usleep(10000L); /* 10 msec seems a good number */ + ForwardFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1); } } diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h index 87a5cfad41..58ba671a90 100644 --- a/src/include/postmaster/bgwriter.h +++ b/src/include/postmaster/bgwriter.h @@ -16,6 +16,7 @@ #define _BGWRITER_H #include "storage/block.h" +#include "storage/fd.h" #include "storage/relfilenode.h" @@ -31,9 +32,10 @@ extern void CheckpointerMain(void) pg_attribute_noreturn(); extern void RequestCheckpoint(int flags); extern void CheckpointWriteDelay(int flags, double progress); -extern bool ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, - BlockNumber segno); +extern void ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, + BlockNumber segno, File file); extern void AbsorbFsyncRequests(void); +extern void AbsorbAllFsyncRequests(void); extern Size CheckpointerShmemSize(void); extern void CheckpointerShmemInit(void); @@ -43,4 +45,6 @@ extern uint32 IncCheckpointSyncCycle(void); extern bool FirstCallSinceLastCheckpoint(void); +extern void CountBackendWrite(void); + #endif /* _BGWRITER_H */ diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 1877eef239..e2ba64e898 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -44,6 +44,11 @@ extern int postmaster_alive_fds[2]; #define POSTMASTER_FD_OWN 1 /* kept open by postmaster only */ #endif +#define FSYNC_FD_SUBMIT 0 +#define FSYNC_FD_PROCESS 1 + +extern int fsync_fds[2]; + extern PGDLLIMPORT const char *progname; extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn(); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 558e4d8518..798a965292 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -140,7 +140,8 @@ extern void mdpostckpt(void); extern void SetForwardFsyncRequests(void); extern void RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, - BlockNumber segno); + BlockNumber segno, int fd); +extern bool FlushFsyncRequestQueueIfNecessary(void); extern void ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum); extern void ForgetDatabaseFsyncRequests(Oid dbid);