END_CRIT_SECTION();
}
+
+/*
+ * Logically truncate a conveyor belt by updating its notion of the oldest
+ * logical page.
+ */
+void
+cb_logical_truncate(RelFileNode *rnode,
+ ForkNumber fork,
+ Buffer metabuffer,
+ CBPageNo oldest_keeper,
+ bool needs_xlog)
+{
+ Page metapage;
+ CBMetapageData *meta;
+
+ metapage = BufferGetPage(metabuffer);
+ meta = cb_metapage_get_special(metapage);
+
+ START_CRIT_SECTION();
+
+ cb_metapage_advance_oldest_logical_page(meta, oldest_keeper);
+
+ /*
+ * The page has been modified, so mark the buffer dirty before the
+ * critical section ends; otherwise a checkpoint could skip writing the
+ * change and it would be lost after a crash without WAL replay. (The
+ * redo routine does the same after re-applying the change.)
+ */
+ MarkBufferDirty(metabuffer);
+
+ if (needs_xlog)
+ {
+ xl_cb_logical_truncate xlrec;
+ XLogRecPtr lsn;
+
+ xlrec.oldest_keeper = oldest_keeper;
+
+ XLogBeginInsert();
+ XLogRegisterBlock(0, rnode, fork, CONVEYOR_METAPAGE, metapage,
+ REGBUF_STANDARD);
+ XLogRegisterData((char *) &xlrec, SizeOfCBLogicalTruncate);
+ lsn = XLogInsert(RM_CONVEYOR_ID,
+ XLOG_CONVEYOR_LOGICAL_TRUNCATE);
+
+ PageSetLSN(metapage, lsn);
+ }
+
+ END_CRIT_SECTION();
+}
xlrec = (xl_cb_relocate_index_entries *) XLogRecGetData(record);
- /* NB: metapage must be last due to lock ordering rules */
if (XLogReadBufferForRedo(record, 1, &indexbuffer) == BLK_NEEDS_REDO)
{
Page indexpage = BufferGetPage(indexbuffer);
UnlockReleaseBuffer(indexbuffer);
}
+/*
+ * REDO function for cb_logical_truncate.
+ *
+ * Re-applies the advance of the oldest logical page to the metapage if,
+ * and only if, the on-disk page predates this WAL record.
+ */
+static void
+cb_xlog_logical_truncate(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_cb_logical_truncate *xlrec;
+ Buffer metabuffer;
+
+ xlrec = (xl_cb_logical_truncate *) XLogRecGetData(record);
+
+ /* Block 0 is the metapage registered by cb_logical_truncate. */
+ if (XLogReadBufferForRedo(record, 0, &metabuffer) == BLK_NEEDS_REDO)
+ {
+ Page metapage = BufferGetPage(metabuffer);
+ CBMetapageData *meta;
+
+ meta = cb_metapage_get_special(metapage);
+ cb_metapage_advance_oldest_logical_page(meta, xlrec->oldest_keeper);
+ /* Stamp the record's LSN and dirty the buffer, as redo must. */
+ PageSetLSN(metapage, lsn);
+ MarkBufferDirty(metabuffer);
+ }
+
+ /* Buffer may be invalid if the relation fork was since dropped. */
+ if (BufferIsValid(metabuffer))
+ UnlockReleaseBuffer(metabuffer);
+}
+
/*
* Main entrypoint for conveyor belt REDO.
*/
case XLOG_CONVEYOR_RELOCATE_INDEX_ENTRIES:
cb_xlog_relocate_index_entries(record);
break;
+ case XLOG_CONVEYOR_LOGICAL_TRUNCATE:
+ cb_xlog_logical_truncate(record);
+ break;
default:
elog(PANIC, "conveyor_redo: unknown op code %u", info);
}
UnlockReleaseBuffer(metabuffer);
}
+/*
+ * Update the conveyor belt's notion of the oldest logical page to be kept.
+ *
+ * This doesn't physically shrink the relation, nor does it even make space
+ * available for reuse by future insertions. It just makes pages prior to
+ * 'oldest_keeper' unavailable, thus potentially allowing the segments
+ * containing those pages to be freed by a future call to ConveyorBeltVacuum.
+ *
+ * A call to this function shouldn't try to move the logical truncation point
+ * backwards. That is, the value of 'oldest_keeper' should always be greater
+ * than or equal to the value passed on the previous call for this conveyor
+ * belt. It also shouldn't try to move the logical truncation point beyond
+ * the current insertion point: don't try to throw away data that hasn't been
+ * inserted yet!
+ *
+ * For routine cleanup of a conveyor belt, the recommended sequence of calls
+ * is ConveyorBeltLogicalTruncate then ConveyorBeltVacuum then
+ * ConveyorBeltPhysicalTruncate. For more aggressive cleanup options, see
+ * ConveyorBeltCompact or ConveyorBeltRewrite.
+ */
+void
+ConveyorBeltLogicalTruncate(ConveyorBelt *cb, CBPageNo oldest_keeper)
+{
+ Buffer metabuffer;
+ CBMetapageData *meta;
+ CBPageNo oldest_logical_page;
+ CBPageNo next_logical_page;
+ RelFileNode *rnode;
+ bool needs_xlog;
+
+ /*
+ * We must take a cleanup lock to adjust the logical truncation point,
+ * as per the locking protocols in src/backend/access/conveyor/README.
+ */
+ metabuffer = ReadBufferExtended(cb->cb_rel, cb->cb_fork, CONVEYOR_METAPAGE,
+ RBM_NORMAL, NULL);
+ LockBufferForCleanup(metabuffer);
+
+ /*
+ * Sanity checks: the truncation point may neither move backwards nor
+ * pass the insertion point. elog(ERROR) releases the buffer pin and
+ * lock via the normal transaction abort path.
+ */
+ meta = cb_metapage_get_special(BufferGetPage(metabuffer));
+ cb_metapage_get_bounds(meta, &oldest_logical_page, &next_logical_page);
+ if (oldest_keeper < oldest_logical_page)
+ elog(ERROR,
+ "can't move truncation point backwards from " UINT64_FORMAT " to " UINT64_FORMAT,
+ oldest_logical_page, oldest_keeper);
+ if (oldest_keeper > next_logical_page)
+ elog(ERROR,
+ "can't move truncation point to " UINT64_FORMAT " beyond insert point " UINT64_FORMAT,
+ oldest_keeper, next_logical_page);
+
+ /* Do the real work. Changes to the init fork are always WAL-logged. */
+ rnode = &RelationGetSmgr(cb->cb_rel)->smgr_rnode.node;
+ needs_xlog = RelationNeedsWAL(cb->cb_rel) || cb->cb_fork == INIT_FORKNUM;
+ cb_logical_truncate(rnode, cb->cb_fork, metabuffer, oldest_keeper,
+ needs_xlog);
+
+ /* Release buffer lock. */
+ UnlockReleaseBuffer(metabuffer);
+}
+
/*
* Pin and return the block indicated by 'blkno', extending if needed.
*
CBPageNo index_page_start,
bool needs_xlog);
+extern void cb_logical_truncate(RelFileNode *rnode,
+ ForkNumber fork,
+ Buffer metabuffer,
+ CBPageNo oldest_keeper,
+ bool needs_xlog);
+
#endif /* CBMODIFY_H */
#define XLOG_CONVEYOR_ALLOCATE_INDEX_SEGMENT 0x30
#define XLOG_CONVEYOR_ALLOCATE_INDEX_PAGE 0x40
#define XLOG_CONVEYOR_RELOCATE_INDEX_ENTRIES 0x50
+#define XLOG_CONVEYOR_LOGICAL_TRUNCATE 0x60
typedef struct xl_cb_allocate_payload_segment
{
#define SizeOfCBRelocateIndexEntries \
(offsetof(xl_cb_relocate_index_entries, index_entries))
+/*
+ * WAL record payload for a conveyor-belt logical truncate: the new oldest
+ * logical page to keep. The affected metapage is registered as block 0.
+ */
+typedef struct xl_cb_logical_truncate
+{
+ CBPageNo oldest_keeper;
+} xl_cb_logical_truncate;
+
+/* Record size: everything up to and including oldest_keeper. */
+#define SizeOfCBLogicalTruncate \
+ (offsetof(xl_cb_logical_truncate, oldest_keeper) + sizeof(CBPageNo))
+
extern void conveyor_desc(StringInfo buf, XLogReaderState *record);
extern void conveyor_redo(XLogReaderState *record);
extern const char *conveyor_identify(uint8 info);
extern void ConveyorBeltGetBounds(ConveyorBelt *cb,
CBPageNo *oldest_logical_page,
CBPageNo *next_logical_page);
-extern void ConveyorBeltTruncate(ConveyorBelt *cb, CBPageNo oldest_keeper);
+extern void ConveyorBeltLogicalTruncate(ConveyorBelt *cb,
+ CBPageNo oldest_keeper);
extern void ConveyorBeltVacuum(ConveyorBelt *cb);
+extern void ConveyorBeltPhysicalTruncate(ConveyorBelt *cb);
extern void ConveyorBeltCompact(ConveyorBelt *cb);
extern ConveyorBelt *ConveyorBeltRewrite(ConveyorBelt *cb,
Relation rel,