Skip to content
Snippets Groups Projects
Commit c4ccbcc1 authored by Thomas Munro's avatar Thomas Munro
Browse files

Move RecoveryLockList into a hash table.

Standbys frequently need to release all locks held by a given xid.
Instead of searching one big list linearly, let's create one list
per xid and put them in a hash table, so we can find what we need
in O(1) time.

Earlier analysis and a prototype were done by David Rowley, though
this isn't his patch.

Back-patch all the way.

Author: Thomas Munro
Diagnosed-by: David Rowley, Andres Freund
Reviewed-by: Andres Freund, Tom Lane, Robert Haas
Discussion: https://postgr.es/m/CAEepm%3D1mL0KiQ2KJ4yuPpLGX94a4Ns_W6TL4EGRouxWibu56pA%40mail.gmail.com
Discussion: https://postgr.es/m/CAKJS1f9vJ841HY%3DwonnLVbfkTWGYWdPN72VMxnArcGCjF3SywA%40mail.gmail.com
parent 79b5b101
No related branches found
No related tags found
No related merge requests found
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
#include "storage/procarray.h" #include "storage/procarray.h"
#include "storage/sinvaladt.h" #include "storage/sinvaladt.h"
#include "storage/standby.h" #include "storage/standby.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
#include "utils/timeout.h" #include "utils/timeout.h"
#include "utils/timestamp.h" #include "utils/timestamp.h"
...@@ -36,7 +38,7 @@ int vacuum_defer_cleanup_age; ...@@ -36,7 +38,7 @@ int vacuum_defer_cleanup_age;
int max_standby_archive_delay = 30 * 1000; int max_standby_archive_delay = 30 * 1000;
int max_standby_streaming_delay = 30 * 1000; int max_standby_streaming_delay = 30 * 1000;
static List *RecoveryLockList; static HTAB *RecoveryLockLists;
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
ProcSignalReason reason); ProcSignalReason reason);
...@@ -45,6 +47,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason); ...@@ -45,6 +47,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts); static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks); static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
/*
* Keep track of all the locks owned by a given transaction.
*/
typedef struct RecoveryLockListsEntry
{
TransactionId xid;
List *locks;
} RecoveryLockListsEntry;
/* /*
* InitRecoveryTransactionEnvironment * InitRecoveryTransactionEnvironment
...@@ -62,6 +72,20 @@ void ...@@ -62,6 +72,20 @@ void
InitRecoveryTransactionEnvironment(void) InitRecoveryTransactionEnvironment(void)
{ {
VirtualTransactionId vxid; VirtualTransactionId vxid;
HASHCTL hash_ctl;
/*
* Initialize the hash table for tracking the list of locks held by each
* transaction.
*/
memset(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(TransactionId);
hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
hash_ctl.hash = tag_hash;
RecoveryLockLists = hash_create("RecoveryLockLists",
64,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
/* /*
* Initialize shared invalidation management for Startup process, being * Initialize shared invalidation management for Startup process, being
...@@ -106,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void) ...@@ -106,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
/* Release all locks the tracked transactions were holding */ /* Release all locks the tracked transactions were holding */
StandbyReleaseAllLocks(); StandbyReleaseAllLocks();
/* Destroy the hash table of locks. */
hash_destroy(RecoveryLockLists);
RecoveryLockLists = NULL;
/* Cleanup our VirtualTransaction */ /* Cleanup our VirtualTransaction */
VirtualXactLockTableCleanup(); VirtualXactLockTableCleanup();
} }
...@@ -548,8 +576,8 @@ StandbyTimeoutHandler(void) ...@@ -548,8 +576,8 @@ StandbyTimeoutHandler(void)
* We only keep track of AccessExclusiveLocks, which are only ever held by * We only keep track of AccessExclusiveLocks, which are only ever held by
* one transaction on one relation, and don't worry about lock queuing. * one transaction on one relation, and don't worry about lock queuing.
* *
* We keep a single dynamically expandible list of locks in local memory, * We keep a hash table of lists of locks in local memory keyed by xid,
* RecoveryLockList, so we can keep track of the various entries made by * RecoveryLockLists, so we can keep track of the various entries made by
* the Startup process's virtual xid in the shared lock table. * the Startup process's virtual xid in the shared lock table.
* *
* We record the lock against the top-level xid, rather than individual * We record the lock against the top-level xid, rather than individual
...@@ -567,8 +595,10 @@ StandbyTimeoutHandler(void) ...@@ -567,8 +595,10 @@ StandbyTimeoutHandler(void)
void void
StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
{ {
RecoveryLockListsEntry *entry;
xl_standby_lock *newlock; xl_standby_lock *newlock;
LOCKTAG locktag; LOCKTAG locktag;
bool found;
/* Already processed? */ /* Already processed? */
if (!TransactionIdIsValid(xid) || if (!TransactionIdIsValid(xid) ||
...@@ -582,11 +612,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) ...@@ -582,11 +612,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
/* dbOid is InvalidOid when we are locking a shared relation. */ /* dbOid is InvalidOid when we are locking a shared relation. */
Assert(OidIsValid(relOid)); Assert(OidIsValid(relOid));
/* Create a new list for this xid, if we don't have one already. */
entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
if (!found)
{
entry->xid = xid;
entry->locks = NIL;
}
newlock = palloc(sizeof(xl_standby_lock)); newlock = palloc(sizeof(xl_standby_lock));
newlock->xid = xid; newlock->xid = xid;
newlock->dbOid = dbOid; newlock->dbOid = dbOid;
newlock->relOid = relOid; newlock->relOid = relOid;
RecoveryLockList = lappend(RecoveryLockList, newlock); entry->locks = lappend(entry->locks, newlock);
/* /*
* Attempt to acquire the lock as requested, if not resolve conflict * Attempt to acquire the lock as requested, if not resolve conflict
...@@ -599,46 +637,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) ...@@ -599,46 +637,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
} }
static void static void
StandbyReleaseLocks(TransactionId xid) StandbyReleaseLockList(List *locks)
{
ListCell *cell,
*prev,
*next;
/*
* Release all matching locks and remove them from list
*/
prev = NULL;
for (cell = list_head(RecoveryLockList); cell; cell = next)
{ {
xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); while (locks)
next = lnext(cell);
if (!TransactionIdIsValid(xid) || lock->xid == xid)
{ {
xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
LOCKTAG locktag; LOCKTAG locktag;
elog(trace_recovery(DEBUG4), elog(trace_recovery(DEBUG4),
"releasing recovery lock: xid %u db %u rel %u", "releasing recovery lock: xid %u db %u rel %u",
lock->xid, lock->dbOid, lock->relOid); lock->xid, lock->dbOid, lock->relOid);
SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid); SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
if (!LockRelease(&locktag, AccessExclusiveLock, true)) if (!LockRelease(&locktag, AccessExclusiveLock, true))
{
elog(LOG, elog(LOG,
"RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u", "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
lock->xid, lock->dbOid, lock->relOid); lock->xid, lock->dbOid, lock->relOid);
Assert(false);
RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev); }
pfree(lock); pfree(lock);
locks = list_delete_first(locks);
} }
else }
prev = cell;
static void
StandbyReleaseLocks(TransactionId xid)
{
RecoveryLockListsEntry *entry;
if (TransactionIdIsValid(xid))
{
if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
{
StandbyReleaseLockList(entry->locks);
hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
} }
} }
else
StandbyReleaseAllLocks();
}
/* /*
* Release locks for a transaction tree, starting at xid down, from * Release locks for a transaction tree, starting at xid down, from
* RecoveryLockList. * RecoveryLockLists.
* *
* Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode, * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
* to remove any AccessExclusiveLocks requested by a transaction. * to remove any AccessExclusiveLocks requested by a transaction.
...@@ -660,30 +700,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids) ...@@ -660,30 +700,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
void void
StandbyReleaseAllLocks(void) StandbyReleaseAllLocks(void)
{ {
ListCell *cell, HASH_SEQ_STATUS status;
*prev, RecoveryLockListsEntry *entry;
*next;
LOCKTAG locktag;
elog(trace_recovery(DEBUG2), "release all standby locks"); elog(trace_recovery(DEBUG2), "release all standby locks");
prev = NULL; hash_seq_init(&status, RecoveryLockLists);
for (cell = list_head(RecoveryLockList); cell; cell = next) while ((entry = hash_seq_search(&status)))
{ {
xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell); StandbyReleaseLockList(entry->locks);
hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
next = lnext(cell);
elog(trace_recovery(DEBUG4),
"releasing recovery lock: xid %u db %u rel %u",
lock->xid, lock->dbOid, lock->relOid);
SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
if (!LockRelease(&locktag, AccessExclusiveLock, true))
elog(LOG,
"RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
lock->xid, lock->dbOid, lock->relOid);
RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
pfree(lock);
} }
} }
...@@ -695,22 +721,17 @@ StandbyReleaseAllLocks(void) ...@@ -695,22 +721,17 @@ StandbyReleaseAllLocks(void)
void void
StandbyReleaseOldLocks(int nxids, TransactionId *xids) StandbyReleaseOldLocks(int nxids, TransactionId *xids)
{ {
ListCell *cell, HASH_SEQ_STATUS status;
*prev, RecoveryLockListsEntry *entry;
*next;
LOCKTAG locktag;
prev = NULL; hash_seq_init(&status, RecoveryLockLists);
for (cell = list_head(RecoveryLockList); cell; cell = next) while ((entry = hash_seq_search(&status)))
{ {
xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
bool remove = false; bool remove = false;
next = lnext(cell); Assert(TransactionIdIsValid(entry->xid));
Assert(TransactionIdIsValid(lock->xid)); if (StandbyTransactionIdIsPrepared(entry->xid))
if (StandbyTransactionIdIsPrepared(lock->xid))
remove = false; remove = false;
else else
{ {
...@@ -719,7 +740,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids) ...@@ -719,7 +740,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
for (i = 0; i < nxids; i++) for (i = 0; i < nxids; i++)
{ {
if (lock->xid == xids[i]) if (entry->xid == xids[i])
{ {
found = true; found = true;
break; break;
...@@ -735,19 +756,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids) ...@@ -735,19 +756,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
if (remove) if (remove)
{ {
elog(trace_recovery(DEBUG4), StandbyReleaseLockList(entry->locks);
"releasing recovery lock: xid %u db %u rel %u", hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
lock->xid, lock->dbOid, lock->relOid);
SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
if (!LockRelease(&locktag, AccessExclusiveLock, true))
elog(LOG,
"RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
lock->xid, lock->dbOid, lock->relOid);
RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
pfree(lock);
} }
else
prev = cell;
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment