Detect and Log multiple_unique_conflicts type conflict.
authorAmit Kapila <[email protected]>
Mon, 24 Mar 2025 07:00:44 +0000 (12:30 +0530)
committerAmit Kapila <[email protected]>
Mon, 24 Mar 2025 07:00:44 +0000 (12:30 +0530)
Introduce a new conflict type, multiple_unique_conflicts, to handle cases
where an incoming row during logical replication violates multiple UNIQUE
constraints.

Previously, the apply worker detected and reported only the first
encountered key conflict (insert_exists/update_exists), causing repeated
failures as each constraint violation needs to be handled one by one
making the process slow and error-prone.

With this patch, the apply worker checks all unique constraints upfront
once the first key conflict is detected and reports
multiple_unique_conflicts if multiple violations exist. This allows users
to resolve all conflicts at once by deleting all conflicting tuples rather
than dealing with them individually or skipping the transaction.

In the future, this will also allow us to specify different resolution
handlers for such a conflict type.

Add the stats for this conflict type in pg_stat_subscription_stats.

Author: Nisha Moond <[email protected]>
Author: Zhijie Hou <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Peter Smith <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Discussion: https://postgr.es/m/CABdArM7FW-_dnthGkg2s0fy1HhUB8C3ELA0gZX1kkbs1ZZoV3Q@mail.gmail.com

14 files changed:
doc/src/sgml/logical-replication.sgml
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/executor/execReplication.c
src/backend/replication/logical/conflict.c
src/backend/replication/logical/worker.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/conflict.h
src/test/regress/expected/rules.out
src/test/subscription/meson.build
src/test/subscription/t/035_conflicts.pl [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index 3556ce7cb46e7a3740aaa2970a1a164a9f09af6b..f288c049a5c9aaf73f896f1bca8201e681d456aa 100644 (file)
@@ -1877,6 +1877,19 @@ test_sub=# SELECT * from tab_gen_to_gen;
       </para>
      </listitem>
     </varlistentry>
+    <varlistentry id="conflict-multiple-unique-conflicts" xreflabel="multiple_unique_conflicts">
+     <term><literal>multiple_unique_conflicts</literal></term>
+     <listitem>
+      <para>
+       Inserting or updating a row violates multiple
+       <literal>NOT DEFERRABLE</literal> unique constraints. Note that to log
+       the origin and commit timestamp details of conflicting keys, ensure
+       that <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       is enabled on the subscriber. In this case, an error will be raised until
+       the conflict is resolved manually.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
     Note that there are other conflict scenarios, such as exclusion constraint
     violations. Currently, we do not provide additional details for them in the
@@ -1935,8 +1948,8 @@ DETAIL:  <replaceable class="parameter">detailed_explanation</replaceable>.
         <para>
          The <literal>Key</literal> section includes the key values of the local
          tuple that violated a unique constraint for
-         <literal>insert_exists</literal> or <literal>update_exists</literal>
-         conflicts.
+         <literal>insert_exists</literal>, <literal>update_exists</literal> or
+         <literal>multiple_unique_conflicts</literal> conflicts.
         </para>
        </listitem>
        <listitem>
@@ -1945,8 +1958,8 @@ DETAIL:  <replaceable class="parameter">detailed_explanation</replaceable>.
          tuple if its origin differs from the remote tuple for
          <literal>update_origin_differs</literal> or <literal>delete_origin_differs</literal>
          conflicts, or if the key value conflicts with the remote tuple for
-         <literal>insert_exists</literal> or <literal>update_exists</literal>
-         conflicts.
+         <literal>insert_exists</literal>, <literal>update_exists</literal> or
+         <literal>multiple_unique_conflicts</literal> conflicts.
         </para>
        </listitem>
        <listitem>
@@ -1982,6 +1995,16 @@ DETAIL:  <replaceable class="parameter">detailed_explanation</replaceable>.
          The large column values are truncated to 64 bytes.
         </para>
        </listitem>
+       <listitem>
+        <para>
+         Note that in case of <literal>multiple_unique_conflicts</literal> conflict,
+         multiple <replaceable class="parameter">detailed_explanation</replaceable>
+         and <replaceable class="parameter">detail_values</replaceable> lines
+         will be generated, each detailing the conflict information associated
+         with distinct unique
+         constraints.
+        </para>
+       </listitem>
       </itemizedlist>
      </listitem>
     </varlistentry>
index aaa6586d3a4942ca1cc6aa6f38ad8354ee208691..0960f5ba94a8eafc153e85efb2fe56ca4a0d4119 100644 (file)
@@ -2250,6 +2250,18 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_multiple_unique_conflicts</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion or an updated row values violated multiple
+       <literal>NOT DEFERRABLE</literal> unique constraints during the
+       application of changes. See <xref linkend="conflict-multiple-unique-conflicts"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
index a4d2cfdcaf5d449c111c38c478e6653fb9c00741..31d269b7ee0c42f35f28326cde9939a537f76339 100644 (file)
@@ -1384,6 +1384,7 @@ CREATE VIEW pg_stat_subscription_stats AS
         ss.confl_update_missing,
         ss.confl_delete_origin_differs,
         ss.confl_delete_missing,
+        ss.confl_multiple_unique_conflicts,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
index 0a9b880d250ac511dc2fd8f7362111695d60ef74..ede89ea3cf972d4326b0da97f8e3f3a5af780bc6 100644 (file)
@@ -493,25 +493,33 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
                       ConflictType type, List *recheckIndexes,
                       TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
 {
-   /* Check all the unique indexes for a conflict */
+   List       *conflicttuples = NIL;
+   TupleTableSlot *conflictslot;
+
+   /* Check all the unique indexes for conflicts */
    foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
    {
-       TupleTableSlot *conflictslot;
-
        if (list_member_oid(recheckIndexes, uniqueidx) &&
            FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
                              &conflictslot))
        {
-           RepOriginId origin;
-           TimestampTz committs;
-           TransactionId xmin;
-
-           GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
-           ReportApplyConflict(estate, resultRelInfo, ERROR, type,
-                               searchslot, conflictslot, remoteslot,
-                               uniqueidx, xmin, origin, committs);
+           ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
+
+           conflicttuple->slot = conflictslot;
+           conflicttuple->indexoid = uniqueidx;
+
+           GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
+                                   &conflicttuple->origin, &conflicttuple->ts);
+
+           conflicttuples = lappend(conflicttuples, conflicttuple);
        }
    }
+
+   /* Report the conflict, if found */
+   if (conflicttuples)
+       ReportApplyConflict(estate, resultRelInfo, ERROR,
+                           list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
+                           searchslot, remoteslot, conflicttuples);
 }
 
 /*
index 772fc83e88b73d7af6a32a7bac23a568b1e424e8..f1e92f2fc1a35564e4acd1776c4f0aff95812cbe 100644 (file)
@@ -29,11 +29,12 @@ static const char *const ConflictTypeNames[] = {
    [CT_UPDATE_EXISTS] = "update_exists",
    [CT_UPDATE_MISSING] = "update_missing",
    [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
-   [CT_DELETE_MISSING] = "delete_missing"
+   [CT_DELETE_MISSING] = "delete_missing",
+   [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
 
 static int errcode_apply_conflict(ConflictType type);
-static int errdetail_apply_conflict(EState *estate,
+static void errdetail_apply_conflict(EState *estate,
                                     ResultRelInfo *relinfo,
                                     ConflictType type,
                                     TupleTableSlot *searchslot,
@@ -41,7 +42,7 @@ static int    errdetail_apply_conflict(EState *estate,
                                     TupleTableSlot *remoteslot,
                                     Oid indexoid, TransactionId localxmin,
                                     RepOriginId localorigin,
-                                    TimestampTz localts);
+                                    TimestampTz localts, StringInfo err_msg);
 static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
                                       ConflictType type,
                                       TupleTableSlot *searchslot,
@@ -90,30 +91,33 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
  * 'searchslot' should contain the tuple used to search the local tuple to be
  * updated or deleted.
  *
- * 'localslot' should contain the existing local tuple, if any, that conflicts
- * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
- * transaction information related to this existing local tuple.
- *
  * 'remoteslot' should contain the remote new tuple, if any.
  *
- * The 'indexoid' represents the OID of the unique index that triggered the
- * constraint violation error. We use this to report the key values for
- * conflicting tuple.
+ * conflicttuples is a list of local tuples that caused the conflict and the
+ * conflict related information. See ConflictTupleInfo.
  *
- * The caller must ensure that the index with the OID 'indexoid' is locked so
- * that we can fetch and display the conflicting key value.
+ * The caller must ensure that all the indexes passed in ConflictTupleInfo are
+ * locked so that we can fetch and display the conflicting key values.
  */
 void
 ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
                    ConflictType type, TupleTableSlot *searchslot,
-                   TupleTableSlot *localslot, TupleTableSlot *remoteslot,
-                   Oid indexoid, TransactionId localxmin,
-                   RepOriginId localorigin, TimestampTz localts)
+                   TupleTableSlot *remoteslot, List *conflicttuples)
 {
    Relation    localrel = relinfo->ri_RelationDesc;
+   StringInfoData err_detail;
+
+   initStringInfo(&err_detail);
 
-   Assert(!OidIsValid(indexoid) ||
-          CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+   /* Form errdetail message by combining conflicting tuples information. */
+   foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
+       errdetail_apply_conflict(estate, relinfo, type, searchslot,
+                                conflicttuple->slot, remoteslot,
+                                conflicttuple->indexoid,
+                                conflicttuple->xmin,
+                                conflicttuple->origin,
+                                conflicttuple->ts,
+                                &err_detail);
 
    pgstat_report_subscription_conflict(MySubscription->oid, type);
 
@@ -123,9 +127,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
                   get_namespace_name(RelationGetNamespace(localrel)),
                   RelationGetRelationName(localrel),
                   ConflictTypeNames[type]),
-           errdetail_apply_conflict(estate, relinfo, type, searchslot,
-                                    localslot, remoteslot, indexoid,
-                                    localxmin, localorigin, localts));
+           errdetail_internal("%s", err_detail.data));
 }
 
 /*
@@ -169,6 +171,7 @@ errcode_apply_conflict(ConflictType type)
    {
        case CT_INSERT_EXISTS:
        case CT_UPDATE_EXISTS:
+       case CT_MULTIPLE_UNIQUE_CONFLICTS:
            return errcode(ERRCODE_UNIQUE_VIOLATION);
        case CT_UPDATE_ORIGIN_DIFFERS:
        case CT_UPDATE_MISSING:
@@ -191,12 +194,13 @@ errcode_apply_conflict(ConflictType type)
  *    replica identity columns, if any. The remote old tuple is excluded as its
  *    information is covered in the replica identity columns.
  */
-static int
+static void
 errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
                         ConflictType type, TupleTableSlot *searchslot,
                         TupleTableSlot *localslot, TupleTableSlot *remoteslot,
                         Oid indexoid, TransactionId localxmin,
-                        RepOriginId localorigin, TimestampTz localts)
+                        RepOriginId localorigin, TimestampTz localts,
+                        StringInfo err_msg)
 {
    StringInfoData err_detail;
    char       *val_desc;
@@ -209,7 +213,9 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
    {
        case CT_INSERT_EXISTS:
        case CT_UPDATE_EXISTS:
-           Assert(OidIsValid(indexoid));
+       case CT_MULTIPLE_UNIQUE_CONFLICTS:
+           Assert(OidIsValid(indexoid) &&
+                  CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
            if (localts)
            {
@@ -291,7 +297,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
    if (val_desc)
        appendStringInfo(&err_detail, "\n%s", val_desc);
 
-   return errdetail_internal("%s", err_detail.data);
+   /*
+    * Insert a blank line to visually separate the new detail line from the
+    * existing ones.
+    */
+   if (err_msg->len > 0)
+       appendStringInfoChar(err_msg, '\n');
+
+   appendStringInfo(err_msg, "%s", err_detail.data);
 }
 
 /*
@@ -323,7 +336,8 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
     * Report the conflicting key values in the case of a unique constraint
     * violation.
     */
-   if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
+   if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS ||
+       type == CT_MULTIPLE_UNIQUE_CONFLICTS)
    {
        Assert(OidIsValid(indexoid) && localslot);
 
index 31ab69ea13a6da532a865729827c9d7c0f3d98e1..e3b2b1449420871831c88ae6367cc9358fd8f5fc 100644 (file)
@@ -2674,7 +2674,8 @@ apply_handle_update_internal(ApplyExecutionData *edata,
    LogicalRepRelMapEntry *relmapentry = edata->targetRel;
    Relation    localrel = relinfo->ri_RelationDesc;
    EPQState    epqstate;
-   TupleTableSlot *localslot;
+   TupleTableSlot *localslot = NULL;
+   ConflictTupleInfo conflicttuple = {0};
    bool        found;
    MemoryContext oldctx;
 
@@ -2693,16 +2694,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
     */
    if (found)
    {
-       RepOriginId localorigin;
-       TransactionId localxmin;
-       TimestampTz localts;
-
        /*
         * Report the conflict if the tuple was modified by a different
         * origin.
         */
-       if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
-           localorigin != replorigin_session_origin)
+       if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+                                   &conflicttuple.origin, &conflicttuple.ts) &&
+           conflicttuple.origin != replorigin_session_origin)
        {
            TupleTableSlot *newslot;
 
@@ -2710,9 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata,
            newslot = table_slot_create(localrel, &estate->es_tupleTable);
            slot_store_data(newslot, relmapentry, newtup);
 
+           conflicttuple.slot = localslot;
+
            ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
-                               remoteslot, localslot, newslot,
-                               InvalidOid, localxmin, localorigin, localts);
+                               remoteslot, newslot,
+                               list_make1(&conflicttuple));
        }
 
        /* Process and store remote tuple in the slot */
@@ -2741,9 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
         * emitting a log message.
         */
        ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
-                           remoteslot, NULL, newslot,
-                           InvalidOid, InvalidTransactionId,
-                           InvalidRepOriginId, 0);
+                           remoteslot, newslot, list_make1(&conflicttuple));
    }
 
    /* Cleanup. */
@@ -2861,6 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
    LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
    EPQState    epqstate;
    TupleTableSlot *localslot;
+   ConflictTupleInfo conflicttuple = {0};
    bool        found;
 
    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
@@ -2876,19 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
    /* If found delete it. */
    if (found)
    {
-       RepOriginId localorigin;
-       TransactionId localxmin;
-       TimestampTz localts;
-
        /*
         * Report the conflict if the tuple was modified by a different
         * origin.
         */
-       if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
-           localorigin != replorigin_session_origin)
+       if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+                                   &conflicttuple.origin, &conflicttuple.ts) &&
+           conflicttuple.origin != replorigin_session_origin)
+       {
+           conflicttuple.slot = localslot;
            ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
-                               remoteslot, localslot, NULL,
-                               InvalidOid, localxmin, localorigin, localts);
+                               remoteslot, NULL,
+                               list_make1(&conflicttuple));
+       }
 
        EvalPlanQualSetSlot(&epqstate, localslot);
 
@@ -2903,9 +2902,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
         * emitting a log message.
         */
        ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING,
-                           remoteslot, NULL, NULL,
-                           InvalidOid, InvalidTransactionId,
-                           InvalidRepOriginId, 0);
+                           remoteslot, NULL, list_make1(&conflicttuple));
    }
 
    /* Cleanup. */
@@ -3073,9 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                Relation    partrel_new;
                bool        found;
                EPQState    epqstate;
-               RepOriginId localorigin;
-               TransactionId localxmin;
-               TimestampTz localts;
+               ConflictTupleInfo conflicttuple = {0};
 
                /* Get the matching local tuple from the partition. */
                found = FindReplTupleInLocalRel(edata, partrel,
@@ -3093,11 +3088,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                     * The tuple to be updated could not be found.  Do nothing
                     * except for emitting a log message.
                     */
-                   ReportApplyConflict(estate, partrelinfo,
-                                       LOG, CT_UPDATE_MISSING,
-                                       remoteslot_part, NULL, newslot,
-                                       InvalidOid, InvalidTransactionId,
-                                       InvalidRepOriginId, 0);
+                   ReportApplyConflict(estate, partrelinfo, LOG,
+                                       CT_UPDATE_MISSING, remoteslot_part,
+                                       newslot, list_make1(&conflicttuple));
 
                    return;
                }
@@ -3106,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                 * Report the conflict if the tuple was modified by a
                 * different origin.
                 */
-               if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) &&
-                   localorigin != replorigin_session_origin)
+               if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
+                                           &conflicttuple.origin,
+                                           &conflicttuple.ts) &&
+                   conflicttuple.origin != replorigin_session_origin)
                {
                    TupleTableSlot *newslot;
 
@@ -3115,10 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                    newslot = table_slot_create(partrel, &estate->es_tupleTable);
                    slot_store_data(newslot, part_entry, newtup);
 
+                   conflicttuple.slot = localslot;
+
                    ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
-                                       remoteslot_part, localslot, newslot,
-                                       InvalidOid, localxmin, localorigin,
-                                       localts);
+                                       remoteslot_part, newslot,
+                                       list_make1(&conflicttuple));
                }
 
                /*
index 662ce46cbc2056d7131bc5146fde29ede5ce3580..97af7c6554ff3c6870102ca22fe706d2c3bc115c 100644 (file)
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    10
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    11
    Oid         subid = PG_GETARG_OID(0);
    TupleDesc   tupdesc;
    Datum       values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2203,7 +2203,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
                       INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
                       TIMESTAMPTZOID, -1, 0);
    BlessTupleDesc(tupdesc);
 
index d52944b1145e904b6a2bef9f7776095ac23260d1..cf381867e4097db571b3d5d6b0f606791436f9e5 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202503131
+#define CATALOG_VERSION_NO 202503241
 
 #endif
index 890822eaf79e18137914c8fb23c4b49326ff190e..0d29ef50ff2b85ced52fcc098b440842399ac991 100644 (file)
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
index 37454dc951372ee4b8c1ca8b74e2bccd59a6547b..6c59125f25657cad833b5167f2eb832b60ab1e9b 100644 (file)
@@ -41,6 +41,9 @@ typedef enum
    /* The row to be deleted is missing */
    CT_DELETE_MISSING,
 
+   /* The row to be inserted/updated violates multiple unique constraint */
+   CT_MULTIPLE_UNIQUE_CONFLICTS,
+
    /*
     * Other conflicts, such as exclusion constraint violations, involve more
     * complex rules than simple equality checks. These conflicts are left for
@@ -48,7 +51,23 @@ typedef enum
     */
 } ConflictType;
 
-#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+#define CONFLICT_NUM_TYPES (CT_MULTIPLE_UNIQUE_CONFLICTS + 1)
+
+/*
+ * Information for the existing local tuple that caused the conflict.
+ */
+typedef struct ConflictTupleInfo
+{
+   TupleTableSlot *slot;       /* tuple slot holding the conflicting local
+                                * tuple */
+   Oid         indexoid;       /* OID of the index where the conflict
+                                * occurred */
+   TransactionId xmin;         /* transaction ID of the modification causing
+                                * the conflict */
+   RepOriginId origin;         /* origin identifier of the modification */
+   TimestampTz ts;             /* timestamp of when the modification on the
+                                * conflicting local tuple occurred */
+} ConflictTupleInfo;
 
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
                                    TransactionId *xmin,
@@ -57,10 +76,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
                                int elevel, ConflictType type,
                                TupleTableSlot *searchslot,
-                               TupleTableSlot *localslot,
                                TupleTableSlot *remoteslot,
-                               Oid indexoid, TransactionId localxmin,
-                               RepOriginId localorigin, TimestampTz localts);
+                               List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
-
 #endif
index 62f69ac20b2eff93883153691ac4ad5daa8792b1..474789691357bf1f0df2348e9c82b462907b9c2f 100644 (file)
@@ -2157,9 +2157,10 @@ pg_stat_subscription_stats| SELECT ss.subid,
     ss.confl_update_missing,
     ss.confl_delete_origin_differs,
     ss.confl_delete_missing,
+    ss.confl_multiple_unique_conflicts,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
index d40b49714f62af2754d05b52a2b95952ee183a05..586ffba434e1177f3b203b1891fd66e94936d54d 100644 (file)
@@ -41,6 +41,7 @@ tests += {
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
       't/034_temporal.pl',
+      't/035_conflicts.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
new file mode 100644 (file)
index 0000000..f9778db
--- /dev/null
@@ -0,0 +1,113 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Create a publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create a subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create a table on publisher
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);");
+
+# Create same table on subscriber
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE conf_tab (a int PRIMARY key, b int UNIQUE, c int UNIQUE);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION pub_tab FOR TABLE conf_tab");
+
+# Create the subscription
+my $appname = 'sub_tab';
+$node_subscriber->safe_psql(
+   'postgres',
+   "CREATE SUBSCRIPTION sub_tab
+    CONNECTION '$publisher_connstr application_name=$appname'
+    PUBLICATION pub_tab;");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+##################################################
+# INSERT data on Pub and Sub
+##################################################
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO conf_tab VALUES (1,1,1);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+   "INSERT INTO conf_tab VALUES (2,2,2), (3,3,3), (4,4,4);");
+
+##################################################
+# Test multiple_unique_conflicts due to INSERT
+##################################################
+my $log_offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO conf_tab VALUES (2,3,4);");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+   qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
+.*Key already exists in unique index \"conf_tab_pkey\".*
+.*Key \(a\)=\(2\); existing local tuple \(2, 2, 2\); remote tuple \(2, 3, 4\).*
+.*Key already exists in unique index \"conf_tab_b_key\".*
+.*Key \(b\)=\(3\); existing local tuple \(3, 3, 3\); remote tuple \(2, 3, 4\).*
+.*Key already exists in unique index \"conf_tab_c_key\".*
+.*Key \(c\)=\(4\); existing local tuple \(4, 4, 4\); remote tuple \(2, 3, 4\)./,
+   $log_offset);
+
+pass('multiple_unique_conflicts detected during update');
+
+# Truncate table to get rid of the error
+$node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;");
+
+##################################################
+# Test multiple_unique_conflicts due to UPDATE
+##################################################
+$log_offset = -s $node_subscriber->logfile;
+
+# Insert data in the publisher table
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO conf_tab VALUES (5,5,5);");
+
+# Insert data in the subscriber table
+$node_subscriber->safe_psql('postgres',
+   "INSERT INTO conf_tab VALUES (6,6,6), (7,7,7), (8,8,8);");
+
+$node_publisher->safe_psql('postgres',
+   "UPDATE conf_tab set a=6, b=7, c=8 where a=5;");
+
+# Confirm that this causes an error on the subscriber
+$node_subscriber->wait_for_log(
+   qr/conflict detected on relation \"public.conf_tab\": conflict=multiple_unique_conflicts.*
+.*Key already exists in unique index \"conf_tab_pkey\".*
+.*Key \(a\)=\(6\); existing local tuple \(6, 6, 6\); remote tuple \(6, 7, 8\).*
+.*Key already exists in unique index \"conf_tab_b_key\".*
+.*Key \(b\)=\(7\); existing local tuple \(7, 7, 7\); remote tuple \(6, 7, 8\).*
+.*Key already exists in unique index \"conf_tab_c_key\".*
+.*Key \(c\)=\(8\); existing local tuple \(8, 8, 8\); remote tuple \(6, 7, 8\)./,
+   $log_offset);
+
+pass('multiple_unique_conflicts detected during insert');
+
+done_testing();
index bfa276d2d355a6edc01221ff6defde5177ff8da1..3fbf5a4c2128e9aaa18973975d0b6e9dcb54829d 100644 (file)
@@ -480,6 +480,7 @@ ConditionVariableMinimallyPadded
 ConditionalStack
 ConfigData
 ConfigVariable
+ConflictTupleInfo
 ConflictType
 ConnCacheEntry
 ConnCacheKey