24
24
#include "utils/guc.h"
25
25
#include "utils/rel.h"
26
26
27
+
28
+ #define MTHackField (mt_state , field ) ( (mt_state)->field )
29
+
30
+
27
31
bool pg_pathman_enable_partition_router = true;
28
32
29
33
CustomScanMethods partition_router_plan_methods ;
30
34
CustomExecMethods partition_router_exec_methods ;
31
35
32
36
37
+ /* FIXME: replace this magic with a CustomScan */
38
+ static ExecProcNodeMtd mt_method = NULL ;
39
+
40
+
41
+ static TupleTableSlot * router_run_modify_table (PlanState * state );
42
+
43
+ static TupleTableSlot * router_set_slot (PartitionRouterState * state ,
44
+ TupleTableSlot * slot ,
45
+ CmdType operation );
46
+ static TupleTableSlot * router_get_slot (PartitionRouterState * state ,
47
+ bool * should_process );
48
+
33
49
static void router_lazy_init_junkfilter (PartitionRouterState * state , EState * estate );
34
50
static void router_lazy_init_constraint (PartitionRouterState * state );
35
51
@@ -110,6 +126,7 @@ prepare_modify_table_for_partition_router(PlanState *state, void *context)
110
126
if (IsA (state , ModifyTableState ))
111
127
{
112
128
ModifyTableState * mt_state = (ModifyTableState * ) state ;
129
+ bool changed_method = false;
113
130
int i ;
114
131
115
132
for (i = 0 ; i < mt_state -> mt_nplans ; i ++ )
@@ -121,8 +138,19 @@ prepare_modify_table_for_partition_router(PlanState *state, void *context)
121
138
if (IsPartitionFilterState (pf_state ) &&
122
139
IsPartitionRouterState (pr_state = linitial (pf_state -> custom_ps )))
123
140
{
124
- /* HACK: PartitionRouter might change ModifyTable's state */
141
+ /* HACK: point to ModifyTable in PartitionRouter */
125
142
pr_state -> mt_state = mt_state ;
143
+
144
+ if (!changed_method )
145
+ {
146
+ if (!mt_method )
147
+ mt_method = state -> ExecProcNodeReal ;
148
+
149
+ /* HACK: replace ModifyTable's execution method */
150
+ ExecSetExecProcNode (state , router_run_modify_table );
151
+
152
+ changed_method = true;
153
+ }
126
154
}
127
155
}
128
156
}
@@ -166,17 +194,18 @@ partition_router_begin(CustomScanState *node, EState *estate, int eflags)
166
194
TupleTableSlot *
167
195
partition_router_exec (CustomScanState * node )
168
196
{
169
- EState * estate = node -> ss .ps .state ;
170
- PlanState * child_ps = (PlanState * ) linitial ( node -> custom_ps ) ;
171
- PartitionRouterState * state = ( PartitionRouterState * ) node ;
172
- TupleTableSlot * slot ;
197
+ EState * estate = node -> ss .ps .state ;
198
+ PartitionRouterState * state = (PartitionRouterState * ) node ;
199
+ TupleTableSlot * slot ;
200
+ bool should_process ;
173
201
174
202
take_next_tuple :
175
- /* execute PartitionFilter child node */
176
- slot = ExecProcNode ( child_ps );
203
+ /* Get next tuple for processing */
204
+ slot = router_get_slot ( state , & should_process );
177
205
178
- if (! TupIsNull ( slot ) )
206
+ if (should_process )
179
207
{
208
+ CmdType new_cmd ;
180
209
bool deleted ;
181
210
ItemPointerData ctid ;
182
211
@@ -203,13 +232,14 @@ partition_router_exec(CustomScanState *node)
203
232
if (TupIsNull (slot ))
204
233
goto take_next_tuple ;
205
234
206
- /* HACK: change command type in ModifyTable */
207
- state -> mt_state -> operation = deleted ? CMD_INSERT : CMD_UPDATE ;
235
+ /* Should we use UPDATE or DELETE + INSERT? */
236
+ new_cmd = deleted ? CMD_INSERT : CMD_UPDATE ;
208
237
209
- return slot ;
238
+ /* Alter ModifyTable's state and return */
239
+ return router_set_slot (state , slot , new_cmd );
210
240
}
211
241
212
- return NULL ;
242
+ return slot ;
213
243
}
214
244
215
245
void
@@ -218,15 +248,20 @@ partition_router_end(CustomScanState *node)
218
248
PartitionRouterState * state = (PartitionRouterState * ) node ;
219
249
220
250
Assert (list_length (node -> custom_ps ) == 1 );
221
- EvalPlanQualEnd (& state -> epqstate );
222
251
ExecEndNode ((PlanState * ) linitial (node -> custom_ps ));
252
+
253
+ EvalPlanQualEnd (& state -> epqstate );
223
254
}
224
255
225
256
void
226
257
partition_router_rescan (CustomScanState * node )
227
258
{
259
+ PartitionRouterState * state = (PartitionRouterState * ) node ;
260
+
228
261
Assert (list_length (node -> custom_ps ) == 1 );
229
262
ExecReScan ((PlanState * ) linitial (node -> custom_ps ));
263
+
264
+ state -> saved_slot = NULL ;
230
265
}
231
266
232
267
void
@@ -236,6 +271,87 @@ partition_router_explain(CustomScanState *node, List *ancestors, ExplainState *e
236
271
}
237
272
238
273
274
+ static TupleTableSlot *
275
+ router_run_modify_table (PlanState * state )
276
+ {
277
+ ModifyTableState * mt_state ;
278
+ TupleTableSlot * slot ;
279
+ int mt_plans_old ,
280
+ mt_plans_new ;
281
+
282
+ mt_state = (ModifyTableState * ) state ;
283
+
284
+ mt_plans_old = MTHackField (mt_state , mt_nplans );
285
+
286
+ /* Fetch next tuple */
287
+ slot = mt_method (state );
288
+
289
+ mt_plans_new = MTHackField (mt_state , mt_nplans );
290
+
291
+ /* PartitionRouter asked us to restart */
292
+ if (mt_plans_new != mt_plans_old )
293
+ {
294
+ int state_idx = mt_state -> mt_whichplan - 1 ;
295
+
296
+ /* HACK: partially restore ModifyTable's state */
297
+ MTHackField (mt_state , mt_done ) = false;
298
+ MTHackField (mt_state , mt_nplans ) = mt_plans_old ;
299
+ MTHackField (mt_state , mt_whichplan ) = state_idx ;
300
+
301
+ /* Restart ModifyTable */
302
+ return mt_method (state );
303
+ }
304
+
305
+ return slot ;
306
+ }
307
+
308
+ static TupleTableSlot *
309
+ router_set_slot (PartitionRouterState * state ,
310
+ TupleTableSlot * slot ,
311
+ CmdType operation )
312
+ {
313
+ ModifyTableState * mt_state = state -> mt_state ;
314
+
315
+ Assert (!TupIsNull (slot ));
316
+
317
+ if (mt_state -> operation == operation )
318
+ return slot ;
319
+
320
+ /* HACK: alter ModifyTable's state */
321
+ MTHackField (mt_state , mt_nplans ) = - mt_state -> mt_whichplan ;
322
+ MTHackField (mt_state , operation ) = operation ;
323
+
324
+ /* Set saved_slot and yield */
325
+ state -> saved_slot = slot ;
326
+ return NULL ;
327
+ }
328
+
329
+ static TupleTableSlot *
330
+ router_get_slot (PartitionRouterState * state ,
331
+ bool * should_process )
332
+ {
333
+ TupleTableSlot * slot ;
334
+
335
+ if (!TupIsNull (state -> saved_slot ))
336
+ {
337
+ /* Reset saved_slot */
338
+ slot = state -> saved_slot ;
339
+ state -> saved_slot = NULL ;
340
+
341
+ /* We shouldn't process preserved slot... */
342
+ * should_process = false;
343
+ }
344
+ else
345
+ {
346
+ slot = ExecProcNode ((PlanState * ) linitial (state -> css .custom_ps ));
347
+
348
+ /* But we have to process non-empty slot */
349
+ * should_process = !TupIsNull (slot );
350
+ }
351
+
352
+ return slot ;
353
+ }
354
+
239
355
static void
240
356
router_lazy_init_junkfilter (PartitionRouterState * state , EState * estate )
241
357
{
0 commit comments