WIP: pgbench pgxact-split
authorAndres Freund <[email protected]>
Tue, 7 Apr 2020 04:28:55 +0000 (21:28 -0700)
committerAndres Freund <[email protected]>
Tue, 7 Apr 2020 04:28:55 +0000 (21:28 -0700)
src/bin/pgbench/pgbench.c

index e99af8016752dccbe73a7bf427a93351c112f1e5..21d1ab2aac1028a611a4ac55b80c718259da905e 100644 (file)
@@ -310,6 +310,10 @@ typedef struct RandomState
 /* Various random sequences are initialized from this one. */
 static RandomState base_random_sequence;
 
+#ifdef ENABLE_THREAD_SAFETY
+pthread_barrier_t conn_barrier;
+#endif
+
 /*
  * Connection state machine states.
  */
@@ -5206,6 +5210,10 @@ printResults(StatsData *total, instr_time total_time,
    tps_exclude = ntx /
        (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
 
+   //fprintf(stderr, "time: include: %f, exclude: %f, conn total: %f\n",
+   //      time_include, time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients),
+   //      INSTR_TIME_GET_DOUBLE(conn_total_time));
+
    /* Report test parameters. */
    printf("transaction type: %s\n",
           num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
@@ -6126,26 +6134,14 @@ main(int argc, char **argv)
    /* all clients must be assigned to a thread */
    Assert(nclients_dealt == nclients);
 
-   /* get start up time */
-   INSTR_TIME_SET_CURRENT(start_time);
-
-   /* set alarm if duration is specified. */
-   if (duration > 0)
-       setalarm(duration);
-
    /* start threads */
 #ifdef ENABLE_THREAD_SAFETY
+   pthread_barrier_init(&conn_barrier, NULL, nthreads);
+
    for (i = 0; i < nthreads; i++)
    {
        TState     *thread = &threads[i];
 
-       INSTR_TIME_SET_CURRENT(thread->start_time);
-
-       /* compute when to stop */
-       if (duration > 0)
-           end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
-               (int64) 1000000 * duration;
-
        /* the first thread (i = 0) is executed by main thread */
        if (i > 0)
        {
@@ -6162,13 +6158,38 @@ main(int argc, char **argv)
            thread->thread = INVALID_THREAD;
        }
    }
-#else
-   INSTR_TIME_SET_CURRENT(threads[0].start_time);
-   /* compute when to stop */
+#endif                         /* ENABLE_THREAD_SAFETY */
+
+#ifdef ENABLE_THREAD_SAFETY
+   /* wait till all threads started (threads wait in threadRun()) */
+   //fprintf(stderr, "andres: waiting for thread start: %u\n", threads[0].tid);
+   pthread_barrier_wait(&conn_barrier);
+#endif                         /* ENABLE_THREAD_SAFETY */
+
+   /* get start up time */
+   INSTR_TIME_SET_CURRENT(start_time);
+
+   /* */
+   for (i = 0; i < nthreads; i++)
+   {
+       TState     *thread = &threads[i];
+
+       thread->start_time = start_time;
+
+       /* compute when to stop */
+       if (duration > 0)
+           end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
+               (int64) 1000000 * duration;
+   }
+
+   /* set alarm if duration is specified. */
    if (duration > 0)
-       end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
-           (int64) 1000000 * duration;
-   threads[0].thread = INVALID_THREAD;
+       setalarm(duration);
+
+#ifdef ENABLE_THREAD_SAFETY
+   /* updated start time (threads wait in threadRun()) */
+   //fprintf(stderr, "andres: %u: waiting for start time\n", threads[0].tid);
+   pthread_barrier_wait(&conn_barrier);
 #endif                         /* ENABLE_THREAD_SAFETY */
 
    /* wait for threads and accumulate results */
@@ -6236,12 +6257,30 @@ threadRun(void *arg)
    int         i;
 
    /* for reporting progress: */
-   int64       thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
-   int64       last_report = thread_start;
-   int64       next_report = last_report + (int64) progress * 1000000;
+   int64       thread_start;
+   int64       last_report;
+   int64       next_report;
    StatsData   last,
                aggs;
 
+   /* wait till all threads started (main waits outside) */
+   if (thread->tid != 0)
+   {
+       //fprintf(stderr, "andres: %u: waiting for thread start\n", thread->tid);
+       pthread_barrier_wait(&conn_barrier);
+   }
+
+   /* wait for start time to be initialized (main waits outside) */
+   if (thread->tid != 0)
+   {
+       //fprintf(stderr, "andres: %u: waiting for start time\n", thread->tid);
+       pthread_barrier_wait(&conn_barrier);
+   }
+
+   thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
+   last_report = thread_start;
+   next_report = last_report + (int64) progress * 1000000;
+
    /*
     * Initialize throttling rate target for all of the thread's clients.  It
     * might be a little more accurate to reset thread->start_time here too.
@@ -6288,7 +6327,27 @@ threadRun(void *arg)
 
    /* time after thread and connections set up */
    INSTR_TIME_SET_CURRENT(thread->conn_time);
-   INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
+   INSTR_TIME_SUBTRACT(thread->conn_time, start);
+
+   //  e = thread->conn_time;
+   //fprintf(stderr, "andres: %u: connection established in %f (s %f, e %f)\n",
+   //      thread->tid, INSTR_TIME_GET_DOUBLE(thread->conn_time),
+   //      INSTR_TIME_GET_DOUBLE(e),
+   //      INSTR_TIME_GET_DOUBLE(start));
+
+   /* add once for each other connection */
+   if (!is_connect)
+   {
+       instr_time e = thread->conn_time;
+       for (i = 0; i < (nstate - 1); i++)
+       {
+           INSTR_TIME_ADD(thread->conn_time, e);
+       }
+   }
+
+   /* wait for all connections to be established */
+   //fprintf(stderr, "andres: %u: waiting for connection establishment\n", thread->tid);
+   pthread_barrier_wait(&conn_barrier);
 
    /* explicitly initialize the state machines */
    for (i = 0; i < nstate; i++)