Allow parallel vacuum to accumulate buffer usage.
author	Amit Kapila <[email protected]>
Thu, 2 Apr 2020 02:34:58 +0000 (08:04 +0530)
committer	Amit Kapila <[email protected]>
Thu, 2 Apr 2020 02:34:58 +0000 (08:04 +0530)
Commit 40d964ec99 allowed the vacuum command to process indexes in parallel
but forgot to accumulate the buffer usage stats of the parallel workers.
This commit lets the leader backend accumulate the buffer usage stats of
all the parallel workers.

Reported-by: Julien Rouhaud
Author: Sawada Masahiko
Reviewed-by: Dilip Kumar, Amit Kapila and Julien Rouhaud
Discussion: https://postgr.es/m/20200328151721.GB12854@nol
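
For reference, the change boils down to the pattern sketched below.  This is a
condensed illustration assembled from the hunks that follow, not compilable on
its own: variable declarations and the vacuum-specific work are omitted, and
the identifiers are the ones introduced by the patch.

    /* Leader, while sizing and filling the DSM segment (begin_parallel_vacuum):
     * reserve one BufferUsage slot per worker under
     * PARALLEL_VACUUM_KEY_BUFFER_USAGE. */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    buffer_usage = shm_toc_allocate(pcxt->toc,
                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
    lps->buffer_usage = buffer_usage;

    /* Each worker (parallel_vacuum_main): start counting, do the index
     * vacuum/cleanup, then write its totals into its own slot of the
     * shared array. */
    InstrStartParallelQuery();
    parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
                          &vacrelstats);
    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);

    /* Leader (lazy_parallel_vacuum_indexes): wait for the workers, then fold
     * each worker's counters into the leader's pgBufferUsage. */
    WaitForParallelWorkersToFinish(lps->pcxt);
    for (i = 0; i < lps->pcxt->nworkers_launched; i++)
        InstrAccumParallelQuery(&lps->buffer_usage[i]);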

src/backend/access/heap/vacuumlazy.c

index 04b12342b815e1d81fb1574c7bdd0552074d2cf0..9f9596c71840347b7a7dd89fb7d9c74c3322af04 100644
@@ -65,6 +65,7 @@
 #include "commands/dbcommands.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
 #define PARALLEL_VACUUM_KEY_SHARED         1
 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES        2
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT     3
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE   4
 
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
@@ -270,6 +272,9 @@ typedef struct LVParallelState
    /* Shared information among parallel vacuum workers */
    LVShared   *lvshared;
 
+   /* Points to buffer usage area in DSM */
+   BufferUsage *buffer_usage;
+
    /*
     * The number of indexes that support parallel index bulk-deletion and
     * parallel index cleanup respectively.
@@ -2137,8 +2142,20 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
    parallel_vacuum_index(Irel, stats, lps->lvshared,
                          vacrelstats->dead_tuples, nindexes, vacrelstats);
 
-   /* Wait for all vacuum workers to finish */
-   WaitForParallelWorkersToFinish(lps->pcxt);
+   /*
+    * Next, accumulate buffer usage.  (This must wait for the workers to
+    * finish, or we might get incomplete data.)
+    */
+   if (nworkers > 0)
+   {
+       int         i;
+
+       /* Wait for all vacuum workers to finish */
+       WaitForParallelWorkersToFinish(lps->pcxt);
+
+       for (i = 0; i < lps->pcxt->nworkers_launched; i++)
+           InstrAccumParallelQuery(&lps->buffer_usage[i]);
+   }
 
    /*
     * Carry the shared balance value to heap scan and disable shared costing
@@ -3153,6 +3170,7 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
    ParallelContext *pcxt;
    LVShared   *shared;
    LVDeadTuples *dead_tuples;
+   BufferUsage *buffer_usage;
    bool       *can_parallel_vacuum;
    long        maxtuples;
    char       *sharedquery;
@@ -3236,6 +3254,17 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
    shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+   /*
+    * Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
+    *
+    * If there are no extensions loaded that care, we could skip this.  We
+    * have no way of knowing whether anyone's looking at pgBufferUsage, so do
+    * it unconditionally.
+    */
+   shm_toc_estimate_chunk(&pcxt->estimator,
+                          mul_size(sizeof(BufferUsage), pcxt->nworkers));
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+
    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
    querylen = strlen(debug_query_string);
    shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
@@ -3270,6 +3299,12 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
    vacrelstats->dead_tuples = dead_tuples;
 
+   /* Allocate space for each worker's BufferUsage; no need to initialize */
+   buffer_usage = shm_toc_allocate(pcxt->toc,
+                                   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+   shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
+   lps->buffer_usage = buffer_usage;
+
    /* Store query string for workers */
    sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
    memcpy(sharedquery, debug_query_string, querylen + 1);
@@ -3399,6 +3434,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
    Relation   *indrels;
    LVShared   *lvshared;
    LVDeadTuples *dead_tuples;
+   BufferUsage *buffer_usage;
    int         nindexes;
    char       *sharedquery;
    IndexBulkDeleteResult **stats;
@@ -3468,10 +3504,17 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;
 
+   /* Prepare to track buffer usage during parallel execution */
+   InstrStartParallelQuery();
+
    /* Process indexes to perform vacuum/cleanup */
    parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
                          &vacrelstats);
 
+   /* Report buffer usage during parallel execution */
+   buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
+   InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+
    /* Pop the error context stack */
    error_context_stack = errcallback.previous;