Add a Gather executor node.
authorRobert Haas <[email protected]>
Wed, 30 Sep 2015 23:23:36 +0000 (19:23 -0400)
committerRobert Haas <[email protected]>
Wed, 30 Sep 2015 23:23:36 +0000 (19:23 -0400)
A Gather executor node runs any number of copies of a plan in an equal
number of workers and merges all of the results into a single tuple
stream.  It can also run the plan itself, if the workers are
unavailable or haven't started up yet.  It is intended to work with
the Partial Seq Scan node which will be added in future commits.

It could also be used to implement parallel query of a different sort
by itself, without help from Partial Seq Scan, if the single_copy mode
is used.  In that mode, a worker executes the plan, and the parallel
leader does not, merely collecting the worker's results.  So, a Gather
node could be inserted into a plan to split the execution of that plan
across two processes.  Nested Gather nodes aren't currently supported,
but we might want to add support for that in the future.

There's nothing in the planner to actually generate Gather nodes yet,
so it's not quite time to break out the champagne.  But we're getting
close.

Amit Kapila.  Some design suggestions were provided by me, and I also
reviewed the patch.  Single-copy mode, documentation, and other minor
changes also by me.

26 files changed:
doc/src/sgml/config.sgml
src/backend/commands/explain.c
src/backend/executor/Makefile
src/backend/executor/execAmi.c
src/backend/executor/execMain.c
src/backend/executor/execParallel.c
src/backend/executor/execProcnode.c
src/backend/executor/nodeGather.c [new file with mode: 0644]
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/backend/optimizer/plan/setrefs.c
src/backend/optimizer/plan/subselect.c
src/backend/optimizer/util/pathnode.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/executor/executor.h
src/include/executor/nodeGather.h [new file with mode: 0644]
src/include/nodes/execnodes.h
src/include/nodes/nodes.h
src/include/nodes/plannodes.h
src/include/nodes/relation.h
src/include/optimizer/cost.h
src/include/optimizer/pathnode.h
src/tools/pgindent/typedefs.list

index 5c6d93fe07a5acf7fc2a884c9652a59d8c368296..696f286b760fac697e0731e99b8a0fe2f673a86e 100644 (file)
@@ -1928,6 +1928,22 @@ include_dir 'conf.d'
         </para>
        </listitem>
       </varlistentry>
+
+      <varlistentry id="guc-max-parallel-degree" xreflabel="max_parallel_degree">
+       <term><varname>max_parallel_degree</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_parallel_degree</> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Sets the maximum degree of parallelism for an individual parallel
+         operation.  Note that the requested number of workers may not actually
+         be available at runtime.  Parallel workers are taken from the pool
+         of processes established by <xref linkend="guc-max-worker-processes">.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </sect2>
    </sect1>
@@ -3398,6 +3414,36 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-parallel-tuple-cost" xreflabel="parallel_tuple_cost">
+      <term><varname>parallel_tuple_cost</varname> (<type>floating point</type>)
+      <indexterm>
+       <primary><varname>parallel_tuple_cost</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the planner's estimate of the cost of transferring a tuple
+        from a parallel worker process to another process.
+        The default is 0.1.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry id="guc-parallel-setup-cost" xreflabel="parallel_setup_cost">
+      <term><varname>parallel_setup_cost</varname> (<type>floating point</type>)
+      <indexterm>
+       <primary><varname>parallel_setup_cost</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the planner's estimate of the cost of launching parallel worker
+        processes.
+        The default is 1000.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-effective-cache-size" xreflabel="effective_cache_size">
       <term><varname>effective_cache_size</varname> (<type>integer</type>)
       <indexterm>
index f0d9e94eed2d19a14a22b43eef935b050212690d..7fb8a1458dfa427d87b296a6f5d8768d8aac5170 100644 (file)
@@ -853,6 +853,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
                case T_SampleScan:
                        pname = sname = "Sample Scan";
                        break;
+               case T_Gather:
+                       pname = sname = "Gather";
+                       break;
                case T_IndexScan:
                        pname = sname = "Index Scan";
                        break;
@@ -1276,6 +1279,22 @@ ExplainNode(PlanState *planstate, List *ancestors,
                                show_instrumentation_count("Rows Removed by Filter", 1,
                                                                                   planstate, es);
                        break;
+               case T_Gather:
+                       {
+                               Gather *gather = (Gather *) plan;
+
+                               show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+                               if (plan->qual)
+                                       show_instrumentation_count("Rows Removed by Filter", 1,
+                                                                                          planstate, es);
+                               ExplainPropertyInteger("Number of Workers",
+                                                                          gather->num_workers, es);
+                               if (gather->single_copy)
+                                       ExplainPropertyText("Single Copy",
+                                                                               gather->single_copy ? "true" : "false",
+                                                                               es);
+                       }
+                       break;
                case T_FunctionScan:
                        if (es->verbose)
                        {
index f5e1e1aefcdd30b1cd5408c6ac63120563aa3746..51edd4c5e709590d75fd9459f43b13d7eca2bad2 100644 (file)
@@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
        execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
-       nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \
-       nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
+       nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \
+       nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
        nodeLimit.o nodeLockRows.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \
index 93e1e9a691c507b08aa58beddbcb74c66a1a8501..163650cecd1cb748af692a7dfa74b20f0fad013c 100644 (file)
@@ -24,6 +24,7 @@
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeFunctionscan.h"
+#include "executor/nodeGather.h"
 #include "executor/nodeGroup.h"
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
@@ -160,6 +161,10 @@ ExecReScan(PlanState *node)
                        ExecReScanSampleScan((SampleScanState *) node);
                        break;
 
+               case T_GatherState:
+                       ExecReScanGather((GatherState *) node);
+                       break;
+
                case T_IndexScanState:
                        ExecReScanIndexScan((IndexScanState *) node);
                        break;
@@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node)
                        /* Simplify life for tablesample methods by disallowing this */
                        return false;
 
+               case T_Gather:
+                       return false;
+
                case T_IndexScan:
                        return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) &&
                                TargetListSupportsBackwardScan(node->targetlist);
index 85ff46b8026f6aace5b622b0d3b4ddddfc8a364e..37b7bbd413b3c629d95d2e27f0e9ea01adfebf04 100644 (file)
@@ -347,6 +347,9 @@ standard_ExecutorRun(QueryDesc *queryDesc,
                                        direction,
                                        dest);
 
+       /* Allow nodes to release or shut down resources. */
+       (void) ExecShutdownNode(queryDesc->planstate);
+
        /*
         * shutdown tuple receiver, if we started it
         */
index a409a9a571f10bd3e4143ef3aa844f2f81236d60..e6930c1d51c929f4c1306ea38b749a757543929e 100644 (file)
@@ -71,7 +71,7 @@ typedef struct ExecParallelInitializeDSMContext
 } ExecParallelInitializeDSMContext;
 
 /* Helper functions that run in the parallel leader. */
-static char *ExecSerializePlan(Plan *plan, List *rangetable);
+static char *ExecSerializePlan(Plan *plan, EState *estate);
 static bool ExecParallelEstimate(PlanState *node,
                                         ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
@@ -88,7 +88,7 @@ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
  * Create a serialized representation of the plan to be sent to each worker.
  */
 static char *
-ExecSerializePlan(Plan *plan, List *rangetable)
+ExecSerializePlan(Plan *plan, EState *estate)
 {
        PlannedStmt *pstmt;
        ListCell   *tlist;
@@ -125,13 +125,13 @@ ExecSerializePlan(Plan *plan, List *rangetable)
        pstmt->canSetTag = 1;
        pstmt->transientPlan = 0;
        pstmt->planTree = plan;
-       pstmt->rtable = rangetable;
+       pstmt->rtable = estate->es_range_table;
        pstmt->resultRelations = NIL;
        pstmt->utilityStmt = NULL;
        pstmt->subplans = NIL;
        pstmt->rewindPlanIDs = NULL;
        pstmt->rowMarks = NIL;
-       pstmt->nParamExec = 0;
+       pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
        pstmt->relationOids = NIL;
        pstmt->invalItems = NIL;        /* workers can't replan anyway... */
        pstmt->hasRowSecurity = false;
@@ -271,7 +271,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
        pei->planstate = planstate;
 
        /* Fix up and serialize plan to be sent to workers. */
-       pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table);
+       pstmt_data = ExecSerializePlan(planstate->plan, estate);
 
        /* Create a parallel context. */
        pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
@@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        ExecutorStart(queryDesc, 0);
        ExecutorRun(queryDesc, ForwardScanDirection, 0L);
        ExecutorFinish(queryDesc);
-       ExecutorEnd(queryDesc);
 
        /* Report buffer usage during parallel execution. */
        buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
@@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
                ExecParallelReportInstrumentation(queryDesc->planstate,
                                                                                  instrumentation);
 
+       /* Must do this after capturing instrumentation. */
+       ExecutorEnd(queryDesc);
+
        /* Cleanup. */
        FreeQueryDesc(queryDesc);
        (*receiver->rDestroy) (receiver);
index 03c2febc3e13a5cf8ad9e9ff1071b8173e1fe3f7..5bc1d489421242298c759142fbd4307aa9b0b21b 100644 (file)
 #include "executor/nodeMergejoin.h"
 #include "executor/nodeModifyTable.h"
 #include "executor/nodeNestloop.h"
+#include "executor/nodeGather.h"
 #include "executor/nodeRecursiveunion.h"
 #include "executor/nodeResult.h"
 #include "executor/nodeSamplescan.h"
 #include "executor/nodeValuesscan.h"
 #include "executor/nodeWindowAgg.h"
 #include "executor/nodeWorktablescan.h"
+#include "nodes/nodeFuncs.h"
 #include "miscadmin.h"
 
 
@@ -307,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
                                                                                                  estate, eflags);
                        break;
 
+               case T_Gather:
+                       result = (PlanState *) ExecInitGather((Gather *) node,
+                                                                                                 estate, eflags);
+                       break;
+
                case T_Hash:
                        result = (PlanState *) ExecInitHash((Hash *) node,
                                                                                                estate, eflags);
@@ -504,6 +511,10 @@ ExecProcNode(PlanState *node)
                        result = ExecUnique((UniqueState *) node);
                        break;
 
+               case T_GatherState:
+                       result = ExecGather((GatherState *) node);
+                       break;
+
                case T_HashState:
                        result = ExecHash((HashState *) node);
                        break;
@@ -658,6 +669,10 @@ ExecEndNode(PlanState *node)
                        ExecEndSampleScan((SampleScanState *) node);
                        break;
 
+               case T_GatherState:
+                       ExecEndGather((GatherState *) node);
+                       break;
+
                case T_IndexScanState:
                        ExecEndIndexScan((IndexScanState *) node);
                        break;
@@ -769,3 +784,34 @@ ExecEndNode(PlanState *node)
                        break;
        }
 }
+
+/*
+ * ExecShutdownNode
+ *
+ * Give execution nodes a chance to stop asynchronous resource consumption
+ * and release any resources still held.  Currently, this is only used for
+ * parallel query, but we might want to extend it to other cases also (e.g.
+ * FDW).  We might also want to call it sooner, as soon as it's evident that
+ * no more rows will be needed (e.g. when a Limit is filled) rather than only
+ * at the end of ExecutorRun.
+ *
+ * As a planstate_tree_walker callback this must keep returning false, since
+ * returning true would abort the walk before other Gather nodes are reached.
+ */
+bool
+ExecShutdownNode(PlanState *node)
+{
+       if (node == NULL)
+               return false;
+
+       switch (nodeTag(node))
+       {
+               case T_GatherState:
+                       ExecShutdownGather((GatherState *) node);
+                       break;
+               default:
+                       break;
+       }
+
+       return planstate_tree_walker(node, ExecShutdownNode, NULL);
+}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
new file mode 100644 (file)
index 0000000..735dbaa
--- /dev/null
@@ -0,0 +1,299 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.c
+ *       Support routines for scanning a plan via multiple workers.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/executor/nodeGather.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGather.h"
+#include "executor/nodeSubplan.h"
+#include "executor/tqueue.h"
+#include "utils/rel.h"
+
+
+static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+
+
+/* ----------------------------------------------------------------
+ *             ExecInitGather
+ *
+ *             Builds the GatherState for a Gather plan node and initializes
+ *             its outer (child) plan.  No parallel context or workers are
+ *             set up here; ExecGather does that on first execution.
+ * ----------------------------------------------------------------
+ */
+GatherState *
+ExecInitGather(Gather *node, EState *estate, int eflags)
+{
+       GatherState *gatherstate;
+
+       /* Gather node doesn't have innerPlan node. */
+       Assert(innerPlan(node) == NULL);
+
+       /*
+        * create state structure
+        *
+        * No worker queues exist yet, so there is nothing to read from workers;
+        * the leader runs the plan itself unless single_copy is requested.
+        */
+       gatherstate = makeNode(GatherState);
+       gatherstate->ps.plan = (Plan *) node;
+       gatherstate->ps.state = estate;
+       gatherstate->need_to_scan_workers = false;
+       gatherstate->need_to_scan_locally = !node->single_copy;
+
+       /*
+        * Miscellaneous initialization
+        *
+        * create expression context for node
+        */
+       ExecAssignExprContext(estate, &gatherstate->ps);
+
+       /*
+        * initialize child expressions
+        */
+       gatherstate->ps.targetlist = (List *)
+               ExecInitExpr((Expr *) node->plan.targetlist,
+                                        (PlanState *) gatherstate);
+       gatherstate->ps.qual = (List *)
+               ExecInitExpr((Expr *) node->plan.qual,
+                                        (PlanState *) gatherstate);
+
+       /*
+        * tuple table initialization
+        */
+       ExecInitResultTupleSlot(estate, &gatherstate->ps);
+
+       /*
+        * now initialize outer plan
+        */
+       outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+
+       gatherstate->ps.ps_TupFromTlist = false;
+
+       /*
+        * Initialize result tuple type and projection info.
+        */
+       ExecAssignResultTypeFromTL(&gatherstate->ps);
+       ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+
+       return gatherstate;
+}
+
+/* ----------------------------------------------------------------
+ *             ExecGather(node)
+ *
+ *             Scans the relation via multiple workers and returns
+ *             the next qualifying tuple.  When no tuple is left, the
+ *             parallel setup is shut down and the empty slot is returned.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+       int                     i;
+       TupleTableSlot *slot;
+
+       /*
+        * Initialize the parallel context and workers on first execution. We do
+        * this on first execution rather than during node initialization, as it
+        * needs to allocate a large dynamic segment, so it is better to do it
+        * only if it is really needed.
+        *
+        * NOTE(review): this is keyed on node->pei being NULL, and
+        * ExecShutdownGather clears only pei->pcxt, so once a shutdown has
+        * happened this branch is not taken again -- confirm that is intended
+        * for rescans.
+        */
+       if (!node->pei)
+       {
+               EState     *estate = node->ps.state;
+
+               /* Initialize the workers required to execute Gather node. */
+               node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                                                                estate,
+                                                                 ((Gather *) (node->ps.plan))->num_workers);
+
+               /*
+                * Register backend workers. If the required number of workers are not
+                * available then we perform the scan with available workers and if
+                * there are no more workers available, then the Gather node will just
+                * scan locally.
+                */
+               LaunchParallelWorkers(node->pei->pcxt);
+
+               node->funnel = CreateTupleQueueFunnel();
+
+               /*
+                * Attach the tuple queue of every worker that has a background
+                * worker handle to the funnel; workers without a handle are skipped.
+                */
+               for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+               {
+                       if (node->pei->pcxt->worker[i].bgwhandle)
+                       {
+                               shm_mq_set_handle(node->pei->tqueue[i],
+                                                                 node->pei->pcxt->worker[i].bgwhandle);
+                               RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
+                               node->need_to_scan_workers = true;
+                       }
+               }
+
+               /* If no workers are available, we must always scan locally. */
+               if (!node->need_to_scan_workers)
+                       node->need_to_scan_locally = true;
+       }
+
+       slot = gather_getnext(node);
+
+       if (TupIsNull(slot))
+       {
+               /*
+                * Destroy the parallel context once we complete fetching all the
+                * tuples.  Otherwise, the DSM and workers will stick around for the
+                * lifetime of the entire statement.
+                */
+               ExecShutdownGather(node);
+       }
+       return slot;
+}
+
+/* ----------------------------------------------------------------
+ *             ExecEndGather
+ *
+ *             Shuts down the parallel setup, releases the node's
+ *             expression context and result slot contents, and then
+ *             ends the outer plan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGather(GatherState *node)
+{
+       /* Returns immediately if there is no live parallel context. */
+       ExecShutdownGather(node);
+       ExecFreeExprContext(&node->ps);
+       ExecClearTuple(node->ps.ps_ResultTupleSlot);
+       ExecEndNode(outerPlanState(node));
+}
+
+/*
+ * gather_getnext
+ *
+ * Get the next tuple from shared memory queue.  This function
+ * is responsible for fetching tuples from all the queues associated
+ * with worker backends used in Gather node execution and if there is
+ * no data available from queues or no worker is available, it does
+ * fetch the data from local node.
+ *
+ * Returns a slot holding the next tuple, or the cleared projection slot
+ * once neither the workers nor the local plan have anything left.
+ *
+ * The definition is marked static to agree with the forward declaration at
+ * the top of the file.
+ */
+static TupleTableSlot *
+gather_getnext(GatherState *gatherstate)
+{
+       PlanState  *outerPlan;
+       TupleTableSlot *outerTupleSlot;
+       TupleTableSlot *slot;
+       HeapTuple       tup;
+
+       /*
+        * We can use projection info of Gather for the tuples received from
+        * worker backends as currently for all cases worker backends send the
+        * projected tuple as required by Gather node.
+        */
+       slot = gatherstate->ps.ps_ProjInfo->pi_slot;
+
+       while (gatherstate->need_to_scan_workers ||
+                  gatherstate->need_to_scan_locally)
+       {
+               if (gatherstate->need_to_scan_workers)
+               {
+                       bool            done = false;
+
+                       /* wait only if local scan is done */
+                       tup = TupleQueueFunnelNext(gatherstate->funnel,
+                                                                          gatherstate->need_to_scan_locally,
+                                                                          &done);
+                       /* The funnel reports "done" when there is nothing more to read. */
+                       if (done)
+                               gatherstate->need_to_scan_workers = false;
+
+                       if (HeapTupleIsValid(tup))
+                       {
+                               ExecStoreTuple(tup,             /* tuple to store */
+                                                          slot,        /* slot to store in */
+                                                          InvalidBuffer,       /* buffer associated with this
+                                                                                                * tuple */
+                                                          true);       /* pfree this pointer if not from heap */
+
+                               return slot;
+                       }
+               }
+
+               if (gatherstate->need_to_scan_locally)
+               {
+                       outerPlan = outerPlanState(gatherstate);
+
+                       outerTupleSlot = ExecProcNode(outerPlan);
+
+                       if (!TupIsNull(outerTupleSlot))
+                               return outerTupleSlot;
+
+                       /* Local plan is exhausted; from now on only workers can supply rows. */
+                       gatherstate->need_to_scan_locally = false;
+               }
+       }
+
+       return ExecClearTuple(slot);
+}
+
+/* ----------------------------------------------------------------
+ *             ExecShutdownGather
+ *
+ *             Destroy the setup for parallel workers.  Collect all the
+ *             stats after workers are stopped, else some work done by
+ *             workers won't be accounted.
+ *
+ *             Does nothing when no parallel context currently exists, so
+ *             repeated calls are harmless.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+       Gather *gather;
+
+       /* Nothing to do if the parallel context was never created or is gone. */
+       if (node->pei == NULL || node->pei->pcxt == NULL)
+               return;
+
+       /*
+        * Ensure all workers have finished before destroying the parallel context
+        * to ensure a clean exit.
+        */
+       if (node->funnel)
+       {
+               DestroyTupleQueueFunnel(node->funnel);
+               node->funnel = NULL;
+       }
+
+       /* presumably where worker completion and stats are collected -- confirm */
+       ExecParallelFinish(node->pei);
+
+       /*
+        * destroy parallel context.
+        *
+        * NOTE(review): only pei->pcxt is cleared; node->pei itself stays set.
+        * ExecGather launches workers only when node->pei is NULL, so after this
+        * point workers do not appear to be launched again (e.g. on rescan, and
+        * in particular with single_copy where the local scan is disabled) --
+        * confirm whether node->pei should be reset as well.
+        */
+       DestroyParallelContext(node->pei->pcxt);
+       node->pei->pcxt = NULL;
+
+       /* Put the scan flags back to the values ExecInitGather gives them. */
+       gather = (Gather *) node->ps.plan;
+       node->need_to_scan_locally = !gather->single_copy;
+       node->need_to_scan_workers = false;
+}
+
+/* ----------------------------------------------------------------
+ *                                             Join Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *             ExecReScanGather
+ *
+ *             Shuts down the current parallel setup and rescans the
+ *             outer plan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGather(GatherState *node)
+{
+       /*
+        * Tear down the parallel context and workers before rescanning the
+        * relation.  We want to gracefully shutdown all the workers so that they
+        * should be able to propagate any error or other information to master
+        * backend before dying.
+        *
+        * NOTE(review): nothing here launches workers again; any relaunch would
+        * have to happen on the next ExecGather call -- confirm that it does.
+        */
+       ExecShutdownGather(node);
+
+       ExecReScan(node->ps.lefttree);
+}
index 4b4ddec4c21b259f9b5c4ca4410a4787fe65bfc8..88dc0858707bdbe8be90e69abb3e23f26743ee38 100644 (file)
@@ -314,6 +314,28 @@ _copyBitmapOr(const BitmapOr *from)
        return newnode;
 }
 
+/*
+ * _copyGather
+ *
+ * Returns a new Gather node carrying the Plan fields of 'from' plus its
+ * num_workers and single_copy settings.
+ */
+static Gather *
+_copyGather(const Gather *from)
+{
+       Gather     *newnode = makeNode(Gather);
+
+       /*
+        * copy node superclass fields
+        */
+       CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+       /*
+        * copy remainder of node (both fields are plain scalars)
+        */
+       COPY_SCALAR_FIELD(num_workers);
+       COPY_SCALAR_FIELD(single_copy);
+
+       return newnode;
+}
+
 
 /*
  * CopyScanFields
@@ -4235,6 +4257,9 @@ copyObject(const void *from)
                case T_Scan:
                        retval = _copyScan(from);
                        break;
+               case T_Gather:
+                       retval = _copyGather(from);
+                       break;
                case T_SeqScan:
                        retval = _copySeqScan(from);
                        break;
index ee9c360345f67271b2cb19f260aebd28faa58861..4645ecb804fa0fd4e78fd14f3436ba5b7a8e333c 100644 (file)
@@ -432,6 +432,17 @@ _outBitmapOr(StringInfo str, const BitmapOr *node)
        WRITE_NODE_FIELD(bitmapplans);
 }
 
+/*
+ * _outGather
+ *
+ * Write the text representation of a Gather plan node: the common Plan
+ * fields followed by the Gather-specific ones.  num_workers is an integer
+ * count and single_copy is a boolean flag, so each is written with the
+ * macro for its own type rather than as an unsigned integer.
+ */
+static void
+_outGather(StringInfo str, const Gather *node)
+{
+       WRITE_NODE_TYPE("GATHER");
+
+       _outPlanInfo(str, (const Plan *) node);
+
+       WRITE_INT_FIELD(num_workers);
+       WRITE_BOOL_FIELD(single_copy);
+}
+
 static void
 _outScan(StringInfo str, const Scan *node)
 {
@@ -3000,6 +3011,9 @@ _outNode(StringInfo str, const void *obj)
                        case T_BitmapOr:
                                _outBitmapOr(str, obj);
                                break;
+                       case T_Gather:
+                               _outGather(str, obj);
+                               break;
                        case T_Scan:
                                _outScan(str, obj);
                                break;
index d107d76a3c04d9027a19236a4138a86e73475330..1b61fd9d4eaa7206e3b6a1b46dfa022b2a2c16d6 100644 (file)
@@ -11,6 +11,8 @@
  *     cpu_tuple_cost          Cost of typical CPU time to process a tuple
  *     cpu_index_tuple_cost  Cost of typical CPU time to process an index tuple
  *     cpu_operator_cost       Cost of CPU time to execute an operator or function
+ *     parallel_tuple_cost Cost of CPU time to pass a tuple from worker to master backend
+ *     parallel_setup_cost Cost of setting up shared memory for parallelism
  *
  * We expect that the kernel will typically do some amount of read-ahead
  * optimization; this in conjunction with seek costs means that seq_page_cost
@@ -102,11 +104,15 @@ double            random_page_cost = DEFAULT_RANDOM_PAGE_COST;
 double         cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
 double         cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
 double         cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
+double         parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST;
+double         parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;
 
 int                    effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
 
 Cost           disable_cost = 1.0e10;
 
+int                    max_parallel_degree = 0;
+
 bool           enable_seqscan = true;
 bool           enable_indexscan = true;
 bool           enable_indexonlyscan = true;
@@ -289,6 +295,38 @@ cost_samplescan(Path *path, PlannerInfo *root,
        path->total_cost = startup_cost + run_cost;
 }
 
+/*
+ * cost_gather
+ *       Determines and returns the cost of gather path.
+ *
+ * 'rel' is the relation to be operated upon
+ * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ *
+ * The result is stored in path->path (rows, startup_cost, total_cost).  The
+ * subpath's costs are carried over, parallel_setup_cost is added to startup,
+ * and parallel_tuple_cost is charged once per row the Gather is expected to
+ * emit.  That charge uses the row estimate rather than rel->tuples, because
+ * only rows that are actually passed up have to be transferred.
+ */
+void
+cost_gather(GatherPath *path, PlannerInfo *root,
+                       RelOptInfo *rel, ParamPathInfo *param_info)
+{
+       Cost            startup_cost = 0;
+       Cost            run_cost = 0;
+
+       /* Mark the path with the correct row estimate */
+       if (param_info)
+               path->path.rows = param_info->ppi_rows;
+       else
+               path->path.rows = rel->rows;
+
+       startup_cost = path->subpath->startup_cost;
+
+       run_cost = path->subpath->total_cost - path->subpath->startup_cost;
+
+       /* Parallel setup and communication cost. */
+       startup_cost += parallel_setup_cost;
+       run_cost += parallel_tuple_cost * path->path.rows;
+
+       path->path.startup_cost = startup_cost;
+       path->path.total_cost = (startup_cost + run_cost);
+}
+
 /*
  * cost_index
  *       Determines and returns the cost of scanning a relation using an index.
index 404c6f593d7e5150dae984df348cf3a253c0206c..0ee7392bcce53dc2b80650176f4c3072f4ff1c4a 100644 (file)
@@ -60,6 +60,8 @@ static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path,
                                        List *tlist, List *scan_clauses);
 static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path,
                                           List *tlist, List *scan_clauses);
+static Gather *create_gather_plan(PlannerInfo *root,
+                                  GatherPath *best_path);
 static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path,
                                          List *tlist, List *scan_clauses, bool indexonly);
 static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
@@ -104,6 +106,8 @@ static void copy_plan_costsize(Plan *dest, Plan *src);
 static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
 static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
                                TableSampleClause *tsc);
+static Gather *make_gather(List *qptlist, List *qpqual,
+                       int nworkers, bool single_copy, Plan *subplan);
 static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid,
                           Oid indexid, List *indexqual, List *indexqualorig,
                           List *indexorderby, List *indexorderbyorig,
@@ -273,6 +277,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path)
                        plan = create_unique_plan(root,
                                                                          (UniquePath *) best_path);
                        break;
+               case T_Gather:
+                       plan = (Plan *) create_gather_plan(root,
+                                                                                          (GatherPath *) best_path);
+                       break;
                default:
                        elog(ERROR, "unrecognized node type: %d",
                                 (int) best_path->pathtype);
@@ -1101,6 +1109,34 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path)
        return plan;
 }
 
+/*
+ * create_gather_plan
+ *
+ *       Create a Gather plan for 'best_path' and (recursively) a plan for
+ *       its subpath.  The Gather node reuses the subplan's targetlist.
+ */
+static Gather *
+create_gather_plan(PlannerInfo *root, GatherPath *best_path)
+{
+       Gather     *gather_plan;
+       Plan       *subplan;
+
+       subplan = create_plan_recurse(root, best_path->subpath);
+
+       gather_plan = make_gather(subplan->targetlist,
+                                                         NIL,         /* no quals on the Gather node itself */
+                                                         best_path->num_workers,
+                                                         best_path->single_copy,
+                                                         subplan);
+
+       copy_path_costsize(&gather_plan->plan, &best_path->path);
+
+       /* A plan containing a Gather node must be run in parallel mode. */
+       root->glob->parallelModeNeeded = true;
+
+       return gather_plan;
+}
+
 
 /*****************************************************************************
  *
@@ -4735,6 +4771,27 @@ make_unique(Plan *lefttree, List *distinctList)
        return node;
 }
 
+static Gather *
+make_gather(List *qptlist,             /* targetlist for the new node */
+                       List *qpqual,           /* quals for the new node */
+                       int nworkers,           /* stored as node->num_workers */
+                       bool single_copy,       /* stored as node->single_copy */
+                       Plan *subplan)          /* becomes the node's lefttree */
+{
+       Gather     *node = makeNode(Gather);
+       Plan       *plan = &node->plan;
+
+       /* only non-cost fields are set here; cost should be inserted by caller */
+       plan->targetlist = qptlist;
+       plan->qual = qpqual;
+       plan->lefttree = subplan;
+       plan->righttree = NULL;         /* Gather has a single input */
+       node->num_workers = nworkers;
+       node->single_copy = single_copy;
+
+       return node;
+}
+
 /*
  * distinctList is a list of SortGroupClauses, identifying the targetlist
  * items that should be considered by the SetOp filter.  The input path must
index 3c8169725a08728e6e0b89ae84fc7346154f6684..b1cede2ef0d20e3c5d984c09797e9c8fc11c70ba 100644 (file)
@@ -607,6 +607,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
                case T_Sort:
                case T_Unique:
                case T_SetOp:
+               case T_Gather:
 
                        /*
                         * These plan types don't actually bother to evaluate their
index d0bc412c8339773c68d5dac53f975cb298951d9c..6b32f85d6c074c0643aee36f338bf22e9a0995ea 100644 (file)
@@ -2584,6 +2584,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
                case T_Material:
                case T_Sort:
                case T_Unique:
+               case T_Gather:
                case T_SetOp:
                case T_Group:
                        break;
index 4336ca1b782fabadd1f96f38e95ab96a96079cbb..1895a6894a37081b6c8278314cb42af113b8fcf0 100644 (file)
@@ -1307,6 +1307,32 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
        return pathnode;
 }
 
+/*
+ * create_gather_path
+ *
+ *       Creates a GatherPath on 'rel' over 'subpath' that asks for 'nworkers'
+ *       workers, costs it with cost_gather, and returns the pathnode.
+ */
+GatherPath *
+create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
+                                  Relids required_outer, int nworkers)
+{
+       GatherPath *pathnode = makeNode(GatherPath);
+
+       pathnode->path.pathtype = T_Gather;
+       pathnode->path.parent = rel;
+       pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
+                                                                                                                 required_outer);
+       pathnode->path.pathkeys = NIL;          /* Gather has unordered result */
+
+       pathnode->subpath = subpath;
+       pathnode->num_workers = nworkers;       /* NOTE(review): single_copy is never assigned here; confirm the default is intended */
+
+       cost_gather(pathnode, root, rel, pathnode->path.param_info);
+
+       return pathnode;
+}
+
 /*
  * translate_sub_tlist - get subquery column numbers represented by tlist
  *
index 17053aff68b7d05f7a361566708e2aba46444fc1..7684bff79b15cdc1f4df5d8c121f3cb347615002 100644 (file)
@@ -2534,6 +2534,16 @@ static struct config_int ConfigureNamesInt[] =
                check_autovacuum_max_workers, NULL, NULL
        },
 
+       {
+               {"max_parallel_degree", PGC_SUSET, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Sets the maximum number of parallel processes per executor node."),
+                       NULL
+               },
+               &max_parallel_degree,
+               0, 0, MAX_BACKENDS,
+               NULL, NULL, NULL
+       },
+
        {
                {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
                        gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
@@ -2711,6 +2721,26 @@ static struct config_real ConfigureNamesReal[] =
                DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX,
                NULL, NULL, NULL
        },
+       {
+               {"parallel_tuple_cost", PGC_USERSET, QUERY_TUNING_COST,
+                       gettext_noop("Sets the planner's estimate of the cost of "
+                                 "passing each tuple (row) from worker to master backend."),
+                       NULL
+               },
+               &parallel_tuple_cost,
+               DEFAULT_PARALLEL_TUPLE_COST, 0, DBL_MAX,
+               NULL, NULL, NULL
+       },
+       {
+               {"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST,
+                       gettext_noop("Sets the planner's estimate of the cost of "
+                                 "starting up worker processes for parallel query."),
+                       NULL
+               },
+               &parallel_setup_cost,
+               DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX,
+               NULL, NULL, NULL
+       },
 
        {
                {"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER,
index 8c65287e3094dd6c418b904d61e40cfe0e358bbc..b2adda95958c4fc814d734acccab8e0fa5651bd4 100644 (file)
 
 #effective_io_concurrency = 1          # 1-1000; 0 disables prefetching
 #max_worker_processes = 8
+#max_parallel_degree = 0               # max number of worker processes per node
 
 
 #------------------------------------------------------------------------------
 #cpu_tuple_cost = 0.01                 # same scale as above
 #cpu_index_tuple_cost = 0.005          # same scale as above
 #cpu_operator_cost = 0.0025            # same scale as above
+#parallel_tuple_cost = 0.1             # same scale as above
+#parallel_setup_cost = 1000.0  # same scale as above
 #effective_cache_size = 4GB
 
 # - Genetic Query Optimizer -
index 226f905c3cc8cd096d24155d6e313344519ee9c1..4f77692aa32f65944fc85178e7d286275e2f4e6c 100644 (file)
@@ -225,6 +225,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
+extern bool ExecShutdownNode(PlanState *node);
 
 /*
  * prototypes from functions in execQual.c
diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h
new file mode 100644 (file)
index 0000000..9e5d8fc
--- /dev/null
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.h
+ *             prototypes for nodeGather.c
+ *             (init, exec, end, shutdown and rescan routines of the Gather node)
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeGather.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEGATHER_H
+#define NODEGATHER_H
+
+#include "nodes/execnodes.h"
+
+extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
+extern TupleTableSlot *ExecGather(GatherState *node);
+extern void ExecEndGather(GatherState *node);
+extern void ExecShutdownGather(GatherState *node);
+extern void ExecReScanGather(GatherState *node);
+
+#endif   /* NODEGATHER_H */
index 4ae2f3e067b9317e243f8a7998fd719c3a34f258..b6895f94c397ad0c5342b9e2c48f0fcfd145b5e2 100644 (file)
@@ -1950,6 +1950,22 @@ typedef struct UniqueState
        MemoryContext tempContext;      /* short-term context for comparisons */
 } UniqueState;
 
+/* ----------------
+ * GatherState information
+ *
+ *             Gather nodes launch 1 or more parallel workers, run a subplan
+ *             in those workers, and collect the results.
+ * ----------------
+ */
+typedef struct GatherState
+{
+       PlanState       ps;                             /* its first field is NodeTag */
+       struct ParallelExecutorInfo *pei;       /* opaque here; struct is declared elsewhere */
+       struct TupleQueueFunnel *funnel;        /* opaque here; presumably the source of worker tuples (confirm) */
+       bool            need_to_scan_workers;   /* presumably: worker output may still be pending (confirm) */
+       bool            need_to_scan_locally;   /* presumably: this process should still run the subplan itself (confirm) */
+} GatherState;
+
 /* ----------------
  *      HashState information
  * ----------------
index 274480e2c9200c61466a095f334422a2f8582313..94bdb7c9af50f3ce761ec39ded1437381b332948 100644 (file)
@@ -74,6 +74,7 @@ typedef enum NodeTag
        T_Agg,
        T_WindowAgg,
        T_Unique,
+       T_Gather,
        T_Hash,
        T_SetOp,
        T_LockRows,
@@ -121,6 +122,7 @@ typedef enum NodeTag
        T_AggState,
        T_WindowAggState,
        T_UniqueState,
+       T_GatherState,
        T_HashState,
        T_SetOpState,
        T_LockRowsState,
@@ -238,6 +240,7 @@ typedef enum NodeTag
        T_ResultPath,
        T_MaterialPath,
        T_UniquePath,
+       T_GatherPath,
        T_EquivalenceClass,
        T_EquivalenceMember,
        T_PathKey,
index 1e2d2bbaa10e5cae6012268126500ac3742a661a..1f9213c09b0862959a95b1a1e839710d80f02ec6 100644 (file)
@@ -753,6 +753,17 @@ typedef struct Unique
        Oid                *uniqOperators;      /* equality operators to compare with */
 } Unique;
 
+/* ------------
+ *             gather node
+ * ------------
+ */
+typedef struct Gather
+{
+       Plan            plan;                   /* common Plan fields; make_gather puts the subplan in lefttree */
+       int                     num_workers;    /* copied from GatherPath.num_workers by create_gather_plan */
+       bool            single_copy;    /* copied from GatherPath.single_copy: must not be executed >1x */
+} Gather;
+
 /* ----------------
  *             hash build node
  *
index 961b5d17cfb337df55e6a9f30d7094aa94dea512..6cf2e24ce7d30e06cb759d32f64962723ef28dd5 100644 (file)
@@ -1046,6 +1046,19 @@ typedef struct UniquePath
        List       *uniq_exprs;         /* expressions to be made unique */
 } UniquePath;
 
+/*
+ * GatherPath runs several copies of a plan in parallel and collects the
+ * results.  The parallel leader may also execute the plan, unless the
+ * single_copy flag is set.
+ */
+typedef struct GatherPath
+{
+       Path            path;                   /* common Path fields; create_gather_path sets pathtype = T_Gather */
+       Path       *subpath;            /* path for each worker */
+       int                     num_workers;    /* number of workers sought to help */
+       bool            single_copy;    /* path must not be executed >1x */
+} GatherPath;
+
 /*
  * All join-type paths share these fields.
  */
index dd43e45d0c0a5b6c98f54c654f06b0c08ed7bff9..25a730362a845bbe74b6cb1a2f3ae271daf4f2e0 100644 (file)
@@ -26,6 +26,8 @@
 #define DEFAULT_CPU_TUPLE_COST 0.01
 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005
 #define DEFAULT_CPU_OPERATOR_COST  0.0025
+#define DEFAULT_PARALLEL_TUPLE_COST 0.1
+#define DEFAULT_PARALLEL_SETUP_COST  1000.0
 
 #define DEFAULT_EFFECTIVE_CACHE_SIZE  524288   /* measured in pages */
 
@@ -48,8 +50,11 @@ extern PGDLLIMPORT double random_page_cost;
 extern PGDLLIMPORT double cpu_tuple_cost;
 extern PGDLLIMPORT double cpu_index_tuple_cost;
 extern PGDLLIMPORT double cpu_operator_cost;
+extern PGDLLIMPORT double parallel_tuple_cost;
+extern PGDLLIMPORT double parallel_setup_cost;
 extern PGDLLIMPORT int effective_cache_size;
 extern Cost disable_cost;
+extern int     max_parallel_degree;
 extern bool enable_seqscan;
 extern bool enable_indexscan;
 extern bool enable_indexonlyscan;
@@ -144,6 +149,8 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
                                        JoinCostWorkspace *workspace,
                                        SpecialJoinInfo *sjinfo,
                                        SemiAntiJoinFactors *semifactors);
+extern void cost_gather(GatherPath *path, PlannerInfo *root,
+                       RelOptInfo *baserel, ParamPathInfo *param_info);
 extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan);
 extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root);
 extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root);
index 161644c343b6d689b36a4241384b03cdc6c22c04..7a4940c7d20bf77a7a21f79bdda096c67d1e8340 100644 (file)
@@ -69,6 +69,9 @@ extern ResultPath *create_result_path(List *quals);
 extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath);
 extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
                                   Path *subpath, SpecialJoinInfo *sjinfo);
+extern GatherPath *create_gather_path(PlannerInfo *root,
+                                  RelOptInfo *rel, Path *subpath, Relids required_outer,
+                                  int nworkers);
 extern Path *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
                                                 List *pathkeys, Relids required_outer);
 extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
index 0e149ea2f4dfcd63aa0c51f55d241fab55da1943..feb821b409b85a46a54c901eccadc47e25d90aca 100644 (file)
@@ -707,6 +707,9 @@ FunctionParameterMode
 FunctionScan
 FunctionScanPerFuncState
 FunctionScanState
+Gather
+GatherPath
+GatherState
 FuzzyAttrMatchState
 GBT_NUMKEY
 GBT_NUMKEY_R
@@ -1195,6 +1198,7 @@ OverrideSearchPath
 OverrideStackEntry
 PACE_HEADER
 PACL
+ParallelExecutorInfo
 PATH
 PBOOL
 PCtxtHandle