}
/*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
*/
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
{
uint16 offsetindex; /* linp array index */
int16 itemoff; /* page offset of item data */
uint16 alignedlen; /* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
- /* Sort in decreasing itemoff order */
- return ((itemIdSort) itemidp2)->itemoff -
- ((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
/*
* After removing or marking some line pointers unused, move the tuples to
- * remove the gaps caused by the removed items.
+ * remove the gaps caused by the removed items and reorder them back into
+ * reverse line pointer order in the page.
+ *
+ * This function can often be fairly hot, so it pays to take some measures to
+ * make it as efficient as possible.
+ *
+ * Callers may pass 'presorted' as true if the 'itemidbase' array is sorted in
+ * descending order of itemoff. When this is true we can just memmove()
+ * tuples towards the end of the page. This is quite a common case as it's
+ * the order that tuples are initially inserted into pages. Once this
+ * function has defragmented a page, any new line pointers added to it
+ * will keep that presorted order, so hitting this case remains very
+ * common for frequently updated tables.
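+ *
+ * For example, on a freshly filled heap page the first tuple inserted
+ * sits just below pd_special and each later one at a lower offset, so
+ * line pointer 1 has the highest itemoff and walking the line pointer
+ * array yields strictly descending offsets.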
+ *
+ * When the 'itemidbase' array is not presorted then we're unable to just
+ * memmove() tuples around freely. Doing so could cause us to overwrite the
+ * memory belonging to a tuple we've not moved yet. In this case, we copy all
+ * the tuples that need to be moved into a temporary buffer. We can then
+ * simply memcpy() out of that temp buffer back into the page at the correct
+ * location. Tuples are copied back into the page in the same order as the
+ * 'itemidbase' array, so we end up reordering the tuples back into reverse
+ * line pointer order. This will increase the chances of hitting the
+ * presorted case the next time around.
+ *
+ * Callers must ensure that nitems is > 0.
*/
static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
{
PageHeader phdr = (PageHeader) page;
Offset upper;
+ Offset copy_tail;
+ Offset copy_head;
+ itemIdCompact itemidptr;
int i;
- /* sort itemIdSortData array into decreasing itemoff order */
- qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
- itemoffcompare);
+ /* Code within will not work correctly if nitems == 0 */
+ Assert(nitems > 0);
- upper = phdr->pd_special;
- for (i = 0; i < nitems; i++)
+ if (presorted)
{
- itemIdSort itemidptr = &itemidbase[i];
- ItemId lp;
- lp = PageGetItemId(page, itemidptr->offsetindex + 1);
- upper -= itemidptr->alignedlen;
+#ifdef USE_ASSERT_CHECKING
+ {
+ /*
+ * Verify we've not gained any new callers that incorrectly pass a
+ * true 'presorted' value for an array that isn't in descending
+ * itemoff order.
+ */
+ Offset lastoff = phdr->pd_special;
+
+ for (i = 0; i < nitems; i++)
+ {
+ itemidptr = &itemidbase[i];
+
+ Assert(lastoff > itemidptr->itemoff);
+
+ lastoff = itemidptr->itemoff;
+ }
+ }
+#endif /* USE_ASSERT_CHECKING */
+
+ /*
+ * 'itemidbase' is already in the optimal order, i.e., lower item
+ * pointers have a higher offset. This allows us to memmove() the
+ * tuples up to the end of the page without having to worry about
+ * overwriting other tuples that have not been moved yet.
+ *
+ * There's a good chance that there are tuples already right at the
+ * end of the page that we can simply skip over because they're
+ * already in the correct location within the page. We'll do that
+ * first...
+ */
+ upper = phdr->pd_special;
+ i = 0;
+ do
+ {
+ itemidptr = &itemidbase[i];
+ if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+ break;
+ upper -= itemidptr->alignedlen;
+
+ i++;
+ } while (i < nitems);
+
+ /*
+ * Now that we've found the first tuple that needs to be moved, we can
+ * do the tuple compactification. We try to keep the number of
+ * memmove() calls to a minimum, only calling memmove() when there's a
+ * gap; when we find one we move the whole contiguous run of tuples
+ * seen since the previous move in a single call.
+ */
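+
+ /*
+ * copy_head and copy_tail track the page-offset range of tuples seen
+ * so far but not yet moved, while upper tracks the offset at which
+ * the next moved tuple's data must end.
+ */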
+ copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+ for (; i < nitems; i++)
+ {
+ ItemId lp;
+
+ itemidptr = &itemidbase[i];
+ lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+ if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+ {
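+ /*
+ * memmove() rather than memcpy() here, since the source and
+ * destination ranges can overlap when shifting tuples towards
+ * the end of the same page.
+ */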
+ memmove((char *) page + upper,
+ (char *) page + copy_head,
+ copy_tail - copy_head);
+
+ /*
+ * We've now moved all tuples already seen, but not the
+ * current tuple, so we set the copy_tail to the end of this
+ * tuple so it can be moved in another iteration of the loop.
+ */
+ copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+ }
+ /* shift the target offset down by the length of this tuple */
+ upper -= itemidptr->alignedlen;
+ /* point the copy_head to the start of this tuple */
+ copy_head = itemidptr->itemoff;
+
+ /* update the line pointer to reference the new offset */
+ lp->lp_off = upper;
+ }
+
+ /* Move the remaining tuples. */
memmove((char *) page + upper,
- (char *) page + itemidptr->itemoff,
- itemidptr->alignedlen);
- lp->lp_off = upper;
+ (char *) page + copy_head,
+ copy_tail - copy_head);
+ }
+ else
+ {
+ PGAlignedBlock scratch;
+ char *scratchptr = scratch.data;
+
+ /*
+ * Non-presorted case: the tuples in the itemidbase array may be in
+ * any order, so before we can move them to the end of the page we
+ * must make a temp copy of each tuple that needs to be moved, then
+ * copy them back into the page at their new offsets.
+ *
+ * If a large percentage of the tuples have been pruned (>75%) then
+ * we'll copy these into the temp buffer tuple-by-tuple, otherwise
+ * we'll just do a single memcpy() for all tuples that need to be
+ * moved.
+ * When so many tuples have been removed there's likely to be a lot of
+ * gaps and it's unlikely that many non-movable tuples remain at the
+ * end of the page.
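+ *
+ * For example, on a page with 240 line pointers where only 50 tuples
+ * still have storage, 50 < 240 / 4, so we'd copy those 50 tuples into
+ * the temp buffer one by one rather than with a single memcpy().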
+ */
+ if (nitems < PageGetMaxOffsetNumber(page) / 4)
+ {
+ i = 0;
+ do
+ {
+ itemidptr = &itemidbase[i];
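+ /*
+ * Copy the tuple into the scratch buffer at its current page
+ * offset, so the copy-back loop below can use the same offsets
+ * for the scratch buffer as for the page.
+ */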
+ memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
+ itemidptr->alignedlen);
+ i++;
+ } while (i < nitems);
+
+ /* Set things up for the compactification code below */
+ i = 0;
+ itemidptr = &itemidbase[0];
+ upper = phdr->pd_special;
+ }
+ else
+ {
+ upper = phdr->pd_special;
+
+ /*
+ * Many tuples are likely to already be in the correct location.
+ * There's no need to copy these into the temp buffer; instead, we
+ * just skip forward in the itemidbase array to the first tuple that
+ * actually needs to be moved, so that the code below leaves the
+ * in-place ones alone.
+ */
+ i = 0;
+ do
+ {
+ itemidptr = &itemidbase[i];
+ if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+ break;
+ upper -= itemidptr->alignedlen;
+
+ i++;
+ } while (i < nitems);
+
+ /* Copy all tuples that need to be moved into the temp buffer */
+ memcpy(scratchptr + phdr->pd_upper,
+ page + phdr->pd_upper,
+ upper - phdr->pd_upper);
+ }
+
+ /*
+ * Do the tuple compactification. itemidptr is already pointing to
+ * the first tuple that we're going to move. Here we collapse the
+ * memcpy() calls for adjacent tuples into a single call. This is done
+ * by delaying the memcpy() call until we find a gap that needs to be
+ * closed.
+ */
+ copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+ for (; i < nitems; i++)
+ {
+ ItemId lp;
+
+ itemidptr = &itemidbase[i];
+ lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+ /* copy pending tuples when we detect a gap */
+ if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+ {
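+ /*
+ * Unlike the presorted case, plain memcpy() is safe here: the
+ * source is the separate scratch buffer, so it cannot overlap
+ * the destination page.
+ */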
+ memcpy((char *) page + upper,
+ scratchptr + copy_head,
+ copy_tail - copy_head);
+
+ /*
+ * We've now copied all tuples already seen, but not the
+ * current tuple, so we set the copy_tail to the end of this
+ * tuple.
+ */
+ copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+ }
+ /* shift the target offset down by the length of this tuple */
+ upper -= itemidptr->alignedlen;
+ /* point the copy_head to the start of this tuple */
+ copy_head = itemidptr->itemoff;
+
+ /* update the line pointer to reference the new offset */
+ lp->lp_off = upper;
+ }
+
+ /* Copy the remaining chunk */
+ memcpy((char *) page + upper,
+ scratchptr + copy_head,
+ copy_tail - copy_head);
}
phdr->pd_upper = upper;
Offset pd_lower = ((PageHeader) page)->pd_lower;
Offset pd_upper = ((PageHeader) page)->pd_upper;
Offset pd_special = ((PageHeader) page)->pd_special;
- itemIdSortData itemidbase[MaxHeapTuplesPerPage];
- itemIdSort itemidptr;
+ Offset last_offset;
+ itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+ itemIdCompact itemidptr;
ItemId lp;
int nline,
nstorage,
nunused;
int i;
Size totallen;
+ bool presorted = true; /* assume sorted until proven otherwise */
/*
* It's worth the trouble to be more paranoid here than in most places,
nline = PageGetMaxOffsetNumber(page);
itemidptr = itemidbase;
nunused = totallen = 0;
+ last_offset = pd_special;
for (i = FirstOffsetNumber; i <= nline; i++)
{
lp = PageGetItemId(page, i);
{
itemidptr->offsetindex = i - 1;
itemidptr->itemoff = ItemIdGetOffset(lp);
+
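+ /*
+ * The array only counts as presorted if the offsets strictly
+ * decrease as we walk the line pointer array.
+ */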
+ if (last_offset > itemidptr->itemoff)
+ last_offset = itemidptr->itemoff;
+ else
+ presorted = false;
+
if (unlikely(itemidptr->itemoff < (int) pd_upper ||
itemidptr->itemoff >= (int) pd_special))
ereport(ERROR,
errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower)));
- compactify_tuples(itemidbase, nstorage, page);
+ compactify_tuples(itemidbase, nstorage, page, presorted);
}
/* Set hint bit for PageAddItem */
Offset pd_lower = phdr->pd_lower;
Offset pd_upper = phdr->pd_upper;
Offset pd_special = phdr->pd_special;
- itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+ Offset last_offset;
+ itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
ItemIdData newitemids[MaxIndexTuplesPerPage];
- itemIdSort itemidptr;
+ itemIdCompact itemidptr;
ItemId lp;
int nline,
nused;
unsigned offset;
int nextitm;
OffsetNumber offnum;
+ bool presorted = true; /* assume sorted until proven otherwise */
Assert(nitems <= MaxIndexTuplesPerPage);
totallen = 0;
nused = 0;
nextitm = 0;
+ last_offset = pd_special;
for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
{
lp = PageGetItemId(page, offnum);
{
itemidptr->offsetindex = nused; /* where it will go */
itemidptr->itemoff = offset;
+
+ if (last_offset > itemidptr->itemoff)
+ last_offset = itemidptr->itemoff;
+ else
+ presorted = false;
+
itemidptr->alignedlen = MAXALIGN(size);
totallen += itemidptr->alignedlen;
newitemids[nused] = *lp;
phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
/* and compactify the tuple data */
- compactify_tuples(itemidbase, nused, page);
+ if (nused > 0)
+ compactify_tuples(itemidbase, nused, page, presorted);
+ else
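+ /* no items remain with storage; all space below pd_special is free */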
+ phdr->pd_upper = pd_special;
}