Skip to content

Commit ef73920

Browse files
committed
Fix queues and tests
1 parent 273382f commit ef73920

File tree

3 files changed

+98
-75
lines changed

3 files changed

+98
-75
lines changed

jsonbd.c

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
4444
static shm_toc *toc = NULL;
4545

4646
/* global */
47-
void *workers_data = NULL;
48-
int jsonbd_nworkers = -1;
49-
int jsonbd_queue_size = 0;
47+
void *workers_data = NULL;
48+
int jsonbd_nworkers = -1;
49+
int jsonbd_queue_size = 0;
50+
Size jsonbd_total_queue_size = 0;
5051

5152
static void init_memory_context(bool);
5253
static void memory_reset_callback(void *arg);
@@ -78,11 +79,14 @@ jsonbd_shmem_size(void)
7879
/* two queues for launcher */
7980
shm_toc_estimate_chunk(&e, shm_mq_minimum_size * 2);
8081

82+
/* set total queue size */
83+
jsonbd_total_queue_size = (Size) (shm_mq_minimum_size + jsonbd_queue_size * 1024);
84+
8185
for (i = 0; i < MAX_JSONBD_WORKERS; i++)
8286
{
8387
shm_toc_estimate_chunk(&e, sizeof(jsonbd_shm_worker));
84-
shm_toc_estimate_chunk(&e, jsonbd_get_queue_size());
85-
shm_toc_estimate_chunk(&e, jsonbd_get_queue_size());
88+
shm_toc_estimate_chunk(&e, jsonbd_total_queue_size);
89+
shm_toc_estimate_chunk(&e, jsonbd_total_queue_size);
8690
}
8791

8892
/* 3 keys each worker + 3 for header (header itself and two queues) */
@@ -106,22 +110,17 @@ jsonbd_init_worker(shm_toc *toc, jsonbd_shm_worker *wd, int worker_num,
106110
LWLockPadded *locks;
107111
static int mqkey = MAX_JSONBD_WORKERS + 1;
108112

109-
/* each worker will have two mq, for input and output */
110-
wd->mqin = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
111-
wd->mqout = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
112-
113113
/* init worker context */
114114
wd->proc = NULL;
115115
wd->dboid = InvalidOid;
116116

117-
shm_mq_clean_receiver(wd->mqin);
118-
shm_mq_clean_receiver(wd->mqout);
119-
shm_mq_clean_sender(wd->mqin);
120-
shm_mq_clean_sender(wd->mqout);
121-
122117
if (worker_num)
123118
shm_toc_insert(toc, worker_num, wd);
124119

120+
/* each worker will have two mq, for input and output */
121+
wd->mqin = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
122+
wd->mqout = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
123+
125124
shm_toc_insert(toc, mqkey++, wd->mqin);
126125
shm_toc_insert(toc, mqkey++, wd->mqout);
127126

@@ -294,6 +293,8 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
294293
shm_mq_handle *mqh;
295294
jsonbd_shm_hdr *hdr;
296295
jsonbd_shm_worker *wd = NULL;
296+
shm_mq *mqin,
297+
*mqout;
297298

298299
char *res;
299300
Size reslen;
@@ -355,10 +356,19 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
355356
if (!LWLockAcquireOrWait(hdr->launcher.lock, LW_EXCLUSIVE))
356357
continue;
357358

358-
shm_mq_set_sender(hdr->launcher.mqin, MyProc);
359-
shm_mq_set_receiver(hdr->launcher.mqout, MyProc);
359+
mqin = shm_mq_create(hdr->launcher.mqin, shm_mq_minimum_size);
360+
mqout = shm_mq_create(hdr->launcher.mqout, shm_mq_minimum_size);
361+
362+
/*
363+
* important that sender on mqout should be set earlier than
364+
* receiver on mqin
365+
*/
366+
shm_mq_set_sender(mqout, hdr->launcher.proc);
367+
shm_mq_set_receiver(mqout, MyProc);
368+
shm_mq_set_sender(mqin, MyProc);
369+
shm_mq_set_receiver(mqin, hdr->launcher.proc);
360370

361-
mqh = shm_mq_attach(hdr->launcher.mqin, NULL, NULL);
371+
mqh = shm_mq_attach(mqin, NULL, NULL);
362372
resmq = shm_mq_sendv(mqh,
363373
&((shm_mq_iovec) {(char *) &MyDatabaseId, sizeof(MyDatabaseId)}), 1, false);
364374
if (resmq != SHM_MQ_SUCCESS)
@@ -367,7 +377,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
367377

368378
if (!detached)
369379
{
370-
mqh = shm_mq_attach(hdr->launcher.mqout, NULL, NULL);
380+
mqh = shm_mq_attach(mqout, NULL, NULL);
371381
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
372382
if (resmq != SHM_MQ_SUCCESS)
373383
detached = true;
@@ -377,9 +387,6 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
377387

378388
shm_mq_detach(mqh);
379389
}
380-
381-
shm_mq_clean_sender(hdr->launcher.mqin);
382-
shm_mq_clean_receiver(hdr->launcher.mqout);
383390
LWLockRelease(hdr->launcher.lock);
384391

385392
if (detached)
@@ -395,10 +402,15 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
395402
detached = false;
396403

397404
/* send data */
398-
shm_mq_set_sender(wd->mqin, MyProc);
399-
shm_mq_set_receiver(wd->mqout, MyProc);
405+
mqin = shm_mq_create(wd->mqin, jsonbd_total_queue_size);
406+
mqout = shm_mq_create(wd->mqout, jsonbd_total_queue_size);
400407

401-
mqh = shm_mq_attach(wd->mqin, NULL, NULL);
408+
shm_mq_set_sender(mqin, MyProc);
409+
shm_mq_set_receiver(mqin, wd->proc);
410+
shm_mq_set_sender(mqout, wd->proc);
411+
shm_mq_set_receiver(mqout, MyProc);
412+
413+
mqh = shm_mq_attach(mqin, NULL, NULL);
402414
resmq = shm_mq_sendv(mqh, iov, iov_len, false);
403415
if (resmq != SHM_MQ_SUCCESS)
404416
detached = true;
@@ -407,7 +419,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
407419
/* get data */
408420
if (!detached)
409421
{
410-
mqh = shm_mq_attach(wd->mqout, NULL, NULL);
422+
mqh = shm_mq_attach(mqout, NULL, NULL);
411423
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
412424
if (resmq != SHM_MQ_SUCCESS)
413425
detached = true;
@@ -418,10 +430,6 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
418430
shm_mq_detach(mqh);
419431
}
420432

421-
/* clean self as receiver and unlock mq */
422-
shm_mq_clean_sender(wd->mqin);
423-
shm_mq_clean_receiver(wd->mqout);
424-
425433
LWLockRelease(wd->lock);
426434

427435
if (detached)

jsonbd_worker.c

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,6 @@ init_worker(dsm_segment *seg)
138138
worker_state->proc = MyProc;
139139
worker_state->dboid = worker_args->dboid;
140140

141-
/* input mq */
142-
shm_mq_set_receiver(worker_state->mqin, MyProc);
143-
144-
/* output mq */
145-
shm_mq_set_sender(worker_state->mqout, MyProc);
146-
147141
/* this context will be reset after each task */
148142
Assert(worker_context == NULL);
149143
worker_context = AllocSetContextCreate(TopMemoryContext,
@@ -557,10 +551,9 @@ jsonbd_launcher_main(Datum arg)
557551
toc = shm_toc_attach(JSONBD_SHM_MQ_MAGIC, workers_data);
558552
hdr = shm_toc_lookup(toc, 0, false);
559553
worker_state = &hdr->launcher;
554+
worker_state->proc = MyProc;
560555

561556
InitLatch(&hdr->launcher_latch);
562-
shm_mq_set_receiver(worker_state->mqin, MyProc);
563-
shm_mq_set_sender(worker_state->mqout, MyProc);
564557

565558
elog(LOG, "jsonbd launcher started with pid: %d", MyProcPid);
566559

@@ -572,10 +565,24 @@ jsonbd_launcher_main(Datum arg)
572565

573566
shm_mq_result resmq;
574567

575-
if (!mqh)
576-
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
568+
if (shutdown_requested)
569+
break;
570+
571+
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
572+
0, PG_WAIT_EXTENSION);
573+
574+
if (rc & WL_POSTMASTER_DEATH)
575+
break;
576+
577+
ResetLatch(&MyProc->procLatch);
578+
579+
if (shm_mq_get_receiver(worker_state->mqin) != MyProc)
580+
continue;
581+
582+
Assert(shm_mq_get_sender(worker_state->mqout) == MyProc);
577583

578-
resmq = shm_mq_receive(mqh, &nbytes, &data, true);
584+
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
585+
resmq = shm_mq_receive(mqh, &nbytes, &data, false);
579586

580587
if (resmq == SHM_MQ_SUCCESS)
581588
{
@@ -602,6 +609,10 @@ jsonbd_launcher_main(Datum arg)
602609
}
603610

604611
shm_mq_detach(mqh);
612+
613+
/* we don't need start this cycle again after we send data */
614+
shm_mq_clean_receiver(worker_state->mqin);
615+
605616
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
606617
if (started)
607618
{
@@ -623,17 +634,6 @@ jsonbd_launcher_main(Datum arg)
623634
/* mark we need new handle */
624635
mqh = NULL;
625636
}
626-
627-
if (shutdown_requested)
628-
break;
629-
630-
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
631-
0, PG_WAIT_EXTENSION);
632-
633-
if (rc & WL_POSTMASTER_DEATH)
634-
break;
635-
636-
ResetLatch(&MyProc->procLatch);
637637
}
638638

639639
elog(LOG, "jsonbd launcher has ended its work");
@@ -669,10 +669,24 @@ jsonbd_worker_main(Datum arg)
669669

670670
shm_mq_result resmq;
671671

672-
if (!mqh)
673-
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
672+
if (shutdown_requested)
673+
break;
674+
675+
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
676+
0, PG_WAIT_EXTENSION);
677+
678+
if (rc & WL_POSTMASTER_DEATH)
679+
break;
680+
681+
ResetLatch(&MyProc->procLatch);
682+
683+
if (shm_mq_get_receiver(worker_state->mqin) != MyProc)
684+
continue;
685+
686+
Assert(shm_mq_get_sender(worker_state->mqout) == MyProc);
674687

675-
resmq = shm_mq_receive(mqh, &nbytes, &data, true);
688+
mqh = shm_mq_attach(worker_state->mqin, NULL, NULL);
689+
resmq = shm_mq_receive(mqh, &nbytes, &data, false);
676690

677691
if (resmq == SHM_MQ_SUCCESS)
678692
{
@@ -719,6 +733,7 @@ jsonbd_worker_main(Datum arg)
719733
}
720734

721735
shm_mq_detach(mqh);
736+
shm_mq_clean_receiver(worker_state->mqin);
722737
mqh = shm_mq_attach(worker_state->mqout, NULL, NULL);
723738

724739
if (iov != NULL)
@@ -731,21 +746,7 @@ jsonbd_worker_main(Datum arg)
731746

732747
shm_mq_detach(mqh);
733748
MemoryContextReset(worker_context);
734-
735-
/* mark we need new handle */
736-
mqh = NULL;
737749
}
738-
739-
if (shutdown_requested)
740-
break;
741-
742-
rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH,
743-
0, PG_WAIT_EXTENSION);
744-
745-
if (rc & WL_POSTMASTER_DEATH)
746-
break;
747-
748-
ResetLatch(&MyProc->procLatch);
749750
}
750751

751752
elog(LOG, "jsonbd dictionary worker has ended its work");

tests/jsonbd_test.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,29 @@
5555
) t3;
5656
'''
5757

58-
def generate_dict():
59-
population = 'qwertyuiopsadfghjklzxcvbnm1234567890'
60-
res = {}
6158

62-
for i in range(300):
59+
def generate_keys(count):
60+
population = 'qwertyuiopsadfghjklzxcvbnm1234567890'
61+
result = []
62+
for i in range(count):
6363
keylen = random.randint(1, len(population))
6464
key = ''.join(random.sample(population, keylen))
65-
res[key] = keylen
65+
result.append(key)
66+
67+
return result
68+
69+
70+
def generate_dict(keys):
71+
res = {}
72+
73+
for i in range(100):
74+
res[keys[random.randint(0, len(keys) - 1)]] = random.randint(1, 10000)
6675

6776
return res
6877

6978

79+
KEYS = ['pu4rj8cin2vthkzx3gm79q1ea6wlb5sdfy0', 'en4rl5h01mpwocydx9', 'hui0gen37qv1zf5kjw8lp2d6ramst4bx', 'macvjs35y1xneodi', 'r', 'gev6qfyb57dakwhx803umnczi4pj2lrst', 'pzkd5n4ufcaj', 'wubzi', 'h', 'ca6', 'krypftxe8ovbu3i2dh', '5y70', 'of2zcp8rgq0kmntu9yv314eb6ws7jahl', '2yu1iv645cwhepkmasnzfrl7gjq9x8td', 'ysoikbdwj8l7hrv4ag', 'q4s7xugt9bnzkw125vl60rcojayh38dimepf', 'ijbtvadn9x', '0aonrspwhbdvzg2lq8cuef', 'hf2ybkcvl8eaj9m503o4dtnrguxq6w', 'ayr0', 'r612my98ehwsui7bo30vk4c5djlxtazgn', 'ancd7fe8qh65s', 'ghptl062z5mwr7fqae', 'hgf1j7myqo0vrpbd93se4ztu6i28lnxwack5', 'ibx6cje7rlof0ukyh54apvs', 'w1xhvfu', 'nb8zf601tjmi29q5pyexw7gdlk', 'f6v4xjn9ylr2m', 'uoai017bfth4gxwjsep3y28kz', 'pg', 't', '5w61bms8cjoayr93ixtehg4p7uq0n2vf', 'u53k0nfswaoyjx19vdp8it', 'u4rlpft3qh6gjkacs1e28b5wimvx0yz', '0249or1fze', 'd4uey10zbhtf9jla5gqs', '8k0', '21sjt8ap7u5vxhkyeo0zf4ic9gqrw63d', 'pvih7546ea3cbgxuk', 'uarnfycxib74926jt1lgqhm0kw5esp83v', 'zrwmkgs06yv', 'v5igznkpjle8632cuyfdxq1a9mthwsbo704', '5y3ktwe6hxcl9rfdsn4z7uqjg', 'ly3f6centogzb82us9wp', 'o0gqlpn31sw845i9eukv', '1ho5uk97azcd', '8723gtsz6a9fcbo', 'nfxi8sl20r9kbdz6t35meqoapygucvhj4w17', 'dgutb4fj2ceqsyz750o', 'c65', 'do', 'sohb1gfea6cnxyd92qv78p4w0tkuzm5jlr3', 'iedsfb6mgl85zh32krtx1v94', 'igz27dvfwx45qh', '8qxwg3fcm6dba7rk50ntjohslze', '9mj5a1f8rxpb6s30kdlnw7guitcve', '2zlae93nxf480ykuojc1hw7qirmt6', '4abs05mnug7tpvjrlx3q18fc', '57w0g1tsbrun2hk4', 'z2yh7ojkx9p0', 'w', 'bgp3sefz5vrkot87uqh', 'a3h', '4zg9i3bmeqd52vpft0laruj7hksn', 'tkenuwzqy35hv2dbof4clm', 'fydqn290lwxrpus8ka', 'zmg7lx0df1qewt', 'k2cegpz0dq6r4uiwovhmj', 'tci91qj', 's1ug4t85wca0hnmlpfo', 'bca42i1pu7h3dolvkme9yn8fw05qrgx6', 'kw72sdb', 'jc5sa9zqhmb21v36tdpk0f48', 'w8fvk4751xqdyjp6eolcgsamn9z20rb3tu', '64b8mz9017nq3xyec', '5gju7r9ae30xzh8inp6b1', '02zcf94mojtyxk75bs8a', 'n18com3dzihj6upxkb0wgat2sverlqf59y74', 'qbofcwz15h9e24j6mul3rd', '0topwjfmd3z849kenu2vsxqac1yih5gl7rb6', 'i0319k', '40g83qtbdih69vr1ol', '4lpxsfj1q2eztguok3wa6cr9db', '2vl', '2rq1b90zojetxas6v47lkyn38dwgpc5hi', 'f9dcobjw6xy1', 'dhue8c3t0gi4vnz', '7bkxn9po46', 'gj0q4it6d2s7mkzf5xlo1ha9y8pucr3', 'rucdks4w01', 'uo6txm2gwf4k38ijav9150cdhbrlqzpnesy', '0d4gec8txisa2r5f3mwnjhl', 'p8cqeoi071bt4hyw6fzv', '10g3b', 'zhrolvm5c', '2iroh6pz3l5b8vkfens1caym9qg0x', 'sm5wftozqbkd4py132u', 'cvyfqruz', 'bcmq2nag6hu3j0otyk8iz9evp4fr7s5dw1', 'pnqc7xy06bfiv14zg']
80+
7081
class Tests(unittest.TestCase):
7182
def set_trace(self, con, command="pg_debug"):
7283
pid = con.execute("select pg_backend_pid()")[0][0]
@@ -84,15 +95,18 @@ def test_correctness(self):
8495

8596
data = []
8697
with node.connect('postgres') as con:
87-
for i in range(1000):
88-
d = generate_dict()
98+
for i in range(10000):
99+
d = generate_dict(KEYS)
89100
data.append(d)
90101
con.execute("insert into t1 (a) values ('%s');" % json.dumps(d))
91102

92103
res = con.execute('select pk, a from t1 order by pk')
93104
for pk, val in res:
94105
self.assertEqual(val, data[pk - 1])
95106

107+
data = node.psql('postgres', "select pg_size_pretty(pg_total_relation_size('t1'))")
108+
print("Relation size: ", data[1].decode('utf-8'))
109+
96110

97111
if __name__ == "__main__":
98112
unittest.main()

0 commit comments

Comments
 (0)