Skip to content

Commit 0bafb66

Browse files
committed
Encoding problem
1 parent 5a1aeec commit 0bafb66

File tree

2 files changed

+150
-56
lines changed

2 files changed

+150
-56
lines changed

exec_plan.c

Lines changed: 149 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "nodes/params.h"
1212
#include "optimizer/planner.h"
1313
#include "storage/lmgr.h"
14+
#include "tcop/utility.h"
1415
#include "utils/builtins.h"
1516
#include "utils/guc.h"
1617
#include "utils/plancache.h"
@@ -22,25 +23,33 @@ PG_FUNCTION_INFO_V1(pg_execute_plan);
2223

2324
void _PG_init(void);
2425

26+
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
2527
static planner_hook_type prev_planner_hook = NULL;
28+
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
2629
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
2730

31+
static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
32+
ProcessUtilityContext context, ParamListInfo params,
33+
QueryEnvironment *queryEnv, DestReceiver *dest,
34+
char *completionTag);
2835
static PlannedStmt *HOOK_Planner_injection(Query *parse, int cursorOptions,
2936
ParamListInfo boundParams);
37+
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
3038
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
3139
static char * serialize_plan(PlannedStmt *pstmt, ParamListInfo boundParams,
32-
int *size);
40+
const char *querySourceText);
41+
static void execute_query(char *planString);
42+
3343
static PGconn *conn = NULL;
3444

3545
int node_number1 = 0;
36-
//#include "utils/guc.h"
46+
3747
/*
3848
* Module load/unload callback
3949
*/
4050
void
4151
_PG_init(void)
4252
{
43-
elog(LOG, "_PG_Init");
4453
DefineCustomIntVariable("pargres.node",
4554
"Node number in instances collaboration",
4655
NULL,
@@ -54,28 +63,90 @@ _PG_init(void)
5463
NULL,
5564
NULL);
5665

66+
/* ProcessUtility hook */
67+
next_ProcessUtility_hook = ProcessUtility_hook;
68+
ProcessUtility_hook = HOOK_Utility_injection;
69+
5770
/* Planner hook */
5871
prev_planner_hook = planner_hook;
5972
planner_hook = HOOK_Planner_injection;
6073

74+
prev_ExecutorStart = ExecutorStart_hook;
75+
ExecutorStart_hook = HOOK_ExecStart_injection;
76+
6177
prev_ExecutorEnd = ExecutorEnd_hook;
6278
ExecutorEnd_hook = HOOK_ExecEnd_injection;
6379
}
6480

6581
static void
66-
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
82+
HOOK_Utility_injection(PlannedStmt *pstmt,
83+
const char *queryString,
84+
ProcessUtilityContext context,
85+
ParamListInfo params,
86+
QueryEnvironment *queryEnv,
87+
DestReceiver *dest,
88+
char *completionTag)
6789
{
68-
PGresult *result;
90+
Node *parsetree = pstmt->utilityStmt;
91+
92+
if ((OidIsValid(get_extension_oid("execplan", true))) &&
93+
(node_number1 == 0) &&
94+
(nodeTag(parsetree) == T_CreateStmt))
95+
{
96+
char conninfo[1024];
97+
int status;
98+
99+
elog(LOG, "Send UTILITY query: %s", queryString);
100+
101+
/* Connect to slave and send it a query plan */
102+
sprintf(conninfo, "host=localhost port=5433%c", '\0');
103+
conn = PQconnectdb(conninfo);
104+
if (PQstatus(conn) == CONNECTION_BAD)
105+
elog(LOG, "Connection error. conninfo: %s", conninfo);
106+
107+
status = PQsendQuery(conn, queryString);
108+
if (status == 0)
109+
elog(ERROR, "Query sending error: %s", PQerrorMessage(conn));
110+
}
111+
112+
if (next_ProcessUtility_hook)
113+
(*next_ProcessUtility_hook) (pstmt, queryString, context, params,
114+
queryEnv, dest, completionTag);
115+
else
116+
standard_ProcessUtility(pstmt, queryString,
117+
context, params, queryEnv,
118+
dest, completionTag);
69119

70-
/* Execute before hook because it destruct memory context of exchange list */
71120
if (conn)
121+
{
122+
PGresult *result;
123+
72124
while ((result = PQgetResult(conn)) != NULL)
73125
Assert(PQresultStatus(result) != PGRES_FATAL_ERROR);
126+
PQfinish(conn);
127+
conn = NULL;
128+
}
129+
}
74130

75-
if (prev_ExecutorEnd)
76-
prev_ExecutorEnd(queryDesc);
77-
else
78-
standard_ExecutorEnd(queryDesc);
131+
static void
132+
execute_query(char *planString)
133+
{
134+
char conninfo[1024];
135+
char *SQLCommand;
136+
int status;
137+
138+
/* Connect to slave and send it a query plan */
139+
sprintf(conninfo, "host=localhost port=5433%c", '\0');
140+
conn = PQconnectdb(conninfo);
141+
if (PQstatus(conn) == CONNECTION_BAD)
142+
elog(LOG, "Connection error. conninfo: %s", conninfo);
143+
144+
SQLCommand = (char *) palloc0(strlen(planString)+100);
145+
sprintf(SQLCommand, "SELECT pg_execute_plan('%s')", planString);
146+
//elog(LOG, "query: %s", SQLCommand);
147+
status = PQsendQuery(conn, SQLCommand);
148+
if (status == 0)
149+
elog(ERROR, "Query sending error: %s", PQerrorMessage(conn));
79150
}
80151

81152
static PlannedStmt *
@@ -91,72 +162,103 @@ HOOK_Planner_injection(Query *parse, int cursorOptions,
91162
else
92163
pstmt = standard_planner(parse, cursorOptions, boundParams);
93164

94-
if (node_number1 > 0)
165+
if ((node_number1 > 0) || (parse->utilityStmt != NULL))
95166
return pstmt;
96-
else
97-
printf("SEND Query\n");
98167

99168
/* Extension is not initialized. */
100169
if (OidIsValid(get_extension_oid("execplan", true)))
101170
{
102-
char conninfo[1024];
103-
char *data,
104-
*SQLCommand;
105-
int status,
106-
data_size;
107171

108-
/* Connect to slave and send it a query plan */
109-
sprintf(conninfo, "host=localhost port=5433%c", '\0');
110-
conn = PQconnectdb(conninfo);
111-
if (PQstatus(conn) == CONNECTION_BAD)
112-
elog(LOG, "Connection error. conninfo: %s", conninfo);
113-
114-
data = serialize_plan(pstmt, boundParams, &data_size);
115-
SQLCommand = (char *) palloc0(strlen(data)+100);
116-
sprintf(SQLCommand, "SELECT pg_execute_plan('%s')", data);
117-
elog(LOG, "query: %s", SQLCommand);
118-
status = PQsendQuery(conn, SQLCommand);
119-
if (status == 0)
120-
elog(ERROR, "Query sending error: %s", PQerrorMessage(conn));
121172
}
122173
return pstmt;
123174
}
124175

176+
static void
177+
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
178+
{
179+
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
180+
181+
if ((OidIsValid(get_extension_oid("execplan", true))) &&
182+
(node_number1 == 0) &&
183+
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)))
184+
{
185+
elog(LOG, "Send query: %s", queryDesc->sourceText);
186+
execute_query(serialize_plan(queryDesc->plannedstmt, queryDesc->params,
187+
queryDesc->sourceText));
188+
}
189+
else
190+
{
191+
// elog(LOG, "EXECUTOR Process query without sending. IsParsetree=%hhu node_number1=%d IsExt=%hhu", parsetree != NULL, node_number1, OidIsValid(get_extension_oid("execplan", true)));
192+
}
193+
194+
if (prev_ExecutorStart)
195+
prev_ExecutorStart(queryDesc, eflags);
196+
else
197+
standard_ExecutorStart(queryDesc, eflags);
198+
}
199+
200+
static void
201+
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
202+
{
203+
/* Execute before hook because it destruct memory context of exchange list */
204+
if (conn)
205+
{
206+
PGresult *result;
207+
208+
while ((result = PQgetResult(conn)) != NULL)
209+
Assert(PQresultStatus(result) != PGRES_FATAL_ERROR);
210+
PQfinish(conn);
211+
conn = NULL;
212+
}
213+
214+
if (prev_ExecutorEnd)
215+
prev_ExecutorEnd(queryDesc);
216+
else
217+
standard_ExecutorEnd(queryDesc);
218+
}
219+
125220
#include "utils/fmgrprotos.h"
126221

127222
static char *
128-
serialize_plan(PlannedStmt *pstmt, ParamListInfo boundParams, int *size)
223+
serialize_plan(PlannedStmt *pstmt, ParamListInfo boundParams,
224+
const char *querySourceText)
129225
{
130226
int splan_len,
131227
sparams_len,
228+
qtext_len,
132229
econtainer_len;
133230
char *serialized_plan,
134231
*container,
135232
*start_address,
136233
*econtainer;
137234

138-
Assert(size != NULL);
139-
140235
serialized_plan = nodeToString(pstmt);
141236

142-
/* We use splan_len+1 bytes for include end-of-string symbol. */
237+
/* We use len+1 bytes for include end-of-string symbol. */
143238
splan_len = strlen(serialized_plan) + 1;
239+
qtext_len = strlen(querySourceText) + 1;
144240

145241
sparams_len = EstimateParamListSpace(boundParams);
146242

147-
container = (char *) palloc0(splan_len+sparams_len);
243+
container = (char *) palloc0(splan_len + sparams_len + qtext_len);
148244
//elog(LOG, "Serialize sizes: plan: %d params: %d, numParams: %d", splan_len, sparams_len, boundParams->numParams);
149245
memcpy(container, serialized_plan, splan_len);
150-
start_address = container+splan_len;
246+
start_address = container + splan_len;
151247
SerializeParamList(boundParams, &start_address);
152248

153-
econtainer_len = esc_enc_len(container, splan_len+sparams_len);
154-
econtainer = (char *) palloc0(econtainer_len + 1);
249+
Assert(start_address == container + splan_len + sparams_len);
250+
memcpy(start_address, querySourceText, qtext_len);
155251

156-
Assert(econtainer_len == esc_encode(container, splan_len+sparams_len, econtainer));
252+
econtainer_len = pg_base64_enc_len(container, splan_len + sparams_len + qtext_len);
253+
econtainer = (char *) palloc0(econtainer_len + 1);
254+
if (econtainer_len != pg_base64_encode(container, splan_len + sparams_len +
255+
qtext_len, econtainer))
256+
elog(LOG, "econtainer_len: %d %d", econtainer_len, pg_base64_encode(container, splan_len + sparams_len +
257+
qtext_len, econtainer));
258+
Assert(econtainer_len == pg_base64_encode(container, splan_len + sparams_len +
259+
qtext_len, econtainer));
157260
econtainer[econtainer_len] = '\0';
158-
*size = econtainer_len + 1;
159-
elog(LOG, "Serialize sizes: econtainer: %d", *size);
261+
160262
return econtainer;
161263
}
162264

@@ -209,30 +311,22 @@ pg_execute_plan(PG_FUNCTION_ARGS)
209311
char *data = TextDatumGetCString(PG_GETARG_DATUM(0));
210312
PlannedStmt *pstmt;
211313
QueryDesc *queryDesc;
212-
char queryString[5] = "NONE";
314+
char *queryString;
213315
ParamListInfo paramLI = NULL;
214316
int dec_tot_len;
215317
char *dcontainer,
216318
*start_addr;
217319

218-
elog(LOG, "datalen=%lu\n", strlen(data));
219320
/* Compute decoded size of bytea data */
220-
dec_tot_len = esc_dec_len(data, strlen(data));
221-
elog(LOG, "dec_tot_len=%d datalen=%lu\n", dec_tot_len, strlen(data));
321+
dec_tot_len = pg_base64_dec_len(data, strlen(data));
222322
dcontainer = (char *) palloc0(dec_tot_len);
223-
Assert(dec_tot_len == esc_decode(data, strlen(data), dcontainer));
323+
Assert(dec_tot_len == pg_base64_decode(data, strlen(data), dcontainer));
224324

225325
pstmt = (PlannedStmt *) stringToNode((char *) dcontainer);
226-
elog(LOG, "Serialize Plan Size=%lu\n", strlen(dcontainer));
227326
start_addr = dcontainer + strlen(dcontainer) + 1;
228327
paramLI = RestoreParamList((char **) &start_addr);
229-
elog(LOG, "Decoded params. numParams: %d\n", paramLI->numParams);
230-
// printf("INCOMING: %s\n", data);
231-
// PG_RETURN_BOOL(true);
232-
/* Execute query plan. Based on execParallel.c ParallelQueryMain() */
233-
//
234-
// ptr += strlen((const char *) ptr);
235-
//
328+
queryString = start_addr;
329+
// elog(LOG, "Decoded query: %s\n", start_addr);
236330

237331
queryDesc = CreateQueryDesc(pstmt, queryString, GetActiveSnapshot(),
238332
InvalidSnapshot, None_Receiver, paramLI, NULL,

execplan.control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Pargres extension
1+
# Execplan extension
22
comment = 'Execute raw query plan'
33
default_version = '0.1'
44
module_pathname = '$libdir/execplan'

0 commit comments

Comments
 (0)