<sect2>
<title>Exported Snapshots</title>
<para>
- When a new replication slot is created using the streaming replication interface,
- a snapshot is exported
+ When a new replication slot is created using the streaming replication
+ interface (see <xref linkend="protocol-replication-create-slot">), a
+ snapshot is exported
(see <xref linkend="functions-snapshot-synchronization">), which will show
exactly the state of the database after which all changes will be
included in the change stream. This can be used to create a new replica by
database's state at that point in time, which afterwards can be updated
using the slot's contents without losing any changes.
</para>
+ <para>
+ Creation of a snapshot is not always possible. In particular, it will
+ fail when connected to a hot standby. Applications that do not require
+ snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</>
+ option.
+ </para>
</sect2>
</sect1>
</listitem>
</varlistentry>
- <varlistentry>
- <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> }
+ <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
+ <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</> [ <literal>TEMPORARY</> ] { <literal>PHYSICAL</> [ <literal>RESERVE_WAL</> ] | <literal>LOGICAL</> <replaceable class="parameter">output_plugin</> [ <literal>EXPORT_SNAPSHOT</> | <literal>NOEXPORT_SNAPSHOT</> ] }
<indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
</term>
<listitem>
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><literal>EXPORT_SNAPSHOT</></term>
+ <term><literal>NOEXPORT_SNAPSHOT</></term>
+ <listitem>
+ <para>
+ Decides what to do with the snapshot created during logical slot
+ initialization. <literal>EXPORT_SNAPSHOT</>, which is the default,
+ will export the snapshot for use in other sessions. This option can't
+ be used inside a transaction. <literal>NOEXPORT_SNAPSHOT</> will
+ just use the snapshot for logical decoding as normal but won't do
+ anything else with it.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
<para>
PG_TRY();
{
- walrcv_create_slot(wrconn, slotname, false, &lsn);
+ /*
+ * Create permanent slot for the subscription. We won't use the
+ * initial snapshot for anything, so no need to export it.
+ */
+ walrcv_create_slot(wrconn, slotname, false, false, &lsn);
ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher",
slotname)));
static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
+ bool export_snapshot,
XLogRecPtr *lsn);
static bool libpqrcv_command(WalReceiverConn *conn,
const char *cmd, char **err);
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
- bool temporary, XLogRecPtr *lsn)
+ bool temporary, bool export_snapshot, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
initStringInfo(&cmd);
- appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
+ appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
if (temporary)
- appendStringInfo(&cmd, "TEMPORARY ");
+ appendStringInfo(&cmd, " TEMPORARY");
if (conn->logical)
- appendStringInfo(&cmd, "LOGICAL pgoutput");
+ {
+ appendStringInfo(&cmd, " LOGICAL pgoutput");
+ if (export_snapshot)
+ appendStringInfo(&cmd, " EXPORT_SNAPSHOT");
+ else
+ appendStringInfo(&cmd, " NOEXPORT_SNAPSHOT");
+ }
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
%token K_SLOT
%token K_RESERVE_WAL
%token K_TEMPORARY
+%token K_EXPORT_SNAPSHOT
+%token K_NOEXPORT_SNAPSHOT
%type <node> command
%type <node> base_backup start_replication start_logical_replication
%type <defelt> plugin_opt_elem
%type <node> plugin_opt_arg
%type <str> opt_slot var_name
-%type <boolval> opt_reserve_wal opt_temporary
+%type <boolval> opt_temporary
+%type <list> create_slot_opt_list
+%type <defelt> create_slot_opt
%%
create_replication_slot:
/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
- K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL opt_reserve_wal
+ K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_PHYSICAL;
cmd->slotname = $2;
cmd->temporary = $3;
- cmd->reserve_wal = $5;
+ cmd->options = $5;
$$ = (Node *) cmd;
}
/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT
+ | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd = makeNode(CreateReplicationSlotCmd);
cmd->slotname = $2;
cmd->temporary = $3;
cmd->plugin = $5;
+ cmd->options = $6;
$$ = (Node *) cmd;
}
;
+create_slot_opt_list:
+ create_slot_opt_list create_slot_opt
+ { $$ = lappend($1, $2); }
+ | /* EMPTY */
+ { $$ = NIL; }
+ ;
+
+create_slot_opt:
+ K_EXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ | K_NOEXPORT_SNAPSHOT
+ {
+ $$ = makeDefElem("export_snapshot",
+ (Node *)makeInteger(FALSE), -1);
+ }
+ | K_RESERVE_WAL
+ {
+ $$ = makeDefElem("reserve_wal",
+ (Node *)makeInteger(TRUE), -1);
+ }
+ ;
+
/* DROP_REPLICATION_SLOT slot */
drop_replication_slot:
K_DROP_REPLICATION_SLOT IDENT
| /* EMPTY */
;
-opt_reserve_wal:
- K_RESERVE_WAL { $$ = true; }
- | /* EMPTY */ { $$ = false; }
- ;
-
opt_temporary:
K_TEMPORARY { $$ = true; }
| /* EMPTY */ { $$ = false; }
LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
TEMPORARY { return K_TEMPORARY; }
+EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; }
+NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; }
"," { return ','; }
";" { return ';'; }
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
+#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
return count;
}
+/*
+ * Process extra options given to CREATE_REPLICATION_SLOT.
+ */
+static void
+parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
+ bool *reserve_wal,
+ bool *export_snapshot)
+{
+ ListCell *lc;
+ bool snapshot_action_given = false;
+ bool reserve_wal_given = false;
+
+ /* Parse options */
+ foreach (lc, cmd->options)
+ {
+ DefElem *defel = (DefElem *) lfirst(lc);
+
+ if (strcmp(defel->defname, "export_snapshot") == 0)
+ {
+ if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ snapshot_action_given = true;
+ *export_snapshot = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "reserve_wal") == 0)
+ {
+ if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ reserve_wal_given = true;
+ *reserve_wal = true;
+ }
+ else
+ elog(ERROR, "unrecognized option: %s", defel->defname);
+ }
+}
+
/*
* Create a new replication slot.
*/
const char *snapshot_name = NULL;
char xpos[MAXFNAMELEN];
char *slot_name;
+ bool reserve_wal = false;
+ bool export_snapshot = true;
DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc;
Assert(!MyReplicationSlot);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &export_snapshot);
+
/* setup state for XLogReadPage */
sendTimeLineIsHistoric = false;
sendTimeLine = ThisTimeLineID;
DecodingContextFindStartpoint(ctx);
/*
- * Export a plain (not of the snapbuild.c type) snapshot to the user
- * that can be imported into another session.
+ * Export the snapshot if we've been asked to do so.
+ *
+ * NB. We will convert the snapbuild.c kind of snapshot to normal
+ * snapshot when doing this.
*/
- snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
+ if (export_snapshot)
+ snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
if (!cmd->temporary)
ReplicationSlotPersist();
}
- else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
+ else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
{
ReplicationSlotReserveWal();
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
slot_name);
else
+ {
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
slot_name, plugin);
+ if (PQserverVersion(conn) >= 100000)
+ /* pg_recvlogical doesn't use an exported snapshot, so suppress */
+ appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
+ }
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
ReplicationKind kind;
char *plugin;
bool temporary;
- bool reserve_wal;
+ List *options;
} CreateReplicationSlotCmd;
int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary,
- XLogRecPtr *lsn);
+ bool export_snapshot, XLogRecPtr *lsn);
typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
char **err);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
#define walrcv_command(conn, cmd, err) \
WalReceiverFunctions->walrcv_command(conn, cmd, err)
#define walrcv_disconnect(conn) \