Add --rate option.
authorTatsuo Ishii <[email protected]>
Mon, 22 Jul 2013 23:40:22 +0000 (08:40 +0900)
committerTatsuo Ishii <[email protected]>
Mon, 22 Jul 2013 23:40:22 +0000 (08:40 +0900)
This controls the target transaction rate to certain tps, rather than
maximum. Patch contributed by Fabien COELHO, reviewed by Greg Smith,
and slight editing by me.

contrib/pgbench/pgbench.c
doc/src/sgml/pgbench.sgml

index 2ad8f0bb5b48411e2f8c5b5be76708f4a98768ac..ad8e272c9109c755a67f32f6318b745a17fe4d10 100644 (file)
@@ -136,6 +136,12 @@ int                        unlogged_tables = 0;
  */
 double         sample_rate = 0.0;
 
+/*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64          throttle_delay = 0;
+
 /*
  * tablespace selection
  */
@@ -202,11 +208,13 @@ typedef struct
        int                     listen;                 /* 0 indicates that an async query has been
                                                                 * sent */
        int                     sleeping;               /* 1 indicates that the client is napping */
+       bool        throttling;     /* whether nap is for throttling */
        int64           until;                  /* napping until (usec) */
        Variable   *variables;          /* array of variable definitions */
        int                     nvariables;
        instr_time      txn_begin;              /* used for measuring transaction latencies */
        instr_time      stmt_begin;             /* used for measuring statement latencies */
+       bool            is_throttled;   /* whether transaction throttling is done */
        int                     use_file;               /* index in sql_files for this client */
        bool            prepared[MAX_FILES];
 } CState;
@@ -224,6 +232,9 @@ typedef struct
        instr_time *exec_elapsed;       /* time spent executing cmds (per Command) */
        int                *exec_count;         /* number of cmd executions (per Command) */
        unsigned short random_state[3];         /* separate randomness for each thread */
+       int64       throttle_trigger;   /* previous/next throttling (us) */
+       int64       throttle_lag;               /* total transaction lag behind throttling */
+       int64       throttle_lag_max;   /* max transaction lag */
 } TState;
 
 #define INVALID_THREAD         ((pthread_t) 0)
@@ -232,6 +243,8 @@ typedef struct
 {
        instr_time      conn_time;
        int                     xacts;
+       int64       throttle_lag;
+       int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -356,6 +369,7 @@ usage(void)
                   "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
                   "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
                   "  -r, --report-latencies   report average latency per command\n"
+                  "  -R, --rate=SPEC          target rate in transactions per second\n"
                   "  -s, --scale=NUM          report this scale factor in output\n"
                   "  -S, --select-only        perform SELECT-only transactions\n"
                   "  -t, --transactions       number of transactions each client runs "
@@ -898,17 +912,62 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
        PGresult   *res;
        Command   **commands;
+       bool        trans_needs_throttle = false;
 
 top:
        commands = sql_files[st->use_file];
 
+       /*
+        * Handle throttling once per transaction by sleeping.  It is simpler
+        * to do this here rather than at the end, because so much complicated
+        * logic happens below when statements finish.
+        */
+       if (throttle_delay && ! st->is_throttled)
+       {
+               /*
+                * Use inverse transform sampling to randomly generate a delay, such
+                * that the series of delays will approximate a Poisson distribution
+                * centered on the throttle_delay time.
+                 *
+                 * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
+                *
+                * If transactions are too slow or a given wait is shorter than
+                * a transaction, the next transaction will start right away.
+                */
+               int64 wait = (int64)
+                       throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+               thread->throttle_trigger += wait;
+
+               st->until = thread->throttle_trigger;
+               st->sleeping = 1;
+               st->throttling = true;
+               st->is_throttled = true;
+               if (debug)
+                       fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+                                       st->id, wait);
+       }
+
        if (st->sleeping)
        {                                                       /* are we sleeping? */
                instr_time      now;
+               int64 now_us;
 
                INSTR_TIME_SET_CURRENT(now);
-               if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+               now_us = INSTR_TIME_GET_MICROSEC(now);
+               if (st->until <= now_us)
+               {
                        st->sleeping = 0;       /* Done sleeping, go ahead with next command */
+                       if (st->throttling)
+                       {
+                               /* Measure lag of throttled transaction relative to target */
+                               int64 lag = now_us - st->until;
+                               thread->throttle_lag += lag;
+                               if (lag > thread->throttle_lag_max)
+                                       thread->throttle_lag_max = lag;
+                               st->throttling = false;
+                       }
+               }
                else
                        return true;            /* Still sleeping, nothing to do here */
        }
@@ -1095,6 +1154,15 @@ top:
                        st->state = 0;
                        st->use_file = (int) getrand(thread, 0, num_files - 1);
                        commands = sql_files[st->use_file];
+                       st->is_throttled = false;
+                       /*
+                        * No transaction is underway anymore, which means there is nothing
+                        * to listen to right now.  When throttling rate limits are active,
+                        * a sleep will happen next, as the next transaction starts.  And
+                        * then in any case the next SQL command will set listen back to 1.
+                        */
+                       st->listen = 0;
+                       trans_needs_throttle = (throttle_delay>0);
                }
        }
 
@@ -1113,6 +1181,16 @@ top:
                INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
        }
 
+       /*
+         * This ensures that a throttling delay is inserted before proceeding
+         * with sql commands, after the first transaction. The first transaction
+         * throttling is performed when first entering doCustom.
+        */
+       if (trans_needs_throttle) {
+               trans_needs_throttle = false;
+               goto top;
+       }
+
        /* Record transaction start time if logging is enabled */
        if (logfile && st->state == 0)
                INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -2017,7 +2095,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
                         TState *threads, int nthreads,
-                        instr_time total_time, instr_time conn_total_time)
+                        instr_time total_time, instr_time conn_total_time,
+                        int64 throttle_lag, int64 throttle_lag_max)
 {
        double          time_include,
                                tps_include,
@@ -2055,6 +2134,19 @@ printResults(int ttype, int normal_xacts, int nclients,
                printf("number of transactions actually processed: %d\n",
                           normal_xacts);
        }
+
+       if (throttle_delay)
+       {
+               /*
+                * Report average transaction lag under rate limit throttling.  This
+                * is the delay between scheduled and actual start times for the
+                * transaction.  The measured lag may be caused by thread/client load,
+                * the database load, or the Poisson throttling process.
+                */
+               printf("average rate limit schedule lag: %.3f ms (max %.3f ms)\n",
+                          0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+       }
+
        printf("tps = %f (including connections establishing)\n", tps_include);
        printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2140,6 +2232,7 @@ main(int argc, char **argv)
                {"unlogged-tables", no_argument, &unlogged_tables, 1},
                {"sampling-rate", required_argument, NULL, 4},
                {"aggregate-interval", required_argument, NULL, 5},
+               {"rate", required_argument, NULL, 'R'},
                {NULL, 0, NULL, 0}
        };
 
@@ -2162,6 +2255,8 @@ main(int argc, char **argv)
        instr_time      total_time;
        instr_time      conn_total_time;
        int                     total_xacts;
+       int64       throttle_lag = 0;
+       int64       throttle_lag_max = 0;
 
        int                     i;
 
@@ -2206,7 +2301,7 @@ main(int argc, char **argv)
        state = (CState *) pg_malloc(sizeof(CState));
        memset(state, 0, sizeof(CState));
 
-       while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1)
+       while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
@@ -2371,6 +2466,19 @@ main(int argc, char **argv)
                                        exit(1);
                                }
                                break;
+                       case 'R':
+                       {
+                               /* get a double from the beginning of option value */
+                               double throttle_value = atof(optarg);
+                               if (throttle_value <= 0.0)
+                               {
+                                       fprintf(stderr, "invalid rate limit: %s\n", optarg);
+                                       exit(1);
+                               }
+                               /* Invert rate limit into a time offset */
+                               throttle_delay = (int64) (1000000.0 / throttle_value);
+                       }
+                               break;
                        case 0:
                                /* This covers long options which take no argument. */
                                break;
@@ -2408,6 +2516,9 @@ main(int argc, char **argv)
                }
        }
 
+    /* compute a per thread delay */
+       throttle_delay *= nthreads;
+
        if (argc > optind)
                dbName = argv[optind];
        else
@@ -2721,6 +2832,9 @@ main(int argc, char **argv)
                        TResult    *r = (TResult *) ret;
 
                        total_xacts += r->xacts;
+                       throttle_lag += r->throttle_lag;
+                       if (r->throttle_lag_max > throttle_lag_max)
+                               throttle_lag_max = r->throttle_lag_max;
                        INSTR_TIME_ADD(conn_total_time, r->conn_time);
                        free(ret);
                }
@@ -2731,7 +2845,7 @@ main(int argc, char **argv)
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
        printResults(ttype, total_xacts, nclients, threads, nthreads,
-                                total_time, conn_total_time);
+                                total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
        return 0;
 }
@@ -2756,6 +2870,17 @@ threadRun(void *arg)
 
        AggVals         aggs;
 
+       /*
+        * Initialize throttling rate target for all of the thread's clients.  It
+        * might be a little more accurate to reset thread->start_time here too.
+        * The possible drift seems too small relative to typical throttle delay
+        * times to worry about it.
+        */
+       INSTR_TIME_SET_CURRENT(start);
+       thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+       thread->throttle_lag = 0;
+       thread->throttle_lag_max = 0;
+
        result = pg_malloc(sizeof(TResult));
 
        INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2831,25 +2956,38 @@ threadRun(void *arg)
                        Command   **commands = sql_files[st->use_file];
                        int                     sock;
 
-                       if (st->sleeping)
+                       if (st->con == NULL)
                        {
-                               int                     this_usec;
-
-                               if (min_usec == INT64_MAX)
+                               continue;
+                       }
+                       else if (st->sleeping)
+                       {
+                               if (st->throttling && timer_exceeded)
                                {
-                                       instr_time      now;
-
-                                       INSTR_TIME_SET_CURRENT(now);
-                                       now_usec = INSTR_TIME_GET_MICROSEC(now);
+                                       /* interrupt client which has not started a transaction */
+                                       remains--;
+                                       st->sleeping = 0;
+                                       st->throttling = false;
+                                       PQfinish(st->con);
+                                       st->con = NULL;
+                                       continue;
                                }
+                               else /* just a nap from the script */
+                               {
+                                       int                     this_usec;
 
-                               this_usec = st->until - now_usec;
-                               if (min_usec > this_usec)
-                                       min_usec = this_usec;
-                       }
-                       else if (st->con == NULL)
-                       {
-                               continue;
+                                       if (min_usec == INT64_MAX)
+                                       {
+                                               instr_time      now;
+
+                                               INSTR_TIME_SET_CURRENT(now);
+                                               now_usec = INSTR_TIME_GET_MICROSEC(now);
+                                       }
+
+                                       this_usec = st->until - now_usec;
+                                       if (min_usec > this_usec)
+                                               min_usec = this_usec;
+                               }
                        }
                        else if (commands[st->state]->type == META_COMMAND)
                        {
@@ -2986,6 +3124,8 @@ done:
        result->xacts = 0;
        for (i = 0; i < nstate; i++)
                result->xacts += state[i].cnt;
+       result->throttle_lag = thread->throttle_lag;
+       result->throttle_lag_max = thread->throttle_lag_max;
        INSTR_TIME_SET_CURRENT(end);
        INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
        if (logfile)
index 62555e11a25d3fc44b485872688f2f8e36a816ee..49a79b194efd6d70dbdaeb7e03c6d3c9080631b5 100644 (file)
@@ -409,7 +409,7 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
 
      <varlistentry>
       <term><option>-P</option> <replaceable>sec</></term>
-      <term><option>--progress=</option> <replaceable>sec</></term>
+      <term><option>--progress=</option><replaceable>sec</></term>
       <listitem>
        <para>
         Show progress report every <literal>sec</> seconds.
@@ -417,6 +417,52 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-R</option> <replaceable>rate</></term>
+      <term><option>--rate=</option><replaceable>rate</></term>
+      <listitem>
+       <para>
+        Execute transactions targeting the specified rate instead of running
+        as fast as possible (the default).  The rate is given in transactions
+        per second.  If the targeted rate is above the maximum possible rate,
+        the rate limit won't impact the results.
+       </para>
+       <para>
+        The rate is targeted by starting transactions along a
+        Poisson-distributed schedule time line.  The expected finish time
+        schedule moves forward based on when the client first started, not
+        when the previous transaction ended.  That approach means that when
+        transactions go past their original scheduled end time, it is
+        possible for later ones to catch up again.
+       </para>
+       <para>        
+        When throttling is active, the average and maximum transaction
+        schedule lag time are reported in ms.  This is the delay between
+        the original scheduled transaction time and the actual transaction
+        start times.  The schedule lag shows whether a transaction was
+        started on time or late.  Once a client starts running behind its
+        schedule, every following transaction can continue to be penalized
+        for schedule lag.  If faster transactions are able to catch up, it's
+        possible for them to get back on schedule again.  The lag measurement
+        of every transaction is shown when pgbench is run with debugging
+        output.
+       </para>
+       <para>
+        High rate limit schedule lag values, that is lag values that are large
+        compared to the actual transaction latency, indicate that something is
+        amiss in the throttling process.  High schedule lag can highlight a subtle
+        problem there even if the target rate limit is met in the end.  One
+        possible cause of schedule lag is insufficient pgbench threads to
+        handle all of the clients.  To improve that, consider reducing the
+        number of clients, increasing the number of threads in pgbench, or
+        running pgbench on a separate host.  Another possibility is that the
+        database is not keeping up with the load at some point.  When that
+        happens, you will have to reduce the expected transaction rate to
+        lower schedule lag.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-s</option> <replaceable>scale_factor</></term>
       <term><option>--scale=</option><replaceable>scale_factor</></term>