#include "utils/datum.h"
#include "utils/dynahash.h"
#include "utils/expandeddatum.h"
+#include "utils/injection_point.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
perhash->aggnode->numGroups,
memory);
+#ifdef USE_INJECTION_POINTS
+ if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
+ {
+ nbuckets = memory / sizeof(TupleHashEntryData);
+ INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
+ }
+#endif
+
build_hash_table(aggstate, setno, nbuckets);
}
true);
Size hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
true);
+ bool do_spill = false;
+
+#ifdef USE_INJECTION_POINTS
+ if (ngroups >= 1000)
+ {
+ if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
+ {
+ do_spill = true;
+ INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
+ }
+ }
+#endif
/*
* Don't spill unless there's at least one group in the hash table so we
(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
ngroups > aggstate->hash_ngroups_limit))
{
- hash_agg_enter_spill_mode(aggstate);
+ do_spill = true;
}
+
+ if (do_spill)
+ hash_agg_enter_spill_mode(aggstate);
}
/*
static void
hash_agg_enter_spill_mode(AggState *aggstate)
{
+ INJECTION_POINT("hash-aggregate-enter-spill-mode");
aggstate->hash_spill_mode = true;
hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
*/
hashagg_recompile_expressions(aggstate, true, true);
+ INJECTION_POINT("hash-aggregate-process-batch");
for (;;)
{
TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
used_bits, &partition_bits);
+#ifdef USE_INJECTION_POINTS
+ if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
+ {
+ npartitions = 1;
+ partition_bits = 0;
+ INJECTION_POINT_CACHED("hash-aggregate-single-partition");
+ }
+#endif
+
spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions);
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
spill->partitions[i] = LogicalTapeCreate(tapeset);
spill->shift = 32 - used_bits - partition_bits;
- spill->mask = (npartitions - 1) << spill->shift;
+ if (spill->shift < 32)
+ spill->mask = (npartitions - 1) << spill->shift;
+ else
+ spill->mask = 0;
spill->npartitions = npartitions;
for (int i = 0; i < npartitions; i++)
tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
- partition = (hash & spill->mask) >> spill->shift;
+ if (spill->shift < 32)
+ partition = (hash & spill->mask) >> spill->shift;
+ else
+ partition = 0;
+
spill->ntuples[partition]++;
/*
DATA = injection_points--1.0.sql
PGFILEDESC = "injection_points - facility for injection points"
-REGRESS = injection_points reindex_conc
+REGRESS = injection_points hashagg reindex_conc
REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
ISOLATION = basic inplace syscache-update-pruned
--- /dev/null
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+SELECT injection_points_set_local();
+ injection_points_set_local
+----------------------------
+
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+ injection_points_attach
+-------------------------
+
+(1 row)
+
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+ injection_points_attach
+-------------------------
+
+(1 row)
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+ injection_points_attach
+-------------------------
+
+(1 row)
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+ injection_points_attach
+-------------------------
+
+(1 row)
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+NOTICE: notice triggered for injection point hash-aggregate-spill-1000
+NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE: notice triggered for injection point hash-aggregate-single-partition
+NOTICE: notice triggered for injection point hash-aggregate-process-batch
+NOTICE: notice triggered for injection point hash-aggregate-spill-1000
+NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE: notice triggered for injection point hash-aggregate-single-partition
+NOTICE: notice triggered for injection point hash-aggregate-process-batch
+NOTICE: notice triggered for injection point hash-aggregate-spill-1000
+NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE: notice triggered for injection point hash-aggregate-single-partition
+NOTICE: notice triggered for injection point hash-aggregate-process-batch
+NOTICE: notice triggered for injection point hash-aggregate-spill-1000
+NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE: notice triggered for injection point hash-aggregate-single-partition
+NOTICE: notice triggered for injection point hash-aggregate-process-batch
+NOTICE: notice triggered for injection point hash-aggregate-spill-1000
+NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
+NOTICE: notice triggered for injection point hash-aggregate-single-partition
+NOTICE: notice triggered for injection point hash-aggregate-process-batch
+ count
+-------
+ 5100
+(1 row)
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;
'regress': {
'sql': [
'injection_points',
+ 'hashagg',
'reindex_conc',
],
'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
--- /dev/null
+-- Test for hash aggregation
+CREATE EXTENSION injection_points;
+
+SELECT injection_points_set_local();
+
+SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
+SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
+
+-- force partition fan-out to 1
+SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
+
+-- force spilling after 1000 groups
+SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
+
+CREATE TABLE hashagg_ij(x INTEGER);
+INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
+
+SET max_parallel_workers=0;
+SET max_parallel_workers_per_gather=0;
+SET enable_sort=FALSE;
+SET work_mem='4MB';
+
+SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
+
+DROP TABLE hashagg_ij;
+DROP EXTENSION injection_points;