shm-mq-reduce-receiver-latch-set-v2 gather
authorRobert Haas <[email protected]>
Sat, 4 Nov 2017 18:03:03 +0000 (19:03 +0100)
committerRobert Haas <[email protected]>
Thu, 25 Jan 2018 17:08:21 +0000 (12:08 -0500)
src/backend/storage/ipc/shm_mq.c

index 0a2776e37a0ce822cfec011ca1996225cded3074..3bcc07e6cb5710ab4ced51285ffb4858bd627db7 100644 (file)
@@ -143,10 +143,10 @@ struct shm_mq_handle
 };
 
 static void shm_mq_detach_internal(shm_mq *mq);
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                                  const void *data, bool nowait, Size *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
-                                        bool nowait, Size *nbytesp, void **datap);
+static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
+                                        Size bytes_needed, bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
                                                 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
@@ -586,8 +586,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                mqh->mqh_counterparty_attached = true;
        }
 
-       /* Consume any zero-copy data from previous receive operation. */
-       if (mqh->mqh_consume_pending > 0)
+       /*
+        * If we've consumed an amount of data greater than 1/4th of the ring
+        * size, mark it consumed in shared memory.  We try to avoid doing this
+        * unnecessarily when only a small amount of data has been consumed,
+        * because SetLatch() is fairly expensive and we don't want to do it too
+        * often.
+        */
+       if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
        {
                shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
                mqh->mqh_consume_pending = 0;
@@ -598,7 +604,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
        {
                /* Try to receive the message length word. */
                Assert(mqh->mqh_partial_bytes < sizeof(Size));
-               res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+               res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                                                   nowait, &rb, &rawdata);
                if (res != SHM_MQ_SUCCESS)
                        return res;
@@ -618,13 +624,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                        needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
                        if (rb >= needed)
                        {
-                               /*
-                                * Technically, we could consume the message length
-                                * information at this point, but the extra write to shared
-                                * memory wouldn't be free and in most cases we would reap no
-                                * benefit.
-                                */
-                               mqh->mqh_consume_pending = needed;
+                               mqh->mqh_consume_pending += needed;
                                *nbytesp = nbytes;
                                *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                                return SHM_MQ_SUCCESS;
@@ -636,7 +636,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                         */
                        mqh->mqh_expected_bytes = nbytes;
                        mqh->mqh_length_word_complete = true;
-                       shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+                       mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
                        rb -= MAXALIGN(sizeof(Size));
                }
                else
@@ -655,7 +655,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                        }
                        Assert(mqh->mqh_buflen >= sizeof(Size));
 
-                       /* Copy and consume partial length word. */
+                       /* Copy partial length word; remember to consume it. */
                        if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                                lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
                        else
@@ -663,7 +663,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                        memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                                   lengthbytes);
                        mqh->mqh_partial_bytes += lengthbytes;
-                       shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+                       mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
                        rb -= lengthbytes;
 
                        /* If we now have the whole word, we're ready to read payload. */
@@ -685,13 +685,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                 * we need not copy the data and can return a pointer directly into
                 * shared memory.
                 */
-               res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+               res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
                if (res != SHM_MQ_SUCCESS)
                        return res;
                if (rb >= nbytes)
                {
                        mqh->mqh_length_word_complete = false;
-                       mqh->mqh_consume_pending = MAXALIGN(nbytes);
+                       mqh->mqh_consume_pending += MAXALIGN(nbytes);
                        *nbytesp = nbytes;
                        *datap = rawdata;
                        return SHM_MQ_SUCCESS;
@@ -731,13 +731,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                mqh->mqh_partial_bytes += rb;
 
                /*
-                * Update count of bytes read, with alignment padding.  Note that this
-                * will never actually insert any padding except at the end of a
-                * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
-                * and each read and write is as well.
+                * Update count of bytes that can be consumed, accounting for
+                * alignment padding.  Note that this will never actually insert any
+                * padding except at the end of a message, because the buffer size is
+                * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
                 */
                Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
-               shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
+               mqh->mqh_consume_pending += MAXALIGN(rb);
 
                /* If we got all the data, exit the loop. */
                if (mqh->mqh_partial_bytes >= nbytes)
@@ -745,7 +745,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
                /* Wait for some more data. */
                still_needed = nbytes - mqh->mqh_partial_bytes;
-               res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+               res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
                if (res != SHM_MQ_SUCCESS)
                        return res;
                if (rb > still_needed)
@@ -1010,9 +1010,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
  * is SHM_MQ_SUCCESS.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                                         Size *nbytesp, void **datap)
 {
+       shm_mq     *mq = mqh->mqh_queue;
        Size            ringsize = mq->mq_ring_size;
        uint64          used;
        uint64          written;
@@ -1024,7 +1025,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 
                /* Get bytes written, so we can compute what's available to read. */
                written = pg_atomic_read_u64(&mq->mq_bytes_written);
-               read = pg_atomic_read_u64(&mq->mq_bytes_read);
+
+               /*
+                * Get bytes read.  Include bytes we could consume but have not yet
+                * consumed.
+                */
+               read = pg_atomic_read_u64(&mq->mq_bytes_read) +
+                       mqh->mqh_consume_pending;
                used = written - read;
                Assert(used <= ringsize);
                offset = read % (uint64) ringsize;
@@ -1055,6 +1062,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
                if (mq->mq_detached)
                        return SHM_MQ_DETACHED;
 
+               /*
+                * We didn't get enough data to satisfy the request, so mark any data
+                * previously-consumed as read to make more buffer space.
+                */
+               if (mqh->mqh_consume_pending > 0)
+               {
+                       shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+                       mqh->mqh_consume_pending = 0;
+               }
+
                /* Skip manipulation of our latch if nowait = true. */
                if (nowait)
                        return SHM_MQ_WOULD_BLOCK;