Rewrite async-connection loop in libpqwalreceiver.c, once again.
authorTom Lane <[email protected]>
Wed, 15 Mar 2017 17:26:26 +0000 (13:26 -0400)
committerTom Lane <[email protected]>
Wed, 15 Mar 2017 17:26:26 +0000 (13:26 -0400)
The original coding in commit 1e8a85009 didn't use PQconnectPoll per
spec, and while the rewrite in e434ad39a is closer, it still doesn't
guarantee to wait until the socket is read-ready or write-ready (as
appropriate) before calling PQconnectPoll.  It's not clear whether
that omission is causing the continuing failures on buildfarm member
bowerbird; but given the lack of other explanations meeting the
available facts, let's tighten that up and see what happens.

An independent issue in the same loop was that it had a race condition
whereby it could clear the process's latch without having serviced an
interrupt request, causing failure to respond to a cancel while waiting
for connection (the very problem 1e8a85009 was meant to fix).

Discussion: https://postgr.es/m/7295.1489596949@sss.pgh.pa.us

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

index cd2e57867c062acc8132abc8f91656481aaa35c2..65a9e6c81ce82960f9b6837e18a378623fc898e7 100644 (file)
@@ -159,41 +159,41 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
    /*
     * Poll connection until we have OK or FAILED status.
     *
-    * Note that the initial state after PQconnectStartParams is
-    * PGRES_POLLING_WRITING.
+    * Per spec for PQconnectPoll, first wait till socket is write-ready.
     */
-   for (status = PGRES_POLLING_WRITING;
-        status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED;
-        status = PQconnectPoll(conn->streamConn))
+   status = PGRES_POLLING_WRITING;
+   do
    {
-       /* Sleep a bit if waiting for socket. */
-       if (status == PGRES_POLLING_READING ||
-           status == PGRES_POLLING_WRITING)
+       /* Wait for socket ready and/or other events. */
+       int         io_flag;
+       int         rc;
+
+       io_flag = (status == PGRES_POLLING_READING
+                  ? WL_SOCKET_READABLE
+                  : WL_SOCKET_WRITEABLE);
+
+       rc = WaitLatchOrSocket(&MyProc->procLatch,
+                              WL_POSTMASTER_DEATH |
+                              WL_LATCH_SET | io_flag,
+                              PQsocket(conn->streamConn),
+                              0,
+                              WAIT_EVENT_LIBPQWALRECEIVER);
+
+       /* Emergency bailout? */
+       if (rc & WL_POSTMASTER_DEATH)
+           exit(1);
+
+       /* Interrupted? */
+       if (rc & WL_LATCH_SET)
        {
-           int     extra_flag;
-           int     rc;
-
-           extra_flag = (status == PGRES_POLLING_READING
-                         ? WL_SOCKET_READABLE
-                         : WL_SOCKET_WRITEABLE);
-
            ResetLatch(&MyProc->procLatch);
-           rc = WaitLatchOrSocket(&MyProc->procLatch,
-                                  WL_POSTMASTER_DEATH |
-                                  WL_LATCH_SET | extra_flag,
-                                  PQsocket(conn->streamConn),
-                                  0,
-                                  WAIT_EVENT_LIBPQWALRECEIVER);
-
-           /* Emergency bailout. */
-           if (rc & WL_POSTMASTER_DEATH)
-               exit(1);
-
-           /* Interrupted. */
-           if (rc & WL_LATCH_SET)
-               CHECK_FOR_INTERRUPTS();
+           CHECK_FOR_INTERRUPTS();
        }
-   }
+
+       /* If socket is ready, advance the libpq state machine */
+       if (rc & io_flag)
+           status = PQconnectPoll(conn->streamConn);
+   } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
    if (PQstatus(conn->streamConn) != CONNECTION_OK)
    {