diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1dd31b37ffe06e67bd3756489f2b139cea6dfce7..939813e7b7177ad022dd2344a6ecec1530a68a56 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1408,9 +1408,7 @@ WALInsertLockAcquire(void) * The insertingAt value is initially set to 0, as we don't know our * insert location yet. */ - immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock, - &WALInsertLocks[MyLockNo].l.insertingAt, - 0); + immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE); if (!immed) { /* @@ -1435,26 +1433,28 @@ WALInsertLockAcquireExclusive(void) int i; /* - * When holding all the locks, we only update the last lock's insertingAt - * indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher - * than any real XLogRecPtr value, to make sure that no-one blocks waiting - * on those. + * When holding all the locks, all but the last lock's insertingAt + * indicator is set to 0xFFFFFFFFFFFFFFFF, which is higher than any real + * XLogRecPtr value, to make sure that no-one blocks waiting on those. */ for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++) { - LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, - &WALInsertLocks[i].l.insertingAt, - PG_UINT64_MAX); + LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE); + LWLockUpdateVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + PG_UINT64_MAX); } - LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, - &WALInsertLocks[i].l.insertingAt, - 0); + /* Variable value reset to 0 at release */ + LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE); holdingAllLocks = true; } /* * Release our insertion lock (or locks, if we're holding them all). + * + * NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the + * next time the lock is acquired. */ static void WALInsertLockRelease(void) @@ -1464,13 +1464,17 @@ WALInsertLockRelease(void) int i; for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++) - LWLockRelease(&WALInsertLocks[i].l.lock); + LWLockReleaseClearVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + 0); holdingAllLocks = false; } else { - LWLockRelease(&WALInsertLocks[MyLockNo].l.lock); + LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock, + &WALInsertLocks[MyLockNo].l.insertingAt, + 0); } } diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index e5566d1b60969bc1fb03eb7503e7e35b363d1e5c..ae03eb14196c80fec7a0d042da94726a4a2e2b65 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -10,13 +10,15 @@ * locking should be done with the full lock manager --- which depends on * LWLocks to protect its shared state. * - * In addition to exclusive and shared modes, lightweight locks can be used - * to wait until a variable changes value. The variable is initially set - * when the lock is acquired with LWLockAcquireWithVar, and can be updated + * In addition to exclusive and shared modes, lightweight locks can be used to + * wait until a variable changes value. The variable is initially not set + * when the lock is acquired with LWLockAcquire, i.e. it remains set to the + * value it was set to when the lock was released last, and can be updated * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar - * waits for the variable to be updated, or until the lock is free. The - * meaning of the variable is up to the caller, the lightweight lock code - * just assigns and compares it. + * waits for the variable to be updated, or until the lock is free. When + * releasing the lock with LWLockReleaseClearVar() the value can be set to an + * appropriate value for a free lock. The meaning of the variable is up to + * the caller, the lightweight lock code just assigns and compares it. * * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -150,9 +152,6 @@ static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; -static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode, - uint64 *valptr, uint64 val); - #ifdef LWLOCK_STATS typedef struct lwlock_stats_key { @@ -899,25 +898,7 @@ LWLockDequeueSelf(LWLock *lock) * Side effect: cancel/die interrupts are held off until lock release. */ bool -LWLockAcquire(LWLock *l, LWLockMode mode) -{ - return LWLockAcquireCommon(l, mode, NULL, 0); -} - -/* - * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val - * - * The lock is always acquired in exclusive mode with this function. - */ -bool -LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val) -{ - return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val); -} - -/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */ -static inline bool -LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) +LWLockAcquire(LWLock *lock, LWLockMode mode) { PGPROC *proc = MyProc; bool result = true; @@ -1064,10 +1045,6 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) result = false; } - /* If there's a variable associated with this lock, initialize it */ - if (valptr) - *valptr = val; - TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode); /* Add lock to list of locks held by this backend */ @@ -1258,6 +1235,71 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) return !mustwait; } +/* + * Does the lwlock in its current state need to wait for the variable value to + * change? + * + * If we don't need to wait, and it's because the value of the variable has + * changed, store the current value in newval. + * + * *result is set to true if the lock was free, and false otherwise. + */ +static bool +LWLockConflictsWithVar(LWLock *lock, + uint64 *valptr, uint64 oldval, uint64 *newval, + bool *result) +{ + bool mustwait; + uint64 value; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); +#endif + + /* + * Test first to see if it the slot is free right now. + * + * XXX: the caller uses a spinlock before this, so we don't need a memory + * barrier here as far as the current usage is concerned. But that might + * not be safe in general. + */ + mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; + + if (!mustwait) + { + *result = true; + return false; + } + + *result = false; + + /* + * Read value using spinlock as we can't rely on atomic 64 bit + * reads/stores. TODO: On platforms with a way to do atomic 64 bit + * reads/writes the spinlock could be optimized away. + */ +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + value = *valptr; + SpinLockRelease(&lock->mutex); + + if (value != oldval) + { + mustwait = false; + *newval = value; + } + else + { + mustwait = true; + } + + return mustwait; +} + /* * LWLockWaitForVar - Wait until lock is free, or a variable is updated. * @@ -1268,11 +1310,6 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) * matches oldval, returns false and sets *newval to the current value in * *valptr. * - * It's possible that the lock holder releases the lock, but another backend - * acquires it again before we get a chance to observe that the lock was - * momentarily released. We wouldn't need to wait for the new lock holder, - * but we cannot distinguish that case, so we will have to wait. - * * Note: this function ignores shared lock holders; if the lock is held * in shared mode, returns 'true'. */ @@ -1290,16 +1327,6 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE); - /* - * Quick test first to see if it the slot is free right now. - * - * XXX: the caller uses a spinlock before this, so we don't need a memory - * barrier here as far as the current usage is concerned. But that might - * not be safe in general. - */ - if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0) - return true; - /* * Lock out cancel/die interrupts while we sleep on the lock. There is no * cleanup mechanism to remove us from the wait queue if we got @@ -1313,39 +1340,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) for (;;) { bool mustwait; - uint64 value; - - mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; - - if (mustwait) - { - /* - * Perform comparison using spinlock as we can't rely on atomic 64 - * bit reads/stores. - */ -#ifdef LWLOCK_STATS - lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); -#else - SpinLockAcquire(&lock->mutex); -#endif - /* - * XXX: We can significantly optimize this on platforms with 64bit - * atomics. - */ - value = *valptr; - if (value != oldval) - { - result = false; - mustwait = false; - *newval = value; - } - else - mustwait = true; - SpinLockRelease(&lock->mutex); - } - else - mustwait = false; + mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval, + &result); if (!mustwait) break; /* the lock was free or value didn't match */ @@ -1354,7 +1351,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) * Add myself to wait queue. Note that this is racy, somebody else * could wakeup before we're finished queuing. NB: We're using nearly * the same twice-in-a-row lock acquisition protocol as - * LWLockAcquire(). Check its comments for details. + * LWLockAcquire(). Check its comments for details. The only + * difference is that we also have to check the variable's values when + * checking the state of the lock. */ LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); @@ -1365,12 +1364,13 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); /* - * We're now guaranteed to be woken up if necessary. Recheck the - * lock's state. + * We're now guaranteed to be woken up if necessary. Recheck the lock + * and variables state. */ - mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; + mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval, + &result); - /* Ok, lock is free after we queued ourselves. Undo queueing. */ + /* Ok, no conflict after we queued ourselves. Undo queueing. */ if (!mustwait) { LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue"); @@ -1587,6 +1587,31 @@ LWLockRelease(LWLock *lock) RESUME_INTERRUPTS(); } +/* + * LWLockReleaseClearVar - release a previously acquired lock, reset variable + */ +void +LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val) +{ +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + /* + * Set the variable's value before releasing the lock, that prevents race + * a race condition wherein a new locker acquires the lock, but hasn't yet + * set the variables value. + */ + *valptr = val; + SpinLockRelease(&lock->mutex); + + LWLockRelease(lock); +} + /* * LWLockReleaseAll - release all currently-held locks diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index cff3b9992218e79f3f168798a1b42bf534a10254..cbd63184b9b4c131afd91a7e3187da3f89a92c87 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -182,10 +182,10 @@ extern bool LWLockAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode); extern void LWLockRelease(LWLock *lock); +extern void LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val); extern void LWLockReleaseAll(void); extern bool LWLockHeldByMe(LWLock *lock); -extern bool LWLockAcquireWithVar(LWLock *lock, uint64 *valptr, uint64 val); extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval); extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value);