</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="guc-max-parallel-degree" xreflabel="max_parallel_degree">
+ <term><varname>max_parallel_degree</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_parallel_degree</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum degree of parallelism for an individual parallel
+ operation. Note that the requested number of workers may not actually
+ be available at runtime. Parallel workers are taken from the pool
+ of processes established by <xref linkend="guc-max-worker-processes">.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</sect2>
</sect1>
</listitem>
</varlistentry>
+ <varlistentry id="parallel-tuple-cost" xreflabel="parallel_tuple_cost">
+ <term><varname>parallel_tuple_cost</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>parallel_tuple_cost</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the planner's estimate of the cost of transferring a tuple
+ from a parallel worker process to another process.
+ The default is 0.1.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="parallel-setup-cost" xreflabel="parallel_setup_cost">
+ <term><varname>parallel_setup_cost</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>parallel_setup_cost</> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the planner's estimate of the cost of launching parallel worker
+ processes.
+ The default is 1000.
+ </para>
+ </listitem>
+ </varlistentry>
+
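Taken together, these two settings determine the surcharge the planner applies for funneling rows through a Gather node. As a rough worked illustration using the defaults above (the actual computation is cost_gather in costsize.c, shown later in this patch):

\[
\mathrm{total}_{\text{Gather}} \approx \mathrm{total}_{\text{subpath}} + \underbrace{1000}_{\text{parallel\_setup\_cost}} + \underbrace{0.1}_{\text{parallel\_tuple\_cost}} \times n_{\text{rows}}
\]

So a subplan of total cost 2500 whose 50,000 output rows must all pass through the worker queues is costed at about 2500 + 1000 + 0.1 × 50000 = 8500 units; parallelism only pays off when the subplan is expensive enough to amortize that overhead.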
<varlistentry id="guc-effective-cache-size" xreflabel="effective_cache_size">
<term><varname>effective_cache_size</varname> (<type>integer</type>)
<indexterm>
case T_SampleScan:
pname = sname = "Sample Scan";
break;
+ case T_Gather:
+ pname = sname = "Gather";
+ break;
case T_IndexScan:
pname = sname = "Index Scan";
break;
show_instrumentation_count("Rows Removed by Filter", 1,
planstate, es);
break;
+ case T_Gather:
+ {
+ Gather *gather = (Gather *) plan;
+
+ show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
+ if (plan->qual)
+ show_instrumentation_count("Rows Removed by Filter", 1,
+ planstate, es);
+ ExplainPropertyInteger("Number of Workers",
+ gather->num_workers, es);
+ if (gather->single_copy)
+ ExplainPropertyText("Single Copy", "true", es);
+ }
+ break;
case T_FunctionScan:
if (es->verbose)
{
execScan.o execTuples.o \
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
nodeBitmapAnd.o nodeBitmapOr.o \
- nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \
- nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
+ nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \
+ nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
nodeLimit.o nodeLockRows.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeFunctionscan.h"
+#include "executor/nodeGather.h"
#include "executor/nodeGroup.h"
#include "executor/nodeGroup.h"
#include "executor/nodeHash.h"
ExecReScanSampleScan((SampleScanState *) node);
break;
+ case T_GatherState:
+ ExecReScanGather((GatherState *) node);
+ break;
+
case T_IndexScanState:
ExecReScanIndexScan((IndexScanState *) node);
break;
/* Simplify life for tablesample methods by disallowing this */
return false;
+ case T_Gather:
+ return false;
+
case T_IndexScan:
return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) &&
TargetListSupportsBackwardScan(node->targetlist);
direction,
dest);
+ /* Allow nodes to release or shut down resources. */
+ (void) ExecShutdownNode(queryDesc->planstate);
+
/*
* shutdown tuple receiver, if we started it
*/
} ExecParallelInitializeDSMContext;
/* Helper functions that run in the parallel leader. */
-static char *ExecSerializePlan(Plan *plan, List *rangetable);
+static char *ExecSerializePlan(Plan *plan, EState *estate);
static bool ExecParallelEstimate(PlanState *node,
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
* Create a serialized representation of the plan to be sent to each worker.
*/
static char *
-ExecSerializePlan(Plan *plan, List *rangetable)
+ExecSerializePlan(Plan *plan, EState *estate)
{
PlannedStmt *pstmt;
ListCell *tlist;
pstmt->canSetTag = 1;
pstmt->transientPlan = 0;
pstmt->planTree = plan;
- pstmt->rtable = rangetable;
+ pstmt->rtable = estate->es_range_table;
pstmt->resultRelations = NIL;
pstmt->utilityStmt = NULL;
pstmt->subplans = NIL;
pstmt->rewindPlanIDs = NULL;
pstmt->rowMarks = NIL;
- pstmt->nParamExec = 0;
+ pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
pstmt->relationOids = NIL;
pstmt->invalItems = NIL; /* workers can't replan anyway... */
pstmt->hasRowSecurity = false;
pei->planstate = planstate;
/* Fix up and serialize plan to be sent to workers. */
- pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table);
+ pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
ExecutorStart(queryDesc, 0);
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
ExecutorFinish(queryDesc);
- ExecutorEnd(queryDesc);
/* Report buffer usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
ExecParallelReportInstrumentation(queryDesc->planstate,
instrumentation);
+ /* Must do this after capturing instrumentation. */
+ ExecutorEnd(queryDesc);
+
/* Cleanup. */
FreeQueryDesc(queryDesc);
(*receiver->rDestroy) (receiver);
#include "executor/nodeMergejoin.h"
#include "executor/nodeModifyTable.h"
#include "executor/nodeNestloop.h"
+#include "executor/nodeGather.h"
#include "executor/nodeRecursiveunion.h"
#include "executor/nodeResult.h"
#include "executor/nodeSamplescan.h"
#include "executor/nodeValuesscan.h"
#include "executor/nodeWindowAgg.h"
#include "executor/nodeWorktablescan.h"
+#include "nodes/nodeFuncs.h"
#include "miscadmin.h"
estate, eflags);
break;
+ case T_Gather:
+ result = (PlanState *) ExecInitGather((Gather *) node,
+ estate, eflags);
+ break;
+
case T_Hash:
result = (PlanState *) ExecInitHash((Hash *) node,
estate, eflags);
result = ExecUnique((UniqueState *) node);
break;
+ case T_GatherState:
+ result = ExecGather((GatherState *) node);
+ break;
+
case T_HashState:
result = ExecHash((HashState *) node);
break;
ExecEndSampleScan((SampleScanState *) node);
break;
+ case T_GatherState:
+ ExecEndGather((GatherState *) node);
+ break;
+
case T_IndexScanState:
ExecEndIndexScan((IndexScanState *) node);
break;
break;
}
}
+
+/*
+ * ExecShutdownNode
+ *
+ * Give execution nodes a chance to stop asynchronous resource consumption
+ * and release any resources still held. Currently, this is only used for
+ * parallel query, but we might want to extend it to other cases also (e.g.
+ * FDW). We might also want to call it sooner, as soon as it's evident that
+ * no more rows will be needed (e.g. when a Limit is filled) rather than only
+ * at the end of ExecutorRun.
+ */
+bool
+ExecShutdownNode(PlanState *node)
+{
+ if (node == NULL)
+ return false;
+
+ switch (nodeTag(node))
+ {
+ case T_GatherState:
+ ExecShutdownGather((GatherState *) node);
+ return true;
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(node, ExecShutdownNode, NULL);
+}
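The comment above already anticipates additional users of this hook, such as FDWs. Purely as a hypothetical sketch of how another node type might plug in (ExecShutdownForeignScan is an invented name; no such callback exists in this patch):

```c
	switch (nodeTag(node))
	{
		case T_GatherState:
			ExecShutdownGather((GatherState *) node);
			return true;
		case T_ForeignScanState:
			/*
			 * Hypothetical: a foreign scan that had started an
			 * asynchronous remote query could cancel it here.
			 */
			ExecShutdownForeignScan((ForeignScanState *) node);
			return true;
		default:
			break;
	}
```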
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.c
+ * Support routines for scanning a plan via multiple workers.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/executor/nodeGather.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGather.h"
+#include "executor/nodeSubplan.h"
+#include "executor/tqueue.h"
+#include "utils/rel.h"
+
+
+static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+
+
+/* ----------------------------------------------------------------
+ * ExecInitGather
+ * ----------------------------------------------------------------
+ */
+GatherState *
+ExecInitGather(Gather *node, EState *estate, int eflags)
+{
+ GatherState *gatherstate;
+
+ /* Gather node has no inner plan. */
+ Assert(innerPlan(node) == NULL);
+
+ /*
+ * create state structure
+ */
+ gatherstate = makeNode(GatherState);
+ gatherstate->ps.plan = (Plan *) node;
+ gatherstate->ps.state = estate;
+ gatherstate->need_to_scan_workers = false;
+ gatherstate->need_to_scan_locally = !node->single_copy;
+
+ /*
+ * Miscellaneous initialization
+ *
+ * create expression context for node
+ */
+ ExecAssignExprContext(estate, &gatherstate->ps);
+
+ /*
+ * initialize child expressions
+ */
+ gatherstate->ps.targetlist = (List *)
+ ExecInitExpr((Expr *) node->plan.targetlist,
+ (PlanState *) gatherstate);
+ gatherstate->ps.qual = (List *)
+ ExecInitExpr((Expr *) node->plan.qual,
+ (PlanState *) gatherstate);
+
+ /*
+ * tuple table initialization
+ */
+ ExecInitResultTupleSlot(estate, &gatherstate->ps);
+
+ /*
+ * now initialize outer plan
+ */
+ outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+ gatherstate->ps.ps_TupFromTlist = false;
+
+ /*
+ * Initialize result tuple type and projection info.
+ */
+ ExecAssignResultTypeFromTL(&gatherstate->ps);
+ ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+
+ return gatherstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecGather(node)
+ *
+ * Scans the relation via multiple workers and returns
+ * the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+ int i;
+ TupleTableSlot *slot;
+
+ /*
+ * Initialize the parallel context and workers on first execution. We do
+ * this on first execution rather than during node initialization, as it
+ * needs to allocate a large dynamic shared memory segment, so it is
+ * better to do it only if it is really needed.
+ */
+ if (!node->pei)
+ {
+ EState *estate = node->ps.state;
+
+ /* Initialize the workers required to execute Gather node. */
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ ((Gather *) (node->ps.plan))->num_workers);
+
+ /*
+ * Register backend workers. If the requested number of workers is not
+ * available at this time, we perform the scan with whatever workers
+ * were launched; if no workers could be launched at all, the Gather
+ * node will just scan locally.
+ */
+ LaunchParallelWorkers(node->pei->pcxt);
+
+ node->funnel = CreateTupleQueueFunnel();
+
+ for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+ {
+ if (node->pei->pcxt->worker[i].bgwhandle)
+ {
+ shm_mq_set_handle(node->pei->tqueue[i],
+ node->pei->pcxt->worker[i].bgwhandle);
+ RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
+ node->need_to_scan_workers = true;
+ }
+ }
+
+ /* If no workers are available, we must always scan locally. */
+ if (!node->need_to_scan_workers)
+ node->need_to_scan_locally = true;
+ }
+
+ slot = gather_getnext(node);
+
+ if (TupIsNull(slot))
+ {
+ /*
+ * Destroy the parallel context once we complete fetching all the
+ * tuples. Otherwise, the DSM and workers will stick around for the
+ * lifetime of the entire statement.
+ */
+ ExecShutdownGather(node);
+ }
+ return slot;
+}
+
+/* ----------------------------------------------------------------
+ * ExecEndGather
+ *
+ * frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGather(GatherState *node)
+{
+ ExecShutdownGather(node);
+ ExecFreeExprContext(&node->ps);
+ ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ ExecEndNode(outerPlanState(node));
+}
+
+/*
+ * gather_getnext
+ *
+ * Get the next tuple from the shared memory queues. This function is
+ * responsible for fetching tuples from all the queues associated with
+ * the worker backends used in Gather node execution; if no data is
+ * available from the queues, or no worker is available, it fetches
+ * the data by executing the plan locally.
+ */
+static TupleTableSlot *
+gather_getnext(GatherState *gatherstate)
+{
+ PlanState *outerPlan;
+ TupleTableSlot *outerTupleSlot;
+ TupleTableSlot *slot;
+ HeapTuple tup;
+
+ /*
+ * We can use the Gather node's projection info for tuples received from
+ * the worker backends, since the workers currently always send tuples
+ * projected exactly as the Gather node requires.
+ */
+ slot = gatherstate->ps.ps_ProjInfo->pi_slot;
+
+ while (gatherstate->need_to_scan_workers ||
+ gatherstate->need_to_scan_locally)
+ {
+ if (gatherstate->need_to_scan_workers)
+ {
+ bool done = false;
+
+ /* wait only if local scan is done */
+ tup = TupleQueueFunnelNext(gatherstate->funnel,
+ gatherstate->need_to_scan_locally,
+ &done);
+ if (done)
+ gatherstate->need_to_scan_workers = false;
+
+ if (HeapTupleIsValid(tup))
+ {
+ ExecStoreTuple(tup, /* tuple to store */
+ slot, /* slot to store in */
+ InvalidBuffer, /* buffer associated with this
+ * tuple */
+ true); /* pfree tuple when done with it */
+
+ return slot;
+ }
+ }
+
+ if (gatherstate->need_to_scan_locally)
+ {
+ outerPlan = outerPlanState(gatherstate);
+
+ outerTupleSlot = ExecProcNode(outerPlan);
+
+ if (!TupIsNull(outerTupleSlot))
+ return outerTupleSlot;
+
+ gatherstate->need_to_scan_locally = false;
+ }
+ }
+
+ return ExecClearTuple(slot);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers. Collect all the
+ * stats after the workers have stopped, else some work done by
+ * the workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ Gather *gather;
+
+ if (node->pei == NULL || node->pei->pcxt == NULL)
+ return;
+
+ /*
+ * Make sure all workers have finished before we destroy the parallel
+ * context, so that we get a clean shutdown.
+ */
+ if (node->funnel)
+ {
+ DestroyTupleQueueFunnel(node->funnel);
+ node->funnel = NULL;
+ }
+
+ ExecParallelFinish(node->pei);
+
+ /* destroy parallel context. */
+ DestroyParallelContext(node->pei->pcxt);
+ node->pei->pcxt = NULL;
+
+ gather = (Gather *) node->ps.plan;
+ node->need_to_scan_locally = !gather->single_copy;
+ node->need_to_scan_workers = false;
+}
+
+/* ----------------------------------------------------------------
+ * Rescan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecReScanGather
+ *
+ * Re-initializes the workers and rescans the relation via them.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGather(GatherState *node)
+{
+ /*
+ * Re-initialize the parallel context and workers to perform a rescan of
+ * the relation. We shut down all the workers gracefully so that they
+ * can propagate any error or other information to the master backend
+ * before exiting.
+ */
+ ExecShutdownGather(node);
+
+ ExecReScan(node->ps.lefttree);
+}
return newnode;
}
+/*
+ * _copyGather
+ */
+static Gather *
+_copyGather(const Gather *from)
+{
+ Gather *newnode = makeNode(Gather);
+
+ /*
+ * copy node superclass fields
+ */
+ CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+ /*
+ * copy remainder of node
+ */
+ COPY_SCALAR_FIELD(num_workers);
+ COPY_SCALAR_FIELD(single_copy);
+
+ return newnode;
+}
+
/*
* CopyScanFields
case T_Scan:
retval = _copyScan(from);
break;
+ case T_Gather:
+ retval = _copyGather(from);
+ break;
case T_SeqScan:
retval = _copySeqScan(from);
break;
WRITE_NODE_FIELD(bitmapplans);
}
+static void
+_outGather(StringInfo str, const Gather *node)
+{
+ WRITE_NODE_TYPE("GATHER");
+
+ _outPlanInfo(str, (const Plan *) node);
+
+ WRITE_INT_FIELD(num_workers);
+ WRITE_BOOL_FIELD(single_copy);
+}
+
static void
_outScan(StringInfo str, const Scan *node)
{
case T_BitmapOr:
_outBitmapOr(str, obj);
break;
+ case T_Gather:
+ _outGather(str, obj);
+ break;
case T_Scan:
_outScan(str, obj);
break;
* cpu_tuple_cost Cost of typical CPU time to process a tuple
* cpu_index_tuple_cost Cost of typical CPU time to process an index tuple
* cpu_operator_cost Cost of CPU time to execute an operator or function
+ * parallel_tuple_cost Cost of CPU time to pass a tuple from worker to master backend
+ * parallel_setup_cost Cost of setting up shared memory for parallelism
*
* We expect that the kernel will typically do some amount of read-ahead
* optimization; this in conjunction with seek costs means that seq_page_cost
double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST;
double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST;
double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST;
+double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST;
+double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;
int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE;
Cost disable_cost = 1.0e10;
+int max_parallel_degree = 0;
+
bool enable_seqscan = true;
bool enable_indexscan = true;
bool enable_indexonlyscan = true;
path->total_cost = startup_cost + run_cost;
}
+/*
+ * cost_gather
+ * Determines and returns the cost of a Gather path.
+ *
+ * 'rel' is the relation to be operated upon
+ * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL
+ */
+void
+cost_gather(GatherPath *path, PlannerInfo *root,
+ RelOptInfo *rel, ParamPathInfo *param_info)
+{
+ Cost startup_cost = 0;
+ Cost run_cost = 0;
+
+ /* Mark the path with the correct row estimate */
+ if (param_info)
+ path->path.rows = param_info->ppi_rows;
+ else
+ path->path.rows = rel->rows;
+
+ startup_cost = path->subpath->startup_cost;
+
+ run_cost = path->subpath->total_cost - path->subpath->startup_cost;
+
+ /*
+ * Parallel setup and communication cost. Each row the path emits must
+ * be passed from a worker to the leader through a tuple queue, so
+ * charge per output row rather than per heap tuple.
+ */
+ startup_cost += parallel_setup_cost;
+ run_cost += parallel_tuple_cost * path->path.rows;
+
+ path->path.startup_cost = startup_cost;
+ path->path.total_cost = startup_cost + run_cost;
+}
+
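In equation form, the function above splits the parallel surcharge between startup and run cost:

\[
\begin{aligned}
\mathrm{startup}_{\text{Gather}} &= \mathrm{startup}_{\text{subpath}} + \text{parallel\_setup\_cost},\\
\mathrm{run}_{\text{Gather}} &= \left(\mathrm{total}_{\text{subpath}} - \mathrm{startup}_{\text{subpath}}\right) + \text{parallel\_tuple\_cost} \cdot \mathrm{rows}.
\end{aligned}
\]

Charging all of parallel_setup_cost to startup reflects that the workers must be launched and the DSM segment created before the first row can be returned.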
/*
* cost_index
* Determines and returns the cost of scanning a relation using an index.
List *tlist, List *scan_clauses);
static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
+static Gather *create_gather_plan(PlannerInfo *root,
+ GatherPath *best_path);
static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path,
List *tlist, List *scan_clauses, bool indexonly);
static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid);
static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid,
TableSampleClause *tsc);
+static Gather *make_gather(List *qptlist, List *qpqual,
+ int nworkers, bool single_copy, Plan *subplan);
static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid,
Oid indexid, List *indexqual, List *indexqualorig,
List *indexorderby, List *indexorderbyorig,
plan = create_unique_plan(root,
(UniquePath *) best_path);
break;
+ case T_Gather:
+ plan = (Plan *) create_gather_plan(root,
+ (GatherPath *) best_path);
+ break;
default:
elog(ERROR, "unrecognized node type: %d",
(int) best_path->pathtype);
return plan;
}
+/*
+ * create_gather_plan
+ *
+ * Create a Gather plan for 'best_path' and (recursively) plans
+ * for its subpaths.
+ */
+static Gather *
+create_gather_plan(PlannerInfo *root, GatherPath *best_path)
+{
+ Gather *gather_plan;
+ Plan *subplan;
+
+ subplan = create_plan_recurse(root, best_path->subpath);
+
+ gather_plan = make_gather(subplan->targetlist,
+ NIL,
+ best_path->num_workers,
+ best_path->single_copy,
+ subplan);
+
+ copy_path_costsize(&gather_plan->plan, &best_path->path);
+
+ /* use parallel mode for parallel plans. */
+ root->glob->parallelModeNeeded = true;
+
+ return gather_plan;
+}
+
/*****************************************************************************
*
return node;
}
+static Gather *
+make_gather(List *qptlist,
+ List *qpqual,
+ int nworkers,
+ bool single_copy,
+ Plan *subplan)
+{
+ Gather *node = makeNode(Gather);
+ Plan *plan = &node->plan;
+
+ /* cost should be inserted by caller */
+ plan->targetlist = qptlist;
+ plan->qual = qpqual;
+ plan->lefttree = subplan;
+ plan->righttree = NULL;
+ node->num_workers = nworkers;
+ node->single_copy = single_copy;
+
+ return node;
+}
+
/*
* distinctList is a list of SortGroupClauses, identifying the targetlist
* items that should be considered by the SetOp filter. The input path must
case T_Sort:
case T_Unique:
case T_SetOp:
+ case T_Gather:
/*
* These plan types don't actually bother to evaluate their
case T_Material:
case T_Sort:
case T_Unique:
+ case T_Gather:
case T_SetOp:
case T_Group:
break;
return pathnode;
}
+/*
+ * create_gather_path
+ *
+ * Creates a path corresponding to a gather scan, returning the
+ * pathnode.
+ */
+GatherPath *
+create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
+ Relids required_outer, int nworkers)
+{
+ GatherPath *pathnode = makeNode(GatherPath);
+
+ pathnode->path.pathtype = T_Gather;
+ pathnode->path.parent = rel;
+ pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
+ required_outer);
+ pathnode->path.pathkeys = NIL; /* Gather has unordered result */
+
+ pathnode->subpath = subpath;
+ pathnode->num_workers = nworkers;
+
+ cost_gather(pathnode, root, rel, pathnode->path.param_info);
+
+ return pathnode;
+}
+
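Nothing in this patch generates Gather paths yet; the planner integration that calls this function arrives separately. A minimal, hypothetical sketch of what such a caller might look like (the surrounding condition and placement are invented for illustration; create_gather_path, add_path, and max_parallel_degree are real):

```c
	/* Hypothetical caller sketch -- not part of this patch. */
	if (max_parallel_degree > 0)
	{
		GatherPath *gpath;

		/*
		 * Wrap the relation's cheapest path in a Gather that may use up
		 * to max_parallel_degree workers; NULL means no required outer
		 * rels.
		 */
		gpath = create_gather_path(root, rel, rel->cheapest_total_path,
								   NULL, max_parallel_degree);
		add_path(rel, (Path *) gpath);
	}
```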
/*
* translate_sub_tlist - get subquery column numbers represented by tlist
*
check_autovacuum_max_workers, NULL, NULL
},
+ {
+ {"max_parallel_degree", PGC_SUSET, RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Sets the maximum number of parallel processes per executor node."),
+ NULL
+ },
+ &max_parallel_degree,
+ 0, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
+
{
{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX,
NULL, NULL, NULL
},
+ {
+ {"parallel_tuple_cost", PGC_USERSET, QUERY_TUNING_COST,
+ gettext_noop("Sets the planner's estimate of the cost of "
+ "passing each tuple (row) from worker to master backend."),
+ NULL
+ },
+ &parallel_tuple_cost,
+ DEFAULT_PARALLEL_TUPLE_COST, 0, DBL_MAX,
+ NULL, NULL, NULL
+ },
+ {
+ {"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST,
+ gettext_noop("Sets the planner's estimate of the cost of "
+ "starting up worker processes for parallel query."),
+ NULL
+ },
+ &parallel_setup_cost,
+ DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX,
+ NULL, NULL, NULL
+ },
{
{"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER,
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#max_worker_processes = 8
+#max_parallel_degree = 0 # max number of worker processes per node
#------------------------------------------------------------------------------
#cpu_tuple_cost = 0.01 # same scale as above
#cpu_index_tuple_cost = 0.005 # same scale as above
#cpu_operator_cost = 0.0025 # same scale as above
+#parallel_tuple_cost = 0.1 # same scale as above
+#parallel_setup_cost = 1000.0 # same scale as above
#effective_cache_size = 4GB
# - Genetic Query Optimizer -
extern TupleTableSlot *ExecProcNode(PlanState *node);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
+extern bool ExecShutdownNode(PlanState *node);
/*
* prototypes from functions in execQual.c
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.h
+ * prototypes for nodeGather.c
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeGather.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEGATHER_H
+#define NODEGATHER_H
+
+#include "nodes/execnodes.h"
+
+extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags);
+extern TupleTableSlot *ExecGather(GatherState *node);
+extern void ExecEndGather(GatherState *node);
+extern void ExecShutdownGather(GatherState *node);
+extern void ExecReScanGather(GatherState *node);
+
+#endif /* NODEGATHER_H */
MemoryContext tempContext; /* short-term context for comparisons */
} UniqueState;
+/* ----------------
+ * GatherState information
+ *
+ * Gather nodes launch one or more parallel workers, run a subplan
+ * in those workers, and collect the results.
+ * ----------------
+ */
+typedef struct GatherState
+{
+ PlanState ps; /* its first field is NodeTag */
+ struct ParallelExecutorInfo *pei;
+ struct TupleQueueFunnel *funnel;
+ bool need_to_scan_workers;
+ bool need_to_scan_locally;
+} GatherState;
+
/* ----------------
* HashState information
* ----------------
T_Agg,
T_WindowAgg,
T_Unique,
+ T_Gather,
T_Hash,
T_SetOp,
T_LockRows,
T_AggState,
T_WindowAggState,
T_UniqueState,
+ T_GatherState,
T_HashState,
T_SetOpState,
T_LockRowsState,
T_ResultPath,
T_MaterialPath,
T_UniquePath,
+ T_GatherPath,
T_EquivalenceClass,
T_EquivalenceMember,
T_PathKey,
Oid *uniqOperators; /* equality operators to compare with */
} Unique;
+/* ------------
+ * gather node
+ * ------------
+ */
+typedef struct Gather
+{
+ Plan plan;
+ int num_workers;
+ bool single_copy;
+} Gather;
+
/* ----------------
* hash build node
*
List *uniq_exprs; /* expressions to be made unique */
} UniquePath;
+/*
+ * GatherPath runs several copies of a plan in parallel and collects the
+ * results. The parallel leader may also execute the plan, unless the
+ * single_copy flag is set.
+ */
+typedef struct GatherPath
+{
+ Path path;
+ Path *subpath; /* path for each worker */
+ int num_workers; /* number of workers sought to help */
+ bool single_copy; /* path must not be executed >1x */
+} GatherPath;
+
/*
* All join-type paths share these fields.
*/
#define DEFAULT_CPU_TUPLE_COST 0.01
#define DEFAULT_CPU_INDEX_TUPLE_COST 0.005
#define DEFAULT_CPU_OPERATOR_COST 0.0025
+#define DEFAULT_PARALLEL_TUPLE_COST 0.1
+#define DEFAULT_PARALLEL_SETUP_COST 1000.0
#define DEFAULT_EFFECTIVE_CACHE_SIZE 524288 /* measured in pages */
extern PGDLLIMPORT double cpu_tuple_cost;
extern PGDLLIMPORT double cpu_index_tuple_cost;
extern PGDLLIMPORT double cpu_operator_cost;
+extern PGDLLIMPORT double parallel_tuple_cost;
+extern PGDLLIMPORT double parallel_setup_cost;
extern PGDLLIMPORT int effective_cache_size;
extern Cost disable_cost;
+extern int max_parallel_degree;
extern bool enable_seqscan;
extern bool enable_indexscan;
extern bool enable_indexonlyscan;
JoinCostWorkspace *workspace,
SpecialJoinInfo *sjinfo,
SemiAntiJoinFactors *semifactors);
+extern void cost_gather(GatherPath *path, PlannerInfo *root,
+ RelOptInfo *baserel, ParamPathInfo *param_info);
extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan);
extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root);
extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root);
extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath);
extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
Path *subpath, SpecialJoinInfo *sjinfo);
+extern GatherPath *create_gather_path(PlannerInfo *root,
+ RelOptInfo *rel, Path *subpath, Relids required_outer,
+ int nworkers);
extern Path *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel,
List *pathkeys, Relids required_outer);
extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel,
FunctionScan
FunctionScanPerFuncState
FunctionScanState
FuzzyAttrMatchState
+Gather
+GatherPath
+GatherState
GBT_NUMKEY
GBT_NUMKEY_R
OverrideStackEntry
PACE_HEADER
PACL
+ParallelExecutorInfo
PATH
PBOOL
PCtxtHandle