Skip to content
Snippets Groups Projects
Commit ec9037df authored by Robert Haas's avatar Robert Haas
Browse files

Single-reader, single-writer, lightweight shared message queue.

This code provides infrastructure for user backends to communicate
relatively easily with background workers.  The message queue is
structured as a ring buffer and allows messages of arbitary length
to be sent and received.

Patch by me.  Review by KaiGai Kohei and Andres Freund.
parent 6ddd5137
Branches
Tags
No related merge requests found
......@@ -16,6 +16,6 @@ endif
endif
OBJS = dsm_impl.o dsm.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o \
shmem.o shmqueue.o shm_toc.o sinval.o sinvaladt.o standby.o
shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o sinvaladt.o standby.o
include $(top_srcdir)/src/backend/common.mk
/*-------------------------------------------------------------------------
*
* shm_mq.c
* single-reader, single-writer shared memory message queue
*
* Both the sender and the receiver must have a PGPROC; their respective
* process latches are used for synchronization. Only the sender may send,
* and only the receiver may receive. This is intended to allow a user
* backend to communicate with worker backends that it has registered.
*
* Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/shm_mq.h
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
/*
* This structure represents the actual queue, stored in shared memory.
*
* Some notes on synchronization:
*
* mq_receiver and mq_bytes_read can only be changed by the receiver; and
* mq_sender and mq_bytes_written can only be changed by the sender. However,
* because most of these fields are 8 bytes and we don't assume that 8 byte
* reads and writes are atomic, the spinlock must be taken whenever the field
* is updated, and whenever it is read by a process other than the one allowed
* to modify it. But the process that is allowed to modify it is also allowed
* to read it without the lock. On architectures where 8-byte writes are
* atomic, we could replace these spinlocks with memory barriers, but
* testing found no performance benefit, so it seems best to keep things
* simple for now.
*
* mq_detached can be set by either the sender or the receiver, so the mutex
* must be held to read or write it. Memory barriers could be used here as
* well, if needed.
*
* mq_ring_size and mq_ring_offset never change after initialization, and
* can therefore be read without the lock.
*
* Importantly, mq_ring can be safely read and written without a lock. Were
* this not the case, we'd have to hold the spinlock for much longer
* intervals, and performance might suffer. Fortunately, that's not
* necessary. At any given time, the difference between mq_bytes_read and
* mq_bytes_written defines the number of bytes within mq_ring that contain
* unread data, and mq_bytes_read defines the position where those bytes
* begin. The sender can increase the number of unread bytes at any time,
* but only the receiver can give license to overwrite those bytes, by
* incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
* the unread bytes it knows to be present without the lock. Conversely,
* the sender can write to the unused portion of the ring buffer without
* the lock, because nobody else can be reading or writing those bytes. The
* receiver could be making more bytes unused by incrementing mq_bytes_read,
* but that's OK. Note that it would be unsafe for the receiver to read any
* data it's already marked as read, or to write any data; and it would be
* unsafe for the sender to reread any data after incrementing
* mq_bytes_written, but fortunately there's no need for any of that.
*/
struct shm_mq
{
slock_t mq_mutex;
PGPROC *mq_receiver;
PGPROC *mq_sender;
uint64 mq_bytes_read;
uint64 mq_bytes_written;
uint64 mq_ring_size;
bool mq_detached;
uint8 mq_ring_offset;
char mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
/*
* This structure is a backend-private handle for access to a queue.
*
* mqh_queue is a pointer to the queue we've attached, and mqh_segment is
* a pointer to the dynamic shared memory segment that contains it.
*
* If this queue is intended to connect the current process with a background
* worker that started it, the user can pass a pointer to the worker handle
* to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
* is to allow us to begin sending to or receiving from that queue before the
* process we'll be communicating with has even been started. If it fails
* to start, the handle will allow us to notice that and fail cleanly, rather
* than waiting forever; see shm_mq_wait_internal. This is mostly useful in
* simple cases - e.g. where there are just 2 processes communicating; in
* more complex scenarios, every process may not have a BackgroundWorkerHandle
* available, or may need to watch for the failure of more than one other
* process at a time.
*
* When a message exists as a contiguous chunk of bytes in the queue - that is,
* it is smaller than the size of the ring buffer and does not wrap around
* the end - we return the message to the caller as a pointer into the buffer.
* For messages that are larger or happen to wrap, we reassemble the message
* locally by copying the chunks into a backend-local buffer. mqh_buffer is
* the buffer, and mqh_buflen is the number of bytes allocated for it.
*
* mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word
* are used to track the state of non-blocking operations. When the caller
* attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
* are expected to retry the call at a later time with the same argument;
* we need to retain enough state to pick up where we left off.
* mqh_did_length_word tracks whether we read or wrote the length word,
* mqh_partial_message_bytes tracks the number of payload bytes read or
* written, and mqh_expected_bytes - which is used only for reads - tracks
* the expected total size of the payload.
*
* mqh_counterparty_attached tracks whether we know the counterparty to have
* attached to the queue at some previous point. This lets us avoid some
* mutex acquisitions.
*
* mqh_context is the memory context in effect at the time we attached to
* the shm_mq. The shm_mq_handle itself is allocated in this context, and
* we make sure any other allocations we do happen in this context as well,
* to avoid nasty surprises.
*/
struct shm_mq_handle
{
shm_mq *mqh_queue;
dsm_segment *mqh_segment;
BackgroundWorkerHandle *mqh_handle;
char *mqh_buffer;
uint64 mqh_buflen;
uint64 mqh_consume_pending;
uint64 mqh_partial_message_bytes;
uint64 mqh_expected_bytes;
bool mqh_did_length_word;
bool mqh_counterparty_attached;
MemoryContext mqh_context;
};
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
void *data, bool nowait, uint64 *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
bool nowait, uint64 *nbytesp, void **datap);
static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
BackgroundWorkerHandle *handle);
static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
/* Minimum queue size is enough for header and at least one chunk of data. */
const Size shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
#define MQH_INITIAL_BUFSIZE 8192
/*
* Initialize a new shared message queue.
*/
shm_mq *
shm_mq_create(void *address, Size size)
{
shm_mq *mq = address;
uint64 data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
size = MAXALIGN_DOWN(size);
/* Queue size must be large enough to hold some data. */
Assert(size > data_offset);
/* Initialize queue header. */
SpinLockInit(&mq->mq_mutex);
mq->mq_receiver = NULL;
mq->mq_sender = NULL;
mq->mq_bytes_read = 0;
mq->mq_bytes_written = 0;
mq->mq_ring_size = size - data_offset;
mq->mq_detached = false;
mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
return mq;
}
/*
* Set the identity of the process that will receive from a shared message
* queue.
*/
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
volatile shm_mq *vmq = mq;
PGPROC *sender;
SpinLockAcquire(&mq->mq_mutex);
Assert(vmq->mq_receiver == NULL);
vmq->mq_receiver = proc;
sender = vmq->mq_sender;
SpinLockRelease(&mq->mq_mutex);
if (sender != NULL)
SetLatch(&sender->procLatch);
}
/*
* Set the identity of the process that will send to a shared message queue.
*/
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
volatile shm_mq *vmq = mq;
PGPROC *receiver;
SpinLockAcquire(&mq->mq_mutex);
Assert(vmq->mq_sender == NULL);
vmq->mq_sender = proc;
receiver = vmq->mq_receiver;
SpinLockRelease(&mq->mq_mutex);
if (receiver != NULL)
SetLatch(&receiver->procLatch);
}
/*
* Get the configured receiver.
*/
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
volatile shm_mq *vmq = mq;
PGPROC *receiver;
SpinLockAcquire(&mq->mq_mutex);
receiver = vmq->mq_receiver;
SpinLockRelease(&mq->mq_mutex);
return receiver;
}
/*
* Get the configured sender.
*/
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
volatile shm_mq *vmq = mq;
PGPROC *sender;
SpinLockAcquire(&mq->mq_mutex);
sender = vmq->mq_sender;
SpinLockRelease(&mq->mq_mutex);
return sender;
}
/*
* Attach to a shared message queue so we can send or receive messages.
*
* The memory context in effect at the time this function is called should
* be one which will last for at least as long as the message queue itself.
* We'll allocate the handle in that context, and future allocations that
* are needed to buffer incoming data will happen in that context as well.
*
* If seg != NULL, the queue will be automatically detached when that dynamic
* shared memory segment is detached.
*
* If handle != NULL, the queue can be read or written even before the
* other process has attached. We'll wait for it to do so if needed. The
* handle must be for a background worker initialized with bgw_notify_pid
* equal to our PID.
*
* shm_mq_detach() should be called when done. This will free the
* shm_mq_handle and mark the queue itself as detached, so that our
* counterpart won't get stuck waiting for us to fill or drain the queue
* after we've already lost interest.
*/
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
mqh->mqh_queue = mq;
mqh->mqh_segment = seg;
mqh->mqh_buffer = NULL;
mqh->mqh_handle = handle;
mqh->mqh_buflen = 0;
mqh->mqh_consume_pending = 0;
mqh->mqh_context = CurrentMemoryContext;
mqh->mqh_partial_message_bytes = 0;
mqh->mqh_did_length_word = false;
mqh->mqh_counterparty_attached = false;
if (seg != NULL)
on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
return mqh;
}
/*
* Write a message into a shared message queue.
*
* When nowait = false, we'll wait on our process latch when the ring buffer
* fills up, and then continue writing once the receiver has drained some data.
* The process latch is reset after each wait.
*
* When nowait = true, we do not manipulate the state of the process latch;
* instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
* this case, the caller should call this function again, with the same
* arguments, each time the process latch is set. (Once begun, the sending
* of a message cannot be aborted except by detaching from the queue; changing
* the length or payload will corrupt the queue.)
*/
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
{
shm_mq_result res;
shm_mq *mq = mqh->mqh_queue;
uint64 bytes_written;
Assert(mq->mq_sender == MyProc);
/* Write the message length into the buffer. */
if (!mqh->mqh_did_length_word)
{
res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
&bytes_written);
if (res != SHM_MQ_SUCCESS)
return res;
/*
* We're sure to have sent the length in full, since we always
* write a MAXALIGN'd chunk.
*/
Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
mqh->mqh_did_length_word = true;
}
/* Write the actual data bytes into the buffer. */
Assert(mqh->mqh_partial_message_bytes <= nbytes);
res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
((char *) data) + mqh->mqh_partial_message_bytes,
nowait, &bytes_written);
if (res == SHM_MQ_WOULD_BLOCK)
mqh->mqh_partial_message_bytes += bytes_written;
else
{
mqh->mqh_partial_message_bytes = 0;
mqh->mqh_did_length_word = false;
}
if (res != SHM_MQ_SUCCESS)
return res;
/* Notify receiver of the newly-written data, and return. */
return shm_mq_notify_receiver(mq);
}
/*
* Receive a message from a shared message queue.
*
* We set *nbytes to the message length and *data to point to the message
* payload. If the entire message exists in the queue as a single,
* contiguous chunk, *data will point directly into shared memory; otherwise,
* it will point to a temporary buffer. This mostly avoids data copying in
* the hoped-for case where messages are short compared to the buffer size,
* while still allowing longer messages. In either case, the return value
* remains valid until the next receive operation is perfomed on the queue.
*
* When nowait = false, we'll wait on our process latch when the ring buffer
* is empty and we have not yet received a full message. The sender will
* set our process latch after more data has been written, and we'll resume
* processing. Each call will therefore return a complete message
* (unless the sender detaches the queue).
*
* When nowait = true, we do not manipulate the state of the process latch;
* instead, whenever the buffer is empty and we need to read from it, we
* return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
* function again after the process latch has been set.
*/
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
{
shm_mq *mq = mqh->mqh_queue;
shm_mq_result res;
uint64 rb = 0;
uint64 nbytes;
uint64 needed;
void *rawdata;
Assert(mq->mq_receiver == MyProc);
/* We can't receive data until the sender has attached. */
if (!mqh->mqh_counterparty_attached)
{
if (nowait)
{
if (shm_mq_get_sender(mq) == NULL)
return SHM_MQ_WOULD_BLOCK;
}
else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle))
{
mq->mq_detached = true;
return SHM_MQ_DETACHED;
}
mqh->mqh_counterparty_attached = true;
}
/* Consume any zero-copy data from previous receive operation. */
if (mqh->mqh_consume_pending > 0)
{
shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
mqh->mqh_consume_pending = 0;
}
/* Determine the message length. */
if (mqh->mqh_did_length_word)
{
/* We've partially received a message; recall expected length. */
nbytes = mqh->mqh_expected_bytes;
}
else
{
/* Try to receive the message length word. */
res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
Assert(rb >= sizeof(uint64));
memcpy(&nbytes, rawdata, sizeof(uint64));
mqh->mqh_expected_bytes = nbytes;
/* If we've already got the whole message, we're done. */
needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
if (rb >= needed)
{
/*
* Technically, we could consume the message length information at
* this point, but the extra write to shared memory wouldn't be
* free and in most cases we would reap no benefit.
*/
mqh->mqh_consume_pending = needed;
*nbytesp = nbytes;
*datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
return SHM_MQ_SUCCESS;
}
/* Consume the length word. */
shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
mqh->mqh_did_length_word = true;
rb -= MAXALIGN64(sizeof(uint64));
}
if (mqh->mqh_partial_message_bytes == 0)
{
/*
* Try to obtain the whole message in a single chunk. If this works,
* we need not copy the data and can return a pointer directly into
* shared memory.
*/
res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
if (rb >= nbytes)
{
mqh->mqh_did_length_word = false;
mqh->mqh_consume_pending = MAXALIGN64(nbytes);
*nbytesp = nbytes;
*datap = rawdata;
return SHM_MQ_SUCCESS;
}
/*
* The message has wrapped the buffer. We'll need to copy it in order
* to return it to the client in one chunk. First, make sure we have a
* large enough buffer available.
*/
if (mqh->mqh_buflen < nbytes)
{
uint64 newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
while (newbuflen < nbytes)
newbuflen *= 2;
if (mqh->mqh_buffer != NULL)
{
pfree(mqh->mqh_buffer);
mqh->mqh_buffer = NULL;
mqh->mqh_buflen = 0;
}
mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
mqh->mqh_buflen = newbuflen;
}
}
/* Loop until we've copied the entire message. */
for (;;)
{
uint64 still_needed;
/* Copy as much as we can. */
Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
mqh->mqh_partial_message_bytes += rb;
/*
* Update count of bytes read, with alignment padding. Note
* that this will never actually insert any padding except at the
* end of a message, because the buffer size is a multiple of
* MAXIMUM_ALIGNOF, and each read and write is as well.
*/
Assert(mqh->mqh_partial_message_bytes == nbytes ||
rb == MAXALIGN64(rb));
shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
/* If we got all the data, exit the loop. */
if (mqh->mqh_partial_message_bytes >= nbytes)
break;
/* Wait for some more data. */
still_needed = nbytes - mqh->mqh_partial_message_bytes;
res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
if (res != SHM_MQ_SUCCESS)
return res;
if (rb > still_needed)
rb = still_needed;
}
/* Return the complete message, and reset for next message. */
*nbytesp = nbytes;
*datap = mqh->mqh_buffer;
mqh->mqh_did_length_word = false;
mqh->mqh_partial_message_bytes = 0;
return SHM_MQ_SUCCESS;
}
/*
* Wait for the other process that's supposed to use this queue to attach
* to it.
*
* The return value is SHM_MQ_DETACHED if the worker has already detached or
* if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
* Note that we will only be able to detect that the worker has died before
* attaching if a background worker handle was passed to shm_mq_attach().
*/
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
shm_mq *mq = mqh->mqh_queue;
PGPROC **victim;
if (shm_mq_get_receiver(mq) == MyProc)
victim = &mq->mq_sender;
else
{
Assert(shm_mq_get_sender(mq) == MyProc);
victim = &mq->mq_receiver;
}
if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
return SHM_MQ_SUCCESS;
else
return SHM_MQ_DETACHED;
}
/*
* Detach a shared message queue.
*
* The purpose of this function is to make sure that the process
* with which we're communicating doesn't block forever waiting for us to
* fill or drain the queue once we've lost interest. Whem the sender
* detaches, the receiver can read any messages remaining in the queue;
* further reads will return SHM_MQ_DETACHED. If the receiver detaches,
* further attempts to send messages will likewise return SHM_MQ_DETACHED.
*/
void
shm_mq_detach(shm_mq *mq)
{
volatile shm_mq *vmq = mq;
PGPROC *victim;
SpinLockAcquire(&mq->mq_mutex);
if (vmq->mq_sender == MyProc)
victim = vmq->mq_receiver;
else
{
Assert(vmq->mq_receiver == MyProc);
victim = vmq->mq_sender;
}
vmq->mq_detached = true;
SpinLockRelease(&mq->mq_mutex);
if (victim != NULL)
SetLatch(&victim->procLatch);
}
/*
* Write bytes into a shared message queue.
*/
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
uint64 *bytes_written)
{
shm_mq *mq = mqh->mqh_queue;
uint64 sent = 0;
uint64 used;
uint64 ringsize = mq->mq_ring_size;
uint64 available;
while (sent < nbytes)
{
bool detached;
uint64 rb;
/* Compute number of ring buffer bytes used and available. */
rb = shm_mq_get_bytes_read(mq, &detached);
Assert(mq->mq_bytes_written >= rb);
used = mq->mq_bytes_written - rb;
Assert(used <= ringsize);
available = Min(ringsize - used, nbytes - sent);
/* Bail out if the queue has been detached. */
if (detached)
return SHM_MQ_DETACHED;
if (available == 0)
{
shm_mq_result res;
/*
* The queue is full, so if the receiver isn't yet known to be
* attached, we must wait for that to happen.
*/
if (!mqh->mqh_counterparty_attached)
{
if (nowait)
{
if (shm_mq_get_receiver(mq) == NULL)
return SHM_MQ_WOULD_BLOCK;
}
else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
mqh->mqh_handle))
{
mq->mq_detached = true;
return SHM_MQ_DETACHED;
}
mqh->mqh_counterparty_attached = true;
}
/* Let the receiver know that we need them to read some data. */
res = shm_mq_notify_receiver(mq);
if (res != SHM_MQ_SUCCESS)
{
*bytes_written = res;
return res;
}
/* Skip manipulation of our latch if nowait = true. */
if (nowait)
{
*bytes_written = sent;
return SHM_MQ_WOULD_BLOCK;
}
/*
* Wait for our latch to be set. It might already be set for
* some unrelated reason, but that'll just result in one extra
* trip through the loop. It's worth it to avoid resetting the
* latch at top of loop, because setting an already-set latch is
* much cheaper than setting one that has been reset.
*/
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
/* An interrupt may have occurred while we were waiting. */
CHECK_FOR_INTERRUPTS();
/* Reset the latch so we don't spin. */
ResetLatch(&MyProc->procLatch);
}
else
{
uint64 offset = mq->mq_bytes_written % ringsize;
uint64 sendnow = Min(available, ringsize - offset);
/* Write as much data as we can via a single memcpy(). */
memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
(char *) data + sent, sendnow);
sent += sendnow;
/*
* Update count of bytes written, with alignment padding. Note
* that this will never actually insert any padding except at the
* end of a run of bytes, because the buffer size is a multiple of
* MAXIMUM_ALIGNOF, and each read is as well.
*/
Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
/*
* For efficiency, we don't set the reader's latch here. We'll
* do that only when the buffer fills up or after writing an
* entire message.
*/
}
}
*bytes_written = sent;
return SHM_MQ_SUCCESS;
}
/*
* Wait until at least *nbytesp bytes are available to be read from the
* shared message queue, or until the buffer wraps around. On return,
* *datap is set to the location at which data bytes can be read. The
* return value is the number of bytes available to be read starting at
* that offset; if the message has wrapped the buffer, it may be less than
* bytes_needed.
*/
static shm_mq_result
shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
uint64 *nbytesp, void **datap)
{
uint64 used;
uint64 ringsize = mq->mq_ring_size;
uint64 written;
for (;;)
{
uint64 offset;
bool detached;
/* Get bytes written, so we can compute what's available to read. */
written = shm_mq_get_bytes_written(mq, &detached);
used = written - mq->mq_bytes_read;
Assert(used <= ringsize);
offset = mq->mq_bytes_read % ringsize;
/* If we have enough data or buffer has wrapped, we're done. */
if (used >= bytes_needed || offset + used >= ringsize)
{
*nbytesp = Min(used, ringsize - offset);
*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
return SHM_MQ_SUCCESS;
}
/*
* Fall out before waiting if the queue has been detached.
*
* Note that we don't check for this until *after* considering
* whether the data already available is enough, since the
* receiver can finish receiving a message stored in the buffer
* even after the sender has detached.
*/
if (detached)
return SHM_MQ_DETACHED;
/* Skip manipulation of our latch if nowait = true. */
if (nowait)
return SHM_MQ_WOULD_BLOCK;
/*
* Wait for our latch to be set. It might already be set for
* some unrelated reason, but that'll just result in one extra
* trip through the loop. It's worth it to avoid resetting the
* latch at top of loop, because setting an already-set latch is
* much cheaper than setting one that has been reset.
*/
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
/* An interrupt may have occurred while we were waiting. */
CHECK_FOR_INTERRUPTS();
/* Reset the latch so we don't spin. */
ResetLatch(&MyProc->procLatch);
}
}
/*
* This is used when a process is waiting for its counterpart to attach to the
* queue. We exit when the other process attaches as expected, or, if
* handle != NULL, when the referenced background process or the postmaster
* dies. Note that if handle == NULL, and the process fails to attach, we'll
* potentially get stuck here forever waiting for a process that may never
* start. We do check for interrupts, though.
*
* ptr is a pointer to the memory address that we're expecting to become
* non-NULL when our counterpart attaches to the queue.
*/
static bool
shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
BackgroundWorkerHandle *handle)
{
bool save_set_latch_on_sigusr1;
bool result = false;
save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
if (handle != NULL)
set_latch_on_sigusr1 = true;
PG_TRY();
{
for (;;)
{
BgwHandleStatus status;
pid_t pid;
bool detached;
/* Acquire the lock just long enough to check the pointer. */
SpinLockAcquire(&mq->mq_mutex);
detached = mq->mq_detached;
result = (*ptr != NULL);
SpinLockRelease(&mq->mq_mutex);
/* Fail if detached; else succeed if initialized. */
if (detached)
{
result = false;
break;
}
if (result)
break;
if (handle != NULL)
{
/* Check for unexpected worker death. */
status = GetBackgroundWorkerPid(handle, &pid);
if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
{
result = false;
break;
}
}
/* Wait to be signalled. */
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
/* An interrupt may have occurred while we were waiting. */
CHECK_FOR_INTERRUPTS();
/* Reset the latch so we don't spin. */
ResetLatch(&MyProc->procLatch);
}
}
PG_CATCH();
{
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
PG_RE_THROW();
}
PG_END_TRY();
return result;
}
/*
* Get the number of bytes read. The receiver need not use this to access
* the count of bytes read, but the sender must.
*/
static uint64
shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
{
uint64 v;
SpinLockAcquire(&mq->mq_mutex);
v = mq->mq_bytes_read;
*detached = mq->mq_detached;
SpinLockRelease(&mq->mq_mutex);
return v;
}
/*
* Increment the number of bytes read.
*/
static void
shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
{
PGPROC *sender;
SpinLockAcquire(&mq->mq_mutex);
mq->mq_bytes_read += n;
sender = mq->mq_sender;
SpinLockRelease(&mq->mq_mutex);
/* We shoudn't have any bytes to read without a sender. */
Assert(sender != NULL);
SetLatch(&sender->procLatch);
}
/*
* Get the number of bytes written. The sender need not use this to access
* the count of bytes written, but the reciever must.
*/
static uint64
shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
{
uint64 v;
SpinLockAcquire(&mq->mq_mutex);
v = mq->mq_bytes_written;
*detached = mq->mq_detached;
SpinLockRelease(&mq->mq_mutex);
return v;
}
/*
* Increment the number of bytes written.
*/
static void
shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
{
SpinLockAcquire(&mq->mq_mutex);
mq->mq_bytes_written += n;
SpinLockRelease(&mq->mq_mutex);
}
/*
* Set sender's latch, unless queue is detached.
*/
static shm_mq_result
shm_mq_notify_receiver(volatile shm_mq *mq)
{
PGPROC *receiver;
bool detached;
SpinLockAcquire(&mq->mq_mutex);
detached = mq->mq_detached;
receiver = mq->mq_receiver;
SpinLockRelease(&mq->mq_mutex);
if (detached)
return SHM_MQ_DETACHED;
if (receiver)
SetLatch(&receiver->procLatch);
return SHM_MQ_SUCCESS;
}
/* Shim for on_dsm_callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
shm_mq_detach(mq);
}
/*-------------------------------------------------------------------------
*
* shm_mq.h
* single-reader, single-writer shared memory message queue
*
* Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/shm_mq.h
*
*-------------------------------------------------------------------------
*/
#ifndef SHM_MQ_H
#define SHM_MQ_H
#include "postmaster/bgworker.h"
#include "storage/dsm.h"
#include "storage/proc.h"
/* The queue itself, in shared memory. */
struct shm_mq;
typedef struct shm_mq shm_mq;
/* Backend-private state. */
struct shm_mq_handle;
typedef struct shm_mq_handle shm_mq_handle;
/* Possible results of a send or receive operation. */
typedef enum
{
SHM_MQ_SUCCESS, /* Sent or received a message. */
SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
SHM_MQ_DETACHED /* Other process has detached queue. */
} shm_mq_result;
/*
* Primitives to create a queue and set the sender and receiver.
*
* Both the sender and the receiver must be set before any messages are read
* or written, but they need not be set by the same process. Each must be
* set exactly once.
*/
extern shm_mq *shm_mq_create(void *address, Size size);
extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
/* Accessor methods for sender and receiver. */
extern PGPROC *shm_mq_get_receiver(shm_mq *);
extern PGPROC *shm_mq_get_sender(shm_mq *);
/* Set up backend-local queue state. */
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
BackgroundWorkerHandle *handle);
/* Break connection. */
extern void shm_mq_detach(shm_mq *);
/* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
uint64 nbytes, void *data, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
uint64 *nbytesp, void **datap, bool nowait);
/* Wait for our counterparty to attach to the queue. */
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
/* Smallest possible queue. */
extern const Size shm_mq_minimum_size;
#endif /* SHM_MQ_H */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment