Fix libpq state machine in pipeline mode
authorAlvaro Herrera <[email protected]>
Tue, 29 Jun 2021 19:01:29 +0000 (15:01 -0400)
committerAlvaro Herrera <[email protected]>
Tue, 29 Jun 2021 19:01:29 +0000 (15:01 -0400)
The original coding required that PQpipelineSync had been called before
the first call to PQgetResult, and failure to do that would result in an
unexpected NULL result being returned.  Fix by setting the right state
when a query is sent, rather than leaving it unchanged and having
PQpipelineSync apply the necessary state change.

A new test case to verify the behavior is added, which relies on the new
PQsendFlushRequest() function added by commit a7192326c74d.

Backpatch to 14, where pipeline mode was added.

Reported-by: Boris Kolpackov <[email protected]>
Author: Álvaro Herrera <[email protected]>
Discussion: https://postgr.es/m/boris.20210616110321@codesynthesis.com

src/interfaces/libpq/fe-exec.c
src/test/modules/libpq_pipeline/libpq_pipeline.c
src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
src/test/modules/libpq_pipeline/traces/nosync.trace [new file with mode: 0644]

index c1b1269672585f150f2fe96b3f73273f7042089a..b13ddab393be59b4258baa07c91dab77b53d12e0 100644 (file)
@@ -1375,8 +1375,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
-   if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-       conn->asyncStatus = PGASYNC_BUSY;
+   conn->asyncStatus = PGASYNC_BUSY;
    return 1;
 
 sendFailed:
@@ -1513,8 +1512,7 @@ PQsendPrepare(PGconn *conn,
 
    pqAppendCmdQueueEntry(conn, entry);
 
-   if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-       conn->asyncStatus = PGASYNC_BUSY;
+   conn->asyncStatus = PGASYNC_BUSY;
 
    /*
     * Give the data a push (in pipeline mode, only if we're past the size
@@ -1817,8 +1815,7 @@ PQsendQueryGuts(PGconn *conn,
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
-   if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-       conn->asyncStatus = PGASYNC_BUSY;
+   conn->asyncStatus = PGASYNC_BUSY;
    return 1;
 
 sendFailed:
@@ -2448,8 +2445,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
    /* OK, it's launched! */
    pqAppendCmdQueueEntry(conn, entry);
-   if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-       conn->asyncStatus = PGASYNC_BUSY;
+   conn->asyncStatus = PGASYNC_BUSY;
    return 1;
 
 sendFailed:
@@ -3084,12 +3080,7 @@ PQpipelineSync(PGconn *conn)
     */
    if (PQflush(conn) < 0)
        goto sendFailed;
-
-   /*
-    * Call pqPipelineProcessQueue so the user can call start calling
-    * PQgetResult.
-    */
-   pqPipelineProcessQueue(conn);
+   conn->asyncStatus = PGASYNC_BUSY;
 
    return 1;
 
index 71eedb6dbb4b8285dfe287bf4cf745e2803f43a6..249ee22105c18f82e840a1fbc96675be07f46561 100644 (file)
@@ -230,6 +230,93 @@ test_multi_pipelines(PGconn *conn)
    fprintf(stderr, "ok\n");
 }
 
+/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+   int         numqueries = 10;
+   int         results = 0;
+   int         sock = PQsocket(conn);
+
+   fprintf(stderr, "nosync... ");
+
+   if (sock < 0)
+       pg_fatal("invalid socket");
+
+   if (PQenterPipelineMode(conn) != 1)
+       pg_fatal("could not enter pipeline mode");
+   for (int i = 0; i < numqueries; i++)
+   {
+       fd_set      input_mask;
+       struct timeval tv;
+
+       if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+                             0, NULL, NULL, NULL, NULL, 0) != 1)
+           pg_fatal("error sending select: %s", PQerrorMessage(conn));
+       PQflush(conn);
+
+       /*
+        * If the server has written anything to us, read (some of) it now.
+        */
+       FD_ZERO(&input_mask);
+       FD_SET(sock, &input_mask);
+       tv.tv_sec = 0;
+       tv.tv_usec = 0;
+       if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+       {
+           fprintf(stderr, "select() failed: %s\n", strerror(errno));
+           exit_nicely(conn);
+       }
+       if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+           pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+   }
+
+   /* tell server to flush its output buffer */
+   if (PQsendFlushRequest(conn) != 1)
+       pg_fatal("failed to send flush request");
+   PQflush(conn);
+
+   /* Now read all results */
+   for (;;)
+   {
+       PGresult   *res;
+
+       res = PQgetResult(conn);
+
+       /* NULL results are only expected after TUPLES_OK */
+       if (res == NULL)
+           pg_fatal("got unexpected NULL result after %d results", results);
+
+       /* We expect exactly one TUPLES_OK result for each query we sent */
+       if (PQresultStatus(res) == PGRES_TUPLES_OK)
+       {
+           PGresult   *res2;
+
+           /* and one NULL result should follow each */
+           res2 = PQgetResult(conn);
+           if (res2 != NULL)
+               pg_fatal("expected NULL, got %s",
+                        PQresStatus(PQresultStatus(res2)));
+           PQclear(res);
+           results++;
+
+           /* if we're done, we're done */
+           if (results == numqueries)
+               break;
+
+           continue;
+       }
+
+       /* anything else is unexpected */
+       pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+   }
+
+   fprintf(stderr, "ok\n");
+}
+
 /*
  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
  * still have to get results for each pipeline item, but the item will just be
@@ -1237,6 +1324,7 @@ print_test_list(void)
 {
    printf("disallowed_in_pipeline\n");
    printf("multi_pipelines\n");
+   printf("nosync\n");
    printf("pipeline_abort\n");
    printf("pipelined_insert\n");
    printf("prepared\n");
@@ -1334,6 +1422,8 @@ main(int argc, char **argv)
        test_disallowed_in_pipeline(conn);
    else if (strcmp(testname, "multi_pipelines") == 0)
        test_multi_pipelines(conn);
+   else if (strcmp(testname, "nosync") == 0)
+       test_nosync(conn);
    else if (strcmp(testname, "pipeline_abort") == 0)
        test_pipeline_abort(conn);
    else if (strcmp(testname, "pipelined_insert") == 0)
index 2bc0e6c223683a9793405ef59b3fb0fdd0e50a58..4101ef950ee467da2328eb76bca9cb1adf629adc 100644 (file)
@@ -26,7 +26,7 @@ for my $testname (@tests)
 {
    my @extraargs = ('-r', $numrows);
    my $cmptrace = grep(/^$testname$/,
-       qw(simple_pipeline multi_pipelines prepared singlerow
+       qw(simple_pipeline nosync multi_pipelines prepared singlerow
          pipeline_abort transaction disallowed_in_pipeline)) > 0;
 
    # For a bunch of tests, generate a libpq trace file too.
diff --git a/src/test/modules/libpq_pipeline/traces/nosync.trace b/src/test/modules/libpq_pipeline/traces/nosync.trace
new file mode 100644 (file)
index 0000000..d99aac6
--- /dev/null
@@ -0,0 +1,92 @@
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  34  Parse    "" "SELECT repeat('xyzxz', 12)" 0
+F  14  Bind     "" "" 0 0 1 0
+F  6   Describe     P ""
+F  9   Execute  "" 0
+F  4   Flush
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+B  4   ParseComplete
+B  4   BindComplete
+B  31  RowDescription   1 "repeat" NNNN 0 NNNN 65535 -1 0
+B  70  DataRow  1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B  13  CommandComplete  "SELECT 1"
+F  4   Terminate