diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 7e518f4dde5e9318aecba4ac71a32c9c9d56a3b8..ed144bf27e330bdde748f902cac86215156880af 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -2070,19 +2070,39 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) { LWLockId partitionLock = FirstLockMgrLock + partition; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); + PROCLOCK *nextplock; - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - - if (!proclock) + /* + * If the proclock list for this partition is empty, we can skip + * acquiring the partition lock. This optimization is trickier than + * it looks, because another backend could be in process of adding + * something to our proclock list due to promoting one of our + * fast-path locks. However, any such lock must be one that we + * decided not to delete above, so it's okay to skip it again now; + * we'd just decide not to delete it again. We must, however, be + * careful to re-fetch the list header once we've acquired the + * partition lock, to be sure we have a valid, up-to-date pointer. + * (There is probably no significant risk if pointer fetch/store is + * atomic, but we don't wish to assume that.) + * + * XXX This argument assumes that the locallock table correctly + * represents all of our fast-path locks. While allLocks mode + * guarantees to clean up all of our normal locks regardless of the + * locallock situation, we lose that guarantee for fast-path locks. + * This is not ideal. + */ + if (SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)) == NULL) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - while (proclock) + for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + proclock; + proclock = nextplock) { bool wakeupNeeded = false; - PROCLOCK *nextplock; /* Get link first, since we may unlink/delete this proclock */ nextplock = (PROCLOCK *) @@ -2095,7 +2115,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) /* Ignore items that are not of the lockmethod to be removed */ if (LOCK_LOCKMETHOD(*lock) != lockmethodid) - goto next_item; + continue; /* * In allLocks mode, force release of all locks even if locallock @@ -2111,7 +2131,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) * holdMask == 0 and are therefore recyclable */ if (proclock->releaseMask == 0 && proclock->holdMask != 0) - goto next_item; + continue; PROCLOCK_PRINT("LockReleaseAll", proclock); LOCK_PRINT("LockReleaseAll", lock, 0); @@ -2140,9 +2160,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) lockMethodTable, LockTagHashCode(&lock->tag), wakeupNeeded); - - next_item: - proclock = nextplock; } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); @@ -3074,18 +3091,27 @@ PostPrepare_Locks(TransactionId xid) { LWLockId partitionLock = FirstLockMgrLock + partition; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); + PROCLOCK *nextplock; - proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, - offsetof(PROCLOCK, procLink)); - - if (!proclock) + /* + * If the proclock list for this partition is empty, we can skip + * acquiring the partition lock. This optimization is safer than the + * situation in LockReleaseAll, because we got rid of any fast-path + * locks during AtPrepare_Locks, so there cannot be any case where + * another backend is adding something to our lists now. For safety, + * though, we code this the same way as in LockReleaseAll. + */ + if (SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)) == NULL) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - while (proclock) + for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + proclock; + proclock = nextplock) { - PROCLOCK *nextplock; LOCKMASK holdMask; PROCLOCK *newproclock; @@ -3100,7 +3126,7 @@ PostPrepare_Locks(TransactionId xid) /* Ignore VXID locks */ if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION) - goto next_item; + continue; PROCLOCK_PRINT("PostPrepare_Locks", proclock); LOCK_PRINT("PostPrepare_Locks", lock, 0); @@ -3111,7 +3137,7 @@ PostPrepare_Locks(TransactionId xid) /* Ignore it if nothing to release (must be a session lock) */ if (proclock->releaseMask == 0) - goto next_item; + continue; /* Else we should be releasing all locks */ if (proclock->releaseMask != proclock->holdMask) @@ -3175,9 +3201,6 @@ PostPrepare_Locks(TransactionId xid) */ Assert((newproclock->holdMask & holdMask) == 0); newproclock->holdMask |= holdMask; - - next_item: - proclock = nextplock; } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); @@ -3874,7 +3897,7 @@ VirtualXactLockTableInsert(VirtualTransactionId vxid) * unblocking waiters. */ void -VirtualXactLockTableCleanup() +VirtualXactLockTableCleanup(void) { bool fastpath; LocalTransactionId lxid;