Skip to content

Commit d3c5f37

Browse files
committed
Make dblink interruptible, via new libpqsrv APIs.
This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. Use them in postgres_fdw, replacing a local implementation from which the libpqsrv implementation derives. This is a bug fix for dblink. Code inspection identified the bug at least thirteen years ago, but user complaints have not appeared. Hence, no back-patch for now. Discussion: https://postgr.es/m/[email protected]
1 parent 0efc831 commit d3c5f37

File tree

8 files changed

+180
-95
lines changed

8 files changed

+180
-95
lines changed

contrib/dblink/dblink.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "utils/memutils.h"
6262
#include "utils/rel.h"
6363
#include "utils/varlena.h"
64+
#include "utils/wait_event.h"
6465

6566
PG_MODULE_MAGIC;
6667

@@ -133,6 +134,7 @@ static HTAB *remoteConnHash = NULL;
133134
/* custom wait event values, retrieved from shared memory */
134135
static uint32 dblink_we_connect = 0;
135136
static uint32 dblink_we_get_conn = 0;
137+
static uint32 dblink_we_get_result = 0;
136138

137139
/*
138140
* Following is list that holds multiple remote connections.
@@ -252,6 +254,9 @@ dblink_init(void)
252254
{
253255
if (!pconn)
254256
{
257+
if (dblink_we_get_result == 0)
258+
dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
259+
255260
pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
256261
pconn->conn = NULL;
257262
pconn->openCursorCount = 0;
@@ -442,7 +447,7 @@ dblink_open(PG_FUNCTION_ARGS)
442447
/* If we are not in a transaction, start one */
443448
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
444449
{
445-
res = PQexec(conn, "BEGIN");
450+
res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
446451
if (PQresultStatus(res) != PGRES_COMMAND_OK)
447452
dblink_res_internalerror(conn, res, "begin error");
448453
PQclear(res);
@@ -461,7 +466,7 @@ dblink_open(PG_FUNCTION_ARGS)
461466
(rconn->openCursorCount)++;
462467

463468
appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
464-
res = PQexec(conn, buf.data);
469+
res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
465470
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
466471
{
467472
dblink_res_error(conn, conname, res, fail,
@@ -530,7 +535,7 @@ dblink_close(PG_FUNCTION_ARGS)
530535
appendStringInfo(&buf, "CLOSE %s", curname);
531536

532537
/* close the cursor */
533-
res = PQexec(conn, buf.data);
538+
res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
534539
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
535540
{
536541
dblink_res_error(conn, conname, res, fail,
@@ -550,7 +555,7 @@ dblink_close(PG_FUNCTION_ARGS)
550555
{
551556
rconn->newXactForCursor = false;
552557

553-
res = PQexec(conn, "COMMIT");
558+
res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
554559
if (PQresultStatus(res) != PGRES_COMMAND_OK)
555560
dblink_res_internalerror(conn, res, "commit error");
556561
PQclear(res);
@@ -632,7 +637,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
632637
* PGresult will be long-lived even though we are still in a short-lived
633638
* memory context.
634639
*/
635-
res = PQexec(conn, buf.data);
640+
res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
636641
if (!res ||
637642
(PQresultStatus(res) != PGRES_COMMAND_OK &&
638643
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -780,7 +785,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
780785
else
781786
{
782787
/* async result retrieval, do it the old way */
783-
PGresult *res = PQgetResult(conn);
788+
PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);
784789

785790
/* NULL means we're all done with the async results */
786791
if (res)
@@ -1088,7 +1093,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
10881093
PQclear(sinfo.last_res);
10891094
PQclear(sinfo.cur_res);
10901095
/* and clear out any pending data in libpq */
1091-
while ((res = PQgetResult(conn)) != NULL)
1096+
while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
1097+
NULL)
10921098
PQclear(res);
10931099
PG_RE_THROW();
10941100
}
@@ -1115,7 +1121,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
11151121
{
11161122
CHECK_FOR_INTERRUPTS();
11171123

1118-
sinfo->cur_res = PQgetResult(conn);
1124+
sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
11191125
if (!sinfo->cur_res)
11201126
break;
11211127

@@ -1443,7 +1449,7 @@ dblink_exec(PG_FUNCTION_ARGS)
14431449
if (!conn)
14441450
dblink_conn_not_avail(conname);
14451451

1446-
res = PQexec(conn, sql);
1452+
res = libpqsrv_exec(conn, sql, dblink_we_get_result);
14471453
if (!res ||
14481454
(PQresultStatus(res) != PGRES_COMMAND_OK &&
14491455
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2739,8 +2745,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
27392745

27402746
/*
27412747
* If we don't get a message from the PGresult, try the PGconn. This is
2742-
* needed because for connection-level failures, PQexec may just return
2743-
* NULL, not a PGresult at all.
2748+
* needed because for connection-level failures, PQgetResult may just
2749+
* return NULL, not a PGresult at all.
27442750
*/
27452751
if (message_primary == NULL)
27462752
message_primary = pchomp(PQerrorMessage(conn));

contrib/postgres_fdw/connection.c

Lines changed: 17 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
187187
{
188188
HASHCTL ctl;
189189

190+
if (pgfdw_we_get_result == 0)
191+
pgfdw_we_get_result =
192+
WaitEventExtensionNew("PostgresFdwGetResult");
193+
190194
ctl.keysize = sizeof(ConnCacheKey);
191195
ctl.entrysize = sizeof(ConnCacheEntry);
192196
ConnectionHash = hash_create("postgres_fdw connections", 8,
@@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
716720
*/
717721
if (consume_input && !PQconsumeInput(conn))
718722
pgfdw_report_error(ERROR, NULL, conn, false, sql);
719-
res = pgfdw_get_result(conn, sql);
723+
res = pgfdw_get_result(conn);
720724
if (PQresultStatus(res) != PGRES_COMMAND_OK)
721725
pgfdw_report_error(ERROR, res, conn, true, sql);
722726
PQclear(res);
@@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn)
819823
/*
820824
* Submit a query and wait for the result.
821825
*
822-
* This function is interruptible by signals.
826+
* Since we don't use non-blocking mode, this can't process interrupts while
827+
* pushing the query text to the server. That risk is relatively small, so we
828+
* ignore that for now.
823829
*
824830
* Caller is responsible for the error handling on the result.
825831
*/
@@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
830836
if (state && state->pendingAreq)
831837
process_pending_request(state->pendingAreq);
832838

833-
/*
834-
* Submit a query. Since we don't use non-blocking mode, this also can
835-
* block. But its risk is relatively small, so we ignore that for now.
836-
*/
837839
if (!PQsendQuery(conn, query))
838-
pgfdw_report_error(ERROR, NULL, conn, false, query);
839-
840-
/* Wait for the result. */
841-
return pgfdw_get_result(conn, query);
840+
return NULL;
841+
return pgfdw_get_result(conn);
842842
}
843843

844844
/*
845-
* Wait for the result from a prior asynchronous execution function call.
846-
*
847-
* This function offers quick responsiveness by checking for any interruptions.
848-
*
849-
* This function emulates PQexec()'s behavior of returning the last result
850-
* when there are many.
845+
* Wrap libpqsrv_get_result_last(), adding wait event.
851846
*
852847
* Caller is responsible for the error handling on the result.
853848
*/
854849
PGresult *
855-
pgfdw_get_result(PGconn *conn, const char *query)
850+
pgfdw_get_result(PGconn *conn)
856851
{
857-
PGresult *volatile last_res = NULL;
858-
859-
/* In what follows, do not leak any PGresults on an error. */
860-
PG_TRY();
861-
{
862-
for (;;)
863-
{
864-
PGresult *res;
865-
866-
while (PQisBusy(conn))
867-
{
868-
int wc;
869-
870-
/* first time, allocate or get the custom wait event */
871-
if (pgfdw_we_get_result == 0)
872-
pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult");
873-
874-
/* Sleep until there's something to do */
875-
wc = WaitLatchOrSocket(MyLatch,
876-
WL_LATCH_SET | WL_SOCKET_READABLE |
877-
WL_EXIT_ON_PM_DEATH,
878-
PQsocket(conn),
879-
-1L, pgfdw_we_get_result);
880-
ResetLatch(MyLatch);
881-
882-
CHECK_FOR_INTERRUPTS();
883-
884-
/* Data available in socket? */
885-
if (wc & WL_SOCKET_READABLE)
886-
{
887-
if (!PQconsumeInput(conn))
888-
pgfdw_report_error(ERROR, NULL, conn, false, query);
889-
}
890-
}
891-
892-
res = PQgetResult(conn);
893-
if (res == NULL)
894-
break; /* query is complete */
895-
896-
PQclear(last_res);
897-
last_res = res;
898-
}
899-
}
900-
PG_CATCH();
901-
{
902-
PQclear(last_res);
903-
PG_RE_THROW();
904-
}
905-
PG_END_TRY();
906-
907-
return last_res;
852+
return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
908853
}
909854

910855
/*
@@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
945890

946891
/*
947892
* If we don't get a message from the PGresult, try the PGconn. This
948-
* is needed because for connection-level failures, PQexec may just
949-
* return NULL, not a PGresult at all.
893+
* is needed because for connection-level failures, PQgetResult may
894+
* just return NULL, not a PGresult at all.
950895
*/
951896
if (message_primary == NULL)
952897
message_primary = pchomp(PQerrorMessage(conn));
@@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
1046991
*/
1047992
if (entry->have_prep_stmt && entry->have_error)
1048993
{
1049-
res = PQexec(entry->conn, "DEALLOCATE ALL");
994+
res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
995+
NULL);
1050996
PQclear(res);
1051997
}
1052998
entry->have_prep_stmt = false;

contrib/postgres_fdw/deparse.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3815,7 +3815,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first,
38153815
* Print the representation of a parameter to be sent to the remote side.
38163816
*
38173817
* Note: we always label the Param's type explicitly rather than relying on
3818-
* transmitting a numeric type OID in PQexecParams(). This allows us to
3818+
* transmitting a numeric type OID in PQsendQueryParams(). This allows us to
38193819
* avoid assuming that types have the same OIDs on the remote side as they
38203820
* do locally --- they need only have the same names.
38213821
*/

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3760,7 +3760,7 @@ create_cursor(ForeignScanState *node)
37603760
* We don't use a PG_TRY block here, so be careful not to throw error
37613761
* without releasing the PGresult.
37623762
*/
3763-
res = pgfdw_get_result(conn, buf.data);
3763+
res = pgfdw_get_result(conn);
37643764
if (PQresultStatus(res) != PGRES_COMMAND_OK)
37653765
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
37663766
PQclear(res);
@@ -3810,7 +3810,7 @@ fetch_more_data(ForeignScanState *node)
38103810
* The query was already sent by an earlier call to
38113811
* fetch_more_data_begin. So now we just fetch the result.
38123812
*/
3813-
res = pgfdw_get_result(conn, fsstate->query);
3813+
res = pgfdw_get_result(conn);
38143814
/* On error, report the original query, not the FETCH. */
38153815
if (PQresultStatus(res) != PGRES_TUPLES_OK)
38163816
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -4159,7 +4159,7 @@ execute_foreign_modify(EState *estate,
41594159
* We don't use a PG_TRY block here, so be careful not to throw error
41604160
* without releasing the PGresult.
41614161
*/
4162-
res = pgfdw_get_result(fmstate->conn, fmstate->query);
4162+
res = pgfdw_get_result(fmstate->conn);
41634163
if (PQresultStatus(res) !=
41644164
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
41654165
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -4229,7 +4229,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
42294229
* We don't use a PG_TRY block here, so be careful not to throw error
42304230
* without releasing the PGresult.
42314231
*/
4232-
res = pgfdw_get_result(fmstate->conn, fmstate->query);
4232+
res = pgfdw_get_result(fmstate->conn);
42334233
if (PQresultStatus(res) != PGRES_COMMAND_OK)
42344234
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
42354235
PQclear(res);
@@ -4571,7 +4571,7 @@ execute_dml_stmt(ForeignScanState *node)
45714571
* We don't use a PG_TRY block here, so be careful not to throw error
45724572
* without releasing the PGresult.
45734573
*/
4574-
dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
4574+
dmstate->result = pgfdw_get_result(dmstate->conn);
45754575
if (PQresultStatus(dmstate->result) !=
45764576
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
45774577
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,

contrib/postgres_fdw/postgres_fdw.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ extern void ReleaseConnection(PGconn *conn);
162162
extern unsigned int GetCursorNumber(PGconn *conn);
163163
extern unsigned int GetPrepStmtNumber(PGconn *conn);
164164
extern void do_sql_command(PGconn *conn, const char *sql);
165-
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
165+
extern PGresult *pgfdw_get_result(PGconn *conn);
166166
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
167167
PgFdwConnState *state);
168168
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,

doc/src/sgml/dblink.sgml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@
3737
</para>
3838
</listitem>
3939
</varlistentry>
40+
41+
<varlistentry>
42+
<term><literal>DblinkGetResult</literal></term>
43+
<listitem>
44+
<para>
45+
Waiting to receive the results of a query from a remote server.
46+
</para>
47+
</listitem>
48+
</varlistentry>
4049
</variablelist>
4150

4251
<para>

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -653,12 +653,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
653653
* Send a query and wait for the results by using the asynchronous libpq
654654
* functions and socket readiness events.
655655
*
656-
* We must not use the regular blocking libpq functions like PQexec()
657-
* since they are uninterruptible by signals on some platforms, such as
658-
* Windows.
659-
*
660-
* The function is modeled on PQexec() in libpq, but only implements
661-
* those parts that are in use in the walreceiver api.
656+
* The function is modeled on libpqsrv_exec(), with the behavior difference
657+
* being that it calls ProcessWalRcvInterrupts(). As an optimization, it
658+
* skips try/catch, since all errors terminate the process.
662659
*
663660
* May return NULL, rather than an error result, on failure.
664661
*/

0 commit comments

Comments
 (0)