Skip to content

Commit dccee06

Browse files
committed
Actualize daemons.go and dtmbench for multimaster. Works partially :(
1 parent 017c2c1 commit dccee06

File tree

4 files changed

+137
-23
lines changed

4 files changed

+137
-23
lines changed

contrib/arbiter/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
top_builddir = ../..
22
include $(top_builddir)/src/Makefile.global
33

4-
override CFLAGS += -fPIC
4+
override CFLAGS += -fPIC -O0
55
override CPPFLAGS += -I. -Iinclude -Iapi -D_LARGEFILE64_SOURCE
66

77
AR = ar

contrib/multimaster/tests/daemons.go

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package main
22

33
import (
44
"log"
5+
"fmt"
56
"os/exec"
67
"io"
78
"bufio"
89
"sync"
10+
"flag"
911
"os"
1012
"strconv"
1113
"strings"
14+
"time"
1215
)
1316

1417
func read_to_channel(r io.Reader, c chan string, wg *sync.WaitGroup) {
@@ -56,9 +59,24 @@ func cmd_to_channel(argv []string, name string, out chan string) {
5659
log.Printf("'%s' finished\n", name)
5760
}
5861

59-
func dtmd(bin string, wg *sync.WaitGroup) {
60-
argv := []string{bin}
61-
name := "dtmd"
62+
const (
63+
DtmHost = "127.0.0.1"
64+
DtmPort = 5431
65+
PgPort = 5432
66+
)
67+
68+
func arbiter(bin string, datadir string, servers []string, id int, wg *sync.WaitGroup) {
69+
argv := []string{
70+
bin,
71+
"-d", datadir,
72+
"-i", strconv.Itoa(id),
73+
}
74+
for _, server := range servers {
75+
argv = append(argv, "-r", server)
76+
}
77+
log.Println(argv)
78+
79+
name := "arbiter " + datadir
6280
c := make(chan string)
6381

6482
go cmd_to_channel(argv, name, c)
@@ -70,6 +88,21 @@ func dtmd(bin string, wg *sync.WaitGroup) {
7088
wg.Done()
7189
}
7290

91+
func appendfile(filename string, lines ...string) {
92+
f, err := os.OpenFile(filename, os.O_APPEND | os.O_WRONLY, 0600)
93+
if err != nil {
94+
log.Fatal(err)
95+
}
96+
97+
defer f.Close()
98+
99+
for _, l := range lines {
100+
if _, err = f.WriteString(l + "\n"); err != nil {
101+
log.Fatal(err)
102+
}
103+
}
104+
}
105+
73106
func initdb(bin string, datadir string) {
74107
if err := os.RemoveAll(datadir); err != nil {
75108
log.Fatal(err)
@@ -84,20 +117,45 @@ func initdb(bin string, datadir string) {
84117
for s := range c {
85118
log.Printf("[%s] %s\n", name, s)
86119
}
120+
121+
appendfile(
122+
datadir + "/pg_hba.conf",
123+
"local replication all trust",
124+
"host replication all 127.0.0.1/32 trust",
125+
"host replication all ::1/128 trust",
126+
)
127+
}
128+
129+
func initarbiter(arbiterdir string) {
130+
if err := os.RemoveAll(arbiterdir); err != nil {
131+
log.Fatal(err)
132+
}
133+
if err := os.MkdirAll(arbiterdir, os.ModeDir | 0777); err != nil {
134+
log.Fatal(err)
135+
}
87136
}
88137

89-
func postgres(bin string, datadir string, port int, nodeid int, wg *sync.WaitGroup) {
138+
func postgres(bin string, datadir string, postgresi []string, arbiters []string, port int, nodeid int, wg *sync.WaitGroup) {
90139
argv := []string{
91140
bin,
92141
"-D", datadir,
93142
"-p", strconv.Itoa(port),
94-
"-c", "dtm.buffer_size=65536",
95-
"-c", "dtm.host=127.0.0.1",
96-
"-c", "dtm.port=" + strconv.Itoa(5431),
143+
"-c", "multimaster.buffer_size=65536",
144+
"-c", "multimaster.conn_strings=" + strings.Join(postgresi, ","),
145+
"-c", "multimaster.node_id=" + strconv.Itoa(nodeid + 1),
146+
"-c", "multimaster.arbiters=" + strings.Join(arbiters, ","),
147+
"-c", "multimaster.workers=8",
148+
"-c", "multimaster.queue_size=1073741824",
149+
"-c", "wal_level=logical",
150+
"-c", "wal_sender_timeout=0",
151+
"-c", "max_wal_senders=10",
152+
"-c", "max_worker_processes=100",
153+
"-c", "max_replication_slots=10",
97154
"-c", "autovacuum=off",
98155
"-c", "fsync=off",
99-
"-c", "synchronous_commit=off",
100-
"-c", "shared_preload_libraries=pg_dtm",
156+
"-c", "synchronous_commit=on",
157+
"-c", "max_connections=200",
158+
"-c", "shared_preload_libraries=multimaster",
101159
}
102160
name := "postgres " + datadir
103161
c := make(chan string)
@@ -138,32 +196,60 @@ func get_prefix(srcroot string) string {
138196
return "."
139197
}
140198

199+
var doInitDb bool = false
200+
func init() {
201+
flag.BoolVar(&doInitDb, "i", false, "perform initdb")
202+
flag.Parse()
203+
}
204+
141205
func main() {
142206
srcroot := "../../.."
143207
prefix := get_prefix(srcroot)
144208

145209
bin := map[string]string{
146-
"dtmd": srcroot + "/contrib/pg_dtm/dtmd/bin/dtmd",
210+
"arbiter": srcroot + "/contrib/arbiter/bin/arbiter",
147211
"initdb": prefix + "/bin/initdb",
148212
"postgres": prefix + "/bin/postgres",
149213
}
150214

151-
datadirs := []string{"/tmp/data1", "/tmp/data2", "/tmp/data3"}
215+
datadirs := []string{"/tmp/data0", "/tmp/data1", "/tmp/data2"}
216+
//arbiterdirs := []string{"/tmp/arbiter0", "/tmp/arbiter1", "/tmp/arbiter2"}
217+
arbiterdirs := []string{"/tmp/arbiter0"}
152218

153219
check_bin(&bin);
154220

155-
for _, datadir := range datadirs {
156-
initdb(bin["initdb"], datadir)
221+
if doInitDb {
222+
for _, datadir := range datadirs {
223+
initdb(bin["initdb"], datadir)
224+
}
225+
for _, arbiterdir := range arbiterdirs {
226+
initarbiter(arbiterdir)
227+
}
157228
}
158229

159230
var wg sync.WaitGroup
160231

161-
wg.Add(1)
162-
go dtmd(bin["dtmd"], &wg)
232+
var arbiters []string
233+
for i := range arbiterdirs {
234+
arbiters = append(arbiters, DtmHost + ":" + strconv.Itoa(DtmPort - i))
235+
}
236+
for i, dir := range arbiterdirs {
237+
wg.Add(1)
238+
go arbiter(bin["arbiter"], dir, arbiters, i, &wg)
239+
}
240+
241+
time.Sleep(3 * time.Second)
163242

164-
for i, datadir := range datadirs {
243+
var postgresi []string
244+
for i := range datadirs {
245+
postgresi = append(
246+
postgresi,
247+
fmt.Sprintf("dbname=postgres host=127.0.0.1 port=%d sslmode=disable", PgPort + i),
248+
)
249+
}
250+
for i, dir := range datadirs {
165251
wg.Add(1)
166-
go postgres(bin["postgres"], datadir, 5432 + i, i, &wg)
252+
go postgres(bin["postgres"], dir, postgresi, arbiters, PgPort + i, i, &wg)
167253
}
168254

169255
wg.Wait()

contrib/multimaster/tests/dtmbench.cpp

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,38 @@ void* writer(void* arg)
178178

179179
void initializeDatabase()
180180
{
181+
//for (size_t i = 0; i < cfg.connections.size(); i++) {
182+
// printf("creating extension %lu\n", i);
183+
// try {
184+
// connection conn(cfg.connections[i]);
185+
// work txn(conn);
186+
// //exec(txn, "drop extension if exists multimaster");
187+
// exec(txn, "create extension multimaster");
188+
// txn.commit();
189+
// } catch (pqxx_exception const& x) {
190+
// i -= 1;
191+
// continue;
192+
// }
193+
// printf("extension %lu created\n", i);
194+
//}
195+
196+
printf("creating table t\n");
181197
connection conn(cfg.connections[0]);
182-
work txn(conn);
183-
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
184-
txn.commit();
198+
{
199+
work txn(conn);
200+
exec(txn, "drop table if exists t");
201+
exec(txn, "create table t(u int primary key, v int)");
202+
txn.commit();
203+
}
204+
printf("table t created\n");
205+
206+
printf("inserting stuff into t\n");
207+
{
208+
work txn(conn);
209+
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
210+
txn.commit();
211+
}
212+
printf("stuff inserted\n");
185213
}
186214

187215
int main (int argc, char* argv[])

contrib/multimaster/tests/makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CXXFLAGS=-g -Wall -O2 -pthread
44
all: dtmbench
55

66
dtmbench: dtmbench.cpp
7-
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
7+
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx -lpq
88

99
clean:
10-
rm -f dtmbench
10+
rm -f dtmbench

0 commit comments

Comments
 (0)