Skip to content

Commit 6e047e2

Browse files
committed
Support snapshots at replica
1 parent 40785ad commit 6e047e2

File tree

7 files changed

+110
-30
lines changed

7 files changed

+110
-30
lines changed

src/backend/access/rmgrdesc/standbydesc.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ standby_identify(uint8 info)
8484
case XLOG_INVALIDATIONS:
8585
id = "INVALIDATIONS";
8686
break;
87+
case XLOG_MAKE_SNAPSHOT:
88+
id = "MAKE_SNAPSHOT";
89+
break;
90+
case XLOG_REMOVE_SNAPSHOT:
91+
id = "REMOVE_SNAPSHOT";
92+
break;
93+
case XLOG_RECOVER_TO_SNAPSHOT:
94+
id = "RECOVER_TO_SNAPSHOT";
95+
break;
8796
}
8897

8998
return id;

src/backend/storage/file/fd.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
#include "storage/fd.h"
9292
#include "storage/ipc.h"
9393
#include "storage/snapfs.h"
94+
#include "storage/standbydefs.h"
9495
#include "utils/guc.h"
9596
#include "utils/inval.h"
9697
#include "utils/resowner_private.h"
@@ -4021,6 +4022,8 @@ sfs_remove_snapshot(SnapshotId snap_id)
40214022

40224023
ControlFile->oldest_snapshot = snap_id + 1;
40234024
UpdateControlFile();
4025+
4026+
sfs_xlog_insert(XLOG_REMOVE_SNAPSHOT, snap_id);
40244027
}
40254028

40264029
void
@@ -4032,9 +4035,7 @@ sfs_recover_to_snapshot(SnapshotId snap_id)
40324035
elog(ERROR, "Can not perform operation inside snapshot");
40334036

40344037
sfs_lock_database();
4035-
4036-
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
4037-
| CHECKPOINT_FLUSH_ALL);
4038+
sfs_checkpoint();
40384039

40394040
sfs_current_snapshot = snap_id;
40404041
walk_data_dir(sfs_recover_snapshot_file, ERROR);
@@ -4046,8 +4047,12 @@ sfs_recover_to_snapshot(SnapshotId snap_id)
40464047
walk_data_dir(sfs_remove_applied_snapshot_file, LOG);
40474048

40484049
DropSharedBuffers();
4049-
InvalidateSystemCaches();
4050-
CacheInvalidateRelcacheAll();
4051-
4050+
if (!InRecovery)
4051+
{
4052+
InvalidateSystemCaches();
4053+
CacheInvalidateRelcacheAll();
4054+
}
40524055
sfs_unlock_database();
4056+
4057+
sfs_xlog_insert(XLOG_RECOVER_TO_SNAPSHOT, snap_id);
40534058
}

src/backend/storage/file/snapfs.c

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "storage/lwlock.h"
3434
#include "storage/procarray.h"
3535
#include "storage/snapfs.h"
36+
#include "storage/standbydefs.h"
3637
#include "storage/bufmgr.h"
3738
#include "utils/inval.h"
3839
#include "utils/fmgrprotos.h"
@@ -163,8 +164,7 @@ sfs_switch_to_snapshot(SnapshotId snap_id)
163164
sfs_lock_database();
164165

165166
if (!SFS_IN_SNAPSHOT())
166-
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
167-
| CHECKPOINT_FLUSH_ALL);
167+
sfs_checkpoint();
168168

169169
ControlFile->active_snapshot = snap_id;
170170
UpdateControlFile();
@@ -197,14 +197,15 @@ sfs_make_snapshot(void)
197197
elog(ERROR, "Can not perform operation inside snapshot");
198198

199199
sfs_lock_database();
200+
sfs_checkpoint();
200201

201-
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
202-
| CHECKPOINT_FLUSH_ALL);
203202
snap_id = ++ControlFile->recent_snapshot;
204203
UpdateControlFile();
205204

206205
sfs_unlock_database();
207206

207+
sfs_xlog_insert(XLOG_MAKE_SNAPSHOT, snap_id);
208+
208209
return snap_id;
209210
}
210211

@@ -214,34 +215,66 @@ sfs_make_snapshot(void)
214215
void
215216
sfs_lock_database(void)
216217
{
217-
bool standalone = false;
218-
TransactionId myXid = GetCurrentTransactionIdIfAny();
219-
/* Prevent assignment Xids to transaction and
220-
* so delay start of any new update transactions
221-
*/
222-
LWLockAcquire(XidGenLock, LW_SHARED);
223-
224-
/* Wait completion of all active tranasction except own */
225-
do
218+
if (!InRecovery)
226219
{
227-
RunningTransactions running = GetRunningTransactionData();
228-
standalone = (TransactionIdIsValid(myXid) && running->xcnt == 1) || (!TransactionIdIsValid(myXid) && running->xcnt == 0);
220+
bool standalone = false;
221+
TransactionId myXid = GetCurrentTransactionIdIfAny();
229222

230-
/* Release locks set by GetRunningTransactionData */
231-
LWLockRelease(ProcArrayLock);
232-
LWLockRelease(XidGenLock);
223+
/* Prevent assignment Xids to transaction and
224+
* so delay start of any new update transactions
225+
*/
226+
LWLockAcquire(XidGenLock, LW_SHARED);
227+
228+
/* Wait completion of all active tranasction except own */
229+
do
230+
{
231+
RunningTransactions running = GetRunningTransactionData();
232+
standalone = (TransactionIdIsValid(myXid) && running->xcnt == 1) || (!TransactionIdIsValid(myXid) && running->xcnt == 0);
233+
234+
/* Release locks set by GetRunningTransactionData */
235+
LWLockRelease(ProcArrayLock);
236+
LWLockRelease(XidGenLock);
233237

234-
/* Wait for one second */
235-
if (!standalone)
236-
pg_usleep(USECS_PER_SEC);
238+
/* Wait for one second */
239+
if (!standalone)
240+
pg_usleep(USECS_PER_SEC);
237241

238-
} while (!standalone);
242+
} while (!standalone);
243+
}
239244
}
240245

241246
void
242247
sfs_unlock_database(void)
243248
{
244-
LWLockRelease(XidGenLock);
249+
if (!InRecovery)
250+
LWLockRelease(XidGenLock);
251+
}
252+
253+
void
254+
sfs_xlog_insert(int op, SnapshotId snapid)
255+
{
256+
if (!InRecovery)
257+
{
258+
xl_snapshot xlrec;
259+
xlrec.snapid = snapid;
260+
XLogBeginInsert();
261+
XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
262+
XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
263+
XLogInsert(RM_STANDBY_ID, op);
264+
}
265+
}
266+
267+
268+
void
269+
sfs_checkpoint(void)
270+
{
271+
int flags = CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL;
272+
/*
273+
if (InRecovery)
274+
CreateRestartPoint(flags);
275+
else
276+
*/
277+
RequestCheckpoint(flags);
245278
}
246279

247280
/*

src/backend/storage/ipc/standby.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "storage/proc.h"
2929
#include "storage/procarray.h"
3030
#include "storage/sinvaladt.h"
31+
#include "storage/snapfs.h"
3132
#include "storage/standby.h"
3233
#include "utils/hsearch.h"
3334
#include "utils/memutils.h"
@@ -827,6 +828,24 @@ standby_redo(XLogReaderState *record)
827828
xlrec->dbId,
828829
xlrec->tsId);
829830
}
831+
else if (info == XLOG_MAKE_SNAPSHOT)
832+
{
833+
xl_snapshot *xlrec = (xl_snapshot *) XLogRecGetData(record);
834+
SnapshotId snapid = sfs_make_snapshot();
835+
if (snapid != xlrec->snapid)
836+
elog(WARNING, "Snapshot identifiers at replica and master do not match: %d vs. %d",
837+
snapid, xlrec->snapid);
838+
}
839+
else if (info == XLOG_RECOVER_TO_SNAPSHOT)
840+
{
841+
xl_snapshot *xlrec = (xl_snapshot *) XLogRecGetData(record);
842+
sfs_recover_to_snapshot(xlrec->snapid);
843+
}
844+
else if (info == XLOG_REMOVE_SNAPSHOT)
845+
{
846+
xl_snapshot *xlrec = (xl_snapshot *) XLogRecGetData(record);
847+
sfs_remove_snapshot(xlrec->snapid);
848+
}
830849
else
831850
elog(PANIC, "standby_redo: unknown op code %u", info);
832851
}

src/include/catalog/pg_control.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ typedef struct CheckPoint
5151
TransactionId oldestCommitTsXid; /* oldest Xid with valid commit
5252
* timestamp */
5353
TransactionId newestCommitTsXid; /* newest Xid with valid commit
54-
* timestamp */
54+
* timestamp */
5555

5656
/*
5757
* Oldest XID still running. This is only needed to initialize hot standby

src/include/storage/snapfs.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ extern void sfs_unlock_database(void);
9090

9191
extern void sfs_check_snapshot(SnapshotId sid);
9292

93+
extern void sfs_xlog_insert(int op, SnapshotId snapid);
94+
extern void sfs_checkpoint(void);
95+
9396
extern struct ControlFileData *ControlFile;
9497

9598
#endif

src/include/storage/standbydefs.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ extern void standby_desc_invalidations(StringInfo buf,
3434
#define XLOG_STANDBY_LOCK 0x00
3535
#define XLOG_RUNNING_XACTS 0x10
3636
#define XLOG_INVALIDATIONS 0x20
37+
#define XLOG_MAKE_SNAPSHOT 0x30
38+
#define XLOG_REMOVE_SNAPSHOT 0x40
39+
#define XLOG_RECOVER_TO_SNAPSHOT 0x50
3740

3841
typedef struct xl_standby_locks
3942
{
@@ -69,6 +72,14 @@ typedef struct xl_invalidations
6972
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
7073
} xl_invalidations;
7174

75+
/*
76+
* Snapshot operation at standby
77+
*/
78+
typedef struct xl_snapshot
79+
{
80+
int snapid;
81+
} xl_snapshot;
82+
7283
#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
7384

7485
#endif /* STANDBYDEFS_H */

0 commit comments

Comments
 (0)