add ConveyorBeltLogicalTruncate
authorRobert Haas <[email protected]>
Fri, 15 Oct 2021 14:12:36 +0000 (10:12 -0400)
committerRobert Haas <[email protected]>
Fri, 15 Oct 2021 14:12:36 +0000 (10:12 -0400)
src/backend/access/conveyor/cbmodify.c
src/backend/access/conveyor/cbxlog.c
src/backend/access/conveyor/conveyor.c
src/include/access/cbmodify.h
src/include/access/cbxlog.h
src/include/access/conveyor.h

index f819b3ab443e7e9c67a102310dcde6a97c48efc7..2905b4e7a592ee84c98bbce110aa0e9b013eff17 100644 (file)
@@ -440,3 +440,44 @@ cb_relocate_index_entries(RelFileNode *rnode,
 
        END_CRIT_SECTION();
 }
+
+/*
+ * Logically truncate a conveyor belt by updating its notion of the oldest
+ * logical page.
+ */
+void
+cb_logical_truncate(RelFileNode *rnode,
+                                       ForkNumber fork,
+                                       Buffer metabuffer,
+                                       CBPageNo oldest_keeper,
+                                       bool needs_xlog)
+{
+       Page            metapage;
+       CBMetapageData *meta;
+
+       metapage = BufferGetPage(metabuffer);
+       meta = cb_metapage_get_special(metapage);
+
+       START_CRIT_SECTION();
+
+       cb_metapage_advance_oldest_logical_page(meta, oldest_keeper);
+
+       if (needs_xlog)
+       {
+               xl_cb_logical_truncate  xlrec;
+               XLogRecPtr      lsn;
+
+               xlrec.oldest_keeper = oldest_keeper;
+
+               XLogBeginInsert();
+               XLogRegisterBlock(0, rnode, fork, CONVEYOR_METAPAGE, metapage,
+                                                 REGBUF_STANDARD);
+               XLogRegisterData((char *) &xlrec, SizeOfCBLogicalTruncate);
+               lsn = XLogInsert(RM_CONVEYOR_ID,
+                                                XLOG_CONVEYOR_LOGICAL_TRUNCATE);
+
+               PageSetLSN(metapage, lsn);
+       }
+
+       END_CRIT_SECTION();
+}
index 5b18506786a65f3221bcf810ce2437b81a07bd84..2661124e058ada01c80cccb136be5d247e7a9ea4 100644 (file)
@@ -218,7 +218,6 @@ cb_xlog_relocate_index_entries(XLogReaderState *record)
 
        xlrec = (xl_cb_relocate_index_entries *) XLogRecGetData(record);
 
-       /* NB: metapage must be last due to lock ordering rules */
        if (XLogReadBufferForRedo(record, 1, &indexbuffer) == BLK_NEEDS_REDO)
        {
                Page    indexpage = BufferGetPage(indexbuffer);
@@ -251,6 +250,33 @@ cb_xlog_relocate_index_entries(XLogReaderState *record)
                UnlockReleaseBuffer(indexbuffer);
 }
 
+/*
+ * REDO function for cb_logical_truncate.
+ */
+static void
+cb_xlog_logical_truncate(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_cb_logical_truncate *xlrec;
+       Buffer          metabuffer;
+
+       xlrec = (xl_cb_logical_truncate *) XLogRecGetData(record);
+
+       if (XLogReadBufferForRedo(record, 0, &metabuffer) == BLK_NEEDS_REDO)
+       {
+               Page    metapage = BufferGetPage(metabuffer);
+               CBMetapageData *meta;
+
+               meta = cb_metapage_get_special(metapage);
+               cb_metapage_advance_oldest_logical_page(meta, xlrec->oldest_keeper);
+               PageSetLSN(metapage, lsn);
+               MarkBufferDirty(metabuffer);
+       }
+
+       if (BufferIsValid(metabuffer))
+               UnlockReleaseBuffer(metabuffer);
+}
+
 /*
  * Main entrypoint for conveyor belt REDO.
  */
@@ -276,6 +302,9 @@ conveyor_redo(XLogReaderState *record)
                case XLOG_CONVEYOR_RELOCATE_INDEX_ENTRIES:
                        cb_xlog_relocate_index_entries(record);
                        break;
+               case XLOG_CONVEYOR_LOGICAL_TRUNCATE:
+                       cb_xlog_logical_truncate(record);
+                       break;
                default:
                        elog(PANIC, "conveyor_redo: unknown op code %u", info);
        }
index 449ddfac34330a575eb7193a2868c52c936726d8..395bd70ea633cdbc23f5fef4ceb8afec819941c9 100644 (file)
@@ -891,6 +891,67 @@ ConveyorBeltGetBounds(ConveyorBelt *cb, CBPageNo *oldest_logical_page,
        UnlockReleaseBuffer(metabuffer);
 }
 
+/*
+ * Update the conveyor belt's notion of the oldest logical page to be kept.
+ *
+ * This doesn't physically shrink the relation, nor does it even make space
+ * available for reuse by future insertions. It just makes pages prior to
+ * 'oldest_keeper' unavailable, thus potentially allowing the segments
+ * containing those pages to be freed by a future call to ConveyorBeltVacuum.
+ *
+ * A call to this function shouldn't try to move the logical truncation point
+ * backwards. That is, the value of 'oldest_keeper' should always be greater
+ * than or equal to the value passed on the previous call for this conveyor
+ * belt. It also shouldn't try to move the logical truncation point beyond
+ * the current insertion point: don't try to throw away data that hasn't been
+ * inserted yet!
+ *
+ * For routine cleanup of a conveyor belt, the recommended sequence of calls
+ * is ConveyorBeltLogicalTruncate then ConveyorBeltVacuum then
+ * ConveyorBeltPhysicalTruncate. For more aggressive cleanup options, see
+ * ConveyorBeltCompact or ConveyorBeltRewrite.
+ */
+void
+ConveyorBeltLogicalTruncate(ConveyorBelt *cb, CBPageNo oldest_keeper)
+{
+       Buffer          metabuffer;
+       CBMetapageData *meta;
+       CBPageNo        oldest_logical_page;
+       CBPageNo        next_logical_page;
+       RelFileNode *rnode;
+       bool            needs_xlog;
+
+       /*
+        * We must take a cleanup lock to adjust the logical truncation point,
+        * as per the locking protocols in src/backend/access/conveyor/README.
+        */
+       metabuffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork, CONVEYOR_METAPAGE,
+                                                                       RBM_NORMAL, NULL);
+       LockBufferForCleanup(metabuffer);
+
+       /* Sanity checks. */
+       meta = cb_metapage_get_special(BufferGetPage(metabuffer));
+       cb_metapage_get_bounds(meta, &oldest_logical_page, &next_logical_page);
+       if (oldest_keeper < oldest_logical_page)
+               elog(ERROR,
+                        "can't move truncation point backwards from " UINT64_FORMAT " to " UINT64_FORMAT,
+                        oldest_logical_page, oldest_keeper);
+       if (oldest_keeper > next_logical_page)
+               elog(ERROR,
+                        "can't move truncation point to " UINT64_FORMAT " beyond insert point " UINT64_FORMAT,
+                        oldest_keeper, next_logical_page);
+
+
+       /* Do the real work. */
+       rnode = &RelationGetSmgr(cb->cb_rel)->smgr_rnode.node;
+       needs_xlog = RelationNeedsWAL(cb->cb_rel) || cb->cb_fork == INIT_FORKNUM;
+       cb_logical_truncate(rnode, cb->cb_fork, metabuffer, oldest_keeper,
+                                               needs_xlog);
+
+       /* Release buffer lock. */
+       UnlockReleaseBuffer(metabuffer);
+}
+
 /*
  * Pin and return the block indicated by 'blkno', extending if needed.
  *
index f321bd1f9074812a46b214e3d06dee90f3ada3bc..0e11cef0703dc0dd2f002bbfa5fae5312b7072cf 100644 (file)
@@ -88,4 +88,10 @@ extern void cb_relocate_index_entries(RelFileNode *rnode,
                                                                          CBPageNo index_page_start,
                                                                          bool needs_xlog);
 
+extern void cb_logical_truncate(RelFileNode *rnode,
+                                                               ForkNumber fork,
+                                                               Buffer metabuffer,
+                                                               CBPageNo oldest_keeper,
+                                                               bool needs_xlog);
+
 #endif                                                 /* CBMODIFY_H */
index d2a10975f4d258493d3788b0cca7f24cec8cfb12..c5dca6790724da5e70a7b6b9d1bacd219e7453cf 100644 (file)
@@ -24,6 +24,7 @@
 #define        XLOG_CONVEYOR_ALLOCATE_INDEX_SEGMENT            0x30
 #define        XLOG_CONVEYOR_ALLOCATE_INDEX_PAGE                       0x40
 #define XLOG_CONVEYOR_RELOCATE_INDEX_ENTRIES           0x50
+#define XLOG_CONVEYOR_LOGICAL_TRUNCATE                         0x60
 
 typedef struct xl_cb_allocate_payload_segment
 {
@@ -63,6 +64,14 @@ typedef struct xl_cb_relocate_index_entries
 #define SizeOfCBRelocateIndexEntries \
        (offsetof(xl_cb_relocate_index_entries, index_entries))
 
+typedef struct xl_cb_logical_truncate
+{
+       CBPageNo        oldest_keeper;
+} xl_cb_logical_truncate;
+
+#define SizeOfCBLogicalTruncate \
+       (offsetof(xl_cb_logical_truncate, oldest_keeper) + sizeof(CBPageNo))
+
 extern void conveyor_desc(StringInfo buf, XLogReaderState *record);
 extern void conveyor_redo(XLogReaderState *record);
 extern const char *conveyor_identify(uint8 info);
index 4a489d9e68646c93e1f4b39fadb10b4e6f43fefd..9ab7c22a828cbf2836e7aa0aa9b3b82e006460db 100644 (file)
@@ -40,8 +40,10 @@ extern Buffer ConveyorBeltReadBuffer(ConveyorBelt *cb, CBPageNo pageno,
 extern void ConveyorBeltGetBounds(ConveyorBelt *cb,
                                                                  CBPageNo *oldest_logical_page,
                                                                  CBPageNo *next_logical_page);
-extern void ConveyorBeltTruncate(ConveyorBelt *cb, CBPageNo oldest_keeper);
+extern void ConveyorBeltLogicalTruncate(ConveyorBelt *cb,
+                                                                               CBPageNo oldest_keeper);
 extern void ConveyorBeltVacuum(ConveyorBelt *cb);
+extern void ConveyorBeltPhysicalTruncate(ConveyorBelt *cb);
 extern void ConveyorBeltCompact(ConveyorBelt *cb);
 extern ConveyorBelt *ConveyorBeltRewrite(ConveyorBelt *cb,
                                                                                 Relation rel,