diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 85252cbdbcf1..5151fa1abe31 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11308,6 +11308,46 @@ SELECT * FROM result_tbl ORDER BY a;
(2 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1000 | 0 | 0000
+ 2000 | 0 | 0000
+ 1100 | 100 | 0100
+ 2100 | 100 | 0100
+ 1200 | 200 | 0200
+ 2200 | 200 | 0200
+ 1300 | 300 | 0300
+ 2300 | 300 | 0300
+ 1400 | 400 | 0400
+ 2400 | 400 | 0400
+ 1500 | 500 | 0500
+ 2500 | 500 | 0500
+ 1600 | 600 | 0600
+ 2600 | 600 | 0600
+ 1700 | 700 | 0700
+ 2700 | 700 | 0700
+ 1800 | 800 | 0800
+ 2800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2900 | 900 | 0900
+(20 rows)
+
-- Test error handling, if accessing one of the foreign partitions errors out
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -11351,6 +11391,35 @@ SELECT * FROM result_tbl ORDER BY a;
(3 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(14 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
-- Check case where the partitioned table has local/remote partitions
@@ -11386,6 +11455,37 @@ SELECT * FROM result_tbl ORDER BY a;
(3 rows)
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------
+ Merge Append
+ Sort Key: async_pt.b, async_pt.a
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+ -> Sort
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Sort Key: async_pt_3.b, async_pt_3.a
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+(16 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
-- partitionwise joins
SET enable_partitionwise_join TO true;
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -12168,6 +12268,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
DROP FOREIGN TABLE foreign_tbl CASCADE;
NOTICE: drop cascades to foreign table foreign_tbl2
DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+SET enable_partitionwise_join TO ON;
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr1.i
+ -> Async Foreign Scan
+ Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+ Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
+ Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
+ -> Async Foreign Scan
+ Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+ Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
+ Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ i | j | k | i | j | k
+----+-----+---------+----+-----+---------
+ 10 | 100 | data_10 | 10 | 100 | data_10
+ 11 | 110 | data_11 | 11 | 110 | data_11
+ 12 | 120 | data_12 | 12 | 120 | data_12
+ 13 | 130 | data_13 | 13 | 130 | data_13
+ 14 | 140 | data_14 | 14 | 140 | data_14
+ 15 | 150 | data_15 | 15 | 150 | data_15
+ 16 | 160 | data_16 | 16 | 160 | data_16
+ 17 | 170 | data_17 | 17 | 170 | data_17
+ 18 | 180 | data_18 | 18 | 180 | data_18
+ 19 | 190 | data_19 | 19 | 190 | data_19
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+ QUERY PLAN
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr1.i
+ -> Async Foreign Scan
+ Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+ Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
+ Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
+ -> Async Foreign Scan
+ Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+ Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
+ Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+ i | j | k | i | j | k
+-----+------+----------+-----+------+----------
+ 91 | 910 | data_91 | 91 | 910 | data_91
+ 92 | 920 | data_92 | 92 | 920 | data_92
+ 93 | 930 | data_93 | 93 | 930 | data_93
+ 94 | 940 | data_94 | 94 | 940 | data_94
+ 95 | 950 | data_95 | 95 | 950 | data_95
+ 96 | 960 | data_96 | 96 | 960 | data_96
+ 97 | 970 | data_97 | 97 | 970 | data_97
+ 98 | 980 | data_98 | 98 | 980 | data_98
+ 99 | 990 | data_99 | 99 | 990 | data_99
+ 100 | 1000 | data_100 | 100 | 1000 | data_100
+ 101 | 1010 | data_101 | | |
+ 102 | 1020 | data_102 | | |
+ 103 | 1030 | data_103 | | |
+ 104 | 1040 | data_104 | | |
+ 105 | 1050 | data_105 | | |
+ 106 | 1060 | data_106 | | |
+ 107 | 1070 | data_107 | | |
+ 108 | 1080 | data_108 | | |
+ 109 | 1090 | data_109 | | |
+ 110 | 1100 | data_110 | | |
+(20 rows)
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+ ORDER BY i,j
+ LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+ EXECUTE async_pt_query(1, 1);
+ QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+ Output: distr2.i, distr2.j, distr2.k
+ -> Merge Append
+ Sort Key: distr2.i, distr2.j
+ Subplans Removed: 1
+ -> Async Foreign Scan on public.distr2_p1 distr2_1
+ Output: distr2_1.i, distr2_1.j, distr2_1.k
+ Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
+(8 rows)
+
+EXECUTE async_pt_query(1, 1);
+ i | j | k
+---+-----+---------
+ 1 | 10 | data_1
+ 1 | 110 | data_11
+ 1 | 210 | data_21
+ 1 | 310 | data_31
+ 1 | 410 | data_41
+ 1 | 510 | data_51
+ 1 | 610 | data_61
+ 1 | 710 | data_71
+ 1 | 810 | data_81
+ 1 | 910 | data_91
+(10 rows)
+
+RESET plan_cache_mode;
+RESET enable_partitionwise_join;
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
-- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index b92e2a0fc9f8..b38f80cf3df7 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7291,12 +7291,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
- AppendState *requestor = (AppendState *) areq->requestor;
- WaitEventSet *set = requestor->as_eventset;
+ PlanState *requestor = areq->requestor;
+ WaitEventSet *set;
+ Bitmapset *needrequest;
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
+ set = GetAppendEventSet(requestor);
+ needrequest = GetNeedRequest(requestor);
+
/*
* If process_pending_request() has been invoked on the given request
* before we get here, we might have some tuples already; in which case
@@ -7334,7 +7338,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* below, because we might otherwise end up with no configured events
* other than the postmaster death event.
*/
- if (!bms_is_empty(requestor->as_needrequest))
+ if (!bms_is_empty(needrequest))
return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index b58ab6ee5867..45b68eb9bc38 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3814,6 +3814,11 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+
-- Test error handling, if accessing one of the foreign partitions errors out
CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -3834,6 +3839,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
DROP FOREIGN TABLE async_p3;
DROP TABLE base_tbl3;
@@ -3849,6 +3859,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
-- partitionwise joins
SET enable_partitionwise_join TO true;
@@ -4087,6 +4102,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
DROP FOREIGN TABLE foreign_tbl CASCADE;
DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+
+SET enable_partitionwise_join TO ON;
+
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+ ORDER BY i,j
+ LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+ EXECUTE async_pt_query(1, 1);
+EXECUTE async_pt_query(1, 1);
+RESET plan_cache_mode;
+
+RESET enable_partitionwise_join;
+
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
+
ALTER SERVER loopback OPTIONS (DROP async_capable);
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a782f1099824..cc04ffac462d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5211,6 +5211,20 @@ ANY num_sync (
+ enable_async_merge_append (boolean)
+
+ enable_async_merge_append configuration parameter
+
+
+
+
+ Enables or disables the query planner's use of async-aware
+ merge append plan types. The default is on.
+
+
+
+
enable_bitmapscan (boolean)
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 5d3cabe73e34..6dc19ebc3744 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -17,6 +17,7 @@
#include "executor/execAsync.h"
#include "executor/executor.h"
#include "executor/nodeAppend.h"
+#include "executor/nodeMergeAppend.h"
#include "executor/nodeForeignscan.h"
/*
@@ -121,6 +122,9 @@ ExecAsyncResponse(AsyncRequest *areq)
case T_AppendState:
ExecAsyncAppendResponse(areq);
break;
+ case T_MergeAppendState:
+ ExecAsyncMergeAppendResponse(areq);
+ break;
default:
/* If the node doesn't support async, caller messed up. */
elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e152c9ee3a07..d081e3a7ba3b 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -39,10 +39,15 @@
#include "postgres.h"
#include "executor/executor.h"
+#include "executor/execAsync.h"
#include "executor/execPartition.h"
#include "executor/nodeMergeAppend.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+#define EVENT_BUFFER_SIZE 16
/*
* We have one slot for each item in the heap array. We use SlotNumber
@@ -54,6 +59,12 @@ typedef int32 SlotNumber;
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
static int heap_compare_slots(Datum a, Datum b, void *arg);
+static void classify_matching_subplans(MergeAppendState *node);
+static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
+static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
+static bool ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan);
+static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
+
/* ----------------------------------------------------------------
* ExecInitMergeAppend
@@ -71,6 +82,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
int nplans;
int i,
j;
+ Bitmapset *asyncplans;
+ int nasyncplans;
/* check for unsupported flags */
Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -105,7 +118,10 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
* later calls to ExecFindMatchingSubPlans.
*/
if (!prunestate->do_exec_prune && nplans > 0)
+ {
mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ mergestate->ms_valid_subplans_identified = true;
+ }
}
else
{
@@ -118,6 +134,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
Assert(nplans > 0);
mergestate->ms_valid_subplans = validsubplans =
bms_add_range(NULL, 0, nplans - 1);
+ mergestate->ms_valid_subplans_identified = true;
mergestate->ms_prune_state = NULL;
}
@@ -134,11 +151,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
* the results into the mergeplanstates array.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
+
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
+ /*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
}
@@ -169,6 +200,45 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
*/
mergestate->ps.ps_ProjInfo = NULL;
+ /* Initialize async state */
+ mergestate->ms_asyncplans = asyncplans;
+ mergestate->ms_nasyncplans = nasyncplans;
+ mergestate->ms_asyncrequests = NULL;
+ mergestate->ms_asyncresults = NULL;
+ mergestate->ms_has_asyncresults = NULL;
+ mergestate->ms_asyncremain = NULL;
+ mergestate->ms_needrequest = NULL;
+ mergestate->ms_eventset = NULL;
+ mergestate->ms_valid_asyncplans = NULL;
+
+ if (nasyncplans > 0)
+ {
+ mergestate->ms_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) mergestate;
+ areq->requestee = mergeplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ mergestate->ms_asyncrequests[i] = areq;
+ }
+
+ mergestate->ms_asyncresults = (TupleTableSlot **)
+ palloc0(nplans * sizeof(TupleTableSlot *));
+
+ if (mergestate->ms_valid_subplans_identified)
+ classify_matching_subplans(mergestate);
+ }
+
/*
* initialize sort-key information
*/
@@ -230,9 +300,16 @@ ExecMergeAppend(PlanState *pstate)
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
- if (node->ms_valid_subplans == NULL)
+ if (!node->ms_valid_subplans_identified)
+ {
node->ms_valid_subplans =
ExecFindMatchingSubPlans(node->ms_prune_state, false);
+ node->ms_valid_subplans_identified = true;
+ }
+
+ /* If there are any async subplans, begin executing them. */
+ if (node->ms_nasyncplans > 0)
+ ExecMergeAppendAsyncBegin(node);
/*
* First time through: pull the first tuple from each valid subplan,
@@ -245,6 +322,16 @@ ExecMergeAppend(PlanState *pstate)
if (!TupIsNull(node->ms_slots[i]))
binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
}
+
+ /* Look at async subplans */
+ i = -1;
+ while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+ {
+ ExecMergeAppendAsyncGetNext(node, i);
+ if (!TupIsNull(node->ms_slots[i]))
+ binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+ }
+
binaryheap_build(node->ms_heap);
node->ms_initialized = true;
}
@@ -259,7 +346,13 @@ ExecMergeAppend(PlanState *pstate)
* to not pull tuples until necessary.)
*/
i = DatumGetInt32(binaryheap_first(node->ms_heap));
- node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ if (bms_is_member(i, node->ms_asyncplans))
+ ExecMergeAppendAsyncGetNext(node, i);
+ else
+ {
+ Assert(bms_is_member(i, node->ms_valid_subplans));
+ node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ }
if (!TupIsNull(node->ms_slots[i]))
binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
else
@@ -275,6 +368,8 @@ ExecMergeAppend(PlanState *pstate)
{
i = DatumGetInt32(binaryheap_first(node->ms_heap));
result = node->ms_slots[i];
+		/* For an async plan, record that we can fetch the next tuple */
+ node->ms_has_asyncresults = bms_del_member(node->ms_has_asyncresults, i);
}
return result;
@@ -354,6 +449,7 @@ void
ExecReScanMergeAppend(MergeAppendState *node)
{
int i;
+ int nasyncplans = node->ms_nasyncplans;
/*
* If any PARAM_EXEC Params used in pruning expressions have changed, then
@@ -364,8 +460,11 @@ ExecReScanMergeAppend(MergeAppendState *node)
bms_overlap(node->ps.chgParam,
node->ms_prune_state->execparamids))
{
+ node->ms_valid_subplans_identified = false;
bms_free(node->ms_valid_subplans);
node->ms_valid_subplans = NULL;
+ bms_free(node->ms_valid_asyncplans);
+ node->ms_valid_asyncplans = NULL;
}
for (i = 0; i < node->ms_nplans; i++)
@@ -386,6 +485,361 @@ ExecReScanMergeAppend(MergeAppendState *node)
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
+
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->ms_asyncremain);
+ node->ms_asyncremain = NULL;
+ bms_free(node->ms_needrequest);
+ node->ms_needrequest = NULL;
+ bms_free(node->ms_has_asyncresults);
+ node->ms_has_asyncresults = NULL;
+ }
binaryheap_reset(node->ms_heap);
node->ms_initialized = false;
}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's ms_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's ms_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(MergeAppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+
+ Assert(node->ms_valid_subplans_identified);
+ Assert(node->ms_valid_asyncplans == NULL);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->ms_valid_subplans))
+ {
+ node->ms_asyncremain = NULL;
+ return;
+ }
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(node->ms_valid_subplans, node->ms_asyncplans))
+ {
+ node->ms_asyncremain = NULL;
+ return;
+ }
+
+ /* Get valid async subplans. */
+ valid_asyncplans = bms_intersect(node->ms_asyncplans,
+ node->ms_valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ node->ms_valid_subplans = bms_del_members(node->ms_valid_subplans,
+ valid_asyncplans);
+
+ /* Save valid async subplans. */
+ node->ms_valid_asyncplans = valid_asyncplans;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncBegin
+ *
+ *		Begin executing designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncBegin(MergeAppendState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware MergeAppends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no subplans */
+ Assert(node->ms_nplans > 0);
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->ms_nasyncplans > 0);
+
+ /* If we've yet to determine the valid subplans then do so now. */
+ if (!node->ms_valid_subplans_identified)
+ {
+ node->ms_valid_subplans =
+ ExecFindMatchingSubPlans(node->ms_prune_state, false);
+ node->ms_valid_subplans_identified = true;
+
+ classify_matching_subplans(node);
+ }
+
+ /* Initialize state variables. */
+ node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (bms_is_empty(node->ms_asyncremain))
+ return;
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncGetNext
+ *
+ * Get the next tuple from specified asynchronous subplan.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
+{
+ node->ms_slots[mplan] = NULL;
+
+ /* Request a tuple asynchronously. */
+ if (ExecMergeAppendAsyncRequest(node, mplan))
+ return;
+
+ /*
+ * node->ms_asyncremain can be NULL if we have fetched tuples, but haven't
+ * returned them yet. In this case ExecMergeAppendAsyncRequest() above just
+ * returns tuples without performing a request.
+ */
+ while (bms_is_member(mplan, node->ms_asyncremain))
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll for async events. */
+ ExecMergeAppendAsyncEventWait(node);
+
+ /* Request a tuple asynchronously. */
+ if (ExecMergeAppendAsyncRequest(node, mplan))
+ return;
+
+ /*
+		 * Keep waiting until there are no pending async requests or we have
+		 * received some tuples from our request.
+ */
+ }
+
+ /* No tuples */
+ return;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncRequest
+ *
+ * Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /*
+ * If we've already fetched necessary data, just return it
+ */
+ if (bms_is_member(mplan, node->ms_has_asyncresults))
+ {
+ node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+ return true;
+ }
+
+ /*
+	 * Get the set of subplans that can accept a new request and don't have
+	 * data ready yet.
+ */
+ needrequest = NULL;
+ i = -1;
+ while ((i = bms_next_member(node->ms_needrequest, i)) >= 0)
+ {
+ if (!bms_is_member(i, node->ms_has_asyncresults))
+ needrequest = bms_add_member(needrequest, i);
+ }
+
+ /*
+	 * If no subplans still need a request, there is nothing to send.
+ */
+ if (bms_is_empty(needrequest))
+ return false;
+
+ /* Clear ms_needrequest flag, as we are going to send requests now */
+ node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
+
+ /* Make a new request for each of the async subplans that need it. */
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ /*
+ * We've just checked that subplan doesn't already have some fetched
+ * data
+ */
+ Assert(!bms_is_member(i, node->ms_has_asyncresults));
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return needed asynchronously-generated results if any. */
+ if (bms_is_member(mplan, node->ms_has_asyncresults))
+ {
+ node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncMergeAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncMergeAppendResponse(AsyncRequest *areq)
+{
+ MergeAppendState *node = (MergeAppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+ Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults));
+
+ node->ms_asyncresults[areq->request_index] = NULL;
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback. */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ node->ms_asyncremain = bms_del_member(node->ms_asyncremain, areq->request_index);
+ return;
+ }
+
+ node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, areq->request_index);
+ /* Save result so we can return it. */
+ node->ms_asyncresults[areq->request_index] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->ms_needrequest = bms_add_member(node->ms_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncEventWait(MergeAppendState *node)
+{
+ int nevents = node->ms_nasyncplans + 1; /* one for PM death */
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(bms_num_members(node->ms_asyncremain) > 0);
+
+ node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+ AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->ms_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /*
+ * No need for further processing if there are no configured events other
+ * than the postmaster death event.
+ */
+ if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
+ {
+ FreeWaitEventSet(node->ms_eventset);
+ node->ms_eventset = NULL;
+ return;
+ }
+
+ /* We wait on at most EVENT_BUFFER_SIZE events. */
+ if (nevents > EVENT_BUFFER_SIZE)
+ nevents = EVENT_BUFFER_SIZE;
+
+ /*
+ * Wait until at least one event occurs.
+ */
+ noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */, occurred_event,
+ nevents, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->ms_eventset);
+ node->ms_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ if (areq->callback_pending)
+ {
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+ }
+}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ec004ed9493a..8c7746d7d876 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -163,6 +163,7 @@ bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
bool enable_presorted_aggregate = true;
bool enable_async_append = true;
+bool enable_async_merge_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 1106cd85f0c2..3e5eb19442e5 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1149,10 +1149,10 @@ mark_async_capable_plan(Plan *plan, Path *path)
SubqueryScan *scan_plan = (SubqueryScan *) plan;
/*
- * If the generated plan node includes a gating Result node,
- * we can't execute it asynchronously.
+ * If the generated plan node includes a gating Result node or
+ * a Sort node, we can't execute it asynchronously.
*/
- if (IsA(plan, Result))
+ if (IsA(plan, Result) || IsA(plan, Sort))
return false;
/*
@@ -1170,10 +1170,10 @@ mark_async_capable_plan(Plan *plan, Path *path)
FdwRoutine *fdwroutine = path->parent->fdwroutine;
/*
- * If the generated plan node includes a gating Result node,
- * we can't execute it asynchronously.
+ * If the generated plan node includes a gating Result node or
+ * a Sort node, we can't execute it asynchronously.
*/
- if (IsA(plan, Result))
+ if (IsA(plan, Result) || IsA(plan, Sort))
return false;
Assert(fdwroutine != NULL);
@@ -1186,9 +1186,9 @@ mark_async_capable_plan(Plan *plan, Path *path)
/*
* If the generated plan node includes a Result node for the
- * projection, we can't execute it asynchronously.
+ * projection or a Sort node, we can't execute it asynchronously.
*/
- if (IsA(plan, Result))
+ if (IsA(plan, Result) || IsA(plan, Sort))
return false;
/*
@@ -1450,6 +1450,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
ListCell *subpaths;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
+ bool consider_async = false;
/*
* We don't have the actual creation of the MergeAppend node split out
@@ -1464,6 +1465,9 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
plan->righttree = NULL;
node->apprelids = rel->relids;
+ consider_async = (enable_async_merge_append &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
/*
* Compute sort column info, and adjust MergeAppend's tlist as needed.
* Because we pass adjust_tlist_in_place = true, we may ignore the
@@ -1539,6 +1543,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
subplan = (Plan *) sort;
}
+ /* If needed, check to see if subplan can be executed asynchronously */
+ if (consider_async)
+ mark_async_capable_plan(subplan, subpath);
+
subplans = lappend(subplans, subplan);
}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 38cb9e970d5b..4f477e39ed83 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -987,6 +987,16 @@ struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"enable_async_merge_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of async merge append plans."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_async_merge_append,
+ true,
+ NULL, NULL, NULL
+ },
{
{"enable_group_by_reordering", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables reordering of GROUP BY keys."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 079efa1baa79..7f97d086e7b8 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -386,6 +386,7 @@
# - Planner Method Configuration -
#enable_async_append = on
+#enable_async_merge_append = on
#enable_bitmapscan = on
#enable_gathermerge = on
#enable_hashagg = on
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 4eb05dc30d61..e3fdb26ece61 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -19,5 +19,6 @@
extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
extern void ExecEndMergeAppend(MergeAppendState *node);
extern void ExecReScanMergeAppend(MergeAppendState *node);
+extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
#endif /* NODEMERGEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d0f2dca59286..a2f5d877f7a4 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1512,10 +1512,66 @@ typedef struct MergeAppendState
TupleTableSlot **ms_slots; /* array of length ms_nplans */
struct binaryheap *ms_heap; /* binary heap of slot indices */
bool ms_initialized; /* are subplans started? */
+ Bitmapset *ms_asyncplans; /* asynchronous plans indexes */
+ int ms_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **ms_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **ms_asyncresults; /* unreturned results of async plans */
+ Bitmapset *ms_has_asyncresults; /* plans which have async results */
+ Bitmapset *ms_asyncremain; /* remaining asynchronous plans */
+ Bitmapset *ms_needrequest; /* asynchronous plans needing a new request */
+ struct WaitEventSet *ms_eventset; /* WaitEventSet used to configure file
+ * descriptor wait events */
struct PartitionPruneState *ms_prune_state;
+ bool ms_valid_subplans_identified; /* is ms_valid_subplans valid? */
Bitmapset *ms_valid_subplans;
+ Bitmapset *ms_valid_asyncplans; /* valid asynchronous plans indexes */
} MergeAppendState;
+/* Getters for AppendState and MergeAppendState */
+static inline struct WaitEventSet *
+GetAppendEventSet(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_eventset;
+	else
+		return ((MergeAppendState *) ps)->ms_eventset;
+}
+
+static inline Bitmapset *
+GetNeedRequest(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_needrequest;
+	else
+		return ((MergeAppendState *) ps)->ms_needrequest;
+}
+
+static inline Bitmapset *
+GetValidAsyncplans(PlanState *ps)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_valid_asyncplans;
+	else
+		return ((MergeAppendState *) ps)->ms_valid_asyncplans;
+}
+
+static inline AsyncRequest *
+GetValidAsyncRequest(PlanState *ps, int nreq)
+{
+	Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *) ps)->as_asyncrequests[nreq];
+	else
+		return ((MergeAppendState *) ps)->ms_asyncrequests[nreq];
+}
+
/* ----------------
* RecursiveUnionState information
*
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 3aa3c16e4422..f1977f83c2c6 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
extern PGDLLIMPORT bool enable_presorted_aggregate;
extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_async_merge_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 352abc0bd424..1d40bab2c6a4 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -149,6 +149,7 @@ select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
enable_async_append | on
+ enable_async_merge_append | on
enable_bitmapscan | on
enable_distinct_reordering | on
enable_gathermerge | on