Code review for recent slot.c changes.
authorAndres Freund <[email protected]>
Thu, 6 Apr 2017 03:56:35 +0000 (20:56 -0700)
committerAndres Freund <[email protected]>
Thu, 6 Apr 2017 04:00:29 +0000 (21:00 -0700)
src/backend/replication/slot.c

index 6c5ec7a00e8d646e34805ec9ebaf02174196ec9f..6ac0ce69c65017eeb5552158341cfe13c11cef1e 100644 (file)
@@ -410,7 +410,7 @@ ReplicationSlotRelease(void)
  * Cleanup all temporary slots created in current session.
  */
 void
-ReplicationSlotCleanup()
+ReplicationSlotCleanup(void)
 {
    int         i;
 
@@ -802,12 +802,12 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
  * pg_database oid for the database to prevent creation of new slots on the db
  * or replay from existing slots.
  *
- * This routine isn't as efficient as it could be - but we don't drop databases
- * often, especially databases with lots of slots.
- *
  * Another session that concurrently acquires an existing slot on the target DB
  * (most likely to drop it) may cause this function to ERROR. If that happens
  * it may have dropped some but not all slots.
+ *
+ * This routine isn't as efficient as it could be - but we don't drop
+ * databases often, especially databases with lots of slots.
  */
 void
 ReplicationSlotsDropDBSlots(Oid dboid)
@@ -822,7 +822,7 @@ restart:
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s;
-       NameData slotname;
+       char *slotname;
        int active_pid;
 
        s = &ReplicationSlotCtl->replication_slots[i];
@@ -839,10 +839,10 @@ restart:
        if (s->data.database != dboid)
            continue;
 
-       /* Claim the slot, as if ReplicationSlotAcquire()ing. */
+       /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
        SpinLockAcquire(&s->mutex);
-       strncpy(NameStr(slotname), NameStr(s->data.name), NAMEDATALEN);
-       NameStr(slotname)[NAMEDATALEN-1] = '\0';
+       /* can't change while ReplicationSlotControlLock is held */
+       slotname = NameStr(s->data.name);
        active_pid = s->active_pid;
        if (active_pid == 0)
        {
@@ -852,36 +852,32 @@ restart:
        SpinLockRelease(&s->mutex);
 
        /*
-        * We might fail here if the slot was active. Even though we hold an
-        * exclusive lock on the database object a logical slot for that DB can
-        * still be active if it's being dropped by a backend connected to
-        * another DB or is otherwise acquired.
+        * Even though we hold an exclusive lock on the database object a
+        * logical slot for that DB can still be active, e.g. if it's
+        * concurrently being dropped by a backend connected to another DB.
         *
-        * It's an unlikely race that'll only arise from concurrent user action,
-        * so we'll just bail out.
+        * That's fairly unlikely in practice, so we'll just bail out.
         */
        if (active_pid)
-           elog(ERROR, "replication slot %s is in use by pid %d",
-                       NameStr(slotname), active_pid);
+           ereport(ERROR,
+                   (errcode(ERRCODE_OBJECT_IN_USE),
+                    errmsg("replication slot \"%s\" is active for PID %d",
+                           slotname, active_pid)));
 
        /*
-        * To avoid largely duplicating ReplicationSlotDropAcquired() or
-        * complicating it with already_locked flags for ProcArrayLock,
-        * ReplicationSlotControlLock and ReplicationSlotAllocationLock, we
-        * just release our ReplicationSlotControlLock to drop the slot.
+        * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
+        * holding ReplicationSlotControlLock over filesystem operations,
+        * release ReplicationSlotControlLock and use
+        * ReplicationSlotDropAcquired.
         *
-        * For safety we'll restart our scan from the beginning each
-        * time we release the lock.
+        * As that means the set of slots could change, restart scan from the
+        * beginning each time we release the lock.
         */
        LWLockRelease(ReplicationSlotControlLock);
        ReplicationSlotDropAcquired();
        goto restart;
    }
    LWLockRelease(ReplicationSlotControlLock);
-
-   /* recompute limits once after all slots are dropped */
-   ReplicationSlotsComputeRequiredXmin(false);
-   ReplicationSlotsComputeRequiredLSN();
 }