diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 51f1c63d70498e616dc4be03a598ed9179240fe3..adcf9bc6cb6bb80a570dc75b29c5916edcdc5bf1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
 *
 *
 * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.130 2002/03/02 21:39:17 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.131 2002/03/03 17:47:53 tgl Exp $
 *
 *
 * INTERFACE ROUTINES
@@ -1131,12 +1131,12 @@ heap_insert(Relation relation, HeapTuple tup)
     WriteBuffer(buffer);
 
     /*
-     * If tuple is cachable, mark it for rollback from the caches in case
+     * If tuple is cachable, mark it for invalidation from the caches in case
     * we abort.  Note it is OK to do this after WriteBuffer releases the
     * buffer, because the "tup" data structure is all in local memory,
     * not in the shared buffer.
     */
-    RelationMark4RollbackHeapTuple(relation, tup);
+    CacheInvalidateHeapTuple(relation, tup);
 
     return tup->t_data->t_oid;
 }
@@ -1278,7 +1278,7 @@ l1:
     * look at the contents of the tuple, so we need to hold our refcount
     * on the buffer.
     */
-    RelationInvalidateHeapTuple(relation, &tp);
+    CacheInvalidateHeapTuple(relation, &tp);
 
     WriteBuffer(buffer);
@@ -1585,19 +1585,19 @@ l2:
     * boundary.  We have to do this before WriteBuffer because we need to
     * look at the contents of the tuple, so we need to hold our refcount.
     */
-    RelationInvalidateHeapTuple(relation, &oldtup);
+    CacheInvalidateHeapTuple(relation, &oldtup);
 
     if (newbuf != buffer)
         WriteBuffer(newbuf);
     WriteBuffer(buffer);
 
     /*
-     * If new tuple is cachable, mark it for rollback from the caches in
+     * If new tuple is cachable, mark it for invalidation from the caches in
     * case we abort.  Note it is OK to do this after WriteBuffer releases
     * the buffer, because the "newtup" data structure is all in local
     * memory, not in the shared buffer.
     */
-    RelationMark4RollbackHeapTuple(relation, newtup);
+    CacheInvalidateHeapTuple(relation, newtup);
 
     return HeapTupleMayBeUpdated;
 }
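With this hunk group, heap_insert, heap_delete, and heap_update all funnel through a single registration routine instead of split "mark for rollback" and "invalidate" paths; an update simply registers its old and new tuple versions as two events. The following standalone C sketch (toy names and types, not PostgreSQL code) shows the shape of that single-entry-point design:

    #include <stdio.h>
    #include <stdlib.h>
    #include <stdbool.h>

    /* One pending-invalidation event; "insert" covers new tuple versions,
     * "delete" covers outdated ones.  An update registers one of each. */
    typedef struct InvalEvent
    {
        unsigned    tuple_id;       /* stand-in for the tuple's item pointer */
        bool        is_insert;
        struct InvalEvent *next;
    } InvalEvent;

    static InvalEvent *pending = NULL;

    /* single registration entry point, analogous to CacheInvalidateHeapTuple */
    static void
    register_inval(unsigned tuple_id, bool is_insert)
    {
        InvalEvent *e = malloc(sizeof(InvalEvent));

        e->tuple_id = tuple_id;
        e->is_insert = is_insert;
        e->next = pending;
        pending = e;
    }

    int
    main(void)
    {
        register_inval(1, true);        /* heap_insert: new tuple */
        register_inval(2, false);       /* heap_delete: old tuple */
        register_inval(3, false);       /* heap_update: old version ... */
        register_inval(4, true);        /* ... and new version */

        for (InvalEvent *e = pending; e; e = e->next)
            printf("flush tuple %u (%s)\n", e->tuple_id,
                   e->is_insert ? "insert" : "delete");
        return 0;
    }

Keeping both kinds of events in one queue is what later lets commit and abort processing decide uniformly what must be flushed.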
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index ad192dca5082372ad8a74ac650ce74b088bedea7..e14abe656769a7a76cd450e09fac301adde82633 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -8,7 +8,7 @@
 *
 *
 * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.182 2002/02/19 20:11:11 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.183 2002/03/03 17:47:54 tgl Exp $
 *
 *
 * INTERFACE ROUTINES
@@ -57,6 +57,7 @@
 #include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/relcache.h"
 #include "utils/syscache.h"
@@ -75,10 +76,10 @@ static void RelationRemoveIndexes(Relation relation);
 static void RelationRemoveInheritance(Relation relation);
 static void AddNewRelationType(char *typeName, Oid new_rel_oid, Oid new_type_oid);
-static void StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin,
-             bool updatePgAttribute);
+static void StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin);
 static void StoreRelCheck(Relation rel, char *ccname, char *ccbin);
-static void StoreConstraints(Relation rel);
+static void StoreConstraints(Relation rel, TupleDesc tupdesc);
+static void SetRelationNumChecks(Relation rel, int numchecks);
 static void RemoveConstraints(Relation rel);
 static void RemoveStatistics(Relation rel);
@@ -202,9 +203,6 @@ SystemAttributeByName(const char *attname, bool relhasoids)
 *
 *        Remove the system relation specific code to elsewhere eventually.
 *
- *        Eventually, must place information about this temporary relation
- *        into the transaction context block.
- *
 * NOTE: if istemp is TRUE then heap_create will overwrite relname with
 * the unique "real" name chosen for the temp relation.
 *
@@ -799,9 +797,16 @@ heap_create_with_catalog(char *relname,
     * now add tuples to pg_attribute for the attributes in our new
     * relation.
     */
-    AddNewAttributeTuples(new_rel_oid, tupdesc, relhasoids);
+    AddNewAttributeTuples(new_rel_oid, new_rel_desc->rd_att, relhasoids);
 
-    StoreConstraints(new_rel_desc);
+    /*
+     * store constraints and defaults passed in the tupdesc, if any.
+     *
+     * NB: this may do a CommandCounterIncrement and rebuild the relcache
+     * entry, so the relation must be valid and self-consistent at this
+     * point.  In particular, there must not yet be any constraints or
+     * defaults anywhere.
+     */
+    StoreConstraints(new_rel_desc, tupdesc);
 
     /*
     * We create the disk file for this relation here
@@ -922,8 +927,6 @@ RelationRemoveIndexes(Relation relation)
         Oid         indexoid = lfirsti(indexoidscan);
 
         index_drop(indexoid);
-
-        /* advance cmd counter to make catalog changes visible */
-        CommandCounterIncrement();
     }
 
     freeList(indexoidlist);
@@ -1377,12 +1380,9 @@ heap_drop_with_catalog(const char *relname,
 /*
 * Store a default expression for column attnum of relation rel.
 * The expression must be presented as a nodeToString() string.
- * If updatePgAttribute is true, update the pg_attribute entry
- * for the column to show that a default exists.
 */
 static void
-StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin,
-                 bool updatePgAttribute)
+StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin)
 {
     Node       *expr;
     char       *adsrc;
@@ -1429,9 +1429,10 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin,
     heap_freetuple(tuple);
     pfree(adsrc);
 
-    if (!updatePgAttribute)
-        return;                 /* done if pg_attribute is OK */
-
+    /*
+     * Update the pg_attribute entry for the column to show that a default
+     * exists.
+     */
     attrrel = heap_openr(AttributeRelationName, RowExclusiveLock);
     atttup = SearchSysCacheCopy(ATTNUM,
                                 ObjectIdGetDatum(RelationGetRelid(rel)),
@@ -1516,33 +1517,35 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin)
 * NOTE: only pre-cooked expressions will be passed this way, which is to
 * say expressions inherited from an existing relation.  Newly parsed
 * expressions can be added later, by direct calls to StoreAttrDefault
- * and StoreRelCheck (see AddRelationRawConstraints()).  We assume that
- * pg_attribute and pg_class entries for the relation were already set
- * to reflect the existence of these defaults/constraints.
+ * and StoreRelCheck (see AddRelationRawConstraints()).
 */
 static void
-StoreConstraints(Relation rel)
+StoreConstraints(Relation rel, TupleDesc tupdesc)
 {
-    TupleConstr *constr = rel->rd_att->constr;
+    TupleConstr *constr = tupdesc->constr;
     int         i;
 
     if (!constr)
-        return;
+        return;                 /* nothing to do */
 
     /*
-     * deparsing of constraint expressions will fail unless the
+     * Deparsing of constraint expressions will fail unless the
     * just-created pg_attribute tuples for this relation are made
-     * visible.  So, bump the command counter.
+     * visible.  So, bump the command counter.  CAUTION: this will
+     * cause a relcache entry rebuild.
     */
     CommandCounterIncrement();
 
     for (i = 0; i < constr->num_defval; i++)
         StoreAttrDefault(rel, constr->defval[i].adnum,
-                         constr->defval[i].adbin, false);
+                         constr->defval[i].adbin);
 
     for (i = 0; i < constr->num_check; i++)
         StoreRelCheck(rel, constr->check[i].ccname,
                       constr->check[i].ccbin);
+
+    if (constr->num_check > 0)
+        SetRelationNumChecks(rel, constr->num_check);
 }
 
 /*
@@ -1580,10 +1583,6 @@ AddRelationRawConstraints(Relation rel,
     RangeTblEntry *rte;
     int         numchecks;
     List       *listptr;
-    Relation    relrel;
-    Relation    relidescs[Num_pg_class_indices];
-    HeapTuple   reltup;
-    Form_pg_class relStruct;
 
     /*
     * Get info about existing constraints.
@@ -1681,7 +1680,7 @@ AddRelationRawConstraints(Relation rel,
         /*
         * OK, store it.
         */
-        StoreAttrDefault(rel, colDef->attnum, nodeToString(expr), true);
+        StoreAttrDefault(rel, colDef->attnum, nodeToString(expr));
     }
 
     /*
@@ -1839,9 +1838,29 @@ AddRelationRawConstraints(Relation rel,
     * We do this even if there was no change, in order to ensure that an
     * SI update message is sent out for the pg_class tuple, which will
     * force other backends to rebuild their relcache entries for the rel.
-     * (Of course, for a newly created rel there is no need for an SI
-     * message, but for ALTER TABLE ADD ATTRIBUTE this'd be important.)
+     * (This is critical if we added defaults but not constraints.)
     */
+    SetRelationNumChecks(rel, numchecks);
+}
+
+/*
+ * Update the count of constraints in the relation's pg_class tuple.
+ *
+ * Caller had better hold exclusive lock on the relation.
+ *
+ * An important side effect is that a SI update message will be sent out for
+ * the pg_class tuple, which will force other backends to rebuild their
+ * relcache entries for the rel.  Also, this backend will rebuild its
+ * own relcache entry at the next CommandCounterIncrement.
+ */
+static void
+SetRelationNumChecks(Relation rel, int numchecks)
+{
+    Relation    relrel;
+    HeapTuple   reltup;
+    Form_pg_class relStruct;
+    Relation    relidescs[Num_pg_class_indices];
+
+    relrel = heap_openr(RelationRelationName, RowExclusiveLock);
     reltup = SearchSysCacheCopy(RELOID,
                                 ObjectIdGetDatum(RelationGetRelid(rel)),
                                 0, 0, 0);
@@ -1851,22 +1870,30 @@ AddRelationRawConstraints(Relation rel,
             RelationGetRelid(rel));
     relStruct = (Form_pg_class) GETSTRUCT(reltup);
 
-    relStruct->relchecks = numchecks;
+    if (relStruct->relchecks != numchecks)
+    {
+        relStruct->relchecks = numchecks;
 
-    simple_heap_update(relrel, &reltup->t_self, reltup);
+        simple_heap_update(relrel, &reltup->t_self, reltup);
 
-    /* keep catalog indices current */
-    CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices,
-                       relidescs);
-    CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup);
-    CatalogCloseIndices(Num_pg_class_indices, relidescs);
+        /* keep catalog indices current */
+        CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices,
+                           relidescs);
+        CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup);
+        CatalogCloseIndices(Num_pg_class_indices, relidescs);
+    }
+    else
+    {
+        /* Skip the disk update, but force relcache inval anyway */
+        CacheInvalidateRelcache(RelationGetRelid(rel));
+    }
 
     heap_freetuple(reltup);
     heap_close(relrel, RowExclusiveLock);
 }
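SetRelationNumChecks encapsulates an idiom this patch applies in several places: physically update the catalog row only when a field actually changed (a heap update broadcasts a shared-inval message as a side effect), and otherwise send an explicit relcache invalidation so other backends still rebuild. A minimal sketch of that control flow, with toy types standing in for the pg_class machinery:

    #include <stdio.h>

    /* toy stand-ins for a catalog row and the two notification paths */
    typedef struct { int relchecks; } PgClassRow;

    static void
    write_row_and_index(PgClassRow *row)
    {
        printf("disk write: relchecks=%d\n", row->relchecks);
    }

    static void
    send_relcache_inval(void)
    {
        printf("SI inval only\n");
    }

    /*
     * Mirror of SetRelationNumChecks' control flow: a physical update
     * already broadcasts an SI message as a side effect, so the explicit
     * invalidation is needed only when the stored value is already right.
     */
    static void
    set_num_checks(PgClassRow *row, int numchecks)
    {
        if (row->relchecks != numchecks)
        {
            row->relchecks = numchecks;
            write_row_and_index(row);   /* heap update implies SI message */
        }
        else
            send_relcache_inval();      /* skip the write, still rebuild */
    }

    int
    main(void)
    {
        PgClassRow row = {0};
        set_num_checks(&row, 2);        /* changed: disk write */
        set_num_checks(&row, 2);        /* unchanged: inval only */
        return 0;
    }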
 
 static void
-RemoveAttrDefault(Relation rel)
+RemoveAttrDefaults(Relation rel)
 {
     Relation    adrel;
     HeapScanDesc adscan;
@@ -1889,7 +1916,7 @@ RemoveAttrDefault(Relation rel)
 }
 
 static void
-RemoveRelCheck(Relation rel)
+RemoveRelChecks(Relation rel)
 {
     Relation    rcrel;
     HeapScanDesc rcscan;
@@ -1923,9 +1950,6 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh)
 {
     Oid         relid;
     Relation    rcrel;
-    Relation    relrel;
-    Relation    inhrel;
-    Relation    relidescs[Num_pg_class_indices];
     TupleDesc   tupleDesc;
     TupleConstr *oldconstr;
     int         numoldchecks;
@@ -1933,8 +1957,6 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh)
     HeapScanDesc rcscan;
     ScanKeyData key[2];
     HeapTuple   rctup;
-    HeapTuple   reltup;
-    Form_pg_class relStruct;
     int         rel_deleted = 0;
     int         all_deleted = 0;
@@ -1960,6 +1982,7 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh)
         foreach(child, children)
         {
             Oid         childrelid = lfirsti(child);
+            Relation    inhrel;
 
             if (childrelid == relid)
                 continue;
@@ -1969,7 +1992,17 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh)
         }
     }
 
-    /* Grab an exclusive lock on the pg_relcheck relation */
+    /*
+     * Get number of existing constraints.
+     */
+    tupleDesc = RelationGetDescr(rel);
+    oldconstr = tupleDesc->constr;
+    if (oldconstr)
+        numoldchecks = oldconstr->num_check;
+    else
+        numoldchecks = 0;
+
+    /* Grab an appropriate lock on the pg_relcheck relation */
     rcrel = heap_openr(RelCheckRelationName, RowExclusiveLock);
 
     /*
@@ -2002,60 +2035,21 @@ RemoveCheckConstraint(Relation rel, const char *constrName, bool inh)
 
     /* Clean up after the scan */
     heap_endscan(rcscan);
+    heap_close(rcrel, RowExclusiveLock);
 
-    /*
-     * Update the count of constraints in the relation's pg_class tuple.
-     * We do this even if there was no change, in order to ensure that an
-     * SI update message is sent out for the pg_class tuple, which will
-     * force other backends to rebuild their relcache entries for the rel.
-     * (Of course, for a newly created rel there is no need for an SI
-     * message, but for ALTER TABLE ADD ATTRIBUTE this'd be important.)
-     */
-
-    /*
-     * Get number of existing constraints.
-     */
-
-    tupleDesc = RelationGetDescr(rel);
-    oldconstr = tupleDesc->constr;
-    if (oldconstr)
-        numoldchecks = oldconstr->num_check;
-    else
-        numoldchecks = 0;
-
-    /* Calculate the new number of checks in the table, fail if negative */
-    numchecks = numoldchecks - rel_deleted;
-
-    if (numchecks < 0)
-        elog(ERROR, "check count became negative");
-
-    relrel = heap_openr(RelationRelationName, RowExclusiveLock);
-    reltup = SearchSysCacheCopy(RELOID,
-                                ObjectIdGetDatum(RelationGetRelid(rel)), 0, 0, 0);
-
-    if (!HeapTupleIsValid(reltup))
-        elog(ERROR, "cache lookup of relation %u failed",
-             RelationGetRelid(rel));
-    relStruct = (Form_pg_class) GETSTRUCT(reltup);
-
-    relStruct->relchecks = numchecks;
-
-    simple_heap_update(relrel, &reltup->t_self, reltup);
-
-    /* Keep catalog indices current */
-    CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices,
-                       relidescs);
-    CatalogIndexInsert(relidescs, Num_pg_class_indices, relrel, reltup);
-    CatalogCloseIndices(Num_pg_class_indices, relidescs);
-
-    /* Clean up after the scan */
-    heap_freetuple(reltup);
-    heap_close(relrel, RowExclusiveLock);
+    if (rel_deleted)
+    {
+        /*
+         * Update the count of constraints in the relation's pg_class tuple.
+         */
+        numchecks = numoldchecks - rel_deleted;
+        if (numchecks < 0)
+            elog(ERROR, "check count became negative");
 
-    /* Close the heap relation */
-    heap_close(rcrel, RowExclusiveLock);
+        SetRelationNumChecks(rel, numchecks);
+    }
 
-    /* Return the number of tuples deleted */
+    /* Return the number of tuples deleted, including all children */
     return all_deleted;
 }
 
@@ -2068,10 +2062,10 @@ RemoveConstraints(Relation rel)
         return;
 
     if (constr->num_defval > 0)
-        RemoveAttrDefault(rel);
+        RemoveAttrDefaults(rel);
 
     if (constr->num_check > 0)
-        RemoveRelCheck(rel);
+        RemoveRelChecks(rel);
 }
 
 static void
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index bd6bd1de38aa56675c768a17bae1b2c761fc3af9..ce63a0fd548f35d1ff6e30bb0b2565c46c22a8db 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -8,7 +8,7 @@
 *
 *
 * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.172 2002/02/19 20:11:11 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.173 2002/03/03 17:47:54 tgl Exp $
 *
 *
 * INTERFACE ROUTINES
@@ -800,19 +800,6 @@ index_drop(Oid indexId)
     simple_heap_delete(relationRelation, &tuple->t_self);
     heap_freetuple(tuple);
 
-    /*
-     * Update the pg_class tuple for the owning relation.  We are
-     * presently too lazy to attempt to compute the new correct value of
-     * relhasindex (the next VACUUM will fix it if necessary).  But we
-     * must send out a shared-cache-inval notice on the owning relation to
-     * ensure other backends update their relcache lists of indexes.  So,
-     * unconditionally do setRelhasindex(true).
-     *
-     * Possible future improvement: skip the physical tuple update and just
-     * send out an invalidation message.
-     */
-    setRelhasindex(heapId, true, false, InvalidOid);
-
     heap_close(relationRelation, RowExclusiveLock);
 
     /*
@@ -858,6 +845,15 @@ index_drop(Oid indexId)
 
     smgrunlink(DEFAULT_SMGR, userIndexRelation);
 
+    /*
+     * We are presently too lazy to attempt to compute the new correct value
+     * of relhasindex (the next VACUUM will fix it if necessary).  So there is
+     * no need to update the pg_class tuple for the owning relation.
+     * But we must send out a shared-cache-inval notice on the owning relation
+     * to ensure other backends update their relcache lists of indexes.
+     */
+    CacheInvalidateRelcache(heapId);
+
     /*
     * Close rels, but keep locks
     */
@@ -1076,7 +1072,7 @@ LockClassinfoForUpdate(Oid relid, HeapTuple rtup,
             }
             break;
         }
-        RelationInvalidateHeapTuple(relationRelation, rtup);
+        CacheInvalidateHeapTuple(relationRelation, rtup);
         if (confirmCommitted)
         {
             HeapTupleHeader th = rtup->t_data;
@@ -1137,10 +1133,8 @@ IndexesAreActive(Oid relid, bool confirmCommitted)
 *
 * NOTE: an important side-effect of this operation is that an SI invalidation
 * message is sent out to all backends --- including me --- causing relcache
- * entries to be flushed or updated with the new hasindex data.
- * Therefore, we execute the update even if relhasindex has the right value
- * already.  Possible future improvement: skip the disk update and just send
- * an SI message in that case.
+ * entries to be flushed or updated with the new hasindex data.  This must
+ * happen even if we find that no change is needed in the pg_class row.
 * ----------------
 */
 void
@@ -1149,6 +1143,7 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid)
     Relation    pg_class;
     HeapTuple   tuple;
     Form_pg_class classtuple;
+    bool        dirty = false;
     HeapScanDesc pg_class_scan = NULL;
 
     /*
@@ -1192,13 +1187,28 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid)
         LockBuffer(pg_class_scan->rs_cbuf, BUFFER_LOCK_EXCLUSIVE);
 
     classtuple = (Form_pg_class) GETSTRUCT(tuple);
-    classtuple->relhasindex = hasindex;
+
+    if (classtuple->relhasindex != hasindex)
+    {
+        classtuple->relhasindex = hasindex;
+        dirty = true;
+    }
     if (isprimary)
-        classtuple->relhaspkey = true;
+    {
+        if (!classtuple->relhaspkey)
+        {
+            classtuple->relhaspkey = true;
+            dirty = true;
+        }
+    }
     if (OidIsValid(reltoastidxid))
     {
         Assert(classtuple->relkind == RELKIND_TOASTVALUE);
-        classtuple->reltoastidxid = reltoastidxid;
+        if (classtuple->reltoastidxid != reltoastidxid)
+        {
+            classtuple->reltoastidxid = reltoastidxid;
+            dirty = true;
+        }
     }
 
     if (pg_class_scan)
@@ -1210,10 +1220,10 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid)
         WriteNoReleaseBuffer(pg_class_scan->rs_cbuf);
         /* Send out shared cache inval if necessary */
         if (!IsBootstrapProcessingMode())
-            RelationInvalidateHeapTuple(pg_class, tuple);
+            CacheInvalidateHeapTuple(pg_class, tuple);
         BufferSync();
     }
-    else
+    else if (dirty)
     {
         simple_heap_update(pg_class, &tuple->t_self, tuple);
 
@@ -1228,6 +1238,11 @@ setRelhasindex(Oid relid, bool hasindex, bool isprimary, Oid reltoastidxid)
             CatalogCloseIndices(Num_pg_class_indices, idescs);
         }
     }
+    else
+    {
+        /* no need to change tuple, but force relcache rebuild anyway */
+        CacheInvalidateRelcache(relid);
+    }
 
     if (!pg_class_scan)
         heap_freetuple(tuple);
@@ -1280,7 +1295,7 @@ setNewRelfilenode(Relation relation)
         classTuple = &lockTupleData;
         /* Send out shared cache inval if necessary */
         if (!IsBootstrapProcessingMode())
-            RelationInvalidateHeapTuple(pg_class, classTuple);
+            CacheInvalidateHeapTuple(pg_class, classTuple);
         /* Update the buffer in-place */
         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
         ((Form_pg_class) GETSTRUCT(classTuple))->relfilenode = newrelfilenode;
@@ -1442,7 +1457,7 @@ UpdateStats(Oid relid, double reltuples)
         LockBuffer(pg_class_scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
         WriteNoReleaseBuffer(pg_class_scan->rs_cbuf);
         if (!IsBootstrapProcessingMode())
-            RelationInvalidateHeapTuple(pg_class, tuple);
+            CacheInvalidateHeapTuple(pg_class, tuple);
     }
     else
     {
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index a88ce7099ec435a475294cc63944c76c4c55d3d6..afcdac7fb56fd9b68f550b4478b23f2bdbcd0ca1 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -13,7 +13,7 @@
 *
 *
 * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.215 2002/03/02 21:39:23 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.216 2002/03/03 17:47:54 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@@ -527,7 +527,7 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
     * no flush will occur, but no great harm is done since there are no
     * noncritical state updates here.)
     */
-    RelationInvalidateHeapTuple(rd, &rtup);
+    CacheInvalidateHeapTuple(rd, &rtup);
 
     /* Write the buffer */
     WriteBuffer(buffer);
@@ -583,7 +583,7 @@ vac_update_dbstats(Oid dbid,
     dbform->datfrozenxid = frozenXID;
 
     /* invalidate the tuple in the cache and write the buffer */
-    RelationInvalidateHeapTuple(relation, tuple);
+    CacheInvalidateHeapTuple(relation, tuple);
     WriteNoReleaseBuffer(scan->rs_cbuf);
 
     heap_endscan(scan);
@@ -1796,7 +1796,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
             */
             heap_copytuple_with_tuple(&tuple, &newtup);
 
-            RelationInvalidateHeapTuple(onerel, &tuple);
+            /*
+             * register invalidation of source tuple in catcaches.
+             */
+            CacheInvalidateHeapTuple(onerel, &tuple);
 
             /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
             START_CRIT_SECTION();
@@ -1953,7 +1956,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
             /* copy tuple */
             heap_copytuple_with_tuple(&tuple, &newtup);
 
-            RelationInvalidateHeapTuple(onerel, &tuple);
+            /*
+             * register invalidation of source tuple in catcaches.
+             *
+             * (Note: we do not need to register the copied tuple,
+             * because we are not changing the tuple contents and
+             * so there cannot be any need to flush negative
+             * catcache entries.)
+             */
+            CacheInvalidateHeapTuple(onerel, &tuple);
 
             /* NO ELOG(ERROR) TILL CHANGES ARE LOGGED */
             START_CRIT_SECTION();
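The "negative catcache entries" this comment refers to are the central new mechanism of the patch: a failed catalog lookup is itself cached, so repeated probes for a nonexistent row skip the expensive scan. A self-contained toy version of the idea (plain C, invented names, with the catalog scan stubbed out as "not found"):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <stdbool.h>

    /* one cached lookup result; "negative" records a key known absent */
    typedef struct Entry
    {
        char        key[64];
        int         value;          /* meaningless when negative */
        bool        negative;
        struct Entry *next;
    } Entry;

    static Entry *cache = NULL;

    static Entry *
    cache_find(const char *key)
    {
        for (Entry *e = cache; e; e = e->next)
            if (strcmp(e->key, key) == 0)
                return e;
        return NULL;
    }

    /* look up a key, doing the (pretend) expensive catalog scan on miss */
    static bool
    lookup(const char *key, int *value)
    {
        Entry *e = cache_find(key);

        if (e == NULL)
        {
            /* miss: scan (stubbed), then remember the outcome either way */
            e = calloc(1, sizeof(Entry));
            snprintf(e->key, sizeof(e->key), "%s", key);
            e->negative = true;     /* catalog scan found nothing */
            e->next = cache;
            cache = e;
            printf("expensive scan for %s\n", key);
        }
        if (e->negative)
            return false;           /* cached "no such row" */
        *value = e->value;
        return true;
    }

    int
    main(void)
    {
        int v;
        lookup("no_such_table", &v);    /* scans, caches the miss */
        lookup("no_such_table", &v);    /* answered from negative entry */
        return 0;
    }

The cost of the technique is that inserting a matching row later must flush the stale negative entry, which is exactly why inserts now have to be registered for invalidation at all.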
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index b79d67125592ce32f12f0bf6b7d740e5e96c7115..e04792d71fa0d26384c607cd4d46c75486868a4a 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -8,7 +8,7 @@
 *
 *
 * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.89 2002/03/02 21:39:32 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/utils/cache/catcache.c,v 1.90 2002/03/03 17:47:55 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@@ -91,15 +91,15 @@ static const Oid eqproc[] = {
 #define EQPROC(SYSTEMTYPEOID)   eqproc[(SYSTEMTYPEOID)-BOOLOID]
 
-static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
-static Index CatalogCacheComputeHashIndex(CatCache *cache,
+static uint32 CatalogCacheComputeHashValue(CatCache *cache,
                               ScanKey cur_skey);
-static Index CatalogCacheComputeTupleHashIndex(CatCache *cache,
+static uint32 CatalogCacheComputeTupleHashValue(CatCache *cache,
                                    HeapTuple tuple);
-static void CatalogCacheInitializeCache(CatCache *cache);
 #ifdef CATCACHE_STATS
 static void CatCachePrintStats(void);
 #endif
+static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
+static void CatalogCacheInitializeCache(CatCache *cache);
 
 
 /*
@@ -136,221 +136,17 @@ GetCCHashFunc(Oid keytype)
     }
 }
 
-#ifdef CATCACHE_STATS
-
-static void
-CatCachePrintStats(void)
-{
-    CatCache   *cache;
-    long        cc_searches = 0;
-    long        cc_hits = 0;
-    long        cc_newloads = 0;
-
-    elog(LOG, "Catcache stats dump: %d/%d tuples in catcaches",
-         CacheHdr->ch_ntup, CacheHdr->ch_maxtup);
-
-    for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
-    {
-        if (cache->cc_ntup == 0 && cache->cc_searches == 0)
-            continue;           /* don't print unused caches */
-        elog(LOG, "Catcache %s/%s: %d tup, %ld srch, %ld hits, %ld loads, %ld not found",
-             cache->cc_relname,
-             cache->cc_indname,
-             cache->cc_ntup,
-             cache->cc_searches,
-             cache->cc_hits,
-             cache->cc_newloads,
-             cache->cc_searches - cache->cc_hits - cache->cc_newloads);
-        cc_searches += cache->cc_searches;
-        cc_hits += cache->cc_hits;
-        cc_newloads += cache->cc_newloads;
-    }
-    elog(LOG, "Catcache totals: %d tup, %ld srch, %ld hits, %ld loads, %ld not found",
-         CacheHdr->ch_ntup,
-         cc_searches,
-         cc_hits,
-         cc_newloads,
-         cc_searches - cc_hits - cc_newloads);
-}
-
-#endif   /* CATCACHE_STATS */
-
-
-/*
- * Standard routine for creating cache context if it doesn't exist yet
- *
- * There are a lot of places (probably far more than necessary) that check
- * whether CacheMemoryContext exists yet and want to create it if not.
- * We centralize knowledge of exactly how to create it here.
- */
-void
-CreateCacheMemoryContext(void)
-{
-    /*
-     * Purely for paranoia, check that context doesn't exist; caller
-     * probably did so already.
-     */
-    if (!CacheMemoryContext)
-        CacheMemoryContext = AllocSetContextCreate(TopMemoryContext,
-                                                   "CacheMemoryContext",
-                                                   ALLOCSET_DEFAULT_MINSIZE,
-                                                   ALLOCSET_DEFAULT_INITSIZE,
-                                                   ALLOCSET_DEFAULT_MAXSIZE);
-}
-
-
 /*
- *        CatalogCacheInitializeCache
+ *        CatalogCacheComputeHashValue
 *
- * This function does final initialization of a catcache: obtain the tuple
- * descriptor and set up the hash and equality function links.  We assume
- * that the relcache entry can be opened at this point!
+ * Compute the hash value associated with a given set of lookup keys */ -#ifdef CACHEDEBUG -#define CatalogCacheInitializeCache_DEBUG1 \ - elog(LOG, "CatalogCacheInitializeCache: cache @%p %s", cache, \ - cache->cc_relname) - -#define CatalogCacheInitializeCache_DEBUG2 \ -do { \ - if (cache->cc_key[i] > 0) { \ - elog(LOG, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \ - i+1, cache->cc_nkeys, cache->cc_key[i], \ - tupdesc->attrs[cache->cc_key[i] - 1]->atttypid); \ - } else { \ - elog(LOG, "CatalogCacheInitializeCache: load %d/%d w/%d", \ - i+1, cache->cc_nkeys, cache->cc_key[i]); \ - } \ -} while(0) - -#else -#define CatalogCacheInitializeCache_DEBUG1 -#define CatalogCacheInitializeCache_DEBUG2 -#endif - -static void -CatalogCacheInitializeCache(CatCache *cache) +static uint32 +CatalogCacheComputeHashValue(CatCache *cache, ScanKey cur_skey) { - Relation relation; - MemoryContext oldcxt; - TupleDesc tupdesc; - int i; - - CatalogCacheInitializeCache_DEBUG1; - - /* - * Open the relation without locking --- we only need the tupdesc, - * which we assume will never change ... - */ - relation = heap_openr(cache->cc_relname, NoLock); - Assert(RelationIsValid(relation)); - - /* - * switch to the cache context so our allocations do not vanish at the - * end of a transaction - */ - Assert(CacheMemoryContext != NULL); - - oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - - /* - * copy the relcache's tuple descriptor to permanent cache storage - */ - tupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation)); - - /* - * get the relation's relisshared flag, too - */ - cache->cc_relisshared = RelationGetForm(relation)->relisshared; - - /* - * return to the caller's memory context and close the rel - */ - MemoryContextSwitchTo(oldcxt); - - heap_close(relation, NoLock); + uint32 hashValue = 0; - CACHE3_elog(LOG, "CatalogCacheInitializeCache: %s, %d keys", - cache->cc_relname, cache->cc_nkeys); - - /* - * initialize cache's key information - */ - for (i = 0; i < cache->cc_nkeys; ++i) - { - Oid keytype; - - CatalogCacheInitializeCache_DEBUG2; - - if (cache->cc_key[i] > 0) - keytype = tupdesc->attrs[cache->cc_key[i] - 1]->atttypid; - else - { - if (cache->cc_key[i] != ObjectIdAttributeNumber) - elog(FATAL, "CatalogCacheInit: only sys attr supported is OID"); - keytype = OIDOID; - } - - cache->cc_hashfunc[i] = GetCCHashFunc(keytype); - - /* - * If GetCCHashFunc liked the type, safe to index into eqproc[] - */ - cache->cc_skey[i].sk_procedure = EQPROC(keytype); - - /* Do function lookup */ - fmgr_info_cxt(cache->cc_skey[i].sk_procedure, - &cache->cc_skey[i].sk_func, - CacheMemoryContext); - - /* Initialize sk_attno suitably for HeapKeyTest() and heap scans */ - cache->cc_skey[i].sk_attno = cache->cc_key[i]; - - CACHE4_elog(LOG, "CatalogCacheInit %s %d %p", - cache->cc_relname, - i, - cache); - } - - /* - * mark this cache fully initialized - */ - cache->cc_tupdesc = tupdesc; -} - -/* - * InitCatCachePhase2 -- external interface for CatalogCacheInitializeCache - * - * The only reason to call this routine is to ensure that the relcache - * has created entries for all the catalogs and indexes referenced by - * catcaches. Therefore, open the index too. An exception is the indexes - * on pg_am, which we don't use (cf. IndexScanOK). 
- */
-void
-InitCatCachePhase2(CatCache *cache)
-{
-    if (cache->cc_tupdesc == NULL)
-        CatalogCacheInitializeCache(cache);
-
-    if (cache->id != AMOID &&
-        cache->id != AMNAME)
-    {
-        Relation    idesc;
-
-        idesc = index_openr(cache->cc_indname);
-        index_close(idesc);
-    }
-}
-
-/*
- * CatalogCacheComputeHashIndex
- */
-static Index
-CatalogCacheComputeHashIndex(CatCache *cache, ScanKey cur_skey)
-{
-    uint32      hashIndex = 0;
-
-    CACHE4_elog(LOG, "CatalogCacheComputeHashIndex %s %d %p",
+    CACHE4_elog(DEBUG1, "CatalogCacheComputeHashValue %s %d %p",
                cache->cc_relname,
                cache->cc_nkeys,
                cache);
@@ -358,39 +154,40 @@ CatalogCacheComputeHashIndex(CatCache *cache, ScanKey cur_skey)
     switch (cache->cc_nkeys)
     {
         case 4:
-            hashIndex ^=
+            hashValue ^=
                DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[3],
                                            cur_skey[3].sk_argument)) << 9;
             /* FALLTHROUGH */
         case 3:
-            hashIndex ^=
+            hashValue ^=
                DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[2],
                                            cur_skey[2].sk_argument)) << 6;
             /* FALLTHROUGH */
         case 2:
-            hashIndex ^=
+            hashValue ^=
                DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[1],
                                            cur_skey[1].sk_argument)) << 3;
             /* FALLTHROUGH */
         case 1:
-            hashIndex ^=
+            hashValue ^=
                DatumGetUInt32(DirectFunctionCall1(cache->cc_hashfunc[0],
                                                cur_skey[0].sk_argument));
             break;
         default:
-            elog(FATAL, "CCComputeHashIndex: %d cc_nkeys", cache->cc_nkeys);
+            elog(FATAL, "CCComputeHashValue: %d cc_nkeys", cache->cc_nkeys);
             break;
     }
 
-    hashIndex %= (uint32) cache->cc_size;
-    return (Index) hashIndex;
+
+    return hashValue;
 }
 
 /*
- * CatalogCacheComputeTupleHashIndex
+ * CatalogCacheComputeTupleHashValue
+ *
+ * Compute the hash value associated with a given tuple to be cached
 */
-static Index
-CatalogCacheComputeTupleHashIndex(CatCache *cache,
-                                  HeapTuple tuple)
+static uint32
+CatalogCacheComputeTupleHashValue(CatCache *cache, HeapTuple tuple)
 {
     ScanKeyData cur_skey[4];
     bool        isNull = false;
@@ -442,16 +239,75 @@ CatalogCacheComputeTupleHashValue(CatCache *cache,
             Assert(!isNull);
             break;
         default:
-            elog(FATAL, "CCComputeTupleHashIndex: %d cc_nkeys",
+            elog(FATAL, "CCComputeTupleHashValue: %d cc_nkeys",
                 cache->cc_nkeys);
             break;
     }
 
-    return CatalogCacheComputeHashIndex(cache, cur_skey);
+    return CatalogCacheComputeHashValue(cache, cur_skey);
 }
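The rename from "hash index" to "hash value" here is substantive, not cosmetic: the function now returns the full 32-bit combined hash, and the bucket number is derived from it only at the point of use. Keeping the whole value lets each entry store it for quick mismatch skips during search and for matching negative entries during invalidation. A compilable sketch of the same combining scheme (NBUCKETS is an arbitrary stand-in for cc_size):

    #include <stdio.h>
    #include <stdint.h>

    #define NBUCKETS 256            /* stand-in for cc_size */

    /*
     * Combine up to four per-key hashes the way the patch does: XOR with
     * small left shifts so equal keys in different positions hash apart.
     * The full value is kept; the bucket index is derived only at use.
     */
    static uint32_t
    compute_hash_value(const uint32_t *keyhash, int nkeys)
    {
        uint32_t hashValue = 0;

        switch (nkeys)
        {
            case 4: hashValue ^= keyhash[3] << 9;   /* FALLTHROUGH */
            case 3: hashValue ^= keyhash[2] << 6;   /* FALLTHROUGH */
            case 2: hashValue ^= keyhash[1] << 3;   /* FALLTHROUGH */
            case 1: hashValue ^= keyhash[0];
        }
        return hashValue;
    }

    int
    main(void)
    {
        uint32_t keys[2] = {0xdeadbeef, 42};
        uint32_t hashValue = compute_hash_value(keys, 2);
        unsigned hashIndex = hashValue % NBUCKETS;

        printf("value=%08x bucket=%u\n", hashValue, hashIndex);
        return 0;
    }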
discards", + CacheHdr->ch_ntup, + cc_searches, + cc_hits, + cc_neg_hits, + cc_hits + cc_neg_hits, + cc_newloads, + cc_searches - cc_hits - cc_neg_hits - cc_newloads, + cc_searches - cc_hits - cc_neg_hits, + cc_invals, + cc_discards); +} + +#endif /* CATCACHE_STATS */ + + /* * CatCacheRemoveCTup + * + * Unlink and delete the given cache entry */ static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct) @@ -473,16 +329,26 @@ CatCacheRemoveCTup(CatCache *cache, CatCTup *ct) } /* - * CatalogCacheIdInvalidate() + * CatalogCacheIdInvalidate + * + * Invalidate entries in the specified cache, given a hash value and + * item pointer. Positive entries are deleted if they match the item + * pointer. Negative entries must be deleted if they match the hash + * value (since we do not have the exact key of the tuple that's being + * inserted). But this should only rarely result in loss of a cache + * entry that could have been kept. * - * Invalidate a tuple given a cache id. In this case the id should always - * be found (whether the cache has opened its relation or not). Of course, - * if the cache has yet to open its relation, there will be no tuples so - * no problem. + * Note that it's not very relevant whether the tuple identified by + * the item pointer is being inserted or deleted. We don't expect to + * find matching positive entries in the one case, and we don't expect + * to find matching negative entries in the other; but we will do the + * right things in any case. + * + * This routine is only quasi-public: it should only be used by inval.c. */ void CatalogCacheIdInvalidate(int cacheId, - Index hashIndex, + uint32 hashValue, ItemPointer pointer) { CatCache *ccp; @@ -491,37 +357,51 @@ CatalogCacheIdInvalidate(int cacheId, * sanity checks */ Assert(ItemPointerIsValid(pointer)); - CACHE1_elog(LOG, "CatalogCacheIdInvalidate: called"); + CACHE1_elog(DEBUG1, "CatalogCacheIdInvalidate: called"); /* * inspect caches to find the proper cache */ for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next) { + Index hashIndex; Dlelem *elt, *nextelt; if (cacheId != ccp->id) continue; - Assert(hashIndex < ccp->cc_size); + /* + * We don't bother to check whether the cache has finished + * initialization yet; if not, there will be no entries in it + * so no problem. + */ /* - * inspect the hash bucket until we find a match or exhaust + * inspect the proper hash bucket for matches */ + hashIndex = (Index) (hashValue % (uint32) ccp->cc_size); + for (elt = DLGetHead(&ccp->cc_bucket[hashIndex]); elt; elt = nextelt) { CatCTup *ct = (CatCTup *) DLE_VAL(elt); nextelt = DLGetSucc(elt); - if (ItemPointerEquals(pointer, &ct->tuple.t_self)) + if (hashValue != ct->hash_value) + continue; /* ignore non-matching hash values */ + + if (ct->negative || + ItemPointerEquals(pointer, &ct->tuple.t_self)) { if (ct->refcount > 0) ct->dead = true; else CatCacheRemoveCTup(ccp, ct); - CACHE1_elog(LOG, "CatalogCacheIdInvalidate: invalidated"); + CACHE1_elog(DEBUG1, "CatalogCacheIdInvalidate: invalidated"); +#ifdef CATCACHE_STATS + ccp->cc_invals++; +#endif /* could be multiple matches, so keep looking! 
@@ -531,17 +411,33 @@
 
 /* ----------------------------------------------------------------
 *                    public functions
- *
- *        AtEOXact_CatCache
- *        ResetCatalogCaches
- *        InitCatCache
- *        SearchCatCache
- *        ReleaseCatCache
- *        RelationInvalidateCatalogCacheTuple
 * ----------------------------------------------------------------
 */
 
+/*
+ * Standard routine for creating cache context if it doesn't exist yet
+ *
+ * There are a lot of places (probably far more than necessary) that check
+ * whether CacheMemoryContext exists yet and want to create it if not.
+ * We centralize knowledge of exactly how to create it here.
+ */
+void
+CreateCacheMemoryContext(void)
+{
+    /*
+     * Purely for paranoia, check that context doesn't exist; caller
+     * probably did so already.
+     */
+    if (!CacheMemoryContext)
+        CacheMemoryContext = AllocSetContextCreate(TopMemoryContext,
+                                                   "CacheMemoryContext",
+                                                   ALLOCSET_DEFAULT_MINSIZE,
+                                                   ALLOCSET_DEFAULT_INITSIZE,
+                                                   ALLOCSET_DEFAULT_MAXSIZE);
+}
+
+
 /*
 *        AtEOXact_CatCache
 *
@@ -609,6 +505,9 @@ ResetCatalogCache(CatCache *cache)
                ct->dead = true;
             else
                 CatCacheRemoveCTup(cache, ct);
+#ifdef CATCACHE_STATS
+            cache->cc_invals++;
+#endif
         }
     }
 }
@@ -623,12 +522,12 @@
 {
     CatCache   *cache;
 
-    CACHE1_elog(LOG, "ResetCatalogCaches called");
+    CACHE1_elog(DEBUG1, "ResetCatalogCaches called");
 
     for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
         ResetCatalogCache(cache);
 
-    CACHE1_elog(LOG, "end of ResetCatalogCaches call");
+    CACHE1_elog(DEBUG1, "end of ResetCatalogCaches call");
 }
 
 /*
@@ -656,7 +555,7 @@
 {
     CatCache   *cache;
 
-    CACHE2_elog(LOG, "CatalogCacheFlushRelation called for %u", relId);
+    CACHE2_elog(DEBUG1, "CatalogCacheFlushRelation called for %u", relId);
 
     for (cache = CacheHdr->ch_caches; cache; cache = cache->cc_next)
     {
@@ -691,6 +590,13 @@
 
             nextelt = DLGetSucc(elt);
 
+            /*
+             * Negative entries are never considered related to a rel,
+             * even if the rel is part of their lookup key.
+             */
+            if (ct->negative)
+                continue;
+
             if (cache->cc_reloidattr == ObjectIdAttributeNumber)
                 tupRelid = ct->tuple.t_data->t_oid;
             else
@@ -711,12 +617,15 @@
                    ct->dead = true;
                 else
                     CatCacheRemoveCTup(cache, ct);
+#ifdef CATCACHE_STATS
+                cache->cc_invals++;
+#endif
             }
         }
     }
     }
 
-    CACHE1_elog(LOG, "end of CatalogCacheFlushRelation call");
+    CACHE1_elog(DEBUG1, "end of CatalogCacheFlushRelation call");
 }
 
 /*
@@ -730,7 +639,7 @@
 #ifdef CACHEDEBUG
 #define InitCatCache_DEBUG1 \
 do { \
-    elog(LOG, "InitCatCache: rel=%s id=%d nkeys=%d size=%d\n", \
+    elog(DEBUG1, "InitCatCache: rel=%s id=%d nkeys=%d size=%d\n", \
         cp->cc_relname, cp->id, cp->cc_nkeys, cp->cc_size); \
 } while(0)
 
@@ -791,9 +700,10 @@
     cp->id = id;
     cp->cc_relname = relname;
     cp->cc_indname = indname;
-    cp->cc_reloidattr = reloidattr;
+    cp->cc_reloid = InvalidOid;     /* temporary */
     cp->cc_relisshared = false;     /* temporary */
     cp->cc_tupdesc = (TupleDesc) NULL;
+    cp->cc_reloidattr = reloidattr;
     cp->cc_ntup = 0;
     cp->cc_size = NCCBUCKETS;
     cp->cc_nkeys = nkeys;
@@ -820,6 +730,152 @@
     return cp;
 }
 
+/*
+ * CatalogCacheInitializeCache
+ *
+ * This function does final initialization of a catcache: obtain the tuple
+ * descriptor and set up the hash and equality function links.  We assume
+ * that the relcache entry can be opened at this point!
+ */
+#ifdef CACHEDEBUG
+#define CatalogCacheInitializeCache_DEBUG1 \
+    elog(DEBUG1, "CatalogCacheInitializeCache: cache @%p %s", cache, \
+         cache->cc_relname)
+
+#define CatalogCacheInitializeCache_DEBUG2 \
+do { \
+    if (cache->cc_key[i] > 0) { \
+        elog(DEBUG1, "CatalogCacheInitializeCache: load %d/%d w/%d, %u", \
+             i+1, cache->cc_nkeys, cache->cc_key[i], \
+             tupdesc->attrs[cache->cc_key[i] - 1]->atttypid); \
+    } else { \
+        elog(DEBUG1, "CatalogCacheInitializeCache: load %d/%d w/%d", \
+             i+1, cache->cc_nkeys, cache->cc_key[i]); \
+    } \
+} while(0)
+
+#else
+#define CatalogCacheInitializeCache_DEBUG1
+#define CatalogCacheInitializeCache_DEBUG2
+#endif
+
+static void
+CatalogCacheInitializeCache(CatCache *cache)
+{
+    Relation    relation;
+    MemoryContext oldcxt;
+    TupleDesc   tupdesc;
+    int         i;
+
+    CatalogCacheInitializeCache_DEBUG1;
+
+    /*
+     * Open the relation without locking --- we only need the tupdesc,
+     * which we assume will never change ...
+     */
+    relation = heap_openr(cache->cc_relname, NoLock);
+    Assert(RelationIsValid(relation));
+
+    /*
+     * switch to the cache context so our allocations do not vanish at the
+     * end of a transaction
+     */
+    Assert(CacheMemoryContext != NULL);
+
+    oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+    /*
+     * copy the relcache's tuple descriptor to permanent cache storage
+     */
+    tupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
+
+    /*
+     * get the relation's OID and relisshared flag, too
+     */
+    cache->cc_reloid = RelationGetRelid(relation);
+    cache->cc_relisshared = RelationGetForm(relation)->relisshared;
+
+    /*
+     * return to the caller's memory context and close the rel
+     */
+    MemoryContextSwitchTo(oldcxt);
+
+    heap_close(relation, NoLock);
+
+    CACHE3_elog(DEBUG1, "CatalogCacheInitializeCache: %s, %d keys",
+                cache->cc_relname, cache->cc_nkeys);
+
+    /*
+     * initialize cache's key information
+     */
+    for (i = 0; i < cache->cc_nkeys; ++i)
+    {
+        Oid         keytype;
+
+        CatalogCacheInitializeCache_DEBUG2;
+
+        if (cache->cc_key[i] > 0)
+            keytype = tupdesc->attrs[cache->cc_key[i] - 1]->atttypid;
+        else
+        {
+            if (cache->cc_key[i] != ObjectIdAttributeNumber)
+                elog(FATAL, "CatalogCacheInit: only sys attr supported is OID");
+            keytype = OIDOID;
+        }
+
+        cache->cc_hashfunc[i] = GetCCHashFunc(keytype);
+
+        cache->cc_isname[i] = (keytype == NAMEOID);
+
+        /*
+         * If GetCCHashFunc liked the type, safe to index into eqproc[]
+         */
+        cache->cc_skey[i].sk_procedure = EQPROC(keytype);
+
+        /* Do function lookup */
+        fmgr_info_cxt(cache->cc_skey[i].sk_procedure,
+                      &cache->cc_skey[i].sk_func,
+                      CacheMemoryContext);
+
+        /* Initialize sk_attno suitably for HeapKeyTest() and heap scans */
+        cache->cc_skey[i].sk_attno = cache->cc_key[i];
+
+        CACHE4_elog(DEBUG1, "CatalogCacheInit %s %d %p",
+                    cache->cc_relname,
+                    i,
+                    cache);
+    }
+
+    /*
+     * mark this cache fully initialized
+     */
+    cache->cc_tupdesc = tupdesc;
+}
+
+/*
+ * InitCatCachePhase2 -- external interface for CatalogCacheInitializeCache
+ *
+ * The only reason to call this routine is to ensure that the relcache
+ * has created entries for all the catalogs and indexes referenced by
+ * catcaches.  Therefore, open the index too.  An exception is the indexes
+ * on pg_am, which we don't use (cf. IndexScanOK).
+ */
+void
+InitCatCachePhase2(CatCache *cache)
+{
+    if (cache->cc_tupdesc == NULL)
+        CatalogCacheInitializeCache(cache);
+
+    if (cache->id != AMOID &&
+        cache->id != AMNAME)
+    {
+        Relation    idesc;
+
+        idesc = index_openr(cache->cc_indname);
+        index_close(idesc);
+    }
+}
+
 
 /*
 *        IndexScanOK
@@ -874,10 +930,20 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
 }
 
 /*
- *    SearchCatCache
+ *    SearchCatCache
 *
 * This call searches a system cache for a tuple, opening the relation
- * if necessary (the first access to a particular cache).
+ * if necessary (on the first access to a particular cache).
+ *
+ * The result is NULL if not found, or a pointer to a HeapTuple in
+ * the cache.  The caller must not modify the tuple, and must call
+ * ReleaseCatCache() when done with it.
+ *
+ * The search key values should be expressed as Datums of the key columns'
+ * datatype(s).  (Pass zeroes for any unused parameters.)  As a special
+ * exception, the passed-in key for a NAME column can be just a C string;
+ * the caller need not go to the trouble of converting it to a fully
+ * null-padded NAME.
 */
 HeapTuple
 SearchCatCache(CatCache *cache,
@@ -887,11 +953,13 @@ SearchCatCache(CatCache *cache,
               Datum v4)
 {
     ScanKeyData cur_skey[4];
-    Index       hash;
+    uint32      hashValue;
+    Index       hashIndex;
     Dlelem     *elt;
     CatCTup    *ct;
-    HeapTuple   ntp;
     Relation    relation;
+    HeapTuple   ntp;
+    int         i;
     MemoryContext oldcxt;
 
     /*
@@ -916,12 +984,13 @@ SearchCatCache(CatCache *cache,
     /*
     * find the hash bucket in which to look for the tuple
     */
-    hash = CatalogCacheComputeHashIndex(cache, cur_skey);
+    hashValue = CatalogCacheComputeHashValue(cache, cur_skey);
+    hashIndex = (Index) (hashValue % (uint32) cache->cc_size);
 
     /*
     * scan the hash bucket until we find a match or exhaust our tuples
     */
-    for (elt = DLGetHead(&cache->cc_bucket[hash]);
+    for (elt = DLGetHead(&cache->cc_bucket[hashIndex]);
         elt;
         elt = DLGetSucc(elt))
     {
@@ -932,9 +1001,11 @@ SearchCatCache(CatCache *cache,
         if (ct->dead)
             continue;           /* ignore dead entries */
 
+        if (ct->hash_value != hashValue)
+            continue;           /* quickly skip entry if wrong hash val */
+
         /*
-         * see if the cached tuple matches our key. (should we be worried
-         * about time ranges? -cim 10/2/90)
+         * see if the cached tuple matches our key.
         */
         HeapKeyTest(&ct->tuple,
                     cache->cc_tupdesc,
@@ -945,33 +1016,53 @@ SearchCatCache(CatCache *cache,
             continue;
 
         /*
-         * we found a tuple in the cache: bump its refcount, move it to
-         * the front of the LRU list, and return it.  We also move it to
-         * the front of the list for its hashbucket, in order to speed
-         * subsequent searches.  (The most frequently accessed elements in
-         * any hashbucket will tend to be near the front of the
-         * hashbucket's list.)
+         * we found a match in the cache: move it to the front of the global
+         * LRU list.  We also move it to the front of the list for its
+         * hashbucket, in order to speed subsequent searches.  (The most
+         * frequently accessed elements in any hashbucket will tend to be
+         * near the front of the hashbucket's list.)
         */
-        ct->refcount++;
-
         DLMoveToFront(&ct->lrulist_elem);
         DLMoveToFront(&ct->cache_elem);
 
+        /*
+         * If it's a positive entry, bump its refcount and return it.
+         * If it's negative, we can report failure to the caller.
+         */
+        if (!ct->negative)
+        {
+            ct->refcount++;
+
 #ifdef CACHEDEBUG
-        CACHE3_elog(LOG, "SearchCatCache(%s): found in bucket %d",
-                    cache->cc_relname, hash);
+            CACHE3_elog(DEBUG1, "SearchCatCache(%s): found in bucket %d",
+                        cache->cc_relname, hashIndex);
 #endif   /* CACHEDEBUG */
 
 #ifdef CATCACHE_STATS
-        cache->cc_hits++;
+            cache->cc_hits++;
 #endif
 
-        return &ct->tuple;
+            return &ct->tuple;
+        }
+        else
+        {
+#ifdef CACHEDEBUG
+            CACHE3_elog(DEBUG1, "SearchCatCache(%s): found neg entry in bucket %d",
+                        cache->cc_relname, hashIndex);
+#endif   /* CACHEDEBUG */
+
+#ifdef CATCACHE_STATS
+            cache->cc_neg_hits++;
+#endif
+
+            return NULL;
+        }
     }
 
     /*
-     * Tuple was not found in cache, so we have to try and retrieve it
-     * directly from the relation.  If it's found, we add it to the cache.
+     * Tuple was not found in cache, so we have to try to retrieve it
+     * directly from the relation.  If found, we will add it to the
+     * cache; if not found, we will add a negative cache entry instead.
     *
     * NOTE: it is possible for recursive cache lookups to occur while
     * reading the relation --- for example, due to shared-cache-inval
@@ -987,14 +1078,18 @@ SearchCatCache(CatCache *cache,
     /*
     * open the relation associated with the cache
     */
-    relation = heap_openr(cache->cc_relname, AccessShareLock);
+    relation = heap_open(cache->cc_reloid, AccessShareLock);
+
+    /*
+     * Pre-create cache entry header, and mark no tuple found.
+     */
+    ct = (CatCTup *) MemoryContextAlloc(CacheMemoryContext, sizeof(CatCTup));
+    ct->negative = true;
 
     /*
     * Scan the relation to find the tuple.  If there's an index, and if
     * it's safe to do so, use the index.  Else do a heap scan.
     */
-    ct = NULL;
-
     if ((RelationGetForm(relation))->relhasindex &&
         !IsIgnoringSystemIndexes() &&
         IndexScanOK(cache, cur_skey))
@@ -1004,9 +1099,8 @@ SearchCatCache(CatCache *cache,
         RetrieveIndexResult indexRes;
         HeapTupleData tuple;
         Buffer      buffer;
-        int         i;
 
-        CACHE2_elog(LOG, "SearchCatCache(%s): performing index scan",
+        CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing index scan",
                    cache->cc_relname);
 
         /*
@@ -1031,8 +1125,8 @@ SearchCatCache(CatCache *cache,
             {
                 /* Copy tuple into our context */
                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-                ct = (CatCTup *) palloc(sizeof(CatCTup));
                 heap_copytuple_with_tuple(&tuple, &ct->tuple);
+                ct->negative = false;
                 MemoryContextSwitchTo(oldcxt);
                 ReleaseBuffer(buffer);
                 break;
@@ -1045,7 +1139,7 @@ SearchCatCache(CatCache *cache,
     {
         HeapScanDesc sd;
 
-        CACHE2_elog(LOG, "SearchCatCache(%s): performing heap scan",
+        CACHE2_elog(DEBUG1, "SearchCatCache(%s): performing heap scan",
                    cache->cc_relname);
 
         sd = heap_beginscan(relation, 0, SnapshotNow,
@@ -1057,8 +1151,8 @@ SearchCatCache(CatCache *cache,
         {
             /* Copy tuple into our context */
             oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-            ct = (CatCTup *) palloc(sizeof(CatCTup));
             heap_copytuple_with_tuple(ntp, &ct->tuple);
+            ct->negative = false;
             MemoryContextSwitchTo(oldcxt);
             /* We should not free the result of heap_getnext... */
         }
@@ -1072,58 +1166,140 @@ SearchCatCache(CatCache *cache,
 
     heap_close(relation, AccessShareLock);
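One subtle point in the negative-entry construction that follows: the caller may have passed a bare C string for a NAME key, and tuple formation will memcpy a full NAMEDATALEN bytes, so the key must first be re-materialized as a padded NAME or the copy can run off the end of the string's allocation. The hazard and fix in isolation (toy NAMEDATALEN and types, not PostgreSQL's headers):

    #include <stdio.h>
    #include <string.h>

    #define NAMEDATALEN 32          /* stand-in for PostgreSQL's constant */

    typedef struct { char data[NAMEDATALEN]; } NameData;

    /*
     * A caller-supplied key may be a short C string; blindly copying
     * NAMEDATALEN bytes out of it could read past its allocation.  So
     * first re-materialize it as a fully zero-padded NAME.
     */
    static void
    namestrcpy_sketch(NameData *name, const char *str)
    {
        memset(name, 0, sizeof(NameData));          /* zero-pad everything */
        strncpy(name->data, str, NAMEDATALEN - 1);  /* keep a terminator */
    }

    int
    main(void)
    {
        NameData padded;

        namestrcpy_sketch(&padded, "pg_class");
        printf("[%.*s]\n", NAMEDATALEN, padded.data);
        return 0;
    }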
 
     /*
-     * scan is complete.  if tup was found, we can add it to the cache.
+     * scan is complete.  If tuple was not found, we need to build
+     * a fake tuple for the negative cache entry.  The fake tuple has
+     * the correct key columns, but nulls everywhere else.
     */
-    if (ct == NULL)
-        return NULL;
+    if (ct->negative)
+    {
+        TupleDesc   tupDesc = cache->cc_tupdesc;
+        Datum      *values;
+        char       *nulls;
+        Oid         negOid = InvalidOid;
+
+        values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+        nulls = (char *) palloc(tupDesc->natts * sizeof(char));
+
+        memset(values, 0, tupDesc->natts * sizeof(Datum));
+        memset(nulls, 'n', tupDesc->natts * sizeof(char));
+
+        for (i = 0; i < cache->cc_nkeys; i++)
+        {
+            int         attindex = cache->cc_key[i];
+            Datum       keyval = cur_skey[i].sk_argument;
+
+            if (attindex > 0)
+            {
+                /*
+                 * Here we must be careful in case the caller passed a
+                 * C string where a NAME is wanted: convert the given
+                 * argument to a correctly padded NAME.  Otherwise the
+                 * memcpy() done in heap_formtuple could fall off the
+                 * end of memory.
+                 */
+                if (cache->cc_isname[i])
+                {
+                    Name        newval = (Name) palloc(NAMEDATALEN);
+
+                    namestrcpy(newval, DatumGetCString(keyval));
+                    keyval = NameGetDatum(newval);
+                }
+                values[attindex-1] = keyval;
+                nulls[attindex-1] = ' ';
+            }
+            else
+            {
+                Assert(attindex == ObjectIdAttributeNumber);
+                negOid = DatumGetObjectId(keyval);
+            }
+        }
+
+        ntp = heap_formtuple(tupDesc, values, nulls);
+
+        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+        heap_copytuple_with_tuple(ntp, &ct->tuple);
+        ct->tuple.t_data->t_oid = negOid;
+        MemoryContextSwitchTo(oldcxt);
+
+        heap_freetuple(ntp);
+        for (i = 0; i < cache->cc_nkeys; i++)
+        {
+            if (cache->cc_isname[i])
+                pfree(DatumGetName(values[cache->cc_key[i]-1]));
+        }
+        pfree(values);
+        pfree(nulls);
+    }
 
     /*
     * Finish initializing the CatCTup header, and add it to the linked
     * lists.
     */
-    CACHE1_elog(LOG, "SearchCatCache: found tuple");
-
     ct->ct_magic = CT_MAGIC;
     ct->my_cache = cache;
     DLInitElem(&ct->lrulist_elem, (void *) ct);
     DLInitElem(&ct->cache_elem, (void *) ct);
     ct->refcount = 1;           /* count this first reference */
     ct->dead = false;
+    ct->hash_value = hashValue;
 
     DLAddHead(&CacheHdr->ch_lrulist, &ct->lrulist_elem);
-    DLAddHead(&cache->cc_bucket[hash], &ct->cache_elem);
-
-#ifdef CATCACHE_STATS
-    cache->cc_newloads++;
-#endif
+    DLAddHead(&cache->cc_bucket[hashIndex], &ct->cache_elem);
 
     /*
     * If we've exceeded the desired size of the caches, try to throw away
-     * the least recently used entry.
+     * the least recently used entry.  NB: the newly-built entry cannot
+     * get thrown away here, because it has positive refcount.
     */
     ++cache->cc_ntup;
     if (++CacheHdr->ch_ntup > CacheHdr->ch_maxtup)
     {
-        for (elt = DLGetTail(&CacheHdr->ch_lrulist);
-             elt;
-             elt = DLGetPred(elt))
+        Dlelem     *prevelt;
+
+        for (elt = DLGetTail(&CacheHdr->ch_lrulist); elt; elt = prevelt)
         {
             CatCTup    *oldct = (CatCTup *) DLE_VAL(elt);
 
+            prevelt = DLGetPred(elt);
+
             if (oldct->refcount == 0)
             {
-                CACHE2_elog(LOG, "SearchCatCache(%s): Overflow, LRU removal",
+                CACHE2_elog(DEBUG1, "SearchCatCache(%s): Overflow, LRU removal",
                            cache->cc_relname);
+#ifdef CATCACHE_STATS
+                oldct->my_cache->cc_discards++;
+#endif
                 CatCacheRemoveCTup(oldct->my_cache, oldct);
-                break;
+                if (CacheHdr->ch_ntup <= CacheHdr->ch_maxtup)
+                    break;
             }
         }
     }
 
-    CACHE4_elog(LOG, "SearchCatCache(%s): Contains %d/%d tuples",
+    CACHE4_elog(DEBUG1, "SearchCatCache(%s): Contains %d/%d tuples",
                cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
-    CACHE3_elog(LOG, "SearchCatCache(%s): put in bucket %d",
-                cache->cc_relname, hash);
+
+    if (ct->negative)
+    {
+        CACHE3_elog(DEBUG1, "SearchCatCache(%s): put neg entry in bucket %d",
+                    cache->cc_relname, hashIndex);
+
+        /*
+         * We are not returning the new entry to the caller, so reset its
+         * refcount.  Note it would be uncool to set the refcount to 0
+         * before doing the extra-entry removal step above.
+         */
+        ct->refcount = 0;       /* negative entries never have refs */
+
+        return NULL;
+    }
+
+    CACHE3_elog(DEBUG1, "SearchCatCache(%s): put in bucket %d",
+                cache->cc_relname, hashIndex);
+
+#ifdef CATCACHE_STATS
+    cache->cc_newloads++;
+#endif
 
     return &ct->tuple;
 }
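The eviction loop above also changed behavior in three ways: it captures the predecessor before unlinking (the list element is freed by removal), it skips pinned entries, and it keeps sweeping until the cache is back under its cap rather than discarding exactly one entry. The same loop against a plain doubly linked list, as a standalone sketch:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct Node
    {
        int         refcount;
        struct Node *prev, *next;
    } Node;

    typedef struct { Node *head, *tail; int ntup, maxtup; } Lru;

    static void
    lru_unlink(Lru *lru, Node *n)
    {
        if (n->prev) n->prev->next = n->next; else lru->head = n->next;
        if (n->next) n->next->prev = n->prev; else lru->tail = n->prev;
        lru->ntup--;
        free(n);
    }

    /*
     * Sweep from the cold end.  The predecessor is captured before any
     * removal (the patch's prevelt), pinned entries are skipped, and the
     * loop keeps going until the cache is back under its cap instead of
     * evicting a single entry.
     */
    static void
    enforce_limit(Lru *lru)
    {
        Node *prev;

        for (Node *n = lru->tail; n && lru->ntup > lru->maxtup; n = prev)
        {
            prev = n->prev;
            if (n->refcount == 0)
                lru_unlink(lru, n);
        }
    }

    int
    main(void)
    {
        Lru lru = {0};

        lru.maxtup = 1;
        for (int i = 0; i < 3; i++)
        {
            Node *n = calloc(1, sizeof(Node));

            n->next = lru.head;
            if (lru.head) lru.head->prev = n; else lru.tail = n;
            lru.head = n;
            lru.ntup++;
        }
        enforce_limit(&lru);
        printf("%d entries remain\n", lru.ntup);
        return 0;
    }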
@@ -1164,7 +1340,7 @@ ReleaseCatCache(HeapTuple tuple)
 *
 * This is part of a rather subtle chain of events, so pay attention:
 *
- * When a tuple is updated or deleted, it cannot be flushed from the
+ * When a tuple is inserted or deleted, it cannot be flushed from the
 * catcaches immediately, for reasons explained at the top of cache/inval.c.
 * Instead we have to add entry(s) for the tuple to a list of pending tuple
 * invalidations that will be done at the end of the command or transaction.
@@ -1172,15 +1348,16 @@
 * The lists of tuples that need to be flushed are kept by inval.c.  This
 * routine is a helper routine for inval.c.  Given a tuple belonging to
 * the specified relation, find all catcaches it could be in, compute the
- * correct hashindex for each such catcache, and call the specified function
- * to record the cache id, hashindex, and tuple ItemPointer in inval.c's
+ * correct hash value for each such catcache, and call the specified function
+ * to record the cache id, hash value, and tuple ItemPointer in inval.c's
 * lists.  CatalogCacheIdInvalidate will be called later, if appropriate,
 * using the recorded information.
 *
 * Note that it is irrelevant whether the given tuple is actually loaded
 * into the catcache at the moment.  Even if it's not there now, it might
- * be by the end of the command --- or might be in other backends' caches
- * --- so we have to be prepared to flush it.
+ * be by the end of the command, or there might be a matching negative entry
+ * to flush --- or other backends' caches might have such entries --- so
+ * we have to make list entries to flush it later.
 *
 * Also note that it's not an error if there are no catcaches for the
 * specified relation.  inval.c doesn't know exactly which rels have
@@ -1190,11 +1367,12 @@
 void
 PrepareToInvalidateCacheTuple(Relation relation,
                              HeapTuple tuple,
-                             void (*function) (int, Index, ItemPointer, Oid))
+                             void (*function) (int, uint32, ItemPointer, Oid))
 {
     CatCache   *ccp;
+    Oid         reloid;
 
-    CACHE1_elog(LOG, "PrepareToInvalidateCacheTuple: called");
+    CACHE1_elog(DEBUG1, "PrepareToInvalidateCacheTuple: called");
 
     /*
     * sanity checks
@@ -1204,25 +1382,27 @@ PrepareToInvalidateCacheTuple(Relation relation,
     Assert(PointerIsValid(function));
     Assert(CacheHdr != NULL);
 
+    reloid = RelationGetRelid(relation);
+
     /* ----------------
     *    for each cache
     *       if the cache contains tuples from the specified relation
-     *           compute the tuple's hash index in this cache,
+     *           compute the tuple's hash value in this cache,
     *           and call the passed function to register the information.
     * ----------------
     */
     for (ccp = CacheHdr->ch_caches; ccp; ccp = ccp->cc_next)
     {
-        if (strcmp(ccp->cc_relname, RelationGetRelationName(relation)) != 0)
-            continue;
-
         /* Just in case cache hasn't finished initialization yet... */
         if (ccp->cc_tupdesc == NULL)
             CatalogCacheInitializeCache(ccp);
 
+        if (ccp->cc_reloid != reloid)
+            continue;
+
         (*function) (ccp->id,
-                     CatalogCacheComputeTupleHashIndex(ccp, tuple),
+                     CatalogCacheComputeTupleHashValue(ccp, tuple),
                     &tuple->t_self,
                     ccp->cc_relisshared ? (Oid) 0 : MyDatabaseId);
     }
 }
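PrepareToInvalidateCacheTuple is the bridge between heapam and inval.c: for each catcache built over the modified catalog it reports (cache id, hash value, item pointer, database id) through a function pointer, with shared catalogs tagged database id 0. A rough standalone model of that dispatch (the OIDs, ids, and helper names below are illustrative inventions, not the real catalog values):

    #include <stdio.h>
    #include <stdint.h>

    typedef struct
    {
        int         id;             /* cache id */
        uint32_t    reloid;         /* OID of the cataloged relation */
        int         shared;         /* shared catalog? */
    } ToyCache;

    static ToyCache caches[] = {{10, 1259, 0}, {11, 1247, 0}, {12, 1262, 1}};
    static const uint32_t my_database_id = 5001;

    /* stand-in for hashing the tuple's key columns */
    static uint32_t hash_tuple(uint32_t reloid) { return reloid * 2654435761u; }

    /*
     * For each cache over the modified relation, hand (cache id, hash
     * value, db id) to a caller-supplied recorder, mirroring how the patch
     * routes tuple invalidations through a function pointer into inval.c.
     */
    static void
    prepare_to_invalidate(uint32_t reloid,
                          void (*record) (int, uint32_t, uint32_t))
    {
        for (int i = 0; i < (int) (sizeof(caches) / sizeof(caches[0])); i++)
        {
            if (caches[i].reloid != reloid)
                continue;
            (*record) (caches[i].id, hash_tuple(reloid),
                       caches[i].shared ? 0 : my_database_id);
        }
    }

    static void
    record_event(int id, uint32_t hash, uint32_t dbid)
    {
        printf("queue inval: cache %d hash %08x db %u\n", id, hash, dbid);
    }

    int
    main(void)
    {
        prepare_to_invalidate(1259, record_event);
        return 0;
    }

Matching caches by relation OID instead of by name (the other change in this hunk) makes the loop cheaper and immune to name collisions.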
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 0e383b32c8417bfd9e1c123d7b8f33081cba92a8..6d397a7ba98c206d21354f47b4baef6a5213b992 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -5,26 +5,38 @@
 *
 * This is subtle stuff, so pay attention:
 *
- * When a tuple is updated or deleted, our time qualification rules consider
- * that it is *still valid* so long as we are in the same command, ie,
- * until the next CommandCounterIncrement() or transaction commit.
- * (See utils/time/tqual.c.)  At the command boundary, the old tuple stops
+ * When a tuple is updated or deleted, our standard time qualification rules
+ * consider that it is *still valid* so long as we are in the same command,
+ * ie, until the next CommandCounterIncrement() or transaction commit.
+ * (See utils/time/tqual.c, and note that system catalogs are generally
+ * scanned under SnapshotNow rules by the system, or plain user snapshots
+ * for user queries.)  At the command boundary, the old tuple stops
 * being valid and the new version, if any, becomes valid.  Therefore,
 * we cannot simply flush a tuple from the system caches during heap_update()
 * or heap_delete().  The tuple is still good at that point; what's more,
 * even if we did flush it, it might be reloaded into the caches by a later
 * request in the same command.  So the correct behavior is to keep a list
 * of outdated (updated/deleted) tuples and then do the required cache
- * flushes at the next command boundary.  Similarly, we need a list of
- * inserted tuples (including new versions of updated tuples), which we will
- * use to flush those tuples out of the caches if we abort the transaction.
- * Notice that the first list lives only till command boundary, whereas the
- * second lives till end of transaction.  Finally, we need a third list of
- * all tuples outdated in the current transaction; if we commit, we send
- * those invalidation events to all other backends (via the SI message queue)
- * so that they can flush obsolete entries from their caches.  This list
- * definitely can't be processed until after we commit, otherwise the other
- * backends won't see our updated tuples as good.
+ * flushes at the next command boundary.  We must also keep track of
+ * inserted tuples so that we can flush "negative" cache entries that match
+ * the new tuples; again, that mustn't happen until end of command.
+ *
+ * Once we have finished the command, we still need to remember inserted
+ * tuples (including new versions of updated tuples), so that we can flush
+ * them from the caches if we abort the transaction.  Similarly, we'd better
+ * be able to flush "negative" cache entries that may have been loaded in
+ * place of deleted tuples, so we still need the deleted ones too.
+ *
+ * If we successfully complete the transaction, we have to broadcast all
+ * these invalidation events to other backends (via the SI message queue)
+ * so that they can flush obsolete entries from their caches.  Note we have
+ * to record the transaction commit before sending SI messages, otherwise
+ * the other backends won't see our updated tuples as good.
+ *
+ * In short, we need to remember until xact end every insert or delete
+ * of a tuple that might be in the system caches.  Updates are treated as
+ * two events, delete + insert, for simplicity.  (There are cases where
+ * it'd be possible to record just one event, but we don't currently try.)
* * We do not need to register EVERY tuple operation in this way, just those * on tuples in relations that have associated catcaches. We do, however, @@ -62,7 +74,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/cache/inval.c,v 1.48 2002/02/19 20:11:17 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/cache/inval.c,v 1.49 2002/03/03 17:47:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -99,33 +111,26 @@ typedef struct InvalidationListHeader /* * ---------------- - * Invalidation info is divided into three parts. - * 1) shared invalidation to be sent to all backends at commit - * 2) local invalidation for the transaction itself (actually, just - * for the current command within the transaction) - * 3) rollback information for the transaction itself (in case we abort) + * Invalidation info is divided into two lists: + * 1) events so far in current command, not yet reflected to caches. + * 2) events in previous commands of current transaction; these have + * been reflected to local caches, and must be either broadcast to + * other backends or rolled back from local cache when we commit + * or abort the transaction. + * + * The relcache-file-invalidated flag can just be a simple boolean, + * since we only act on it at transaction commit; we don't care which + * command of the transaction set it. * ---------------- */ -/* - * head of invalidation message list for all backends - * eaten by AtCommit_Cache() in CommitTransaction() - */ -static InvalidationListHeader GlobalInvalidMsgs; +/* head of current-command event list */ +static InvalidationListHeader CurrentCmdInvalidMsgs; -static bool RelcacheInitFileInval; /* init file must be invalidated? */ +/* head of previous-commands event list */ +static InvalidationListHeader PriorCmdInvalidMsgs; -/* - * head of invalidation message list for the current command - * eaten by AtCommit_LocalCache() in CommandCounterIncrement() - */ -static InvalidationListHeader LocalInvalidMsgs; - -/* - * head of rollback message list for abort-time processing - * eaten by AtAbort_Cache() in AbortTransaction() - */ -static InvalidationListHeader RollbackMsgs; +static bool RelcacheInitFileInval; /* init file must be invalidated? */ /* ---------------------------------------------------------------- @@ -204,6 +209,29 @@ FreeInvalidationMessageList(InvalidationChunk **listHdr) } } +/* + * Append one list of invalidation message chunks to another, resetting + * the source chunk-list pointer to NULL. + */ +static void +AppendInvalidationMessageList(InvalidationChunk **destHdr, + InvalidationChunk **srcHdr) +{ + InvalidationChunk *chunk = *srcHdr; + + if (chunk == NULL) + return; /* nothing to do */ + + while (chunk->next != NULL) + chunk = chunk->next; + + chunk->next = *destHdr; + + *destHdr = *srcHdr; + + *srcHdr = NULL; +} + /* * Process a list of invalidation messages. 
* @@ -238,15 +266,15 @@ FreeInvalidationMessageList(InvalidationChunk **listHdr) */ static void AddCatcacheInvalidationMessage(InvalidationListHeader *hdr, - int id, Index hashIndex, + int id, uint32 hashValue, ItemPointer tuplePtr, Oid dbId) { SharedInvalidationMessage msg; msg.cc.id = (int16) id; - msg.cc.hashIndex = (uint16) hashIndex; - msg.cc.dbId = dbId; msg.cc.tuplePtr = *tuplePtr; + msg.cc.dbId = dbId; + msg.cc.hashValue = hashValue; AddInvalidationMessage(&hdr->cclist, &msg); } @@ -271,6 +299,18 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, AddInvalidationMessage(&hdr->rclist, &msg); } +/* + * Append one list of invalidation messages to another, resetting + * the source list to empty. + */ +static void +AppendInvalidationMessages(InvalidationListHeader *dest, + InvalidationListHeader *src) +{ + AppendInvalidationMessageList(&dest->cclist, &src->cclist); + AppendInvalidationMessageList(&dest->rclist, &src->rclist); +} + /* * Reset an invalidation list to empty * @@ -318,21 +358,16 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr, /* * RegisterCatcacheInvalidation * - * Register an invalidation event for an updated/deleted catcache entry. - * We insert the event into both GlobalInvalidMsgs (for transmission - * to other backends at transaction commit) and LocalInvalidMsgs (for - * my local invalidation at end of command within xact). + * Register an invalidation event for a catcache tuple entry. */ static void RegisterCatcacheInvalidation(int cacheId, - Index hashIndex, + uint32 hashValue, ItemPointer tuplePtr, Oid dbId) { - AddCatcacheInvalidationMessage(&GlobalInvalidMsgs, - cacheId, hashIndex, tuplePtr, dbId); - AddCatcacheInvalidationMessage(&LocalInvalidMsgs, - cacheId, hashIndex, tuplePtr, dbId); + AddCatcacheInvalidationMessage(&CurrentCmdInvalidMsgs, + cacheId, hashValue, tuplePtr, dbId); } /* @@ -343,11 +378,8 @@ RegisterCatcacheInvalidation(int cacheId, static void RegisterRelcacheInvalidation(Oid dbId, Oid relId) { - AddRelcacheInvalidationMessage(&GlobalInvalidMsgs, + AddRelcacheInvalidationMessage(&CurrentCmdInvalidMsgs, dbId, relId); - AddRelcacheInvalidationMessage(&LocalInvalidMsgs, - dbId, relId); - /* * If the relation being invalidated is one of those cached in the * relcache init file, mark that we need to zap that file at commit. @@ -356,34 +388,6 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId) RelcacheInitFileInval = true; } -/* - * RegisterCatcacheRollback - * - * Register an invalidation event for an inserted catcache entry. - * This only needs to be flushed out of my local catcache, if I abort. - */ -static void -RegisterCatcacheRollback(int cacheId, - Index hashIndex, - ItemPointer tuplePtr, - Oid dbId) -{ - AddCatcacheInvalidationMessage(&RollbackMsgs, - cacheId, hashIndex, tuplePtr, dbId); -} - -/* - * RegisterRelcacheRollback - * - * As above, but register a relcache invalidation event. 
- */ -static void -RegisterRelcacheRollback(Oid dbId, Oid relId) -{ - AddRelcacheInvalidationMessage(&RollbackMsgs, - dbId, relId); -} - /* * LocalExecuteInvalidationMessage * @@ -398,7 +402,7 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) { if (msg->cc.dbId == MyDatabaseId || msg->cc.dbId == 0) CatalogCacheIdInvalidate(msg->cc.id, - msg->cc.hashIndex, + msg->cc.hashValue, &msg->cc.tuplePtr); } else if (msg->id == SHAREDINVALRELCACHE_ID) @@ -438,7 +442,7 @@ InvalidateSystemCaches(void) */ static void PrepareForTupleInvalidation(Relation relation, HeapTuple tuple, - void (*CacheIdRegisterFunc) (int, Index, + void (*CacheIdRegisterFunc) (int, uint32, ItemPointer, Oid), void (*RelationIdRegisterFunc) (Oid, Oid)) { @@ -517,16 +521,18 @@ AcceptInvalidationMessages(void) * AtEOXactInvalidationMessages * Process queued-up invalidation messages at end of transaction. * - * If isCommit, we must send out the messages in our GlobalInvalidMsgs list + * If isCommit, we must send out the messages in our PriorCmdInvalidMsgs list * to the shared invalidation message queue. Note that these will be read * not only by other backends, but also by our own backend at the next - * transaction start (via AcceptInvalidationMessages). Therefore, it's okay - * to discard any pending LocalInvalidMsgs, since these will be redundant - * with the global list. + * transaction start (via AcceptInvalidationMessages). This means that + * we can skip immediate local processing of anything that's still in + * CurrentCmdInvalidMsgs, and just send that list out too. * * If not isCommit, we are aborting, and must locally process the messages - * in our RollbackMsgs list. No messages need be sent to other backends, - * since they'll not have seen our changed tuples anyway. + * in PriorCmdInvalidMsgs. No messages need be sent to other backends, + * since they'll not have seen our changed tuples anyway. We can forget + * about CurrentCmdInvalidMsgs too, since those changes haven't touched + * the caches yet. * * In any case, reset the various lists to empty. We need not physically * free memory here, since TopTransactionContext is about to be emptied @@ -548,7 +554,10 @@ AtEOXactInvalidationMessages(bool isCommit) if (RelcacheInitFileInval) RelationCacheInitFileInvalidate(true); - ProcessInvalidationMessages(&GlobalInvalidMsgs, + AppendInvalidationMessages(&PriorCmdInvalidMsgs, + &CurrentCmdInvalidMsgs); + + ProcessInvalidationMessages(&PriorCmdInvalidMsgs, SendSharedInvalidMessage); if (RelcacheInitFileInval) @@ -556,15 +565,14 @@ AtEOXactInvalidationMessages(bool isCommit) } else { - ProcessInvalidationMessages(&RollbackMsgs, + ProcessInvalidationMessages(&PriorCmdInvalidMsgs, LocalExecuteInvalidationMessage); } RelcacheInitFileInval = false; - DiscardInvalidationMessages(&GlobalInvalidMsgs, false); - DiscardInvalidationMessages(&LocalInvalidMsgs, false); - DiscardInvalidationMessages(&RollbackMsgs, false); + DiscardInvalidationMessages(&PriorCmdInvalidMsgs, false); + DiscardInvalidationMessages(&CurrentCmdInvalidMsgs, false); } /* @@ -573,13 +581,13 @@ AtEOXactInvalidationMessages(bool isCommit) * in a transaction. * * Here, we send no messages to the shared queue, since we don't know yet if - * we will commit. But we do need to locally process the LocalInvalidMsgs - * list, so as to flush our caches of any tuples we have outdated in the - * current command. + * we will commit. 
We do need to locally process the CurrentCmdInvalidMsgs + * list, so as to flush our caches of any entries we have outdated in the + * current command. We then move the current-cmd list over to become part + * of the prior-cmds list. * * The isCommit = false case is not currently used, but may someday be * needed to support rollback to a savepoint within a transaction. - * (I suspect it needs more work first --- tgl.) * * Note: * This should be called during CommandCounterIncrement(), @@ -590,29 +598,24 @@ CommandEndInvalidationMessages(bool isCommit) { if (isCommit) { - ProcessInvalidationMessages(&LocalInvalidMsgs, + ProcessInvalidationMessages(&CurrentCmdInvalidMsgs, LocalExecuteInvalidationMessage); + AppendInvalidationMessages(&PriorCmdInvalidMsgs, + &CurrentCmdInvalidMsgs); } else { - ProcessInvalidationMessages(&RollbackMsgs, - LocalExecuteInvalidationMessage); + /* XXX what needs to be done here? */ } - - /* - * LocalInvalidMsgs list is not interesting anymore, so flush it (for - * real). Do *not* clear GlobalInvalidMsgs or RollbackMsgs. - */ - DiscardInvalidationMessages(&LocalInvalidMsgs, true); } /* - * RelationInvalidateHeapTuple + * CacheInvalidateHeapTuple * Register the given tuple for invalidation at end of command * (ie, current command is outdating this tuple). */ void -RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple) +CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple) { PrepareForTupleInvalidation(relation, tuple, RegisterCatcacheInvalidation, @@ -620,14 +623,17 @@ RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple) } /* - * RelationMark4RollbackHeapTuple - * Register the given tuple for invalidation in case of abort - * (ie, current command is creating this tuple). + * CacheInvalidateRelcache + * Register invalidation of the specified relation's relcache entry + * at end of command. + * + * This is used in places that need to force relcache rebuild but aren't + * changing any of the tuples recognized as contributors to the relcache + * entry by PrepareForTupleInvalidation. (An example is dropping an index.) */ void -RelationMark4RollbackHeapTuple(Relation relation, HeapTuple tuple) +CacheInvalidateRelcache(Oid relationId) { - PrepareForTupleInvalidation(relation, tuple, - RegisterCatcacheRollback, - RegisterRelcacheRollback); + /* See KLUGE ALERT in PrepareForTupleInvalidation */ + RegisterRelcacheInvalidation(MyDatabaseId, relationId); } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index cd5162ac81c2e12579e2e76a9433075190a72fed..e3b0c69372bfa226e28a85bbc4f4db480abf755a 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.152 2002/02/19 20:11:17 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.153 2002/03/03 17:47:55 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -2065,10 +2065,16 @@ RelationBuildLocalRelation(const char *relname, rel->rd_isnailed = true; /* - * create a new tuple descriptor from the one passed in (we do this to - * copy it into the cache context) + * create a new tuple descriptor from the one passed in. We do this + * partly to copy it into the cache context, and partly because the + * new relation can't have any defaults or constraints yet; they + * have to be added in later steps, because they require additions + * to multiple system catalogs. 
We can copy attnotnull constraints + * here, however. */ - rel->rd_att = CreateTupleDescCopyConstr(tupDesc); + rel->rd_att = CreateTupleDescCopy(tupDesc); + for (i = 0; i < natts; i++) + rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull; /* * initialize relation tuple form (caller may add/override data later) @@ -2082,8 +2088,6 @@ RelationBuildLocalRelation(const char *relname, rel->rd_rel->relhasoids = true; rel->rd_rel->relnatts = natts; rel->rd_rel->reltype = InvalidOid; - if (tupDesc->constr) - rel->rd_rel->relchecks = tupDesc->constr->num_check; /* * Insert relation OID and database/tablespace ID into the right diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index af88e066e2e16099f1cb7ada8b189eaae2b3c098..e679910e253f99ea349d60e41cc0aa1731a2f6e0 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: sinval.h,v 1.25 2001/11/05 17:46:35 momjian Exp $ + * $Id: sinval.h,v 1.26 2002/03/03 17:47:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -28,22 +28,32 @@ * are available to identify other inval message types. * * Shared-inval events are initially driven by detecting tuple inserts, - * updates and deletions in system catalogs (see RelationInvalidateHeapTuple - * and RelationMark4RollbackHeapTuple). Note that some system catalogs have - * multiple caches on them (with different indexes). On detecting a tuple - * invalidation in such a catalog, a separate catcache inval message must be - * generated for each of its caches. The catcache inval message carries the - * hash index for the target tuple, so that the catcache only needs to search - * one hash chain not all its chains. Of course this assumes that all the - * backends are using identical hashing code, but that should be OK. + * updates and deletions in system catalogs (see CacheInvalidateHeapTuple). + * An update generates two inval events, one for the old tuple and one for + * the new --- this is needed to get rid of both positive entries for the + * old tuple, and negative cache entries associated with the new tuple's + * cache key. (This could perhaps be optimized down to one event when the + * cache key is not changing, but for now we don't bother to try.) Note that + * the inval events themselves don't actually say whether the tuple is being + * inserted or deleted. + * + * Note that some system catalogs have multiple caches on them (with different + * indexes). On detecting a tuple invalidation in such a catalog, separate + * catcache inval messages must be generated for each of its caches. The + * catcache inval messages carry the hash value for the target tuple, so + * that the catcache only needs to search one hash chain not all its chains, + * and so that negative cache entries can be recognized with good accuracy. + * (Of course this assumes that all the backends are using identical hashing + * code, but that should be OK.) 
*/ typedef struct { + /* note: field layout chosen with an eye to alignment concerns */ int16 id; /* cache ID --- must be first */ - uint16 hashIndex; /* hashchain index within this catcache */ - Oid dbId; /* database ID, or 0 if a shared relation */ ItemPointerData tuplePtr; /* tuple identifier in cached relation */ + Oid dbId; /* database ID, or 0 if a shared relation */ + uint32 hashValue; /* hash value of key for this catcache */ } SharedInvalCatcacheMsg; #define SHAREDINVALRELCACHE_ID (-1) diff --git a/src/include/utils/catcache.h b/src/include/utils/catcache.h index 6ad0d74851c9987a8c44177b358e281a94955698..cf820e37b8f4f6ba50630b9e204c05e06111c376 100644 --- a/src/include/utils/catcache.h +++ b/src/include/utils/catcache.h @@ -13,7 +13,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: catcache.h,v 1.38 2002/02/19 20:11:19 tgl Exp $ + * $Id: catcache.h,v 1.39 2002/03/03 17:47:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -35,20 +35,28 @@ typedef struct catcache struct catcache *cc_next; /* link to next catcache */ char *cc_relname; /* name of relation the tuples come from */ char *cc_indname; /* name of index matching cache keys */ - int cc_reloidattr; /* AttrNumber of relation OID, or 0 */ + Oid cc_reloid; /* OID of relation the tuples come from */ bool cc_relisshared; /* is relation shared? */ TupleDesc cc_tupdesc; /* tuple descriptor (copied from reldesc) */ + int cc_reloidattr; /* AttrNumber of relation OID attr, or 0 */ int cc_ntup; /* # of tuples currently in this cache */ int cc_size; /* # of hash buckets in this cache */ int cc_nkeys; /* number of keys (1..4) */ int cc_key[4]; /* AttrNumber of each key */ PGFunction cc_hashfunc[4]; /* hash function to use for each key */ ScanKeyData cc_skey[4]; /* precomputed key info for heap scans */ + bool cc_isname[4]; /* flag key columns that are NAMEs */ #ifdef CATCACHE_STATS long cc_searches; /* total # searches against this cache */ long cc_hits; /* # of matches against existing entry */ + long cc_neg_hits; /* # of matches against negative entry */ long cc_newloads; /* # of successful loads of new entry */ - /* cc_searches - (cc_hits + cc_newloads) is # of failed searches */ + /* + * cc_searches - (cc_hits + cc_neg_hits + cc_newloads) is number of + * failed searches, each of which will result in loading a negative entry + */ + long cc_invals; /* # of entries invalidated from cache */ + long cc_discards; /* # of entries discarded due to overflow */ #endif Dllist cc_bucket[1]; /* hash buckets --- VARIABLE LENGTH ARRAY */ } CatCache; /* VARIABLE LENGTH STRUCT */ @@ -68,11 +76,18 @@ typedef struct catctup * A tuple marked "dead" must not be returned by subsequent searches. * However, it won't be physically deleted from the cache until its * refcount goes to zero. + * + * A negative cache entry is an assertion that there is no tuple + * matching a particular key. This is just as useful as a normal entry + * so far as avoiding catalog searches is concerned. Management of + * positive and negative entries is identical. */ Dlelem lrulist_elem; /* list member of global LRU list */ Dlelem cache_elem; /* list member of per-bucket list */ int refcount; /* number of active references */ bool dead; /* dead but not yet removed? */ + bool negative; /* negative cache entry? 
*/ + uint32 hash_value; /* hash value for this tuple's keys */ HeapTupleData tuple; /* tuple management header */ } CatCTup; @@ -104,10 +119,10 @@ extern void ReleaseCatCache(HeapTuple tuple); extern void ResetCatalogCaches(void); extern void CatalogCacheFlushRelation(Oid relId); -extern void CatalogCacheIdInvalidate(int cacheId, Index hashIndex, +extern void CatalogCacheIdInvalidate(int cacheId, uint32 hashValue, ItemPointer pointer); extern void PrepareToInvalidateCacheTuple(Relation relation, HeapTuple tuple, - void (*function) (int, Index, ItemPointer, Oid)); + void (*function) (int, uint32, ItemPointer, Oid)); #endif /* CATCACHE_H */ diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index 25fb73d570e5bf916987fea2d32018bb178d3dce..d2286ed54ef7bb693fcfc0c36671850e450aa73e 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: inval.h,v 1.23 2001/11/05 17:46:36 momjian Exp $ + * $Id: inval.h,v 1.24 2002/03/03 17:47:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,8 +23,8 @@ extern void AtEOXactInvalidationMessages(bool isCommit); extern void CommandEndInvalidationMessages(bool isCommit); -extern void RelationInvalidateHeapTuple(Relation relation, HeapTuple tuple); +extern void CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple); -extern void RelationMark4RollbackHeapTuple(Relation relation, HeapTuple tuple); +extern void CacheInvalidateRelcache(Oid relationId); #endif /* INVAL_H */
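
----
The inval.c hunks above replace three overlapping message lists (GlobalInvalidMsgs, LocalInvalidMsgs, RollbackMsgs) with two lists whose meaning is purely temporal: events of the current command versus events of prior commands in the transaction. To make that command/transaction lifecycle concrete, here is a minimal, self-contained C model. It is only an editorial sketch: every name in it (InvalMsg, Register, AtCommandEnd, AtTransactionEnd, the printf "broadcast") is an illustrative stand-in, not a PostgreSQL function, and it ignores the chunked allocation, the catcache/relcache message split, and the real SI queue mechanics.

/* Editorial sketch with hypothetical names; not part of the patch. */
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

typedef struct InvalMsg
{
	unsigned	hashValue;	/* hash of the cache key, as in the patch */
	struct InvalMsg *next;
} InvalMsg;

/* events of the current command, not yet reflected in local caches */
static InvalMsg *CurrentCmdInvalidMsgs = NULL;
/* events of prior commands: applied locally, awaiting commit or abort */
static InvalMsg *PriorCmdInvalidMsgs = NULL;

static void
Register(unsigned hashValue)
{
	InvalMsg   *m = malloc(sizeof(InvalMsg));

	m->hashValue = hashValue;
	m->next = CurrentCmdInvalidMsgs;
	CurrentCmdInvalidMsgs = m;
}

/* splice *src onto *dest and reset *src, as AppendInvalidationMessageList does */
static void
AppendList(InvalMsg **dest, InvalMsg **src)
{
	InvalMsg   *m = *src;

	if (m == NULL)
		return;
	while (m->next != NULL)
		m = m->next;
	m->next = *dest;
	*dest = *src;
	*src = NULL;
}

/* apply func (if any) to each message, then free the whole list */
static void
ProcessAndReset(InvalMsg **list, void (*func) (InvalMsg *))
{
	while (*list != NULL)
	{
		InvalMsg   *m = *list;

		*list = m->next;
		if (func != NULL)
			func(m);
		free(m);
	}
}

static void FlushLocal(InvalMsg *m) { printf("flush local entries, hash %u\n", m->hashValue); }
static void Broadcast(InvalMsg *m) { printf("send SI message, hash %u\n", m->hashValue); }

/* mirrors CommandEndInvalidationMessages(true) */
static void
AtCommandEnd(void)
{
	InvalMsg   *m;

	/* outdated tuples just became invisible to us: flush them locally ... */
	for (m = CurrentCmdInvalidMsgs; m != NULL; m = m->next)
		FlushLocal(m);
	/* ... but keep the events for commit broadcast or abort undo */
	AppendList(&PriorCmdInvalidMsgs, &CurrentCmdInvalidMsgs);
}

/* mirrors AtEOXactInvalidationMessages */
static void
AtTransactionEnd(bool isCommit)
{
	if (isCommit)
	{
		/* current-command events can ride along unprocessed: the backend
		 * reads its own SI messages back at next transaction start */
		AppendList(&PriorCmdInvalidMsgs, &CurrentCmdInvalidMsgs);
		ProcessAndReset(&PriorCmdInvalidMsgs, Broadcast);
	}
	else
	{
		/* abort: undo locally-applied events; pending ones never hit caches */
		ProcessAndReset(&PriorCmdInvalidMsgs, FlushLocal);
		ProcessAndReset(&CurrentCmdInvalidMsgs, NULL);
	}
}

int
main(void)
{
	Register(0xdeadbeefU);	/* e.g. a catalog tuple was outdated */
	AtCommandEnd();			/* CommandCounterIncrement */
	Register(0xfeedfaceU);	/* another change, in a later command */
	AtTransactionEnd(true);	/* commit: both events are broadcast */
	return 0;
}

Run as written, this prints one local flush at the command boundary and two SI broadcasts at commit; with AtTransactionEnd(false) it would instead re-flush the first event locally and silently drop the second, which is exactly the asymmetry the AtEOXactInvalidationMessages comments describe.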
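
A second sketch, under the same caveat that every name in it (CatEntry, Search, Invalidate, ScanCatalog) is hypothetical: what the new "negative" flag in CatCTup buys, and why the SI message now carries the full hash value rather than a bucket index. A negative entry caches the fact that a key is absent, so a repeat lookup can fail without another catalog scan; the flip side is that an insert of a matching tuple must be able to find and kill that entry, and since a negative entry corresponds to no on-disk tuple, the hash value is what identifies it (this is the "good accuracy" point in the sinval.h comment above). Refcounts, LRU eviction, multiple buckets, and the "dead" flag are all omitted.

/* Editorial sketch with hypothetical names; not part of the patch. */
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

typedef struct CatEntry
{
	unsigned	hash_value;	/* full hash of the search key */
	bool		negative;	/* key proven absent from the catalog? */
	struct CatEntry *next;
} CatEntry;

static CatEntry *chain = NULL;	/* a single hash chain, for brevity */

/* stand-in for the underlying index scan of the system catalog */
static bool
ScanCatalog(unsigned hash)
{
	printf("expensive catalog scan, hash %u\n", hash);
	return false;			/* pretend the key does not exist */
}

/* returns true if the key exists; caches both outcomes */
static bool
Search(unsigned hash)
{
	CatEntry   *e;

	for (e = chain; e != NULL; e = e->next)
	{
		if (e->hash_value == hash)
			return !e->negative;	/* negative hit: absence is cached too */
	}
	/* miss: do the real scan, then remember the outcome either way */
	e = malloc(sizeof(CatEntry));
	e->hash_value = hash;
	e->negative = !ScanCatalog(hash);
	e->next = chain;
	chain = e;
	return !e->negative;
}

/*
 * Invalidate by hash value, dropping positive and negative entries alike.
 * This is why inserts must now generate inval events: an insert may make
 * a cached "this key does not exist" assertion false.
 */
static void
Invalidate(unsigned hash)
{
	CatEntry  **prev = &chain;
	CatEntry   *e;

	while ((e = *prev) != NULL)
	{
		if (e->hash_value == hash)
		{
			*prev = e->next;
			free(e);
		}
		else
			prev = &e->next;
	}
}

int
main(void)
{
	Search(42);			/* miss: scans catalog, caches a negative entry */
	Search(42);			/* negative hit: no scan */
	Invalidate(42);		/* e.g. a tuple with this key was inserted */
	Search(42);			/* must scan again */
	return 0;
}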