diff --git a/src/backend/utils/resowner/README b/src/backend/utils/resowner/README
index 2908af2fa9d10bf24a518c393936c00f11257f33..f74f1f50770e15471d0388d0a97a51f99ef7a79b 100644
--- a/src/backend/utils/resowner/README
+++ b/src/backend/utils/resowner/README
@@ -1,4 +1,4 @@
-$PostgreSQL: pgsql/src/backend/utils/resowner/README,v 1.7 2008/03/21 13:23:28 momjian Exp $
+$PostgreSQL: pgsql/src/backend/utils/resowner/README,v 1.8 2008/11/25 20:28:29 alvherre Exp $
 
 Notes About Resource Owners
 ===========================
@@ -61,11 +61,11 @@ ResourceOwner transfers lock ownership to the parent instead of actually
 releasing the lock, if isCommit is true.
 
 Currently, ResourceOwners contain direct support for recording ownership of
-buffer pins, lmgr locks, and catcache, relcache, plancache, and tupdesc
-references.  Other objects can be associated with a ResourceOwner by recording
-the address of the owning ResourceOwner in such an object.  There is an API
-for other modules to get control during ResourceOwner release, so that they
-can scan their own data structures to find the objects that need to be
+buffer pins, lmgr locks, and catcache, relcache, plancache, tupdesc, and
+snapshot references.  Other objects can be associated with a ResourceOwner by
+recording the address of the owning ResourceOwner in such an object.  There is
+an API for other modules to get control during ResourceOwner release, so that
+they can scan their own data structures to find the objects that need to be
 deleted.
 
 Whenever we are inside a transaction, the global variable
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 614ca06f3e952af2517a2f80ea037e9baf825d37..bbdfd8568e5b1e379c5cab67e63758c1aa2e25df 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -14,7 +14,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/resowner/resowner.c,v 1.29 2008/06/19 00:46:05 alvherre Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/resowner/resowner.c,v 1.30 2008/11/25 20:28:29 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -26,6 +26,7 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/resowner.h"
+#include "utils/snapmgr.h"
 
 
 /*
@@ -66,6 +67,11 @@ typedef struct ResourceOwnerData
 	int			ntupdescs;		/* number of owned tupdesc references */
 	TupleDesc  *tupdescs;		/* dynamically allocated array */
 	int			maxtupdescs;	/* currently allocated array size */
+
+	/* We have built-in support for remembering snapshot references */
+	int			nsnapshots;		/* number of owned snapshot references */
+	Snapshot   *snapshots;		/* dynamically allocated array */
+	int			maxsnapshots;	/* currently allocated array size */
 } ResourceOwnerData;
 
 
@@ -98,6 +104,7 @@ static void ResourceOwnerReleaseInternal(ResourceOwner owner,
 static void PrintRelCacheLeakWarning(Relation rel);
 static void PrintPlanCacheLeakWarning(CachedPlan *plan);
 static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
+static void PrintSnapshotLeakWarning(Snapshot snapshot);
 
 
 /*****************************************************************************
@@ -301,6 +308,13 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 				PrintTupleDescLeakWarning(owner->tupdescs[owner->ntupdescs - 1]);
 			DecrTupleDescRefCount(owner->tupdescs[owner->ntupdescs - 1]);
 		}
+		/* Ditto for snapshot references */
+		while (owner->nsnapshots > 0)
+		{
+			if (isCommit)
+				PrintSnapshotLeakWarning(owner->snapshots[owner->nsnapshots -1]);
+			UnregisterSnapshot(owner->snapshots[owner->nsnapshots -1]);
+		}
 
 		/* Clean up index scans too */
 		ReleaseResources_hash();
@@ -332,6 +346,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	Assert(owner->nrelrefs == 0);
 	Assert(owner->nplanrefs == 0);
 	Assert(owner->ntupdescs == 0);
+	Assert(owner->nsnapshots == 0);
 
 	/*
 	 * Delete children.  The recursive call will delink the child from me, so
@@ -360,6 +375,8 @@ ResourceOwnerDelete(ResourceOwner owner)
 		pfree(owner->planrefs);
 	if (owner->tupdescs)
 		pfree(owner->tupdescs);
+	if (owner->snapshots)
+		pfree(owner->snapshots);
 
 	pfree(owner);
 }
@@ -936,3 +953,85 @@ PrintTupleDescLeakWarning(TupleDesc tupdesc)
 		 "TupleDesc reference leak: TupleDesc %p (%u,%d) still referenced",
 		 tupdesc, tupdesc->tdtypeid, tupdesc->tdtypmod);
 }
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * snapshot reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeSnapshots(ResourceOwner owner)
+{
+	int			newmax;
+
+	if (owner->nsnapshots < owner->maxsnapshots)
+		return;					/* nothing to do */
+
+	if (owner->snapshots == NULL)
+	{
+		newmax = 16;
+		owner->snapshots = (Snapshot *)
+			MemoryContextAlloc(TopMemoryContext, newmax * sizeof(Snapshot));
+		owner->maxsnapshots = newmax;
+	}
+	else
+	{
+		newmax = owner->maxsnapshots * 2;
+		owner->snapshots = (Snapshot *)
+			repalloc(owner->snapshots, newmax * sizeof(Snapshot));
+		owner->maxsnapshots = newmax;
+	}
+}
+
+/*
+ * Remember that a snapshot reference is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeSnapshots()
+ */
+void
+ResourceOwnerRememberSnapshot(ResourceOwner owner, Snapshot snapshot)
+{
+	Assert(owner->nsnapshots < owner->maxsnapshots);
+	owner->snapshots[owner->nsnapshots] = snapshot;
+	owner->nsnapshots++;
+}
+
+/*
+ * Forget that a snapshot reference is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetSnapshot(ResourceOwner owner, Snapshot snapshot)
+{
+	Snapshot   *snapshots = owner->snapshots;
+	int			ns1 = owner->nsnapshots -1;
+	int			i;
+
+	for (i = ns1; i >= 0; i--)
+	{
+		if (snapshots[i] == snapshot)
+		{
+			while (i < ns1)
+			{
+				snapshots[i] = snapshots[i + 1];
+				i++;
+			}
+			owner->nsnapshots = ns1;
+			return;
+		}
+	}
+	elog(ERROR, "snapshot reference %p is not owned by resource owner %s",
+		 snapshot, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintSnapshotLeakWarning(Snapshot snapshot)
+{
+	elog(WARNING,
+		 "Snapshot reference leak: Snapshot %p still referenced",
+		 snapshot);
+}
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 75a7a0ce5dfada0a4e6031c9637ac04e46bee7f1..1107abdf27ac826a01a18ed8a7e8bc5db64f7c6d 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -2,7 +2,7 @@
  * snapmgr.c
  *		PostgreSQL snapshot manager
  *
- * We keep track of snapshots in two ways: the "registered snapshots" list,
+ * We keep track of snapshots in two ways: those "registered" by resowner.c,
  * and the "active snapshot" stack.  All snapshots in either of them live in
  * persistent memory.  When a snapshot is no longer in any of these lists
  * (tracked by separate refcounts on each snapshot), its memory can be freed.
@@ -14,15 +14,12 @@
  * anyway it should be rather uncommon to keep snapshots referenced for too
  * long.)
  *
- * Note: parts of this code could probably be replaced by appropriate use
- * of resowner.c.
- *
  *
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/utils/time/snapmgr.c,v 1.6 2008/10/27 22:15:05 alvherre Exp $
+ *	  $PostgreSQL: pgsql/src/backend/utils/time/snapmgr.c,v 1.7 2008/11/25 20:28:29 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -34,6 +31,7 @@
 #include "storage/procarray.h"
 #include "utils/memutils.h"
 #include "utils/memutils.h"
+#include "utils/resowner.h"
 #include "utils/snapmgr.h"
 #include "utils/tqual.h"
 
@@ -68,34 +66,10 @@ TransactionId TransactionXmin = FirstNormalTransactionId;
 TransactionId RecentXmin = FirstNormalTransactionId;
 TransactionId RecentGlobalXmin = InvalidTransactionId;
 
-/*
- * Elements of the list of registered snapshots.
- *
- * Note that we keep refcounts both here and in SnapshotData.  This is because
- * the same snapshot may be registered more than once in a subtransaction, and
- * if a subxact aborts we want to be able to subtract the correct amount of
- * counts from SnapshotData.  (Another approach would be keeping one
- * RegdSnapshotElt each time a snapshot is registered, but that seems
- * unnecessary wastage.)
- *
- * NB: the code assumes that elements in this list are in non-increasing
- * order of s_level; also, the list must be NULL-terminated.
- */
-typedef struct RegdSnapshotElt
-{
-	Snapshot	s_snap;
-	uint32		s_count;
-	int			s_level;
-	struct RegdSnapshotElt	*s_next;
-} RegdSnapshotElt;
-
 /*
  * Elements of the active snapshot stack.
  *
- * It's not necessary to keep a refcount like we do for the registered list;
- * each element here accounts for exactly one active_count on SnapshotData.
- * We cannot condense them like we do for RegdSnapshotElt because it would mess
- * up the order of entries in the stack.
+ * Each element here accounts for exactly one active_count on SnapshotData.
  *
  * NB: the code assumes that elements in this list are in non-increasing
  * order of as_level; also, the list must be NULL-terminated.
@@ -107,12 +81,18 @@ typedef struct ActiveSnapshotElt
 	struct ActiveSnapshotElt *as_next;
 } ActiveSnapshotElt;
 
-/* Head of the list of registered snapshots */
-static RegdSnapshotElt	   *RegisteredSnapshotList = NULL;
-
 /* Top of the stack of active snapshots */
 static ActiveSnapshotElt	*ActiveSnapshot = NULL;
 
+/*
+ * How many snapshots is resowner.c tracking for us?
+ *
+ * Note: for now, a simple counter is enough.  However, if we ever want to be
+ * smarter about advancing our MyProc->xmin we will need to be more
+ * sophisticated about this, perhaps keeping our own list of snapshots.
+ */
+static int				RegisteredSnapshots = 0;
+
 /* first GetTransactionSnapshot call in a transaction? */
 bool					FirstSnapshotSet = false;
 
@@ -133,7 +113,6 @@ static void	SnapshotResetXmin(void);
  * GetTransactionSnapshot
  *		Get the appropriate snapshot for a new query in a transaction.
  *
- *
  * Note that the return value may point at static storage that will be modified
  * by future calls and by CommandCounterIncrement().  Callers should call
  * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
@@ -145,6 +124,8 @@ GetTransactionSnapshot(void)
 	/* First call in transaction? */
 	if (!FirstSnapshotSet)
 	{
+		Assert(RegisteredSnapshots == 0);
+
 		CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
 		FirstSnapshotSet = true;
 
@@ -371,108 +352,47 @@ ActiveSnapshotSet(void)
 Snapshot
 RegisterSnapshot(Snapshot snapshot)
 {
-	RegdSnapshotElt	*elt;
-	RegdSnapshotElt	*newhead;
-	int		level;
+	Snapshot	snap;
 
 	if (snapshot == InvalidSnapshot)
 		return InvalidSnapshot;
 
-	level = GetCurrentTransactionNestLevel();
-
-	/*
-	 * If there's already an item in the list for the same snapshot and the
-	 * same subxact nest level, increment its refcounts.  Otherwise create a
-	 * new one.
-	 */
-	for (elt = RegisteredSnapshotList; elt != NULL; elt = elt->s_next)
-	{
-		if (elt->s_level < level)
-			break;
-
-		if (elt->s_snap == snapshot && elt->s_level == level)
-		{
-			elt->s_snap->regd_count++;
-			elt->s_count++;
-
-			return elt->s_snap;
-		}
-	}
-
-	/*
-	 * Create the new list element.  If it's not been copied into persistent
-	 * memory already, we must do so; otherwise we can just increment the
-	 * reference count.
-	 */
-	newhead = MemoryContextAlloc(TopTransactionContext, sizeof(RegdSnapshotElt));
-	newhead->s_next = RegisteredSnapshotList;
 	/* Static snapshot?  Create a persistent copy */
-	newhead->s_snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
-	newhead->s_level = level;
-	newhead->s_count = 1;
+	snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
 
-	newhead->s_snap->regd_count++;
+	/* and tell resowner.c about it */
+	ResourceOwnerEnlargeSnapshots(CurrentResourceOwner);
+	snap->regd_count++;
+	ResourceOwnerRememberSnapshot(CurrentResourceOwner, snap);
 
-	RegisteredSnapshotList = newhead;
+	RegisteredSnapshots++;
 
-	return RegisteredSnapshotList->s_snap;
+	return snap;
 }
 
 /*
  * UnregisterSnapshot
- * 		Signals that a snapshot is no longer necessary
  *
- * If both reference counts fall to zero, the snapshot memory is released.
- * If only the registered list refcount falls to zero, just the list element is
- * freed.
+ * Decrement the reference count of a snapshot, remove the corresponding
+ * reference from CurrentResourceOwner, and free the snapshot if no more
+ * references remain.
  */
 void
 UnregisterSnapshot(Snapshot snapshot)
 {
-	RegdSnapshotElt	*prev = NULL;
-	RegdSnapshotElt	*elt;
-	bool		found = false;
-
-	if (snapshot == InvalidSnapshot)
+	if (snapshot == NULL)
 		return;
 
-	for (elt = RegisteredSnapshotList; elt != NULL; elt = elt->s_next)
-	{
-		if (elt->s_snap == snapshot)
-		{
-			Assert(elt->s_snap->regd_count > 0);
-			Assert(elt->s_count > 0);
+	Assert(snapshot->regd_count > 0);
+	Assert(RegisteredSnapshots > 0);
 
-			elt->s_snap->regd_count--;
-			elt->s_count--;
-			found = true;
-
-			if (elt->s_count == 0)
-			{
-				/* delink it from the registered snapshot list */
-				if (prev)
-					prev->s_next = elt->s_next;
-				else
-					RegisteredSnapshotList = elt->s_next;
-
-				/* free the snapshot itself if it's no longer relevant */
-				if (elt->s_snap->regd_count == 0 && elt->s_snap->active_count == 0)
-					FreeSnapshot(elt->s_snap);
-
-				/* and free the list element */
-				pfree(elt);
-			}
-
-			break;
-		}
-
-		prev = elt;
+	ResourceOwnerForgetSnapshot(CurrentResourceOwner, snapshot);
+	RegisteredSnapshots--;
+	if (--snapshot->regd_count == 0 && snapshot->active_count == 0)
+	{
+		FreeSnapshot(snapshot);
+		SnapshotResetXmin();
 	}
-
-	if (!found)
-		elog(WARNING, "unregistering failed for snapshot %p", snapshot);
-
-	SnapshotResetXmin();
 }
 
 /*
@@ -485,7 +405,7 @@ UnregisterSnapshot(Snapshot snapshot)
 static void
 SnapshotResetXmin(void)
 {
-	if (RegisteredSnapshotList == NULL && ActiveSnapshot == NULL)
+	if (RegisteredSnapshots == 0 && ActiveSnapshot == NULL)
 		MyProc->xmin = InvalidTransactionId;
 }
 
@@ -496,7 +416,6 @@ void
 AtSubCommit_Snapshot(int level)
 {
 	ActiveSnapshotElt	*active;
-	RegdSnapshotElt	*regd;
 
 	/*
 	 * Relabel the active snapshots set in this subtransaction as though they
@@ -508,20 +427,6 @@ AtSubCommit_Snapshot(int level)
 			break;
 		active->as_level = level - 1;
 	}
-
-	/*
-	 * Reassign all registered snapshots to the parent subxact.
-	 *
-	 * Note: this code is somewhat bogus in that we could end up with multiple
-	 * entries for the same snapshot and the same subxact level (my parent's
-	 * level).  Cleaning that up is more trouble than it's currently worth,
-	 * however.
-	 */
-	for (regd = RegisteredSnapshotList; regd != NULL; regd = regd->s_next)
-	{
-		if (regd->s_level == level)
-			regd->s_level--;
-	}
 }
 
 /*
@@ -531,9 +436,6 @@ AtSubCommit_Snapshot(int level)
 void
 AtSubAbort_Snapshot(int level)
 {
-	RegdSnapshotElt	*prev;
-	RegdSnapshotElt	*regd;
-
 	/* Forget the active snapshots set by this subtransaction */
 	while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
 	{
@@ -558,39 +460,6 @@ AtSubAbort_Snapshot(int level)
 		ActiveSnapshot = next;
 	}
 
-	/* Unregister all snapshots registered during this subtransaction */
-	prev = NULL;
-	for (regd = RegisteredSnapshotList; regd != NULL; )
-	{
-		if (regd->s_level >= level)
-		{
-			RegdSnapshotElt	*tofree;
-
-			if (prev)
-				prev->s_next = regd->s_next;
-			else
-				RegisteredSnapshotList = regd->s_next;
-
-			tofree = regd;
-			regd = regd->s_next;
-
-			tofree->s_snap->regd_count -= tofree->s_count;
-
-			/* free the snapshot if possible */
-			if (tofree->s_snap->regd_count == 0 &&
-				tofree->s_snap->active_count == 0)
-				FreeSnapshot(tofree->s_snap);
-
-			/* and free the list element */
-			pfree(tofree);
-		}
-		else
-		{
-			prev = regd;
-			regd = regd->s_next;
-		}
-	}
-
 	SnapshotResetXmin();
 }
 
@@ -605,7 +474,6 @@ AtEOXact_Snapshot(bool isCommit)
 	if (isCommit)
 	{
 		ActiveSnapshotElt	*active;
-		RegdSnapshotElt	*regd;
 
 		/*
 		 * On a serializable snapshot we must first unregister our private
@@ -614,16 +482,13 @@ AtEOXact_Snapshot(bool isCommit)
 		if (registered_serializable)
 			UnregisterSnapshot(CurrentSnapshot);
 
+		if (RegisteredSnapshots != 0)
+			elog(WARNING, "%d registered snapshots seem to remain after cleanup",
+				 RegisteredSnapshots);
+
 		/* complain about unpopped active snapshots */
 		for (active = ActiveSnapshot; active != NULL; active = active->as_next)
 			elog(WARNING, "snapshot %p still active", active);
-
-		/* complain about any unregistered snapshot */
-		for (regd = RegisteredSnapshotList; regd != NULL; regd = regd->s_next)
-			elog(WARNING,
-				 "snapshot %p not destroyed at commit (%d regd refs, %d active refs)",
-				 regd->s_snap, regd->s_snap->regd_count,
-				 regd->s_snap->active_count);
 	}
 
 	/*
@@ -631,7 +496,7 @@ AtEOXact_Snapshot(bool isCommit)
 	 * it'll go away with TopTransactionContext.
 	 */
 	ActiveSnapshot = NULL;
-	RegisteredSnapshotList = NULL;
+	RegisteredSnapshots = 0;
 
 	CurrentSnapshot = NULL;
 	SecondarySnapshot = NULL;
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
index 1369aa7f683846c2aa6f2a341d5ecbbc8a41e0ec..9d738cb8e1e13c92caede2b74f44880fcaca4ebb 100644
--- a/src/include/utils/resowner.h
+++ b/src/include/utils/resowner.h
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/resowner.h,v 1.15 2008/01/01 19:45:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/utils/resowner.h,v 1.16 2008/11/25 20:28:29 alvherre Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -22,6 +22,7 @@
 #include "storage/buf.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
+#include "utils/snapshot.h"
 
 
 /*
@@ -121,4 +122,11 @@ extern void ResourceOwnerRememberTupleDesc(ResourceOwner owner,
 extern void ResourceOwnerForgetTupleDesc(ResourceOwner owner,
 							 TupleDesc tupdesc);
 
+/* support for snapshot refcount management */
+extern void ResourceOwnerEnlargeSnapshots(ResourceOwner owner);
+extern void ResourceOwnerRememberSnapshot(ResourceOwner owner,
+							  Snapshot snapshot);
+extern void ResourceOwnerForgetSnapshot(ResourceOwner owner,
+							Snapshot snapshot);
+
 #endif   /* RESOWNER_H */