shm_mq: Update mq_bytes_written less often.
authorRobert Haas <[email protected]>
Thu, 14 Oct 2021 20:06:43 +0000 (16:06 -0400)
committerRobert Haas <[email protected]>
Thu, 14 Oct 2021 20:13:36 +0000 (16:13 -0400)
Do not update shm_mq's mq_bytes_written until we have written
an amount of data greater than 1/4th of the ring size, unless
the caller of shm_mq_send(v) requests a flush at the end of
the message. This reduces the number of calls to SetLatch(),
and also the number of CPU cache misses, considerably, and thus
makes shm_mq significantly faster.

Dilip Kumar, reviewed by Zhihong Yu and Tomas Vondra. Some
minor cosmetic changes by me.

Discussion: http://postgr.es/m/CAFiTN-tVXqn_OG7tHNeSkBbN+iiCZTiQ83uakax43y1sQb2OBA@mail.gmail.com

src/backend/executor/tqueue.c
src/backend/libpq/pqmq.c
src/backend/storage/ipc/shm_mq.c
src/include/storage/shm_mq.h
src/test/modules/test_shm_mq/test.c
src/test/modules/test_shm_mq/worker.c

index 7af9fbe984891b7eb28fe0e7939cabd97cd44b87..eb0cbd7b217d3146f779ee7c5431135304072ef9 100644 (file)
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
        /* Send the tuple itself. */
        tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-       result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+       result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
 
        if (should_free)
                pfree(tuple);
index d1a1f47a7889d203b60c3dca1672241b297edc81..846494bf44c1fe981179c65e94035ccbfe2b0895 100644 (file)
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
        for (;;)
        {
-               result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+               /*
+                * Immediately notify the receiver by passing force_flush as true so
+                * that the shared memory value is updated before we send the parallel
+                * message signal right after this.
+                */
+               result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
 
                if (pq_mq_parallel_leader_pid != 0)
                        SendProcSignal(pq_mq_parallel_leader_pid,
index 91a7093e0335a5ae00ea2c8d0c1e4ae5f5859167..b4ce9629d4c3033565ba04da59837063278e54ca 100644 (file)
@@ -109,6 +109,12 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
+ * mqh_send_pending, is number of bytes that is written to the queue but not
+ * yet updated in the shared memory.  We will not update it until the written
+ * data is 1/4th of the ring size or the tuple queue is full.  This will
+ * prevent frequent CPU cache misses, and it will also avoid frequent
+ * SetLatch() calls, which are quite expensive.
+ *
  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -137,6 +143,7 @@ struct shm_mq_handle
        char       *mqh_buffer;
        Size            mqh_buflen;
        Size            mqh_consume_pending;
+       Size            mqh_send_pending;
        Size            mqh_partial_bytes;
        Size            mqh_expected_bytes;
        bool            mqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
        mqh->mqh_buffer = NULL;
        mqh->mqh_buflen = 0;
        mqh->mqh_consume_pending = 0;
+       mqh->mqh_send_pending = 0;
        mqh->mqh_partial_bytes = 0;
        mqh->mqh_expected_bytes = 0;
        mqh->mqh_length_word_complete = false;
@@ -319,14 +327,15 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
  * Write a message into a shared message queue.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+                       bool force_flush)
 {
        shm_mq_iovec iov;
 
        iov.data = data;
        iov.len = nbytes;
 
-       return shm_mq_sendv(mqh, &iov, 1, nowait);
+       return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +352,15 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver (if it is already attached).  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+                        bool force_flush)
 {
        shm_mq_result res;
        shm_mq     *mq = mqh->mqh_queue;
@@ -518,8 +533,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
                mqh->mqh_counterparty_attached = true;
        }
 
-       /* Notify receiver of the newly-written data, and return. */
-       SetLatch(&receiver->procLatch);
+       /*
+        * If the caller has requested force flush or we have written more than 1/4
+        * of the ring size, mark it as written in shared memory and notify the
+        * receiver.
+        */
+       if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+       {
+               shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+               SetLatch(&receiver->procLatch);
+               mqh->mqh_send_pending = 0;
+       }
+
        return SHM_MQ_SUCCESS;
 }
 
@@ -816,6 +841,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+       /* Before detaching, notify the receiver about any already-written data. */
+       if (mqh->mqh_send_pending > 0)
+       {
+               shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+               mqh->mqh_send_pending = 0;
+       }
+
        /* Notify counterparty that we're outta here. */
        shm_mq_detach_internal(mqh->mqh_queue);
 
@@ -894,7 +926,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
                /* Compute number of ring buffer bytes used and available. */
                rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-               wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+               wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
                Assert(wb >= rb);
                used = wb - rb;
                Assert(used <= ringsize);
@@ -951,6 +983,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                }
                else if (available == 0)
                {
+                       /* Update the pending send bytes in the shared memory. */
+                       shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
                        /*
                         * Since mq->mqh_counterparty_attached is known to be true at this
                         * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +994,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                        Assert(mqh->mqh_counterparty_attached);
                        SetLatch(&mq->mq_receiver->procLatch);
 
+                       /*
+                        * We have just updated the mqh_send_pending bytes in the shared
+                        * memory so reset it.
+                        */
+                       mqh->mqh_send_pending = 0;
+
                        /* Skip manipulation of our latch if nowait = true. */
                        if (nowait)
                        {
@@ -1009,13 +1050,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                         * MAXIMUM_ALIGNOF, and each read is as well.
                         */
                        Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-                       shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
                        /*
-                        * For efficiency, we don't set the reader's latch here.  We'll do
-                        * that only when the buffer fills up or after writing an entire
-                        * message.
+                        * For efficiency, we don't update the bytes written in the shared
+                        * memory and also don't set the reader's latch here.  Refer to
+                        * the comments atop the shm_mq_handle structure for more
+                        * information.
                         */
+                       mqh->mqh_send_pending += MAXALIGN(sendnow);
                }
        }
 
index e693f3f7600904d2dd02def22a106e20bd6a38fb..cb1c555656ccefd90b4e84217a99ae37a17caeec 100644 (file)
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-                                                                Size nbytes, const void *data, bool nowait);
-extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
-                                                                 shm_mq_iovec *iov, int iovcnt, bool nowait);
+                                                                Size nbytes, const void *data, bool nowait,
+                                                                bool force_flush);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
+                                                                 int iovcnt, bool nowait, bool force_flush);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
                                                                        Size *nbytesp, void **datap, bool nowait);
+extern void shm_mq_flush(shm_mq_handle *mqh);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
index 2d8d695f97a7a515bd1c380a0b92ca9a32d6b110..be074f08a31b0ef004b0e5799823350edb864eed 100644 (file)
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
        test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 
        /* Send the initial message. */
-       res = shm_mq_send(outqh, message_size, message_contents, false);
+       res = shm_mq_send(outqh, message_size, message_contents, false, true);
        if (res != SHM_MQ_SUCCESS)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
                        break;
 
                /* Send it back out. */
-               res = shm_mq_send(outqh, len, data, false);
+               res = shm_mq_send(outqh, len, data, false, true);
                if (res != SHM_MQ_SUCCESS)
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
                 */
                if (send_count < loop_count)
                {
-                       res = shm_mq_send(outqh, message_size, message_contents, true);
+                       res = shm_mq_send(outqh, message_size, message_contents, true,
+                                                         true);
                        if (res == SHM_MQ_SUCCESS)
                        {
                                ++send_count;
index 2180776a669a185bb6e135df71414f0d65e9893b..9b037b98fe7bc2b6f1101f5177095a22ed80cd92 100644 (file)
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
                        break;
 
                /* Send it back out. */
-               res = shm_mq_send(outqh, len, data, false);
+               res = shm_mq_send(outqh, len, data, false, true);
                if (res != SHM_MQ_SUCCESS)
                        break;
        }