Clean up thread management in parallel pg_dump for Windows.
authorTom Lane <[email protected]>
Fri, 27 May 2016 16:02:09 +0000 (12:02 -0400)
committerTom Lane <[email protected]>
Fri, 27 May 2016 16:02:09 +0000 (12:02 -0400)
Since we start the worker threads with _beginthreadex(), we should use
_endthreadex() to terminate them.  We got this right in the normal-exit
code path, but not so much during an error exit from a worker.
In addition, be sure to apply CloseHandle to the thread handle after
each thread exits.

It's not clear that these oversights cause any user-visible problems,
since the pg_dump run is about to terminate anyway.  Still, it's clearly
better to follow Microsoft's API specifications than ignore them.

Also a few cosmetic cleanups in WaitForTerminatingWorkers(), including
being a bit less random about where to cast between uintptr_t and HANDLE,
and being sure to clear the worker identity field for each dead worker
(not that false matches should be possible later, but let's be careful).

Original observation and patch by Armin Schöffmann, cosmetic improvements
by Michael Paquier and me.  (Armin's patch also included closing sockets
in ShutdownWorkersHard(), but that's been dealt with already in commit
df8d2d8c4.)  Back-patch to 9.3 where parallel pg_dump was introduced.

Discussion: <zarafa.570306bd.3418.074bf1420d8f2ba2@root.aegaeon.de>

src/bin/pg_dump/parallel.c
src/bin/pg_dump/pg_backup_utils.c

index 6debe9d4e5bc0f0f32ba318ed3b6b9ceb7f8d2ee..6389e9dc65aa6a59c7d40e6f147ab6226378f5ea 100644 (file)
@@ -328,7 +328,7 @@ ShutdownWorkersHard(ParallelState *pstate)
 }
 
 /*
- * Wait for the termination of the processes using the OS-specific method.
+ * Wait for all workers to terminate.
  */
 static void
 WaitForTerminatingWorkers(ParallelState *pstate)
@@ -339,39 +339,58 @@ WaitForTerminatingWorkers(ParallelState *pstate)
                int                     j;
 
 #ifndef WIN32
+               /* On non-Windows, use wait() to wait for next worker to end */
                int                     status;
                pid_t           pid = wait(&status);
 
+               /* Find dead worker's slot, and clear the PID field */
                for (j = 0; j < pstate->numWorkers; j++)
-                       if (pstate->parallelSlot[j].pid == pid)
-                               slot = &(pstate->parallelSlot[j]);
-#else
-               uintptr_t       hThread;
-               DWORD           ret;
-               uintptr_t  *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
+               {
+                       slot = &(pstate->parallelSlot[j]);
+                       if (slot->pid == pid)
+                       {
+                               slot->pid = 0;
+                               break;
+                       }
+               }
+#else                                                  /* WIN32 */
+               /* On Windows, we must use WaitForMultipleObjects() */
+               HANDLE     *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
                int                     nrun = 0;
+               DWORD           ret;
+               uintptr_t       hThread;
 
                for (j = 0; j < pstate->numWorkers; j++)
+               {
                        if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
                        {
-                               lpHandles[nrun] = pstate->parallelSlot[j].hThread;
+                               lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
                                nrun++;
                        }
-               ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
+               }
+               ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
                Assert(ret != WAIT_FAILED);
-               hThread = lpHandles[ret - WAIT_OBJECT_0];
+               hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
+               free(lpHandles);
 
+               /* Find dead worker's slot, and clear the hThread field */
                for (j = 0; j < pstate->numWorkers; j++)
-                       if (pstate->parallelSlot[j].hThread == hThread)
-                               slot = &(pstate->parallelSlot[j]);
-
-               free(lpHandles);
-#endif
-               Assert(slot);
+               {
+                       slot = &(pstate->parallelSlot[j]);
+                       if (slot->hThread == hThread)
+                       {
+                               /* For cleanliness, close handles for dead threads */
+                               CloseHandle((HANDLE) slot->hThread);
+                               slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
+                               break;
+                       }
+               }
+#endif   /* WIN32 */
 
+               /* On all platforms, update workerStatus as well */
+               Assert(j < pstate->numWorkers);
                slot->workerStatus = WRKR_TERMINATED;
        }
-       Assert(HasEveryWorkerTerminated(pstate));
 }
 
 #ifndef WIN32
index 71822d580544f5110ce947c452c017aba3c34188..7fbc77628df0dd00dc7d5c005fa5aaef8eef7faa 100644 (file)
@@ -149,7 +149,7 @@ exit_nicely(int code)
 
 #ifdef WIN32
        if (parallel_init_done && GetCurrentThreadId() != mainThreadId)
-               ExitThread(code);
+               _endthreadex(code);
 #endif
 
        exit(code);