Skip to content

Commit 5a1aeec

Browse files
committed
Add params to serialization
1 parent a2f1033 commit 5a1aeec

File tree

2 files changed

+145
-29
lines changed

2 files changed

+145
-29
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ PGXS := $(shell $(PG_CONFIG) --pgxs)
1818
include $(PGXS)
1919
else
2020
SHLIB_PREREQS = submake-libpq
21-
subdir = contrib/pargres
21+
subdir = contrib/execplan
2222
top_builddir = ../..
2323
include $(top_builddir)/src/Makefile.global
2424
include $(top_srcdir)/contrib/contrib-global.mk

exec_plan.c

Lines changed: 144 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11

22
#include "postgres.h"
33

4+
//#include "access/xact.h"
45
#include "commands/extension.h"
56
#include "executor/execdesc.h"
7+
#include "executor/executor.h"
68
#include "fmgr.h"
9+
#include "libpq/libpq.h"
10+
#include "libpq-fe.h"
11+
#include "nodes/params.h"
712
#include "optimizer/planner.h"
813
#include "storage/lmgr.h"
914
#include "utils/builtins.h"
15+
#include "utils/guc.h"
16+
#include "utils/plancache.h"
17+
#include "utils/snapmgr.h"
1018

1119
PG_MODULE_MAGIC;
1220

@@ -15,51 +23,142 @@ PG_FUNCTION_INFO_V1(pg_execute_plan);
1523
void _PG_init(void);
1624

1725
static planner_hook_type prev_planner_hook = NULL;
26+
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
1827

1928
static PlannedStmt *HOOK_Planner_injection(Query *parse, int cursorOptions,
2029
ParamListInfo boundParams);
30+
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
31+
static char * serialize_plan(PlannedStmt *pstmt, ParamListInfo boundParams,
32+
int *size);
33+
static PGconn *conn = NULL;
2134

35+
int node_number1 = 0;
36+
//#include "utils/guc.h"
2237
/*
2338
* Module load/unload callback
2439
*/
2540
void
2641
_PG_init(void)
2742
{
43+
elog(LOG, "_PG_Init");
44+
DefineCustomIntVariable("pargres.node",
45+
"Node number in instances collaboration",
46+
NULL,
47+
&node_number1,
48+
0,
49+
0,
50+
1023,
51+
PGC_SIGHUP,
52+
GUC_NOT_IN_SAMPLE,
53+
NULL,
54+
NULL,
55+
NULL);
56+
2857
/* Planner hook */
2958
prev_planner_hook = planner_hook;
3059
planner_hook = HOOK_Planner_injection;
60+
61+
prev_ExecutorEnd = ExecutorEnd_hook;
62+
ExecutorEnd_hook = HOOK_ExecEnd_injection;
3163
}
3264

33-
static PlannedStmt *HOOK_Planner_injection(Query *parse, int cursorOptions,
65+
static void
66+
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
67+
{
68+
PGresult *result;
69+
70+
/* Execute before hook because it destruct memory context of exchange list */
71+
if (conn)
72+
while ((result = PQgetResult(conn)) != NULL)
73+
Assert(PQresultStatus(result) != PGRES_FATAL_ERROR);
74+
75+
if (prev_ExecutorEnd)
76+
prev_ExecutorEnd(queryDesc);
77+
else
78+
standard_ExecutorEnd(queryDesc);
79+
}
80+
81+
static PlannedStmt *
82+
HOOK_Planner_injection(Query *parse, int cursorOptions,
3483
ParamListInfo boundParams)
3584
{
36-
PlannedStmt *stmt;
37-
char *serialized_plan;
85+
PlannedStmt *pstmt;
86+
87+
conn = NULL;
3888

3989
if (prev_planner_hook)
40-
stmt = prev_planner_hook(parse, cursorOptions, boundParams);
90+
pstmt = prev_planner_hook(parse, cursorOptions, boundParams);
91+
else
92+
pstmt = standard_planner(parse, cursorOptions, boundParams);
93+
94+
if (node_number1 > 0)
95+
return pstmt;
4196
else
42-
stmt = standard_planner(parse, cursorOptions, boundParams);
97+
printf("SEND Query\n");
4398

4499
/* Extension is not initialized. */
45100
if (OidIsValid(get_extension_oid("execplan", true)))
46101
{
47-
FILE *f = fopen("/home/andrey/plans.txt", "at");
48-
if (stmt->paramExecTypes == NIL)
49-
{
50-
elog(LOG, "commandType: %d\n", stmt->commandType);
51-
}
52-
//Assert(stmt->paramExecTypes != NIL);
53-
serialized_plan = nodeToString(stmt);
54-
// fprintf(f, "\n%s\n", serialized_plan);
55-
fclose(f);
102+
char conninfo[1024];
103+
char *data,
104+
*SQLCommand;
105+
int status,
106+
data_size;
107+
108+
/* Connect to slave and send it a query plan */
109+
sprintf(conninfo, "host=localhost port=5433%c", '\0');
110+
conn = PQconnectdb(conninfo);
111+
if (PQstatus(conn) == CONNECTION_BAD)
112+
elog(LOG, "Connection error. conninfo: %s", conninfo);
113+
114+
data = serialize_plan(pstmt, boundParams, &data_size);
115+
SQLCommand = (char *) palloc0(strlen(data)+100);
116+
sprintf(SQLCommand, "SELECT pg_execute_plan('%s')", data);
117+
elog(LOG, "query: %s", SQLCommand);
118+
status = PQsendQuery(conn, SQLCommand);
119+
if (status == 0)
120+
elog(ERROR, "Query sending error: %s", PQerrorMessage(conn));
56121
}
57-
return stmt;
122+
return pstmt;
58123
}
59124

60-
#include "executor/executor.h"
61-
#include "utils/plancache.h"
62-
#include "utils/snapmgr.h"
125+
#include "utils/fmgrprotos.h"
126+
127+
static char *
128+
serialize_plan(PlannedStmt *pstmt, ParamListInfo boundParams, int *size)
129+
{
130+
int splan_len,
131+
sparams_len,
132+
econtainer_len;
133+
char *serialized_plan,
134+
*container,
135+
*start_address,
136+
*econtainer;
137+
138+
Assert(size != NULL);
139+
140+
serialized_plan = nodeToString(pstmt);
141+
142+
/* We use splan_len+1 bytes for include end-of-string symbol. */
143+
splan_len = strlen(serialized_plan) + 1;
144+
145+
sparams_len = EstimateParamListSpace(boundParams);
146+
147+
container = (char *) palloc0(splan_len+sparams_len);
148+
//elog(LOG, "Serialize sizes: plan: %d params: %d, numParams: %d", splan_len, sparams_len, boundParams->numParams);
149+
memcpy(container, serialized_plan, splan_len);
150+
start_address = container+splan_len;
151+
SerializeParamList(boundParams, &start_address);
152+
153+
econtainer_len = esc_enc_len(container, splan_len+sparams_len);
154+
econtainer = (char *) palloc0(econtainer_len + 1);
155+
156+
Assert(econtainer_len == esc_encode(container, splan_len+sparams_len, econtainer));
157+
econtainer[econtainer_len] = '\0';
158+
*size = econtainer_len + 1;
159+
elog(LOG, "Serialize sizes: econtainer: %d", *size);
160+
return econtainer;
161+
}
63162

64163
static void
65164
ScanQueryForLocks(PlannedStmt *pstmt, bool acquire)
@@ -107,17 +206,34 @@ AcquirePlannerLocks(PlannedStmt *pstmt, bool acquire)
107206
Datum
108207
pg_execute_plan(PG_FUNCTION_ARGS)
109208
{
110-
char *data = TextDatumGetCString(PG_GETARG_DATUM(0));
111-
PlannedStmt *pstmt;
112-
QueryDesc *queryDesc;
113-
char queryString[5] = "NONE";
114-
ParamListInfo paramLI = NULL;
115-
116-
elog(INFO, "MESSAGE: %s", data);
117-
209+
char *data = TextDatumGetCString(PG_GETARG_DATUM(0));
210+
PlannedStmt *pstmt;
211+
QueryDesc *queryDesc;
212+
char queryString[5] = "NONE";
213+
ParamListInfo paramLI = NULL;
214+
int dec_tot_len;
215+
char *dcontainer,
216+
*start_addr;
217+
218+
elog(LOG, "datalen=%lu\n", strlen(data));
219+
/* Compute decoded size of bytea data */
220+
dec_tot_len = esc_dec_len(data, strlen(data));
221+
elog(LOG, "dec_tot_len=%d datalen=%lu\n", dec_tot_len, strlen(data));
222+
dcontainer = (char *) palloc0(dec_tot_len);
223+
Assert(dec_tot_len == esc_decode(data, strlen(data), dcontainer));
224+
225+
pstmt = (PlannedStmt *) stringToNode((char *) dcontainer);
226+
elog(LOG, "Serialize Plan Size=%lu\n", strlen(dcontainer));
227+
start_addr = dcontainer + strlen(dcontainer) + 1;
228+
paramLI = RestoreParamList((char **) &start_addr);
229+
elog(LOG, "Decoded params. numParams: %d\n", paramLI->numParams);
230+
// printf("INCOMING: %s\n", data);
231+
// PG_RETURN_BOOL(true);
118232
/* Execute query plan. Based on execParallel.c ParallelQueryMain() */
119-
pstmt = (PlannedStmt *) stringToNode(data);
120-
// pstmt->paramExecTypes = NIL;
233+
//
234+
// ptr += strlen((const char *) ptr);
235+
//
236+
121237
queryDesc = CreateQueryDesc(pstmt, queryString, GetActiveSnapshot(),
122238
InvalidSnapshot, None_Receiver, paramLI, NULL,
123239
0);

0 commit comments

Comments
 (0)