Buffer *fsmbuffer);
static void ConveyorBeltClearSegment(ConveyorBelt *cb, CBSegNo segno,
bool include_first_page);
+static bool ConveyorBeltClearIndexSegmentEntries(ConveyorBelt *cb,
+ Buffer metabuffer,
+ CBSegNo index_segment,
+ CBPageNo index_vacuum_stop_point,
+ CBSegNo *next_index_segment);
static CBSegNo ConveyorBeltFreeOldestIndexSegment(ConveyorBelt *cb,
Buffer metabuffer,
CBSegNo oldest_index_segment,
UnlockReleaseBuffer(metabuffer);
}
-static unsigned
-ConveyorBeltClearIndexEntries(ConveyorBelt *cb, CBSegNo oldest_index_segment,
- CBPageNo index_vacuum_stop_point)
-{
- elog(ERROR, "ConveyorBeltClearIndexEntries not implemented yet");
-}
-
/*
* Recycle segments that are no longer needed.
*
}
else if (obsolete_state == CBM_OBSOLETE_SEGMENT_ENTRIES)
{
- unsigned empty_index_segments;
+ unsigned empty_index_segments = 0;
+ CBSegNo index_segment = oldest_index_segment;
/*
* Do this part just once. A single pass through the logic below
* Clear as many obsolete index entries out of index segments as
* we can.
*/
- empty_index_segments =
- ConveyorBeltClearIndexEntries(cb, oldest_index_segment,
- index_vacuum_stop_point);
-
- /*
- * If even the oldest index segment is still partially in use,
- * then all newer ones are needed also, and likewise everything in
- * the metapage, which means no further cleanup is possible.
- */
- if (empty_index_segments == 0)
- {
- ReleaseBuffer(metabuffer);
- return;
- }
+ while (index_segment != CB_INVALID_SEGMENT &&
+ ConveyorBeltClearIndexSegmentEntries(cb, metabuffer,
+ index_segment,
+ index_vacuum_stop_point,
+ &index_segment))
+ ++empty_index_segments;
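+
+ /*
+ * At this point, empty_index_segments is the number of index
+ * segments, starting with the oldest, that were completely emptied
+ * of index entries by the loop above.
+ */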
/*
* Free old index segments.
}
}
+/*
+ * Clear obsolete index entries from a segment.
+ *
+ * metabuffer should be pinned but not locked when this function is called,
+ * and will be in the same state upon return.
+ *
+ * index_segment specifies the target index segment.
+ *
+ * index_vacuum_stop_point defines the point beyond which no index entries
+ * may be removed. If an index entry is found that covers, in whole or in
+ * part, pages greater than or equal to this value, this function does
+ * nothing further and returns false. If that limit is never reached, this
+ * function returns true.
+ *
+ * *next_index_segment is set to the segment number of the index segment
+ * that follows the one specified by index_segment, or CB_INVALID_SEGMENT
+ * if none.
+ */
+static bool
+ConveyorBeltClearIndexSegmentEntries(ConveyorBelt *cb, Buffer metabuffer,
+ CBSegNo index_segment,
+ CBPageNo index_vacuum_stop_point,
+ CBSegNo *next_index_segment)
+{
+ bool needs_xlog;
+ bool need_next_segment = true;
+ unsigned segoff;
+ BlockNumber fsmblock = InvalidBlockNumber;
+ Buffer fsmbuffer = InvalidBuffer;
+
+ /* Do we need to write XLOG for operations on this conveyor belt? */
+ needs_xlog = RelationNeedsWAL(cb->cb_rel) || cb->cb_fork == INIT_FORKNUM;
+
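+ /* Walk through every index page in this segment. */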
+ for (segoff = 0; segoff < cb->cb_pages_per_segment; ++segoff)
+ {
+ BlockNumber indexblock;
+ Buffer indexbuffer;
+ Page indexpage;
+ unsigned pageoffset = 0; /* next index entry to examine */
+ CBSegNo cleared_segno = CB_INVALID_SEGMENT; /* payload segment already cleared */
+
+ indexblock = cb_segment_to_block(cb->cb_pages_per_segment,
+ index_segment, segoff);
+ indexbuffer = ConveyorBeltRead(cb, indexblock, BUFFER_LOCK_EXCLUSIVE);
+ indexpage = BufferGetPage(indexbuffer);
+
+ /*
+ * If this is the very first time we've locked an index page in this
+ * segment, it should be the first page, and it will tell us where to
+ * find the next segment once we finish with this one. Grab that
+ * information while we have the page lock.
+ */
+ if (need_next_segment)
+ {
+ Assert(segoff == 0);
+ *next_index_segment = cb_indexpage_get_next_segment(indexpage);
+ need_next_segment = false;
+ }
+
+ /*
+ * Loop over the index entries in this page.
+ *
+ * At the top of each iteration of the loop, the index page is
+ * exclusively locked. The lock may be released and reacquired before
+ * beginning the next iteration.
+ */
+ while (pageoffset < CB_INDEXPAGE_INDEX_ENTRIES)
+ {
+ CBSegNo segno;
+ CBPageNo first_page;
+
+ /* Find, or reconfirm, the location of the next obsolete entry. */
+ segno = cb_indexpage_get_obsolete_entry(indexpage, &pageoffset,
+ &first_page);
+ if (segno == CB_INVALID_SEGMENT)
+ {
+ /* No items remain in this page. */
+ break;
+ }
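+
+ /*
+ * The entry at pageoffset covers cb_pages_per_segment logical pages
+ * beginning at first_page + pageoffset * cb_pages_per_segment; if
+ * any of those pages is at or beyond index_vacuum_stop_point, the
+ * entry cannot be removed.
+ */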
+ if (first_page + (cb->cb_pages_per_segment * pageoffset) +
+ cb->cb_pages_per_segment > index_vacuum_stop_point)
+ {
+ /*
+ * At least one entry from this page is still needed, so no
+ * point in visiting future pages in this index segment, and
+ * no point in visiting any more index segments.
+ */
+ UnlockReleaseBuffer(indexbuffer);
+ if (BufferIsValid(fsmbuffer))
+ ReleaseBuffer(fsmbuffer);
+ return false;
+ }
+
+ /*
+ * If this is the first time we've considered clearing this
+ * particular payload segment, we'll need to release the buffer
+ * lock, do some necessary prep work, reacquire the buffer lock,
+ * and recheck to make sure nothing has changed.
+ */
+ if (segno != cleared_segno)
+ {
+ BlockNumber newfsmblock;
+
+ /* Release lock on index page. */
+ LockBuffer(indexbuffer, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Clear the segment that we want to recycle.
+ *
+ * Note that we could crash or error out while or after doing
+ * this and before we actually recycle the segment. If so,
+ * we'll do it again the next time someone tries to vacuum
+ * this conveyor belt. All of that is fine, because nobody
+ * can be looking at the data any more, and clearing the pages
+ * is idempotent.
+ */
+ ConveyorBeltClearSegment(cb, segno, true);
+
+ /*
+ * Make sure that we have the correct FSM buffer pinned.
+ *
+ * Often, any FSM buffer that we have pinned previously will
+ * still be the correct one, either because segment numbers
+ * allocated around the same time are likely to be close
+ * together numerically, or just because the conveyor belt may
+ * not be big enough to need lots of FSM pages.
+ *
+ * However, in the worst case, this can change every time.
+ */
+ newfsmblock = cb_segment_to_fsm_block(cb->cb_pages_per_segment,
+ segno);
+ if (fsmblock != newfsmblock)
+ {
+ if (BufferIsValid(fsmbuffer))
+ ReleaseBuffer(fsmbuffer);
+ fsmblock = newfsmblock;
+ if (fsmblock == InvalidBlockNumber)
+ fsmbuffer = InvalidBuffer;
+ else
+ fsmbuffer =
+ ReadBufferExtended(cb->cb_rel, cb->cb_fork,
+ fsmblock, RBM_NORMAL, NULL);
+ }
+
+ /* Relock the index page and go around. */
+ LockBuffer(indexbuffer, BUFFER_LOCK_EXCLUSIVE);
+ cleared_segno = segno;
+ continue;
+ }
+
+ /*
+ * Clear the index entry referring to the payload segment, and
+ * mark the segment free. To do this, we have to grab the lock
+ * on whatever page contains the free/busy state, which could be
+ * either an FSM page or the metapage.
+ */
+ if (fsmblock == InvalidBlockNumber)
+ {
+ LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
+ cb_recycle_payload_segment(&RelationGetSmgr(cb->cb_rel)->smgr_rnode.node,
+ cb->cb_fork,
+ metabuffer,
+ indexblock, indexbuffer,
+ InvalidBlockNumber, InvalidBuffer,
+ segno, pageoffset, needs_xlog);
+ LockBuffer(metabuffer, BUFFER_LOCK_UNLOCK);
+ }
+ else
+ {
+ LockBuffer(fsmbuffer, BUFFER_LOCK_EXCLUSIVE);
+ cb_recycle_payload_segment(&RelationGetSmgr(cb->cb_rel)->smgr_rnode.node,
+ cb->cb_fork,
+ InvalidBuffer,
+ indexblock, indexbuffer,
+ fsmblock, fsmbuffer,
+ segno, pageoffset, needs_xlog);
+ LockBuffer(fsmbuffer, BUFFER_LOCK_UNLOCK);
+ }
+
+ /* No need to consider this page offset again. */
+ ++pageoffset;
+
+ /* Now we're no longer prepared to clear any segment. */
+ cleared_segno = CB_INVALID_SEGMENT;
+ }
+
+ /* Done with this index page; drop the lock and pin. */
+ UnlockReleaseBuffer(indexbuffer);
+ }
+
+ /* If we are still holding an FSM buffer pin, release it. */
+ if (BufferIsValid(fsmbuffer))
+ ReleaseBuffer(fsmbuffer);
+
+ return true;
+}
+
/*
* Attempt to remove the oldest index segment.
*
CBSegNo oldest_remaining_index_segment = CB_INVALID_SEGMENT;
/*
- * Read and pin the first block of the index segment. The others will
- * already have been cleared, but the first one has to stick around until
- * we actually deallocate the segment, so that it remains possible to
- * walk the chain of index segments.
+ * Clear all the blocks in the oldest index segment except for the first.
+ * We must keep the first one until the segment is actually deallocated, so
+ * that it remains possible to walk the chain of index segments.
+ */
+ ConveyorBeltClearSegment(cb, oldest_index_segment, false);
+
+ /*
+ * Read and pin the first block of the index segment.
*/
needs_xlog = RelationNeedsWAL(cb->cb_rel) || cb->cb_fork == INIT_FORKNUM;
firstindexblock = cb_segment_to_block(cb->cb_pages_per_segment,