</programlisting>
<para>
- The following example shows how logical decoding is controlled over the
+ The following examples shows how logical decoding is controlled over the
streaming replication protocol, using the
program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
distribution. This requires that client authentication is set up to allow
replication connections
(see <xref linkend="streaming-replication-authentication"/>) and
that <varname>max_wal_senders</varname> is set sufficiently high to allow
- an additional connection.
+ an additional connection. The second example shows how to stream two-phase
+ transactions. Before you use two-phase commands, you must set
+ <xref linkend="guc-max-prepared-transactions"/> to atleast 1.
</para>
<programlisting>
+Example 1:
$ pg_recvlogical -d postgres --slot=test --create-slot
$ pg_recvlogical -d postgres --slot=test --start -f -
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
COMMIT 693
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
$ pg_recvlogical -d postgres --slot=test --drop-slot
+
+Example 2:
+$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
+$ pg_recvlogical -d postgres --slot=test --start -f -
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
+$ fg
+BEGIN 694
+table public.data: INSERT: id[integer]:5 data[text]:'5'
+PREPARE TRANSACTION 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "COMMIT PREPARED 'test';"
+$ fg
+COMMIT PREPARED 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
+$ pg_recvlogical -d postgres --slot=test --drop-slot
</programlisting>
<para>
</varlistentry>
<varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
- <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> ] }
+ <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
</term>
<listitem>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>TWO_PHASE</literal></term>
+ <listitem>
+ <para>
+ Specify that this logical replication slot supports decoding of two-phase
+ transactions. With this option, two-phase commands like
+ <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+ and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+ The transaction will be decoded and transmitted at
+ <literal>PREPARE TRANSACTION</literal> time.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>RESERVE_WAL</literal></term>
<listitem>
<option>--plugin</option>, for the database specified
by <option>--dbname</option>.
</para>
+
+ <para>
+ The <option>--two-phase</option> can be specified with
+ <option>--create-slot</option> to enable two-phase decoding.
+ </para>
</listitem>
</varlistentry>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>-t</option></term>
+ <term><option>--two-phase</option></term>
+ <listitem>
+ <para>
+ Enables two-phase decoding. This option should only be specified with
+ <option>--create-slot</option>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>-v</option></term>
<term><option>--verbose</option></term>
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_TWO_PHASE
%token K_EXPORT_SNAPSHOT
%token K_NOEXPORT_SNAPSHOT
%token K_USE_SNAPSHOT
$$ = makeDefElem("reserve_wal",
(Node *)makeInteger(true), -1);
}
+ | K_TWO_PHASE
+ {
+ $$ = makeDefElem("two_phase",
+ (Node *)makeInteger(true), -1);
+ }
;
/* DROP_REPLICATION_SLOT slot */
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+TWO_PHASE { return K_TWO_PHASE; }
EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
- CRSSnapshotAction *snapshot_action)
+ CRSSnapshotAction *snapshot_action,
+ bool *two_phase)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
+ bool two_phase_given = false;
/* Parse options */
foreach(lc, cmd->options)
reserve_wal_given = true;
*reserve_wal = true;
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_given = true;
+ *two_phase = true;
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
char xloc[MAXFNAMELEN];
char *slot_name;
bool reserve_wal = false;
+ bool two_phase = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
Assert(!MyReplicationSlot);
- parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
/* setup state for WalSndSegmentOpen */
sendTimeLineIsHistoric = false;
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- false);
+ two_phase);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)
if (temp_replication_slot || create_slot)
{
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
- temp_replication_slot, true, true, false))
+ temp_replication_slot, true, true, false, false))
exit(1);
if (verbose)
pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
- slot_exists_ok))
+ slot_exists_ok, false))
exit(1);
exit(0);
}
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
+static bool two_phase = false;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */
printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
+ printf(_(" -t, --two-phase enable two-phase decoding when creating a slot\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
{"fsync-interval", required_argument, NULL, 'F'},
{"no-loop", no_argument, NULL, 'n'},
{"verbose", no_argument, NULL, 'v'},
+ {"two-phase", no_argument, NULL, 't'},
{"version", no_argument, NULL, 'V'},
{"help", no_argument, NULL, '?'},
/* connection options */
}
}
- while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
+ while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
long_options, &option_index)) != -1)
{
switch (c)
case 'v':
verbose++;
break;
+ case 't':
+ two_phase = true;
+ break;
/* connection options */
case 'd':
dbname = pg_strdup(optarg);
exit(1);
}
+ if (two_phase && !do_create_slot)
+ {
+ pg_log_error("--two-phase may only be specified with --create-slot");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+
#ifndef WIN32
pqsignal(SIGINT, sigint_handler);
pqsignal(SIGHUP, sighup_handler);
pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
- false, false, slot_exists_ok))
+ false, false, slot_exists_ok, two_phase))
exit(1);
startpos = InvalidXLogRecPtr;
}
bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
bool is_temporary, bool is_physical, bool reserve_wal,
- bool slot_exists_ok)
+ bool slot_exists_ok, bool two_phase)
{
PQExpBuffer query;
PGresult *res;
Assert((is_physical && plugin == NULL) ||
(!is_physical && plugin != NULL));
+ Assert(!(two_phase && is_physical));
Assert(slot_name != NULL);
/* Build query */
else
{
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+ if (two_phase && PQserverVersion(conn) >= 150000)
+ appendPQExpBufferStr(query, " TWO_PHASE");
+
if (PQserverVersion(conn) >= 100000)
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
const char *plugin, bool is_temporary,
bool is_physical, bool reserve_wal,
- bool slot_exists_ok);
+ bool slot_exists_ok, bool two_phase);
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
TimeLineID *starttli,
use warnings;
use TestLib;
use PostgresNode;
-use Test::More tests => 15;
+use Test::More tests => 20;
program_help_ok('pg_recvlogical');
program_version_ok('pg_recvlogical');
max_wal_senders = 4
log_min_messages = 'debug1'
log_error_verbosity = verbose
+max_prepared_transactions = 10
});
$node->dump_info;
$node->start;
'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
],
'replayed a transaction');
+
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S',
+ 'test', '-d',
+ $node->connstr('postgres'), '--drop-slot'
+ ],
+ 'slot dropped');
+
+#test with two-phase option enabled
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S',
+ 'test', '-d',
+ $node->connstr('postgres'), '--create-slot', '--two-phase'
+ ],
+ 'slot with two-phase created');
+
+$slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+$node->safe_psql('postgres',
+ "BEGIN; INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
+$node->safe_psql('postgres',
+ "COMMIT PREPARED 'test'");
+$nextlsn =
+ $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
+chomp($nextlsn);
+
+$node->command_fails(
+ [
+ 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+ '--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-'
+ ],
+ 'incorrect usage');
+
+$node->command_ok(
+ [
+ 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+ '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+ ],
+ 'replayed a two-phase transaction');