Skip to content

Commit 673f162

Browse files
Alexander PyhalovCommitfest Bot
Alexander Pyhalov
authored and
Commitfest Bot
committed
MergeAppend should support Async Foreign Scan subplans
1 parent e0022f5 commit 673f162

File tree

14 files changed

+885
-5
lines changed

14 files changed

+885
-5
lines changed

contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11308,6 +11308,46 @@ SELECT * FROM result_tbl ORDER BY a;
1130811308
(2 rows)
1130911309

1131011310
DELETE FROM result_tbl;
11311+
-- Test Merge Append
11312+
EXPLAIN (VERBOSE, COSTS OFF)
11313+
SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
11314+
QUERY PLAN
11315+
------------------------------------------------------------------------------------------------------------------------------
11316+
Merge Append
11317+
Sort Key: async_pt.b, async_pt.a
11318+
-> Async Foreign Scan on public.async_p1 async_pt_1
11319+
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
11320+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11321+
-> Async Foreign Scan on public.async_p2 async_pt_2
11322+
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
11323+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11324+
(8 rows)
11325+
11326+
SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
11327+
a | b | c
11328+
------+-----+------
11329+
1000 | 0 | 0000
11330+
2000 | 0 | 0000
11331+
1100 | 100 | 0100
11332+
2100 | 100 | 0100
11333+
1200 | 200 | 0200
11334+
2200 | 200 | 0200
11335+
1300 | 300 | 0300
11336+
2300 | 300 | 0300
11337+
1400 | 400 | 0400
11338+
2400 | 400 | 0400
11339+
1500 | 500 | 0500
11340+
2500 | 500 | 0500
11341+
1600 | 600 | 0600
11342+
2600 | 600 | 0600
11343+
1700 | 700 | 0700
11344+
2700 | 700 | 0700
11345+
1800 | 800 | 0800
11346+
2800 | 800 | 0800
11347+
1900 | 900 | 0900
11348+
2900 | 900 | 0900
11349+
(20 rows)
11350+
1131111351
-- Test error handling, if accessing one of the foreign partitions errors out
1131211352
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
1131311353
SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -11351,6 +11391,35 @@ SELECT * FROM result_tbl ORDER BY a;
1135111391
(3 rows)
1135211392

1135311393
DELETE FROM result_tbl;
11394+
-- Test Merge Append
11395+
EXPLAIN (VERBOSE, COSTS OFF)
11396+
SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
11397+
QUERY PLAN
11398+
------------------------------------------------------------------------------------------------------
11399+
Merge Append
11400+
Sort Key: async_pt.b, async_pt.a
11401+
-> Async Foreign Scan on public.async_p1 async_pt_1
11402+
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
11403+
Filter: (async_pt_1.b === 505)
11404+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11405+
-> Async Foreign Scan on public.async_p2 async_pt_2
11406+
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
11407+
Filter: (async_pt_2.b === 505)
11408+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11409+
-> Async Foreign Scan on public.async_p3 async_pt_3
11410+
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
11411+
Filter: (async_pt_3.b === 505)
11412+
Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11413+
(14 rows)
11414+
11415+
SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
11416+
a | b | c
11417+
------+-----+------
11418+
1505 | 505 | 0505
11419+
2505 | 505 | 0505
11420+
3505 | 505 | 0505
11421+
(3 rows)
11422+
1135411423
DROP FOREIGN TABLE async_p3;
1135511424
DROP TABLE base_tbl3;
1135611425
-- Check case where the partitioned table has local/remote partitions
@@ -11386,6 +11455,37 @@ SELECT * FROM result_tbl ORDER BY a;
1138611455
(3 rows)
1138711456

1138811457
DELETE FROM result_tbl;
11458+
-- Test Merge Append
11459+
EXPLAIN (VERBOSE, COSTS OFF)
11460+
SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
11461+
QUERY PLAN
11462+
------------------------------------------------------------------------------------------------------
11463+
Merge Append
11464+
Sort Key: async_pt.b, async_pt.a
11465+
-> Async Foreign Scan on public.async_p1 async_pt_1
11466+
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
11467+
Filter: (async_pt_1.b === 505)
11468+
Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11469+
-> Async Foreign Scan on public.async_p2 async_pt_2
11470+
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
11471+
Filter: (async_pt_2.b === 505)
11472+
Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
11473+
-> Sort
11474+
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
11475+
Sort Key: async_pt_3.b, async_pt_3.a
11476+
-> Seq Scan on public.async_p3 async_pt_3
11477+
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
11478+
Filter: (async_pt_3.b === 505)
11479+
(16 rows)
11480+
11481+
SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
11482+
a | b | c
11483+
------+-----+------
11484+
1505 | 505 | 0505
11485+
2505 | 505 | 0505
11486+
3505 | 505 | 0505
11487+
(3 rows)
11488+
1138911489
-- partitionwise joins
1139011490
SET enable_partitionwise_join TO true;
1139111491
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -12168,6 +12268,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
1216812268
DROP FOREIGN TABLE foreign_tbl CASCADE;
1216912269
NOTICE: drop cascades to foreign table foreign_tbl2
1217012270
DROP TABLE base_tbl;
12271+
-- Test async Merge Append
12272+
CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
12273+
CREATE TABLE base1 (i int, j int, k text);
12274+
CREATE TABLE base2 (i int, j int, k text);
12275+
CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
12276+
SERVER loopback OPTIONS (table_name 'base1');
12277+
CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
12278+
SERVER loopback OPTIONS (table_name 'base2');
12279+
CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
12280+
CREATE TABLE base3 (i int, j int, k text);
12281+
CREATE TABLE base4 (i int, j int, k text);
12282+
CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
12283+
SERVER loopback OPTIONS (table_name 'base3');
12284+
CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
12285+
SERVER loopback OPTIONS (table_name 'base4');
12286+
INSERT INTO distr1
12287+
SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
12288+
INSERT INTO distr2
12289+
SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
12290+
ANALYZE distr1_p1;
12291+
ANALYZE distr1_p2;
12292+
ANALYZE distr2_p1;
12293+
ANALYZE distr2_p2;
12294+
SET enable_partitionwise_join TO ON;
12295+
-- Test joins with async Merge Append
12296+
EXPLAIN (VERBOSE, COSTS OFF)
12297+
SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
12298+
ORDER BY distr2.i LIMIT 10;
12299+
QUERY PLAN
12300+
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
12301+
Limit
12302+
Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
12303+
-> Merge Append
12304+
Sort Key: distr1.i
12305+
-> Async Foreign Scan
12306+
Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
12307+
Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
12308+
Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
12309+
-> Async Foreign Scan
12310+
Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
12311+
Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
12312+
Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
12313+
(12 rows)
12314+
12315+
SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
12316+
ORDER BY distr2.i LIMIT 10;
12317+
i | j | k | i | j | k
12318+
----+-----+---------+----+-----+---------
12319+
10 | 100 | data_10 | 10 | 100 | data_10
12320+
11 | 110 | data_11 | 11 | 110 | data_11
12321+
12 | 120 | data_12 | 12 | 120 | data_12
12322+
13 | 130 | data_13 | 13 | 130 | data_13
12323+
14 | 140 | data_14 | 14 | 140 | data_14
12324+
15 | 150 | data_15 | 15 | 150 | data_15
12325+
16 | 160 | data_16 | 16 | 160 | data_16
12326+
17 | 170 | data_17 | 17 | 170 | data_17
12327+
18 | 180 | data_18 | 18 | 180 | data_18
12328+
19 | 190 | data_19 | 19 | 190 | data_19
12329+
(10 rows)
12330+
12331+
EXPLAIN (VERBOSE, COSTS OFF)
12332+
SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
12333+
ORDER BY distr1.i LIMIT 20;
12334+
QUERY PLAN
12335+
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
12336+
Limit
12337+
Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
12338+
-> Merge Append
12339+
Sort Key: distr1.i
12340+
-> Async Foreign Scan
12341+
Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
12342+
Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
12343+
Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
12344+
-> Async Foreign Scan
12345+
Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
12346+
Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
12347+
Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
12348+
(12 rows)
12349+
12350+
SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
12351+
ORDER BY distr1.i LIMIT 20;
12352+
i | j | k | i | j | k
12353+
-----+------+----------+-----+------+----------
12354+
91 | 910 | data_91 | 91 | 910 | data_91
12355+
92 | 920 | data_92 | 92 | 920 | data_92
12356+
93 | 930 | data_93 | 93 | 930 | data_93
12357+
94 | 940 | data_94 | 94 | 940 | data_94
12358+
95 | 950 | data_95 | 95 | 950 | data_95
12359+
96 | 960 | data_96 | 96 | 960 | data_96
12360+
97 | 970 | data_97 | 97 | 970 | data_97
12361+
98 | 980 | data_98 | 98 | 980 | data_98
12362+
99 | 990 | data_99 | 99 | 990 | data_99
12363+
100 | 1000 | data_100 | 100 | 1000 | data_100
12364+
101 | 1010 | data_101 | | |
12365+
102 | 1020 | data_102 | | |
12366+
103 | 1030 | data_103 | | |
12367+
104 | 1040 | data_104 | | |
12368+
105 | 1050 | data_105 | | |
12369+
106 | 1060 | data_106 | | |
12370+
107 | 1070 | data_107 | | |
12371+
108 | 1080 | data_108 | | |
12372+
109 | 1090 | data_109 | | |
12373+
110 | 1100 | data_110 | | |
12374+
(20 rows)
12375+
12376+
-- Test pruning with async Merge Append
12377+
DELETE FROM distr2;
12378+
INSERT INTO distr2
12379+
SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
12380+
DEALLOCATE ALL;
12381+
SET plan_cache_mode TO force_generic_plan;
12382+
PREPARE async_pt_query (int, int) AS
12383+
SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
12384+
ORDER BY i,j
12385+
LIMIT 10;
12386+
EXPLAIN (VERBOSE, COSTS OFF)
12387+
EXECUTE async_pt_query(1, 1);
12388+
QUERY PLAN
12389+
------------------------------------------------------------------------------------------------------------------------------------------------------------
12390+
Limit
12391+
Output: distr2.i, distr2.j, distr2.k
12392+
-> Merge Append
12393+
Sort Key: distr2.i, distr2.j
12394+
Subplans Removed: 1
12395+
-> Async Foreign Scan on public.distr2_p1 distr2_1
12396+
Output: distr2_1.i, distr2_1.j, distr2_1.k
12397+
Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
12398+
(8 rows)
12399+
12400+
EXECUTE async_pt_query(1, 1);
12401+
i | j | k
12402+
---+-----+---------
12403+
1 | 10 | data_1
12404+
1 | 110 | data_11
12405+
1 | 210 | data_21
12406+
1 | 310 | data_31
12407+
1 | 410 | data_41
12408+
1 | 510 | data_51
12409+
1 | 610 | data_61
12410+
1 | 710 | data_71
12411+
1 | 810 | data_81
12412+
1 | 910 | data_91
12413+
(10 rows)
12414+
12415+
RESET plan_cache_mode;
12416+
RESET enable_partitionwise_join;
12417+
DROP TABLE distr1, distr2, base1, base2, base3, base4;
1217112418
ALTER SERVER loopback OPTIONS (DROP async_capable);
1217212419
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
1217312420
-- ===================================================================

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7291,12 +7291,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
72917291
ForeignScanState *node = (ForeignScanState *) areq->requestee;
72927292
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
72937293
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
7294-
AppendState *requestor = (AppendState *) areq->requestor;
7295-
WaitEventSet *set = requestor->as_eventset;
7294+
PlanState *requestor = areq->requestor;
7295+
WaitEventSet *set;
7296+
Bitmapset *needrequest;
72967297

72977298
/* This should not be called unless callback_pending */
72987299
Assert(areq->callback_pending);
72997300

7301+
set = GetAppendEventSet(requestor);
7302+
needrequest = GetNeedRequest(requestor);
7303+
73007304
/*
73017305
* If process_pending_request() has been invoked on the given request
73027306
* before we get here, we might have some tuples already; in which case
@@ -7334,7 +7338,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
73347338
* below, because we might otherwise end up with no configured events
73357339
* other than the postmaster death event.
73367340
*/
7337-
if (!bms_is_empty(requestor->as_needrequest))
7341+
if (!bms_is_empty(needrequest))
73387342
return;
73397343
if (GetNumRegisteredWaitEvents(set) > 1)
73407344
return;

0 commit comments

Comments
 (0)