Fix updating of pg_subscription_rel from workers
authorPeter Eisentraut <[email protected]>
Wed, 7 Jun 2017 17:49:14 +0000 (13:49 -0400)
committerPeter Eisentraut <[email protected]>
Wed, 7 Jun 2017 17:49:14 +0000 (13:49 -0400)
A logical replication worker should not insert new rows into
pg_subscription_rel, only update existing rows, so that there are no
races if a concurrent refresh removes rows.  Adjust the API to be able
to choose that behavior.

Author: Masahiko Sawada <[email protected]>
Reported-by: tushar <[email protected]>
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/include/catalog/pg_subscription_rel.h

index ab5f3719fc397fdaba5f4f7fefc7dc7fd54b2a35..c5b2541319ee8fa13e01cd1d368e409316c64c74 100644 (file)
@@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray)
 /*
  * Set the state of a subscription table.
  *
+ * If update_only is true and the record for given table doesn't exist, do
+ * nothing.  This can be used to avoid inserting a new record that was deleted
+ * by someone else.  Generally, subscription DDL commands should use false,
+ * workers should use true.
+ *
  * The insert-or-update logic in this function is not concurrency safe so it
  * might raise an error in rare circumstances.  But if we took a stronger lock
  * such as ShareRowExclusiveLock, we would risk more deadlocks.
  */
 Oid
 SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                       XLogRecPtr sublsn)
+                       XLogRecPtr sublsn, bool update_only)
 {
    Relation    rel;
    HeapTuple   tup;
-   Oid         subrelid;
+   Oid         subrelid = InvalidOid;
    bool        nulls[Natts_pg_subscription_rel];
    Datum       values[Natts_pg_subscription_rel];
 
@@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
     * If the record for given table does not exist yet create new record,
     * otherwise update the existing one.
     */
-   if (!HeapTupleIsValid(tup))
+   if (!HeapTupleIsValid(tup) && !update_only)
    {
        /* Form the tuple. */
        memset(values, 0, sizeof(values));
@@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 
        heap_freetuple(tup);
    }
-   else
+   else if (HeapTupleIsValid(tup))
    {
        bool        replaces[Natts_pg_subscription_rel];
 
index ad98b38efe8adc6d899f319dc171fd0eb1ac7959..49737a904207ed8c000e092ee9d95ee0b76b78bb 100644 (file)
@@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                                         rv->schemaname, rv->relname);
 
                SetSubscriptionRelState(subid, relid, table_state,
-                                       InvalidXLogRecPtr);
+                                       InvalidXLogRecPtr, false);
            }
 
            ereport(NOTICE,
@@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
        {
            SetSubscriptionRelState(sub->oid, relid,
                          copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-                                   InvalidXLogRecPtr);
+                                   InvalidXLogRecPtr, false);
            ereport(NOTICE,
                    (errmsg("added subscription for table %s.%s",
                            quote_identifier(rv->schemaname),
index 6fe39d20237d3956819b3da64ffc997df21cd5bc..f57ae6ee2d560abc048abad2b7b345aff600c955 100644 (file)
@@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
        SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                MyLogicalRepWorker->relid,
                                MyLogicalRepWorker->relstate,
-                               MyLogicalRepWorker->relstate_lsn);
+                               MyLogicalRepWorker->relstate_lsn,
+                               true);
 
        walrcv_endstreaming(wrconn, &tli);
        finish_sync_worker();
@@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                }
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        rstate->relid, rstate->state,
-                                       rstate->lsn);
+                                       rstate->lsn, true);
            }
        }
        else
@@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                        MyLogicalRepWorker->relid,
                                        MyLogicalRepWorker->relstate,
-                                       MyLogicalRepWorker->relstate_lsn);
+                                       MyLogicalRepWorker->relstate_lsn,
+                                       true);
                CommitTransactionCommand();
                pgstat_report_stat(false);
 
@@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                    SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                            MyLogicalRepWorker->relid,
                                            SUBREL_STATE_SYNCDONE,
-                                           *origin_startpos);
+                                           *origin_startpos,
+                                           true);
                    finish_sync_worker();
                }
                break;
index 391f96b76e4f4110e90336a6572d261fdab6d352..f5f6191676884af40a21070741e4120a8517455c 100644 (file)
@@ -71,7 +71,7 @@ typedef struct SubscriptionRelState
 } SubscriptionRelState;
 
 extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                       XLogRecPtr sublsn);
+                       XLogRecPtr sublsn, bool update_only);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
                        XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);