/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
/*
* Give the data a push (in pipeline mode, only if we're past the size
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
- conn->asyncStatus = PGASYNC_BUSY;
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
*/
if (PQflush(conn) < 0)
goto sendFailed;
-
- /*
- * Call pqPipelineProcessQueue so the user can call start calling
- * PQgetResult.
- */
- pqPipelineProcessQueue(conn);
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
fprintf(stderr, "ok\n");
}
+/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+ int numqueries = 10;
+ int results = 0;
+ int sock = PQsocket(conn);
+
+ fprintf(stderr, "nosync... ");
+
+ if (sock < 0)
+ pg_fatal("invalid socket");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("could not enter pipeline mode");
+ for (int i = 0; i < numqueries; i++)
+ {
+ fd_set input_mask;
+ struct timeval tv;
+
+ if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("error sending select: %s", PQerrorMessage(conn));
+ PQflush(conn);
+
+ /*
+ * If the server has written anything to us, read (some of) it now.
+ */
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+ if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+ pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+ }
+
+ /* tell server to flush its output buffer */
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ PQflush(conn);
+
+ /* Now read all results */
+ for (;;)
+ {
+ PGresult *res;
+
+ res = PQgetResult(conn);
+
+ /* NULL results are only expected after TUPLES_OK */
+ if (res == NULL)
+ pg_fatal("got unexpected NULL result after %d results", results);
+
+ /* We expect exactly one TUPLES_OK result for each query we sent */
+ if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ PGresult *res2;
+
+ /* and one NULL result should follow each */
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ PQclear(res);
+ results++;
+
+ /* if we're done, we're done */
+ if (results == numqueries)
+ break;
+
+ continue;
+ }
+
+ /* anything else is unexpected */
+ pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+ }
+
+ fprintf(stderr, "ok\n");
+}
+
/*
* When an operation in a pipeline fails the rest of the pipeline is flushed. We
* still have to get results for each pipeline item, but the item will just be
{
printf("disallowed_in_pipeline\n");
printf("multi_pipelines\n");
+ printf("nosync\n");
printf("pipeline_abort\n");
printf("pipelined_insert\n");
printf("prepared\n");
test_disallowed_in_pipeline(conn);
else if (strcmp(testname, "multi_pipelines") == 0)
test_multi_pipelines(conn);
+ else if (strcmp(testname, "nosync") == 0)
+ test_nosync(conn);
else if (strcmp(testname, "pipeline_abort") == 0)
test_pipeline_abort(conn);
else if (strcmp(testname, "pipelined_insert") == 0)
{
my @extraargs = ('-r', $numrows);
my $cmptrace = grep(/^$testname$/,
- qw(simple_pipeline multi_pipelines prepared singlerow
+ qw(simple_pipeline nosync multi_pipelines prepared singlerow
pipeline_abort transaction disallowed_in_pipeline)) > 0;
# For a bunch of tests, generate a libpq trace file too.
--- /dev/null
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+F 4 Terminate