Add option to enable two_phase commits via pg_create_logical_replication_slot.
authorAmit Kapila <[email protected]>
Wed, 3 Mar 2021 01:58:43 +0000 (07:28 +0530)
committerAmit Kapila <[email protected]>
Wed, 3 Mar 2021 02:04:11 +0000 (07:34 +0530)
Commit 0aa8a01d04 extends the output plugin API to allow decoding of
prepared xacts and allowed the user to enable/disable the two-phase option
via pg_logical_slot_get_changes(). This can lead to a problem such that
the first time when it gets changes via pg_logical_slot_get_changes()
without two_phase option enabled it will not get the prepared even though
prepare is after consistent snapshot. Now next time during getting changes,
if the two_phase option is enabled it can skip prepare because by that
time start decoding point has been moved. So the user will only get commit
prepared.

Allow to enable/disable this option at the create slot time and default
will be false. It will break the existing slots which is fine in a major
release.

Author: Ajin Cherian
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com

20 files changed:
contrib/test_decoding/expected/twophase.out
contrib/test_decoding/expected/twophase_snapshot.out
contrib/test_decoding/expected/twophase_stream.out
contrib/test_decoding/specs/twophase_snapshot.spec
contrib/test_decoding/sql/twophase.sql
contrib/test_decoding/sql/twophase_stream.sql
contrib/test_decoding/test_decoding.c
doc/src/sgml/catalogs.sgml
doc/src/sgml/func.sgml
doc/src/sgml/logicaldecoding.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/logical.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/nodes/replnodes.h
src/include/replication/slot.h
src/test/regress/expected/rules.out

index 8a1d06d706d3aea6fe1d7027fbb7e239b2ff6c14..e5e0f968961710cf3a381448415f66c27360612d 100644 (file)
@@ -1,7 +1,7 @@
 -- Test prepared transactions. When two-phase-commit is enabled, transactions are
 -- decoded at PREPARE time rather than at COMMIT PREPARED time.
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
  ?column? 
 ----------
  init
@@ -15,14 +15,14 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 INSERT INTO test_prepared1 VALUES (2);
 -- should show nothing because the xact has not been prepared yet.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  data 
 ------
 (0 rows)
 
 PREPARE TRANSACTION 'test_prepared#1';
 -- should show both the above inserts and the PREPARE TRANSACTION.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                         data                        
 ----------------------------------------------------
  BEGIN
@@ -32,7 +32,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 (4 rows)
 
 COMMIT PREPARED 'test_prepared#1';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                data                
 -----------------------------------
  COMMIT PREPARED 'test_prepared#1'
@@ -42,7 +42,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                         data                        
 ----------------------------------------------------
  BEGIN
@@ -51,7 +51,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 (3 rows)
 
 ROLLBACK PREPARED 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                 data                 
 -------------------------------------
  ROLLBACK PREPARED 'test_prepared#2'
@@ -74,7 +74,7 @@ WHERE locktype = 'relation'
 (2 rows)
 
 -- The insert should show the newly altered column but not the DDL.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
@@ -89,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 -- the ALTER will stop us inserting into the other one.
 --
 INSERT INTO test_prepared2 VALUES (5);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                         data                        
 ----------------------------------------------------
  BEGIN
@@ -98,7 +98,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 (3 rows)
 
 COMMIT PREPARED 'test_prepared#3';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                data                
 -----------------------------------
  COMMIT PREPARED 'test_prepared#3'
@@ -107,7 +107,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
 INSERT INTO test_prepared2 VALUES (7);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                 data                                
 --------------------------------------------------------------------
  BEGIN
@@ -138,7 +138,7 @@ WHERE locktype = 'relation'
 
 -- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
 SET statement_timeout = '180s';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                    data                                    
 ---------------------------------------------------------------------------
  BEGIN
@@ -150,7 +150,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                  data                 
 --------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
@@ -166,7 +166,7 @@ INSERT INTO test_prepared_savepoint VALUES (2);
 ROLLBACK TO SAVEPOINT test_savepoint;
 PREPARE TRANSACTION 'test_prepared_savepoint';
 -- should show only 1, not 2
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                             data                            
 ------------------------------------------------------------
  BEGIN
@@ -176,7 +176,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                    data                    
 -------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
@@ -187,14 +187,14 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (20);
 PREPARE TRANSACTION 'test_prepared_nodecode';
 -- should show nothing
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  data 
 ------
 (0 rows)
 
 COMMIT PREPARED 'test_prepared_nodecode';
 -- should be decoded now
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                 data                                 
 ---------------------------------------------------------------------
  BEGIN
@@ -207,7 +207,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 -- show results. There should be nothing to show
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  data 
 ------
 (0 rows)
index 14d93876462ed6d09e33749677318f84446c04a2..0e8e1f50fe49772e8eedac64e03d4f2386150beb 100644 (file)
@@ -6,7 +6,7 @@ step s2txid: SELECT pg_current_xact_id() IS NULL;
 ?column?       
 
 f              
-step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true); <waiting ...>
 step s3b: BEGIN;
 step s3txid: SELECT pg_current_xact_id() IS NULL;
 ?column?       
@@ -22,14 +22,14 @@ step s1init: <... completed>
 
 init           
 step s1insert: INSERT INTO do_write DEFAULT VALUES;
-step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.do_write: INSERT: id[integer]:2
 COMMIT         
 step s2cp: COMMIT PREPARED 'test1';
-step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
index d54e640b409de37f0471d7c37dcad55931e07332..b08bb0e5730be8decc0eef3e7314766520aead6f 100644 (file)
@@ -1,6 +1,6 @@
 -- Test streaming of two-phase commits
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
  ?column? 
 ----------
  init
@@ -28,7 +28,7 @@ ROLLBACK TO s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1';
 -- should show the inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
  streaming message: transactional: 1 prefix: test, sz: 50
@@ -59,7 +59,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
           data           
 -------------------------
  COMMIT PREPARED 'test1'
@@ -81,7 +81,7 @@ ROLLBACK to s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1_nodecode';
 -- should NOT show inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
  streaming message: transactional: 1 prefix: test, sz: 50
@@ -89,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                             data                             
 -------------------------------------------------------------
  BEGIN
index 3e700404e0e5f17b9d8d97eac89398073cf265cc..e8d9567fb9a245757c0347d40eb4acfce3631e1b 100644 (file)
@@ -15,8 +15,8 @@ teardown
 session "s1"
 setup { SET synchronous_commit=on; }
 
-step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
-step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');}
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);}
+step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');}
 step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
 
 session "s2"
index dacedfe1814e4df6a3d87f114e2bf5254b03cf5c..05f18e84948b9e548455315191d0dfe3233d886a 100644 (file)
@@ -1,7 +1,7 @@
 -- Test prepared transactions. When two-phase-commit is enabled, transactions are
 -- decoded at PREPARE time rather than at COMMIT PREPARED time.
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 
 CREATE TABLE test_prepared1(id integer primary key);
 CREATE TABLE test_prepared2(id integer primary key);
@@ -12,20 +12,20 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 INSERT INTO test_prepared1 VALUES (2);
 -- should show nothing because the xact has not been prepared yet.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 PREPARE TRANSACTION 'test_prepared#1';
 -- should show both the above inserts and the PREPARE TRANSACTION.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared#1';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 ROLLBACK PREPARED 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
 BEGIN;
@@ -38,7 +38,7 @@ FROM pg_locks
 WHERE locktype = 'relation'
   AND relation = 'test_prepared1'::regclass;
 -- The insert should show the newly altered column but not the DDL.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test that we decode correctly while an uncommitted prepared xact
 -- with ddl exists.
@@ -47,14 +47,14 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 -- the ALTER will stop us inserting into the other one.
 --
 INSERT INTO test_prepared2 VALUES (5);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 COMMIT PREPARED 'test_prepared#3';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
 INSERT INTO test_prepared2 VALUES (7);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
 -- logical decoding.
@@ -70,11 +70,11 @@ WHERE locktype = 'relation'
   AND relation = 'test_prepared1'::regclass;
 -- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
 SET statement_timeout = '180s';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -86,26 +86,26 @@ INSERT INTO test_prepared_savepoint VALUES (2);
 ROLLBACK TO SAVEPOINT test_savepoint;
 PREPARE TRANSACTION 'test_prepared_savepoint';
 -- should show only 1, not 2
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
 INSERT INTO test_prepared1 VALUES (20);
 PREPARE TRANSACTION 'test_prepared_nodecode';
 -- should show nothing
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared_nodecode';
 -- should be decoded now
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test 8:
 -- cleanup and make sure results are also empty
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 -- show results. There should be nothing to show
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
index e9dd44fdb37bb79aee8b2b05c33312fa3d7a202a..646076da2074dc90b5b8dff9fea544ed2d620fd2 100644 (file)
@@ -1,7 +1,7 @@
 -- Test streaming of two-phase commits
 
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 
 CREATE TABLE stream_test(data text);
 
@@ -18,11 +18,11 @@ ROLLBACK TO s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1';
 -- should show the inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
@@ -35,11 +35,11 @@ ROLLBACK to s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1_nodecode';
 -- should NOT show inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
index 929255eac74660cdba0531971f98a457eecd0352..ae5f397f35118ac44099ee1ca1c31c3328ad6617 100644 (file)
@@ -164,7 +164,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
    ListCell   *option;
    TestDecodingData *data;
    bool        enable_streaming = false;
-   bool        enable_twophase = false;
 
    data = palloc0(sizeof(TestDecodingData));
    data->context = AllocSetContextCreate(ctx->context,
@@ -265,16 +264,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                         errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                strVal(elem->arg), elem->defname)));
        }
-       else if (strcmp(elem->defname, "two-phase-commit") == 0)
-       {
-           if (elem->arg == NULL)
-               continue;
-           else if (!parse_bool(strVal(elem->arg), &enable_twophase))
-               ereport(ERROR,
-                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                        errmsg("could not parse value \"%s\" for parameter \"%s\"",
-                               strVal(elem->arg), elem->defname)));
-       }
        else
        {
            ereport(ERROR,
@@ -286,7 +275,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
    }
 
    ctx->streaming &= enable_streaming;
-   ctx->twophase &= enable_twophase;
 }
 
 /* cleanup this plugin's resources */
index db29905e91ffa87d7bf9b02c8e4976dc18d0f9cb..b1de6d0674b22f180ad777a6afab48883dab5726 100644 (file)
@@ -11529,6 +11529,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        is <literal>-1</literal>.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>two_phase</structfield> <type>bool</type>
+      </para>
+      <para>
+       True if the slot is enabled for decoding prepared transactions.  Always
+       false for physical slots.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index 6c189bfed25b68b2b02f44607f9f81f299a685f3..bf99f821496aa525aa4a868d0d653dc50ff0213c 100644 (file)
@@ -25559,7 +25559,7 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
-        <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type> </optional> )
+        <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>two_phase</parameter> <type>boolean</type> </optional> )
         <returnvalue>record</returnvalue>
         ( <parameter>slot_name</parameter> <type>name</type>,
         <parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -25571,9 +25571,11 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
         parameter, <parameter>temporary</parameter>, when set to true, specifies that
         the slot should not be permanently stored to disk and is only meant
         for use by the current session. Temporary slots are also
-        released upon any error. A call to this function has the same
-        effect as the replication protocol command
-        <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
+        released upon any error. The optional fourth parameter,
+        <parameter>two_phase</parameter>, when set to true, specifies
+        that the decoding of prepared transactions is enabled for this
+        slot. A call to this function has the same effect as the replication
+        protocol command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </para></entry>
       </row>
 
index f1f13d81d569806bf14cd61099cd66085db7830e..80eb96d609a91a7dd6a939c3cc02c313bdbbf72a 100644 (file)
@@ -55,7 +55,7 @@
 
 <programlisting>
 postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
-postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
     slot_name    |    lsn
 -----------------+-----------
  regression_slot | 0/16B1970
@@ -169,17 +169,18 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot
   <para>
   The following example shows SQL interface that can be used to decode prepared
   transactions. Before you use two-phase commit commands, you must set
-  <varname>max_prepared_transactions</varname> to at least 1. You must also set
-  the option 'two-phase-commit' to 1 while calling
-  <function>pg_logical_slot_get_changes</function>. Note that we will stream
-  the entire transaction after the commit if it is not already decoded.
+  <varname>max_prepared_transactions</varname> to at least 1. You must also have
+  set the two-phase parameter as 'true' while creating the slot using
+  <function>pg_create_logical_replication_slot</function>
+  Note that we will stream the entire transaction after the commit if it
+  is not already decoded.
   </para>
 <programlisting>
 postgres=# BEGIN;
 postgres=*# INSERT INTO data(data) VALUES('5');
 postgres=*# PREPARE TRANSACTION 'test_prepared1';
 
-postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                          data                           
 -----------+-----+---------------------------------------------------------
  0/1689DC0 | 529 | BEGIN 529
@@ -188,7 +189,7 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
 (3 rows)
 
 postgres=# COMMIT PREPARED 'test_prepared1';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                    data                    
 -----------+-----+--------------------------------------------
  0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
@@ -198,7 +199,7 @@ postgres=#-- you can also rollback a prepared transaction
 postgres=# BEGIN;
 postgres=*# INSERT INTO data(data) VALUES('6');
 postgres=*# PREPARE TRANSACTION 'test_prepared2';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                          data                           
 -----------+-----+---------------------------------------------------------
  0/168A180 | 530 | BEGIN 530
@@ -207,7 +208,7 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
 (3 rows)
 
 postgres=# ROLLBACK PREPARED 'test_prepared2';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                     data                     
 -----------+-----+----------------------------------------------
  0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
index fa58afd9d781403ded9de00ac956a8e70f1951d9..fc94a73a54a85b0a4996f634b95c30f547b33f78 100644 (file)
@@ -894,7 +894,8 @@ CREATE VIEW pg_replication_slots AS
             L.restart_lsn,
             L.confirmed_flush_lsn,
             L.wal_status,
-            L.safe_wal_size
+            L.safe_wal_size,
+            L.two_phase
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
@@ -1318,6 +1319,7 @@ AS 'pg_create_physical_replication_slot';
 CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
     IN slot_name name, IN plugin name,
     IN temporary boolean DEFAULT false,
+    IN twophase boolean DEFAULT false,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
index 3f6d723d09637a5f7b115cb4a99504c11297d26e..37b75deb728bb88195a0830ec8be11f98195238a 100644 (file)
@@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin,
        startup_cb_wrapper(ctx, &ctx->options, true);
    MemoryContextSwitchTo(old_context);
 
+   /*
+    * We allow decoding of prepared transactions iff the two_phase option is
+    * enabled at the time of slot creation.
+    */
+   ctx->twophase &= MyReplicationSlot->data.two_phase;
+
    ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
    return ctx;
@@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn,
        startup_cb_wrapper(ctx, &ctx->options, false);
    MemoryContextSwitchTo(old_context);
 
+   /*
+    * We allow decoding of prepared transactions iff the two_phase option is
+    * enabled at the time of slot creation.
+    */
+   ctx->twophase &= MyReplicationSlot->data.two_phase;
+
    ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
    ereport(LOG,
index fb4af2ef52d58691fe6607e271239a8f6c319883..75a087c2f9d27f5f499a50863af8d555f687bb3d 100644 (file)
@@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel)
  * name: Name of the slot
  * db_specific: logical decoding is db specific; if the slot is going to
  *    be used for that pass true, otherwise false.
+ * two_phase: Allows decoding of prepared transactions. We allow this option
+ *     to be enabled only at the slot creation time. If we allow this option
+ *     to be changed during decoding then it is quite possible that we skip
+ *     prepare first time because this option was not enabled. Now next time
+ *     during getting changes, if the two_phase  option is enabled it can skip
+ *     prepare because by that time start decoding point has been moved. So the
+ *     user will only get commit prepared.
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-                     ReplicationSlotPersistency persistency)
+                     ReplicationSlotPersistency persistency, bool two_phase)
 {
    ReplicationSlot *slot = NULL;
    int         i;
@@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
    namestrcpy(&slot->data.name, name);
    slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
    slot->data.persistency = persistency;
+   slot->data.two_phase = two_phase;
 
    /* and then data only present in shared memory */
    slot->just_dirtied = false;
index d24bb5b0b5f8fe022edfe64647459b87ea54ee0c..9817b4411366399a8eddfaecc9c8afa3e7a621f0 100644 (file)
@@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 
    /* acquire replication slot, this will check for conflicting names */
    ReplicationSlotCreate(name, false,
-                         temporary ? RS_TEMPORARY : RS_PERSISTENT);
+                         temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
 
    if (immediately_reserve)
    {
@@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                               bool temporary, XLogRecPtr restart_lsn,
+                               bool temporary, bool two_phase,
+                               XLogRecPtr restart_lsn,
                                bool find_startpoint)
 {
    LogicalDecodingContext *ctx = NULL;
@@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin,
     * error as well.
     */
    ReplicationSlotCreate(name, true,
-                         temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+                         temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
 
    /*
     * Create logical decoding context to find start point or, if we don't
@@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
    Name        name = PG_GETARG_NAME(0);
    Name        plugin = PG_GETARG_NAME(1);
    bool        temporary = PG_GETARG_BOOL(2);
+   bool        two_phase = PG_GETARG_BOOL(3);
    Datum       result;
    TupleDesc   tupdesc;
    HeapTuple   tuple;
@@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
    create_logical_replication_slot(NameStr(*name),
                                    NameStr(*plugin),
                                    temporary,
+                                   two_phase,
                                    InvalidXLogRecPtr,
                                    true);
 
@@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 13
+#define PG_GET_REPLICATION_SLOTS_COLS 14
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
@@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
            values[i++] = Int64GetDatum(failLSN - currlsn);
        }
 
+       values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+
        Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
        create_logical_replication_slot(NameStr(*dst_name),
                                        plugin,
                                        temporary,
+                                       false,
                                        src_restart_lsn,
                                        false);
    }
index eb3f18ed487ceeaab5f4f61313c995efe4083bbb..23baa4498af2262452b4317255fb3d96bd59cfbe 100644 (file)
@@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
    {
        ReplicationSlotCreate(cmd->slotname, false,
-                             cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
+                             cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
+                             false);
    }
    else
    {
@@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
         * they get dropped on error as well.
         */
        ReplicationSlotCreate(cmd->slotname, true,
-                             cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+                             cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
+                             cmd->two_phase);
    }
 
    if (cmd->kind == REPLICATION_KIND_LOGICAL)
index 4cc94de224a8530370f23d6a4176977a62e20c3a..b19975c5c89543e8785db3c6f20af19428e6d19c 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202102191
+#define CATALOG_VERSION_NO 202103031
 
 #endif
index 1487710d590773c8a64170c24f78c1370f0de834..3d3974f46761a58db23184f29e733b3294dde381 100644 (file)
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
-  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
-  proallargtypes => '{name,name,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,o,o}',
-  proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool',
+  proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
 { oid => '4222',
   descr => 'copy a logical replication slot, changing temporality and plugin',
index faa3a251f26d51b7a0f06ad1217a7cbae64f4198..ebc43a0293deb2b25cc042fd4f5e428e82bb406b 100644 (file)
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
    ReplicationKind kind;
    char       *plugin;
    bool        temporary;
+   bool        two_phase;
    List       *options;
 } CreateReplicationSlotCmd;
 
index 5c3fde20c6981b621a742abc48f33c64c2d52075..1ad5e6c50dffd698cbf3160f43a5f8b587c8a2b8 100644 (file)
@@ -98,6 +98,11 @@ typedef struct ReplicationSlotPersistentData
     */
    XLogRecPtr  initial_consistent_point;
 
+   /*
+    * Allow decoding of prepared transactions?
+    */
+   bool        two_phase;
+
    /* plugin name */
    NameData    plugin;
 } ReplicationSlotPersistentData;
@@ -199,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-                                 ReplicationSlotPersistency p);
+                                 ReplicationSlotPersistency p, bool two_phase);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
index 10a1f34ebc172a25d56cb296a3e1904f554ac809..b1c9b7bdfe397c6910916aaa5f70b40fe38623ba 100644 (file)
@@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.restart_lsn,
     l.confirmed_flush_lsn,
     l.wal_status,
-    l.safe_wal_size
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
+    l.safe_wal_size,
+    l.two_phase
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,