Avoid repeated name lookups during table and index DDL.
authorRobert Haas <[email protected]>
Mon, 17 Feb 2014 14:33:31 +0000 (09:33 -0500)
committerRobert Haas <[email protected]>
Mon, 17 Feb 2014 14:33:38 +0000 (09:33 -0500)
If the name lookups come to different conclusions due to concurrent
activity, we might perform some parts of the DDL on a different table
than other parts.  At least in the case of CREATE INDEX, this can be
used to cause the permissions checks to be performed against a
different table than the index creation, allowing for a privilege
escalation attack.

This changes the calling convention for DefineIndex, CreateTrigger,
transformIndexStmt, transformAlterTableStmt, CheckIndexCompatible
(in 9.2 and newer), and AlterTable (in 9.1 and older).  In addition,
CheckRelationOwnership is removed in 9.2 and newer and the calling
convention is changed in older branches.  A field has also been added
to the Constraint node (FkConstraint in 8.4).  Third-party code calling
these functions or using the Constraint node will require updating.

Report by Andres Freund.  Patch by Robert Haas and Andres Freund,
reviewed by Tom Lane.

Security: CVE-2014-0062

20 files changed:
src/backend/bootstrap/bootparse.y
src/backend/catalog/index.c
src/backend/catalog/pg_constraint.c
src/backend/commands/alter.c
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/commands/trigger.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/parser/parse_utilcmd.c
src/backend/tcop/utility.c
src/include/catalog/index.h
src/include/catalog/pg_constraint.h
src/include/commands/defrem.h
src/include/commands/tablecmds.h
src/include/commands/trigger.h
src/include/nodes/parsenodes.h
src/include/parser/parse_utilcmd.h
src/include/tcop/utility.h

index 1510614d5ade983a36085cd4e41e63b0d6863e04..953b7a498220c9eb117868911c9720242cb99023 100644 (file)
@@ -27,6 +27,7 @@
 #include "bootstrap/bootstrap.h"
 #include "catalog/catalog.h"
 #include "catalog/heap.h"
+#include "catalog/namespace.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_attribute.h"
 #include "catalog/pg_authid.h"
@@ -257,9 +258,14 @@ Boot_InsertStmt:
 Boot_DeclareIndexStmt:
          XDECLARE INDEX boot_ident oidspec ON boot_ident USING boot_ident LPAREN boot_index_params RPAREN
                {
+                   Oid     relationId;
+
                    do_start();
 
-                   DefineIndex(makeRangeVar(NULL, LexIDStr($6), -1),
+                   relationId = RangeVarGetRelid(makeRangeVar(NULL, LexIDStr($6), -1),
+                                                 false);
+
+                   DefineIndex(relationId,
                                LexIDStr($3),
                                $4,
                                LexIDStr($8),
@@ -275,9 +281,14 @@ Boot_DeclareIndexStmt:
 Boot_DeclareUniqueIndexStmt:
          XDECLARE UNIQUE INDEX boot_ident oidspec ON boot_ident USING boot_ident LPAREN boot_index_params RPAREN
                {
+                   Oid     relationId;
+
                    do_start();
 
-                   DefineIndex(makeRangeVar(NULL, LexIDStr($7), -1),
+                   relationId = RangeVarGetRelid(makeRangeVar(NULL, LexIDStr($7), -1),
+                                                 false);
+
+                   DefineIndex(relationId,
                                LexIDStr($4),
                                $5,
                                LexIDStr($9),
index 108d859c0dcce8429c09427415af8b15274b55dc..4d7918d5a039e5072a40d2a281a6ff7a9b8498c3 100644 (file)
@@ -96,7 +96,6 @@ static void validate_index_heapscan(Relation heapRelation,
                        IndexInfo *indexInfo,
                        Snapshot snapshot,
                        v_i_state *state);
-static Oid IndexGetRelation(Oid indexId);
 
 
 /*
@@ -2319,7 +2318,7 @@ index_set_state_flags(Oid indexId, IndexStateFlagsAction action)
  * IndexGetRelation: given an index's relation OID, get the OID of the
  * relation it is an index on. Uses the system cache.
  */
-static Oid
+Oid
 IndexGetRelation(Oid indexId)
 {
    HeapTuple   tuple;
index 9e13872fa839b734ff1cf2387ebb7039aecf8dd5..73b9519b31a98cc6ff08789de2e61028bfa9ab26 100644 (file)
@@ -699,3 +699,22 @@ AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
 
    heap_close(conRel, RowExclusiveLock);
 }
+
+/*
+ * get_constraint_relation_oids
+ *     Find the IDs of the relations to which a constraint refers.
+ */
+void
+get_constraint_relation_oids(Oid constraint_oid, Oid *conrelid, Oid *confrelid)
+{
+   HeapTuple   tup;
+   Form_pg_constraint  con;
+
+   tup = SearchSysCache(CONSTROID, ObjectIdGetDatum(constraint_oid), 0, 0, 0);
+   if (!HeapTupleIsValid(tup)) /* should not happen */
+       elog(ERROR, "cache lookup failed for constraint %u", constraint_oid);
+   con = (Form_pg_constraint) GETSTRUCT(tup);
+   *conrelid = con->conrelid;
+   *confrelid = con->confrelid;
+   ReleaseSysCache(tup);
+}
index b91f2205d1ceed111ce97b43e0cd1729dee53113..ccd5c4a7029b0c872c16dc0594c4131dbe1fd97b 100644 (file)
@@ -91,9 +91,8 @@ ExecRenameStmt(RenameStmt *stmt)
            {
                Oid         relid;
 
-               CheckRelationOwnership(stmt->relation, true);
-
                relid = RangeVarGetRelid(stmt->relation, false);
+               CheckRelationOwnership(relid, true);
 
                switch (stmt->renameType)
                {
@@ -186,7 +185,6 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
        case OBJECT_SEQUENCE:
        case OBJECT_TABLE:
        case OBJECT_VIEW:
-           CheckRelationOwnership(stmt->relation, true);
            AlterTableNamespace(stmt->relation, stmt->newschema,
                                stmt->objectType);
            break;
index 7bd0ccbdc27045c503431b67021f02a1555e4afc..b781d40fcec54be4bb56adfc74f6d34c64281be2 100644 (file)
@@ -72,7 +72,8 @@ static bool relationHasPrimaryKey(Relation rel);
  * DefineIndex
  *     Creates a new index.
  *
- * 'heapRelation': the relation the index will apply to.
+ * 'relationId': the OID of the heap relation on which the index is to be
+ *     created
  * 'indexRelationName': the name for the new index, or NULL to indicate
  *     that a nonconflicting default name should be picked.
  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
@@ -97,7 +98,7 @@ static bool relationHasPrimaryKey(Relation rel);
  * 'concurrent': avoid blocking writers to the table while building.
  */
 void
-DefineIndex(RangeVar *heapRelation,
+DefineIndex(Oid relationId,
            char *indexRelationName,
            Oid indexRelationId,
            char *accessMethodName,
@@ -116,7 +117,6 @@ DefineIndex(RangeVar *heapRelation,
 {
    Oid        *classObjectId;
    Oid         accessMethodId;
-   Oid         relationId;
    Oid         namespaceId;
    Oid         tablespaceId;
    Relation    rel;
@@ -135,6 +135,7 @@ DefineIndex(RangeVar *heapRelation,
    int         n_old_snapshots;
    LockRelId   heaprelid;
    LOCKTAG     heaplocktag;
+   LOCKMODE    lockmode;
    Snapshot    snapshot;
    int         i;
 
@@ -153,14 +154,18 @@ DefineIndex(RangeVar *heapRelation,
                        INDEX_MAX_KEYS)));
 
    /*
-    * Open heap relation, acquire a suitable lock on it, remember its OID
-    *
     * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
     * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
     * (but not VACUUM).
+    *
+    * NB: Caller is responsible for making sure that relationId refers
+    * to the relation on which the index should be built; except in bootstrap
+    * mode, this will typically require the caller to have already locked
+    * the relation.  To avoid lock upgrade hazards, that lock should be at
+    * least as strong as the one we take here.
     */
-   rel = heap_openrv(heapRelation,
-                     (concurrent ? ShareUpdateExclusiveLock : ShareLock));
+   lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
+   rel = heap_open(relationId, lockmode);
 
    relationId = RelationGetRelid(rel);
    namespaceId = RelationGetNamespace(rel);
@@ -171,7 +176,7 @@ DefineIndex(RangeVar *heapRelation,
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("\"%s\" is not a table",
-                       heapRelation->relname)));
+                       RelationGetRelationName(rel))));
 
    /*
     * Don't try to CREATE INDEX on temp tables of other backends.
@@ -537,7 +542,7 @@ DefineIndex(RangeVar *heapRelation,
     */
 
    /* Open and lock the parent heap relation */
-   rel = heap_openrv(heapRelation, ShareUpdateExclusiveLock);
+   rel = heap_open(relationId, ShareUpdateExclusiveLock);
 
    /* And the target index relation */
    indexRelation = index_open(indexRelationId, RowExclusiveLock);
index 541c75ab962f8c371afac675b8a3f0bd82572b42..1edf6c75533a2ef344e5263b0846de6990915109 100644 (file)
@@ -66,6 +66,7 @@
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
+#include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -249,8 +250,8 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
 static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
 static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
                             Relation rel, Relation pkrel, Oid constraintOid);
-static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
-                        Oid constraintOid);
+static void createForeignKeyTriggers(Relation rel, Oid refRelOid,
+                        FkConstraint *fkconstraint, Oid constraintOid);
 static void ATController(Relation rel, List *cmds, bool recurse);
 static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
          bool recurse, bool recursing);
@@ -308,7 +309,8 @@ static void ATPrepAlterColumnType(List **wqueue,
 static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
                      const char *colName, TypeName *typename);
 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
-static void ATPostAlterTypeParse(char *cmd, List **wqueue);
+static void ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd,
+                    List **wqueue);
 static void change_owner_fix_column_acls(Oid relationOid,
                             Oid oldOwnerId, Oid newOwnerId);
 static void change_owner_recurse_to_sequences(Oid relationOid,
@@ -2159,9 +2161,12 @@ CheckTableNotInUse(Relation rel, const char *stmt)
  * the whole operation; we don't have to do anything special to clean up.
  */
 void
-AlterTable(AlterTableStmt *stmt)
+AlterTable(Oid relid, AlterTableStmt *stmt)
 {
-   Relation    rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+   Relation    rel;
+
+   /* Caller is required to provide an adequate lock. */
+   rel = relation_open(relid, NoLock);
 
    CheckTableNotInUse(rel, "ALTER TABLE");
 
@@ -4291,7 +4296,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
 
    /* The IndexStmt has already been through transformIndexStmt */
 
-   DefineIndex(stmt->relation, /* relation */
+   DefineIndex(RelationGetRelid(rel), /* relation */
                stmt->idxname,  /* index name */
                InvalidOid,     /* no predefined OID */
                stmt->accessMethod,     /* am name */
@@ -4540,7 +4545,10 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
     * table; trying to start with a lesser lock will just create a risk of
     * deadlock.)
     */
-   pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
+   if (OidIsValid(fkconstraint->old_pktable_oid))
+       pkrel = heap_open(fkconstraint->old_pktable_oid, AccessExclusiveLock);
+   else
+       pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
 
    /*
     * Validity checks (permission checks wait till we have the column
@@ -4776,7 +4784,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
    /*
     * Create the triggers that will enforce the constraint.
     */
-   createForeignKeyTriggers(rel, fkconstraint, constrOid);
+   createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
+                            constrOid);
 
    /*
     * Tell Phase 3 to check that the constraint is satisfied by existing rows
@@ -5148,14 +5157,14 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint,
 }
 
 static void
-CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
+CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, FkConstraint *fkconstraint,
                     Oid constraintOid, bool on_insert)
 {
    CreateTrigStmt *fk_trigger;
 
    fk_trigger = makeNode(CreateTrigStmt);
    fk_trigger->trigname = fkconstraint->constr_name;
-   fk_trigger->relation = myRel;
+   fk_trigger->relation = NULL;
    fk_trigger->before = false;
    fk_trigger->row = true;
 
@@ -5174,10 +5183,11 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
    fk_trigger->isconstraint = true;
    fk_trigger->deferrable = fkconstraint->deferrable;
    fk_trigger->initdeferred = fkconstraint->initdeferred;
-   fk_trigger->constrrel = fkconstraint->pktable;
+   fk_trigger->constrrel = NULL;
    fk_trigger->args = NIL;
 
-   (void) CreateTrigger(fk_trigger, constraintOid, false);
+   (void) CreateTrigger(fk_trigger, myRelOid, refRelOid,
+                        constraintOid, false);
 
    /* Make changes-so-far visible */
    CommandCounterIncrement();
@@ -5187,18 +5197,13 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint,
  * Create the triggers that implement an FK constraint.
  */
 static void
-createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
-                        Oid constraintOid)
+createForeignKeyTriggers(Relation rel, Oid refRelOid,
+                        FkConstraint *fkconstraint, Oid constraintOid)
 {
-   RangeVar   *myRel;
+   Oid         myRelOid;
    CreateTrigStmt *fk_trigger;
 
-   /*
-    * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
-    */
-   myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
-                        pstrdup(RelationGetRelationName(rel)),
-                        -1);
+   myRelOid = RelationGetRelid(rel);
 
    /* Make changes-so-far visible */
    CommandCounterIncrement();
@@ -5209,12 +5214,12 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
     */
    fk_trigger = makeNode(CreateTrigStmt);
    fk_trigger->trigname = fkconstraint->constr_name;
-   fk_trigger->relation = fkconstraint->pktable;
+   fk_trigger->relation = NULL;
    fk_trigger->before = false;
    fk_trigger->row = true;
    fk_trigger->events = TRIGGER_TYPE_DELETE;
    fk_trigger->isconstraint = true;
-   fk_trigger->constrrel = myRel;
+   fk_trigger->constrrel = NULL;
    switch (fkconstraint->fk_del_action)
    {
        case FKCONSTR_ACTION_NOACTION:
@@ -5249,7 +5254,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
    }
    fk_trigger->args = NIL;
 
-   (void) CreateTrigger(fk_trigger, constraintOid, false);
+   (void) CreateTrigger(fk_trigger, refRelOid, myRelOid,
+                        constraintOid, false);
 
    /* Make changes-so-far visible */
    CommandCounterIncrement();
@@ -5260,12 +5266,12 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
     */
    fk_trigger = makeNode(CreateTrigStmt);
    fk_trigger->trigname = fkconstraint->constr_name;
-   fk_trigger->relation = fkconstraint->pktable;
+   fk_trigger->relation = NULL;
    fk_trigger->before = false;
    fk_trigger->row = true;
    fk_trigger->events = TRIGGER_TYPE_UPDATE;
    fk_trigger->isconstraint = true;
-   fk_trigger->constrrel = myRel;
+   fk_trigger->constrrel = NULL;
    switch (fkconstraint->fk_upd_action)
    {
        case FKCONSTR_ACTION_NOACTION:
@@ -5300,7 +5306,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
    }
    fk_trigger->args = NIL;
 
-   (void) CreateTrigger(fk_trigger, constraintOid, false);
+   (void) CreateTrigger(fk_trigger, refRelOid, myRelOid,
+                        constraintOid, false);
 
    /* Make changes-so-far visible */
    CommandCounterIncrement();
@@ -5320,8 +5327,10 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
     * and the use of self-referential FKs is rare enough, that we live with
     * it for now.  There will be a real fix in PG 9.2.
     */
-   CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, true);
-   CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, false);
+   CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint,
+                        constraintOid, true);
+   CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint,
+                        constraintOid, false);
 }
 
 /*
@@ -6000,7 +6009,8 @@ static void
 ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
 {
    ObjectAddress obj;
-   ListCell   *l;
+   ListCell   *def_item;
+   ListCell   *oid_item;
 
    /*
     * Re-parse the index and constraint definitions, and attach them to the
@@ -6009,11 +6019,36 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
     * lock on the table the constraint is attached to, and we need to get
     * that before dropping.  It's safe because the parser won't actually look
     * at the catalogs to detect the existing entry.
-    */
-   foreach(l, tab->changedIndexDefs)
-       ATPostAlterTypeParse((char *) lfirst(l), wqueue);
-   foreach(l, tab->changedConstraintDefs)
-       ATPostAlterTypeParse((char *) lfirst(l), wqueue);
+    *
+    * We can't rely on the output of deparsing to tell us which relation
+    * to operate on, because concurrent activity might have made the name
+    * resolve differently.  Instead, we've got to use the OID of the
+    * constraint or index we're processing to figure out which relation
+    * to operate on.
+    */
+   forboth(oid_item, tab->changedConstraintOids,
+           def_item, tab->changedConstraintDefs)
+   {
+       Oid     oldId = lfirst_oid(oid_item);
+       Oid     relid;
+       Oid     confrelid;
+
+       get_constraint_relation_oids(oldId, &relid, &confrelid);
+       ATPostAlterTypeParse(relid, confrelid,
+                            (char *) lfirst(def_item),
+                            wqueue);
+   }
+   forboth(oid_item, tab->changedIndexOids,
+           def_item, tab->changedIndexDefs)
+   {
+       Oid     oldId = lfirst_oid(oid_item);
+       Oid     relid;
+
+       relid = IndexGetRelation(oldId);
+       ATPostAlterTypeParse(relid, InvalidOid,
+                            (char *) lfirst(def_item),
+                            wqueue);
+   }
 
    /*
     * Now we can drop the existing constraints and indexes --- constraints
@@ -6023,18 +6058,18 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
     * should be okay to use DROP_RESTRICT here, since nothing else should be
     * depending on these objects.
     */
-   foreach(l, tab->changedConstraintOids)
+   foreach(oid_item, tab->changedConstraintOids)
    {
        obj.classId = ConstraintRelationId;
-       obj.objectId = lfirst_oid(l);
+       obj.objectId = lfirst_oid(oid_item);
        obj.objectSubId = 0;
        performDeletion(&obj, DROP_RESTRICT);
    }
 
-   foreach(l, tab->changedIndexOids)
+   foreach(oid_item, tab->changedIndexOids)
    {
        obj.classId = RelationRelationId;
-       obj.objectId = lfirst_oid(l);
+       obj.objectId = lfirst_oid(oid_item);
        obj.objectSubId = 0;
        performDeletion(&obj, DROP_RESTRICT);
    }
@@ -6046,11 +6081,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
 }
 
 static void
-ATPostAlterTypeParse(char *cmd, List **wqueue)
+ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd, List **wqueue)
 {
    List       *raw_parsetree_list;
    List       *querytree_list;
    ListCell   *list_item;
+   Relation    rel;
 
    /*
     * We expect that we will get only ALTER TABLE and CREATE INDEX
@@ -6066,16 +6102,21 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
 
        if (IsA(stmt, IndexStmt))
            querytree_list = lappend(querytree_list,
-                                    transformIndexStmt((IndexStmt *) stmt,
+                                    transformIndexStmt(oldRelId,
+                                                       (IndexStmt *) stmt,
                                                        cmd));
        else if (IsA(stmt, AlterTableStmt))
            querytree_list = list_concat(querytree_list,
-                            transformAlterTableStmt((AlterTableStmt *) stmt,
+                            transformAlterTableStmt(oldRelId,
+                                                    (AlterTableStmt *) stmt,
                                                     cmd));
        else
            querytree_list = lappend(querytree_list, stmt);
    }
 
+   /* Caller should already have acquired whatever lock we need. */
+   rel = relation_open(oldRelId, NoLock);
+
    /*
     * Attach each generated command to the proper place in the work queue.
     * Note this could result in creation of entirely new work-queue entries.
@@ -6087,7 +6128,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
    foreach(list_item, querytree_list)
    {
        Node       *stm = (Node *) lfirst(list_item);
-       Relation    rel;
        AlteredTableInfo *tab;
 
        switch (nodeTag(stm))
@@ -6097,14 +6137,12 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
                    IndexStmt  *stmt = (IndexStmt *) stm;
                    AlterTableCmd *newcmd;
 
-                   rel = relation_openrv(stmt->relation, AccessExclusiveLock);
                    tab = ATGetQueueEntry(wqueue, rel);
                    newcmd = makeNode(AlterTableCmd);
                    newcmd->subtype = AT_ReAddIndex;
                    newcmd->def = (Node *) stmt;
                    tab->subcmds[AT_PASS_OLD_INDEX] =
                        lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
-                   relation_close(rel, NoLock);
                    break;
                }
            case T_AlterTableStmt:
@@ -6112,7 +6150,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
                    AlterTableStmt *stmt = (AlterTableStmt *) stm;
                    ListCell   *lcmd;
 
-                   rel = relation_openrv(stmt->relation, AccessExclusiveLock);
                    tab = ATGetQueueEntry(wqueue, rel);
                    foreach(lcmd, stmt->cmds)
                    {
@@ -6135,7 +6172,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
                                     (int) cmd->subtype);
                        }
                    }
-                   relation_close(rel, NoLock);
                    break;
                }
            default:
@@ -6143,8 +6179,9 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
                     (int) nodeTag(stm));
        }
    }
-}
 
+   relation_close(rel, NoLock);
+}
 
 /*
  * ALTER TABLE OWNER
@@ -7542,7 +7579,8 @@ ATExecDropInherit(Relation rel, RangeVar *parent)
 /*
  * Execute ALTER TABLE SET SCHEMA
  *
- * Note: caller must have checked ownership of the relation already
+ * WARNING WARNING WARNING: In previous *minor* releases the caller was
+ * responsible for checking ownership of the relation, but now we do it here.
  */
 void
 AlterTableNamespace(RangeVar *relation, const char *newschema,
@@ -7557,6 +7595,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
    rel = relation_openrv(relation, AccessExclusiveLock);
 
    relid = RelationGetRelid(rel);
+   CheckRelationOwnership(relid, true);
    oldNspOid = RelationGetNamespace(rel);
 
    /* Check relation type against type specified in the ALTER command */
index 89ff89a6ba8060a946973b160471eb9951e2d692..a042588a1afca5d9f61b173987fa42ed6c05b903 100644 (file)
@@ -69,6 +69,13 @@ static void AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event,
 /*
  * Create a trigger.  Returns the OID of the created trigger.
  *
+ * relOid, if nonzero, is the relation on which the trigger should be
+ * created.  If zero, the name provided in the statement will be looked up.
+ *
+ * refRelOid, if nonzero, is the relation to which the constraint trigger
+ * refers.  If zero, the constraint relation name provided in the statement
+ * will be looked up as needed.
+ *
  * constraintOid, if nonzero, says that this trigger is being created
  * internally to implement that constraint.  A suitable pg_depend entry will
  * be made to link the trigger to that constraint. constraintOid is zero when
@@ -83,7 +90,8 @@ static void AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event,
  * but a foreign-key constraint.  This is a kluge for backwards compatibility.
  */
 Oid
-CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions)
+CreateTrigger(CreateTrigStmt *stmt, Oid relOid, Oid refRelOid,
+             Oid constraintOid, bool checkPermissions)
 {
    int16       tgtype;
    int2vector *tgattr;
@@ -107,7 +115,10 @@ CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions)
    ObjectAddress myself,
                referenced;
 
-   rel = heap_openrv(stmt->relation, AccessExclusiveLock);
+   if (OidIsValid(relOid))
+       rel = heap_open(relOid, AccessExclusiveLock);
+   else
+       rel = heap_openrv(stmt->relation, AccessExclusiveLock);
 
    if (rel->rd_rel->relkind != RELKIND_RELATION)
        ereport(ERROR,
@@ -121,8 +132,13 @@ CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions)
                 errmsg("permission denied: \"%s\" is a system catalog",
                        RelationGetRelationName(rel))));
 
-   if (stmt->isconstraint && stmt->constrrel != NULL)
-       constrrelid = RangeVarGetRelid(stmt->constrrel, false);
+   if (stmt->isconstraint)
+   {
+       if (OidIsValid(refRelOid))
+           constrrelid = refRelOid;
+       else if (stmt->constrrel != NULL)
+           constrrelid = RangeVarGetRelid(stmt->constrrel, false);
+   }
 
    /* permission checks */
    if (checkPermissions)
@@ -261,7 +277,7 @@ CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                  errmsg("trigger \"%s\" for relation \"%s\" already exists",
-                        trigname, stmt->relation->relname)));
+                        trigname, RelationGetRelationName(rel))));
    }
    systable_endscan(tgscan);
 
index d999a25f865c89d84559e766f4bf13fa8995ce7b..fdb30f77e950cabc854f085c933357bf1084b659 100644 (file)
@@ -1756,6 +1756,7 @@ _copyFkConstraint(FkConstraint *from)
    COPY_SCALAR_FIELD(deferrable);
    COPY_SCALAR_FIELD(initdeferred);
    COPY_SCALAR_FIELD(skip_validation);
+   COPY_SCALAR_FIELD(old_pktable_oid);
 
    return newnode;
 }
index 5d1a19f7a61463847184a87544c8bade93e73a36..445a59f731d9f48ebf1ac4d51faedb9e6579b1ea 100644 (file)
@@ -2199,6 +2199,7 @@ _equalFkConstraint(FkConstraint *a, FkConstraint *b)
    COMPARE_SCALAR_FIELD(deferrable);
    COMPARE_SCALAR_FIELD(initdeferred);
    COMPARE_SCALAR_FIELD(skip_validation);
+   COMPARE_SCALAR_FIELD(old_pktable_oid);
 
    return true;
 }
index f3a8e76e47612bada6c25e160740337b55114b67..c8c4409fb95fa7a412af61b02d7488cd55573103 100644 (file)
@@ -2345,6 +2345,7 @@ _outFkConstraint(StringInfo str, FkConstraint *node)
    WRITE_BOOL_FIELD(deferrable);
    WRITE_BOOL_FIELD(initdeferred);
    WRITE_BOOL_FIELD(skip_validation);
+   WRITE_OID_FIELD(old_pktable_oid);
 }
 
 
index 0dda5bc27cd084a6ddc3e6020d0cd883fb495123..71281f0e2fd985c390dd4bca8a14b053ac3e8298 100644 (file)
@@ -1356,14 +1356,18 @@ transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt,
  * a predicate expression. There are several code paths that create indexes
  * without bothering to call this, because they know they don't have any
  * such expressions to deal with.
+ *
+ * To avoid race conditions, it's important that this function rely only on
+ * the passed-in relid (and not on stmt->relation) to determine the target
+ * relation.
  */
 IndexStmt *
-transformIndexStmt(IndexStmt *stmt, const char *queryString)
+transformIndexStmt(Oid relid, IndexStmt *stmt, const char *queryString)
 {
-   Relation    rel;
    ParseState *pstate;
    RangeTblEntry *rte;
    ListCell   *l;
+   Relation    rel;
 
    /*
     * We must not scribble on the passed-in IndexStmt, so copy it.  (This is
@@ -1371,25 +1375,17 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString)
     */
    stmt = (IndexStmt *) copyObject(stmt);
 
-   /*
-    * Open the parent table with appropriate locking.  We must do this
-    * because addRangeTableEntry() would acquire only AccessShareLock,
-    * leaving DefineIndex() needing to do a lock upgrade with consequent risk
-    * of deadlock.  Make sure this stays in sync with the type of lock
-    * DefineIndex() wants.
-    */
-   rel = heap_openrv(stmt->relation,
-                 (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
-
    /* Set up pstate */
    pstate = make_parsestate(NULL);
    pstate->p_sourcetext = queryString;
 
    /*
     * Put the parent table into the rtable so that the expressions can refer
-    * to its fields without qualification.
+    * to its fields without qualification.  Caller is responsible for locking
+    * relation, but we still need to open it.
     */
-   rte = addRangeTableEntry(pstate, stmt->relation, NULL, false, true);
+   rel = relation_open(relid, NoLock);
+   rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, true);
 
    /* no to join list, yes to namespaces */
    addRTEtoQuery(pstate, rte, false, true, true);
@@ -1431,7 +1427,7 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString)
 
    free_parsestate(pstate);
 
-   /* Close relation, but keep the lock */
+   /* Close relation */
    heap_close(rel, NoLock);
 
    return stmt;
@@ -1722,9 +1718,14 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString,
  * Returns a List of utility commands to be done in sequence.  One of these
  * will be the transformed AlterTableStmt, but there may be additional actions
  * to be done before and after the actual AlterTable() call.
+ *
+ * To avoid race conditions, it's important that this function rely only on
+ * the passed-in relid (and not on stmt->relation) to determine the target
+ * relation.
  */
 List *
-transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
+transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
+                       const char *queryString)
 {
    Relation    rel;
    ParseState *pstate;
@@ -1743,14 +1744,8 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
     */
    stmt = (AlterTableStmt *) copyObject(stmt);
 
-   /*
-    * Acquire exclusive lock on the target relation, which will be held until
-    * end of transaction.  This ensures any decisions we make here based on
-    * the state of the relation will still be good at execution. We must get
-    * exclusive lock now because execution will; taking a lower grade lock
-    * now and trying to upgrade later risks deadlock.
-    */
-   rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+   /* Caller is responsible for locking the relation */
+   rel = relation_open(relid, NoLock);
 
    /* Set up pstate */
    pstate = make_parsestate(NULL);
@@ -1866,7 +1861,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
        Assert(IsA(idxstmt, IndexStmt));
        newcmd = makeNode(AlterTableCmd);
        newcmd->subtype = AT_AddIndex;
-       newcmd->def = (Node *) transformIndexStmt((IndexStmt *) idxstmt,
+       newcmd->def = (Node *) transformIndexStmt(relid, (IndexStmt *) idxstmt,
                                                  queryString);
        newcmds = lappend(newcmds, newcmd);
    }
@@ -1888,7 +1883,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
        newcmds = lappend(newcmds, newcmd);
    }
 
-   /* Close rel but keep lock */
+   /* Close rel */
    relation_close(rel, NoLock);
 
    /*
index bd91162cec38143ea9e27fd8f20dab88af00d3f9..ef1792a30ac25405b459a0cfeb77851c83fe8f38 100644 (file)
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteRemove.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/guc.h"
 #include "utils/syscache.h"
+#include "utils/lsyscache.h"
 
 
 /*
  * except when allowSystemTableMods is true.
  */
 void
-CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
+CheckRelationOwnership(Oid relOid, bool noCatalogs)
 {
-   Oid         relOid;
    HeapTuple   tuple;
 
-   relOid = RangeVarGetRelid(rel, false);
    tuple = SearchSysCache(RELOID,
                           ObjectIdGetDatum(relOid),
                           0, 0, 0);
@@ -79,7 +79,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
 
    if (!pg_class_ownercheck(relOid, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
-                      rel->relname);
+                      get_rel_name(relOid));
 
    if (noCatalogs)
    {
@@ -88,7 +88,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs)
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied: \"%s\" is a system catalog",
-                           rel->relname)));
+                           get_rel_name(relOid))));
    }
 
    ReleaseSysCache(tuple);
@@ -638,9 +638,21 @@ ProcessUtility(Node *parsetree,
            {
                List       *stmts;
                ListCell   *l;
+               AlterTableStmt *atstmt = (AlterTableStmt *) parsetree;
+               Oid         relid;
+
+               /*
+                * Look up the relation OID just once, right here at the
+                * beginning, so that we don't end up repeating the name
+                * lookup later and latching onto a different relation
+                * partway through.
+                */
+               relid = RangeVarGetRelid(atstmt->relation, false);
+               LockRelationOid(relid, AccessExclusiveLock);
 
                /* Run parse analysis ... */
-               stmts = transformAlterTableStmt((AlterTableStmt *) parsetree,
+               stmts = transformAlterTableStmt(relid,
+                                               atstmt,
                                                queryString);
 
                /* ... and do it */
@@ -651,7 +663,7 @@ ProcessUtility(Node *parsetree,
                    if (IsA(stmt, AlterTableStmt))
                    {
                        /* Do the table alteration proper */
-                       AlterTable((AlterTableStmt *) stmt);
+                       AlterTable(relid, (AlterTableStmt *) stmt);
                    }
                    else
                    {
@@ -795,18 +807,33 @@ ProcessUtility(Node *parsetree,
        case T_IndexStmt:       /* CREATE INDEX */
            {
                IndexStmt  *stmt = (IndexStmt *) parsetree;
+               Oid         relid;
+               LOCKMODE    lockmode;
 
                if (stmt->concurrent)
                    PreventTransactionChain(isTopLevel,
                                            "CREATE INDEX CONCURRENTLY");
 
-               CheckRelationOwnership(stmt->relation, true);
+               /*
+                * Look up the relation OID just once, right here at the
+                * beginning, so that we don't end up repeating the name
+                * lookup later and latching onto a different relation
+                * partway through.  To avoid lock upgrade hazards, it's
+                * important that we take the strongest lock that will
+                * eventually be needed here, so the lockmode calculation
+                * needs to match what DefineIndex() does.
+                */
+               lockmode = stmt->concurrent ? ShareUpdateExclusiveLock
+                   : ShareLock;
+               relid = RangeVarGetRelid(stmt->relation, false);
+               LockRelationOid(relid, lockmode);
+               CheckRelationOwnership(relid, true);
 
                /* Run parse analysis ... */
-               stmt = transformIndexStmt(stmt, queryString);
+               stmt = transformIndexStmt(relid, stmt, queryString);
 
                /* ... and do it */
-               DefineIndex(stmt->relation,     /* relation */
+               DefineIndex(relid,      /* relation */
                            stmt->idxname,      /* index name */
                            InvalidOid, /* no predefined OID */
                            stmt->accessMethod, /* am name */
@@ -954,7 +981,8 @@ ProcessUtility(Node *parsetree,
            break;
 
        case T_CreateTrigStmt:
-           CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, true);
+           CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, InvalidOid,
+                         InvalidOid, true);
            break;
 
        case T_DropPropertyStmt:
index a2c3fc04b32cb3fdc21a102eeb05ac425d150f69..f441352ece17995b10fc860e40627af6585cacb6 100644 (file)
@@ -82,4 +82,6 @@ extern void index_set_state_flags(Oid indexId, IndexStateFlagsAction action);
 extern void reindex_index(Oid indexId);
 extern bool reindex_relation(Oid relid, bool toast_too);
 
+extern Oid IndexGetRelation(Oid indexId);
+
 #endif   /* INDEX_H */
index edfa864dc2b481884d501d3a07476d2dfde3cdc6..69979cef21b814ccb15a56a7ab5f45771e2ccbee 100644 (file)
@@ -215,5 +215,6 @@ extern char *ChooseConstraintName(const char *name1, const char *name2,
 
 extern void AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
                          Oid newNspId, bool isType);
+extern void get_constraint_relation_oids(Oid constraint_oid, Oid *conrelid, Oid *confrelid);
 
 #endif   /* PG_CONSTRAINT_H */
index b326dc2a4070da7d1b660bee93c169f3e1854466..9b7936d1d90a8c03da663d6ba275848ae6fc83c6 100644 (file)
@@ -18,7 +18,7 @@
 
 
 /* commands/indexcmds.c */
-extern void DefineIndex(RangeVar *heapRelation,
+extern void DefineIndex(Oid relationId,
            char *indexRelationName,
            Oid indexRelationId,
            char *accessMethodName,
index 706947e097a012b064ffa16196b47487f4a9fb02..2d934db2c9f0a519c5455affcbd9607adffc87f2 100644 (file)
@@ -22,7 +22,7 @@ extern Oid    DefineRelation(CreateStmt *stmt, char relkind);
 
 extern void RemoveRelations(DropStmt *drop);
 
-extern void AlterTable(AlterTableStmt *stmt);
+extern void AlterTable(Oid relid, AlterTableStmt *stmt);
 
 extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing);
 
index d24ba792b00a5bd60964c5942e1231da621c9cd1..5c68565b4ff845d78e9a7f03afb9fa35acf3cdea 100644 (file)
@@ -104,8 +104,8 @@ extern PGDLLIMPORT int SessionReplicationRole;
 #define TRIGGER_FIRES_ON_REPLICA           'R'
 #define TRIGGER_DISABLED                   'D'
 
-extern Oid CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid,
-             bool checkPermissions);
+extern Oid CreateTrigger(CreateTrigStmt *stmt, Oid relOid, Oid refRelOid,
+             Oid constraintOid, bool checkPermissions);
 
 extern void DropTrigger(Oid relid, const char *trigname,
            DropBehavior behavior, bool missing_ok);
index f23c9c54ae5c5950e09766fb8dc9ca36af4f00f3..9d31bdbb27dab2a896c749d094e852ce2bdef1b4 100644 (file)
@@ -1430,6 +1430,7 @@ typedef struct FkConstraint
    bool        deferrable;     /* DEFERRABLE */
    bool        initdeferred;   /* INITIALLY DEFERRED */
    bool        skip_validation;    /* skip validation of existing rows? */
+   Oid         old_pktable_oid; /* pg_constraint.confrelid of my former self */
 } FkConstraint;
 
 
index 089c907c0e65d537f836b59221a70f867fd72f8e..ec9e6d5623c96ffe581300dccf5ff0a6690a6d38 100644 (file)
 
 
 extern List *transformCreateStmt(CreateStmt *stmt, const char *queryString);
-extern List *transformAlterTableStmt(AlterTableStmt *stmt,
+extern List *transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
                        const char *queryString);
-extern IndexStmt *transformIndexStmt(IndexStmt *stmt, const char *queryString);
+extern IndexStmt *transformIndexStmt(Oid relid, IndexStmt *stmt,
+                  const char *queryString);
 extern void transformRuleStmt(RuleStmt *stmt, const char *queryString,
                  List **actions, Node **whereClause);
 extern List *transformCreateSchemaStmt(CreateSchemaStmt *stmt);
index d609b0ac4e95975b8f220af1ba47bde3c8327b33..7a12dd8084b71afb10c9bc8a64e91893695d3a36 100644 (file)
@@ -31,6 +31,6 @@ extern LogStmtLevel GetCommandLogLevel(Node *parsetree);
 
 extern bool CommandIsReadOnly(Node *parsetree);
 
-extern void CheckRelationOwnership(RangeVar *rel, bool noCatalogs);
+extern void CheckRelationOwnership(Oid relOid, bool noCatalogs);
 
 #endif   /* UTILITY_H */