Skip to content

Commit 0c867b9

Browse files
committed
Use Postgres-XL approach with postgres_fdw extension
1 parent d2d5401 commit 0c867b9

File tree

7 files changed

+234
-118
lines changed

7 files changed

+234
-118
lines changed

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ PGFILEDESC = "Repeater"
77
MODULES = repeater
88
OBJS = repeater.o $(WIN32RES)
99

10-
PG_CPPFLAGS = -I$(libpq_srcdir)
10+
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/
11+
12+
PG_CPPFLAGS = -I$(libpq_srcdir) -I$(fdw_srcdir) -L$(fdw_srcdir)
1113
SHLIB_LINK_INTERNAL = $(libpq)
1214

1315
DATA_built = $(EXTENSION)--$(EXTVERSION).sql
@@ -17,10 +19,12 @@ PG_CONFIG = pg_config
1719
PGXS := $(shell $(PG_CONFIG) --pgxs)
1820
include $(PGXS)
1921
else
22+
EXTRA_INSTALL = contrib/postgres_fdw
2023
SHLIB_PREREQS = submake-libpq
2124
subdir = contrib/repeater
2225
top_builddir = ../..
2326
include $(top_builddir)/src/Makefile.global
27+
#include $(top_builddir)/contrib/postgres_fdw/Makefile
2428
include $(top_srcdir)/contrib/contrib-global.mk
2529
endif
2630

repeater.c

Lines changed: 143 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
3939
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
4040

4141
/* Remote instance parameters. */
42-
char *repeater_host_name;
43-
int repeater_port_number;
42+
char *remote_server_fdwname;
4443

4544
static bool ExtensionIsActivated = false;
4645
static PGconn *conn = NULL;
@@ -51,24 +50,11 @@ static PGconn *conn = NULL;
5150
void
5251
_PG_init(void)
5352
{
54-
DefineCustomStringVariable("repeater.host",
55-
"Remote host name for plan execution",
53+
DefineCustomStringVariable("repeater.fdwname",
54+
"Remote host fdw name",
5655
NULL,
57-
&repeater_host_name,
58-
"localhost",
59-
PGC_SIGHUP,
60-
GUC_NOT_IN_SAMPLE,
61-
NULL,
62-
NULL,
63-
NULL);
64-
65-
DefineCustomIntVariable("repeater.port",
66-
"Port number of remote instance",
67-
NULL,
68-
&repeater_port_number,
69-
5432,
70-
1,
71-
65565,
56+
&remote_server_fdwname,
57+
"remoteserv",
7258
PGC_SIGHUP,
7359
GUC_NOT_IN_SAMPLE,
7460
NULL,
@@ -86,26 +72,6 @@ _PG_init(void)
8672
ExecutorEnd_hook = HOOK_ExecEnd_injection;
8773
}
8874

89-
static PGconn*
90-
EstablishConnection(void)
91-
{
92-
char conninfo[1024];
93-
94-
if (conn != NULL)
95-
return conn;
96-
97-
/* Connect to slave and send it a query plan */
98-
sprintf(conninfo, "host=%s port=%d %c", repeater_host_name, repeater_port_number, '\0');
99-
conn = PQconnectdb(conninfo);
100-
101-
if (PQstatus(conn) == CONNECTION_BAD)
102-
elog(LOG, "Connection error. conninfo: %s", conninfo);
103-
else
104-
elog(LOG, "Connection established: host=%s, port=%d", repeater_host_name, repeater_port_number);
105-
106-
return conn;
107-
}
108-
10975
static bool
11076
ExtensionIsActive(void)
11177
{
@@ -122,6 +88,82 @@ ExtensionIsActive(void)
12288
return ExtensionIsActivated;
12389
}
12490

91+
#include "miscadmin.h"
92+
#include "pgstat.h"
93+
#include "storage/latch.h"
94+
95+
#include "foreign/foreign.h"
96+
#include "postgres_fdw.h"
97+
98+
static Oid serverid = InvalidOid;
99+
static UserMapping *user = NULL;
100+
101+
static bool
102+
pgfdw_cancel_query(PGconn *conn)
103+
{
104+
PGcancel *cancel;
105+
char errbuf[256];
106+
PGresult *result = NULL;
107+
108+
if ((cancel = PQgetCancel(conn)))
109+
{
110+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
111+
{
112+
printf("LAV: Cancel - ERROR\n");
113+
ereport(WARNING,
114+
(errcode(ERRCODE_CONNECTION_FAILURE),
115+
errmsg("could not send cancel request: %s",
116+
errbuf)));
117+
PQfreeCancel(cancel);
118+
return false;
119+
}
120+
else
121+
printf("LAV: Cancel - OK\n");
122+
123+
PQfreeCancel(cancel);
124+
}
125+
else
126+
printf("---ERROR---");
127+
128+
PQconsumeInput(conn);
129+
PQclear(result);
130+
131+
return true;
132+
}
133+
134+
static void
135+
cancelQueryIfNeeded(PGconn *conn, const char *query)
136+
{
137+
Assert(conn != NULL);
138+
Assert(query != NULL);
139+
140+
if (PQtransactionStatus(conn) != PQTRANS_IDLE)
141+
{
142+
PGresult *res;
143+
144+
printf("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n",
145+
PQstatus(conn),
146+
PQtransactionStatus(conn),
147+
PQerrorMessage(conn));
148+
149+
res = PQgetResult(conn);
150+
// printf("status AFTER result request=%d, txs: %d errmsg: %s, resstatus: %s\n",
151+
// PQstatus(conn),
152+
// PQtransactionStatus(conn),
153+
// PQerrorMessage(conn),
154+
// PQresStatus(PQresultStatus(res)));
155+
if (PQresultStatus(res) == PGRES_FATAL_ERROR)
156+
// {
157+
Assert(pgfdw_cancel_query(conn));
158+
// printf("TRY to CANCEL query. status=%d, txs: %d errmsg: %s, resstatus: %s\n", PQstatus(conn), PQtransactionStatus(conn), PQerrorMessage(conn),
159+
// PQresStatus(PQresultStatus(res)));
160+
// }
161+
else
162+
pgfdw_get_result(conn, query);
163+
}
164+
165+
}
166+
125167
/*
126168
* We need to send some DML queries for sync database schema to a plan execution
127169
* at a remote instance.
@@ -136,33 +178,50 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
136178
char *completionTag)
137179
{
138180
Node *parsetree = pstmt->utilityStmt;
139-
PGresult *result;
140181

141-
/*
142-
* Very non-trivial decision about transferring utility query to data nodes.
143-
* This exception list used for demonstration and let us to execute some
144-
* simple queries.
145-
*/
146182
if (ExtensionIsActive() &&
147183
pstmt->canSetTag &&
148-
(nodeTag(parsetree) != T_CopyStmt) &&
149-
(nodeTag(parsetree) != T_CreateExtensionStmt) &&
150-
(nodeTag(parsetree) != T_ExplainStmt) &&
151-
(nodeTag(parsetree) != T_FetchStmt) &&
152184
(context != PROCESS_UTILITY_SUBCOMMAND)
153185
)
154186
{
155-
/*
156-
* Previous query could be completed with error report at this instance.
157-
* In this case, we need to prepare connection to the remote instance.
158-
*/
159-
while ((result = PQgetResult(EstablishConnection())) != NULL);
160-
161-
if (PQsendQuery(EstablishConnection(), queryString) == 0)
162-
elog(ERROR, "Connection error: query: %s, status=%d, errmsg=%s",
163-
queryString,
164-
PQstatus(EstablishConnection()),
165-
PQerrorMessage(EstablishConnection()));
187+
if (!user)
188+
{
189+
MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
190+
191+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
192+
Assert(OidIsValid(serverid));
193+
194+
user = GetUserMapping(GetUserId(), serverid);
195+
MemoryContextSwitchTo(oldCxt);
196+
}
197+
switch (nodeTag(parsetree))
198+
{
199+
case T_CopyStmt:
200+
case T_CreateExtensionStmt:
201+
case T_ExplainStmt:
202+
case T_FetchStmt:
203+
case T_VacuumStmt:
204+
break;
205+
default:
206+
if (nodeTag(parsetree) == T_TransactionStmt)
207+
{
208+
TransactionStmt *stmt = (TransactionStmt *) parsetree;
209+
210+
if (
211+
// (stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
212+
(stmt->kind != TRANS_STMT_SAVEPOINT)
213+
)
214+
break;
215+
}
216+
if (conn)
217+
cancelQueryIfNeeded(conn, queryString);
218+
conn = GetConnection(user, true);
219+
cancelQueryIfNeeded(conn, queryString);
220+
Assert(conn != NULL);
221+
222+
Assert(PQsendQuery(conn, queryString));
223+
break;
224+
};
166225
}
167226

168227
if (next_ProcessUtility_hook)
@@ -172,26 +231,22 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
172231
standard_ProcessUtility(pstmt, queryString,
173232
context, params, queryEnv,
174233
dest, completionTag);
175-
176-
/*
177-
* Check end of query execution at the remote instance.
178-
*/
179234
if (conn)
180-
while ((result = PQgetResult(conn)) != NULL);
235+
cancelQueryIfNeeded(conn, queryString);
236+
// pgfdw_get_result(conn, queryString);
181237
}
182238

239+
183240
static void
184241
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
185242
{
186243
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
187-
PGresult *result;
188-
PGconn *dest = EstablishConnection();
189244

190245
if (prev_ExecutorStart)
191246
prev_ExecutorStart(queryDesc, eflags);
192247
else
193248
standard_ExecutorStart(queryDesc, eflags);
194-
elog(LOG, "QUERY: %s", queryDesc->sourceText);
249+
195250
/*
196251
* This not fully correct sign for prevent passing each subquery to
197252
* the remote instance. Only for demo.
@@ -200,30 +255,32 @@ elog(LOG, "QUERY: %s", queryDesc->sourceText);
200255
queryDesc->plannedstmt->canSetTag &&
201256
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
202257
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
203-
{
204-
/*
205-
* Prepare connection.
206-
*/
207-
while ((result = PQgetResult(dest)) != NULL);
208-
elog(LOG, "->QUERY: %s", queryDesc->sourceText);
209-
if (PQsendPlan(dest, serialize_plan(queryDesc, eflags)) == 0)
210-
/*
211-
* Report about remote execution error.
212-
*/
213-
elog(ERROR, "Connection errors during PLAN transferring: status=%d, errmsg=%s",
214-
PQstatus(dest), PQerrorMessage(dest));
215-
}
258+
{
259+
Oid serverid;
260+
UserMapping *user;
261+
262+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
263+
Assert(OidIsValid(serverid));
264+
265+
user = GetUserMapping(GetUserId(), serverid);
266+
conn = GetConnection(user, true);
267+
cancelQueryIfNeeded(conn, queryDesc->sourceText);
268+
269+
if (PQsendPlan(conn, serialize_plan(queryDesc, eflags)) == 0)
270+
{
271+
pgfdw_report_error(ERROR, NULL, conn, false, queryDesc->sourceText);
272+
Assert(0);
273+
}
274+
else
275+
printf("Send Query %s - OK\n", queryDesc->sourceText);
276+
}
216277
}
217278

218279
static void
219280
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
220281
{
221282
if (conn)
222-
{
223-
PGresult *result;
224-
225-
while ((result = PQgetResult(conn)) != NULL);
226-
}
283+
cancelQueryIfNeeded(conn, queryDesc->sourceText);
227284

228285
if (prev_ExecutorEnd)
229286
prev_ExecutorEnd(queryDesc);

repeater.control

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ comment = 'Execute raw query plan on remote node'
33
default_version = '0.1'
44
module_pathname = '$libdir/repeater'
55
relocatable = false
6+
requires = 'postgres_fdw'

0 commit comments

Comments
 (0)