static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
-static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid);
+static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
static void SnapBuildFreeSnapshot(Snapshot snap);
* and ->subxip/subxcnt values.
*/
static Snapshot
-SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
+SnapBuildBuildSnapshot(SnapBuild *builder)
{
Snapshot snapshot;
Size ssize;
if (TransactionIdIsValid(MyPgXact->xmin))
elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid");
- snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId());
+ snap = SnapBuildBuildSnapshot(builder);
/*
* We know that snap->xmin is alive, enforced by the logical xmin
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
{
- builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot);
}
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
{
- builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
/* increase refcount for the snapshot builder */
SnapBuildSnapIncRefcount(builder->snapshot);
}
if (builder->snapshot)
SnapBuildSnapDecRefcount(builder->snapshot);
- builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
/* we might need to execute invalidations, add snapshot */
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
SnapBuildSnapDecRefcount(builder->snapshot);
}
- builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId);
+ builder->snapshot = SnapBuildBuildSnapshot(builder);
SnapBuildSnapIncRefcount(builder->snapshot);
ReorderBufferSetRestartPoint(builder->reorder, lsn);
* predicate lock maintenance
* GetSerializableTransactionSnapshot(Snapshot snapshot)
* SetSerializableTransactionSnapshot(Snapshot snapshot,
- * TransactionId sourcexid)
+ * VirtualTransactionId *sourcevxid)
* RegisterPredicateLockingXid(void)
* PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno,
static void SummarizeOldestCommittedSxact(void);
static Snapshot GetSafeSnapshot(Snapshot snapshot);
static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
- TransactionId sourcexid);
+ VirtualTransactionId *sourcevxid,
+ int sourcepid);
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
PREDICATELOCKTARGETTAG *parent);
* one passed to it, but we avoid assuming that here.
*/
snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
- InvalidTransactionId);
+ NULL, InvalidPid);
if (MySerializableXact == InvalidSerializableXact)
return snapshot; /* no concurrent r/w xacts; it's safe */
return GetSafeSnapshot(snapshot);
return GetSerializableTransactionSnapshotInt(snapshot,
- InvalidTransactionId);
+ NULL, InvalidPid);
}
/*
*/
void
SetSerializableTransactionSnapshot(Snapshot snapshot,
- TransactionId sourcexid)
+ VirtualTransactionId *sourcevxid,
+ int sourcepid)
{
Assert(IsolationIsSerializable());
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
- (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+ (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid,
+ sourcepid);
}
/*
*/
static Snapshot
GetSerializableTransactionSnapshotInt(Snapshot snapshot,
- TransactionId sourcexid)
+ VirtualTransactionId *sourcevxid,
+ int sourcepid)
{
PGPROC *proc;
VirtualTransactionId vxid;
} while (!sxact);
/* Get the snapshot, or check that it's safe to use */
- if (!TransactionIdIsValid(sourcexid))
+ if (!sourcevxid)
snapshot = GetSnapshotData(snapshot);
- else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
+ else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid))
{
ReleasePredXact(sxact);
LWLockRelease(SerializableXactHashLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
- errdetail("The source transaction %u is not running anymore.",
- sourcexid)));
+ errdetail("The source process with pid %d is not running anymore.",
+ sourcepid)));
}
/*
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
+#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
/* Define pathname of exported-snapshot files */
#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
-#define XactExportFilePath(path, xid, num, suffix) \
- snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
- xid, num, suffix)
-/* Current xact's exported snapshots (a list of Snapshot structs) */
+/* Structure holding info about exported snapshot. */
+typedef struct ExportedSnapshot
+{
+ char *snapfile;
+ Snapshot snapshot;
+} ExportedSnapshot;
+
+/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
* in GetTransactionSnapshot.
*/
static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
- PGPROC *sourceproc)
+SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
+ int sourcepid, PGPROC *sourceproc)
{
/* Caller should have checked this already */
Assert(!FirstSnapshotSet);
errmsg("could not import the requested snapshot"),
errdetail("The source transaction is not running anymore.")));
}
- else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+ else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not import the requested snapshot"),
- errdetail("The source transaction %u is not running anymore.",
- sourcexid)));
+ errdetail("The source process with pid %d is not running anymore.",
+ sourcepid)));
/*
* In transaction-snapshot mode, the first snapshot must live until end of
if (IsolationUsesXactSnapshot())
{
if (IsolationIsSerializable())
- SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
+ SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
+ sourcepid);
/* Make a saved copy */
CurrentSnapshot = CopySnapshot(CurrentSnapshot);
FirstXactSnapshot = CurrentSnapshot;
*/
if (exportedSnapshots != NIL)
{
- TransactionId myxid = GetTopTransactionId();
- int i;
- char buf[MAXPGPATH];
ListCell *lc;
/*
* Get rid of the files. Unlink failure is only a WARNING because (1)
* it's too late to abort the transaction, and (2) leaving a leaked
* file around has little real consequence anyway.
- */
- for (i = 1; i <= list_length(exportedSnapshots); i++)
- {
- XactExportFilePath(buf, myxid, i, "");
- if (unlink(buf))
- elog(WARNING, "could not unlink file \"%s\": %m", buf);
- }
-
- /*
- * As with the FirstXactSnapshot, we needn't spend any effort on
- * cleaning up the per-snapshot data structures, but we do need to
- * remove them from RegisteredSnapshots to prevent a warning below.
+ *
+ * We also also need to remove the snapshots from RegisteredSnapshots
+ * to prevent a warning below.
+ *
+ * As with the FirstXactSnapshot, we don't need to free resources of
+ * the snapshot iself as it will go away with the memory context.
*/
foreach(lc, exportedSnapshots)
{
- Snapshot snap = (Snapshot) lfirst(lc);
+ ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
- pairingheap_remove(&RegisteredSnapshots, &snap->ph_node);
+ if (unlink(esnap->snapfile))
+ elog(WARNING, "could not unlink file \"%s\": %m",
+ esnap->snapfile);
+
+ pairingheap_remove(&RegisteredSnapshots,
+ &esnap->snapshot->ph_node);
}
exportedSnapshots = NIL;
{
TransactionId topXid;
TransactionId *children;
+ ExportedSnapshot *esnap;
int nchildren;
int addTopXid;
StringInfoData buf;
*/
/*
- * This will assign a transaction ID if we do not yet have one.
+ * Get our transaction ID if there is one, to include in the snapshot.
*/
- topXid = GetTopTransactionId();
+ topXid = GetTopTransactionIdIfAny();
/*
* We cannot export a snapshot from a subtransaction because there's no
*/
nchildren = xactGetCommittedChildren(&children);
+ /*
+ * Generate file path for the snapshot. We start numbering of snapshots
+ * inside the transaction from 1.
+ */
+ snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
+ MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
+
/*
* Copy the snapshot into TopTransactionContext, add it to the
* exportedSnapshots list, and mark it pseudo-registered. We do this to
snapshot = CopySnapshot(snapshot);
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
- exportedSnapshots = lappend(exportedSnapshots, snapshot);
+ esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
+ esnap->snapfile = pstrdup(path);
+ esnap->snapshot = snapshot;
+ exportedSnapshots = lappend(exportedSnapshots, esnap);
MemoryContextSwitchTo(oldcxt);
snapshot->regd_count++;
*/
initStringInfo(&buf);
- appendStringInfo(&buf, "xid:%u\n", topXid);
+ appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
+ appendStringInfo(&buf, "pid:%d\n", MyProcPid);
appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
* xmax. (We need not make the same check for subxip[] members, see
* snapshot.h.)
*/
- addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
+ addTopXid = (TransactionIdIsValid(topXid) &&
+ TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
for (i = 0; i < snapshot->xcnt; i++)
appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
* ensures that no other backend can read an incomplete file
* (ImportSnapshot won't allow it because of its valid-characters check).
*/
- XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
+ snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
ereport(ERROR,
(errcode_for_file_access(),
* Now that we have written everything into a .tmp file, rename the file
* to remove the .tmp suffix.
*/
- XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
-
if (rename(pathtmp, path) < 0)
ereport(ERROR,
(errcode_for_file_access(),
return val;
}
+static void
+parseVxidFromText(const char *prefix, char **s, const char *filename,
+ VirtualTransactionId *vxid)
+{
+ char *ptr = *s;
+ int prefixlen = strlen(prefix);
+
+ if (strncmp(ptr, prefix, prefixlen) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr += prefixlen;
+ if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr = strchr(ptr, '\n');
+ if (!ptr)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ *s = ptr + 1;
+}
+
/*
* ImportSnapshot
* Import a previously exported snapshot. The argument should be a
char *filebuf;
int xcnt;
int i;
- TransactionId src_xid;
+ VirtualTransactionId src_vxid;
+ int src_pid;
Oid src_dbid;
int src_isolevel;
bool src_readonly;
*/
memset(&snapshot, 0, sizeof(snapshot));
- src_xid = parseXidFromText("xid:", &filebuf, path);
+ parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
+ src_pid = parseIntFromText("pid:", &filebuf, path);
/* we abuse parseXidFromText a bit here ... */
src_dbid = parseXidFromText("dbid:", &filebuf, path);
src_isolevel = parseIntFromText("iso:", &filebuf, path);
* don't trouble to check the array elements, just the most critical
* fields.
*/
- if (!TransactionIdIsNormal(src_xid) ||
+ if (!VirtualTransactionIdIsValid(src_vxid) ||
!OidIsValid(src_dbid) ||
!TransactionIdIsNormal(snapshot.xmin) ||
!TransactionIdIsNormal(snapshot.xmax))
errmsg("cannot import a snapshot from a different database")));
/* OK, install the snapshot */
- SetTransactionSnapshot(&snapshot, src_xid, NULL);
+ SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
}
/*
void
RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
{
- SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+ SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc);
}