dboid = MyDatabaseId;
snprintf(path, MAXPGPATH,
- "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
- dboid, relid,
+ "%s/" LOGICAL_REWRITE_FORMAT,
+ PG_LOGICAL_MAPPINGS_DIR, dboid, relid,
LSN_FORMAT_ARGS(state->rs_begin_lsn),
xid, GetCurrentTransactionId());
xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
snprintf(path, MAXPGPATH,
- "pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
- xlrec->mapped_db, xlrec->mapped_rel,
+ "%s/" LOGICAL_REWRITE_FORMAT,
+ PG_LOGICAL_MAPPINGS_DIR, xlrec->mapped_db, xlrec->mapped_rel,
LSN_FORMAT_ARGS(xlrec->start_lsn),
xlrec->mapped_xid, XLogRecGetXid(r));
XLogRecPtr redo;
DIR *mappings_dir;
struct dirent *mapping_de;
- char path[MAXPGPATH + 20];
+ char path[MAXPGPATH + sizeof(PG_LOGICAL_MAPPINGS_DIR)];
/*
* We start of with a minimum of the last redo pointer. No new decoding
if (cutoff != InvalidXLogRecPtr && redo < cutoff)
cutoff = redo;
- mappings_dir = AllocateDir("pg_logical/mappings");
- while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+ mappings_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
+ while ((mapping_de = ReadDir(mappings_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
{
Oid dboid;
Oid relid;
strcmp(mapping_de->d_name, "..") == 0)
continue;
- snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+ snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_MAPPINGS_DIR, mapping_de->d_name);
de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
FreeDir(mappings_dir);
/* persist directory entries to disk */
- fsync_fname("pg_logical/mappings", true);
+ fsync_fname(PG_LOGICAL_MAPPINGS_DIR, true);
}
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+/* paths for replication origin checkpoint files */
+#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
+#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
+
/*
* Replay progress of a single remote node.
*/
void
CheckPointReplicationOrigin(void)
{
- const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
- const char *path = "pg_logical/replorigin_checkpoint";
+ const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
+ const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
int tmpfd;
int i;
uint32 magic = REPLICATION_STATE_MAGIC;
void
StartupReplicationOrigin(void)
{
- const char *path = "pg_logical/replorigin_checkpoint";
+ const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
int fd;
int readBytes;
uint32 magic = REPLICATION_STATE_MAGIC;
int readBytes;
LogicalRewriteMappingData map;
- sprintf(path, "pg_logical/mappings/%s", fname);
+ sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (fd < 0)
ereport(ERROR,
ListCell *file;
Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
- mapping_dir = AllocateDir("pg_logical/mappings");
- while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
+ mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
+ while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
{
Oid f_dboid;
Oid f_relid;
* unless the user used pg_resetwal or similar. If a user did so, there's
* no hope continuing to decode anyway.
*/
- sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+ sprintf(path, "%s/%X-%X.snap",
+ PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
/*
* be safely on disk.
*/
fsync_fname(path, false);
- fsync_fname("pg_logical/snapshots", true);
+ fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
builder->last_serialized_snapshot = lsn;
goto out;
elog(DEBUG1, "serializing snapshot to %s", path);
/* to make sure only we will write to this tempfile, include pid */
- sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
+ sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
+ PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn), MyProcPid);
/*
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m", tmppath)));
- fsync_fname("pg_logical/snapshots", true);
+ fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
/*
* We may overwrite the work from some other backend, but that's ok, our
/* make sure we persist */
fsync_fname(path, false);
- fsync_fname("pg_logical/snapshots", true);
+ fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
/*
* Now there's no way we can lose the dumped state anymore, remember this
if (builder->state == SNAPBUILD_CONSISTENT)
return false;
- sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+ sprintf(path, "%s/%X-%X.snap",
+ PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
* ----
*/
fsync_fname(path, false);
- fsync_fname("pg_logical/snapshots", true);
+ fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
/* read statically sized portion of snapshot */
XLogRecPtr redo;
DIR *snap_dir;
struct dirent *snap_de;
- char path[MAXPGPATH + 21];
+ char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
/*
* We start off with a minimum of the last redo pointer. No new
if (redo < cutoff)
cutoff = redo;
- snap_dir = AllocateDir("pg_logical/snapshots");
- while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
+ snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
+ while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
{
uint32 hi;
uint32 lo;
strcmp(snap_de->d_name, "..") == 0)
continue;
- snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
+ snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
de_type = get_dirent_type(path, snap_de, false, DEBUG1);
if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
int ret;
struct stat stat_buf;
- sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+ sprintf(path, "%s/%X-%X.snap",
+ PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
ret = stat(path, &stat_buf);
}
/*
- * Function to return the list of files in the pg_logical/snapshots directory.
+ * Function to return the list of files in the PG_LOGICAL_SNAPSHOTS_DIR
+ * directory.
*/
Datum
pg_ls_logicalsnapdir(PG_FUNCTION_ARGS)
{
- return pg_ls_dir_files(fcinfo, "pg_logical/snapshots", false);
+ return pg_ls_dir_files(fcinfo, PG_LOGICAL_SNAPSHOTS_DIR, false);
}
/*
- * Function to return the list of files in the pg_logical/mappings directory.
+ * Function to return the list of files in the PG_LOGICAL_MAPPINGS_DIR
+ * directory.
*/
Datum
pg_ls_logicalmapdir(PG_FUNCTION_ARGS)
{
- return pg_ls_dir_files(fcinfo, "pg_logical/mappings", false);
+ return pg_ls_dir_files(fcinfo, PG_LOGICAL_MAPPINGS_DIR, false);
}
/*
#include "utils/snapshot.h"
#include "utils/timestamp.h"
+/* paths for logical decoding data (relative to installation's $PGDATA) */
+#define PG_LOGICAL_DIR "pg_logical"
+#define PG_LOGICAL_MAPPINGS_DIR PG_LOGICAL_DIR "/mappings"
+#define PG_LOGICAL_SNAPSHOTS_DIR PG_LOGICAL_DIR "/snapshots"
+
/* GUC variables */
extern PGDLLIMPORT int logical_decoding_work_mem;
extern PGDLLIMPORT int debug_logical_replication_streaming;