Fix assorted bugs in GIN's WAL replay logic. REL8_3_STABLE
authorTom Lane <[email protected]>
Mon, 11 Oct 2010 23:04:59 +0000 (19:04 -0400)
committerTom Lane <[email protected]>
Mon, 11 Oct 2010 23:04:59 +0000 (19:04 -0400)
The original coding was quite sloppy about handling the case where
XLogReadBuffer fails (because the page has since been deleted).  This
would result in either "bad buffer id: 0" or an Assert failure during
replay, if indeed the page were no longer there.  In a couple of places
it also neglected to check whether the change had already been applied,
which would probably result in corrupted index contents.  I believe that
bug #5703 is an instance of the first problem.  These issues could show up
without replication, but only if you were unfortunate enough to crash
between modification of a GIN index and the next checkpoint.

Back-patch to 8.2, which is as far back as GIN has WAL support.

src/backend/access/gin/ginxlog.c

index 37d04cfb7e99433ee8b5f708c50b05842e7415bb..b7b89e4e49d943fd7387bfc07618a0442a391d32 100644 (file)
@@ -121,22 +121,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
    Buffer      buffer;
    Page        page;
 
+   /* first, forget any incomplete split this insertion completes */
+   if (data->isData)
+   {
+       Assert(data->isDelete == FALSE);
+       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+       {
+           PostingItem *pitem;
+
+           pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+           forgetIncompleteSplit(data->node,
+                                 PostingItemGetBlockNumber(pitem),
+                                 data->updateBlkno);
+       }
+   }
+   else
+   {
+       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+       {
+           IndexTuple  itup;
+
+           itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
+           forgetIncompleteSplit(data->node,
+                                 GinItemPointerGetBlockNumber(&itup->t_tid),
+                                 data->updateBlkno);
+       }
+   }
+
    /* nothing else to do if page was backed up */
    if (record->xl_info & XLR_BKP_BLOCK_1)
        return;
 
    reln = XLogOpenRelation(data->node);
    buffer = XLogReadBuffer(reln, data->blkno, false);
-   Assert(BufferIsValid(buffer));
+   if (!BufferIsValid(buffer))
+       return;
    page = (Page) BufferGetPage(buffer);
 
-   if (data->isData)
+   if (!XLByteLE(lsn, PageGetLSN(page)))
    {
-       Assert(data->isDelete == FALSE);
-       Assert(GinPageIsData(page));
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
+       if (data->isData)
        {
+           Assert(GinPageIsData(page));
+
            if (data->isLeaf)
            {
                OffsetNumber i;
@@ -166,23 +193,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
                GinDataPageAddItem(page, pitem, data->offset);
            }
        }
-
-       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
+       else
        {
-           PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-
-           forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
-       }
+           IndexTuple  itup;
 
-   }
-   else
-   {
-       IndexTuple  itup;
+           Assert(!GinPageIsData(page));
 
-       Assert(!GinPageIsData(page));
-
-       if (!XLByteLE(lsn, PageGetLSN(page)))
-       {
            if (data->updateBlkno != InvalidBlockNumber)
            {
                /* update link to right page after split */
@@ -203,23 +219,15 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
 
            if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
                elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                 data->node.spcNode, data->node.dbNode, data->node.relNode);
+                    data->node.spcNode, data->node.dbNode, data->node.relNode);
        }
 
-       if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
-       {
-           itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
-           forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
-       }
-   }
-
-   if (!XLByteLE(lsn, PageGetLSN(page)))
-   {
        PageSetLSN(page, lsn);
        PageSetTLI(page, ThisTimeLineID);
 
        MarkBufferDirty(buffer);
    }
+
    UnlockReleaseBuffer(buffer);
 }
 
@@ -241,7 +249,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
    if (data->isData)
        flags |= GIN_DATA;
 
-   lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit);
+   lbuffer = XLogReadBuffer(reln, data->lblkno, true);
    Assert(BufferIsValid(lbuffer));
    lpage = (Page) BufferGetPage(lbuffer);
    GinInitBuffer(lbuffer, flags);
@@ -318,7 +326,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
 
    if (data->isRootSplit)
    {
-       Buffer      rootBuf = XLogReadBuffer(reln, data->rootBlkno, false);
+       Buffer      rootBuf = XLogReadBuffer(reln, data->rootBlkno, true);
        Page        rootPage = BufferGetPage(rootBuf);
 
        GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
@@ -355,46 +363,51 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
    Buffer      buffer;
    Page        page;
 
-   /* nothing else to do if page was backed up (and no info to do it with) */
+   /* nothing to do if page was backed up (and no info to do it with) */
    if (record->xl_info & XLR_BKP_BLOCK_1)
        return;
 
    reln = XLogOpenRelation(data->node);
    buffer = XLogReadBuffer(reln, data->blkno, false);
-   Assert(BufferIsValid(buffer));
+   if (!BufferIsValid(buffer))
+       return;
    page = (Page) BufferGetPage(buffer);
 
-   if (GinPageIsData(page))
-   {
-       memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
-              GinSizeOfItem(page) *data->nitem);
-       GinPageGetOpaque(page)->maxoff = data->nitem;
-   }
-   else
+   if (!XLByteLE(lsn, PageGetLSN(page)))
    {
-       OffsetNumber i,
-                  *tod;
-       IndexTuple  itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
+       if (GinPageIsData(page))
+       {
+           memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
+                  GinSizeOfItem(page) *data->nitem);
+           GinPageGetOpaque(page)->maxoff = data->nitem;
+       }
+       else
+       {
+           OffsetNumber i,
+               *tod;
+           IndexTuple  itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
 
-       tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
-       for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
-           tod[i - 1] = i;
+           tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
+           for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
+               tod[i - 1] = i;
 
-       PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
+           PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
 
-       for (i = 0; i < data->nitem; i++)
-       {
-           if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
-               elog(ERROR, "failed to add item to index page in %u/%u/%u",
-                 data->node.spcNode, data->node.dbNode, data->node.relNode);
-           itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+           for (i = 0; i < data->nitem; i++)
+           {
+               if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
+                   elog(ERROR, "failed to add item to index page in %u/%u/%u",
+                        data->node.spcNode, data->node.dbNode, data->node.relNode);
+               itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
+           }
        }
-   }
 
-   PageSetLSN(page, lsn);
-   PageSetTLI(page, ThisTimeLineID);
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+
+       MarkBufferDirty(buffer);
+   }
 
-   MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
 }
 
@@ -411,38 +424,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
    if (!(record->xl_info & XLR_BKP_BLOCK_1))
    {
        buffer = XLogReadBuffer(reln, data->blkno, false);
-       page = BufferGetPage(buffer);
-       Assert(GinPageIsData(page));
-       GinPageGetOpaque(page)->flags = GIN_DELETED;
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
-       MarkBufferDirty(buffer);
-       UnlockReleaseBuffer(buffer);
+       if (BufferIsValid(buffer))
+       {
+           page = BufferGetPage(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               Assert(GinPageIsData(page));
+               GinPageGetOpaque(page)->flags = GIN_DELETED;
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
+       }
    }
 
    if (!(record->xl_info & XLR_BKP_BLOCK_2))
    {
        buffer = XLogReadBuffer(reln, data->parentBlkno, false);
-       page = BufferGetPage(buffer);
-       Assert(GinPageIsData(page));
-       Assert(!GinPageIsLeaf(page));
-       PageDeletePostingItem(page, data->parentOffset);
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
-       MarkBufferDirty(buffer);
-       UnlockReleaseBuffer(buffer);
+       if (BufferIsValid(buffer))
+       {
+           page = BufferGetPage(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               Assert(GinPageIsData(page));
+               Assert(!GinPageIsLeaf(page));
+               PageDeletePostingItem(page, data->parentOffset);
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
+       }
    }
 
    if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
    {
        buffer = XLogReadBuffer(reln, data->leftBlkno, false);
-       page = BufferGetPage(buffer);
-       Assert(GinPageIsData(page));
-       GinPageGetOpaque(page)->rightlink = data->rightLink;
-       PageSetLSN(page, lsn);
-       PageSetTLI(page, ThisTimeLineID);
-       MarkBufferDirty(buffer);
-       UnlockReleaseBuffer(buffer);
+       if (BufferIsValid(buffer))
+       {
+           page = BufferGetPage(buffer);
+           if (!XLByteLE(lsn, PageGetLSN(page)))
+           {
+               Assert(GinPageIsData(page));
+               GinPageGetOpaque(page)->rightlink = data->rightLink;
+               PageSetLSN(page, lsn);
+               PageSetTLI(page, ThisTimeLineID);
+               MarkBufferDirty(buffer);
+           }
+           UnlockReleaseBuffer(buffer);
+       }
    }
 }
 
@@ -560,6 +591,13 @@ ginContinueSplit(ginIncompleteSplit *split)
 
    buffer = XLogReadBuffer(reln, split->leftBlkno, false);
 
+   /*
+    * Failure should be impossible here, because we wrote the page earlier.
+    */
+   if (!BufferIsValid(buffer))
+       elog(PANIC, "ginContinueSplit: left block %u not found",
+            split->leftBlkno);
+
    if (split->rootBlkno == GIN_ROOT_BLKNO)
    {
        prepareEntryScan(&btree, reln, (Datum) 0, NULL);