Add PQsendFlushRequest to libpq
authorAlvaro Herrera <[email protected]>
Tue, 29 Jun 2021 18:37:39 +0000 (14:37 -0400)
committerAlvaro Herrera <[email protected]>
Tue, 29 Jun 2021 18:37:39 +0000 (14:37 -0400)
This new libpq function allows the application to send an 'H' message,
which instructs the server to flush its outgoing buffer.

This hasn't been needed so far because the Sync message already requests
a buffer; and I failed to realize that this was needed in pipeline mode
because PQpipelineSync also causes the buffer to be flushed.  However,
sometimes it is useful to request a flush without establishing a
synchronization point.

Backpatch to 14, where pipeline mode was introduced in libpq.

Reported-by: Boris Kolpackov <[email protected]>
Author: Álvaro Herrera <[email protected]>
Discussion: https://postgr.es/m/202106252350[email protected]

doc/src/sgml/libpq.sgml
src/interfaces/libpq/exports.txt
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/libpq-fe.h

index 641970f2a612d3f9dbd5b273e4abccd78f3172ed..59e3e678f9e7620a72ca5f74f76fec0beb2bd541 100644 (file)
@@ -5102,10 +5102,13 @@ int PQflush(PGconn *conn);
      The server executes statements, and returns results, in the order the
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
+     Note that results are buffered on the server side; the server flushes
+     that buffer when a synchronization point is established with
+     <function>PQpipelineSync</function>, or when
+     <function>PQsendFlushRequest</function> is called.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
-     until the next synchronization point established by
-     <function>PQpipelineSync</function>;
+     until the next synchronization point;
      a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
      each such command.
      (This remains true even if the commands in the pipeline would rollback
@@ -5399,6 +5402,32 @@ int PQpipelineSync(PGconn *conn);
       </para>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-PQsendFlushRequest">
+     <term><function>PQsendFlushRequest</function><indexterm><primary>PQsendFlushRequest</primary></indexterm></term>
+
+      <listitem>
+       <para>
+        Sends a request for the server to flush its output buffer.
+<synopsis>
+int PQsendFlushRequest(PGconn *conn);
+</synopsis>
+       </para>
+
+       <para>
+        Returns 1 for success.  Returns 0 on any failure.
+       </para>
+       <para>
+        The server flushes its output buffer automatically as a result of
+        <function>PQpipelineSync</function> being called, or
+        on any request when not in pipeline mode; this function is useful
+        to cause the server to flush its output buffer in pipeline mode
+        without establishing a synchronization point.
+        Note that the request is not itself flushed to the server automatically;
+        use <function>PQflush</function> if necessary.
+       </para>
+      </listitem>
+     </varlistentry>
    </variablelist>
   </sect2>
 
index 824a03ffbdc88f2c72d92752828583c59f74eb9c..e8bcc883709160d3fc8fbf07adaf9b851150c04d 100644 (file)
@@ -184,4 +184,5 @@ PQexitPipelineMode        181
 PQpipelineSync            182
 PQpipelineStatus          183
 PQsetTraceFlags           184
-PQmblenBounded              185
+PQmblenBounded            185
+PQsendFlushRequest        186
index 7bd5b3a7b9da3212d366ef349671467c00b6653a..c1b1269672585f150f2fe96b3f73273f7042089a 100644 (file)
@@ -3099,6 +3099,42 @@ sendFailed:
    return 0;
 }
 
+/*
+ * PQsendFlushRequest
+ *     Send request for server to flush its buffer.  Useful in pipeline
+ *     mode when a sync point is not desired.
+ */
+int
+PQsendFlushRequest(PGconn *conn)
+{
+   if (!conn)
+       return 0;
+
+   /* Don't try to send if we know there's no live connection. */
+   if (conn->status != CONNECTION_OK)
+   {
+       appendPQExpBufferStr(&conn->errorMessage,
+                            libpq_gettext("no connection to the server\n"));
+       return 0;
+   }
+
+   /* Can't send while already busy, either, unless enqueuing for later */
+   if (conn->asyncStatus != PGASYNC_IDLE &&
+       conn->pipelineStatus == PQ_PIPELINE_OFF)
+   {
+       appendPQExpBufferStr(&conn->errorMessage,
+                            libpq_gettext("another command is already in progress\n"));
+       return false;
+   }
+
+   if (pqPutMsgStart('H', conn) < 0 ||
+       pqPutMsgEnd(conn) < 0)
+   {
+       return 0;
+   }
+
+   return 1;
+}
 
 /* ====== accessor funcs for PGresult ======== */
 
index cc6032b15bd95fe58089429a53d909a8133b6883..a6fd69acebcc8aa60901cc56ab9d57c55b818325 100644 (file)
@@ -470,6 +470,7 @@ extern int  PQconsumeInput(PGconn *conn);
 extern int PQenterPipelineMode(PGconn *conn);
 extern int PQexitPipelineMode(PGconn *conn);
 extern int PQpipelineSync(PGconn *conn);
+extern int PQsendFlushRequest(PGconn *conn);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);