diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c index 6622d22f5f8d3a951664d35482bde453e4942d28..17b4b6fa6fc2efca89f86db108ed5b38c0cdf120 100644 --- a/contrib/pg_buffercache/pg_buffercache_pages.c +++ b/contrib/pg_buffercache/pg_buffercache_pages.c @@ -148,11 +148,12 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) */ for (i = 0; i < NBuffers; i++) { - volatile BufferDesc *bufHdr; + BufferDesc *bufHdr; + uint32 buf_state; bufHdr = GetBufferDescriptor(i); /* Lock each buffer header before inspecting. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr); fctx->record[i].relfilenode = bufHdr->tag.rnode.relNode; @@ -160,21 +161,21 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) fctx->record[i].reldatabase = bufHdr->tag.rnode.dbNode; fctx->record[i].forknum = bufHdr->tag.forkNum; fctx->record[i].blocknum = bufHdr->tag.blockNum; - fctx->record[i].usagecount = bufHdr->usage_count; - fctx->record[i].pinning_backends = bufHdr->refcount; + fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); + fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); - if (bufHdr->flags & BM_DIRTY) + if (buf_state & BM_DIRTY) fctx->record[i].isdirty = true; else fctx->record[i].isdirty = false; /* Note if the buffer is valid, and has storage created */ - if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_TAG_VALID)) + if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID)) fctx->record[i].isvalid = true; else fctx->record[i].isvalid = false; - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } /* diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index bfa37f1c66b4c81286a394b753c9f86936062a65..a5cffc7896828291b8e8d46b70496622ef9a450c 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -135,12 +135,9 @@ InitBufferPool(void) BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - buf->refcount = 0; - buf->wait_backend_pid = 0; - SpinLockInit(&buf->buf_hdr_lock); + pg_atomic_init_u32(&buf->state, 0); + buf->wait_backend_pid = 0; buf->buf_id = i; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index c664984d0a11b9f75a71bbcd4612843bbd777efd..29f10e59568792dc8ceea6886298c1dad4f14f67 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -436,11 +436,12 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy); static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf, bool fixOwner); static void BufferSync(int flags); +static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context); static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - int set_flag_bits); + uint32 set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, @@ -816,8 +817,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + 
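/*
 * [Illustrative aside, not part of the patch] With the combined state word, a
 * reader such as the pg_buffercache_pages() hunk above takes a single uint32
 * snapshot and decodes flags, refcount and usage count from it.  Shared
 * buffers take the buffer header lock for a consistent snapshot; local
 * buffers (backend-private, as in the surrounding ReadBuffer_common() hunk)
 * get by with plain pg_atomic_read_u32()/pg_atomic_write_u32().  The helper
 * name below is made up; the macros are the ones this patch adds to
 * buf_internals.h.
 */
static void
report_shared_buffer_state(BufferDesc *bufHdr)
{
    uint32      buf_state = LockBufHdr(bufHdr); /* returns state with BM_LOCKED set */

    elog(DEBUG1, "refcount=%u usagecount=%u dirty=%d valid=%d",
         BUF_STATE_GET_REFCOUNT(buf_state),
         BUF_STATE_GET_USAGECOUNT(buf_state),
         (buf_state & BM_DIRTY) != 0,
         (buf_state & BM_VALID) != 0);

    /* hand the unchanged state back; this clears BM_LOCKED in one write */
    UnlockBufHdr(bufHdr, buf_state);
}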
Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -828,10 +832,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ do { - LockBufHdr(bufHdr); - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; - UnlockBufHdr(bufHdr); + uint32 buf_state = LockBufHdr(bufHdr); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + UnlockBufHdr(bufHdr, buf_state); } while (!StartBufferIO(bufHdr, true)); } } @@ -848,7 +853,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * it's not been recycled) but come right back here to try smgrextend * again. */ - Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); @@ -933,7 +938,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - bufHdr->flags |= BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -987,10 +995,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; int buf_id; BufferDesc *buf; bool valid; + uint32 buf_state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -1059,12 +1068,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * Select a victim buffer. The buffer is returned with its header * spinlock still held! */ - buf = StrategyGetBuffer(strategy); + buf = StrategyGetBuffer(strategy, &buf_state); - Assert(buf->refcount == 0); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); /* Must copy buffer flags while we still hold the spinlock */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); @@ -1108,9 +1117,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, XLogRecPtr lsn; /* Read the LSN while holding buffer header lock */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); lsn = BufferGetLSN(buf); - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (XLogNeedsFlush(lsn) && StrategyRejectBuffer(strategy, buf)) @@ -1254,7 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, /* * Need to lock the buffer header too in order to change its tag. */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were @@ -1262,11 +1271,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. 
*/ - oldFlags = buf->flags; - if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) + oldFlags = buf_state & BUF_FLAG_MASK; + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY)) break; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); BufTableDelete(&newTag, newHash); if ((oldFlags & BM_TAG_VALID) && oldPartitionLock != newPartitionLock) @@ -1284,14 +1293,15 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; - buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | + BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | + BUF_USAGECOUNT_MASK); if (relpersistence == RELPERSISTENCE_PERMANENT) - buf->flags |= BM_TAG_VALID | BM_PERMANENT; + buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; else - buf->flags |= BM_TAG_VALID; - buf->usage_count = 1; + buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (oldFlags & BM_TAG_VALID) { @@ -1338,12 +1348,15 @@ InvalidateBuffer(BufferDesc *buf) BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; + uint32 buf_state; /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; - UnlockBufHdr(buf); + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + UnlockBufHdr(buf, buf_state); /* * Need to compute the old tag's hashcode and partition lock ID. XXX is it @@ -1362,12 +1375,12 @@ retry: LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); return; } @@ -1381,9 +1394,9 @@ retry: * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ - if (buf->refcount != 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); /* safety check: should definitely not be our *own* pin */ if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) @@ -1396,12 +1409,10 @@ retry: * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - - UnlockBufHdr(buf); + buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK); + UnlockBufHdr(buf, buf_state); /* * Remove the buffer from the lookup hashtable, if it was in there. 
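The InvalidateBuffer() and BufferAlloc() hunks above lean on a rule spelled out later in buf_internals.h: while BM_LOCKED is set, only the lock holder may change the state word, so it can fold any number of flag and usage-count edits into the very store that releases the lock. A minimal sketch of that pattern, modeled on the re-tagging code above (illustrative only; UnlockBufHdr() is the macro this patch defines in buf_internals.h):

    uint32      buf_state;

    buf_state = LockBufHdr(buf);        /* spin until BM_LOCKED is ours */

    /* edit the private copy freely; nobody else may modify the word now */
    buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
    buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;

    /*
     * UnlockBufHdr(buf, buf_state) is a single
     * pg_atomic_write_u32(&buf->state, buf_state & ~BM_LOCKED) followed by a
     * write barrier, so the edits and the lock release are published together.
     */
    UnlockBufHdr(buf, buf_state);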
@@ -1433,6 +1444,8 @@ void MarkBufferDirty(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state; + uint32 old_buf_state; if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); @@ -1449,24 +1462,32 @@ MarkBufferDirty(Buffer buffer) /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); - LockBufHdr(bufHdr); + old_buf_state = pg_atomic_read_u32(&bufHdr->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(bufHdr); - Assert(bufHdr->refcount > 0); + buf_state = old_buf_state; + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + + if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state, + buf_state)) + break; + } /* * If the buffer was not dirty already, do vacuum accounting. */ - if (!(bufHdr->flags & BM_DIRTY)) + if (!(old_buf_state & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; if (VacuumCostActive) VacuumCostBalance += VacuumCostPageDirty; } - - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - - UnlockBufHdr(bufHdr); } /* @@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer, * * This should be applied only to shared buffers, never local ones. * + * Since buffers are pinned/unpinned very frequently, pin buffers without + * taking the buffer header lock; instead update the state variable in loop of + * CAS operations. Hopefully it's just a single CAS. + * * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows @@ -1547,23 +1572,34 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) if (ref == NULL) { + uint32 buf_state; + uint32 old_buf_state; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); - LockBufHdr(buf); - buf->refcount++; - if (strategy == NULL) - { - if (buf->usage_count < BM_MAX_USAGE_COUNT) - buf->usage_count++; - } - else + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { - if (buf->usage_count == 0) - buf->usage_count = 1; + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + /* increase refcount */ + buf_state += BUF_REFCOUNT_ONE; + + /* increase usagecount unless already max */ + if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) + buf_state += BUF_USAGECOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + { + result = (buf_state & BM_VALID) != 0; + break; + } } - result = (buf->flags & BM_VALID) != 0; - UnlockBufHdr(buf); } else { @@ -1603,6 +1639,7 @@ PinBuffer_Locked(BufferDesc *buf) { Buffer b; PrivateRefCountEntry *ref; + uint32 buf_state; /* * As explained, We don't expect any preexisting pins. That allows us to @@ -1610,8 +1647,14 @@ PinBuffer_Locked(BufferDesc *buf) */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); - buf->refcount++; - UnlockBufHdr(buf); + /* + * Since we hold the buffer spinlock, we can update the buffer state and + * release the lock in one operation. 
+ */ + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + buf_state += BUF_REFCOUNT_ONE; + UnlockBufHdr(buf, buf_state); b = BufferDescriptorGetBuffer(buf); @@ -1646,30 +1689,59 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) ref->refcount--; if (ref->refcount == 0) { + uint32 buf_state; + uint32 old_buf_state; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); - LockBufHdr(buf); + /* + * Decrement the shared reference count. + * + * Since buffer spinlock holder can update status using just write, + * it's not safe to use atomic decrement here; thus use a CAS loop. + */ + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + buf_state -= BUF_REFCOUNT_ONE; - /* Decrement the shared reference count */ - Assert(buf->refcount > 0); - buf->refcount--; + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + break; + } /* Support LockBufferForCleanup() */ - if ((buf->flags & BM_PIN_COUNT_WAITER) && - buf->refcount == 1) + if (buf_state & BM_PIN_COUNT_WAITER) { - /* we just released the last pin other than the waiter's */ - int wait_backend_pid = buf->wait_backend_pid; + /* + * Acquire the buffer header lock, re-check that there's a waiter. + * Another backend could have unpinned this buffer, and already + * woken up the waiter. There's no danger of the buffer being + * replaced after we unpinned it above, as it's pinned by the + * waiter. + */ + buf_state = LockBufHdr(buf); - buf->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); - ProcSendSignal(wait_backend_pid); - } - else - UnlockBufHdr(buf); + if ((buf_state & BM_PIN_COUNT_WAITER) && + BUF_STATE_GET_REFCOUNT(buf_state) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pid = buf->wait_backend_pid; + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(buf, buf_state); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf, buf_state); + } ForgetPrivateRefCountEntry(ref); } } @@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) static void BufferSync(int flags) { + uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -1736,13 +1809,13 @@ BufferSync(int flags) * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if ((bufHdr->flags & mask) == mask) + if ((buf_state & mask) == mask) { CkptSortItem *item; - bufHdr->flags |= BM_CHECKPOINT_NEEDED; + buf_state |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; @@ -1752,7 +1825,7 @@ BufferSync(int flags) item->blockNum = bufHdr->tag.blockNum; } - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } if (num_to_scan == 0) @@ -1888,7 +1961,7 @@ BufferSync(int flags) * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. 
*/ - if (bufHdr->flags & BM_CHECKPOINT_NEEDED) + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { @@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int buffer_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, true, + wb_context); if (++next_to_clean >= NBuffers) { @@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context) } num_to_scan--; - if (buffer_state & BUF_WRITTEN) + if (sync_state & BUF_WRITTEN) { reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) @@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context) break; } } - else if (buffer_state & BUF_REUSABLE) + else if (sync_state & BUF_REUSABLE) reusable_buffers++; } @@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 buf_state; BufferTag tag; ReservePrivateRefCountEntry(); @@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && + BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + { result |= BUF_REUSABLE; + } else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } - if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } @@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer) int32 loccount; char *path; BackendId backend; + uint32 buf_state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) @@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer) /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + buf_state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, - buf->tag.blockNum, buf->flags, - buf->refcount, loccount); + buf->tag.blockNum, buf_state & BUF_FLAG_MASK, + BUF_STATE_GET_REFCOUNT(buf_state), loccount); pfree(path); } @@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) io_time; Block bufBlock; char *bufToWrite; + uint32 buf_state; /* * Acquire the buffer's io_in_progress lock. If StartBufferIO returns @@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Run PageGetLSN while holding header lock, since we don't have the @@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ - buf->flags &= ~BM_JUST_DIRTIED; - UnlockBufHdr(buf); + buf_state &= ~BM_JUST_DIRTIED; + UnlockBufHdr(buf, buf_state); /* * Force XLOG flush up to buffer's LSN. 
This implements the basic WAL @@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ - if (buf->flags & BM_PERMANENT) + if (buf_state & BM_PERMANENT) XLogFlush(recptr); /* @@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer) /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we * need not bother with the buffer header spinlock. Even if someone else - * changes the buffer header flags while we're doing this, we assume that - * changing an aligned 2-byte BufFlags value is atomic, so we'll read the - * old value or the new value, but not random garbage. + * changes the buffer header state while we're doing this, the state is + * changed atomically, so we'll read the old value or the new value, but + * not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); - return (bufHdr->flags & BM_PERMANENT) != 0; + return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* @@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer) BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1); char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); XLogRecPtr lsn; + uint32 buf_state; /* * If we don't need locking for correctness, fastpath out. @@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer) Assert(BufferIsValid(buffer)); Assert(BufferIsPinned(buffer)); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); lsn = PageGetLSN(page); - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return lsn; } @@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * We can make this a tad faster by prechecking the buffer tag before @@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node)) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) { RelFileNode *rnode = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) if (rnode == NULL) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } pfree(nodes); @@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid) if (bufHdr->tag.rnode.dbNode != dbid) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel) { for (i = 0; i < 
NLocBuffer; i++) { + uint32 buf_state; + bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + ((buf_state = pg_atomic_read_u32(&bufHdr->state)) & + (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; @@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel) localpage, false); - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED); + pg_atomic_write_u32(&bufHdr->state, buf_state); /* Pop the error context stack */ error_context_stack = errcallback.previous; @@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ - if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != + if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 buf_state; /* * If we need to protect hint bit updates from torn writes, WAL-log a @@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ - if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) + if (XLogHintBitIsNeeded() && + (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. 
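PinBuffer(), UnpinBuffer() and MarkBufferDirty() above all share one lock-free shape: read the state, wait out BM_LOCKED if a header-lock holder is active, edit a local copy, and publish it with a compare-and-exchange, retrying if anything changed underneath. A condensed sketch of that loop (illustrative only; the function name is invented, while WaitBufHdrUnlocked() and the pg_atomic calls are the ones used in the hunks above):

static void
set_flags_lockfree(BufferDesc *buf, uint32 flags_to_set)
{
    uint32      old_buf_state = pg_atomic_read_u32(&buf->state);
    uint32      buf_state;

    for (;;)
    {
        /* don't fight a header-lock holder; wait until BM_LOCKED clears */
        if (old_buf_state & BM_LOCKED)
            old_buf_state = WaitBufHdrUnlocked(buf);

        buf_state = old_buf_state;
        buf_state |= flags_to_set;

        /*
         * On success we are done.  On failure old_buf_state has been
         * refreshed with the current value, so we simply retry.
         */
        if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
                                           buf_state))
            break;
    }
}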
@@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) lsn = XLogSaveBufferForHint(buffer, buffer_std); } - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (!(bufHdr->flags & BM_DIRTY)) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + + if (!(buf_state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ @@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - UnlockBufHdr(bufHdr); + + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + UnlockBufHdr(bufHdr, buf_state); if (delayChkpt) MyPgXact->delayChkpt = false; @@ -3406,17 +3503,19 @@ UnlockBuffers(void) if (buf) { - LockBufHdr(buf); + uint32 buf_state; + + buf_state = LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ - if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) - buf->flags &= ~BM_PIN_COUNT_WAITER; + buf_state &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); PinCountWaitBuf = NULL; } @@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer) for (;;) { + uint32 buf_state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return; } /* Failed, so mark myself as waiting for pincount 1 */ - if (bufHdr->flags & BM_PIN_COUNT_WAITER) + if (buf_state & BM_PIN_COUNT_WAITER) { - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; - bufHdr->flags |= BM_PIN_COUNT_WAITER; PinCountWaitBuf = bufHdr; - UnlockBufHdr(bufHdr); + buf_state |= BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Report the wait */ @@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer) * impossible with the current usages due to table level locking, but * better be safe. 
*/ - LockBufHdr(bufHdr); - if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && + buf_state = LockBufHdr(bufHdr); + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) - bufHdr->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(bufHdr); + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); PinCountWaitBuf = NULL; /* Loop back and try again */ @@ -3603,22 +3705,26 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state, + refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ - Assert(LocalRefCount[-buffer - 1] > 0); - if (LocalRefCount[-buffer - 1] != 1) + Assert(refcount > 0); + if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ - Assert(GetPrivateRefCount(buffer) > 0); - if (GetPrivateRefCount(buffer) != 1) + refcount = GetPrivateRefCount(buffer); + Assert(refcount); + if (refcount != 1) return false; /* Try to acquire lock */ @@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer) return false; bufHdr = GetBufferDescriptor(buffer - 1); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + refcount = BUF_STATE_GET_REFCOUNT(buf_state); + + Assert(refcount > 0); + if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return true; } /* Failed, so release the lock */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); return false; } @@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf) */ for (;;) { - BufFlags sv_flags; + uint32 buf_state; /* * It may not be necessary to acquire the spinlock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ - LockBufHdr(buf); - sv_flags = buf->flags; - UnlockBufHdr(buf); - if (!(sv_flags & BM_IO_IN_PROGRESS)) + buf_state = LockBufHdr(buf); + UnlockBufHdr(buf, buf_state); + + if (!(buf_state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockRelease(BufferDescriptorGetIOLock(buf)); @@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf) static bool StartBufferIO(BufferDesc *buf, bool forInput) { + uint32 buf_state; + Assert(!InProgressBuf); for (;;) @@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); - if (!(buf->flags & BM_IO_IN_PROGRESS)) + if (!(buf_state & BM_IO_IN_PROGRESS)) break; /* @@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput) * an error (see AbortBufferIO). If that's the case, we must wait for * him to get unwedged. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); WaitIO(buf); } /* Once we get here, there is definitely no I/O active on this buffer */ - if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) + if (forInput ? 
(buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { /* someone else already did the I/O */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); return false; } - buf->flags |= BM_IO_IN_PROGRESS; - - UnlockBufHdr(buf); + buf_state |= BM_IO_IN_PROGRESS; + UnlockBufHdr(buf, buf_state); InProgressBuf = buf; IsForInput = forInput; @@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput) * be 0, or BM_VALID if we just finished reading in the page. */ static void -TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) +TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) { + uint32 buf_state; + Assert(buf == InProgressBuf); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); + + Assert(buf_state & BM_IO_IN_PROGRESS); - Assert(buf->flags & BM_IO_IN_PROGRESS); - buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); - if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) - buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - buf->flags |= set_flag_bits; + buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); + if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) + buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - UnlockBufHdr(buf); + buf_state |= set_flag_bits; + UnlockBufHdr(buf, buf_state); InProgressBuf = NULL; @@ -3803,6 +3915,8 @@ AbortBufferIO(void) if (buf) { + uint32 buf_state; + /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that @@ -3811,24 +3925,22 @@ AbortBufferIO(void) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); - Assert(buf->flags & BM_IO_IN_PROGRESS); + buf_state = LockBufHdr(buf); + Assert(buf_state & BM_IO_IN_PROGRESS); if (IsForInput) { - Assert(!(buf->flags & BM_DIRTY)); + Assert(!(buf_state & BM_DIRTY)); + /* We'd better not think buffer is valid yet */ - Assert(!(buf->flags & BM_VALID)); - UnlockBufHdr(buf); + Assert(!(buf_state & BM_VALID)); + UnlockBufHdr(buf, buf_state); } else { - BufFlags sv_flags; - - sv_flags = buf->flags; - Assert(sv_flags & BM_DIRTY); - UnlockBufHdr(buf); + Assert(buf_state & BM_DIRTY); + UnlockBufHdr(buf, buf_state); /* Issue notice if this is not the first failure... */ - if (sv_flags & BM_IO_ERROR) + if (buf_state & BM_IO_ERROR) { /* Buffer is pinned, so we can read tag without spinlock */ char *path; @@ -3911,6 +4023,54 @@ rnode_comparator(const void *p1, const void *p2) return 0; } +/* + * Lock buffer header - set BM_LOCKED in buffer state. + */ +uint32 +LockBufHdr(BufferDesc *desc) +{ + SpinDelayStatus delayStatus = init_spin_delay(desc); + uint32 old_buf_state; + + while (true) + { + /* set BM_LOCKED flag */ + old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED); + /* if it wasn't set before we're OK */ + if (!(old_buf_state & BM_LOCKED)) + break; + perform_spin_delay(&delayStatus); + } + finish_spin_delay(&delayStatus); + return old_buf_state | BM_LOCKED; +} + +/* + * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's + * state at that point. + * + * Obviously the buffer could be locked by the time the value is returned, so + * this is primarily useful in CAS style loops. 
+ */ +static uint32 +WaitBufHdrUnlocked(BufferDesc *buf) +{ + SpinDelayStatus delayStatus = init_spin_delay(buf); + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&buf->state); + + while (buf_state & BM_LOCKED) + { + perform_spin_delay(&delayStatus); + buf_state = pg_atomic_read_u32(&buf->state); + } + + finish_spin_delay(&delayStatus); + + return buf_state; +} + /* * BufferTag comparator. */ diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 551d15205ca6498bfe5407008ab9464aadba2178..88b90dc5276d285b6a4daaccea243348f26b7a79 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -98,7 +98,8 @@ typedef struct BufferAccessStrategyData /* Prototypes for internal functions */ -static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy); +static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, + uint32 *buf_state); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); @@ -180,11 +181,12 @@ ClockSweepTick(void) * return the buffer with the buffer header spinlock still held. */ BufferDesc * -StrategyGetBuffer(BufferAccessStrategy strategy) +StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; int bgwprocno; int trycounter; + uint32 local_buf_state; /* to avoid repeated (de-)referencing */ /* * If given a strategy object, see whether it can select a buffer. We @@ -192,7 +194,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) */ if (strategy != NULL) { - buf = GetBufferFromRing(strategy); + buf = GetBufferFromRing(strategy, buf_state); if (buf != NULL) return buf; } @@ -279,14 +281,16 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ - LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count == 0) + local_buf_state = LockBufHdr(buf); + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; return buf; } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); } } @@ -295,19 +299,20 @@ StrategyGetBuffer(BufferAccessStrategy strategy) trycounter = NBuffers; for (;;) { - buf = GetBufferDescriptor(ClockSweepTick()); /* * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ - LockBufHdr(buf); - if (buf->refcount == 0) + local_buf_state = LockBufHdr(buf); + + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0) { - if (buf->usage_count > 0) + if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0) { - buf->usage_count--; + local_buf_state -= BUF_USAGECOUNT_ONE; + trycounter = NBuffers; } else @@ -315,6 +320,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) /* Found a usable buffer */ if (strategy != NULL) AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; return buf; } } @@ -327,10 +333,10 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * probably better to fail than to risk getting stuck in an * infinite loop. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); elog(ERROR, "no unpinned buffers available"); } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); } } @@ -585,10 +591,12 @@ FreeAccessStrategy(BufferAccessStrategy strategy) * The bufhdr spin lock is held on the returned buffer. 
*/ static BufferDesc * -GetBufferFromRing(BufferAccessStrategy strategy) +GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; Buffer bufnum; + uint32 local_buf_state; /* to avoid repeated (de-)referencing */ + /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) @@ -616,13 +624,15 @@ GetBufferFromRing(BufferAccessStrategy strategy) * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); - LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count <= 1) + local_buf_state = LockBufHdr(buf); + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1) { strategy->current_was_in_ring = true; + *buf_state = local_buf_state; return buf; } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); /* * Tell caller to allocate a new buffer with the normal allocation diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 17640cfe2a7001b4611a71a159b8617e6fdf0c61..68b402023a1883f3f6ed7d6336b605c5b6ef1b86 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, int b; int trycounter; bool found; + uint32 buf_state; INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -128,16 +129,21 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif + buf_state = pg_atomic_read_u32(&bufHdr->state); + /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) - bufHdr->usage_count++; + if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT) + { + buf_state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); + } } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); - if (bufHdr->flags & BM_VALID) + if (buf_state & BM_VALID) *foundPtr = TRUE; else { @@ -169,9 +175,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count > 0) + buf_state = pg_atomic_read_u32(&bufHdr->state); + + if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0) { - bufHdr->usage_count--; + buf_state -= BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); trycounter = NLocBuffer; } else @@ -193,7 +202,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ - if (bufHdr->flags & BM_DIRTY) + if (buf_state & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); @@ -211,7 +220,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, false); /* Mark not-dirty now in case we error out below */ - bufHdr->flags &= ~BM_DIRTY; + buf_state &= ~BM_DIRTY; + pg_atomic_write_u32(&bufHdr->state, buf_state); pgBufferUsage.local_blks_written++; } @@ -228,7 +238,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, /* * Update the hash table: remove old entry, if any, and make new one. 
*/ - if (bufHdr->flags & BM_TAG_VALID) + if (buf_state & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, @@ -237,7 +247,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); + buf_state &= ~(BM_VALID | BM_TAG_VALID); + pg_atomic_write_u32(&bufHdr->state, buf_state); } hresult = (LocalBufferLookupEnt *) @@ -250,9 +261,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * it's all ours now. */ bufHdr->tag = newTag; - bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); - bufHdr->flags |= BM_TAG_VALID; - bufHdr->usage_count = 1; + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); + buf_state |= BM_TAG_VALID; + buf_state &= ~BUF_USAGECOUNT_MASK; + buf_state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); *foundPtr = FALSE; return bufHdr; @@ -267,6 +280,7 @@ MarkLocalBufferDirty(Buffer buffer) { int bufid; BufferDesc *bufHdr; + uint32 buf_state; Assert(BufferIsLocal(buffer)); @@ -280,10 +294,10 @@ MarkLocalBufferDirty(Buffer buffer) bufHdr = GetLocalBufferDescriptor(bufid); - if (!(bufHdr->flags & BM_DIRTY)) - pgBufferUsage.local_blks_dirtied++; + buf_state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY); - bufHdr->flags |= BM_DIRTY; + if (!(buf_state & BM_DIRTY)) + pgBufferUsage.local_blks_dirtied++; } /* @@ -307,8 +321,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 buf_state; - if ((bufHdr->flags & BM_TAG_VALID) && + buf_state = pg_atomic_read_u32(&bufHdr->state); + + if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) @@ -327,8 +344,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + buf_state &= ~BUF_FLAG_MASK; + buf_state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, buf_state); } } } @@ -349,8 +367,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&bufHdr->state); - if ((bufHdr->flags & BM_TAG_VALID) && + if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) @@ -367,8 +388,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + buf_state &= ~BUF_FLAG_MASK; + buf_state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, buf_state); } } } diff --git a/src/backend/storage/lmgr/s_lock.c b/src/backend/storage/lmgr/s_lock.c index cc0bf5e01fb97a9482ee4e785f2ee6e949785d53..4a6ffb4f89038a06e21b3a3d0461c2598c231386 100644 --- a/src/backend/storage/lmgr/s_lock.c +++ b/src/backend/storage/lmgr/s_lock.c @@ -3,6 +3,38 @@ * s_lock.c * Hardware-dependent implementation of spinlocks. 
* + * When waiting for a contended spinlock we loop tightly for awhile, then + * delay using pg_usleep() and try again. Preferably, "awhile" should be a + * small multiple of the maximum time we expect a spinlock to be held. 100 + * iterations seems about right as an initial guess. However, on a + * uniprocessor the loop is a waste of cycles, while in a multi-CPU scenario + * it's usually better to spin a bit longer than to call the kernel, so we try + * to adapt the spin loop count depending on whether we seem to be in a + * uniprocessor or multiprocessor. + * + * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd + * be wrong; there are platforms where that can result in a "stuck + * spinlock" failure. This has been seen particularly on Alphas; it seems + * that the first TAS after returning from kernel space will always fail + * on that hardware. + * + * Once we do decide to block, we use randomly increasing pg_usleep() + * delays. The first delay is 1 msec, then the delay randomly increases to + * about one second, after which we reset to 1 msec and start again. The + * idea here is that in the presence of heavy contention we need to + * increase the delay, else the spinlock holder may never get to run and + * release the lock. (Consider situation where spinlock holder has been + * nice'd down in priority by the scheduler --- it will not get scheduled + * until all would-be acquirers are sleeping, so if we always use a 1-msec + * sleep, there is a real possibility of starvation.) But we can't just + * clamp the delay to an upper bound, else it would take a long time to + * make a reasonable number of tries. + * + * We time out and declare error after NUM_DELAYS delays (thus, exactly + * that many tries). With the given settings, this will usually take 2 or + * so minutes. It seems better to fix the total number of tries (and thus + * the probability of unintended failure) than to fix the total time + * spent. * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -21,6 +53,14 @@ #include "storage/s_lock.h" #include "storage/barrier.h" + +#define MIN_SPINS_PER_DELAY 10 +#define MAX_SPINS_PER_DELAY 1000 +#define NUM_DELAYS 1000 +#define MIN_DELAY_USEC 1000L +#define MAX_DELAY_USEC 1000000L + + slock_t dummy_spinlock; static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; @@ -30,117 +70,107 @@ static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; * s_lock_stuck() - complain about a stuck spinlock */ static void -s_lock_stuck(volatile slock_t *lock, const char *file, int line) +s_lock_stuck(void *p, const char *file, int line) { #if defined(S_LOCK_TEST) fprintf(stderr, "\nStuck spinlock (%p) detected at %s:%d.\n", - lock, file, line); + p, file, line); exit(1); #else elog(PANIC, "stuck spinlock (%p) detected at %s:%d", - lock, file, line); + p, file, line); #endif } - /* * s_lock(lock) - platform-independent portion of waiting for a spinlock. */ int s_lock(volatile slock_t *lock, const char *file, int line) { - /* - * We loop tightly for awhile, then delay using pg_usleep() and try again. - * Preferably, "awhile" should be a small multiple of the maximum time we - * expect a spinlock to be held. 100 iterations seems about right as an - * initial guess. 
However, on a uniprocessor the loop is a waste of - * cycles, while in a multi-CPU scenario it's usually better to spin a bit - * longer than to call the kernel, so we try to adapt the spin loop count - * depending on whether we seem to be in a uniprocessor or multiprocessor. - * - * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd - * be wrong; there are platforms where that can result in a "stuck - * spinlock" failure. This has been seen particularly on Alphas; it seems - * that the first TAS after returning from kernel space will always fail - * on that hardware. - * - * Once we do decide to block, we use randomly increasing pg_usleep() - * delays. The first delay is 1 msec, then the delay randomly increases to - * about one second, after which we reset to 1 msec and start again. The - * idea here is that in the presence of heavy contention we need to - * increase the delay, else the spinlock holder may never get to run and - * release the lock. (Consider situation where spinlock holder has been - * nice'd down in priority by the scheduler --- it will not get scheduled - * until all would-be acquirers are sleeping, so if we always use a 1-msec - * sleep, there is a real possibility of starvation.) But we can't just - * clamp the delay to an upper bound, else it would take a long time to - * make a reasonable number of tries. - * - * We time out and declare error after NUM_DELAYS delays (thus, exactly - * that many tries). With the given settings, this will usually take 2 or - * so minutes. It seems better to fix the total number of tries (and thus - * the probability of unintended failure) than to fix the total time - * spent. - */ -#define MIN_SPINS_PER_DELAY 10 -#define MAX_SPINS_PER_DELAY 1000 -#define NUM_DELAYS 1000 -#define MIN_DELAY_USEC 1000L -#define MAX_DELAY_USEC 1000000L - - int spins = 0; - int delays = 0; - int cur_delay = 0; + SpinDelayStatus delayStatus = init_spin_delay((void *) lock); while (TAS_SPIN(lock)) { - /* CPU-specific delay each time through the loop */ - SPIN_DELAY(); + perform_spin_delay(&delayStatus); + } - /* Block the process every spins_per_delay tries */ - if (++spins >= spins_per_delay) - { - if (++delays > NUM_DELAYS) - s_lock_stuck(lock, file, line); + finish_spin_delay(&delayStatus); - if (cur_delay == 0) /* first time to delay? */ - cur_delay = MIN_DELAY_USEC; + return delayStatus.delays; +} - pg_usleep(cur_delay); +#ifdef USE_DEFAULT_S_UNLOCK +void +s_unlock(volatile slock_t *lock) +{ +#ifdef TAS_ACTIVE_WORD + /* HP's PA-RISC */ + *TAS_ACTIVE_WORD(lock) = -1; +#else + *lock = 0; +#endif +} +#endif + +/* + * Wait while spinning on a contended spinlock. + */ +void +perform_spin_delay(SpinDelayStatus *status) +{ + /* CPU-specific delay each time through the loop */ + SPIN_DELAY(); + + /* Block the process every spins_per_delay tries */ + if (++(status->spins) >= spins_per_delay) + { + if (++(status->delays) > NUM_DELAYS) + s_lock_stuck(status->ptr, status->file, status->line); + + if (status->cur_delay == 0) /* first time to delay? 
*/ + status->cur_delay = MIN_DELAY_USEC; + + pg_usleep(status->cur_delay); #if defined(S_LOCK_TEST) - fprintf(stdout, "*"); - fflush(stdout); + fprintf(stdout, "*"); + fflush(stdout); #endif - /* increase delay by a random fraction between 1X and 2X */ - cur_delay += (int) (cur_delay * + /* increase delay by a random fraction between 1X and 2X */ + status->cur_delay += (int) (status->cur_delay * ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5); - /* wrap back to minimum delay when max is exceeded */ - if (cur_delay > MAX_DELAY_USEC) - cur_delay = MIN_DELAY_USEC; + /* wrap back to minimum delay when max is exceeded */ + if (status->cur_delay > MAX_DELAY_USEC) + status->cur_delay = MIN_DELAY_USEC; - spins = 0; - } + status->spins = 0; } +} - /* - * If we were able to acquire the lock without delaying, it's a good - * indication we are in a multiprocessor. If we had to delay, it's a sign - * (but not a sure thing) that we are in a uniprocessor. Hence, we - * decrement spins_per_delay slowly when we had to delay, and increase it - * rapidly when we didn't. It's expected that spins_per_delay will - * converge to the minimum value on a uniprocessor and to the maximum - * value on a multiprocessor. - * - * Note: spins_per_delay is local within our current process. We want to - * average these observations across multiple backends, since it's - * relatively rare for this function to even get entered, and so a single - * backend might not live long enough to converge on a good value. That - * is handled by the two routines below. - */ - if (cur_delay == 0) +/* + * After acquiring a spinlock, update estimates about how long to loop. + * + * If we were able to acquire the lock without delaying, it's a good + * indication we are in a multiprocessor. If we had to delay, it's a sign + * (but not a sure thing) that we are in a uniprocessor. Hence, we + * decrement spins_per_delay slowly when we had to delay, and increase it + * rapidly when we didn't. It's expected that spins_per_delay will + * converge to the minimum value on a uniprocessor and to the maximum + * value on a multiprocessor. + * + * Note: spins_per_delay is local within our current process. We want to + * average these observations across multiple backends, since it's + * relatively rare for this function to even get entered, and so a single + * backend might not live long enough to converge on a good value. That + * is handled by the two routines below. + */ +void +finish_spin_delay(SpinDelayStatus *status) +{ + if (status->cur_delay == 0) { /* we never had to delay */ if (spins_per_delay < MAX_SPINS_PER_DELAY) @@ -151,22 +181,8 @@ s_lock(volatile slock_t *lock, const char *file, int line) if (spins_per_delay > MIN_SPINS_PER_DELAY) spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY); } - return delays; } -#ifdef USE_DEFAULT_S_UNLOCK -void -s_unlock(volatile slock_t *lock) -{ -#ifdef TAS_ACTIVE_WORD - /* HP's PA-RISC */ - *TAS_ACTIVE_WORD(lock) = -1; -#else - *lock = 0; -#endif -} -#endif - /* * Set local copy of spins_per_delay during backend startup. * diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h index 61e3060c9fd7ba44c98f86092556f07cf7502254..b2d7776f2a8c74a6dcc0ff84a95c57928f25fcdb 100644 --- a/src/include/postmaster/postmaster.h +++ b/src/include/postmaster/postmaster.h @@ -63,12 +63,15 @@ extern void ShmemBackendArrayAllocation(void); #endif /* - * Note: MAX_BACKENDS is limited to 2^23-1 because inval.c stores the - * backend ID as a 3-byte signed integer. 
Even if that limitation were - removed, we still could not exceed INT_MAX/4 because some places compute - 4*MaxBackends without any overflow check. This is rechecked in the relevant - GUC check hooks and in RegisterBackgroundWorker(). + * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved + * for buffer references in buf_internals.h. This limitation could be lifted + * by using a 64-bit state, but it's unlikely to be worthwhile, as 2^18-1 + * backends exceeds any currently realistic configuration. Even if that limitation + * were removed, we still could not a) exceed 2^23-1, because inval.c stores + * the backend ID as a 3-byte signed integer, or b) exceed INT_MAX/4, because some places + * compute 4*MaxBackends without any overflow check. This is rechecked in the + * relevant GUC check hooks and in RegisterBackgroundWorker(). */ -#define MAX_BACKENDS 0x7fffff +#define MAX_BACKENDS 0x3FFFF #endif /* _POSTMASTER_H */ diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index d04363b1665c02246955b1df3745491fc901219c..f8f71255fdd93c95d260b4d1c8123069ab5547e4 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -21,29 +21,51 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/smgr.h" +#include "port/atomics.h" #include "storage/spin.h" #include "utils/relcache.h" +/* + * Buffer state is a single 32-bit variable in which the following data is combined: + * + * - 18 bits refcount + * - 4 bits usage count + * - 10 bits of flags + * + * Combining these values allows us to perform some operations without locking + * the buffer header, by modifying them together with a CAS loop. + * + * The definitions of the buffer state components are below. + */ +#define BUF_REFCOUNT_ONE 1 +#define BUF_REFCOUNT_MASK ((1U << 18) - 1) +#define BUF_USAGECOUNT_MASK 0x003C0000U +#define BUF_USAGECOUNT_ONE (1U << 18) +#define BUF_USAGECOUNT_SHIFT 18 +#define BUF_FLAG_MASK 0xFFC00000U + +/* Get refcount and usagecount from buffer state */ +#define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK) +#define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT) +
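As a quick cross-check of the layout just defined: the three fields partition the 32-bit word exactly, and the new MAX_BACKENDS value in postmaster.h above is precisely what fits in the 18-bit refcount field. A standalone sketch (plain C with the constants copied from this patch; not itself part of the patch):

#include <assert.h>
#include <stdio.h>

#define BUF_REFCOUNT_MASK    ((1U << 18) - 1)   /* 0x0003FFFF */
#define BUF_USAGECOUNT_MASK  0x003C0000U
#define BUF_USAGECOUNT_ONE   (1U << 18)
#define BUF_FLAG_MASK        0xFFC00000U
#define MAX_BACKENDS         0x3FFFF            /* new value from postmaster.h above */

int
main(void)
{
    /* the three fields must not overlap and must cover all 32 bits */
    assert((BUF_REFCOUNT_MASK & BUF_USAGECOUNT_MASK) == 0);
    assert((BUF_REFCOUNT_MASK & BUF_FLAG_MASK) == 0);
    assert((BUF_USAGECOUNT_MASK & BUF_FLAG_MASK) == 0);
    assert((BUF_REFCOUNT_MASK | BUF_USAGECOUNT_MASK | BUF_FLAG_MASK) == 0xFFFFFFFFU);

    /* every backend may pin a buffer once, so MAX_BACKENDS must fit the refcount field */
    assert(MAX_BACKENDS <= BUF_REFCOUNT_MASK);

    /* the 4-bit usage count comfortably holds BM_MAX_USAGE_COUNT (currently 5) */
    assert(5U * BUF_USAGECOUNT_ONE <= BUF_USAGECOUNT_MASK);

    printf("layout ok: 18-bit refcount, 4-bit usage count, 10-bit flags\n");
    return 0;
}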
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index d04363b1665c02246955b1df3745491fc901219c..f8f71255fdd93c95d260b4d1c8123069ab5547e4 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -21,29 +21,51 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "port/atomics.h"
 #include "storage/spin.h"
 #include "utils/relcache.h"
 
+/*
+ * Buffer state is a single 32-bit variable combining the following data:
+ *
+ * - 18 bits refcount
+ * - 4 bits usage count
+ * - 10 bits of flags
+ *
+ * Combining these values allows us to perform some operations without
+ * locking the buffer header, by modifying them together with a CAS loop.
+ *
+ * The definition of buffer state components is below.
+ */
+#define BUF_REFCOUNT_ONE 1
+#define BUF_REFCOUNT_MASK ((1U << 18) - 1)
+#define BUF_USAGECOUNT_MASK 0x003C0000U
+#define BUF_USAGECOUNT_ONE (1U << 18)
+#define BUF_USAGECOUNT_SHIFT 18
+#define BUF_FLAG_MASK 0xFFC00000U
+
+/* Get refcount and usagecount from buffer state */
+#define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK)
+#define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)
+
 /*
  * Flags for buffer descriptors
  *
  * Note: TAG_VALID essentially means that there is a buffer hashtable
  * entry associated with the buffer's tag.
  */
-#define BM_DIRTY				(1 << 0)	/* data needs writing */
-#define BM_VALID				(1 << 1)	/* data is valid */
-#define BM_TAG_VALID			(1 << 2)	/* tag is assigned */
-#define BM_IO_IN_PROGRESS		(1 << 3)	/* read or write in progress */
-#define BM_IO_ERROR				(1 << 4)	/* previous I/O failed */
-#define BM_JUST_DIRTIED			(1 << 5)	/* dirtied since write started */
-#define BM_PIN_COUNT_WAITER		(1 << 6)	/* have waiter for sole pin */
-#define BM_CHECKPOINT_NEEDED	(1 << 7)	/* must write for checkpoint */
-#define BM_PERMANENT			(1 << 8)	/* permanent relation (not
+#define BM_LOCKED				(1U << 22)	/* buffer header is locked */
+#define BM_DIRTY				(1U << 23)	/* data needs writing */
+#define BM_VALID				(1U << 24)	/* data is valid */
+#define BM_TAG_VALID			(1U << 25)	/* tag is assigned */
+#define BM_IO_IN_PROGRESS		(1U << 26)	/* read or write in progress */
+#define BM_IO_ERROR				(1U << 27)	/* previous I/O failed */
+#define BM_JUST_DIRTIED			(1U << 28)	/* dirtied since write started */
+#define BM_PIN_COUNT_WAITER		(1U << 29)	/* have waiter for sole pin */
+#define BM_CHECKPOINT_NEEDED	(1U << 30)	/* must write for checkpoint */
+#define BM_PERMANENT			(1U << 31)	/* permanent relation (not
 											 * unlogged) */
-
-typedef bits16 BufFlags;
-
 /*
  * The maximum allowed value of usage_count represents a tradeoff between
  * accuracy and speed of the clock-sweep buffer management algorithm.  A
@@ -113,18 +135,29 @@ typedef struct buftag
 
 /*
  * BufferDesc -- shared descriptor/state data for a single shared buffer.
  *
- * Note: buf_hdr_lock must be held to examine or change the tag, flags,
- * usage_count, refcount, or wait_backend_pid fields.  buf_id field never
- * changes after initialization, so does not need locking.  freeNext is
- * protected by the buffer_strategy_lock not buf_hdr_lock.  The LWLock can
- * take care of itself.  The buf_hdr_lock is *not* used to control access to
- * the data in the buffer!
+ * Note: Buffer header lock (BM_LOCKED flag) must be held to examine or change
+ * the tag, state or wait_backend_pid fields.  In general, the buffer header
+ * lock is a spinlock that is combined with the flags, refcount and usagecount
+ * into a single atomic variable.  This layout allows us to perform some
+ * operations with a single atomic operation, without actually acquiring and
+ * releasing the spinlock; for instance, increasing or decreasing the
+ * refcount.  The buf_id field never changes after initialization, so it does
+ * not need locking.  freeNext is protected by the buffer_strategy_lock, not
+ * the buffer header lock.  The LWLock can take care of itself.  The buffer
+ * header lock is *not* used to control access to the data in the buffer!
+ *
+ * It's assumed that nobody changes the state field while the buffer header
+ * lock is held.  Thus the buffer header lock holder can do complex updates of
+ * the state variable in a single write, simultaneously with the lock release
+ * (clearing BM_LOCKED).  On the other hand, updating the state without
+ * holding the buffer header lock is restricted to CAS, which ensures that
+ * BM_LOCKED is not set.  Atomic increment/decrement, OR/AND etc. are not
+ * allowed.
  *
  * An exception is that if we have the buffer pinned, its tag can't change
- * underneath us, so we can examine the tag without locking the spinlock.
+ * underneath us, so we can examine the tag without locking the buffer header.
  * Also, in places we do one-time reads of the flags without bothering to
- * lock the spinlock; this is generally for situations where we don't expect
- * the flag bit being tested to be changing.
+ * lock the buffer header; this is generally for situations where we don't
+ * expect the flag bit being tested to be changing.
  *
 * We can't physically remove items from a disk page if another backend has
 * the buffer pinned.  Hence, a backend may need to wait for all other pins
@@ -142,13 +175,12 @@ typedef struct buftag
 typedef struct BufferDesc
 {
 	BufferTag	tag;			/* ID of page contained in buffer */
-	BufFlags	flags;			/* see bit definitions above */
-	uint8		usage_count;	/* usage counter for clock sweep code */
-	slock_t		buf_hdr_lock;	/* protects a subset of fields, see above */
-	unsigned	refcount;		/* # of backends holding pins on buffer */
-	int			wait_backend_pid;	/* backend PID of pin-count waiter */
-
 	int			buf_id;			/* buffer's index number (from 0) */
+
+	/* state of the buffer, containing flags, refcount and usagecount */
+	pg_atomic_uint32 state;
+
+	int			wait_backend_pid;	/* backend PID of pin-count waiter */
 	int			freeNext;		/* link in freelist chain */
 
 	LWLock		content_lock;	/* to lock access to buffer contents */
@@ -202,11 +234,15 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
 #define FREENEXT_NOT_IN_LIST	(-2)
 
 /*
- * Macros for acquiring/releasing a shared buffer header's spinlock.
- * Do not apply these to local buffers!
+ * Functions for acquiring/releasing a shared buffer header's spinlock.  Do
+ * not apply these to local buffers!
  */
-#define LockBufHdr(bufHdr)		SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
-#define UnlockBufHdr(bufHdr)	SpinLockRelease(&(bufHdr)->buf_hdr_lock)
+extern uint32 LockBufHdr(BufferDesc *desc);
+#define UnlockBufHdr(desc, s)	\
+	do { \
+		pg_atomic_write_u32(&(desc)->state, (s) & (~BM_LOCKED)); \
+		pg_write_barrier(); \
+	} while (0)
 
 
 /*
@@ -267,7 +303,8 @@ extern void IssuePendingWritebacks(WritebackContext *context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *context,
 					BufferTag *tag);
 
 /* freelist.c */
-extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy);
+extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
+					 uint32 *buf_state);
 extern void StrategyFreeBuffer(BufferDesc *buf);
 extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf);
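The locking rule spelled out in the BufferDesc comment above is easiest to see as code: without holding the buffer header lock, the state may only be advanced with a compare-and-exchange whose expected value has BM_LOCKED clear. A stripped-down sketch of pinning along those lines; the function name is illustrative, and the real code in bufmgr.c does more (usage-count handling, waiting for the header lock to be released, private refcount bookkeeping):

static void
pin_buffer_lockfree(BufferDesc *buf)
{
	uint32		old_buf_state = pg_atomic_read_u32(&buf->state);
	uint32		buf_state;

	for (;;)
	{
		if (old_buf_state & BM_LOCKED)
		{
			/* header is locked; reread and retry (a real caller would back off) */
			old_buf_state = pg_atomic_read_u32(&buf->state);
			continue;
		}

		/* bump the refcount in a private copy of the state word */
		buf_state = old_buf_state + BUF_REFCOUNT_ONE;

		/* publish it only if nobody locked or changed the header meanwhile */
		if (pg_atomic_compare_exchange_u32(&buf->state,
										   &old_buf_state, buf_state))
			break;
		/* on failure, old_buf_state now holds the current value; try again */
	}
}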
diff --git a/src/include/storage/s_lock.h b/src/include/storage/s_lock.h
index 8b240cd647c0784e34b903ddfbae340c3277bfd3..11410e221b291afbbbad5375d55b3a898ad27c0b 100644
--- a/src/include/storage/s_lock.h
+++ b/src/include/storage/s_lock.h
@@ -991,4 +991,22 @@ extern int s_lock(volatile slock_t *lock, const char *file, int line);
 extern void set_spins_per_delay(int shared_spins_per_delay);
 extern int	update_spins_per_delay(int shared_spins_per_delay);
 
+/*
+ * Support for spin delay, which is useful in various places where
+ * spinlock-like procedures take place.
+ */
+typedef struct
+{
+	int			spins;
+	int			delays;
+	int			cur_delay;
+	void	   *ptr;
+	const char *file;
+	int			line;
+} SpinDelayStatus;
+
+#define init_spin_delay(ptr) {0, 0, 0, (ptr), __FILE__, __LINE__}
+void perform_spin_delay(SpinDelayStatus *status);
+void finish_spin_delay(SpinDelayStatus *status);
+
 #endif   /* S_LOCK_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e293fc0bc8d801703d05c89913bcff4aeaa4369c..cdaad2089abbc0e4ba9635938125086ecf56e9d9 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1859,6 +1859,7 @@ SpGistScanOpaqueData
 SpGistState
 SpGistTypeDesc
 SpecialJoinInfo
+SpinDelayStatus
 SplitInterval
 SplitLR
 SplitVar
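Tying the pieces together, here is one implementation of the LockBufHdr() declared in buf_internals.h that is consistent with the UnlockBufHdr() macro and the new spin-delay support: set BM_LOCKED with an atomic fetch-or, delay while the bit was already set, and return the locked state word so the caller can inspect it and later hand a possibly updated value to UnlockBufHdr(). This is a sketch against the interfaces in this patch, not necessarily the exact bufmgr.c code:

uint32
LockBufHdr(BufferDesc *desc)
{
	SpinDelayStatus delayStatus = init_spin_delay(desc);
	uint32		old_buf_state;

	for (;;)
	{
		/* set BM_LOCKED; the previous value tells us whether it was free */
		old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
		if (!(old_buf_state & BM_LOCKED))
			break;
		perform_spin_delay(&delayStatus);
	}
	finish_spin_delay(&delayStatus);

	/* return the state as it is now, i.e. with BM_LOCKED set */
	return old_buf_state | BM_LOCKED;
}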