Port single-page btree vacuum logic to hash indexes.
authorRobert Haas <[email protected]>
Thu, 16 Mar 2017 02:18:56 +0000 (22:18 -0400)
committerRobert Haas <[email protected]>
Thu, 16 Mar 2017 02:18:56 +0000 (22:18 -0400)
This is advantageous for hash indexes for the same reasons it's good
for btrees: it accelerates space recycling, reducing bloat.

Ashutosh Sharma, reviewed by Amit Kapila and by me.  A bit of
additional hacking by me.

Discussion: http://postgr.es/m/CAE9k0PkRSyzx8dOnokEpUi2A-RFZK72WN0h9DEMv_ut9q6bPRw@mail.gmail.com

src/backend/access/hash/README
src/backend/access/hash/hash.c
src/backend/access/hash/hash_xlog.c
src/backend/access/hash/hashinsert.c
src/backend/access/hash/hashsearch.c
src/backend/access/hash/hashsort.c
src/backend/access/hash/hashutil.c
src/backend/access/rmgrdesc/hashdesc.c
src/include/access/hash.h
src/include/access/hash_xlog.h

index 53b0e0def1536bf670470a9d498b7c9e0fec2d2f..15414383540f04e7a6c8a67166efd4dfbe15086b 100644 (file)
@@ -284,7 +284,10 @@ The insertion algorithm is rather similar:
        if we get the lock on both the buckets
            finish the split using algorithm mentioned below for split
        release the pin on old bucket and restart the insert from beginning.
-   if current page is full, release lock but not pin, read/exclusive-lock
+   if current page is full, first check if this page contains any dead tuples.
+   if yes, remove dead tuples from the current page and again check for the
+   availability of the space. If enough space found, insert the tuple else
+   release lock but not pin, read/exclusive-lock
      next page; repeat as needed
    >> see below if no space in any page of bucket
    take buffer content lock in exclusive mode on metapage
index 641676964bb351fb8ff1888dbb066748a4996873..cfcec3475d420effd4eeb87051cb4e7618c4dca7 100644 (file)
@@ -36,6 +36,7 @@ typedef struct
 {
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
+   Relation    heapRel;        /* heap relation descriptor */
 } HashBuildState;
 
 static void hashbuildCallback(Relation index,
@@ -154,6 +155,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
    /* prepare to build the index */
    buildstate.indtuples = 0;
+   buildstate.heapRel = heap;
 
    /* do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
@@ -162,7 +164,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
-       _h_indexbuild(buildstate.spool);
+       _h_indexbuild(buildstate.spool, buildstate.heapRel);
        _h_spooldestroy(buildstate.spool);
    }
 
@@ -218,7 +220,7 @@ hashbuildCallback(Relation index,
        itup = index_form_tuple(RelationGetDescr(index),
                                index_values, index_isnull);
        itup->t_tid = htup->t_self;
-       _hash_doinsert(index, itup);
+       _hash_doinsert(index, itup, buildstate->heapRel);
        pfree(itup);
    }
 
@@ -251,7 +253,7 @@ hashinsert(Relation rel, Datum *values, bool *isnull,
    itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
    itup->t_tid = *ht_ctid;
 
-   _hash_doinsert(rel, itup);
+   _hash_doinsert(rel, itup, heapRel);
 
    pfree(itup);
 
@@ -331,14 +333,24 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
        if (scan->kill_prior_tuple)
        {
            /*
-            * Yes, so mark it by setting the LP_DEAD state in the item flags.
+            * Yes, so remember it for later. (We'll deal with all such
+            * tuples at once right after leaving the index page or at
+            * end of scan.) In case if caller reverses the indexscan
+            * direction it is quite possible that the same item might
+            * get entered multiple times. But, we don't detect that;
+            * instead, we just forget any excess entries.
             */
-           ItemIdMarkDead(PageGetItemId(page, offnum));
+           if (so->killedItems == NULL)
+               so->killedItems = palloc(MaxIndexTuplesPerPage *
+                                        sizeof(HashScanPosItem));
 
-           /*
-            * Since this can be redone later if needed, mark as a hint.
-            */
-           MarkBufferDirtyHint(buf, true);
+           if (so->numKilled < MaxIndexTuplesPerPage)
+           {
+               so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
+               so->killedItems[so->numKilled].indexOffset =
+                           ItemPointerGetOffsetNumber(&(so->hashso_curpos));
+               so->numKilled++;
+           }
        }
 
        /*
@@ -446,6 +458,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;
 
+   so->killedItems = NULL;
+   so->numKilled = 0;
+
    scan->opaque = so;
 
    return scan;
@@ -461,6 +476,10 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;
 
+   /* Before leaving current page, deal with any killed items */
+   if (so->numKilled > 0)
+       _hash_kill_items(scan);
+
    _hash_dropscanbuf(rel, so);
 
    /* set position invalid (this will cause _hash_first call) */
@@ -488,8 +507,14 @@ hashendscan(IndexScanDesc scan)
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;
 
+   /* Before leaving current page, deal with any killed items */
+   if (so->numKilled > 0)
+       _hash_kill_items(scan);
+
    _hash_dropscanbuf(rel, so);
 
+   if (so->killedItems != NULL)
+       pfree(so->killedItems);
    pfree(so);
    scan->opaque = NULL;
 }
@@ -848,6 +873,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
 
            PageIndexMultiDelete(page, deletable, ndeletable);
            bucket_dirty = true;
+
+           /*
+            * Let us mark the page as clean if vacuum removes the DEAD tuples
+            * from an index page. We do this by clearing LH_PAGE_HAS_DEAD_TUPLES
+            * flag. Clearing this flag is just a hint; replay won't redo this.
+            */
+           if (tuples_removed && *tuples_removed > 0 &&
+               opaque->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
+               opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
            MarkBufferDirty(buf);
 
            /* XLOG stuff */
index 0c830ab595268390b720a0a79e6fefbe4a51a9d1..8647e8c6adca5cb9dfbda70d87f20c44cf5819b8 100644 (file)
  */
 #include "postgres.h"
 
+#include "access/heapam_xlog.h"
 #include "access/bufmask.h"
 #include "access/hash.h"
 #include "access/hash_xlog.h"
 #include "access/xlogutils.h"
+#include "access/xlog.h"
+#include "access/transam.h"
+#include "storage/procarray.h"
+#include "miscadmin.h"
 
 /*
  * replay a hash index meta page
@@ -915,6 +920,235 @@ hash_xlog_update_meta_page(XLogReaderState *record)
        UnlockReleaseBuffer(metabuf);
 }
 
+/*
+ * Get the latestRemovedXid from the heap pages pointed at by the index
+ * tuples being deleted. See also btree_xlog_delete_get_latestRemovedXid,
+ * on which this function is based.
+ */
+static TransactionId
+hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
+{
+   xl_hash_vacuum_one_page *xlrec;
+   OffsetNumber    *unused;
+   Buffer      ibuffer,
+               hbuffer;
+   Page        ipage,
+               hpage;
+   RelFileNode rnode;
+   BlockNumber blkno;
+   ItemId      iitemid,
+               hitemid;
+   IndexTuple  itup;
+   HeapTupleHeader htuphdr;
+   BlockNumber hblkno;
+   OffsetNumber    hoffnum;
+   TransactionId   latestRemovedXid = InvalidTransactionId;
+   int     i;
+   char *ptr;
+   Size len;
+
+   xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+
+   /*
+    * If there's nothing running on the standby we don't need to derive a
+    * full latestRemovedXid value, so use a fast path out of here.  This
+    * returns InvalidTransactionId, and so will conflict with all HS
+    * transactions; but since we just worked out that that's zero people,
+    * it's OK.
+    *
+    * XXX There is a race condition here, which is that a new backend might
+    * start just after we look.  If so, it cannot need to conflict, but this
+    * coding will result in throwing a conflict anyway.
+    */
+   if (CountDBBackends(InvalidOid) == 0)
+       return latestRemovedXid;
+
+   /*
+    * Get index page.  If the DB is consistent, this should not fail, nor
+    * should any of the heap page fetches below.  If one does, we return
+    * InvalidTransactionId to cancel all HS transactions.  That's probably
+    * overkill, but it's safe, and certainly better than panicking here.
+    */
+   XLogRecGetBlockTag(record, 1, &rnode, NULL, &blkno);
+   ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);
+
+   if (!BufferIsValid(ibuffer))
+       return InvalidTransactionId;
+   LockBuffer(ibuffer, HASH_READ);
+   ipage = (Page) BufferGetPage(ibuffer);
+
+   /*
+    * Loop through the deleted index items to obtain the TransactionId from
+    * the heap items they point to.
+    */
+   ptr = XLogRecGetBlockData(record, 1, &len);
+
+   unused = (OffsetNumber *) ptr;
+
+   for (i = 0; i < xlrec->ntuples; i++)
+   {
+       /*
+        * Identify the index tuple about to be deleted.
+        */
+       iitemid = PageGetItemId(ipage, unused[i]);
+       itup = (IndexTuple) PageGetItem(ipage, iitemid);
+
+       /*
+        * Locate the heap page that the index tuple points at
+        */
+       hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
+       hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
+                                        hblkno, RBM_NORMAL);
+
+       if (!BufferIsValid(hbuffer))
+       {
+           UnlockReleaseBuffer(ibuffer);
+           return InvalidTransactionId;
+       }
+       LockBuffer(hbuffer, HASH_READ);
+       hpage = (Page) BufferGetPage(hbuffer);
+
+       /*
+        * Look up the heap tuple header that the index tuple points at by
+        * using the heap node supplied with the xlrec. We can't use
+        * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
+        * Note that we are not looking at tuple data here, just headers.
+        */
+       hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
+       hitemid = PageGetItemId(hpage, hoffnum);
+
+       /*
+        * Follow any redirections until we find something useful.
+        */
+       while (ItemIdIsRedirected(hitemid))
+       {
+           hoffnum = ItemIdGetRedirect(hitemid);
+           hitemid = PageGetItemId(hpage, hoffnum);
+           CHECK_FOR_INTERRUPTS();
+       }
+
+       /*
+        * If the heap item has storage, then read the header and use that to
+        * set latestRemovedXid.
+        *
+        * Some LP_DEAD items may not be accessible, so we ignore them.
+        */
+       if (ItemIdHasStorage(hitemid))
+       {
+           htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
+           HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
+       }
+       else if (ItemIdIsDead(hitemid))
+       {
+           /*
+            * Conjecture: if hitemid is dead then it had xids before the xids
+            * marked on LP_NORMAL items. So we just ignore this item and move
+            * onto the next, for the purposes of calculating
+            * latestRemovedxids.
+            */
+       }
+       else
+           Assert(!ItemIdIsUsed(hitemid));
+
+       UnlockReleaseBuffer(hbuffer);
+   }
+
+   UnlockReleaseBuffer(ibuffer);
+
+   /*
+    * If all heap tuples were LP_DEAD then we will be returning
+    * InvalidTransactionId here, which avoids conflicts. This matches
+    * existing logic which assumes that LP_DEAD tuples must already be older
+    * than the latestRemovedXid on the cleanup record that set them as
+    * LP_DEAD, hence must already have generated a conflict.
+    */
+   return latestRemovedXid;
+}
+
+/*
+ * replay delete operation in hash index to remove
+ * tuples marked as DEAD during index tuple insertion.
+ */
+static void
+hash_xlog_vacuum_one_page(XLogReaderState *record)
+{
+   XLogRecPtr lsn = record->EndRecPtr;
+   xl_hash_vacuum_one_page *xldata;
+   Buffer buffer;
+   Buffer metabuf;
+   Page page;
+   XLogRedoAction action;
+
+   xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
+
+   /*
+    * If we have any conflict processing to do, it must happen before we
+    * update the page.
+    *
+    * Hash index records that are marked as LP_DEAD and being removed during
+    * hash index tuple insertion can conflict with standby queries. You might
+    * think that vacuum records would conflict as well, but we've handled
+    * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
+    * cleaned by the vacuum of the heap and so we can resolve any conflicts
+    * just once when that arrives.  After that we know that no conflicts
+    * exist from individual hash index vacuum records on that index.
+    */
+   if (InHotStandby)
+   {
+       TransactionId latestRemovedXid =
+                   hash_xlog_vacuum_get_latestRemovedXid(record);
+       RelFileNode rnode;
+
+       XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
+       ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+   }
+
+   action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
+
+   if (action == BLK_NEEDS_REDO)
+   {
+       char *ptr;
+       Size len;
+
+       ptr = XLogRecGetBlockData(record, 0, &len);
+
+       page = (Page) BufferGetPage(buffer);
+
+       if (len > 0)
+       {
+           OffsetNumber *unused;
+           OffsetNumber *unend;
+
+           unused = (OffsetNumber *) ptr;
+           unend = (OffsetNumber *) ((char *) ptr + len);
+
+           if ((unend - unused) > 0)
+               PageIndexMultiDelete(page, unused, unend - unused);
+       }
+
+       PageSetLSN(page, lsn);
+       MarkBufferDirty(buffer);
+   }
+   if (BufferIsValid(buffer))
+       UnlockReleaseBuffer(buffer);
+
+   if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
+   {
+       Page metapage;
+       HashMetaPage metap;
+
+       metapage = BufferGetPage(metabuf);
+       metap = HashPageGetMeta(metapage);
+
+       metap->hashm_ntuples -= xldata->ntuples;
+
+       PageSetLSN(metapage, lsn);
+       MarkBufferDirty(metabuf);
+   }
+   if (BufferIsValid(metabuf))
+       UnlockReleaseBuffer(metabuf);
+}
+
 void
 hash_redo(XLogReaderState *record)
 {
@@ -958,6 +1192,9 @@ hash_redo(XLogReaderState *record)
        case XLOG_HASH_UPDATE_META_PAGE:
            hash_xlog_update_meta_page(record);
            break;
+       case XLOG_HASH_VACUUM_ONE_PAGE:
+           hash_xlog_vacuum_one_page(record);
+           break;
        default:
            elog(PANIC, "hash_redo: unknown op code %u", info);
    }
index 241728fe6b1db7ea0f3e273c0e68f1a07e536694..8b6d0a0ff7821bbbe2717369e21f2108d06a9e65 100644 (file)
 
 #include "access/hash.h"
 #include "access/hash_xlog.h"
+#include "access/heapam.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
+#include "storage/lwlock.h"
+#include "storage/buf_internals.h"
 
+static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+                                 RelFileNode hnode);
 
 /*
  * _hash_doinsert() -- Handle insertion of a single index tuple.
@@ -28,7 +33,7 @@
  *     and hashinsert.  By here, itup is completely filled in.
  */
 void
-_hash_doinsert(Relation rel, IndexTuple itup)
+_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
 {
    Buffer      buf = InvalidBuffer;
    Buffer      bucket_buf;
@@ -118,10 +123,30 @@ restart_insert:
    /* Do the insertion */
    while (PageGetFreeSpace(page) < itemsz)
    {
+       BlockNumber nextblkno;
+
+       /*
+        * Check if current page has any DEAD tuples. If yes,
+        * delete these tuples and see if we can get a space for
+        * the new item to be inserted before moving to the next
+        * page in the bucket chain.
+        */
+       if (H_HAS_DEAD_TUPLES(pageopaque))
+       {
+
+           if (IsBufferCleanupOK(buf))
+           {
+               _hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);
+
+               if (PageGetFreeSpace(page) >= itemsz)
+                   break;              /* OK, now we have enough space */
+           }
+       }
+
        /*
         * no space on this page; check for an overflow page
         */
-       BlockNumber nextblkno = pageopaque->hasho_nextblkno;
+       nextblkno = pageopaque->hasho_nextblkno;
 
        if (BlockNumberIsValid(nextblkno))
        {
@@ -157,7 +182,7 @@ restart_insert:
            Assert(PageGetFreeSpace(page) >= itemsz);
        }
        pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
-       Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE);
+       Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
        Assert(pageopaque->hasho_bucket == bucket);
    }
 
@@ -300,3 +325,93 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
                 RelationGetRelationName(rel));
    }
 }
+
+/*
+ * _hash_vacuum_one_page - vacuum just one index page.
+ *
+ * Try to remove LP_DEAD items from the given page. We must acquire cleanup
+ * lock on the page being modified before calling this function.
+ */
+
+static void
+_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
+                     RelFileNode hnode)
+{
+   OffsetNumber    deletable[MaxOffsetNumber];
+   int ndeletable = 0;
+   OffsetNumber offnum,
+                maxoff;
+   Page    page = BufferGetPage(buf);
+   HashPageOpaque  pageopaque;
+   HashMetaPage    metap;
+   double tuples_removed = 0;
+
+   /* Scan each tuple in page to see if it is marked as LP_DEAD */
+   maxoff = PageGetMaxOffsetNumber(page);
+   for (offnum = FirstOffsetNumber;
+        offnum <= maxoff;
+        offnum = OffsetNumberNext(offnum))
+   {
+       ItemId  itemId = PageGetItemId(page, offnum);
+
+       if (ItemIdIsDead(itemId))
+       {
+           deletable[ndeletable++] = offnum;
+           tuples_removed += 1;
+       }
+   }
+
+   if (ndeletable > 0)
+   {
+       /*
+        * Write-lock the meta page so that we can decrement
+        * tuple count.
+        */
+       LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
+
+       /* No ereport(ERROR) until changes are logged */
+       START_CRIT_SECTION();
+
+       PageIndexMultiDelete(page, deletable, ndeletable);
+
+       pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
+       pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
+
+       metap = HashPageGetMeta(BufferGetPage(metabuf));
+       metap->hashm_ntuples -= tuples_removed;
+
+       MarkBufferDirty(buf);
+       MarkBufferDirty(metabuf);
+
+       /* XLOG stuff */
+       if (RelationNeedsWAL(rel))
+       {
+           xl_hash_vacuum_one_page xlrec;
+           XLogRecPtr  recptr;
+
+           xlrec.hnode = hnode;
+           xlrec.ntuples = tuples_removed;
+
+           XLogBeginInsert();
+           XLogRegisterData((char *) &xlrec, SizeOfHashVacuumOnePage);
+
+           XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
+           XLogRegisterBufData(0, (char *) deletable,
+                       ndeletable * sizeof(OffsetNumber));
+
+           XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
+
+           recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);
+
+           PageSetLSN(BufferGetPage(buf), recptr);
+           PageSetLSN(BufferGetPage(metabuf), recptr);
+       }
+
+       END_CRIT_SECTION();
+       /*
+        * Releasing write lock on meta page as we have updated
+        * the tuple count.
+        */
+       LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
+   }
+}
index d7337703b0b17c870609fd1ecd1ffa2d4cfc95ec..2d9204903fac91346958e21758ea19ee013cddb1 100644 (file)
@@ -465,6 +465,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
                            break;      /* yes, so exit for-loop */
                    }
 
+                   /* Before leaving current page, deal with any killed items */
+                   if (so->numKilled > 0)
+                       _hash_kill_items(scan);
+
                    /*
                     * ran off the end of this page, try the next
                     */
@@ -518,6 +522,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
                            break;      /* yes, so exit for-loop */
                    }
 
+                   /* Before leaving current page, deal with any killed items */
+                   if (so->numKilled > 0)
+                       _hash_kill_items(scan);
+
                    /*
                     * ran off the end of this page, try the next
                     */
index ea8f109a575d09573a032deef841bf248192ff64..0e0f3937117637ac866382da486fc59559576099 100644 (file)
@@ -101,7 +101,7 @@ _h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull)
  * create an entire index.
  */
 void
-_h_indexbuild(HSpool *hspool)
+_h_indexbuild(HSpool *hspool, Relation heapRel)
 {
    IndexTuple  itup;
 #ifdef USE_ASSERT_CHECKING
@@ -126,6 +126,6 @@ _h_indexbuild(HSpool *hspool)
        Assert(hashkey >= lasthashkey);
 #endif
 
-       _hash_doinsert(hspool->index, itup);
+       _hash_doinsert(hspool->index, itup, heapRel);
    }
 }
index c705531f04a813d3c710cc17ffc4d9c784567208..2e9971920bca9f4b77e9d0059e4dfafd17090a6e 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/relscan.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "storage/buf_internals.h"
 
 #define CALC_NEW_BUCKET(old_bucket, lowmask) \
            old_bucket | (lowmask + 1)
@@ -446,3 +447,70 @@ _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
 
    return new_bucket;
 }
+
+/*
+ * _hash_kill_items - set LP_DEAD state for items an indexscan caller has
+ * told us were killed.
+ *
+ * scan->opaque, referenced locally through so, contains information about the
+ * current page and killed tuples thereon (generally, this should only be
+ * called if so->numKilled > 0).
+ *
+ * We match items by heap TID before assuming they are the right ones to
+ * delete.
+ */
+void
+_hash_kill_items(IndexScanDesc scan)
+{
+   HashScanOpaque  so = (HashScanOpaque) scan->opaque;
+   Page    page;
+   HashPageOpaque  opaque;
+   OffsetNumber    offnum, maxoff;
+   int numKilled = so->numKilled;
+   int     i;
+   bool    killedsomething = false;
+
+   Assert(so->numKilled > 0);
+   Assert(so->killedItems != NULL);
+
+   /*
+    * Always reset the scan state, so we don't look for same
+    * items on other pages.
+    */
+   so->numKilled = 0;
+
+   page = BufferGetPage(so->hashso_curbuf);
+   opaque = (HashPageOpaque) PageGetSpecialPointer(page);
+   maxoff = PageGetMaxOffsetNumber(page);
+
+   for (i = 0; i < numKilled; i++)
+   {
+       offnum = so->killedItems[i].indexOffset;
+
+       while (offnum <= maxoff)
+       {
+           ItemId  iid = PageGetItemId(page, offnum);
+           IndexTuple  ituple = (IndexTuple) PageGetItem(page, iid);
+
+           if (ItemPointerEquals(&ituple->t_tid, &so->killedItems[i].heapTid))
+           {
+               /* found the item */
+               ItemIdMarkDead(iid);
+               killedsomething = true;
+               break;      /* out of inner search loop */
+           }
+           offnum = OffsetNumberNext(offnum);
+       }
+   }
+
+   /*
+    * Since this can be redone later if needed, mark as dirty hint.
+    * Whenever we mark anything LP_DEAD, we also set the page's
+    * LH_PAGE_HAS_DEAD_TUPLES flag, which is likewise just a hint.
+    */
+   if (killedsomething)
+   {
+       opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
+       MarkBufferDirtyHint(so->hashso_curbuf, true);
+   }
+}
index f1cc9ff9514a9dd94d58be192f66a08acc1311a8..5bd5c8dc0103cdf5902f32fd00bbd131d90529df 100644 (file)
@@ -154,6 +154,8 @@ hash_identify(uint8 info)
        case XLOG_HASH_UPDATE_META_PAGE:
            id = "UPDATE_META_PAGE";
            break;
+       case XLOG_HASH_VACUUM_ONE_PAGE:
+           id = "VACUUM_ONE_PAGE";
    }
 
    return id;
index bfdfed8657f58ec3e5e18b1e39e4ac25f1880620..eb1df57291bade4f633fd70372a01e945c66377e 100644 (file)
@@ -57,6 +57,7 @@ typedef uint32 Bucket;
 #define LH_BUCKET_BEING_POPULATED  (1 << 4)
 #define LH_BUCKET_BEING_SPLIT  (1 << 5)
 #define LH_BUCKET_NEEDS_SPLIT_CLEANUP  (1 << 6)
+#define LH_PAGE_HAS_DEAD_TUPLES    (1 << 7)
 
 #define LH_PAGE_TYPE \
    (LH_OVERFLOW_PAGE|LH_BUCKET_PAGE|LH_BITMAP_PAGE|LH_META_PAGE)
@@ -86,6 +87,7 @@ typedef HashPageOpaqueData *HashPageOpaque;
 #define H_NEEDS_SPLIT_CLEANUP(opaque)  ((opaque)->hasho_flag & LH_BUCKET_NEEDS_SPLIT_CLEANUP)
 #define H_BUCKET_BEING_SPLIT(opaque)   ((opaque)->hasho_flag & LH_BUCKET_BEING_SPLIT)
 #define H_BUCKET_BEING_POPULATED(opaque)   ((opaque)->hasho_flag & LH_BUCKET_BEING_POPULATED)
+#define H_HAS_DEAD_TUPLES(opaque)      ((opaque)->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
 
 /*
  * The page ID is for the convenience of pg_filedump and similar utilities,
@@ -95,6 +97,13 @@ typedef HashPageOpaqueData *HashPageOpaque;
  */
 #define HASHO_PAGE_ID      0xFF80
 
+typedef struct HashScanPosItem    /* what we remember about each match */
+{
+   ItemPointerData heapTid;    /* TID of referenced heap item */
+   OffsetNumber indexOffset;   /* index item's location within page */
+} HashScanPosItem;
+
+
 /*
  * HashScanOpaqueData is private state for a hash index scan.
  */
@@ -135,6 +144,9 @@ typedef struct HashScanOpaqueData
     * referred only when hashso_buc_populated is true.
     */
    bool        hashso_buc_split;
+   /* info about killed items if any (killedItems is NULL if never used) */
+   HashScanPosItem *killedItems;   /* tids and offset numbers of killed items */
+   int         numKilled;          /* number of currently stored items */
 } HashScanOpaqueData;
 
 typedef HashScanOpaqueData *HashScanOpaque;
@@ -300,7 +312,7 @@ extern Datum hash_uint32(uint32 k);
 /* private routines */
 
 /* hashinsert.c */
-extern void _hash_doinsert(Relation rel, IndexTuple itup);
+extern void _hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel);
 extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
               Size itemsize, IndexTuple itup);
 extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
@@ -361,7 +373,7 @@ extern HSpool *_h_spoolinit(Relation heap, Relation index, uint32 num_buckets);
 extern void _h_spooldestroy(HSpool *hspool);
 extern void _h_spool(HSpool *hspool, ItemPointer self,
         Datum *values, bool *isnull);
-extern void _h_indexbuild(HSpool *hspool);
+extern void _h_indexbuild(HSpool *hspool, Relation heapRel);
 
 /* hashutil.c */
 extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
@@ -381,6 +393,7 @@ extern BlockNumber _hash_get_oldblock_from_newbucket(Relation rel, Bucket new_bu
 extern BlockNumber _hash_get_newblock_from_oldbucket(Relation rel, Bucket old_bucket);
 extern Bucket _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
                                   uint32 lowmask, uint32 maxbucket);
+extern void _hash_kill_items(IndexScanDesc scan);
 
 /* hash.c */
 extern void hashbucketcleanup(Relation rel, Bucket cur_bucket,
index 552d6428cad84758e629f343f845a9318b504ab1..dfd923781997e4586b90c492c2dff700d79fc40c 100644 (file)
@@ -44,6 +44,7 @@
 #define XLOG_HASH_UPDATE_META_PAGE 0xB0        /* update meta page after
                                                 * vacuum */
 
+#define XLOG_HASH_VACUUM_ONE_PAGE  0xC0    /* remove dead tuples from index page */
 
 /*
  * xl_hash_split_allocate_page flag values, 8 bits are available.
@@ -250,6 +251,24 @@ typedef struct xl_hash_init_bitmap_page
 #define SizeOfHashInitBitmapPage   \
    (offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16))
 
+/*
+ * This is what we need for index tuple deletion and to
+ * update the meta page.
+ *
+ * This data record is used for XLOG_HASH_VACUUM_ONE_PAGE
+ *
+ * Backup Blk 0: bucket page
+ * Backup Blk 1: meta page
+ */
+typedef struct xl_hash_vacuum_one_page
+{
+   RelFileNode hnode;
+   double      ntuples;
+}  xl_hash_vacuum_one_page;
+
+#define SizeOfHashVacuumOnePage    \
+   (offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(double))
+
 extern void hash_redo(XLogReaderState *record);
 extern void hash_desc(StringInfo buf, XLogReaderState *record);
 extern const char *hash_identify(uint8 info);