From fad153ec45299bd4d4f29dec8d9e04e2f1c08148 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Thu, 19 Jun 2008 21:32:56 +0000
Subject: [PATCH] Rewrite the sinval messaging mechanism to reduce contention
 and avoid unnecessary cache resets. The major changes are:

* When the queue overflows, we only issue a cache reset to the specific
backend or backends that still haven't read the oldest message, rather
than resetting everyone as in the original coding.

* When we observe backend(s) falling well behind, we signal SIGUSR1 to
only one backend, the one that is furthest behind and doesn't already
have a signal outstanding for it. When it finishes catching up, it will
in turn signal SIGUSR1 to the next-furthest-back guy, if there is one
that is far enough behind to justify a signal. The
PMSIGNAL_WAKEN_CHILDREN mechanism is removed.

* We don't attempt to clean out dead messages after every
message-receipt operation; rather, we do it on the insertion side, and
only when the queue fullness passes certain thresholds.

* Split SInvalLock into SInvalReadLock and SInvalWriteLock so that
readers don't block writers nor vice versa (except during the infrequent
queue cleanout operations).

* Transfer multiple sinval messages for each acquisition of a read or
write lock.
--- src/backend/postmaster/postmaster.c | 12 +- src/backend/storage/ipc/sinval.c | 106 ++++--- src/backend/storage/ipc/sinvaladt.c | 469 +++++++++++++++++----------- src/backend/utils/cache/inval.c | 43 ++- src/include/storage/lwlock.h | 5 +- src/include/storage/pmsignal.h | 3 +- src/include/storage/sinval.h | 5 +- src/include/storage/sinvaladt.h | 14 +- 8 files changed, 413 insertions(+), 244 deletions(-) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 751bb8244d3..73d6dae56d8 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -37,7 +37,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.558 2008/06/06 22:35:22 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.559 2008/06/19 21:32:56 tgl Exp $ * * NOTES * @@ -3829,16 +3829,6 @@ sigusr1_handler(SIGNAL_ARGS) load_role(); } - if (CheckPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN)) - { - /* - * Send SIGUSR1 to all children (triggers CatchupInterruptHandler). - * See storage/ipc/sinval[adt].c for the use of this. 
- */ - if (Shutdown <= SmartShutdown) - SignalChildren(SIGUSR1); - } - if (CheckPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER) && PgArchPID != 0) { diff --git a/src/backend/storage/ipc/sinval.c b/src/backend/storage/ipc/sinval.c index 4b8a8f1afbd..e2c6ca2aec9 100644 --- a/src/backend/storage/ipc/sinval.c +++ b/src/backend/storage/ipc/sinval.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.85 2008/03/17 11:50:26 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,9 +17,7 @@ #include "access/xact.h" #include "commands/async.h" #include "miscadmin.h" -#include "storage/backendid.h" #include "storage/ipc.h" -#include "storage/proc.h" #include "storage/sinvaladt.h" #include "utils/inval.h" @@ -27,9 +25,9 @@ /* * Because backends sitting idle will not be reading sinval events, we * need a way to give an idle backend a swift kick in the rear and make - * it catch up before the sinval queue overflows and forces everyone - * through a cache reset exercise. This is done by broadcasting SIGUSR1 - * to all backends when the queue is threatening to become full. + * it catch up before the sinval queue overflows and forces it to go + * through a cache reset exercise. This is done by sending SIGUSR1 + * to any backend that gets too far behind. * * State for catchup events consists of two flags: one saying whether * the signal handler is currently allowed to call ProcessCatchupEvent @@ -47,67 +45,101 @@ static void ProcessCatchupEvent(void); /* - * SendSharedInvalidMessage - * Add a shared-cache-invalidation message to the global SI message queue. + * SendSharedInvalidMessages + * Add shared-cache-invalidation message(s) to the global SI message queue. 
*/ void -SendSharedInvalidMessage(SharedInvalidationMessage *msg) +SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) { - bool insertOK; - - insertOK = SIInsertDataEntry(msg); - if (!insertOK) - elog(DEBUG4, "SI buffer overflow"); + SIInsertDataEntries(msgs, n); } /* * ReceiveSharedInvalidMessages * Process shared-cache-invalidation messages waiting for this backend * + * We guarantee to process all messages that had been queued before the + * routine was entered. It is of course possible for more messages to get + * queued right after our last SIGetDataEntries call. + * * NOTE: it is entirely possible for this routine to be invoked recursively * as a consequence of processing inside the invalFunction or resetFunction. - * Hence, we must be holding no SI resources when we call them. The only - * bad side-effect is that SIDelExpiredDataEntries might be called extra - * times on the way out of a nested call. + * Furthermore, such a recursive call must guarantee that all outstanding + * inval messages have been processed before it exits. This is the reason + * for the strange-looking choice to use a statically allocated buffer array + * and counters; it's so that a recursive call can process messages already + * sucked out of sinvaladt.c. */ void ReceiveSharedInvalidMessages( void (*invalFunction) (SharedInvalidationMessage *msg), void (*resetFunction) (void)) { - SharedInvalidationMessage data; - int getResult; - bool gotMessage = false; +#define MAXINVALMSGS 32 + static SharedInvalidationMessage messages[MAXINVALMSGS]; + /* + * We use volatile here to prevent bugs if a compiler doesn't realize + * that recursion is a possibility ... + */ + static volatile int nextmsg = 0; + static volatile int nummsgs = 0; - for (;;) + /* Deal with any messages still pending from an outer recursion */ + while (nextmsg < nummsgs) { - /* - * We can discard any pending catchup event, since we will not exit - * this loop until we're fully caught up. 
- */ - catchupInterruptOccurred = 0; + SharedInvalidationMessage *msg = &messages[nextmsg++]; - getResult = SIGetDataEntry(MyBackendId, &data); + invalFunction(msg); + } + + do + { + int getResult; + + nextmsg = nummsgs = 0; + + /* Try to get some more messages */ + getResult = SIGetDataEntries(messages, MAXINVALMSGS); - if (getResult == 0) - break; /* nothing more to do */ if (getResult < 0) { /* got a reset message */ elog(DEBUG4, "cache state reset"); resetFunction(); + break; /* nothing more to do */ } - else + + /* Process them, being wary that a recursive call might eat some */ + nextmsg = 0; + nummsgs = getResult; + + while (nextmsg < nummsgs) { - /* got a normal data message */ - invalFunction(&data); + SharedInvalidationMessage *msg = &messages[nextmsg++]; + + invalFunction(msg); } - gotMessage = true; - } - /* If we got any messages, try to release dead messages */ - if (gotMessage) - SIDelExpiredDataEntries(false); + /* + * We only need to loop if the last SIGetDataEntries call (which + * might have been within a recursive call) returned a full buffer. + */ + } while (nummsgs == MAXINVALMSGS); + + /* + * We are now caught up. If we received a catchup signal, reset that + * flag, and call SICleanupQueue(). This is not so much because we + * need to flush dead messages right now, as that we want to pass on + * the catchup signal to the next slowest backend. "Daisy chaining" the + * catchup signal this way avoids creating spikes in system load for + * what should be just a background maintenance activity. 
+ */ + if (catchupInterruptOccurred) + { + catchupInterruptOccurred = 0; + elog(DEBUG4, "sinval catchup complete, cleaning queue"); + SICleanupQueue(false, 0); + } } diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index ddbc08ef55f..0befc4a9341 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -1,24 +1,25 @@ /*------------------------------------------------------------------------- * * sinvaladt.c - * POSTGRES shared cache invalidation segment definitions. + * POSTGRES shared cache invalidation data manager. * * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.70 2008/06/17 20:07:08 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include <signal.h> +#include <unistd.h> + #include "miscadmin.h" #include "storage/backendid.h" #include "storage/ipc.h" -#include "storage/lwlock.h" -#include "storage/pmsignal.h" #include "storage/proc.h" #include "storage/shmem.h" #include "storage/sinvaladt.h" @@ -27,20 +28,44 @@ /* * Conceptually, the shared cache invalidation messages are stored in an * infinite array, where maxMsgNum is the next array subscript to store a - * submitted message in, minMsgNum is the smallest array subscript containing a - * message not yet read by all backends, and we always have maxMsgNum >= + * submitted message in, minMsgNum is the smallest array subscript containing + * a message not yet read by all backends, and we always have maxMsgNum >= * minMsgNum. (They are equal when there are no messages pending.) 
For each * active backend, there is a nextMsgNum pointer indicating the next message it * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every * backend. * + * (In the current implementation, minMsgNum is a lower bound for the + * per-process nextMsgNum values, but it isn't rigorously kept equal to the + * smallest nextMsgNum --- it may lag behind. We only update it when + * SICleanupQueue is called, and we try not to do that often.) + * * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES * entries. We translate MsgNum values into circular-buffer indexes by * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space - * in the buffer. If the buffer does overflow, we reset it to empty and - * force each backend to "reset", ie, discard all its invalidatable state. + * in the buffer. If the buffer does overflow, we recover by setting the + * "reset" flag for each backend that has fallen too far behind. A backend + * that is in "reset" state is ignored while determining minMsgNum. When + * it does finally attempt to receive inval messages, it must discard all + * its invalidatable state, since it won't know what it missed. + * + * To reduce the probability of needing resets, we send a "catchup" interrupt + * to any backend that seems to be falling unreasonably far behind. The + * normal behavior is that at most one such interrupt is in flight at a time; + * when a backend completes processing a catchup interrupt, it executes + * SICleanupQueue, which will signal the next-furthest-behind backend if + * needed. This avoids undue contention from multiple backends all trying + * to catch up at once. However, the furthest-back backend might be stuck + * in a state where it can't catch up. Eventually it will get reset, so it + * won't cause any more problems for anyone but itself. 
But we don't want + * to find that a bunch of other backends are now too close to the reset + * threshold to be saved. So SICleanupQueue is designed to occasionally + * send extra catchup interrupts as the queue gets fuller, to backends that + * are far behind and haven't gotten one yet. As long as there aren't a lot + * of "stuck" backends, we won't need a lot of extra interrupts, since ones + * that aren't stuck will propagate their interrupts to the next guy. * * We would have problems if the MsgNum values overflow an integer, so * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND @@ -48,6 +73,21 @@ * large so that we don't need to do this often. It must be a multiple of * MAXNUMMESSAGES so that the existing circular-buffer entries don't need * to be moved when we do it. + * + * Access to the shared sinval array is protected by two locks, SInvalReadLock + * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this + * authorizes them to modify their own ProcState but not to modify or even + * look at anyone else's. When we need to perform array-wide updates, + * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to + * lock out all readers. Writers take SInvalWriteLock (always in exclusive + * mode) to serialize adding messages to the queue. Note that a writer + * can operate in parallel with one or more readers, because the writer + * has no need to touch anyone's ProcState, except in the infrequent cases + * when SICleanupQueue is needed. The only point of overlap is that + * the writer might change maxMsgNum while readers are looking at it. + * This should be okay: we are assuming that fetching or storing an int + * is atomic, an assumption also made elsewhere in Postgres. However + * readers mustn't assume that maxMsgNum isn't changing under them. */ @@ -59,17 +99,46 @@ * * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow. * Must be a multiple of MAXNUMMESSAGES. Should be large. 
+ * + * CLEANUP_MIN: the minimum number of messages that must be in the buffer + * before we bother to call SICleanupQueue. + * + * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once + * we exceed CLEANUP_MIN. Should be a power of 2 for speed. + * + * SIG_THRESHOLD: the minimum number of messages a backend must have fallen + * behind before we'll send it SIGUSR1. + * + * WRITE_QUANTUM: the max number of messages to push into the buffer per + * iteration of SIInsertDataEntries. Noncritical but should be less than + * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once + * per iteration. */ #define MAXNUMMESSAGES 4096 -#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096) +#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) +#define CLEANUP_MIN (MAXNUMMESSAGES / 2) +#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) +#define SIG_THRESHOLD (MAXNUMMESSAGES / 2) +#define WRITE_QUANTUM 64 /* Per-backend state in shared invalidation structure */ typedef struct ProcState { - /* nextMsgNum is -1 in an inactive ProcState array entry. */ - int nextMsgNum; /* next message number to read, or -1 */ - bool resetState; /* true, if backend has to reset its state */ + /* procPid is zero in an inactive ProcState array entry. */ + pid_t procPid; /* PID of backend, for signaling */ + /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ + int nextMsgNum; /* next message number to read */ + bool resetState; /* backend needs to reset its state */ + bool signaled; /* backend has been sent catchup signal */ + + /* + * Next LocalTransactionId to use for each idle backend slot. We keep + * this here because it is indexed by BackendId and it is convenient to + * copy the value to and from local memory when MyBackendId is set. + * It's meaningless in an active ProcState entry. 
+ */ + LocalTransactionId nextLXID; } ProcState; /* Shared cache invalidation memory segment */ @@ -80,16 +149,10 @@ typedef struct SISeg */ int minMsgNum; /* oldest message still needed */ int maxMsgNum; /* next message number to be assigned */ + int nextThreshold; /* # of messages to call SICleanupQueue */ int lastBackend; /* index of last active procState entry, +1 */ int maxBackends; /* size of procState array */ - /* - * Next LocalTransactionId to use for each idle backend slot. We keep - * this here because it is indexed by BackendId and it is convenient to - * copy the value to and from local memory when MyBackendId is set. - */ - LocalTransactionId *nextLXID; /* array of maxBackends entries */ - /* * Circular buffer holding shared-inval messages */ @@ -110,7 +173,6 @@ static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ static LocalTransactionId nextLocalTransactionId; static void CleanupInvalidationState(int status, Datum arg); -static void SISetProcStateInvalid(SISeg *segP); /* @@ -124,8 +186,6 @@ SInvalShmemSize(void) size = offsetof(SISeg, procState); size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); - size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends)); - return size; } @@ -149,11 +209,10 @@ CreateSharedInvalidationState(void) if (found) return; - shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends); - /* Clear message counters, save size of procState array */ shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->maxMsgNum = 0; + shmInvalBuffer->nextThreshold = CLEANUP_MIN; shmInvalBuffer->lastBackend = 0; shmInvalBuffer->maxBackends = MaxBackends; @@ -162,9 +221,11 @@ CreateSharedInvalidationState(void) /* Mark all backends inactive, and initialize nextLXID */ for (i = 0; i < shmInvalBuffer->maxBackends; i++) { - shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */ + shmInvalBuffer->procState[i].procPid = 0; /* inactive */ + shmInvalBuffer->procState[i].nextMsgNum = 0; 
/* meaningless */ shmInvalBuffer->procState[i].resetState = false; - shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId; + shmInvalBuffer->procState[i].signaled = false; + shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; } } @@ -179,12 +240,19 @@ SharedInvalBackendInit(void) ProcState *stateP = NULL; SISeg *segP = shmInvalBuffer; - LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + /* + * This can run in parallel with read operations, and for that matter + * with write operations; but not in parallel with additions and removals + * of backends, nor in parallel with SICleanupQueue. It doesn't seem + * worth having a third lock, so we choose to use SInvalWriteLock to + * serialize additions/removals. + */ + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); /* Look for a free entry in the procState array */ for (index = 0; index < segP->lastBackend; index++) { - if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */ + if (segP->procState[index].procPid == 0) /* inactive slot? 
*/ { stateP = &segP->procState[index]; break; @@ -196,7 +264,7 @@ SharedInvalBackendInit(void) if (segP->lastBackend < segP->maxBackends) { stateP = &segP->procState[segP->lastBackend]; - Assert(stateP->nextMsgNum < 0); + Assert(stateP->procPid == 0); segP->lastBackend++; } else @@ -205,7 +273,7 @@ SharedInvalBackendInit(void) * out of procState slots: MaxBackends exceeded -- report normally */ MyBackendId = InvalidBackendId; - LWLockRelease(SInvalLock); + LWLockRelease(SInvalWriteLock); ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("sorry, too many clients already"))); @@ -214,21 +282,21 @@ SharedInvalBackendInit(void) MyBackendId = (stateP - &segP->procState[0]) + 1; -#ifdef INVALIDDEBUG - elog(DEBUG2, "my backend id is %d", MyBackendId); -#endif /* INVALIDDEBUG */ + elog(DEBUG4, "my backend id is %d", MyBackendId); /* Advertise assigned backend ID in MyProc */ MyProc->backendId = MyBackendId; /* Fetch next local transaction ID into local memory */ - nextLocalTransactionId = segP->nextLXID[MyBackendId - 1]; + nextLocalTransactionId = stateP->nextLXID; /* mark myself active, with all extant messages already read */ + stateP->procPid = MyProcPid; stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; + stateP->signaled = false; - LWLockRelease(SInvalLock); + LWLockRelease(SInvalWriteLock); /* register exit routine to mark my entry inactive at exit */ on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); @@ -238,8 +306,7 @@ SharedInvalBackendInit(void) * CleanupInvalidationState * Mark the current backend as no longer active. * - * This function is called via on_shmem_exit() during backend shutdown, - * so the caller has NOT acquired the lock for us. + * This function is called via on_shmem_exit() during backend shutdown. * * arg is really of type "SISeg*". 
*/ @@ -247,227 +314,247 @@ static void CleanupInvalidationState(int status, Datum arg) { SISeg *segP = (SISeg *) DatumGetPointer(arg); + ProcState *stateP; int i; Assert(PointerIsValid(segP)); - LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + + stateP = &segP->procState[MyBackendId - 1]; /* Update next local transaction ID for next holder of this backendID */ - segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId; + stateP->nextLXID = nextLocalTransactionId; /* Mark myself inactive */ - segP->procState[MyBackendId - 1].nextMsgNum = -1; - segP->procState[MyBackendId - 1].resetState = false; + stateP->procPid = 0; + stateP->nextMsgNum = 0; + stateP->resetState = false; + stateP->signaled = false; /* Recompute index of last active backend */ for (i = segP->lastBackend; i > 0; i--) { - if (segP->procState[i - 1].nextMsgNum >= 0) + if (segP->procState[i - 1].procPid != 0) break; } segP->lastBackend = i; - LWLockRelease(SInvalLock); + LWLockRelease(SInvalWriteLock); } /* - * SIInsertDataEntry - * Add a new invalidation message to the buffer. - * - * If we are unable to insert the message because the buffer is full, - * then clear the buffer and assert the "reset" flag to each backend. - * This will cause all the backends to discard *all* invalidatable state. - * - * Returns true for normal successful insertion, false if had to reset. + * SIInsertDataEntries + * Add new invalidation message(s) to the buffer. */ -bool -SIInsertDataEntry(SharedInvalidationMessage *data) +void +SIInsertDataEntries(const SharedInvalidationMessage *data, int n) { - int numMsgs; - bool signal_postmaster = false; - SISeg *segP; + SISeg *segP = shmInvalBuffer; + + /* + * N can be arbitrarily large. We divide the work into groups of no more + * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for + * an unreasonably long time. 
(This is not so much because we care about + * letting in other writers, as that some just-caught-up backend might be + * trying to do SICleanupQueue to pass on its signal, and we don't want it + * to have to wait a long time.) Also, we need to consider calling + * SICleanupQueue every so often. + */ + while (n > 0) + { + int nthistime = Min(n, WRITE_QUANTUM); + int numMsgs; - LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + n -= nthistime; - segP = shmInvalBuffer; - numMsgs = segP->maxMsgNum - segP->minMsgNum; + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); - /* Is the buffer full? */ - if (numMsgs >= MAXNUMMESSAGES) - { /* - * Don't panic just yet: slowest backend might have consumed some - * messages but not yet have done SIDelExpiredDataEntries() to advance - * minMsgNum. So, make sure minMsgNum is up-to-date. + * If the buffer is full, we *must* acquire some space. Clean the + * queue and reset anyone who is preventing space from being freed. + * Otherwise, clean the queue only when it's exceeded the next + * fullness threshold. */ - SIDelExpiredDataEntries(true); numMsgs = segP->maxMsgNum - segP->minMsgNum; - if (numMsgs >= MAXNUMMESSAGES) + if (numMsgs + nthistime > MAXNUMMESSAGES) { - /* Yup, it's definitely full, no choice but to reset */ - SISetProcStateInvalid(segP); - LWLockRelease(SInvalLock); - return false; + SICleanupQueue(true, nthistime); + Assert((segP->maxMsgNum - segP->minMsgNum + nthistime) <= MAXNUMMESSAGES); } - } - - /* - * Try to prevent table overflow. When the table is 70% full send a - * WAKEN_CHILDREN request to the postmaster. The postmaster will send a - * SIGUSR1 signal to all the backends, which will cause sinval.c to read - * any pending SI entries. - * - * This should never happen if all the backends are actively executing - * queries, but if a backend is sitting idle then it won't be starting - * transactions and so won't be reading SI entries. 
- */ - if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster) - signal_postmaster = true; - - /* - * Insert new message into proper slot of circular buffer - */ - segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data; - segP->maxMsgNum++; - - LWLockRelease(SInvalLock); - - if (signal_postmaster) - { - elog(DEBUG4, "SI table is 70%% full, signaling postmaster"); - SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN); - } - - return true; -} - -/* - * SISetProcStateInvalid - * Flush pending messages from buffer, assert reset flag for each backend - * - * This is used only to recover from SI buffer overflow. - */ -static void -SISetProcStateInvalid(SISeg *segP) -{ - int i; - - segP->minMsgNum = 0; - segP->maxMsgNum = 0; + else if (numMsgs >= segP->nextThreshold) + SICleanupQueue(true, 0); - for (i = 0; i < segP->lastBackend; i++) - { - if (segP->procState[i].nextMsgNum >= 0) /* active backend? */ + /* + * Insert new message(s) into proper slot of circular buffer + */ + while (nthistime-- > 0) { - segP->procState[i].resetState = true; - segP->procState[i].nextMsgNum = 0; + segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++; + segP->maxMsgNum++; } + + LWLockRelease(SInvalWriteLock); } } /* - * SIGetDataEntry - * get next SI message for specified backend, if there is one + * SIGetDataEntries + * get next SI message(s) for current backend, if there are any * * Possible return values: - * 0: no SI message available - * 1: next SI message has been extracted into *data - * (there may be more messages available after this one!) - * -1: SI reset message extracted + * 0: no SI message available + * n>0: next n SI messages have been extracted into data[] + * -1: SI reset message extracted + * + * If the return value is less than the array size "datasize", the caller + * can assume that there are no more SI messages after the one(s) returned. + * Otherwise, another call is needed to collect more messages. 
* - * NB: this can run in parallel with other instances of SIGetDataEntry + * NB: this can run in parallel with other instances of SIGetDataEntries * executing on behalf of other backends, since each instance will modify only * fields of its own backend's ProcState, and no instance will look at fields - * of other backends' ProcStates. We express this by grabbing SInvalLock in - * shared mode. Note that this is not exactly the normal (read-only) + * of other backends' ProcStates. We express this by grabbing SInvalReadLock + * in shared mode. Note that this is not exactly the normal (read-only) * interpretation of a shared lock! Look closely at the interactions before - * allowing SInvalLock to be grabbed in shared mode for any other reason! + * allowing SInvalReadLock to be grabbed in shared mode for any other reason! + * + * NB: this can also run in parallel with SIInsertDataEntries. It is not + * guaranteed that we will return any messages added after the routine is + * entered. + * + * Note: we assume that "datasize" is not so large that it might be important + * to break our hold on SInvalReadLock into segments. */ int -SIGetDataEntry(int backendId, SharedInvalidationMessage *data) +SIGetDataEntries(SharedInvalidationMessage *data, int datasize) { - ProcState *stateP; SISeg *segP; + ProcState *stateP; + int n; - LWLockAcquire(SInvalLock, LW_SHARED); + LWLockAcquire(SInvalReadLock, LW_SHARED); segP = shmInvalBuffer; - stateP = &segP->procState[backendId - 1]; + stateP = &segP->procState[MyBackendId - 1]; if (stateP->resetState) { /* * Force reset. We can say we have dealt with any messages added - * since the reset, as well... + * since the reset, as well; and that means we should clear the + * signaled flag, too. 
*/ - stateP->resetState = false; stateP->nextMsgNum = segP->maxMsgNum; - LWLockRelease(SInvalLock); + stateP->resetState = false; + stateP->signaled = false; + LWLockRelease(SInvalReadLock); return -1; } - if (stateP->nextMsgNum >= segP->maxMsgNum) - { - LWLockRelease(SInvalLock); - return 0; /* nothing to read */ - } - /* - * Retrieve message and advance my counter. + * Retrieve messages and advance backend's counter, until data array is + * full or there are no more messages. + * + * There may be other backends that haven't read the message(s), so we + * cannot delete them here. SICleanupQueue() will eventually remove them + * from the queue. + * + * Note: depending on the compiler, we might read maxMsgNum only once + * here, or each time through the loop. It doesn't matter (as long as + * each fetch is atomic). */ - *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; - stateP->nextMsgNum++; + n = 0; + while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum) + { + data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; + stateP->nextMsgNum++; + } /* - * There may be other backends that haven't read the message, so we cannot - * delete it here. SIDelExpiredDataEntries() should be called to remove - * dead messages. + * Reset our "signaled" flag whenever we have caught up completely. */ + if (stateP->nextMsgNum >= segP->maxMsgNum) + stateP->signaled = false; - LWLockRelease(SInvalLock); - return 1; /* got a message */ + LWLockRelease(SInvalReadLock); + return n; } /* - * SIDelExpiredDataEntries + * SICleanupQueue * Remove messages that have been consumed by all active backends + * + * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock. + * minFree is the minimum number of free message slots required at completion. + * + * Possible side effects of this routine include marking one or more + * backends as "reset" in the array, and sending a catchup interrupt (SIGUSR1) + * to some backend that seems to be getting too far behind. 
We signal at + * most one backend at a time, for reasons explained at the top of the file. */ void -SIDelExpiredDataEntries(bool locked) +SICleanupQueue(bool callerHasWriteLock, int minFree) { SISeg *segP = shmInvalBuffer; int min, - i, - h; + minsig, + lowbound, + numMsgs, + i; + ProcState *needSig = NULL; - if (!locked) - LWLockAcquire(SInvalLock, LW_EXCLUSIVE); + /* Lock out all writers and readers */ + if (!callerHasWriteLock) + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE); + /* + * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify + * the furthest-back backend that needs signaling (if any), and reset + * any backends that are too far back. + */ min = segP->maxMsgNum; - if (min == segP->minMsgNum) - { - if (!locked) - LWLockRelease(SInvalLock); - return; /* fast path if no messages exist */ - } - - /* Recompute minMsgNum = minimum of all backends' nextMsgNum */ + minsig = min - SIG_THRESHOLD; + lowbound = min - MAXNUMMESSAGES + minFree; for (i = 0; i < segP->lastBackend; i++) { - h = segP->procState[i].nextMsgNum; - if (h >= 0) - { /* backend active */ - if (h < min) - min = h; + ProcState *stateP = &segP->procState[i]; + int n = stateP->nextMsgNum; + + /* Ignore if inactive or already in reset state */ + if (stateP->procPid == 0 || stateP->resetState) + continue; + + /* + * If we must free some space and this backend is preventing it, + * force him into reset state and then ignore until he catches up. + */ + if (n < lowbound) + { + stateP->resetState = true; + /* no point in signaling him ... */ + continue; + } + + /* Track the global minimum nextMsgNum */ + if (n < min) + min = n; + + /* Also see who's furthest back of the unsignaled backends */ + if (n < minsig && !stateP->signaled) + { + minsig = n; + needSig = stateP; } } segP->minMsgNum = min; /* * When minMsgNum gets really large, decrement all message counters so as - * to forestall overflow of the counters. 
+ * to forestall overflow of the counters. This happens seldom enough + * that folding it into the previous loop would be a loser. */ if (min >= MSGNUMWRAPAROUND) { @@ -475,13 +562,43 @@ SIDelExpiredDataEntries(bool locked) segP->maxMsgNum -= MSGNUMWRAPAROUND; for (i = 0; i < segP->lastBackend; i++) { - if (segP->procState[i].nextMsgNum >= 0) - segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; + /* we don't bother skipping inactive entries here */ + segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; } } - if (!locked) - LWLockRelease(SInvalLock); + /* + * Determine how many messages are still in the queue, and set the + * threshold at which we should repeat SICleanupQueue(). + */ + numMsgs = segP->maxMsgNum - segP->minMsgNum; + if (numMsgs < CLEANUP_MIN) + segP->nextThreshold = CLEANUP_MIN; + else + segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; + + /* + * Lastly, signal anyone who needs a catchup interrupt. Since kill() + * might not be fast, we don't want to hold locks while executing it. 
+ */ + if (needSig) + { + pid_t his_pid = needSig->procPid; + + needSig->signaled = true; + LWLockRelease(SInvalReadLock); + LWLockRelease(SInvalWriteLock); + elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid); + kill(his_pid, SIGUSR1); + if (callerHasWriteLock) + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + } + else + { + LWLockRelease(SInvalReadLock); + if (!callerHasWriteLock) + LWLockRelease(SInvalWriteLock); + } } diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 50e27923566..050d7cc88de 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -80,7 +80,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.85 2008/06/19 00:46:05 alvherre Exp $ + * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.86 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -203,7 +203,7 @@ AddInvalidationMessage(InvalidationChunk **listHdr, if (chunk == NULL) { /* First time through; create initial chunk */ -#define FIRSTCHUNKSIZE 16 +#define FIRSTCHUNKSIZE 32 chunk = (InvalidationChunk *) MemoryContextAlloc(CurTransactionContext, sizeof(InvalidationChunk) + @@ -275,6 +275,23 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr, } \ } while (0) +/* + * Process a list of invalidation messages group-wise. + * + * As above, but the code fragment can handle an array of messages. + * The fragment should refer to the messages as msgs[], with n entries. 
+ */ +#define ProcessMessageListMulti(listHdr, codeFragment) \ + do { \ + InvalidationChunk *_chunk; \ + for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \ + { \ + SharedInvalidationMessage *msgs = _chunk->msgs; \ + int n = _chunk->nitems; \ + codeFragment; \ + } \ + } while (0) + /* ---------------------------------------------------------------- * Invalidation set support functions @@ -371,6 +388,18 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr, ProcessMessageList(hdr->rclist, func(msg)); } +/* + * As above, but the function is able to process an array of messages + * rather than just one at a time. + */ +static void +ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr, + void (*func) (const SharedInvalidationMessage *msgs, int n)) +{ + ProcessMessageListMulti(hdr->cclist, func(msgs, n)); + ProcessMessageListMulti(hdr->rclist, func(msgs, n)); +} + /* ---------------------------------------------------------------- * private support functions * ---------------------------------------------------------------- @@ -792,7 +821,7 @@ inval_twophase_postcommit(TransactionId xid, uint16 info, case TWOPHASE_INFO_MSG: msg = (SharedInvalidationMessage *) recdata; Assert(len == sizeof(SharedInvalidationMessage)); - SendSharedInvalidMessage(msg); + SendSharedInvalidMessages(msg, 1); break; case TWOPHASE_INFO_FILE_BEFORE: RelationCacheInitFileInvalidate(true); @@ -850,8 +879,8 @@ AtEOXact_Inval(bool isCommit) AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, &transInvalInfo->CurrentCmdInvalidMsgs); - ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, - SendSharedInvalidMessage); + ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs, + SendSharedInvalidMessages); if (transInvalInfo->RelcacheInitFileInval) RelationCacheInitFileInvalidate(false); @@ -1033,8 +1062,8 @@ EndNonTransactionalInvalidation(void) /* Send out the invals */ ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs, 
LocalExecuteInvalidationMessage); - ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs, - SendSharedInvalidMessage); + ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, + SendSharedInvalidMessages); /* Clean up and release memory */ for (chunk = transInvalInfo->CurrentCmdInvalidMsgs.cclist; diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index baccfbf5a68..b1088fcd33d 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.38 2008/01/01 19:45:59 momjian Exp $ + * $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.39 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -43,7 +43,8 @@ typedef enum LWLockId OidGenLock, XidGenLock, ProcArrayLock, - SInvalLock, + SInvalReadLock, + SInvalWriteLock, FreeSpaceLock, WALInsertLock, WALWriteLock, diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h index c02593e5a86..94f1770ffce 100644 --- a/src/include/storage/pmsignal.h +++ b/src/include/storage/pmsignal.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.19 2008/01/01 19:45:59 momjian Exp $ + * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.20 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,7 +23,6 @@ typedef enum { PMSIGNAL_PASSWORD_CHANGE, /* pg_auth file has changed */ - PMSIGNAL_WAKEN_CHILDREN, /* send a SIGUSR1 signal to all backends */ PMSIGNAL_WAKEN_ARCHIVER, /* send a NOTIFY signal to xlog archiver */ PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to 
syslogger to rotate logfile */ PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 343c8d94bdb..3601216f1b6 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.47 2008/03/16 19:47:34 alvherre Exp $ + * $PostgreSQL: pgsql/src/include/storage/sinval.h,v 1.48 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -83,7 +83,8 @@ typedef union } SharedInvalidationMessage; -extern void SendSharedInvalidMessage(SharedInvalidationMessage *msg); +extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, + int n); extern void ReceiveSharedInvalidMessages( void (*invalFunction) (SharedInvalidationMessage *msg), void (*resetFunction) (void)); diff --git a/src/include/storage/sinvaladt.h b/src/include/storage/sinvaladt.h index 8535cba0f06..1748f8821b4 100644 --- a/src/include/storage/sinvaladt.h +++ b/src/include/storage/sinvaladt.h @@ -1,12 +1,13 @@ /*------------------------------------------------------------------------- * * sinvaladt.h - * POSTGRES shared cache invalidation segment definitions. + * POSTGRES shared cache invalidation data manager. * * The shared cache invalidation manager is responsible for transmitting * invalidation messages between backends. Any message sent by any backend * must be delivered to all already-running backends before it can be - * forgotten. + * forgotten. (If we run out of space, we instead deliver a "RESET" + * message to backends that have fallen too far behind.) * * The struct type SharedInvalidationMessage, defining the contents of * a single message, is defined in sinval.h. 
@@ -14,7 +15,7 @@ * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.47 2008/03/17 11:50:27 alvherre Exp $ + * $PostgreSQL: pgsql/src/include/storage/sinvaladt.h,v 1.48 2008/06/19 21:32:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,7 +24,6 @@ #include "storage/sinval.h" - /* * prototypes for functions in sinvaladt.c */ @@ -31,9 +31,9 @@ extern Size SInvalShmemSize(void); extern void CreateSharedInvalidationState(void); extern void SharedInvalBackendInit(void); -extern bool SIInsertDataEntry(SharedInvalidationMessage *data); -extern int SIGetDataEntry(int backendId, SharedInvalidationMessage *data); -extern void SIDelExpiredDataEntries(bool locked); +extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n); +extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize); +extern void SICleanupQueue(bool callerHasWriteLock, int minFree); extern LocalTransactionId GetNextLocalTransactionId(void); -- GitLab