Heavily-WIP: Send file descriptors to checkpointer for fsyncing. (branch: clone-fd-checkpointer)
author Andres Freund <[email protected]>
Mon, 21 May 2018 22:43:30 +0000 (15:43 -0700)
committer Andres Freund <[email protected]>
Mon, 21 May 2018 22:43:30 +0000 (15:43 -0700)
This addresses the issue that, at least on Linux, an fsync() only reliably
sees errors that occurred after the file descriptor it operates on was
opened.  To that end, backends now pass the file descriptors they dirtied
to the checkpointer over a Unix domain socket pair (via SCM_RIGHTS),
instead of having the checkpointer reopen the files by name.
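
To make the failure mode concrete, here is an illustrative sketch of the
Linux behavior described above (standalone C, not code from this patch):

    #include <fcntl.h>
    #include <stddef.h>
    #include <unistd.h>

    /* Sketch: why the error-reporting fd must stay open across writeback. */
    static void
    error_visibility_example(const char *buf, size_t len)
    {
        int     fd1 = open("datafile", O_WRONLY);
        int     fd2;

        (void) write(fd1, buf, len);    /* dirties kernel page cache */
        close(fd1);

        /*
         * If writeback of those pages fails with EIO at this point, while
         * no descriptor for the file is open, the kernel may have nobody
         * left to report the error to.
         */

        fd2 = open("datafile", O_WRONLY);
        (void) fsync(fd2);              /* can succeed; the error is lost */
        close(fd2);
    }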

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:

src/backend/access/transam/xlog.c
src/backend/postmaster/checkpointer.c
src/backend/postmaster/postmaster.c
src/backend/storage/smgr/md.c
src/include/postmaster/bgwriter.h
src/include/postmaster/postmaster.h
src/include/storage/smgr.h

index adbd6a2126420441d63e73c2a71a3cfacd497624..427774152ebd9db2b856f0adafb6921eba4b7aaa 100644 (file)
@@ -8634,8 +8634,10 @@ CreateCheckPoint(int flags)
     * Note: because it is possible for log_checkpoints to change while a
     * checkpoint proceeds, we always accumulate stats, even if
     * log_checkpoints is currently off.
+    *
+    * Note #2: this is reset at the end of the checkpoint, not here, because
+    * we might have to fsync before getting here (see mdsync()).
     */
-   MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
    CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
    /*
@@ -8999,6 +9001,9 @@ CreateCheckPoint(int flags)
                                     CheckpointStats.ckpt_segs_recycled);
 
    LWLockRelease(CheckpointLock);
+
+   /* reset stats */
+   MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
 }
 
 /*
index 333eb91c9de1a3774d9f2afe06337c9d845d38a0..c2be529bca49f3b1aed974dc2c9b925aa4e691ac 100644 (file)
@@ -48,6 +48,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/postmaster.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
  *
  * The requests array holds fsync requests sent by backends and not yet
  * absorbed by the checkpointer.
- *
- * Unlike the checkpoint fields, num_backend_writes, num_backend_fsync, and
- * the requests fields are protected by CheckpointerCommLock.
  *----------
  */
 typedef struct
 {
+   uint32      type;
    RelFileNode rnode;
    ForkNumber  forknum;
    BlockNumber segno;          /* see md.c for special values */
+   bool        contains_fd;
-   /* might add a real request-type field later; not needed yet */
 } CheckpointerRequest;
 
+#define CKPT_REQUEST_RNODE         1
+#define CKPT_REQUEST_SYN           2
+
 typedef struct
 {
    pid_t       checkpointer_pid;   /* PID (0 if not started) */
@@ -131,8 +134,6 @@ typedef struct
    pg_atomic_uint32 num_backend_fsync; /* counts user backend fsync calls */
    pg_atomic_uint32 ckpt_cycle; /* cycle */
 
-   int         num_requests;   /* current # of requests */
-   int         max_requests;   /* allocated array size */
    CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -168,13 +169,17 @@ static double ckpt_cached_elapsed;
 static pg_time_t last_checkpoint_time;
 static pg_time_t last_xlog_switch_time;
 
+static BlockNumber next_syn_rqst;
+static BlockNumber received_syn_rqst;
+
 /* Prototypes for private functions */
 
 static void CheckArchiveTimeout(void);
 static bool IsCheckpointOnSchedule(double progress);
 static bool ImmediateCheckpointRequested(void);
-static bool CompactCheckpointerRequestQueue(void);
 static void UpdateSharedMemoryConfig(void);
+static void SendFsyncRequest(CheckpointerRequest *request, int fd);
+static bool AbsorbFsyncRequest(void);
 
 /* Signal handlers */
 
@@ -557,10 +562,11 @@ CheckpointerMain(void)
            cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
        }
 
-       rc = WaitLatch(MyLatch,
-                      WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                      cur_timeout * 1000L /* convert to ms */ ,
-                      WAIT_EVENT_CHECKPOINTER_MAIN);
+       rc = WaitLatchOrSocket(MyLatch,
+                              WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
+                              fsync_fds[FSYNC_FD_PROCESS],
+                              cur_timeout * 1000L /* convert to ms */ ,
+                              WAIT_EVENT_CHECKPOINTER_MAIN);
 
        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
@@ -910,12 +916,7 @@ CheckpointerShmemSize(void)
 {
    Size        size;
 
-   /*
-    * Currently, the size of the requests[] array is arbitrarily set equal to
-    * NBuffers.  This may prove too large or small ...
-    */
    size = offsetof(CheckpointerShmemStruct, requests);
-   size = add_size(size, mul_size(NBuffers, sizeof(CheckpointerRequest)));
 
    return size;
 }
@@ -938,13 +939,10 @@ CheckpointerShmemInit(void)
    if (!found)
    {
        /*
-        * First time through, so initialize.  Note that we zero the whole
-        * requests array; this is so that CompactCheckpointerRequestQueue can
-        * assume that any pad bytes in the request structs are zeroes.
+        * First time through, so initialize.
         */
        MemSet(CheckpointerShmem, 0, size);
        SpinLockInit(&CheckpointerShmem->ckpt_lck);
-       CheckpointerShmem->max_requests = NBuffers;
        pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
        pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
        pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
@@ -1124,176 +1122,61 @@ RequestCheckpoint(int flags)
  * the queue is full and contains no duplicate entries.  In that case, we
  * let the backend know by returning false.
  */
-bool
-ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+void
+ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno,
+                   File file)
 {
-   CheckpointerRequest *request;
-   bool        too_full;
+   CheckpointerRequest request = {0};
 
    if (!IsUnderPostmaster)
-       return false;           /* probably shouldn't even get here */
+       elog(ERROR, "ForwardFsyncRequest must not be called in single-user mode");
 
    if (AmCheckpointerProcess())
        elog(ERROR, "ForwardFsyncRequest must not be called in checkpointer");
 
-   LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-   /*
-    * If the checkpointer isn't running or the request queue is full, the
-    * backend will have to perform its own fsync request.  But before forcing
-    * that to happen, we can try to compact the request queue.
-    */
-   if (CheckpointerShmem->checkpointer_pid == 0 ||
-       (CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests &&
-        !CompactCheckpointerRequestQueue()))
-   {
-       /*
-        * Count the subset of writes where backends have to do their own
-        * fsync
-        */
-       if (!AmBackgroundWriterProcess())
-           pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1);
-       LWLockRelease(CheckpointerCommLock);
-       return false;
-   }
-
-   /* OK, insert request */
-   request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
-   request->rnode = rnode;
-   request->forknum = forknum;
-   request->segno = segno;
-
-   /* If queue is more than half full, nudge the checkpointer to empty it */
-   too_full = (CheckpointerShmem->num_requests >=
-               CheckpointerShmem->max_requests / 2);
-
-   LWLockRelease(CheckpointerCommLock);
+   request.type = CKPT_REQUEST_RNODE;
+   request.rnode = rnode;
+   request.forknum = forknum;
+   request.segno = segno;
+   request.contains_fd = file != -1;
 
-   /* ... but not till after we release the lock */
-   if (too_full && ProcGlobal->checkpointerLatch)
-       SetLatch(ProcGlobal->checkpointerLatch);
-
-   return true;
+   SendFsyncRequest(&request, request.contains_fd ? FileGetRawDesc(file) : -1);
 }
 
 /*
- * CompactCheckpointerRequestQueue
- *     Remove duplicates from the request queue to avoid backend fsyncs.
- *     Returns "true" if any entries were removed.
- *
- * Although a full fsync request queue is not common, it can lead to severe
- * performance problems when it does happen.  So far, this situation has
- * only been observed to occur when the system is under heavy write load,
- * and especially during the "sync" phase of a checkpoint.  Without this
- * logic, each backend begins doing an fsync for every block written, which
- * gets very expensive and can slow down the whole system.
+ * AbsorbFsyncRequests
+ *     Retrieve queued fsync requests and pass them to local smgr. Stop when
+ *     resources would be exhausted by absorbing more.
  *
- * Trying to do this every time the queue is full could lose if there
- * aren't any removable entries.  But that should be vanishingly rare in
- * practice: there's one queue entry per shared buffer.
+ * This is exported because we want to continue accepting requests during
+ * mdsync().
  */
-static bool
-CompactCheckpointerRequestQueue(void)
+void
+AbsorbFsyncRequests(void)
 {
-   struct CheckpointerSlotMapping
-   {
-       CheckpointerRequest request;
-       int         slot;
-   };
-
-   int         n,
-               preserve_count;
-   int         num_skipped = 0;
-   HASHCTL     ctl;
-   HTAB       *htab;
-   bool       *skip_slot;
-
-   /* must hold CheckpointerCommLock in exclusive mode */
-   Assert(LWLockHeldByMe(CheckpointerCommLock));
-
-   /* Initialize skip_slot array */
-   skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
-
-   /* Initialize temporary hash table */
-   MemSet(&ctl, 0, sizeof(ctl));
-   ctl.keysize = sizeof(CheckpointerRequest);
-   ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
-   ctl.hcxt = CurrentMemoryContext;
-
-   htab = hash_create("CompactCheckpointerRequestQueue",
-                      CheckpointerShmem->num_requests,
-                      &ctl,
-                      HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-   /*
-    * The basic idea here is that a request can be skipped if it's followed
-    * by a later, identical request.  It might seem more sensible to work
-    * backwards from the end of the queue and check whether a request is
-    * *preceded* by an earlier, identical request, in the hopes of doing less
-    * copying.  But that might change the semantics, if there's an
-    * intervening FORGET_RELATION_FSYNC or FORGET_DATABASE_FSYNC request, so
-    * we do it this way.  It would be possible to be even smarter if we made
-    * the code below understand the specific semantics of such requests (it
-    * could blow away preceding entries that would end up being canceled
-    * anyhow), but it's not clear that the extra complexity would buy us
-    * anything.
-    */
-   for (n = 0; n < CheckpointerShmem->num_requests; n++)
-   {
-       CheckpointerRequest *request;
-       struct CheckpointerSlotMapping *slotmap;
-       bool        found;
-
-       /*
-        * We use the request struct directly as a hashtable key.  This
-        * assumes that any padding bytes in the structs are consistently the
-        * same, which should be okay because we zeroed them in
-        * CheckpointerShmemInit.  Note also that RelFileNode had better
-        * contain no pad bytes.
-        */
-       request = &CheckpointerShmem->requests[n];
-       slotmap = hash_search(htab, request, HASH_ENTER, &found);
-       if (found)
-       {
-           /* Duplicate, so mark the previous occurrence as skippable */
-           skip_slot[slotmap->slot] = true;
-           num_skipped++;
-       }
-       /* Remember slot containing latest occurrence of this request value */
-       slotmap->slot = n;
-   }
+   if (!AmCheckpointerProcess())
+       return;
 
-   /* Done with the hash table. */
-   hash_destroy(htab);
+   /* Transfer stats counts into pending pgstats message */
+   BgWriterStats.m_buf_written_backend +=
+       pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
+   BgWriterStats.m_buf_fsync_backend +=
+       pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
 
-   /* If no duplicates, we're out of luck. */
-   if (!num_skipped)
+   while (true)
    {
-       pfree(skip_slot);
-       return false;
-   }
+       if (!FlushFsyncRequestQueueIfNecessary())
+           break;
 
-   /* We found some duplicates; remove them. */
-   preserve_count = 0;
-   for (n = 0; n < CheckpointerShmem->num_requests; n++)
-   {
-       if (skip_slot[n])
-           continue;
-       CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+       if (!AbsorbFsyncRequest())
+           break;
    }
-   ereport(DEBUG1,
-           (errmsg("compacted fsync request queue from %d entries to %d entries",
-                   CheckpointerShmem->num_requests, preserve_count)));
-   CheckpointerShmem->num_requests = preserve_count;
-
-   /* Cleanup. */
-   pfree(skip_slot);
-   return true;
 }
 
 /*
- * AbsorbFsyncRequests
- *     Retrieve queued fsync requests and pass them to local smgr.
+ * AbsorbAllFsyncRequests
+ *     Retrieve all fsync requests that are already pending and pass them
+ *     to local smgr.
  *
  * This is exported because it must be called during CreateCheckPoint;
  * we have to be sure we have accepted all pending requests just before
@@ -1301,17 +1184,13 @@ CompactCheckpointerRequestQueue(void)
  * non-checkpointer processes, do nothing if not checkpointer.
  */
 void
-AbsorbFsyncRequests(void)
+AbsorbAllFsyncRequests(void)
 {
-   CheckpointerRequest *requests = NULL;
-   CheckpointerRequest *request;
-   int         n;
+   CheckpointerRequest request = {0};
 
    if (!AmCheckpointerProcess())
        return;
 
-   LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
    /* Transfer stats counts into pending pgstats message */
    BgWriterStats.m_buf_written_backend +=
        pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
@@ -1319,35 +1198,65 @@ AbsorbFsyncRequests(void)
        pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
 
    /*
-    * We try to avoid holding the lock for a long time by copying the request
-    * array, and processing the requests after releasing the lock.
-    *
-    * Once we have cleared the requests from shared memory, we have to PANIC
-    * if we then fail to absorb them (eg, because our hashtable runs out of
-    * memory).  This is because the system cannot run safely if we are unable
-    * to fsync what we have been told to fsync.  Fortunately, the hashtable
-    * is so small that the problem is quite unlikely to arise in practice.
+    * For mdsync()'s guarantees to work, all pending fsync requests need to
+    * be executed. But we don't want to absorb requests till the queue is
+    * empty, as that could take a long while.  So instead we enqueue a sync
+    * marker and only absorb requests until that marker has been received.
     */
-   n = CheckpointerShmem->num_requests;
-   if (n > 0)
+   request.type = CKPT_REQUEST_SYN;
+   request.segno = ++next_syn_rqst;
+   SendFsyncRequest(&request, -1);
+
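+   /* start with a value that cannot match request.segno, so we enter the loop */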
+   received_syn_rqst = next_syn_rqst + 1;
+   while (received_syn_rqst != request.segno)
    {
-       requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-       memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
+       if (!FlushFsyncRequestQueueIfNecessary())
+           elog(FATAL, "flushing the fsync request queue failed unexpectedly");
+
+       if (!AbsorbFsyncRequest())
+           break;
    }
+}
 
-   START_CRIT_SECTION();
+/*
+ * AbsorbFsyncRequest
+ *     Retrieve one queued fsync request and pass it to local smgr.
+ */
+static bool
+AbsorbFsyncRequest(void)
+{
+   CheckpointerRequest req;
+   int fd;
+   int ret;
 
-   CheckpointerShmem->num_requests = 0;
+   ReleaseLruFiles();
 
-   LWLockRelease(CheckpointerCommLock);
+   START_CRIT_SECTION();
+   ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], &req, sizeof(req), &fd);
+   if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+   {
+       END_CRIT_SECTION();
+       return false;
+   }
+   else if (ret < 0)
+       elog(ERROR, "recvmsg failed: %m");
 
-   for (request = requests; n > 0; request++, n--)
-       RememberFsyncRequest(request->rnode, request->forknum, request->segno);
+   if (req.contains_fd != (fd != -1))
+   {
+       elog(FATAL, "fd presence in message does not match its contains_fd flag");
+   }
 
+   if (req.type == CKPT_REQUEST_SYN)
+   {
+       received_syn_rqst = req.segno;
+       Assert(fd == -1);
+   }
+   else
+   {
+       RememberFsyncRequest(req.rnode, req.forknum, req.segno, fd);
+   }
    END_CRIT_SECTION();
 
-   if (requests)
-       pfree(requests);
+   return true;
 }
 
 /*
@@ -1402,3 +1311,42 @@ IncCheckpointSyncCycle(void)
 {
    return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1);
 }
+
+void
+CountBackendWrite(void)
+{
+   pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1);
+}
+
+static void
+SendFsyncRequest(CheckpointerRequest *request, int fd)
+{
+   ssize_t ret;
+
+   while (true)
+   {
+       ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], request, sizeof(*request),
+                                 request->contains_fd ? fd : -1);
+
+       if (ret >= 0)
+       {
+           /*
+            * Don't think short sends will ever happen in realistic
+            * implementations, but better make sure that's true...
+            */
+           if (ret != sizeof(*request))
+               elog(FATAL, "unexpected short send on fsync request socket");
+           break;
+       }
+       else if (errno == EWOULDBLOCK || errno == EAGAIN)
+       {
+           /* blocked on write - wait for socket to become readable */
+           /* FIXME: postmaster death? Other interrupts? */
+           WaitLatchOrSocket(NULL, WL_SOCKET_WRITEABLE, fsync_fds[FSYNC_FD_SUBMIT], -1, 0);
+       }
+       else
+       {
+           ereport(FATAL, (errmsg("could not send fsync request: %m")));
+       }
+   }
+}
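
The pg_uds_send_with_fd()/pg_uds_recv_with_fd() helpers used above are not
part of this diff (presumably added elsewhere in the series). A minimal
sketch of how such helpers are conventionally built on sendmsg()/recvmsg()
with SCM_RIGHTS ancillary data follows; the names, signatures, and error
handling here are assumptions, not the patch's actual code:

    #include <string.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/uio.h>

    /* Send len bytes from buf over sock, attaching fd if it is >= 0. */
    static ssize_t
    uds_send_with_fd(int sock, const void *buf, size_t len, int fd)
    {
        struct msghdr msg = {0};
        struct iovec iov;
        union
        {
            char        space[CMSG_SPACE(sizeof(int))];
            struct cmsghdr align;
        }           ctrl;

        iov.iov_base = (void *) buf;
        iov.iov_len = len;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;

        if (fd >= 0)
        {
            struct cmsghdr *cmsg;

            msg.msg_control = ctrl.space;
            msg.msg_controllen = sizeof(ctrl.space);
            cmsg = CMSG_FIRSTHDR(&msg);
            cmsg->cmsg_level = SOL_SOCKET;
            cmsg->cmsg_type = SCM_RIGHTS;
            cmsg->cmsg_len = CMSG_LEN(sizeof(int));
            memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
        }

        return sendmsg(sock, &msg, 0);
    }

    /* Receive into buf; *fd is set to the transported fd, or -1 if none. */
    static ssize_t
    uds_recv_with_fd(int sock, void *buf, size_t len, int *fd)
    {
        struct msghdr msg = {0};
        struct iovec iov;
        struct cmsghdr *cmsg;
        ssize_t     ret;
        union
        {
            char        space[CMSG_SPACE(sizeof(int))];
            struct cmsghdr align;
        }           ctrl;

        iov.iov_base = buf;
        iov.iov_len = len;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_control = ctrl.space;
        msg.msg_controllen = sizeof(ctrl.space);

        ret = recvmsg(sock, &msg, 0);

        *fd = -1;
        if (ret >= 0 &&
            (cmsg = CMSG_FIRSTHDR(&msg)) != NULL &&
            cmsg->cmsg_level == SOL_SOCKET &&
            cmsg->cmsg_type == SCM_RIGHTS)
            memcpy(fd, CMSG_DATA(cmsg), sizeof(int));

        return ret;
    }

A production version would additionally need to handle EINTR and, on the
receiving side, MSG_CTRUNC (a truncated control message would leak the fd).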
index a4b53b33cdde912cc0d4639d8c7d27c0ae6693de..135aa29bfeb5cf57fc606bdea3094de738fdadb3 100644 (file)
@@ -70,6 +70,7 @@
 #include <time.h>
 #include <sys/wait.h>
 #include <ctype.h>
+#include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <fcntl.h>
@@ -434,6 +435,7 @@ static pid_t StartChildProcess(AuxProcType type);
 static void StartAutovacuumWorker(void);
 static void MaybeStartWalReceiver(void);
 static void InitPostmasterDeathWatchHandle(void);
+static void InitFsyncFdSocketPair(void);
 
 /*
  * Archiver is allowed to start up at the current postmaster state?
@@ -568,6 +570,8 @@ int         postmaster_alive_fds[2] = {-1, -1};
 HANDLE     PostmasterHandle;
 #endif
 
+int            fsync_fds[2] = {-1, -1};
+
 /*
  * Postmaster main entry point
  */
@@ -1195,6 +1199,11 @@ PostmasterMain(int argc, char *argv[])
     */
    InitPostmasterDeathWatchHandle();
 
+   /*
+    * Initialize socket pair used to transport file descriptors over.
+    */
+   InitFsyncFdSocketPair();
+
 #ifdef WIN32
 
    /*
@@ -6443,3 +6452,32 @@ InitPostmasterDeathWatchHandle(void)
                                 GetLastError())));
 #endif                         /* WIN32 */
 }
+
+/* Create socket used for requesting fsyncs by checkpointer */
+static void
+InitFsyncFdSocketPair(void)
+{
+   Assert(MyProcPid == PostmasterPid);
+   if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fsync_fds) < 0)
+       ereport(FATAL,
+               (errcode_for_file_access(),
+                errmsg_internal("could not create fsync sockets: %m")));
+
+   /*
+    * Set O_NONBLOCK on both fds.
+    */
+   if (fcntl(fsync_fds[FSYNC_FD_PROCESS], F_SETFL, O_NONBLOCK) == -1)
+       ereport(FATAL,
+               (errcode_for_socket_access(),
+                errmsg_internal("could not set fsync process socket to nonblocking mode: %m")));
+
+   if (fcntl(fsync_fds[FSYNC_FD_SUBMIT], F_SETFL, O_NONBLOCK) == -1)
+       ereport(FATAL,
+               (errcode_for_socket_access(),
+                errmsg_internal("could not set fsync submit socket to nonblocking mode: %m")));
+
+   /*
+    * FIXME: do DuplicateHandle dance for windows - can that work
+    * trivially?
+    */
+}
index 555774320b51e98e1cfd9b1393f1be9f0c0b34e4..ae3a5bf023f7c0eb03de1d9096778a701040b4e7 100644 (file)
@@ -142,8 +142,8 @@ typedef struct
    CycleCtr    cycle_ctr;      /* sync cycle of oldest request */
    /* requests[f] has bit n set if we need to fsync segment n of fork f */
    Bitmapset  *requests[MAX_FORKNUM + 1];
-   /* canceled[f] is true if we canceled fsyncs for fork "recently" */
-   bool        canceled[MAX_FORKNUM + 1];
+   File       *syncfds[MAX_FORKNUM + 1];
+   int         syncfd_len[MAX_FORKNUM + 1];
 } PendingOperationEntry;
 
 typedef struct
@@ -152,6 +152,8 @@ typedef struct
    CycleCtr    cycle_ctr;      /* mdckpt_cycle_ctr when request was made */
 } PendingUnlinkEntry;
 
+static uint32 open_fsync_queue_files = 0;
+static bool mdsync_in_progress = false;
 static HTAB *pendingOpsTable = NULL;
 static List *pendingUnlinks = NIL;
 static MemoryContext pendingOpsCxt; /* context for the above  */
@@ -196,6 +198,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
             BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
           MdfdVec *seg);
+static char *mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno);
+static void mdsyncpass(bool include_current);
 
 
 /*
@@ -1049,43 +1053,28 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 }
 
 /*
- * mdsync() -- Sync previous writes to stable storage.
+ * Do one pass over the fsync request hashtable and perform the necessary
+ * fsyncs. Increments the mdsync cycle counter.
+ *
+ * If include_current is true, perform all fsyncs regardless of cycle (this
+ * is done if too many files are open); otherwise only perform the fsyncs
+ * belonging to the cycle that was current at call time.
  */
-void
-mdsync(void)
+static void
+mdsyncpass(bool include_current)
 {
-   static bool mdsync_in_progress = false;
-
    HASH_SEQ_STATUS hstat;
    PendingOperationEntry *entry;
    int         absorb_counter;
 
    /* Statistics on sync times */
-   int         processed = 0;
    instr_time  sync_start,
                sync_end,
                sync_diff;
    uint64      elapsed;
-   uint64      longest = 0;
-   uint64      total_elapsed = 0;
-
-   /*
-    * This is only called during checkpoints, and checkpoints should only
-    * occur in processes that have created a pendingOpsTable.
-    */
-   if (!pendingOpsTable)
-       elog(ERROR, "cannot sync without a pendingOpsTable");
-
-   /*
-    * If we are in the checkpointer, the sync had better include all fsync
-    * requests that were queued by backends up to this point.  The tightest
-    * race condition that could occur is that a buffer that must be written
-    * and fsync'd for the checkpoint could have been dumped by a backend just
-    * before it was visited by BufferSync().  We know the backend will have
-    * queued an fsync request before clearing the buffer's dirtybit, so we
-    * are safe as long as we do an Absorb after completing BufferSync().
-    */
-   AbsorbFsyncRequests();
+   int         processed = CheckpointStats.ckpt_sync_rels;
+   uint64      longest = CheckpointStats.ckpt_longest_sync;
+   uint64      total_elapsed = CheckpointStats.ckpt_agg_sync_time;
 
    /*
     * To avoid excess fsync'ing (in the worst case, maybe a never-terminating
@@ -1133,17 +1122,27 @@ mdsync(void)
    while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
    {
        ForkNumber  forknum;
+       bool has_remaining;
 
        /*
-        * If the entry is new then don't process it this time; it might
-        * contain multiple fsync-request bits, but they are all new.  Note
-        * "continue" bypasses the hash-remove call at the bottom of the loop.
+        * If we are processing fsync requests because too many file handles
+        * are open, close files regardless of cycle; otherwise we might find
+        * nothing to close, and we want to make room as quickly as possible
+        * so more requests can be absorbed.
         */
-       if (entry->cycle_ctr == GetCheckpointSyncCycle())
-           continue;
+       if (!include_current)
+       {
+           /*
+            * If the entry is new then don't process it this time; it might
+            * contain multiple fsync-request bits, but they are all new.  Note
+            * "continue" bypasses the hash-remove call at the bottom of the loop.
+            */
+           if (entry->cycle_ctr == GetCheckpointSyncCycle())
+               continue;
 
-       /* Else assert we haven't missed it */
-       Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+           /* Else assert we haven't missed it */
+           Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+       }
 
        /*
         * Scan over the forks and segments represented by the entry.
@@ -1158,158 +1157,144 @@ mdsync(void)
         */
        for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
        {
-           Bitmapset  *requests = entry->requests[forknum];
            int         segno;
 
-           entry->requests[forknum] = NULL;
-           entry->canceled[forknum] = false;
-
-           while ((segno = bms_first_member(requests)) >= 0)
+           segno = -1;
+           while ((segno = bms_next_member(entry->requests[forknum], segno)) >= 0)
            {
-               int         failures;
+               int         returnCode;
 
                /*
-                * If fsync is off then we don't have to bother opening the
-                * file at all.  (We delay checking until this point so that
-                * changing fsync on the fly behaves sensibly.)
+                * Temporarily mark as processed. Have to do so before
+                * absorbing further requests, otherwise we might delete a new
+                * request made in a new cycle.
                 */
-               if (!enableFsync)
-                   continue;
+               bms_del_member(entry->requests[forknum], segno);
 
-               /*
-                * If in checkpointer, we want to absorb pending requests
-                * every so often to prevent overflow of the fsync request
-                * queue.  It is unspecified whether newly-added entries will
-                * be visited by hash_seq_search, but we don't care since we
-                * don't need to process them anyway.
-                */
-               if (--absorb_counter <= 0)
+               if (entry->syncfd_len[forknum] <= segno ||
+                   entry->syncfds[forknum][segno] == -1)
                {
-                   AbsorbFsyncRequests();
-                   absorb_counter = FSYNCS_PER_ABSORB;
+                   /*
+                    * Here we could open the file by name, if we ever want
+                    * to support requests that don't transport an fd.
+                    */
+                   elog(FATAL, "no file descriptor transported for segment");
                }
 
                /*
-                * The fsync table could contain requests to fsync segments
-                * that have been deleted (unlinked) by the time we get to
-                * them. Rather than just hoping an ENOENT (or EACCES on
-                * Windows) error can be ignored, what we do on error is
-                * absorb pending requests and then retry.  Since mdunlink()
-                * queues a "cancel" message before actually unlinking, the
-                * fsync request is guaranteed to be marked canceled after the
-                * absorb if it really was this case. DROP DATABASE likewise
-                * has to tell us to forget fsync requests before it starts
-                * deletions.
+                * If fsync is off then we don't have to bother opening the
+                * file at all.  (We delay checking until this point so that
+                * changing fsync on the fly behaves sensibly.)
+                *
+                * XXX: Why is that an important goal? Doesn't give any
+                * interesting guarantees afaict?
                 */
-               for (failures = 0;; failures++) /* loop exits at "break" */
+               if (enableFsync)
                {
-                   SMgrRelation reln;
-                   MdfdVec    *seg;
-                   char       *path;
-                   int         save_errno;
-
                    /*
-                    * Find or create an smgr hash entry for this relation.
-                    * This may seem a bit unclean -- md calling smgr?  But
-                    * it's really the best solution.  It ensures that the
-                    * open file reference isn't permanently leaked if we get
-                    * an error here. (You may say "but an unreferenced
-                    * SMgrRelation is still a leak!" Not really, because the
-                    * only case in which a checkpoint is done by a process
-                    * that isn't about to shut down is in the checkpointer,
-                    * and it will periodically do smgrcloseall(). This fact
-                    * justifies our not closing the reln in the success path
-                    * either, which is a good thing since in non-checkpointer
-                    * cases we couldn't safely do that.)
+                    * The fsync table could contain requests to fsync
+                    * segments that have been deleted (unlinked) by the time
+                    * we get to them.  That used to be problematic, but now
+                    * we have a filehandle to the deleted file. That means we
+                    * might fsync an empty file superfluously, in a
+                    * relatively tight window, which is acceptable.
                     */
-                   reln = smgropen(entry->rnode, InvalidBackendId);
-
-                   /* Attempt to open and fsync the target segment */
-                   seg = _mdfd_getseg(reln, forknum,
-                                      (BlockNumber) segno * (BlockNumber) RELSEG_SIZE,
-                                      false,
-                                      EXTENSION_RETURN_NULL
-                                      | EXTENSION_DONT_CHECK_SIZE);
 
                    INSTR_TIME_SET_CURRENT(sync_start);
 
-                   if (seg != NULL &&
-                       FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) >= 0)
-                   {
-                       /* Success; update statistics about sync timing */
-                       INSTR_TIME_SET_CURRENT(sync_end);
-                       sync_diff = sync_end;
-                       INSTR_TIME_SUBTRACT(sync_diff, sync_start);
-                       elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
-                       if (elapsed > longest)
-                           longest = elapsed;
-                       total_elapsed += elapsed;
-                       processed++;
-                       if (log_checkpoints)
-                           elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f msec",
-                                processed,
-                                FilePathName(seg->mdfd_vfd),
-                                (double) elapsed / 1000);
-
-                       break;  /* out of retry loop */
-                   }
+                   returnCode = FileSync(entry->syncfds[forknum][segno], WAIT_EVENT_DATA_FILE_SYNC);
 
-                   /* Compute file name for use in message */
-                   save_errno = errno;
-                   path = _mdfd_segpath(reln, forknum, (BlockNumber) segno);
-                   errno = save_errno;
+                   if (returnCode < 0)
+                   {
+                       /* XXX: decide on policy */
+                       bms_add_member(entry->requests[forknum], segno);
 
-                   /*
-                    * It is possible that the relation has been dropped or
-                    * truncated since the fsync request was entered.
-                    * Therefore, allow ENOENT, but only if we didn't fail
-                    * already on this file.  This applies both for
-                    * _mdfd_getseg() and for FileSync, since fd.c might have
-                    * closed the file behind our back.
-                    *
-                    * XXX is there any point in allowing more than one retry?
-                    * Don't see one at the moment, but easy to change the
-                    * test here if so.
-                    */
-                   if (!FILE_POSSIBLY_DELETED(errno) ||
-                       failures > 0)
                        ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not fsync file \"%s\": %m",
-                                       path)));
-                   else
+                                       FilePathName(entry->syncfds[forknum][segno]))));
+                   }
+
+                   /* Success; update statistics about sync timing */
+                   INSTR_TIME_SET_CURRENT(sync_end);
+                   sync_diff = sync_end;
+                   INSTR_TIME_SUBTRACT(sync_diff, sync_start);
+                   elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
+                   if (elapsed > longest)
+                       longest = elapsed;
+                   total_elapsed += elapsed;
+                   processed++;
+                   if (log_checkpoints)
                        ereport(DEBUG1,
-                               (errcode_for_file_access(),
-                                errmsg("could not fsync file \"%s\" but retrying: %m",
-                                       path)));
-                   pfree(path);
+                               (errmsg("checkpoint sync: number=%d file=%s time=%.3f msec",
+                                       processed,
+                                       FilePathName(entry->syncfds[forknum][segno]),
+                                       (double) elapsed / 1000),
+                                errhidestmt(true),
+                                errhidecontext(true)));
+               }
+
+               /*
+                * It shouldn't be possible for a new request to arrive during
+                * the fsync (on error this will not be reached).
+                */
+               Assert(!bms_is_member(segno, entry->requests[forknum]));
 
+               /*
+                * Close file.  XXX: centralize code.
+                */
+               {
+                   open_fsync_queue_files--;
+                   FileClose(entry->syncfds[forknum][segno]);
+                   entry->syncfds[forknum][segno] = -1;
+               }
+
+               /*
+                * If in checkpointer, we want to absorb pending requests every so
+                * often to prevent overflow of the fsync request queue.  It is
+                * unspecified whether newly-added entries will be visited by
+                * hash_seq_search, but we don't care since we don't need to process
+                * them anyway.
+                */
+               if (absorb_counter-- <= 0)
+               {
                    /*
-                    * Absorb incoming requests and check to see if a cancel
-                    * arrived for this relation fork.
+                    * Don't absorb if too many files are open. This pass will
+                    * soon close some, so check again later.
                     */
-                   AbsorbFsyncRequests();
-                   absorb_counter = FSYNCS_PER_ABSORB; /* might as well... */
-
-                   if (entry->canceled[forknum])
-                       break;
-               }               /* end retry loop */
+                   if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+                       AbsorbFsyncRequests();
+                   absorb_counter = FSYNCS_PER_ABSORB;
+               }
            }
-           bms_free(requests);
        }
 
        /*
-        * We've finished everything that was requested before we started to
-        * scan the entry.  If no new requests have been inserted meanwhile,
-        * remove the entry.  Otherwise, update its cycle counter, as all the
-        * requests now in it must have arrived during this cycle.
+        * We've finished everything for the file that was requested before we
+        * started to scan the entry.  If no new requests have been inserted
+        * meanwhile, remove the entry.  Otherwise, update its cycle counter,
+        * as all the requests now in it must have arrived during this cycle.
+        *
+        * This needs to be checked separately from the above for-each-fork
+        * loop, as new requests for this relation could have been absorbed.
         */
+       has_remaining = false;
        for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
        {
-           if (entry->requests[forknum] != NULL)
-               break;
+           if (bms_is_empty(entry->requests[forknum]))
+           {
+               if (entry->syncfds[forknum])
+               {
+                   pfree(entry->syncfds[forknum]);
+                   entry->syncfds[forknum] = NULL;
+               }
+               bms_free(entry->requests[forknum]);
+               entry->requests[forknum] = NULL;
+           }
+           else
+               has_remaining = true;
        }
-       if (forknum <= MAX_FORKNUM)
+       if (has_remaining)
            entry->cycle_ctr = GetCheckpointSyncCycle();
        else
        {
@@ -1320,13 +1305,69 @@ mdsync(void)
        }
    }                           /* end loop over hashtable entries */
 
-   /* Return sync performance metrics for report at checkpoint end */
+   /* Flag successful completion of mdsync */
+   mdsync_in_progress = false;
+
+   /* Maintain sync performance metrics for report at checkpoint end */
    CheckpointStats.ckpt_sync_rels = processed;
    CheckpointStats.ckpt_longest_sync = longest;
    CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+}
 
-   /* Flag successful completion of mdsync */
-   mdsync_in_progress = false;
+/*
+ * mdsync() -- Sync previous writes to stable storage.
+ */
+void
+mdsync(void)
+{
+   /*
+    * This is only called during checkpoints, and checkpoints should only
+    * occur in processes that have created a pendingOpsTable.
+    */
+   if (!pendingOpsTable)
+       elog(ERROR, "cannot sync without a pendingOpsTable");
+
+   /*
+    * If we are in the checkpointer, the sync had better include all fsync
+    * requests that were queued by backends up to this point.  The tightest
+    * race condition that could occur is that a buffer that must be written
+    * and fsync'd for the checkpoint could have been dumped by a backend just
+    * before it was visited by BufferSync().  We know the backend will have
+    * queued an fsync request before clearing the buffer's dirtybit, so we
+    * are safe as long as we do an Absorb after completing BufferSync().
+    */
+   AbsorbAllFsyncRequests();
+
+   mdsyncpass(false);
+}
+
+/*
+ * Flush the fsync request queue enough to make sure there's room for at least
+ * one more entry.
+ */
+bool
+FlushFsyncRequestQueueIfNecessary(void)
+{
+   if (mdsync_in_progress)
+       return false;
+
+   while (true)
+   {
+       if (open_fsync_queue_files >= ((max_safe_fds * 7) / 10))
+       {
+           elog(DEBUG1,
+                "flush fsync request queue due to %u open files",
+                open_fsync_queue_files);
+           mdsyncpass(true);
+           elog(DEBUG1,
+                "flushed fsync request queue, now at %u open files",
+                open_fsync_queue_files);
+       }
+       else
+           break;
+   }
+
+   return true;
 }
 
 /*
@@ -1411,12 +1452,38 @@ mdpostckpt(void)
         */
        if (--absorb_counter <= 0)
        {
-           AbsorbFsyncRequests();
+           /* XXX: Centralize this condition */
+           if (open_fsync_queue_files < ((max_safe_fds * 7) / 10))
+               AbsorbFsyncRequests();
            absorb_counter = UNLINKS_PER_ABSORB;
        }
    }
 }
 
+
+/*
+ * Return the filename for the specified segment of the relation. The
+ * returned string is palloc'd.
+ */
+static char *
+mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+{
+   char       *path,
+              *fullpath;
+
+   path = relpathperm(rnode, forknum);
+
+   if (segno > 0)
+   {
+       fullpath = psprintf("%s.%u", path, segno);
+       pfree(path);
+   }
+   else
+       fullpath = path;
+
+   return fullpath;
+}
+
 /*
  * register_dirty_segment() -- Mark a relation segment as needing fsync
  *
@@ -1437,6 +1504,13 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
    pg_memory_barrier();
    cycle = GetCheckpointSyncCycle();
 
+   /*
+    * For historical reasons the checkpointer keeps track of the number of
+    * times backends perform writes themselves.
+    */
+   if (!AmBackgroundWriterProcess())
+       CountBackendWrite();
+
    /*
     * Don't repeatedly register the same segment as dirty.
     *
@@ -1449,27 +1523,23 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 
    if (pendingOpsTable)
    {
-       /* push it into local pending-ops table */
-       RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno);
-       seg->mdfd_dirtied_cycle = cycle;
-   }
-   else
-   {
-       if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno))
-       {
-           seg->mdfd_dirtied_cycle = cycle;
-           return;             /* passed it off successfully */
-       }
-
-       ereport(DEBUG1,
-               (errmsg("could not forward fsync request because request queue is full")));
+       int fd;
 
-       if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0)
-           ereport(ERROR,
-                   (errcode_for_file_access(),
-                    errmsg("could not fsync file \"%s\": %m",
-                           FilePathName(seg->mdfd_vfd))));
+       /*
+        * Push it into local pending-ops table.
+        *
+        * Gotta duplicate the fd - we can't have fd.c close it behind our
+        * back, as that'd lead to losing error reporting guarantees on
+        * Linux. RememberFsyncRequest() will manage the lifetime.
+        */
+       ReleaseLruFiles();
+       fd = dup(FileGetRawDesc(seg->mdfd_vfd));
+       if (fd < 0)
+           elog(ERROR, "could not duplicate file descriptor: %m");
+       RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, fd);
    }
+   else
+       ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, seg->mdfd_vfd);
 }
 
 /*
@@ -1491,21 +1561,14 @@ register_unlink(RelFileNodeBackend rnode)
    {
        /* push it into local pending-ops table */
        RememberFsyncRequest(rnode.node, MAIN_FORKNUM,
-                            UNLINK_RELATION_REQUEST);
+                            UNLINK_RELATION_REQUEST,
+                            -1);
    }
    else
    {
-       /*
-        * Notify the checkpointer about it.  If we fail to queue the request
-        * message, we have to sleep and try again, because we can't simply
-        * delete the file now.  Ugly, but hopefully won't happen often.
-        *
-        * XXX should we just leave the file orphaned instead?
-        */
+       /* Notify the checkpointer about it. */
        Assert(IsUnderPostmaster);
-       while (!ForwardFsyncRequest(rnode.node, MAIN_FORKNUM,
-                                   UNLINK_RELATION_REQUEST))
-           pg_usleep(10000L);  /* 10 msec seems a good number */
+       ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, UNLINK_RELATION_REQUEST, -1);
    }
 }
 
@@ -1531,7 +1594,7 @@ register_unlink(RelFileNodeBackend rnode)
  * heavyweight operation anyhow, so we'll live with it.)
  */
 void
-RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno, int fd)
 {
    Assert(pendingOpsTable);
 
@@ -1549,18 +1612,28 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
            /*
             * We can't just delete the entry since mdsync could have an
             * active hashtable scan.  Instead we delete the bitmapsets; this
-            * is safe because of the way mdsync is coded.  We also set the
-            * "canceled" flags so that mdsync can tell that a cancel arrived
-            * for the fork(s).
+            * is safe because of the way mdsync is coded.
             */
            if (forknum == InvalidForkNumber)
            {
                /* remove requests for all forks */
                for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
                {
+                   int segno;
+
                    bms_free(entry->requests[forknum]);
                    entry->requests[forknum] = NULL;
-                   entry->canceled[forknum] = true;
+
+                   for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+                   {
+                       if (entry->syncfds[forknum][segno] != -1)
+                       {
+                           open_fsync_queue_files--;
+                           FileClose(entry->syncfds[forknum][segno]);
+                           entry->syncfds[forknum][segno] = -1;
+                       }
+                   }
+
                }
            }
            else
@@ -1568,7 +1641,16 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
                /* remove requests for single fork */
                bms_free(entry->requests[forknum]);
                entry->requests[forknum] = NULL;
-               entry->canceled[forknum] = true;
+
+               for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+               {
+                   if (entry->syncfds[forknum][segno] != -1)
+                   {
+                       open_fsync_queue_files--;
+                       FileClose(entry->syncfds[forknum][segno]);
+                       entry->syncfds[forknum][segno] = -1;
+                   }
+               }
            }
        }
    }
@@ -1592,7 +1674,6 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
                {
                    bms_free(entry->requests[forknum]);
                    entry->requests[forknum] = NULL;
-                   entry->canceled[forknum] = true;
                }
            }
        }
@@ -1646,7 +1727,8 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
        {
            entry->cycle_ctr = GetCheckpointSyncCycle();
            MemSet(entry->requests, 0, sizeof(entry->requests));
-           MemSet(entry->canceled, 0, sizeof(entry->canceled));
+           MemSet(entry->syncfds, 0, sizeof(entry->syncfds));
+           MemSet(entry->syncfd_len, 0, sizeof(entry->syncfd_len));
        }
 
        /*
@@ -1658,6 +1740,57 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
        entry->requests[forknum] = bms_add_member(entry->requests[forknum],
                                                  (int) segno);
 
+       if (fd >= 0)
+       {
+           /* make space for entry */
+           if (entry->syncfds[forknum] == NULL)
+           {
+               int i;
+
+               entry->syncfds[forknum] = palloc(sizeof(File) * (segno + 1));
+               entry->syncfd_len[forknum] = segno + 1;
+
+               for (i = 0; i <= segno; i++)
+                   entry->syncfds[forknum][i] = -1;
+           }
+           else if (entry->syncfd_len[forknum] <= segno)
+           {
+               int i;
+
+               entry->syncfds[forknum] = repalloc(entry->syncfds[forknum],
+                                                  sizeof(File) * (segno + 1));
+
+               /* initialize newly created entries */
+               for (i = entry->syncfd_len[forknum]; i <= segno; i++)
+                   entry->syncfds[forknum][i] = -1;
+
+               entry->syncfd_len[forknum] = segno + 1;
+           }
+
+           if (entry->syncfds[forknum][segno] == -1)
+           {
+               char *path = mdpath(entry->rnode, forknum, segno);
+               open_fsync_queue_files++;
+               /* caller must have reserved entry */
+               entry->syncfds[forknum][segno] =
+                   FileOpenForFd(fd, path);
+               pfree(path);
+           }
+           else
+           {
+               /*
+                * File is already open. Have to keep the older fd, errors
+                * might only be reported to it, thus close the one we just
+                * got.
+                *
+                * XXX: check for errors.
+                */
+               close(fd);
+           }
+
+           FlushFsyncRequestQueueIfNecessary();
+       }
+
        MemoryContextSwitchTo(oldcxt);
    }
 }
@@ -1674,22 +1807,12 @@ ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum)
    if (pendingOpsTable)
    {
        /* standalone backend or startup process: fsync state is local */
-       RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC);
+       RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
    }
    else if (IsUnderPostmaster)
    {
-       /*
-        * Notify the checkpointer about it.  If we fail to queue the cancel
-        * message, we have to sleep and try again ... ugly, but hopefully
-        * won't happen often.
-        *
-        * XXX should we CHECK_FOR_INTERRUPTS in this loop?  Escaping with an
-        * error would leave the no-longer-used file still present on disk,
-        * which would be bad, so I'm inclined to assume that the checkpointer
-        * will always empty the queue soon.
-        */
-       while (!ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC))
-           pg_usleep(10000L);  /* 10 msec seems a good number */
+       /* Notify the checkpointer about it. */
+       ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
 
        /*
         * Note we don't wait for the checkpointer to actually absorb the
@@ -1713,14 +1836,12 @@ ForgetDatabaseFsyncRequests(Oid dbid)
    if (pendingOpsTable)
    {
        /* standalone backend or startup process: fsync state is local */
-       RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC);
+       RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
    }
    else if (IsUnderPostmaster)
    {
        /* see notes in ForgetRelationFsyncRequests */
-       while (!ForwardFsyncRequest(rnode, InvalidForkNumber,
-                                   FORGET_DATABASE_FSYNC))
-           pg_usleep(10000L);  /* 10 msec seems a good number */
+       ForwardFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
    }
 }
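
The "XXX: Centralize this condition" above refers to the open-file budget
test repeated in mdsyncpass(), mdpostckpt(), and
FlushFsyncRequestQueueIfNecessary(). A centralized helper might look like
this (a sketch in md.c's context; the helper name is an assumption):

    /* True iff the fsync queue may keep more files open (~70% of max_safe_fds). */
    static inline bool
    fsync_queue_has_fd_headroom(void)
    {
        return open_fsync_queue_files < (uint32) ((max_safe_fds * 7) / 10);
    }

The absorb call sites would then read "if (fsync_queue_has_fd_headroom())
AbsorbFsyncRequests();", and FlushFsyncRequestQueueIfNecessary() would loop
while the headroom test fails.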
 
index 87a5cfad41517aa39351dd0eca407affae6f02cb..58ba671a9071374f22901318c0753fdd8cdbc9fe 100644 (file)
@@ -16,6 +16,7 @@
 #define _BGWRITER_H
 
 #include "storage/block.h"
+#include "storage/fd.h"
 #include "storage/relfilenode.h"
 
 
@@ -31,9 +32,10 @@ extern void CheckpointerMain(void) pg_attribute_noreturn();
 extern void RequestCheckpoint(int flags);
 extern void CheckpointWriteDelay(int flags, double progress);
 
-extern bool ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
-                   BlockNumber segno);
+extern void ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
+                               BlockNumber segno, File file);
 extern void AbsorbFsyncRequests(void);
+extern void AbsorbAllFsyncRequests(void);
 
 extern Size CheckpointerShmemSize(void);
 extern void CheckpointerShmemInit(void);
@@ -43,4 +45,6 @@ extern uint32 IncCheckpointSyncCycle(void);
 
 extern bool FirstCallSinceLastCheckpoint(void);
 
+extern void CountBackendWrite(void);
+
 #endif                         /* _BGWRITER_H */
index 1877eef2391639e39e4d99b95e7d56214b7fe3b6..e2ba64e898433bcb1a260aa030ac78c0cb93f3aa 100644 (file)
@@ -44,6 +44,11 @@ extern int   postmaster_alive_fds[2];
 #define POSTMASTER_FD_OWN      1   /* kept open by postmaster only */
 #endif
 
+#define FSYNC_FD_SUBMIT            0
+#define FSYNC_FD_PROCESS       1
+
+extern int fsync_fds[2];
+
 extern PGDLLIMPORT const char *progname;
 
 extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn();
index 558e4d8518b19193af3688469181cfd19b70133f..798a96529276ee3a354b22f9df3713292a5c5ac6 100644 (file)
@@ -140,7 +140,8 @@ extern void mdpostckpt(void);
 
 extern void SetForwardFsyncRequests(void);
 extern void RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum,
-                    BlockNumber segno);
+                    BlockNumber segno, int fd);
+extern bool FlushFsyncRequestQueueIfNecessary(void);
 extern void ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum);
 extern void ForgetDatabaseFsyncRequests(Oid dbid);