}
/* Send CommandComplete message */
- pq_puttextmessage('C', "START_STREAMING");
+ EndReplicationCommand("START_STREAMING");
}
/*
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
- QueryCompletion qc;
-
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
- SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
- EndCommand(&qc, DestRemote, false);
}
/*
{
int parse_rc;
Node *cmd_node;
+ const char *cmdtag;
MemoryContext cmd_context;
MemoryContext old_context;
- QueryCompletion qc;
/*
* If WAL sender has been told that shutdown is getting close, switch its
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
+ cmdtag = "IDENTIFY_SYSTEM";
IdentifySystem();
+ EndReplicationCommand(cmdtag);
break;
case T_BaseBackupCmd:
- PreventInTransactionBlock(true, "BASE_BACKUP");
+ cmdtag = "BASE_BACKUP";
+ PreventInTransactionBlock(true, cmdtag);
SendBaseBackup((BaseBackupCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_CreateReplicationSlotCmd:
+ cmdtag = "CREATE_REPLICATION_SLOT";
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_DropReplicationSlotCmd:
+ cmdtag = "DROP_REPLICATION_SLOT";
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
- PreventInTransactionBlock(true, "START_REPLICATION");
+ cmdtag = "START_REPLICATION";
+ PreventInTransactionBlock(true, cmdtag);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
+ /* callees already sent their own completion message */
+
Assert(xlogreader != NULL);
break;
}
case T_TimeLineHistoryCmd:
- PreventInTransactionBlock(true, "TIMELINE_HISTORY");
+ cmdtag = "TIMELINE_HISTORY";
+ PreventInTransactionBlock(true, cmdtag);
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_VariableShowStmt:
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
+ cmdtag = "SHOW";
+
/* syscache access needs a transaction environment */
StartTransactionCommand();
GetPGVariable(n->name, dest);
CommitTransactionCommand();
+ EndReplicationCommand(cmdtag);
}
break;
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
- /* Send CommandComplete message */
- SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
- EndCommand(&qc, DestRemote, true);
-
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
debug_query_string = NULL;
PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false)
PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false)
PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false)
-PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false)
PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false)
PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false)
PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", true, false, false)