Skip to content

Commit e9e0bdf

Browse files
committed
Add 9.6 pg_shard port integrated with tsdtm
1 parent af5213a commit e9e0bdf

File tree

142 files changed

+47055
-14
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

142 files changed

+47055
-14
lines changed

contrib/multimaster/multimaster.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
8888
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
8989
static void DtmUpdateRecentXmin(Snapshot snapshot);
9090
static void DtmInitialize(void);
91-
static void DtmSubXactCallback(XactEvent event, void *arg);
91+
static void DtmSubXactCallback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg);
9292
static void DtmXactCallback(XactEvent event, void *arg);
9393
static TransactionId DtmGetNextXid(void);
9494
static TransactionId DtmGetNewTransactionId(bool isSubXact);
@@ -765,7 +765,7 @@ static void DtmInitialize()
765765
}
766766

767767
/*
 * Subtransaction event callback for the DTM.
 * The signature was changed from the XactEvent form to the SubXactEvent form,
 * which additionally receives the subtransaction id and its parent's id
 * (required by the 9.6 subxact-callback hook API).
 * Subtransactions are rejected outright because the distributed transaction
 * manager does not track them.
 * NOTE(review): presumably registered from DtmInitialize — confirm.
 */
static void
768-
DtmSubXactCallback(XactEvent event, void *arg)
768+
DtmSubXactCallback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
769769
{
770770
elog(ERROR, "Subtransactions are not currently supported");
771771
}

contrib/pg_dtm/pg_dtm.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
7474
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
7575
static void DtmUpdateRecentXmin(Snapshot snapshot);
7676
static void DtmInitialize(void);
77-
static void DtmSubXactCallback(XactEvent event, void *arg);
77+
static void DtmSubXactCallback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg);
7878
static void DtmXactCallback(XactEvent event, void *arg);
7979
static TransactionId DtmGetNextXid(void);
8080
static TransactionId DtmGetNewTransactionId(bool isSubXact);
@@ -737,7 +737,7 @@ static void DtmInitialize()
737737
}
738738

739739
/*
 * Subtransaction event callback for pg_dtm.
 * Updated from the XactEvent signature to the SubXactEvent signature, which
 * also passes the subtransaction id and its parent's id (9.6 hook API).
 * Any subtransaction event is a hard error: pg_dtm cannot represent
 * subtransactions in its global transaction state.
 * NOTE(review): presumably registered from DtmInitialize — confirm.
 */
static void
740-
DtmSubXactCallback(XactEvent event, void *arg)
740+
DtmSubXactCallback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
741741
{
742742
elog(ERROR, "Subtransactions are not currently supported");
743743
}

contrib/pg_multimaster/Makefile

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Build script for the multimaster extension (works both via PGXS and
# as an in-tree contrib module).
MODULE_big = multimaster
OBJS = multimaster.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o
#OBJS = multimaster.o pglogical_receiver.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
EXTENSION = multimaster
DATA = multimaster--1.0.sql

PG_CPPFLAGS = -I$(libpq_srcdir) -DUSE_PGLOGICAL_OUTPUT
SHLIB_LINK = $(libpq)

ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
# Fix: subdir must match this directory's actual path in the source tree
# (contrib/pg_multimaster); the original said contrib/multimaster, which
# breaks the in-tree (non-PGXS) build.
subdir = contrib/pg_multimaster
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
20+

contrib/pg_multimaster/README.md

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# pg_dtm
2+
3+
### Design
4+
5+
This repo implements distributed transaction manager using Snapshot Sharing mechanism. General concepts and alternative approaches described in postgres wiki https://wiki.postgresql.org/wiki/DTM.
6+
7+
Backend-DTM protocol description can be found in [dtmd/README](dtmd/README).
8+
9+
### Installation
10+
11+
* Patch postgres using xtm.patch. After that build and install postgres in usual way.
12+
```bash
13+
cd ~/code/postgres
14+
patch -p1 < ~/code/pg_dtm/xtm.patch
15+
```
16+
* Install pg_dtm extension.
17+
```bash
18+
export PATH=/path/to/pgsql/bin/:$PATH
19+
cd ~/code/pg_dtm
20+
make && make install
21+
```
22+
* Run dtmd.
23+
```bash
24+
cd ~/code/pg_dtm/dtmd
25+
make
26+
mkdir /tmp/clog
27+
./bin/dtmd &
28+
```
29+
* To run something meaningful you need at least two postgres instances. Also pg_dtm requires presence in ```shared_preload_libraries```.
30+
```bash
31+
initdb -D ./install/data1
32+
initdb -D ./install/data2
33+
echo "port = 5433" >> ./install/data2/postgresql.conf
34+
echo "shared_preload_libraries = 'pg_dtm'" >> ./install/data1/postgresql.conf
35+
echo "shared_preload_libraries = 'pg_dtm'" >> ./install/data2/postgresql.conf
36+
pg_ctl -D ./install/data1 -l ./install/data1/log start
37+
pg_ctl -D ./install/data2 -l ./install/data2/log start
38+
```
39+
40+
#### Automatic provisioning
41+
42+
For a cluster-wide deploy we use ansible, more details in tests/deploy_layouts. (Ansible instructions will be later)
43+
44+
### Usage
45+
46+
Now cluster is running and you can use global tx between two nodes. Let's connect to postgres instances at different ports:
47+
48+
```sql
49+
create extension pg_dtm; -- node1
50+
create table accounts(user_id int, amount int); -- node1
51+
insert into accounts (select 2*generate_series(1,100)-1, 0); -- node1, odd user_id's
52+
create extension pg_dtm; -- node2
53+
create table accounts(user_id int, amount int); -- node2
54+
insert into accounts (select 2*generate_series(1,100), 0); -- node2, even user_id's
55+
select dtm_begin_transaction(); -- node1, returns global xid, e.g. 42
56+
select dtm_join_transaction(42); -- node2, join global tx
57+
begin; -- node1
58+
begin; -- node2
59+
update accounts set amount=amount-100 where user_id=1; -- node1, transfer money from user#1
60+
update accounts set amount=amount+100 where user_id=2; -- node2, to user#2
61+
commit; -- node1, blocks until second commit happened
62+
commit; -- node2
63+
```
64+
65+
### Consistency testing
66+
67+
To ensure consistency we use simple bank test: perform a lot of simultaneous transfers between accounts on different servers, while constantly checking total amount of money on all accounts. This test can be found in tests/perf.
68+
69+
```bash
70+
> go run ./tests/perf/*
71+
-C value
72+
Connection string (repeat for multiple connections)
73+
-a int
74+
The number of bank accounts (default 100000)
75+
-b string
76+
Backend to use. Possible options: transfers, fdw, pgshard, readers. (default "transfers")
77+
-g Use DTM to keep global consistency
78+
-i Init database
79+
-l Use 'repeatable read' isolation level instead of 'read committed'
80+
-n int
81+
The number of updates each writer (reader in case of Readers backend) performs (default 10000)
82+
-p Use parallel execs
83+
-r int
84+
The number of readers (default 1)
85+
-s int
86+
StartID. Script will update rows starting from this value
87+
-v Show progress and other stuff for mortals
88+
-w int
89+
The number of writers (default 8)
90+
```
91+
92+
So previous installation can be initialized with:
93+
```
94+
go run ./tests/perf/*.go \
95+
-C "dbname=postgres port=5432" \
96+
-C "dbname=postgres port=5433" \
97+
-g -i
98+
```
99+
and tested with:
100+
```
101+
go run ./tests/perf/*.go \
102+
-C "dbname=postgres port=5432" \
103+
-C "dbname=postgres port=5433" \
104+
-g
105+
```
106+
107+
### Using with postgres_fdw.
108+
109+
We also provide a patch that enables support of global transactions with postgres_fdw. After patching and installing postgres_fdw it is possible to run the same test via fdw using the key ```-b fdw```.
110+
111+
### Using with pg_shard
112+
113+
Citus Data has a branch in their pg_shard repo that interacts with the transaction manager. https://github.com/citusdata/pg_shard/tree/transaction_manager_integration
114+
To use this feature one should have the following line in postgresql.conf (or set it via GUC)
115+
```
116+
pg_shard.use_dtm_transactions = 1
117+
```

contrib/pg_multimaster/bgwpool.c

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#include "postgres.h"
2+
#include "fmgr.h"
3+
#include "miscadmin.h"
4+
#include "postmaster/postmaster.h"
5+
#include "postmaster/bgworker.h"
6+
#include "storage/s_lock.h"
7+
#include "storage/spin.h"
8+
#include "storage/pg_sema.h"
9+
#include "storage/shmem.h"
10+
11+
#include "bgwpool.h"
12+
13+
/* Per-worker bootstrap context, heap-allocated in BgwPoolStart and handed
 * to BgwPoolMainLoop through bgw_main_arg. */
typedef struct
14+
{
15+
BgwPoolConstructor constructor; /* returns the shared BgwPool this worker serves */
16+
int id; /* worker ordinal, 0 .. nWorkers-1 */
17+
} BgwPoolExecutorCtx;
18+
19+
/* Crude queue statistics, updated under pool->lock in BgwPoolExecute.
 * NOTE(review): these are neither static nor placed in shared memory, so
 * each backend sees its own copy — presumably debug-only counters; confirm
 * before relying on them (and consider making them static). */
size_t n_snapshots;
20+
size_t n_active;
21+
22+
/*
 * Main loop of one pool worker (consumer side of the shared ring buffer).
 *
 * Queue format: each message is a 4-byte int length followed by the payload,
 * with the read offset advanced to the next INTALIGN boundary.  When a
 * message would not fit between head and the end of the queue, the producer
 * wrote the payload at offset 0 instead (see BgwPoolExecute), and the
 * consumer mirrors that wrap-around here.
 */
static void BgwPoolMainLoop(Datum arg)
23+
{
24+
/* NOTE(review): ctx was malloc'ed by BgwPoolStart and is never freed;
 * harmless for a worker that lives forever, but a leak on restart. */
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
25+
int id = ctx->id;
26+
BgwPool* pool = ctx->constructor();
27+
int size;
28+
void* work;
29+
30+
BackgroundWorkerUnblockSignals();
31+
/* Connect to the pool's database; NULL = no specific user. */
BackgroundWorkerInitializeConnection(pool->dbname, NULL);
32+
33+
elog(WARNING, "Start background worker %d", id);
34+
35+
while(true) {
36+
/* Block until the producer has queued at least one message. */
PGSemaphoreLock(&pool->available);
37+
SpinLockAcquire(&pool->lock);
38+
/* 4-byte length prefix lives at the current read offset. */
size = *(int*)&pool->queue[pool->head];
39+
Assert(size < pool->size);
40+
/* NOTE(review): palloc can elog on OOM while the spinlock is held,
 * which would leave the spinlock stuck — confirm this is acceptable. */
work = palloc(size);
41+
pool->active -= 1;
42+
if (pool->head + size + 4 > pool->size) {
43+
/* Message wrapped: payload was written at the start of the queue. */
memcpy(work, pool->queue, size);
44+
pool->head = INTALIGN(size);
45+
} else {
46+
memcpy(work, &pool->queue[pool->head+4], size);
47+
pool->head += 4 + INTALIGN(size);
48+
}
49+
/* Read offset hit the end exactly: wrap to the beginning. */
if (pool->size == pool->head) {
50+
pool->head = 0;
51+
}
52+
/* Wake a producer that blocked on a full queue. */
if (pool->producerBlocked) {
53+
pool->producerBlocked = false;
54+
PGSemaphoreUnlock(&pool->overflow);
55+
}
56+
SpinLockRelease(&pool->lock);
57+
/* Run the user callback outside the lock. */
pool->executor(id, work, size);
58+
pfree(work);
59+
}
60+
}
61+
62+
/*
 * Initialize a BgwPool that lives in shared memory.
 *
 * pool      - pool control struct (caller-provided, in shared memory)
 * executor  - callback run by each worker for every dequeued message
 * dbname    - database the workers connect to (copied into the pool)
 * queueSize - ring-buffer capacity in bytes, allocated with ShmemAlloc
 *
 * Must be called before BgwPoolStart/BgwPoolExecute.
 */
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize)
{
	pool->queue = (char*)ShmemAlloc(queueSize);
	pool->executor = executor;
	PGSemaphoreCreate(&pool->available);
	PGSemaphoreCreate(&pool->overflow);
	PGSemaphoreReset(&pool->available);
	PGSemaphoreReset(&pool->overflow);
	SpinLockInit(&pool->lock);
	pool->producerBlocked = false;
	pool->head = 0;
	pool->tail = 0;
	pool->size = queueSize;
	pool->active = 0;
	/* Bug fix: the original used strcpy(), which overflows the fixed
	 * dbname[MAX_DBNAME_LEN] (30-byte) buffer for long database names.
	 * snprintf never writes past the buffer and always NUL-terminates
	 * (truncating over-long names instead of corrupting memory). */
	snprintf(pool->dbname, sizeof pool->dbname, "%s", dbname);
}
78+
79+
/*
 * Register nWorkers background workers that will all consume from the pool
 * returned by `constructor`.  Must be called from postmaster startup
 * (shared_preload_libraries), since RegisterBackgroundWorker only works then.
 */
void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
80+
{
81+
int i;
82+
/* One template struct, reused for every registration; only bgw_name and
 * bgw_main_arg change per iteration. */
BackgroundWorker worker;
83+
84+
MemSet(&worker, 0, sizeof(BackgroundWorker));
85+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
86+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
87+
worker.bgw_main = BgwPoolMainLoop;
88+
worker.bgw_restart_time = 10; /* Wait 10 seconds for restart before crash */
89+
90+
for (i = 0; i < nWorkers; i++) {
91+
/* Leaked deliberately: the worker keeps using it for its lifetime. */
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
92+
snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_worker_%d", i+1);
93+
ctx->id = i;
94+
ctx->constructor = constructor;
95+
/* NOTE(review): smuggling a raw pointer through a Datum only works when
 * workers are forked from the postmaster (non-EXEC_BACKEND builds) —
 * confirm, since this breaks on Windows/EXEC_BACKEND. */
worker.bgw_main_arg = (Datum)ctx;
96+
RegisterBackgroundWorker(&worker);
97+
}
98+
}
99+
100+
/*
 * Producer side: enqueue one message (`size` bytes at `work`) for execution
 * by a pool worker.  Blocks on the `overflow` semaphore while the ring
 * buffer has no room.  Layout matches BgwPoolMainLoop: a 4-byte int length
 * at the write offset, payload either immediately after it or — when it
 * would not fit before the end of the queue — wrapped to offset 0.
 */
void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
101+
{
102+
Assert(size+4 <= pool->size);
103+
104+
SpinLockAcquire(&pool->lock);
105+
while (true) {
106+
/* No room: either the tail-to-end gap is too small and the head region
 * cannot absorb a wrapped payload, or the tail-to-head gap is too small. */
if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
107+
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
108+
{
109+
/* Sleep until a consumer frees space, then re-check. */
pool->producerBlocked = true;
110+
SpinLockRelease(&pool->lock);
111+
PGSemaphoreLock(&pool->overflow);
112+
SpinLockAcquire(&pool->lock);
113+
} else {
114+
pool->active += 1;
115+
n_snapshots += 1;
116+
n_active += pool->active;
117+
/* NOTE(review): stores a size_t into a 4-byte int header — silently
 * truncates messages larger than INT_MAX; consider asserting a bound. */
*(int*)&pool->queue[pool->tail] = size;
118+
if (pool->size - pool->tail >= size + 4) {
119+
memcpy(&pool->queue[pool->tail+4], work, size);
120+
pool->tail += 4 + INTALIGN(size);
121+
} else {
122+
/* Payload would run past the end: wrap it to the queue start. */
memcpy(pool->queue, work, size);
123+
pool->tail = INTALIGN(size);
124+
}
125+
if (pool->tail == pool->size) {
126+
pool->tail = 0;
127+
}
128+
/* Signal a worker that a message is available. */
PGSemaphoreUnlock(&pool->available);
129+
break;
130+
}
131+
}
132+
SpinLockRelease(&pool->lock);
133+
}
134+

contrib/pg_multimaster/bgwpool.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#ifndef __BGWPOOL_H__
2+
#define __BGWPOOL_H__
3+
4+
#include "storage/s_lock.h"
5+
#include "storage/spin.h"
6+
#include "storage/pg_sema.h"
7+
8+
/* Callback run by a pool worker for each dequeued message. */
typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
9+
10+
#define MAX_DBNAME_LEN 30
11+
12+
/* Shared-memory work queue consumed by a set of background workers.
 * Messages are length-prefixed (4-byte int) and INTALIGN-padded; see
 * bgwpool.c for the producer/consumer protocol. */
typedef struct
13+
{
14+
BgwPoolExecutor executor; /* per-message callback */
15+
volatile slock_t lock; /* protects head/tail/active/producerBlocked */
16+
PGSemaphoreData available; /* counts queued messages; workers wait here */
17+
PGSemaphoreData overflow; /* producer waits here when the queue is full */
18+
size_t head; /* read offset into queue */
19+
size_t tail; /* write offset into queue */
20+
size_t size; /* queue capacity in bytes */
21+
size_t active; /* messages queued but not yet executed */
22+
bool producerBlocked; /* true while a producer sleeps on overflow */
23+
char dbname[MAX_DBNAME_LEN]; /* database the workers connect to */
24+
char* queue; /* ring buffer, allocated in shared memory */
25+
} BgwPool;
26+
27+
/* Returns the shared pool instance a freshly started worker should serve. */
typedef BgwPool*(*BgwPoolConstructor)(void);
28+
29+
extern void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor);
30+
31+
extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize);
32+
33+
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
34+
35+
#endif

contrib/pg_multimaster/bytebuf.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include "postgres.h"
2+
#include "bytebuf.h"
3+
4+
#define INIT_BUF_SIZE 1024
5+
6+
/*
 * Set up an empty byte buffer backed by a freshly palloc'ed chunk of
 * INIT_BUF_SIZE bytes; ByteBufferAppend grows it on demand.
 */
void ByteBufferAlloc(ByteBuffer* buf)
{
	buf->used = 0;
	buf->size = INIT_BUF_SIZE;
	buf->data = palloc(buf->size);
}
12+
13+
/*
 * Append len bytes from data to the buffer.  When the backing allocation is
 * too small it is grown to either double its current size or exactly the
 * required size, whichever is larger, then the bytes are copied in.
 */
void ByteBufferAppend(ByteBuffer* buf, void* data, int len)
{
	if (buf->size < buf->used + len) {
		/* Doubling keeps the amortized cost of repeated appends linear. */
		if (buf->used + len > buf->size*2)
			buf->size = buf->used + len;
		else
			buf->size = buf->size*2;
		buf->data = (char*)repalloc(buf->data, buf->size);
	}
	memcpy(buf->data + buf->used, data, len);
	buf->used += len;
}
22+
23+
/* Append one int (4 bytes on supported platforms) in host byte order. */
void ByteBufferAppendInt32(ByteBuffer* buf, int data)
{
	ByteBufferAppend(buf, &data, sizeof(data));
}
27+
28+
/* Release the backing allocation.
 * NOTE(review): buf->data is left dangling and used/size are not reset, so
 * the buffer must not be used again without a new ByteBufferAlloc. */
void ByteBufferFree(ByteBuffer* buf)
29+
{
30+
pfree(buf->data);
31+
}
32+
33+
/* Logically empty the buffer, keeping the current allocation for reuse. */
void ByteBufferReset(ByteBuffer* buf)
34+
{
35+
buf->used = 0;
36+
}

0 commit comments

Comments
 (0)