Skip to content

Commit 6dfba1a

Browse files
committed
Initial commit
1 parent 8f5b657 commit 6dfba1a

File tree

6 files changed

+336
-22
lines changed

6 files changed

+336
-22
lines changed

LICENSE

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,11 @@
1-
BSD 2-Clause License
1+
pg_repeater is released under the PostgreSQL License, a liberal Open Source license, similar to the BSD or MIT licenses.
22

3-
Copyright (c) 2019, Andrey Lepikhov
4-
All rights reserved.
3+
Copyright (c) 2018-2019, Postgres Professional
4+
Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
5+
Portions Copyright (c) 1994, The Regents of the University of California
56

6-
Redistribution and use in source and binary forms, with or without
7-
modification, are permitted provided that the following conditions are met:
7+
Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.
88

9-
* Redistributions of source code must retain the above copyright notice, this
10-
list of conditions and the following disclaimer.
9+
IN NO EVENT SHALL POSTGRES PROFESSIONAL BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF POSTGRES PROFESSIONAL HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1110

12-
* Redistributions in binary form must reproduce the above copyright notice,
13-
this list of conditions and the following disclaimer in the documentation
14-
and/or other materials provided with the distribution.
15-
16-
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17-
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18-
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19-
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
20-
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21-
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22-
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23-
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24-
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25-
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
11+
POSTGRES PROFESSIONAL SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND POSTGRES PROFESSIONAL HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.

Makefile

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# contrib/pg_repeater/Makefile
2+
3+
MODULE_big = pg_repeater
4+
EXTENSION = pg_repeater
5+
EXTVERSION = 0.1
6+
PGFILEDESC = "pg_repeater"
7+
MODULES = pg_repeater1
8+
OBJS = pg_repeater.o $(WIN32RES)
9+
10+
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/
11+
execplan_srcdir = $(top_srcdir)/contrib/pg_execplan/
12+
13+
PG_CPPFLAGS = -I$(libpq_srcdir) -I$(fdw_srcdir) -L$(fdw_srcdir) -I$(execplan_srcdir) -L$(execplan_srcdir)
14+
SHLIB_LINK_INTERNAL = $(libpq)
15+
16+
DATA_built = $(EXTENSION)--$(EXTVERSION).sql
17+
18+
ifdef USE_PGXS
19+
PG_CONFIG = pg_config
20+
PGXS := $(shell $(PG_CONFIG) --pgxs)
21+
include $(PGXS)
22+
else
23+
EXTRA_INSTALL = contrib/postgres_fdw contrib/pg_execplan
24+
SHLIB_PREREQS = submake-libpq
25+
subdir = contrib/pg_repeater
26+
top_builddir = ../..
27+
include $(top_builddir)/src/Makefile.global
28+
include $(top_srcdir)/contrib/contrib-global.mk
29+
endif
30+
31+
$(EXTENSION)--$(EXTVERSION).sql: init.sql
32+
cat $^ > $@

README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,20 @@
11
# pg_repeater
2-
Repeat all queries at remote instance
2+
PostgreSQL patch & extension for UTILITY queries and query plans execution at
3+
remote instance.
4+
5+
Plan is passed by postgres_fdw connection service. It executed by pg_exec_plan()
6+
routine, introduced by pg_execplan extension.
7+
8+
This project dedicated to query execution problem in DBMS for computing systems
9+
with cluster architecture.
10+
11+
The DBMS may need to execute an identical query plan at each computing node.
12+
Today PostgreSQL can process only SQL statements. But it is not guaranteed, that
13+
the planner at each node will construct same query plan, because different
14+
statistics, relation sizes e.t.c.
15+
16+
This solution based on postgres-xl approach: plan tree is serialized by the
17+
nodeToString() routine.
18+
During serialization we transform all database object identifiers (oid) at each
19+
node field to portable representation.
20+

init.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
\echo Use "CREATE EXTENSION pg_repeater" to load this file. \quit

pg_repeater.c

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* repeater.c
4+
* Simple demo for remote plan execution patch.
5+
*
6+
* Transfer query plan to a remote instance and wait for result.
7+
* Remote instance parameters (host, port) defines by GUCs.
8+
*
9+
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+
* Portions Copyright (c) 2018-2019, Postgres Professional
11+
*-------------------------------------------------------------------------
12+
*/
13+
14+
#include "postgres.h"
15+
16+
#include "access/xact.h"
17+
#include "commands/extension.h"
18+
#include "executor/executor.h"
19+
#include "fmgr.h"
20+
#include "libpq/libpq.h"
21+
#include "libpq-fe.h"
22+
#include "optimizer/planner.h"
23+
#include "tcop/utility.h"
24+
#include "utils/guc.h"
25+
26+
PG_MODULE_MAGIC;
27+
28+
void _PG_init(void);
29+
30+
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
31+
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
32+
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
33+
34+
static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
35+
ProcessUtilityContext context, ParamListInfo params,
36+
QueryEnvironment *queryEnv, DestReceiver *dest,
37+
char *completionTag);
38+
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
39+
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
40+
41+
/* Remote instance parameters. */
42+
char *remote_server_fdwname;
43+
44+
static bool ExtensionIsActivated = false;
45+
static PGconn *conn = NULL;
46+
47+
/*
48+
* Module load/unload callback
49+
*/
50+
void
51+
_PG_init(void)
52+
{
53+
DefineCustomStringVariable("repeater.fdwname",
54+
"Remote host fdw name",
55+
NULL,
56+
&remote_server_fdwname,
57+
"remoteserv",
58+
PGC_SIGHUP,
59+
GUC_NOT_IN_SAMPLE,
60+
NULL,
61+
NULL,
62+
NULL);
63+
64+
/* ProcessUtility hook */
65+
next_ProcessUtility_hook = ProcessUtility_hook;
66+
ProcessUtility_hook = HOOK_Utility_injection;
67+
68+
prev_ExecutorStart = ExecutorStart_hook;
69+
ExecutorStart_hook = HOOK_ExecStart_injection;
70+
71+
prev_ExecutorEnd = ExecutorEnd_hook;
72+
ExecutorEnd_hook = HOOK_ExecEnd_injection;
73+
}
74+
75+
static bool
76+
ExtensionIsActive(void)
77+
{
78+
if (ExtensionIsActivated)
79+
return true;
80+
81+
if (
82+
!IsTransactionState() ||
83+
!OidIsValid(get_extension_oid("repeater", true))
84+
)
85+
return false;
86+
87+
ExtensionIsActivated = true;
88+
return ExtensionIsActivated;
89+
}
90+
91+
#include "miscadmin.h"
92+
#include "pgstat.h"
93+
#include "storage/latch.h"
94+
95+
#include "foreign/foreign.h"
96+
#include "postgres_fdw.h"
97+
98+
static Oid serverid = InvalidOid;
99+
static UserMapping *user = NULL;
100+
101+
static bool
102+
pgfdw_cancel_query(PGconn *conn)
103+
{
104+
PGcancel *cancel;
105+
char errbuf[256];
106+
PGresult *result = NULL;
107+
108+
if ((cancel = PQgetCancel(conn)))
109+
{
110+
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
111+
{
112+
ereport(WARNING,
113+
(errcode(ERRCODE_CONNECTION_FAILURE),
114+
errmsg("could not send cancel request: %s",
115+
errbuf)));
116+
PQfreeCancel(cancel);
117+
return false;
118+
}
119+
120+
PQfreeCancel(cancel);
121+
}
122+
else
123+
elog(FATAL, "Can't get connection cancel descriptor");
124+
125+
PQconsumeInput(conn);
126+
PQclear(result);
127+
128+
return true;
129+
}
130+
131+
static void
132+
cancelQueryIfNeeded(PGconn *conn, const char *query)
133+
{
134+
Assert(conn != NULL);
135+
Assert(query != NULL);
136+
137+
if (PQtransactionStatus(conn) != PQTRANS_IDLE)
138+
{
139+
PGresult *res;
140+
141+
printf("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n",
142+
PQstatus(conn),
143+
PQtransactionStatus(conn),
144+
PQerrorMessage(conn));
145+
146+
res = PQgetResult(conn);
147+
148+
if (PQresultStatus(res) == PGRES_FATAL_ERROR)
149+
Assert(pgfdw_cancel_query(conn));
150+
else
151+
pgfdw_get_result(conn, query);
152+
}
153+
154+
}
155+
156+
/*
157+
* We need to send some DML queries for sync database schema to a plan execution
158+
* at a remote instance.
159+
*/
160+
static void
161+
HOOK_Utility_injection(PlannedStmt *pstmt,
162+
const char *queryString,
163+
ProcessUtilityContext context,
164+
ParamListInfo params,
165+
QueryEnvironment *queryEnv,
166+
DestReceiver *dest,
167+
char *completionTag)
168+
{
169+
Node *parsetree = pstmt->utilityStmt;
170+
171+
if (ExtensionIsActive() &&
172+
pstmt->canSetTag &&
173+
(context != PROCESS_UTILITY_SUBCOMMAND)
174+
)
175+
{
176+
if (!user)
177+
{
178+
MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
179+
180+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
181+
Assert(OidIsValid(serverid));
182+
183+
user = GetUserMapping(GetUserId(), serverid);
184+
MemoryContextSwitchTo(oldCxt);
185+
}
186+
switch (nodeTag(parsetree))
187+
{
188+
case T_CopyStmt:
189+
case T_CreateExtensionStmt:
190+
case T_ExplainStmt:
191+
case T_FetchStmt:
192+
case T_VacuumStmt:
193+
break;
194+
default:
195+
if (nodeTag(parsetree) == T_TransactionStmt)
196+
{
197+
TransactionStmt *stmt = (TransactionStmt *) parsetree;
198+
199+
if (
200+
// (stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
201+
(stmt->kind != TRANS_STMT_SAVEPOINT)
202+
)
203+
break;
204+
}
205+
if (conn)
206+
cancelQueryIfNeeded(conn, queryString);
207+
conn = GetConnection(user, true);
208+
cancelQueryIfNeeded(conn, queryString);
209+
Assert(conn != NULL);
210+
211+
Assert(PQsendQuery(conn, queryString));
212+
break;
213+
};
214+
}
215+
216+
if (next_ProcessUtility_hook)
217+
(*next_ProcessUtility_hook) (pstmt, queryString, context, params,
218+
queryEnv, dest, completionTag);
219+
else
220+
standard_ProcessUtility(pstmt, queryString,
221+
context, params, queryEnv,
222+
dest, completionTag);
223+
if (conn)
224+
cancelQueryIfNeeded(conn, queryString);
225+
}
226+
227+
static void
228+
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
229+
{
230+
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
231+
232+
if (prev_ExecutorStart)
233+
prev_ExecutorStart(queryDesc, eflags);
234+
else
235+
standard_ExecutorStart(queryDesc, eflags);
236+
237+
/*
238+
* This not fully correct sign for prevent passing each subquery to
239+
* the remote instance. Only for demo.
240+
*/
241+
if (ExtensionIsActive() &&
242+
queryDesc->plannedstmt->canSetTag &&
243+
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
244+
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
245+
{
246+
Oid serverid;
247+
UserMapping *user;
248+
249+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
250+
Assert(OidIsValid(serverid));
251+
252+
user = GetUserMapping(GetUserId(), serverid);
253+
conn = GetConnection(user, true);
254+
cancelQueryIfNeeded(conn, queryDesc->sourceText);
255+
256+
if (PQsendPlan(conn, serialize_plan(queryDesc, eflags)) == 0)
257+
pgfdw_report_error(ERROR, NULL, conn, false, queryDesc->sourceText);
258+
}
259+
}
260+
261+
static void
262+
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
263+
{
264+
if (conn)
265+
cancelQueryIfNeeded(conn, queryDesc->sourceText);
266+
267+
if (prev_ExecutorEnd)
268+
prev_ExecutorEnd(queryDesc);
269+
else
270+
standard_ExecutorEnd(queryDesc);
271+
}

pg_repeater.control

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# pg_repeater extension
2+
comment = 'Pass raw query plan to a remote node'
3+
default_version = '0.1'
4+
module_pathname = '$libdir/pg_repeater'
5+
relocatable = false
6+
requires = 'postgres_fdw pg_execplan'

0 commit comments

Comments
 (0)