diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 67e60e1636827e7265d159127c421b2f257544eb..d313ec22531436beb0cc75f8043a82a600a40579 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.11 2002/09/19 23:40:56 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/dependency.c,v 1.12 2002/09/22 00:37:09 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -67,12 +67,11 @@ typedef enum ObjectClasses } ObjectClasses; /* expansible list of ObjectAddresses */ -typedef struct ObjectAddresses +typedef struct { ObjectAddress *refs; /* => palloc'd array */ int numrefs; /* current number of references */ int maxrefs; /* current size of palloc'd array */ - struct ObjectAddresses *link; /* list link for use in recursion */ } ObjectAddresses; /* for find_expr_references_walker */ @@ -92,10 +91,13 @@ static bool object_classes_initialized = false; static Oid object_classes[MAX_OCLASS]; +static void findAutoDeletableObjects(const ObjectAddress *object, + ObjectAddresses *oktodelete, + Relation depRel); static bool recursiveDeletion(const ObjectAddress *object, DropBehavior behavior, const ObjectAddress *callingObject, - ObjectAddresses *pending, + ObjectAddresses *oktodelete, Relation depRel); static void doDeletion(const ObjectAddress *object); static bool find_expr_references_walker(Node *node, @@ -107,9 +109,8 @@ static void add_object_address(ObjectClasses oclass, Oid objectId, int32 subId, ObjectAddresses *addrs); static void add_exact_object_address(const ObjectAddress *object, ObjectAddresses *addrs); -static void del_object_address(const ObjectAddress *object, +static bool object_address_present(const ObjectAddress *object, ObjectAddresses *addrs); -static void del_object_address_by_index(int index, ObjectAddresses *addrs); static void term_object_addresses(ObjectAddresses *addrs); static void init_object_classes(void); static ObjectClasses getObjectClass(const ObjectAddress *object); @@ -133,6 +134,7 @@ performDeletion(const ObjectAddress *object, { char *objDescription; Relation depRel; + ObjectAddresses oktodelete; /* * Get object description for possible use in failure message. Must do @@ -146,11 +148,23 @@ performDeletion(const ObjectAddress *object, */ depRel = heap_openr(DependRelationName, RowExclusiveLock); - if (!recursiveDeletion(object, behavior, NULL, NULL, depRel)) + /* + * Construct a list of objects that are reachable by AUTO or INTERNAL + * dependencies from the target object. These should be deleted silently, + * even if the actual deletion pass first reaches one of them via a + * non-auto dependency. + */ + init_object_addresses(&oktodelete); + + findAutoDeletableObjects(object, &oktodelete, depRel); + + if (!recursiveDeletion(object, behavior, NULL, &oktodelete, depRel)) elog(ERROR, "Cannot drop %s because other objects depend on it" "\n\tUse DROP ... CASCADE to drop the dependent objects too", objDescription); + term_object_addresses(&oktodelete); + heap_close(depRel, RowExclusiveLock); pfree(objDescription); @@ -158,17 +172,112 @@ performDeletion(const ObjectAddress *object, /* - * recursiveDeletion: delete a single object for performDeletion. + * findAutoDeletableObjects: find all objects that are reachable by AUTO or + * INTERNAL dependency paths from the given object. Add them all to the + * oktodelete list. Note that the originally given object will also be + * added to the list. + * + * depRel is the already-open pg_depend relation. + */ +static void +findAutoDeletableObjects(const ObjectAddress *object, + ObjectAddresses *oktodelete, + Relation depRel) +{ + ScanKeyData key[3]; + int nkeys; + SysScanDesc scan; + HeapTuple tup; + ObjectAddress otherObject; + + /* + * If this object is already in oktodelete, then we already visited it; + * don't do so again (this prevents infinite recursion if there's a loop + * in pg_depend). Otherwise, add it. + */ + if (object_address_present(object, oktodelete)) + return; + add_exact_object_address(object, oktodelete); + + /* + * Scan pg_depend records that link to this object, showing the things + * that depend on it. For each one that is AUTO or INTERNAL, visit the + * referencing object. + * + * When dropping a whole object (subId = 0), find pg_depend records for + * its sub-objects too. + */ + ScanKeyEntryInitialize(&key[0], 0x0, + Anum_pg_depend_refclassid, F_OIDEQ, + ObjectIdGetDatum(object->classId)); + ScanKeyEntryInitialize(&key[1], 0x0, + Anum_pg_depend_refobjid, F_OIDEQ, + ObjectIdGetDatum(object->objectId)); + if (object->objectSubId != 0) + { + ScanKeyEntryInitialize(&key[2], 0x0, + Anum_pg_depend_refobjsubid, F_INT4EQ, + Int32GetDatum(object->objectSubId)); + nkeys = 3; + } + else + nkeys = 2; + + scan = systable_beginscan(depRel, DependReferenceIndex, true, + SnapshotNow, nkeys, key); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); + + switch (foundDep->deptype) + { + case DEPENDENCY_NORMAL: + /* ignore */ + break; + case DEPENDENCY_AUTO: + case DEPENDENCY_INTERNAL: + /* recurse */ + otherObject.classId = foundDep->classid; + otherObject.objectId = foundDep->objid; + otherObject.objectSubId = foundDep->objsubid; + findAutoDeletableObjects(&otherObject, oktodelete, depRel); + break; + case DEPENDENCY_PIN: + /* + * For a PIN dependency we just elog immediately; there + * won't be any others to examine, and we aren't ever + * going to let the user delete it. + */ + elog(ERROR, "Cannot drop %s because it is required by the database system", + getObjectDescription(object)); + break; + default: + elog(ERROR, "findAutoDeletableObjects: unknown dependency type '%c' for %s", + foundDep->deptype, getObjectDescription(object)); + break; + } + } + + systable_endscan(scan); +} + + +/* + * recursiveDeletion: delete a single object for performDeletion, plus + * (recursively) anything that depends on it. * * Returns TRUE if successful, FALSE if not. * * callingObject is NULL at the outer level, else identifies the object that * we recursed from (the reference object that someone else needs to delete). - * pending is a linked list of objects that outer recursion levels want to - * delete. We remove the target object from any outer-level list it may - * appear in. + * + * oktodelete is a list of objects verified deletable (ie, reachable by one + * or more AUTO or INTERNAL dependencies from the original target). + * * depRel is the already-open pg_depend relation. * + * * In RESTRICT mode, we perform all the deletions anyway, but elog a NOTICE * and return FALSE if we find a restriction violation. performDeletion * will then abort the transaction to nullify the deletions. We have to @@ -176,35 +285,33 @@ performDeletion(const ObjectAddress *object, * while (b) not going into infinite recursion if there's a cycle. * * This is even more complex than one could wish, because it is possible for - * the same pair of objects to be related by both NORMAL and AUTO (or IMPLICIT) - * dependencies. (Since one or both paths might be indirect, it's very hard - * to prevent this; we must cope instead.) If there is an AUTO/IMPLICIT - * deletion path then we should perform the deletion, and not fail because - * of the NORMAL dependency. So, when we hit a NORMAL dependency we don't - * immediately decide we've failed; instead we stick the NORMAL dependent - * object into a list of pending deletions. If we find a legal path to delete - * that object later on, the recursive call will remove it from our pending - * list. After we've exhausted all such possibilities, we remove the - * remaining pending objects anyway, but emit a notice and prepare to return - * FALSE. (We have to do it this way because the dependent objects *must* be - * removed before we can remove the object they depend on.) + * the same pair of objects to be related by both NORMAL and AUTO/INTERNAL + * dependencies. Also, we might have a situation where we've been asked to + * delete object A, and objects B and C both have AUTO dependencies on A, + * but B also has a NORMAL dependency on C. (Since any of these paths might + * be indirect, we can't prevent these scenarios, but must cope instead.) + * If we visit C before B then we would mistakenly decide that the B->C link + * should prevent the restricted drop from occurring. To handle this, we make + * a pre-scan to find all the objects that are auto-deletable from A. If we + * visit C first, but B is present in the oktodelete list, then we make no + * complaint but recurse to delete B anyway. (Note that in general we must + * delete B before deleting C; the drop routine for B may try to access C.) * - * Note: in the case where the AUTO path is traversed first, we will never - * see the NORMAL dependency path because of the pg_depend removals done in - * recursive executions of step 1. The pending list is necessary essentially - * just to make the behavior independent of the order in which pg_depend + * Note: in the case where the path to B is traversed first, we will not + * see the NORMAL dependency when we reach C, because of the pg_depend + * removals done in step 1. The oktodelete list is necessary just + * to make the behavior independent of the order in which pg_depend * entries are visited. */ static bool recursiveDeletion(const ObjectAddress *object, DropBehavior behavior, const ObjectAddress *callingObject, - ObjectAddresses *pending, + ObjectAddresses *oktodelete, Relation depRel) { bool ok = true; char *objDescription; - ObjectAddresses mypending; ScanKeyData key[3]; int nkeys; SysScanDesc scan; @@ -219,12 +326,6 @@ recursiveDeletion(const ObjectAddress *object, */ objDescription = getObjectDescription(object); - /* - * Initialize list of restricted objects, and set up chain link. - */ - init_object_addresses(&mypending); - mypending.link = pending; - /* * Step 1: find and remove pg_depend records that link from this * object to others. We have to do this anyway, and doing it first @@ -274,9 +375,8 @@ recursiveDeletion(const ObjectAddress *object, * another object. We have three cases: * * 1. At the outermost recursion level, disallow the DROP. - * (We just elog here, rather than considering this drop - * to be pending, since no other dependencies are likely - * to be interesting.) + * (We just elog here, rather than proceeding, since no + * other dependencies are likely to be interesting.) */ if (callingObject == NULL) { @@ -344,11 +444,15 @@ recursiveDeletion(const ObjectAddress *object, /* * If we found we are owned by another object, ask it to delete itself - * instead of proceeding. + * instead of proceeding. Complain if RESTRICT mode, unless the other + * object is in oktodelete. */ if (amOwned) { - if (behavior == DROP_RESTRICT) + if (object_address_present(&owningObject, oktodelete)) + elog(DEBUG1, "Drop auto-cascades to %s", + getObjectDescription(&owningObject)); + else if (behavior == DROP_RESTRICT) { elog(NOTICE, "%s depends on %s", getObjectDescription(&owningObject), @@ -361,11 +465,10 @@ recursiveDeletion(const ObjectAddress *object, if (!recursiveDeletion(&owningObject, behavior, object, - pending, depRel)) + oktodelete, depRel)) ok = false; pfree(objDescription); - term_object_addresses(&mypending); return ok; } @@ -418,26 +521,30 @@ recursiveDeletion(const ObjectAddress *object, switch (foundDep->deptype) { case DEPENDENCY_NORMAL: - if (behavior == DROP_RESTRICT) + /* + * Perhaps there was another dependency path that would + * have allowed silent deletion of the otherObject, had + * we only taken that path first. + * In that case, act like this link is AUTO, too. + */ + if (object_address_present(&otherObject, oktodelete)) + elog(DEBUG1, "Drop auto-cascades to %s", + getObjectDescription(&otherObject)); + else if (behavior == DROP_RESTRICT) { - /* - * We've found a restricted object (or at least one - * that's not deletable along this path). Log for - * later processing. (Note it's okay if the same - * object gets into mypending multiple times.) - */ - add_exact_object_address(&otherObject, &mypending); + elog(NOTICE, "%s depends on %s", + getObjectDescription(&otherObject), + objDescription); + ok = false; } else - { elog(NOTICE, "Drop cascades to %s", getObjectDescription(&otherObject)); - if (!recursiveDeletion(&otherObject, behavior, - object, - &mypending, depRel)) - ok = false; - } + if (!recursiveDeletion(&otherObject, behavior, + object, + oktodelete, depRel)) + ok = false; break; case DEPENDENCY_AUTO: case DEPENDENCY_INTERNAL: @@ -452,7 +559,7 @@ recursiveDeletion(const ObjectAddress *object, if (!recursiveDeletion(&otherObject, behavior, object, - &mypending, depRel)) + oktodelete, depRel)) ok = false; break; case DEPENDENCY_PIN: @@ -473,36 +580,6 @@ recursiveDeletion(const ObjectAddress *object, systable_endscan(scan); - /* - * If we found no restricted objects, or got rid of them all via other - * paths, we're in good shape. Otherwise continue step 2 by - * processing the remaining restricted objects. - */ - if (mypending.numrefs > 0) - { - /* - * Successively extract and delete each remaining object. Note - * that the right things will happen if some of these objects - * depend on others: we'll report/delete each one exactly once. - */ - while (mypending.numrefs > 0) - { - ObjectAddress otherObject = mypending.refs[0]; - - del_object_address_by_index(0, &mypending); - - elog(NOTICE, "%s depends on %s", - getObjectDescription(&otherObject), - objDescription); - if (!recursiveDeletion(&otherObject, behavior, - object, - &mypending, depRel)) - ok = false; - } - - ok = false; - } - /* * We do not need CommandCounterIncrement here, since if step 2 did * anything then each recursive call will have ended with one. @@ -520,12 +597,6 @@ recursiveDeletion(const ObjectAddress *object, */ DeleteComments(object->objectId, object->classId, object->objectSubId); - /* - * If this object is mentioned in any caller's pending list, remove - * it. - */ - del_object_address(object, pending); - /* * CommandCounterIncrement here to ensure that preceding changes are * all visible. @@ -536,7 +607,6 @@ recursiveDeletion(const ObjectAddress *object, * And we're done! */ pfree(objDescription); - term_object_addresses(&mypending); return ok; } @@ -894,7 +964,6 @@ init_object_addresses(ObjectAddresses *addrs) addrs->maxrefs = 32; /* arbitrary initial array size */ addrs->refs = (ObjectAddress *) palloc(addrs->maxrefs * sizeof(ObjectAddress)); - addrs->link = NULL; /* Initialize object_classes[] if not done yet */ /* This will be needed by add_object_address() */ @@ -954,47 +1023,30 @@ add_exact_object_address(const ObjectAddress *object, } /* - * If an ObjectAddresses array contains any matches for the given object, - * remove it/them. Also, do the same in any linked ObjectAddresses arrays. + * Test whether an object is present in an ObjectAddresses array. + * + * We return "true" if object is a subobject of something in the array, too. */ -static void -del_object_address(const ObjectAddress *object, - ObjectAddresses *addrs) +static bool +object_address_present(const ObjectAddress *object, + ObjectAddresses *addrs) { - for (; addrs != NULL; addrs = addrs->link) + int i; + + for (i = addrs->numrefs - 1; i >= 0; i--) { - int i; + ObjectAddress *thisobj = addrs->refs + i; - /* Scan backwards to simplify deletion logic. */ - for (i = addrs->numrefs - 1; i >= 0; i--) + if (object->classId == thisobj->classId && + object->objectId == thisobj->objectId) { - ObjectAddress *thisobj = addrs->refs + i; - - if (object->classId == thisobj->classId && - object->objectId == thisobj->objectId) - { - /* - * Delete if exact match, or if thisobj is a subobject of - * the passed-in object. - */ - if (object->objectSubId == thisobj->objectSubId || - object->objectSubId == 0) - del_object_address_by_index(i, addrs); - } + if (object->objectSubId == thisobj->objectSubId || + thisobj->objectSubId == 0) + return true; } } -} -/* - * Remove an entry (specified by array index) from an ObjectAddresses array. - * The end item in the list is moved down to fill the hole. - */ -static void -del_object_address_by_index(int index, ObjectAddresses *addrs) -{ - Assert(index >= 0 && index < addrs->numrefs); - addrs->refs[index] = addrs->refs[addrs->numrefs - 1]; - addrs->numrefs--; + return false; } /* diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 3b495b010879aa55a27114925cb6f07edb319358..fa5aec0203bc3175e9273032b810412cae6be808 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.229 2002/09/19 23:40:56 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/heap.c,v 1.230 2002/09/22 00:37:09 tgl Exp $ * * * INTERFACE ROUTINES @@ -1342,6 +1342,7 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin) ' ', ' ', ' ', + InvalidOid, /* no associated index */ expr, /* Tree form check constraint */ ccbin, /* Binary form check constraint */ ccsrc); /* Source form check constraint */ diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 4909c2ea08402e44447c3e365f23c7a8d4b5ecbd..23fd0f6a171a08dd1d439c5af9bbf8083f6578a5 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.196 2002/09/04 20:31:14 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/index.c,v 1.197 2002/09/22 00:37:09 tgl Exp $ * * * INTERFACE ROUTINES @@ -693,6 +693,7 @@ index_create(Oid heapRelationId, ' ', ' ', ' ', + InvalidOid, /* no associated index */ NULL, /* no check constraint */ NULL, NULL); diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index 92d455223b09773ad553333c8e72f796aedd4085..e03e545beb37f5c4ea8929c8ad9cb24c08e5dffb 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/catalog/pg_constraint.c,v 1.6 2002/09/04 20:31:14 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/catalog/pg_constraint.c,v 1.7 2002/09/22 00:37:09 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,7 @@ CreateConstraintEntry(const char *constraintName, char foreignUpdateType, char foreignDeleteType, char foreignMatchType, + Oid indexRelId, Node *conExpr, const char *conBin, const char *conSrc) @@ -216,6 +217,21 @@ CreateConstraintEntry(const char *constraintName, } } + if (OidIsValid(indexRelId)) + { + /* + * Register normal dependency on the unique index that supports + * a foreign-key constraint. + */ + ObjectAddress relobject; + + relobject.classId = RelOid_pg_class; + relobject.objectId = indexRelId; + relobject.objectSubId = 0; + + recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL); + } + if (conExpr != NULL) { /* diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 380287caa37bad6b116f2ca6977ecad63bff0437..c79cd1f9eddd9154019af566e49302546c8750d7 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.41 2002/09/12 21:16:42 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.42 2002/09/22 00:37:09 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -41,6 +41,7 @@ #include "parser/gramparse.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" +#include "parser/parse_oper.h" #include "parser/parse_relation.h" #include "parser/parse_type.h" #include "utils/acl.h" @@ -59,10 +60,19 @@ static int findAttrByName(const char *attributeName, List *schema); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); static void CheckTupleType(Form_pg_class tuple_class); static bool needs_toast_table(Relation rel); +static void AlterTableAddCheckConstraint(Relation rel, Constraint *constr); +static void AlterTableAddForeignKeyConstraint(Relation rel, + FkConstraint *fkconstraint); +static int transformColumnNameList(Oid relId, List *colList, + const char *stmtname, + int16 *attnums, Oid *atttypids); +static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, + List **attnamelist, + int16 *attnums, Oid *atttypids); +static Oid transformFkeyCheckAttrs(Relation pkrel, + int numattrs, int16 *attnums); static void validateForeignKeyConstraint(FkConstraint *fkconstraint, Relation rel, Relation pkrel); -static Oid createForeignKeyConstraint(Relation rel, Relation pkrel, - FkConstraint *fkconstraint); static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, Oid constrOid); static char *fkMatchTypeToString(char match_type); @@ -2512,6 +2522,22 @@ AlterTableAddConstraint(Oid myrelid, bool recurse, { Constraint *constr = (Constraint *) newConstraint; + /* + * Assign or validate constraint name + */ + if (constr->name) + { + if (ConstraintNameIsUsed(RelationGetRelid(rel), + RelationGetNamespace(rel), + constr->name)) + elog(ERROR, "constraint \"%s\" already exists for relation \"%s\"", + constr->name, RelationGetRelationName(rel)); + } + else + constr->name = GenerateConstraintName(RelationGetRelid(rel), + RelationGetNamespace(rel), + &counter); + /* * Currently, we only expect to see CONSTR_CHECK nodes * arriving here (see the preprocessing done in @@ -2521,130 +2547,8 @@ AlterTableAddConstraint(Oid myrelid, bool recurse, switch (constr->contype) { case CONSTR_CHECK: - { - ParseState *pstate; - bool successful = true; - HeapScanDesc scan; - ExprContext *econtext; - TupleTableSlot *slot; - HeapTuple tuple; - RangeTblEntry *rte; - List *qual; - Node *expr; - - /* - * Assign or validate constraint name - */ - if (constr->name) - { - if (ConstraintNameIsUsed(RelationGetRelid(rel), - RelationGetNamespace(rel), - constr->name)) - elog(ERROR, "constraint \"%s\" already exists for relation \"%s\"", - constr->name, - RelationGetRelationName(rel)); - } - else - constr->name = GenerateConstraintName(RelationGetRelid(rel), - RelationGetNamespace(rel), - &counter); - - /* - * We need to make a parse state and range - * table to allow us to transformExpr and - * fix_opids to get a version of the - * expression we can pass to ExecQual - */ - pstate = make_parsestate(NULL); - rte = addRangeTableEntryForRelation(pstate, - myrelid, - makeAlias(RelationGetRelationName(rel), NIL), - false, - true); - addRTEtoQuery(pstate, rte, true, true); - - /* - * Convert the A_EXPR in raw_expr into an - * EXPR - */ - expr = transformExpr(pstate, constr->raw_expr); - - /* - * Make sure it yields a boolean result. - */ - expr = coerce_to_boolean(expr, "CHECK"); - - /* - * Make sure no outside relations are - * referred to. - */ - if (length(pstate->p_rtable) != 1) - elog(ERROR, "Only relation '%s' can be referenced in CHECK", - RelationGetRelationName(rel)); - - /* - * No subplans or aggregates, either... - */ - if (contain_subplans(expr)) - elog(ERROR, "cannot use subselect in CHECK constraint expression"); - if (contain_agg_clause(expr)) - elog(ERROR, "cannot use aggregate function in CHECK constraint expression"); - - /* - * Might as well try to reduce any - * constant expressions. - */ - expr = eval_const_expressions(expr); - - /* And fix the opids */ - fix_opids(expr); - - qual = makeList1(expr); - - /* Make tuple slot to hold tuples */ - slot = MakeTupleTableSlot(); - ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false); - /* Make an expression context for ExecQual */ - econtext = MakeExprContext(slot, CurrentMemoryContext); - - /* - * Scan through the rows now, checking the - * expression at each row. - */ - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - if (!ExecQual(qual, econtext, true)) - { - successful = false; - break; - } - ResetExprContext(econtext); - } - - heap_endscan(scan); - - FreeExprContext(econtext); - pfree(slot); - - if (!successful) - elog(ERROR, "AlterTableAddConstraint: rejected due to CHECK constraint %s", - constr->name); - - /* - * Call AddRelationRawConstraints to do - * the real adding -- It duplicates some - * of the above, but does not check the - * validity of the constraint against - * tuples already in the table. - */ - AddRelationRawConstraints(rel, NIL, - makeList1(constr)); - - break; - } + AlterTableAddCheckConstraint(rel, constr); + break; default: elog(ERROR, "ALTER TABLE / ADD CONSTRAINT is not implemented for that constraint type."); } @@ -2653,8 +2557,6 @@ AlterTableAddConstraint(Oid myrelid, bool recurse, case T_FkConstraint: { FkConstraint *fkconstraint = (FkConstraint *) newConstraint; - Relation pkrel; - Oid constrOid; /* * Assign or validate constraint name @@ -2673,74 +2575,504 @@ AlterTableAddConstraint(Oid myrelid, bool recurse, RelationGetNamespace(rel), &counter); - /* - * Grab an exclusive lock on the pk table, so that - * someone doesn't delete rows out from under us. - * (Although a lesser lock would do for that purpose, - * we'll need exclusive lock anyway to add triggers to - * the pk table; trying to start with a lesser lock - * will just create a risk of deadlock.) - */ - pkrel = heap_openrv(fkconstraint->pktable, - AccessExclusiveLock); + AlterTableAddForeignKeyConstraint(rel, fkconstraint); - /* - * Validity checks - */ - if (pkrel->rd_rel->relkind != RELKIND_RELATION) - elog(ERROR, "referenced relation \"%s\" is not a table", - RelationGetRelationName(pkrel)); + break; + } + default: + elog(ERROR, "ALTER TABLE / ADD CONSTRAINT unable to determine type of constraint passed"); + } - if (!allowSystemTableMods - && IsSystemRelation(pkrel)) - elog(ERROR, "ALTER TABLE: relation \"%s\" is a system catalog", - RelationGetRelationName(pkrel)); + /* If we have multiple constraints to make, bump CC between 'em */ + if (lnext(listptr)) + CommandCounterIncrement(); + } - /* XXX shouldn't there be a permission check too? */ + /* Close rel, but keep lock till commit */ + heap_close(rel, NoLock); +} - if (isTempNamespace(RelationGetNamespace(pkrel)) && - !isTempNamespace(RelationGetNamespace(rel))) - elog(ERROR, "ALTER TABLE / ADD CONSTRAINT: Unable to reference temporary table from permanent table constraint"); +/* + * Add a check constraint to a single table + * + * Subroutine for AlterTableAddConstraint. Must already hold exclusive + * lock on the rel, and have done appropriate validity/permissions checks + * for it. + */ +static void +AlterTableAddCheckConstraint(Relation rel, Constraint *constr) +{ + ParseState *pstate; + bool successful = true; + HeapScanDesc scan; + ExprContext *econtext; + TupleTableSlot *slot; + HeapTuple tuple; + RangeTblEntry *rte; + List *qual; + Node *expr; - /* - * Check that the constraint is satisfied by existing - * rows (we can skip this during table creation). - * - * NOTE: we assume parser has already checked for - * existence of an appropriate unique index on the - * referenced relation, and that the column datatypes - * are comparable. - */ - if (!fkconstraint->skip_validation) - validateForeignKeyConstraint(fkconstraint, rel, pkrel); + /* + * We need to make a parse state and range + * table to allow us to transformExpr and + * fix_opids to get a version of the + * expression we can pass to ExecQual + */ + pstate = make_parsestate(NULL); + rte = addRangeTableEntryForRelation(pstate, + RelationGetRelid(rel), + makeAlias(RelationGetRelationName(rel), NIL), + false, + true); + addRTEtoQuery(pstate, rte, true, true); - /* - * Record the FK constraint in pg_constraint. - */ - constrOid = createForeignKeyConstraint(rel, pkrel, - fkconstraint); + /* + * Convert the A_EXPR in raw_expr into an EXPR + */ + expr = transformExpr(pstate, constr->raw_expr); - /* - * Create the triggers that will enforce the - * constraint. - */ - createForeignKeyTriggers(rel, fkconstraint, constrOid); + /* + * Make sure it yields a boolean result. + */ + expr = coerce_to_boolean(expr, "CHECK"); - /* - * Close pk table, but keep lock until we've - * committed. - */ - heap_close(pkrel, NoLock); + /* + * Make sure no outside relations are referred to. + */ + if (length(pstate->p_rtable) != 1) + elog(ERROR, "Only relation '%s' can be referenced in CHECK", + RelationGetRelationName(rel)); - break; + /* + * No subplans or aggregates, either... + */ + if (contain_subplans(expr)) + elog(ERROR, "cannot use subselect in CHECK constraint expression"); + if (contain_agg_clause(expr)) + elog(ERROR, "cannot use aggregate function in CHECK constraint expression"); + + /* + * Might as well try to reduce any constant expressions. + */ + expr = eval_const_expressions(expr); + + /* And fix the opids */ + fix_opids(expr); + + qual = makeList1(expr); + + /* Make tuple slot to hold tuples */ + slot = MakeTupleTableSlot(); + ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false); + /* Make an expression context for ExecQual */ + econtext = MakeExprContext(slot, CurrentMemoryContext); + + /* + * Scan through the rows now, checking the expression at each row. + */ + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + if (!ExecQual(qual, econtext, true)) + { + successful = false; + break; + } + ResetExprContext(econtext); + } + + heap_endscan(scan); + + FreeExprContext(econtext); + pfree(slot); + + if (!successful) + elog(ERROR, "AlterTableAddConstraint: rejected due to CHECK constraint %s", + constr->name); + + /* + * Call AddRelationRawConstraints to do + * the real adding -- It duplicates some + * of the above, but does not check the + * validity of the constraint against + * tuples already in the table. + */ + AddRelationRawConstraints(rel, NIL, makeList1(constr)); +} + +/* + * Add a foreign-key constraint to a single table + * + * Subroutine for AlterTableAddConstraint. Must already hold exclusive + * lock on the rel, and have done appropriate validity/permissions checks + * for it. + */ +static void +AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint) +{ + const char *stmtname; + Relation pkrel; + AclResult aclresult; + int16 pkattnum[INDEX_MAX_KEYS]; + int16 fkattnum[INDEX_MAX_KEYS]; + Oid pktypoid[INDEX_MAX_KEYS]; + Oid fktypoid[INDEX_MAX_KEYS]; + int i; + int numfks, + numpks; + Oid indexOid; + Oid constrOid; + + /* cheat a little to discover statement type for error messages */ + stmtname = fkconstraint->skip_validation ? "CREATE TABLE" : "ALTER TABLE"; + + /* + * Grab an exclusive lock on the pk table, so that + * someone doesn't delete rows out from under us. + * (Although a lesser lock would do for that purpose, + * we'll need exclusive lock anyway to add triggers to + * the pk table; trying to start with a lesser lock + * will just create a risk of deadlock.) + */ + pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); + + /* + * Validity and permissions checks + * + * Note: REFERENCES permissions checks are redundant with CREATE TRIGGER, + * but we may as well error out sooner instead of later. + */ + if (pkrel->rd_rel->relkind != RELKIND_RELATION) + elog(ERROR, "referenced relation \"%s\" is not a table", + RelationGetRelationName(pkrel)); + + if (!allowSystemTableMods + && IsSystemRelation(pkrel)) + elog(ERROR, "%s: relation \"%s\" is a system catalog", + stmtname, RelationGetRelationName(pkrel)); + + aclresult = pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), + ACL_REFERENCES); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, RelationGetRelationName(pkrel)); + + aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), + ACL_REFERENCES); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, RelationGetRelationName(rel)); + + if (isTempNamespace(RelationGetNamespace(pkrel)) && + !isTempNamespace(RelationGetNamespace(rel))) + elog(ERROR, "%s: Unable to reference temporary table from permanent table constraint", + stmtname); + + /* + * Look up the referencing attributes to make sure they + * exist, and record their attnums and type OIDs. + */ + for (i = 0; i < INDEX_MAX_KEYS; i++) + { + pkattnum[i] = fkattnum[i] = 0; + pktypoid[i] = fktypoid[i] = InvalidOid; + } + + numfks = transformColumnNameList(RelationGetRelid(rel), + fkconstraint->fk_attrs, + stmtname, + fkattnum, fktypoid); + + /* + * If the attribute list for the referenced table was omitted, + * lookup the definition of the primary key and use it. Otherwise, + * validate the supplied attribute list. In either case, discover + * the index OID and the attnums and type OIDs of the attributes. + */ + if (fkconstraint->pk_attrs == NIL) + { + numpks = transformFkeyGetPrimaryKey(pkrel, &indexOid, + &fkconstraint->pk_attrs, + pkattnum, pktypoid); + } + else + { + numpks = transformColumnNameList(RelationGetRelid(pkrel), + fkconstraint->pk_attrs, + stmtname, + pkattnum, pktypoid); + /* Look for an index matching the column list */ + indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum); + } + + /* Be sure referencing and referenced column types are comparable */ + if (numfks != numpks) + elog(ERROR, "%s: number of referencing and referenced attributes for foreign key disagree", + stmtname); + + for (i = 0; i < numpks; i++) + { + /* + * fktypoid[i] is the foreign key table's i'th element's type + * pktypoid[i] is the primary key table's i'th element's type + * + * We let oper() do our work for us, including elog(ERROR) if the + * types don't compare with = + */ + Operator o = oper(makeList1(makeString("=")), + fktypoid[i], pktypoid[i], false); + + ReleaseSysCache(o); + } + + /* + * Check that the constraint is satisfied by existing + * rows (we can skip this during table creation). + */ + if (!fkconstraint->skip_validation) + validateForeignKeyConstraint(fkconstraint, rel, pkrel); + + /* + * Record the FK constraint in pg_constraint. + */ + constrOid = CreateConstraintEntry(fkconstraint->constr_name, + RelationGetNamespace(rel), + CONSTRAINT_FOREIGN, + fkconstraint->deferrable, + fkconstraint->initdeferred, + RelationGetRelid(rel), + fkattnum, + numfks, + InvalidOid, /* not a domain constraint */ + RelationGetRelid(pkrel), + pkattnum, + numpks, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + fkconstraint->fk_matchtype, + indexOid, + NULL, /* no check constraint */ + NULL, + NULL); + + /* + * Create the triggers that will enforce the constraint. + */ + createForeignKeyTriggers(rel, fkconstraint, constrOid); + + /* + * Close pk table, but keep lock until we've committed. + */ + heap_close(pkrel, NoLock); +} + + +/* + * transformColumnNameList - transform list of column names + * + * Lookup each name and return its attnum and type OID + */ +static int +transformColumnNameList(Oid relId, List *colList, + const char *stmtname, + int16 *attnums, Oid *atttypids) +{ + List *l; + int attnum; + + attnum = 0; + foreach(l, colList) + { + char *attname = strVal(lfirst(l)); + HeapTuple atttuple; + + atttuple = SearchSysCacheAttName(relId, attname); + if (!HeapTupleIsValid(atttuple)) + elog(ERROR, "%s: column \"%s\" referenced in foreign key constraint does not exist", + stmtname, attname); + if (attnum >= INDEX_MAX_KEYS) + elog(ERROR, "Can only have %d keys in a foreign key", + INDEX_MAX_KEYS); + attnums[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->attnum; + atttypids[attnum] = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid; + ReleaseSysCache(atttuple); + attnum++; + } + + return attnum; +} + +/* + * transformFkeyGetPrimaryKey - + * + * Look up the names, attnums, and types of the primary key attributes + * for the pkrel. Used when the column list in the REFERENCES specification + * is omitted. + */ +static int +transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, + List **attnamelist, + int16 *attnums, Oid *atttypids) +{ + List *indexoidlist, + *indexoidscan; + HeapTuple indexTuple = NULL; + Form_pg_index indexStruct = NULL; + int i; + + /* + * Get the list of index OIDs for the table from the relcache, and + * look up each one in the pg_index syscache until we find one marked + * primary key (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(pkrel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirsti(indexoidscan); + + indexTuple = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(indexoid), + 0, 0, 0); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "transformFkeyGetPrimaryKey: index %u not found", + indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); + if (indexStruct->indisprimary) + { + *indexOid = indexoid; + break; + } + ReleaseSysCache(indexTuple); + indexStruct = NULL; + } + + freeList(indexoidlist); + + /* + * Check that we found it + */ + if (indexStruct == NULL) + elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found", + RelationGetRelationName(pkrel)); + + /* + * Now build the list of PK attributes from the indkey definition + */ + *attnamelist = NIL; + for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++) + { + int pkattno = indexStruct->indkey[i]; + + attnums[i] = pkattno; + atttypids[i] = attnumTypeId(pkrel, pkattno); + *attnamelist = lappend(*attnamelist, + makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno))))); + } + + ReleaseSysCache(indexTuple); + + return i; +} + +/* + * transformFkeyCheckAttrs - + * + * Make sure that the attributes of a referenced table belong to a unique + * (or primary key) constraint. Return the OID of the index supporting + * the constraint. + */ +static Oid +transformFkeyCheckAttrs(Relation pkrel, + int numattrs, int16 *attnums) +{ + Oid indexoid = InvalidOid; + bool found = false; + List *indexoidlist, + *indexoidscan; + + /* + * Get the list of index OIDs for the table from the relcache, and + * look up each one in the pg_index syscache, and match unique indexes + * to the list of attnums we are given. + */ + indexoidlist = RelationGetIndexList(pkrel); + + foreach(indexoidscan, indexoidlist) + { + HeapTuple indexTuple; + Form_pg_index indexStruct; + int i, j; + + indexoid = lfirsti(indexoidscan); + indexTuple = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(indexoid), + 0, 0, 0); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "transformFkeyCheckAttrs: index %u not found", + indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); + + /* + * Must be unique, not a functional index, and not a partial index + */ + if (indexStruct->indisunique && + indexStruct->indproc == InvalidOid && + VARSIZE(&indexStruct->indpred) <= VARHDRSZ) + { + for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++) + ; + if (i == numattrs) + { + /* + * The given attnum list may match the index columns in any + * order. Check that each list is a subset of the other. + */ + for (i = 0; i < numattrs; i++) + { + found = false; + for (j = 0; j < numattrs; j++) + { + if (attnums[i] == indexStruct->indkey[j]) + { + found = true; + break; + } + } + if (!found) + break; } - default: - elog(ERROR, "ALTER TABLE / ADD CONSTRAINT unable to determine type of constraint passed"); + if (found) + { + for (i = 0; i < numattrs; i++) + { + found = false; + for (j = 0; j < numattrs; j++) + { + if (attnums[j] == indexStruct->indkey[i]) + { + found = true; + break; + } + } + if (!found) + break; + } + } + } } + ReleaseSysCache(indexTuple); + if (found) + break; } - /* Close rel, but keep lock till commit */ - heap_close(rel, NoLock); + if (!found) + elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found", + RelationGetRelationName(pkrel)); + + freeList(indexoidlist); + + return indexoid; } /* @@ -2834,73 +3166,6 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint, pfree(trig.tgargs); } -/* - * Record an FK constraint in pg_constraint. - */ -static Oid -createForeignKeyConstraint(Relation rel, Relation pkrel, - FkConstraint *fkconstraint) -{ - int16 *fkattr; - int16 *pkattr; - int fkcount; - int pkcount; - List *l; - int i; - - /* Convert foreign-key attr names to attr number array */ - fkcount = length(fkconstraint->fk_attrs); - fkattr = (int16 *) palloc(fkcount * sizeof(int16)); - i = 0; - foreach(l, fkconstraint->fk_attrs) - { - char *id = strVal(lfirst(l)); - AttrNumber attno; - - attno = get_attnum(RelationGetRelid(rel), id); - if (attno == InvalidAttrNumber) - elog(ERROR, "Relation \"%s\" has no column \"%s\"", - RelationGetRelationName(rel), id); - fkattr[i++] = attno; - } - - /* The same for the referenced primary key attrs */ - pkcount = length(fkconstraint->pk_attrs); - pkattr = (int16 *) palloc(pkcount * sizeof(int16)); - i = 0; - foreach(l, fkconstraint->pk_attrs) - { - char *id = strVal(lfirst(l)); - AttrNumber attno; - - attno = get_attnum(RelationGetRelid(pkrel), id); - if (attno == InvalidAttrNumber) - elog(ERROR, "Relation \"%s\" has no column \"%s\"", - RelationGetRelationName(pkrel), id); - pkattr[i++] = attno; - } - - /* Now we can make the pg_constraint entry */ - return CreateConstraintEntry(fkconstraint->constr_name, - RelationGetNamespace(rel), - CONSTRAINT_FOREIGN, - fkconstraint->deferrable, - fkconstraint->initdeferred, - RelationGetRelid(rel), - fkattr, - fkcount, - InvalidOid, /* not a domain constraint */ - RelationGetRelid(pkrel), - pkattr, - pkcount, - fkconstraint->fk_upd_action, - fkconstraint->fk_del_action, - fkconstraint->fk_matchtype, - NULL, /* no check constraint */ - NULL, - NULL); -} - /* * Create the triggers that implement an FK constraint. */ diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 663ae22d9429c0990743e8df57f98ff3ebab1b89..fe058125a04a55cd2e222a5cdcf4f9b2cf7d73f1 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/parser/analyze.c,v 1.249 2002/09/18 21:35:21 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/parser/analyze.c,v 1.250 2002/09/22 00:37:09 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -117,10 +117,7 @@ static List *getSetColTypes(ParseState *pstate, Node *node); static void transformForUpdate(Query *qry, List *forUpdate); static void transformConstraintAttrs(List *constraintList); static void transformColumnType(ParseState *pstate, ColumnDef *column); -static void transformFkeyCheckAttrs(FkConstraint *fkconstraint, Oid *pktypoid); -static void transformFkeyGetPrimaryKey(FkConstraint *fkconstraint, Oid *pktypoid); static bool relationHasPrimaryKey(Oid relationOid); -static Oid transformFkeyGetColType(CreateStmtContext *cxt, char *colname); static void release_pstate_resources(ParseState *pstate); static FromExpr *makeFromExpr(List *fromlist, Node *quals); @@ -1301,189 +1298,42 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) static void transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt) { - List *fkactions = NIL; - List *fkclist; - if (cxt->fkconstraints == NIL) return; elog(NOTICE, "%s will create implicit trigger(s) for FOREIGN KEY check(s)", cxt->stmtType); - foreach(fkclist, cxt->fkconstraints) + /* + * For ALTER TABLE ADD CONSTRAINT, nothing to do. For CREATE TABLE, + * gin up an ALTER TABLE ADD CONSTRAINT command to execute after + * the basic CREATE TABLE is complete. + * + * Note: the ADD CONSTRAINT command must also execute after any index + * creation commands. Thus, this should run after + * transformIndexConstraints, so that the CREATE INDEX commands are + * already in cxt->alist. + */ + if (strcmp(cxt->stmtType, "CREATE TABLE") == 0) { - FkConstraint *fkconstraint = (FkConstraint *) lfirst(fkclist); - Oid pktypoid[INDEX_MAX_KEYS]; - Oid fktypoid[INDEX_MAX_KEYS]; - int i; - int attnum; - List *fkattrs; - - for (attnum = 0; attnum < INDEX_MAX_KEYS; attnum++) - pktypoid[attnum] = fktypoid[attnum] = InvalidOid; - - /* - * Look up the referencing attributes to make sure they exist (or - * will exist) in this table, and remember their type OIDs. - */ - attnum = 0; - foreach(fkattrs, fkconstraint->fk_attrs) - { - char *fkattr = strVal(lfirst(fkattrs)); + AlterTableStmt *alterstmt = makeNode(AlterTableStmt); + List *fkclist; - if (attnum >= INDEX_MAX_KEYS) - elog(ERROR, "Can only have %d keys in a foreign key", - INDEX_MAX_KEYS); - fktypoid[attnum++] = transformFkeyGetColType(cxt, fkattr); - } + alterstmt->subtype = 'c'; /* preprocessed add constraint */ + alterstmt->relation = cxt->relation; + alterstmt->name = NULL; + alterstmt->def = (Node *) cxt->fkconstraints; - /* - * If the attribute list for the referenced table was omitted, - * lookup the definition of the primary key. - */ - if (fkconstraint->pk_attrs == NIL) + /* Don't need to scan the table contents in this case */ + foreach(fkclist, cxt->fkconstraints) { - if (strcmp(fkconstraint->pktable->relname, cxt->relation->relname) != 0) - transformFkeyGetPrimaryKey(fkconstraint, pktypoid); - else if (cxt->pkey != NULL) - { - /* Use the to-be-created primary key */ - List *attr; + FkConstraint *fkconstraint = (FkConstraint *) lfirst(fkclist); - attnum = 0; - foreach(attr, cxt->pkey->indexParams) - { - IndexElem *ielem = lfirst(attr); - char *iname = ielem->name; - - Assert(iname); /* no func index here */ - fkconstraint->pk_attrs = lappend(fkconstraint->pk_attrs, - makeString(iname)); - if (attnum >= INDEX_MAX_KEYS) - elog(ERROR, "Can only have %d keys in a foreign key", - INDEX_MAX_KEYS); - pktypoid[attnum++] = transformFkeyGetColType(cxt, - iname); - } - } - else - { - /* In ALTER TABLE case, primary key may already exist */ - if (OidIsValid(cxt->relOid)) - transformFkeyGetPrimaryKey(fkconstraint, pktypoid); - else - elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found", - fkconstraint->pktable->relname); - } - } - else - { - /* Validate the specified referenced key list */ - if (strcmp(fkconstraint->pktable->relname, cxt->relation->relname) != 0) - transformFkeyCheckAttrs(fkconstraint, pktypoid); - else - { - /* Look for a matching new unique/primary constraint */ - List *index; - bool found = false; - - foreach(index, cxt->alist) - { - IndexStmt *ind = lfirst(index); - List *pkattrs; - - if (!ind->unique) - continue; - if (length(ind->indexParams) != - length(fkconstraint->pk_attrs)) - continue; - attnum = 0; - foreach(pkattrs, fkconstraint->pk_attrs) - { - char *pkattr = strVal(lfirst(pkattrs)); - List *indparms; - - found = false; - foreach(indparms, ind->indexParams) - { - IndexElem *indparm = lfirst(indparms); - - if (indparm->name && - strcmp(indparm->name, pkattr) == 0) - { - found = true; - break; - } - } - if (!found) - break; - if (attnum >= INDEX_MAX_KEYS) - elog(ERROR, "Can only have %d keys in a foreign key", - INDEX_MAX_KEYS); - pktypoid[attnum++] = transformFkeyGetColType(cxt, - pkattr); - } - if (found) - break; - } - if (!found) - { - /* - * In ALTER TABLE case, such an index may already - * exist - */ - if (OidIsValid(cxt->relOid)) - transformFkeyCheckAttrs(fkconstraint, pktypoid); - else - elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found", - fkconstraint->pktable->relname); - } - } - } - - /* Be sure referencing and referenced column types are comparable */ - for (i = 0; i < INDEX_MAX_KEYS && fktypoid[i] != 0; i++) - { - /* - * fktypoid[i] is the foreign key table's i'th element's type - * pktypoid[i] is the primary key table's i'th element's type - * - * We let oper() do our work for us, including elog(ERROR) if the - * types don't compare with = - */ - Operator o = oper(makeList1(makeString("=")), - fktypoid[i], pktypoid[i], false); - - ReleaseSysCache(o); - } - - /* - * For ALTER TABLE ADD CONSTRAINT, we're done. For CREATE TABLE, - * gin up an ALTER TABLE ADD CONSTRAINT command to execute after - * the basic CREATE TABLE is complete. - */ - if (strcmp(cxt->stmtType, "CREATE TABLE") == 0) - { - AlterTableStmt *alterstmt = makeNode(AlterTableStmt); - - alterstmt->subtype = 'c'; /* preprocessed add constraint */ - alterstmt->relation = cxt->relation; - alterstmt->name = NULL; - alterstmt->def = (Node *) makeList1(fkconstraint); - - /* Don't need to scan the table contents in this case */ fkconstraint->skip_validation = true; - - fkactions = lappend(fkactions, (Node *) alterstmt); } - } - /* - * Attach completed list of extra actions to cxt->alist. We cannot do - * this earlier, because we assume above that cxt->alist still holds - * only IndexStmts. - */ - cxt->alist = nconc(cxt->alist, fkactions); + cxt->alist = lappend(cxt->alist, (Node *) alterstmt); + } } /* @@ -2375,6 +2225,7 @@ static Query * transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt, List **extras_before, List **extras_after) { + Relation rel; CreateStmtContext cxt; Query *qry; @@ -2382,14 +2233,20 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt, * The only subtypes that currently require parse transformation * handling are 'A'dd column and Add 'C'onstraint. These largely * re-use code from CREATE TABLE. + * + * If we need to do any parse transformation, get exclusive lock on + * the relation to make sure it won't change before we execute the + * command. */ switch (stmt->subtype) { case 'A': + rel = heap_openrv(stmt->relation, AccessExclusiveLock); + cxt.stmtType = "ALTER TABLE"; cxt.relation = stmt->relation; cxt.inhRelations = NIL; - cxt.relOid = RangeVarGetRelid(stmt->relation, false); + cxt.relOid = RelationGetRelid(rel); cxt.hasoids = SearchSysCacheExists(ATTNUM, ObjectIdGetDatum(cxt.relOid), Int16GetDatum(ObjectIdAttributeNumber), @@ -2412,13 +2269,17 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt, ((ColumnDef *) stmt->def)->constraints = cxt.ckconstraints; *extras_before = nconc(*extras_before, cxt.blist); *extras_after = nconc(cxt.alist, *extras_after); + + heap_close(rel, NoLock); /* close rel, keep lock */ break; case 'C': + rel = heap_openrv(stmt->relation, AccessExclusiveLock); + cxt.stmtType = "ALTER TABLE"; cxt.relation = stmt->relation; cxt.inhRelations = NIL; - cxt.relOid = RangeVarGetRelid(stmt->relation, false); + cxt.relOid = RelationGetRelid(rel); cxt.hasoids = SearchSysCacheExists(ATTNUM, ObjectIdGetDatum(cxt.relOid), Int16GetDatum(ObjectIdAttributeNumber), @@ -2446,6 +2307,8 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt, stmt->def = (Node *) nconc(cxt.ckconstraints, cxt.fkconstraints); *extras_before = nconc(*extras_before, cxt.blist); *extras_after = nconc(cxt.alist, *extras_after); + + heap_close(rel, NoLock); /* close rel, keep lock */ break; case 'c': @@ -2674,174 +2537,6 @@ transformForUpdate(Query *qry, List *forUpdate) } -/* - * transformFkeyCheckAttrs - - * - * Make sure that the attributes of a referenced table - * belong to a unique (or primary key) constraint. - */ -static void -transformFkeyCheckAttrs(FkConstraint *fkconstraint, Oid *pktypoid) -{ - Relation pkrel; - List *indexoidlist, - *indexoidscan; - int i; - bool found = false; - - /* - * Open the referenced table - */ - pkrel = heap_openrv(fkconstraint->pktable, AccessShareLock); - - if (pkrel->rd_rel->relkind != RELKIND_RELATION) - elog(ERROR, "Referenced relation \"%s\" is not a table", - fkconstraint->pktable->relname); - - /* - * Get the list of index OIDs for the table from the relcache, and - * look up each one in the pg_index syscache for each unique one, and - * then compare the attributes we were given to those defined. - */ - indexoidlist = RelationGetIndexList(pkrel); - - foreach(indexoidscan, indexoidlist) - { - Oid indexoid = lfirsti(indexoidscan); - HeapTuple indexTuple; - Form_pg_index indexStruct; - - found = false; - indexTuple = SearchSysCache(INDEXRELID, - ObjectIdGetDatum(indexoid), - 0, 0, 0); - if (!HeapTupleIsValid(indexTuple)) - elog(ERROR, "transformFkeyCheckAttrs: index %u not found", - indexoid); - indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - - if (indexStruct->indisunique) - { - for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++) - ; - if (i == length(fkconstraint->pk_attrs)) - { - /* go through the fkconstraint->pk_attrs list */ - List *attrl; - int attnum = 0; - - foreach(attrl, fkconstraint->pk_attrs) - { - char *attrname = strVal(lfirst(attrl)); - - found = false; - for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++) - { - int pkattno = indexStruct->indkey[i]; - - if (namestrcmp(attnumAttName(pkrel, pkattno), - attrname) == 0) - { - pktypoid[attnum++] = attnumTypeId(pkrel, pkattno); - found = true; - break; - } - } - if (!found) - break; - } - } - } - ReleaseSysCache(indexTuple); - if (found) - break; - } - if (!found) - elog(ERROR, "UNIQUE constraint matching given keys for referenced table \"%s\" not found", - fkconstraint->pktable->relname); - - freeList(indexoidlist); - heap_close(pkrel, AccessShareLock); -} - - -/* - * transformFkeyGetPrimaryKey - - * - * Try to find the primary key attributes of a referenced table if - * the column list in the REFERENCES specification was omitted. - */ -static void -transformFkeyGetPrimaryKey(FkConstraint *fkconstraint, Oid *pktypoid) -{ - Relation pkrel; - List *indexoidlist, - *indexoidscan; - HeapTuple indexTuple = NULL; - Form_pg_index indexStruct = NULL; - int i; - int attnum = 0; - - /* - * Open the referenced table - */ - pkrel = heap_openrv(fkconstraint->pktable, AccessShareLock); - - if (pkrel->rd_rel->relkind != RELKIND_RELATION) - elog(ERROR, "Referenced relation \"%s\" is not a table", - fkconstraint->pktable->relname); - - /* - * Get the list of index OIDs for the table from the relcache, and - * look up each one in the pg_index syscache until we find one marked - * primary key (hopefully there isn't more than one such). - */ - indexoidlist = RelationGetIndexList(pkrel); - - foreach(indexoidscan, indexoidlist) - { - Oid indexoid = lfirsti(indexoidscan); - - indexTuple = SearchSysCache(INDEXRELID, - ObjectIdGetDatum(indexoid), - 0, 0, 0); - if (!HeapTupleIsValid(indexTuple)) - elog(ERROR, "transformFkeyGetPrimaryKey: index %u not found", - indexoid); - indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - if (indexStruct->indisprimary) - break; - ReleaseSysCache(indexTuple); - indexStruct = NULL; - } - - freeList(indexoidlist); - - /* - * Check that we found it - */ - if (indexStruct == NULL) - elog(ERROR, "PRIMARY KEY for referenced table \"%s\" not found", - fkconstraint->pktable->relname); - - /* - * Now build the list of PK attributes from the indkey definition - * using the attribute names of the PK relation descriptor - */ - for (i = 0; i < INDEX_MAX_KEYS && indexStruct->indkey[i] != 0; i++) - { - int pkattno = indexStruct->indkey[i]; - - pktypoid[attnum++] = attnumTypeId(pkrel, pkattno); - fkconstraint->pk_attrs = lappend(fkconstraint->pk_attrs, - makeString(pstrdup(NameStr(*attnumAttName(pkrel, pkattno))))); - } - - ReleaseSysCache(indexTuple); - - heap_close(pkrel, AccessShareLock); -} - /* * relationHasPrimaryKey - * @@ -2888,79 +2583,6 @@ relationHasPrimaryKey(Oid relationOid) return result; } -/* - * transformFkeyGetColType - - * - * Find a referencing column by name, and return its type OID. - * Error if it can't be found. - */ -static Oid -transformFkeyGetColType(CreateStmtContext *cxt, char *colname) -{ - List *cols; - List *inher; - Oid result; - Form_pg_attribute sysatt; - - /* First look for column among the newly-created columns */ - foreach(cols, cxt->columns) - { - ColumnDef *col = lfirst(cols); - - if (strcmp(col->colname, colname) == 0) - return typenameTypeId(col->typename); - } - /* Perhaps it's a system column name */ - sysatt = SystemAttributeByName(colname, cxt->hasoids); - if (sysatt) - return sysatt->atttypid; - /* Look for column among inherited columns (if CREATE TABLE case) */ - foreach(inher, cxt->inhRelations) - { - RangeVar *inh = lfirst(inher); - Relation rel; - int count; - - Assert(IsA(inh, RangeVar)); - rel = heap_openrv(inh, AccessShareLock); - if (rel->rd_rel->relkind != RELKIND_RELATION) - elog(ERROR, "inherited table \"%s\" is not a relation", - inh->relname); - for (count = 0; count < rel->rd_att->natts; count++) - { - Form_pg_attribute inhattr = rel->rd_att->attrs[count]; - char *inhname = NameStr(inhattr->attname); - - if (inhattr->attisdropped) - continue; - if (strcmp(inhname, colname) == 0) - { - result = inhattr->atttypid; - heap_close(rel, NoLock); - return result; - } - } - heap_close(rel, NoLock); - } - /* Look for column among existing columns (if ALTER TABLE case) */ - if (OidIsValid(cxt->relOid)) - { - HeapTuple atttuple; - - atttuple = SearchSysCacheAttName(cxt->relOid, colname); - if (HeapTupleIsValid(atttuple)) - { - result = ((Form_pg_attribute) GETSTRUCT(atttuple))->atttypid; - ReleaseSysCache(atttuple); - return result; - } - } - - elog(ERROR, "%s: column \"%s\" referenced in foreign key constraint does not exist", - cxt->stmtType, colname); - return InvalidOid; /* keep compiler quiet */ -} - /* * Preprocess a list of column constraint clauses * to attach constraint attributes to their primary constraint nodes diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index ac7679ba07512b43da1e0b0d26eebe61914a0f55..ffe6a740ca80ba47014a4a548935c5db07c88bb0 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -8,7 +8,7 @@ * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: pg_constraint.h,v 1.3 2002/09/04 20:31:37 momjian Exp $ + * $Id: pg_constraint.h,v 1.4 2002/09/22 00:37:09 tgl Exp $ * * NOTES * the genbki.sh script reads this file and generates .bki @@ -159,6 +159,7 @@ extern Oid CreateConstraintEntry(const char *constraintName, char foreignUpdateType, char foreignDeleteType, char foreignMatchType, + Oid indexRelId, Node *conExpr, const char *conBin, const char *conSrc); diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out index 3cd121ad59629a46999d449176604ab533a2567f..ac4efb8e9ccd5f32889dc73f88f4607aa3db428c 100644 --- a/src/test/regress/expected/alter_table.out +++ b/src/test/regress/expected/alter_table.out @@ -322,7 +322,7 @@ ERROR: ALTER TABLE: column "c" referenced in foreign key constraint does not ex -- Try (and fail) to add constraint due to invalide destination columns explicitly given ALTER TABLE tmp3 add constraint tmpconstr foreign key(a) references tmp2(b) match full; NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s) -ERROR: UNIQUE constraint matching given keys for referenced table "tmp2" not found +ERROR: ALTER TABLE: column "b" referenced in foreign key constraint does not exist -- Try (and fail) to add constraint due to invalid data ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full; NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s) @@ -908,10 +908,10 @@ NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s) ERROR: ALTER TABLE: column "........pg.dropped.1........" referenced in foreign key constraint does not exist alter table atacc2 add foreign key (id) references atacc1(a); NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s) -ERROR: UNIQUE constraint matching given keys for referenced table "atacc1" not found +ERROR: ALTER TABLE: column "a" referenced in foreign key constraint does not exist alter table atacc2 add foreign key (id) references atacc1("........pg.dropped.1........"); NOTICE: ALTER TABLE will create implicit trigger(s) for FOREIGN KEY check(s) -ERROR: UNIQUE constraint matching given keys for referenced table "atacc1" not found +ERROR: ALTER TABLE: column "........pg.dropped.1........" referenced in foreign key constraint does not exist drop table atacc2; create index "testing_idx" on atacc1(a); ERROR: DefineIndex: attribute "a" not found diff --git a/src/test/regress/expected/foreign_key.out b/src/test/regress/expected/foreign_key.out index 6f3b5dd3df0a6fdcc8b197ff53650d9f9083e4d8..a68a406fdacd78678a22bb87cade5f4beca35c6c 100644 --- a/src/test/regress/expected/foreign_key.out +++ b/src/test/regress/expected/foreign_key.out @@ -692,7 +692,7 @@ NOTICE: CREATE TABLE will create implicit trigger(s) for FOREIGN KEY check(s) ERROR: CREATE TABLE: column "ftest2" referenced in foreign key constraint does not exist CREATE TABLE FKTABLE_FAIL2 ( ftest1 int, CONSTRAINT fkfail1 FOREIGN KEY (ftest1) REFERENCES PKTABLE(ptest2)); NOTICE: CREATE TABLE will create implicit trigger(s) for FOREIGN KEY check(s) -ERROR: UNIQUE constraint matching given keys for referenced table "pktable" not found +ERROR: CREATE TABLE: column "ptest2" referenced in foreign key constraint does not exist DROP TABLE FKTABLE_FAIL1; ERROR: table "fktable_fail1" does not exist DROP TABLE FKTABLE_FAIL2; diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out index 5eb078e0c455350970ad242d5d17d2a5015761fc..1050f71f6e48e78ea689a99c91e66d3c06ad724d 100644 --- a/src/test/regress/expected/privileges.out +++ b/src/test/regress/expected/privileges.out @@ -548,7 +548,6 @@ DROP VIEW atestv1; DROP VIEW atestv2; -- this should cascade to drop atestv4 DROP VIEW atestv3 CASCADE; -NOTICE: Drop cascades to rule _RETURN on view atestv3 NOTICE: Drop cascades to rule _RETURN on view atestv4 NOTICE: Drop cascades to view atestv4 -- this should complain "does not exist"