From c4ccbcc1a2a17d547537ccecd9e856cb083c2ff7 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmunro@postgresql.org>
Date: Tue, 26 Jun 2018 18:23:36 +1200
Subject: [PATCH] Move RecoveryLockList into a hash table.

Standbys frequently need to release all locks held by a given xid.
Instead of searching one big list linearly, let's create one list
per xid and put them in a hash table, so we can find what we need
in O(1) time.

Earlier analysis and a prototype were done by David Rowley, though
this isn't his patch.

Back-patch all the way.

Author: Thomas Munro
Diagnosed-by: David Rowley, Andres Freund
Reviewed-by: Andres Freund, Tom Lane, Robert Haas
Discussion: https://postgr.es/m/CAEepm%3D1mL0KiQ2KJ4yuPpLGX94a4Ns_W6TL4EGRouxWibu56pA%40mail.gmail.com
Discussion: https://postgr.es/m/CAKJS1f9vJ841HY%3DwonnLVbfkTWGYWdPN72VMxnArcGCjF3SywA%40mail.gmail.com
---
 src/backend/storage/ipc/standby.c | 165 ++++++++++++++++--------------
 1 file changed, 88 insertions(+), 77 deletions(-)

diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index a834a72293d..6ccbe58f8f6 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -27,6 +27,8 @@
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
 #include "storage/standby.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
@@ -36,7 +38,7 @@ int			vacuum_defer_cleanup_age;
 int			max_standby_archive_delay = 30 * 1000;
 int			max_standby_streaming_delay = 30 * 1000;
 
-static List *RecoveryLockList;
+static HTAB *RecoveryLockLists;
 
 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 									   ProcSignalReason reason);
@@ -45,6 +47,14 @@ static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
 static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
 
+/*
+ * Keep track of all the locks owned by a given transaction.
+ */
+typedef struct RecoveryLockListsEntry
+{
+	TransactionId	xid;
+	List		   *locks;
+} RecoveryLockListsEntry;
 
 /*
  * InitRecoveryTransactionEnvironment
@@ -62,6 +72,20 @@ void
 InitRecoveryTransactionEnvironment(void)
 {
 	VirtualTransactionId vxid;
+	HASHCTL			hash_ctl;
+
+	/*
+	 * Initialize the hash table for tracking the list of locks held by each
+	 * transaction.
+	 */
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = sizeof(TransactionId);
+	hash_ctl.entrysize = sizeof(RecoveryLockListsEntry);
+	hash_ctl.hash = tag_hash;
+	RecoveryLockLists = hash_create("RecoveryLockLists",
+									64,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION);
 
 	/*
 	 * Initialize shared invalidation management for Startup process, being
@@ -106,6 +130,10 @@ ShutdownRecoveryTransactionEnvironment(void)
 	/* Release all locks the tracked transactions were holding */
 	StandbyReleaseAllLocks();
 
+	/* Destroy the hash table of locks. */
+	hash_destroy(RecoveryLockLists);
+	RecoveryLockLists = NULL;
+
 	/* Cleanup our VirtualTransaction */
 	VirtualXactLockTableCleanup();
 }
@@ -548,8 +576,8 @@ StandbyTimeoutHandler(void)
  * We only keep track of AccessExclusiveLocks, which are only ever held by
  * one transaction on one relation, and don't worry about lock queuing.
  *
- * We keep a single dynamically expandible list of locks in local memory,
- * RecoveryLockList, so we can keep track of the various entries made by
+ * We keep a hash table of lists of locks in local memory keyed by xid,
+ * RecoveryLockLists, so we can keep track of the various entries made by
  * the Startup process's virtual xid in the shared lock table.
  *
  * We record the lock against the top-level xid, rather than individual
@@ -567,8 +595,10 @@ StandbyTimeoutHandler(void)
 void
 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 {
+	RecoveryLockListsEntry *entry;
 	xl_standby_lock *newlock;
 	LOCKTAG		locktag;
+	bool		found;
 
 	/* Already processed? */
 	if (!TransactionIdIsValid(xid) ||
@@ -582,11 +612,19 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 	/* dbOid is InvalidOid when we are locking a shared relation. */
 	Assert(OidIsValid(relOid));
 
+	/* Create a new list for this xid, if we don't have one already. */
+	entry = hash_search(RecoveryLockLists, &xid, HASH_ENTER, &found);
+	if (!found)
+	{
+		entry->xid = xid;
+		entry->locks = NIL;
+	}
+
 	newlock = palloc(sizeof(xl_standby_lock));
 	newlock->xid = xid;
 	newlock->dbOid = dbOid;
 	newlock->relOid = relOid;
-	RecoveryLockList = lappend(RecoveryLockList, newlock);
+	entry->locks = lappend(entry->locks, newlock);
 
 	/*
 	 * Attempt to acquire the lock as requested, if not resolve conflict
@@ -599,46 +637,48 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 }
 
 static void
-StandbyReleaseLocks(TransactionId xid)
+StandbyReleaseLockList(List *locks)
 {
-	ListCell   *cell,
-			   *prev,
-			   *next;
-
-	/*
-	 * Release all matching locks and remove them from list
-	 */
-	prev = NULL;
-	for (cell = list_head(RecoveryLockList); cell; cell = next)
+	while (locks)
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
+		xl_standby_lock *lock = (xl_standby_lock *) linitial(locks);
+		LOCKTAG		locktag;
+		elog(trace_recovery(DEBUG4),
+			 "releasing recovery lock: xid %u db %u rel %u",
+			 lock->xid, lock->dbOid, lock->relOid);
+		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
+		if (!LockRelease(&locktag, AccessExclusiveLock, true))
+		{
+			elog(LOG,
+				 "RecoveryLockLists contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
+				 lock->xid, lock->dbOid, lock->relOid);
+			Assert(false);
+		}
+		pfree(lock);
+		locks = list_delete_first(locks);
+	}
+}
 
-		next = lnext(cell);
+static void
+StandbyReleaseLocks(TransactionId xid)
+{
+	RecoveryLockListsEntry *entry;
 
-		if (!TransactionIdIsValid(xid) || lock->xid == xid)
+	if (TransactionIdIsValid(xid))
+	{
+		if ((entry = hash_search(RecoveryLockLists, &xid, HASH_FIND, NULL)))
 		{
-			LOCKTAG		locktag;
-
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
-				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
-
-			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-			pfree(lock);
+			StandbyReleaseLockList(entry->locks);
+			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
 		}
-		else
-			prev = cell;
 	}
+	else
+		StandbyReleaseAllLocks();
 }
 
 /*
  * Release locks for a transaction tree, starting at xid down, from
- * RecoveryLockList.
+ * RecoveryLockLists.
  *
  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
  * to remove any AccessExclusiveLocks requested by a transaction.
@@ -660,30 +700,16 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
 void
 StandbyReleaseAllLocks(void)
 {
-	ListCell   *cell,
-			   *prev,
-			   *next;
-	LOCKTAG		locktag;
+	HASH_SEQ_STATUS	status;
+	RecoveryLockListsEntry *entry;
 
 	elog(trace_recovery(DEBUG2), "release all standby locks");
 
-	prev = NULL;
-	for (cell = list_head(RecoveryLockList); cell; cell = next)
+	hash_seq_init(&status, RecoveryLockLists);
+	while ((entry = hash_seq_search(&status)))
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
-
-		next = lnext(cell);
-
-		elog(trace_recovery(DEBUG4),
-			 "releasing recovery lock: xid %u db %u rel %u",
-			 lock->xid, lock->dbOid, lock->relOid);
-		SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-		if (!LockRelease(&locktag, AccessExclusiveLock, true))
-			elog(LOG,
-				 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-		RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-		pfree(lock);
+		StandbyReleaseLockList(entry->locks);
+		hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
 	}
 }
 
@@ -695,22 +721,17 @@ StandbyReleaseAllLocks(void)
 void
 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 {
-	ListCell   *cell,
-			   *prev,
-			   *next;
-	LOCKTAG		locktag;
+	HASH_SEQ_STATUS status;
+	RecoveryLockListsEntry *entry;
 
-	prev = NULL;
-	for (cell = list_head(RecoveryLockList); cell; cell = next)
+	hash_seq_init(&status, RecoveryLockLists);
+	while ((entry = hash_seq_search(&status)))
 	{
-		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
 		bool		remove = false;
 
-		next = lnext(cell);
+		Assert(TransactionIdIsValid(entry->xid));
 
-		Assert(TransactionIdIsValid(lock->xid));
-
-		if (StandbyTransactionIdIsPrepared(lock->xid))
+		if (StandbyTransactionIdIsPrepared(entry->xid))
 			remove = false;
 		else
 		{
@@ -719,7 +740,7 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 
 			for (i = 0; i < nxids; i++)
 			{
-				if (lock->xid == xids[i])
+				if (entry->xid == xids[i])
 				{
 					found = true;
 					break;
@@ -735,19 +756,9 @@ StandbyReleaseOldLocks(int nxids, TransactionId *xids)
 
 		if (remove)
 		{
-			elog(trace_recovery(DEBUG4),
-				 "releasing recovery lock: xid %u db %u rel %u",
-				 lock->xid, lock->dbOid, lock->relOid);
-			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
-			if (!LockRelease(&locktag, AccessExclusiveLock, true))
-				elog(LOG,
-					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
-					 lock->xid, lock->dbOid, lock->relOid);
-			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
-			pfree(lock);
+			StandbyReleaseLockList(entry->locks);
+			hash_search(RecoveryLockLists, entry, HASH_REMOVE, NULL);
 		}
-		else
-			prev = cell;
 	}
 }
 
-- 
GitLab