</listitem>
</varlistentry>
+ <varlistentry id="app-psql-meta-command-pipeline">
+ <term><literal>\startpipeline</literal></term>
+ <term><literal>\syncpipeline</literal></term>
+ <term><literal>\endpipeline</literal></term>
+ <term><literal>\flushrequest</literal></term>
+ <term><literal>\flush</literal></term>
+ <term><literal>\getresults [ <replaceable class="parameter">number_results</replaceable> ]</literal></term>
+
+ <listitem>
+ <para>
+ This group of commands implements pipelining of SQL statements.
+ A pipeline must begin with a <command>\startpipeline</command>
+ and end with an <command>\endpipeline</command>. In between there
+ may be any number of <command>\syncpipeline</command> commands,
+ which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+ without ending the ongoing pipeline and flushing the send buffer.
+ In pipeline mode, statements are sent to the server without waiting
+ for the results of previous statements.
+ See <xref linkend="libpq-pipeline-mode"/> for more details.
+ </para>
+
+ <para>
+ Pipeline mode requires the use of the extended query protocol. All
+ queries need to be sent using the meta-commands
+ <literal>\bind</literal>, <literal>\bind_named</literal>,
+ <literal>\close</literal> or <literal>\parse</literal>. While a
+ pipeline is ongoing, <literal>\g</literal> will append the current
+ query buffer to the pipeline. Other meta-commands like
+ <literal>\gx</literal> or <literal>\gdesc</literal> are not allowed
+ in pipeline mode.
+ </para>
+
+ <para>
+ <command>\flushrequest</command> appends a flush command to the
+ pipeline, allowing to read results with
+ <command>\getresults</command> without issuing a sync or ending the
+ pipeline. <command>\getresults</command> will automatically push
+ unsent data to the server. <command>\flush</command> can be used to
+ manually push unsent data.
+ </para>
+
+ <para>
+ <command>\getresults</command> accepts an optional
+ <replaceable class="parameter">number_results</replaceable> parameter.
+ If provided, only the first
+ <replaceable class="parameter">number_results</replaceable> pending
+ results will be read. If not provided or <literal>0</literal>, all
+ pending results are read. The commands <literal>\bind</literal>,
+ <literal>\bind_named</literal>, <literal>\close</literal>,
+ <literal>\parse</literal> and <literal>\syncpipeline</literal>
+ generate one result to get.
+ </para>
+
+ <para>
+ Example:
+<programlisting>
+\startpipeline
+SELECT 1 \bind \g
+\flushrequest
+\getresults
+\endpipeline
+</programlisting>
+ </para>
+
+ </listitem>
+ </varlistentry>
+
<varlistentry id="app-psql-meta-command-t-lc">
<term><literal>\t</literal></term>
PQExpBuffer query_buf);
static backslashResult exec_command_endif(PsqlScanState scan_state, ConditionalStack cstack,
PQExpBuffer query_buf);
+static backslashResult exec_command_endpipeline(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_encoding(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_errverbose(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_f(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_flush(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_flushrequest(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_g(PsqlScanState scan_state, bool active_branch,
const char *cmd);
static backslashResult process_command_g_options(char *first_option,
static backslashResult exec_command_getenv(PsqlScanState scan_state, bool active_branch,
const char *cmd);
static backslashResult exec_command_gexec(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_getresults(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_gset(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_help(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_html(PsqlScanState scan_state, bool active_branch);
const char *cmd);
static backslashResult exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
const char *cmd, bool is_func);
+static backslashResult exec_command_startpipeline(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_t(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_T(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_timing(PsqlScanState scan_state, bool active_branch);
status = exec_command_else(scan_state, cstack, query_buf);
else if (strcmp(cmd, "endif") == 0)
status = exec_command_endif(scan_state, cstack, query_buf);
+ else if (strcmp(cmd, "endpipeline") == 0)
+ status = exec_command_endpipeline(scan_state, active_branch);
else if (strcmp(cmd, "encoding") == 0)
status = exec_command_encoding(scan_state, active_branch);
else if (strcmp(cmd, "errverbose") == 0)
status = exec_command_errverbose(scan_state, active_branch);
else if (strcmp(cmd, "f") == 0)
status = exec_command_f(scan_state, active_branch);
+ else if (strcmp(cmd, "flush") == 0)
+ status = exec_command_flush(scan_state, active_branch);
+ else if (strcmp(cmd, "flushrequest") == 0)
+ status = exec_command_flushrequest(scan_state, active_branch);
else if (strcmp(cmd, "g") == 0 || strcmp(cmd, "gx") == 0)
status = exec_command_g(scan_state, active_branch, cmd);
else if (strcmp(cmd, "gdesc") == 0)
status = exec_command_gdesc(scan_state, active_branch);
else if (strcmp(cmd, "getenv") == 0)
status = exec_command_getenv(scan_state, active_branch, cmd);
+ else if (strcmp(cmd, "getresults") == 0)
+ status = exec_command_getresults(scan_state, active_branch);
else if (strcmp(cmd, "gexec") == 0)
status = exec_command_gexec(scan_state, active_branch);
else if (strcmp(cmd, "gset") == 0)
status = exec_command_sf_sv(scan_state, active_branch, cmd, true);
else if (strcmp(cmd, "sv") == 0 || strcmp(cmd, "sv+") == 0)
status = exec_command_sf_sv(scan_state, active_branch, cmd, false);
+ else if (strcmp(cmd, "startpipeline") == 0)
+ status = exec_command_startpipeline(scan_state, active_branch);
+ else if (strcmp(cmd, "syncpipeline") == 0)
+ status = exec_command_syncpipeline(scan_state, active_branch);
else if (strcmp(cmd, "t") == 0)
status = exec_command_t(scan_state, active_branch);
else if (strcmp(cmd, "T") == 0)
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
}
+/*
+ * \flush -- call PQflush() on the connection
+ */
+static backslashResult
+exec_command_flush(PsqlScanState scan_state, bool active_branch)
+{
+ backslashResult status = PSQL_CMD_SKIP_LINE;
+
+ if (active_branch)
+ {
+ pset.send_mode = PSQL_SEND_FLUSH;
+ status = PSQL_CMD_SEND;
+ }
+ else
+ ignore_slash_options(scan_state);
+
+ return status;
+}
+
+/*
+ * \flushrequest -- call PQsendFlushRequest() on the connection
+ */
+static backslashResult
+exec_command_flushrequest(PsqlScanState scan_state, bool active_branch)
+{
+ backslashResult status = PSQL_CMD_SKIP_LINE;
+
+ if (active_branch)
+ {
+ pset.send_mode = PSQL_SEND_FLUSH_REQUEST;
+ status = PSQL_CMD_SEND;
+ }
+ else
+ ignore_slash_options(scan_state);
+
+ return status;
+}
+
/*
* \g [(pset-option[=pset-value] ...)] [filename/shell-command]
* \gx [(pset-option[=pset-value] ...)] [filename/shell-command]
if (status == PSQL_CMD_SKIP_LINE && active_branch)
{
+ if (strcmp(cmd, "gx") == 0 &&
+ PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ pg_log_error("\\gx not allowed in pipeline mode");
+ clean_extended_state();
+ return PSQL_CMD_ERROR;
+ }
+
if (!fname)
pset.gfname = NULL;
else
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
}
+/*
+ * \getresults -- read results
+ */
+static backslashResult
+exec_command_getresults(PsqlScanState scan_state, bool active_branch)
+{
+ backslashResult status = PSQL_CMD_SKIP_LINE;
+
+ if (active_branch)
+ {
+ char *opt;
+ int num_results;
+
+ pset.send_mode = PSQL_SEND_GET_RESULTS;
+ status = PSQL_CMD_SEND;
+ opt = psql_scan_slash_option(scan_state, OT_NORMAL, NULL, false);
+
+ pset.requested_results = 0;
+ if (opt != NULL)
+ {
+ num_results = atoi(opt);
+ if (num_results < 0)
+ {
+ pg_log_error("\\getresults: invalid number of requested results");
+ return PSQL_CMD_SKIP_LINE;
+ }
+ pset.requested_results = num_results;
+ }
+ }
+ else
+ ignore_slash_options(scan_state);
+
+ return status;
+}
+
+
/*
* \gexec -- send query and execute each field of result
*/
if (active_branch)
{
+ if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ pg_log_error("\\gexec not allowed in pipeline mode");
+ clean_extended_state();
+ return PSQL_CMD_ERROR;
+ }
pset.gexec_flag = true;
status = PSQL_CMD_SEND;
}
char *prefix = psql_scan_slash_option(scan_state,
OT_NORMAL, NULL, false);
+ if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ pg_log_error("\\gset not allowed in pipeline mode");
+ clean_extended_state();
+ return PSQL_CMD_ERROR;
+ }
+
if (prefix)
pset.gset_prefix = prefix;
else
return status;
}
+/*
+ * \startpipeline -- enter pipeline mode
+ */
+static backslashResult
+exec_command_startpipeline(PsqlScanState scan_state, bool active_branch)
+{
+ backslashResult status = PSQL_CMD_SKIP_LINE;
+
+ if (active_branch)
+ {
+ pset.send_mode = PSQL_SEND_START_PIPELINE_MODE;
+ status = PSQL_CMD_SEND;
+ }
+ else
+ ignore_slash_options(scan_state);
+
+ return status;
+}
+
+/*
+ * \syncpipeline -- send a sync message to an active pipeline
+ */
+static backslashResult
+exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch)
+{
+ backslashResult status = PSQL_CMD_SKIP_LINE;
+
+ if (active_branch)
+ {
+ pset.send_mode = PSQL_SEND_PIPELINE_SYNC;
+ status = PSQL_CMD_SEND;
+ }
+ else
+ ignore_slash_options(scan_state);
+
+ return status;
+}
+
+/*
+ * \endpipeline -- end pipeline mode
+ */
+static backslashResult
+exec_command_endpipeline(PsqlScanState scan_state, bool active_branch)
+{
+ backslashResult status = PSQL_CMD_SKIP_LINE;
+
+ if (active_branch)
+ {
+ pset.send_mode = PSQL_SEND_END_PIPELINE_MODE;
+ status = PSQL_CMD_SEND;
+ }
+ else
+ ignore_slash_options(scan_state);
+
+ return status;
+}
+
/*
* \t -- turn off table headers and row count
*/
}
}
+/*
+ * Reset pset pipeline state
+ */
+static void
+pipelineReset(void)
+{
+ pset.piped_syncs = 0;
+ pset.piped_commands = 0;
+ pset.available_results = 0;
+ pset.requested_results = 0;
+}
+
/*
* setQFout
* -- handler for -o command line option and \o command
fprintf(stderr, _("The connection to the server was lost. Attempting reset: "));
PQreset(pset.db);
+ pipelineReset();
OK = ConnectionUp();
if (!OK)
{
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
+ case PGRES_PIPELINE_SYNC:
/* Fine, do nothing */
OK = true;
break;
+ case PGRES_PIPELINE_ABORTED:
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
success = true;
break;
+ case PGRES_PIPELINE_ABORTED:
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
return OK;
}
+/*
+ * Read and discard all results in an aborted pipeline.
+ *
+ * If a synchronisation point is found, we can stop discarding results as
+ * the pipeline will switch back to a clean state. If no synchronisation
+ * point is available, we need to stop when ther are no more pending
+ * results, otherwise, calling PQgetResult() would block.
+ */
+static PGresult *
+discardAbortedPipelineResults(void)
+{
+ for (;;)
+ {
+ PGresult *res = PQgetResult(pset.db);
+ ExecStatusType result_status = PQresultStatus(res);
+
+ if (result_status == PGRES_PIPELINE_SYNC)
+ {
+ /*
+ * Found a synchronisation point. The sync counter is decremented
+ * by the caller.
+ */
+ return res;
+ }
+ else if (res == NULL)
+ {
+ /* A query was processed, decrement the counters */
+ Assert(pset.available_results > 0);
+ Assert(pset.requested_results > 0);
+ pset.available_results--;
+ pset.requested_results--;
+ }
+
+ if (pset.requested_results == 0)
+ {
+ /* We have read all the requested results, leave */
+ return res;
+ }
+
+ if (pset.available_results == 0 && pset.piped_syncs == 0)
+ {
+ /*
+ * There are no more results to get and there is no
+ * synchronisation point to stop at. This will leave the pipeline
+ * in an aborted state.
+ */
+ return res;
+ }
+
+ /*
+ * An aborted pipeline will have either NULL results or results in an
+ * PGRES_PIPELINE_ABORTED status.
+ */
+ Assert(res == NULL || result_status == PGRES_PIPELINE_ABORTED);
+ PQclear(res);
+ }
+}
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
* input or output stream. In that event, we'll marshal data for the COPY.
*
* For other commands, the results are processed normally, depending on their
- * status.
+ * status and the status of a pipeline.
*
* When invoked from \watch, is_watch is true and min_rows is the value
* of that option, or 0 if it wasn't set.
bool timing = pset.timing;
bool success = false;
bool return_early = false;
+ bool end_pipeline = false;
instr_time before,
after;
PGresult *result;
{
case PSQL_SEND_EXTENDED_CLOSE:
success = PQsendClosePrepared(pset.db, pset.stmtName);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ pset.piped_commands++;
break;
case PSQL_SEND_EXTENDED_PARSE:
success = PQsendPrepare(pset.db, pset.stmtName, query, 0, NULL);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ pset.piped_commands++;
break;
case PSQL_SEND_EXTENDED_QUERY_PARAMS:
Assert(pset.stmtName == NULL);
pset.bind_nparams, NULL,
(const char *const *) pset.bind_params,
NULL, NULL, 0);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ pset.piped_commands++;
break;
case PSQL_SEND_EXTENDED_QUERY_PREPARED:
Assert(pset.stmtName != NULL);
pset.bind_nparams,
(const char *const *) pset.bind_params,
NULL, NULL, 0);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ pset.piped_commands++;
+ break;
+ case PSQL_SEND_START_PIPELINE_MODE:
+ success = PQenterPipelineMode(pset.db);
+ break;
+ case PSQL_SEND_END_PIPELINE_MODE:
+ success = PQpipelineSync(pset.db);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ /*
+ * End of the pipeline, all queued commands need to be
+ * processed.
+ */
+ end_pipeline = true;
+ pset.piped_syncs++;
+
+ /*
+ * The server will send a ReadyForQuery after a Sync is
+ * processed, flushing all the results back to the client.
+ */
+ pset.available_results += pset.piped_commands;
+ pset.piped_commands = 0;
+
+ /* We want to read all results */
+ pset.requested_results = pset.available_results + pset.piped_syncs;
+ }
+ break;
+ case PSQL_SEND_PIPELINE_SYNC:
+ success = PQsendPipelineSync(pset.db);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ pset.piped_syncs++;
+
+ /*
+ * The server will send a ReadyForQuery after a Sync is
+ * processed, flushing all the results back to the client.
+ */
+ pset.available_results += pset.piped_commands;
+ pset.piped_commands = 0;
+ }
+ break;
+ case PSQL_SEND_FLUSH:
+ success = PQflush(pset.db);
+ break;
+ case PSQL_SEND_FLUSH_REQUEST:
+ success = PQsendFlushRequest(pset.db);
+ if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ /*
+ * With the flush request, all commands in the pipeline are
+ * pushed and the server will flush the results back to the
+ * client, making them available.
+ */
+ pset.available_results += pset.piped_commands;
+ pset.piped_commands = 0;
+ }
+ break;
+ case PSQL_SEND_GET_RESULTS:
+ if (pset.available_results == 0 && pset.piped_syncs == 0)
+ {
+ /*
+ * If no sync or flush request were sent, PQgetResult() would
+ * block as there are no results available. Forbid any
+ * attempt to get pending results should we try to reach this
+ * state.
+ */
+ pg_log_info("No pending results to get");
+ success = false;
+ pset.requested_results = 0;
+ }
+ else
+ {
+ success = true;
+
+ /*
+ * Cap requested_results to the maximum number of known
+ * results.
+ */
+ if (pset.requested_results == 0 ||
+ pset.requested_results > (pset.available_results + pset.piped_syncs))
+ pset.requested_results = pset.available_results + pset.piped_syncs;
+ }
break;
case PSQL_SEND_QUERY:
success = PQsendQuery(pset.db, query);
return -1;
}
+ if (pset.requested_results == 0 && !end_pipeline &&
+ PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ /*
+ * We are in a pipeline and have not reached the pipeline end, or
+ * there was no request to read pipeline results, exit.
+ */
+ return 1;
+ }
+
/*
* Fetch the result in chunks if FETCH_COUNT is set, except when:
*
{
ExecStatusType result_status;
bool is_chunked_result = false;
- PGresult *next_result;
+ PGresult *next_result = NULL;
bool last;
if (!AcceptResult(result, false))
ClearOrSaveResult(result);
success = false;
+ if (result_status == PGRES_PIPELINE_ABORTED)
+ pg_log_info("Pipeline aborted, command did not run");
+
/*
* switch to next result
*/
*/
result = NULL;
}
+ else if ((end_pipeline || pset.requested_results > 0)
+ && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+ {
+ /*
+ * Error within a pipeline. All commands are aborted until
+ * the next synchronisation point. We need to consume all the
+ * results until this synchronisation point, or stop when
+ * there are no more result to discard.
+ *
+ * Checking the pipeline status is necessary for the case
+ * where the connection was reset. The new connection is not
+ * in any kind of pipeline state and thus has no result to
+ * discard.
+ */
+ result = discardAbortedPipelineResults();
+ }
else
result = PQgetResult(pset.db);
}
}
+ if (result_status == PGRES_PIPELINE_SYNC)
+ {
+ Assert(pset.piped_syncs > 0);
+
+ /*
+ * Sync response, decrease the sync and requested_results
+ * counters.
+ */
+ pset.piped_syncs--;
+ pset.requested_results--;
+
+ /*
+ * After a synchronisation point, reset success state to print
+ * possible successful results that will be processed after this.
+ */
+ success = true;
+
+ /*
+ * If all syncs were processed and pipeline end was requested,
+ * exit pipeline mode.
+ */
+ if (end_pipeline && pset.piped_syncs == 0)
+ success &= PQexitPipelineMode(pset.db);
+ }
+ else if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF &&
+ result_status != PGRES_PIPELINE_SYNC)
+ {
+ /*
+ * In a pipeline with a non-sync response? Decrease the result
+ * counters.
+ */
+ pset.available_results--;
+ pset.requested_results--;
+ }
+
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
* to process. We need to do that to check whether this is the last.
*/
- next_result = PQgetResult(pset.db);
+ if (PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
+ next_result = PQgetResult(pset.db);
+ else
+ {
+ /*
+ * In pipeline mode, a NULL result indicates the end of the
+ * current query being processed. Call PQgetResult() once to
+ * consume this state.
+ */
+ if (result_status != PGRES_PIPELINE_SYNC)
+ {
+ next_result = PQgetResult(pset.db);
+ Assert(next_result == NULL);
+ }
+
+ /* Now, we can get the next result in the pipeline. */
+ if (pset.requested_results > 0)
+ next_result = PQgetResult(pset.db);
+ }
+
last = (next_result == NULL);
/*
*elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
}
- /* this may or may not print something depending on settings */
- if (result != NULL)
+ /*
+ * This may or may not print something depending on settings.
+ *
+ * A pipeline sync will have a non-NULL result but does not have
+ * anything to print, thus ignore results in this case.
+ */
+ if (result != NULL && result_status != PGRES_PIPELINE_SYNC)
{
/*
* If results need to be printed into the file specified by \g,
ClearOrSaveResult(result);
result = next_result;
- if (cancel_pressed)
+ if (cancel_pressed && PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
{
- /* drop this next result, as well as any others not yet read */
+ /*
+ * Outside of a pipeline, drop the next result, as well as any
+ * others not yet read.
+ *
+ * Within a pipeline, we can let the outer loop handle this as an
+ * aborted pipeline, which will discard then all the results.
+ */
ClearOrSaveResult(result);
ClearOrSaveAllResults();
break;
/* close \g file if we opened it */
CloseGOutput(gfile_fout, gfile_is_pipe);
+ if (end_pipeline)
+ {
+ /* after a pipeline is processed, pipeline piped_syncs should be 0 */
+ Assert(pset.piped_syncs == 0);
+ /* all commands have been processed */
+ Assert(pset.piped_commands == 0);
+ /* all results were read */
+ Assert(pset.available_results == 0);
+ }
+ Assert(pset.requested_results == 0);
+
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
return -1;
pset.bind_params = NULL;
break;
case PSQL_SEND_QUERY:
+ case PSQL_SEND_START_PIPELINE_MODE: /* \startpipeline */
+ case PSQL_SEND_END_PIPELINE_MODE: /* \endpipeline */
+ case PSQL_SEND_PIPELINE_SYNC: /* \syncpipeline */
+ case PSQL_SEND_FLUSH: /* \flush */
+ case PSQL_SEND_GET_RESULTS: /* \getresults */
+ case PSQL_SEND_FLUSH_REQUEST: /* \flushrequest */
break;
}
HELP0(" \\close STMT_NAME close an existing prepared statement\n");
HELP0(" \\copyright show PostgreSQL usage and distribution terms\n");
HELP0(" \\crosstabview [COLUMNS] execute query and display result in crosstab\n");
+ HELP0(" \\endpipeline exit pipeline mode\n");
HELP0(" \\errverbose show most recent error message at maximum verbosity\n");
+ HELP0(" \\flush push unsent data to the server\n");
+ HELP0(" \\flushrequest send a flushrequest command\n");
HELP0(" \\g [(OPTIONS)] [FILE] execute query (and send result to file or |pipe);\n"
" \\g with no arguments is equivalent to a semicolon\n");
HELP0(" \\gdesc describe result of query, without executing it\n");
+ HELP0(" \\getresults [NUM_RES] read NUM_RES pending results. All pending results are\n"
+ " read if no argument is provided\n");
HELP0(" \\gexec execute query, then execute each value in its result\n");
HELP0(" \\gset [PREFIX] execute query and store result in psql variables\n");
HELP0(" \\gx [(OPTIONS)] [FILE] as \\g, but forces expanded output mode\n");
HELP0(" \\parse STMT_NAME create a prepared statement\n");
HELP0(" \\q quit psql\n");
+ HELP0(" \\startpipeline enter pipeline mode\n");
+ HELP0(" \\syncpipeline add a synchronisation point to an ongoing pipeline\n");
HELP0(" \\watch [[i=]SEC] [c=N] [m=MIN]\n"
" execute query every SEC seconds, up to N times,\n"
" stop if less than MIN rows are returned\n");
PSQL_SEND_EXTENDED_PARSE,
PSQL_SEND_EXTENDED_QUERY_PARAMS,
PSQL_SEND_EXTENDED_QUERY_PREPARED,
+ PSQL_SEND_PIPELINE_SYNC,
+ PSQL_SEND_START_PIPELINE_MODE,
+ PSQL_SEND_END_PIPELINE_MODE,
+ PSQL_SEND_FLUSH,
+ PSQL_SEND_FLUSH_REQUEST,
+ PSQL_SEND_GET_RESULTS,
} PSQL_SEND_MODE;
typedef enum
char **bind_params; /* parameters for extended query protocol call */
char *stmtName; /* prepared statement name used for extended
* query protocol commands */
+ int piped_commands; /* number of piped commands */
+ int piped_syncs; /* number of piped syncs */
+ int available_results; /* number of results available to get */
+ int requested_results; /* number of requested results, including
+ * sync messages. Used to read a limited
+ * subset of the available_results. */
bool crosstab_flag; /* one-shot request to crosstab result */
char *ctv_args[4]; /* \crosstabview arguments */
"\\drds", "\\drg", "\\dRs", "\\dRp", "\\ds",
"\\dt", "\\dT", "\\dv", "\\du", "\\dx", "\\dX", "\\dy",
"\\echo", "\\edit", "\\ef", "\\elif", "\\else", "\\encoding",
- "\\endif", "\\errverbose", "\\ev",
- "\\f",
- "\\g", "\\gdesc", "\\getenv", "\\gexec", "\\gset", "\\gx",
+ "\\endif", "\\endpipeline", "\\errverbose", "\\ev",
+ "\\f", "\\flush", "\\flushrequest",
+ "\\g", "\\gdesc", "\\getenv", "\\getresults", "\\gexec", "\\gset", "\\gx",
"\\help", "\\html",
"\\if", "\\include", "\\include_relative", "\\ir",
"\\list", "\\lo_import", "\\lo_export", "\\lo_list", "\\lo_unlink",
"\\parse", "\\password", "\\print", "\\prompt", "\\pset",
"\\qecho", "\\quit",
"\\reset",
- "\\s", "\\set", "\\setenv", "\\sf", "\\sv",
+ "\\s", "\\set", "\\setenv", "\\sf", "\\startpipeline", "\\sv", "\\syncpipeline",
"\\t", "\\T", "\\timing",
"\\unset",
"\\x",
\echo arg1 arg2 arg3 arg4 arg5
\echo arg1
\encoding arg1
+ \endpipeline
\errverbose
\f arg1
+ \flush
+ \flushrequest
\g arg1
\gx arg1
\gexec
+ \getresults
SELECT 1 AS one \gset
\h
\?
\setenv arg1 arg2
\sf whole_line
\sv whole_line
+ \startpipeline
+ \syncpipeline
\t arg1
\T arg1
\timing arg1
--- /dev/null
+--
+-- Tests using psql pipelining
+--
+CREATE TABLE psql_pipeline(a INTEGER PRIMARY KEY, s TEXT);
+-- Single query
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+-- Multiple queries
+\startpipeline
+SELECT $1 \bind 'val1' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+ ?column? | ?column?
+----------+----------
+ val2 | val3
+(1 row)
+
+ ?column? | ?column?
+----------+----------
+ val2 | val3
+(1 row)
+
+-- Test \flush
+\startpipeline
+\flush
+SELECT $1 \bind 'val1' \g
+\flush
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+ ?column? | ?column?
+----------+----------
+ val2 | val3
+(1 row)
+
+ ?column? | ?column?
+----------+----------
+ val2 | val3
+(1 row)
+
+-- Send multiple syncs
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\syncpipeline
+\syncpipeline
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\syncpipeline
+SELECT $1, $2 \bind 'val4' 'val5' \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+ ?column? | ?column?
+----------+----------
+ val2 | val3
+(1 row)
+
+ ?column? | ?column?
+----------+----------
+ val4 | val5
+(1 row)
+
+-- \startpipeline should not have any effect if already in a pipeline.
+\startpipeline
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+-- Convert an implicit transaction block to an explicit transaction block.
+\startpipeline
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 2 \g
+ROLLBACK \bind \g
+\endpipeline
+-- Multiple explicit transactions
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+COMMIT \bind \g
+\endpipeline
+-- COPY FROM STDIN
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+-- COPY FROM STDIN with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\flushrequest
+\getresults
+ ?column?
+----------
+ val1
+(1 row)
+
+message type 0x5a arrived from server while idle
+\endpipeline
+-- COPY FROM STDIN with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\syncpipeline
+\getresults
+ ?column?
+----------
+ val1
+(1 row)
+
+\endpipeline
+-- COPY TO STDOUT
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+1 \N
+2 test2
+3 test3
+4 test4
+-- COPY TO STDOUT with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\flushrequest
+\getresults
+ ?column?
+----------
+ val1
+(1 row)
+
+1 \N
+2 test2
+3 test3
+4 test4
+\endpipeline
+-- COPY TO STDOUT with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\syncpipeline
+\getresults
+ ?column?
+----------
+ val1
+(1 row)
+
+1 \N
+2 test2
+3 test3
+4 test4
+\endpipeline
+-- Use \parse and \bind_named
+\startpipeline
+SELECT $1 \parse ''
+SELECT $1, $2 \parse ''
+SELECT $2 \parse pipeline_1
+\bind_named '' 1 2 \g
+\bind_named pipeline_1 2 \g
+\endpipeline
+ERROR: could not determine data type of parameter $1
+-- \getresults displays all results preceding a \flushrequest.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+ ?column?
+----------
+ 1
+(1 row)
+
+ ?column?
+----------
+ 2
+(1 row)
+
+\endpipeline
+-- \getresults displays all results preceding a \syncpipeline.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\syncpipeline
+\getresults
+ ?column?
+----------
+ 1
+(1 row)
+
+ ?column?
+----------
+ 2
+(1 row)
+
+\endpipeline
+-- \getresults immediately returns if there is no result to fetch.
+\startpipeline
+\getresults
+No pending results to get
+SELECT $1 \bind 2 \g
+\getresults
+No pending results to get
+\flushrequest
+\endpipeline
+ ?column?
+----------
+ 2
+(1 row)
+
+\getresults
+No pending results to get
+-- \getresults only fetches results preceding a \flushrequest.
+\startpipeline
+SELECT $1 \bind 2 \g
+\flushrequest
+SELECT $1 \bind 2 \g
+\getresults
+ ?column?
+----------
+ 2
+(1 row)
+
+\endpipeline
+ ?column?
+----------
+ 2
+(1 row)
+
+-- \getresults only fetches results preceding a \syncpipeline.
+\startpipeline
+SELECT $1 \bind 2 \g
+\syncpipeline
+SELECT $1 \bind 2 \g
+\getresults
+ ?column?
+----------
+ 2
+(1 row)
+
+\endpipeline
+ ?column?
+----------
+ 2
+(1 row)
+
+-- Use pipeline with chunked results for both \getresults and \endpipeline.
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+ ?column?
+----------
+ 2
+(1 row)
+
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column?
+----------
+ 2
+(1 row)
+
+\unset FETCH_COUNT
+-- \getresults with specific number of requested results.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 1
+ ?column?
+----------
+ 1
+(1 row)
+
+SELECT $1 \bind 4 \g
+\getresults 3
+ ?column?
+----------
+ 2
+(1 row)
+
+ ?column?
+----------
+ 3
+(1 row)
+
+\endpipeline
+ ?column?
+----------
+ 4
+(1 row)
+
+-- \syncpipeline count as one command to fetch for \getresults.
+\startpipeline
+\syncpipeline
+\syncpipeline
+SELECT $1 \bind 1 \g
+\flushrequest
+\getresults 2
+\getresults 1
+ ?column?
+----------
+ 1
+(1 row)
+
+\endpipeline
+-- \getresults 0 should get all the results.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 0
+ ?column?
+----------
+ 1
+(1 row)
+
+ ?column?
+----------
+ 2
+(1 row)
+
+ ?column?
+----------
+ 3
+(1 row)
+
+\endpipeline
+--
+-- Pipeline errors
+--
+-- \endpipeline outside of pipeline should fail
+\endpipeline
+cannot send pipeline when not in pipeline mode
+-- Query using simple protocol should not be sent and should leave the
+-- pipeline usable.
+\startpipeline
+SELECT 1;
+PQsendQuery not allowed in pipeline mode
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column?
+----------
+ val1
+(1 row)
+
+-- After an aborted pipeline, commands after a \syncpipeline should be
+-- displayed.
+\startpipeline
+SELECT $1 \bind \g
+\syncpipeline
+SELECT $1 \bind 1 \g
+\endpipeline
+ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
+ ?column?
+----------
+ 1
+(1 row)
+
+-- For an incorrect number of parameters, the pipeline is aborted and
+-- the following queries will not be executed.
+\startpipeline
+SELECT \bind 'val1' \g
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ERROR: bind message supplies 1 parameters, but prepared statement "" requires 0
+-- An explicit transaction with an error needs to be rollbacked after
+-- the pipeline.
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+\endpipeline
+ERROR: duplicate key value violates unique constraint "psql_pipeline_pkey"
+DETAIL: Key (a)=(1) already exists.
+ROLLBACK;
+-- \watch sends a simple query, something not allowed within a pipeline.
+\startpipeline
+SELECT \bind \g
+\watch 1
+PQsendQuery not allowed in pipeline mode
+
+\endpipeline
+--
+(1 row)
+
+-- \gdesc should fail as synchronous commands are not allowed in a pipeline,
+-- and the pipeline should still be usable.
+\startpipeline
+SELECT $1 \bind 1 \gdesc
+synchronous command execution functions are not allowed in pipeline mode
+SELECT $1 \bind 1 \g
+\endpipeline
+ ?column?
+----------
+ 1
+(1 row)
+
+-- \gset is not allowed in a pipeline, pipeline should still be usable.
+\startpipeline
+SELECT $1 as i, $2 as j \parse ''
+SELECT $1 as k, $2 as l \parse 'second'
+\bind_named '' 1 2 \gset
+\gset not allowed in pipeline mode
+\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
+\gset not allowed in pipeline mode
+\bind_named '' 1 2 \g
+\endpipeline
+ i | j
+---+---
+ 1 | 2
+(1 row)
+
+-- \gx is not allowed, pipeline should still be usable.
+\startpipeline
+SELECT $1 \bind 1 \gx
+\gx not allowed in pipeline mode
+\reset
+SELECT $1 \bind 1 \g
+\endpipeline
+ ?column?
+----------
+ 1
+(1 row)
+
+-- \gx warning should be emitted in an aborted pipeline, with
+-- pipeline still usable.
+\startpipeline
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
+SELECT $1 \bind 1 \gx
+\gx not allowed in pipeline mode
+\endpipeline
+-- \gexec is not allowed, pipeline should still be usable.
+\startpipeline
+SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
+\bind_named insert_stmt \gexec
+\gexec not allowed in pipeline mode
+\bind_named insert_stmt \g
+SELECT COUNT(*) FROM psql_pipeline \bind \g
+\endpipeline
+ ?column?
+------------------------------------------------------------
+ INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)
+(1 row)
+
+ count
+-------
+ 4
+(1 row)
+
+-- After an error, pipeline is aborted and requires \syncpipeline to be
+-- reusable.
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
+-- Pipeline is aborted.
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+-- Sync allows pipeline to recover.
+\syncpipeline
+\getresults
+Pipeline aborted, command did not run
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+ ?column?
+----------
+ 1
+(1 row)
+
+ ?column?
+----------
+ 1
+(1 row)
+
+\endpipeline
+-- In an aborted pipeline, \getresults 1 aborts commands one at a time.
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\syncpipeline
+\getresults 1
+ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
+\getresults 1
+Pipeline aborted, command did not run
+\getresults 1
+Pipeline aborted, command did not run
+\getresults 1
+Pipeline aborted, command did not run
+\getresults 1
+\endpipeline
+-- Test chunked results with an aborted pipeline.
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
+SELECT $1 \bind \g
+\endpipeline
+fetching results in chunked mode failed
+Pipeline aborted, command did not run
+\unset FETCH_COUNT
+-- \getresults returns an error when an incorrect number is provided.
+\startpipeline
+\getresults -1
+\getresults: invalid number of requested results
+\endpipeline
+-- \getresults when there is no result should not impact the next
+-- query executed.
+\getresults 1
+No pending results to get
+select 1;
+ ?column?
+----------
+ 1
+(1 row)
+
+-- Error messages accumulate and are repeated.
+\startpipeline
+SELECT 1 \bind \g
+SELECT 1;
+PQsendQuery not allowed in pipeline mode
+SELECT 1;
+PQsendQuery not allowed in pipeline mode
+PQsendQuery not allowed in pipeline mode
+\endpipeline
+ ?column?
+----------
+ 1
+(1 row)
+
+-- Clean up
+DROP TABLE psql_pipeline;
test: create_table_like alter_generic alter_operator misc async dbsize merge misc_functions sysviews tsrf tid tidscan tidrangescan collate.utf8 collate.icu.utf8 incremental_sort create_role without_overlaps generated_virtual
# collate.linux.utf8 and collate.icu.utf8 tests cannot be run in parallel with each other
-test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8 collate.windows.win1252
+test: rules psql psql_crosstab psql_pipeline amutils stats_ext collate.linux.utf8 collate.windows.win1252
# ----------
# Run these alone so they don't run out of parallel workers
\echo arg1 arg2 arg3 arg4 arg5
\echo arg1
\encoding arg1
+ \endpipeline
\errverbose
\f arg1
+ \flush
+ \flushrequest
\g arg1
\gx arg1
\gexec
+ \getresults
SELECT 1 AS one \gset
\h
\?
\setenv arg1 arg2
\sf whole_line
\sv whole_line
+ \startpipeline
+ \syncpipeline
\t arg1
\T arg1
\timing arg1
--- /dev/null
+--
+-- Tests using psql pipelining
+--
+
+CREATE TABLE psql_pipeline(a INTEGER PRIMARY KEY, s TEXT);
+
+-- Single query
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- Multiple queries
+\startpipeline
+SELECT $1 \bind 'val1' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+
+-- Test \flush
+\startpipeline
+\flush
+SELECT $1 \bind 'val1' \g
+\flush
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+
+-- Send multiple syncs
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\syncpipeline
+\syncpipeline
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\syncpipeline
+SELECT $1, $2 \bind 'val4' 'val5' \g
+\endpipeline
+
+-- \startpipeline should not have any effect if already in a pipeline.
+\startpipeline
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- Convert an implicit transaction block to an explicit transaction block.
+\startpipeline
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 2 \g
+ROLLBACK \bind \g
+\endpipeline
+
+-- Multiple explicit transactions
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+COMMIT \bind \g
+\endpipeline
+
+-- COPY FROM STDIN
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\endpipeline
+2 test2
+\.
+
+-- COPY FROM STDIN with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\flushrequest
+\getresults
+3 test3
+\.
+\endpipeline
+
+-- COPY FROM STDIN with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\syncpipeline
+\getresults
+4 test4
+\.
+\endpipeline
+
+-- COPY TO STDOUT
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\endpipeline
+
+-- COPY TO STDOUT with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\flushrequest
+\getresults
+\endpipeline
+
+-- COPY TO STDOUT with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\syncpipeline
+\getresults
+\endpipeline
+
+-- Use \parse and \bind_named
+\startpipeline
+SELECT $1 \parse ''
+SELECT $1, $2 \parse ''
+SELECT $2 \parse pipeline_1
+\bind_named '' 1 2 \g
+\bind_named pipeline_1 2 \g
+\endpipeline
+
+-- \getresults displays all results preceding a \flushrequest.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+\endpipeline
+
+-- \getresults displays all results preceding a \syncpipeline.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\syncpipeline
+\getresults
+\endpipeline
+
+-- \getresults immediately returns if there is no result to fetch.
+\startpipeline
+\getresults
+SELECT $1 \bind 2 \g
+\getresults
+\flushrequest
+\endpipeline
+\getresults
+
+-- \getresults only fetches results preceding a \flushrequest.
+\startpipeline
+SELECT $1 \bind 2 \g
+\flushrequest
+SELECT $1 \bind 2 \g
+\getresults
+\endpipeline
+
+-- \getresults only fetches results preceding a \syncpipeline.
+\startpipeline
+SELECT $1 \bind 2 \g
+\syncpipeline
+SELECT $1 \bind 2 \g
+\getresults
+\endpipeline
+
+-- Use pipeline with chunked results for both \getresults and \endpipeline.
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+SELECT $1 \bind 2 \g
+\endpipeline
+\unset FETCH_COUNT
+
+-- \getresults with specific number of requested results.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 1
+SELECT $1 \bind 4 \g
+\getresults 3
+\endpipeline
+
+-- \syncpipeline count as one command to fetch for \getresults.
+\startpipeline
+\syncpipeline
+\syncpipeline
+SELECT $1 \bind 1 \g
+\flushrequest
+\getresults 2
+\getresults 1
+\endpipeline
+
+-- \getresults 0 should get all the results.
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 0
+\endpipeline
+
+--
+-- Pipeline errors
+--
+
+-- \endpipeline outside of pipeline should fail
+\endpipeline
+
+-- Query using simple protocol should not be sent and should leave the
+-- pipeline usable.
+\startpipeline
+SELECT 1;
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- After an aborted pipeline, commands after a \syncpipeline should be
+-- displayed.
+\startpipeline
+SELECT $1 \bind \g
+\syncpipeline
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- For an incorrect number of parameters, the pipeline is aborted and
+-- the following queries will not be executed.
+\startpipeline
+SELECT \bind 'val1' \g
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- An explicit transaction with an error needs to be rollbacked after
+-- the pipeline.
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+\endpipeline
+ROLLBACK;
+
+-- \watch sends a simple query, something not allowed within a pipeline.
+\startpipeline
+SELECT \bind \g
+\watch 1
+\endpipeline
+
+-- \gdesc should fail as synchronous commands are not allowed in a pipeline,
+-- and the pipeline should still be usable.
+\startpipeline
+SELECT $1 \bind 1 \gdesc
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- \gset is not allowed in a pipeline, pipeline should still be usable.
+\startpipeline
+SELECT $1 as i, $2 as j \parse ''
+SELECT $1 as k, $2 as l \parse 'second'
+\bind_named '' 1 2 \gset
+\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
+\bind_named '' 1 2 \g
+\endpipeline
+
+-- \gx is not allowed, pipeline should still be usable.
+\startpipeline
+SELECT $1 \bind 1 \gx
+\reset
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- \gx warning should be emitted in an aborted pipeline, with
+-- pipeline still usable.
+\startpipeline
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+SELECT $1 \bind 1 \gx
+\endpipeline
+
+-- \gexec is not allowed, pipeline should still be usable.
+\startpipeline
+SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
+\bind_named insert_stmt \gexec
+\bind_named insert_stmt \g
+SELECT COUNT(*) FROM psql_pipeline \bind \g
+\endpipeline
+
+-- After an error, pipeline is aborted and requires \syncpipeline to be
+-- reusable.
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+-- Pipeline is aborted.
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+-- Sync allows pipeline to recover.
+\syncpipeline
+\getresults
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+\endpipeline
+
+-- In an aborted pipeline, \getresults 1 aborts commands one at a time.
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\syncpipeline
+\getresults 1
+\getresults 1
+\getresults 1
+\getresults 1
+\getresults 1
+\endpipeline
+
+-- Test chunked results with an aborted pipeline.
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+SELECT $1 \bind \g
+\endpipeline
+\unset FETCH_COUNT
+
+-- \getresults returns an error when an incorrect number is provided.
+\startpipeline
+\getresults -1
+\endpipeline
+
+-- \getresults when there is no result should not impact the next
+-- query executed.
+\getresults 1
+select 1;
+
+-- Error messages accumulate and are repeated.
+\startpipeline
+SELECT 1 \bind \g
+SELECT 1;
+SELECT 1;
+\endpipeline
+
+-- Clean up
+DROP TABLE psql_pipeline;