Injection points for hash aggregation.
authorJeff Davis <[email protected]>
Tue, 11 Feb 2025 19:26:25 +0000 (11:26 -0800)
committerJeff Davis <[email protected]>
Tue, 11 Feb 2025 19:26:25 +0000 (11:26 -0800)
Requires adding a guard against shift-by-32. Previously, that was
impossible because the number of partitions was always greater than 1,
but a new injection point can force the number of partitions to 1.

Discussion: https://postgr.es/m/ff4e59305e5d689e03cd256a736348d3e7958f8f[email protected]

src/backend/executor/nodeAgg.c
src/test/modules/injection_points/Makefile
src/test/modules/injection_points/expected/hashagg.out [new file with mode: 0644]
src/test/modules/injection_points/meson.build
src/test/modules/injection_points/sql/hashagg.sql [new file with mode: 0644]

index 3005b5c0e3b89ee38a4ba2288d61c60676232df6..ae9cc256b8cd2e1f037c332c31e4e6d0529b2ff3 100644 (file)
 #include "utils/datum.h"
 #include "utils/dynahash.h"
 #include "utils/expandeddatum.h"
+#include "utils/injection_point.h"
 #include "utils/logtape.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -1489,6 +1490,14 @@ build_hash_tables(AggState *aggstate)
                                           perhash->aggnode->numGroups,
                                           memory);
 
+#ifdef USE_INJECTION_POINTS
+       if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
+       {
+           nbuckets = memory / sizeof(TupleHashEntryData);
+           INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
+       }
+#endif
+
        build_hash_table(aggstate, setno, nbuckets);
    }
 
@@ -1860,6 +1869,18 @@ hash_agg_check_limits(AggState *aggstate)
                                                     true);
    Size        hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
                                                        true);
+   bool        do_spill = false;
+
+#ifdef USE_INJECTION_POINTS
+   if (ngroups >= 1000)
+   {
+       if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
+       {
+           do_spill = true;
+           INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
+       }
+   }
+#endif
 
    /*
     * Don't spill unless there's at least one group in the hash table so we
@@ -1869,8 +1890,11 @@ hash_agg_check_limits(AggState *aggstate)
        (meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
         ngroups > aggstate->hash_ngroups_limit))
    {
-       hash_agg_enter_spill_mode(aggstate);
+       do_spill = true;
    }
+
+   if (do_spill)
+       hash_agg_enter_spill_mode(aggstate);
 }
 
 /*
@@ -1881,6 +1905,7 @@ hash_agg_check_limits(AggState *aggstate)
 static void
 hash_agg_enter_spill_mode(AggState *aggstate)
 {
+   INJECTION_POINT("hash-aggregate-enter-spill-mode");
    aggstate->hash_spill_mode = true;
    hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
 
@@ -2652,6 +2677,7 @@ agg_refill_hash_table(AggState *aggstate)
     */
    hashagg_recompile_expressions(aggstate, true, true);
 
+   INJECTION_POINT("hash-aggregate-process-batch");
    for (;;)
    {
        TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
@@ -2900,6 +2926,15 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
    npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
                                             used_bits, &partition_bits);
 
+#ifdef USE_INJECTION_POINTS
+   if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
+   {
+       npartitions = 1;
+       partition_bits = 0;
+       INJECTION_POINT_CACHED("hash-aggregate-single-partition");
+   }
+#endif
+
    spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
    spill->ntuples = palloc0(sizeof(int64) * npartitions);
    spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
@@ -2908,7 +2943,10 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
        spill->partitions[i] = LogicalTapeCreate(tapeset);
 
    spill->shift = 32 - used_bits - partition_bits;
-   spill->mask = (npartitions - 1) << spill->shift;
+   if (spill->shift < 32)
+       spill->mask = (npartitions - 1) << spill->shift;
+   else
+       spill->mask = 0;
    spill->npartitions = npartitions;
 
    for (int i = 0; i < npartitions; i++)
@@ -2957,7 +2995,11 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 
    tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
 
-   partition = (hash & spill->mask) >> spill->shift;
+   if (spill->shift < 32)
+       partition = (hash & spill->mask) >> spill->shift;
+   else
+       partition = 0;
+
    spill->ntuples[partition]++;
 
    /*
index 4f0161fd33a24692701aa47a20ca4f7d276bbf80..e680991f8d4f0d3e7e62633f62a015abbc67832b 100644 (file)
@@ -11,7 +11,7 @@ EXTENSION = injection_points
 DATA = injection_points--1.0.sql
 PGFILEDESC = "injection_points - facility for injection points"
 
-REGRESS = injection_points reindex_conc
+REGRESS = injection_points hashagg reindex_conc
 REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
 
 ISOLATION = basic inplace syscache-update-pruned
diff --git a/src/test/modules/injection_points/expected/hashagg.out b/src/test/modules/injection_points/expected/hashagg.out
new file mode 100644 (file)
index 0000000..cc4247a
--- /dev/null
@@ -0,0 +1,68 @@
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+SELECT injection_points_set_local();
+ injection_points_set_local 
+----------------------------
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+ injection_points_attach 
+-------------------------
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+ injection_points_attach 
+-------------------------
+(1 row)
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+ injection_points_attach 
+-------------------------
+(1 row)
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+ injection_points_attach 
+-------------------------
+(1 row)
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+NOTICE:  notice triggered for injection point hash-aggregate-spill-1000
+NOTICE:  notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE:  notice triggered for injection point hash-aggregate-single-partition
+NOTICE:  notice triggered for injection point hash-aggregate-process-batch
+ count 
+-------
+  5100
+(1 row)
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;
index 259045e5c2d4ff4d046d5e1dc1a16d06aea6e62c..d61149712fd7d28fea2002940e3a7d09d74cb8fa 100644 (file)
@@ -35,6 +35,7 @@ tests += {
   'regress': {
     'sql': [
       'injection_points',
+      'hashagg',
       'reindex_conc',
     ],
     'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
diff --git a/src/test/modules/injection_points/sql/hashagg.sql b/src/test/modules/injection_points/sql/hashagg.sql
new file mode 100644 (file)
index 0000000..51d8146
--- /dev/null
@@ -0,0 +1,26 @@
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+
+SELECT injection_points_set_local();
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;