*/
CSTATE_ABORTED,
CSTATE_FINISHED
-} ConnectionStateEnum;
+} ConnectionStateEnum;
/*
* Connection state.
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
- /* initialize explicitely the state machines */
+ /* explicitly initialize the state machines */
for (i = 0; i < nstate; i++)
{
state[i].state = CSTATE_CHOOSE_SCRIPT;
}
+ /* loop till all clients have terminated */
while (remains > 0)
{
fd_set input_mask;
- int maxsock; /* max socket number to be waited */
- int64 now_usec = 0;
+ int maxsock; /* max socket number to be waited for */
int64 min_usec;
+ int64 now_usec = 0; /* set this only if needed */
+ /* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
-
maxsock = -1;
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- int sock;
if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
- /* interrupt client which has not started a transaction */
+ /* interrupt client that has not started a transaction */
st->state = CSTATE_FINISHED;
- remains--;
PQfinish(st->con);
st->con = NULL;
- continue;
+ remains--;
}
else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
/* a nap from the script, or under throttling */
- int this_usec;
+ int64 this_usec;
- if (min_usec == PG_INT64_MAX)
+ /* get current time if needed */
+ if (now_usec == 0)
{
instr_time now;
now_usec = INSTR_TIME_GET_MICROSEC(now);
}
+ /* min_usec should be the minimum delay across all clients */
this_usec = (st->state == CSTATE_SLEEP ?
st->sleep_until : st->txn_scheduled) - now_usec;
if (min_usec > this_usec)
* waiting for result from server - nothing to do unless the
* socket is readable
*/
- sock = PQsocket(st->con);
+ int sock = PQsocket(st->con);
+
if (sock < 0)
{
- fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+ fprintf(stderr, "invalid socket: %s",
+ PQerrorMessage(st->con));
goto done;
}
FD_SET(sock, &input_mask);
-
if (maxsock < sock)
maxsock = sock;
- break;
}
- else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
+ else if (st->state != CSTATE_ABORTED &&
+ st->state != CSTATE_FINISHED)
{
- /* the connection is ready to run */
+ /*
+ * This client thread is ready to do something, so we don't
+ * want to wait. No need to examine additional clients.
+ */
min_usec = 0;
break;
}
}
/*
- * Sleep until we receive data from the server, or a nap-time
- * specified in the script ends, or it's time to print a progress
- * report.
+ * If no clients are ready to execute actions, sleep until we receive
+ * data from the server, or a nap-time specified in the script ends,
+ * or it's time to print a progress report. Update input_mask to show
+ * which client(s) received data.
*/
if (min_usec > 0 && maxsock != -1)
{
if (nsocks < 0)
{
if (errno == EINTR)
+ {
+ /* On EINTR, go back to top of loop */
continue;
+ }
/* must be something wrong */
fprintf(stderr, "select() failed: %s\n", strerror(errno));
goto done;
}
}
+ else
+ {
+ /* If we didn't call select(), don't try to read any data */
+ FD_ZERO(&input_mask);
+ }
/* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- bool ready;
- if (st->state == CSTATE_WAIT_RESULT && st->con)
+ if (st->state == CSTATE_WAIT_RESULT)
{
+ /* don't call doCustom unless data is available */
int sock = PQsocket(st->con);
if (sock < 0)
goto done;
}
- ready = FD_ISSET(sock, &input_mask);
+ if (!FD_ISSET(sock, &input_mask))
+ continue;
}
- else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
- ready = false;
- else
- ready = true;
-
- if (ready)
+ else if (st->state == CSTATE_FINISHED ||
+ st->state == CSTATE_ABORTED)
{
- doCustom(thread, st, &aggs);
- if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
- remains--;
+ /* this client is done, no need to consider it anymore */
+ continue;
}
+
+ doCustom(thread, st, &aggs);
+
+ /* If doCustom changed client to finished state, reduce remains */
+ if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ remains--;
}
- /* progress report by thread 0 for all threads */
+ /* progress report is made by thread 0 for all threads */
if (progress && thread->tid == 0)
{
instr_time now_time;