Refactor broken CREATE TABLE IF NOT EXISTS support.
authorRobert Haas <[email protected]>
Mon, 25 Apr 2011 20:55:11 +0000 (16:55 -0400)
committerRobert Haas <[email protected]>
Mon, 25 Apr 2011 20:55:11 +0000 (16:55 -0400)
Per bug #5988, reported by Marko Tiikkaja, and further analyzed by Tom
Lane, the previous coding was broken in several respects: even if the
target table already existed, a subsequent CREATE TABLE IF NOT EXISTS
might try to add additional constraints or sequences-for-serial
specified in the new CREATE TABLE statement.

In passing, this also fixes a minor information leak: it's no longer
possible to figure out whether a schema to which you don't have CREATE
access contains a sequence named like "x_y_seq" by attempting to create a
table in that schema called "x" with a serial column called "y".

Some more refactoring of this code in the future might be warranted,
but that will need to wait for a later major release.

src/backend/bootstrap/bootparse.y
src/backend/catalog/heap.c
src/backend/catalog/namespace.c
src/backend/catalog/toasting.c
src/backend/commands/cluster.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/parser/parse_utilcmd.c
src/backend/tcop/utility.c
src/include/catalog/heap.h
src/include/catalog/namespace.h

index 75a137bbd7732692d40afc9e8c411c3508d47e12..a9d242869897d1d23fb5c349219466dfd4b422d4 100644 (file)
@@ -247,8 +247,7 @@ Boot_CreateStmt:
                                                                                                          ONCOMMIT_NOOP,
                                                                                                          (Datum) 0,
                                                                                                          false,
-                                                                                                         true,
-                                                                                                         false);
+                                                                                                         true);
                                                elog(DEBUG4, "relation created with oid %u", id);
                                        }
                                        do_end();
index 4c089677030d391f5a0e8e8ede24566e558359de..71c99318343c4c8f80b03ad237b7943b872769a8 100644 (file)
@@ -973,8 +973,7 @@ heap_create_with_catalog(const char *relname,
                                                 OnCommitAction oncommit,
                                                 Datum reloptions,
                                                 bool use_user_acl,
-                                                bool allow_system_table_mods,
-                                                bool if_not_exists)
+                                                bool allow_system_table_mods)
 {
        Relation        pg_class_desc;
        Relation        new_rel_desc;
@@ -994,26 +993,14 @@ heap_create_with_catalog(const char *relname,
        CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
 
        /*
-        * If the relation already exists, it's an error, unless the user
-        * specifies "IF NOT EXISTS".  In that case, we just print a notice and do
-        * nothing further.
+        * This would fail later on anyway, if the relation already exists.  But
+        * by catching it here we can emit a nicer error message.
         */
        existing_relid = get_relname_relid(relname, relnamespace);
        if (existing_relid != InvalidOid)
-       {
-               if (if_not_exists)
-               {
-                       ereport(NOTICE,
-                                       (errcode(ERRCODE_DUPLICATE_TABLE),
-                                        errmsg("relation \"%s\" already exists, skipping",
-                                                       relname)));
-                       heap_close(pg_class_desc, RowExclusiveLock);
-                       return InvalidOid;
-               }
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_TABLE),
                                 errmsg("relation \"%s\" already exists", relname)));
-       }
 
        /*
         * Since we are going to create a rowtype as well, also check for
index f8fd8276936a687e97655147845e4d4f9fa2f495..d803d28a0679194197cc3f8267b12559f7ec5612 100644 (file)
@@ -360,6 +360,35 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
        return namespaceId;
 }
 
+/*
+ * RangeVarGetAndCheckCreationNamespace
+ *      As RangeVarGetCreationNamespace, but with a permissions check.
+ */
+Oid
+RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
+{
+       Oid                     namespaceId;
+
+       namespaceId = RangeVarGetCreationNamespace(newRelation);
+
+       /*
+        * Check we have permission to create there. Skip check if bootstrapping,
+        * since permissions machinery may not be working yet.
+        */
+       if (!IsBootstrapProcessingMode())
+       {
+               AclResult       aclresult;
+
+               aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
+                                                                                 ACL_CREATE);
+               if (aclresult != ACLCHECK_OK)
+                       aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+                                                  get_namespace_name(namespaceId));
+       }
+
+       return namespaceId;
+}
+
 /*
  * RelnameGetRelid
  *             Try to resolve an unqualified relation name.
index 362d26d9d1fe73f5d2519c151963053a75c4e9fa..a8cf0dbe2f25896ad4f1663ed1bd99997f3de68e 100644 (file)
@@ -227,8 +227,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
                                                                                   ONCOMMIT_NOOP,
                                                                                   reloptions,
                                                                                   false,
-                                                                                  true,
-                                                                                  false);
+                                                                                  true);
        Assert(toast_relid != InvalidOid);
 
        /* make the toast relation visible, else heap_open will fail */
index ff228b7d5310b958dd16e4d173a7fc6712280eaf..191ef543cd20f3dcf1633d67a3ba4425bb889438 100644 (file)
@@ -646,8 +646,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
                                                                                  ONCOMMIT_NOOP,
                                                                                  reloptions,
                                                                                  false,
-                                                                                 true,
-                                                                                 false);
+                                                                                 true);
        Assert(OIDNewHeap != InvalidOid);
 
        ReleaseSysCache(tuple);
index 7660114ec2ce41b050c7cbe2aa44fc0c99abfa1d..60eecb14976db56508b9d3c850d1879b47487d8f 100644 (file)
@@ -439,22 +439,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
                                 errmsg("cannot create temporary table within security-restricted operation")));
 
        /*
-        * Look up the namespace in which we are supposed to create the relation.
-        * Check we have permission to create there. Skip check if bootstrapping,
-        * since permissions machinery may not be working yet.
+        * Look up the namespace in which we are supposed to create the relation,
+        * and check we have permission to create there.
         */
-       namespaceId = RangeVarGetCreationNamespace(stmt->relation);
-
-       if (!IsBootstrapProcessingMode())
-       {
-               AclResult       aclresult;
-
-               aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
-                                                                                 ACL_CREATE);
-               if (aclresult != ACLCHECK_OK)
-                       aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
-                                                  get_namespace_name(namespaceId));
-       }
+       namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
 
        /*
         * Select tablespace to use.  If not specified, use default tablespace
@@ -602,16 +590,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
                                                                                  stmt->oncommit,
                                                                                  reloptions,
                                                                                  true,
-                                                                                 allowSystemTableMods,
-                                                                                 stmt->if_not_exists);
-
-       /*
-        * If heap_create_with_catalog returns InvalidOid, it means that the user
-        * specified "IF NOT EXISTS" and the relation already exists.  In that
-        * case we do nothing further.
-        */
-       if (relationId == InvalidOid)
-               return InvalidOid;
+                                                                                 allowSystemTableMods);
 
        /* Store inheritance information for new rel. */
        StoreCatalogInheritance(relationId, inheritOids);
index 86ec9870198af543e8d45b90e92357c908d2f22c..620efda838421a94a9835a7e8db1d8fa4b50e1ea 100644 (file)
@@ -2341,7 +2341,6 @@ OpenIntoRel(QueryDesc *queryDesc)
        Oid                     namespaceId;
        Oid                     tablespaceId;
        Datum           reloptions;
-       AclResult       aclresult;
        Oid                     intoRelationId;
        TupleDesc       tupdesc;
        DR_intorel *myState;
@@ -2378,13 +2377,7 @@ OpenIntoRel(QueryDesc *queryDesc)
         * Find namespace to create in, check its permissions
         */
        intoName = into->rel->relname;
-       namespaceId = RangeVarGetCreationNamespace(into->rel);
-
-       aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
-                                                                         ACL_CREATE);
-       if (aclresult != ACLCHECK_OK)
-               aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
-                                          get_namespace_name(namespaceId));
+       namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
 
        /*
         * Select tablespace to use.  If not specified, use default tablespace
@@ -2444,8 +2437,7 @@ OpenIntoRel(QueryDesc *queryDesc)
                                                                                          into->onCommit,
                                                                                          reloptions,
                                                                                          true,
-                                                                                         allowSystemTableMods,
-                                                                                         false);
+                                                                                         allowSystemTableMods);
        Assert(intoRelationId != InvalidOid);
 
        FreeTupleDesc(tupdesc);
index 0078814905d12ed8638af2d39d18274dba3edbba..5588cfac0bd54a8a8d45ec49569060fa5570a359 100644 (file)
@@ -148,6 +148,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
        List       *result;
        List       *save_alist;
        ListCell   *elements;
+       Oid                     namespaceid;
 
        /*
         * We must not scribble on the passed-in CreateStmt, so copy it.  (This is
@@ -155,6 +156,33 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
         */
        stmt = (CreateStmt *) copyObject(stmt);
 
+       /*
+        * Look up the creation namespace.  This also checks permissions on the
+        * target namespace, so that we throw any permissions error as early as
+        * possible.
+        */
+       namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
+
+       /*
+        * If the relation already exists and the user specified "IF NOT EXISTS",
+        * bail out with a NOTICE.
+        */
+       if (stmt->if_not_exists)
+       {
+               Oid             existing_relid;
+
+               existing_relid = get_relname_relid(stmt->relation->relname,
+                                                                                  namespaceid);
+               if (existing_relid != InvalidOid)
+               {
+                       ereport(NOTICE,
+                                       (errcode(ERRCODE_DUPLICATE_TABLE),
+                                        errmsg("relation \"%s\" already exists, skipping",
+                                                stmt->relation->relname)));
+                       return NIL;
+               }
+       }
+
        /*
         * If the target relation name isn't schema-qualified, make it so.  This
         * prevents some corner cases in which added-on rewritten commands might
@@ -164,11 +192,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
         */
        if (stmt->relation->schemaname == NULL
                && stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
-       {
-               Oid                     namespaceid = RangeVarGetCreationNamespace(stmt->relation);
-
                stmt->relation->schemaname = get_namespace_name(namespaceid);
-       }
 
        /* Set up pstate and CreateStmtContext */
        pstate = make_parsestate(NULL);
index b1fd5ec04fe6a6506cb3632b3e01e1686e8d6dc1..0559998c71f78fcb763fd6f55ecf9d9a61d7d913 100644 (file)
@@ -529,13 +529,6 @@ standard_ProcessUtility(Node *parsetree,
                                                                                                RELKIND_RELATION,
                                                                                                InvalidOid);
 
-                                               /*
-                                                * If "IF NOT EXISTS" was specified and the relation
-                                                * already exists, do nothing further.
-                                                */
-                                               if (relOid == InvalidOid)
-                                                       continue;
-
                                                /*
                                                 * Let AlterTableCreateToastTable decide if this one
                                                 * needs a secondary relation too.
@@ -559,15 +552,8 @@ standard_ProcessUtility(Node *parsetree,
                                                relOid = DefineRelation((CreateStmt *) stmt,
                                                                                                RELKIND_FOREIGN_TABLE,
                                                                                                InvalidOid);
-
-                                               /*
-                                                * Unless "IF NOT EXISTS" was specified and the
-                                                * relation already exists, create the
-                                                * pg_foreign_table entry.
-                                                */
-                                               if (relOid != InvalidOid)
-                                                       CreateForeignTable((CreateForeignTableStmt *) stmt,
-                                                                                          relOid);
+                                               CreateForeignTable((CreateForeignTableStmt *) stmt,
+                                                                                  relOid);
                                        }
                                        else
                                        {
index 463aff0358fcc7366bf34c7222c2ec9b164f9c97..c95e91303b8873633a9460e9b85ce24df21b45b1 100644 (file)
@@ -63,8 +63,7 @@ extern Oid heap_create_with_catalog(const char *relname,
                                                 OnCommitAction oncommit,
                                                 Datum reloptions,
                                                 bool use_user_acl,
-                                                bool allow_system_table_mods,
-                                                bool if_not_exists);
+                                                bool allow_system_table_mods);
 
 extern void heap_drop_with_catalog(Oid relid);
 
index f59beee80dd77d46df81a5dfe3d20e01cd0a24b0..53600969ad7e29ef5875ff3c1feb3edd59e2c822 100644 (file)
@@ -49,6 +49,7 @@ typedef struct OverrideSearchPath
 
 extern Oid     RangeVarGetRelid(const RangeVar *relation, bool failOK);
 extern Oid     RangeVarGetCreationNamespace(const RangeVar *newRelation);
+extern Oid     RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
 extern Oid     RelnameGetRelid(const char *relname);
 extern bool RelationIsVisible(Oid relid);