From: Robert Haas Date: Thu, 1 Aug 2019 18:56:54 +0000 (-0400) Subject: Dummy AM for experimentation. X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=cd998e5adc2acd79a1027d758b31104bb0035098;p=users%2Frhaas%2Fpostgres.git Dummy AM for experimentation. --- diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile index bf6d3fa1bd..8219b71577 100644 --- a/src/backend/access/Makefile +++ b/src/backend/access/Makefile @@ -9,6 +9,6 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist \ - table tablesample transam undo + table tablesample transam undo robert include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 640d37f37a..a18dc717cf 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -10,8 +10,8 @@ include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ - mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \ - undologdesc.o xactdesc.o xlogdesc.o + mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o robertdesc.o \ + seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o \ + undoactiondesc.o undologdesc.o xactdesc.o xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/robertdesc.c b/src/backend/access/rmgrdesc/robertdesc.c new file mode 100644 index 0000000000..5d030086ab --- /dev/null +++ b/src/backend/access/rmgrdesc/robertdesc.c @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * robertdesc.c + * rmgr descriptor routines for access/robert + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/robertdesc.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/robert_xlog.h" + +void +robert_desc(StringInfo buf, XLogReaderState *record) +{ +#if 0 + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + info &= XLOG_HEAP_OPMASK; +#endif +} + +const char * +robert_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + default: + id = "robert"; + } + + return id; +} diff --git a/src/backend/access/robert/Makefile b/src/backend/access/robert/Makefile new file mode 100644 index 0000000000..1f2f905b13 --- /dev/null +++ b/src/backend/access/robert/Makefile @@ -0,0 +1,18 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/robert +# +# IDENTIFICATION +# src/backend/access/robert/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/robert +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global
+
+OBJS = robert_ddl.o robert_dml.o robert_page.o robert_scan.o \
+	robert_slot.o robert_tuple.o robert_xlog.o robertam.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/robert/robert_ddl.c b/src/backend/access/robert/robert_ddl.c
new file mode 100644
index 0000000000..14f38b4e25
--- /dev/null
+++ b/src/backend/access/robert/robert_ddl.c
@@ -0,0 +1,366 @@
+/*
+ * robert_ddl.c
+ */
+
+#include "postgres.h"
+
+#include "access/multixact.h"
+#include "access/robert_scan.h"
+#include "access/robertam.h"
+#include "catalog/catalog.h"
+#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
+#include "commands/progress.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/procarray.h"
+
+/*
+ * robert_relation_set_new_filenode
+ *
+ * We do everything using FullTransactionIds and do not use MultiXactIds, so
+ * no freezing is required.
+ */
+void
+robert_relation_set_new_filenode(Relation rel,
+								 const RelFileNode *newrnode,
+								 char persistence,
+								 TransactionId *freezeXid,
+								 MultiXactId *minmulti)
+{
+	SMgrRelation srel;
+
+	/* No freezing required. */
+	*freezeXid = InvalidTransactionId;
+	*minmulti = InvalidMultiXactId;
+
+	/* Create the main fork. */
+	srel = RelationCreateStorage(rel->rd_node, persistence);
+
+	/* If required, set up an init fork for an unlogged table. */
+	if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
+	{
+		smgrcreate(srel, INIT_FORKNUM, false);
+		log_smgrcreate(&srel->smgr_rnode.node, INIT_FORKNUM);
+	}
+}
+
+/*
+ * robert_relation_nontransactional_truncate
+ */
+void
+robert_relation_nontransactional_truncate(Relation rel)
+{
+	RelationTruncate(rel, 0);
+}
+
+void
+robert_relation_copy_data(Relation rel, const RelFileNode *newrnode)
+{
+}
+
+void
+robert_relation_copy_for_cluster(Relation NewHeap,
+								 Relation OldHeap, Relation OldIndex,
+								 bool use_sort, TransactionId OldestXmin,
+								 TransactionId *xid_cutoff,
+								 MultiXactId *multi_cutoff,
+								 double *num_tuples, double *tups_vacuumed,
+								 double *tups_recently_dead)
+{
+}
+
+void
+robert_relation_vacuum(Relation onerel, VacuumParams *params,
+					   BufferAccessStrategy bstrategy)
+{
+}
+
+bool
+robert_scan_analyze_next_block(TableScanDesc scan,
+							   BlockNumber blockno,
+							   BufferAccessStrategy bstrategy)
+{
+	return false;
+}
+
+bool
+robert_scan_analyze_next_tuple(TableScanDesc scan,
+							   TransactionId OldestXmin,
+							   double *liverows, double *deadrows,
+							   TupleTableSlot *slot)
+{
+	return false;
+}
+
+/*
+ * robert_index_build_range_scan
+ *
+ * XXX. There is a HUGE amount of duplication with the heap here.
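+ *
+ * Most of the per-tuple loop below is copied from the heap's version of
+ * this callback; only the scan setup and the (currently missing)
+ * visibility checks differ. One plausible cleanup, sketched here with a
+ * hypothetical helper name, would be to share that loop:
+ *
+ *     reltuples = table_index_build_loop(scan, indexInfo, estate, slot,
+ *                                        predicate, callback,
+ *                                        callback_state);
+ *
+ * so that any block-based table AM could call it instead of cloning it.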
+ */ +double +robert_index_build_range_scan(Relation heapRelation, + Relation indexRelation, IndexInfo *indexInfo, + bool allow_sync, bool anyvisible, bool progress, + BlockNumber start_blockno, + BlockNumber numblocks, + IndexBuildCallback callback, + void *callback_state, + TableScanDesc scan) +{ + RobertScanDesc hscan; + bool is_system_catalog; + bool checking_uniqueness; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + double reltuples; + ExprState *predicate; + TupleTableSlot *slot; + EState *estate; + ExprContext *econtext; + Snapshot snapshot; + bool need_unregister_snapshot = false; + TransactionId OldestXmin; + BlockNumber previous_blkno = InvalidBlockNumber; + + /* + * sanity checks + */ + Assert(OidIsValid(indexRelation->rd_rel->relam)); + + /* Remember if it's a system catalog */ + is_system_catalog = IsSystemRelation(heapRelation); + + /* See whether we're verifying uniqueness/exclusion properties */ + checking_uniqueness = (indexInfo->ii_Unique || + indexInfo->ii_ExclusionOps != NULL); + + /* + * "Any visible" mode is not compatible with uniqueness checks; make sure + * only one of those is requested. + */ + Assert(!(anyvisible && checking_uniqueness)); + + /* + * Need an EState for evaluation of index expressions and partial-index + * predicates. Also a slot to hold the current tuple. + */ + estate = CreateExecutorState(); + econtext = GetPerTupleExprContext(estate); + slot = table_slot_create(heapRelation, NULL); + + /* Arrange for econtext's scan tuple to be the tuple under test */ + econtext->ecxt_scantuple = slot; + + /* Set up execution state for predicate, if any. */ + predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate); + + /* + * Prepare for scan of the base relation. In a normal index build, we use + * SnapshotAny because we must retrieve all tuples and do our own time + * qual checks (because we have to index RECENTLY_DEAD tuples). In a + * concurrent build, or during bootstrap, we take a regular MVCC snapshot + * and index whatever's live according to that. + */ + OldestXmin = InvalidTransactionId; + + /* okay to ignore lazy VACUUMs here */ + if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent) + OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM); + + if (!scan) + { + /* + * Serial index build. + * + * Must begin our own heap scan in this case. We may also need to + * register a snapshot whose lifetime is under our direct control. + */ + if (!TransactionIdIsValid(OldestXmin)) + { + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + need_unregister_snapshot = true; + } + else + snapshot = SnapshotAny; + + scan = table_beginscan_strat(heapRelation, /* relation */ + snapshot, /* snapshot */ + 0, /* number of keys */ + NULL, /* scan key */ + true, /* buffer access strategy OK */ + allow_sync); /* syncscan OK? */ + } + else + { + /* + * Parallel index build. + * + * Parallel case never registers/unregisters own snapshot. Snapshot + * is taken from parallel heap scan, and is SnapshotAny or an MVCC + * snapshot, based on same criteria as serial case. 
+ */ + Assert(!IsBootstrapProcessingMode()); + Assert(allow_sync); + snapshot = scan->rs_snapshot; + } + + hscan = (RobertScanDesc) scan; + + /* Publish number of blocks to scan */ + if (progress) + { + BlockNumber nblocks; + + if (hscan->rrs_base.rs_parallel != NULL) + { + ParallelBlockTableScanDesc pbscan; + + pbscan = (ParallelBlockTableScanDesc) hscan->rrs_base.rs_parallel; + nblocks = pbscan->phs_nblocks; + } + else + nblocks = hscan->rrs_nblocks; + + pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_TOTAL, + nblocks); + } + + /* + * Must call GetOldestXmin() with SnapshotAny. Should never call + * GetOldestXmin() with MVCC snapshot. (It's especially worth checking + * this for parallel builds, since ambuild routines that support parallel + * builds must work these details out for themselves.) + */ + Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot)); + Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) : + !TransactionIdIsValid(OldestXmin)); + Assert(snapshot == SnapshotAny || !anyvisible); + + /* set our scan endpoints */ + if (!allow_sync) + { + hscan->rrs_startblock = start_blockno; + hscan->rrs_numblocks = numblocks; + } + else + { + /* syncscan can only be requested on whole relation */ + Assert(start_blockno == 0); + Assert(numblocks == InvalidBlockNumber); + } + + reltuples = 0; + + /* + * Scan all tuples in the base relation. + */ + while (robert_scan_getnextslot(scan, ForwardScanDirection, slot)) + { + bool tupleIsAlive; + HeapTupleData htdata; + BlockNumber blocks_done = robert_scan_get_blocks_done(hscan); + + CHECK_FOR_INTERRUPTS(); + + /* Report scan progress, if asked to. */ + if (progress && blocks_done != previous_blkno) + { + pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE, + blocks_done); + previous_blkno = blocks_done; + } + + /* do our own time qual check */ + /* XXX. This just completely ignores MVCC considerations. */ + tupleIsAlive = true; + reltuples += 1; + + MemoryContextReset(econtext->ecxt_per_tuple_memory); + + /* + * In a partial index, discard tuples that don't satisfy the + * predicate. + */ + if (predicate != NULL) + { + if (!ExecQual(predicate, econtext)) + continue; + } + + /* + * For the current heap tuple, extract all the attributes we use in + * this index, and note which are null. This also performs evaluation + * of any expressions needed. + */ + FormIndexDatum(indexInfo, + slot, + estate, + values, + isnull); + + /* + * Call the AM's callback routine to process the tuple + * + * You'd think we should go ahead and build the index tuple here, but + * some index AMs want to do further processing on the data first. So + * pass the values[] and isnull[] arrays, instead. + * + * XXX. Why the heck does this callback accept a HeapTuple? + */ + htdata.t_data = NULL; + htdata.t_len = 0; + ItemPointerCopy(&slot->tts_tid, &htdata.t_self); + htdata.t_tableOid = RelationGetRelid(heapRelation); + callback(indexRelation, &htdata, values, isnull, tupleIsAlive, + callback_state); + + /* Stop if a block limit was specified and has been reached. */ + if (numblocks != InvalidBlockNumber && blocks_done >= numblocks) + break; + } + + /* Report scan progress one last time. 
*/ + if (progress) + { + BlockNumber blks_done; + + if (hscan->rrs_base.rs_parallel != NULL) + { + ParallelBlockTableScanDesc pbscan; + + pbscan = (ParallelBlockTableScanDesc) hscan->rrs_base.rs_parallel; + blks_done = pbscan->phs_nblocks; + } + else + blks_done = hscan->rrs_nblocks; + + pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE, + blks_done); + } + + table_endscan(scan); + + /* we can now forget our snapshot, if set and registered by us */ + if (need_unregister_snapshot) + UnregisterSnapshot(snapshot); + + ExecDropSingleTupleTableSlot(slot); + + FreeExecutorState(estate); + + /* These may have been pointing to the now-gone estate */ + indexInfo->ii_ExpressionsState = NIL; + indexInfo->ii_PredicateState = NULL; + + return reltuples; +} + +void +robert_index_validate_scan(Relation heap_rel, Relation index_rel, + IndexInfo *index_info, Snapshot snapshot, + ValidateIndexState *state) +{ +} diff --git a/src/backend/access/robert/robert_dml.c b/src/backend/access/robert/robert_dml.c new file mode 100644 index 0000000000..258a6b1fda --- /dev/null +++ b/src/backend/access/robert/robert_dml.c @@ -0,0 +1,409 @@ +/* + * robert_dml.c + */ + +#include "postgres.h" + +#include "access/robert_page.h" +#include "access/robert_slot.h" +#include "access/robert_xlog.h" +#include "access/robertam.h" +#include "access/undoaccess.h" +#include "access/undorecord.h" +#include "access/xact.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/freespace.h" +#include "storage/procarray.h" + +typedef enum +{ + CHOOSE_BLOCK_WITH_FREESPACE, + CHOOSE_LAST_BLOCK, + EXTEND_RELATION +} BufferForTupleStrategy; + +static BlockNumber +robert_choose_block(Relation rel, int len, BufferForTupleStrategy *strategy) +{ + BlockNumber blkno; + + switch (*strategy) + { + case CHOOSE_BLOCK_WITH_FREESPACE: + blkno = GetPageWithFreeSpace(rel, len + 0.04 * BLCKSZ); /* XXX */ + if (blkno != InvalidBlockNumber) + return blkno; + *strategy = CHOOSE_LAST_BLOCK; + /* FALLTHROUGH */ + + case CHOOSE_LAST_BLOCK: + blkno = RelationGetNumberOfBlocks(rel); + *strategy = EXTEND_RELATION; + if (blkno > 0) + return blkno - 1; + /* FALLTHROUGH */ + + case EXTEND_RELATION: + return P_NEW; + } + + pg_unreachable(); +} + +/* + * Attempt to insert a tuple into a specified block of a relation. + * + * If the page is too full, either in terms of storage or of line pointers, + * then this function will do nothing and return false. Otherwise, it will + * insert the tuple and return true. + */ +static bool +robert_tuple_try_insert(Relation rel, BlockNumber blkno, RobertTuple tuple, + CommandId cid, int options) +{ + Buffer buffer = ReadBuffer(rel, blkno); + Page page = BufferGetPage(buffer); + OffsetNumber offnum; + UndoRecordInsertContext context = {{0}}; + UnpackedUndoRecord undorecord; + FullTransactionId fxid; + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * Choose an offset number at which to insert this tuple. + * + * If this is a new page, always use FirstOffsetNumber. Otherwise, let + * robert_page_free_offset find us a usable offset number. + */ + if (blkno == P_NEW) + { + /* Initialize new page. */ + PageInit(page, BufferGetPageSize(buffer), 0); + + /* Find real page number. */ + blkno = BufferGetBlockNumber(buffer); + + /* All offsets available, so use the first one. */ + offnum = FirstOffsetNumber; + } + else + { + /* Look for a usable offset. */ + offnum = robert_page_free_offset(page, tuple->r_len); + if (offnum == InvalidOffsetNumber) + { + /* This page is not usable for this tuple; e.g. too full. 
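+			 * Returning false here sends robert_tuple_insert around its
+			 * retry loop, where robert_choose_block advances to the next
+			 * BufferForTupleStrategy step and proposes another block.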
*/ + UnlockReleaseBuffer(buffer); + return false; + } + } + + /* Prepare undo record. */ + fxid = GetTopFullTransactionId(); + undorecord.uur_rmid = RM_ROBERT_ID; + undorecord.uur_type = ROBERT_UNDO_INSERT; + undorecord.uur_info = 0; + undorecord.uur_reloid = RelationGetRelid(rel); + undorecord.uur_cid = GetCurrentCommandId(true); + undorecord.uur_fork = MAIN_FORKNUM; + undorecord.uur_prevundo = InvalidUndoRecPtr; + undorecord.uur_block = blkno; + undorecord.uur_offset = offnum; + undorecord.uur_fxid = fxid; + undorecord.uur_payload.len = 0; + undorecord.uur_tuple.len = 0; + undorecord.uur_txn = NULL; + undorecord.uur_logswitch = NULL; + + /* Prepare to insert the undo record and update the tuple control data. */ + BeginUndoRecordInsert(&context, UndoLogCategoryForRelation(rel), 1, NULL); + tuple->r_tableOid = undorecord.uur_reloid; + ItemPointerSet(&tuple->r_self, blkno, offnum); + tuple->r_data->r_undoptr = PrepareUndoInsert(&context, &undorecord, + MyDatabaseId); + + /* Perform the actual insert. */ + START_CRIT_SECTION(); + robert_page_add_item(page, offnum, tuple); + MarkBufferDirty(buffer); + InsertPreparedUndo(&context); + END_CRIT_SECTION(); + + /* All done. */ + FinishUndoRecordInsert(&context); + UnlockReleaseBuffer(buffer); + return true; +} + +/* + * robert_tuple_insert + * + * Insert a single tuple. + */ +void +robert_tuple_insert(Relation rel, TupleTableSlot *slot, + CommandId cid, int options, + BulkInsertStateData *bistate) +{ + RobertTuple tuple = ((RobertTupleTableSlot *) slot)->tuple; + bool done = false; + BufferForTupleStrategy strategy = CHOOSE_BLOCK_WITH_FREESPACE; + + tuple = robert_toast_tuple(rel, slot, options); + + while (!done) + { + BlockNumber blkno; + + blkno = robert_choose_block(rel, tuple->r_len, &strategy); + done = robert_tuple_try_insert(rel, blkno, tuple, cid, options); + } + + elog(NOTICE, "insert %u (%u,%u): %s", + RelationGetRelid(rel), + ItemPointerGetBlockNumber(&tuple->r_self), + ItemPointerGetOffsetNumber(&tuple->r_self), + robert_print_tuple(tuple, slot->tts_tupleDescriptor)); +} + +void +robert_tuple_insert_speculative(Relation rel, TupleTableSlot *slot, + CommandId cid, int options, + BulkInsertStateData *bistate, + uint32 specToken) +{ +} + +void +robert_tuple_complete_speculative(Relation rel, TupleTableSlot *slot, + uint32 specToken, bool succeeded) +{ +} + +void +robert_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, + CommandId cid, int options, + BulkInsertStateData *bistate) +{ +} + +/* + * Check whether it's OK to modify a tuple. + * + * This function determines whether or not it's OK for us to modify a given + * tuple, similar to HeapTupleSatisfiesUpdate. + */ +static TM_Result +robert_check_modify(UndoRecPtr undoptr, Snapshot snapshot, CommandId cid) +{ + UndoRecordFetchContext context; + UnpackedUndoRecord *uur; + TM_Result result = TM_Ok; + /* XXX bool recheck_for_invisible = false; */ + TransactionId uxid; + CommandId ucid; + uint8 utype; + + BeginUndoFetch(&context); + + uur = UndoFetchRecord(&context, undoptr); + utype = uur->uur_type; + uxid = XidFromFullTransactionId(uur->uur_fxid); + ucid = uur->uur_cid; + UndoRecordRelease(uur); + + if (TransactionIdIsCurrentTransactionId(uxid)) + { + /* XXX. Not always. */ + result = TM_SelfModified; + } + else if (TransactionIdIsInProgress(uxid)) + { + /* XXX Do stuff. */ + } + else if (TransactionIdDidCommit(uxid)) + { + /* + * XXX. Is the snapshot guaranteed to be an MVCC snapshot? If not, + * what are we supposed to do in that case? 
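+		 * For now, the Assert below at least documents the MVCC
+		 * assumption: a non-MVCC caller would trip it rather than
+		 * silently misbehaving.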
+ */ + Assert(IsMVCCSnapshot(snapshot)); + /* XXX. If XID is visible, then do stuff, otherwise different stuff? */ + } + else + { + /* It must have aborted but not yet been cleaned up. */ + } + + /* + * XXX. We need to look up uur.uur_prevundo here, but we probably + * don't want to do that while holding the buffer lock. + * + * We need an equivalent of HeapTupleSatisfiesUpdate. Some notes on + * that: + * + * If the undo record to which uur.uur_prevundo points doesn't exist any + * more or if the XID/CID of the returned record are visible to our + * snapshot, then TM_Ok. If the XID is our XID and the tuple isn't + * visible because the CID is too new, then TM_SelfModified (unless we + * couldn't see the old version of the row either, in which case + * TM_Invisible). Otherwise, if the XID is still in progress, then + * TM_BeingModified (unless we couldn't see the old version of the row + * either, in which case TM_Invisible). If it's aborted, then do + * page-at-a-time undo and try again. + * + * Otherwise, it's committed but not visible to our snapshot. In that + * case, the answer must be TM_Invisible if the operation was an insert + * (and we should throw an error instead of returning TM_Invisible to the + * caller), TM_Updated if it was an update, or TM_Deleted if it was a + * delete. + * + * The discussion above only considers insert, update, and delete. If + * there were locks, we'd need to return TM_BeingModified if any existing + * lock taken by an XID other than our own conflicts with the lock + * required by the current operation. + */ + + return result; +} + +/* + * robert_tuple_delete + * + * Delete a single tuple. + */ +TM_Result +robert_tuple_delete(Relation rel, ItemPointer tid, + CommandId cid, Snapshot snapshot, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, bool changingPart) +{ + Buffer buffer; + ItemId iid; + UndoRecordInsertContext context = {{0}}; + UnpackedUndoRecord undorecord; + FullTransactionId fxid; + UndoRecPtr undoptr; + UndoRecPtr checked_undoptr = InvalidUndoRecPtr; + RobertTupleHeader td; + Page page; + + /* + * Prepare undo record contents, except for the previous block pointer, + * which we can't finalize without taking the buffer lock. + */ + fxid = GetTopFullTransactionId(); + undorecord.uur_rmid = RM_ROBERT_ID; + undorecord.uur_type = ROBERT_UNDO_DELETE; + undorecord.uur_info = 0; + undorecord.uur_reloid = RelationGetRelid(rel); + undorecord.uur_cid = GetCurrentCommandId(true); + undorecord.uur_fork = MAIN_FORKNUM; + undorecord.uur_prevundo = InvalidUndoRecPtr; + undorecord.uur_block = ItemPointerGetBlockNumber(tid); + undorecord.uur_offset = ItemPointerGetOffsetNumber(tid); + undorecord.uur_fxid = fxid; + undorecord.uur_payload.len = 0; + undorecord.uur_tuple.len = 0; + undorecord.uur_txn = NULL; + undorecord.uur_logswitch = NULL; + + /* Pin the target buffer. */ + buffer = ReadBuffer(rel, undorecord.uur_block); + page = BufferGetPage(buffer); + + while (1) + { + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * Prepare undo chain. + * + * The tuple's old undo pointer needs to be stored in the undo record + * we're about to insert, and the undo pointer we're about to insert + * will need to be stored into the page. 
+ */ + iid = PageGetItemId((PageHeader) page, undorecord.uur_offset); + td = (RobertTupleHeader) PageGetItem(page, iid); + memcpy(&undorecord.uur_prevundo, &td->r_undoptr, sizeof(UndoRecPtr)); + + /* + * Before actually performing the deletion, we must check whether it's + * really OK: somebody else could have updated the tuple and, if so, + * the version visible to our snapshot might no longer be the latest + * version. + * + * XXX. This code doesn't know anything about tuple locks, and if there + * are any of those, we need to wait for them, too. + */ + if (undorecord.uur_prevundo != checked_undoptr && + !UndoRecPtrIsDiscarded(undorecord.uur_prevundo)) + { + TM_Result result; + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + result = robert_check_modify(undorecord.uur_prevundo, snapshot, + cid); + if (result != TM_Ok) + { + ReleaseBuffer(buffer); + return result; + } + checked_undoptr = undorecord.uur_prevundo; + continue; + } + + /* Looks OK, so proceed with deletion. */ + break; + } + + /* + * Before entering the critical section, prepare for 1 undo insertion + * and stage the record to be inserted. + */ + BeginUndoRecordInsert(&context, UndoLogCategoryForRelation(rel), 1, NULL); + undoptr = PrepareUndoInsert(&context, &undorecord, MyDatabaseId); + + /* + * Perform the actual delete. + * + * Note that we use ItemIdMarkDead here, not ItemIdSetDead. The storage + * has to remain, because the tuple is still visible to concurrent + * transactions. + */ + START_CRIT_SECTION(); + ItemIdMarkDead(iid); + MarkBufferDirty(buffer); + memcpy(&td->r_undoptr, &undoptr, sizeof(UndoRecPtr)); + InsertPreparedUndo(&context); + END_CRIT_SECTION(); + + /* All done. */ + FinishUndoRecordInsert(&context); + UnlockReleaseBuffer(buffer); + return TM_Ok; +} + +TM_Result +robert_tuple_update(Relation rel, ItemPointer otid, + TupleTableSlot *slot, CommandId cid, Snapshot snapshot, + Snapshot crosscheck, bool wait, TM_FailureData *tmfd, + LockTupleMode *lockmode, bool *update_indexes) +{ + return TM_Ok; +} + +TM_Result +robert_tuple_lock(Relation rel, ItemPointer tid, + Snapshot snapshot, TupleTableSlot *slot, CommandId cid, + LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd) +{ + return TM_Ok; +} + +void +robert_finish_bulk_insert(Relation rel, int options) +{ +} diff --git a/src/backend/access/robert/robert_page.c b/src/backend/access/robert/robert_page.c new file mode 100644 index 0000000000..fe826e60df --- /dev/null +++ b/src/backend/access/robert/robert_page.c @@ -0,0 +1,90 @@ +/*------------------------------------------------------------------------- + * + * robert_page.c + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/robert_page.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/robert_page.h" +#include "access/robert_tuple.h" + +/* + * robert_page_add_item + * + * Insert a tuple into a page. 
+ */ +void +robert_page_add_item(Page page, OffsetNumber offnum, RobertTuple tuple) +{ + Item item = (Item) tuple->r_data; + Size size = tuple->r_len; + PageHeader phdr = (PageHeader) page; + ItemId iid = PageGetItemId(phdr, offnum); + OffsetNumber maxoff = PageGetMaxOffsetNumber(page) + 1; + Size lower = phdr->pd_lower; + Size upper = (Size) phdr->pd_upper - size; + + Assert(offnum <= maxoff); + + if (offnum == maxoff) + lower += sizeof(ItemIdData); + + Assert(lower <= upper); + + memcpy((char *) page + upper, item, size); + phdr->pd_lower = lower; + phdr->pd_upper = upper; + ItemIdSetNormal(iid, upper, size); +} + +/* + * robert_page_free_offset + * + * Returns an offset at which a tuple of a given size can be inserted into the + * given page. If there are no unused line pointers and no more can be added, + * or if the item isn't going to fit on the page, returns InvalidOffsetNumber. + */ +OffsetNumber +robert_page_free_offset(Page page, Size size) +{ + PageHeader phdr = (PageHeader) page; + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offnum; + Size avail = phdr->pd_upper - phdr->pd_lower; + + if (unlikely(phdr->pd_upper > BLCKSZ || phdr->pd_lower > phdr->pd_upper)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("corrupted page header"))); + + if (avail < size) + return InvalidOffsetNumber; + + for (offnum = FirstOffsetNumber; offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemId iid = PageGetItemId(phdr, offnum); + + if (!ItemIdIsUsed(iid)) + return offnum; + } + + /* + * XXX. It's just plain crummy that we have to use a heap-specific + * constant for this, but there is plenty of code that thinks this is a + * universal limit rather than a heap-specific one. + */ + if (avail < sizeof(ItemIdData) + size || offnum >= MaxHeapTuplesPerPage) + return InvalidOffsetNumber; + + return offnum; +} diff --git a/src/backend/access/robert/robert_scan.c b/src/backend/access/robert/robert_scan.c new file mode 100644 index 0000000000..346a48292e --- /dev/null +++ b/src/backend/access/robert/robert_scan.c @@ -0,0 +1,632 @@ +/* + * robert_scan.c + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/robert_scan.h" +#include "access/robert_slot.h" +#include "access/tableam.h" +#include "access/skey.h" +#include "access/undoaccess.h" +#include "pgstat.h" +#include "storage/bufmgr.h" +#include "storage/predicate.h" + +typedef struct IndexFetchRobertData +{ + IndexFetchTableData xs_base; + Buffer xs_cbuf; /* current buffer, if any */ +} IndexFetchRobertData; + +static void robert_scan_initialize(RobertScanDesc scan, ScanKey key, + bool keep_startblock); +static void robert_scan_getnextblock(RobertScanDesc scan, + ScanDirection direction, + BlockNumber blkno); +static bool robert_scan_getnexttuple(RobertScanDesc scan, + TupleTableSlot *slot); + +/* + * Prepare to scan a robert table. + */ +TableScanDesc +robert_scan_begin(Relation rel, Snapshot snapshot, + int nkeys, ScanKeyData *key, + ParallelTableScanDesc pscan, uint32 flags) +{ + RobertScanDesc scan; + + /* Out of sheer paranoia, hold a ref count while scanning relation. */ + RelationIncrementReferenceCount(rel); + + /* Allocate and initialize scan descriptor. 
*/ + scan = palloc(sizeof(RobertScanDescData)); + scan->rrs_base.rs_rd = rel; + scan->rrs_base.rs_snapshot = snapshot; + scan->rrs_base.rs_nkeys = nkeys; + if (key == NULL) + scan->rrs_base.rs_key = NULL; + else + { + scan->rrs_base.rs_key = palloc(nkeys * sizeof(ScanKeyData)); + memcpy(scan->rrs_base.rs_key, key, nkeys * sizeof(ScanKeyData)); + } + scan->rrs_base.rs_parallel = pscan; + scan->rrs_base.rs_flags = flags; + scan->rrs_cpage = palloc(BLCKSZ); + scan->rrs_strategy = NULL; + robert_scan_initialize(scan, key, false); + + /* For sequential and sample scans, just predicate-lock the whole thing. */ + if ((scan->rrs_base.rs_flags & (SO_TYPE_SEQSCAN | SO_TYPE_SAMPLESCAN)) != 0) + { + Assert(snapshot); + PredicateLockRelation(rel, snapshot); + } + + return &scan->rrs_base; +} + +/* + * Clean up after completing a scan of a robert table. + */ +void +robert_scan_end(TableScanDesc sscan) +{ + RobertScanDesc scan = (RobertScanDesc) sscan; + + RelationDecrementReferenceCount(scan->rrs_base.rs_rd); + + if (scan->rrs_base.rs_key) + pfree(scan->rrs_base.rs_key); + if (scan->rrs_base.rs_flags & SO_TEMP_SNAPSHOT) + UnregisterSnapshot(scan->rrs_base.rs_snapshot); + pfree(scan->rrs_cpage); + pfree(scan); +} + +void +robert_scan_rescan(TableScanDesc sscan, ScanKeyData *key, + bool set_params, bool allow_strat, + bool allow_sync, bool allow_pagemode) +{ +} + +/* + * robert_scan_initialize + * + * Perform common initialization required by both robert_scan_begin and + * robert_scan_rescan. + * + * XXX. An awful lot of this logic duplicates initscan(). Instead of copying + * the code, we should try to have common code that can be used by any + * block-based table AM. + */ +static void +robert_scan_initialize(RobertScanDesc scan, ScanKey key, bool keep_startblock) +{ + ParallelBlockTableScanDesc bpscan = NULL; + bool allow_strat; + bool allow_sync; + + /* + * Determine the number of blocks we need to scan. + * + * If the relation is extended, the new tuples won't be visible to this + * scan anyway (except for non-MVCC scans, which are unstable anyway). + */ + if (scan->rrs_base.rs_parallel != NULL) + { + bpscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel; + scan->rrs_nblocks = bpscan->phs_nblocks; + } + else + scan->rrs_nblocks = RelationGetNumberOfBlocks(scan->rrs_base.rs_rd); + + /* + * This algorithm may not be for the best, but it is the one used by the + * 'heap' table AM, so we'll stick with it for the sake of tradition. + */ + if (!RelationUsesLocalBuffers(scan->rrs_base.rs_rd) && + scan->rrs_nblocks > NBuffers / 4) + { + allow_strat = (scan->rrs_base.rs_flags & SO_ALLOW_STRAT) != 0; + allow_sync = (scan->rrs_base.rs_flags & SO_ALLOW_SYNC) != 0; + } + else + allow_strat = allow_sync = false; + + if (allow_strat) + { + /* During a rescan, keep the previous strategy object. */ + if (scan->rrs_strategy == NULL) + scan->rrs_strategy = GetAccessStrategy(BAS_BULKREAD); + } + else + { + if (scan->rrs_strategy != NULL) + FreeAccessStrategy(scan->rrs_strategy); + scan->rrs_strategy = NULL; + } + + if (scan->rrs_base.rs_parallel != NULL) + { + /* For parallel scan, believe whatever ParallelTableScanDesc says. */ + if (scan->rrs_base.rs_parallel->phs_syncscan) + scan->rrs_base.rs_flags |= SO_ALLOW_SYNC; + else + scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC; + } + else if (keep_startblock) + { + /* + * When rescanning, we want to keep the previous startblock setting, + * so that rewinding a cursor doesn't generate surprising results. + * Reset the active syncscan setting, though. 
+ */ + if (allow_sync && synchronize_seqscans) + scan->rrs_base.rs_flags |= SO_ALLOW_SYNC; + else + scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC; + } + else if (allow_sync && synchronize_seqscans) + { + scan->rrs_base.rs_flags |= SO_ALLOW_SYNC; + /* XXX ss_get_location is in heapam.h */ + scan->rrs_startblock = + ss_get_location(scan->rrs_base.rs_rd, scan->rrs_nblocks); + } + else + { + scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC; + scan->rrs_startblock = 0; + } + + scan->rrs_numblocks = InvalidBlockNumber; + scan->rrs_state = ROBERT_SCAN_NOT_STARTED; + scan->rrs_cblock = InvalidBlockNumber; +} + +/* + * robert_scan_getnextslot + * + * Get the next tuple from a sequential scan and store it into the given slot. + * + * XXX. The name of this method does not make it abundantly clear that it only + * applies to sequential scans, but an examination of the heap code shows that + * to be the case. + */ +bool +robert_scan_getnextslot(TableScanDesc sscan, + ScanDirection direction, + TupleTableSlot *slot) +{ + RobertScanDesc scan = (RobertScanDesc) sscan; + + /* Initialize the scan if that's not yet done. */ + if (scan->rrs_state == ROBERT_SCAN_NOT_STARTED) + { + BlockNumber blkno; + + /* If there's nothing to scan, give up immediately. */ + if (scan->rrs_nblocks == 0 || scan->rrs_numblocks == 0) + { + ExecClearTuple(slot); + return false; + } + + /* Figure out which block to read first. */ + if (ScanDirectionIsBackward(direction)) + { + /* don't report syncscans when scanning backwards */ + scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC; + + /* start from last page of the scan */ + if (scan->rrs_startblock > 0) + blkno = scan->rrs_startblock - 1; + else + blkno = scan->rrs_nblocks - 1; + } + else if (ScanDirectionIsNoMovement(direction)) + { + /* no prior tuple, so refetch yields nothing */ + ExecClearTuple(slot); + return false; + } + else if (scan->rrs_base.rs_parallel != NULL) + { + ParallelBlockTableScanDesc pbscan; + + pbscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel; + table_block_parallelscan_startblock_init(scan->rrs_base.rs_rd, + pbscan); + blkno = table_block_parallelscan_nextpage(scan->rrs_base.rs_rd, + pbscan); + if (blkno == InvalidBlockNumber) + { + /* other participants already finished scan */ + ExecClearTuple(slot); + return false; + } + } + else + blkno = scan->rrs_startblock; + + /* Read the chosen block. */ + robert_scan_getnextblock(scan, direction, blkno); + } + + while (1) + { + if (scan->rrs_state == ROBERT_SCAN_TUPLE_DONE) + { + /* Advance to next tuple. */ + if (ScanDirectionIsBackward(direction)) + { + if (scan->rrs_coffset == FirstOffsetNumber) + scan->rrs_state = ROBERT_SCAN_BLOCK_DONE; + else + { + scan->rrs_coffset = OffsetNumberPrev(scan->rrs_coffset); + scan->rrs_state = ROBERT_SCAN_READY; + } + } + else if (ScanDirectionIsForward(direction)) + { + if (scan->rrs_coffset == scan->rrs_lastoffset) + scan->rrs_state = ROBERT_SCAN_BLOCK_DONE; + else + { + scan->rrs_coffset = OffsetNumberNext(scan->rrs_coffset); + scan->rrs_state = ROBERT_SCAN_READY; + } + } + else + { + Assert(ScanDirectionIsNoMovement(direction)); + /* nothing to do */ + } + } + else if (scan->rrs_state == ROBERT_SCAN_BLOCK_DONE) + { + bool finished; + BlockNumber blkno; + + /* Select new block. */ + if (ScanDirectionIsBackward(direction)) + { + finished = (scan->rrs_cblock == scan->rrs_startblock) || + (scan->rrs_numblocks != InvalidBlockNumber ? + --scan->rrs_numblocks == 0 : false); + blkno = (scan->rrs_cblock == 0) ? 
scan->rrs_nblocks - 1 :
+					scan->rrs_cblock - 1;
+			}
+			else if (scan->rrs_base.rs_parallel != NULL)
+			{
+				ParallelBlockTableScanDesc pbscan;
+
+				pbscan = (ParallelBlockTableScanDesc)
+					scan->rrs_base.rs_parallel;
+				blkno = table_block_parallelscan_nextpage(scan->rrs_base.rs_rd,
+														  pbscan);
+				finished = (blkno == InvalidBlockNumber);
+			}
+			else
+			{
+				blkno = scan->rrs_cblock + 1;
+				if (blkno >= scan->rrs_nblocks)
+					blkno = 0;
+				finished = (blkno == scan->rrs_startblock) ||
+					(scan->rrs_numblocks != InvalidBlockNumber ?
+					 --scan->rrs_numblocks == 0 : false);
+				if (scan->rrs_base.rs_flags & SO_ALLOW_SYNC)
+					ss_report_location(scan->rrs_base.rs_rd, blkno);
+			}
+
+			/* Scan is done if no blocks remain. */
+			if (finished)
+			{
+				scan->rrs_cblock = InvalidBlockNumber;
+				scan->rrs_state = ROBERT_SCAN_NOT_STARTED;
+				ExecClearTuple(slot);
+				return false;
+			}
+
+			/* Read the chosen block. */
+			robert_scan_getnextblock(scan, direction, blkno);
+		}
+		else
+		{
+			Assert(scan->rrs_state == ROBERT_SCAN_READY);
+			scan->rrs_state = ROBERT_SCAN_TUPLE_DONE;
+
+			if (!robert_scan_getnexttuple(scan, slot))
+				continue;
+			pgstat_count_heap_getnext(scan->rrs_base.rs_rd);
+			return true;
+		}
+	}
+}
+
+/*
+ * robert_scan_getnextblock
+ *
+ * Read the indicated block of the relation and update the scan state
+ * accordingly.
+ */
+static void
+robert_scan_getnextblock(RobertScanDesc scan, ScanDirection direction,
+						 BlockNumber blkno)
+{
+	Buffer		buffer;
+
+	/* Copy the new page and remember the page number. */
+	buffer = ReadBufferExtended(scan->rrs_base.rs_rd, MAIN_FORKNUM, blkno,
+								RBM_NORMAL, scan->rrs_strategy);
+	LockBuffer(buffer, BUFFER_LOCK_SHARE);
+	memcpy(scan->rrs_cpage, BufferGetPage(buffer), BLCKSZ);
+	UnlockReleaseBuffer(buffer);
+	scan->rrs_cblock = blkno;
+
+	/* Determine how many tuples there may be on the page. */
+	scan->rrs_lastoffset = PageGetMaxOffsetNumber(scan->rrs_cpage);
+
+	/* Determine starting scan position. */
+	if (ScanDirectionIsBackward(direction))
+		scan->rrs_coffset = scan->rrs_lastoffset;
+	else
+		scan->rrs_coffset = FirstOffsetNumber;
+
+	/* If no tuples, tell caller to request the next block. */
+	if (scan->rrs_lastoffset == 0)
+		scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+	else
+		scan->rrs_state = ROBERT_SCAN_READY;
+}
+
+/*
+ * robert_scan_getnexttuple
+ *
+ * The scan contains a page (scan->rrs_cpage) and identifies a particular
+ * tuple of interest (scan->rrs_coffset). Extract that tuple and store it
+ * into the given slot.
+ */
+static bool
+robert_scan_getnexttuple(RobertScanDesc scan, TupleTableSlot *slot)
+{
+	ItemId		iid = PageGetItemId(scan->rrs_cpage, scan->rrs_coffset);
+	RobertTupleHeader td;
+	Size		len;
+
+	/* If the item has no storage, it is definitely not visible. */
+	if (!ItemIdHasStorage(iid))
+		return false;
+
+	/* Extract tuple pointer and length. */
+	td = (RobertTupleHeader) PageGetItem(scan->rrs_cpage, iid);
+	len = ItemIdGetLength(iid);
+
+	/* Return visible version of tuple. */
+	return robert_slot_store_visible(slot, td, len, slot->tts_tableOid,
+									 scan->rrs_cblock, scan->rrs_coffset,
+									 scan->rrs_base.rs_snapshot,
+									 ItemIdIsDead(iid));
+}
+
+/*
+ * robert_scan_get_blocks_done
+ *
+ * Return the number of blocks that have been read by this scan since
+ * starting. For a non-parallel scan, this should be completely accurate.
+ * For a parallel scan, it will discount the effects of other backends.
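+ *
+ * For example, with rrs_nblocks = 10 and a scan that started at block 7
+ * and has wrapped around to rrs_cblock = 2, we report 10 - 7 + 2 = 5
+ * blocks done.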
+ */ +BlockNumber +robert_scan_get_blocks_done(RobertScanDesc scan) +{ + ParallelBlockTableScanDesc bpscan = NULL; + BlockNumber startblock; + BlockNumber blocks_done; + + if (scan->rrs_base.rs_parallel != NULL) + { + bpscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel; + startblock = bpscan->phs_startblock; + } + else + startblock = scan->rrs_startblock; + + /* + * Might have wrapped around the end of the relation, if startblock was + * not zero. + */ + if (scan->rrs_cblock > startblock) + blocks_done = scan->rrs_cblock - startblock; + else + { + BlockNumber nblocks; + + nblocks = bpscan != NULL ? bpscan->phs_nblocks : scan->rrs_nblocks; + blocks_done = nblocks - startblock + scan->rrs_cblock; + } + + /* If we're also done with the current block, add one. */ + if (scan->rrs_state == ROBERT_SCAN_BLOCK_DONE) + blocks_done++; + + return blocks_done; +} + +/* + * robert_index_fetch_begin + * + * Prepare for index fetches by allocating a new IndexFetchRobertData. + */ +extern IndexFetchTableData * +robert_index_fetch_begin(Relation rel) +{ + IndexFetchRobertData *scan = palloc0(sizeof(IndexFetchRobertData)); + + scan->xs_base.rel = rel; + scan->xs_cbuf = InvalidBuffer; + + return &scan->xs_base; +} + +/* + * robert_index_fetch_reset + * + * Release any buffer pin held from a previous index fetch. + */ +void +robert_index_fetch_reset(IndexFetchTableData *data) +{ + IndexFetchRobertData *scan = (IndexFetchRobertData *) data; + + if (BufferIsValid(scan->xs_cbuf)) + { + ReleaseBuffer(scan->xs_cbuf); + scan->xs_cbuf = InvalidBuffer; + } +} + +/* + * robert_index_fetch_end + * + * Clean up when finished with index fetches. + */ +void +robert_index_fetch_end(IndexFetchTableData *data) +{ + robert_index_fetch_reset(data); + pfree(data); +} + +/* + * robert_index_fetch_tuple + * + * XXX. This function is missing MVCC handling. + */ +bool +robert_index_fetch_tuple(IndexFetchTableData *data, + ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, + bool *call_again, bool *all_dead) +{ + IndexFetchRobertData *scan = (IndexFetchRobertData *) data; + Oid reloid = RelationGetRelid(scan->xs_base.rel); + BlockNumber blkno = ItemPointerGetBlockNumber(tid); + OffsetNumber offnum = ItemPointerGetOffsetNumber(tid); + Page page; + bool result = false; + + scan->xs_cbuf = + ReleaseAndReadBuffer(scan->xs_cbuf, scan->xs_base.rel, blkno); + *call_again = false; + page = BufferGetPage(scan->xs_cbuf); + + LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE); + if (offnum >= FirstOffsetNumber && offnum < PageGetMaxOffsetNumber(page)) + { + ItemId iid = PageGetItemId(page, offnum); + + if (ItemIdHasStorage(iid)) + { + RobertTupleHeader td = (RobertTupleHeader) PageGetItem(page, iid); + Size len = ItemIdGetLength(iid); + + robert_slot_store(slot, td, len, reloid, blkno, offnum); + result = true; + } + } + LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK); + + return result; +} + +/* + * robert_scan_bitmap_next_block + * + * Just as in a sequential scan, we choose to copy the entire page. This is + * a more dubious strategy here, because it could turn out to be inefficient + * if we're only fetching a single tuple from the page. However, we don't + * want to do visibility checks while holding the buffer lock, since that + * might involve visiting an arbitrarily large number of undo buffers, so it's + * not clear how to do better. + * + * XXX. 
It's possible that it would pay to do the visibility checks here
+ * rather than leaving all that work until robert_scan_bitmap_next_tuple
+ * is called; it would likely improve instruction cache locality. It would
+ * also be more complex, so for right now we don't.
+ */
+bool
+robert_scan_bitmap_next_block(TableScanDesc sscan, TBMIterateResult *tbmres)
+{
+	RobertScanDesc scan = (RobertScanDesc) sscan;
+	BlockNumber blkno = tbmres->blockno;
+
+	/*
+	 * Ignore any entries past our notion of where the relation ends; it may
+	 * have been extended, but any new entries won't be visible to us.
+	 */
+	if (blkno >= scan->rrs_nblocks)
+		return false;
+
+	/* Read the new block. */
+	robert_scan_getnextblock(scan, NoMovementScanDirection, blkno);
+
+	/* Reset the tuple index; for a lossy page, it counts offsets visited. */
+	scan->rrs_tupindex = 0;
+
+	/* False if block has no tuples; otherwise, true. */
+	return (scan->rrs_state != ROBERT_SCAN_BLOCK_DONE);
+}
+
+/*
+ * robert_scan_bitmap_next_tuple
+ */
+bool
+robert_scan_bitmap_next_tuple(TableScanDesc sscan,
+							  TBMIterateResult *tbmres,
+							  TupleTableSlot *slot)
+{
+	RobertScanDesc scan = (RobertScanDesc) sscan;
+
+	if (tbmres->ntuples < 0)
+	{
+		/* Page is lossy; visit each offset on the page in turn. */
+		if (scan->rrs_tupindex != 0)
+			scan->rrs_coffset = OffsetNumberNext(scan->rrs_coffset);
+		scan->rrs_tupindex++;
+		if (scan->rrs_coffset > scan->rrs_lastoffset)
+			return false;
+	}
+	else
+	{
+		/* Page is exact; try next indicated offset. */
+		if (scan->rrs_tupindex >= tbmres->ntuples)
+			return false;
+		scan->rrs_coffset = tbmres->offsets[scan->rrs_tupindex];
+		scan->rrs_tupindex++;
+	}
+
+	/* If this version of the tuple isn't visible, try the next offset. */
+	if (!robert_scan_getnexttuple(scan, slot))
+		return robert_scan_bitmap_next_tuple(sscan, tbmres, slot);
+	pgstat_count_heap_fetch(scan->rrs_base.rs_rd);
+	return true;
+}
+
+bool
+robert_scan_sample_next_block(TableScanDesc scan,
+							  SampleScanState *scanstate)
+{
+	elog(NOTICE, "robert_scan_sample_next_block");
+	return false;
+}
+
+bool
+robert_scan_sample_next_tuple(TableScanDesc scan,
+							  SampleScanState *scanstate,
+							  TupleTableSlot *slot)
+{
+	elog(NOTICE, "robert_scan_sample_next_tuple");
+	return false;
+}
diff --git a/src/backend/access/robert/robert_slot.c b/src/backend/access/robert/robert_slot.c
new file mode 100644
index 0000000000..bd87d9b51f
--- /dev/null
+++ b/src/backend/access/robert/robert_slot.c
@@ -0,0 +1,492 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_slot.c
+ *	  Slots for storing Robert tuples. To facilitate in-place updates, we
+ *	  always copy the tuple rather than pointing to the original buffer,
+ *	  so this is like HeapTupleTableSlot, not BufferHeapTupleTableSlot,
+ *	  but with changes because of our different on-disk tuple format.
+
+ * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/robert/robert_slot.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/robert_slot.h" +#include "access/robert_xlog.h" +#include "access/undoaccess.h" +#include "access/undorecord.h" +#include "access/xact.h" +#include "storage/procarray.h" +#include "utils/snapmgr.h" + +extern PGDLLIMPORT const TupleTableSlotOps TTSOpsRobert; + +static void tts_robert_init(TupleTableSlot *slot); +static void tts_robert_release(TupleTableSlot *slot); +static void tts_robert_clear(TupleTableSlot *slot); +static void tts_robert_getsomeattrs(TupleTableSlot *slot, int natts); +static Datum tts_robert_getsysattr(TupleTableSlot *slot, int attnum, + bool *isnull); +static void tts_robert_materialize(TupleTableSlot *slot); +static void tts_robert_copyslot(TupleTableSlot *dstslot, + TupleTableSlot *srcslot); +static HeapTuple tts_robert_copy_heap_tuple(TupleTableSlot *slot); +static MinimalTuple tts_robert_copy_minimal_tuple(TupleTableSlot *slot); + +static bool robert_snapshot_test(Snapshot snapshot, FullTransactionId fxid, + CommandId cid, bool xmin_test, uint32 specToken); + +const TupleTableSlotOps TTSOpsRobert = { + .base_slot_size = sizeof(RobertTupleTableSlot), + .init = tts_robert_init, + .release = tts_robert_release, + .clear = tts_robert_clear, + .getsomeattrs = tts_robert_getsomeattrs, + .getsysattr = tts_robert_getsysattr, + .materialize = tts_robert_materialize, + .copyslot = tts_robert_copyslot, + .get_heap_tuple = NULL, + .get_minimal_tuple = NULL, + .copy_heap_tuple = tts_robert_copy_heap_tuple, + .copy_minimal_tuple = tts_robert_copy_minimal_tuple +}; + +/* + * robert_slot_callbacks + * + * We only use one kind of slot, so this is very simple. + */ +const TupleTableSlotOps * +robert_slot_callbacks(Relation relation) +{ + return &TTSOpsRobert; +} + +/* + * robert_slot_store + * + * Create a new robert tuple and store it in a robert slot. + * + * XXX. It would be nice to optimize this by reusing the previously-allocated + * chunk of memory, if there is one and if it's big enough. With the current + * design, we'll often free a chunk and then allocate a new chunk of about the + * same size. Note that if we did this, tts_robert_release might need to + * free any chunk that might be left lying around by tts_robert_clear. + */ +void +robert_slot_store(TupleTableSlot *slot, RobertTupleHeader td, + Size len, Oid tableOid, BlockNumber blkno, + OffsetNumber offset) +{ + RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot; + RobertTuple tuple; + + Assert(slot->tts_ops == &TTSOpsRobert); + tts_robert_clear(slot); + + tuple = MemoryContextAlloc(slot->tts_mcxt, ROBERTTUPLESIZE + len); + tuple->r_len = len; + tuple->r_data = (RobertTupleHeader) ((char *) tuple + ROBERTTUPLESIZE); + tuple->r_tableOid = tableOid; + ItemPointerSet(&tuple->r_self, blkno, offset); + memcpy(tuple->r_data, td, len); + + rslot->tuple = tuple; + ItemPointerSet(&slot->tts_tid, blkno, offset); + rslot->off = td->r_hoff; + slot->tts_flags &= ~TTS_FLAG_EMPTY; + slot->tts_flags |= TTS_FLAG_SHOULDFREE; +} + +/* + * robert_slot_store_visible + * + * Figure out which version of a tuple is visible and store that version + * into a robert slot; then, return true. If no version is visible, + * return false. 
+ */ +bool +robert_slot_store_visible(TupleTableSlot *slot, RobertTupleHeader td, + Size len, Oid tableOid, BlockNumber blkno, + OffsetNumber offset, Snapshot snapshot, + bool item_is_dead) +{ + UndoRecPtr current_recptr; + UnpackedUndoRecord *current = NULL; + UnpackedUndoRecord *previous = NULL; + bool result = true; + UndoRecordFetchContext context; + + /* It's not necessarily aligned, so we use memcpy. */ + memcpy(¤t_recptr, &td->r_undoptr, sizeof(UndoRecPtr)); + + /* + * Loop until we find the oldest undo record whose effects are not + * visible to us. + */ + BeginUndoFetch(&context); + while (current_recptr != InvalidUndoRecPtr) + { + current = UndoFetchRecord(&context, current_recptr); + + /* + * XXX. If there's a specToken, we should be passing that here instead + * of passing 0. + */ + if (current == NULL || + robert_snapshot_test(snapshot, current->uur_fxid, current->uur_cid, + true, 0)) + break; + current_recptr = current->uur_prevundo; + if (previous != NULL) + UndoRecordRelease(previous); + previous = current; + current = NULL; + } + FinishUndoFetch(&context); + + /* + * Release current record, if any. This is the newest undo record whose + * effects are visible to our snapshot; we don't need it for anything. + */ + if (current != NULL) + { + UndoRecordRelease(current); + current = NULL; + } + + /* Core visibility logic. */ + /* XXX. This probably ought to store some XID information in the slot. */ + if (previous == NULL) + { + /* No undo records, or all changes visible to us. */ + if (item_is_dead) + result = false; + else + robert_slot_store(slot, td, len, tableOid, blkno, offset); + } + else + { + switch (previous->uur_type) + { + case ROBERT_UNDO_INSERT: + /* Insert is not visible. Can't see tuple. */ + result = false; + break; + case ROBERT_UNDO_DELETE: /* XXX and non-in-place update */ + /* Delete is not visible. Can see current tuple. */ + robert_slot_store(slot, td, len, tableOid, blkno, offset); + break; +#if 0 + case ROBERT_UNDO_UPDATE_INPLACE: + /* + * In-place update is not visible. Can see tuple from undo + * record. + */ + /* XXX */ + robert_slot_store(slot, + record->uur_tuple.data, + record->uur_tuple.len, + tableOid, blkno, offset); +#endif + } + + /* Done with previous record. */ + UndoRecordRelease(previous); + previous = NULL; + } + + /* If not visible, we are all done. */ + if (!result) + return false; + + /* XXX other stuff like serializability */ + + return true; +} + +/* + * tts_robert_init + * + * No special initialization is required when creating a slot. + */ +static void +tts_robert_init(TupleTableSlot *slot) +{ + /* nothing */ +} + +/* + * tts_robert_release + * + * No special cleanup is required when dropping a slot. + */ +static void +tts_robert_release(TupleTableSlot *slot) +{ + /* nothing */ +} + +/* + * tts_robert_clear + */ +static void +tts_robert_clear(TupleTableSlot *slot) +{ + RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot; + + /* Free memory for the tuple if appropriate. 
*/ + if (TTS_SHOULDFREE(slot)) + { + pfree(rslot->tuple); + slot->tts_flags &= ~TTS_FLAG_SHOULDFREE; + } + + slot->tts_nvalid = 0; + slot->tts_flags |= TTS_FLAG_EMPTY; + ItemPointerSetInvalid(&slot->tts_tid); + rslot->tuple = NULL; + rslot->off = 0; +} + +/* + * tts_robert_getsomeattrs + */ +static void +tts_robert_getsomeattrs(TupleTableSlot *slot, int natts) +{ + RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot; + + robert_deform_tuple(slot->tts_tupleDescriptor, rslot->tuple, + slot->tts_values, slot->tts_isnull, natts, + slot->tts_nvalid, &rslot->off); + slot->tts_nvalid = natts; +} + +/* + * tts_robert_getsysattr + * + * slot_getsysattr handles tableoid and ctid, so this function need only + * handle requests for cmin, cmax, xmin, and xmax. In contrast to the heap, + * we store that information in the undo log, not the tuple. + */ +static Datum +tts_robert_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull) +{ + /* + * XXX. It's necessary to do undo log lookups to get this information, + * and in some cases we may not have it at all. We should cache whatever + * we find out in the slot, so that if multiple attributes are requested + * (or the same ones are requested more than once?) we don't repeat the + * lookups. + */ + return (Datum) 0; +} + +/* + * tts_robert_materialize + * + * Make the contents of this slot independent of any external resources. + * The only thing we need to worry about is the possibility that entries in + * tts_values might point to data in some other memory context. + */ +static void +tts_robert_materialize(TupleTableSlot *slot) +{ + RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot; + MemoryContext oldcontext; + + if (TTS_SHOULDFREE(slot)) + return; + + oldcontext = MemoryContextSwitchTo(slot->tts_mcxt); + rslot->tuple = robert_form_tuple(slot->tts_tupleDescriptor, + slot->tts_values, + slot->tts_isnull); + MemoryContextSwitchTo(oldcontext); + + rslot->off = rslot->tuple->r_data->r_hoff; + slot->tts_flags |= TTS_FLAG_SHOULDFREE; + slot->tts_nvalid = 0; +} + +/* + * tts_robert_copyslot + * + * It's not sufficient to just copy the tts_values and tts_isnull arrays + * from the source slot, because any pass-by-reference datums in the source + * slot will be stored in that slot's memory context, not ours. So, construct + * and store a tuple built from those arrays. + */ +static void +tts_robert_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot) +{ + RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) dstslot; + MemoryContext oldcontext; + + tts_robert_clear(dstslot); + slot_getallattrs(srcslot); + + oldcontext = MemoryContextSwitchTo(dstslot->tts_mcxt); + rslot->tuple = robert_form_tuple(srcslot->tts_tupleDescriptor, + srcslot->tts_values, + srcslot->tts_isnull); + MemoryContextSwitchTo(oldcontext); + + rslot->off = rslot->tuple->r_data->r_hoff; + dstslot->tts_flags = (dstslot->tts_flags | TTS_FLAG_SHOULDFREE) + & ~TTS_FLAG_EMPTY; +} + +/* + * tts_robert_copy_heap_tuple + * + * Build a heap tuple representing the contents of this slot. + */ +static HeapTuple +tts_robert_copy_heap_tuple(TupleTableSlot *slot) +{ + slot_getallattrs(slot); + + return heap_form_tuple(slot->tts_tupleDescriptor, + slot->tts_values, slot->tts_isnull); +} + +/* + * tts_robert_copy_minimal_tuple + * + * Build a minimal tuple representing the contents of this slot. 
+ */ +static MinimalTuple +tts_robert_copy_minimal_tuple(TupleTableSlot *slot) +{ + slot_getallattrs(slot); + + return heap_form_minimal_tuple(slot->tts_tupleDescriptor, + slot->tts_values, slot->tts_isnull); +} + +/* + * robert_snapshot_test + * + * Test whether the effects of a change made by a given XID/CID should be + * visible to a process using a certain snapshot. + * + * 'snapshot' is the snapshot to be used for the visibility test. + * + * 'xid' and 'cid' are the values for the transaction that made the change. + * + * 'xmin_test' is true if we are testing a tuple's xmin and false if we are + * testing 'xmax'. For many snapshot types this doesn't matter, but for + * SNAPSHOT_DIRTY and SNAPSHOT_NON_VACUUMABLE it does. + * + * 'specToken' is only used for SNAPSHOT_DIRTY. + * + * Perhaps this function should be given a more generic name and moved to + * a location where anyone could use it. + */ +static bool +robert_snapshot_test(Snapshot snapshot, FullTransactionId fxid, CommandId cid, + bool xmin_test, uint32 specToken) +{ + TransactionId xid = XidFromFullTransactionId(fxid); + + switch (snapshot->snapshot_type) + { + case SNAPSHOT_MVCC: + case SNAPSHOT_HISTORIC_MVCC: + /* + * For an MVCC snapshot, we should see effects of the current + * transaction unless they are from a newer command. We should + * see other transactions if they are committed and not in our + * MVCC snapshot. + */ + if (TransactionIdIsCurrentTransactionId(xid)) + { + if (snapshot->curcid <= cid) + return false; + } + else if (XidInMVCCSnapshot(xid, snapshot)) + return false; + else if (!TransactionIdDidCommit(xid)) + return false; + break; + + case SNAPSHOT_SELF: + /* + * We should always see the effects of our own transaction, and + * we should see other transactions if they committed. + */ + if (!TransactionIdIsCurrentTransactionId(xid)) + { + if (TransactionIdIsInProgress(xid)) + return false; + else if (!TransactionIdDidCommit(xid)) + return false; + } + break; + + case SNAPSHOT_ANY: + case SNAPSHOT_TOAST: + /* + * We should see everything. + */ + break; + + case SNAPSHOT_DIRTY: + /* + * We should see everything except for transactions that are + * aborted. However, if we see an in progress transaction, we + * need to stash the XID and possibly the specToken in the + * snapshot, due to the awful, hacky way that SnapshotDirty works. + */ + if (!TransactionIdIsCurrentTransactionId(xid)) + { + if (TransactionIdIsInProgress(xid)) + { + if (xmin_test) + { + snapshot->xmin = xid; + snapshot->speculativeToken = specToken; + } + else + snapshot->xmax = xid; + } + else if (TransactionIdDidCommit(xid)) + return false; + } + break; + + case SNAPSHOT_NON_VACUUMABLE: + /* + * This type of snapshot has assymmetric rules for xmin and xmax. + * For xmin, we should see everything except for aborted + * transactions. For xmax, we should see only committed XIDs + * that precede snapshot->xmin. 
+			 */
+			if (xmin_test)
+			{
+				if (!TransactionIdIsCurrentTransactionId(xid) &&
+					!TransactionIdIsInProgress(xid) &&
+					!TransactionIdDidCommit(xid))
+					return false;
+			}
+			else
+			{
+				if (TransactionIdIsCurrentTransactionId(xid) ||
+					TransactionIdIsInProgress(xid) ||
+					!TransactionIdDidCommit(xid) ||
+					!TransactionIdPrecedes(xid, snapshot->xmin))
+					return false;
+			}
+			break;
+	}
+
+	return true;
+}
diff --git a/src/backend/access/robert/robert_tuple.c b/src/backend/access/robert/robert_tuple.c
new file mode 100644
index 0000000000..a6c73d8c8f
--- /dev/null
+++ b/src/backend/access/robert/robert_tuple.c
@@ -0,0 +1,771 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_tuple.c
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/robert/robert_tuple.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "access/robert_tuple.h"
+#include "access/toast_helper.h"
+#include "executor/tuptable.h"
+#include "utils/expandeddatum.h"
+
+static void robert_fill_padding(RobertTupleHeader td, char **data,
+					char typalign);
+static int	robert_tuple_bitmap_size(TupleDesc tupleDesc, bool *isnull);
+
+/*
+ * robert_compute_data_size
+ *
+ * Compute the amount of space needed to store a robert tuple's data. The
+ * caller must pass the amount of space required for the tuple header and
+ * null bitmap via the 'hoff' argument; the returned value is the total space
+ * required to store the tuple.
+ */
+Size
+robert_compute_data_size(TupleDesc tupleDesc, Datum *values, bool *isnull,
+						 uint8 hoff)
+{
+	Size		data_length = hoff;
+	int			i;
+	int			numberOfAttributes = tupleDesc->natts;
+
+	Assert(numberOfAttributes <= MaxTupleAttributeNumber);
+
+	for (i = 0; i < numberOfAttributes; i++)
+	{
+		Datum		datum;
+		Form_pg_attribute att;
+
+		if (isnull[i])
+			continue;
+
+		datum = values[i];
+		att = TupleDescAttr(tupleDesc, i);
+
+		if (att->attbyval)
+		{
+			/* we store attbyval attributes without alignment padding */
+			data_length += att->attlen;
+		}
+		else if (att->attlen == -1)
+		{
+			Pointer		val = DatumGetPointer(datum);
+
+			if (att->attstorage != 'p' && VARATT_CAN_MAKE_SHORT(val))
+				data_length += VARATT_CONVERTED_SHORT_SIZE(val);
+			else if (VARATT_IS_EXTERNAL(val))
+			{
+				if (VARATT_IS_EXTERNAL_EXPANDED(val))
+				{
+					/*
+					 * we want to flatten the expanded value so that the
+					 * constructed tuple doesn't depend on it
+					 */
+					data_length =
+						att_align_nominal(data_length, att->attalign);
+					data_length += EOH_get_flat_size(DatumGetEOHP(datum));
+				}
+				else
+					data_length += VARSIZE_EXTERNAL(val);
+			}
+			else if (VARATT_IS_SHORT(val))
+				data_length += VARSIZE_SHORT(val);
+			else
+			{
+				data_length = att_align_nominal(data_length, att->attalign);
+				data_length += VARSIZE(val);
+			}
+		}
+		else
+		{
+			/* fixed-length passed by reference, and cstrings */
+			data_length = att_align_nominal(data_length, att->attalign);
+			data_length = att_addlength_datum(data_length, att->attlen, datum);
+		}
+	}
+
+	return data_length;
+}
+
+/*
+ * robert_fill_null_bitmap
+ *
+ * We start out by setting all the bits, and then clear those that correspond
+ * to a null attribute.
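+ *
+ * For example (illustrative), with entries_needed = 10 and nulls in
+ * positions 2 and 9, BITMAPLEN(10) = 2 bytes are set to 0xFF and then
+ * bit 2 of byte 0 and bit 1 of byte 1 are cleared, leaving 0xFB 0xFD.
+ * Bits past the last entry simply stay set, which reads as "not null".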
+ */ +void +robert_fill_null_bitmap(bits8 *bits, int entries_needed, bool *isnull) +{ + int i; + + memset(bits, 0xff, BITMAPLEN(entries_needed)); + + for (i = 0; i < entries_needed; ++i) + if (isnull[i]) + bits[i / BITS_PER_BYTE] &= ~(1 << (i % BITS_PER_BYTE)); +} + +/* + * robert_fill_tuple + * + * Fill in the contents of a robert tuple from values/isnull arrays. On + * entry, the tuple header should already have been initialized, especially + * r_hoff. On exit, the tuple data area will have been filled in, and the + * r_flags field may also be updated. + */ +void +robert_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull, + RobertTupleHeader td, Size len) +{ + int i; + int numberOfAttributes = tupleDesc->natts; + char *data = ((char *) td) + td->r_hoff; + + td->r_flags &= ~ROBERT_HASEXTERNAL; + + for (i = 0; i < numberOfAttributes; i++) + { + Form_pg_attribute att = TupleDescAttr(tupleDesc, i); + Datum datum = values[i]; + Size data_length; + + if (isnull[i]) + continue; + + if (att->attbyval) + { + switch (att->attlen) + { + case sizeof(char): + { + char c = DatumGetChar(datum); + + memcpy(data, &c, sizeof(char)); + break; + } + case sizeof(int16): + { + int16 s = DatumGetInt16(datum); + + memcpy(data, &s, sizeof(int16)); + break; + } + case sizeof(int32): + { + int32 i = DatumGetInt32(datum); + + memcpy(data, &i, sizeof(int32)); + break; + } + case sizeof(Datum): + memcpy(data, &datum, sizeof(Datum)); + break; + default: + elog(ERROR, "unsupported byval length: %d", att->attlen); + break; + } + + data_length = att->attlen; + } + else if (att->attlen == -1) + { + /* varlena */ + Pointer val = DatumGetPointer(datum); + + if (VARATT_IS_EXTERNAL(val)) + { + if (VARATT_IS_EXTERNAL_EXPANDED(val)) + { + /* + * we want to flatten the expanded value so that the + * constructed tuple doesn't depend on it + */ + ExpandedObjectHeader *eoh = DatumGetEOHP(datum); + + robert_fill_padding(td, &data, att->attalign); + data_length = EOH_get_flat_size(eoh); + EOH_flatten_into(eoh, data, data_length); + } + else + { + td->r_flags |= ROBERT_HASEXTERNAL; + /* no alignment, since it's short by definition */ + data_length = VARSIZE_EXTERNAL(val); + memcpy(data, val, data_length); + } + } + else if (VARATT_IS_SHORT(val)) + { + /* no alignment for short varlenas */ + data_length = VARSIZE_SHORT(val); + memcpy(data, val, data_length); + } + else if (att->attlen == -1 && att->attstorage != 'p' && + VARATT_CAN_MAKE_SHORT(val)) + { + /* convert to short varlena -- no alignment */ + data_length = VARATT_CONVERTED_SHORT_SIZE(val); + SET_VARSIZE_SHORT(data, data_length); + memcpy(data + 1, VARDATA(val), data_length - 1); + } + else + { + /* full 4-byte header varlena */ + robert_fill_padding(td, &data, att->attalign); + data_length = VARSIZE(val); + memcpy(data, val, data_length); + } + } + else if (att->attlen == -2) + { + /* cstring ... never needs alignment */ + Assert(att->attalign == 'c'); + data_length = strlen(DatumGetCString(datum)) + 1; + memcpy(data, DatumGetPointer(datum), data_length); + } + else + { + /* fixed-length pass-by-reference */ + robert_fill_padding(td, &data, att->attalign); + Assert(att->attlen > 0); + data_length = att->attlen; + memcpy(data, DatumGetPointer(datum), data_length); + } + + data += data_length; + } + + Assert(data == ((char *) td) + len); +} + +/* + * robert_form_tuple + * + * Construct a tuple from the given values[] and isnull[] arrays, which are of + * the length indicated by tupleDesc->natts. + * + * The result is allocated in the current memory context. 
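+ *
+ * Worked example (illustrative, assuming an 8-byte UndoRecPtr and hence
+ * an 11-byte fixed header): if the last of three attributes is null, the
+ * bitmap needs three entries, so r_hoff = 11 + BITMAPLEN(3) = 12, and
+ * the attribute data starts right at that offset, with no padding
+ * between the bitmap and the data.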
+ */
+RobertTuple
+robert_form_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull)
+{
+	RobertTuple tuple;			/* return tuple */
+	RobertTupleHeader td;
+	Size		len;
+	uint8		hoff;
+	int			nullBitmapEntriesNeeded;
+
+	/*
+	 * Compute required space. Note that, unlike the heap, there is no
+	 * padding between the end of the null bitmap and the beginning of the
+	 * data.
+	 */
+	nullBitmapEntriesNeeded = robert_tuple_bitmap_size(tupleDesc, isnull);
+	hoff = offsetof(RobertTupleHeaderData, r_bits) +
+		BITMAPLEN(nullBitmapEntriesNeeded);
+	len = robert_compute_data_size(tupleDesc, values, isnull, hoff);
+
+	/* Allocate space. */
+	tuple = palloc(ROBERTTUPLESIZE + len);
+	td = (RobertTupleHeader) ((char *) tuple + ROBERTTUPLESIZE);
+
+	/* Fill in control information. */
+	tuple->r_len = len;
+	tuple->r_data = td;
+	tuple->r_tableOid = InvalidOid;
+	ItemPointerSetInvalid(&tuple->r_self);
+
+	/* Fill in tuple header information. */
+	td->r_undoptr = InvalidUndoRecPtr;
+	Assert(tupleDesc->natts <= ROBERT_NATTS_MASK);
+	td->r_flags = tupleDesc->natts;
+	td->r_hoff = hoff;
+
+	/* Fill null bitmap. */
+	robert_fill_null_bitmap(td->r_bits, nullBitmapEntriesNeeded, isnull);
+
+	/* Fill tuple data, maybe adjusting flags. */
+	robert_fill_tuple(tupleDesc, values, isnull, td, len);
+
+	return tuple;
+}
+
+/*
+ * robert_deform_tuple
+ *
+ * Partially or completely deform a robert tuple using the provided tupleDesc.
+ * Attribute numbers greater than or equal to oldnatts and less than natts
+ * are deformed. *offp must point to the starting location of the first
+ * attribute to be deformed.
+ *
+ * The idea is that callers will initially pass oldnatts = 0 and *offp =
+ * tuple->r_data->r_hoff. If, later, more columns are to be deformed, the
+ * previous value of natts should be passed as oldnatts, and *offp should
+ * have the value to which it was set by the previous call.
+ */
+void
+robert_deform_tuple(TupleDesc tupleDesc, RobertTuple tuple, Datum *values,
+					bool *isnull, int natts, int oldnatts, uint32 *offp)
+{
+	RobertTupleHeader tup = tuple->r_data;
+	int			attnum;
+	uint32		off = *offp;
+	bits8	   *bp = tup->r_bits;
+	int			null_limit;
+
+	/* We can only fetch as many attributes as the tuple has. */
+	natts = Min((tuple->r_data->r_flags & ROBERT_NATTS_MASK), natts);
+
+	/*
+	 * Attributes beyond the end of the null bitmap are not null.
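+	 * For example (illustrative), with r_hoff = 12 there is a single
+	 * bitmap byte, so null_limit = 8 and attributes with attnum >= 8 are
+	 * treated as not null without consulting the bitmap at all.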
+	 */
+	null_limit = (tup->r_hoff - SizeofRobertTupleHeader) * BITS_PER_BYTE;
+
+	for (attnum = oldnatts; attnum < natts; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupleDesc, attnum);
+		Datum	   *value = &values[attnum];
+		char	   *tp = (char *) tup + off;
+
+		if (attnum < null_limit && att_isnull(attnum, bp))
+		{
+			*value = (Datum) 0;
+			isnull[attnum] = true;
+			continue;
+		}
+
+		isnull[attnum] = false;
+
+		if (att->attbyval)
+		{
+			switch (att->attlen)
+			{
+				case sizeof(char):
+					{
+						char		c;
+
+						memcpy(&c, tp, sizeof(char));
+						*value = CharGetDatum(c);
+						break;
+					}
+				case sizeof(int16):
+					{
+						int16		s;
+
+						memcpy(&s, tp, sizeof(int16));
+						*value = Int16GetDatum(s);
+						break;
+					}
+				case sizeof(int32):
+					{
+						int32		i;
+
+						memcpy(&i, tp, sizeof(int32));
+						*value = Int32GetDatum(i);
+						break;
+					}
+				case sizeof(Datum):
+					memcpy(value, tp, sizeof(Datum));
+					break;
+				default:
+					elog(ERROR, "unsupported byval length: %d", att->attlen);
+					break;
+			}
+
+			off += att->attlen;
+		}
+		else if (att->attlen == -1)
+		{
+			if (VARATT_NOT_PAD_BYTE(tp))
+			{
+				/* potentially unaligned varlena */
+				*value = PointerGetDatum(tp);
+				off += VARSIZE_ANY(tp);
+			}
+			else
+			{
+				/* we have at least one pad byte, so must be aligned varlena */
+				off = att_align_nominal(off, att->attalign);
+				tp = (char *) tup + off;
+				*value = PointerGetDatum(tp);
+				off += VARSIZE(tp);
+			}
+		}
+		else if (att->attlen == -2)
+		{
+			/* cstring; account for the terminating NUL byte as well */
+			Assert(att->attalign == 'c');
+			*value = PointerGetDatum(tp);
+			off += strlen(tp) + 1;
+		}
+		else
+		{
+			/* fixed-length pass-by-reference; skip any pad bytes */
+			Assert(att->attlen > 0);
+			off = att_align_nominal(off, att->attalign);
+			tp = (char *) tup + off;
+			*value = PointerGetDatum(tp);
+			off += att->attlen;
+		}
+	}
+
+	/* Update caller-provided offset. */
+	*offp = off;
+}
+
+/*
+ * robert_toast_tuple
+ *
+ * Compress tuple attributes or store them externally as necessary to get the
+ * size of the tuple down to something acceptable.
+ *
+ * This function uses MaxHeapAttributeNumber as the maximum number of columns
+ * in the tuple to be inserted. That limit is actually sensible because our
+ * storage format and the heap's have similar constraints; however, even if
+ * they didn't, this is used by other parts of the system as if it were a
+ * global limit rather than an AM-specific property. Perhaps that can be
+ * cleaned up someday.
+ */
+RobertTuple
+robert_toast_tuple(Relation rel, TupleTableSlot *slot, int options)
+{
+	ToastTupleContext ttc;
+	TupleDesc	tupleDesc = slot->tts_tupleDescriptor;
+	int			natts = tupleDesc->natts;
+	Datum		toast_values[MaxHeapAttributeNumber];
+	ToastAttrInfo toast_attr[MaxHeapAttributeNumber];
+	uint8		hoff;
+	Size		maxLen;
+	bool		done;
+	bool		for_compression = true;
+	RobertTuple result;
+
+	/* Get all tuple attributes. */
+	slot_getallattrs(slot);
+
+	/*
+	 * If this is neither a relation nor a materialized view, it should not
+	 * require any TOAST work; it's presumably a TOAST table. We can save
+	 * some overhead by forming and returning the necessary tuple at once.
+	 */
+	if (rel->rd_rel->relkind != RELKIND_RELATION &&
+		rel->rd_rel->relkind != RELKIND_MATVIEW)
+	{
+		RobertTuple tuple;
+
+		tuple = robert_form_tuple(tupleDesc, slot->tts_values,
+								  slot->tts_isnull);
+		Assert((tuple->r_data->r_flags & ROBERT_HASEXTERNAL) == 0);
+		return tuple;
+	}
+
+	/*
+	 * We're going to scribble on the values array, so copy it into our
+	 * scratch space. The tts_isnull array will not be changed, so we don't
+	 * need to copy it.
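+	 * (The toast_tuple_* helpers may replace entries in ttc_values with
+	 * compressed or externalized copies, which is why the Datum array
+	 * must be private to us while tts_isnull can be shared.)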
+	 */
+	Assert(natts <= MaxHeapAttributeNumber);
+	memcpy(toast_values, slot->tts_values, sizeof(Datum) * natts);
+
+	/* Prepare for toasting. */
+	ttc.ttc_rel = rel;
+	ttc.ttc_values = toast_values;
+	ttc.ttc_isnull = slot->tts_isnull;
+	ttc.ttc_oldvalues = NULL;
+	ttc.ttc_oldisnull = NULL;
+	ttc.ttc_toastrel = NULL;
+	ttc.ttc_toastslot = NULL;
+	ttc.ttc_attr = toast_attr;
+	toast_tuple_init(&ttc);
+
+	/* Compute header overhead. */
+	hoff = offsetof(RobertTupleHeaderData, r_bits);
+	if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
+	{
+		int			nullBitmapEntriesNeeded;
+
+		nullBitmapEntriesNeeded =
+			robert_tuple_bitmap_size(slot->tts_tupleDescriptor,
+									 slot->tts_isnull);
+		hoff += BITMAPLEN(nullBitmapEntriesNeeded);
+	}
+
+	/*
+	 * Compute maximum tuple length. It's not really correct to use
+	 * TOAST_TUPLE_TARGET here, because that's a heap-specific property, but
+	 * it's also not exactly clear what value would be better.
+	 */
+	maxLen = RelationGetToastTupleTarget(rel, TOAST_TUPLE_TARGET);
+
+	/*
+	 * Compress attributes with attstorage 'x', and store large attributes
+	 * with attstorage 'x' or 'e' externally. If that isn't enough, make
+	 * additional attributes with attstorage 'x' or 'e' external. (However,
+	 * if there's no TOAST table, then we can't make anything external.)
+	 */
+	while (1)
+	{
+		int			biggest_attno;
+
+		/* See whether the tuple will be too large. */
+		done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+												  slot->tts_isnull, hoff);
+		if (done)
+			break;
+
+		/*
+		 * Find the largest attribute with attstorage 'x' or 'e'. If
+		 * for_compression is true, it must also be potentially compressible.
+		 */
+		biggest_attno =
+			toast_tuple_find_biggest_attribute(&ttc, for_compression, false);
+		if (biggest_attno < 0)
+		{
+			/*
+			 * No suitable attribute was found. If we were looking for
+			 * compressible attributes, we can still try looking for
+			 * non-compressible attributes, provided that we have a TOAST
+			 * table to which to push them.
+			 */
+			if (!for_compression || !OidIsValid(rel->rd_rel->reltoastrelid))
+				break;
+			for_compression = false;
+			continue;
+		}
+
+		if (for_compression)
+		{
+			/* attempt to compress it inline if it has attstorage 'x' */
+			if (TupleDescAttr(tupleDesc, biggest_attno)->attstorage == 'x')
+				toast_tuple_try_compression(&ttc, biggest_attno);
+			else
+			{
+				/* attstorage 'e', so flag incompressible */
+				toast_attr[biggest_attno].tai_colflags |=
+					TOASTCOL_INCOMPRESSIBLE;
+			}
+
+			/*
+			 * If it's really big, push it out to the TOAST table immediately.
+			 * This avoids uselessly compressing other fields in the common
+			 * case where we have one long field and several short ones.
+			 */
+			if (toast_attr[biggest_attno].tai_size > maxLen &&
+				OidIsValid(rel->rd_rel->reltoastrelid))
+				toast_tuple_externalize(&ttc, biggest_attno, options,
+										TOAST_MAX_CHUNK_SIZE);
+		}
+		else
+		{
+			/*
+			 * XXX. We really should not be using TOAST_MAX_CHUNK_SIZE here,
+			 * since that is a heap-specific value.
+			 */
+			toast_tuple_externalize(&ttc, biggest_attno, options,
+									TOAST_MAX_CHUNK_SIZE);
+		}
+	}
+
+	/* Try compressing attributes with attstorage 'm'. */
+	while (!done)
+	{
+		int			biggest_attno;
+
+		biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, true);
+		if (biggest_attno < 0)
+			break;
+		toast_tuple_try_compression(&ttc, biggest_attno);
+
+		/* See whether we've sufficiently shrunk the tuple. */
+		done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+												  slot->tts_isnull, hoff);
+	}
+
+	/*
+	 * If we're not yet done and if there is a TOAST table, we can try storing
+	 * attributes with attstorage 'm' externally.
+	 */
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		/* Only do this if it's *really* necessary. */
+		maxLen = TOAST_TUPLE_TARGET_MAIN;
+
+		while (!done)
+		{
+			int			biggest_attno;
+
+			biggest_attno =
+				toast_tuple_find_biggest_attribute(&ttc, false, true);
+			if (biggest_attno < 0)
+				break;
+			toast_tuple_externalize(&ttc, biggest_attno, options,
+									TOAST_MAX_CHUNK_SIZE);
+
+			/* See whether we've sufficiently shrunk the tuple. */
+			done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+													  slot->tts_isnull, hoff);
+		}
+	}
+
+	/*
+	 * XXX. If the source slot is a robert tuple and no changes got made, we
+	 * can optimize this.
+	 */
+	result = robert_form_tuple(tupleDesc, toast_values, slot->tts_isnull);
+
+	toast_tuple_cleanup(&ttc, true);
+
+	return result;
+}
+
+/*
+ * robert_print_tuple
+ *
+ * Dump a tuple as a printable string. The caller may pfree the returned
+ * string, if desired.
+ *
+ * If the tupleDesc is passed as NULL, the data portion of the tuple will be
+ * dumped as one long string of bytes. Otherwise, the tupleDesc will be
+ * used to deform the tuple, and the bytes for each attribute will be dumped
+ * individually.
+ *
+ * This is intended for debugging purposes. Where it's not excessively
+ * burdensome to do so, we try to guard against the possibility that this
+ * function might be passed a corrupt tuple; rather than failing, we try to
+ * produce some kind of meaningful text representation and let the user sort
+ * it out. This is not perfect; a defective tuple can certainly cause a crash
+ * here, especially if tupleDesc is not NULL, but it helps.
+ */
+char *
+robert_print_tuple(RobertTuple tuple, TupleDesc tupleDesc)
+{
+	RobertTupleHeader td = tuple->r_data;
+	unsigned char *s = (unsigned char *) td;
+	uint32		offp = 0;
+	StringInfoData buf;
+
+	initStringInfo(&buf);
+
+	/* Decode tuple header. */
+	if (tuple->r_len >= SizeofRobertTupleHeader)
+	{
+		UndoLogNumber ulogno = UndoRecPtrGetLogNo(td->r_undoptr);
+		UndoLogOffset uoffset = UndoRecPtrGetOffset(td->r_undoptr);
+
+		appendStringInfo(&buf, "undo:%06X.%010" INT64_MODIFIER
+						 "X flags:%04X hoff:%u", ulogno, uoffset,
+						 td->r_flags, td->r_hoff);
+		offp = SizeofRobertTupleHeader;
+	}
+
+	/* Decode any null bitmap; it starts just after the fixed header. */
+	if (tuple->r_len >= td->r_hoff && td->r_hoff > SizeofRobertTupleHeader)
+	{
+		Size		nullbytes = td->r_hoff - SizeofRobertTupleHeader;
+		Size		i;
+
+		appendStringInfoString(&buf, " nulls:");
+		for (i = 0; i < nullbytes; ++i)
+			appendStringInfo(&buf, "%02x", s[SizeofRobertTupleHeader + i]);
+		offp = td->r_hoff;
+	}
+
+	/* Decode attributes, if a tupleDesc was provided. */
+	if (tupleDesc != NULL)
+	{
+		Datum	   *values;
+		bool	   *isnull;
+		int			attnum;
+
+		values = palloc(tupleDesc->natts * sizeof(Datum));
+		isnull = palloc(tupleDesc->natts * sizeof(bool));
+
+		/*
+		 * Normally, we want to deform all attributes at once for efficiency,
+		 * but here we deform them one by one so that we can learn the byte
+		 * position of each attribute in the tuple.
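+		 * The result might look like this (illustrative, not actual
+		 * output): "undo:000001.0000000FA8 flags:0002 hoff:12 nulls:fd
+		 * 1(4):2a000000", i.e. the header fields, the raw bitmap bytes,
+		 * and then attnum(len):hex for each non-null attribute.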
+		 */
+		for (attnum = 0; attnum < tupleDesc->natts; ++attnum)
+		{
+			uint32		prev_offp = offp;
+			Size		i;
+
+			robert_deform_tuple(tupleDesc, tuple, values, isnull, attnum + 1,
+								attnum, &offp);
+			if (isnull[attnum])
+				continue;
+			appendStringInfo(&buf, " %d(%u):", attnum + 1, offp - prev_offp);
+			for (i = prev_offp; i < offp; ++i)
+				appendStringInfo(&buf, "%02x", s[i]);
+		}
+
+		pfree(values);
+		pfree(isnull);
+	}
+
+	/* Dump any remaining bytes. Unexpected if a tupleDesc was specified. */
+	if (offp < tuple->r_len)
+	{
+		Size		i;
+
+		appendStringInfoString(&buf, " rest:");
+		for (i = offp; i < tuple->r_len; ++i)
+			appendStringInfo(&buf, "%02x", s[i]);
+	}
+
+	return buf.data;
+}
+
+/*
+ * robert_fill_padding
+ *
+ * Insert enough padding into a tuple so that the next field will start
+ * aligned on an appropriate boundary. Update *data to point to the first
+ * byte following the inserted padding bytes.
+ */
+static void
+robert_fill_padding(RobertTupleHeader td, char **data, char typalign)
+{
+	char	   *start = (char *) td;
+	int			curbytes = *data - start;
+	int			padbytes = att_align_nominal(curbytes, typalign) - curbytes;
+
+	memset(*data, 0, padbytes);
+	*data += padbytes;
+}
+
+/*
+ * robert_tuple_bitmap_size
+ *
+ * Work out the number of entries that we'll need for the null bitmap. Note
+ * that the return value is the number of entries, not the number of bytes.
+ */
+static int
+robert_tuple_bitmap_size(TupleDesc tupleDesc, bool *isnull)
+{
+	int			nullBitmapEntriesNeeded = 0;
+	int			i;
+
+	/*
+	 * We'll only store enough bytes in the null bitmap to represent the nulls
+	 * actually present in the tuple, so we need to find the last null actually
+	 * present.
+	 */
+	for (i = tupleDesc->natts - 1; i >= 0; i--)
+	{
+		if (isnull[i])
+		{
+			nullBitmapEntriesNeeded = i + 1;
+			break;
+		}
+	}
+
+	return nullBitmapEntriesNeeded;
+}
diff --git a/src/backend/access/robert/robert_xlog.c b/src/backend/access/robert/robert_xlog.c
new file mode 100644
index 0000000000..2f3c54ab85
--- /dev/null
+++ b/src/backend/access/robert/robert_xlog.c
@@ -0,0 +1,138 @@
+/*
+ * robert_xlog.c
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "access/robert_xlog.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "utils/rel.h"
+
+static void robert_undo_buffer(Buffer buf, int nrecords, UndoRecInfo *records);
+
+void
+robert_redo(XLogReaderState *record)
+{
+}
+
+/*
+ * robert_undo
+ *
+ * Toplevel undo handler for this AM.
+ */
+void
+robert_undo(int nrecords, UndoRecInfo *records)
+{
+	Relation	rel = NULL;
+	int			i,
+				j;
+
+	Assert(nrecords > 0);
+
+	for (i = 0; i < nrecords; i = j)
+	{
+		UnpackedUndoRecord *record = records[i].uur;
+		Buffer		buf;
+
+		/*
+		 * If this undo record is for the same relation as the previous undo
+		 * record, we just keep the same relation open. If it's for a
+		 * different relation, close the old one and open the new one.
+		 *
+		 * XXX. It might be better to keep the lock on the old relation if
+		 * there's more undo pending for that relation in this transaction,
+		 * to avoid deadlock risk. But how would we arrange to close the
+		 * relation at the end? Also, if this is the last chunk of undo then
+		 * there shouldn't be any more records for this relation, so it would
+		 * probably be better to release the lock right away. And also, if
+		 * we're already holding on to tons of locks, it might be better to
+		 * drop some of them to free up space in the lock table so we don't
+		 * just fail. For now, do the easy thing.
+ */ + if (rel == NULL || RelationGetRelid(rel) != record->uur_reloid) + { + elog(LOG, "robert_undo: select relation %u", record->uur_reloid); + if (rel != NULL) + relation_close(rel, RowExclusiveLock); + rel = relation_open(record->uur_reloid, RowExclusiveLock); + } + + /* Figure out where the records for this block stop. */ + for (j = i + 1; j < nrecords; ++j) + { + UnpackedUndoRecord *otherrec = records[j].uur; + + /* + * XXX. This should also check uur_dbid, but right now that doesn't + * work properly. + */ + if (record->uur_reloid != otherrec->uur_reloid || + record->uur_fork != otherrec->uur_fork || + record->uur_block != otherrec->uur_block) + break; + } + + /* Read the target block. */ + elog(LOG, "robert_undo: block %u records start at %d, end at %d", + record->uur_block, i, j); + buf = ReadBuffer(rel, record->uur_block); + + /* + * Process all records for the target block. This function also + * releases the pin. + */ + robert_undo_buffer(buf, j - i, records + i); + } + + /* Clean up. */ + if (rel != NULL) + relation_close(rel, RowExclusiveLock); +} + +/* + * robert_undo_buffer + * + * Apply a group of undo actions that all relate to a particular buffer. + * On entry, the buffer should be pinned; this function will release the + * pin before returning. + */ +static void +robert_undo_buffer(Buffer buf, int nrecords, UndoRecInfo *records) +{ + Page page = BufferGetPage(buf); + int i; + + /* Take an exclusive lock on the buffer. */ + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + + /* + * XXX. This is an obvious crock that only works for inserts. For a + * an in-place update, we'd need to fetch the old tuple from undo and + * put it back. For a non-in-place update or delete, we only need to + * remove the delete-mark unless the page has been pruned. + */ + for (i = 0; i < nrecords; ++i) + { + UnpackedUndoRecord *record = records[i].uur; + ItemId iid = PageGetItemId((PageHeader) page, record->uur_offset); + + if (record->uur_type == ROBERT_UNDO_INSERT) + ItemIdSetDead(iid); + else if (record->uur_type == ROBERT_UNDO_DELETE) + ItemIdSetNormal(iid, ItemIdGetOffset(iid), + ItemIdGetLength(iid)); + else + elog(NOTICE, "robert_undo: unknown type %d", record->uur_type); + } + + /* All done. Note that caller expects us to drop the pin. 
*/ + UnlockReleaseBuffer(buf); +} + +void +robert_undo_desc(StringInfo buf, UnpackedUndoRecord *record) +{ + elog(LOG, "robert_undo_desc"); +} diff --git a/src/backend/access/robert/robertam.c b/src/backend/access/robert/robertam.c new file mode 100644 index 0000000000..49045cd0ad --- /dev/null +++ b/src/backend/access/robert/robertam.c @@ -0,0 +1,202 @@ +/* + * robertam.c + */ + +#include "postgres.h" + +#include "access/heaptoast.h" +#include "access/robertam.h" +#include "access/robert_scan.h" +#include "access/robert_slot.h" +#include "utils/builtins.h" + +extern Datum robert_tableam_handler(PG_FUNCTION_ARGS); + +static const TableAmRoutine robertam_methods = { + .type = T_TableAmRoutine, + + .slot_callbacks = robert_slot_callbacks, + + .scan_begin = robert_scan_begin, + .scan_end = robert_scan_end, + .scan_rescan = robert_scan_rescan, + .scan_getnextslot = robert_scan_getnextslot, + + .parallelscan_estimate = table_block_parallelscan_estimate, + .parallelscan_initialize = table_block_parallelscan_initialize, + .parallelscan_reinitialize = table_block_parallelscan_reinitialize, + + .index_fetch_begin = robert_index_fetch_begin, + .index_fetch_reset = robert_index_fetch_reset, + .index_fetch_end = robert_index_fetch_end, + .index_fetch_tuple = robert_index_fetch_tuple, + + .tuple_fetch_row_version = robert_tuple_fetch_row_version, + .tuple_tid_valid = robert_tuple_tid_valid, + .tuple_get_latest_tid = robert_tuple_get_latest_tid, + .tuple_satisfies_snapshot = robert_tuple_satisfies_snapshot, + .compute_xid_horizon_for_tuples = robert_compute_xid_horizon_for_tuples, + + .tuple_insert = robert_tuple_insert, + .tuple_insert_speculative = robert_tuple_insert_speculative, + .tuple_complete_speculative = robert_tuple_complete_speculative, + .multi_insert = robert_multi_insert, + .tuple_delete = robert_tuple_delete, + .tuple_update = robert_tuple_update, + .tuple_lock = robert_tuple_lock, + .finish_bulk_insert = robert_finish_bulk_insert, + + .relation_set_new_filenode = robert_relation_set_new_filenode, + .relation_nontransactional_truncate = robert_relation_nontransactional_truncate, + .relation_copy_data = robert_relation_copy_data, + .relation_copy_for_cluster = robert_relation_copy_for_cluster, + .relation_vacuum = robert_relation_vacuum, + .scan_analyze_next_block = robert_scan_analyze_next_block, + .scan_analyze_next_tuple = robert_scan_analyze_next_tuple, + .index_build_range_scan = robert_index_build_range_scan, + .index_validate_scan = robert_index_validate_scan, + + .relation_size = table_block_relation_size, + .relation_needs_toast_table = robert_relation_needs_toast_table, + .relation_toast_am = robert_relation_toast_am, + .toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE, + + .relation_estimate_size = robert_relation_estimate_size, + + .scan_bitmap_next_block = robert_scan_bitmap_next_block, + .scan_bitmap_next_tuple = robert_scan_bitmap_next_tuple, + + .scan_sample_next_block = robert_scan_sample_next_block, + .scan_sample_next_tuple = robert_scan_sample_next_tuple +}; + +Datum +robert_tableam_handler(PG_FUNCTION_ARGS) +{ + PG_RETURN_POINTER(&robertam_methods); +} + + +bool +robert_tuple_fetch_row_version(Relation rel, ItemPointer tid, + Snapshot snapshot, TupleTableSlot *slot) +{ + return false; +} + +bool +robert_tuple_tid_valid(TableScanDesc sscan, ItemPointer tid) +{ + RobertScanDesc scan = (RobertScanDesc) sscan; + + return ItemPointerIsValid(tid) && + ItemPointerGetBlockNumber(tid) < scan->rrs_nblocks; +} + +void +robert_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid) 
+{ +} + +bool +robert_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, + Snapshot snapshot) +{ + return false; +} + +TransactionId +robert_compute_xid_horizon_for_tuples(Relation rel, + ItemPointerData *items, + int nitems) +{ + return InvalidTransactionId; +} + +/* + * Check to see whether the table needs a TOAST table. It does only if + * (1) there are any toastable attributes, and (2) the maximum length + * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to + * create a toast table for something like "f1 varchar(20)".) + */ +bool +robert_relation_needs_toast_table(Relation rel) +{ + int32 data_length = 0; + bool maxlength_unknown = false; + bool has_toastable_attrs = false; + TupleDesc tupdesc = rel->rd_att; + int32 tuple_length; + int i; + + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (att->attisdropped) + continue; + + /* we don't align if it's pass-by-value */ + if (!att->attbyval) + data_length = att_align_nominal(data_length, att->attalign); + + if (att->attlen > 0) + { + /* Fixed-length types are never toastable */ + data_length += att->attlen; + } + else + { + int32 maxlen = type_maximum_size(att->atttypid, + att->atttypmod); + + if (maxlen < 0) + maxlength_unknown = true; + else + data_length += maxlen; + if (att->attstorage != 'p') + has_toastable_attrs = true; + } + } + if (!has_toastable_attrs) + return false; /* nothing to toast? */ + if (maxlength_unknown) + return true; /* any unlimited-length attrs? */ + tuple_length = SizeofRobertTupleHeader + BITMAPLEN(tupdesc->natts) + + data_length; + return (tuple_length > TOAST_TUPLE_THRESHOLD); +} + +/* + * robert_relation_toast_am + * + * TOAST tables for robert relations are just robert relations. + */ +Oid +robert_relation_toast_am(Relation rel) +{ + return rel->rd_rel->relam; +} + +/* + * robert_relation_estimate_size + * + * Each tuple involves an item pointer and an (unaligned) RobertTupleHeader. + * + * We don't currently have any special space, so only the size of the page + * header needs to be subtracted from the number of usable bytes per page. + */ +void +robert_relation_estimate_size(Relation rel, int32 *attr_widths, + BlockNumber *pages, double *tuples, + double *allvisfrac) +{ + const Size overhead_bytes_per_tuple = + SizeofRobertTupleHeader + sizeof(ItemIdData); + const Size usable_bytes_per_page = BLCKSZ - SizeOfPageHeaderData; + + table_block_relation_estimate_size(rel, attr_widths, pages, tuples, + allvisfrac, + overhead_bytes_per_tuple, + usable_bytes_per_page); +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index c57eca240f..10f8226a1e 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -17,6 +17,7 @@ #include "access/brin_xlog.h" #include "access/multixact.h" #include "access/nbtxlog.h" +#include "access/robert_xlog.h" #include "access/spgxlog.h" #include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 272edcbcba..3bf3e8dcd9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -133,6 +133,12 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeLogicalMsgOp(ctx, &buf); break; + case RM_ROBERT_ID: + /* XXX. We sure need to do something better here. 
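+			 * At a minimum, robert insert/update/delete records would have
+			 * to be turned into ReorderBuffer changes, the way DecodeHeapOp
+			 * does it for the heap; for now we only track the XID so that
+			 * transaction boundaries are known.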
*/ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); + break; + /* * Rmgrs irrelevant for logical decoding; they describe stuff not * represented in logical decoding. Add new rmgrs in rmgrlist.h's diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 976f80e9c3..b8177ab2fd 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -19,6 +19,7 @@ #include "access/multixact.h" #include "access/nbtxlog.h" #include "access/rmgr.h" +#include "access/robert_xlog.h" #include "access/spgxlog.h" #include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 6da5930e0b..b6e6754447 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -49,3 +49,4 @@ PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL, NULL) PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL, NULL) PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_ROBERT_ID, "Robert", robert_redo, robert_desc, robert_identify, NULL, NULL, NULL, robert_undo, NULL, robert_undo_desc) diff --git a/src/include/access/robert_page.h b/src/include/access/robert_page.h new file mode 100644 index 0000000000..cf94de9691 --- /dev/null +++ b/src/include/access/robert_page.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * robert_page.h + * + *------------------------------------------------------------------------- + */ + +#ifndef ROBERT_PAGE_H +#define ROBERT_PAGE_H + +#include "access/robert_tuple.h" +#include "access/undolog.h" +#include "storage/block.h" +#include "storage/itemptr.h" +#include "storage/off.h" + +/* External function prototypes. */ +extern OffsetNumber robert_page_free_offset(Page page, Size size); +extern void robert_page_add_item(Page page, OffsetNumber offnum, + RobertTuple tuple); + +#endif diff --git a/src/include/access/robert_scan.h b/src/include/access/robert_scan.h new file mode 100644 index 0000000000..d47d2b51ad --- /dev/null +++ b/src/include/access/robert_scan.h @@ -0,0 +1,79 @@ +/* + * robert_scan.h + */ + +#ifndef ROBERT_SCAN_H +#define ROBERT_SCAN_H + +#include "access/relscan.h" +#include "access/sdir.h" +#include "access/skey.h" +#include "executor/tuptable.h" +#include "nodes/execnodes.h" +#include "nodes/tidbitmap.h" +#include "utils/snapshot.h" + +typedef enum RobertScanState +{ + ROBERT_SCAN_NOT_STARTED, + ROBERT_SCAN_BLOCK_DONE, + ROBERT_SCAN_TUPLE_DONE, + ROBERT_SCAN_READY +} RobertScanState; + +typedef struct RobertScanDescData +{ + TableScanDescData rrs_base; /* AM independent part of the descriptor */ + BlockNumber rrs_nblocks; + BlockNumber rrs_startblock; + BlockNumber rrs_numblocks; + RobertScanState rrs_state; + BlockNumber rrs_cblock; + OffsetNumber rrs_coffset; + OffsetNumber rrs_lastoffset; + int rrs_tupindex; + BufferAccessStrategy rrs_strategy; + Page rrs_cpage; +} RobertScanDescData; + +typedef RobertScanDescData *RobertScanDesc; + +/* Table scans. 
*/ +extern TableScanDesc robert_scan_begin(Relation rel, Snapshot snapshot, + int nkeys, ScanKeyData *key, + ParallelTableScanDesc pscan, uint32 flags); +extern void robert_scan_end(TableScanDesc sscan); +extern void robert_scan_rescan(TableScanDesc sscan, ScanKeyData *key, + bool set_params, bool allow_strat, + bool allow_sync, bool allow_pagemode); +extern bool robert_scan_getnextslot(TableScanDesc sscan, + ScanDirection direction, + TupleTableSlot *slot); + +/* Index scans. */ +extern IndexFetchTableData *robert_index_fetch_begin(Relation rel); +extern void robert_index_fetch_reset(IndexFetchTableData *data); +extern void robert_index_fetch_end(IndexFetchTableData *data); +extern bool robert_index_fetch_tuple(IndexFetchTableData *data, + ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, + bool *call_again, bool *all_dead); + +/* Bitmap scans. */ +extern bool robert_scan_bitmap_next_block(TableScanDesc sscan, + TBMIterateResult *tbmres); +extern bool robert_scan_bitmap_next_tuple(TableScanDesc sscan, + TBMIterateResult *tbmres, + TupleTableSlot *slot); + +/* Sample scans. */ +extern bool robert_scan_sample_next_block(TableScanDesc scan, + SampleScanState *scanstate); +extern bool robert_scan_sample_next_tuple(TableScanDesc scan, + SampleScanState *scanstate, + TupleTableSlot *slot); + +/* Internal functions */ +extern BlockNumber robert_scan_get_blocks_done(RobertScanDesc scan); + +#endif diff --git a/src/include/access/robert_slot.h b/src/include/access/robert_slot.h new file mode 100644 index 0000000000..8e7585b21d --- /dev/null +++ b/src/include/access/robert_slot.h @@ -0,0 +1,31 @@ +/* + * robert_slot.h + */ + +#ifndef ROBERT_SLOT_H +#define ROBERT_SLOT_H + +#include "access/robert_tuple.h" +#include "executor/tuptable.h" +#include "utils/rel.h" + +typedef struct RobertTupleTableSlot +{ + TupleTableSlot base; + RobertTuple tuple; + uint32 off; +} RobertTupleTableSlot; + +extern PGDLLIMPORT const TupleTableSlotOps TTSOpsRobert; + +extern const TupleTableSlotOps *robert_slot_callbacks(Relation relation); +extern void robert_slot_store(TupleTableSlot *slot, RobertTupleHeader td, + Size len, Oid tableOid, BlockNumber blkno, + OffsetNumber offset); +extern bool robert_slot_store_visible(TupleTableSlot *slot, + RobertTupleHeader td, Size len, + Oid tableOid, BlockNumber blkno, + OffsetNumber offset, Snapshot snapshot, + bool item_is_dead); + +#endif diff --git a/src/include/access/robert_tuple.h b/src/include/access/robert_tuple.h new file mode 100644 index 0000000000..28bbc90885 --- /dev/null +++ b/src/include/access/robert_tuple.h @@ -0,0 +1,60 @@ +/* + * robert_tuple.h + */ + +#ifndef ROBERT_TUPLE_H +#define ROBERT_TUPLE_H + +#include "access/transam.h" +#include "access/tupdesc.h" +#include "access/undolog.h" +#include "storage/itemptr.h" +#include "utils/relcache.h" + +struct TupleTableSlot; + +typedef struct RobertTupleHeaderData +{ + UndoRecPtr r_undoptr; + uint16 r_flags; + uint8 r_hoff; + bits8 r_bits[FLEXIBLE_ARRAY_MEMBER]; +} RobertTupleHeaderData; + +#define SizeofRobertTupleHeader offsetof(RobertTupleHeaderData, r_bits) + +/* Flags for use in r_flags. 
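+ *
+ * The low 11 bits of r_flags record the tuple's attribute count (see
+ * robert_form_tuple); ROBERT_HASEXTERNAL is set when some attribute is
+ * stored out-of-line in a TOAST table.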
*/ +#define ROBERT_NATTS_MASK 0x07FF /* 11 bits */ +#define ROBERT_HASEXTERNAL 0x0800 + +typedef RobertTupleHeaderData *RobertTupleHeader; + +typedef struct RobertTupleData +{ + uint32 r_len; + RobertTupleHeader r_data; + Oid r_tableOid; + ItemPointerData r_self; +} RobertTupleData; + +typedef RobertTupleData *RobertTuple; + +#define ROBERTTUPLESIZE MAXALIGN(sizeof(RobertTupleData)) + +extern Size robert_compute_data_size(TupleDesc tupleDesc, Datum *values, + bool *isnull, uint8 hoff); +extern void robert_fill_null_bitmap(bits8 *bits, int entries_needed, + bool *isnull); +extern void robert_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull, + RobertTupleHeader td, Size len); +extern RobertTuple robert_form_tuple(TupleDesc tupleDesc, Datum *values, + bool *isnull); +extern void robert_deform_tuple(TupleDesc tupleDesc, RobertTuple tuple, + Datum *values, bool *isnull, int natts, int oldnatts, + uint32 *offp); +extern RobertTuple robert_toast_tuple(Relation rel, + struct TupleTableSlot *slot, + int options); +extern char *robert_print_tuple(RobertTuple tuple, TupleDesc tupleDesc); + +#endif diff --git a/src/include/access/robert_xlog.h b/src/include/access/robert_xlog.h new file mode 100644 index 0000000000..67496ff0b0 --- /dev/null +++ b/src/include/access/robert_xlog.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * robert_xlog.h + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/robert_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef ROBERT_XLOG_H +#define ROBERT_XLOG_H + +#include "access/undoaccess.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" + +#define ROBERT_UNDO_INSERT 1 +#define ROBERT_UNDO_DELETE 2 + +extern void robert_redo(XLogReaderState *record); +extern void robert_desc(StringInfo buf, XLogReaderState *record); +extern const char *robert_identify(uint8 info); + +extern void robert_undo(int nrecords, UndoRecInfo *records); +extern void robert_undo_desc(StringInfo buf, UnpackedUndoRecord *record); + +#endif /* ROBERT_XLOG_H */ diff --git a/src/include/access/robertam.h b/src/include/access/robertam.h new file mode 100644 index 0000000000..b6c3bc42d9 --- /dev/null +++ b/src/include/access/robertam.h @@ -0,0 +1,103 @@ +/* + * robertam.h + */ + +#ifndef ROBERTAM_H +#define ROBERTAM_H + +#include "access/hio.h" +#include "access/skey.h" +#include "access/tableam.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "nodes/execnodes.h" + +/* MVCC. 
 */
+extern bool robert_tuple_fetch_row_version(Relation rel, ItemPointer tid,
+							   Snapshot snapshot, TupleTableSlot *slot);
+extern bool robert_tuple_tid_valid(TableScanDesc sscan, ItemPointer tid);
+extern void robert_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid);
+extern bool robert_tuple_satisfies_snapshot(Relation rel,
+								TupleTableSlot *slot,
+								Snapshot snapshot);
+extern TransactionId robert_compute_xid_horizon_for_tuples(Relation rel,
+								   ItemPointerData *items,
+								   int nitems);
+
+/* DML. */
+extern void robert_tuple_insert(Relation rel, TupleTableSlot *slot,
+					CommandId cid, int options,
+					BulkInsertStateData *bistate);
+extern void robert_tuple_insert_speculative(Relation rel, TupleTableSlot *slot,
+								CommandId cid, int options,
+								BulkInsertStateData *bistate,
+								uint32 specToken);
+extern void robert_tuple_complete_speculative(Relation rel,
+								  TupleTableSlot *slot, uint32 specToken,
+								  bool succeeded);
+extern void robert_multi_insert(Relation rel,
+					TupleTableSlot **slots, int nslots,
+					CommandId cid, int options,
+					BulkInsertStateData *bistate);
+extern TM_Result robert_tuple_delete(Relation rel, ItemPointer tid,
+					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+					bool wait, TM_FailureData *tmfd, bool changingPart);
+extern TM_Result robert_tuple_update(Relation rel, ItemPointer otid,
+					TupleTableSlot *slot, CommandId cid, Snapshot snapshot,
+					Snapshot crosscheck, bool wait, TM_FailureData *tmfd,
+					LockTupleMode *lockmode, bool *update_indexes);
+extern TM_Result robert_tuple_lock(Relation rel, ItemPointer tid,
+					Snapshot snapshot, TupleTableSlot *slot, CommandId cid,
+					LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags,
+					TM_FailureData *tmfd);
+extern void robert_finish_bulk_insert(Relation rel, int options);
+
+
+/* DDL. */
+extern void robert_relation_set_new_filenode(Relation rel,
+								 const RelFileNode *newrnode,
+								 char persistence,
+								 TransactionId *freezeXid,
+								 MultiXactId *minmulti);
+extern void robert_relation_nontransactional_truncate(Relation rel);
+extern void robert_relation_copy_data(Relation rel,
+						  const RelFileNode *newrnode);
+extern void robert_relation_copy_for_cluster(Relation NewHeap,
+								 Relation OldHeap, Relation OldIndex,
+								 bool use_sort, TransactionId OldestXmin,
+								 TransactionId *xid_cutoff,
+								 MultiXactId *multi_cutoff,
+								 double *num_tuples, double *tups_vacuumed,
+								 double *tups_recently_dead);
+extern void robert_relation_vacuum(Relation onerel, VacuumParams *params,
+					   BufferAccessStrategy bstrategy);
+extern bool robert_scan_analyze_next_block(TableScanDesc scan,
+							   BlockNumber blockno,
+							   BufferAccessStrategy bstrategy);
+extern bool robert_scan_analyze_next_tuple(TableScanDesc scan,
+							   TransactionId OldestXmin,
+							   double *liverows, double *deadrows,
+							   TupleTableSlot *slot);
+extern double robert_index_build_range_scan(Relation heap_rel,
+								Relation index_rel, IndexInfo *index_info,
+								bool allow_sync, bool anyvisible, bool progress,
+								BlockNumber start_blockno,
+								BlockNumber end_blockno,
+								IndexBuildCallback callback,
+								void *callback_state,
+								TableScanDesc scan);
+extern void robert_index_validate_scan(Relation heap_rel, Relation index_rel,
+						   IndexInfo *index_info, Snapshot snapshot,
+						   ValidateIndexState *state);
+
+/* Miscellaneous. */
+extern uint64 robert_relation_size(Relation rel, ForkNumber forkNumber);
+extern bool robert_relation_needs_toast_table(Relation rel);
+extern Oid	robert_relation_toast_am(Relation rel);
+
+/* Planner.
*/ +extern void robert_relation_estimate_size(Relation rel, int32 *attr_widths, + BlockNumber *pages, double *tuples, + double *allvisfrac); + +#endif diff --git a/src/include/catalog/pg_am.dat b/src/include/catalog/pg_am.dat index 393b41dd68..d855db86d4 100644 --- a/src/include/catalog/pg_am.dat +++ b/src/include/catalog/pg_am.dat @@ -33,5 +33,8 @@ { oid => '3580', oid_symbol => 'BRIN_AM_OID', descr => 'block range index (BRIN) access method', amname => 'brin', amhandler => 'brinhandler', amtype => 'i' }, +{ oid => '8192', oid_symbol => 'ROBERT_HEAP_TABLE_AM_OID', + descr => 'robert table access method', + amname => 'robert', amhandler => 'robert_tableam_handler', amtype => 't' }, ] diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index de475cb2d4..3417efc7aa 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -873,6 +873,11 @@ proname => 'heap_tableam_handler', provolatile => 'v', prorettype => 'table_am_handler', proargtypes => 'internal', prosrc => 'heap_tableam_handler' }, +{ oid => '8193', oid_symbol => 'ROBERT_TABLE_AM_HANDLER_OID', + descr => 'robert table access method handler', + proname => 'robert_tableam_handler', provolatile => 'v', + prorettype => 'table_am_handler', proargtypes => 'internal', + prosrc => 'robert_tableam_handler' }, # Index access method handlers { oid => '330', descr => 'btree index access method handler', diff --git a/src/test/regress/expected/create_am.out b/src/test/regress/expected/create_am.out index 84da403afc..f9301388ee 100644 --- a/src/test/regress/expected/create_am.out +++ b/src/test/regress/expected/create_am.out @@ -126,11 +126,12 @@ ERROR: function int4in(internal) does not exist CREATE ACCESS METHOD bogus TYPE TABLE HANDLER bthandler; ERROR: function bthandler must return type table_am_handler SELECT amname, amhandler, amtype FROM pg_am where amtype = 't' ORDER BY 1, 2; - amname | amhandler | amtype ---------+----------------------+-------- - heap | heap_tableam_handler | t - heap2 | heap_tableam_handler | t -(2 rows) + amname | amhandler | amtype +--------+------------------------+-------- + heap | heap_tableam_handler | t + heap2 | heap_tableam_handler | t + robert | robert_tableam_handler | t +(3 rows) -- First create tables employing the new AM using USING -- plain CREATE TABLE diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 432d2d812e..d792df5236 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3459,3 +3459,8 @@ yyscan_t z_stream z_streamp zic_t +RobertTuple +RobertTupleData +RobertTupleHeader +RobertTupleHeaderData +RobertTupleTableSlot