diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 85252cbdbcf1..5151fa1abe31 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11308,6 +11308,46 @@ SELECT * FROM result_tbl ORDER BY a;
 (2 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+                                                          QUERY PLAN                                                          
+------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1000 |   0 | 0000
+ 2000 |   0 | 0000
+ 1100 | 100 | 0100
+ 2100 | 100 | 0100
+ 1200 | 200 | 0200
+ 2200 | 200 | 0200
+ 1300 | 300 | 0300
+ 2300 | 300 | 0300
+ 1400 | 400 | 0400
+ 2400 | 400 | 0400
+ 1500 | 500 | 0500
+ 2500 | 500 | 0500
+ 1600 | 600 | 0600
+ 2600 | 600 | 0600
+ 1700 | 700 | 0700
+ 2700 | 700 | 0700
+ 1800 | 800 | 0800
+ 2800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2900 | 900 | 0900
+(20 rows)
+
 -- Test error handling, if accessing one of the foreign partitions errors out
 CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
   SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -11351,6 +11391,35 @@ SELECT * FROM result_tbl ORDER BY a;
 (3 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+                                               QUERY PLAN                                               
+--------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Filter: (async_pt_1.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Filter: (async_pt_2.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p3 async_pt_3
+         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+         Filter: (async_pt_3.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(14 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
 DROP FOREIGN TABLE async_p3;
 DROP TABLE base_tbl3;
 -- Check case where the partitioned table has local/remote partitions
@@ -11386,6 +11455,37 @@ SELECT * FROM result_tbl ORDER BY a;
 (3 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+                                               QUERY PLAN                                               
+--------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Filter: (async_pt_1.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Filter: (async_pt_2.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Sort
+         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+         Sort Key: async_pt_3.b, async_pt_3.a
+         ->  Seq Scan on public.async_p3 async_pt_3
+               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+               Filter: (async_pt_3.b === 505)
+(16 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
 -- partitionwise joins
 SET enable_partitionwise_join TO true;
 CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -12168,6 +12268,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
 DROP FOREIGN TABLE foreign_tbl CASCADE;
 NOTICE:  drop cascades to foreign table foreign_tbl2
 DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+SET enable_partitionwise_join TO ON;
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+                                                                                                     QUERY PLAN                                                                                                      
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr1.i
+         ->  Async Foreign Scan
+               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+               Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
+               Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
+         ->  Async Foreign Scan
+               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+               Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
+               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ i  |  j  |    k    | i  |  j  |    k    
+----+-----+---------+----+-----+---------
+ 10 | 100 | data_10 | 10 | 100 | data_10
+ 11 | 110 | data_11 | 11 | 110 | data_11
+ 12 | 120 | data_12 | 12 | 120 | data_12
+ 13 | 130 | data_13 | 13 | 130 | data_13
+ 14 | 140 | data_14 | 14 | 140 | data_14
+ 15 | 150 | data_15 | 15 | 150 | data_15
+ 16 | 160 | data_16 | 16 | 160 | data_16
+ 17 | 170 | data_17 | 17 | 170 | data_17
+ 18 | 180 | data_18 | 18 | 180 | data_18
+ 19 | 190 | data_19 | 19 | 190 | data_19
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+                                                                                                      QUERY PLAN                                                                                                      
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr1.i
+         ->  Async Foreign Scan
+               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+               Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
+               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
+         ->  Async Foreign Scan
+               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+               Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
+               Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+  i  |  j   |    k     |  i  |  j   |    k     
+-----+------+----------+-----+------+----------
+  91 |  910 | data_91  |  91 |  910 | data_91
+  92 |  920 | data_92  |  92 |  920 | data_92
+  93 |  930 | data_93  |  93 |  930 | data_93
+  94 |  940 | data_94  |  94 |  940 | data_94
+  95 |  950 | data_95  |  95 |  950 | data_95
+  96 |  960 | data_96  |  96 |  960 | data_96
+  97 |  970 | data_97  |  97 |  970 | data_97
+  98 |  980 | data_98  |  98 |  980 | data_98
+  99 |  990 | data_99  |  99 |  990 | data_99
+ 100 | 1000 | data_100 | 100 | 1000 | data_100
+ 101 | 1010 | data_101 |     |      | 
+ 102 | 1020 | data_102 |     |      | 
+ 103 | 1030 | data_103 |     |      | 
+ 104 | 1040 | data_104 |     |      | 
+ 105 | 1050 | data_105 |     |      | 
+ 106 | 1060 | data_106 |     |      | 
+ 107 | 1070 | data_107 |     |      | 
+ 108 | 1080 | data_108 |     |      | 
+ 109 | 1090 | data_109 |     |      | 
+ 110 | 1100 | data_110 |     |      | 
+(20 rows)
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+  ORDER BY i,j
+  LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+  EXECUTE async_pt_query(1, 1);
+                                                                          QUERY PLAN                                                                          
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr2.i, distr2.j
+         Subplans Removed: 1
+         ->  Async Foreign Scan on public.distr2_p1 distr2_1
+               Output: distr2_1.i, distr2_1.j, distr2_1.k
+               Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
+(8 rows)
+
+EXECUTE async_pt_query(1, 1);
+ i |  j  |    k    
+---+-----+---------
+ 1 |  10 | data_1
+ 1 | 110 | data_11
+ 1 | 210 | data_21
+ 1 | 310 | data_31
+ 1 | 410 | data_41
+ 1 | 510 | data_51
+ 1 | 610 | data_61
+ 1 | 710 | data_71
+ 1 | 810 | data_81
+ 1 | 910 | data_91
+(10 rows)
+
+RESET plan_cache_mode;
+RESET enable_partitionwise_join;
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index b92e2a0fc9f8..b38f80cf3df7 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7291,12 +7291,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
 	ForeignScanState *node = (ForeignScanState *) areq->requestee;
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
-	AppendState *requestor = (AppendState *) areq->requestor;
-	WaitEventSet *set = requestor->as_eventset;
+	PlanState  *requestor = areq->requestor;
+	WaitEventSet *set;
+	Bitmapset  *needrequest;
 
 	/* This should not be called unless callback_pending */
 	Assert(areq->callback_pending);
 
+	set = GetAppendEventSet(requestor);
+	needrequest = GetNeedRequest(requestor);
+
 	/*
 	 * If process_pending_request() has been invoked on the given request
 	 * before we get here, we might have some tuples already; in which case
@@ -7334,7 +7338,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
 	 * below, because we might otherwise end up with no configured events
 	 * other than the postmaster death event.
 	 */
-	if (!bms_is_empty(requestor->as_needrequest))
+	if (!bms_is_empty(needrequest))
 		return;
 	if (GetNumRegisteredWaitEvents(set) > 1)
 		return;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index b58ab6ee5867..45b68eb9bc38 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3814,6 +3814,11 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+
 -- Test error handling, if accessing one of the foreign partitions errors out
 CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
   SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -3834,6 +3839,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
 DROP FOREIGN TABLE async_p3;
 DROP TABLE base_tbl3;
 
@@ -3849,6 +3859,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
 -- partitionwise joins
 SET enable_partitionwise_join TO true;
 
@@ -4087,6 +4102,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
 DROP FOREIGN TABLE foreign_tbl CASCADE;
 DROP TABLE base_tbl;
 
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+
+SET enable_partitionwise_join TO ON;
+
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+  ORDER BY i,j
+  LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+  EXECUTE async_pt_query(1, 1);
+EXECUTE async_pt_query(1, 1);
+RESET plan_cache_mode;
+
+RESET enable_partitionwise_join;
+
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
+
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a782f1099824..cc04ffac462d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5211,6 +5211,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> (
      </listitem>
     </varlistentry>
 
+    <varlistentry id="guc-enable-async-merge-append" xreflabel="enable_async_merge_append">
+     <term><varname>enable_async_merge_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_async_merge_append</varname> configuration parameter</primary>
+      </indexterm>
+     </term>
+     <listitem>
+      <para>
+       Enables or disables the query planner's use of async-aware
+       merge append plan types. The default is <literal>on</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
      <term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
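A quick way to see the new parameter in action, sketched against the async_pt setup from the regression tests above (any partitioned foreign table whose servers have async_capable enabled behaves the same way):

    SET enable_async_merge_append = off;
    EXPLAIN (COSTS OFF) SELECT * FROM async_pt ORDER BY b, a;  -- children are plain Foreign Scans
    SET enable_async_merge_append = on;
    EXPLAIN (COSTS OFF) SELECT * FROM async_pt ORDER BY b, a;  -- children become Async Foreign Scans
    RESET enable_async_merge_append;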
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 5d3cabe73e34..6dc19ebc3744 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -17,6 +17,7 @@
 #include "executor/execAsync.h"
 #include "executor/executor.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeMergeAppend.h"
 #include "executor/nodeForeignscan.h"
 
 /*
@@ -121,6 +122,9 @@ ExecAsyncResponse(AsyncRequest *areq)
 		case T_AppendState:
 			ExecAsyncAppendResponse(areq);
 			break;
+		case T_MergeAppendState:
+			ExecAsyncMergeAppendResponse(areq);
+			break;
 		default:
 			/* If the node doesn't support async, caller messed up. */
 			elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e152c9ee3a07..d081e3a7ba3b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -39,10 +39,15 @@
 #include "postgres.h"
 
 #include "executor/executor.h"
+#include "executor/execAsync.h"
 #include "executor/execPartition.h"
 #include "executor/nodeMergeAppend.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+#define EVENT_BUFFER_SIZE 16
 
 /*
  * We have one slot for each item in the heap array.  We use SlotNumber
@@ -54,6 +59,12 @@ typedef int32 SlotNumber;
 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
 static int	heap_compare_slots(Datum a, Datum b, void *arg);
 
+static void classify_matching_subplans(MergeAppendState *node);
+static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
+static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
+static bool ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan);
+static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
+
 
 /* ----------------------------------------------------------------
  *		ExecInitMergeAppend
@@ -71,6 +82,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	int			nplans;
 	int			i,
 				j;
+	Bitmapset  *asyncplans;
+	int			nasyncplans;
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -105,7 +118,10 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 * later calls to ExecFindMatchingSubPlans.
 	 */
 	if (!prunestate->do_exec_prune && nplans > 0)
+	{
 		mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+		mergestate->ms_valid_subplans_identified = true;
+	}
 	}
 	else
 	{
@@ -118,6 +134,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 		Assert(nplans > 0);
 		mergestate->ms_valid_subplans = validsubplans =
 			bms_add_range(NULL, 0, nplans - 1);
+		mergestate->ms_valid_subplans_identified = true;
 		mergestate->ms_prune_state = NULL;
 	}
 
@@ -134,11 +151,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 * the results into the mergeplanstates array.
 	 */
 	j = 0;
+	asyncplans = NULL;
+	nasyncplans = 0;
+
 	i = -1;
 	while ((i = bms_next_member(validsubplans, i)) >= 0)
 	{
 		Plan	   *initNode = (Plan *) list_nth(node->mergeplans, i);
 
+		/*
+		 * Record async subplans.  When executing EvalPlanQual, we treat them
+		 * as sync ones; don't do this when initializing an EvalPlanQual plan
+		 * tree.
+		 */
+		if (initNode->async_capable && estate->es_epq_active == NULL)
+		{
+			asyncplans = bms_add_member(asyncplans, j);
+			nasyncplans++;
+		}
+
 		mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
 	}
 
@@ -169,6 +200,45 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 */
 	mergestate->ps.ps_ProjInfo = NULL;
 
+	/* Initialize async state */
+	mergestate->ms_asyncplans = asyncplans;
+	mergestate->ms_nasyncplans = nasyncplans;
+	mergestate->ms_asyncrequests = NULL;
+	mergestate->ms_asyncresults = NULL;
+	mergestate->ms_has_asyncresults = NULL;
+	mergestate->ms_asyncremain = NULL;
+	mergestate->ms_needrequest = NULL;
+	mergestate->ms_eventset = NULL;
+	mergestate->ms_valid_asyncplans = NULL;
+
+	if (nasyncplans > 0)
+	{
+		mergestate->ms_asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc(sizeof(AsyncRequest));
+			areq->requestor = (PlanState *) mergestate;
+			areq->requestee = mergeplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			mergestate->ms_asyncrequests[i] = areq;
+		}
+
+		mergestate->ms_asyncresults = (TupleTableSlot **)
+			palloc0(nplans * sizeof(TupleTableSlot *));
+
+		if (mergestate->ms_valid_subplans_identified)
+			classify_matching_subplans(mergestate);
+	}
+
 	/*
 	 * initialize sort-key information
 	 */
@@ -230,9 +300,16 @@ ExecMergeAppend(PlanState *pstate)
 	 * run-time pruning is disabled then the valid subplans will always be
 	 * set to all subplans.
 	 */
-	if (node->ms_valid_subplans == NULL)
+	if (!node->ms_valid_subplans_identified)
+	{
 		node->ms_valid_subplans =
 			ExecFindMatchingSubPlans(node->ms_prune_state, false);
+		node->ms_valid_subplans_identified = true;
+	}
+
+	/* If there are any async subplans, begin executing them. */
+	if (node->ms_nasyncplans > 0)
+		ExecMergeAppendAsyncBegin(node);
 
 	/*
 	 * First time through: pull the first tuple from each valid subplan,
@@ -245,6 +322,16 @@ ExecMergeAppend(PlanState *pstate)
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
+
+		/* Look at async subplans */
+		i = -1;
+		while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+		{
+			ExecMergeAppendAsyncGetNext(node, i);
+			if (!TupIsNull(node->ms_slots[i]))
+				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+		}
+
 		binaryheap_build(node->ms_heap);
 		node->ms_initialized = true;
 	}
@@ -259,7 +346,13 @@ ExecMergeAppend(PlanState *pstate)
 		 * to not pull tuples until necessary.)
 		 */
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
-		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+		if (bms_is_member(i, node->ms_asyncplans))
+			ExecMergeAppendAsyncGetNext(node, i);
+		else
+		{
+			Assert(bms_is_member(i, node->ms_valid_subplans));
+			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+		}
 		if (!TupIsNull(node->ms_slots[i]))
 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
 		else
@@ -275,6 +368,8 @@ ExecMergeAppend(PlanState *pstate)
 	{
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
 		result = node->ms_slots[i];
+		/* For an async plan, note its result is consumed; the next tuple may be requested */
+		node->ms_has_asyncresults = bms_del_member(node->ms_has_asyncresults, i);
 	}
 
 	return result;
@@ -354,6 +449,7 @@ void
 ExecReScanMergeAppend(MergeAppendState *node)
 {
 	int			i;
+	int			nasyncplans = node->ms_nasyncplans;
 
 	/*
 	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
@@ -364,8 +460,11 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		bms_overlap(node->ps.chgParam,
 					node->ms_prune_state->execparamids))
 	{
+		node->ms_valid_subplans_identified = false;
 		bms_free(node->ms_valid_subplans);
 		node->ms_valid_subplans = NULL;
+		bms_free(node->ms_valid_asyncplans);
+		node->ms_valid_asyncplans = NULL;
 	}
 
 	for (i = 0; i < node->ms_nplans; i++)
@@ -386,6 +485,361 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		if (subnode->chgParam == NULL)
 			ExecReScan(subnode);
 	}
+
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->ms_asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		bms_free(node->ms_asyncremain);
+		node->ms_asyncremain = NULL;
+		bms_free(node->ms_needrequest);
+		node->ms_needrequest = NULL;
+		bms_free(node->ms_has_asyncresults);
+		node->ms_has_asyncresults = NULL;
+	}
+
 	binaryheap_reset(node->ms_heap);
 	node->ms_initialized = false;
 }
+
+/* ----------------------------------------------------------------
+ *		classify_matching_subplans
+ *
+ *		Classify the node's ms_valid_subplans into sync ones and
+ *		async ones, adjust it to contain sync ones only, and save
+ *		async ones in the node's ms_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(MergeAppendState *node)
+{
+	Bitmapset  *valid_asyncplans;
+
+	Assert(node->ms_valid_subplans_identified);
+	Assert(node->ms_valid_asyncplans == NULL);
+
+	/* Nothing to do if there are no valid subplans. */
+	if (bms_is_empty(node->ms_valid_subplans))
+	{
+		node->ms_asyncremain = NULL;
+		return;
+	}
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (!bms_overlap(node->ms_valid_subplans, node->ms_asyncplans))
+	{
+		node->ms_asyncremain = NULL;
+		return;
+	}
+
+	/* Get valid async subplans. */
+	valid_asyncplans = bms_intersect(node->ms_asyncplans,
+									 node->ms_valid_subplans);
+
+	/* Adjust the valid subplans to contain sync subplans only. */
+	node->ms_valid_subplans = bms_del_members(node->ms_valid_subplans,
+											  valid_asyncplans);
+
+	/* Save valid async subplans. */
+	node->ms_valid_asyncplans = valid_asyncplans;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecMergeAppendAsyncBegin
+ *
+ *		Begin executing designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncBegin(MergeAppendState *node)
+{
+	int			i;
+
+	/* Backward scan is not supported by async-aware MergeAppends. */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->ms_nplans > 0);
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->ms_nasyncplans > 0);
+
+	/* If we've yet to determine the valid subplans then do so now. */
+	if (!node->ms_valid_subplans_identified)
+	{
+		node->ms_valid_subplans =
+			ExecFindMatchingSubPlans(node->ms_prune_state, false);
+		node->ms_valid_subplans_identified = true;
+
+		classify_matching_subplans(node);
+	}
+
+	/* Initialize state variables. */
+	node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (bms_is_empty(node->ms_asyncremain))
+		return;
+
+	/* Make a request for each of the valid async subplans. */
+	i = -1;
+	while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		Assert(areq->request_index == i);
+		Assert(!areq->callback_pending);
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecMergeAppendAsyncGetNext
+ *
+ *		Get the next tuple from the specified asynchronous subplan.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
+{
+	node->ms_slots[mplan] = NULL;
+
+	/* Request a tuple asynchronously. */
+	if (ExecMergeAppendAsyncRequest(node, mplan))
+		return;
+
+	/*
+	 * node->ms_asyncremain can be empty if we have fetched tuples that
+	 * haven't been returned yet; in that case ExecMergeAppendAsyncRequest()
+	 * above just returns them without issuing a new request.
+	 */
+	while (bms_is_member(mplan, node->ms_asyncremain))
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* Wait or poll for async events. */
+		ExecMergeAppendAsyncEventWait(node);
+
+		/* Request a tuple asynchronously. */
+		if (ExecMergeAppendAsyncRequest(node, mplan))
+			return;
+
+		/*
+		 * Loop until no async requests are pending, or our request has
+		 * returned some tuples.
+		 */
+	}
+
+	/* No tuples */
+	return;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecMergeAppendAsyncRequest
+ *
+ *		Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
+{
+	Bitmapset  *needrequest;
+	int			i;
+
+	/*
+	 * If we've already fetched the necessary data, just return it.
+	 */
+	if (bms_is_member(mplan, node->ms_has_asyncresults))
+	{
+		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		return true;
+	}
+
+	/*
+	 * Collect the subplans that can accept a new request and don't already
+	 * have data ready.
+	 */
+	needrequest = NULL;
+	i = -1;
+	while ((i = bms_next_member(node->ms_needrequest, i)) >= 0)
+	{
+		if (!bms_is_member(i, node->ms_has_asyncresults))
+			needrequest = bms_add_member(needrequest, i);
+	}
+
+	/*
+	 * If no subplans still need a request, there is nothing to send.
+	 */
+	if (bms_is_empty(needrequest))
+		return false;
+
+	/* Clear those members from ms_needrequest, as we send their requests now */
+	node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
+
+	/* Make a new request for each of the async subplans that need it. */
+	i = -1;
+	while ((i = bms_next_member(needrequest, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		/*
+		 * We've just checked that the subplan doesn't already have fetched
+		 * data.
+		 */
+		Assert(!bms_is_member(i, node->ms_has_asyncresults));
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+	bms_free(needrequest);
+
+	/* Return needed asynchronously-generated results if any. */
+	if (bms_is_member(mplan, node->ms_has_asyncresults))
+	{
+		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncMergeAppendResponse
+ *
+ *		Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncMergeAppendResponse(AsyncRequest *areq)
+{
+	MergeAppendState *node = (MergeAppendState *) areq->requestor;
+	TupleTableSlot *slot = areq->result;
+
+	/* The result should be a TupleTableSlot or NULL. */
+	Assert(slot == NULL || IsA(slot, TupleTableSlot));
+	Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults));
+
+	node->ms_asyncresults[areq->request_index] = NULL;
+	/* Nothing to do if the request is pending. */
+	if (!areq->request_complete)
+	{
+		/* The request would have been pending for a callback. */
+		Assert(areq->callback_pending);
+		return;
+	}
+
+	/* If the result is NULL or an empty slot, there's nothing more to do. */
+	if (TupIsNull(slot))
+	{
+		/* The ending subplan wouldn't have been pending for a callback. */
+		Assert(!areq->callback_pending);
+		node->ms_asyncremain = bms_del_member(node->ms_asyncremain, areq->request_index);
+		return;
+	}
+
+	node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, areq->request_index);
+	/* Save result so we can return it. */
+	node->ms_asyncresults[areq->request_index] = slot;
+
+	/*
+	 * Mark the subplan that returned a result as ready for a new request; we
+	 * don't launch that request here immediately, but only once the next
+	 * tuple from this subplan is actually needed.
+	 */
+	node->ms_needrequest = bms_add_member(node->ms_needrequest,
+										  areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecMergeAppendAsyncEventWait
+ *
+ *		Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncEventWait(MergeAppendState *node)
+{
+	int			nevents = node->ms_nasyncplans + 1; /* one for PM death */
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	int			noccurred;
+	int			i;
+
+	/* We should never be called when there are no valid async subplans. */
+	Assert(bms_num_members(node->ms_asyncremain) > 0);
+
+	node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+	AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/*
+	 * No need for further processing if there are no configured events other
+	 * than the postmaster death event.
+	 */
+	if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
+	{
+		FreeWaitEventSet(node->ms_eventset);
+		node->ms_eventset = NULL;
+		return;
+	}
+
+	/* We wait on at most EVENT_BUFFER_SIZE events. */
+	if (nevents > EVENT_BUFFER_SIZE)
+		nevents = EVENT_BUFFER_SIZE;
+
+	/*
+	 * Wait until at least one event occurs.
+	 */
+	noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */, occurred_event,
+								 nevents, WAIT_EVENT_APPEND_READY);
+	FreeWaitEventSet(node->ms_eventset);
+	node->ms_eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			if (areq->callback_pending)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				areq->callback_pending = false;
+
+				/* Do the actual work. */
+				ExecAsyncNotify(areq);
+			}
+		}
+	}
+}
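Behaviorally, the point of the executor changes above is that ExecMergeAppendAsyncBegin() issues a request to every valid async subplan before the first heap build, so the ordered remote scans progress concurrently instead of being pulled strictly one heap slot at a time. A rough sketch for observing this; the timing difference only shows up when the partitions live on genuinely separate servers, not on the loopback setup the regression tests use:

    SET enable_async_merge_append = on;
    EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF)
    SELECT * FROM async_pt ORDER BY b, a;   -- children report as Async Foreign Scan
    SET enable_async_merge_append = off;
    EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF)
    SELECT * FROM async_pt ORDER BY b, a;   -- same plan shape, synchronous children
    RESET enable_async_merge_append;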
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ec004ed9493a..8c7746d7d876 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -163,6 +163,7 @@ bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
 bool		enable_presorted_aggregate = true;
 bool		enable_async_append = true;
+bool		enable_async_merge_append = true;
 
 typedef struct
 {
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 1106cd85f0c2..3e5eb19442e5 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1149,10 +1149,10 @@ mark_async_capable_plan(Plan *plan, Path *path)
 				SubqueryScan *scan_plan = (SubqueryScan *) plan;
 
 				/*
-				 * If the generated plan node includes a gating Result node,
-				 * we can't execute it asynchronously.
+				 * If the generated plan node includes a gating Result node or
+				 * a Sort node, we can't execute it asynchronously.
 				 */
-				if (IsA(plan, Result))
+				if (IsA(plan, Result) || IsA(plan, Sort))
 					return false;
 
 				/*
@@ -1170,10 +1170,10 @@ mark_async_capable_plan(Plan *plan, Path *path)
 				FdwRoutine *fdwroutine = path->parent->fdwroutine;
 
 				/*
-				 * If the generated plan node includes a gating Result node,
-				 * we can't execute it asynchronously.
+				 * If the generated plan node includes a gating Result node or
+				 * a Sort node, we can't execute it asynchronously.
 				 */
-				if (IsA(plan, Result))
+				if (IsA(plan, Result) || IsA(plan, Sort))
 					return false;
 
 				Assert(fdwroutine != NULL);
@@ -1186,9 +1186,9 @@ mark_async_capable_plan(Plan *plan, Path *path)
 
 			/*
 			 * If the generated plan node includes a Result node for the
-			 * projection, we can't execute it asynchronously.
+			 * projection or a Sort node, we can't execute it asynchronously.
 			 */
-			if (IsA(plan, Result))
+			if (IsA(plan, Result) || IsA(plan, Sort))
 				return false;
 
 			/*
@@ -1450,6 +1450,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	ListCell   *subpaths;
 	RelOptInfo *rel = best_path->path.parent;
 	PartitionPruneInfo *partpruneinfo = NULL;
+	bool		consider_async = false;
 
 	/*
 	 * We don't have the actual creation of the MergeAppend node split out
@@ -1464,6 +1465,9 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	plan->righttree = NULL;
 	node->apprelids = rel->relids;
 
+	consider_async = (enable_async_merge_append &&
+					  !best_path->path.parallel_safe &&
+					  list_length(best_path->subpaths) > 1);
 	/*
 	 * Compute sort column info, and adjust MergeAppend's tlist as needed.
 	 * Because we pass adjust_tlist_in_place = true, we may ignore the
@@ -1539,6 +1543,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 			subplan = (Plan *) sort;
 		}
 
+		/* If needed, check to see if subplan can be executed asynchronously */
+		if (consider_async)
+			mark_async_capable_plan(subplan, subpath);
+
 		subplans = lappend(subplans, subplan);
 	}
 
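The extra IsA(plan, Sort) checks matter for Merge Append in particular: when a child cannot deliver the required ordering itself, create_merge_append_plan() places an explicit Sort on top of it, and such a child has to stay synchronous. The mixed local/remote regression test earlier in this patch shows the resulting shape; repeating its query:

    EXPLAIN (VERBOSE, COSTS OFF)
    SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
    -- async_p1, async_p2: Async Foreign Scan (ORDER BY pushed to the remote side)
    -- async_p3: local partition, planned as a Sort over a Seq Scan, not marked async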
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 38cb9e970d5b..4f477e39ed83 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -987,6 +987,16 @@ struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_async_merge_append", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of async merge append plans."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_async_merge_append,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_group_by_reordering", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables reordering of GROUP BY keys."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 079efa1baa79..7f97d086e7b8 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -386,6 +386,7 @@
 # - Planner Method Configuration -
 
 #enable_async_append = on
+#enable_async_merge_append = on
 #enable_bitmapscan = on
 #enable_gathermerge = on
 #enable_hashagg = on
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 4eb05dc30d61..e3fdb26ece61 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -19,5 +19,6 @@ extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
 extern void ExecEndMergeAppend(MergeAppendState *node);
 extern void ExecReScanMergeAppend(MergeAppendState *node);
+extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
 
 #endif							/* NODEMERGEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d0f2dca59286..a2f5d877f7a4 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1512,10 +1512,66 @@ typedef struct MergeAppendState
 	TupleTableSlot **ms_slots;	/* array of length ms_nplans */
 	struct binaryheap *ms_heap; /* binary heap of slot indices */
 	bool		ms_initialized; /* are subplans started? */
+	Bitmapset  *ms_asyncplans;	/* asynchronous plans indexes */
+	int			ms_nasyncplans; /* # of asynchronous plans */
+	AsyncRequest **ms_asyncrequests;	/* array of AsyncRequests */
+	TupleTableSlot **ms_asyncresults;	/* unreturned results of async plans */
+	Bitmapset  *ms_has_asyncresults;	/* plans which have async results */
+	Bitmapset  *ms_asyncremain; /* remaining asynchronous plans */
+	Bitmapset  *ms_needrequest; /* asynchronous plans needing a new request */
+	struct WaitEventSet *ms_eventset;	/* WaitEventSet used to configure file
+										 * descriptor wait events */
 	struct PartitionPruneState *ms_prune_state;
+	bool		ms_valid_subplans_identified;	/* is ms_valid_subplans valid? */
 	Bitmapset  *ms_valid_subplans;
+	Bitmapset  *ms_valid_asyncplans;	/* valid asynchronous plans indexes */
 } MergeAppendState;
 
+/* Getters for AppendState and MergeAppendState */
+static inline struct WaitEventSet *
+GetAppendEventSet(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_eventset;
+	else
+		return ((MergeAppendState *) ps)->ms_eventset;
+}
+
+static inline Bitmapset *
+GetNeedRequest(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_needrequest;
+	else
+		return ((MergeAppendState *) ps)->ms_needrequest;
+}
+
+static inline Bitmapset *
+GetValidAsyncplans(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_valid_asyncplans;
+	else
+		return ((MergeAppendState *) ps)->ms_valid_asyncplans;
+}
+
+static inline AsyncRequest *
+GetValidAsyncRequest(PlanState *ps, int nreq)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_asyncrequests[nreq];
+	else
+		return ((MergeAppendState *) ps)->ms_asyncrequests[nreq];
+}
+
 /* ----------------
  *	 RecursiveUnionState information
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 3aa3c16e4422..f1977f83c2c6 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
 extern PGDLLIMPORT bool enable_partition_pruning;
 extern PGDLLIMPORT bool enable_presorted_aggregate;
 extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_async_merge_append;
 extern PGDLLIMPORT int	constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 352abc0bd424..1d40bab2c6a4 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -149,6 +149,7 @@ select name, setting from pg_settings where name like 'enable%';
               name               | setting 
 --------------------------------+---------
  enable_async_append             | on
+ enable_async_merge_append       | on
 enable_bitmapscan               | on
 enable_distinct_reordering      | on
 enable_gathermerge              | on