Fix assorted bugs related to identity column in partitioned tables
authorPeter Eisentraut <[email protected]>
Tue, 7 May 2024 20:42:32 +0000 (22:42 +0200)
committerPeter Eisentraut <[email protected]>
Tue, 7 May 2024 20:50:00 +0000 (22:50 +0200)
When changing the data type of a column of a partitioned table, craft
the ALTER SEQUENCE command only once.  Partitions do not have identity
sequences of their own and thus do not need a ALTER SEQUENCE command
for each partition.

Fix getIdentitySequence() to fetch the identity sequence associated
with the top-level partitioned table when a Relation of a partition is
passed to it.  While doing so, translate the attribute number of the
partition into the attribute number of the partitioned table.

Author: Ashutosh Bapat <[email protected]>
Reported-by: Alexander Lakhin <[email protected]>
Reviewed-by: Dmitry Dolgov <[email protected]>
Discussion: https://www.postgresql.org/message-id/3b8a9dc1-bbc7-0ef5-6863-c432afac7d59@gmail.com

src/backend/catalog/pg_depend.c
src/backend/commands/tablecmds.c
src/backend/parser/parse_utilcmd.c
src/backend/rewrite/rewriteHandler.c
src/include/catalog/dependency.h
src/test/regress/expected/identity.out
src/test/regress/sql/identity.sql

index f85a898de8d893039b07010180c55e887b1f1878..5366f7820c12ef1c6928b98ffaa249389dcb3b2b 100644 (file)
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_depend.h"
 #include "catalog/pg_extension.h"
+#include "catalog/partition.h"
 #include "commands/extension.h"
 #include "miscadmin.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 #include "utils/rel.h"
 
 
@@ -941,10 +943,35 @@ getOwnedSequences(Oid relid)
  * Get owned identity sequence, error if not exactly one.
  */
 Oid
-getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok)
+getIdentitySequence(Relation rel, AttrNumber attnum, bool missing_ok)
 {
-   List       *seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
+   Oid         relid;
+   List       *seqlist;
 
+   /*
+    * The identity sequence is associated with the topmost partitioned table,
+    * which might have column order different than the given partition.
+    */
+   if (RelationGetForm(rel)->relispartition)
+   {
+       List       *ancestors =
+           get_partition_ancestors(RelationGetRelid(rel));
+       HeapTuple   ctup = SearchSysCacheAttNum(RelationGetRelid(rel), attnum);
+       const char *attname = NameStr(((Form_pg_attribute) GETSTRUCT(ctup))->attname);
+       HeapTuple   ptup;
+
+       relid = llast_oid(ancestors);
+       ptup = SearchSysCacheAttName(relid, attname);
+       attnum = ((Form_pg_attribute) GETSTRUCT(ptup))->attnum;
+
+       ReleaseSysCache(ctup);
+       ReleaseSysCache(ptup);
+       list_free(ancestors);
+   }
+   else
+       relid = RelationGetRelid(rel);
+
+   seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
    if (list_length(seqlist) > 1)
        elog(ERROR, "more than one owned sequence found");
    else if (seqlist == NIL)
index a79ac884f7cbcfcaa9a90c144790aa70977c48d2..5bf5e69c5b8a8750f352f07a1e9c2247997ac94d 100644 (file)
@@ -8535,7 +8535,7 @@ ATExecDropIdentity(Relation rel, const char *colName, bool missing_ok, LOCKMODE
    if (!recursing)
    {
        /* drop the internal sequence */
-       seqid = getIdentitySequence(RelationGetRelid(rel), attnum, false);
+       seqid = getIdentitySequence(rel, attnum, false);
        deleteDependencyRecordsForClass(RelationRelationId, seqid,
                                        RelationRelationId, DEPENDENCY_INTERNAL);
        CommandCounterIncrement();
index 9fb6ff86db55c69b89255be995158bcbec5b1d26..6520bf9baa550c585fc30102c364adfb76bec494 100644 (file)
@@ -1136,7 +1136,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
             * find sequence owned by old column; extract sequence parameters;
             * build new create sequence command
             */
-           seq_relid = getIdentitySequence(RelationGetRelid(relation), attribute->attnum, false);
+           seq_relid = getIdentitySequence(relation, attribute->attnum, false);
            seq_options = sequence_options(seq_relid);
            generateSerialExtraStmts(cxt, def,
                                     InvalidOid, seq_options,
@@ -3716,28 +3716,36 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
 
                    /*
                     * For identity column, create ALTER SEQUENCE command to
-                    * change the data type of the sequence.
+                    * change the data type of the sequence. Identity sequence
+                    * is associated with the top level partitioned table.
+                    * Hence ignore partitions.
                     */
-                   attnum = get_attnum(relid, cmd->name);
-                   if (attnum == InvalidAttrNumber)
-                       ereport(ERROR,
-                               (errcode(ERRCODE_UNDEFINED_COLUMN),
-                                errmsg("column \"%s\" of relation \"%s\" does not exist",
-                                       cmd->name, RelationGetRelationName(rel))));
-
-                   if (attnum > 0 &&
-                       TupleDescAttr(tupdesc, attnum - 1)->attidentity)
+                   if (!RelationGetForm(rel)->relispartition)
                    {
-                       Oid         seq_relid = getIdentitySequence(relid, attnum, false);
-                       Oid         typeOid = typenameTypeId(pstate, def->typeName);
-                       AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
-
-                       altseqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
-                                                           get_rel_name(seq_relid),
-                                                           -1);
-                       altseqstmt->options = list_make1(makeDefElem("as", (Node *) makeTypeNameFromOid(typeOid, -1), -1));
-                       altseqstmt->for_identity = true;
-                       cxt.blist = lappend(cxt.blist, altseqstmt);
+                       attnum = get_attnum(relid, cmd->name);
+                       if (attnum == InvalidAttrNumber)
+                           ereport(ERROR,
+                                   (errcode(ERRCODE_UNDEFINED_COLUMN),
+                                    errmsg("column \"%s\" of relation \"%s\" does not exist",
+                                           cmd->name, RelationGetRelationName(rel))));
+
+                       if (attnum > 0 &&
+                           TupleDescAttr(tupdesc, attnum - 1)->attidentity)
+                       {
+                           Oid         seq_relid = getIdentitySequence(rel, attnum, false);
+                           Oid         typeOid = typenameTypeId(pstate, def->typeName);
+                           AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
+
+                           altseqstmt->sequence
+                               = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
+                                              get_rel_name(seq_relid),
+                                              -1);
+                           altseqstmt->options = list_make1(makeDefElem("as",
+                                                                        (Node *) makeTypeNameFromOid(typeOid, -1),
+                                                                        -1));
+                           altseqstmt->for_identity = true;
+                           cxt.blist = lappend(cxt.blist, altseqstmt);
+                       }
                    }
 
                    newcmds = lappend(newcmds, cmd);
@@ -3803,7 +3811,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
                                 errmsg("column \"%s\" of relation \"%s\" does not exist",
                                        cmd->name, RelationGetRelationName(rel))));
 
-                   seq_relid = getIdentitySequence(relid, attnum, true);
+                   seq_relid = getIdentitySequence(rel, attnum, true);
 
                    if (seq_relid)
                    {
index 9fd05b15e7342132172db9f018fafaa654de6503..8a29fbbc46525e63a46ec14c7baef311a108fc18 100644 (file)
@@ -24,7 +24,6 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/dependency.h"
-#include "catalog/partition.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "foreign/fdwapi.h"
@@ -1233,24 +1232,8 @@ build_column_default(Relation rel, int attrno)
    if (att_tup->attidentity)
    {
        NextValueExpr *nve = makeNode(NextValueExpr);
-       Oid         reloid;
 
-       /*
-        * The identity sequence is associated with the topmost partitioned
-        * table.
-        */
-       if (rel->rd_rel->relispartition)
-       {
-           List       *ancestors =
-               get_partition_ancestors(RelationGetRelid(rel));
-
-           reloid = llast_oid(ancestors);
-           list_free(ancestors);
-       }
-       else
-           reloid = RelationGetRelid(rel);
-
-       nve->seqid = getIdentitySequence(reloid, attrno, false);
+       nve->seqid = getIdentitySequence(rel, attrno, false);
        nve->typeId = att_tup->atttypid;
 
        return (Node *) nve;
index e0dcc0b069e9a7ae84d63bc46ef8e6d684e56f89..7eee66f8106a15599af7ab5b9c1b9f3ea0bec5a8 100644 (file)
@@ -177,7 +177,7 @@ extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId);
 
 extern bool sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId);
 extern List *getOwnedSequences(Oid relid);
-extern Oid getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok);
+extern Oid getIdentitySequence(Relation rel, AttrNumber attnum, bool missing_ok);
 
 extern Oid get_index_constraint(Oid indexId);
 
index f357b9b63b10c24378a2ba7056dda96922b3e94a..31ad041055f7d26887618c61723f2625772d5979 100644 (file)
@@ -619,14 +619,17 @@ CREATE TABLE pitest1_p1 PARTITION OF pitest1 FOR VALUES FROM ('2016-07-01') TO (
 INSERT into pitest1(f1, f2) VALUES ('2016-07-2', 'from pitest1');
 INSERT into pitest1_p1 (f1, f2) VALUES ('2016-07-3', 'from pitest1_p1');
 -- attached partition
-CREATE TABLE pitest1_p2 (f1 date NOT NULL, f2 text, f3 bigint);
-INSERT INTO pitest1_p2 VALUES ('2016-08-2', 'before attaching', 100);
+CREATE TABLE pitest1_p2 (f3 bigint, f2 text, f1 date NOT NULL);
+INSERT INTO pitest1_p2 (f1, f2, f3) VALUES ('2016-08-2', 'before attaching', 100);
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01'); -- requires NOT NULL constraint
 ERROR:  column "f3" in child table must be marked NOT NULL
 ALTER TABLE pitest1_p2 ALTER COLUMN f3 SET NOT NULL;
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01');
 INSERT INTO pitest1_p2 (f1, f2) VALUES ('2016-08-3', 'from pitest1_p2');
 INSERT INTO pitest1 (f1, f2) VALUES ('2016-08-4', 'from pitest1');
+-- LIKE INCLUDING on partition
+CREATE TABLE pitest1_p1_like (LIKE pitest1_p1 INCLUDING IDENTITY);
+INSERT into pitest1_p1_like(f1, f2) VALUES ('2016-07-2', 'from pitest1_p1_like');
 SELECT tableoid::regclass, f1, f2, f3 FROM pitest1;
   tableoid  |     f1     |        f2        | f3  
 ------------+------------+------------------+-----
@@ -637,6 +640,31 @@ SELECT tableoid::regclass, f1, f2, f3 FROM pitest1;
  pitest1_p2 | 08-04-2016 | from pitest1     |   4
 (5 rows)
 
+SELECT tableoid::regclass, f1, f2, f3 FROM pitest1_p1_like;
+    tableoid     |     f1     |          f2          | f3 
+-----------------+------------+----------------------+----
+ pitest1_p1_like | 07-02-2016 | from pitest1_p1_like |  1
+(1 row)
+
+ALTER TABLE pitest1 ALTER COLUMN f3 SET DATA TYPE bigint;
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1;
+  tableoid  |     f1     |        f2        | f3  | pg_typeof 
+------------+------------+------------------+-----+-----------
+ pitest1_p1 | 07-02-2016 | from pitest1     |   1 | bigint
+ pitest1_p1 | 07-03-2016 | from pitest1_p1  |   2 | bigint
+ pitest1_p2 | 08-02-2016 | before attaching | 100 | bigint
+ pitest1_p2 | 08-03-2016 | from pitest1_p2  |   3 | bigint
+ pitest1_p2 | 08-04-2016 | from pitest1     |   4 | bigint
+(5 rows)
+
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1_p2;
+  tableoid  |     f1     |        f2        | f3  | pg_typeof 
+------------+------------+------------------+-----+-----------
+ pitest1_p2 | 08-02-2016 | before attaching | 100 | bigint
+ pitest1_p2 | 08-03-2016 | from pitest1_p2  |   3 | bigint
+ pitest1_p2 | 08-04-2016 | from pitest1     |   4 | bigint
+(3 rows)
+
 -- add identity column
 CREATE TABLE pitest2 (f1 date NOT NULL, f2 text) PARTITION BY RANGE (f1);
 CREATE TABLE pitest2_p1 PARTITION OF pitest2 FOR VALUES FROM ('2016-07-01') TO ('2016-08-01');
index 7b0800226ca0ac3066cd067232e9bdaa64bead5f..27ca708ff196b36024ad6c39fcf3aaa14cf311e0 100644 (file)
@@ -358,14 +358,21 @@ CREATE TABLE pitest1_p1 PARTITION OF pitest1 FOR VALUES FROM ('2016-07-01') TO (
 INSERT into pitest1(f1, f2) VALUES ('2016-07-2', 'from pitest1');
 INSERT into pitest1_p1 (f1, f2) VALUES ('2016-07-3', 'from pitest1_p1');
 -- attached partition
-CREATE TABLE pitest1_p2 (f1 date NOT NULL, f2 text, f3 bigint);
-INSERT INTO pitest1_p2 VALUES ('2016-08-2', 'before attaching', 100);
+CREATE TABLE pitest1_p2 (f3 bigint, f2 text, f1 date NOT NULL);
+INSERT INTO pitest1_p2 (f1, f2, f3) VALUES ('2016-08-2', 'before attaching', 100);
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01'); -- requires NOT NULL constraint
 ALTER TABLE pitest1_p2 ALTER COLUMN f3 SET NOT NULL;
 ALTER TABLE pitest1 ATTACH PARTITION pitest1_p2 FOR VALUES FROM ('2016-08-01') TO ('2016-09-01');
 INSERT INTO pitest1_p2 (f1, f2) VALUES ('2016-08-3', 'from pitest1_p2');
 INSERT INTO pitest1 (f1, f2) VALUES ('2016-08-4', 'from pitest1');
+-- LIKE INCLUDING on partition
+CREATE TABLE pitest1_p1_like (LIKE pitest1_p1 INCLUDING IDENTITY);
+INSERT into pitest1_p1_like(f1, f2) VALUES ('2016-07-2', 'from pitest1_p1_like');
 SELECT tableoid::regclass, f1, f2, f3 FROM pitest1;
+SELECT tableoid::regclass, f1, f2, f3 FROM pitest1_p1_like;
+ALTER TABLE pitest1 ALTER COLUMN f3 SET DATA TYPE bigint;
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1;
+SELECT tableoid::regclass, f1, f2, f3, pg_typeof(f3) FROM pitest1_p2;
 
 -- add identity column
 CREATE TABLE pitest2 (f1 date NOT NULL, f2 text) PARTITION BY RANGE (f1);