Skip to content

Commit e0b2eed

Browse files
author
Amit Kapila
committed
Flush logical slots to disk during a shutdown checkpoint if required.
It's entirely possible for a logical slot to have a confirmed_flush LSN higher than the last value saved on disk while not being marked as dirty. Currently, it is not a major problem but a later patch adding support for the upgrade of slots relies on that value being properly flushed to disk. It can also help avoid processing the same transactions again in some boundary cases after the clean shutdown and restart. Say, we process some transactions for which we didn't send anything downstream (the changes got filtered) but the confirm_flush LSN is updated due to keepalives. As we don't flush the latest value of confirm_flush LSN, it may lead to processing the same changes again without this patch. The approach taken by this patch has been suggested by Ashutosh Bapat. Author: Vignesh C, Julien Rouhaud, Kuroda Hayato Reviewed-by: Amit Kapila, Dilip Kumar, Michael Paquier, Ashutosh Bapat, Peter Smith, Hou Zhijie Discussion: http://postgr.es/m/CAA4eK1JzJagMmb_E8D4au=GYQkxox0AfNBm1FbP7sy7t4YWXPQ@mail.gmail.com Discussion: http://postgr.es/m/TYAPR01MB58664C81887B3AF2EB6B16E3F5939@TYAPR01MB5866.jpnprd01.prod.outlook.com
1 parent a2e0d5e commit e0b2eed

File tree

5 files changed

+145
-6
lines changed

5 files changed

+145
-6
lines changed

src/backend/access/transam/xlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7039,7 +7039,7 @@ static void
70397039
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
70407040
{
70417041
CheckPointRelationMap();
7042-
CheckPointReplicationSlots();
7042+
CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
70437043
CheckPointSnapBuild();
70447044
CheckPointLogicalRewriteHeap();
70457045
CheckPointReplicationOrigin();

src/backend/replication/slot.c

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
321321
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
322322
slot->candidate_restart_valid = InvalidXLogRecPtr;
323323
slot->candidate_restart_lsn = InvalidXLogRecPtr;
324+
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
324325

325326
/*
326327
* Create the slot on disk. We haven't actually marked the slot allocated
@@ -1572,11 +1573,13 @@ InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
15721573
/*
15731574
* Flush all replication slots to disk.
15741575
*
1575-
* This needn't actually be part of a checkpoint, but it's a convenient
1576-
* location.
1576+
* It is convenient to flush dirty replication slots at the time of checkpoint.
1577+
* Additionally, in case of a shutdown checkpoint, we also identify the slots
1578+
* for which the confirmed_flush LSN has been updated since the last time it
1579+
* was saved and flush them.
15771580
*/
15781581
void
1579-
CheckPointReplicationSlots(void)
1582+
CheckPointReplicationSlots(bool is_shutdown)
15801583
{
15811584
int i;
15821585

@@ -1601,6 +1604,30 @@ CheckPointReplicationSlots(void)
16011604

16021605
/* save the slot to disk, locking is handled in SaveSlotToPath() */
16031606
sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1607+
1608+
/*
1609+
* Slot's data is not flushed each time the confirmed_flush LSN is
1610+
* updated as that could lead to frequent writes. However, we decide
1611+
* to force a flush of all logical slot's data at the time of shutdown
1612+
* if the confirmed_flush LSN is changed since we last flushed it to
1613+
* disk. This helps in avoiding an unnecessary retreat of the
1614+
* confirmed_flush LSN after restart.
1615+
*/
1616+
if (is_shutdown && SlotIsLogical(s))
1617+
{
1618+
SpinLockAcquire(&s->mutex);
1619+
1620+
Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
1621+
1622+
if (s->data.invalidated == RS_INVAL_NONE &&
1623+
s->data.confirmed_flush != s->last_saved_confirmed_flush)
1624+
{
1625+
s->just_dirtied = true;
1626+
s->dirty = true;
1627+
}
1628+
SpinLockRelease(&s->mutex);
1629+
}
1630+
16041631
SaveSlotToPath(s, path, LOG);
16051632
}
16061633
LWLockRelease(ReplicationSlotAllocationLock);
@@ -1873,11 +1900,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
18731900

18741901
/*
18751902
* Successfully wrote, unset dirty bit, unless somebody dirtied again
1876-
* already.
1903+
* already and remember the confirmed_flush LSN value.
18771904
*/
18781905
SpinLockAcquire(&slot->mutex);
18791906
if (!slot->just_dirtied)
18801907
slot->dirty = false;
1908+
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
18811909
SpinLockRelease(&slot->mutex);
18821910

18831911
LWLockRelease(&slot->io_in_progress_lock);
@@ -2074,6 +2102,7 @@ RestoreSlotFromDisk(const char *name)
20742102
/* initialize in memory state */
20752103
slot->effective_xmin = cp.slotdata.xmin;
20762104
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2105+
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
20772106

20782107
slot->candidate_catalog_xmin = InvalidTransactionId;
20792108
slot->candidate_xmin_lsn = InvalidXLogRecPtr;

src/include/replication/slot.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,13 @@ typedef struct ReplicationSlot
178178
XLogRecPtr candidate_xmin_lsn;
179179
XLogRecPtr candidate_restart_valid;
180180
XLogRecPtr candidate_restart_lsn;
181+
182+
/*
183+
* This value tracks the last confirmed_flush LSN flushed which is used
184+
* during a shutdown checkpoint to decide if logical's slot data should be
185+
* forcibly flushed or not.
186+
*/
187+
XLogRecPtr last_saved_confirmed_flush;
181188
} ReplicationSlot;
182189

183190
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
@@ -241,7 +248,7 @@ extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslo
241248
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
242249

243250
extern void StartupReplicationSlots(void);
244-
extern void CheckPointReplicationSlots(void);
251+
extern void CheckPointReplicationSlots(bool is_shutdown);
245252

246253
extern void CheckSlotRequirements(void);
247254
extern void CheckSlotPermissions(void);

src/test/recovery/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ tests += {
4343
't/035_standby_logical_decoding.pl',
4444
't/036_truncated_dropped.pl',
4545
't/037_invalid_database.pl',
46+
't/038_save_logical_slots_shutdown.pl',
4647
],
4748
},
4849
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
2+
# Copyright (c) 2023, PostgreSQL Global Development Group
3+
4+
# Test logical replication slots are always flushed to disk during a shutdown
5+
# checkpoint.
6+
7+
use strict;
8+
use warnings;
9+
10+
use PostgreSQL::Test::Cluster;
11+
use PostgreSQL::Test::Utils;
12+
use Test::More;
13+
14+
sub compare_confirmed_flush
15+
{
16+
my ($node, $confirmed_flush_from_log) = @_;
17+
18+
# Fetch Latest checkpoint location from the control file
19+
my ($stdout, $stderr) =
20+
run_command([ 'pg_controldata', $node->data_dir ]);
21+
my @control_data = split("\n", $stdout);
22+
my $latest_checkpoint = undef;
23+
foreach (@control_data)
24+
{
25+
if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg)
26+
{
27+
$latest_checkpoint = $1;
28+
last;
29+
}
30+
}
31+
die "Latest checkpoint location not found in control file\n"
32+
unless defined($latest_checkpoint);
33+
34+
# Is it same as the value read from log?
35+
ok( $latest_checkpoint eq $confirmed_flush_from_log,
36+
"Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
37+
);
38+
39+
return;
40+
}
41+
42+
# Initialize publisher node
43+
my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
44+
$node_publisher->init(allows_streaming => 'logical');
45+
# Avoid checkpoint during the test, otherwise, the latest checkpoint location
46+
# will change.
47+
$node_publisher->append_conf(
48+
'postgresql.conf', q{
49+
checkpoint_timeout = 1h
50+
autovacuum = off
51+
});
52+
$node_publisher->start;
53+
54+
# Create subscriber node
55+
my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
56+
$node_subscriber->init(allows_streaming => 'logical');
57+
$node_subscriber->start;
58+
59+
# Create tables
60+
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
61+
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
62+
63+
# Insert some data
64+
$node_publisher->safe_psql('postgres',
65+
"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
66+
67+
# Setup logical replication
68+
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
69+
$node_publisher->safe_psql('postgres',
70+
"CREATE PUBLICATION pub FOR ALL TABLES");
71+
$node_subscriber->safe_psql('postgres',
72+
"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
73+
);
74+
75+
$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
76+
77+
my $result =
78+
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
79+
80+
is($result, qq(5), "check initial copy was done");
81+
82+
my $offset = -s $node_publisher->logfile;
83+
84+
# Restart the publisher to ensure that the slot will be flushed if required
85+
$node_publisher->restart();
86+
87+
# Wait until the walsender creates decoding context
88+
$node_publisher->wait_for_log(
89+
qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./,
90+
$offset);
91+
92+
# Extract confirmed_flush from the logfile
93+
my $log_contents = slurp_file($node_publisher->logfile, $offset);
94+
$log_contents =~
95+
qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./
96+
or die "could not get confirmed_flush_lsn";
97+
98+
# Ensure that the slot's confirmed_flush LSN is the same as the
99+
# latest_checkpoint location.
100+
compare_confirmed_flush($node_publisher, $1);
101+
102+
done_testing();

0 commit comments

Comments
 (0)