Glue layer to connect the executor to the shm_mq mechanism.
author	Robert Haas <[email protected]>
Sat, 19 Sep 2015 01:10:08 +0000 (21:10 -0400)
committer	Robert Haas <[email protected]>
Sat, 19 Sep 2015 01:56:58 +0000 (21:56 -0400)
The shm_mq mechanism was built to send error (and notice) messages and
tuples between backends.  However, shm_mq itself only deals in raw
bytes.  Since commit 2bd9e412f92bc6a68f3e8bcb18e04955cc35001d, we have
had infrastructure for one backend to redirect protocol messages to a
queue and for another backend to parse them and do useful things with
them.  This commit introduces a somewhat analogous facility for tuples
by adding a new type of DestReceiver, DestTupleQueue, which writes
each tuple generated by a query into a shm_mq, and a new
TupleQueueFunnel facility which reads raw tuples out of the queue and
reconstructs the HeapTuple format expected by the executor.
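
As a purely illustrative sketch (not part of this commit; the helper name and
wiring are assumptions, and it presumes the QueryDesc has already gone through
ExecutorStart), a worker might point the executor at such a receiver roughly
like this:

    /* Illustrative only: stream a query's output into a shm_mq. */
    #include "postgres.h"
    #include "executor/executor.h"
    #include "executor/tqueue.h"
    #include "storage/shm_mq.h"

    static void
    worker_send_tuples(QueryDesc *queryDesc, shm_mq_handle *mqh)
    {
        /* Wrap the already-attached queue in a DestTupleQueue receiver. */
        DestReceiver *dest = CreateTupleQueueDestReceiver(mqh);

        /* Let the executor push every result tuple into the queue. */
        queryDesc->dest = dest;
        ExecutorRun(queryDesc, ForwardScanDirection, 0L);

        (*dest->rDestroy) (dest);
    }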

The TupleQueueFunnel abstraction supports reading from multiple tuple
streams at the same time, but only in round-robin fashion.  Someone
could conceivably want other policies, but this should be good enough
to meet our short-term needs related to parallel query, and we can
always extend it later.
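
A hedged sketch of the reading side (again illustrative only, with a
hypothetical process_tuple() consumer and no claim that parallel query will
wire things up exactly this way):

    /* Illustrative only: drain tuples from several worker queues. */
    static void
    leader_collect_tuples(shm_mq_handle **handles, int nworkers)
    {
        TupleQueueFunnel *funnel = CreateTupleQueueFunnel();
        HeapTuple       tuple;
        bool            done = false;
        int             i;

        for (i = 0; i < nworkers; i++)
            RegisterTupleQueueOnFunnel(funnel, handles[i]);

        /* With nowait = false, NULL means every queue has been detached. */
        while ((tuple = TupleQueueFunnelNext(funnel, false, &done)) != NULL)
            process_tuple(tuple);       /* hypothetical consumer */

        DestroyTupleQueueFunnel(funnel);
    }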

This also makes one minor addition to the shm_mq API that didn't
seem worth breaking out as a separate patch.

Extracted from Amit Kapila's parallel sequential scan patch.  This
code was originally written by me, and then it was revised by Amit,
and then it was revised some more by me.

src/backend/executor/Makefile
src/backend/executor/tqueue.c [new file with mode: 0644]
src/backend/storage/ipc/shm_mq.c
src/backend/tcop/dest.c
src/include/executor/tqueue.h [new file with mode: 0644]
src/include/storage/shm_mq.h
src/include/tcop/dest.h
src/tools/pgindent/typedefs.list

index 08cba6fa2b5f2bfbd444ee5e36d06a1ac305ba58..249534bb92720ac15c7498493eb15aa7a2b68bec 100644 (file)
@@ -24,6 +24,6 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
        nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
        nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \
        nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
-       nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o
+       nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
new file mode 100644 (file)
index 0000000..d0edf4e
--- /dev/null
@@ -0,0 +1,262 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.c
+ *       Use shm_mq to send & receive tuples between parallel backends
+ *
+ * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
+ * under the hood, writes tuples from the executor to a shm_mq.
+ *
+ * A TupleQueueFunnel helps manage the process of reading tuples from
+ * one or more shm_mq objects being used as tuple queues.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/executor/tqueue.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/tqueue.h"
+#include "miscadmin.h"
+
+typedef struct
+{
+       DestReceiver pub;
+       shm_mq_handle *handle;
+}      TQueueDestReceiver;
+
+struct TupleQueueFunnel
+{
+       int                     nqueues;
+       int                     maxqueues;
+       int                     nextqueue;
+       shm_mq_handle **queue;
+};
+
+/*
+ * Receive a tuple.
+ */
+static void
+tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
+{
+       TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+       HeapTuple       tuple;
+
+       tuple = ExecMaterializeSlot(slot);
+       shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
+}
+
+/*
+ * Prepare to receive tuples from executor.
+ */
+static void
+tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+       /* do nothing */
+}
+
+/*
+ * Clean up at end of an executor run
+ */
+static void
+tqueueShutdownReceiver(DestReceiver *self)
+{
+       /* do nothing */
+}
+
+/*
+ * Destroy receiver when done with it
+ */
+static void
+tqueueDestroyReceiver(DestReceiver *self)
+{
+       pfree(self);
+}
+
+/*
+ * Create a DestReceiver that writes tuples to a tuple queue.
+ */
+DestReceiver *
+CreateTupleQueueDestReceiver(shm_mq_handle *handle)
+{
+       TQueueDestReceiver *self;
+
+       self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
+
+       self->pub.receiveSlot = tqueueReceiveSlot;
+       self->pub.rStartup = tqueueStartupReceiver;
+       self->pub.rShutdown = tqueueShutdownReceiver;
+       self->pub.rDestroy = tqueueDestroyReceiver;
+       self->pub.mydest = DestTupleQueue;
+       self->handle = handle;
+
+       return (DestReceiver *) self;
+}
+
+/*
+ * Create a tuple queue funnel.
+ */
+TupleQueueFunnel *
+CreateTupleQueueFunnel(void)
+{
+       TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));
+
+       funnel->maxqueues = 8;
+       funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+
+       return funnel;
+}
+
+/*
+ * Destroy a tuple queue funnel.
+ */
+void
+DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
+{
+       int                     i;
+
+       for (i = 0; i < funnel->nqueues; i++)
+               shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
+       pfree(funnel->queue);
+       pfree(funnel);
+}
+
+/*
+ * Remember the shared memory queue handle in funnel.
+ */
+void
+RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
+{
+       if (funnel->nqueues < funnel->maxqueues)
+       {
+               funnel->queue[funnel->nqueues++] = handle;
+               return;
+       }
+
+       if (funnel->nqueues >= funnel->maxqueues)
+       {
+               int                     newsize = funnel->nqueues * 2;
+
+               Assert(funnel->nqueues == funnel->maxqueues);
+
+               funnel->queue = repalloc(funnel->queue,
+                                                                newsize * sizeof(shm_mq_handle *));
+               funnel->maxqueues = newsize;
+       }
+
+       funnel->queue[funnel->nqueues++] = handle;
+}
+
+/*
+ * Fetch a tuple from a tuple queue funnel.
+ *
+ * We try to read from the queues in round-robin fashion so as to avoid
+ * the situation where some workers get their tuples read expediently while
+ * others are barely ever serviced.
+ *
+ * Even when nowait = false, we read from the individual queues in
+ * non-blocking mode.  Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
+ * it can still accumulate bytes from a partially-read message, so doing it
+ * this way should outperform doing a blocking read on each queue in turn.
+ *
+ * The return value is NULL if there are no remaining queues or if
+ * nowait = true and no queue returned a tuple without blocking.  *done, if
+ * not NULL, is set to true when there are no remaining queues and false in
+ * any other case.
+ */
+HeapTuple
+TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+{
+       int                     waitpos = funnel->nextqueue;
+
+       /* Corner case: called before adding any queues, or after all are gone. */
+       if (funnel->nqueues == 0)
+       {
+               if (done != NULL)
+                       *done = true;
+               return NULL;
+       }
+
+       if (done != NULL)
+               *done = false;
+
+       for (;;)
+       {
+               shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
+               shm_mq_result result;
+               Size            nbytes;
+               void       *data;
+
+               /* Attempt to read a message. */
+               result = shm_mq_receive(mqh, &nbytes, &data, true);
+
+               /*
+                * Normally, we advance funnel->nextqueue to the next queue at this
+                * point, but if we're pointing to a queue that we've just discovered
+                * is detached, then forget that queue and leave the pointer where it
+                * is; once the number of remaining queues falls below that pointer,
+                * we wrap the pointer back around to the first queue.
+                */
+               if (result != SHM_MQ_DETACHED)
+                       funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
+               else
+               {
+                       --funnel->nqueues;
+                       if (funnel->nqueues == 0)
+                       {
+                               if (done != NULL)
+                                       *done = true;
+                               return NULL;
+                       }
+
+                       memmove(&funnel->queue[funnel->nextqueue],
+                                       &funnel->queue[funnel->nextqueue + 1],
+                                       sizeof(shm_mq_handle *)
+                                       * (funnel->nqueues - funnel->nextqueue));
+
+                       if (funnel->nextqueue >= funnel->nqueues)
+                               funnel->nextqueue = 0;
+
+                       if (funnel->nextqueue < waitpos)
+                               --waitpos;
+
+                       continue;
+               }
+
+               /* If we got a message, return it. */
+               if (result == SHM_MQ_SUCCESS)
+               {
+                       HeapTupleData htup;
+
+                       /*
+                        * The tuple data we just read from the queue is only valid until
+                        * we again attempt to read from it.  Copy the tuple into a single
+                        * palloc'd chunk as callers will expect.
+                        */
+                       ItemPointerSetInvalid(&htup.t_self);
+                       htup.t_tableOid = InvalidOid;
+                       htup.t_len = nbytes;
+                       htup.t_data = data;
+                       return heap_copytuple(&htup);
+               }
+
+               /*
+                * If we've visited all of the queues, then we should either give up
+                * and return NULL (if we're in non-blocking mode) or wait for the
+                * process latch to be set (otherwise).
+                */
+               if (funnel->nextqueue == waitpos)
+               {
+                       if (nowait)
+                               return NULL;
+                       WaitLatch(MyLatch, WL_LATCH_SET, 0);
+                       CHECK_FOR_INTERRUPTS();
+                       ResetLatch(MyLatch);
+               }
+       }
+}
index 0e60dbcddc82ec14612980372af4a790b1450a73..c78f1650e6af4eaf741be75448766ce7a09ec03c 100644 (file)
@@ -745,6 +745,15 @@ shm_mq_detach(shm_mq *mq)
                SetLatch(&victim->procLatch);
 }
 
+/*
+ * Get the shm_mq from handle.
+ */
+shm_mq *
+shm_mq_get_queue(shm_mq_handle *mqh)
+{
+       return mqh->mqh_queue;
+}
+
 /*
  * Write bytes into a shared message queue.
  */
index bcf38952a8c2af18e009a1340e7177e6f684ce58..d645751ff580a92dff9b4ad80d5c983b4d1381a7 100644 (file)
@@ -34,6 +34,7 @@
 #include "commands/createas.h"
 #include "commands/matview.h"
 #include "executor/functions.h"
+#include "executor/tqueue.h"
 #include "executor/tstoreReceiver.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -129,6 +130,9 @@ CreateDestReceiver(CommandDest dest)
 
                case DestTransientRel:
                        return CreateTransientRelDestReceiver(InvalidOid);
+
+               case DestTupleQueue:
+                       return CreateTupleQueueDestReceiver(NULL);
        }
 
        /* should never get here */
@@ -162,6 +166,7 @@ EndCommand(const char *commandTag, CommandDest dest)
                case DestCopyOut:
                case DestSQLFunction:
                case DestTransientRel:
+               case DestTupleQueue:
                        break;
        }
 }
@@ -204,6 +209,7 @@ NullCommand(CommandDest dest)
                case DestCopyOut:
                case DestSQLFunction:
                case DestTransientRel:
+               case DestTupleQueue:
                        break;
        }
 }
@@ -248,6 +254,7 @@ ReadyForQuery(CommandDest dest)
                case DestCopyOut:
                case DestSQLFunction:
                case DestTransientRel:
+               case DestTupleQueue:
                        break;
        }
 }
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
new file mode 100644 (file)
index 0000000..6f8eb73
--- /dev/null
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * tqueue.h
+ *       Use shm_mq to send & receive tuples between parallel backends
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/tqueue.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef TQUEUE_H
+#define TQUEUE_H
+
+#include "storage/shm_mq.h"
+#include "tcop/dest.h"
+
+/* Use this to send tuples to a shm_mq. */
+extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
+
+/* Use these to receive tuples from a shm_mq. */
+typedef struct TupleQueueFunnel TupleQueueFunnel;
+extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
+extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
+extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
+extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
+                                        bool *done);
+
+#endif   /* TQUEUE_H */
index 1a2ba040cb43abfafbb6fbf8063a04f4247f597b..7621a358ab469a5e70f7e9586ac8406d5b80293b 100644 (file)
@@ -65,6 +65,9 @@ extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
 /* Break connection. */
 extern void shm_mq_detach(shm_mq *);
 
+/* Get the shm_mq from handle. */
+extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
+
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
                        Size nbytes, const void *data, bool nowait);
index 5bcca3fbcaacdf9cba7784c977dac2fefe3d6790..b560672fd40f74fac9770d70fa0999602a015e18 100644 (file)
@@ -94,7 +94,8 @@ typedef enum
        DestIntoRel,                            /* results sent to relation (SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code */
        DestSQLFunction,                        /* results sent to SQL-language func mgr */
-       DestTransientRel                        /* results sent to transient relation */
+       DestTransientRel,                       /* results sent to transient relation */
+       DestTupleQueue                          /* results sent to tuple queue */
 } CommandDest;
 
 /* ----------------
index 4b650d1bde73157e246eb227a1de6d2d4b06d224..a037f818acc95819d47a5c42436391a67b9da34c 100644 (file)
@@ -2014,6 +2014,7 @@ TupleHashEntry
 TupleHashEntryData
 TupleHashIterator
 TupleHashTable
+TupleQueueFunnel
 TupleTableSlot
 Tuplesortstate
 Tuplestorestate