Refactor script execution state machine in pgbench.
author: Heikki Linnakangas <[email protected]>
date: Mon, 26 Sep 2016 07:56:02 +0000 (10:56 +0300)
committer: Heikki Linnakangas <[email protected]>
commit date: Mon, 26 Sep 2016 07:56:02 +0000 (10:56 +0300)
The doCustom() function had grown into quite a mess. Rewrite it, in a more
explicit state machine style, for readability.

This also fixes one minor bug: if a script consisted entirely of meta
commands, doCustom() never returned to the caller, so progress reports
with the -P option were not printed. I don't want to backpatch this
refactoring, and the bug is quite insignificant, so only commit this to
master, and leave the bug unfixed in back-branches.

Review and original bug report by Fabien Coelho.

Discussion: <alpine.DEB.2.20.1607090850120.3412@sto>

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 8b24ad50e7ab36468126bb10170bbc83eab850ad..1fb4ae46d56bd571afa48dd76dc8dbd94cad4c31 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -235,25 +235,95 @@ typedef struct StatsData
 } StatsData;
 
 /*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+   /*
+    * The client must first choose a script to execute.  Once chosen, it can
+    * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+    * right away (state CSTATE_START_TX).
+    */
+   CSTATE_CHOOSE_SCRIPT,
+
+   /*
+    * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+    * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
+    * sleeps until that moment.  (If throttling is not enabled, doCustom()
+    * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
+    */
+   CSTATE_START_THROTTLE,
+   CSTATE_THROTTLE,
+
+   /*
+    * CSTATE_START_TX performs start-of-transaction processing.  Establishes
+    * a new connection for the transaction, in --connect mode, and records
+    * the transaction start time.
+    */
+   CSTATE_START_TX,
+
+   /*
+    * We loop through these states, to process each command in the script:
+    *
+    * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
+    * command, the command is sent to the server, and we move to
+    * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
+    * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
+    * meta-commands are executed immediately.
+    *
+    * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+    * for the current command.
+    *
+    * CSTATE_SLEEP waits until the end of \sleep.
+    *
+    * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
+    * command counter, and loops back to CSTATE_START_COMMAND state.
+    */
+   CSTATE_START_COMMAND,
+   CSTATE_WAIT_RESULT,
+   CSTATE_SLEEP,
+   CSTATE_END_COMMAND,
+
+   /*
+    * CSTATE_END_TX performs end-of-transaction processing.  Calculates
+    * latency, and logs the transaction.  In --connect mode, closes the
+    * current connection.  Chooses the next script to execute and starts over
+    * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
+    * more work to do.
+    */
+   CSTATE_END_TX,
+
+   /*
+    * Final states.  CSTATE_ABORTED means that the script execution was
+    * aborted because a command failed, CSTATE_FINISHED means success.
+    */
+   CSTATE_ABORTED,
+   CSTATE_FINISHED
+}  ConnectionStateEnum;
+
+/*
+ * Connection state.
  */
 typedef struct
 {
    PGconn     *con;            /* connection handle to DB */
    int         id;             /* client No. */
-   int         state;          /* state No. */
-   bool        listen;         /* whether an async query has been sent */
-   bool        sleeping;       /* whether the client is napping */
-   bool        throttling;     /* whether nap is for throttling */
-   bool        is_throttled;   /* whether transaction throttling is done */
+   ConnectionStateEnum state;  /* state machine's current state. */
+
+   int         use_file;       /* index in sql_script for this client */
+   int         command;        /* command number in script */
+
+   /* client variables */
    Variable   *variables;      /* array of variable definitions */
    int         nvariables;     /* number of variables */
    bool        vars_sorted;    /* are variables sorted by name? */
+
+   /* various timestamps for the current transaction */
    int64       txn_scheduled;  /* scheduled start time of transaction (usec) */
    int64       sleep_until;    /* scheduled start time of next cmd (usec) */
    instr_time  txn_begin;      /* used for measuring schedule lag times */
    instr_time  stmt_begin;     /* used for measuring statement latencies */
-   int         use_file;       /* index in sql_scripts for this client */
+
    bool        prepared[MAX_SCRIPTS];  /* whether client prepared the script */
 
    /* per client collected stats */
@@ -1382,7 +1452,7 @@ evalFunc(TState *thread, CState *st,
                Assert(nargs == 1);
 
                fprintf(stderr, "debug(script=%d,command=%d): ",
-                       st->use_file, st->state + 1);
+                       st->use_file, st->command + 1);
 
                if (varg->type == PGBT_INT)
                    fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
@@ -1733,15 +1803,12 @@ preparedStatementName(char *buffer, int file, int state)
    sprintf(buffer, "P%d_%d", file, state);
 }
 
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
 {
-   if (st->con != NULL)
-   {
-       PQfinish(st->con);
-       st->con = NULL;
-   }
-   return false;               /* always false */
+   fprintf(stderr,
+           "client %d aborted in command %d of script %d; %s\n",
+           st->id, st->command, st->use_file, message);
 }
 
 /* return a script number with a weighted choice. */
@@ -1763,425 +1830,595 @@ chooseScript(TState *thread)
    return i - 1;
 }
 
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
 static bool
-doCustom(TState *thread, CState *st, StatsData *agg)
+sendCommand(CState *st, Command *command)
 {
-   PGresult   *res;
-   Command   **commands;
-   bool        trans_needs_throttle = false;
-   instr_time  now;
+   int         r;
 
-   /*
-    * gettimeofday() isn't free, so we get the current timestamp lazily the
-    * first time it's needed, and reuse the same value throughout this
-    * function after that. This also ensures that e.g. the calculated latency
-    * reported in the log file and in the totals are the same. Zero means
-    * "not set yet". Reset "now" when we step to the next command with "goto
-    * top", though.
-    */
-top:
-   INSTR_TIME_SET_ZERO(now);
+   if (querymode == QUERY_SIMPLE)
+   {
+       char       *sql;
 
-   commands = sql_script[st->use_file].commands;
+       sql = pg_strdup(command->argv[0]);
+       sql = assignVariables(st, sql);
 
-   /*
-    * Handle throttling once per transaction by sleeping.  It is simpler to
-    * do this here rather than at the end, because so much complicated logic
-    * happens below when statements finish.
-    */
-   if (throttle_delay && !st->is_throttled)
+       if (debug)
+           fprintf(stderr, "client %d sending %s\n", st->id, sql);
+       r = PQsendQuery(st->con, sql);
+       free(sql);
+   }
+   else if (querymode == QUERY_EXTENDED)
    {
-       /*
-        * Generate a delay such that the series of delays will approximate a
-        * Poisson distribution centered on the throttle_delay time.
-        *
-        * If transactions are too slow or a given wait is shorter than a
-        * transaction, the next transaction will start right away.
-        */
-       int64       wait = getPoissonRand(thread, throttle_delay);
+       const char *sql = command->argv[0];
+       const char *params[MAX_ARGS];
 
-       thread->throttle_trigger += wait;
-       st->txn_scheduled = thread->throttle_trigger;
+       getQueryParams(st, command, params);
 
-       /* stop client if next transaction is beyond pgbench end of execution */
-       if (duration > 0 && st->txn_scheduled > end_time)
-           return clientDone(st);
+       if (debug)
+           fprintf(stderr, "client %d sending %s\n", st->id, sql);
+       r = PQsendQueryParams(st->con, sql, command->argc - 1,
+                             NULL, params, NULL, NULL, 0);
+   }
+   else if (querymode == QUERY_PREPARED)
+   {
+       char        name[MAX_PREPARE_NAME];
+       const char *params[MAX_ARGS];
 
-       /*
-        * If this --latency-limit is used, and this slot is already late so
-        * that the transaction will miss the latency limit even if it
-        * completed immediately, we skip this time slot and iterate till the
-        * next slot that isn't late yet.
-        */
-       if (latency_limit)
+       if (!st->prepared[st->use_file])
        {
-           int64       now_us;
+           int         j;
+           Command   **commands = sql_script[st->use_file].commands;
 
-           if (INSTR_TIME_IS_ZERO(now))
-               INSTR_TIME_SET_CURRENT(now);
-           now_us = INSTR_TIME_GET_MICROSEC(now);
-           while (thread->throttle_trigger < now_us - latency_limit)
+           for (j = 0; commands[j] != NULL; j++)
            {
-               processXactStats(thread, st, &now, true, agg);
-               /* next rendez-vous */
-               wait = getPoissonRand(thread, throttle_delay);
-               thread->throttle_trigger += wait;
-               st->txn_scheduled = thread->throttle_trigger;
+               PGresult   *res;
+               char        name[MAX_PREPARE_NAME];
+
+               if (commands[j]->type != SQL_COMMAND)
+                   continue;
+               preparedStatementName(name, st->use_file, j);
+               res = PQprepare(st->con, name,
+                         commands[j]->argv[0], commands[j]->argc - 1, NULL);
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                   fprintf(stderr, "%s", PQerrorMessage(st->con));
+               PQclear(res);
            }
+           st->prepared[st->use_file] = true;
        }
 
-       st->sleep_until = st->txn_scheduled;
-       st->sleeping = true;
-       st->throttling = true;
-       st->is_throttled = true;
+       getQueryParams(st, command, params);
+       preparedStatementName(name, st->use_file, st->command);
+
        if (debug)
-           fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
-                   st->id, wait);
+           fprintf(stderr, "client %d sending %s\n", st->id, name);
+       r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+                               params, NULL, NULL, 0);
    }
+   else    /* unknown sql mode */
+       r = 0;
 
-   if (st->sleeping)
-   {                           /* are we sleeping? */
-       if (INSTR_TIME_IS_ZERO(now))
-           INSTR_TIME_SET_CURRENT(now);
-       if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
-           return true;        /* Still sleeping, nothing to do here */
-       /* Else done sleeping, go ahead with next command */
-       st->sleeping = false;
-       st->throttling = false;
+   if (r == 0)
+   {
+       if (debug)
+           fprintf(stderr, "client %d could not send %s\n",
+                   st->id, command->argv[0]);
+       st->ecnt++;
+       return false;
    }
+   else
+       return true;
+}
+
+/*
+ * Parse the argument to a \sleep command, and return the requested amount
+ * of delay, in microseconds.  Returns true on success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+   char       *var;
+   int         usec;
 
-   if (st->listen)
-   {                           /* are we receiver? */
-       if (commands[st->state]->type == SQL_COMMAND)
+   if (*argv[1] == ':')
+   {
+       if ((var = getVariable(st, argv[1] + 1)) == NULL)
        {
-           if (debug)
-               fprintf(stderr, "client %d receiving\n", st->id);
-           if (!PQconsumeInput(st->con))
-           {                   /* there's something wrong */
-               fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
-               return clientDone(st);
-           }
-           if (PQisBusy(st->con))
-               return true;    /* don't have the whole result yet */
+           fprintf(stderr, "%s: undefined variable \"%s\"\n",
+                   argv[0], argv[1]);
+           return false;
        }
+       usec = atoi(var);
+   }
+   else
+       usec = atoi(argv[1]);
 
-       /*
-        * command finished: accumulate per-command execution times in
-        * thread-local data structure, if per-command latencies are requested
-        */
-       if (is_latencies)
-       {
-           if (INSTR_TIME_IS_ZERO(now))
-               INSTR_TIME_SET_CURRENT(now);
+   if (argc > 2)
+   {
+       if (pg_strcasecmp(argv[2], "ms") == 0)
+           usec *= 1000;
+       else if (pg_strcasecmp(argv[2], "s") == 0)
+           usec *= 1000000;
+   }
+   else
+       usec *= 1000000;
 
-           /* XXX could use a mutex here, but we choose not to */
-           addToSimpleStats(&commands[st->state]->stats,
-                            INSTR_TIME_GET_DOUBLE(now) -
-                            INSTR_TIME_GET_DOUBLE(st->stmt_begin));
-       }
+   *usecs = usec;
+   return true;
+}
 
-       /* transaction finished: calculate latency and log the transaction */
-       if (commands[st->state + 1] == NULL)
-       {
-           if (progress || throttle_delay || latency_limit ||
-               per_script_stats || use_log)
-               processXactStats(thread, st, &now, false, agg);
-           else
-               thread->stats.cnt++;
-       }
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
+doCustom(TState *thread, CState *st, StatsData *agg)
+{
+   PGresult   *res;
+   Command    *command;
+   instr_time  now;
+   bool        end_tx_processed = false;
+   int64       wait;
 
-       if (commands[st->state]->type == SQL_COMMAND)
-       {
-           /*
-            * Read and discard the query result; note this is not included in
-            * the statement latency numbers.
-            */
-           res = PQgetResult(st->con);
-           switch (PQresultStatus(res))
-           {
-               case PGRES_COMMAND_OK:
-               case PGRES_TUPLES_OK:
-               case PGRES_EMPTY_QUERY:
-                   break;      /* OK */
-               default:
-                   fprintf(stderr, "client %d aborted in state %d: %s",
-                           st->id, st->state, PQerrorMessage(st->con));
-                   PQclear(res);
-                   return clientDone(st);
-           }
-           PQclear(res);
-           discard_response(st);
-       }
+   /*
+    * gettimeofday() isn't free, so we get the current timestamp lazily the
+    * first time it's needed, and reuse the same value throughout this
+    * function after that.  This also ensures that e.g. the calculated
+    * latency reported in the log file and in the totals are the same. Zero
+    * means "not set yet".  Reset "now" when we execute shell commands or
+    * expressions, which might take a non-negligible amount of time, though.
+    */
+   INSTR_TIME_SET_ZERO(now);
 
-       if (commands[st->state + 1] == NULL)
+   /*
+    * Loop in the state machine, until we have to wait for a result from the
+    * server (or have to sleep, for throttling or for \sleep).
+    *
+    * Note: In the switch-statement below, 'break' will loop back here,
+    * meaning "continue in the state machine".  Return is used to return to
+    * the caller.
+    */
+   for (;;)
+   {
+       switch (st->state)
        {
-           if (is_connect)
-           {
-               PQfinish(st->con);
-               st->con = NULL;
-           }
+               /*
+                * Select transaction to run.
+                */
+           case CSTATE_CHOOSE_SCRIPT:
 
-           ++st->cnt;
-           if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
-               return clientDone(st);  /* exit success */
-       }
+               st->use_file = chooseScript(thread);
 
-       /* increment state counter */
-       st->state++;
-       if (commands[st->state] == NULL)
-       {
-           st->state = 0;
-           st->use_file = chooseScript(thread);
-           commands = sql_script[st->use_file].commands;
-           if (debug)
-               fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
-                       sql_script[st->use_file].desc);
-           st->is_throttled = false;
-
-           /*
-            * No transaction is underway anymore, which means there is
-            * nothing to listen to right now.  When throttling rate limits
-            * are active, a sleep will happen next, as the next transaction
-            * starts.  And then in any case the next SQL command will set
-            * listen back to true.
-            */
-           st->listen = false;
-           trans_needs_throttle = (throttle_delay > 0);
-       }
-   }
+               if (debug)
+                   fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+                           sql_script[st->use_file].desc);
 
-   if (st->con == NULL)
-   {
-       instr_time  start,
-                   end;
+               if (throttle_delay > 0)
+                   st->state = CSTATE_START_THROTTLE;
+               else
+                   st->state = CSTATE_START_TX;
+               break;
 
-       INSTR_TIME_SET_CURRENT(start);
-       if ((st->con = doConnect()) == NULL)
-       {
-           fprintf(stderr, "client %d aborted while establishing connection\n",
-                   st->id);
-           return clientDone(st);
-       }
-       INSTR_TIME_SET_CURRENT(end);
-       INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
-       /* Reset session-local state */
-       st->listen = false;
-       st->sleeping = false;
-       st->throttling = false;
-       st->is_throttled = false;
-       memset(st->prepared, 0, sizeof(st->prepared));
-   }
+               /*
+                * Handle throttling once per transaction by sleeping.
+                */
+           case CSTATE_START_THROTTLE:
 
-   /*
-    * This ensures that a throttling delay is inserted before proceeding with
-    * sql commands, after the first transaction. The first transaction
-    * throttling is performed when first entering doCustom.
-    */
-   if (trans_needs_throttle)
-   {
-       trans_needs_throttle = false;
-       goto top;
-   }
+               /*
+                * Generate a delay such that the series of delays will
+                * approximate a Poisson distribution centered on the
+                * throttle_delay time.
+                *
+                * If transactions are too slow or a given wait is shorter
+                * than a transaction, the next transaction will start right
+                * away.
+                */
+               Assert(throttle_delay > 0);
+               wait = getPoissonRand(thread, throttle_delay);
 
-   /* Record transaction start time under logging, progress or throttling */
-   if ((use_log || progress || throttle_delay || latency_limit ||
-        per_script_stats) && st->state == 0)
-   {
-       INSTR_TIME_SET_CURRENT(st->txn_begin);
+               thread->throttle_trigger += wait;
+               st->txn_scheduled = thread->throttle_trigger;
 
-       /*
-        * When not throttling, this is also the transaction's scheduled start
-        * time.
-        */
-       if (!throttle_delay)
-           st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
-   }
+               /*
+                * stop client if next transaction is beyond pgbench end of
+                * execution
+                */
+               if (duration > 0 && st->txn_scheduled > end_time)
+               {
+                   st->state = CSTATE_FINISHED;
+                   break;
+               }
 
-   /* Record statement start time if per-command latencies are requested */
-   if (is_latencies)
-       INSTR_TIME_SET_CURRENT(st->stmt_begin);
+               /*
+                * If --latency-limit is used, and this slot is already
+                * late so that the transaction will miss the latency limit
+                * even if it completed immediately, we skip this time slot
+                * and iterate till the next slot that isn't late yet.
+                */
+               if (latency_limit)
+               {
+                   int64       now_us;
 
-   if (commands[st->state]->type == SQL_COMMAND)
-   {
-       const Command *command = commands[st->state];
-       int         r;
+                   if (INSTR_TIME_IS_ZERO(now))
+                       INSTR_TIME_SET_CURRENT(now);
+                   now_us = INSTR_TIME_GET_MICROSEC(now);
+                   while (thread->throttle_trigger < now_us - latency_limit)
+                   {
+                       processXactStats(thread, st, &now, true, agg);
+                       /* next rendez-vous */
+                       wait = getPoissonRand(thread, throttle_delay);
+                       thread->throttle_trigger += wait;
+                       st->txn_scheduled = thread->throttle_trigger;
+                   }
+               }
 
-       if (querymode == QUERY_SIMPLE)
-       {
-           char       *sql;
+               st->state = CSTATE_THROTTLE;
+               if (debug)
+                   fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+                           st->id, wait);
+               break;
 
-           sql = pg_strdup(command->argv[0]);
-           sql = assignVariables(st, sql);
+               /*
+                * Wait until it's time to start next transaction.
+                */
+           case CSTATE_THROTTLE:
+               if (INSTR_TIME_IS_ZERO(now))
+                   INSTR_TIME_SET_CURRENT(now);
+               if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+                   return;     /* Still sleeping, nothing to do here */
+
+               /* Else done sleeping, start the transaction */
+               st->state = CSTATE_START_TX;
+               break;
 
-           if (debug)
-               fprintf(stderr, "client %d sending %s\n", st->id, sql);
-           r = PQsendQuery(st->con, sql);
-           free(sql);
-       }
-       else if (querymode == QUERY_EXTENDED)
-       {
-           const char *sql = command->argv[0];
-           const char *params[MAX_ARGS];
+               /* Start new transaction */
+           case CSTATE_START_TX:
 
-           getQueryParams(st, command, params);
+               /*
+                * Establish connection on first call, or if is_connect is
+                * true.
+                */
+               if (st->con == NULL)
+               {
+                   instr_time  start;
 
-           if (debug)
-               fprintf(stderr, "client %d sending %s\n", st->id, sql);
-           r = PQsendQueryParams(st->con, sql, command->argc - 1,
-                                 NULL, params, NULL, NULL, 0);
-       }
-       else if (querymode == QUERY_PREPARED)
-       {
-           char        name[MAX_PREPARE_NAME];
-           const char *params[MAX_ARGS];
+                   if (INSTR_TIME_IS_ZERO(now))
+                       INSTR_TIME_SET_CURRENT(now);
+                   start = now;
+                   if ((st->con = doConnect()) == NULL)
+                   {
+                       fprintf(stderr, "client %d aborted while establishing connection\n",
+                               st->id);
+                       st->state = CSTATE_ABORTED;
+                       break;
+                   }
+                   INSTR_TIME_SET_CURRENT(now);
+                   INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
 
-           if (!st->prepared[st->use_file])
-           {
-               int         j;
+                   /* Reset session-local state */
+                   memset(st->prepared, 0, sizeof(st->prepared));
+               }
 
-               for (j = 0; commands[j] != NULL; j++)
+               /*
+                * Record transaction start time under logging, progress or
+                * throttling.
+                */
+               if (use_log || progress || throttle_delay || latency_limit ||
+                   per_script_stats)
                {
-                   PGresult   *res;
-                   char        name[MAX_PREPARE_NAME];
+                   if (INSTR_TIME_IS_ZERO(now))
+                       INSTR_TIME_SET_CURRENT(now);
+                   st->txn_begin = now;
+
+                   /*
+                    * When not throttling, this is also the transaction's
+                    * scheduled start time.
+                    */
+                   if (!throttle_delay)
+                       st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+               }
 
-                   if (commands[j]->type != SQL_COMMAND)
-                       continue;
-                   preparedStatementName(name, st->use_file, j);
-                   res = PQprepare(st->con, name,
-                         commands[j]->argv[0], commands[j]->argc - 1, NULL);
-                   if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                       fprintf(stderr, "%s", PQerrorMessage(st->con));
-                   PQclear(res);
+               /* Begin with the first command */
+               st->command = 0;
+               st->state = CSTATE_START_COMMAND;
+               break;
+
+               /*
+                * Send a command to server (or execute a meta-command)
+                */
+           case CSTATE_START_COMMAND:
+               command = sql_script[st->use_file].commands[st->command];
+
+               /*
+                * If we reached the end of the script, move to end-of-xact
+                * processing.
+                */
+               if (command == NULL)
+               {
+                   st->state = CSTATE_END_TX;
+                   break;
                }
-               st->prepared[st->use_file] = true;
-           }
 
-           getQueryParams(st, command, params);
-           preparedStatementName(name, st->use_file, st->state);
+               /*
+                * Record statement start time if per-command latencies are
+                * requested
+                */
+               if (is_latencies)
+               {
+                   if (INSTR_TIME_IS_ZERO(now))
+                       INSTR_TIME_SET_CURRENT(now);
+                   st->stmt_begin = now;
+               }
 
-           if (debug)
-               fprintf(stderr, "client %d sending %s\n", st->id, name);
-           r = PQsendQueryPrepared(st->con, name, command->argc - 1,
-                                   params, NULL, NULL, 0);
-       }
-       else    /* unknown sql mode */
-           r = 0;
+               if (command->type == SQL_COMMAND)
+               {
+                   if (!sendCommand(st, command))
+                   {
+                       /*
+                        * Failed. Stay in CSTATE_START_COMMAND state, to
+                        * retry. ??? What is the point of retrying? Should
+                        * we rather abort?
+                        */
+                       return;
+                   }
+                   else
+                       st->state = CSTATE_WAIT_RESULT;
+               }
+               else if (command->type == META_COMMAND)
+               {
+                   int         argc = command->argc,
+                               i;
+                   char      **argv = command->argv;
 
-       if (r == 0)
-       {
-           if (debug)
-               fprintf(stderr, "client %d could not send %s\n",
-                       st->id, command->argv[0]);
-           st->ecnt++;
-       }
-       else
-           st->listen = true;  /* flags that should be listened */
-   }
-   else if (commands[st->state]->type == META_COMMAND)
-   {
-       int         argc = commands[st->state]->argc,
-                   i;
-       char      **argv = commands[st->state]->argv;
+                   if (debug)
+                   {
+                       fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+                       for (i = 1; i < argc; i++)
+                           fprintf(stderr, " %s", argv[i]);
+                       fprintf(stderr, "\n");
+                   }
 
-       if (debug)
-       {
-           fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
-           for (i = 1; i < argc; i++)
-               fprintf(stderr, " %s", argv[i]);
-           fprintf(stderr, "\n");
-       }
+                   if (pg_strcasecmp(argv[0], "sleep") == 0)
+                   {
+                       /*
+                        * A \sleep doesn't execute anything, we just get the
+                        * delay from the argument, and enter the CSTATE_SLEEP
+                        * state.  (The per-command latency will be recorded
+                        * in CSTATE_SLEEP state, not here, after the delay
+                        * has elapsed.)
+                        */
+                       int         usec;
+
+                       if (!evaluateSleep(st, argc, argv, &usec))
+                       {
+                           commandFailed(st, "execution of meta-command 'sleep' failed");
+                           st->state = CSTATE_ABORTED;
+                           break;
+                       }
 
-       if (pg_strcasecmp(argv[0], "set") == 0)
-       {
-           PgBenchExpr *expr = commands[st->state]->expr;
-           PgBenchValue result;
+                       if (INSTR_TIME_IS_ZERO(now))
+                           INSTR_TIME_SET_CURRENT(now);
+                       st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+                       st->state = CSTATE_SLEEP;
+                       break;
+                   }
+                   else
+                   {
+                       if (pg_strcasecmp(argv[0], "set") == 0)
+                       {
+                           PgBenchExpr *expr = command->expr;
+                           PgBenchValue result;
 
-           if (!evaluateExpr(thread, st, expr, &result))
-           {
-               st->ecnt++;
-               return true;
-           }
+                           if (!evaluateExpr(thread, st, expr, &result))
+                           {
+                               commandFailed(st, "evaluation of meta-command 'set' failed");
+                               st->state = CSTATE_ABORTED;
+                               break;
+                           }
 
-           if (!putVariableNumber(st, argv[0], argv[1], &result))
-           {
-               st->ecnt++;
-               return true;
-           }
+                           if (!putVariableNumber(st, argv[0], argv[1], &result))
+                           {
+                               commandFailed(st, "assignment of meta-command 'set' failed");
+                               st->state = CSTATE_ABORTED;
+                               break;
+                           }
+                       }
+                       else if (pg_strcasecmp(argv[0], "setshell") == 0)
+                       {
+                           bool        ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
 
-           st->listen = true;
-       }
-       else if (pg_strcasecmp(argv[0], "sleep") == 0)
-       {
-           char       *var;
-           int         usec;
-           instr_time  now;
+                           if (timer_exceeded) /* timeout */
+                           {
+                               st->state = CSTATE_FINISHED;
+                               break;
+                           }
+                           else if (!ret)      /* on error */
+                           {
+                               commandFailed(st, "execution of meta-command 'setshell' failed");
+                               st->state = CSTATE_ABORTED;
+                               break;
+                           }
+                           else
+                           {
+                               /* succeeded */
+                           }
+                       }
+                       else if (pg_strcasecmp(argv[0], "shell") == 0)
+                       {
+                           bool        ret = runShellCommand(st, NULL, argv + 1, argc - 1);
 
-           if (*argv[1] == ':')
-           {
-               if ((var = getVariable(st, argv[1] + 1)) == NULL)
+                           if (timer_exceeded) /* timeout */
+                           {
+                               st->state = CSTATE_FINISHED;
+                               break;
+                           }
+                           else if (!ret)      /* on error */
+                           {
+                               commandFailed(st, "execution of meta-command 'shell' failed");
+                               st->state = CSTATE_ABORTED;
+                               break;
+                           }
+                           else
+                           {
+                               /* succeeded */
+                           }
+                       }
+
+                       /*
+                        * executing the expression or shell command might
+                        * take a non-negligible amount of time, so reset
+                        * 'now'
+                        */
+                       INSTR_TIME_SET_ZERO(now);
+
+                       st->state = CSTATE_END_COMMAND;
+                   }
+               }
+               break;
+
+               /*
+                * Wait for the current SQL command to complete
+                */
+           case CSTATE_WAIT_RESULT:
+               command = sql_script[st->use_file].commands[st->command];
+               if (debug)
+                   fprintf(stderr, "client %d receiving\n", st->id);
+               if (!PQconsumeInput(st->con))
+               {               /* there's something wrong */
+                   commandFailed(st, "perhaps the backend died while processing");
+                   st->state = CSTATE_ABORTED;
+                   break;
+               }
+               if (PQisBusy(st->con))
+                   return;     /* don't have the whole result yet */
+
+               /*
+                * Read and discard the query result.
+                */
+               res = PQgetResult(st->con);
+               switch (PQresultStatus(res))
                {
-                   fprintf(stderr, "%s: undefined variable \"%s\"\n",
-                           argv[0], argv[1]);
-                   st->ecnt++;
-                   return true;
+                   case PGRES_COMMAND_OK:
+                   case PGRES_TUPLES_OK:
+                   case PGRES_EMPTY_QUERY:
+                       /* OK */
+                       PQclear(res);
+                       discard_response(st);
+                       st->state = CSTATE_END_COMMAND;
+                       break;
+                   default:
+                       commandFailed(st, PQerrorMessage(st->con));
+                       PQclear(res);
+                       st->state = CSTATE_ABORTED;
+                       break;
                }
-               usec = atoi(var);
-           }
-           else
-               usec = atoi(argv[1]);
+               break;
 
-           if (argc > 2)
-           {
-               if (pg_strcasecmp(argv[2], "ms") == 0)
-                   usec *= 1000;
-               else if (pg_strcasecmp(argv[2], "s") == 0)
-                   usec *= 1000000;
-           }
-           else
-               usec *= 1000000;
+               /*
+                * Wait until sleep is done. This state is entered after a
+                * \sleep metacommand. The behavior is similar to
+                * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
+                * instead of CSTATE_START_TX.
+                */
+           case CSTATE_SLEEP:
+               if (INSTR_TIME_IS_ZERO(now))
+                   INSTR_TIME_SET_CURRENT(now);
+               if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+                   return;     /* Still sleeping, nothing to do here */
+               /* Else done sleeping. */
+               st->state = CSTATE_END_COMMAND;
+               break;
 
-           INSTR_TIME_SET_CURRENT(now);
-           st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
-           st->sleeping = true;
+               /*
+                * End of command: record stats and proceed to next command.
+                */
+           case CSTATE_END_COMMAND:
 
-           st->listen = true;
-       }
-       else if (pg_strcasecmp(argv[0], "setshell") == 0)
-       {
-           bool        ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+               /*
+                * command completed: accumulate per-command execution times
+                * in thread-local data structure, if per-command latencies
+                * are requested.
+                */
+               if (is_latencies)
+               {
+                   if (INSTR_TIME_IS_ZERO(now))
+                       INSTR_TIME_SET_CURRENT(now);
 
-           if (timer_exceeded) /* timeout */
-               return clientDone(st);
-           else if (!ret)      /* on error */
-           {
-               st->ecnt++;
-               return true;
-           }
-           else    /* succeeded */
-               st->listen = true;
-       }
-       else if (pg_strcasecmp(argv[0], "shell") == 0)
-       {
-           bool        ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+                   /* XXX could use a mutex here, but we choose not to */
+                   command = sql_script[st->use_file].commands[st->command];
+                   addToSimpleStats(&command->stats,
+                                    INSTR_TIME_GET_DOUBLE(now) -
+                                    INSTR_TIME_GET_DOUBLE(st->stmt_begin));
+               }
 
-           if (timer_exceeded) /* timeout */
-               return clientDone(st);
-           else if (!ret)      /* on error */
-           {
-               st->ecnt++;
-               return true;
-           }
-           else    /* succeeded */
-               st->listen = true;
-       }
+               /* Go ahead with next command */
+               st->command++;
+               st->state = CSTATE_START_COMMAND;
+               break;
 
-       /* after a meta command, immediately proceed with next command */
-       goto top;
-   }
+               /*
+                * End of transaction.
+                */
+           case CSTATE_END_TX:
 
-   return true;
+               /*
+                * transaction finished: calculate latency and log the
+                * transaction
+                */
+               if (progress || throttle_delay || latency_limit ||
+                   per_script_stats || use_log)
+                   processXactStats(thread, st, &now, false, agg);
+               else
+                   thread->stats.cnt++;
+
+               if (is_connect)
+               {
+                   PQfinish(st->con);
+                   st->con = NULL;
+                   INSTR_TIME_SET_ZERO(now);
+               }
+
+               ++st->cnt;
+               if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+               {
+                   /* exit success */
+                   st->state = CSTATE_FINISHED;
+                   break;
+               }
+
+               /*
+                * No transaction is underway anymore.
+                */
+               st->state = CSTATE_CHOOSE_SCRIPT;
+
+               /*
+                * If we paced through all commands in the script in this
+                * loop, without returning to the caller even once, do it now.
+                * This gives the thread a chance to process other
+                * connections, and to do progress reporting.  This can
+                * currently only happen if the script consists entirely of
+                * meta-commands.
+                */
+               if (end_tx_processed)
+                   return;
+               else
+               {
+                   end_tx_processed = true;
+                   break;
+               }
+
+               /*
+                * Final states.  Close the connection if it's still open.
+                */
+           case CSTATE_ABORTED:
+           case CSTATE_FINISHED:
+               if (st->con != NULL)
+               {
+                   PQfinish(st->con);
+                   st->con = NULL;
+               }
+               return;
+       }
+   }
 }
 
 /*
@@ -4183,29 +4420,10 @@ threadRun(void *arg)
    initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
    last = aggs;
 
-   /* send start up queries in async manner */
+   /* explicitly initialize the state machines */
    for (i = 0; i < nstate; i++)
    {
-       CState     *st = &state[i];
-       int         prev_ecnt = st->ecnt;
-       Command   **commands;
-
-       st->use_file = chooseScript(thread);
-       commands = sql_script[st->use_file].commands;
-       if (debug)
-           fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
-                   sql_script[st->use_file].desc);
-       if (!doCustom(thread, st, &aggs))
-           remains--;          /* I've aborted */
-
-       if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
-       {
-           fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
-                   i, st->state);
-           remains--;          /* I've aborted */
-           PQfinish(st->con);
-           st->con = NULL;
-       }
+       state[i].state = CSTATE_CHOOSE_SCRIPT;
    }
 
    while (remains > 0)
@@ -4222,59 +4440,60 @@ threadRun(void *arg)
        for (i = 0; i < nstate; i++)
        {
            CState     *st = &state[i];
-           Command   **commands = sql_script[st->use_file].commands;
            int         sock;
 
-           if (st->con == NULL)
+           if (st->state == CSTATE_THROTTLE && timer_exceeded)
            {
+               /* interrupt client which has not started a transaction */
+               st->state = CSTATE_FINISHED;
+               remains--;
+               PQfinish(st->con);
+               st->con = NULL;
                continue;
            }
-           else if (st->sleeping)
+           else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
            {
-               if (st->throttling && timer_exceeded)
-               {
-                   /* interrupt client which has not started a transaction */
-                   remains--;
-                   st->sleeping = false;
-                   st->throttling = false;
-                   PQfinish(st->con);
-                   st->con = NULL;
-                   continue;
-               }
-               else    /* just a nap from the script */
-               {
-                   int         this_usec;
+               /* a nap from the script, or under throttling */
+               int         this_usec;
 
-                   if (min_usec == PG_INT64_MAX)
-                   {
-                       instr_time  now;
-
-                       INSTR_TIME_SET_CURRENT(now);
-                       now_usec = INSTR_TIME_GET_MICROSEC(now);
-                   }
+               if (min_usec == PG_INT64_MAX)
+               {
+                   instr_time  now;
 
-                   this_usec = st->txn_scheduled - now_usec;
-                   if (min_usec > this_usec)
-                       min_usec = this_usec;
+                   INSTR_TIME_SET_CURRENT(now);
+                   now_usec = INSTR_TIME_GET_MICROSEC(now);
                }
+
+               this_usec = (st->state == CSTATE_SLEEP ?
+                            st->sleep_until : st->txn_scheduled) - now_usec;
+               if (min_usec > this_usec)
+                   min_usec = this_usec;
            }
-           else if (commands[st->state]->type == META_COMMAND)
+           else if (st->state == CSTATE_WAIT_RESULT)
            {
-               min_usec = 0;   /* the connection is ready to run */
+               /*
+                * waiting for result from server - nothing to do unless the
+                * socket is readable
+                */
+               sock = PQsocket(st->con);
+               if (sock < 0)
+               {
+                   fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+                   goto done;
+               }
+
+               FD_SET(sock, &input_mask);
+
+               if (maxsock < sock)
+                   maxsock = sock;
                break;
            }
-
-           sock = PQsocket(st->con);
-           if (sock < 0)
+           else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
            {
-               fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
-               goto done;
+               /* the connection is ready to run */
+               min_usec = 0;
+               break;
            }
-
-           FD_SET(sock, &input_mask);
-
-           if (maxsock < sock)
-               maxsock = sock;
        }
 
        /* also wake up to print the next progress report on time */
@@ -4324,14 +4543,13 @@ threadRun(void *arg)
            }
        }
 
-       /* ok, backend returns reply */
+       /* ok, advance the state machine of each connection */
        for (i = 0; i < nstate; i++)
        {
            CState     *st = &state[i];
-           Command   **commands = sql_script[st->use_file].commands;
-           int         prev_ecnt = st->ecnt;
+           bool        ready;
 
-           if (st->con)
+           if (st->state == CSTATE_WAIT_RESULT && st->con)
            {
                int         sock = PQsocket(st->con);
 
@@ -4341,21 +4559,19 @@ threadRun(void *arg)
                            PQerrorMessage(st->con));
                    goto done;
                }
-               if (FD_ISSET(sock, &input_mask) ||
-                   commands[st->state]->type == META_COMMAND)
-               {
-                   if (!doCustom(thread, st, &aggs))
-                       remains--;      /* I've aborted */
-               }
+
+               ready = FD_ISSET(sock, &input_mask);
            }
+           else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+               ready = false;
+           else
+               ready = true;
 
-           if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+           if (ready)
            {
-               fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
-                       i, st->state);
-               remains--;      /* I've aborted */
-               PQfinish(st->con);
-               st->con = NULL;
+               doCustom(thread, st, &aggs);
+               if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+                   remains--;
            }
        }