Fix possible duplicate tuples while GiST scan. Now page is processed
authorTeodor Sigaev <[email protected]>
Sat, 23 Aug 2008 10:43:58 +0000 (10:43 +0000)
committerTeodor Sigaev <[email protected]>
Sat, 23 Aug 2008 10:43:58 +0000 (10:43 +0000)
at once and ItemPointers are collected in memory.

Remove tuple's killing by killtuple() if tuple was moved to another
page - it could produce unaceptable overhead.

Backpatch up to 8.1 because the bug was introduced by GiST's concurrency support.

src/backend/access/gist/gistget.c
src/backend/access/gist/gistscan.c
src/include/access/gist_private.h [new file with mode: 0644]

index 287e2f63ad3aecdb6432b614a280099bc68dc1c4..0195df80628defaa065a439a3163f4061a5b21a0 100644 (file)
@@ -30,62 +30,38 @@ static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
 static void
 killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr)
 {
-       Buffer          buffer = so->curbuf;
+       Page        p;
+       OffsetNumber offset;
 
-       for (;;)
+       LockBuffer(so->curbuf, GIST_SHARE);
+       p = (Page) BufferGetPage(so->curbuf);
+       if (XLByteEQ(so->stack->lsn, PageGetLSN(p)))
        {
-               Page            p;
-               BlockNumber blkno;
-               OffsetNumber offset,
-                                       maxoff;
-
-               LockBuffer(buffer, GIST_SHARE);
-               p = (Page) BufferGetPage(buffer);
-
-               if (buffer == so->curbuf && XLByteEQ(so->stack->lsn, PageGetLSN(p)))
-               {
-                       /* page unchanged, so all is simple */
-                       offset = ItemPointerGetOffsetNumber(iptr);
-                       PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
-                       SetBufferCommitInfoNeedsSave(buffer);
-                       LockBuffer(buffer, GIST_UNLOCK);
-                       break;
-               }
-
-               maxoff = PageGetMaxOffsetNumber(p);
+               /* page unchanged, so all is simple */
+               offset = ItemPointerGetOffsetNumber(iptr);
+               PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
+               SetBufferCommitInfoNeedsSave(so->curbuf);
+       }
+       else
+       {
+               OffsetNumber maxoff = PageGetMaxOffsetNumber(p);
 
                for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
                {
-                       IndexTuple      ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
+                       IndexTuple  ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset));
 
                        if (ItemPointerEquals(&(ituple->t_tid), iptr))
                        {
                                /* found */
                                PageGetItemId(p, offset)->lp_flags |= LP_DELETE;
-                               SetBufferCommitInfoNeedsSave(buffer);
-                               LockBuffer(buffer, GIST_UNLOCK);
-                               if (buffer != so->curbuf)
-                                       ReleaseBuffer(buffer);
-                               return;
+                               SetBufferCommitInfoNeedsSave(so->curbuf);
+                               break;
                        }
                }
-
-               /* follow right link */
-
-               /*
-                * ??? is it good? if tuple dropped by concurrent vacuum, we will read
-                * all leaf pages...
-                */
-               blkno = GistPageGetOpaque(p)->rightlink;
-               LockBuffer(buffer, GIST_UNLOCK);
-               if (buffer != so->curbuf)
-                       ReleaseBuffer(buffer);
-
-               if (blkno == InvalidBlockNumber)
-                       /* can't found, dropped by somebody else */
-                       return;
-               buffer = ReadBuffer(r, blkno);
        }
+
+       LockBuffer(so->curbuf, GIST_UNLOCK);
 }
 
 /*
@@ -146,7 +122,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
        GISTSearchStack *stk;
        IndexTuple      it;
        GISTPageOpaque opaque;
-       bool            resetoffset = false;
        int                     ntids = 0;
 
        so = (GISTScanOpaque) scan->opaque;
@@ -171,6 +146,42 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                return 0;
        }
 
+       /*
+        * check stored pointers from last visit 
+        */
+       if ( so->nPageData > 0 ) 
+       {
+               while( ntids < maxtids && so->curPageData < so->nPageData )
+               {
+                       tids[ ntids ] = scan->xs_ctup.t_self = so->pageData[ so->curPageData ];
+                               
+                       so->curPageData ++;
+                       ntids++;
+               }
+
+               if ( ntids == maxtids )
+                       return ntids;
+               
+               /*
+                * Go to the next page
+                */
+               stk = so->stack->next;
+               pfree(so->stack);
+               so->stack = stk;
+
+               /* If we're out of stack entries, we're done */
+               if (so->stack == NULL)
+               {
+                       ReleaseBuffer(so->curbuf);
+                       so->curbuf = InvalidBuffer;
+                       return ntids;
+               }
+
+               so->curbuf = ReleaseAndReadBuffer(so->curbuf,
+                                                                                 scan->indexRelation,
+                                                                                 stk->block);
+       }
+
        for (;;)
        {
                /* First of all, we need lock buffer */
@@ -178,30 +189,25 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                LockBuffer(so->curbuf, GIST_SHARE);
                p = BufferGetPage(so->curbuf);
                opaque = GistPageGetOpaque(p);
-               resetoffset = false;
 
-               if (XLogRecPtrIsInvalid(so->stack->lsn) || !XLByteEQ(so->stack->lsn, PageGetLSN(p)))
-               {
-                       /* page changed from last visit or visit first time , reset offset */
-                       so->stack->lsn = PageGetLSN(p);
-                       resetoffset = true;
-
-                       /* check page split, occured from last visit or visit to parent */
-                       if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
-                               XLByteLT(so->stack->parentlsn, opaque->nsn) &&
-                               opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
-                               (so->stack->next == NULL || so->stack->next->block != opaque->rightlink)                /* check if already
-                                       added */ )
-                       {
-                               /* detect page split, follow right link to add pages */
+               /* remember lsn to identify page changed for tuple's killing */
+               so->stack->lsn = PageGetLSN(p);
 
-                               stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
-                               stk->next = so->stack->next;
-                               stk->block = opaque->rightlink;
-                               stk->parentlsn = so->stack->parentlsn;
-                               memset(&(stk->lsn), 0, sizeof(GistNSN));
-                               so->stack->next = stk;
-                       }
+               /* check page split, occured from last visit or visit to parent */
+               if (!XLogRecPtrIsInvalid(so->stack->parentlsn) &&
+                       XLByteLT(so->stack->parentlsn, opaque->nsn) &&
+                       opaque->rightlink != InvalidBlockNumber /* sanity check */ &&
+                       (so->stack->next == NULL || so->stack->next->block != opaque->rightlink)                /* check if already
+                                       added */ )
+               {
+                       /* detect page split, follow right link to add pages */
+
+                       stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack));
+                       stk->next = so->stack->next;
+                       stk->block = opaque->rightlink;
+                       stk->parentlsn = so->stack->parentlsn;
+                       memset(&(stk->lsn), 0, sizeof(GistNSN));
+                       so->stack->next = stk;
                }
 
                /* if page is empty, then just skip it */
@@ -224,24 +230,13 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                        continue;
                }
 
-               if (!GistPageIsLeaf(p) || resetoffset || ItemPointerIsValid(&scan->currentItemData) == false)
-               {
-                       if (ScanDirectionIsBackward(dir))
-                               n = PageGetMaxOffsetNumber(p);
-                       else
-                               n = FirstOffsetNumber;
-               }
+               if (ScanDirectionIsBackward(dir))
+                       n = PageGetMaxOffsetNumber(p);
                else
-               {
-                       n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
-
-                       if (ScanDirectionIsBackward(dir))
-                               n = OffsetNumberPrev(n);
-                       else
-                               n = OffsetNumberNext(n);
-               }
+                       n = FirstOffsetNumber;
 
                /* wonderfull, we can look at page */
+               so->nPageData = so->curPageData = 0;
 
                for (;;)
                {
@@ -249,6 +244,20 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
 
                        if (!OffsetNumberIsValid(n))
                        {
+                               while( ntids < maxtids && so->curPageData < so->nPageData )
+                               {
+                                       tids[ ntids ] = scan->xs_ctup.t_self = so->pageData[ so->curPageData ];
+                               
+                                       so->curPageData ++;
+                                       ntids++;
+                               }
+
+                               if ( ntids == maxtids )
+                               {
+                                       LockBuffer(so->curbuf, GIST_UNLOCK);
+                                       return ntids;
+                               }
+
                                /*
                                 * We ran out of matching index entries on the current page,
                                 * so pop the top stack entry and use it to continue the
@@ -288,14 +297,8 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, int maxtids, b
                                if (!(ignore_killed_tuples && ItemIdDeleted(PageGetItemId(p, n))))
                                {
                                        it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
-                                       tids[ntids] = scan->xs_ctup.t_self = it->t_tid;
-                                       ntids++;
-
-                                       if (ntids == maxtids)
-                                       {
-                                               LockBuffer(so->curbuf, GIST_UNLOCK);
-                                               return ntids;
-                                       }
+                                       so->pageData[ so->nPageData ] = it->t_tid;
+                                       so->nPageData ++;
                                }
                        }
                        else
index 92a5def7a03609dde57071d5331287e79b79dff5..70ba52bbdc3c548ac3793a397fd2a281f75b3cf1 100644 (file)
@@ -98,6 +98,7 @@ gistrescan(PG_FUNCTION_ARGS)
                        ReleaseBuffer(so->markbuf);
                        so->markbuf = InvalidBuffer;
                }
+
        }
        else
        {
@@ -113,6 +114,8 @@ gistrescan(PG_FUNCTION_ARGS)
                scan->opaque = so;
        }
 
+       so->nPageData = so->curPageData = 0;
+
        /* Update scan key, if a new one is given */
        if (key && scan->numberOfKeys > 0)
        {
@@ -179,6 +182,11 @@ gistmarkpos(PG_FUNCTION_ARGS)
                so->markbuf = so->curbuf;
        }
 
+       so->markNPageData = so->nPageData;
+       so->markCurPageData = so->curPageData;
+       if ( so->markNPageData > 0 )
+               memcpy( so->markPageData, so->pageData, sizeof(ItemPointerData) * so->markNPageData );          
+
        PG_RETURN_VOID();
 }
 
@@ -228,6 +236,11 @@ gistrestrpos(PG_FUNCTION_ARGS)
                so->curbuf = so->markbuf;
        }
 
+       so->nPageData = so->markNPageData;
+       so->curPageData = so->markNPageData;
+       if ( so->markNPageData > 0 )
+               memcpy( so->pageData, so->markPageData, sizeof(ItemPointerData) * so->markNPageData );          
+
        PG_RETURN_VOID();
 }
 
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
new file mode 100644 (file)
index 0000000..b4d73b2
--- /dev/null
@@ -0,0 +1,319 @@
+/*-------------------------------------------------------------------------
+ *
+ * gist_private.h
+ *       private declarations for GiST -- declarations related to the
+ *       internal implementation of GiST, not the public API
+ *
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef GIST_PRIVATE_H
+#define GIST_PRIVATE_H
+
+#include "access/itup.h"
+#include "access/gist.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "fmgr.h"
+
+#define GIST_UNLOCK BUFFER_LOCK_UNLOCK
+#define GIST_SHARE     BUFFER_LOCK_SHARE
+#define GIST_EXCLUSIVE BUFFER_LOCK_EXCLUSIVE
+
+
+/*
+ * XXX old comment!!!
+ * When we descend a tree, we keep a stack of parent pointers. This
+ * allows us to follow a chain of internal node points until we reach
+ * a leaf node, and then back up the stack to re-examine the internal
+ * nodes.
+ *
+ * 'parent' is the previous stack entry -- i.e. the node we arrived
+ * from. 'block' is the node's block number. 'offset' is the offset in
+ * the node's page that we stopped at (i.e. we followed the child
+ * pointer located at the specified offset).
+ */
+typedef struct GISTSearchStack
+{
+       struct GISTSearchStack *next;
+       BlockNumber block;
+       /* to identify page changed */
+       GistNSN         lsn;
+       /* to recognize split occured */
+       GistNSN         parentlsn;
+} GISTSearchStack;
+
+typedef struct GISTSTATE
+{
+       FmgrInfo        consistentFn[INDEX_MAX_KEYS];
+       FmgrInfo        unionFn[INDEX_MAX_KEYS];
+       FmgrInfo        compressFn[INDEX_MAX_KEYS];
+       FmgrInfo        decompressFn[INDEX_MAX_KEYS];
+       FmgrInfo        penaltyFn[INDEX_MAX_KEYS];
+       FmgrInfo        picksplitFn[INDEX_MAX_KEYS];
+       FmgrInfo        equalFn[INDEX_MAX_KEYS];
+
+       TupleDesc       tupdesc;
+} GISTSTATE;
+
+/*
+ *     When we're doing a scan, we need to keep track of the parent stack
+ *     for the marked and current items.
+ */
+typedef struct GISTScanOpaqueData
+{
+       GISTSearchStack *stack;
+       GISTSearchStack *markstk;
+       uint16          flags;
+       GISTSTATE  *giststate;
+       MemoryContext tempCxt;
+       Buffer          curbuf;
+       Buffer          markbuf;
+
+       ItemPointerData pageData[BLCKSZ/sizeof(IndexTupleData)];
+       OffsetNumber    nPageData;
+       OffsetNumber    curPageData;
+       ItemPointerData markPageData[BLCKSZ/sizeof(IndexTupleData)];
+       OffsetNumber    markNPageData;
+       OffsetNumber    markCurPageData;
+} GISTScanOpaqueData;
+
+typedef GISTScanOpaqueData *GISTScanOpaque;
+
+/* XLog stuff */
+extern const XLogRecPtr XLogRecPtrForTemp;
+
+#define XLOG_GIST_ENTRY_UPDATE 0x00
+#define XLOG_GIST_ENTRY_DELETE 0x10
+#define XLOG_GIST_NEW_ROOT     0x20
+
+typedef struct gistxlogEntryUpdate
+{
+       RelFileNode node;
+       BlockNumber blkno;
+
+       uint16          ntodelete;
+       bool            isemptypage;
+
+       /*
+        * It used to identify completeness of insert. Sets to leaf itup
+        */
+       ItemPointerData key;
+
+       /*
+        * follow: 1. todelete OffsetNumbers 2. tuples to insert
+        */
+} gistxlogEntryUpdate;
+
+#define XLOG_GIST_PAGE_SPLIT   0x30
+
+typedef struct gistxlogPageSplit
+{
+       RelFileNode node;
+       BlockNumber origblkno;          /* splitted page */
+       uint16          npage;
+
+       /* see comments on gistxlogEntryUpdate */
+       ItemPointerData key;
+
+       /*
+        * follow: 1. gistxlogPage and array of IndexTupleData per page
+        */
+} gistxlogPageSplit;
+
+#define XLOG_GIST_INSERT_COMPLETE  0x40
+
+typedef struct gistxlogPage
+{
+       BlockNumber blkno;
+       int                     num;
+} gistxlogPage;
+
+#define XLOG_GIST_CREATE_INDEX 0x50
+
+typedef struct gistxlogInsertComplete
+{
+       RelFileNode node;
+       /* follows ItemPointerData key to clean */
+} gistxlogInsertComplete;
+
+/* SplitedPageLayout - gistSplit function result */
+typedef struct SplitedPageLayout
+{
+       gistxlogPage block;
+       IndexTupleData *list;
+       int                     lenlist;
+       Buffer          buffer;                 /* to write after all proceed */
+
+       struct SplitedPageLayout *next;
+} SplitedPageLayout;
+
+/*
+ * GISTInsertStack used for locking buffers and transfer arguments during
+ * insertion
+ */
+
+typedef struct GISTInsertStack
+{
+       /* current page */
+       BlockNumber blkno;
+       Buffer          buffer;
+       Page            page;
+
+       /*
+        * log sequence number from page->lsn to recognize page update  and
+        * compare it with page's nsn to recognize page split
+        */
+       GistNSN         lsn;
+
+       /* child's offset */
+       OffsetNumber childoffnum;
+
+       /* pointer to parent and child */
+       struct GISTInsertStack *parent;
+       struct GISTInsertStack *child;
+
+       /* for gistFindPath */
+       struct GISTInsertStack *next;
+} GISTInsertStack;
+
+#define XLogRecPtrIsInvalid( r )       ( (r).xlogid == 0 && (r).xrecoff == 0 )
+
+typedef struct
+{
+       Relation        r;
+       IndexTuple *itup;                       /* in/out, points to compressed entry */
+       int                     ituplen;                /* length of itup */
+       GISTInsertStack *stack;
+       bool            needInsertComplete;
+
+       /* pointer to heap tuple */
+       ItemPointerData key;
+} GISTInsertState;
+
+/*
+ * When we're doing a scan and updating a tree at the same time, the
+ * updates may affect the scan.  We use the flags entry of the scan's
+ * opaque space to record our actual position in response to updates
+ * that we can't handle simply by adjusting pointers.
+ */
+#define GS_CURBEFORE   ((uint16) (1 << 0))
+#define GS_MRKBEFORE   ((uint16) (1 << 1))
+
+/* root page of a gist index */
+#define GIST_ROOT_BLKNO                                0
+
+/*
+ * When we update a relation on which we're doing a scan, we need to
+ * check the scan and fix it if the update affected any of the pages
+ * it touches. Otherwise, we can miss records that we should see.
+ * The only times we need to do this are for deletions and splits. See
+ * the code in gistscan.c for how the scan is fixed. These two
+ * constants tell us what sort of operation changed the index.
+ */
+#define GISTOP_DEL             0
+/* #define GISTOP_SPLIT 1 */
+
+#define ATTSIZE(datum, tupdesc, i, isnull) \
+               ( \
+                               (isnull) ? 0 : \
+                                  att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
+               )
+
+/*
+ * mark tuples on inner pages during recovery
+ */
+#define TUPLE_IS_VALID         0xffff
+#define TUPLE_IS_INVALID       0xfffe
+
+#define  GistTupleIsInvalid(itup)      ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
+#define  GistTupleSetValid(itup)       ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
+#define  GistTupleSetInvalid(itup)     ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
+
+/* gist.c */
+extern Datum gistbuild(PG_FUNCTION_ARGS);
+extern Datum gistinsert(PG_FUNCTION_ARGS);
+extern MemoryContext createTempGistContext(void);
+extern void initGISTstate(GISTSTATE *giststate, Relation index);
+extern void freeGISTstate(GISTSTATE *giststate);
+extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
+extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
+
+extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
+                 int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
+
+extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child,
+                        Buffer (*myReadBuffer) (Relation, BlockNumber));
+
+/* gistxlog.c */
+extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
+extern void gist_desc(char *buf, uint8 xl_info, char *rec);
+extern void gist_xlog_startup(void);
+extern void gist_xlog_cleanup(void);
+extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
+
+extern XLogRecData *formUpdateRdata(RelFileNode node, BlockNumber blkno,
+                               OffsetNumber *todelete, int ntodelete, bool emptypage,
+                               IndexTuple *itup, int ituplen, ItemPointer key);
+
+extern XLogRecData *formSplitRdata(RelFileNode node, BlockNumber blkno,
+                          ItemPointer key, SplitedPageLayout *dist);
+
+extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
+
+/* gistget.c */
+extern Datum gistgettuple(PG_FUNCTION_ARGS);
+extern Datum gistgetmulti(PG_FUNCTION_ARGS);
+
+/* gistutil.c */
+extern Buffer gistNewBuffer(Relation r);
+extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
+                          int len, OffsetNumber off);
+extern bool gistnospace(Page page, IndexTuple *itvec, int len);
+extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
+extern IndexTuple *gistjoinvector(
+                          IndexTuple *itvec, int *len,
+                          IndexTuple *additvec, int addlen);
+extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
+                 int len, GISTSTATE *giststate);
+extern IndexTuple gistgetadjusted(Relation r,
+                               IndexTuple oldtup,
+                               IndexTuple addtup,
+                               GISTSTATE *giststate);
+extern int gistfindgroup(GISTSTATE *giststate,
+                         GISTENTRY *valvec, GIST_SPLITVEC *spl);
+extern void gistadjsubkey(Relation r,
+                         IndexTuple *itup, int len,
+                         GIST_SPLITVEC *v,
+                         GISTSTATE *giststate);
+extern IndexTuple gistFormTuple(GISTSTATE *giststate,
+                         Relation r, Datum *attdata, int *datumsize, bool *isnull);
+
+extern OffsetNumber gistchoose(Relation r, Page p,
+                  IndexTuple it,
+                  GISTSTATE *giststate);
+extern void gistcentryinit(GISTSTATE *giststate, int nkey,
+                          GISTENTRY *e, Datum k,
+                          Relation r, Page pg,
+                          OffsetNumber o, int b, bool l, bool isNull);
+extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
+                                 IndexTuple tuple, Page p, OffsetNumber o,
+                                 GISTENTRY *attdata, bool *isnull);
+extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
+                               IndexTuple *itvec, GIST_SPLITVEC *spl, bool isall);
+extern void GISTInitBuffer(Buffer b, uint32 f);
+extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
+                          Datum k, Relation r, Page pg, OffsetNumber o,
+                          int b, bool l, bool isNull);
+void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
+                                 IndexTuple *itup, int len, GISTSTATE *giststate);
+
+/* gistvacuum.c */
+extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
+extern Datum gistvacuumcleanup(PG_FUNCTION_ARGS);
+
+#endif   /* GIST_PRIVATE_H */