Extend pg_publication_tables to display column list and row filter.
authorAmit Kapila <[email protected]>
Thu, 19 May 2022 02:50:55 +0000 (08:20 +0530)
committerAmit Kapila <[email protected]>
Thu, 19 May 2022 02:50:55 +0000 (08:20 +0530)
Commit 923def9a53 and 52e4f0cd47 allowed to specify column lists and row
filters for publication tables. This commit extends the
pg_publication_tables view and pg_get_publication_tables function to
display that information.

This information will be useful to users and we also need this for the
later commit that prohibits combining multiple publications with different
column lists for the same table.

Author: Hou Zhijie
Reviewed By: Amit Kapila, Alvaro Herrera, Shi Yu, Takamichi Osumi
Discussion: https://postgr.es/m/202204251548[email protected]

doc/src/sgml/catalogs.sgml
src/backend/catalog/pg_publication.c
src/backend/catalog/system_views.sql
src/backend/replication/logical/tablesync.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/test/regress/expected/publication.out
src/test/regress/expected/rules.out

index a533a2153e2d704d2c90e3148bfc4e94bd2bd494..d96c72e5310ee3073f7aff9dd981d6b84b23e916 100644 (file)
@@ -9691,7 +9691,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
 
      <row>
       <entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
-      <entry>publications and their associated tables</entry>
+      <entry>publications and information of their associated tables</entry>
      </row>
 
      <row>
@@ -11635,8 +11635,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
   <para>
    The view <structname>pg_publication_tables</structname> provides
-   information about the mapping between publications and the tables they
-   contain.  Unlike the underlying catalog
+   information about the mapping between publications and information of
+   tables they contain.  Unlike the underlying catalog
    <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
    this view expands publications defined as <literal>FOR ALL TABLES</literal>
    and <literal>FOR ALL TABLES IN SCHEMA</literal>, so for such publications
@@ -11687,6 +11687,27 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        Name of table
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>attnames</structfield> <type>name[]</type>
+       (references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attname</structfield>)
+      </para>
+      <para>
+       Names of table columns included in the publication. This contains all
+       the columns of the table when the user didn't specify the column list
+       for the table.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>rowfilter</structfield> <type>text</type>
+      </para>
+      <para>
+       Expression for the table's publication qualifying condition
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index e2c8bcb2797d59d31a5b5dce280cb8d9480e9f18..8c7fca62de362e81f0caf7ec199a50c49842eb6d 100644 (file)
@@ -1077,11 +1077,12 @@ get_publication_name(Oid pubid, bool missing_ok)
 }
 
 /*
- * Returns Oids of tables in a publication.
+ * Returns information of tables in a publication.
  */
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
+#define NUM_PUBLICATOIN_TABLES_ELEM    3
    FuncCallContext *funcctx;
    char       *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    Publication *publication;
@@ -1090,6 +1091,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
    /* stuff done only on the first call of the function */
    if (SRF_IS_FIRSTCALL())
    {
+       TupleDesc   tupdesc;
        MemoryContext oldcontext;
 
        /* create a function context for cross-call persistence */
@@ -1136,6 +1138,16 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
                tables = filter_partitions(tables);
        }
 
+       /* Construct a tuple descriptor for the result rows. */
+       tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATOIN_TABLES_ELEM);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relid",
+                          OIDOID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 2, "attrs",
+                          INT2VECTOROID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 3, "qual",
+                          PG_NODE_TREEOID, -1, 0);
+
+       funcctx->tuple_desc = BlessTupleDesc(tupdesc);
        funcctx->user_fctx = (void *) tables;
 
        MemoryContextSwitchTo(oldcontext);
@@ -1147,9 +1159,47 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 
    if (funcctx->call_cntr < list_length(tables))
    {
+       HeapTuple   pubtuple = NULL;
+       HeapTuple   rettuple;
        Oid         relid = list_nth_oid(tables, funcctx->call_cntr);
+       Datum       values[NUM_PUBLICATOIN_TABLES_ELEM];
+       bool        nulls[NUM_PUBLICATOIN_TABLES_ELEM];
+
+       /*
+        * Form tuple with appropriate data.
+        */
+       MemSet(nulls, 0, sizeof(nulls));
+       MemSet(values, 0, sizeof(values));
+
+       publication = GetPublicationByName(pubname, false);
+
+       values[0] = ObjectIdGetDatum(relid);
+
+       pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
+                                      ObjectIdGetDatum(relid),
+                                      ObjectIdGetDatum(publication->oid));
+
+       if (HeapTupleIsValid(pubtuple))
+       {
+           /* Lookup the column list attribute. */
+           values[1] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+                                       Anum_pg_publication_rel_prattrs,
+                                       &(nulls[1]));
+
+           /* Null indicates no filter. */
+           values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
+                                       Anum_pg_publication_rel_prqual,
+                                       &(nulls[2]));
+       }
+       else
+       {
+           nulls[1] = true;
+           nulls[2] = true;
+       }
+
+       rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
 
-       SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+       SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
    }
 
    SRF_RETURN_DONE(funcctx);
index 0fc614e32c0cccef06456fc8a7bd790efb0c6e0c..fedaed533b910842b07765861f1864145b443f42 100644 (file)
@@ -368,7 +368,15 @@ CREATE VIEW pg_publication_tables AS
     SELECT
         P.pubname AS pubname,
         N.nspname AS schemaname,
-        C.relname AS tablename
+        C.relname AS tablename,
+        ( SELECT array_agg(a.attname ORDER BY a.attnum)
+          FROM unnest(CASE WHEN GPT.attrs IS NOT NULL THEN GPT.attrs
+                      ELSE (SELECT array_agg(g) FROM generate_series(1, C.relnatts) g)
+                      END) k
+               JOIN pg_attribute a
+                    ON (a.attrelid = GPT.relid AND a.attnum = k)
+        ) AS attnames,
+        pg_get_expr(GPT.qual, GPT.relid) AS rowfilter
     FROM pg_publication P,
          LATERAL pg_get_publication_tables(P.pubname) GPT,
          pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
index b03e0f5aac2ff75336c92e11870d36d45d2d162e..994c7a09d925c208c83bb529089655ad4d4303f0 100644 (file)
@@ -795,15 +795,12 @@ fetch_remote_table_info(char *nspname, char *relname,
        resetStringInfo(&cmd);
        appendStringInfo(&cmd,
                         "SELECT DISTINCT unnest"
-                        "  FROM pg_publication p"
-                        "  LEFT OUTER JOIN pg_publication_rel pr"
-                        "       ON (p.oid = pr.prpubid AND pr.prrelid = %u)"
-                        "  LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE,"
+                        "  FROM pg_publication p,"
                         "  LATERAL pg_get_publication_tables(p.pubname) gpt"
+                        "  LEFT OUTER JOIN unnest(gpt.attrs) ON TRUE"
                         " WHERE gpt.relid = %u"
                         "   AND p.pubname IN ( %s )",
                         lrel->remoteid,
-                        lrel->remoteid,
                         pub_names.data);
 
        pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
@@ -965,15 +962,12 @@ fetch_remote_table_info(char *nspname, char *relname,
        /* Check for row filters. */
        resetStringInfo(&cmd);
        appendStringInfo(&cmd,
-                        "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
-                        "  FROM pg_publication p"
-                        "  LEFT OUTER JOIN pg_publication_rel pr"
-                        "       ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
+                        "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
+                        "  FROM pg_publication p,"
                         "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                         " WHERE gpt.relid = %u"
                         "   AND p.pubname IN ( %s )",
                         lrel->remoteid,
-                        lrel->remoteid,
                         pub_names.data);
 
        res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
index 2cdcfa667a80165c70ebb136a113103142b7409c..8eac3f8c8259bf1fba1ba065e38255896bd38636 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202205131
+#define CATALOG_VERSION_NO 202205191
 
 #endif
index babe16f00a58f55760aa153c057da47790aac7ca..87aa571a3319b0a6be896c6ed7fac0a6481ed81b 100644 (file)
   prosrc => 'pg_show_replication_origin_status' },
 
 # publications
-{ oid => '6119', descr => 'get OIDs of tables in a publication',
+{ oid => '6119', descr => 'get information of tables in a publication',
   proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
-  provolatile => 's', prorettype => 'oid', proargtypes => 'text',
-  proallargtypes => '{text,oid}', proargmodes => '{i,o}',
-  proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+  provolatile => 's', prorettype => 'record', proargtypes => 'text',
+  proallargtypes => '{text,oid,int2vector,pg_node_tree}', proargmodes => '{i,o,o,o}',
+  proargnames => '{pubname,relid,attrs,qual}', prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
   proname => 'pg_relation_is_publishable', provolatile => 's',
index 398c0f38f6c6c1b6a605c1a4d6f256da41e10cfa..274b37dfe572c95203785412de46e5f26d7fa917 100644 (file)
@@ -1585,52 +1585,52 @@ CREATE TABLE sch2.tbl1_part1 PARTITION OF sch1.tbl1 FOR VALUES FROM (1) to (10);
 -- Schema publication that does not include the schema that has the parent table
 CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename  
----------+------------+------------
- pub     | sch2       | tbl1_part1
+ pubname | schemaname | tablename  | attnames | rowfilter 
+---------+------------+------------+----------+-----------
+ pub     | sch2       | tbl1_part1 | {a}      | 
 (1 row)
 
 DROP PUBLICATION pub;
 -- Table publication that does not include the parent table
 CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename  
----------+------------+------------
- pub     | sch2       | tbl1_part1
+ pubname | schemaname | tablename  | attnames | rowfilter 
+---------+------------+------------+----------+-----------
+ pub     | sch2       | tbl1_part1 | {a}      | 
 (1 row)
 
 -- Table publication that includes both the parent table and the child table
 ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename 
----------+------------+-----------
- pub     | sch1       | tbl1
+ pubname | schemaname | tablename | attnames | rowfilter 
+---------+------------+-----------+----------+-----------
+ pub     | sch1       | tbl1      | {a}      | 
 (1 row)
 
 DROP PUBLICATION pub;
 -- Schema publication that does not include the schema that has the parent table
 CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch2 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename  
----------+------------+------------
- pub     | sch2       | tbl1_part1
+ pubname | schemaname | tablename  | attnames | rowfilter 
+---------+------------+------------+----------+-----------
+ pub     | sch2       | tbl1_part1 | {a}      | 
 (1 row)
 
 DROP PUBLICATION pub;
 -- Table publication that does not include the parent table
 CREATE PUBLICATION pub FOR TABLE sch2.tbl1_part1 WITH (PUBLISH_VIA_PARTITION_ROOT=0);
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename  
----------+------------+------------
- pub     | sch2       | tbl1_part1
+ pubname | schemaname | tablename  | attnames | rowfilter 
+---------+------------+------------+----------+-----------
+ pub     | sch2       | tbl1_part1 | {a}      | 
 (1 row)
 
 -- Table publication that includes both the parent table and the child table
 ALTER PUBLICATION pub ADD TABLE sch1.tbl1;
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename  
----------+------------+------------
- pub     | sch2       | tbl1_part1
+ pubname | schemaname | tablename  | attnames | rowfilter 
+---------+------------+------------+----------+-----------
+ pub     | sch2       | tbl1_part1 | {a}      | 
 (1 row)
 
 DROP PUBLICATION pub;
@@ -1643,9 +1643,9 @@ CREATE TABLE sch1.tbl1_part3 (a int) PARTITION BY RANGE(a);
 ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (30);
 CREATE PUBLICATION pub FOR ALL TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
 SELECT * FROM pg_publication_tables;
- pubname | schemaname | tablename 
----------+------------+-----------
- pub     | sch1       | tbl1
+ pubname | schemaname | tablename | attnames | rowfilter 
+---------+------------+-----------+----------+-----------
+ pub     | sch1       | tbl1      | {a}      | 
 (1 row)
 
 RESET client_min_messages;
index 21effe83154327f330d88ef2cb363be44f7d1981..fc3cde3226404cf8d8d18f49f43d19691ac63eb1 100644 (file)
@@ -1437,9 +1437,18 @@ pg_prepared_xacts| SELECT p.transaction,
      LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
 pg_publication_tables| SELECT p.pubname,
     n.nspname AS schemaname,
-    c.relname AS tablename
+    c.relname AS tablename,
+    ( SELECT array_agg(a.attname ORDER BY a.attnum) AS array_agg
+           FROM (unnest(
+                CASE
+                    WHEN (gpt.attrs IS NOT NULL) THEN (gpt.attrs)::integer[]
+                    ELSE ( SELECT array_agg(g.g) AS array_agg
+                       FROM generate_series(1, (c.relnatts)::integer) g(g))
+                END) k(k)
+             JOIN pg_attribute a ON (((a.attrelid = gpt.relid) AND (a.attnum = k.k))))) AS attnames,
+    pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid),
+    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);