char *socket_dir; /* directory for Unix-domain socket, if any */
char *sub_port; /* subscriber port number */
const char *sub_username; /* subscriber username */
+ bool two_phase; /* enable-two-phase option */
SimpleStringList database_names; /* list of database names */
SimpleStringList pub_names; /* list of publication names */
SimpleStringList sub_names; /* list of subscription names */
int recovery_timeout; /* stop recovery after this time */
};
+/* per-database publication/subscription info */
struct LogicalRepInfo
{
char *dbname; /* database name */
bool made_publication; /* publication was created */
};
+/*
+ * Information shared across all the databases (or publications and
+ * subscriptions).
+ */
+struct LogicalRepInfos
+{
+ struct LogicalRepInfo *dbinfo;
+ bool two_phase; /* enable-two-phase option */
+};
+
static void cleanup_objects_atexit(void);
static void usage();
static char *get_base_conninfo(const char *conninfo, char **dbname);
static bool success = false;
-static struct LogicalRepInfo *dbinfo;
+static struct LogicalRepInfos dbinfos;
static int num_dbs = 0; /* number of specified databases */
static int num_pubs = 0; /* number of specified publications */
static int num_subs = 0; /* number of specified subscriptions */
for (int i = 0; i < num_dbs; i++)
{
- if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
+ if (dbinfos.dbinfo[i].made_publication || dbinfos.dbinfo[i].made_replslot)
{
PGconn *conn;
- conn = connect_database(dbinfo[i].pubconninfo, false);
+ conn = connect_database(dbinfos.dbinfo[i].pubconninfo, false);
if (conn != NULL)
{
- if (dbinfo[i].made_publication)
- drop_publication(conn, &dbinfo[i]);
- if (dbinfo[i].made_replslot)
- drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
+ if (dbinfos.dbinfo[i].made_publication)
+ drop_publication(conn, &dbinfos.dbinfo[i]);
+ if (dbinfos.dbinfo[i].made_replslot)
+ drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
disconnect_database(conn, false);
}
else
* that some objects were left on primary and should be
* removed before trying again.
*/
- if (dbinfo[i].made_publication)
+ if (dbinfos.dbinfo[i].made_publication)
{
pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
- dbinfo[i].pubname, dbinfo[i].dbname);
+ dbinfos.dbinfo[i].pubname,
+ dbinfos.dbinfo[i].dbname);
pg_log_warning_hint("Drop this publication before trying again.");
}
- if (dbinfo[i].made_replslot)
+ if (dbinfos.dbinfo[i].made_replslot)
{
pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
- dbinfo[i].replslotname, dbinfo[i].dbname);
+ dbinfos.dbinfo[i].replslotname,
+ dbinfos.dbinfo[i].dbname);
pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
}
}
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
+ printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" --config-file=FILENAME use specified main server configuration\n"
dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
dbinfo[i].pubconninfo);
- pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+ pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
- dbinfo[i].subconninfo);
+ dbinfo[i].subconninfo,
+ dbinfos.two_phase ? "true" : "false");
if (num_pubs > 0)
pubcell = pubcell->next;
failed = true;
}
- if (max_prepared_transactions != 0)
+ if (max_prepared_transactions != 0 && !dbinfos.two_phase)
{
pg_log_warning("two_phase option will not be enabled for replication slots");
pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
"Prepared transactions will be replicated at COMMIT PREPARED.");
+ pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
}
/*
slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
appendPQExpBuffer(str,
- "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
- slot_name_esc);
+ "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
+ slot_name_esc,
+ dbinfos.two_phase ? "true" : "false");
PQfreemem(slot_name_esc);
appendPQExpBuffer(str,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
"WITH (create_slot = false, enabled = false, "
- "slot_name = %s, copy_data = false)",
- subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+ "slot_name = %s, copy_data = false, two_phase = %s)",
+ subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+ dbinfos.two_phase ? "true" : "false");
PQfreemem(pubname_esc);
PQfreemem(subname_esc);
{"publisher-server", required_argument, NULL, 'P'},
{"socketdir", required_argument, NULL, 's'},
{"recovery-timeout", required_argument, NULL, 't'},
+ {"enable-two-phase", no_argument, NULL, 'T'},
{"subscriber-username", required_argument, NULL, 'U'},
{"verbose", no_argument, NULL, 'v'},
{"version", no_argument, NULL, 'V'},
opt.socket_dir = NULL;
opt.sub_port = DEFAULT_SUB_PORT;
opt.sub_username = NULL;
+ opt.two_phase = false;
opt.database_names = (SimpleStringList)
{
0
get_restricted_token();
- while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+ while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
long_options, &option_index)) != -1)
{
switch (c)
case 't':
opt.recovery_timeout = atoi(optarg);
break;
+ case 'T':
+ opt.two_phase = true;
+ break;
case 'U':
opt.sub_username = pg_strdup(optarg);
break;
/* Rudimentary check for a data directory */
check_data_directory(subscriber_dir);
+ dbinfos.two_phase = opt.two_phase;
+
/*
* Store database information for publisher and subscriber. It should be
* called before atexit() because its return is used in the
* cleanup_objects_atexit().
*/
- dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
+ dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
/* Register a function to clean up objects in case of failure */
atexit(cleanup_objects_atexit);
* Check if the subscriber data directory has the same system identifier
* than the publisher data directory.
*/
- pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
+ pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
sub_sysid = get_standby_sysid(subscriber_dir);
if (pub_sysid != sub_sysid)
pg_fatal("subscriber data directory is not a copy of the source database cluster");
start_standby_server(&opt, true, false);
/* Check if the standby server is ready for logical replication */
- check_subscriber(dbinfo);
+ check_subscriber(dbinfos.dbinfo);
/* Check if the primary server is ready for logical replication */
- check_publisher(dbinfo);
+ check_publisher(dbinfos.dbinfo);
/*
* Stop the target server. The recovery process requires that the server
stop_standby_server(subscriber_dir);
/* Create the required objects for each database on publisher */
- consistent_lsn = setup_publisher(dbinfo);
+ consistent_lsn = setup_publisher(dbinfos.dbinfo);
/* Write the required recovery parameters */
- setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
+ setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
/*
* Start subscriber so the recovery parameters will take effect. Wait
start_standby_server(&opt, true, true);
/* Waiting the subscriber to be promoted */
- wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
+ wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
/*
* Create the subscription for each database on subscriber. It does not
* point to the LSN reported by setup_publisher(). It also cleans up
* publications created by this tool and replication to the standby.
*/
- setup_subscriber(dbinfo, consistent_lsn);
+ setup_subscriber(dbinfos.dbinfo, consistent_lsn);
/* Remove primary_slot_name if it exists on primary */
- drop_primary_replication_slot(dbinfo, primary_slot_name);
+ drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
/* Remove failover replication slots if they exist on subscriber */
- drop_failover_replication_slots(dbinfo);
+ drop_failover_replication_slots(dbinfos.dbinfo);
/* Stop the subscriber */
pg_log_info("stopping the subscriber");