Fix memory leak in pgoutput with publication list cache
authorMichael Paquier <[email protected]>
Mon, 9 Dec 2024 07:41:46 +0000 (16:41 +0900)
committerMichael Paquier <[email protected]>
Mon, 9 Dec 2024 07:41:46 +0000 (16:41 +0900)
The pgoutput module caches publication names in a list and frees it upon
invalidation.  However, the code forgot to free the actual publication
names within the list elements, as publication names are pstrdup()'d in
GetPublication().  This would cause memory to leak in
CacheMemoryContext, bloating it over time as this context is not
cleaned.

This is a problem for WAL senders running for a long time, as an
accumulation of invalidation requests would bloat its cache memory
usage.  A second case, where this leak is easier to see, involves a
backend calling SQL functions like pg_logical_slot_{get,peek}_changes()
which create a new decoding context with each execution.  More
publications create more bloat.

To address this, this commit adds a new memory context within the
logical decoding context and resets it each time the publication names
cache is invalidated, based on a suggestion from Amit Kapila.  This
ensures that the lifespan of the publication names aligns with that of
the logical decoding context.

This solution changes PGOutputData, which is fine for HEAD but it could
cause an ABI breakage in stable branches as the structure size would
change, so these are left out for now.

Analyzed-by: Michael Paquier, Jeff Davis
Author: Zhijie Hou
Reviewed-by: Michael Paquier, Masahiko Sawada, Euler Taveira
Discussion: https://postgr.es/m/[email protected]

src/backend/replication/pgoutput/pgoutput.c
src/include/replication/pgoutput.h

index 5e23453f071a99b57e7cc22e16efd91e2e9e7ede..b50b3d62e3dbb9b43c6475c7ed492f059d35be0e 100644 (file)
@@ -436,6 +436,10 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                           "logical replication cache context",
                                           ALLOCSET_DEFAULT_SIZES);
 
+   data->pubctx = AllocSetContextCreate(ctx->context,
+                                        "logical replication publication list context",
+                                        ALLOCSET_SMALL_SIZES);
+
    ctx->output_plugin_private = data;
 
    /* This plugin uses binary protocol. */
@@ -1734,9 +1738,9 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 /*
  * Shutdown the output plugin.
  *
- * Note, we don't need to clean the data->context and data->cachectx as
- * they are child contexts of the ctx->context so they will be cleaned up by
- * logical decoding machinery.
+ * Note, we don't need to clean the data->context, data->cachectx, and
+ * data->pubctx as they are child contexts of the ctx->context so they
+ * will be cleaned up by logical decoding machinery.
  */
 static void
 pgoutput_shutdown(LogicalDecodingContext *ctx)
@@ -2063,12 +2067,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
        /* Reload publications if needed before use. */
        if (!publications_valid)
        {
-           oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-           if (data->publications)
-           {
-               list_free_deep(data->publications);
-               data->publications = NIL;
-           }
+           MemoryContextReset(data->pubctx);
+
+           oldctx = MemoryContextSwitchTo(data->pubctx);
            data->publications = LoadPublications(data->publication_names);
            MemoryContextSwitchTo(oldctx);
            publications_valid = true;
index 89f94e114726fbbad02092f21e6ba6222d192f03..ec7f1aa94e84ecf4312d42d72a0b093b90074ede 100644 (file)
@@ -20,6 +20,7 @@ typedef struct PGOutputData
    MemoryContext context;      /* private memory context for transient
                                 * allocations */
    MemoryContext cachectx;     /* private memory context for cache data */
+   MemoryContext pubctx;       /* private memory context for publication data */
 
    bool        in_streaming;   /* true if we are streaming a chunk of
                                 * transaction */