diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index fab1912b1b537f370378871441159836779bed60..a7d73bbf272240050e271d9d37508f2a44ab1541 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.5 2002/04/01 03:34:25 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/catalog/namespace.c,v 1.6 2002/04/06 06:59:21 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -26,6 +26,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_shadow.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -33,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
+#include "utils/catcache.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
 
@@ -301,6 +303,174 @@ TypenameGetTypid(const char *typname)
 	return InvalidOid;
 }
 
+/*
+ * FuncnameGetCandidates
+ *		Given a possibly-qualified function name and argument count,
+ *		retrieve a list of the possible matches.
+ *
+ * We search a single namespace if the function name is qualified, else
+ * all namespaces in the search path.  The return list will never contain
+ * multiple entries with identical argument types --- in the multiple-
+ * namespace case, we arrange for entries in earlier namespaces to mask
+ * identical entries in later namespaces.
+ */
+FuncCandidateList
+FuncnameGetCandidates(List *names, int nargs)
+{
+	FuncCandidateList resultList = NULL;
+	char	   *catalogname;
+	char	   *schemaname = NULL;
+	char	   *funcname = NULL;
+	Oid			namespaceId;
+	CatCList   *catlist;
+	int			i;
+
+	/* deconstruct the name list */
+	switch (length(names))
+	{
+		case 1:
+			funcname = strVal(lfirst(names));
+			break;
+		case 2:
+			schemaname = strVal(lfirst(names));
+			funcname = strVal(lsecond(names));
+			break;
+		case 3:
+			catalogname = strVal(lfirst(names));
+			schemaname = strVal(lsecond(names));
+			funcname = strVal(lfirst(lnext(lnext(names))));
+			/*
+			 * We check the catalog name and then ignore it.
+			 */
+			if (strcmp(catalogname, DatabaseName) != 0)
+				elog(ERROR, "Cross-database references are not implemented");
+			break;
+		default:
+			elog(ERROR, "Improper qualified name (too many dotted names)");
+			break;
+	}
+
+	if (schemaname)
+	{
+		/* use exact schema given */
+		namespaceId = GetSysCacheOid(NAMESPACENAME,
+									 CStringGetDatum(schemaname),
+									 0, 0, 0);
+		if (!OidIsValid(namespaceId))
+			elog(ERROR, "Namespace \"%s\" does not exist",
+				 schemaname);
+	}
+	else
+	{
+		/* flag to indicate we need namespace search */
+		namespaceId = InvalidOid;
+	}
+
+	/* Search syscache by name and nargs only */
+	catlist = SearchSysCacheList(PROCNAME, 2,
+								 CStringGetDatum(funcname),
+								 Int16GetDatum(nargs),
+								 0, 0);
+
+	for (i = 0; i < catlist->n_members; i++)
+	{
+		HeapTuple	proctup = &catlist->members[i]->tuple;
+		Form_pg_proc procform = (Form_pg_proc) GETSTRUCT(proctup);
+		int			pathpos = 0;
+		FuncCandidateList newResult;
+
+		if (OidIsValid(namespaceId))
+		{
+			/* Consider only procs in specified namespace */
+			if (procform->pronamespace != namespaceId)
+				continue;
+			/* No need to check args, they must all be different */
+		}
+		else
+		{
+			/* Consider only procs that are in the search path */
+			if (pathContainsSystemNamespace ||
+				procform->pronamespace != PG_CATALOG_NAMESPACE)
+			{
+				List	   *nsp;
+
+				foreach(nsp, namespaceSearchPath)
+				{
+					pathpos++;
+					if (procform->pronamespace == (Oid) lfirsti(nsp))
+						break;
+				}
+				if (nsp == NIL)
+					continue;	/* proc is not in search path */
+			}
+
+			/*
+			 * Okay, it's in the search path, but does it have the same
+			 * arguments as something we already accepted?  If so, keep
+			 * only the one that appears earlier in the search path.
+			 *
+			 * If we have an ordered list from SearchSysCacheList (the
+			 * normal case), then any conflicting proc must immediately
+			 * adjoin this one in the list, so we only need to look at
+			 * the newest result item.  If we have an unordered list,
+			 * we have to scan the whole result list.
+			 */
+			if (resultList)
+			{
+				FuncCandidateList	prevResult;
+
+				if (catlist->ordered)
+				{
+					if (memcmp(procform->proargtypes, resultList->args,
+							   nargs * sizeof(Oid)) == 0)
+						prevResult = resultList;
+					else
+						prevResult = NULL;
+				}
+				else
+				{
+					for (prevResult = resultList;
+						 prevResult;
+						 prevResult = prevResult->next)
+					{
+						if (memcmp(procform->proargtypes, prevResult->args,
+								   nargs * sizeof(Oid)) == 0)
+							break;
+					}
+				}
+				if (prevResult)
+				{
+					/* We have a match with a previous result */
+					Assert(pathpos != prevResult->pathpos);
+					if (pathpos > prevResult->pathpos)
+						continue; /* keep previous result */
+					/* replace previous result */
+					prevResult->pathpos = pathpos;
+					prevResult->oid = proctup->t_data->t_oid;
+					continue;	/* args are same, of course */
+				}
+			}
+		}
+
+		/*
+		 * Okay to add it to result list
+		 */
+		newResult = (FuncCandidateList)
+			palloc(sizeof(struct _FuncCandidateList) - sizeof(Oid)
+				   + nargs * sizeof(Oid));
+		newResult->pathpos = pathpos;
+		newResult->oid = proctup->t_data->t_oid;
+		memcpy(newResult->args, procform->proargtypes, nargs * sizeof(Oid));
+
+		newResult->next = resultList;
+		resultList = newResult;
+	}
+
+	ReleaseSysCacheList(catlist);
+
+	return resultList;
+}
+
 /*
  * QualifiedNameGetCreationNamespace
  *		Given a possibly-qualified name for an object (in List-of-Values
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index f3c8712abaed640ecb92f6e315cbd2050833aa7c..578402fd255293543a98abaa3275fa5444498b2c 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/parser/parse_func.c,v 1.123 2002/04/05 00:31:27 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/parser/parse_func.c,v 1.124 2002/04/06 06:59:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,6 +18,7 @@
 #include "access/heapam.h"
 #include "catalog/catname.h"
 #include "catalog/indexing.h"
+#include "catalog/namespace.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
@@ -40,7 +41,6 @@ static Node *ParseComplexProjection(ParseState *pstate,
 static Oid **argtype_inherit(int nargs, Oid *argtypes);
 
 static int	find_inheritors(Oid relid, Oid **supervec);
-static CandidateList func_get_candidates(char *funcname, int nargs);
 static Oid **gen_cross_product(InhPaths *arginh, int nargs);
 static void make_arguments(ParseState *pstate,
 			   int nargs,
@@ -48,14 +48,15 @@ static void make_arguments(ParseState *pstate,
 			   Oid *input_typeids,
 			   Oid *function_typeids);
 static int match_argtypes(int nargs,
-			   Oid *input_typeids,
-			   CandidateList function_typeids,
-			   CandidateList *candidates);
+						  Oid *input_typeids,
+						  FuncCandidateList function_typeids,
+						  FuncCandidateList *candidates);
 static FieldSelect *setup_field_select(Node *input, char *attname, Oid relid);
-static Oid *func_select_candidate(int nargs, Oid *input_typeids,
-					  CandidateList candidates);
-static int	agg_get_candidates(char *aggname, Oid typeId, CandidateList *candidates);
-static Oid	agg_select_candidate(Oid typeid, CandidateList candidates);
+static FuncCandidateList func_select_candidate(int nargs, Oid *input_typeids,
+								  FuncCandidateList candidates);
+static int	agg_get_candidates(char *aggname, Oid typeId,
+							   FuncCandidateList *candidates);
+static Oid	agg_select_candidate(Oid typeid, FuncCandidateList candidates);
 
 
 /*
@@ -170,7 +171,7 @@ ParseFuncOrColumn(ParseState *pstate, char *funcname, List *fargs,
 	{
 		Oid			basetype = exprType(lfirst(fargs));
 		int			ncandidates;
-		CandidateList candidates;
+		FuncCandidateList candidates;
 
 		/* try for exact match first... */
 		if (SearchSysCacheExists(AGGNAME,
@@ -374,7 +375,7 @@ ParseFuncOrColumn(ParseState *pstate, char *funcname, List *fargs,
 static int
 agg_get_candidates(char *aggname,
 				   Oid typeId,
-				   CandidateList *candidates)
+				   FuncCandidateList *candidates)
 {
 	Relation	pg_aggregate_desc;
 	SysScanDesc	pg_aggregate_scan;
@@ -398,11 +399,10 @@ agg_get_candidates(char *aggname,
 	while (HeapTupleIsValid(tup = systable_getnext(pg_aggregate_scan)))
 	{
 		Form_pg_aggregate agg = (Form_pg_aggregate) GETSTRUCT(tup);
-		CandidateList current_candidate;
-
-		current_candidate = (CandidateList) palloc(sizeof(struct _CandidateList));
-		current_candidate->args = (Oid *) palloc(sizeof(Oid));
+		FuncCandidateList current_candidate;
 
+		current_candidate = (FuncCandidateList)
+			palloc(sizeof(struct _FuncCandidateList));
 		current_candidate->args[0] = agg->aggbasetype;
 		current_candidate->next = *candidates;
 		*candidates = current_candidate;
@@ -422,10 +422,10 @@ agg_get_candidates(char *aggname,
  * if successful, else InvalidOid.
  */
 static Oid
-agg_select_candidate(Oid typeid, CandidateList candidates)
+agg_select_candidate(Oid typeid, FuncCandidateList candidates)
 {
-	CandidateList current_candidate;
-	CandidateList last_candidate;
+	FuncCandidateList current_candidate;
+	FuncCandidateList last_candidate;
 	Oid			current_typeid;
 	int			ncandidates;
 	CATEGORY	category,
@@ -498,91 +498,37 @@ agg_select_candidate(Oid typeid, CandidateList candidates)
 }	/* agg_select_candidate() */
 
 
-/* func_get_candidates()
- * get a list of all argument type vectors for which a function named
- * funcname taking nargs arguments exists
- */
-static CandidateList
-func_get_candidates(char *funcname, int nargs)
-{
-	Relation	heapRelation;
-	ScanKeyData skey[2];
-	HeapTuple	tuple;
-	SysScanDesc	funcscan;
-	CandidateList candidates = NULL;
-	int			i;
-
-	heapRelation = heap_openr(ProcedureRelationName, AccessShareLock);
-
-	ScanKeyEntryInitialize(&skey[0],
-						   (bits16) 0x0,
-						   (AttrNumber) Anum_pg_proc_proname,
-						   (RegProcedure) F_NAMEEQ,
-						   PointerGetDatum(funcname));
-	ScanKeyEntryInitialize(&skey[1],
-						   (bits16) 0x0,
-						   (AttrNumber) Anum_pg_proc_pronargs,
-						   (RegProcedure) F_INT2EQ,
-						   Int16GetDatum(nargs));
-
-	funcscan = systable_beginscan(heapRelation, ProcedureNameNspIndex, true,
-								  SnapshotNow, 2, skey);
-
-	while (HeapTupleIsValid(tuple = systable_getnext(funcscan)))
-	{
-		Form_pg_proc pgProcP = (Form_pg_proc) GETSTRUCT(tuple);
-		CandidateList current_candidate;
-
-		current_candidate = (CandidateList)
-			palloc(sizeof(struct _CandidateList));
-		current_candidate->args = (Oid *)
-			palloc(FUNC_MAX_ARGS * sizeof(Oid));
-		MemSet(current_candidate->args, 0, FUNC_MAX_ARGS * sizeof(Oid));
-		for (i = 0; i < nargs; i++)
-			current_candidate->args[i] = pgProcP->proargtypes[i];
-
-		current_candidate->next = candidates;
-		candidates = current_candidate;
-	}
-
-	systable_endscan(funcscan);
-	heap_close(heapRelation, AccessShareLock);
-
-	return candidates;
-}
-
-
 /* match_argtypes()
+ *
  * Given a list of possible typeid arrays to a function and an array of
  * input typeids, produce a shortlist of those function typeid arrays
  * that match the input typeids (either exactly or by coercion), and
- * return the number of such arrays
+ * return the number of such arrays.
+ *
+ * NB: okay to modify input list structure, as long as we find at least
+ * one match.
  */
 static int
 match_argtypes(int nargs,
 			   Oid *input_typeids,
-			   CandidateList function_typeids,
-			   CandidateList *candidates)		/* return value */
+			   FuncCandidateList function_typeids,
+			   FuncCandidateList *candidates) /* return value */
 {
-	CandidateList current_candidate;
-	CandidateList matching_candidate;
-	Oid		   *current_typeids;
+	FuncCandidateList current_candidate;
+	FuncCandidateList next_candidate;
 	int			ncandidates = 0;
 
 	*candidates = NULL;
 
 	for (current_candidate = function_typeids;
 		 current_candidate != NULL;
-		 current_candidate = current_candidate->next)
+		 current_candidate = next_candidate)
 	{
-		current_typeids = current_candidate->args;
-		if (can_coerce_type(nargs, input_typeids, current_typeids))
+		next_candidate = current_candidate->next;
+		if (can_coerce_type(nargs, input_typeids, current_candidate->args))
 		{
-			matching_candidate = (CandidateList)
-				palloc(sizeof(struct _CandidateList));
-			matching_candidate->args = current_typeids;
-			matching_candidate->next = *candidates;
-			*candidates = matching_candidate;
+			current_candidate->next = *candidates;
+			*candidates = current_candidate;
 			ncandidates++;
 		}
 	}
@@ -593,8 +539,8 @@ match_argtypes(int nargs,
 
 /* func_select_candidate()
  * Given the input argtype array and more than one candidate
- * for the function argtype array, attempt to resolve the conflict.
- * Returns the selected argtype array if the conflict can be resolved,
+ * for the function, attempt to resolve the conflict.
+ * Returns the selected candidate if the conflict can be resolved,
  * otherwise returns NULL.
  *
  * By design, this is pretty similar to oper_select_candidate in parse_oper.c.
@@ -602,13 +548,13 @@ match_argtypes(int nargs,
  * already pruned away "candidates" that aren't actually coercion-compatible
  * with the input types, whereas oper_select_candidate must do that itself.
  */
-static Oid *
+static FuncCandidateList
 func_select_candidate(int nargs,
 					  Oid *input_typeids,
-					  CandidateList candidates)
+					  FuncCandidateList candidates)
 {
-	CandidateList current_candidate;
-	CandidateList last_candidate;
+	FuncCandidateList current_candidate;
+	FuncCandidateList last_candidate;
 	Oid		   *current_typeids;
 	Oid			current_type;
 	int			i;
@@ -662,7 +608,7 @@ func_select_candidate(int nargs,
 		last_candidate->next = NULL;
 
 	if (ncandidates == 1)
-		return candidates->args;
+		return candidates;
 
 	/*
 	 * Still too many candidates? Run through all candidates and keep
@@ -709,7 +655,7 @@ func_select_candidate(int nargs,
 		last_candidate->next = NULL;
 
 	if (ncandidates == 1)
-		return candidates->args;
+		return candidates;
 
 	/*
 	 * Still too many candidates? Now look for candidates which are
@@ -755,7 +701,7 @@ func_select_candidate(int nargs,
 		last_candidate->next = NULL;
 
 	if (ncandidates == 1)
-		return candidates->args;
+		return candidates;
 
 	/*
 	 * Still too many candidates? Try assigning types for the unknown
@@ -888,7 +834,7 @@ func_select_candidate(int nargs,
 	}
 
 	if (ncandidates == 1)
-		return candidates->args;
+		return candidates;
 
 	return NULL;				/* failed to determine a unique candidate */
 }	/* func_select_candidate() */
@@ -925,22 +871,24 @@ func_get_detail(char *funcname,
 				bool *retset,	/* return value */
 				Oid **true_typeids)		/* return value */
 {
-	HeapTuple	ftup;
-	CandidateList function_typeids;
+	FuncCandidateList function_typeids;
+	FuncCandidateList best_candidate;
 
-	/* attempt to find with arguments exactly as specified... */
-	ftup = SearchSysCache(PROCNAME,
-						  PointerGetDatum(funcname),
-						  Int32GetDatum(nargs),
-						  PointerGetDatum(argtypes),
-						  0);
+	/* Get list of possible candidates from namespace search */
+	function_typeids = FuncnameGetCandidates(makeList1(makeString(funcname)), nargs);
 
-	if (HeapTupleIsValid(ftup))
+	/*
+	 * See if there is an exact match
+	 */
+	for (best_candidate = function_typeids;
+		 best_candidate != NULL;
+		 best_candidate = best_candidate->next)
 	{
-		/* given argument types are the right ones */
-		*true_typeids = argtypes;
+		if (memcmp(argtypes, best_candidate->args, nargs * sizeof(Oid)) == 0)
+			break;
 	}
-	else
+
+	if (best_candidate == NULL)
 	{
 		/*
 		 * If we didn't find an exact match, next consider the possibility
@@ -1001,10 +949,6 @@ func_get_detail(char *funcname,
 		 * didn't find an exact match, so now try to match up
 		 * candidates...
 		 */
-
-		function_typeids = func_get_candidates(funcname, nargs);
-
-		/* found something, so let's look through them... */
 		if (function_typeids != NULL)
 		{
 			Oid		  **input_typeid_vector = NULL;
@@ -1019,7 +963,7 @@ func_get_detail(char *funcname,
 
 			do
 			{
-				CandidateList current_function_typeids;
+				FuncCandidateList current_function_typeids;
 				int			ncandidates;
 
 				ncandidates = match_argtypes(nargs, current_input_typeids,
@@ -1029,13 +973,7 @@ func_get_detail(char *funcname,
 				/* one match only? then run with it... */
 				if (ncandidates == 1)
 				{
-					*true_typeids = current_function_typeids->args;
-					ftup = SearchSysCache(PROCNAME,
-										  PointerGetDatum(funcname),
-										  Int32GetDatum(nargs),
-										  PointerGetDatum(*true_typeids),
-										  0);
-					Assert(HeapTupleIsValid(ftup));
+					best_candidate = current_function_typeids;
 					break;
 				}
 
@@ -1045,25 +983,15 @@ func_get_detail(char *funcname,
 				 */
 				if (ncandidates > 1)
 				{
-					*true_typeids = func_select_candidate(nargs,
+					best_candidate = func_select_candidate(nargs,
 												   current_input_typeids,
 											   current_function_typeids);
 
-					if (*true_typeids != NULL)
-					{
-						/* was able to choose a best candidate */
-						ftup = SearchSysCache(PROCNAME,
-											  PointerGetDatum(funcname),
-											  Int32GetDatum(nargs),
-										  PointerGetDatum(*true_typeids),
-											  0);
-						Assert(HeapTupleIsValid(ftup));
-						break;
-					}
-
 					/*
-					 * otherwise, ambiguous function call, so fail by
-					 * exiting loop with ftup still NULL.
+					 * If we were able to choose a best candidate, we're
+					 * done.  Otherwise, ambiguous function call, so fail
+					 * by exiting loop with best_candidate still NULL.
+					 * Either way, we're outta here.
 					 */
 					break;
 				}
@@ -1082,11 +1010,20 @@ func_get_detail(char *funcname,
 		}
 	}
 
-	if (HeapTupleIsValid(ftup))
+	if (best_candidate)
 	{
-		Form_pg_proc pform = (Form_pg_proc) GETSTRUCT(ftup);
-
-		*funcid = ftup->t_data->t_oid;
+		HeapTuple	ftup;
+		Form_pg_proc pform;
+
+		*funcid = best_candidate->oid;
+		*true_typeids = best_candidate->args;
+
+		ftup = SearchSysCache(PROCOID,
+							  ObjectIdGetDatum(best_candidate->oid),
+							  0, 0, 0);
+		if (!HeapTupleIsValid(ftup)) /* should not happen */
+			elog(ERROR, "function %u not found", best_candidate->oid);
+		pform = (Form_pg_proc) GETSTRUCT(ftup);
 		*rettype = pform->prorettype;
 		*retset = pform->proretset;
 		ReleaseSysCache(ftup);
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index 202f447076e2a0bceeeeb243bb0f7a44a0cb8494..efcb65dbfdf13a003ee251555ba1522e5bc1a064 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.93 2002/03/26 19:16:08 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.94 2002/04/06 06:59:22 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -34,7 +34,7 @@
 #include "utils/syscache.h"
 
 
- /* #define CACHEDEBUG */	/* turns DEBUG elogs on */
+/* #define CACHEDEBUG */	/* turns DEBUG elogs on */
 
 /*
  * Constants related to size of the catcache.
@@ -98,7 +98,7 @@ static const Oid eqproc[] = {
 #define EQPROC(SYSTEMTYPEOID)	eqproc[(SYSTEMTYPEOID)-BOOLOID]
 
 
-static uint32 CatalogCacheComputeHashValue(CatCache *cache,
+static uint32 CatalogCacheComputeHashValue(CatCache *cache, int nkeys,
 							 ScanKey cur_skey);
 static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
 								  HeapTuple tuple);
@@ -106,7 +106,12 @@ static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
 static void CatCachePrintStats(void);
 #endif
 static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
+static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
 static void CatalogCacheInitializeCache(CatCache *cache);
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+										uint32 hashValue, Index hashIndex,
+										bool negative);
+static HeapTuple build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys);
 
 
 /*
@@ -149,16 +154,16 @@ GetCCHashFunc(Oid keytype)
  * Compute the hash value associated with a given set of lookup keys
  */
 static uint32
-CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
+CatalogCacheComputeHashValue(CatCache *cache, int nkeys, ScanKey cur_skey)
 {
 	uint32		hashValue = 0;
 
 	CACHE4_elog(DEBUG1, "CatalogCacheComputeHashValue %s %d %p",
 				cache->cc_relname,
-				cache->cc_nkeys,
+				nkeys,
 				cache);
 
-	switch (cache->cc_nkeys)
+	switch (nkeys)
 	{
 		case 4:
 			hashValue ^=
@@ -181,7 +186,7 @@ CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey)
 											   cur_skey[0].sk_argument));
 			break;
 		default:
-			elog(FATAL, "CCComputeHashValue: %d cc_nkeys", cache->cc_nkeys);
+			elog(FATAL, "CCComputeHashValue: %d nkeys", nkeys);
 			break;
 	}
 
@@ -251,7 +256,7 @@ CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
 			break;
 	}
 
-	return CatalogCacheComputeHashValue(cache, cur_skey);
+	return CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
 }
 
 
@@ -267,6 +272,8 @@ CatCachePrintStats(void)
 	long		cc_newloads = 0;
 	long		cc_invals = 0;
 	long		cc_discards = 0;
+	long		cc_lsearches = 0;
+	long		cc_lhits = 0;
 
 	elog(DEBUG1, "Catcache stats dump: %d/%d tuples in catcaches",
 		 CacheHdr->ch_ntup, CacheHdr->ch_maxtup);
@@ -275,7 +282,7 @@ CatCachePrintStats(void)
 	{
 		if (cache->cc_ntup == 0 && cache->cc_searches == 0)
 			continue;			/* don't print unused caches */
-		elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+		elog(DEBUG1, "Catcache %s/%s: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
 			 cache->cc_relname,
 			 cache->cc_indname,
 			 cache->cc_ntup,
@@ -287,15 +294,19 @@ CatCachePrintStats(void)
 			 cache->cc_searches - cache->cc_hits - cache->cc_neg_hits - cache->cc_newloads,
 			 cache->cc_searches - cache->cc_hits - cache->cc_neg_hits,
 			 cache->cc_invals,
-			 cache->cc_discards);
+			 cache->cc_discards,
+			 cache->cc_lsearches,
+			 cache->cc_lhits);
 		cc_searches += cache->cc_searches;
 		cc_hits += cache->cc_hits;
 		cc_neg_hits += cache->cc_neg_hits;
 		cc_newloads += cache->cc_newloads;
 		cc_invals += cache->cc_invals;
 		cc_discards += cache->cc_discards;
+		cc_lsearches += cache->cc_lsearches;
+		cc_lhits += cache->cc_lhits;
 	}
-	elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards",
+	elog(DEBUG1, "Catcache totals: %d tup, %ld srch, %ld+%ld=%ld hits, %ld+%ld=%ld loads, %ld invals, %ld discards, %ld lsrch, %ld lhits",
 		 CacheHdr->ch_ntup,
 		 cc_searches,
 		 cc_hits,
@@ -305,7 +316,9 @@ CatCachePrintStats(void)
 		 cc_searches - cc_hits - cc_neg_hits - cc_newloads,
 		 cc_searches - cc_hits - cc_neg_hits,
 		 cc_invals,
-		 cc_discards);
+		 cc_discards,
+		 cc_lsearches,
+		 cc_lhits);
 }
 
 #endif /* CATCACHE_STATS */
@@ -315,6 +328,8 @@ CatCachePrintStats(void)
  *		CatCacheRemoveCTup
  *
  * Unlink and delete the given cache entry
+ *
+ * NB: if it is a member of a CatCList, the CatCList is deleted too.
  */
 static void
 CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
@@ -322,6 +337,9 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
 	Assert(ct->refcount == 0);
 	Assert(ct->my_cache == cache);
 
+	if (ct->c_list)
+		CatCacheRemoveCList(cache, ct->c_list);
+
 	/* delink from linked lists */
 	DLRemove(&ct->lrulist_elem);
 	DLRemove(&ct->cache_elem);
@@ -335,6 +353,38 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct)
 	--CacheHdr->ch_ntup;
 }
 
+/*
+ *		CatCacheRemoveCList
+ *
+ * Unlink and delete the given cache list entry
+ */
+static void
+CatCacheRemoveCList(CatCache *cache, CatCList *cl)
+{
+	int			i;
+
+	Assert(cl->refcount == 0);
+	Assert(cl->my_cache == cache);
+
+	/* delink from member tuples */
+	for (i = cl->n_members; --i >= 0; )
+	{
+		CatCTup    *ct = cl->members[i];
+
+		Assert(ct->c_list == cl);
+		ct->c_list = NULL;
+	}
+
+	/* delink from linked list */
+	DLRemove(&cl->cache_elem);
+
+	/* free associated tuple data */
+	if (cl->tuple.t_data != NULL)
+		pfree(cl->tuple.t_data);
+	pfree(cl);
+}
+
+
 /*
  *	CatalogCacheIdInvalidate
  *
@@ -385,7 +435,23 @@ CatalogCacheIdInvalidate(int cacheId,
 		 */
 
 		/*
-		 * inspect the proper hash bucket for matches
+		 * Invalidate *all* CatCLists in this cache; it's too hard to tell
+		 * which searches might still be correct, so just zap 'em all.
+		 */
+		for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+		{
+			CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+			nextelt = DLGetSucc(elt);
+
+			if (cl->refcount > 0)
+				cl->dead = true;
+			else
+				CatCacheRemoveCList(ccp, cl);
+		}
+
+		/*
+		 * inspect the proper hash bucket for tuple matches
 		 */
 		hashIndex = HASH_INDEX(hashValue, ccp->cc_nbuckets);
 
@@ -458,9 +524,38 @@ CreateCacheMemoryContext(void)
 void
 AtEOXact_CatCache(bool isCommit)
 {
+	CatCache   *ccp;
 	Dlelem	   *elt,
 			   *nextelt;
 
+	/*
+	 * First clean up CatCLists
+	 */
+	for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
+	{
+		for (elt = DLGetHead(&ccp->cc_lists); elt; elt = nextelt)
+		{
+			CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+			nextelt = DLGetSucc(elt);
+
+			if (cl->refcount != 0)
+			{
+				if (isCommit)
+					elog(WARNING, "Cache reference leak: cache %s (%d), list %p has count %d",
+						 ccp->cc_relname, ccp->id, cl, cl->refcount);
+				cl->refcount = 0;
+			}
+
+			/* Clean up any now-deletable dead entries */
+			if (cl->dead)
+				CatCacheRemoveCList(ccp, cl);
+		}
+	}
+
+	/*
+	 * Now clean up tuples; we can scan them all using the global LRU list
+	 */
 	for (elt = DLGetHead(&CacheHdr->ch_lrulist); elt; elt = nextelt)
 	{
 		CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -494,14 +589,26 @@ AtEOXact_CatCache(bool isCommit)
 static void
 ResetCatalogCache(CatCache *cache)
 {
+	Dlelem	   *elt,
+			   *nextelt;
 	int			i;
 
+	/* Remove each list in this cache, or at least mark it dead */
+	for (elt = DLGetHead(&cache->cc_lists); elt; elt = nextelt)
+	{
+		CatCList   *cl = (CatCList *) DLE_VAL(elt);
+
+		nextelt = DLGetSucc(elt);
+
+		if (cl->refcount > 0)
+			cl->dead = true;
+		else
+			CatCacheRemoveCList(cache, cl);
+	}
+
 	/* Remove each tuple in this cache, or at least mark it dead */
 	for (i = 0; i < cache->cc_nbuckets; i++)
 	{
-		Dlelem	   *elt,
-				   *nextelt;
-
 		for (elt = DLGetHead(&cache->cc_bucket[i]); elt; elt = nextelt)
 		{
 			CatCTup    *ct = (CatCTup *) DLE_VAL(elt);
@@ -694,7 +801,7 @@ InitCatCache(int id,
 	/*
 	 * allocate a new cache structure
 	 *
-	 * Note: we assume zeroing initializes the bucket headers correctly
+	 * Note: we assume zeroing initializes the Dllist headers correctly
 	 */
 	cp = (CatCache *) palloc(sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
 	MemSet((char *) cp, 0, sizeof(CatCache) + NCCBUCKETS * sizeof(Dllist));
@@ -965,9 +1072,8 @@ SearchCatCache(CatCache *cache,
 	Dlelem	   *elt;
 	CatCTup    *ct;
 	Relation	relation;
+	SysScanDesc	scandesc;
 	HeapTuple	ntp;
-	int			i;
-	MemoryContext oldcxt;
 
 	/*
 	 * one-time startup overhead for each cache
@@ -991,7 +1097,7 @@ SearchCatCache(CatCache *cache,
 	/*
 	 * find the hash bucket in which to look for the tuple
 	 */
-	hashValue = CatalogCacheComputeHashValue(cache, cur_skey);
+	hashValue = CatalogCacheComputeHashValue(cache, cache->cc_nkeys, cur_skey);
 	hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
 
 	/*
@@ -1040,10 +1146,8 @@ SearchCatCache(CatCache *cache,
 		{
 			ct->refcount++;
 
-#ifdef CACHEDEBUG
 			CACHE3_elog(DEBUG1, "SearchCatCache(%s): found in bucket %d",
 						cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
 			cache->cc_hits++;
@@ -1053,10 +1157,8 @@ SearchCatCache(CatCache *cache,
 		}
 		else
 		{
-#ifdef CACHEDEBUG
 			CACHE3_elog(DEBUG1, "SearchCatCache(%s): found neg entry in bucket %d",
 						cache->cc_relname, hashIndex);
-#endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
 			cache->cc_neg_hits++;
@@ -1081,187 +1183,422 @@ SearchCatCache(CatCache *cache,
 	 * cache, so there's no functional problem.  This case is rare enough
 	 * that it's not worth expending extra cycles to detect.
 	 */
+	relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+	scandesc = systable_beginscan(relation,
+								  cache->cc_indname,
+								  IndexScanOK(cache, cur_skey),
+								  SnapshotNow,
+								  cache->cc_nkeys,
+								  cur_skey);
+
+	ct = NULL;
+
+	while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+	{
+		ct = CatalogCacheCreateEntry(cache, ntp,
+									 hashValue, hashIndex,
+									 false);
+		break;					/* assume only one match */
+	}
+
+	systable_endscan(scandesc);
+
+	heap_close(relation, AccessShareLock);
 
 	/*
-	 * open the relation associated with the cache
+	 * If tuple was not found, we need to build a negative cache entry
+	 * containing a fake tuple.  The fake tuple has the correct key columns,
+	 * but nulls everywhere else.
 	 */
-	relation = heap_open(cache->cc_reloid, AccessShareLock);
+	if (ct == NULL)
+	{
+		ntp = build_dummy_tuple(cache, cache->cc_nkeys, cur_skey);
+		ct = CatalogCacheCreateEntry(cache, ntp,
+									 hashValue, hashIndex,
+									 true);
+		heap_freetuple(ntp);
+
+		CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
+					cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+		CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
+					cache->cc_relname, hashIndex);
+
+		/*
+		 * We are not returning the new entry to the caller, so reset its
+		 * refcount.
+		 */
+		ct->refcount = 0;		/* negative entries never have refs */
+
+		return NULL;
+	}
+
+	CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
+				cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
+	CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
+				cache->cc_relname, hashIndex);
+
+#ifdef CATCACHE_STATS
+	cache->cc_newloads++;
+#endif
+
+	return &ct->tuple;
+}
+
+/*
+ *	ReleaseCatCache
+ *
+ *	Decrement the reference count of a catcache entry (releasing the
+ *	hold grabbed by a successful SearchCatCache).
+ *
+ *	NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
+ *	will be freed as soon as their refcount goes to zero.  In combination
+ *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
+ *	to catch references to already-released catcache entries.
+ */
+void
+ReleaseCatCache(HeapTuple tuple)
+{
+	CatCTup    *ct = (CatCTup *) (((char *) tuple) -
+								  offsetof(CatCTup, tuple));
+
+	/* Safety checks to ensure we were handed a cache entry */
+	Assert(ct->ct_magic == CT_MAGIC);
+	Assert(ct->refcount > 0);
+
+	ct->refcount--;
+
+	if (ct->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+		&& ct->dead
+#endif
+		)
+		CatCacheRemoveCTup(ct->my_cache, ct);
+}
+
+
+/*
+ *	SearchCatCacheList
+ *
+ *		Generate a list of all tuples matching a partial key (that is,
+ *		a key specifying just the first K of the cache's N key columns).
+ *
+ *		The caller must not modify the list object or the pointed-to tuples,
+ *		and must call ReleaseCatCacheList() when done with the list.
+ */
+CatCList *
+SearchCatCacheList(CatCache *cache,
+				   int nkeys,
+				   Datum v1,
+				   Datum v2,
+				   Datum v3,
+				   Datum v4)
+{
+	ScanKeyData cur_skey[4];
+	uint32		lHashValue;
+	Dlelem	   *elt;
+	CatCList   *cl;
+	CatCTup    *ct;
+	List	   *ctlist;
+	int			nmembers;
+	Relation	relation;
+	SysScanDesc	scandesc;
+	bool		ordered;
+	HeapTuple	ntp;
+	MemoryContext oldcxt;
+	int			i;
+
+	/*
+	 * one-time startup overhead for each cache
+	 */
+	if (cache->cc_tupdesc == NULL)
+		CatalogCacheInitializeCache(cache);
+
+	Assert(nkeys > 0 && nkeys < cache->cc_nkeys);
+
+#ifdef CATCACHE_STATS
+	cache->cc_lsearches++;
+#endif
+
+	/*
+	 * initialize the search key information
+	 */
+	memcpy(cur_skey, cache->cc_skey, sizeof(cur_skey));
+	cur_skey[0].sk_argument = v1;
+	cur_skey[1].sk_argument = v2;
+	cur_skey[2].sk_argument = v3;
+	cur_skey[3].sk_argument = v4;
 
 	/*
-	 * Pre-create cache entry header, and mark no tuple found.
+	 * compute a hash value of the given keys for faster search.  We don't
+	 * presently divide the CatCList items into buckets, but this still lets
+	 * us skip non-matching items quickly most of the time.
 	 */
-	ct = (CatCTup *) MemoryContextAlloc(CacheMemoryContext, sizeof(CatCTup));
-	ct->negative = true;
+	lHashValue = CatalogCacheComputeHashValue(cache, nkeys, cur_skey);
 
 	/*
-	 * Scan the relation to find the tuple.  If there's an index, and if
-	 * it's safe to do so, use the index.  Else do a heap scan.
+	 * scan the items until we find a match or exhaust our list
 	 */
-	if ((RelationGetForm(relation))->relhasindex &&
-		!IsIgnoringSystemIndexes() &&
-		IndexScanOK(cache, cur_skey))
+	for (elt = DLGetHead(&cache->cc_lists);
+		 elt;
+		 elt = DLGetSucc(elt))
 	{
-		Relation	idesc;
-		IndexScanDesc isd;
-		RetrieveIndexResult indexRes;
-		HeapTupleData tuple;
-		Buffer		buffer;
+		bool		res;
 
-		CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing index scan",
-					cache->cc_relname);
+		cl = (CatCList *) DLE_VAL(elt);
+
+		if (cl->dead)
+			continue;			/* ignore dead entries */
+
+		if (cl->hash_value != lHashValue)
+			continue;			/* quickly skip entry if wrong hash val */
 
 		/*
-		 * For an index scan, sk_attno has to be set to the index
-		 * attribute number(s), not the heap attribute numbers.  We assume
-		 * that the index corresponds exactly to the cache keys (or its
-		 * first N keys do, anyway).
+		 * see if the cached list matches our key.
 		 */
-		for (i = 0; i < cache->cc_nkeys; ++i)
-			cur_skey[i].sk_attno = i + 1;
+		if (cl->nkeys != nkeys)
+			continue;
+		HeapKeyTest(&cl->tuple,
+					cache->cc_tupdesc,
+					nkeys,
+					cur_skey,
+					res);
+		if (!res)
+			continue;
 
-		idesc = index_openr(cache->cc_indname);
-		isd = index_beginscan(idesc, false, cache->cc_nkeys, cur_skey);
-		tuple.t_datamcxt = CurrentMemoryContext;
-		tuple.t_data = NULL;
-		while ((indexRes = index_getnext(isd, ForwardScanDirection)))
+		/*
+		 * we found a matching list: move each of its members to the front
+		 * of the global LRU list.  Also move the list itself to the front
+		 * of the cache's list-of-lists, to speed subsequent searches.
+		 * (We do not move the members to the fronts of their hashbucket
+		 * lists, however, since there's no point in that unless they are
+		 * searched for individually.)  Also bump the members' refcounts.
+		 */
+		for (i = 0; i < cl->n_members; i++)
 		{
-			tuple.t_self = indexRes->heap_iptr;
-			heap_fetch(relation, SnapshotNow, &tuple, &buffer, isd);
-			pfree(indexRes);
-			if (tuple.t_data != NULL)
-			{
-				/* Copy tuple into our context */
-				oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-				heap_copytuple_with_tuple(&tuple, &ct->tuple);
-				ct->negative = false;
-				MemoryContextSwitchTo(oldcxt);
-				ReleaseBuffer(buffer);
-				break;
-			}
+			cl->members[i]->refcount++;
+			DLMoveToFront(&cl->members[i]->lrulist_elem);
 		}
-		index_endscan(isd);
-		index_close(idesc);
+		DLMoveToFront(&cl->cache_elem);
+
+		/* Bump the list's refcount and return it */
+		cl->refcount++;
+
+		CACHE2_elog(DEBUG1, "SearchCatCacheList(%s): found list",
+					cache->cc_relname);
+
+#ifdef CATCACHE_STATS
+		cache->cc_lhits++;
+#endif
+
+		return cl;
 	}
-	else
+
+	/*
+	 * List was not found in cache, so we have to build it by reading
+	 * the relation.  For each matching tuple found in the relation,
+	 * use an existing cache entry if possible, else build a new one.
+	 */
+	relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+	scandesc = systable_beginscan(relation,
+								  cache->cc_indname,
+								  true,
+								  SnapshotNow,
+								  nkeys,
+								  cur_skey);
+
+	/* The list will be ordered iff we are doing an index scan */
+	ordered = (scandesc->irel != NULL);
+
+	ctlist = NIL;
+	nmembers = 0;
+
+	while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
 	{
-		HeapScanDesc sd;
+		uint32		hashValue;
+		Index		hashIndex;
 
-		CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing heap scan",
-					cache->cc_relname);
+		/*
+		 * See if there's an entry for this tuple already.
+		 */
+		ct = NULL;
+		hashValue = CatalogCacheComputeTupleHashValue(cache, ntp);
+		hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+
+		for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
+			 elt;
+			 elt = DLGetSucc(elt))
+		{
+			ct = (CatCTup *) DLE_VAL(elt);
+
+			if (ct->dead || ct->negative)
+				continue;			/* ignore dead and negative entries */
+
+			if (ct->hash_value != hashValue)
+				continue;			/* quickly skip entry if wrong hash val */
 
-		sd = heap_beginscan(relation, 0, SnapshotNow,
-							cache->cc_nkeys, cur_skey);
+			if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
+				continue;			/* not same tuple */
 
-		ntp = heap_getnext(sd, 0);
+			/*
+			 * Found a match, but can't use it if it belongs to another list
+			 * already
+			 */
+			if (ct->c_list)
+				continue;
 
-		if (HeapTupleIsValid(ntp))
+			/* Found a match, so bump its refcount and move to front */
+			ct->refcount++;
+
+			DLMoveToFront(&ct->lrulist_elem);
+
+			break;
+		}
+
+		if (elt == NULL)
 		{
-			/* Copy tuple into our context */
-			oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-			heap_copytuple_with_tuple(ntp, &ct->tuple);
-			ct->negative = false;
-			MemoryContextSwitchTo(oldcxt);
-			/* We should not free the result of heap_getnext... */
+			/* We didn't find a usable entry, so make a new one */
+			ct = CatalogCacheCreateEntry(cache, ntp,
+										 hashValue, hashIndex,
+										 false);
 		}
 
-		heap_endscan(sd);
+		ctlist = lcons(ct, ctlist);
+		nmembers++;
 	}
 
-	/*
-	 * close the relation
-	 */
+	systable_endscan(scandesc);
+
 	heap_close(relation, AccessShareLock);
 
 	/*
-	 * scan is complete.  If tuple was not found, we need to build
-	 * a fake tuple for the negative cache entry.  The fake tuple has
-	 * the correct key columns, but nulls everywhere else.
+	 * Now we can build the CatCList entry.  First we need a dummy tuple
+	 * containing the key values...
 	 */
-	if (ct->negative)
+	ntp = build_dummy_tuple(cache, nkeys, cur_skey);
+	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+	cl = (CatCList *) palloc(sizeof(CatCList) + nmembers * sizeof(CatCTup *));
+	heap_copytuple_with_tuple(ntp, &cl->tuple);
+	MemoryContextSwitchTo(oldcxt);
+	heap_freetuple(ntp);
+
+	cl->cl_magic = CL_MAGIC;
+	cl->my_cache = cache;
+	DLInitElem(&cl->cache_elem, (void *) cl);
+	cl->refcount = 1;			/* count this first reference */
+	cl->dead = false;
+	cl->ordered = ordered;
+	cl->nkeys = nkeys;
+	cl->hash_value = lHashValue;
+	cl->n_members = nmembers;
+	/* The list is backwards because we built it with lcons */
+	for (i = nmembers; --i >= 0; )
 	{
-		TupleDesc	tupDesc = cache->cc_tupdesc;
-		Datum	   *values;
-		char	   *nulls;
-		Oid			negOid = InvalidOid;
+		cl->members[i] = ct = (CatCTup *) lfirst(ctlist);
+		Assert(ct->c_list == NULL);
+		ct->c_list = cl;
+		/* mark list dead if any members already dead */
+		if (ct->dead)
+			cl->dead = true;
+		ctlist = lnext(ctlist);
+	}
 
-		values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-		nulls = (char *) palloc(tupDesc->natts * sizeof(char));
+	DLAddHead(&cache->cc_lists, &cl->cache_elem);
 
-		memset(values, 0, tupDesc->natts * sizeof(Datum));
-		memset(nulls, 'n', tupDesc->natts * sizeof(char));
+	CACHE3_elog(DEBUG1, "SearchCatCacheList(%s): made list of %d members",
+				cache->cc_relname, nmembers);
 
-		for (i = 0; i < cache->cc_nkeys; i++)
-		{
-			int		attindex = cache->cc_key[i];
-			Datum	keyval = cur_skey[i].sk_argument;
+	return cl;
+}
 
-			if (attindex > 0)
-			{
-				/*
-				 * Here we must be careful in case the caller passed a
-				 * C string where a NAME is wanted: convert the given
-				 * argument to a correctly padded NAME.  Otherwise the
-				 * memcpy() done in heap_formtuple could fall off the
-				 * end of memory.
-				 */
-				if (cache->cc_isname[i])
-				{
-					Name	newval = (Name) palloc(NAMEDATALEN);
+/*
+ *	ReleaseCatCacheList
+ *
+ *	Decrement the reference counts of a catcache list.
+ */
+void
+ReleaseCatCacheList(CatCList *list)
+{
+	int			i;
 
-					namestrcpy(newval, DatumGetCString(keyval));
-					keyval = NameGetDatum(newval);
-				}
-				values[attindex-1] = keyval;
-				nulls[attindex-1] = ' ';
-			}
-			else
-			{
-				Assert(attindex == ObjectIdAttributeNumber);
-				negOid = DatumGetObjectId(keyval);
-			}
-		}
+	/* Safety checks to ensure we were handed a cache entry */
+	Assert(list->cl_magic == CL_MAGIC);
+	Assert(list->refcount > 0);
 
-		ntp = heap_formtuple(tupDesc, values, nulls);
+	for (i = list->n_members; --i >= 0; )
+	{
+		CatCTup    *ct = list->members[i];
 
-		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-		heap_copytuple_with_tuple(ntp, &ct->tuple);
-		ct->tuple.t_data->t_oid = negOid;
-		MemoryContextSwitchTo(oldcxt);
+		Assert(ct->refcount > 0);
 
-		heap_freetuple(ntp);
-		for (i = 0; i < cache->cc_nkeys; i++)
-		{
-			if (cache->cc_isname[i])
-				pfree(DatumGetName(values[cache->cc_key[i]-1]));
-		}
-		pfree(values);
-		pfree(nulls);
+		ct->refcount--;
+
+		if (ct->dead)
+			list->dead = true;
+		/* can't remove tuple before list is removed */
 	}
 
+	list->refcount--;
+
+	if (list->refcount == 0
+#ifndef CATCACHE_FORCE_RELEASE
+		&& list->dead
+#endif
+		)
+		CatCacheRemoveCList(list->my_cache, list);
+}
+
+
+/*
+ * CatalogCacheCreateEntry
+ *		Create a new CatCTup entry, copying the given HeapTuple and other
+ *		supplied data into it.  The new entry is given refcount 1.
+ */
+static CatCTup *
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+						uint32 hashValue, Index hashIndex, bool negative)
+{
+	CatCTup    *ct;
+	MemoryContext oldcxt;
+
+	/*
+	 * Allocate CatCTup header in cache memory, and copy the tuple there too.
+	 */
+	oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+	ct = (CatCTup *) palloc(sizeof(CatCTup));
+	heap_copytuple_with_tuple(ntp, &ct->tuple);
+	MemoryContextSwitchTo(oldcxt);
+
 	/*
-	 * Finish initializing the CatCTup header, and add it to the linked
-	 * lists.
+	 * Finish initializing the CatCTup header, and add it to the cache's
+	 * linked lists and counts.
 	 */
 	ct->ct_magic = CT_MAGIC;
 	ct->my_cache = cache;
 	DLInitElem(&ct->lrulist_elem, (void *) ct);
 	DLInitElem(&ct->cache_elem, (void *) ct);
+	ct->c_list = NULL;
 	ct->refcount = 1;			/* count this first reference */
 	ct->dead = false;
+	ct->negative = negative;
 	ct->hash_value = hashValue;
 
 	DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
 	DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
 
+	cache->cc_ntup++;
+	CacheHdr->ch_ntup++;
+
 	/*
 	 * If we've exceeded the desired size of the caches, try to throw away
 	 * the least recently used entry.  NB: the newly-built entry cannot
 	 * get thrown away here, because it has positive refcount.
 	 */
-	++cache->cc_ntup;
-	if (++CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
+	if (CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
 	{
-		Dlelem	   *prevelt;
+		Dlelem	   *elt,
+				   *prevelt;
 
 		for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt)
 		{
@@ -1271,7 +1608,7 @@ SearchCatCache(CatCache *cache,
 
 			if (oldct->refcount == 0)
 			{
-				CACHE2_elog(DEBUG1, "SearchCatCache(%s): Overflow, LRU removal",
+				CACHE2_elog(DEBUG1, "CatCacheCreateEntry(%s): Overflow, LRU removal",
 							cache->cc_relname);
 #ifdef CATCACHE_STATS
 				oldct->my_cache->cc_discards++;
@@ -1283,65 +1620,75 @@ SearchCatCache(CatCache *cache,
 		}
 	}
 
-	CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
-				cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
-
-	if (ct->negative)
-	{
-		CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
-					cache->cc_relname, hashIndex);
-
-		/*
-		 * We are not returning the new entry to the caller, so reset its
-		 * refcount.  Note it would be uncool to set the refcount to 0
-		 * before doing the extra-entry removal step above.
-		 */
-		ct->refcount = 0;		/* negative entries never have refs */
-
-		return NULL;
-	}
-
-	CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
-				cache->cc_relname, hashIndex);
-
-#ifdef CATCACHE_STATS
-	cache->cc_newloads++;
-#endif
-
-	return &ct->tuple;
+	return ct;
 }
 
 /*
- *	ReleaseCatCache()
+ * build_dummy_tuple
+ *		Generate a palloc'd HeapTuple that contains the specified key
+ *		columns, and NULLs for other columns.
  *
- *	Decrement the reference count of a catcache entry (releasing the
- *	hold grabbed by a successful SearchCatCache).
- *
- *	NOTE: if compiled with -DCATCACHE_FORCE_RELEASE then catcache entries
- *	will be freed as soon as their refcount goes to zero.  In combination
- *	with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
- *	to catch references to already-released catcache entries.
+ * This is used to store the keys for negative cache entries and CatCList
+ * entries, which don't have real tuples associated with them.
  */
-void
-ReleaseCatCache(HeapTuple tuple)
+static HeapTuple
+build_dummy_tuple(CatCache *cache, int nkeys, ScanKey skeys)
 {
-	CatCTup    *ct = (CatCTup *) (((char *) tuple) -
-								  offsetof(CatCTup, tuple));
+	HeapTuple	ntp;
+	TupleDesc	tupDesc = cache->cc_tupdesc;
+	Datum	   *values;
+	char	   *nulls;
+	Oid			tupOid = InvalidOid;
+	NameData	tempNames[4];
+	int			i;
 
-	/* Safety checks to ensure we were handed a cache entry */
-	Assert(ct->ct_magic == CT_MAGIC);
-	Assert(ct->refcount > 0);
+	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+	nulls = (char *) palloc(tupDesc->natts * sizeof(char));
 
-	ct->refcount--;
+	memset(values, 0, tupDesc->natts * sizeof(Datum));
+	memset(nulls, 'n', tupDesc->natts * sizeof(char));
 
-	if (ct->refcount == 0
-#ifndef CATCACHE_FORCE_RELEASE
-		&& ct->dead
-#endif
-		)
-		CatCacheRemoveCTup(ct->my_cache, ct);
+	for (i = 0; i < nkeys; i++)
+	{
+		int		attindex = cache->cc_key[i];
+		Datum	keyval = skeys[i].sk_argument;
+
+		if (attindex > 0)
+		{
+			/*
+			 * Here we must be careful in case the caller passed a
+			 * C string where a NAME is wanted: convert the given
+			 * argument to a correctly padded NAME.  Otherwise the
+			 * memcpy() done in heap_formtuple could fall off the
+			 * end of memory.
+			 */
+			if (cache->cc_isname[i])
+			{
+				Name	newval = &tempNames[i];
+
+				namestrcpy(newval, DatumGetCString(keyval));
+				keyval = NameGetDatum(newval);
+			}
+			values[attindex-1] = keyval;
+			nulls[attindex-1] = ' ';
+		}
+		else
+		{
+			Assert(attindex == ObjectIdAttributeNumber);
+			tupOid = DatumGetObjectId(keyval);
+		}
+	}
+
+	ntp = heap_formtuple(tupDesc, values, nulls);
+	ntp->t_data->t_oid = tupOid;
+
+	pfree(values);
+	pfree(nulls);
+
+	return ntp;
 }
 
+
 /*
  *	PrepareToInvalidateCacheTuple()
  *
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 84cea0bb63242bfb6f863615fdfc2071a6908f3a..7e6a95a324e464a932a7855edec6c0e037057f3d 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/syscache.c,v 1.73 2002/04/05 00:31:31 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/cache/syscache.c,v 1.74 2002/04/06 06:59:23 tgl Exp $
  *
  * NOTES
  *	  These routines allow the parser/planner/executor to perform
@@ -626,3 +626,18 @@ SysCacheGetAttr(int cacheId, HeapTuple tup,
 						SysCache[cacheId]->cc_tupdesc,
 						isNull);
 }
+
+/*
+ * List-search interface
+ */
+struct catclist *
+SearchSysCacheList(int cacheId, int nkeys,
+				   Datum key1, Datum key2, Datum key3, Datum key4)
+{
+	if (cacheId < 0 || cacheId >= SysCacheSize ||
+		! PointerIsValid(SysCache[cacheId]))
+		elog(ERROR, "SearchSysCacheList: Bad cache id %d", cacheId);
+
+	return SearchCatCacheList(SysCache[cacheId], nkeys,
+							  key1, key2, key3, key4);
+}
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index a8a64bd7db1151f4ce6365e200cdb58422b0f271..4c81e78f3ffbe3699bf638798a7ec84679a46046 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: namespace.h,v 1.5 2002/04/01 03:34:27 tgl Exp $
+ * $Id: namespace.h,v 1.6 2002/04/06 06:59:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,6 +17,22 @@
 #include "nodes/primnodes.h"
 
 
+/*
+ *	This structure holds a list of possible functions or operators
+ *	found by namespace lookup.  Each function/operator is identified
+ *	by OID and by argument types; the list must be pruned by type
+ *	resolution rules that are embodied in the parser, not here.
+ *	The number of arguments is assumed to be known a priori.
+ */
+typedef struct _FuncCandidateList
+{
+	struct _FuncCandidateList *next;
+	int			pathpos;		/* for internal use of namespace lookup */
+	Oid			oid;			/* the function or operator's OID */
+	Oid			args[1];		/* arg types --- VARIABLE LENGTH ARRAY */
+} *FuncCandidateList;			/* VARIABLE LENGTH STRUCT */
+
+
 extern Oid	RangeVarGetRelid(const RangeVar *relation, bool failOK);
 
 extern Oid	RangeVarGetCreationNamespace(const RangeVar *newRelation);
@@ -25,6 +41,8 @@ extern Oid	RelnameGetRelid(const char *relname);
 
 extern Oid	TypenameGetTypid(const char *typname);
 
+extern FuncCandidateList FuncnameGetCandidates(List *names, int nargs);
+
 extern Oid	QualifiedNameGetCreationNamespace(List *names, char **objname_p);
 
 extern RangeVar *makeRangeVarFromNameList(List *names);
diff --git a/src/include/utils/catcache.h b/src/include/utils/catcache.h
index 8e98f41c019f2062c1bbaa87e35e0ec148deaf84..2a1a83f13b961f5b87ebe20e72fa7b553962c95d 100644
--- a/src/include/utils/catcache.h
+++ b/src/include/utils/catcache.h
@@ -13,7 +13,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catcache.h,v 1.41 2002/03/26 19:16:56 tgl Exp $
+ * $Id: catcache.h,v 1.42 2002/04/06 06:59:24 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,6 +25,7 @@
 
 /*
  *		struct catctup:			individual tuple in the cache.
+ *		struct catclist:		list of tuples matching a partial key.
  *		struct catcache:		information for managing a cache.
  *		struct catcacheheader:	information for managing all the caches.
  */
@@ -36,7 +37,7 @@ typedef struct catcache
 	const char *cc_relname;		/* name of relation the tuples come from */
 	const char *cc_indname;		/* name of index matching cache keys */
 	Oid			cc_reloid;		/* OID of relation the tuples come from */
-	bool		cc_relisshared; /* is relation shared? */
+	bool		cc_relisshared; /* is relation shared across databases? */
 	TupleDesc	cc_tupdesc;		/* tuple descriptor (copied from reldesc) */
 	int			cc_reloidattr;	/* AttrNumber of relation OID attr, or 0 */
 	int			cc_ntup;		/* # of tuples currently in this cache */
@@ -46,6 +47,7 @@ typedef struct catcache
 	PGFunction	cc_hashfunc[4]; /* hash function to use for each key */
 	ScanKeyData cc_skey[4];		/* precomputed key info for heap scans */
 	bool		cc_isname[4];	/* flag key columns that are NAMEs */
+	Dllist		cc_lists;		/* list of CatCList structs */
 #ifdef CATCACHE_STATS
 	long		cc_searches;	/* total # searches against this cache */
 	long		cc_hits;		/* # of matches against existing entry */
@@ -57,6 +59,8 @@ typedef struct catcache
 	 */
 	long		cc_invals;		/* # of entries invalidated from cache */
 	long		cc_discards;	/* # of entries discarded due to overflow */
+	long		cc_lsearches;	/* total # list-searches */
+	long		cc_lhits;		/* # of matches against existing lists */
 #endif
 	Dllist		cc_bucket[1];	/* hash buckets --- VARIABLE LENGTH ARRAY */
 } CatCache;						/* VARIABLE LENGTH STRUCT */
@@ -64,15 +68,25 @@ typedef struct catcache
 
 typedef struct catctup
 {
-	int			ct_magic;		/* for Assert checks */
+	int			ct_magic;		/* for identifying CatCTup entries */
 #define CT_MAGIC   0x57261502
 	CatCache   *my_cache;		/* link to owning catcache */
 
 	/*
-	 * Each tuple in a cache is a member of two lists: one lists all the
+	 * Each tuple in a cache is a member of two Dllists: one lists all the
 	 * elements in all the caches in LRU order, and the other lists just
 	 * the elements in one hashbucket of one cache, also in LRU order.
 	 *
+	 * The tuple may also be a member of at most one CatCList.  (If a single
+	 * catcache is list-searched with varying numbers of keys, we may have
+	 * to make multiple entries for the same tuple because of this
+	 * restriction.  Currently, that's not expected to be common, so we
+	 * accept the potential inefficiency.)
+	 */
+	Dlelem		lrulist_elem;	/* list member of global LRU list */
+	Dlelem		cache_elem;		/* list member of per-bucket list */
+	struct catclist *c_list;	/* containing catclist, or NULL if none */
+	/*
 	 * A tuple marked "dead" must not be returned by subsequent searches.
 	 * However, it won't be physically deleted from the cache until its
 	 * refcount goes to zero.
@@ -82,8 +96,6 @@ typedef struct catctup
 	 * so far as avoiding catalog searches is concerned.  Management of
 	 * positive and negative entries is identical.
 	 */
-	Dlelem		lrulist_elem;	/* list member of global LRU list */
-	Dlelem		cache_elem;		/* list member of per-bucket list */
 	int			refcount;		/* number of active references */
 	bool		dead;			/* dead but not yet removed? */
 	bool		negative;		/* negative cache entry? */
@@ -92,6 +104,47 @@ typedef struct catctup
 } CatCTup;
 
 
+typedef struct catclist
+{
+	int			cl_magic;		/* for identifying CatCList entries */
+#define CL_MAGIC   0x52765103
+	CatCache   *my_cache;		/* link to owning catcache */
+
+	/*
+	 * A CatCList describes the result of a partial search, ie, a search
+	 * using only the first K key columns of an N-key cache.  We form the
+	 * keys used into a tuple (with other attributes NULL) to represent
+	 * the stored key set.  The CatCList object contains links to cache
+	 * entries for all the table rows satisfying the partial key.  (Note:
+	 * none of these will be negative cache entries.)
+	 *
+	 * A CatCList is only a member of a per-cache list; we do not do
+	 * separate LRU management for CatCLists.  Instead, a CatCList is
+	 * dropped from the cache as soon as any one of its member tuples
+	 * ages out due to tuple-level LRU management.
+	 *
+	 * A list marked "dead" must not be returned by subsequent searches.
+	 * However, it won't be physically deleted from the cache until its
+	 * refcount goes to zero.  (Its member tuples must have refcounts at
+	 * least as large, so they won't go away either.)
+	 *
+	 * If "ordered" is true then the member tuples appear in the order of
+	 * the cache's underlying index.  This will be true in normal operation,
+	 * but might not be true during bootstrap or recovery operations.
+	 * (namespace.c is able to save some cycles when it is true.)
+	 */
+	Dlelem		cache_elem;		/* list member of per-catcache list */
+	int			refcount;		/* number of active references */
+	bool		dead;			/* dead but not yet removed? */
+	bool		ordered;		/* members listed in index order? */
+	short		nkeys;			/* number of lookup keys specified */
+	uint32		hash_value;		/* hash value for lookup keys */
+	HeapTupleData tuple;		/* header for tuple holding keys */
+	int			n_members;		/* number of member tuples */
+	CatCTup	   *members[1];		/* members --- VARIABLE LENGTH ARRAY */
+} CatCList;						/* VARIABLE LENGTH STRUCT */
+
+
 typedef struct catcacheheader
 {
 	CatCache   *ch_caches;		/* head of list of CatCache structs */
@@ -117,6 +170,11 @@ extern HeapTuple SearchCatCache(CatCache *cache,
 			   Datum v3, Datum v4);
 extern void ReleaseCatCache(HeapTuple tuple);
 
+extern CatCList *SearchCatCacheList(CatCache *cache, int nkeys,
+			   Datum v1, Datum v2,
+			   Datum v3, Datum v4);
+extern void ReleaseCatCacheList(CatCList *list);
+
 extern void ResetCatalogCaches(void);
 extern void CatalogCacheFlushRelation(Oid relId);
 extern void CatalogCacheIdInvalidate(int cacheId, uint32 hashValue,
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index d08dfadda49157bc2f320aa1f8b184cad28c3ac2..1d9ddd9acfd8f37173802ac48e0889947c4f0d08 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -9,7 +9,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: syscache.h,v 1.41 2002/03/29 19:06:26 tgl Exp $
+ * $Id: syscache.h,v 1.42 2002/04/06 06:59:25 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -79,4 +79,9 @@ extern Oid GetSysCacheOid(int cacheId,
 extern Datum SysCacheGetAttr(int cacheId, HeapTuple tup,
 				AttrNumber attributeNumber, bool *isNull);
 
+/* list-search interface.  Users of this must import catcache.h too */
+extern struct catclist *SearchSysCacheList(int cacheId, int nkeys,
+			   Datum key1, Datum key2, Datum key3, Datum key4);
+#define ReleaseSysCacheList(x)  ReleaseCatCacheList(x)
+
 #endif   /* SYSCACHE_H */