Skip to content

Commit 94d09aa

Browse files
committed
Switch multimaster to the unified arbiter and sockhub implementation.
1 parent 3f68177 commit 94d09aa

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

69 files changed

+520
-5595
lines changed

contrib/arbiter/README

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ The commands:
5050
[RES_OK, min, max] if reserved a range [min, max]
5151
[RES_FAILED] on failure
5252

53-
'b': begin(size)
54-
Starts a global transaction and assign a 'xid' to it. 'size' is used
55-
for vote results calculation. The arbiter also creates and returns the
53+
'b': begin(), begin(size)
54+
Starts a global transaction and assigns a 'xid' to it. Optional 'size' is
55+
used for vote results calculation. The arbiter also creates and returns the
5656
snapshot.
5757

5858
The arbiter replies with:

contrib/arbiter/api/arbiter.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ void ArbiterInitSnapshot(Snapshot snapshot)
345345
}
346346
}
347347

348-
TransactionId ArbiterStartTransaction(Snapshot snapshot, TransactionId *gxmin)
348+
TransactionId ArbiterStartTransaction(Snapshot snapshot, TransactionId *gxmin, int nParticipants)
349349
{
350350
int i;
351351
xid_t xid;
@@ -359,7 +359,11 @@ TransactionId ArbiterStartTransaction(Snapshot snapshot, TransactionId *gxmin)
359359
assert(snapshot != NULL);
360360

361361
// command
362-
if (!arbiter_send_command(arbiter, CMD_BEGIN, 0)) goto failure;
362+
if (nParticipants) {
363+
if (!arbiter_send_command(arbiter, CMD_BEGIN, 1, nParticipants)) goto failure;
364+
} else {
365+
if (!arbiter_send_command(arbiter, CMD_BEGIN, 0)) goto failure;
366+
}
363367

364368
// results
365369
reslen = arbiter_recv_results(arbiter, RESULTS_SIZE, results);

contrib/arbiter/api/arbiter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ void ArbiterInitSnapshot(Snapshot snapshot);
2121
* smallest xmin among all snapshots known to arbiter. Returns INVALID_XID
2222
* otherwise.
2323
*/
24-
TransactionId ArbiterStartTransaction(Snapshot snapshot, TransactionId *gxmin);
24+
TransactionId ArbiterStartTransaction(Snapshot snapshot, TransactionId *gxmin, int nParticipants);
2525

2626
/**
2727
* Asks the arbiter for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
#ifndef ARBITER_LIMITS_H
22
#define ARBITER_LIMITS_H
33

4-
// how many xids are reserved per raft term
4+
/* how many xids are reserved per raft term */
55
#define XIDS_PER_TERM 1000000
66

7-
// start a new term when this number of xids is left
7+
/* start a new term when this number of xids is left */
88
#define NEW_TERM_THRESHOLD 100000
99

1010
#define MAX_TRANSACTIONS 4096
1111
#define MAX_SNAPSHOTS_PER_TRANS 8
1212

13-
#define BUFFER_SIZE (64 * 1024)
13+
#define BUFFER_SIZE (256 * 1024)
1414
#define LISTEN_QUEUE_SIZE 100
1515
#define MAX_STREAMS 4096
16+
#define SOCKET_BUFFER_SIZE (1024 * 1024)
1617

1718
#define MAX_SERVERS 16
1819
#define HEARTBEAT_TIMEOUT_MS 20
1920
#define ELECTION_TIMEOUT_MS_MIN 150
2021
#define ELECTION_TIMEOUT_MS_MAX 300
2122
#define RAFT_LOGLEN 1024
22-
#define RAFT_KEEP_APPLIED 512 // how many applied entries to keep during compaction
23+
#define RAFT_KEEP_APPLIED 512 /* how many applied entries to keep during compaction */
2324

2425
#endif

contrib/arbiter/include/transaction.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ typedef struct Transaction {
2121
xid_t xid;
2222
xid_t xmin;
2323

24-
int size; // number of paritcipants
24+
int size; // number of participants
25+
bool fixed_size;
2526

2627
// for + against ≤ size
2728
int votes_for;
@@ -57,8 +58,6 @@ static inline void l2_list_unlink(L2List* elem)
5758
elem->prev->next = elem->next;
5859
}
5960

60-
61-
6261
Snapshot *transaction_latest_snapshot(Transaction *t);
6362
Snapshot *transaction_snapshot(Transaction *t, int snapno);
6463
Snapshot *transaction_next_snapshot(Transaction *t);

contrib/arbiter/sockhub/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O3 -g -fPIC
2+
CFLAGS = -c -I. -Wall -O0 -g -fPIC
33
LD = $(CC)
44
LDFLAGS = -g
55
AR = ar

contrib/arbiter/sockhub/sockhub.c

Lines changed: 91 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,31 @@
1919

2020
#include "sockhub.h"
2121

22+
#define SOCKHUB_BUFFER_SIZE (1024*1024)
23+
#define ERR_BUF_SIZE 1024
24+
25+
void ShubAddSocket(Shub* shub, int fd);
26+
27+
inline void ShubAddSocket(Shub* shub, int fd)
28+
{
29+
#ifdef USE_EPOLL
30+
struct epoll_event ev;
31+
ev.events = EPOLLIN;
32+
ev.data.fd = fd;
33+
if (epoll_ctl(shub->epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
34+
char buf[ERR_BUF_SIZE];
35+
sprintf(buf, "Failed to add socket %d to epoll set", fd);
36+
shub->params->error_handler(buf, SHUB_FATAL_ERROR);
37+
}
38+
#else
39+
FD_SET(fd, &shub->inset);
40+
if (fd > shub->max_fd) {
41+
shub->max_fd = fd;
42+
}
43+
#endif
44+
}
45+
46+
2247
static void default_error_handler(char const* msg, ShubErrorSeverity severity)
2348
{
2449
perror(msg);
@@ -116,8 +141,15 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
116141

117142
static void close_socket(Shub* shub, int fd)
118143
{
119-
close(fd);
144+
#ifdef USE_EPOLL
145+
if (epoll_ctl(shub->epollfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
146+
char buf[ERR_BUF_SIZE];
147+
sprintf(buf, "Failed to remove socket %d from epoll set", fd);
148+
shub->params->error_handler(buf, SHUB_RECOVERABLE_ERROR);
149+
}
150+
#else
120151
FD_CLR(fd, &shub->inset);
152+
#endif
121153
}
122154

123155
int ShubReadSocketEx(int sd, void* buf, int min_size, int max_size)
@@ -211,7 +243,12 @@ static void reconnect(Shub* shub)
211243
} else {
212244
int optval = 1;
213245
setsockopt(shub->output, IPPROTO_TCP, TCP_NODELAY, (char const*)&optval, sizeof(optval));
214-
FD_SET(shub->output, &shub->inset);
246+
optval = SOCKHUB_BUFFER_SIZE;
247+
setsockopt(shub->output, SOL_SOCKET, SO_SNDBUF, (const char*) &optval, sizeof(int));
248+
optval = SOCKHUB_BUFFER_SIZE;
249+
setsockopt(shub->output, SOL_SOCKET, SO_RCVBUF, (const char*) &optval, sizeof(int));
250+
251+
ShubAddSocket(shub, shub->output);
215252
return;
216253
}
217254
}
@@ -238,6 +275,7 @@ static void notify_disconnect(Shub* shub, int chan)
238275

239276
static void recovery(Shub* shub)
240277
{
278+
#ifndef USE_EPOLL
241279
int i, max_fd;
242280

243281
for (i = 0, max_fd = shub->max_fd; i <= max_fd; i++) {
@@ -254,6 +292,7 @@ static void recovery(Shub* shub)
254292
}
255293
}
256294
}
295+
#endif
257296
}
258297

259298
void ShubInitialize(Shub* shub, ShubParams* params)
@@ -276,11 +315,17 @@ void ShubInitialize(Shub* shub, ShubParams* params)
276315
if (listen(shub->input, params->queue_size) < 0) {
277316
shub->params->error_handler("Failed to listen local socket", SHUB_FATAL_ERROR);
278317
}
279-
FD_ZERO(&shub->inset);
280-
FD_SET(shub->input, &shub->inset);
281-
282318
shub->output = -1;
283-
shub->max_fd = shub->input;
319+
#ifdef USE_EPOLL
320+
shub->epollfd = epoll_create(MAX_EVENTS);
321+
if (shub->epollfd < 0) {
322+
shub->params->error_handler("Failed to create epoll", SHUB_FATAL_ERROR);
323+
}
324+
#else
325+
FD_ZERO(&shub->inset);
326+
shub->max_fd = 0;
327+
#endif
328+
ShubAddSocket(shub, shub->input);
284329
reconnect(shub);
285330

286331
shub->in_buffer = malloc(params->buffer_size);
@@ -301,43 +346,61 @@ static void die(int sig)
301346
void ShubLoop(Shub* shub)
302347
{
303348
int buffer_size = shub->params->buffer_size;
349+
sigset_t sset;
304350
signal(SIGINT, die);
305351
signal(SIGQUIT, die);
306352
signal(SIGTERM, die);
307-
// signal(SIGHUP, die);
308-
sigset_t sset;
353+
/* signal(SIGHUP, die); */
309354
sigfillset(&sset);
310355
sigprocmask(SIG_UNBLOCK, &sset, NULL);
311356

312-
while (!stop) {
357+
while (!stop) {
358+
int i, rc;
359+
#ifdef USE_EPOLL
360+
struct epoll_event events[MAX_EVENTS];
361+
rc = epoll_wait(shub->epollfd, events, MAX_EVENTS, shub->in_buffer_used == 0 ? -1 : shub->params->delay);
362+
#else
313363
fd_set events;
314364
struct timeval tm;
315-
int i, rc;
316365
int max_fd = shub->max_fd;
317366

318367
tm.tv_sec = shub->params->delay/1000;
319368
tm.tv_usec = shub->params->delay % 1000 * 1000;
320-
321369
events = shub->inset;
322370
rc = select(max_fd+1, &events, NULL, NULL, shub->in_buffer_used == 0 ? NULL : &tm);
323-
if (rc < 0) {
324-
if (errno != EINTR) {
371+
#endif
372+
if (rc < 0) {
373+
if (errno != EINTR) {
325374
shub->params->error_handler("Select failed", SHUB_RECOVERABLE_ERROR);
326375
recovery(shub);
327376
}
328377
} else {
329378
if (rc > 0) {
379+
#ifdef USE_EPOLL
380+
int j;
381+
int n = rc;
382+
for (j = 0; j < n; j++) {
383+
i = events[j].data.fd;
384+
if (events[j].events & EPOLLERR) {
385+
if (i == shub->input) {
386+
shub->params->error_handler("Input socket error", SHUB_FATAL_ERROR);
387+
} else if (i == shub->output) {
388+
reconnect(shub);
389+
} else {
390+
notify_disconnect(shub, i);
391+
close_socket(shub, i);
392+
}
393+
} else if (events[j].events & EPOLLIN) {
394+
#else
330395
for (i = 0; i <= max_fd; i++) {
331396
if (FD_ISSET(i, &events)) {
332-
if (i == shub->input) { /* accept incomming connection */
397+
#endif
398+
if (i == shub->input) { /* accept incomming connection */
333399
int s = accept(i, NULL, NULL);
334400
if (s < 0) {
335401
shub->params->error_handler("Failed to accept socket", SHUB_RECOVERABLE_ERROR);
336402
} else {
337-
if (s > shub->max_fd) {
338-
shub->max_fd = s;
339-
}
340-
FD_SET(s, &shub->inset);
403+
ShubAddSocket(shub, s);
341404
}
342405
} else if (i == shub->output) { /* receive response from server */
343406
/* try to read as much as possible */
@@ -424,8 +487,11 @@ void ShubLoop(Shub* shub)
424487
assert(sizeof(ShubMessageHdr) > available);
425488
/* read as much as possible */
426489
rc = ShubReadSocketEx(chan, &shub->in_buffer[pos + available], sizeof(ShubMessageHdr) - available, buffer_size - pos - available);
427-
if (rc < sizeof(ShubMessageHdr) - available) {
428-
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
490+
if (rc < sizeof(ShubMessageHdr) - available) {
491+
char buf[ERR_BUF_SIZE];
492+
sprintf(buf, "Failed to read local socket chan=%d, rc=%d, min requested=%ld, max requested=%d, errno=%d", chan, rc, sizeof(ShubMessageHdr) - available, buffer_size - pos - available, errno);
493+
shub->params->error_handler(buf, SHUB_RECOVERABLE_ERROR);
494+
//shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
429495
close_socket(shub, i);
430496
shub->in_buffer_used = pos;
431497
notify_disconnect(shub, i);
@@ -460,8 +526,11 @@ void ShubLoop(Shub* shub)
460526
/* fetch rest of message body */
461527
do {
462528
unsigned int n = processed + size > buffer_size ? buffer_size - processed : size;
463-
if (chan >= 0 && !ShubReadSocket(chan, shub->in_buffer + processed, n)) {
464-
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
529+
if (chan >= 0 && !ShubReadSocket(chan, shub->in_buffer + processed, n)) {
530+
char buf[ERR_BUF_SIZE];
531+
sprintf(buf, "Failed to read local socket rc=%d, len=%d, errno=%d", rc, n, errno);
532+
shub->params->error_handler(buf, SHUB_RECOVERABLE_ERROR);
533+
//shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
465534
close_socket(shub, chan);
466535
if (hdr != NULL) { /* if message header is not yet sent to the server... */
467536
/* ... then skip this message */

contrib/arbiter/sockhub/sockhub.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
11
#ifndef __SOCKHUB_H__
22
#define __SOCKHUB_H__
33

4+
5+
#ifdef __linux__
6+
#define USE_EPOLL 1
7+
#endif
8+
9+
#ifdef USE_EPOLL
10+
#include <sys/epoll.h>
11+
#define MAX_EVENTS 1024
12+
#else
413
#include <sys/select.h>
14+
#endif
15+
516

617
typedef struct {
718
unsigned int size : 24; /* size of message without header */
@@ -47,15 +58,20 @@ typedef struct
4758
{
4859
int output;
4960
int input;
61+
#ifdef USE_EPOLL
62+
int epollfd;
63+
#else
5064
int max_fd;
5165
fd_set inset;
66+
#endif
5267
char* in_buffer;
5368
char* out_buffer;
5469
int in_buffer_used;
5570
int out_buffer_used;
5671
ShubParams* params;
5772
} Shub;
5873

74+
5975
int ShubReadSocketEx(int sd, void* buf, int min_size, int max_size);
6076
int ShubReadSocket(int sd, void* buf, int size);
6177
int ShubWriteSocket(int sd, void const* buf, int size);
@@ -66,5 +82,3 @@ void ShubInitialize(Shub* shub, ShubParams* params);
6682
void ShubLoop(Shub* shub);
6783

6884
#endif
69-
70-
// vim: sts=4 ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)