}
}
+/*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * We just mark the relation entry as not updatable here if the local
+ * replica identity is found to be insufficient for applying
+ * updates/deletes (inserts don't care!) and leave it to
+ * check_relation_updatable() to throw the actual error if needed.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+ Bitmapset *idkey;
+ LogicalRepRelation *remoterel = &entry->remoterel;
+ int i;
+
+ entry->updatable = true;
+
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ /* fallback to PK if no replica identity */
+ if (idkey == NULL)
+ {
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+ /*
+ * If no replica identity index and no PK, the published table must
+ * have replica identity FULL.
+ */
+ if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+ entry->updatable = false;
+ }
+
+ i = -1;
+ while ((i = bms_next_member(idkey, i)) >= 0)
+ {
+ int attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" uses "
+ "system columns in REPLICA IDENTITY index",
+ remoterel->nspname, remoterel->relname)));
+
+ attnum = AttrNumberGetAttrOffset(attnum);
+
+ if (entry->attrmap->attnums[attnum] < 0 ||
+ !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+ {
+ entry->updatable = false;
+ break;
+ }
+ }
+}
+
/*
* Open the local relation associated with the remote one.
*
if (!entry->localrelvalid)
{
Oid relid;
- Bitmapset *idkey;
TupleDesc desc;
MemoryContext oldctx;
int i;
bms_free(missingatts);
/*
- * Check that replica identity matches. We allow for stricter replica
- * identity (fewer columns) on subscriber as that will not stop us
- * from finding unique tuple. IE, if publisher has identity
- * (id,timestamp) and subscriber just (id) this will not be a problem,
- * but in the opposite scenario it will.
- *
- * Don't throw any error here just mark the relation entry as not
- * updatable, as replica identity is only for updates and deletes but
- * inserts can be replicated even without it.
+ * Set if the table's replica identity is enough to apply
+ * update/delete.
*/
- entry->updatable = true;
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_IDENTITY_KEY);
- /* fallback to PK if no replica identity */
- if (idkey == NULL)
- {
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
- /*
- * If no replica identity index and no PK, the published table
- * must have replica identity FULL.
- */
- if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
- entry->updatable = false;
- }
-
- i = -1;
- while ((i = bms_next_member(idkey, i)) >= 0)
- {
- int attnum = i + FirstLowInvalidHeapAttributeNumber;
-
- if (!AttrNumberIsForUserDefinedAttr(attnum))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication target relation \"%s.%s\" uses "
- "system columns in REPLICA IDENTITY index",
- remoterel->nspname, remoterel->relname)));
-
- attnum = AttrNumberGetAttrOffset(attnum);
-
- if (entry->attrmap->attnums[attnum] < 0 ||
- !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
- {
- entry->updatable = false;
- break;
- }
- }
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
}
attrmap->maplen * sizeof(AttrNumber));
}
- entry->updatable = root->updatable;
+ /* Set if the table's replica identity is enough to apply update/delete. */
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
+ /*
+ * For partitioned tables, we only need to care if the target partition is
+ * updatable (aka has PK or RI defined for it).
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ return;
+
/* Updatable, no error. */
if (rel->updatable)
return;
TupleTableSlot *remoteslot_part;
TupleConversionMap *map;
MemoryContext oldctx;
+ LogicalRepRelMapEntry *part_entry = NULL;
+ AttrMap *attrmap = NULL;
/* ModifyTableState is needed for ExecFindPartition(). */
edata->mtstate = mtstate = makeNode(ModifyTableState);
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
map = partrelinfo->ri_RootToPartitionMap;
if (map != NULL)
- remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+ {
+ attrmap = map->attrMap;
+ remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
remoteslot_part);
+ }
else
{
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
}
MemoryContextSwitchTo(oldctx);
+ /* Check if we can do the update or delete on the leaf partition. */
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ part_entry = logicalrep_partition_open(relmapentry, partrel,
+ attrmap);
+ check_relation_updatable(part_entry);
+ }
+
switch (operation)
{
case CMD_INSERT:
* suitable partition.
*/
{
- AttrMap *attrmap = map ? map->attrMap : NULL;
- LogicalRepRelMapEntry *part_entry;
TupleTableSlot *localslot;
ResultRelInfo *partrelinfo_new;
bool found;
- part_entry = logicalrep_partition_open(relmapentry, partrel,
- attrmap);
-
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
"SELECT a, b, c FROM tab5 ORDER BY 1");
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
+# Test that replication works correctly as long as the leaf partition
+# has the necessary REPLICA IDENTITY, even though the actual target
+# partitioned table does not.
+$node_subscriber2->safe_psql('postgres',
+ "ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');
+
done_testing();