From: Robert Haas
Date: Sat, 4 Nov 2017 18:03:03 +0000 (+0100)
Subject: shm-mq-reduce-receiver-latch-set-v2
X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fgather;p=users%2Frhaas%2Fpostgres.git

shm-mq-reduce-receiver-latch-set-v2
---

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 0a2776e37a..3bcc07e6cb 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -143,10 +143,10 @@ struct shm_mq_handle
 };
 
 static void shm_mq_detach_internal(shm_mq *mq);
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                   const void *data, bool nowait, Size *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
-                  bool nowait, Size *nbytesp, void **datap);
+static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
+                  Size bytes_needed, bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
                          BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
@@ -586,8 +586,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
         mqh->mqh_counterparty_attached = true;
     }
 
-    /* Consume any zero-copy data from previous receive operation. */
-    if (mqh->mqh_consume_pending > 0)
+    /*
+     * If we've consumed an amount of data greater than 1/4th of the ring
+     * size, mark it consumed in shared memory.  We try to avoid doing this
+     * unnecessarily when only a small amount of data has been consumed,
+     * because SetLatch() is fairly expensive and we don't want to do it too
+     * often.
+     */
+    if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
     {
         shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
         mqh->mqh_consume_pending = 0;
@@ -598,7 +604,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
     {
         /* Try to receive the message length word. */
         Assert(mqh->mqh_partial_bytes < sizeof(Size));
-        res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+        res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                    nowait, &rb, &rawdata);
         if (res != SHM_MQ_SUCCESS)
             return res;
@@ -618,13 +624,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
             needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
             if (rb >= needed)
             {
-                /*
-                 * Technically, we could consume the message length
-                 * information at this point, but the extra write to shared
-                 * memory wouldn't be free and in most cases we would reap no
-                 * benefit.
-                 */
-                mqh->mqh_consume_pending = needed;
+                mqh->mqh_consume_pending += needed;
                 *nbytesp = nbytes;
                 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                 return SHM_MQ_SUCCESS;
@@ -636,7 +636,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
              */
             mqh->mqh_expected_bytes = nbytes;
             mqh->mqh_length_word_complete = true;
-            shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+            mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
             rb -= MAXALIGN(sizeof(Size));
         }
         else
@@ -655,7 +655,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
             }
             Assert(mqh->mqh_buflen >= sizeof(Size));
 
-            /* Copy and consume partial length word. */
+            /* Copy partial length word; remember to consume it. */
             if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
             else
@@ -663,7 +663,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
             memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                    lengthbytes);
             mqh->mqh_partial_bytes += lengthbytes;
-            shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+            mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
             rb -= lengthbytes;
 
             /* If we now have the whole word, we're ready to read payload. */
@@ -685,13 +685,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
      * we need not copy the data and can return a pointer directly into
      * shared memory.
      */
-    res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+    res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
     if (res != SHM_MQ_SUCCESS)
         return res;
     if (rb >= nbytes)
     {
         mqh->mqh_length_word_complete = false;
-        mqh->mqh_consume_pending = MAXALIGN(nbytes);
+        mqh->mqh_consume_pending += MAXALIGN(nbytes);
         *nbytesp = nbytes;
         *datap = rawdata;
         return SHM_MQ_SUCCESS;
@@ -731,13 +731,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
         mqh->mqh_partial_bytes += rb;
 
         /*
-         * Update count of bytes read, with alignment padding.  Note that this
-         * will never actually insert any padding except at the end of a
-         * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
-         * and each read and write is as well.
+         * Update count of bytes that can be consumed, accounting for
+         * alignment padding.  Note that this will never actually insert any
+         * padding except at the end of a message, because the buffer size is
+         * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
          */
         Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
-        shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
+        mqh->mqh_consume_pending += MAXALIGN(rb);
 
         /* If we got all the data, exit the loop. */
         if (mqh->mqh_partial_bytes >= nbytes)
@@ -745,7 +745,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
         /* Wait for some more data. */
         still_needed = nbytes - mqh->mqh_partial_bytes;
-        res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+        res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
         if (res != SHM_MQ_SUCCESS)
             return res;
         if (rb > still_needed)
@@ -1010,9 +1010,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
  * is SHM_MQ_SUCCESS.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                      Size *nbytesp, void **datap)
 {
+    shm_mq     *mq = mqh->mqh_queue;
     Size        ringsize = mq->mq_ring_size;
     uint64      used;
     uint64      written;
@@ -1024,7 +1025,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 
         /* Get bytes written, so we can compute what's available to read. */
         written = pg_atomic_read_u64(&mq->mq_bytes_written);
-        read = pg_atomic_read_u64(&mq->mq_bytes_read);
+
+        /*
+         * Get bytes read.  Include bytes we could consume but have not yet
+         * consumed.
+         */
+        read = pg_atomic_read_u64(&mq->mq_bytes_read) +
+            mqh->mqh_consume_pending;
         used = written - read;
         Assert(used <= ringsize);
         offset = read % (uint64) ringsize;
@@ -1055,6 +1062,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
         if (mq->mq_detached)
             return SHM_MQ_DETACHED;
 
+        /*
+         * We didn't get enough data to satisfy the request, so mark any data
+         * previously consumed as read to make more buffer space.
+         */
+        if (mqh->mqh_consume_pending > 0)
+        {
+            shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+            mqh->mqh_consume_pending = 0;
+        }
+
         /* Skip manipulation of our latch if nowait = true. */
         if (nowait)
             return SHM_MQ_WOULD_BLOCK;
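
For readers skimming the patch: the change works because advancing the shared mq_bytes_read counter is the step that can SetLatch() the sender, so the receiver now accumulates acknowledgements in mqh_consume_pending and publishes them only once more than a quarter of the ring has been consumed, or when it runs short of data. The standalone sketch below models that batching policy outside PostgreSQL; every name in it (toy_ring, toy_reader, toy_consume, toy_publish_read) is invented for illustration and the constants are arbitrary.

/*
 * Illustrative sketch only -- not PostgreSQL code.  The reader tracks
 * locally how many bytes it has logically consumed (consume_pending) and
 * only publishes that total to the shared read counter -- the operation
 * that would wake the sender -- once more than a quarter of the ring has
 * accumulated, or when it must free space.
 */
#include <stdio.h>
#include <stdint.h>

typedef struct toy_ring
{
    uint64_t    ring_size;      /* capacity of the ring buffer */
    uint64_t    bytes_written;  /* total bytes the sender has written */
    uint64_t    bytes_read;     /* total bytes published as read */
} toy_ring;

typedef struct toy_reader
{
    toy_ring   *ring;
    uint64_t    consume_pending;    /* consumed locally, not yet published */
    int         notifications;      /* how often the sender would be woken */
} toy_reader;

/*
 * Publish pending consumption.  In the real queue this corresponds to the
 * point where shared memory is updated and the sender may be woken.
 */
static void
toy_publish_read(toy_reader *r)
{
    if (r->consume_pending > 0)
    {
        r->ring->bytes_read += r->consume_pending;
        r->consume_pending = 0;
        r->notifications++;         /* stands in for waking the sender */
    }
}

/* Consume nbytes, deferring the shared update until a big enough batch. */
static void
toy_consume(toy_reader *r, uint64_t nbytes)
{
    r->consume_pending += nbytes;
    if (r->consume_pending > r->ring->ring_size / 4)
        toy_publish_read(r);
}

int
main(void)
{
    toy_ring    ring = {.ring_size = 65536};
    toy_reader  reader = {.ring = &ring};
    int         i;

    /* Simulate receiving 1000 small (256-byte) messages. */
    for (i = 0; i < 1000; i++)
    {
        ring.bytes_written += 256;
        toy_consume(&reader, 256);
    }
    toy_publish_read(&reader);      /* flush whatever is still pending */

    printf("published read position %d times for 1000 messages\n",
           reader.notifications);
    return 0;
}

Built with any C99 compiler, the loop above publishes the read position on the order of 16 times for 1000 small messages, whereas publishing after every message, as the pre-patch receive path effectively did, would do so about 1000 times.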