Skip to content

Commit 151fe4a

Browse files
committed
Provide cache for PruneShardList
1 parent f510c8f commit 151fe4a

File tree

4 files changed

+98
-8
lines changed

4 files changed

+98
-8
lines changed

contrib/pg_shard/bench/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ void initializeDatabase()
158158
exec(txn, "CREATE EXTENSION pg_shard");
159159
exec(txn, "create table t(u int primary key, v int)");
160160
exec(txn, "SELECT master_create_distributed_table(table_name := 't', partition_column := 'u')");
161-
exec(txn, "SELECT master_create_worker_shards(table_name := 't', shard_count := 9, replication_factor := 1)");
161+
exec(txn, "SELECT master_create_worker_shards(table_name := 't', shard_count := 100, replication_factor := 1)");
162162
for (int i = 0; i < cfg.nAccounts; i++) {
163163
exec(txn, "insert into t values (%d,0)", i);
164164
}

contrib/pg_shard/bench/makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
CXX=g++
2-
CXXFLAGS=-g -Wall -O0 -pthread
2+
CXXFLAGS=-g -Wall -O2 -pthread
33

44
all: dtmbench
55

66
dtmbench: dtmbench.cpp
77
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
88

99
clean:
10-
rm -f dtmbench
10+
rm -f dtmbench

contrib/pg_shard/bench/postgresql.conf.pg_shard

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,4 +624,5 @@ default_text_search_config = 'pg_catalog.english'
624624

625625
# Add settings for extensions here
626626
pg_shard.use_dtm_transactions = 1
627+
pg_shard.all_modifications_commutative = 1
627628

contrib/pg_shard/src/prune_shard_list.c

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "distribution_metadata.h"
1919
#include "prune_shard_list.h"
20+
#include "create_shards.h"
2021

2122
#include <stddef.h>
2223

@@ -69,6 +70,50 @@ static List * BuildRestrictInfoList(List *qualList);
6970
static Node * BuildBaseConstraint(Var *column);
7071
static void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
7172

73+
static HTAB* shardPlacementCache;
74+
75+
typedef struct ShardPlacementEntryCacheEntry
76+
{
77+
Oid tableId;
78+
List** placements;
79+
} ShardPlacementCacheEntry;
80+
81+
#define MAX_DISTRIBUTED_TABLES 101
82+
83+
static List*
84+
LookupShardPlacementCache(Oid relationId, int shardHashCode)
85+
{
86+
ShardPlacementCacheEntry* entry = NULL;
87+
88+
if (shardPlacementCache == NULL)
89+
{
90+
HASHCTL info;
91+
int hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
92+
93+
memset(&info, 0, sizeof(info));
94+
info.keysize = sizeof(Oid);
95+
info.entrysize = sizeof(ShardPlacementCacheEntry);
96+
info.hcxt = CacheMemoryContext;
97+
98+
shardPlacementCache = hash_create("pg_shard placement cache", MAX_DISTRIBUTED_TABLES, &info, hashFlags);
99+
}
100+
entry = hash_search(shardPlacementCache, &relationId, HASH_FIND, NULL);
101+
return (entry != NULL) ? entry->placements[shardHashCode] : NULL;
102+
}
103+
104+
static void
105+
AddToShardPlacementCache(Oid relationId, int shardHashCode, int shardCount, List* shardPlacements)
106+
{
107+
MemoryContext oldContext = MemoryContextSwitchTo(CacheMemoryContext);
108+
bool found = false;
109+
ShardPlacementCacheEntry* entry = (ShardPlacementCacheEntry*)hash_search(shardPlacementCache, &relationId, HASH_ENTER, &found);
110+
if (!found)
111+
{
112+
entry->placements = palloc0(shardCount*sizeof(List*));
113+
}
114+
entry->placements[shardHashCode] = list_copy(shardPlacements);
115+
MemoryContextSwitchTo(oldContext);
116+
}
72117

73118
/*
74119
* PruneShardList prunes shards from given list based on the selection criteria,
@@ -81,7 +126,7 @@ PruneShardList(Oid relationId, List *whereClauseList, List *shardIntervalList)
81126
ListCell *shardIntervalCell = NULL;
82127
List *restrictInfoList = NIL;
83128
Node *baseConstraint = NULL;
84-
129+
int shardHashCode = -1;
85130
Var *partitionColumn = PartitionColumn(relationId);
86131
char partitionMethod = PartitionType(relationId);
87132

@@ -97,10 +142,51 @@ PruneShardList(Oid relationId, List *whereClauseList, List *shardIntervalList)
97142

98143
case HASH_PARTITION_TYPE:
99144
{
100-
Node *hashedNode = HashableClauseMutator((Node *) whereClauseList,
101-
partitionColumn);
145+
Node *hashedNode = NULL;
146+
List *hashedClauseList = NULL;
147+
if (whereClauseList && whereClauseList->length == 1)
148+
{
149+
Expr* predicate = (Expr *)lfirst(list_head(whereClauseList));
150+
151+
if (IsA(predicate, OpExpr))
152+
{
153+
OpExpr *operatorExpression = (OpExpr *) predicate;
154+
Oid leftHashFunction = InvalidOid;
155+
Oid rightHashFunction = InvalidOid;
156+
if (get_op_hash_functions(operatorExpression->opno,
157+
&leftHashFunction,
158+
&rightHashFunction)
159+
&& SimpleOpExpression(predicate)
160+
&& OpExpressionContainsColumn(operatorExpression,
161+
partitionColumn))
162+
{
163+
Node *leftOperand = get_leftop(predicate);
164+
Node *rightOperand = get_rightop(predicate);
165+
Const *constant = (Const*)(IsA(rightOperand, Const) ? rightOperand : leftOperand);
166+
TypeCacheEntry *typeEntry = lookup_type_cache(constant->consttype, TYPECACHE_HASH_PROC_FINFO);
167+
FmgrInfo *hashFunction = &(typeEntry->hash_proc_finfo);
168+
if (OidIsValid(hashFunction->fn_oid))
169+
{
170+
int hashedValue = DatumGetInt32(FunctionCall1(hashFunction, constant->constvalue));
171+
int shardCount = shardIntervalList->length;
172+
uint32 hashTokenIncrement = (uint32)(HASH_TOKEN_COUNT / shardCount);
173+
shardHashCode = (int)((uint32)(hashedValue - INT32_MIN) / hashTokenIncrement);
174+
remainingShardList = LookupShardPlacementCache(relationId, shardHashCode);
175+
if (remainingShardList != NULL)
176+
{
177+
return remainingShardList;
178+
}
179+
}
180+
}
181+
}
182+
}
183+
hashedNode = HashableClauseMutator((Node *) whereClauseList,
184+
partitionColumn);
185+
hashedClauseList = (List *) hashedNode;
186+
restrictInfoList = BuildRestrictInfoList(hashedClauseList);
102187

103-
List *hashedClauseList = (List *) hashedNode;
188+
/* override the partition column for hash partitioning */
189+
partitionColumn = MakeInt4Column();
104190
restrictInfoList = BuildRestrictInfoList(hashedClauseList);
105191

106192
/* override the partition column for hash partitioning */
@@ -141,7 +227,10 @@ PruneShardList(Oid relationId, List *whereClauseList, List *shardIntervalList)
141227
remainingShardList = lappend(remainingShardList, &(shardInterval->id));
142228
}
143229
}
144-
230+
if (shardHashCode >= 0)
231+
{
232+
AddToShardPlacementCache(relationId, shardHashCode, shardIntervalList->length, remainingShardList);
233+
}
145234
return remainingShardList;
146235
}
147236

0 commit comments

Comments
 (0)