*/
void
InvalidateSystemCaches(void)
+{
+ InvalidateSystemCachesExtended(false);
+}
+
+void
+InvalidateSystemCachesExtended(bool debug_discard)
{
int i;
InvalidateCatalogSnapshot();
ResetCatalogCaches();
- RelationCacheInvalidate(); /* gets smgr and relmap too */
+ RelationCacheInvalidate(debug_discard); /* gets smgr and relmap too */
for (i = 0; i < syscache_callback_count; i++)
{
if (recursion_depth < 3)
{
recursion_depth++;
- InvalidateSystemCaches();
+ InvalidateSystemCachesExtended(true);
recursion_depth--;
}
}
*/
static long relcacheInvalsReceived = 0L;
+/*
+ * in_progress_list is a stack of ongoing RelationBuildDesc() calls. CREATE
+ * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
+ * It critically relies on each backend absorbing those changes no later than
+ * the next transaction start. Hence, RelationBuildDesc() loops until it finishes
+ * without accepting a relevant invalidation. (Most invalidation consumers
+ * don't do this.)
+ */
+typedef struct inprogressent
+{
+ Oid reloid; /* OID of relation being built */
+ bool invalidated; /* whether an invalidation arrived for it */
+} InProgressEnt;
+
+static InProgressEnt *in_progress_list;
+static int in_progress_list_len;
+static int in_progress_list_maxlen;
+
/*
* eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
* cleanup work. This list intentionally has limited size; if it overflows,
static Relation
RelationBuildDesc(Oid targetRelId, bool insertIt)
{
+ int in_progress_offset;
Relation relation;
Oid relid;
HeapTuple pg_class_tuple;
Form_pg_class relp;
+ /* Register to catch invalidation messages */
+ if (in_progress_list_len >= in_progress_list_maxlen)
+ {
+ int allocsize;
+
+ allocsize = in_progress_list_maxlen * 2;
+ in_progress_list = repalloc(in_progress_list,
+ allocsize * sizeof(*in_progress_list));
+ in_progress_list_maxlen = allocsize;
+ }
+ in_progress_offset = in_progress_list_len++;
+ in_progress_list[in_progress_offset].reloid = targetRelId;
+retry:
+ in_progress_list[in_progress_offset].invalidated = false;
+
/*
* find the tuple in pg_class corresponding to the given relation id
*/
* if no such tuple exists, return NULL
*/
if (!HeapTupleIsValid(pg_class_tuple))
+ {
+ Assert(in_progress_offset + 1 == in_progress_list_len);
+ in_progress_list_len--;
return NULL;
+ }
/*
* get information from the pg_class_tuple
*/
heap_freetuple(pg_class_tuple);
+ /*
+ * If an invalidation arrived mid-build, start over. Between here and the
+ * end of this function, don't add code that does or reasonably could read
+ * system catalogs. That range must be free from invalidation processing
+ * for the !insertIt case. For the insertIt case, RelationCacheInsert()
+ * will enroll this relation in ordinary relcache invalidation processing,
+ * so later invalidations will be handled the normal way.
+ */
+ if (in_progress_list[in_progress_offset].invalidated)
+ {
+ RelationDestroyRelation(relation, false);
+ goto retry;
+ }
+ Assert(in_progress_offset + 1 == in_progress_list_len);
+ in_progress_list_len--;
+
/*
* Insert newly created relation into relcache hash table, if requested.
*
/* Build temporary entry, but don't link it into hashtable */
newrel = RelationBuildDesc(save_relid, false);
+
+ /*
+ * Between here and the end of the swap, don't add code that does or
+ * reasonably could read system catalogs. That range must be free
+ * from invalidation processing. See RelationBuildDesc() manipulation
+ * of in_progress_list.
+ */
+
if (newrel == NULL)
{
/*
relcacheInvalsReceived++;
RelationFlushRelation(relation);
}
+ else
+ {
+ int i;
+
+ for (i = 0; i < in_progress_list_len; i++)
+ if (in_progress_list[i].reloid == relationId)
+ in_progress_list[i].invalidated = true;
+ }
}
/*
* second pass processes nailed-in-cache items before other nondeletable
* items. This should ensure that system catalogs are up to date before
* we attempt to use them to reload information about other open relations.
+ *
+ * After those two phases of work with immediate effects, we normally
+ * signal any RelationBuildDesc() on the stack to start over. However, we
+ * don't do this when called as part of debug_discard_caches; otherwise,
+ * RelationBuildDesc() would become an infinite loop.
*/
void
-RelationCacheInvalidate(void)
+RelationCacheInvalidate(bool debug_discard)
{
HASH_SEQ_STATUS status;
RelIdCacheEnt *idhentry;
List *rebuildFirstList = NIL;
List *rebuildList = NIL;
ListCell *l;
+ int i;
/*
* Reload relation mapping data before starting to reconstruct cache.
RelationClearRelation(relation, true);
}
list_free(rebuildList);
+
+ if (!debug_discard)
+ /* Any RelationBuildDesc() on the stack must start over. */
+ for (i = 0; i < in_progress_list_len; i++)
+ in_progress_list[i].invalidated = true;
}
/*
RelIdCacheEnt *idhentry;
int i;
+ /*
+ * Forget in_progress_list. This is relevant when we're aborting due to
+ * an error during RelationBuildDesc().
+ */
+ Assert(in_progress_list_len == 0 || !isCommit);
+ in_progress_list_len = 0;
+
/*
* Unless the eoxact_list[] overflowed, we only need to examine the rels
* listed in it. Otherwise fall back on a hash_seq_search scan.
RelIdCacheEnt *idhentry;
int i;
+ /*
+ * Forget in_progress_list. This is relevant when we're aborting due to
+ * an error during RelationBuildDesc(). We don't commit subtransactions
+ * during RelationBuildDesc().
+ */
+ Assert(in_progress_list_len == 0 || !isCommit);
+ in_progress_list_len = 0;
+
/*
* Unless the eoxact_list[] overflowed, we only need to examine the rels
* listed in it. Otherwise fall back on a hash_seq_search scan. Same
RelationCacheInitialize(void)
{
HASHCTL ctl;
+ int allocsize;
/*
* make sure cache memory context exists
RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
&ctl, HASH_ELEM | HASH_BLOBS);
+ /*
+ * Preallocate enough in_progress_list slots to cover most cases;
+ * RelationBuildDesc() enlarges the list on demand.
+ */
+ allocsize = 4;
+ in_progress_list =
+ MemoryContextAlloc(CacheMemoryContext,
+ allocsize * sizeof(*in_progress_list));
+ in_progress_list_maxlen = allocsize;
+
/*
* relation mapper needs to be initialized too
*/
--- /dev/null
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test CREATE INDEX CONCURRENTLY with concurrent modifications
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+
+use Test::More tests => 4;
+
+my ($node, $result);
+
+#
+# Test set-up
+#
+$node = get_new_node('CIC_test');
+$node->init;
+$node->append_conf('postgresql.conf', 'lock_timeout = 180000');
+$node->start;
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
+$node->safe_psql('postgres', q(CREATE INDEX idx ON tbl(i)));
+$node->safe_psql(
+ 'postgres', q(
+ CREATE FUNCTION heapallindexed() RETURNS void AS $$
+ DECLARE
+ count_seqscan int;
+ count_idxscan int;
+ BEGIN
+ count_seqscan := (SELECT count(*) FROM tbl);
+ SET enable_seqscan = off;
+ count_idxscan := (SELECT count(*) FROM tbl);
+ RESET enable_seqscan;
+ IF count_seqscan <> count_idxscan THEN
+ RAISE 'seqscan found % rows, but idxscan found % rows',
+ count_seqscan, count_idxscan;
+ END IF;
+ END
+ $$ LANGUAGE plpgsql;
+));
+
+#
+# Stress CIC with pgbench
+#
+
+# Run background pgbench with CIC. We cannot fold this script into the single
+# foreground pgbench run below: CIC will occasionally deadlock with itself.
+my $pgbench_out = '';
+my $pgbench_timer = IPC::Run::timeout(180);
+my $pgbench_h = $node->background_pgbench(
+ '--no-vacuum --client=1 --transactions=200',
+ {
+ '002_pgbench_concurrent_cic' => q(
+ DROP INDEX CONCURRENTLY idx;
+ CREATE INDEX CONCURRENTLY idx ON tbl(i);
+ BEGIN ISOLATION LEVEL REPEATABLE READ;
+ SELECT heapallindexed();
+ ROLLBACK;
+ )
+ },
+ \$pgbench_out,
+ $pgbench_timer);
+
+# Run pgbench.
+$node->pgbench(
+ '--no-vacuum --client=5 --transactions=200',
+ 0,
+ [qr{actually processed}],
+ [qr{^$}],
+ 'concurrent INSERTs',
+ {
+ '002_pgbench_concurrent_transaction' => q(
+ BEGIN;
+ INSERT INTO tbl VALUES(0);
+ COMMIT;
+ ),
+ '002_pgbench_concurrent_transaction_savepoints' => q(
+ BEGIN;
+ SAVEPOINT s1;
+ INSERT INTO tbl VALUES(0);
+ COMMIT;
+ )
+ });
+
+$pgbench_h->pump_nb;
+$pgbench_h->finish();
+unlike($pgbench_out, qr/aborted in/, "pgbench with CIC works");
+
+# done
+$node->stop;
+done_testing();
extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
extern void InvalidateSystemCaches(void);
+extern void InvalidateSystemCachesExtended(bool debug_discard);
#endif /* INVAL_H */
extern void RelationCacheInvalidateEntry(Oid relationId);
-extern void RelationCacheInvalidate(void);
+extern void RelationCacheInvalidate(bool debug_discard);
extern void RelationCloseSmgrByOid(Oid relationId);
}
}
+# Common sub of pgbench-invoking interfaces. Creates any requested script
+# files and returns the pgbench command-line options that select them.
+sub _pgbench_make_files
+{
+ my ($self, $files) = @_;
+ my @file_opts;
+
+ if (defined $files)
+ {
+
+ # note: files are ordered for determinism
+ for my $fn (sort keys %$files)
+ {
+ my $filename = $self->basedir . '/' . $fn;
+ push @file_opts, '-f', $filename;
+
+ # Strip any "@weight" suffix so the script is written under its plain
+ # file name; the -f option pushed above keeps the suffix, which pgbench
+ # parses as a script weight.
+ $filename =~ s/\@\d+$//;
+
+ # filenames are expected to be unique within a test
+ if (-e $filename)
+ {
+ ok(0, "$filename must not already exist");
+ unlink $filename or die "cannot unlink $filename: $!";
+ }
+ TestLib::append_to_file($filename, $$files{$fn});
+ }
+ }
+
+ return @file_opts;
+}
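To illustrate the helper above (an editorial sketch, not part of the patch; the script names are hypothetical, and the sub is normally reached only through pgbench() or background_pgbench()): each hash key becomes a file under the node's basedir, and any "@weight" suffix stays in the returned -f option, where pgbench parses it as a script weight, while the file on disk is written without the suffix.

    # Hypothetical input hash for _pgbench_make_files().
    my @file_opts = $node->_pgbench_make_files(
        {
            '001_script'      => q(SELECT 1;),
            '001_weighted@20' => q(SELECT 2;),
        });
    # @file_opts is now ('-f', '<basedir>/001_script',
    #                    '-f', '<basedir>/001_weighted@20'),
    # while the files written are 001_script and 001_weighted.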
+
+=pod
+
+=item $node->pgbench($opts, $stat, $out, $err, $name, $files, @args)
+
+Invoke B<pgbench>, with parameters and files.
+
+=over
+
+=item $opts
+
+Options as a string to be split on spaces.
+
+=item $stat
+
+Expected exit status.
+
+=item $out
+
+Reference to a regexp list that must match stdout.
+
+=item $err
+
+Reference to a regexp list that must match stderr.
+
+=item $name
+
+Name of test for error messages.
+
+=item $files
+
+Reference to filename/contents dictionary.
+
+=item @args
+
+Further raw options or arguments.
+
+=back
+
+=cut
+
+sub pgbench
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my ($self, $opts, $stat, $out, $err, $name, $files, @args) = @_;
+ my @cmd = (
+ 'pgbench',
+ split(/\s+/, $opts),
+ $self->_pgbench_make_files($files), @args);
+
+ $self->command_checks_all(\@cmd, $stat, $out, $err, $name);
+}
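As a usage sketch (the script body and test name here are hypothetical; the call in 002_cic.pl above is the authoritative example): expect exit status 0, require pgbench's "actually processed" line on stdout, and require empty stderr.

    $node->pgbench(
        '--no-vacuum --client=2 --transactions=10',
        0,
        [qr{actually processed}],
        [qr{^$}],
        'simple insert workload',
        {
            '001_simple_insert' => q(INSERT INTO tbl VALUES(0);)
        });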
+
+=pod
+
+=item $node->background_pgbench($opts, $files, \$stdout, $timer) => harness
+
+Invoke B<pgbench> and return an IPC::Run harness object. The process's stdin
+is empty, and its stdout and stderr go to the $stdout scalar reference. This
+allows the caller to act on other parts of the system while B<pgbench> is
+running. Errors from B<pgbench> are the caller's problem.
+
+The specified timer object is attached to the harness as well.  It's the
+caller's responsibility to select the timeout length, and to restart the
+timer after each command if the timeout is per-command.
+
+Be sure to "finish" the harness when done with it.
+
+=over
+
+=item $opts
+
+Options as a string to be split on spaces.
+
+=item $files
+
+Reference to filename/contents dictionary.
+
+=back
+
+=cut
+
+sub background_pgbench
+{
+ my ($self, $opts, $files, $stdout, $timer) = @_;
+
+ my @cmd =
+ ('pgbench', split(/\s+/, $opts), $self->_pgbench_make_files($files));
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ my $stdin = "";
+ # IPC::Run would otherwise append to existing contents:
+ $$stdout = "" if ref($stdout);
+
+ my $harness = IPC::Run::start \@cmd, '<', \$stdin, '>', $stdout, '2>&1',
+ $timer;
+
+ return $harness;
+}
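A usage sketch mirroring 002_cic.pl above (script name and contents are hypothetical): start the background run, exercise the server concurrently, then pump and finish the harness and inspect the collected output.

    my $bg_out   = '';
    my $bg_timer = IPC::Run::timeout(180);
    my $bg       = $node->background_pgbench(
        '--no-vacuum --client=1 --transactions=100',
        { '001_background_script' => q(SELECT 1;) },
        \$bg_out, $bg_timer);
    # ... run foreground work against $node here ...
    $bg->pump_nb;
    $bg->finish();
    unlike($bg_out, qr/aborted in/, 'background pgbench succeeded');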
+
=pod
=item $node->poll_query_until(dbname, query)
=pod
+=item $node->command_checks_all(...)
+
+TestLib::command_checks_all with our connection parameters. See
+command_ok(...)
+
+=cut
+
+sub command_checks_all
+{
+ my $self = shift;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ TestLib::command_checks_all(@_);
+ return;
+}
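For example (a sketch; the query and test name are arbitrary), because PGHOST and PGPORT are set for the duration of the call, a bare psql invocation here connects to this node:

    $node->command_checks_all(
        [ 'psql', '-X', '-d', 'postgres', '-c', 'SELECT 1' ],
        0,
        [qr/1/],
        [qr/^$/],
        'psql SELECT 1 against this node');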
+
+=pod
+
=item $node->issues_sql_like(cmd, expected_sql, test_name)
Run a command on the node, then verify that $expected_sql appears in the
like($stdout, $expected_stdout, "$test_name: matches");
}
+# Run a command and check its status and outputs.
+# The 5 arguments are:
+# - cmd: ref to list for command, options and arguments to run
+# - ret: expected exit status
+# - out: ref to list of re to be checked against stdout (all must match)
+# - err: ref to list of re to be checked against stderr (all must match)
+# - test_name: name of test
+sub command_checks_all
+{
+ my ($cmd, $expected_ret, $out, $err, $test_name) = @_;
+
+ # run command
+ my ($stdout, $stderr);
+ print("# Running: " . join(" ", @{$cmd}) . "\n");
+ IPC::Run::run($cmd, '>', \$stdout, '2>', \$stderr);
+
+ # See http://perldoc.perl.org/perlvar.html#%24CHILD_ERROR
+ my $ret = $?;
+ die "command exited with signal " . ($ret & 127)
+ if $ret & 127;
+ $ret = $ret >> 8;
+
+ foreach ($stderr, $stdout) { s/\r\n/\n/g if $Config{osname} eq 'msys'; }
+
+ # check status
+ ok($ret == $expected_ret,
+ "$test_name status (got $ret vs expected $expected_ret)");
+
+ # check stdout
+ for my $re (@$out)
+ {
+ like($stdout, $re, "$test_name stdout /$re/");
+ }
+
+ # check stderr
+ for my $re (@$err)
+ {
+ like($stderr, $re, "$test_name stderr /$re/");
+ }
+
+ return;
+}
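A minimal direct call (sketch; the program and patterns are illustrative): check that pg_ctl --version exits with status 0, prints its version banner on stdout, and writes nothing to stderr.

    command_checks_all(
        [ 'pg_ctl', '--version' ],
        0,
        [ qr/pg_ctl \(PostgreSQL\)/ ],
        [ qr/^$/ ],
        'pg_ctl --version');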
+
1;
ImportForeignSchemaType
ImportForeignSchema_function
ImportQual
+InProgressEnt
InclusionOpaque
IncrementVarSublevelsUp_context
Index