#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
+#include "utils/memutils.h"
#include "utils/rel.h"
ExecInitGather(Gather *node, EState *estate, int eflags)
{
GatherState *gatherstate;
+ Plan *outerNode;
+ bool hasoid;
+ TupleDesc tupDesc;
/* Gather node doesn't have innerPlan node. */
Assert(innerPlan(node) == NULL);
/*
* tuple table initialization
*/
+ gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
ExecInitResultTupleSlot(estate, &gatherstate->ps);
/*
* now initialize outer plan
*/
- outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
-
+ outerNode = outerPlan(node);
+ outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
gatherstate->ps.ps_TupFromTlist = false;
ExecAssignResultTypeFromTL(&gatherstate->ps);
ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+ /*
+ * Initialize funnel slot to same tuple descriptor as outer plan.
+ */
+ if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
+ hasoid = false;
+ tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+ ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
+
return gatherstate;
}
{
int i;
TupleTableSlot *slot;
+ TupleTableSlot *resultSlot;
+ ExprDoneCond isDone;
+ ExprContext *econtext;
/*
* Initialize the parallel context and workers on first execution. We do
node->initialized = true;
}
- slot = gather_getnext(node);
+ /*
+ * Check to see if we're still projecting out tuples from a previous scan
+ * tuple (because there is a function-returning-set in the projection
+ * expressions). If so, try to project another one.
+ */
+ if (node->ps.ps_TupFromTlist)
+ {
+ resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+ if (isDone == ExprMultipleResult)
+ return resultSlot;
+ /* Done with that source tuple... */
+ node->ps.ps_TupFromTlist = false;
+ }
+
+ /*
+ * Reset per-tuple memory context to free any expression evaluation
+ * storage allocated in the previous tuple cycle. Note we can't do this
+ * until we're done projecting.
+ */
+ econtext = node->ps.ps_ExprContext;
+ ResetExprContext(econtext);
+
+ /* Get and return the next tuple, projecting if necessary. */
+ for (;;)
+ {
+ /*
+ * Get next tuple, either from one of our workers, or by running the
+ * plan ourselves.
+ */
+ slot = gather_getnext(node);
+ if (TupIsNull(slot))
+ return NULL;
+
+ /*
+ * form the result tuple using ExecProject(), and return it --- unless
+ * the projection produces an empty set, in which case we must loop
+ * back around for another tuple
+ */
+ econtext->ecxt_outertuple = slot;
+ resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+
+ if (isDone != ExprEndResult)
+ {
+ node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
+ return resultSlot;
+ }
+ }
return slot;
}
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
- PlanState *outerPlan;
+ PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
- TupleTableSlot *slot;
+ TupleTableSlot *fslot = gatherstate->funnel_slot;
HeapTuple tup;
- /*
- * We can use projection info of Gather for the tuples received from
- * worker backends as currently for all cases worker backends sends the
- * projected tuple as required by Gather node.
- */
- slot = gatherstate->ps.ps_ProjInfo->pi_slot;
-
while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
{
if (gatherstate->funnel != NULL)
if (HeapTupleIsValid(tup))
{
ExecStoreTuple(tup, /* tuple to store */
- slot, /* slot to store in */
+ fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this
* tuple */
true); /* pfree this pointer if not from heap */
- return slot;
+ return fslot;
}
}
if (gatherstate->need_to_scan_locally)
{
- outerPlan = outerPlanState(gatherstate);
-
outerTupleSlot = ExecProcNode(outerPlan);
if (!TupIsNull(outerTupleSlot))
}
}
- return ExecClearTuple(slot);
+ return ExecClearTuple(fslot);
}
/* ----------------------------------------------------------------