Hand off requests to the ring-flusher in batches. pg_ringflush
authorRobert Haas <[email protected]>
Thu, 11 Apr 2013 15:09:59 +0000 (15:09 +0000)
committerRobert Haas <[email protected]>
Thu, 11 Apr 2013 15:09:59 +0000 (15:09 +0000)
contrib/pg_ringflush/pg_ringflush.c

index e3c4589d1c3e5ce9ef18828be40a0ab5313d9056..cf46ec1872862abdf7bd741ea0334046ed5a09d4 100644 (file)
@@ -17,6 +17,7 @@
 #include "storage/shmem.h"
 
 #define REQUEST_QUEUE_SIZE                     8192
+#define LOCAL_CACHE_SIZE                       32
 
 typedef struct
 {
@@ -37,6 +38,8 @@ static ring_drop_hook_type prev_ring_drop_hook;
 static bool                                    got_sigterm;
 static RingFlushQueue     *pg_ringflush_queue;
 static LWLockId                                lwlockid;
+static int                                     nlocalcache;
+static Buffer                          localcache[LOCAL_CACHE_SIZE];
 
 /* Function prototypes. */
 extern void            _PG_init(void);
@@ -164,26 +167,58 @@ static void
 pg_ringflush_ring_drop_hook(Buffer buffer)
 {
        uint64          nrequest;
+       int                     i;
        PGPROC     *bgproc;
 
+       /* Call any other hook. */
        if (prev_ring_drop_hook != NULL)
                prev_ring_drop_hook(buffer);
 
+       /* If flush queue doesn't exist, forget it. */
        if (pg_ringflush_queue == NULL)
                return;
 
+       /*
+        * If we've managed to completely fill the queue, that means our previous
+        * attempts to pass this data off to the shared queue were unsuccesful.
+        * We could try to wrap our local cache, but it's not worth it; assume the
+        * background process is overloaded and just start over.
+        */
+       if (nlocalcache >= LOCAL_CACHE_SIZE)
+               nlocalcache = 0;
+
+       /* Remember this request. */
+       localcache[nlocalcache++] = buffer;
+
+       /*
+        * If we've used up more than half our cache slots, we'll try to hand off
+        * the list of buffers to the background process.
+        */
+       if (nlocalcache < LOCAL_CACHE_SIZE / 2)
+               return;
+
+       /* If we can't get the lock, skip it for now; retry on next buffer. */
        Assert(lwlockid != 0);
        if (!LWLockConditionalAcquire(lwlockid, LW_EXCLUSIVE))
                return;
 
+       /* Stuff requests into queue. */
        nrequest = pg_ringflush_queue->nrequest;
-       pg_ringflush_queue->data[nrequest % REQUEST_QUEUE_SIZE] = buffer;
-       pg_ringflush_queue->nrequest = nrequest + 1;
+       for (i = 0; i < nlocalcache; ++i)
+               pg_ringflush_queue->data[(nrequest + i) % REQUEST_QUEUE_SIZE] =
+                       localcache[i];
+       pg_ringflush_queue->nrequest = nrequest + nlocalcache;
+
+       /* Remember identity of background process we need to kick. */
        bgproc = pg_ringflush_queue->flushproc;
 
+       /* Release lock. */
        LWLockRelease(lwlockid);
 
-       /* Kick him. */
+       /* Forget all the buffer IDs we remembered. */
+       nlocalcache = 0;
+
+       /* Kick background process. */
        if (bgproc != NULL)
                SetLatch(&bgproc->procLatch);
 }