diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 7b27c2305821b4e5633759f7b3058a90f0e2c07c..245b224686257f3802934e7483ef26d74238a0eb 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -979,7 +979,7 @@ relation_openrv(const RangeVar *relation, LOCKMODE lockmode) Oid relOid; /* Look up and lock the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, lockmode, false, false); + relOid = RangeVarGetRelid(relation, lockmode, false); /* Let relation_open do the rest */ return relation_open(relOid, NoLock); @@ -1001,7 +1001,7 @@ relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, Oid relOid; /* Look up and lock the appropriate relation using namespace search */ - relOid = RangeVarGetRelid(relation, lockmode, missing_ok, false); + relOid = RangeVarGetRelid(relation, lockmode, missing_ok); /* Return NULL on not-found */ if (!OidIsValid(relOid)) diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 9a125bdac68cc483b7f79138a873ff2756ec6fb4..60c0b8a5b234b4e3c33ffa3f373769ba72467372 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -606,7 +606,7 @@ objectNamesToOids(GrantObjectType objtype, List *objnames) RangeVar *relvar = (RangeVar *) lfirst(cell); Oid relOid; - relOid = RangeVarGetRelid(relvar, NoLock, false, false); + relOid = RangeVarGetRelid(relvar, NoLock, false); objects = lappend_oid(objects, relOid); } break; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 99e130c1b0d9fea27e7de86225f646907d144601..758872ff4eda584ab4f269c768226e10079ddf08 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -111,7 +111,6 @@ static void validate_index_heapscan(Relation heapRelation, IndexInfo *indexInfo, Snapshot snapshot, v_i_state *state); -static Oid IndexGetRelation(Oid indexId); static bool ReindexIsCurrentlyProcessingIndex(Oid indexOid); static void SetReindexProcessing(Oid heapOid, Oid indexOid); static void ResetReindexProcessing(void); @@ -1301,7 +1300,7 @@ index_drop(Oid indexId) * proceeding until we commit and send out a shared-cache-inval notice * that will make them update their index lists. */ - heapId = IndexGetRelation(indexId); + heapId = IndexGetRelation(indexId, false); userHeapRelation = heap_open(heapId, AccessExclusiveLock); userIndexRelation = index_open(indexId, AccessExclusiveLock); @@ -2763,8 +2762,8 @@ validate_index_heapscan(Relation heapRelation, * IndexGetRelation: given an index's relation OID, get the OID of the * relation it is an index on. Uses the system cache. */ -static Oid -IndexGetRelation(Oid indexId) +Oid +IndexGetRelation(Oid indexId, bool missing_ok) { HeapTuple tuple; Form_pg_index index; @@ -2772,7 +2771,11 @@ IndexGetRelation(Oid indexId) tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId)); if (!HeapTupleIsValid(tuple)) + { + if (missing_ok) + return InvalidOid; elog(ERROR, "cache lookup failed for index %u", indexId); + } index = (Form_pg_index) GETSTRUCT(tuple); Assert(index->indexrelid == indexId); @@ -2800,7 +2803,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks) * Open and lock the parent heap relation. ShareLock is sufficient since * we only need to be sure no schema or data changes are going on. */ - heapId = IndexGetRelation(indexId); + heapId = IndexGetRelation(indexId, false); heapRelation = heap_open(heapId, ShareLock); /* diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 6d4d4b1cccf765cb22692e3d1b68db91eacaf13c..25cad3d1528f6ba8eee5a60cb7bc67fc51d25c7d 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -219,10 +219,14 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS); * otherwise raise an error. * * If nowait = true, throw an error if we'd have to wait for a lock. + * + * Callback allows caller to check permissions or acquire additional locks + * prior to grabbing the relation lock. */ Oid -RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, - bool nowait) +RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode, + bool missing_ok, bool nowait, + RangeVarGetRelidCallback callback, void *callback_arg) { uint64 inval_count; Oid relId; @@ -308,6 +312,19 @@ RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, relId = RelnameGetRelid(relation->relname); } + /* + * Invoke caller-supplied callback, if any. + * + * This callback is a good place to check permissions: we haven't taken + * the table lock yet (and it's really best to check permissions before + * locking anything!), but we've gotten far enough to know what OID we + * think we should lock. Of course, concurrent DDL might things while + * we're waiting for the lock, but in that case the callback will be + * invoked again for the new OID. + */ + if (callback) + callback(relation, relId, oldRelId, callback_arg); + /* * If no lock requested, we assume the caller knows what they're * doing. They should have already acquired a heavyweight lock on diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index c32122466433349e08fd1222180412c77fb56af3..eb0584e99105f69433cd6dcfdd6f4f6701273c74 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -111,7 +111,7 @@ ExecRenameStmt(RenameStmt *stmt) * in RenameRelation, renameatt, or renametrig. */ relid = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, - false, false); + false); switch (stmt->renameType) { diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 69aa5bf6466a77042c5c6935c98b81bc48b4b28c..386b95bca2096f8fd4dea420681dd6fbd37252d9 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -63,7 +63,10 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, static Oid GetIndexOpClass(List *opclass, Oid attrType, char *accessMethodName, Oid accessMethodId); static char *ChooseIndexNameAddition(List *colnames); - +static void RangeVarCallbackForReindexTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg); +static void RangeVarCallbackForReindexIndex(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg); /* * CheckIndexCompatible @@ -129,7 +132,7 @@ CheckIndexCompatible(Oid oldId, Datum d; /* Caller should already have the relation locked in some way. */ - relationId = RangeVarGetRelid(heapRelation, NoLock, false, false); + relationId = RangeVarGetRelid(heapRelation, NoLock, false); /* * We can pretend isconstraint = false unconditionally. It only serves to * decide the text of an error message that should never happen for us. @@ -1725,33 +1728,74 @@ void ReindexIndex(RangeVar *indexRelation) { Oid indOid; - HeapTuple tuple; + Oid heapOid = InvalidOid; + + /* lock level used here should match index lock reindex_index() */ + indOid = RangeVarGetRelidExtended(indexRelation, AccessExclusiveLock, + false, false, + RangeVarCallbackForReindexIndex, + (void *) &heapOid); + + reindex_index(indOid, false); +} + +/* + * Check permissions on table before acquiring relation lock; also lock + * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid + * deadlocks. + */ +static void +RangeVarCallbackForReindexIndex(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + char relkind; + Oid *heapOid = (Oid *) arg; /* - * XXX: This is not safe in the presence of concurrent DDL. We should - * take AccessExclusiveLock here, but that would violate the rule that - * indexes should only be locked after their parent tables. For now, - * we live with it. + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. */ - indOid = RangeVarGetRelid(indexRelation, NoLock, false, false); - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indOid)); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", indOid); + if (relId != oldRelId && OidIsValid(oldRelId)) + { + /* lock level here should match reindex_index() heap lock */ + UnlockRelationOid(*heapOid, ShareLock); + *heapOid = InvalidOid; + } + + /* If the relation does not exist, there's nothing more to do. */ + if (!OidIsValid(relId)) + return; - if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX) + /* + * If the relation does exist, check whether it's an index. But note + * that the relation might have been dropped between the time we did the + * name lookup and now. In that case, there's nothing to do. + */ + relkind = get_rel_relkind(relId); + if (!relkind) + return; + if (relkind != RELKIND_INDEX) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not an index", - indexRelation->relname))); + errmsg("\"%s\" is not an index", relation->relname))); /* Check permissions */ - if (!pg_class_ownercheck(indOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - indexRelation->relname); - - ReleaseSysCache(tuple); + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); - reindex_index(indOid, false); + /* Lock heap before index to avoid deadlock. */ + if (relId != oldRelId) + { + /* + * Lock level here should match reindex_index() heap lock. + * If the OID isn't valid, it means the index as concurrently dropped, + * which is not a problem for us; just return normally. + */ + *heapOid = IndexGetRelation(relId, true); + if (OidIsValid(*heapOid)) + LockRelationOid(*heapOid, ShareLock); + } } /* @@ -1762,27 +1806,10 @@ void ReindexTable(RangeVar *relation) { Oid heapOid; - HeapTuple tuple; /* The lock level used here should match reindex_relation(). */ - heapOid = RangeVarGetRelid(relation, ShareLock, false, false); - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(heapOid)); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for relation %u", heapOid); - - if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION && - ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - relation->relname))); - - /* Check permissions */ - if (!pg_class_ownercheck(heapOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - relation->relname); - - ReleaseSysCache(tuple); + heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false, + RangeVarCallbackForReindexTable, NULL); if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST)) ereport(NOTICE, @@ -1790,6 +1817,37 @@ ReindexTable(RangeVar *relation) relation->relname))); } +/* + * Check permissions on table before acquiring relation lock. + */ +static void +RangeVarCallbackForReindexTable(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + char relkind; + + /* Nothing to do if the relation was not found. */ + if (!OidIsValid(relId)) + return; + + /* + * If the relation does exist, check whether it's an index. But note + * that the relation might have been dropped between the time we did the + * name lookup and now. In that case, there's nothing to do. + */ + relkind = get_rel_relkind(relId); + if (!relkind) + return; + if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table", relation->relname))); + + /* Check permissions */ + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname); +} + /* * ReindexDatabase * Recreate indexes of a database. diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index 875f868b969233344f3286d9491072c38fe3e9b2..91642ce03de2e4fb6e54bb4401fd5bdd63cae104 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -23,10 +23,12 @@ #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" -static void LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, - bool recurse); - +static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait); +static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode); +static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, + Oid oldrelid, void *arg); /* * LOCK TABLE @@ -51,98 +53,124 @@ LockTableCommand(LockStmt *lockstmt) foreach(p, lockstmt->relations) { RangeVar *rv = (RangeVar *) lfirst(p); - Relation rel; bool recurse = interpretInhOption(rv->inhOpt); Oid reloid; - reloid = RangeVarGetRelid(rv, lockstmt->mode, false, lockstmt->nowait); - rel = relation_open(reloid, NoLock); + reloid = RangeVarGetRelidExtended(rv, lockstmt->mode, false, + lockstmt->nowait, + RangeVarCallbackForLockTable, + (void *) &lockstmt->mode); - LockTableRecurse(rel, lockstmt->mode, lockstmt->nowait, recurse); + if (recurse) + LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait); } } /* - * Apply LOCK TABLE recursively over an inheritance tree + * Before acquiring a table lock on the named table, check whether we have + * permission to do so. */ static void -LockTableRecurse(Relation rel, LOCKMODE lockmode, bool nowait, bool recurse) +RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, + void *arg) { + LOCKMODE lockmode = * (LOCKMODE *) arg; + char relkind; AclResult aclresult; - Oid reloid = RelationGetRelid(rel); - /* Verify adequate privilege */ - if (lockmode == AccessShareLock) - aclresult = pg_class_aclcheck(reloid, GetUserId(), - ACL_SELECT); - else - aclresult = pg_class_aclcheck(reloid, GetUserId(), - ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, ACL_KIND_CLASS, - RelationGetRelationName(rel)); + if (!OidIsValid(relid)) + return; /* doesn't exist, so no permissions check */ + relkind = get_rel_relkind(relid); + if (!relkind) + return; /* woops, concurrently dropped; no permissions check */ /* Currently, we only allow plain tables to be locked */ - if (rel->rd_rel->relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + rv->relname))); - /* - * If requested, recurse to children. We use find_inheritance_children - * not find_all_inheritors to avoid taking locks far in advance of - * checking privileges. This means we'll visit multiply-inheriting - * children more than once, but that's no problem. - */ - if (recurse) + /* Check permissions. */ + aclresult = LockTableAclCheck(relid, lockmode); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_CLASS, rv->relname); +} + +/* + * Apply LOCK TABLE recursively over an inheritance tree + * + * We use find_inheritance_children not find_all_inheritors to avoid taking + * locks far in advance of checking privileges. This means we'll visit + * multiply-inheriting children more than once, but that's no problem. + */ +static void +LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait) +{ + List *children; + ListCell *lc; + + children = find_inheritance_children(reloid, NoLock); + + foreach(lc, children) { - List *children = find_inheritance_children(reloid, NoLock); - ListCell *lc; - Relation childrel; + Oid childreloid = lfirst_oid(lc); + AclResult aclresult; + + /* Check permissions before acquiring the lock. */ + aclresult = LockTableAclCheck(childreloid, lockmode); + if (aclresult != ACLCHECK_OK) + { + char *relname = get_rel_name(childreloid); + if (!relname) + continue; /* child concurrently dropped, just skip it */ + aclcheck_error(aclresult, ACL_KIND_CLASS, relname); + } - foreach(lc, children) + /* We have enough rights to lock the relation; do so. */ + if (!nowait) + LockRelationOid(childreloid, lockmode); + else if (!ConditionalLockRelationOid(childreloid, lockmode)) { - Oid childreloid = lfirst_oid(lc); - - /* - * Acquire the lock, to protect against concurrent drops. Note - * that a lock against an already-dropped relation's OID won't - * fail. - */ - if (!nowait) - LockRelationOid(childreloid, lockmode); - else if (!ConditionalLockRelationOid(childreloid, lockmode)) - { - /* try to throw error by name; relation could be deleted... */ - char *relname = get_rel_name(childreloid); - - if (relname) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation \"%s\"", - relname))); - else - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on relation with OID %u", - reloid))); - } - - /* - * Now that we have the lock, check to see if the relation really - * exists or not. - */ - childrel = try_relation_open(childreloid, NoLock); - if (!childrel) - { - /* Release useless lock */ - UnlockRelationOid(childreloid, lockmode); - } - - LockTableRecurse(childrel, lockmode, nowait, recurse); + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(childreloid); + if (!relname) + continue; /* child concurrently dropped, just skip it */ + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relname))); } + + /* + * Even if we got the lock, child might have been concurrently dropped. + * If so, we can skip it. + */ + if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(childreloid))) + { + /* Release useless lock */ + UnlockRelationOid(childreloid, lockmode); + continue; + } + + LockTableRecurse(childreloid, lockmode, nowait); } +} - relation_close(rel, NoLock); /* close rel, keep lock */ +/* + * Check whether the current user is permitted to lock this relation. + */ +static AclResult +LockTableAclCheck(Oid reloid, LOCKMODE lockmode) +{ + AclResult aclresult; + + /* Verify adequate privilege */ + if (lockmode == AccessShareLock) + aclresult = pg_class_aclcheck(reloid, GetUserId(), + ACL_SELECT); + else + aclresult = pg_class_aclcheck(reloid, GetUserId(), + ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE); + return aclresult; } diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 54660f448055375f41efc42e05bee5f8eb10d264..41e5301fc5f84e128079816fc52fd985e521a75b 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -425,7 +425,7 @@ AlterSequence(AlterSeqStmt *stmt) List *owned_by; /* Open and lock sequence. */ - relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false, false); + relid = RangeVarGetRelid(stmt->sequence, AccessShareLock, false); init_sequence(relid, &elm, &seqrel); /* allow ALTER to sequence owner only */ @@ -513,7 +513,7 @@ nextval(PG_FUNCTION_ARGS) * whether the performance penalty is material in practice, but for now, * we do it this way. */ - relid = RangeVarGetRelid(sequence, NoLock, false, false); + relid = RangeVarGetRelid(sequence, NoLock, false); PG_RETURN_INT64(nextval_internal(relid)); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c4622c04125bbca7d3fa75e538c5737e4aa4fb96..1c3fe6a9630c0470baf48d38e3980978e4c0efcc 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = { {'\0', 0, NULL, NULL, NULL, NULL} }; +struct DropRelationCallbackState +{ + char relkind; + Oid heapOid; +}; + /* Alter table target-type flags for ATSimplePermissions */ #define ATT_TABLE 0x0001 #define ATT_VIEW 0x0002 @@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, ForkNumber forkNum, char relpersistence); static const char *storage_name(char c); +static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, + Oid oldRelOid, void *arg); + /* ---------------------------------------------------------------- * DefineRelation @@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop) { RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); Oid relOid; - HeapTuple tuple; - Form_pg_class classform; ObjectAddress obj; + struct DropRelationCallbackState state; /* * These next few steps are a great deal like relation_openrv, but we @@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop) */ AcceptInvalidationMessages(); - /* - * Look up the appropriate relation using namespace search. - * - * XXX: Doing this without a lock is unsafe in the presence of - * concurrent DDL, but acquiring a lock here might violate the rule - * that a table must be locked before its corresponding index. - * So, for now, we ignore the hazard. - */ - relOid = RangeVarGetRelid(rel, NoLock, true, false); + /* Look up the appropriate relation using namespace search. */ + state.relkind = relkind; + state.heapOid = InvalidOid; + relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true, + false, + RangeVarCallbackForDropRelation, + (void *) &state); /* Not there? */ if (!OidIsValid(relOid)) @@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop) continue; } - /* - * In DROP INDEX, attempt to acquire lock on the parent table before - * locking the index. index_drop() will need this anyway, and since - * regular queries lock tables before their indexes, we risk deadlock - * if we do it the other way around. No error if we don't find a - * pg_index entry, though --- that most likely means it isn't an - * index, and we'll fail below. - */ - if (relkind == RELKIND_INDEX) - { - tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid)); - if (HeapTupleIsValid(tuple)) - { - Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple); - - LockRelationOid(index->indrelid, AccessExclusiveLock); - ReleaseSysCache(tuple); - } - } - - /* Get the lock before trying to fetch the syscache entry */ - LockRelationOid(relOid, AccessExclusiveLock); - - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", relOid); - classform = (Form_pg_class) GETSTRUCT(tuple); - - if (classform->relkind != relkind) - DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); - - /* Allow DROP to either table owner or schema owner */ - if (!pg_class_ownercheck(relOid, GetUserId()) && - !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - rel->relname); - - if (!allowSystemTableMods && IsSystemClass(classform)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - rel->relname))); - /* OK, we're ready to delete this one */ obj.classId = RelationRelationId; obj.objectId = relOid; obj.objectSubId = 0; add_exact_object_address(&obj, objects); - - ReleaseSysCache(tuple); } performMultipleDeletions(objects, drop->behavior); @@ -842,6 +803,74 @@ RemoveRelations(DropStmt *drop) free_object_addresses(objects); } +/* + * Before acquiring a table lock, check whether we have sufficient rights. + * In the case of DROP INDEX, also try to lock the table before the index. + */ +static void +RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, + void *arg) +{ + HeapTuple tuple; + struct DropRelationCallbackState *state; + char relkind; + Form_pg_class classform; + + state = (struct DropRelationCallbackState *) arg; + relkind = state->relkind; + + /* + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. + */ + if (relOid != oldRelOid && OidIsValid(state->heapOid)) + { + UnlockRelationOid(state->heapOid, AccessExclusiveLock); + state->heapOid = InvalidOid; + } + + /* Didn't find a relation, so need for locking or permission checks. */ + if (!OidIsValid(relOid)) + return; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); + if (!HeapTupleIsValid(tuple)) + return; /* concurrently dropped, so nothing to do */ + classform = (Form_pg_class) GETSTRUCT(tuple); + + if (classform->relkind != relkind) + DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); + + /* Allow DROP to either table owner or schema owner */ + if (!pg_class_ownercheck(relOid, GetUserId()) && + !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + rel->relname); + + if (!allowSystemTableMods && IsSystemClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rel->relname))); + + ReleaseSysCache(tuple); + + /* + * In DROP INDEX, attempt to acquire lock on the parent table before + * locking the index. index_drop() will need this anyway, and since + * regular queries lock tables before their indexes, we risk deadlock + * if we do it the other way around. No error if we don't find a + * pg_index entry, though --- the relation may have been droppd. + */ + if (relkind == RELKIND_INDEX && relOid != oldRelOid) + { + state->heapOid = IndexGetRelation(relOid, true); + if (OidIsValid(state->heapOid)) + LockRelationOid(state->heapOid, AccessExclusiveLock); + } +} + /* * ExecuteTruncate * Executes a TRUNCATE command. diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 5589528e5ca3d63083c3f21fe157b5ddda75cd40..b205deca29f6cef42b889bd710ca6067c4961b42 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -201,8 +201,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, * we might end up creating a pg_constraint entry referencing a * nonexistent table. */ - constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false, - false); + constrrelid = RangeVarGetRelid(stmt->constrrel, AccessShareLock, false); } /* permission checks */ diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 5eec56950cc400669523145dbeecadfd5db5ce80..e70dbedbd056369cccc9b55acbca3a0209203b23 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -330,7 +330,7 @@ get_rel_oids(Oid relid, const RangeVar *vacrel) * alternative, since we're going to commit this transaction and * begin a new one between now and then. */ - relid = RangeVarGetRelid(vacrel, NoLock, false, false); + relid = RangeVarGetRelid(vacrel, NoLock, false); /* Make a relation list entry for this guy */ oldcontext = MemoryContextSwitchTo(vac_context); diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index 9ac28c916cd608fef10dc7b54df583a254116c6b..35e4baa7240d8deac8a612b987a3b1ba7b178351 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -282,7 +282,7 @@ searchRangeTable(ParseState *pstate, RangeVar *relation) if (!relation->schemaname) cte = scanNameSpaceForCTE(pstate, refname, &ctelevelsup); if (!cte) - relId = RangeVarGetRelid(relation, NoLock, true, false); + relId = RangeVarGetRelid(relation, NoLock, true); /* Now look for RTEs matching either the relation/CTE or the alias */ for (levelsup = 0; diff --git a/src/backend/parser/parse_type.c b/src/backend/parser/parse_type.c index 2122032ce223ccaaf0eee3e908a9f35fc31ec42a..91028c44ca262295b678fb1d797f126e4bb24f3b 100644 --- a/src/backend/parser/parse_type.c +++ b/src/backend/parser/parse_type.c @@ -115,7 +115,7 @@ LookupTypeName(ParseState *pstate, const TypeName *typeName, * of concurrent DDL. But taking a lock would carry a performance * penalty and would also require a permissions check. */ - relid = RangeVarGetRelid(rel, NoLock, false, false); + relid = RangeVarGetRelid(rel, NoLock, false); attnum = get_attnum(relid, field); if (attnum == InvalidAttrNumber) ereport(ERROR, diff --git a/src/backend/rewrite/rewriteDefine.c b/src/backend/rewrite/rewriteDefine.c index 17db70e74a3162d4caa65a2e25ac42dbecaddf18..a9bb8d51f2c5c70de4658701297aaa2583cc18bc 100644 --- a/src/backend/rewrite/rewriteDefine.c +++ b/src/backend/rewrite/rewriteDefine.c @@ -203,8 +203,7 @@ DefineRule(RuleStmt *stmt, const char *queryString) * Find and lock the relation. Lock level should match * DefineQueryRewrite. */ - relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false, - false); + relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false); /* ... and execute */ DefineQueryRewrite(stmt->rulename, diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 6f88c476adae1a9564ea337ee6b42716c5f48159..36d9edc2aaf20a57ace57f2589b18095d821c6f6 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -84,7 +84,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs) * AccessExclusiveLock) before verifying that the user has permissions * is not appealing either. */ - relOid = RangeVarGetRelid(rel, NoLock, false, false); + relOid = RangeVarGetRelid(rel, NoLock, false); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); if (!HeapTupleIsValid(tuple)) /* should not happen */ diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c index e79ba50ad3315ab25dd5e5030a83ad5c627be84d..48fb673df251cd811b7e954aaca4de9283fe8884 100644 --- a/src/backend/utils/adt/acl.c +++ b/src/backend/utils/adt/acl.c @@ -1940,7 +1940,7 @@ convert_table_name(text *tablename) relrv = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); /* We might not even have permissions on this relation; don't lock it. */ - return RangeVarGetRelid(relrv, NoLock, false, false); + return RangeVarGetRelid(relrv, NoLock, false); } /* diff --git a/src/backend/utils/adt/regproc.c b/src/backend/utils/adt/regproc.c index 0d42a39ec431faf03e9c8b884d43dc14333b799d..c050f5ec910e4e8d019bad266b3583ab712b2ffc 100644 --- a/src/backend/utils/adt/regproc.c +++ b/src/backend/utils/adt/regproc.c @@ -824,8 +824,7 @@ regclassin(PG_FUNCTION_ARGS) names = stringToQualifiedNameList(class_name_or_oid); /* We might not even have permissions on this relation; don't lock it. */ - result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false, - false); + result = RangeVarGetRelid(makeRangeVarFromNameList(names), NoLock, false); PG_RETURN_OID(result); } @@ -1298,7 +1297,7 @@ text_regclass(PG_FUNCTION_ARGS) rv = makeRangeVarFromNameList(textToQualifiedNameList(relname)); /* We might not even have permissions on this relation; don't lock it. */ - result = RangeVarGetRelid(rv, NoLock, false, false); + result = RangeVarGetRelid(rv, NoLock, false); PG_RETURN_OID(result); } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 75923a6f2ea92c147f282d3e54ce60f3cb9c6610..29df7488fd64c58adb722cd901020f577f037936 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -390,7 +390,7 @@ pg_get_viewdef_name(PG_FUNCTION_ARGS) /* Look up view name. Can't lock it - we might not have privileges. */ viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname)); - viewoid = RangeVarGetRelid(viewrel, NoLock, false, false); + viewoid = RangeVarGetRelid(viewrel, NoLock, false); PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, 0))); } @@ -410,7 +410,7 @@ pg_get_viewdef_name_ext(PG_FUNCTION_ARGS) /* Look up view name. Can't lock it - we might not have privileges. */ viewrel = makeRangeVarFromNameList(textToQualifiedNameList(viewname)); - viewoid = RangeVarGetRelid(viewrel, NoLock, false, false); + viewoid = RangeVarGetRelid(viewrel, NoLock, false); PG_RETURN_TEXT_P(string_to_text(pg_get_viewdef_worker(viewoid, prettyFlags))); } @@ -1576,7 +1576,7 @@ pg_get_serial_sequence(PG_FUNCTION_ARGS) /* Look up table name. Can't lock it - we might not have privileges. */ tablerv = makeRangeVarFromNameList(textToQualifiedNameList(tablename)); - tableOid = RangeVarGetRelid(tablerv, NoLock, false, false); + tableOid = RangeVarGetRelid(tablerv, NoLock, false); /* Get the number of the column */ column = text_to_cstring(columnname); diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 8b78b05a2e68a719e9c19ea5dcbe342d8f524ccc..9efd9afed8d4070fa383c41c7d58fd75a3bc527b 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -99,5 +99,6 @@ extern bool reindex_relation(Oid relid, int flags); extern bool ReindexIsProcessingHeap(Oid heapOid); extern bool ReindexIsProcessingIndex(Oid indexOid); +extern Oid IndexGetRelation(Oid indexId, bool missing_ok); #endif /* INDEX_H */ diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 904c6fd97d844059c96d345e62bde216730f959f..d537a48b62ca481d00f15ae4f90d2b9ca9570d8c 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -47,9 +47,16 @@ typedef struct OverrideSearchPath bool addTemp; /* implicitly prepend temp schema? */ } OverrideSearchPath; +typedef void (*RangeVarGetRelidCallback)(const RangeVar *relation, Oid relId, + Oid oldRelId, void *callback_arg); -extern Oid RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, - bool missing_ok, bool nowait); +#define RangeVarGetRelid(relation, lockmode, missing_ok) \ + RangeVarGetRelidExtended(relation, lockmode, missing_ok, false, NULL, NULL) + +extern Oid RangeVarGetRelidExtended(const RangeVar *relation, + LOCKMODE lockmode, bool missing_ok, bool nowait, + RangeVarGetRelidCallback callback, + void *callback_arg); extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation); extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation); extern void RangeVarAdjustRelationPersistence(RangeVar *newRelation, Oid nspid); diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c index 275cad7b5c95ae880fba595ca7ae99bc4938ab02..d1be6c9985a06a0957b0d2ff8fa5b31e95ed1b61 100644 --- a/src/pl/plpgsql/src/pl_comp.c +++ b/src/pl/plpgsql/src/pl_comp.c @@ -1707,7 +1707,7 @@ plpgsql_parse_cwordtype(List *idents) strVal(lsecond(idents)), -1); /* Can't lock relation - we might not have privileges. */ - classOid = RangeVarGetRelid(relvar, NoLock, true, false); + classOid = RangeVarGetRelid(relvar, NoLock, true); if (!OidIsValid(classOid)) goto done; fldname = strVal(lthird(idents)); @@ -1808,7 +1808,7 @@ plpgsql_parse_cwordrowtype(List *idents) relvar = makeRangeVar(strVal(linitial(idents)), strVal(lsecond(idents)), -1); - classOid = RangeVarGetRelid(relvar, NoLock, false, false); + classOid = RangeVarGetRelid(relvar, NoLock, false); MemoryContextSwitchTo(oldCxt);