#include "storage/shmem.h"
#define REQUEST_QUEUE_SIZE 8192
+#define LOCAL_CACHE_SIZE 32
typedef struct
{
static bool got_sigterm;
static RingFlushQueue *pg_ringflush_queue;
static LWLockId lwlockid;
+static int nlocalcache;
+static Buffer localcache[LOCAL_CACHE_SIZE];
/* Function prototypes. */
extern void _PG_init(void);
pg_ringflush_ring_drop_hook(Buffer buffer)
{
uint64 nrequest;
+ int i;
PGPROC *bgproc;
+ /* Call any other hook. */
if (prev_ring_drop_hook != NULL)
prev_ring_drop_hook(buffer);
+ /* If flush queue doesn't exist, forget it. */
if (pg_ringflush_queue == NULL)
return;
+ /*
+ * If we've managed to completely fill the queue, that means our previous
+ * attempts to pass this data off to the shared queue were unsuccesful.
+ * We could try to wrap our local cache, but it's not worth it; assume the
+ * background process is overloaded and just start over.
+ */
+ if (nlocalcache >= LOCAL_CACHE_SIZE)
+ nlocalcache = 0;
+
+ /* Remember this request. */
+ localcache[nlocalcache++] = buffer;
+
+ /*
+ * If we've used up more than half our cache slots, we'll try to hand off
+ * the list of buffers to the background process.
+ */
+ if (nlocalcache < LOCAL_CACHE_SIZE / 2)
+ return;
+
+ /* If we can't get the lock, skip it for now; retry on next buffer. */
Assert(lwlockid != 0);
if (!LWLockConditionalAcquire(lwlockid, LW_EXCLUSIVE))
return;
+ /* Stuff requests into queue. */
nrequest = pg_ringflush_queue->nrequest;
- pg_ringflush_queue->data[nrequest % REQUEST_QUEUE_SIZE] = buffer;
- pg_ringflush_queue->nrequest = nrequest + 1;
+ for (i = 0; i < nlocalcache; ++i)
+ pg_ringflush_queue->data[(nrequest + i) % REQUEST_QUEUE_SIZE] =
+ localcache[i];
+ pg_ringflush_queue->nrequest = nrequest + nlocalcache;
+
+ /* Remember identity of background process we need to kick. */
bgproc = pg_ringflush_queue->flushproc;
+ /* Release lock. */
LWLockRelease(lwlockid);
- /* Kick him. */
+ /* Forget all the buffer IDs we remembered. */
+ nlocalcache = 0;
+
+ /* Kick background process. */
if (bgproc != NULL)
SetLatch(&bgproc->procLatch);
}