Fix initialization of RelationSyncEntry for streaming transactions.
authorAmit Kapila <[email protected]>
Wed, 16 Sep 2020 02:15:44 +0000 (07:45 +0530)
committerAmit Kapila <[email protected]>
Wed, 16 Sep 2020 02:15:44 +0000 (07:45 +0530)
In commit 464824323e, for each RelationSyncEntry we maintained the list
of xids (streamed_txns) for which we have already sent the schema. This
helps us to track when to send the schema to the downstream node for
replication of streaming transactions. Before this list got initialized,
we were processing invalidation messages which access this list and led
to an assertion failure.

In passing, clean up the nearby code:

* Initialize the list of xids with NIL instead of NULL which is our usual
coding practice.
* Remove the MemoryContext switch for creating a RelationSyncEntry in dynahash.

Diagnosed-by: Amit Kapila and Tom Lane
Author: Amit Kapila
Reviewed-by: Tom Lane and Dilip Kumar
Discussion: https://postgr.es/m/904373.1600033123@sss.pgh.pa.us

src/backend/replication/pgoutput/pgoutput.c

index 343f03129fe7a4ba8889de80b1a0ef8a3378a5e3..eb1f23004e773a74753aba6e6f3b689a0561b92a 100644 (file)
@@ -945,16 +945,26 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
    Assert(RelationSyncCache != NULL);
 
-   /* Find cached function info, creating if not found */
-   oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+   /* Find cached relation info, creating if not found */
    entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
                                              (void *) &relid,
                                              HASH_ENTER, &found);
-   MemoryContextSwitchTo(oldctx);
    Assert(entry != NULL);
 
    /* Not found means schema wasn't sent */
-   if (!found || !entry->replicate_valid)
+   if (!found)
+   {
+       /* immediately make a new entry valid enough to satisfy callbacks */
+       entry->schema_sent = false;
+       entry->streamed_txns = NIL;
+       entry->replicate_valid = false;
+       entry->pubactions.pubinsert = entry->pubactions.pubupdate =
+           entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+       entry->publish_as_relid = InvalidOid;
+   }
+
+   /* Validate the entry */
+   if (!entry->replicate_valid)
    {
        List       *pubids = GetRelationPublications(relid);
        ListCell   *lc;
@@ -977,9 +987,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
         * relcache considers all publications given relation is in, but here
         * we only need to consider ones that the subscriber requested.
         */
-       entry->pubactions.pubinsert = entry->pubactions.pubupdate =
-           entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
-
        foreach(lc, data->publications)
        {
            Publication *pub = lfirst(lc);
@@ -1054,12 +1061,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
        entry->replicate_valid = true;
    }
 
-   if (!found)
-   {
-       entry->schema_sent = false;
-       entry->streamed_txns = NULL;
-   }
-
    return entry;
 }
 
@@ -1145,7 +1146,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
    {
        entry->schema_sent = false;
        list_free(entry->streamed_txns);
-       entry->streamed_txns = NULL;
+       entry->streamed_txns = NIL;
    }
 }