Use callbacks in SlruScanDirectory for the actual action
authorAlvaro Herrera <[email protected]>
Wed, 28 Sep 2011 14:32:38 +0000 (11:32 -0300)
committerAlvaro Herrera <[email protected]>
Tue, 4 Oct 2011 17:03:23 +0000 (14:03 -0300)
Previously, the code assumed that the only possible action to take was
to delete files behind a certain cutoff point.  The async notify code
was already a crock: it used a different "pagePrecedes" function for
truncation than for regular operation.  By allowing it to pass a
callback to SlruScanDirectory it can do cleanly exactly what it needs to
do.

The clog.c code also had its own use for SlruScanDirectory, which is
made a bit simpler with this.

src/backend/access/transam/clog.c
src/backend/access/transam/slru.c
src/backend/commands/async.c
src/include/access/slru.h

index 3a6c39164ce558e51d8fb8dfe90c7a1dae9b415a..ee645f7bd472ee394ace23f9461318b611e57514 100644 (file)
@@ -606,7 +606,7 @@ TruncateCLOG(TransactionId oldestXact)
    cutoffPage = TransactionIdToPage(oldestXact);
 
    /* Check to see if there's any files that could be removed */
-   if (!SlruScanDirectory(ClogCtl, cutoffPage, false))
+   if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
        return;                 /* nothing to remove */
 
    /* Write XLOG record and flush XLOG to disk */
index 69e5245beba14a7b6efb8b5d9b2ddd61b51dd13e..4b838a0dfeb0db4a5f36e0e3e13b907bdf92f0de 100644 (file)
@@ -132,6 +132,8 @@ static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
 static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
 static int SlruSelectLRUPage(SlruCtl ctl, int pageno);
 
+static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
+                         int segpage, void *data);
 
 /*
  * Initialization of shared memory
@@ -1137,33 +1139,84 @@ restart:;
    LWLockRelease(shared->ControlLock);
 
    /* Now we can remove the old segment(s) */
-   (void) SlruScanDirectory(ctl, cutoffPage, true);
+   (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
 }
 
 /*
- * SimpleLruTruncate subroutine: scan directory for removable segments.
- * Actually remove them iff doDeletions is true.  Return TRUE iff any
- * removable segments were found.  Note: no locking is needed.
+ * SlruScanDirectory callback
+ *         This callback reports true if there's any segment prior to the one
+ *         containing the page passed as "data".
+ */
+bool
+SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
+{
+   int     cutoffPage = *(int *) data;
+
+   cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
+
+   if (ctl->PagePrecedes(segpage, cutoffPage))
+       return true;    /* found one; don't iterate any more */
+
+   return false;   /* keep going */
+}
+
+/*
+ * SlruScanDirectory callback.
+ *     This callback deletes segments prior to the one passed in as "data".
+ */
+static bool
+SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
+{
+   char    path[MAXPGPATH];
+   int     cutoffPage = *(int *) data;
+
+   if (ctl->PagePrecedes(segpage, cutoffPage))
+   {
+       snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
+       ereport(DEBUG2,
+               (errmsg("removing file \"%s\"", path)));
+       unlink(path);
+   }
+
+   return false;   /* keep going */
+}
+
+/*
+ * SlruScanDirectory callback.
+ *     This callback deletes all segments.
+ */
+bool
+SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
+{
+   char    path[MAXPGPATH];
+
+   snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
+   ereport(DEBUG2,
+           (errmsg("removing file \"%s\"", path)));
+   unlink(path);
+
+   return false;   /* keep going */
+}
+
+/*
+ * Scan the SimpleLRU directory and apply a callback to each file found in it.
+ *
+ * If the callback returns true, the scan is stopped.  The last return value
+ * from the callback is returned.
  *
- * This can be called directly from clog.c, for reasons explained there.
+ * Note that the ordering in which the directory is scanned is not guaranteed.
+ *
+ * Note that no locking is applied.
  */
 bool
-SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions)
+SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
 {
-   bool        found = false;
    DIR        *cldir;
    struct dirent *clde;
    int         segno;
    int         segpage;
-   char        path[MAXPGPATH];
-
-   /*
-    * The cutoff point is the start of the segment containing cutoffPage.
-    * (This is redundant when called from SimpleLruTruncate, but not when
-    * called directly from clog.c.)
-    */
-   cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
+   bool        retval;
+   
    cldir = AllocateDir(ctl->Dir);
    while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
    {
@@ -1172,20 +1225,15 @@ SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions)
        {
            segno = (int) strtol(clde->d_name, NULL, 16);
            segpage = segno * SLRU_PAGES_PER_SEGMENT;
-           if (ctl->PagePrecedes(segpage, cutoffPage))
-           {
-               found = true;
-               if (doDeletions)
-               {
-                   snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, clde->d_name);
-                   ereport(DEBUG2,
-                           (errmsg("removing file \"%s\"", path)));
-                   unlink(path);
-               }
-           }
+
+           elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
+                ctl->Dir, clde->d_name);
+           retval = callback(ctl, clde->d_name, segpage, data);
+           if (retval)
+               break;
        }
    }
    FreeDir(cldir);
 
-   return found;
+   return retval;
 }
index 4f20ca2ef73d2affd01b27fe62ae9447ca6abf28..ca2e7348c9729f4281f3ac3015256f2f83c5ddfd 100644 (file)
@@ -194,7 +194,7 @@ typedef struct QueuePosition
 
 /* choose logically smaller QueuePosition */
 #define QUEUE_POS_MIN(x,y) \
-   (asyncQueuePagePrecedesLogically((x).page, (y).page) ? (x) : \
+   (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     (x).page != (y).page ? (y) : \
     (x).offset < (y).offset ? (x) : (y))
 
@@ -360,8 +360,7 @@ static bool backendHasExecutedInitialListen = false;
 bool       Trace_notify = false;
 
 /* local function prototypes */
-static bool asyncQueuePagePrecedesPhysically(int p, int q);
-static bool asyncQueuePagePrecedesLogically(int p, int q);
+static bool asyncQueuePagePrecedes(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
 static void Exec_ListenPreCommit(void);
@@ -388,25 +387,11 @@ static void NotifyMyFrontEnd(const char *channel,
 static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
 static void ClearPendingActionsAndNotifies(void);
 
-
 /*
  * We will work on the page range of 0..QUEUE_MAX_PAGE.
- *
- * asyncQueuePagePrecedesPhysically just checks numerically without any magic
- * if one page precedes another one.  This is wrong for normal operation but
- * is helpful when clearing pg_notify/ during startup.
- *
- * asyncQueuePagePrecedesLogically compares using wraparound logic, as is
- * required by slru.c.
  */
 static bool
-asyncQueuePagePrecedesPhysically(int p, int q)
-{
-   return p < q;
-}
-
-static bool
-asyncQueuePagePrecedesLogically(int p, int q)
+asyncQueuePagePrecedes(int p, int q)
 {
    int         diff;
 
@@ -484,7 +469,7 @@ AsyncShmemInit(void)
    /*
     * Set up SLRU management of the pg_notify data.
     */
-   AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
+   AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
    SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
                  AsyncCtlLock, "pg_notify");
    /* Override default assumption that writes should be fsync'd */
@@ -494,15 +479,8 @@ AsyncShmemInit(void)
    {
        /*
         * During start or reboot, clean out the pg_notify directory.
-        *
-        * Since we want to remove every file, we temporarily use
-        * asyncQueuePagePrecedesPhysically() and pass INT_MAX as the
-        * comparison value; every file in the directory should therefore
-        * appear to be less than that.
         */
-       AsyncCtl->PagePrecedes = asyncQueuePagePrecedesPhysically;
-       (void) SlruScanDirectory(AsyncCtl, INT_MAX, true);
-       AsyncCtl->PagePrecedes = asyncQueuePagePrecedesLogically;
+       (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
 
        /* Now initialize page zero to empty */
        LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
@@ -1223,7 +1201,7 @@ asyncQueueIsFull(void)
        nexthead = 0;           /* wrap around */
    boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
    boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
-   return asyncQueuePagePrecedesLogically(nexthead, boundary);
+   return asyncQueuePagePrecedes(nexthead, boundary);
 }
 
 /*
@@ -2074,7 +2052,7 @@ asyncQueueAdvanceTail(void)
     */
    newtailpage = QUEUE_POS_PAGE(min);
    boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
-   if (asyncQueuePagePrecedesLogically(oldtailpage, boundary))
+   if (asyncQueuePagePrecedes(oldtailpage, boundary))
    {
        /*
         * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
index c491b7d5f9fb70a4b75a20b28ea2a11c4f0da825..e48743f55d7e6e3e00df0beb6f0c6d7c13dacbd4 100644 (file)
@@ -145,6 +145,15 @@ extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno,
 extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
 extern void SimpleLruFlush(SlruCtl ctl, bool checkpoint);
 extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage);
-extern bool SlruScanDirectory(SlruCtl ctl, int cutoffPage, bool doDeletions);
+
+typedef bool (*SlruScanCallback) (SlruCtl ctl, char *filename, int segpage,
+                    void *data);
+extern bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data);
+
+/* SlruScanDirectory public callbacks */
+extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename,
+                           int segpage, void *data);
+extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage,
+                      void *data);
 
 #endif   /* SLRU_H */