Skip to content

Commit 74f40cc

Browse files
committed
COPYing FROM to parent table instead of foreign, assuming we using shardman.
Also, callback args simplified.
1 parent b605edd commit 74f40cc

File tree

3 files changed

+84
-99
lines changed

3 files changed

+84
-99
lines changed

src/include/partition_filter.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,15 @@ typedef struct
4343
} ResultRelInfoHolder;
4444

4545

46-
/* Forward declaration (for on_new_rri_holder()) */
46+
/* Forward declaration (for on_rri_holder()) */
4747
struct ResultPartsStorage;
4848
typedef struct ResultPartsStorage ResultPartsStorage;
4949

5050
/*
51-
* Callback to be fired at rri_holder creation.
51+
* Callback to be fired at rri_holder creation/destruction.
5252
*/
53-
typedef void (*on_new_rri_holder)(EState *estate,
54-
ResultRelInfoHolder *rri_holder,
55-
const ResultPartsStorage *rps_storage,
56-
void *arg);
53+
typedef void (*on_rri_holder)(ResultRelInfoHolder *rri_holder,
54+
const ResultPartsStorage *rps_storage);
5755

5856
/*
5957
* Cached ResultRelInfos of partitions.
@@ -66,7 +64,7 @@ struct ResultPartsStorage
6664

6765
bool speculative_inserts; /* for ExecOpenIndices() */
6866

69-
on_new_rri_holder on_new_rri_holder_callback;
67+
on_rri_holder on_new_rri_holder_callback;
7068
void *callback_arg;
7169

7270
EState *estate; /* pointer to executor's state */
@@ -116,11 +114,11 @@ void init_result_parts_storage(ResultPartsStorage *parts_storage,
116114
EState *estate,
117115
bool speculative_inserts,
118116
Size table_entry_size,
119-
on_new_rri_holder on_new_rri_holder_cb,
117+
on_rri_holder on_new_rri_holder_cb,
120118
void *on_new_rri_holder_cb_arg);
121119

122120
void fini_result_parts_storage(ResultPartsStorage *parts_storage,
123-
bool close_rels);
121+
bool close_rels, on_rri_holder hook);
124122

125123
ResultRelInfoHolder * scan_result_parts_storage(Oid partid,
126124
ResultPartsStorage *storage);

src/partition_filter.c

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,12 @@ CustomScanMethods partition_filter_plan_methods;
6868
CustomExecMethods partition_filter_exec_methods;
6969

7070

71-
static void prepare_rri_for_insert(EState *estate,
72-
ResultRelInfoHolder *rri_holder,
73-
const ResultPartsStorage *rps_storage,
74-
void *arg);
75-
static void prepare_rri_returning_for_insert(EState *estate,
76-
ResultRelInfoHolder *rri_holder,
77-
const ResultPartsStorage *rps_storage,
78-
void *arg);
79-
static void prepare_rri_fdw_for_insert(EState *estate,
80-
ResultRelInfoHolder *rri_holder,
81-
const ResultPartsStorage *rps_storage,
82-
void *arg);
71+
static void prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
72+
const ResultPartsStorage *rps_storage);
73+
static void prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
74+
const ResultPartsStorage *rps_storage);
75+
static void prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
76+
const ResultPartsStorage *rps_storage);
8377
static Node *fix_returning_list_mutator(Node *node, void *state);
8478

8579
static Index append_rte_to_estate(EState *estate, RangeTblEntry *rte);
@@ -143,7 +137,7 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
143137
EState *estate,
144138
bool speculative_inserts,
145139
Size table_entry_size,
146-
on_new_rri_holder on_new_rri_holder_cb,
140+
on_rri_holder on_new_rri_holder_cb,
147141
void *on_new_rri_holder_cb_arg)
148142
{
149143
HASHCTL *result_rels_table_config = &parts_storage->result_rels_table_config;
@@ -177,16 +171,21 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
177171

178172
/* Free ResultPartsStorage (close relations etc) */
179173
void
180-
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
174+
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels,
175+
on_rri_holder hook)
181176
{
182177
HASH_SEQ_STATUS stat;
183178
ResultRelInfoHolder *rri_holder; /* ResultRelInfo holder */
184179

185-
/* Close partitions and free free conversion-related stuff */
186-
if (close_rels)
180+
hash_seq_init(&stat, parts_storage->result_rels_table);
181+
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
187182
{
188-
hash_seq_init(&stat, parts_storage->result_rels_table);
189-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
183+
/* Call destruction hook, if needed */
184+
if (hook != NULL)
185+
hook(rri_holder, parts_storage);
186+
187+
/* Close partitions and free free conversion-related stuff */
188+
if (close_rels)
190189
{
191190
ExecCloseIndices(rri_holder->result_rel_info);
192191

@@ -202,13 +201,8 @@ fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
202201

203202
free_conversion_map(rri_holder->tuple_map);
204203
}
205-
}
206-
207-
/* Else just free conversion-related stuff */
208-
else
209-
{
210-
hash_seq_init(&stat, parts_storage->result_rels_table);
211-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
204+
/* Else just free conversion-related stuff */
205+
else
212206
{
213207
/* Skip if there's no map */
214208
if (!rri_holder->tuple_map)
@@ -329,10 +323,8 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
329323

330324
/* Call on_new_rri_holder_callback() if needed */
331325
if (parts_storage->on_new_rri_holder_callback)
332-
parts_storage->on_new_rri_holder_callback(parts_storage->estate,
333-
rri_holder,
334-
parts_storage,
335-
parts_storage->callback_arg);
326+
parts_storage->on_new_rri_holder_callback(rri_holder,
327+
parts_storage);
336328

337329
/* Finally append ResultRelInfo to storage->es_alloc_result_rels */
338330
append_rri_to_estate(parts_storage->estate, child_result_rel_info);
@@ -702,7 +694,7 @@ partition_filter_end(CustomScanState *node)
702694
PartitionFilterState *state = (PartitionFilterState *) node;
703695

704696
/* Executor will close rels via estate->es_result_relations */
705-
fini_result_parts_storage(&state->result_parts, false);
697+
fini_result_parts_storage(&state->result_parts, false, NULL);
706698

707699
Assert(list_length(node->custom_ps) == 1);
708700
ExecEndNode((PlanState *) linitial(node->custom_ps));
@@ -793,34 +785,33 @@ pfilter_build_tlist(Relation parent_rel, List *tlist)
793785

794786
/* Main trigger */
795787
static void
796-
prepare_rri_for_insert(EState *estate,
797-
ResultRelInfoHolder *rri_holder,
798-
const ResultPartsStorage *rps_storage,
799-
void *arg)
788+
prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
789+
const ResultPartsStorage *rps_storage)
800790
{
801-
prepare_rri_returning_for_insert(estate, rri_holder, rps_storage, arg);
802-
prepare_rri_fdw_for_insert(estate, rri_holder, rps_storage, arg);
791+
prepare_rri_returning_for_insert(rri_holder, rps_storage);
792+
prepare_rri_fdw_for_insert(rri_holder, rps_storage);
803793
}
804794

805795
/* Prepare 'RETURNING *' tlist & projection */
806796
static void
807-
prepare_rri_returning_for_insert(EState *estate,
808-
ResultRelInfoHolder *rri_holder,
809-
const ResultPartsStorage *rps_storage,
810-
void *arg)
797+
prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
798+
const ResultPartsStorage *rps_storage)
811799
{
812800
PartitionFilterState *pfstate;
813801
List *returning_list;
814802
ResultRelInfo *child_rri,
815803
*parent_rri;
816804
Index parent_rt_idx;
817805
TupleTableSlot *result_slot;
806+
EState *estate;
807+
808+
estate = rps_storage->estate;
818809

819810
/* We don't need to do anything ff there's no map */
820811
if (!rri_holder->tuple_map)
821812
return;
822813

823-
pfstate = (PartitionFilterState *) arg;
814+
pfstate = (PartitionFilterState *) rps_storage->callback_arg;
824815
returning_list = pfstate->returning_list;
825816

826817
/* Exit if there's no RETURNING list */
@@ -857,14 +848,15 @@ prepare_rri_returning_for_insert(EState *estate,
857848

858849
/* Prepare FDW access structs */
859850
static void
860-
prepare_rri_fdw_for_insert(EState *estate,
861-
ResultRelInfoHolder *rri_holder,
862-
const ResultPartsStorage *rps_storage,
863-
void *arg)
851+
prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
852+
const ResultPartsStorage *rps_storage)
864853
{
865854
ResultRelInfo *rri = rri_holder->result_rel_info;
866855
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
867856
Oid partid;
857+
EState *estate;
858+
859+
estate = rps_storage->estate;
868860

869861
/* Nothing to do if not FDW */
870862
if (fdw_routine == NULL)

src/utility_stmt_hooking.c

Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ static uint64 PathmanCopyFrom(CopyState cstate,
6464
List *range_table,
6565
bool old_protocol);
6666

67-
static void prepare_rri_for_copy(EState *estate,
68-
ResultRelInfoHolder *rri_holder,
69-
const ResultPartsStorage *rps_storage,
70-
void *arg);
67+
static void prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
68+
const ResultPartsStorage *rps_storage);
69+
static void finish_rri_copy(ResultRelInfoHolder *rri_holder,
70+
const ResultPartsStorage *rps_storage);
7171

7272

7373
/*
@@ -105,20 +105,6 @@ is_pathman_related_copy(Node *parsetree)
105105
/* Check that relation is partitioned */
106106
if (get_pathman_relation_info(parent_relid))
107107
{
108-
ListCell *lc;
109-
110-
/* Analyze options list */
111-
foreach (lc, copy_stmt->options)
112-
{
113-
DefElem *defel = (DefElem *) lfirst(lc);
114-
115-
Assert(IsA(defel, DefElem));
116-
117-
/* We do not support freeze */
118-
if (strcmp(defel->defname, "freeze") == 0)
119-
elog(ERROR, "freeze is not supported for partitioned tables");
120-
}
121-
122108
/* Emit ERROR if we can't see the necessary symbols */
123109
#ifdef DISABLE_PATHMAN_COPY
124110
elog(ERROR, "COPY is not supported for partitioned tables on Windows");
@@ -481,6 +467,10 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
481467

482468
uint64 processed = 0;
483469

470+
/* We do not support freeze */
471+
if (cstate->freeze)
472+
elog(ERROR, "freeze is not supported for partitioned tables");
473+
484474

485475
tupDesc = RelationGetDescr(parent_rel);
486476

@@ -684,26 +674,8 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
684674

685675
ExecResetTupleTable(estate->es_tupleTable, false);
686676

687-
{
688-
/* Shut down FDWs. TODO: make hook in fini_result_parts_storage? */
689-
HASH_SEQ_STATUS stat;
690-
ResultRelInfoHolder *rri_holder; /* ResultRelInfo holder */
691-
692-
hash_seq_init(&stat, parts_storage.result_rels_table);
693-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
694-
{
695-
ResultRelInfo *resultRelInfo = rri_holder->result_rel_info;
696-
697-
if (resultRelInfo->ri_FdwRoutine)
698-
{
699-
resultRelInfo->ri_FdwRoutine->EndForeignCopyFrom(
700-
estate, resultRelInfo);
701-
}
702-
}
703-
}
704-
705677
/* Close partitions and destroy hash table */
706-
fini_result_parts_storage(&parts_storage, true);
678+
fini_result_parts_storage(&parts_storage, true, finish_rri_copy);
707679

708680
/* Close parent's indices */
709681
ExecCloseIndices(parent_result_rel);
@@ -717,23 +689,46 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
717689
* Init COPY FROM, if supported.
718690
*/
719691
static void
720-
prepare_rri_for_copy(EState *estate,
721-
ResultRelInfoHolder *rri_holder,
722-
const ResultPartsStorage *rps_storage,
723-
void *arg)
692+
prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
693+
const ResultPartsStorage *rps_storage)
724694
{
725-
ResultRelInfo *rri = rri_holder->result_rel_info;
726-
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
727-
CopyState cstate = (CopyState) arg;
695+
ResultRelInfo *rri = rri_holder->result_rel_info;
696+
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
697+
CopyState cstate = (CopyState) rps_storage->callback_arg;
698+
ResultRelInfo *parent_rri;
699+
const char *parent_relname;
700+
EState *estate;
701+
702+
estate = rps_storage->estate;
728703

729704
if (fdw_routine != NULL)
730705
{
706+
parent_rri = rps_storage->saved_rel_info;
707+
parent_relname = psprintf(
708+
"%s.%s", "public",
709+
quote_identifier(RelationGetRelationName(parent_rri->ri_RelationDesc)));
731710
if (!FdwCopyFromIsSupported(fdw_routine))
732711
ereport(ERROR,
733712
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
734713
errmsg("FDW adapter for relation \"%s\" doesn't support COPY FROM",
735714
RelationGetRelationName(rri->ri_RelationDesc))));
736-
rri->ri_FdwRoutine->BeginForeignCopyFrom(estate, rri, cstate);
715+
fdw_routine->BeginForeignCopyFrom(estate, rri, cstate, parent_relname);
716+
}
717+
}
718+
719+
/*
720+
* Shut down FDWs.
721+
*/
722+
static void
723+
finish_rri_copy(ResultRelInfoHolder *rri_holder,
724+
const ResultPartsStorage *rps_storage)
725+
{
726+
ResultRelInfo *resultRelInfo = rri_holder->result_rel_info;
727+
728+
if (resultRelInfo->ri_FdwRoutine)
729+
{
730+
resultRelInfo->ri_FdwRoutine->EndForeignCopyFrom(
731+
rps_storage->estate, resultRelInfo);
737732
}
738733
}
739734

0 commit comments

Comments
 (0)