Skip to content

Commit 1e66876

Browse files
committed
Continue work on bulk inserting
1 parent d9c2e1a commit 1e66876

File tree

6 files changed

+253
-58
lines changed

6 files changed

+253
-58
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# contrib/jsonbc/Makefile
22

33
MODULE_big = jsonbc
4-
OBJS= jsonbc.o jsonbc_worker.o $(WIN32RES)
4+
OBJS= jsonbc.o jsonbc_worker.o jsonbc_utils.o $(WIN32RES)
55

66
EXTENSION = jsonbc
77
DATA = jsonbc--0.1.sql

jsonbc.c

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,6 @@ typedef struct
3636
MemoryContext item_mcxt;
3737
} CompressionThroughBuffers;
3838

39-
#if PG_VERSION_NUM == 110000
40-
struct shm_mq_alt
41-
{
42-
slock_t mq_mutex;
43-
PGPROC *mq_receiver; /* this one */
44-
PGPROC *mq_sender; /* this one */
45-
uint64 mq_bytes_read;
46-
uint64 mq_bytes_written;
47-
Size mq_ring_size;
48-
bool mq_detached; /* and this one */
49-
50-
/* in postgres version there are more attributes, but we don't need them */
51-
};
52-
#else
53-
#error "shm_mq struct in jsonbc is copied from PostgreSQL 11, please correct it according to your version"
54-
#endif
55-
5639
/* local */
5740
static MemoryContext compression_mcxt = NULL;
5841
static CompressionThroughBuffers *compression_buffers = NULL;
@@ -68,44 +51,18 @@ int jsonbc_queue_size = 0;
6851
static void init_memory_context(bool);
6952
static void memory_reset_callback(void *arg);
7053
static void encode_varbyte(uint32 val, unsigned char *ptr, int *len);
71-
static uint32 decode_varbyte(unsigned char *ptr);
7254
static char *packJsonbValue(JsonbValue *val, int header_size, int *len);
7355
static void setup_guc_variables(void);
7456
static char *jsonbc_get_keys(Oid cmoptoid, uint32 *ids, int nkeys, size_t *buflen);
7557
static void jsonbc_get_key_ids(Oid cmoptoid, char *buf, int buflen, uint32 *idsbuf, int nkeys);
58+
static uint32 decode_varbyte(unsigned char *ptr);
7659

7760
static size_t
7861
jsonbc_get_queue_size(void)
7962
{
8063
return (Size) (jsonbc_queue_size * 1024);
8164
}
8265

83-
void
84-
shm_mq_clean_sender(shm_mq *mq)
85-
{
86-
struct shm_mq_alt *amq = (struct shm_mq_alt *) mq;
87-
88-
/* check that attributes are same and our struct still compatible with global shm_mq */
89-
Assert(shm_mq_get_sender(mq) == amq->mq_sender);
90-
Assert(shm_mq_get_receiver(mq) == amq->mq_receiver);
91-
92-
amq->mq_sender = NULL;
93-
amq->mq_detached = false;
94-
}
95-
96-
void
97-
shm_mq_clean_receiver(shm_mq *mq)
98-
{
99-
struct shm_mq_alt *amq = (struct shm_mq_alt *) mq;
100-
101-
/* check that attributes are same and our struct still compatible with global shm_mq */
102-
Assert(shm_mq_get_sender(mq) == amq->mq_sender);
103-
Assert(shm_mq_get_receiver(mq) == amq->mq_receiver);
104-
105-
amq->mq_receiver = NULL;
106-
amq->mq_detached = false;
107-
}
108-
10966
static size_t
11067
jsonbc_shmem_size(void)
11168
{
@@ -131,7 +88,6 @@ jsonbc_shmem_size(void)
13188
static void
13289
jsonbc_shmem_startup_hook(void)
13390
{
134-
int mqkey;
13591
bool found;
13692
Size size = jsonbc_shmem_size();
13793
jsonbc_shm_hdr *hdr;
@@ -146,6 +102,7 @@ jsonbc_shmem_startup_hook(void)
146102
if (!found)
147103
{
148104
int i;
105+
int mqkey;
149106

150107
toc = shm_toc_create(JSONBC_SHM_MQ_MAGIC, workers_data, size);
151108
hdr = shm_toc_allocate(toc, sizeof(jsonbc_shm_hdr));

jsonbc.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,31 @@ typedef struct jsonbc_shm_worker
3030
PGPROC *proc;
3131
} jsonbc_shm_worker;
3232

33+
typedef struct jsonbc_pair
34+
{
35+
int32 id;
36+
char *key;
37+
} jsonbc_pair;
38+
3339
typedef struct jsonbc_cached_cmopt
3440
{
35-
HTAB *keys;
36-
} cmopt_cached;
41+
Oid cmoptoid;
42+
HTAB *key_cache;
43+
HTAB *id_cache;
44+
} jsonbc_cached_opt;
3745

3846
typedef struct jsonbc_cached_key
3947
{
40-
HTAB *
41-
}
48+
uint32 keyhash;
49+
uint32 pairslen;
50+
jsonbc_pair **pairs; /* for hash collisions */
51+
} jsonbc_cached_key;
52+
53+
typedef struct jsonbc_cached_id
54+
{
55+
uint32 id;
56+
jsonbc_pair *pair;
57+
} jsonbc_cached_id;
4258

4359
extern void _PG_init(void);
4460
extern void jsonbc_register_worker(int n);
@@ -48,7 +64,4 @@ extern int jsonbc_nworkers;
4864
extern int jsonbc_cache_size;
4965
extern int jsonbc_queue_size;
5066

51-
extern void shm_mq_clean_receiver(shm_mq *mq);
52-
extern void shm_mq_clean_sender(shm_mq *mq);
53-
5467
#endif

jsonbc_utils.c

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
#include "jsonbc.h"
2+
#include "jsonbc_utils.h"
3+
4+
#include "postgres.h"
5+
#include "nodes/execnodes.h"
6+
7+
#if PG_VERSION_NUM == 110000
8+
struct shm_mq_alt
9+
{
10+
slock_t mq_mutex;
11+
PGPROC *mq_receiver; /* this one */
12+
PGPROC *mq_sender; /* this one */
13+
uint64 mq_bytes_read;
14+
uint64 mq_bytes_written;
15+
Size mq_ring_size;
16+
bool mq_detached; /* and this one */
17+
18+
/* in postgres version there are more attributes, but we don't need them */
19+
};
20+
#else
21+
#error "shm_mq struct in jsonbc is copied from PostgreSQL 11, please correct it according to your version"
22+
#endif
23+
24+
RangeTblEntry *
25+
add_range_table_to_estate(EState *estate, Relation rel)
26+
{
27+
RangeTblEntry *rte = makeNode(RangeTblEntry);
28+
char *refname = RelationGetRelationName(rel);
29+
30+
Assert(pstate != NULL);
31+
32+
rte->rtekind = RTE_RELATION;
33+
rte->alias = NULL;
34+
rte->relid = RelationGetRelid(rel);
35+
rte->relkind = rel->rd_rel->relkind;
36+
37+
rte->eref = makeAlias(refname, NIL);
38+
39+
rte->lateral = false;
40+
rte->inh = false;
41+
rte->inFromCl = false;
42+
43+
rte->requiredPerms = ACL_SELECT;
44+
rte->checkAsUser = InvalidOid;
45+
rte->selectedCols = NULL;
46+
rte->insertedCols = NULL;
47+
rte->updatedCols = NULL;
48+
49+
estate->es_range_table = lappend(estate->es_range_table, rte);
50+
51+
return rte;
52+
}
53+
54+
/**
55+
* Get 32-bit Murmur3 hash. Ported from qLibc library.
56+
* Added compability with C99, and postgres code style
57+
*
58+
* @param data source data
59+
* @param nbytes size of data
60+
*
61+
* @return 32-bit unsigned hash value.
62+
*
63+
* @code
64+
* uint32_t hashval = qhashmurmur3_32((void*)"hello", 5);
65+
* @endcode
66+
*
67+
* @code
68+
* MurmurHash3 was created by Austin Appleby in 2008. The initial
69+
* implementation was published in C++ and placed in the public.
70+
* https://sites.google.com/site/murmurhash/
71+
* Seungyoung Kim has ported its implementation into C language
72+
* in 2012 and published it as a part of qLibc component.
73+
* @endcode
74+
*/
75+
uint32 qhashmurmur3_32(const void *data, size_t nbytes)
76+
{
77+
int i;
78+
uint32 k;
79+
const int nblocks;
80+
const uint32 *blocks;
81+
const uint8 *tail;
82+
83+
const uint32 c1 = 0xcc9e2d51;
84+
const uint32 c2 = 0x1b873593;
85+
86+
uint32 h = 0;
87+
88+
Assert(data != NULL && nbytes > 0);
89+
90+
nblocks = nbytes / 4;
91+
blocks = (const uint32 *) (data);
92+
tail = (const uint8 *) (data + (nblocks * 4));
93+
94+
for (i = 0; i < nblocks; i++)
95+
{
96+
k = blocks[i];
97+
98+
k *= c1;
99+
k = (k << 15) | (k >> (32 - 15));
100+
k *= c2;
101+
102+
h ^= k;
103+
h = (h << 13) | (h >> (32 - 13));
104+
h = (h * 5) + 0xe6546b64;
105+
}
106+
107+
k = 0;
108+
switch (nbytes & 3)
109+
{
110+
case 3:
111+
k ^= tail[2] << 16;
112+
case 2:
113+
k ^= tail[1] << 8;
114+
case 1:
115+
k ^= tail[0];
116+
k *= c1;
117+
k = (k << 15) | (k >> (32 - 15));
118+
k *= c2;
119+
h ^= k;
120+
};
121+
122+
h ^= nbytes;
123+
124+
h ^= h >> 16;
125+
h *= 0x85ebca6b;
126+
h ^= h >> 13;
127+
h *= 0xc2b2ae35;
128+
h ^= h >> 16;
129+
130+
return h;
131+
}
132+
133+
void
134+
shm_mq_clean_sender(shm_mq *mq)
135+
{
136+
struct shm_mq_alt *amq = (struct shm_mq_alt *) mq;
137+
138+
/* check that attributes are same and our struct still compatible with global shm_mq */
139+
Assert(shm_mq_get_sender(mq) == amq->mq_sender);
140+
Assert(shm_mq_get_receiver(mq) == amq->mq_receiver);
141+
142+
amq->mq_sender = NULL;
143+
amq->mq_detached = false;
144+
}
145+
146+
void
147+
shm_mq_clean_receiver(shm_mq *mq)
148+
{
149+
struct shm_mq_alt *amq = (struct shm_mq_alt *) mq;
150+
151+
/* check that attributes are same and our struct still compatible with global shm_mq */
152+
Assert(shm_mq_get_sender(mq) == amq->mq_sender);
153+
Assert(shm_mq_get_receiver(mq) == amq->mq_receiver);
154+
155+
amq->mq_receiver = NULL;
156+
amq->mq_detached = false;
157+
}

jsonbc_utils.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#ifndef JSONBC_UTILS_H
2+
#define JSONBC_UTILS_H
3+
4+
#include "postgres.h"
5+
#include "nodes/execnodes.h"
6+
7+
extern RangeTblEntry * add_range_table_to_estate(EState *estate, Relation rel);
8+
extern void shm_mq_clean_receiver(shm_mq *mq);
9+
extern void shm_mq_clean_sender(shm_mq *mq);
10+
11+
#endif

0 commit comments

Comments
 (0)