Update parallel executor support to reuse the same DSM.
author Robert Haas <[email protected]>
Fri, 30 Oct 2015 09:43:00 +0000 (10:43 +0100)
committer Robert Haas <[email protected]>
Fri, 30 Oct 2015 09:44:54 +0000 (10:44 +0100)
Commit b0b0d84b3d663a148022e900ebfc164284a95f55 purported to make it
possible to relaunch workers using the same parallel context, but it had
an unpleasant race condition: we might reinitialize after the workers
have sent their last control message but before they have detached the
DSM, leading to crashes.  Repair by introducing a new ParallelContext
operation, ReinitializeParallelDSM.

Adjust execParallel.c to use this new support, so that we can rescan a
Gather node by relaunching workers but without needing to recreate the
DSM.

Amit Kapila, with some adjustments by me.  Extracted from latest parallel
sequential scan patch.

src/backend/access/transam/README.parallel
src/backend/access/transam/parallel.c
src/backend/executor/execParallel.c
src/backend/executor/nodeGather.c
src/include/access/parallel.h
src/include/executor/execParallel.h

diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index dfcbafabf0831563c027ad0e143f1abeb9decd62..db9ac3d504de788bf7a0a0729c18cfa5bff5d0c0 100644
@@ -222,7 +222,9 @@ pattern looks like this:
 
    ExitParallelMode();
 
-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context.  To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers().
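
For illustration only (not part of the patch): a minimal sketch of the
relaunch pattern the new README text describes, assuming a ParallelContext
*pcxt that has already been created and initialized, and a hypothetical
caller-specific helper ResetMyParallelState() standing in for "any other
necessary resetting of state".

    LaunchParallelWorkers(pcxt);
    /* ... drive the parallel operation and consume its results ... */
    WaitForParallelWorkersToFinish(pcxt);

    /* Run the same operation again, reusing the same DSM. */
    ReinitializeParallelDSM(pcxt);
    ResetMyParallelState(pcxt);        /* hypothetical, caller-specific */
    LaunchParallelWorkers(pcxt);
    /* ... second pass ... */
    WaitForParallelWorkersToFinish(pcxt);

    DestroyParallelContext(pcxt);
    ExitParallelMode();

This is the shape ExecReScanGather() now follows in nodeGather.c, with
ExecParallelReinitializeTupleQueues() playing the role of the caller-specific
reset step.
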
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6bac4aa5425ce173fed1eaa5e64d1bb1..79cc9880bbbacc175871c8a14827799d3615e002 100644
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 /*
  * Establish a new parallel context.  This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
    MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context such
+ * that we can launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+   FixedParallelState *fps;
+   char       *error_queue_space;
+   int         i;
+
+   if (pcxt->nworkers_launched == 0)
+       return;
+
+   WaitForParallelWorkersToFinish(pcxt);
+   WaitForParallelWorkersToExit(pcxt);
+
+   /* Reset a few bits of fixed parallel state to a clean state. */
+   fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+   fps->workers_attached = 0;
+   fps->last_xlog_end = 0;
+
+   /* Recreate error queues. */
+   error_queue_space =
+       shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+   for (i = 0; i < pcxt->nworkers; ++i)
+   {
+       char       *start;
+       shm_mq     *mq;
+
+       start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+       mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+       shm_mq_set_receiver(mq, MyProc);
+       pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+   }
+
+   /* Reset number of workers launched. */
+   pcxt->nworkers_launched = 0;
+}
+
 /*
  * Launch parallel workers.
  */
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
-   /*
-    * This function can be called for a parallel context for which it has
-    * already been called previously, but only if all of the old workers
-    * have already exited.  When this case arises, we need to do some extra
-    * reinitialization.
-    */
-   if (pcxt->nworkers_launched > 0)
-   {
-       FixedParallelState *fps;
-       char       *error_queue_space;
-
-       /* Clean out old worker handles. */
-       for (i = 0; i < pcxt->nworkers; ++i)
-       {
-           if (pcxt->worker[i].error_mqh != NULL)
-               elog(ERROR, "previously launched worker still alive");
-           if (pcxt->worker[i].bgwhandle != NULL)
-           {
-               pfree(pcxt->worker[i].bgwhandle);
-               pcxt->worker[i].bgwhandle = NULL;
-           }
-       }
-
-       /* Reset a few bits of fixed parallel state to a clean state. */
-       fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-       fps->workers_attached = 0;
-       fps->last_xlog_end = 0;
-
-       /* Recreate error queues. */
-       error_queue_space =
-           shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-       for (i = 0; i < pcxt->nworkers; ++i)
-       {
-           char       *start;
-           shm_mq     *mq;
-
-           start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-           mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-           shm_mq_set_receiver(mq, MyProc);
-           pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-       }
-
-       /* Reset number of workers launched. */
-       pcxt->nworkers_launched = 0;
-   }
-
    /* Configure a worker. */
    snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
             MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
    }
 }
 
+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that the workers have completely shut down.  The
+ * difference from WaitForParallelWorkersToFinish is that the latter only
+ * ensures that the last message sent by a worker has been received by the
+ * master backend, whereas this function ensures a complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+   int         i;
+
+   /* Wait until the workers actually die. */
+   for (i = 0; i < pcxt->nworkers; ++i)
+   {
+       BgwHandleStatus status;
+
+       if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+           continue;
+
+       status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+       /*
+        * If the postmaster kicked the bucket, we have no chance of cleaning
+        * up safely -- we won't be able to tell when our workers are actually
+        * dead.  This doesn't necessitate a PANIC since they will all abort
+        * eventually, but we can't safely continue this session.
+        */
+       if (status == BGWH_POSTMASTER_DIED)
+           ereport(FATAL,
+                   (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                errmsg("postmaster exited during a parallel transaction")));
+
+       /* Release memory. */
+       pfree(pcxt->worker[i].bgwhandle);
+       pcxt->worker[i].bgwhandle = NULL;
+   }
+}
+
 /*
  * Destroy a parallel context.
  *
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
    {
        for (i = 0; i < pcxt->nworkers; ++i)
        {
-           if (pcxt->worker[i].bgwhandle != NULL)
-               TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
            if (pcxt->worker[i].error_mqh != NULL)
            {
+               TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
                pfree(pcxt->worker[i].error_mqh);
                pcxt->worker[i].error_mqh = NULL;
            }
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
        pcxt->private_memory = NULL;
    }
 
-   /* Wait until the workers actually die. */
-   for (i = 0; i < pcxt->nworkers; ++i)
-   {
-       BgwHandleStatus status;
-
-       if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-           continue;
-
-       /*
-        * We can't finish transaction commit or abort until all of the
-        * workers are dead.  This means, in particular, that we can't respond
-        * to interrupts at this stage.
-        */
-       HOLD_INTERRUPTS();
-       status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-       RESUME_INTERRUPTS();
-
-       /*
-        * If the postmaster kicked the bucket, we have no chance of cleaning
-        * up safely -- we won't be able to tell when our workers are actually
-        * dead.  This doesn't necessitate a PANIC since they will all abort
-        * eventually, but we can't safely continue this session.
-        */
-       if (status == BGWH_POSTMASTER_DIED)
-           ereport(FATAL,
-                   (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                errmsg("postmaster exited during a parallel transaction")));
-
-       /* Release memory. */
-       pfree(pcxt->worker[i].bgwhandle);
-       pcxt->worker[i].bgwhandle = NULL;
-   }
+   /*
+    * We can't finish transaction commit or abort until all of the
+    * workers have exited.  This means, in particular, that we can't respond
+    * to interrupts at this stage.
+    */
+   HOLD_INTERRUPTS();
+   WaitForParallelWorkersToExit(pcxt);
+   RESUME_INTERRUPTS();
 
    /* Free the worker array itself. */
    if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
        case 'X':               /* Terminate, indicating clean exit */
            {
-               pfree(pcxt->worker[i].bgwhandle);
                pfree(pcxt->worker[i].error_mqh);
-               pcxt->worker[i].bgwhandle = NULL;
                pcxt->worker[i].error_mqh = NULL;
                break;
            }
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416c7b29d00de0cb5367bc326944e6f0e..99a9de3cdc397fde4dab1623cdb968508ff87445 100644
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
                     ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
                     ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+                            bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                          SharedExecutorInstrumentation *instrumentation);
 
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
  * to the main backend and start the workers.
  */
 static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 {
    shm_mq_handle **responseq;
    char       *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
    responseq = (shm_mq_handle **)
        palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
 
-   /* Allocate space from the DSM for the queues themselves. */
-   tqueuespace = shm_toc_allocate(pcxt->toc,
-                                PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+   /*
+    * If not reinitializing, allocate space from the DSM for the queues;
+    * otherwise, find the already allocated space.
+    */
+   if (!reinitialize)
+       tqueuespace =
+           shm_toc_allocate(pcxt->toc,
+                            PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+   else
+       tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
 
    /* Create the queues, and become the receiver for each. */
    for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,12 +256,23 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
    }
 
    /* Add array of queues to shm_toc, so others can find it. */
-   shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+   if (!reinitialize)
+       shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
 
    /* Return array of handles. */
    return responseq;
 }
 
+/*
+ * Re-initialize the response queues that backend workers use to return
+ * tuples to the main backend.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+   return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
    pei->buffer_usage = bufusage_space;
 
    /* Set up tuple queues. */
-   pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+   pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
    /*
     * If instrumentation options were supplied, allocate space for the
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 9c1533e311341f04b295785a6b9691f69c0b881b..5f589614dc2a56c6c2009ff3745369f61e7cac76 100644
@@ -41,6 +41,7 @@
 
 
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
 
 
 /* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
            bool    got_any_worker = false;
 
            /* Initialize the workers required to execute Gather node. */
-           node->pei = ExecInitParallelPlan(node->ps.lefttree,
-                                            estate,
-                                            gather->num_workers);
+           if (!node->pei)
+               node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                                estate,
+                                                gather->num_workers);
 
            /*
             * Register backend workers. We might not get as many as we
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
                                       gatherstate->need_to_scan_locally,
                                       &done);
            if (done)
-               ExecShutdownGather(gatherstate);
+               ExecShutdownGatherWorkers(gatherstate);
 
            if (HeapTupleIsValid(tup))
            {
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
 }
 
 /* ----------------------------------------------------------------
- *     ExecShutdownGather
+ *     ExecShutdownGatherWorkers
  *
- *     Destroy the setup for parallel workers.  Collect all the
- *     stats after workers are stopped, else some work done by
- *     workers won't be accounted.
+ *     Destroy the parallel workers.  Collect all the stats after
+ *     workers are stopped, else some work done by workers won't be
+ *     accounted for.
  * ----------------------------------------------------------------
  */
 void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
 {
    /* Shut down tuple queue funnel before shutting down workers. */
    if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)
 
    /* Now shut down the workers. */
    if (node->pei != NULL)
-   {
        ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecShutdownGather
+ *
+ *     Destroy the setup for parallel workers, including the parallel
+ *     context.  Collect all the stats after workers are stopped, else
+ *     some work done by workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+   ExecShutdownGatherWorkers(node);
+
+   /* Now destroy the parallel context. */
+   if (node->pei != NULL)
+   {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
@@ -349,14 +368,21 @@ void
 ExecReScanGather(GatherState *node)
 {
    /*
-    * Re-initialize the parallel context and workers to perform rescan of
-    * relation.  We want to gracefully shutdown all the workers so that they
+    * Re-initialize the parallel workers to perform a rescan of the relation.
+    * We want to gracefully shut down all the workers so that they
     * should be able to propagate any error or other information to master
-    * backend before dying.
+    * backend before dying.  The parallel context will be reused for the rescan.
     */
-   ExecShutdownGather(node);
+   ExecShutdownGatherWorkers(node);
 
    node->initialized = false;
 
+   if (node->pei)
+   {
+       ReinitializeParallelDSM(node->pei->pcxt);
+       node->pei->tqueue =
+               ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+   }
+
    ExecReScan(node->ps.lefttree);
 }
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index d4b7c5dd75b4a7361df1597df10f0b198e618330..411db7964dbbefed2b3e58bcec4a8c43b2921a59 100644
@@ -56,6 +56,7 @@ extern bool InitializingParallelWorker;
 extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
 extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *);
+extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *);
 extern void WaitForParallelWorkersToFinish(ParallelContext *);
 extern void DestroyParallelContext(ParallelContext *);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 505500e76b54898bbe12105a51da6a8ef91a7154..23c29ebb9027de0c40794d82b951e09eda5d0958 100644
@@ -33,5 +33,6 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                     EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
+extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
 
 #endif   /* EXECPARALLEL_H */