};
static void shm_mq_detach_internal(shm_mq *mq);
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
const void *data, bool nowait, Size *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
- bool nowait, Size *nbytesp, void **datap);
+static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
+ Size bytes_needed, bool nowait, Size *nbytesp, void **datap);
static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
mqh->mqh_counterparty_attached = true;
}
- /* Consume any zero-copy data from previous receive operation. */
- if (mqh->mqh_consume_pending > 0)
+ /*
+ * If we've consumed an amount of data greater than 1/4 of the ring
+ * size, mark it consumed in shared memory. We try to avoid doing this
+ * unnecessarily when only a small amount of data has been consumed,
+ * because SetLatch() is fairly expensive and we don't want to do it too
+ * often.
+ */
+ if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
{
shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
mqh->mqh_consume_pending = 0;
{
/* Try to receive the message length word. */
Assert(mqh->mqh_partial_bytes < sizeof(Size));
- res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+ res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
if (rb >= needed)
{
- /*
- * Technically, we could consume the message length
- * information at this point, but the extra write to shared
- * memory wouldn't be free and in most cases we would reap no
- * benefit.
- */
- mqh->mqh_consume_pending = needed;
+ mqh->mqh_consume_pending += needed;
*nbytesp = nbytes;
*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
return SHM_MQ_SUCCESS;
*/
mqh->mqh_expected_bytes = nbytes;
mqh->mqh_length_word_complete = true;
- shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+ mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
rb -= MAXALIGN(sizeof(Size));
}
else
}
Assert(mqh->mqh_buflen >= sizeof(Size));
- /* Copy and consume partial length word. */
+ /* Copy partial length word; remember to consume it. */
if (mqh->mqh_partial_bytes + rb > sizeof(Size))
lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
else
memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
lengthbytes);
mqh->mqh_partial_bytes += lengthbytes;
- shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+ mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
rb -= lengthbytes;
/* If we now have the whole word, we're ready to read payload. */
* we need not copy the data and can return a pointer directly into
* shared memory.
*/
- res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+ res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
if (rb >= nbytes)
{
mqh->mqh_length_word_complete = false;
- mqh->mqh_consume_pending = MAXALIGN(nbytes);
+ mqh->mqh_consume_pending += MAXALIGN(nbytes);
*nbytesp = nbytes;
*datap = rawdata;
return SHM_MQ_SUCCESS;
mqh->mqh_partial_bytes += rb;
/*
- * Update count of bytes read, with alignment padding. Note that this
- * will never actually insert any padding except at the end of a
- * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
- * and each read and write is as well.
+ * Update count of bytes that can be consumed, accounting for
+ * alignment padding. Note that this will never actually insert any
+ * padding except at the end of a message, because the buffer size is
+ * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
*/
Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
- shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
+ mqh->mqh_consume_pending += MAXALIGN(rb);
/* If we got all the data, exit the loop. */
if (mqh->mqh_partial_bytes >= nbytes)
/* Wait for some more data. */
still_needed = nbytes - mqh->mqh_partial_bytes;
- res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+ res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
if (rb > still_needed)
* is SHM_MQ_SUCCESS.
*/
static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
Size *nbytesp, void **datap)
{
+ shm_mq *mq = mqh->mqh_queue;
Size ringsize = mq->mq_ring_size;
uint64 used;
uint64 written;
/* Get bytes written, so we can compute what's available to read. */
written = pg_atomic_read_u64(&mq->mq_bytes_written);
- read = pg_atomic_read_u64(&mq->mq_bytes_read);
+
+ /*
+ * Get bytes read. Include bytes we could consume but have not yet
+ * consumed.
+ */
+ read = pg_atomic_read_u64(&mq->mq_bytes_read) +
+ mqh->mqh_consume_pending;
used = written - read;
Assert(used <= ringsize);
offset = read % (uint64) ringsize;
if (mq->mq_detached)
return SHM_MQ_DETACHED;
+ /*
+ * We didn't get enough data to satisfy the request, so mark any
+ * previously consumed data as read to make more buffer space.
+ */
+ if (mqh->mqh_consume_pending > 0)
+ {
+ shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+ mqh->mqh_consume_pending = 0;
+ }
+
/* Skip manipulation of our latch if nowait = true. */
if (nowait)
return SHM_MQ_WOULD_BLOCK;