Display the leader apply worker's PID for parallel apply workers.
authorAmit Kapila <[email protected]>
Wed, 18 Jan 2023 03:33:12 +0000 (09:03 +0530)
committerAmit Kapila <[email protected]>
Wed, 18 Jan 2023 03:33:12 +0000 (09:03 +0530)
Add leader_pid to pg_stat_subscription. leader_pid is the process ID of
the leader apply worker if this process is a parallel apply worker. If
this field is NULL, it indicates that the process is a leader apply
worker or a synchronization worker. The new column makes it easier to
distinguish parallel apply workers from other kinds of workers and helps
to identify the leader for the parallel workers corresponding to a
particular subscription.

Additionally, update the leader_pid column in pg_stat_activity as well to
display the PID of the leader apply worker for parallel apply workers.

Author: Hou Zhijie
Reviewed-by: Peter Smith, Sawada Masahiko, Amit Kapila, Shveta Mallik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com

doc/src/sgml/logical-replication.sgml
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/launcher.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/logicallauncher.h
src/include/replication/worker_internal.h
src/test/regress/expected/rules.out

index 54f48be87f3707f8a7a70844a4d9af5d46d8a64b..f4b4e641bebc971f9613d4a16129848c33ef7040 100644 (file)
@@ -1692,7 +1692,8 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
    subscription.  A disabled subscription or a crashed subscription will have
    zero rows in this view.  If the initial data synchronization of any
    table is in progress, there will be additional workers for the tables
-   being synchronized.
+   being synchronized. Moreover, if the streaming transaction is applied in
+   parallel, there may be additional parallel apply workers.
   </para>
  </sect1>
 
index 358d2ff90f81e9860d7ac879c04b41ffca6a486b..e3a783abd0fee6bd552c9e17db6ca4b7c0f8c8b6 100644 (file)
@@ -743,9 +743,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
        <structfield>leader_pid</structfield> <type>integer</type>
       </para>
       <para>
-       Process ID of the parallel group leader, if this process is a
-       parallel query worker.  <literal>NULL</literal> if this process is a
-       parallel group leader or does not participate in parallel query.
+       Process ID of the parallel group leader if this process is a parallel
+       query worker, or process ID of the leader apply worker if this process
+       is a parallel apply worker.  <literal>NULL</literal> indicates that this
+       process is a parallel group leader or leader apply worker, or does not
+       participate in any parallel operation.
       </para></entry>
      </row>
 
@@ -3206,13 +3208,24 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>leader_pid</structfield> <type>integer</type>
+      </para>
+      <para>
+       Process ID of the leader apply worker if this process is a parallel
+       apply worker; NULL if this process is a leader apply worker or a
+       synchronization worker
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>relid</structfield> <type>oid</type>
       </para>
       <para>
-       OID of the relation that the worker is synchronizing; null for the
-       main apply worker
+       OID of the relation that the worker is synchronizing; NULL for the
+       leader apply worker and parallel apply workers
       </para></entry>
      </row>
 
@@ -3222,7 +3235,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para>
       <para>
        Last write-ahead log location received, the initial value of
-       this field being 0
+       this field being 0; NULL for parallel apply workers
       </para></entry>
      </row>
 
@@ -3231,7 +3244,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        <structfield>last_msg_send_time</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
-       Send time of last message received from origin WAL sender
+       Send time of last message received from origin WAL sender; NULL for
+       parallel apply workers
       </para></entry>
      </row>
 
@@ -3240,7 +3254,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        <structfield>last_msg_receipt_time</structfield> <type>timestamp with time zone</type>
       </para>
       <para>
-       Receipt time of last message received from origin WAL sender
+       Receipt time of last message received from origin WAL sender; NULL for
+       parallel apply workers
       </para></entry>
      </row>
 
@@ -3249,7 +3264,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
        <structfield>latest_end_lsn</structfield> <type>pg_lsn</type>
       </para>
       <para>
-       Last write-ahead log location reported to origin WAL sender
+       Last write-ahead log location reported to origin WAL sender; NULL for
+       parallel apply workers
       </para></entry>
      </row>
 
@@ -3259,7 +3275,7 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </para>
       <para>
        Time of last write-ahead log location reported to origin WAL
-       sender
+       sender; NULL for parallel apply workers
       </para></entry>
      </row>
     </tbody>
index d2a8c82900e238b42265e63c062c57c9045729b9..8608e3fa5b19805bfddce082013eb563d1219a2c 100644 (file)
@@ -948,6 +948,7 @@ CREATE VIEW pg_stat_subscription AS
             su.oid AS subid,
             su.subname,
             st.pid,
+            st.leader_pid,
             st.relid,
             st.received_lsn,
             st.last_msg_send_time,
index 3dfcff2798fc7f30ef08d9ccf02a199cdfc29fa8..3579e704fe5e8c81056d4331d8b99b1a6f8534a2 100644 (file)
@@ -849,7 +849,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 static void
 pa_shutdown(int code, Datum arg)
 {
-   SendProcSignal(MyLogicalRepWorker->apply_leader_pid,
+   SendProcSignal(MyLogicalRepWorker->leader_pid,
                   PROCSIG_PARALLEL_APPLY_MESSAGE,
                   InvalidBackendId);
 
@@ -932,7 +932,7 @@ ParallelApplyWorkerMain(Datum main_arg)
    error_mqh = shm_mq_attach(mq, seg, NULL);
 
    pq_redirect_to_shm_mq(seg, error_mqh);
-   pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+   pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
                           InvalidBackendId);
 
    MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
@@ -950,7 +950,7 @@ ParallelApplyWorkerMain(Datum main_arg)
     * The parallel apply worker doesn't need to monopolize this replication
     * origin which was already acquired by its leader process.
     */
-   replorigin_session_setup(originid, MyLogicalRepWorker->apply_leader_pid);
+   replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
    replorigin_session_origin = originid;
    CommitTransactionCommand();
 
index afb7acddaa61f1a0ae7bc6c84a9d1fd4564f707c..27e58566ceca850a6481aa334b0abed249d6dd47 100644 (file)
@@ -410,7 +410,7 @@ retry:
    worker->relstate = SUBREL_STATE_UNKNOWN;
    worker->relstate_lsn = InvalidXLogRecPtr;
    worker->stream_fileset = NULL;
-   worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
+   worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
    worker->parallel_apply = is_parallel_apply_worker;
    worker->last_lsn = InvalidXLogRecPtr;
    TIMESTAMP_NOBEGIN(worker->last_send_time);
@@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
    worker->userid = InvalidOid;
    worker->subid = InvalidOid;
    worker->relid = InvalidOid;
-   worker->apply_leader_pid = InvalidPid;
+   worker->leader_pid = InvalidPid;
    worker->parallel_apply = false;
 }
 
@@ -1066,13 +1066,41 @@ IsLogicalLauncher(void)
    return LogicalRepCtx->launcher_pid == MyProcPid;
 }
 
+/*
+ * Return the pid of the leader apply worker if the given pid is the pid of a
+ * parallel apply worker, otherwise, return InvalidPid.
+ */
+pid_t
+GetLeaderApplyWorkerPid(pid_t pid)
+{
+   int         leader_pid = InvalidPid;
+   int         i;
+
+   LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+   for (i = 0; i < max_logical_replication_workers; i++)
+   {
+       LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+       if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
+       {
+           leader_pid = w->leader_pid;
+           break;
+       }
+   }
+
+   LWLockRelease(LogicalRepWorkerLock);
+
+   return leader_pid;
+}
+
 /*
  * Returns state of the subscriptions.
  */
 Datum
 pg_stat_get_subscription(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_COLS  8
+#define PG_STAT_GET_SUBSCRIPTION_COLS  9
    Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
    int         i;
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
        if (OidIsValid(subid) && worker.subid != subid)
            continue;
 
-       /* Skip if this is a parallel apply worker */
-       if (isParallelApplyWorker(&worker))
-           continue;
-
        worker_pid = worker.proc->pid;
 
        values[0] = ObjectIdGetDatum(worker.subid);
@@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
        else
            nulls[1] = true;
        values[2] = Int32GetDatum(worker_pid);
-       if (XLogRecPtrIsInvalid(worker.last_lsn))
+
+       if (isParallelApplyWorker(&worker))
+           values[3] = Int32GetDatum(worker.leader_pid);
+       else
            nulls[3] = true;
+
+       if (XLogRecPtrIsInvalid(worker.last_lsn))
+           nulls[4] = true;
        else
-           values[3] = LSNGetDatum(worker.last_lsn);
+           values[4] = LSNGetDatum(worker.last_lsn);
        if (worker.last_send_time == 0)
-           nulls[4] = true;
+           nulls[5] = true;
        else
-           values[4] = TimestampTzGetDatum(worker.last_send_time);
+           values[5] = TimestampTzGetDatum(worker.last_send_time);
        if (worker.last_recv_time == 0)
-           nulls[5] = true;
+           nulls[6] = true;
        else
-           values[5] = TimestampTzGetDatum(worker.last_recv_time);
+           values[6] = TimestampTzGetDatum(worker.last_recv_time);
        if (XLogRecPtrIsInvalid(worker.reply_lsn))
-           nulls[6] = true;
+           nulls[7] = true;
        else
-           values[6] = LSNGetDatum(worker.reply_lsn);
+           values[7] = LSNGetDatum(worker.reply_lsn);
        if (worker.reply_time == 0)
-           nulls[7] = true;
+           nulls[8] = true;
        else
-           values[7] = TimestampTzGetDatum(worker.reply_time);
+           values[8] = TimestampTzGetDatum(worker.reply_time);
 
        tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                             values, nulls);
index 58bd1360b976e2dd4a3f26404895b633c3398448..673749340229432ffdbe8c4380aa1a0ef66b942a 100644 (file)
@@ -25,6 +25,7 @@
 #include "pgstat.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicallauncher.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/acl.h"
@@ -409,9 +410,9 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 
            /*
             * If a PGPROC entry was retrieved, display wait events and lock
-            * group leader information if any.  To avoid extra overhead, no
-            * extra lock is being held, so there is no guarantee of
-            * consistency across multiple rows.
+            * group leader or apply leader information if any.  To avoid
+            * extra overhead, no extra lock is being held, so there is no
+            * guarantee of consistency across multiple rows.
             */
            if (proc != NULL)
            {
@@ -426,14 +427,24 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 
                /*
                 * Show the leader only for active parallel workers.  This
-                * leaves the field as NULL for the leader of a parallel
-                * group.
+                * leaves the field as NULL for the leader of a parallel group
+                * or the leader of parallel apply workers.
                 */
                if (leader && leader->pid != beentry->st_procpid)
                {
                    values[28] = Int32GetDatum(leader->pid);
                    nulls[28] = false;
                }
+               else if (beentry->st_backendType == B_BG_WORKER)
+               {
+                   int         leader_pid = GetLeaderApplyWorkerPid(beentry->st_procpid);
+
+                   if (leader_pid != InvalidPid)
+                   {
+                       values[28] = Int32GetDatum(leader_pid);
+                       nulls[28] = false;
+                   }
+               }
            }
 
            if (wait_event_type)
index 2afed46b899e6712bbeb99f764ea6d65411bb3b9..fe35d7c76ae2d4d60852a3fa85829339b5bfb4f0 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202301131
+#define CATALOG_VERSION_NO 202301181
 
 #endif
index 3810de7b22d0c18343c1f84099a3e57e44a86288..86eb8e8c58a7cb4bf22ee3164e9779bbdc66584e 100644 (file)
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,oid,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
+  proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
index c85593ad113f57622737ba8ebb4b8590cdb2c6d7..360e98702a8198fea0ed33a84dc47a8479d7e8c4 100644 (file)
@@ -27,4 +27,6 @@ extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
 
+extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
+
 #endif                         /* LOGICALLAUNCHER_H */
index db891eea8ae34c9569c63b883ad3ec9b9f17981c..dc87a4edd1315b8a43be408d31ef0a94e8157d15 100644 (file)
@@ -71,7 +71,7 @@ typedef struct LogicalRepWorker
     * PID of leader apply worker if this slot is used for a parallel apply
     * worker, InvalidPid otherwise.
     */
-   pid_t       apply_leader_pid;
+   pid_t       leader_pid;
 
    /* Indicates whether apply can be performed in parallel. */
    bool        parallel_apply;
@@ -303,7 +303,7 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                           XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
 
 static inline bool
 am_tablesync_worker(void)
index a969ae63eb5846c699b9733e37e3795a6a432c60..05eec2adfd29527f852d9f8d78f690bb3aab3afa 100644 (file)
@@ -2094,6 +2094,7 @@ pg_stat_ssl| SELECT s.pid,
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,
+    st.leader_pid,
     st.relid,
     st.received_lsn,
     st.last_msg_send_time,
@@ -2101,7 +2102,7 @@ pg_stat_subscription| SELECT su.oid AS subid,
     st.latest_end_lsn,
     st.latest_end_time
    FROM (pg_subscription su
-     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
+     LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, leader_pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
 pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,