From 436d3e1fabe3bbd4d83f591a1e60bde1a4222f29 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 11 Apr 2013 15:09:59 +0000 Subject: [PATCH] Hand off requests to the ring-flusher in batches. --- contrib/pg_ringflush/pg_ringflush.c | 41 ++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/contrib/pg_ringflush/pg_ringflush.c b/contrib/pg_ringflush/pg_ringflush.c index e3c4589d1c..cf46ec1872 100644 --- a/contrib/pg_ringflush/pg_ringflush.c +++ b/contrib/pg_ringflush/pg_ringflush.c @@ -17,6 +17,7 @@ #include "storage/shmem.h" #define REQUEST_QUEUE_SIZE 8192 +#define LOCAL_CACHE_SIZE 32 typedef struct { @@ -37,6 +38,8 @@ static ring_drop_hook_type prev_ring_drop_hook; static bool got_sigterm; static RingFlushQueue *pg_ringflush_queue; static LWLockId lwlockid; +static int nlocalcache; +static Buffer localcache[LOCAL_CACHE_SIZE]; /* Function prototypes. */ extern void _PG_init(void); @@ -164,26 +167,58 @@ static void pg_ringflush_ring_drop_hook(Buffer buffer) { uint64 nrequest; + int i; PGPROC *bgproc; + /* Call any other hook. */ if (prev_ring_drop_hook != NULL) prev_ring_drop_hook(buffer); + /* If flush queue doesn't exist, forget it. */ if (pg_ringflush_queue == NULL) return; + /* + * If we've managed to completely fill the queue, that means our previous + * attempts to pass this data off to the shared queue were unsuccesful. + * We could try to wrap our local cache, but it's not worth it; assume the + * background process is overloaded and just start over. + */ + if (nlocalcache >= LOCAL_CACHE_SIZE) + nlocalcache = 0; + + /* Remember this request. */ + localcache[nlocalcache++] = buffer; + + /* + * If we've used up more than half our cache slots, we'll try to hand off + * the list of buffers to the background process. + */ + if (nlocalcache < LOCAL_CACHE_SIZE / 2) + return; + + /* If we can't get the lock, skip it for now; retry on next buffer. */ Assert(lwlockid != 0); if (!LWLockConditionalAcquire(lwlockid, LW_EXCLUSIVE)) return; + /* Stuff requests into queue. */ nrequest = pg_ringflush_queue->nrequest; - pg_ringflush_queue->data[nrequest % REQUEST_QUEUE_SIZE] = buffer; - pg_ringflush_queue->nrequest = nrequest + 1; + for (i = 0; i < nlocalcache; ++i) + pg_ringflush_queue->data[(nrequest + i) % REQUEST_QUEUE_SIZE] = + localcache[i]; + pg_ringflush_queue->nrequest = nrequest + nlocalcache; + + /* Remember identity of background process we need to kick. */ bgproc = pg_ringflush_queue->flushproc; + /* Release lock. */ LWLockRelease(lwlockid); - /* Kick him. */ + /* Forget all the buffer IDs we remembered. */ + nlocalcache = 0; + + /* Kick background process. */ if (bgproc != NULL) SetLatch(&bgproc->procLatch); } -- 2.30.2