More refactoring of heapgettup() and heapgettup_pagemode()
authorDavid Rowley <[email protected]>
Tue, 7 Feb 2023 04:24:07 +0000 (17:24 +1300)
committerDavid Rowley <[email protected]>
Tue, 7 Feb 2023 04:24:07 +0000 (17:24 +1300)
Here we further simplify the code in heapgettup() and
heapgettup_pagemode() to make better use of the helper functions added in
the previous recent refactors in this area.

In passing, remove an unneeded cast added in 8ca6d49f6.

Author: Melanie Plageman
Reviewed-by: Andres Freund, David Rowley
Discussion: https://postgr.es/m/CAAKRu_YSOnhKsDyFcqJsKtBSrd32DP-jjXmv7hL0BPD-z0TGXQ@mail.gmail.com

src/backend/access/heap/heapam.c

index 3bb1d5cff687dff944667604789f59a2945b4069..7eb79cee58d6b26411dd6c388641242d63f33682 100644 (file)
@@ -567,7 +567,7 @@ heapgettup_start_page(HeapScanDesc scan, ScanDirection dir, int *linesleft,
 
    TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
 
-   *linesleft = PageGetMaxOffsetNumber((Page) page) - FirstOffsetNumber + 1;
+   *linesleft = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1;
 
    if (ScanDirectionIsForward(dir))
        *lineoff = FirstOffsetNumber;
@@ -732,34 +732,17 @@ heapgettup(HeapScanDesc scan,
           ScanKey key)
 {
    HeapTuple   tuple = &(scan->rs_ctup);
-   bool        backward = ScanDirectionIsBackward(dir);
    BlockNumber block;
    Page        page;
    OffsetNumber lineoff;
    int         linesleft;
-   ItemId      lpp;
 
    if (unlikely(!scan->rs_inited))
    {
        block = heapgettup_initial_block(scan, dir);
-
-       /*
-        * Check if we have reached the end of the scan already. This could
-        * happen if the table is empty or if the parallel workers have
-        * already finished the scan before we did anything ourselves
-        */
-       if (block == InvalidBlockNumber)
-       {
-           Assert(!BufferIsValid(scan->rs_cbuf));
-           tuple->t_data = NULL;
-           return;
-       }
+       /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
+       Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
        scan->rs_inited = true;
-
-       heapgetpage((TableScanDesc) scan, block);
-
-       LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
-       page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
    }
    else
    {
@@ -768,15 +751,20 @@ heapgettup(HeapScanDesc scan,
 
        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
        page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff);
+       goto continue_page;
    }
 
    /*
     * advance the scan until we find a qualifying tuple or run out of stuff
     * to scan
     */
-   lpp = PageGetItemId(page, lineoff);
-   for (;;)
+   while (block != InvalidBlockNumber)
    {
+       heapgetpage((TableScanDesc) scan, block);
+       LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+       page = heapgettup_start_page(scan, dir, &linesleft, &lineoff);
+continue_page:
+
        /*
         * Only continue scanning the page while we have lines left.
         *
@@ -784,53 +772,39 @@ heapgettup(HeapScanDesc scan,
         * PageGetMaxOffsetNumber(); both for forward scans when we resume the
         * table scan, and for when we start scanning a new page.
         */
-       while (linesleft > 0)
+       for (; linesleft > 0; linesleft--, lineoff += dir)
        {
-           if (ItemIdIsNormal(lpp))
-           {
-               bool        valid;
+           bool        visible;
+           ItemId      lpp = PageGetItemId(page, lineoff);
 
-               tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
-               tuple->t_len = ItemIdGetLength(lpp);
-               ItemPointerSet(&(tuple->t_self), block, lineoff);
+           if (!ItemIdIsNormal(lpp))
+               continue;
 
-               /*
-                * if current tuple qualifies, return it.
-                */
-               valid = HeapTupleSatisfiesVisibility(tuple,
-                                                    scan->rs_base.rs_snapshot,
-                                                    scan->rs_cbuf);
+           tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+           tuple->t_len = ItemIdGetLength(lpp);
+           ItemPointerSet(&(tuple->t_self), block, lineoff);
 
-               HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
-                                                   tuple, scan->rs_cbuf,
-                                                   scan->rs_base.rs_snapshot);
+           visible = HeapTupleSatisfiesVisibility(tuple,
+                                                  scan->rs_base.rs_snapshot,
+                                                  scan->rs_cbuf);
 
-               if (valid && key != NULL)
-                   valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
-                                       nkeys, key);
+           HeapCheckForSerializableConflictOut(visible, scan->rs_base.rs_rd,
+                                               tuple, scan->rs_cbuf,
+                                               scan->rs_base.rs_snapshot);
 
-               if (valid)
-               {
-                   LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
-                   scan->rs_coffset = lineoff;
-                   return;
-               }
-           }
+           /* skip tuples not visible to this snapshot */
+           if (!visible)
+               continue;
 
-           /*
-            * otherwise move to the next item on the page
-            */
-           --linesleft;
-           if (backward)
-           {
-               --lpp;          /* move back in this page's ItemId array */
-               --lineoff;
-           }
-           else
-           {
-               ++lpp;          /* move forward in this page's ItemId array */
-               ++lineoff;
-           }
+           /* skip any tuples that don't match the scan key */
+           if (key != NULL &&
+               !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
+                            nkeys, key))
+               continue;
+
+           LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
+           scan->rs_coffset = lineoff;
+           return;
        }
 
        /*
@@ -841,40 +815,16 @@ heapgettup(HeapScanDesc scan,
 
        /* get the BlockNumber to scan next */
        block = heapgettup_advance_block(scan, block, dir);
+   }
 
-       /*
-        * return NULL if we've exhausted all the pages
-        */
-       if (block == InvalidBlockNumber)
-       {
-           if (BufferIsValid(scan->rs_cbuf))
-               ReleaseBuffer(scan->rs_cbuf);
-           scan->rs_cbuf = InvalidBuffer;
-           scan->rs_cblock = InvalidBlockNumber;
-           tuple->t_data = NULL;
-           scan->rs_inited = false;
-           return;
-       }
-
-       heapgetpage((TableScanDesc) scan, block);
-
-       LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
+   /* end of scan */
+   if (BufferIsValid(scan->rs_cbuf))
+       ReleaseBuffer(scan->rs_cbuf);
 
-       page = BufferGetPage(scan->rs_cbuf);
-       TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd,
-                          page);
-       linesleft = PageGetMaxOffsetNumber(page);
-       if (backward)
-       {
-           lineoff = linesleft;
-           lpp = PageGetItemId(page, linesleft);
-       }
-       else
-       {
-           lineoff = FirstOffsetNumber;
-           lpp = PageGetItemId(page, FirstOffsetNumber);
-       }
-   }
+   scan->rs_cbuf = InvalidBuffer;
+   scan->rs_cblock = InvalidBlockNumber;
+   tuple->t_data = NULL;
+   scan->rs_inited = false;
 }
 
 /* ----------------
@@ -897,35 +847,16 @@ heapgettup_pagemode(HeapScanDesc scan,
                    ScanKey key)
 {
    HeapTuple   tuple = &(scan->rs_ctup);
-   bool        backward = ScanDirectionIsBackward(dir);
    BlockNumber block;
    Page        page;
    int         lineindex;
-   OffsetNumber lineoff;
    int         linesleft;
-   ItemId      lpp;
 
    if (unlikely(!scan->rs_inited))
    {
        block = heapgettup_initial_block(scan, dir);
-
-       /*
-        * Check if we have reached the end of the scan already. This could
-        * happen if the table is empty or if the other workers in a parallel
-        * scan have already finished the scan.
-        */
-       if (block == InvalidBlockNumber)
-       {
-           Assert(!BufferIsValid(scan->rs_cbuf));
-           tuple->t_data = NULL;
-           return;
-       }
-
-       heapgetpage((TableScanDesc) scan, block);
-       page = BufferGetPage(scan->rs_cbuf);
-       TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
-       linesleft = scan->rs_ntuples;
-       lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
+       /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */
+       Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf));
        scan->rs_inited = true;
    }
    else
@@ -940,16 +871,31 @@ heapgettup_pagemode(HeapScanDesc scan,
            linesleft = scan->rs_ntuples - lineindex;
        else
            linesleft = scan->rs_cindex;
+       /* lineindex now references the next or previous visible tid */
+
+       goto continue_page;
    }
 
    /*
     * advance the scan until we find a qualifying tuple or run out of stuff
     * to scan
     */
-   for (;;)
+   while (block != InvalidBlockNumber)
    {
-       while (linesleft > 0)
+       heapgetpage((TableScanDesc) scan, block);
+       page = BufferGetPage(scan->rs_cbuf);
+       TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
+       linesleft = scan->rs_ntuples;
+       lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1;
+
+       /* lineindex now references the next or previous visible tid */
+continue_page:
+
+       for (; linesleft > 0; linesleft--, lineindex += dir)
        {
+           ItemId      lpp;
+           OffsetNumber lineoff;
+
            lineoff = scan->rs_vistuples[lineindex];
            lpp = PageGetItemId(page, lineoff);
            Assert(ItemIdIsNormal(lpp));
@@ -958,64 +904,27 @@ heapgettup_pagemode(HeapScanDesc scan,
            tuple->t_len = ItemIdGetLength(lpp);
            ItemPointerSet(&(tuple->t_self), block, lineoff);
 
-           /*
-            * if current tuple qualifies, return it.
-            */
-           if (key != NULL)
-           {
-               bool        valid;
-
-               valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
-                                   nkeys, key);
-               if (valid)
-               {
-                   scan->rs_cindex = lineindex;
-                   return;
-               }
-           }
-           else
-           {
-               scan->rs_cindex = lineindex;
-               return;
-           }
+           /* skip any tuples that don't match the scan key */
+           if (key != NULL &&
+               !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
+                            nkeys, key))
+               continue;
 
-           /*
-            * otherwise move to the next item on the page
-            */
-           --linesleft;
-           if (backward)
-               --lineindex;
-           else
-               ++lineindex;
+           scan->rs_cindex = lineindex;
+           return;
        }
 
        /* get the BlockNumber to scan next */
        block = heapgettup_advance_block(scan, block, dir);
-
-       /*
-        * return NULL if we've exhausted all the pages
-        */
-       if (block == InvalidBlockNumber)
-       {
-           if (BufferIsValid(scan->rs_cbuf))
-               ReleaseBuffer(scan->rs_cbuf);
-           scan->rs_cbuf = InvalidBuffer;
-           scan->rs_cblock = InvalidBlockNumber;
-           tuple->t_data = NULL;
-           scan->rs_inited = false;
-           return;
-       }
-
-       heapgetpage((TableScanDesc) scan, block);
-
-       page = BufferGetPage(scan->rs_cbuf);
-       TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page);
-       linesleft = scan->rs_ntuples;
-       if (backward)
-           lineindex = linesleft - 1;
-       else
-           lineindex = 0;
    }
+
+   /* end of scan */
+   if (BufferIsValid(scan->rs_cbuf))
+       ReleaseBuffer(scan->rs_cbuf);
+   scan->rs_cbuf = InvalidBuffer;
+   scan->rs_cblock = InvalidBlockNumber;
+   tuple->t_data = NULL;
+   scan->rs_inited = false;
 }