* locally by copying the chunks into a backend-local buffer. mqh_buffer is
* the buffer, and mqh_buflen is the number of bytes allocated for it.
*
+ * mqh_send_pending is the number of bytes that have been written to the
+ * queue but not yet reflected in the shared counter mq_bytes_written. We
+ * don't update the shared counter until the pending bytes exceed 1/4 of
+ * the ring size or the queue is full; with a 64kB ring, for example, the
+ * counter is typically updated only once per 16kB written. This batching
+ * prevents frequent CPU cache misses on the shared counter, and it also
+ * avoids frequent SetLatch() calls, which are quite expensive.
+ *
* mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
* are used to track the state of non-blocking operations. When the caller
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
char *mqh_buffer;
Size mqh_buflen;
Size mqh_consume_pending;
+ Size mqh_send_pending;
Size mqh_partial_bytes;
Size mqh_expected_bytes;
bool mqh_length_word_complete;
mqh->mqh_buffer = NULL;
mqh->mqh_buflen = 0;
mqh->mqh_consume_pending = 0;
+ mqh->mqh_send_pending = 0;
mqh->mqh_partial_bytes = 0;
mqh->mqh_expected_bytes = 0;
mqh->mqh_length_word_complete = false;
* Write a message into a shared message queue.
*/
shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+ bool force_flush)
{
shm_mq_iovec iov;
iov.data = data;
iov.len = nbytes;
- return shm_mq_sendv(mqh, &iov, 1, nowait);
+ return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
}
/*
* arguments, each time the process latch is set. (Once begun, the sending
* of a message cannot be aborted except by detaching from the queue; changing
* the length or payload will corrupt the queue.)
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver (if it is already attached). Otherwise, we don't
+ * update it until the pending bytes exceed 1/4 of the ring size.
*/
shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+ bool force_flush)
{
shm_mq_result res;
shm_mq *mq = mqh->mqh_queue;
mqh->mqh_counterparty_attached = true;
}
- /* Notify receiver of the newly-written data, and return. */
- SetLatch(&receiver->procLatch);
+ /*
+ * If the caller has requested a forced flush, or the pending bytes exceed
+ * 1/4 of the ring size, record them as written in shared memory and notify
+ * the receiver.
+ */
+ if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+ {
+ shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+ SetLatch(&receiver->procLatch);
+ mqh->mqh_send_pending = 0;
+ }
+
return SHM_MQ_SUCCESS;
}
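/*
 * Not part of the patch: a minimal caller-side sketch of the intended use of
 * the new force_flush argument.  Intermediate messages are sent with
 * force_flush = false, so mq_bytes_written is updated and the receiver's
 * latch is set at most once per 1/4 ring of data; only the final message
 * forces a flush so the receiver is certain to be woken.  The helper name
 * and the msgs/nmsgs parameters are hypothetical.
 */
static void
send_message_batch(shm_mq_handle *mqh, shm_mq_iovec *msgs, int nmsgs)
{
	for (int i = 0; i < nmsgs; i++)
	{
		/* Force a flush only for the last message in the batch. */
		shm_mq_result res = shm_mq_sendv(mqh, &msgs[i], 1, false,
										 i == nmsgs - 1);

		if (res != SHM_MQ_SUCCESS)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("could not send message")));
	}
}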
void
shm_mq_detach(shm_mq_handle *mqh)
{
+ /*
+ * Before detaching, flush any pending byte count to shared memory so the
+ * receiver can consume the already-written data; the counterparty's latch
+ * is set by shm_mq_detach_internal() just below.
+ */
+ if (mqh->mqh_send_pending > 0)
+ {
+ shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+ mqh->mqh_send_pending = 0;
+ }
+
/* Notify counterparty that we're outta here. */
shm_mq_detach_internal(mqh->mqh_queue);
/* Compute number of ring buffer bytes used and available. */
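+ /*
+ * Count our own unflushed bytes (mqh_send_pending) as used: they occupy
+ * ring space even though mq_bytes_written doesn't reflect them yet, and
+ * ignoring them here could let us overwrite our own unflushed data.
+ */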
rb = pg_atomic_read_u64(&mq->mq_bytes_read);
- wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+ wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
Assert(wb >= rb);
used = wb - rb;
Assert(used <= ringsize);
}
else if (available == 0)
{
+ /* Update the pending send bytes in the shared memory. */
+ shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
/*
* Since mq->mqh_counterparty_attached is known to be true at this
* point, mq_receiver has been set, and it can't change once set.
Assert(mqh->mqh_counterparty_attached);
SetLatch(&mq->mq_receiver->procLatch);
+ /*
+ * The pending send bytes have just been flushed to shared memory, so
+ * reset the local counter.
+ */
+ mqh->mqh_send_pending = 0;
+
/* Skip manipulation of our latch if nowait = true. */
if (nowait)
{
* MAXIMUM_ALIGNOF, and each read is as well.
*/
Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
- shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
/*
- * For efficiency, we don't set the reader's latch here. We'll do
- * that only when the buffer fills up or after writing an entire
- * message.
+ * For efficiency, we neither update mq_bytes_written in shared memory
+ * nor set the reader's latch here; see the comments atop the
+ * shm_mq_handle structure for details.
*/
+ mqh->mqh_send_pending += MAXALIGN(sendnow);
}
}
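Worked example (not in the patch): with a 64kB ring, MAXALIGN'd chunks
accumulate in mqh_send_pending until it exceeds 16kB, so a stream of
100-byte messages costs roughly one atomic update of mq_bytes_written and
one SetLatch() per ~160 messages, instead of one SetLatch() per message as
before.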
/* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
- Size nbytes, const void *data, bool nowait);
-extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
- shm_mq_iovec *iov, int iovcnt, bool nowait);
+ Size nbytes, const void *data, bool nowait,
+ bool force_flush);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
+ int iovcnt, bool nowait, bool force_flush);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
Size *nbytesp, void **datap, bool nowait);
+extern void shm_mq_flush(shm_mq_handle *mqh);
/* Wait for our counterparty to attach to the queue. */
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
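/*
 * The patch declares shm_mq_flush() above, but its definition falls outside
 * this excerpt.  A minimal sketch, assuming it mirrors the force_flush path
 * in shm_mq_sendv() (an assumption, not the patch's actual code):
 */
void
shm_mq_flush(shm_mq_handle *mqh)
{
	shm_mq	   *mq = mqh->mqh_queue;

	/* Flush any locally-tracked byte count to shared memory. */
	if (mqh->mqh_send_pending > 0)
	{
		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
		mqh->mqh_send_pending = 0;
	}

	/* Wake the receiver, if one has attached. */
	if (mqh->mqh_counterparty_attached)
		SetLatch(&mq->mq_receiver->procLatch);
}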
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
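+ /*
+ * This module's tests wait for each message to make progress through the
+ * queue before continuing, so every send passes force_flush = true;
+ * deferring the flush here would only delay the receiver.
+ */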
/* Send the initial message. */
- res = shm_mq_send(outqh, message_size, message_contents, false);
+ res = shm_mq_send(outqh, message_size, message_contents, false, true);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
break;
/* Send it back out. */
- res = shm_mq_send(outqh, len, data, false);
+ res = shm_mq_send(outqh, len, data, false, true);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
*/
if (send_count < loop_count)
{
- res = shm_mq_send(outqh, message_size, message_contents, true);
+ res = shm_mq_send(outqh, message_size, message_contents, true,
+ true);
if (res == SHM_MQ_SUCCESS)
{
++send_count;