sketch out WAL support for adding index segments
authorRobert Haas <[email protected]>
Mon, 20 Sep 2021 17:21:15 +0000 (13:21 -0400)
committerRobert Haas <[email protected]>
Mon, 20 Sep 2021 17:21:15 +0000 (13:21 -0400)
src/backend/access/conveyor/cbmodify.c
src/backend/access/conveyor/cbxlog.c
src/include/access/cbmodify.h
src/include/access/cbxlog.h
src/tools/pgindent/typedefs.list

index 017290d870efee39f9458170ddccc20e2aa4c902..a46fada9a4c01d2417864c7657421e907911a455 100644 (file)
@@ -145,6 +145,10 @@ cb_insert_payload_page(RelFileNode *rnode, ForkNumber fork, Buffer metabuffer,
  * InvalidBuffer. Otherwise, 'fsmblock' should be the block number of the
  * relevant freespace map block and 'fsmbuffer' the corresponding buffer.
  *
+ * 'is_extend' should be true when we're allocating a segment that hasn't
+ * existed before, necessitating an adjustment to the metapage's
+ * next-segment counter.
+ *
  * See cb_xlog_allocate_payload_segment for the corresponding REDO routine.
  */
 void
@@ -192,13 +196,14 @@ cb_allocate_payload_segment(RelFileNode *rnode,
                                                  REGBUF_STANDARD);
                if (fsmblock != InvalidBlockNumber)
                        XLogRegisterBlock(1, rnode, fork, fsmblock,
-                                                         BufferGetPage(fsmbuffer),
-                                                         REGBUF_STANDARD);
+                                                         BufferGetPage(fsmbuffer), REGBUF_STANDARD);
                XLogRegisterData((char *) &xlrec, sizeof(xlrec));
                lsn = XLogInsert(RM_CONVEYOR_ID,
                                                 XLOG_CONVEYOR_ALLOCATE_PAYLOAD_SEGMENT);
 
                PageSetLSN(metapage, lsn);
+               if (fsmblock != InvalidBlockNumber)
+                       PageSetLSN(BufferGetPage(fsmbuffer), lsn);
        }
 
        END_CRIT_SECTION();
@@ -224,6 +229,12 @@ cb_allocate_payload_segment(RelFileNode *rnode,
  *
  * 'segno' is the segment number of the new index segment, and 'pageno'
  * is the first logical page for which it will store index information.
+ *
+ * 'is_extend' should be true when we're allocating a segment that hasn't
+ * existed before, necessitating an adjustment to the metapage's
+ * next-segment counter.
+ *
+ * See cb_xlog_allocate_index_segment for the corresponding REDO routine.
  */
 void
 cb_allocate_index_segment(RelFileNode *rnode,
@@ -237,6 +248,7 @@ cb_allocate_index_segment(RelFileNode *rnode,
                                                  Buffer fsmbuffer,
                                                  CBSegNo segno,
                                                  CBPageNo pageno,
+                                                 bool is_extend,
                                                  bool needs_xlog)
 {
        Page            metapage;
@@ -253,6 +265,9 @@ cb_allocate_index_segment(RelFileNode *rnode,
        cb_metapage_add_index_segment(meta, segno);
        MarkBufferDirty(metabuffer);
 
+       if (is_extend)
+               cb_metapage_increment_next_segment(meta, segno);
+
        cb_indexpage_initialize(indexpage, pageno, true);
        MarkBufferDirty(indexbuffer);
 
@@ -272,7 +287,34 @@ cb_allocate_index_segment(RelFileNode *rnode,
 
        if (needs_xlog)
        {
-               /* XXX write xlog, set LSNs */
+               xl_cb_allocate_index_segment    xlrec;
+               XLogRecPtr      lsn;
+
+               xlrec.segno = segno;
+               xlrec.pageno = pageno;
+               xlrec.is_extend = is_extend;
+
+               XLogBeginInsert();
+               XLogRegisterBlock(0, rnode, fork, CONVEYOR_METAPAGE, metapage,
+                                                 REGBUF_STANDARD);
+               XLogRegisterBlock(1, rnode, fork, indexblock, indexpage,
+                                                 REGBUF_STANDARD | REGBUF_WILL_INIT);
+               if (prevblock != InvalidBlockNumber)
+                       XLogRegisterBlock(2, rnode, fork, prevblock,
+                                                         BufferGetPage(prevbuffer), REGBUF_STANDARD);
+               if (fsmblock != InvalidBlockNumber)
+                       XLogRegisterBlock(3, rnode, fork, fsmblock,
+                                                         BufferGetPage(fsmbuffer), REGBUF_STANDARD);
+               XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+               lsn = XLogInsert(RM_CONVEYOR_ID,
+                                                XLOG_CONVEYOR_ALLOCATE_INDEX_SEGMENT);
+
+               PageSetLSN(metapage, lsn);
+               PageSetLSN(indexpage, lsn);
+               if (prevblock != InvalidBlockNumber)
+                       PageSetLSN(BufferGetPage(prevbuffer), lsn);
+               if (fsmblock != InvalidBlockNumber)
+                       PageSetLSN(BufferGetPage(fsmbuffer), lsn);
        }
 
        END_CRIT_SECTION();
index 4251a801109e73442abdcfe186fbd9820e2124b6..d0298ff02d0f2ddde3c8d1cf849af64188e69cc2 100644 (file)
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "access/cbfsmpage.h"
+#include "access/cbindexpage.h"
 #include "access/cbmetapage.h"
 #include "access/cbxlog.h"
 #include "access/xloginsert.h"
@@ -101,6 +102,80 @@ cb_xlog_allocate_payload_segment(XLogReaderState *record)
                UnlockReleaseBuffer(fsmbuffer);
 }
 
+/*
+ * REDO function for cb_allocate_index_segment.
+ */
+static void
+cb_xlog_allocate_index_segment(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_cb_allocate_index_segment *xlrec;
+       bool            have_prev_page;
+       bool            have_fsm_page;
+       Buffer          metabuffer;
+       Buffer          indexbuffer;
+       Buffer          prevbuffer = InvalidBuffer;
+       Buffer          fsmbuffer = InvalidBuffer;
+
+       have_prev_page = XLogRecGetBlockTag(record, 2, NULL, NULL, NULL);
+       have_fsm_page = XLogRecGetBlockTag(record, 3, NULL, NULL, NULL);
+
+       xlrec = (xl_cb_allocate_index_segment *) XLogRecGetData(record);
+
+       if (XLogReadBufferForRedo(record, 0, &metabuffer) == BLK_NEEDS_REDO)
+       {
+               Page    metapage = BufferGetPage(metabuffer);
+               CBMetapageData *meta;
+
+               meta = cb_metapage_get_special(metapage);
+               cb_metapage_add_index_segment(meta, xlrec->segno);
+               if (xlrec->is_extend)
+                       cb_metapage_increment_next_segment(meta, xlrec->segno);
+               if (!have_fsm_page)
+                       cb_metapage_set_fsm_bit(meta, xlrec->segno, true);
+               PageSetLSN(metapage, lsn);
+               MarkBufferDirty(metabuffer);
+       }
+
+       if (XLogReadBufferForRedo(record, 1, &indexbuffer) == BLK_NEEDS_REDO)
+       {
+               Page    indexpage = BufferGetPage(indexbuffer);
+
+               cb_indexpage_initialize(indexpage, xlrec->pageno, true);
+               PageSetLSN(indexpage, lsn);
+               MarkBufferDirty(indexbuffer);
+       }
+
+       if (have_prev_page &&
+               XLogReadBufferForRedo(record, 2, &prevbuffer) == BLK_NEEDS_REDO)
+       {
+               Page    prevpage = BufferGetPage(prevbuffer);
+
+               cb_indexpage_set_next_segment(prevpage, xlrec->segno);
+               PageSetLSN(prevpage, lsn);
+               MarkBufferDirty(prevbuffer);
+       }
+
+       if (have_fsm_page &&
+               XLogReadBufferForRedo(record, 3, &fsmbuffer) == BLK_NEEDS_REDO)
+       {
+               Page    fsmpage = BufferGetPage(fsmbuffer);
+
+               cb_fsmpage_set_fsm_bit(fsmpage, xlrec->segno, true);
+               PageSetLSN(fsmpage, lsn);
+               MarkBufferDirty(fsmbuffer);
+       }
+
+       if (BufferIsValid(metabuffer))
+               UnlockReleaseBuffer(metabuffer);
+       if (BufferIsValid(indexbuffer))
+               UnlockReleaseBuffer(indexbuffer);
+       if (BufferIsValid(prevbuffer))
+               UnlockReleaseBuffer(prevbuffer);
+       if (BufferIsValid(fsmbuffer))
+               UnlockReleaseBuffer(fsmbuffer);
+}
+
 /*
  * Main entrypoint for conveyor belt REDO.
  */
@@ -117,6 +192,9 @@ conveyor_redo(XLogReaderState *record)
                case XLOG_CONVEYOR_ALLOCATE_PAYLOAD_SEGMENT:
                        cb_xlog_allocate_payload_segment(record);
                        break;
+               case XLOG_CONVEYOR_ALLOCATE_INDEX_SEGMENT:
+                       cb_xlog_allocate_index_segment(record);
+                       break;
                default:
                        elog(PANIC, "conveyor_redo: unknown op code %u", info);
        }
index 8e98115345a6597c14c1f39f11e3ecbc799fbde4..fb0152809b5fb5e247f538d9903647b6d4220b8a 100644 (file)
@@ -59,7 +59,7 @@ extern void cb_allocate_payload_segment(RelFileNode *rnode,
 extern void cb_allocate_index_segment(RelFileNode *rnode,
                                                                          ForkNumber fork,
                                                                          Buffer metabuffer,
-                                                                         BlockNumber indexblock,
+                                                                         BlockNumber indexblock,
                                                                          Buffer indexbuffer,
                                                                          BlockNumber prevblock,
                                                                          Buffer prevbuffer,
@@ -67,6 +67,7 @@ extern void cb_allocate_index_segment(RelFileNode *rnode,
                                                                          Buffer fsmbuffer,
                                                                          CBSegNo segno,
                                                                          CBPageNo pageno,
+                                                                         bool is_extend,
                                                                          bool needs_xlog);
 
 #endif                                                 /* CBMODIFY_H */
index aee28fb670f5e54b53129d09789ea8f43c7847ee..c7c2660d5f1043edbaab9802d4c13b31aa6de477 100644 (file)
 
 #define XLOG_CONVEYOR_INSERT_PAYLOAD_PAGE                      0x10
 #define        XLOG_CONVEYOR_ALLOCATE_PAYLOAD_SEGMENT          0x20
+#define        XLOG_CONVEYOR_ALLOCATE_INDEX_SEGMENT            0x30
 
 typedef struct xl_cb_allocate_payload_segment
 {
-       CBSegNo segno;
-       bool    is_extend;
+       CBSegNo         segno;
+       bool            is_extend;
 } xl_cb_allocate_payload_segment;
 
+typedef struct xl_cb_allocate_index_segment
+{
+       CBSegNo         segno;
+       CBPageNo        pageno;
+       bool            is_extend;
+} xl_cb_allocate_index_segment;
+
 extern void conveyor_desc(StringInfo buf, XLogReaderState *record);
 extern void conveyor_redo(XLogReaderState *record);
 extern const char *conveyor_identify(uint8 info);
index 6667e3c794a427c88ad43bd8bc0c6f0149b4ca3b..fe813993ba10ed58cc9926ebe469486fd9a902c2 100644 (file)
@@ -3771,3 +3771,5 @@ CBMetapageData
 CBPageNo
 CBSegNo
 ConveyorBelt
+xl_cb_allocate_index_segment
+xl_cb_allocate_payload_segment