include $(top_builddir)/src/Makefile.global
SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist \
- table tablesample transam undo
+ table tablesample transam undo robert
include $(top_srcdir)/src/backend/common.mk
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
- mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
- smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \
- undologdesc.o xactdesc.o xlogdesc.o
+ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o robertdesc.o \
+ seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o \
+ undoactiondesc.o undologdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * robertdesc.c
+ * rmgr descriptor routines for access/robert
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/robertdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/robert_xlog.h"
+
+void
+robert_desc(StringInfo buf, XLogReaderState *record)
+{
+#if 0
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ info &= XLOG_HEAP_OPMASK;
+#endif
+}
+
+const char *
+robert_identify(uint8 info)
+{
+ const char *id = NULL;
+
+ switch (info & ~XLR_INFO_MASK)
+ {
+ default:
+ id = "robert";
+ }
+
+ return id;
+}
--- /dev/null
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for access/robert
+#
+# IDENTIFICATION
+# src/backend/access/robert/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/robert
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = robert_ddl.o robert_dml.o robert_page.o robert_scan.o \
+ robert_slot.o robert_tuple.o robert_xlog.o robertam.o
+
+include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+/*
+ * robert_ddl.c
+ */
+
+#include "postgres.h"
+
+#include "access/multixact.h"
+#include "access/robert_scan.h"
+#include "access/robertam.h"
+#include "catalog/catalog.h"
+#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
+#include "commands/progress.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/procarray.h"
+
+/*
+ * robert_relation_set_new_filenode
+ *
+ * We do everything using FullTransactionIds and do not use MultiXactIds, so no
+ * freezing is required.
+ */
+void
+robert_relation_set_new_filenode(Relation rel,
+ const RelFileNode *newrnode,
+ char persistence,
+ TransactionId *freezeXid,
+ MultiXactId *minmulti)
+{
+ SMgrRelation srel;
+
+ /* No freezing required. */
+ *freezeXid = InvalidTransactionId;
+ *minmulti = InvalidMultiXactId;
+
+ /* Create main fork . */
+ srel = RelationCreateStorage(rel->rd_node, persistence);
+
+ /* If required, set up an init fork for an unlogged table. */
+ if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
+ {
+ smgrcreate(srel, INIT_FORKNUM, false);
+ log_smgrcreate(&srel->smgr_rnode.node, INIT_FORKNUM);
+ }
+}
+
+/*
+ * robert_relation_nontransactional_truncate
+ */
+void
+robert_relation_nontransactional_truncate(Relation rel)
+{
+ RelationTruncate(rel, 0);
+}
+
+void
+robert_relation_copy_data(Relation rel, const RelFileNode *newrnode)
+{
+}
+
+void
+robert_relation_copy_for_cluster(Relation NewHeap,
+ Relation OldHeap, Relation OldIndex,
+ bool use_sort, TransactionId OldestXmin,
+ TransactionId *xid_cutoff,
+ MultiXactId *multi_cutoff,
+ double *num_tuples, double *tups_vacuumed,
+ double *tups_recently_dead)
+{
+}
+
+void
+robert_relation_vacuum(Relation onerel, VacuumParams *params,
+ BufferAccessStrategy bstrategy)
+{
+}
+
+bool
+robert_scan_analyze_next_block(TableScanDesc scan,
+ BlockNumber blockno,
+ BufferAccessStrategy bstrategy)
+{
+ return false;
+}
+
+bool
+robert_scan_analyze_next_tuple(TableScanDesc scan,
+ TransactionId OldestXmin,
+ double *liverows, double *deadrows,
+ TupleTableSlot *slot)
+{
+ return false;
+}
+
+/*
+ * robert_index_build_range_scan
+ *
+ * XXX. There is a HUGE amount of duplication with the heap here.
+ */
+double
+robert_index_build_range_scan(Relation heapRelation,
+ Relation indexRelation, IndexInfo *indexInfo,
+ bool allow_sync, bool anyvisible, bool progress,
+ BlockNumber start_blockno,
+ BlockNumber numblocks,
+ IndexBuildCallback callback,
+ void *callback_state,
+ TableScanDesc scan)
+{
+ RobertScanDesc hscan;
+ bool is_system_catalog;
+ bool checking_uniqueness;
+ Datum values[INDEX_MAX_KEYS];
+ bool isnull[INDEX_MAX_KEYS];
+ double reltuples;
+ ExprState *predicate;
+ TupleTableSlot *slot;
+ EState *estate;
+ ExprContext *econtext;
+ Snapshot snapshot;
+ bool need_unregister_snapshot = false;
+ TransactionId OldestXmin;
+ BlockNumber previous_blkno = InvalidBlockNumber;
+
+ /*
+ * sanity checks
+ */
+ Assert(OidIsValid(indexRelation->rd_rel->relam));
+
+ /* Remember if it's a system catalog */
+ is_system_catalog = IsSystemRelation(heapRelation);
+
+ /* See whether we're verifying uniqueness/exclusion properties */
+ checking_uniqueness = (indexInfo->ii_Unique ||
+ indexInfo->ii_ExclusionOps != NULL);
+
+ /*
+ * "Any visible" mode is not compatible with uniqueness checks; make sure
+ * only one of those is requested.
+ */
+ Assert(!(anyvisible && checking_uniqueness));
+
+ /*
+ * Need an EState for evaluation of index expressions and partial-index
+ * predicates. Also a slot to hold the current tuple.
+ */
+ estate = CreateExecutorState();
+ econtext = GetPerTupleExprContext(estate);
+ slot = table_slot_create(heapRelation, NULL);
+
+ /* Arrange for econtext's scan tuple to be the tuple under test */
+ econtext->ecxt_scantuple = slot;
+
+ /* Set up execution state for predicate, if any. */
+ predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+
+ /*
+ * Prepare for scan of the base relation. In a normal index build, we use
+ * SnapshotAny because we must retrieve all tuples and do our own time
+ * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+ * concurrent build, or during bootstrap, we take a regular MVCC snapshot
+ * and index whatever's live according to that.
+ */
+ OldestXmin = InvalidTransactionId;
+
+ /* okay to ignore lazy VACUUMs here */
+ if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+ OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+
+ if (!scan)
+ {
+ /*
+ * Serial index build.
+ *
+ * Must begin our own heap scan in this case. We may also need to
+ * register a snapshot whose lifetime is under our direct control.
+ */
+ if (!TransactionIdIsValid(OldestXmin))
+ {
+ snapshot = RegisterSnapshot(GetTransactionSnapshot());
+ need_unregister_snapshot = true;
+ }
+ else
+ snapshot = SnapshotAny;
+
+ scan = table_beginscan_strat(heapRelation, /* relation */
+ snapshot, /* snapshot */
+ 0, /* number of keys */
+ NULL, /* scan key */
+ true, /* buffer access strategy OK */
+ allow_sync); /* syncscan OK? */
+ }
+ else
+ {
+ /*
+ * Parallel index build.
+ *
+ * Parallel case never registers/unregisters own snapshot. Snapshot
+ * is taken from parallel heap scan, and is SnapshotAny or an MVCC
+ * snapshot, based on same criteria as serial case.
+ */
+ Assert(!IsBootstrapProcessingMode());
+ Assert(allow_sync);
+ snapshot = scan->rs_snapshot;
+ }
+
+ hscan = (RobertScanDesc) scan;
+
+ /* Publish number of blocks to scan */
+ if (progress)
+ {
+ BlockNumber nblocks;
+
+ if (hscan->rrs_base.rs_parallel != NULL)
+ {
+ ParallelBlockTableScanDesc pbscan;
+
+ pbscan = (ParallelBlockTableScanDesc) hscan->rrs_base.rs_parallel;
+ nblocks = pbscan->phs_nblocks;
+ }
+ else
+ nblocks = hscan->rrs_nblocks;
+
+ pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_TOTAL,
+ nblocks);
+ }
+
+ /*
+ * Must call GetOldestXmin() with SnapshotAny. Should never call
+ * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+ * this for parallel builds, since ambuild routines that support parallel
+ * builds must work these details out for themselves.)
+ */
+ Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+ Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+ !TransactionIdIsValid(OldestXmin));
+ Assert(snapshot == SnapshotAny || !anyvisible);
+
+ /* set our scan endpoints */
+ if (!allow_sync)
+ {
+ hscan->rrs_startblock = start_blockno;
+ hscan->rrs_numblocks = numblocks;
+ }
+ else
+ {
+ /* syncscan can only be requested on whole relation */
+ Assert(start_blockno == 0);
+ Assert(numblocks == InvalidBlockNumber);
+ }
+
+ reltuples = 0;
+
+ /*
+ * Scan all tuples in the base relation.
+ */
+ while (robert_scan_getnextslot(scan, ForwardScanDirection, slot))
+ {
+ bool tupleIsAlive;
+ HeapTupleData htdata;
+ BlockNumber blocks_done = robert_scan_get_blocks_done(hscan);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Report scan progress, if asked to. */
+ if (progress && blocks_done != previous_blkno)
+ {
+ pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
+ blocks_done);
+ previous_blkno = blocks_done;
+ }
+
+ /* do our own time qual check */
+ /* XXX. This just completely ignores MVCC considerations. */
+ tupleIsAlive = true;
+ reltuples += 1;
+
+ MemoryContextReset(econtext->ecxt_per_tuple_memory);
+
+ /*
+ * In a partial index, discard tuples that don't satisfy the
+ * predicate.
+ */
+ if (predicate != NULL)
+ {
+ if (!ExecQual(predicate, econtext))
+ continue;
+ }
+
+ /*
+ * For the current heap tuple, extract all the attributes we use in
+ * this index, and note which are null. This also performs evaluation
+ * of any expressions needed.
+ */
+ FormIndexDatum(indexInfo,
+ slot,
+ estate,
+ values,
+ isnull);
+
+ /*
+ * Call the AM's callback routine to process the tuple
+ *
+ * You'd think we should go ahead and build the index tuple here, but
+ * some index AMs want to do further processing on the data first. So
+ * pass the values[] and isnull[] arrays, instead.
+ *
+ * XXX. Why the heck does this callback accept a HeapTuple?
+ */
+ htdata.t_data = NULL;
+ htdata.t_len = 0;
+ ItemPointerCopy(&slot->tts_tid, &htdata.t_self);
+ htdata.t_tableOid = RelationGetRelid(heapRelation);
+ callback(indexRelation, &htdata, values, isnull, tupleIsAlive,
+ callback_state);
+
+ /* Stop if a block limit was specified and has been reached. */
+ if (numblocks != InvalidBlockNumber && blocks_done >= numblocks)
+ break;
+ }
+
+ /* Report scan progress one last time. */
+ if (progress)
+ {
+ BlockNumber blks_done;
+
+ if (hscan->rrs_base.rs_parallel != NULL)
+ {
+ ParallelBlockTableScanDesc pbscan;
+
+ pbscan = (ParallelBlockTableScanDesc) hscan->rrs_base.rs_parallel;
+ blks_done = pbscan->phs_nblocks;
+ }
+ else
+ blks_done = hscan->rrs_nblocks;
+
+ pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
+ blks_done);
+ }
+
+ table_endscan(scan);
+
+ /* we can now forget our snapshot, if set and registered by us */
+ if (need_unregister_snapshot)
+ UnregisterSnapshot(snapshot);
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ FreeExecutorState(estate);
+
+ /* These may have been pointing to the now-gone estate */
+ indexInfo->ii_ExpressionsState = NIL;
+ indexInfo->ii_PredicateState = NULL;
+
+ return reltuples;
+}
+
+void
+robert_index_validate_scan(Relation heap_rel, Relation index_rel,
+ IndexInfo *index_info, Snapshot snapshot,
+ ValidateIndexState *state)
+{
+}
--- /dev/null
+/*
+ * robert_dml.c
+ */
+
+#include "postgres.h"
+
+#include "access/robert_page.h"
+#include "access/robert_slot.h"
+#include "access/robert_xlog.h"
+#include "access/robertam.h"
+#include "access/undoaccess.h"
+#include "access/undorecord.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
+#include "storage/procarray.h"
+
+typedef enum
+{
+ CHOOSE_BLOCK_WITH_FREESPACE,
+ CHOOSE_LAST_BLOCK,
+ EXTEND_RELATION
+} BufferForTupleStrategy;
+
+static BlockNumber
+robert_choose_block(Relation rel, int len, BufferForTupleStrategy *strategy)
+{
+ BlockNumber blkno;
+
+ switch (*strategy)
+ {
+ case CHOOSE_BLOCK_WITH_FREESPACE:
+ blkno = GetPageWithFreeSpace(rel, len + 0.04 * BLCKSZ); /* XXX */
+ if (blkno != InvalidBlockNumber)
+ return blkno;
+ *strategy = CHOOSE_LAST_BLOCK;
+ /* FALLTHROUGH */
+
+ case CHOOSE_LAST_BLOCK:
+ blkno = RelationGetNumberOfBlocks(rel);
+ *strategy = EXTEND_RELATION;
+ if (blkno > 0)
+ return blkno - 1;
+ /* FALLTHROUGH */
+
+ case EXTEND_RELATION:
+ return P_NEW;
+ }
+
+ pg_unreachable();
+}
+
+/*
+ * Attempt to insert a tuple into a specified block of a relation.
+ *
+ * If the page is too full, either in terms of storage or of line pointers,
+ * then this function will do nothing and return false. Otherwise, it will
+ * insert the tuple and return true.
+ */
+static bool
+robert_tuple_try_insert(Relation rel, BlockNumber blkno, RobertTuple tuple,
+ CommandId cid, int options)
+{
+ Buffer buffer = ReadBuffer(rel, blkno);
+ Page page = BufferGetPage(buffer);
+ OffsetNumber offnum;
+ UndoRecordInsertContext context = {{0}};
+ UnpackedUndoRecord undorecord;
+ FullTransactionId fxid;
+
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * Choose an offset number at which to insert this tuple.
+ *
+ * If this is a new page, always use FirstOffsetNumber. Otherwise, let
+ * robert_page_free_offset find us a usable offset number.
+ */
+ if (blkno == P_NEW)
+ {
+ /* Initialize new page. */
+ PageInit(page, BufferGetPageSize(buffer), 0);
+
+ /* Find real page number. */
+ blkno = BufferGetBlockNumber(buffer);
+
+ /* All offsets available, so use the first one. */
+ offnum = FirstOffsetNumber;
+ }
+ else
+ {
+ /* Look for a usable offset. */
+ offnum = robert_page_free_offset(page, tuple->r_len);
+ if (offnum == InvalidOffsetNumber)
+ {
+ /* This page is not usable for this tuple; e.g. too full. */
+ UnlockReleaseBuffer(buffer);
+ return false;
+ }
+ }
+
+ /* Prepare undo record. */
+ fxid = GetTopFullTransactionId();
+ undorecord.uur_rmid = RM_ROBERT_ID;
+ undorecord.uur_type = ROBERT_UNDO_INSERT;
+ undorecord.uur_info = 0;
+ undorecord.uur_reloid = RelationGetRelid(rel);
+ undorecord.uur_cid = GetCurrentCommandId(true);
+ undorecord.uur_fork = MAIN_FORKNUM;
+ undorecord.uur_prevundo = InvalidUndoRecPtr;
+ undorecord.uur_block = blkno;
+ undorecord.uur_offset = offnum;
+ undorecord.uur_fxid = fxid;
+ undorecord.uur_payload.len = 0;
+ undorecord.uur_tuple.len = 0;
+ undorecord.uur_txn = NULL;
+ undorecord.uur_logswitch = NULL;
+
+ /* Prepare to insert the undo record and update the tuple control data. */
+ BeginUndoRecordInsert(&context, UndoLogCategoryForRelation(rel), 1, NULL);
+ tuple->r_tableOid = undorecord.uur_reloid;
+ ItemPointerSet(&tuple->r_self, blkno, offnum);
+ tuple->r_data->r_undoptr = PrepareUndoInsert(&context, &undorecord,
+ MyDatabaseId);
+
+ /* Perform the actual insert. */
+ START_CRIT_SECTION();
+ robert_page_add_item(page, offnum, tuple);
+ MarkBufferDirty(buffer);
+ InsertPreparedUndo(&context);
+ END_CRIT_SECTION();
+
+ /* All done. */
+ FinishUndoRecordInsert(&context);
+ UnlockReleaseBuffer(buffer);
+ return true;
+}
+
+/*
+ * robert_tuple_insert
+ *
+ * Insert a single tuple.
+ */
+void
+robert_tuple_insert(Relation rel, TupleTableSlot *slot,
+ CommandId cid, int options,
+ BulkInsertStateData *bistate)
+{
+ RobertTuple tuple = ((RobertTupleTableSlot *) slot)->tuple;
+ bool done = false;
+ BufferForTupleStrategy strategy = CHOOSE_BLOCK_WITH_FREESPACE;
+
+ tuple = robert_toast_tuple(rel, slot, options);
+
+ while (!done)
+ {
+ BlockNumber blkno;
+
+ blkno = robert_choose_block(rel, tuple->r_len, &strategy);
+ done = robert_tuple_try_insert(rel, blkno, tuple, cid, options);
+ }
+
+ elog(NOTICE, "insert %u (%u,%u): %s",
+ RelationGetRelid(rel),
+ ItemPointerGetBlockNumber(&tuple->r_self),
+ ItemPointerGetOffsetNumber(&tuple->r_self),
+ robert_print_tuple(tuple, slot->tts_tupleDescriptor));
+}
+
+void
+robert_tuple_insert_speculative(Relation rel, TupleTableSlot *slot,
+ CommandId cid, int options,
+ BulkInsertStateData *bistate,
+ uint32 specToken)
+{
+}
+
+void
+robert_tuple_complete_speculative(Relation rel, TupleTableSlot *slot,
+ uint32 specToken, bool succeeded)
+{
+}
+
+void
+robert_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+ CommandId cid, int options,
+ BulkInsertStateData *bistate)
+{
+}
+
+/*
+ * Check whether it's OK to modify a tuple.
+ *
+ * This function determines whether or not it's OK for us to modify a given
+ * tuple, similar to HeapTupleSatisfiesUpdate.
+ */
+static TM_Result
+robert_check_modify(UndoRecPtr undoptr, Snapshot snapshot, CommandId cid)
+{
+ UndoRecordFetchContext context;
+ UnpackedUndoRecord *uur;
+ TM_Result result = TM_Ok;
+ /* XXX bool recheck_for_invisible = false; */
+ TransactionId uxid;
+ CommandId ucid;
+ uint8 utype;
+
+ BeginUndoFetch(&context);
+
+ uur = UndoFetchRecord(&context, undoptr);
+ utype = uur->uur_type;
+ uxid = XidFromFullTransactionId(uur->uur_fxid);
+ ucid = uur->uur_cid;
+ UndoRecordRelease(uur);
+
+ if (TransactionIdIsCurrentTransactionId(uxid))
+ {
+ /* XXX. Not always. */
+ result = TM_SelfModified;
+ }
+ else if (TransactionIdIsInProgress(uxid))
+ {
+ /* XXX Do stuff. */
+ }
+ else if (TransactionIdDidCommit(uxid))
+ {
+ /*
+ * XXX. Is the snapshot guaranteed to be an MVCC snapshot? If not,
+ * what are we supposed to do in that case?
+ */
+ Assert(IsMVCCSnapshot(snapshot));
+ /* XXX. If XID is visible, then do stuff, otherwise different stuff? */
+ }
+ else
+ {
+ /* It must have aborted but not yet been cleaned up. */
+ }
+
+ /*
+ * XXX. We need to look up uur.uur_prevundo here, but we probably
+ * don't want to do that while holding the buffer lock.
+ *
+ * We need an equivalent of HeapTupleSatisfiesUpdate. Some notes on
+ * that:
+ *
+ * If the undo record to which uur.uur_prevundo points doesn't exist any
+ * more or if the XID/CID of the returned record are visible to our
+ * snapshot, then TM_Ok. If the XID is our XID and the tuple isn't
+ * visible because the CID is too new, then TM_SelfModified (unless we
+ * couldn't see the old version of the row either, in which case
+ * TM_Invisible). Otherwise, if the XID is still in progress, then
+ * TM_BeingModified (unless we couldn't see the old version of the row
+ * either, in which case TM_Invisible). If it's aborted, then do
+ * page-at-a-time undo and try again.
+ *
+ * Otherwise, it's committed but not visible to our snapshot. In that
+ * case, the answer must be TM_Invisible if the operation was an insert
+ * (and we should throw an error instead of returning TM_Invisible to the
+ * caller), TM_Updated if it was an update, or TM_Deleted if it was a
+ * delete.
+ *
+ * The discussion above only considers insert, update, and delete. If
+ * there were locks, we'd need to return TM_BeingModified if any existing
+ * lock taken by an XID other than our own conflicts with the lock
+ * required by the current operation.
+ */
+
+ return result;
+}
+
+/*
+ * robert_tuple_delete
+ *
+ * Delete a single tuple.
+ */
+TM_Result
+robert_tuple_delete(Relation rel, ItemPointer tid,
+ CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+ bool wait, TM_FailureData *tmfd, bool changingPart)
+{
+ Buffer buffer;
+ ItemId iid;
+ UndoRecordInsertContext context = {{0}};
+ UnpackedUndoRecord undorecord;
+ FullTransactionId fxid;
+ UndoRecPtr undoptr;
+ UndoRecPtr checked_undoptr = InvalidUndoRecPtr;
+ RobertTupleHeader td;
+ Page page;
+
+ /*
+ * Prepare undo record contents, except for the previous block pointer,
+ * which we can't finalize without taking the buffer lock.
+ */
+ fxid = GetTopFullTransactionId();
+ undorecord.uur_rmid = RM_ROBERT_ID;
+ undorecord.uur_type = ROBERT_UNDO_DELETE;
+ undorecord.uur_info = 0;
+ undorecord.uur_reloid = RelationGetRelid(rel);
+ undorecord.uur_cid = GetCurrentCommandId(true);
+ undorecord.uur_fork = MAIN_FORKNUM;
+ undorecord.uur_prevundo = InvalidUndoRecPtr;
+ undorecord.uur_block = ItemPointerGetBlockNumber(tid);
+ undorecord.uur_offset = ItemPointerGetOffsetNumber(tid);
+ undorecord.uur_fxid = fxid;
+ undorecord.uur_payload.len = 0;
+ undorecord.uur_tuple.len = 0;
+ undorecord.uur_txn = NULL;
+ undorecord.uur_logswitch = NULL;
+
+ /* Pin the target buffer. */
+ buffer = ReadBuffer(rel, undorecord.uur_block);
+ page = BufferGetPage(buffer);
+
+ while (1)
+ {
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * Prepare undo chain.
+ *
+ * The tuple's old undo pointer needs to be stored in the undo record
+ * we're about to insert, and the undo pointer we're about to insert
+ * will need to be stored into the page.
+ */
+ iid = PageGetItemId((PageHeader) page, undorecord.uur_offset);
+ td = (RobertTupleHeader) PageGetItem(page, iid);
+ memcpy(&undorecord.uur_prevundo, &td->r_undoptr, sizeof(UndoRecPtr));
+
+ /*
+ * Before actually performing the deletion, we must check whether it's
+ * really OK: somebody else could have updated the tuple and, if so,
+ * the version visible to our snapshot might no longer be the latest
+ * version.
+ *
+ * XXX. This code doesn't know anything about tuple locks, and if there
+ * are any of those, we need to wait for them, too.
+ */
+ if (undorecord.uur_prevundo != checked_undoptr &&
+ !UndoRecPtrIsDiscarded(undorecord.uur_prevundo))
+ {
+ TM_Result result;
+
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ result = robert_check_modify(undorecord.uur_prevundo, snapshot,
+ cid);
+ if (result != TM_Ok)
+ {
+ ReleaseBuffer(buffer);
+ return result;
+ }
+ checked_undoptr = undorecord.uur_prevundo;
+ continue;
+ }
+
+ /* Looks OK, so proceed with deletion. */
+ break;
+ }
+
+ /*
+ * Before entering the critical section, prepare for 1 undo insertion
+ * and stage the record to be inserted.
+ */
+ BeginUndoRecordInsert(&context, UndoLogCategoryForRelation(rel), 1, NULL);
+ undoptr = PrepareUndoInsert(&context, &undorecord, MyDatabaseId);
+
+ /*
+ * Perform the actual delete.
+ *
+ * Note that we use ItemIdMarkDead here, not ItemIdSetDead. The storage
+ * has to remain, because the tuple is still visible to concurrent
+ * transactions.
+ */
+ START_CRIT_SECTION();
+ ItemIdMarkDead(iid);
+ MarkBufferDirty(buffer);
+ memcpy(&td->r_undoptr, &undoptr, sizeof(UndoRecPtr));
+ InsertPreparedUndo(&context);
+ END_CRIT_SECTION();
+
+ /* All done. */
+ FinishUndoRecordInsert(&context);
+ UnlockReleaseBuffer(buffer);
+ return TM_Ok;
+}
+
+TM_Result
+robert_tuple_update(Relation rel, ItemPointer otid,
+ TupleTableSlot *slot, CommandId cid, Snapshot snapshot,
+ Snapshot crosscheck, bool wait, TM_FailureData *tmfd,
+ LockTupleMode *lockmode, bool *update_indexes)
+{
+ return TM_Ok;
+}
+
+TM_Result
+robert_tuple_lock(Relation rel, ItemPointer tid,
+ Snapshot snapshot, TupleTableSlot *slot, CommandId cid,
+ LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags,
+ TM_FailureData *tmfd)
+{
+ return TM_Ok;
+}
+
+void
+robert_finish_bulk_insert(Relation rel, int options)
+{
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * robert_page.c
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/robert_page.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/robert_page.h"
+#include "access/robert_tuple.h"
+
+/*
+ * robert_page_add_item
+ *
+ * Insert a tuple into a page.
+ */
+void
+robert_page_add_item(Page page, OffsetNumber offnum, RobertTuple tuple)
+{
+ Item item = (Item) tuple->r_data;
+ Size size = tuple->r_len;
+ PageHeader phdr = (PageHeader) page;
+ ItemId iid = PageGetItemId(phdr, offnum);
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(page) + 1;
+ Size lower = phdr->pd_lower;
+ Size upper = (Size) phdr->pd_upper - size;
+
+ Assert(offnum <= maxoff);
+
+ if (offnum == maxoff)
+ lower += sizeof(ItemIdData);
+
+ Assert(lower <= upper);
+
+ memcpy((char *) page + upper, item, size);
+ phdr->pd_lower = lower;
+ phdr->pd_upper = upper;
+ ItemIdSetNormal(iid, upper, size);
+}
+
+/*
+ * robert_page_free_offset
+ *
+ * Returns an offset at which a tuple of a given size can be inserted into the
+ * given page. If there are no unused line pointers and no more can be added,
+ * or if the item isn't going to fit on the page, returns InvalidOffsetNumber.
+ */
+OffsetNumber
+robert_page_free_offset(Page page, Size size)
+{
+ PageHeader phdr = (PageHeader) page;
+ OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
+ OffsetNumber offnum;
+ Size avail = phdr->pd_upper - phdr->pd_lower;
+
+ if (unlikely(phdr->pd_upper > BLCKSZ || phdr->pd_lower > phdr->pd_upper))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("corrupted page header")));
+
+ if (avail < size)
+ return InvalidOffsetNumber;
+
+ for (offnum = FirstOffsetNumber; offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId iid = PageGetItemId(phdr, offnum);
+
+ if (!ItemIdIsUsed(iid))
+ return offnum;
+ }
+
+ /*
+ * XXX. It's just plain crummy that we have to use a heap-specific
+ * constant for this, but there is plenty of code that thinks this is a
+ * universal limit rather than a heap-specific one.
+ */
+ if (avail < sizeof(ItemIdData) + size || offnum >= MaxHeapTuplesPerPage)
+ return InvalidOffsetNumber;
+
+ return offnum;
+}
--- /dev/null
+/*
+ * robert_scan.c
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/robert_scan.h"
+#include "access/robert_slot.h"
+#include "access/tableam.h"
+#include "access/skey.h"
+#include "access/undoaccess.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/predicate.h"
+
+typedef struct IndexFetchRobertData
+{
+ IndexFetchTableData xs_base;
+ Buffer xs_cbuf; /* current buffer, if any */
+} IndexFetchRobertData;
+
+static void robert_scan_initialize(RobertScanDesc scan, ScanKey key,
+ bool keep_startblock);
+static void robert_scan_getnextblock(RobertScanDesc scan,
+ ScanDirection direction,
+ BlockNumber blkno);
+static bool robert_scan_getnexttuple(RobertScanDesc scan,
+ TupleTableSlot *slot);
+
+/*
+ * Prepare to scan a robert table.
+ */
+TableScanDesc
+robert_scan_begin(Relation rel, Snapshot snapshot,
+ int nkeys, ScanKeyData *key,
+ ParallelTableScanDesc pscan, uint32 flags)
+{
+ RobertScanDesc scan;
+
+ /* Out of sheer paranoia, hold a ref count while scanning relation. */
+ RelationIncrementReferenceCount(rel);
+
+ /* Allocate and initialize scan descriptor. */
+ scan = palloc(sizeof(RobertScanDescData));
+ scan->rrs_base.rs_rd = rel;
+ scan->rrs_base.rs_snapshot = snapshot;
+ scan->rrs_base.rs_nkeys = nkeys;
+ if (key == NULL)
+ scan->rrs_base.rs_key = NULL;
+ else
+ {
+ scan->rrs_base.rs_key = palloc(nkeys * sizeof(ScanKeyData));
+ memcpy(scan->rrs_base.rs_key, key, nkeys * sizeof(ScanKeyData));
+ }
+ scan->rrs_base.rs_parallel = pscan;
+ scan->rrs_base.rs_flags = flags;
+ scan->rrs_cpage = palloc(BLCKSZ);
+ scan->rrs_strategy = NULL;
+ robert_scan_initialize(scan, key, false);
+
+ /* For sequential and sample scans, just predicate-lock the whole thing. */
+ if ((scan->rrs_base.rs_flags & (SO_TYPE_SEQSCAN | SO_TYPE_SAMPLESCAN)) != 0)
+ {
+ Assert(snapshot);
+ PredicateLockRelation(rel, snapshot);
+ }
+
+ return &scan->rrs_base;
+}
+
+/*
+ * Clean up after completing a scan of a robert table.
+ */
+void
+robert_scan_end(TableScanDesc sscan)
+{
+ RobertScanDesc scan = (RobertScanDesc) sscan;
+
+ RelationDecrementReferenceCount(scan->rrs_base.rs_rd);
+
+ if (scan->rrs_base.rs_key)
+ pfree(scan->rrs_base.rs_key);
+ if (scan->rrs_base.rs_flags & SO_TEMP_SNAPSHOT)
+ UnregisterSnapshot(scan->rrs_base.rs_snapshot);
+ pfree(scan->rrs_cpage);
+ pfree(scan);
+}
+
+void
+robert_scan_rescan(TableScanDesc sscan, ScanKeyData *key,
+ bool set_params, bool allow_strat,
+ bool allow_sync, bool allow_pagemode)
+{
+}
+
+/*
+ * robert_scan_initialize
+ *
+ * Perform common initialization required by both robert_scan_begin and
+ * robert_scan_rescan.
+ *
+ * XXX. An awful lot of this logic duplicates initscan(). Instead of copying
+ * the code, we should try to have common code that can be used by any
+ * block-based table AM.
+ */
+static void
+robert_scan_initialize(RobertScanDesc scan, ScanKey key, bool keep_startblock)
+{
+ ParallelBlockTableScanDesc bpscan = NULL;
+ bool allow_strat;
+ bool allow_sync;
+
+ /*
+ * Determine the number of blocks we need to scan.
+ *
+ * If the relation is extended, the new tuples won't be visible to this
+ * scan anyway (except for non-MVCC scans, which are unstable anyway).
+ */
+ if (scan->rrs_base.rs_parallel != NULL)
+ {
+ bpscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel;
+ scan->rrs_nblocks = bpscan->phs_nblocks;
+ }
+ else
+ scan->rrs_nblocks = RelationGetNumberOfBlocks(scan->rrs_base.rs_rd);
+
+ /*
+ * This algorithm may not be for the best, but it is the one used by the
+ * 'heap' table AM, so we'll stick with it for the sake of tradition.
+ */
+ if (!RelationUsesLocalBuffers(scan->rrs_base.rs_rd) &&
+ scan->rrs_nblocks > NBuffers / 4)
+ {
+ allow_strat = (scan->rrs_base.rs_flags & SO_ALLOW_STRAT) != 0;
+ allow_sync = (scan->rrs_base.rs_flags & SO_ALLOW_SYNC) != 0;
+ }
+ else
+ allow_strat = allow_sync = false;
+
+ if (allow_strat)
+ {
+ /* During a rescan, keep the previous strategy object. */
+ if (scan->rrs_strategy == NULL)
+ scan->rrs_strategy = GetAccessStrategy(BAS_BULKREAD);
+ }
+ else
+ {
+ if (scan->rrs_strategy != NULL)
+ FreeAccessStrategy(scan->rrs_strategy);
+ scan->rrs_strategy = NULL;
+ }
+
+ if (scan->rrs_base.rs_parallel != NULL)
+ {
+ /* For parallel scan, believe whatever ParallelTableScanDesc says. */
+ if (scan->rrs_base.rs_parallel->phs_syncscan)
+ scan->rrs_base.rs_flags |= SO_ALLOW_SYNC;
+ else
+ scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+ }
+ else if (keep_startblock)
+ {
+ /*
+ * When rescanning, we want to keep the previous startblock setting,
+ * so that rewinding a cursor doesn't generate surprising results.
+ * Reset the active syncscan setting, though.
+ */
+ if (allow_sync && synchronize_seqscans)
+ scan->rrs_base.rs_flags |= SO_ALLOW_SYNC;
+ else
+ scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+ }
+ else if (allow_sync && synchronize_seqscans)
+ {
+ scan->rrs_base.rs_flags |= SO_ALLOW_SYNC;
+ /* XXX ss_get_location is in heapam.h */
+ scan->rrs_startblock =
+ ss_get_location(scan->rrs_base.rs_rd, scan->rrs_nblocks);
+ }
+ else
+ {
+ scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+ scan->rrs_startblock = 0;
+ }
+
+ scan->rrs_numblocks = InvalidBlockNumber;
+ scan->rrs_state = ROBERT_SCAN_NOT_STARTED;
+ scan->rrs_cblock = InvalidBlockNumber;
+}
+
+/*
+ * robert_scan_getnextslot
+ *
+ * Get the next tuple from a sequential scan and store it into the given slot.
+ *
+ * XXX. The name of this method does not make it abundantly clear that it only
+ * applies to sequential scans, but an examination of the heap code shows that
+ * to be the case.
+ */
+bool
+robert_scan_getnextslot(TableScanDesc sscan,
+ ScanDirection direction,
+ TupleTableSlot *slot)
+{
+ RobertScanDesc scan = (RobertScanDesc) sscan;
+
+ /* Initialize the scan if that's not yet done. */
+ if (scan->rrs_state == ROBERT_SCAN_NOT_STARTED)
+ {
+ BlockNumber blkno;
+
+ /* If there's nothing to scan, give up immediately. */
+ if (scan->rrs_nblocks == 0 || scan->rrs_numblocks == 0)
+ {
+ ExecClearTuple(slot);
+ return false;
+ }
+
+ /* Figure out which block to read first. */
+ if (ScanDirectionIsBackward(direction))
+ {
+ /* don't report syncscans when scanning backwards */
+ scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+
+ /* start from last page of the scan */
+ if (scan->rrs_startblock > 0)
+ blkno = scan->rrs_startblock - 1;
+ else
+ blkno = scan->rrs_nblocks - 1;
+ }
+ else if (ScanDirectionIsNoMovement(direction))
+ {
+ /* no prior tuple, so refetch yields nothing */
+ ExecClearTuple(slot);
+ return false;
+ }
+ else if (scan->rrs_base.rs_parallel != NULL)
+ {
+ ParallelBlockTableScanDesc pbscan;
+
+ pbscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel;
+ table_block_parallelscan_startblock_init(scan->rrs_base.rs_rd,
+ pbscan);
+ blkno = table_block_parallelscan_nextpage(scan->rrs_base.rs_rd,
+ pbscan);
+ if (blkno == InvalidBlockNumber)
+ {
+ /* other participants already finished scan */
+ ExecClearTuple(slot);
+ return false;
+ }
+ }
+ else
+ blkno = scan->rrs_startblock;
+
+ /* Read the chosen block. */
+ robert_scan_getnextblock(scan, direction, blkno);
+ }
+
+ while (1)
+ {
+ if (scan->rrs_state == ROBERT_SCAN_TUPLE_DONE)
+ {
+ /* Advance to next tuple. */
+ if (ScanDirectionIsBackward(direction))
+ {
+ if (scan->rrs_coffset == FirstOffsetNumber)
+ scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+ else
+ {
+ scan->rrs_coffset = OffsetNumberPrev(scan->rrs_coffset);
+ scan->rrs_state = ROBERT_SCAN_READY;
+ }
+ }
+ else if (ScanDirectionIsForward(direction))
+ {
+ if (scan->rrs_coffset == scan->rrs_lastoffset)
+ scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+ else
+ {
+ scan->rrs_coffset = OffsetNumberNext(scan->rrs_coffset);
+ scan->rrs_state = ROBERT_SCAN_READY;
+ }
+ }
+ else
+ {
+ Assert(ScanDirectionIsNoMovement(direction));
+ /* nothing to do */
+ }
+ }
+ else if (scan->rrs_state == ROBERT_SCAN_BLOCK_DONE)
+ {
+ bool finished;
+ BlockNumber blkno;
+
+ /* Select new block. */
+ if (ScanDirectionIsBackward(direction))
+ {
+ finished = (scan->rrs_cblock == scan->rrs_startblock) ||
+ (scan->rrs_numblocks != InvalidBlockNumber ?
+ --scan->rrs_numblocks == 0 : false);
+ blkno = (scan->rrs_cblock == 0) ? scan->rrs_nblocks :
+ scan->rrs_cblock - 1;
+ }
+ else if (scan->rrs_base.rs_parallel != NULL)
+ {
+ ParallelBlockTableScanDesc pbscan;
+
+ pbscan = (ParallelBlockTableScanDesc)
+ scan->rrs_base.rs_parallel;
+ blkno = table_block_parallelscan_nextpage(scan->rrs_base.rs_rd,
+ pbscan);
+ finished = (blkno == InvalidBlockNumber);
+ }
+ else
+ {
+ blkno = scan->rrs_cblock + 1;
+ if (blkno >= scan->rrs_nblocks)
+ blkno = 0;
+ finished = (blkno == scan->rrs_startblock) ||
+ (scan->rrs_numblocks != InvalidBlockNumber ?
+ --scan->rrs_numblocks == 0 : false);
+ if (scan->rrs_base.rs_flags & SO_ALLOW_SYNC)
+ ss_report_location(scan->rrs_base.rs_rd, blkno);
+ }
+
+ /* Scan is done if no blocks remain. */
+ if (finished)
+ {
+ scan->rrs_cblock = InvalidBlockNumber;
+ scan->rrs_state = ROBERT_SCAN_NOT_STARTED;
+ ExecClearTuple(slot);
+ return false;
+ }
+
+ /* Read the chosen block. */
+ robert_scan_getnextblock(scan, direction, blkno);
+ }
+ else
+ {
+ Assert(scan->rrs_state == ROBERT_SCAN_READY);
+ scan->rrs_state = ROBERT_SCAN_TUPLE_DONE;
+
+ if (!robert_scan_getnexttuple(scan, slot))
+ continue;
+ pgstat_count_heap_getnext(scan->rrs_base.rs_rd);
+ return true;
+ }
+ }
+}
+
+/*
+ * robert_scan_getnextblock
+ *
+ * Read the indicated block of the relation and update the scan state
+ * accordingly.
+ */
+static void
+robert_scan_getnextblock(RobertScanDesc scan, ScanDirection direction,
+ BlockNumber blkno)
+{
+ Buffer buffer;
+
+ /* Copy the new page and remember the page number. */
+ buffer = ReadBufferExtended(scan->rrs_base.rs_rd, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, scan->rrs_strategy);
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ memcpy(scan->rrs_cpage, BufferGetPage(buffer), BLCKSZ);
+ UnlockReleaseBuffer(buffer);
+ scan->rrs_cblock = blkno;
+
+ /* Determine how many tuples there may be on the page. */
+ scan->rrs_lastoffset = PageGetMaxOffsetNumber(scan->rrs_cpage);
+
+ /* Determine starting scan position. */
+ if (ScanDirectionIsBackward(direction))
+ scan->rrs_coffset = scan->rrs_lastoffset;
+ else
+ scan->rrs_coffset = FirstOffsetNumber;
+
+ /* If no tuples, tell caller to request the next block. */
+ if (scan->rrs_lastoffset == 0)
+ scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+ else
+ scan->rrs_state = ROBERT_SCAN_READY;
+}
+
+/*
+ * robert_scan_getnexttuple
+ *
+ * The scan contains a page (scan->rrs_cpage) and identifies a particular
+ * tuple of interest (scan->rrs_coffset). Extract that tuple and store it
+ * into the given slot.
+ */
+static bool
+robert_scan_getnexttuple(RobertScanDesc scan, TupleTableSlot *slot)
+{
+ ItemId iid = PageGetItemId(scan->rrs_cpage, scan->rrs_coffset);
+ RobertTupleHeader td;
+ Size len;
+
+ /* If the item has no storage, it is definitely not visible. */
+ if (!ItemIdHasStorage(iid))
+ return false;
+
+ /* Extract tuple pointer and length. */
+ td = (RobertTupleHeader) PageGetItem(scan->rrs_cpage, iid);
+ len = ItemIdGetLength(iid);
+
+ /* Return visible version of tuple. */
+ return robert_slot_store_visible(slot, td, len, slot->tts_tableOid,
+ scan->rrs_cblock, scan->rrs_coffset,
+ scan->rrs_base.rs_snapshot,
+ ItemIdIsDead(iid));
+}
+
+/*
+ * robert_scan_get_blocks_done
+ *
+ * Return the number of blocks that have been read by this scan since
+ * starting. For a non-parallel scan, this should be completely accurate.
+ * For a parallel scan, it will discount the effects of other backends.
+ */
+BlockNumber
+robert_scan_get_blocks_done(RobertScanDesc scan)
+{
+ ParallelBlockTableScanDesc bpscan = NULL;
+ BlockNumber startblock;
+ BlockNumber blocks_done;
+
+ if (scan->rrs_base.rs_parallel != NULL)
+ {
+ bpscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel;
+ startblock = bpscan->phs_startblock;
+ }
+ else
+ startblock = scan->rrs_startblock;
+
+ /*
+ * Might have wrapped around the end of the relation, if startblock was
+ * not zero.
+ */
+ if (scan->rrs_cblock > startblock)
+ blocks_done = scan->rrs_cblock - startblock;
+ else
+ {
+ BlockNumber nblocks;
+
+ nblocks = bpscan != NULL ? bpscan->phs_nblocks : scan->rrs_nblocks;
+ blocks_done = nblocks - startblock + scan->rrs_cblock;
+ }
+
+ /* If we're also done with the current block, add one. */
+ if (scan->rrs_state == ROBERT_SCAN_BLOCK_DONE)
+ blocks_done++;
+
+ return blocks_done;
+}
+
+/*
+ * robert_index_fetch_begin
+ *
+ * Prepare for index fetches by allocating a new IndexFetchRobertData.
+ */
+extern IndexFetchTableData *
+robert_index_fetch_begin(Relation rel)
+{
+ IndexFetchRobertData *scan = palloc0(sizeof(IndexFetchRobertData));
+
+ scan->xs_base.rel = rel;
+ scan->xs_cbuf = InvalidBuffer;
+
+ return &scan->xs_base;
+}
+
+/*
+ * robert_index_fetch_reset
+ *
+ * Release any buffer pin held from a previous index fetch.
+ */
+void
+robert_index_fetch_reset(IndexFetchTableData *data)
+{
+ IndexFetchRobertData *scan = (IndexFetchRobertData *) data;
+
+ if (BufferIsValid(scan->xs_cbuf))
+ {
+ ReleaseBuffer(scan->xs_cbuf);
+ scan->xs_cbuf = InvalidBuffer;
+ }
+}
+
+/*
+ * robert_index_fetch_end
+ *
+ * Clean up when finished with index fetches.
+ */
+void
+robert_index_fetch_end(IndexFetchTableData *data)
+{
+ robert_index_fetch_reset(data);
+ pfree(data);
+}
+
+/*
+ * robert_index_fetch_tuple
+ *
+ * XXX. This function is missing MVCC handling.
+ */
+bool
+robert_index_fetch_tuple(IndexFetchTableData *data,
+ ItemPointer tid, Snapshot snapshot,
+ TupleTableSlot *slot,
+ bool *call_again, bool *all_dead)
+{
+ IndexFetchRobertData *scan = (IndexFetchRobertData *) data;
+ Oid reloid = RelationGetRelid(scan->xs_base.rel);
+ BlockNumber blkno = ItemPointerGetBlockNumber(tid);
+ OffsetNumber offnum = ItemPointerGetOffsetNumber(tid);
+ Page page;
+ bool result = false;
+
+ scan->xs_cbuf =
+ ReleaseAndReadBuffer(scan->xs_cbuf, scan->xs_base.rel, blkno);
+ *call_again = false;
+ page = BufferGetPage(scan->xs_cbuf);
+
+ LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
+ if (offnum >= FirstOffsetNumber && offnum < PageGetMaxOffsetNumber(page))
+ {
+ ItemId iid = PageGetItemId(page, offnum);
+
+ if (ItemIdHasStorage(iid))
+ {
+ RobertTupleHeader td = (RobertTupleHeader) PageGetItem(page, iid);
+ Size len = ItemIdGetLength(iid);
+
+ robert_slot_store(slot, td, len, reloid, blkno, offnum);
+ result = true;
+ }
+ }
+ LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+
+ return result;
+}
+
+/*
+ * robert_scan_bitmap_next_block
+ *
+ * Just as in a sequential scan, we choose to copy the entire page. This is
+ * a more dubious strategy here, because it could turn out to be inefficient
+ * if we're only fetching a single tuple from the page. However, we don't
+ * want to do visibility checks while holding the buffer lock, since that
+ * might involve visiting an arbitrarily large number of undo buffers, so it's
+ * not clear how to do better.
+ *
+ * XXX. It's possible that it would pay to do the visibility checks here
+ * rather than leaving all that work until robert_scan_bitmap_next_tuple
+ * is called; it would likely improve instruction cache locality. It would
+ * also be more complex, so for right now we don't.
+ */
+bool
+robert_scan_bitmap_next_block(TableScanDesc sscan, TBMIterateResult *tbmres)
+{
+ RobertScanDesc scan = (RobertScanDesc) sscan;
+ BlockNumber blkno = tbmres->blockno;
+
+ /*
+ * Ignore any entries past our notion of where the relation ends; it may
+ * have been extended, but any new entries won't be visible to us.
+ */
+ if (blkno >= scan->rrs_nblocks)
+ return false;
+
+ /* Read the new block. */
+ robert_scan_getnextblock(scan, NoMovementScanDirection, blkno);
+
+ /* Reset the tuple index; will be ignored if page is lossy. */
+ scan->rrs_tupindex = 0;
+
+ /* False if block has no tuples; otherwise, true. */
+ return (scan->rrs_state != ROBERT_SCAN_BLOCK_DONE);
+}
+
+/*
+ * robert_scan_bitmap_next_tuple
+ */
+bool
+robert_scan_bitmap_next_tuple(TableScanDesc sscan,
+ TBMIterateResult *tbmres,
+ TupleTableSlot *slot)
+{
+ RobertScanDesc scan = (RobertScanDesc) sscan;
+
+ if (tbmres->ntuples == 0)
+ {
+ /* Page is lossy; just try the next offset. */
+ scan->rrs_coffset = OffsetNumberNext(scan->rrs_coffset);
+ if (scan->rrs_coffset == scan->rrs_lastoffset)
+ return false;
+ }
+ else
+ {
+ /* Page is exact; try next indicated offset. */
+ if (scan->rrs_tupindex >= tbmres->ntuples)
+ return false;
+ scan->rrs_coffset = tbmres->offsets[scan->rrs_tupindex];
+ scan->rrs_tupindex++;
+ }
+
+ robert_scan_getnexttuple(scan, slot);
+ pgstat_count_heap_fetch(scan->rrs_base.rs_rd);
+ return true;
+}
+
+bool
+robert_scan_sample_next_block(TableScanDesc scan,
+ SampleScanState *scanstate)
+{
+ elog(NOTICE, "robert_scan_sample_next_block");
+ return false;
+}
+
+bool
+robert_scan_sample_next_tuple(TableScanDesc scan,
+ SampleScanState *scanstate,
+ TupleTableSlot *slot)
+{
+ elog(NOTICE, "robert_scan_sample_next_tuple");
+ return false;
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * robert_slot.c
+ * Slots for storing Robert tuples. To facilitate inplace updates, we
+ * always copy the tuple rather than pointing to the original buffer,
+ * so this is like HeapTupleTableSlot, not BufferHeapTupleTableSlot,
+ * but with changes because of our differente on-disk tuple format.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/robert/robert_slot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/robert_slot.h"
+#include "access/robert_xlog.h"
+#include "access/undoaccess.h"
+#include "access/undorecord.h"
+#include "access/xact.h"
+#include "storage/procarray.h"
+#include "utils/snapmgr.h"
+
+extern PGDLLIMPORT const TupleTableSlotOps TTSOpsRobert;
+
+static void tts_robert_init(TupleTableSlot *slot);
+static void tts_robert_release(TupleTableSlot *slot);
+static void tts_robert_clear(TupleTableSlot *slot);
+static void tts_robert_getsomeattrs(TupleTableSlot *slot, int natts);
+static Datum tts_robert_getsysattr(TupleTableSlot *slot, int attnum,
+ bool *isnull);
+static void tts_robert_materialize(TupleTableSlot *slot);
+static void tts_robert_copyslot(TupleTableSlot *dstslot,
+ TupleTableSlot *srcslot);
+static HeapTuple tts_robert_copy_heap_tuple(TupleTableSlot *slot);
+static MinimalTuple tts_robert_copy_minimal_tuple(TupleTableSlot *slot);
+
+static bool robert_snapshot_test(Snapshot snapshot, FullTransactionId fxid,
+ CommandId cid, bool xmin_test, uint32 specToken);
+
+const TupleTableSlotOps TTSOpsRobert = {
+ .base_slot_size = sizeof(RobertTupleTableSlot),
+ .init = tts_robert_init,
+ .release = tts_robert_release,
+ .clear = tts_robert_clear,
+ .getsomeattrs = tts_robert_getsomeattrs,
+ .getsysattr = tts_robert_getsysattr,
+ .materialize = tts_robert_materialize,
+ .copyslot = tts_robert_copyslot,
+ .get_heap_tuple = NULL,
+ .get_minimal_tuple = NULL,
+ .copy_heap_tuple = tts_robert_copy_heap_tuple,
+ .copy_minimal_tuple = tts_robert_copy_minimal_tuple
+};
+
+/*
+ * robert_slot_callbacks
+ *
+ * We only use one kind of slot, so this is very simple.
+ */
+const TupleTableSlotOps *
+robert_slot_callbacks(Relation relation)
+{
+ return &TTSOpsRobert;
+}
+
+/*
+ * robert_slot_store
+ *
+ * Create a new robert tuple and store it in a robert slot.
+ *
+ * XXX. It would be nice to optimize this by reusing the previously-allocated
+ * chunk of memory, if there is one and if it's big enough. With the current
+ * design, we'll often free a chunk and then allocate a new chunk of about the
+ * same size. Note that if we did this, tts_robert_release might need to
+ * free any chunk that might be left lying around by tts_robert_clear.
+ */
+void
+robert_slot_store(TupleTableSlot *slot, RobertTupleHeader td,
+ Size len, Oid tableOid, BlockNumber blkno,
+ OffsetNumber offset)
+{
+ RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+ RobertTuple tuple;
+
+ Assert(slot->tts_ops == &TTSOpsRobert);
+ tts_robert_clear(slot);
+
+ tuple = MemoryContextAlloc(slot->tts_mcxt, ROBERTTUPLESIZE + len);
+ tuple->r_len = len;
+ tuple->r_data = (RobertTupleHeader) ((char *) tuple + ROBERTTUPLESIZE);
+ tuple->r_tableOid = tableOid;
+ ItemPointerSet(&tuple->r_self, blkno, offset);
+ memcpy(tuple->r_data, td, len);
+
+ rslot->tuple = tuple;
+ ItemPointerSet(&slot->tts_tid, blkno, offset);
+ rslot->off = td->r_hoff;
+ slot->tts_flags &= ~TTS_FLAG_EMPTY;
+ slot->tts_flags |= TTS_FLAG_SHOULDFREE;
+}
+
+/*
+ * robert_slot_store_visible
+ *
+ * Figure out which version of a tuple is visible and store that version
+ * into a robert slot; then, return true. If no version is visible,
+ * return false.
+ */
+bool
+robert_slot_store_visible(TupleTableSlot *slot, RobertTupleHeader td,
+ Size len, Oid tableOid, BlockNumber blkno,
+ OffsetNumber offset, Snapshot snapshot,
+ bool item_is_dead)
+{
+ UndoRecPtr current_recptr;
+ UnpackedUndoRecord *current = NULL;
+ UnpackedUndoRecord *previous = NULL;
+ bool result = true;
+ UndoRecordFetchContext context;
+
+ /* It's not necessarily aligned, so we use memcpy. */
+ memcpy(¤t_recptr, &td->r_undoptr, sizeof(UndoRecPtr));
+
+ /*
+ * Loop until we find the oldest undo record whose effects are not
+ * visible to us.
+ */
+ BeginUndoFetch(&context);
+ while (current_recptr != InvalidUndoRecPtr)
+ {
+ current = UndoFetchRecord(&context, current_recptr);
+
+ /*
+ * XXX. If there's a specToken, we should be passing that here instead
+ * of passing 0.
+ */
+ if (current == NULL ||
+ robert_snapshot_test(snapshot, current->uur_fxid, current->uur_cid,
+ true, 0))
+ break;
+ current_recptr = current->uur_prevundo;
+ if (previous != NULL)
+ UndoRecordRelease(previous);
+ previous = current;
+ current = NULL;
+ }
+ FinishUndoFetch(&context);
+
+ /*
+ * Release current record, if any. This is the newest undo record whose
+ * effects are visible to our snapshot; we don't need it for anything.
+ */
+ if (current != NULL)
+ {
+ UndoRecordRelease(current);
+ current = NULL;
+ }
+
+ /* Core visibility logic. */
+ /* XXX. This probably ought to store some XID information in the slot. */
+ if (previous == NULL)
+ {
+ /* No undo records, or all changes visible to us. */
+ if (item_is_dead)
+ result = false;
+ else
+ robert_slot_store(slot, td, len, tableOid, blkno, offset);
+ }
+ else
+ {
+ switch (previous->uur_type)
+ {
+ case ROBERT_UNDO_INSERT:
+ /* Insert is not visible. Can't see tuple. */
+ result = false;
+ break;
+ case ROBERT_UNDO_DELETE: /* XXX and non-in-place update */
+ /* Delete is not visible. Can see current tuple. */
+ robert_slot_store(slot, td, len, tableOid, blkno, offset);
+ break;
+#if 0
+ case ROBERT_UNDO_UPDATE_INPLACE:
+ /*
+ * In-place update is not visible. Can see tuple from undo
+ * record.
+ */
+ /* XXX */
+ robert_slot_store(slot,
+ record->uur_tuple.data,
+ record->uur_tuple.len,
+ tableOid, blkno, offset);
+#endif
+ }
+
+ /* Done with previous record. */
+ UndoRecordRelease(previous);
+ previous = NULL;
+ }
+
+ /* If not visible, we are all done. */
+ if (!result)
+ return false;
+
+ /* XXX other stuff like serializability */
+
+ return true;
+}
+
+/*
+ * tts_robert_init
+ *
+ * No special initialization is required when creating a slot.
+ */
+static void
+tts_robert_init(TupleTableSlot *slot)
+{
+ /* nothing */
+}
+
+/*
+ * tts_robert_release
+ *
+ * No special cleanup is required when dropping a slot.
+ */
+static void
+tts_robert_release(TupleTableSlot *slot)
+{
+ /* nothing */
+}
+
+/*
+ * tts_robert_clear
+ */
+static void
+tts_robert_clear(TupleTableSlot *slot)
+{
+ RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+
+ /* Free memory for the tuple if appropriate. */
+ if (TTS_SHOULDFREE(slot))
+ {
+ pfree(rslot->tuple);
+ slot->tts_flags &= ~TTS_FLAG_SHOULDFREE;
+ }
+
+ slot->tts_nvalid = 0;
+ slot->tts_flags |= TTS_FLAG_EMPTY;
+ ItemPointerSetInvalid(&slot->tts_tid);
+ rslot->tuple = NULL;
+ rslot->off = 0;
+}
+
+/*
+ * tts_robert_getsomeattrs
+ */
+static void
+tts_robert_getsomeattrs(TupleTableSlot *slot, int natts)
+{
+ RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+
+ robert_deform_tuple(slot->tts_tupleDescriptor, rslot->tuple,
+ slot->tts_values, slot->tts_isnull, natts,
+ slot->tts_nvalid, &rslot->off);
+ slot->tts_nvalid = natts;
+}
+
+/*
+ * tts_robert_getsysattr
+ *
+ * slot_getsysattr handles tableoid and ctid, so this function need only
+ * handle requests for cmin, cmax, xmin, and xmax. In contrast to the heap,
+ * we store that information in the undo log, not the tuple.
+ */
+static Datum
+tts_robert_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
+{
+ /*
+ * XXX. It's necessary to do undo log lookups to get this information,
+ * and in some cases we may not have it at all. We should cache whatever
+ * we find out in the slot, so that if multiple attributes are requested
+ * (or the same ones are requested more than once?) we don't repeat the
+ * lookups.
+ */
+ return (Datum) 0;
+}
+
+/*
+ * tts_robert_materialize
+ *
+ * Make the contents of this slot independent of any external resources.
+ * The only thing we need to worry about is the possibility that entries in
+ * tts_values might point to data in some other memory context.
+ */
+static void
+tts_robert_materialize(TupleTableSlot *slot)
+{
+ RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+ MemoryContext oldcontext;
+
+ if (TTS_SHOULDFREE(slot))
+ return;
+
+ oldcontext = MemoryContextSwitchTo(slot->tts_mcxt);
+ rslot->tuple = robert_form_tuple(slot->tts_tupleDescriptor,
+ slot->tts_values,
+ slot->tts_isnull);
+ MemoryContextSwitchTo(oldcontext);
+
+ rslot->off = rslot->tuple->r_data->r_hoff;
+ slot->tts_flags |= TTS_FLAG_SHOULDFREE;
+ slot->tts_nvalid = 0;
+}
+
+/*
+ * tts_robert_copyslot
+ *
+ * It's not sufficient to just copy the tts_values and tts_isnull arrays
+ * from the source slot, because any pass-by-reference datums in the source
+ * slot will be stored in that slot's memory context, not ours. So, construct
+ * and store a tuple built from those arrays.
+ */
+static void
+tts_robert_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
+{
+ RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) dstslot;
+ MemoryContext oldcontext;
+
+ tts_robert_clear(dstslot);
+ slot_getallattrs(srcslot);
+
+ oldcontext = MemoryContextSwitchTo(dstslot->tts_mcxt);
+ rslot->tuple = robert_form_tuple(srcslot->tts_tupleDescriptor,
+ srcslot->tts_values,
+ srcslot->tts_isnull);
+ MemoryContextSwitchTo(oldcontext);
+
+ rslot->off = rslot->tuple->r_data->r_hoff;
+ dstslot->tts_flags = (dstslot->tts_flags | TTS_FLAG_SHOULDFREE)
+ & ~TTS_FLAG_EMPTY;
+}
+
+/*
+ * tts_robert_copy_heap_tuple
+ *
+ * Build a heap tuple representing the contents of this slot.
+ */
+static HeapTuple
+tts_robert_copy_heap_tuple(TupleTableSlot *slot)
+{
+ slot_getallattrs(slot);
+
+ return heap_form_tuple(slot->tts_tupleDescriptor,
+ slot->tts_values, slot->tts_isnull);
+}
+
+/*
+ * tts_robert_copy_minimal_tuple
+ *
+ * Build a minimal tuple representing the contents of this slot.
+ */
+static MinimalTuple
+tts_robert_copy_minimal_tuple(TupleTableSlot *slot)
+{
+ slot_getallattrs(slot);
+
+ return heap_form_minimal_tuple(slot->tts_tupleDescriptor,
+ slot->tts_values, slot->tts_isnull);
+}
+
+/*
+ * robert_snapshot_test
+ *
+ * Test whether the effects of a change made by a given XID/CID should be
+ * visible to a process using a certain snapshot.
+ *
+ * 'snapshot' is the snapshot to be used for the visibility test.
+ *
+ * 'xid' and 'cid' are the values for the transaction that made the change.
+ *
+ * 'xmin_test' is true if we are testing a tuple's xmin and false if we are
+ * testing 'xmax'. For many snapshot types this doesn't matter, but for
+ * SNAPSHOT_DIRTY and SNAPSHOT_NON_VACUUMABLE it does.
+ *
+ * 'specToken' is only used for SNAPSHOT_DIRTY.
+ *
+ * Perhaps this function should be given a more generic name and moved to
+ * a location where anyone could use it.
+ */
+static bool
+robert_snapshot_test(Snapshot snapshot, FullTransactionId fxid, CommandId cid,
+ bool xmin_test, uint32 specToken)
+{
+ TransactionId xid = XidFromFullTransactionId(fxid);
+
+ switch (snapshot->snapshot_type)
+ {
+ case SNAPSHOT_MVCC:
+ case SNAPSHOT_HISTORIC_MVCC:
+ /*
+ * For an MVCC snapshot, we should see effects of the current
+ * transaction unless they are from a newer command. We should
+ * see other transactions if they are committed and not in our
+ * MVCC snapshot.
+ */
+ if (TransactionIdIsCurrentTransactionId(xid))
+ {
+ if (snapshot->curcid <= cid)
+ return false;
+ }
+ else if (XidInMVCCSnapshot(xid, snapshot))
+ return false;
+ else if (!TransactionIdDidCommit(xid))
+ return false;
+ break;
+
+ case SNAPSHOT_SELF:
+ /*
+ * We should always see the effects of our own transaction, and
+ * we should see other transactions if they committed.
+ */
+ if (!TransactionIdIsCurrentTransactionId(xid))
+ {
+ if (TransactionIdIsInProgress(xid))
+ return false;
+ else if (!TransactionIdDidCommit(xid))
+ return false;
+ }
+ break;
+
+ case SNAPSHOT_ANY:
+ case SNAPSHOT_TOAST:
+ /*
+ * We should see everything.
+ */
+ break;
+
+ case SNAPSHOT_DIRTY:
+ /*
+ * We should see everything except for transactions that are
+ * aborted. However, if we see an in progress transaction, we
+ * need to stash the XID and possibly the specToken in the
+ * snapshot, due to the awful, hacky way that SnapshotDirty works.
+ */
+ if (!TransactionIdIsCurrentTransactionId(xid))
+ {
+ if (TransactionIdIsInProgress(xid))
+ {
+ if (xmin_test)
+ {
+ snapshot->xmin = xid;
+ snapshot->speculativeToken = specToken;
+ }
+ else
+ snapshot->xmax = xid;
+ }
+ else if (TransactionIdDidCommit(xid))
+ return false;
+ }
+ break;
+
+ case SNAPSHOT_NON_VACUUMABLE:
+ /*
+ * This type of snapshot has assymmetric rules for xmin and xmax.
+ * For xmin, we should see everything except for aborted
+ * transactions. For xmax, we should see only committed XIDs
+ * that precede snapshot->xmin.
+ */
+ if (xmin_test)
+ {
+ if (!TransactionIdIsCurrentTransactionId(xid) &&
+ !TransactionIdIsInProgress(xid) &&
+ !TransactionIdDidCommit(xid))
+ return false;
+ }
+ else
+ {
+ if (TransactionIdIsCurrentTransactionId(xid) ||
+ TransactionIdIsInProgress(xid) ||
+ !TransactionIdDidCommit(xid) ||
+ !TransactionIdPrecedes(xid, snapshot->xmin))
+ return false;
+ }
+ break;
+ }
+
+ return true;
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * robert_tuple.c
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/robert_tuple.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "access/robert_tuple.h"
+#include "access/toast_helper.h"
+#include "executor/tuptable.h"
+#include "utils/expandeddatum.h"
+
+static void robert_fill_padding(RobertTupleHeader td, char **data,
+ char typalign);
+static int robert_tuple_bitmap_size(TupleDesc tupleDesc, bool *isnull);
+
+/*
+ * robert_compute_data_size
+ *
+ * Compute the amount of space needed to store a robert tuple's data. The
+ * caller must pass the amount of space required for the tuple header and
+ * null bitmap via the 'hoff' argument; the returned value is the total space
+ * required to store the tuple.
+ */
+Size
+robert_compute_data_size(TupleDesc tupleDesc, Datum *values, bool *isnull,
+ uint8 hoff)
+{
+ Size data_length = hoff;
+ int i;
+ int numberOfAttributes = tupleDesc->natts;
+
+ Assert(numberOfAttributes <= MaxTupleAttributeNumber);
+
+ for (i = 0; i < numberOfAttributes; i++)
+ {
+ Datum datum;
+ Form_pg_attribute att;
+
+ if (isnull[i])
+ continue;
+
+ datum = values[i];
+ att = TupleDescAttr(tupleDesc, i);
+
+ if (att->attbyval)
+ {
+ /* we store attbyval attributes without alignment padding */
+ data_length += att->attlen;
+ }
+ else if (att->attlen == -1)
+ {
+ Pointer val = DatumGetPointer(datum);
+
+ if (att->attstorage != 'p' && VARATT_CAN_MAKE_SHORT(val))
+ data_length += VARATT_CONVERTED_SHORT_SIZE(val);
+ else if (VARATT_IS_EXTERNAL(val))
+ {
+ if (VARATT_IS_EXTERNAL_EXPANDED(val))
+ {
+ /*
+ * we want to flatten the expanded value so that the
+ * constructed tuple doesn't depend on it
+ */
+ data_length =
+ att_align_nominal(data_length, att->attalign);
+ data_length += EOH_get_flat_size(DatumGetEOHP(datum));
+ }
+ else
+ data_length += VARSIZE_EXTERNAL(datum);
+ }
+ else if (VARATT_IS_SHORT(val))
+ data_length += VARSIZE_SHORT(datum);
+ else
+ {
+ data_length = att_align_nominal(data_length, att->attalign);
+ data_length += VARSIZE(datum);
+ }
+ }
+ else
+ {
+ /* fixed-length passed by reference, and cstrings */
+ data_length = att_align_nominal(data_length, att->attalign);
+ data_length = att_addlength_datum(data_length, att->attlen, datum);
+ }
+ }
+
+ return data_length;
+}
+
+/*
+ * robert_fill_null_bitmap
+ *
+ * We start out by setting all the bits, and then clear those that correspond
+ * to a null attribute.
+ */
+void
+robert_fill_null_bitmap(bits8 *bits, int entries_needed, bool *isnull)
+{
+ int i;
+
+ memset(bits, 0xff, BITMAPLEN(entries_needed));
+
+ for (i = 0; i < entries_needed; ++i)
+ if (isnull[i])
+ bits[i / BITS_PER_BYTE] &= ~(1 << (i % BITS_PER_BYTE));
+}
+
+/*
+ * robert_fill_tuple
+ *
+ * Fill in the contents of a robert tuple from values/isnull arrays. On
+ * entry, the tuple header should already have been initialized, especially
+ * r_hoff. On exit, the tuple data area will have been filled in, and the
+ * r_flags field may also be updated.
+ */
+void
+robert_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull,
+ RobertTupleHeader td, Size len)
+{
+ int i;
+ int numberOfAttributes = tupleDesc->natts;
+ char *data = ((char *) td) + td->r_hoff;
+
+ td->r_flags &= ~ROBERT_HASEXTERNAL;
+
+ for (i = 0; i < numberOfAttributes; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
+ Datum datum = values[i];
+ Size data_length;
+
+ if (isnull[i])
+ continue;
+
+ if (att->attbyval)
+ {
+ switch (att->attlen)
+ {
+ case sizeof(char):
+ {
+ char c = DatumGetChar(datum);
+
+ memcpy(data, &c, sizeof(char));
+ break;
+ }
+ case sizeof(int16):
+ {
+ int16 s = DatumGetInt16(datum);
+
+ memcpy(data, &s, sizeof(int16));
+ break;
+ }
+ case sizeof(int32):
+ {
+ int32 i = DatumGetInt32(datum);
+
+ memcpy(data, &i, sizeof(int32));
+ break;
+ }
+ case sizeof(Datum):
+ memcpy(data, &datum, sizeof(Datum));
+ break;
+ default:
+ elog(ERROR, "unsupported byval length: %d", att->attlen);
+ break;
+ }
+
+ data_length = att->attlen;
+ }
+ else if (att->attlen == -1)
+ {
+ /* varlena */
+ Pointer val = DatumGetPointer(datum);
+
+ if (VARATT_IS_EXTERNAL(val))
+ {
+ if (VARATT_IS_EXTERNAL_EXPANDED(val))
+ {
+ /*
+ * we want to flatten the expanded value so that the
+ * constructed tuple doesn't depend on it
+ */
+ ExpandedObjectHeader *eoh = DatumGetEOHP(datum);
+
+ robert_fill_padding(td, &data, att->attalign);
+ data_length = EOH_get_flat_size(eoh);
+ EOH_flatten_into(eoh, data, data_length);
+ }
+ else
+ {
+ td->r_flags |= ROBERT_HASEXTERNAL;
+ /* no alignment, since it's short by definition */
+ data_length = VARSIZE_EXTERNAL(val);
+ memcpy(data, val, data_length);
+ }
+ }
+ else if (VARATT_IS_SHORT(val))
+ {
+ /* no alignment for short varlenas */
+ data_length = VARSIZE_SHORT(val);
+ memcpy(data, val, data_length);
+ }
+ else if (att->attlen == -1 && att->attstorage != 'p' &&
+ VARATT_CAN_MAKE_SHORT(val))
+ {
+ /* convert to short varlena -- no alignment */
+ data_length = VARATT_CONVERTED_SHORT_SIZE(val);
+ SET_VARSIZE_SHORT(data, data_length);
+ memcpy(data + 1, VARDATA(val), data_length - 1);
+ }
+ else
+ {
+ /* full 4-byte header varlena */
+ robert_fill_padding(td, &data, att->attalign);
+ data_length = VARSIZE(val);
+ memcpy(data, val, data_length);
+ }
+ }
+ else if (att->attlen == -2)
+ {
+ /* cstring ... never needs alignment */
+ Assert(att->attalign == 'c');
+ data_length = strlen(DatumGetCString(datum)) + 1;
+ memcpy(data, DatumGetPointer(datum), data_length);
+ }
+ else
+ {
+ /* fixed-length pass-by-reference */
+ robert_fill_padding(td, &data, att->attalign);
+ Assert(att->attlen > 0);
+ data_length = att->attlen;
+ memcpy(data, DatumGetPointer(datum), data_length);
+ }
+
+ data += data_length;
+ }
+
+ Assert(data == ((char *) td) + len);
+}
+
+/*
+ * robert_form_tuple
+ *
+ * Construct a tuple from the given values[] and isnull[] arrays, which are of
+ * the length indicated by tupleDesc->natts.
+ *
+ * The result is allocated in the current memory context.
+ */
+RobertTuple
+robert_form_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull)
+{
+ RobertTuple tuple; /* return tuple */
+ RobertTupleHeader td;
+ Size len;
+ uint8 hoff;
+ int nullBitmapEntriesNeeded;
+
+ /*
+ * Compute required space. Note that, unlike the heap, there is no
+ * padding between the end of the null bitmap and the beginning of the
+ * data.
+ */
+ nullBitmapEntriesNeeded = robert_tuple_bitmap_size(tupleDesc, isnull);
+ hoff = offsetof(RobertTupleHeaderData, r_bits)
+ + BITMAPLEN(nullBitmapEntriesNeeded);
+ len = robert_compute_data_size(tupleDesc, values, isnull, hoff);
+
+ /* Allocate space. */
+ tuple = palloc(ROBERTTUPLESIZE + len);
+ td = (RobertTupleHeader) ((char *) tuple + ROBERTTUPLESIZE);
+
+ /* Fill in control information. */
+ tuple->r_len = len;
+ tuple->r_data = td;
+ tuple->r_tableOid = InvalidOid;
+ ItemPointerSetInvalid(&tuple->r_self);
+
+ /* Fill in tuple header information. */
+ td->r_undoptr = InvalidUndoRecPtr;
+ Assert(tupleDesc->natts <= ROBERT_NATTS_MASK);
+ td->r_flags = tupleDesc->natts;
+ td->r_hoff = hoff;
+
+ /* Fill null bitmap. */
+ robert_fill_null_bitmap(td->r_bits, nullBitmapEntriesNeeded, isnull);
+
+ /* Fill tuple data, maybe adjusting flags. */
+ robert_fill_tuple(tupleDesc, values, isnull, td, len);
+
+ return tuple;
+}
+
+/*
+ * robert_deform_tuple
+ *
+ * Partially or completely deform a robert tuple using the provided tupleDesc.
+ * Attribute numbers greater than or equal to oldnatts and less than natts
+ * are deformed. *offp must point to the starting location of the first
+ * attribute to be deformed.
+ *
+ * The idea is that callers will initially pass oldnatts = 0 and *offp =
+ * tuple->r_hoff. If, later, more columns are to be deformed, the previous
+ * value of natts should be passed as oldnatts, and *offp should have the value
+ * to which it was set by the previous call.
+ */
+void
+robert_deform_tuple(TupleDesc tupleDesc, RobertTuple tuple, Datum *values,
+ bool *isnull, int natts, int oldnatts, uint32 *offp)
+{
+ RobertTupleHeader tup = tuple->r_data;
+ int attnum;
+ uint32 off = *offp;
+ bits8 *bp = tup->r_bits;
+ int null_limit;
+
+ /* We can only fetch as many attributes as the tuple has. */
+ natts = Min((tuple->r_data->r_flags & ROBERT_NATTS_MASK), natts);
+
+ /* Attributes beyond the end of the null bitmap are not null. */
+ null_limit = (tup->r_hoff - SizeofRobertTupleHeader) * BITS_PER_BYTE;
+
+ for (attnum = oldnatts; attnum < natts; attnum++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, attnum);
+ Datum *value = &values[attnum];
+ char *tp = (char *) tup + off;
+
+ if (attnum < null_limit && att_isnull(attnum, bp))
+ {
+ *value = (Datum) 0;
+ isnull[attnum] = true;
+ continue;
+ }
+
+ isnull[attnum] = false;
+
+ if (att->attbyval)
+ {
+ switch (att->attlen)
+ {
+ case sizeof(char):
+ {
+ char c;
+
+ memcpy(&c, tp, sizeof(char));
+ *value = CharGetDatum(c);
+ break;
+ }
+ case sizeof(int16):
+ {
+ int16 s;
+
+ memcpy(&s, tp, sizeof(int16));
+ *value = Int16GetDatum(s);
+ break;
+ }
+ case sizeof(int32):
+ {
+ int32 i;
+
+ memcpy(&i, tp, sizeof(int32));
+ *value = Int32GetDatum(i);
+ break;
+ }
+ case sizeof(Datum):
+ memcpy(value, tp, sizeof(Datum));
+ break;
+ default:
+ elog(ERROR, "unsupported byval length: %d", att->attlen);
+ break;
+ }
+
+ off += att->attlen;
+ }
+ else if (att->attlen == -1)
+ {
+ if (VARATT_NOT_PAD_BYTE(tp))
+ {
+ /* potentially unaligned varlena */
+ *value = PointerGetDatum(tp);
+ off += VARSIZE_ANY(*value);
+ }
+ else
+ {
+ /* we have at least one pad byte, so must be aligned varlena */
+ off = att_align_nominal(off, att->attalign);
+ tp = (char *) tup + off;
+ *value = PointerGetDatum(tp);
+ off += VARSIZE(*value);
+ }
+ }
+ else if (att->attlen == -2)
+ {
+ /* cstring */
+ Assert(att->attalign == 'c');
+ *value = PointerGetDatum(tp);
+ off += strlen(tp);
+ }
+ else
+ {
+ /* fixed-length pass-by-reference; skip any pad bytes */
+ Assert(att->attlen > 0);
+ off = att_align_nominal(off, att->attalign);
+ tp = (char *) tup + off;
+ *value = PointerGetDatum(tp);
+ off += att->attlen;
+ }
+ }
+
+ /* Update caller-provided offset. */
+ *offp = off;
+}
+
+/*
+ * robert_toast_tuple
+ *
+ * Compress tuple attributes or store them externally as necessary to get the
+ * size of the tuple down to something acceptable.
+ *
+ * This function uses MaxHeapAttributeNumber as the maximum number of columns
+ * in the tuple to be inserted. That limit is actually sensible because our
+ * storage format and heap have similar constraints; however, even if they
+ * didn't, this is used by other parts of the system as if it were a global
+ * limit rather than an AM-specific property. Perhaps that can be cleaned up
+ * someday.
+ */
+RobertTuple
+robert_toast_tuple(Relation rel, TupleTableSlot *slot, int options)
+{
+ ToastTupleContext ttc;
+ TupleDesc tupleDesc = slot->tts_tupleDescriptor;
+ int natts = tupleDesc->natts;
+ Datum toast_values[MaxHeapAttributeNumber];
+ ToastAttrInfo toast_attr[MaxHeapAttributeNumber];
+ uint8 hoff;
+ Size maxLen;
+ bool done;
+ bool for_compression = true;
+ RobertTuple result;
+
+ /* Get all tuple attributes. */
+ slot_getallattrs(slot);
+
+ /*
+ * If this is neither a relation nor a materialized view, it should not
+ * require any TOAST work; it's presumably a TOAST table. We can save
+ * some overhead by forming and returning the necessary tuple at once.
+ */
+ if (rel->rd_rel->relkind != RELKIND_RELATION &&
+ rel->rd_rel->relkind != RELKIND_MATVIEW)
+ {
+ RobertTuple tuple;
+
+ tuple = robert_form_tuple(tupleDesc, slot->tts_values,
+ slot->tts_isnull);
+ Assert((tuple->r_data->r_flags & ROBERT_HASEXTERNAL) == 0);
+ return tuple;
+ }
+
+ /*
+ * We're going to scribble on the values array, so copy it into our
+ * scratch space. The tts_isnull array will not be changed, so we don't
+ * need to copy it.
+ */
+ Assert(natts <= MaxHeapAttributeNumber);
+ memcpy(toast_values, slot->tts_values, sizeof(Datum) * natts);
+
+ /* Prepare for toasting. */
+ ttc.ttc_rel = rel;
+ ttc.ttc_values = toast_values;
+ ttc.ttc_isnull = slot->tts_isnull;
+ ttc.ttc_oldvalues = NULL;
+ ttc.ttc_oldisnull = NULL;
+ ttc.ttc_toastrel = NULL;
+ ttc.ttc_toastslot = NULL;
+ ttc.ttc_attr = toast_attr;
+ toast_tuple_init(&ttc);
+
+ /* Compute header overhead. */
+ hoff = offsetof(RobertTupleHeaderData, r_bits);
+ if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
+ {
+ int nullBitmapEntriesNeeded;
+
+ nullBitmapEntriesNeeded =
+ robert_tuple_bitmap_size(slot->tts_tupleDescriptor,
+ slot->tts_isnull);
+ hoff += BITMAPLEN(nullBitmapEntriesNeeded);
+ }
+
+ /*
+ * Compute maximum tuple length. It's not really correct to use
+ * TOAST_TUPLE_TARGET here, because that's a heap-specific property, but
+ * it's also not exactly clear what value would be better.
+ */
+ maxLen = RelationGetToastTupleTarget(rel, TOAST_TUPLE_TARGET);
+
+ /*
+ * Compress attributes with attstorage 'x', and store large attributes
+ * with attstorage 'x' or 'e' externally. If that isn't enough, make
+ * additional attributes with attstorage 'x' or 'e' external. (However,
+ * if there's no TOAST table, then we can't make anything external.)
+ */
+ while (1)
+ {
+ int biggest_attno;
+
+ /* See whether the tuple will be too large. */
+ done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+ slot->tts_isnull, hoff);
+ if (done)
+ break;
+
+ /*
+ * Find the largest attribute with attstorage 'x' or 'e'. If
+ * for_compression is true, it must also be potentially compressible.
+ */
+ biggest_attno =
+ toast_tuple_find_biggest_attribute(&ttc, for_compression, false);
+ if (biggest_attno < 0)
+ {
+ /*
+ * No suitable attribute was found. If we were looking for
+ * compressible attributes, we can still try looking for
+ * non-compresesable attributes, provided that we have a TOAST
+ * table to which to push them.
+ */
+ if (!for_compression || !OidIsValid(rel->rd_rel->reltoastrelid))
+ break;
+ for_compression = false;
+ break;
+ }
+
+ if (for_compression)
+ {
+ /* attempt to compress it inline if it has attstorage 'x' */
+ if (TupleDescAttr(tupleDesc, biggest_attno)->attstorage == 'x')
+ toast_tuple_try_compression(&ttc, biggest_attno);
+ else
+ {
+ /* attstorage 'e', so flag incompressible */
+ toast_attr[biggest_attno].tai_colflags |=
+ TOASTCOL_INCOMPRESSIBLE;
+ }
+
+ /*
+ * If it's really big, push it out to the TOAST table immediately.
+ * This avoids uselessly compressing other fileds in the common
+ * case where we have one long field and several short ones.
+ */
+ if (toast_attr[biggest_attno].tai_size > maxLen &&
+ OidIsValid(rel->rd_rel->reltoastrelid))
+ toast_tuple_externalize(&ttc, biggest_attno, options,
+ TOAST_MAX_CHUNK_SIZE);
+ }
+ else
+ {
+ /*
+ * XXX. We really should not be using TOAST_MAX_CHUNK_SIZE here,
+ * since that is a heap-specific value.
+ */
+ toast_tuple_externalize(&ttc, biggest_attno, options,
+ TOAST_MAX_CHUNK_SIZE);
+ }
+ }
+
+ /* Try compressing attributes with attstorage 'm.' */
+ while (!done)
+ {
+ int biggest_attno;
+
+ biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, true);
+ if (biggest_attno < 0)
+ break;
+ toast_tuple_try_compression(&ttc, biggest_attno);
+
+ /* See whether we've sufficiently shrunk the tuple. */
+ done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+ slot->tts_isnull, hoff);
+ }
+
+ /*
+ * If we're not yet done and if there is a TOAST table, we can try storing
+ * attributes with attstorage 'm' externally.
+ */
+ if (OidIsValid(rel->rd_rel->reltoastrelid))
+ {
+ /* Only do this if it's *really* necessary. */
+ maxLen = TOAST_TUPLE_TARGET_MAIN;
+
+ while (!done)
+ {
+ int biggest_attno;
+
+ biggest_attno =
+ toast_tuple_find_biggest_attribute(&ttc, false, true);
+ if (biggest_attno < 0)
+ break;
+ toast_tuple_try_compression(&ttc, biggest_attno);
+
+ /* See whether we've sufficiently shrunk the tuple. */
+ done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+ slot->tts_isnull, hoff);
+ }
+ }
+
+ /*
+ * XXX. If the source slot is a robert tuple and no changes got made, we
+ * can optimize this.
+ */
+ result = robert_form_tuple(tupleDesc, toast_values, slot->tts_isnull);
+
+ toast_tuple_cleanup(&ttc, true);
+
+ return result;
+}
+
+/*
+ * robert_print_tuple
+ *
+ * Dump a tuple as a printable string. The caller may pfree the returned
+ * string, if desired.
+ *
+ * If the tupleDesc is passed as NULL, the data portion of the tuple will be
+ * dumped as one long string of bytes. Otherwise, the tupleDesc will be
+ * used to deform the tuple, and the bytes for each attribute will be dumped
+ * individually.
+ *
+ * This is intended for debugging purposes. Where it's not excessively
+ * burdensome to do so, we try to guard against the possibility that this
+ * function might be passed a corrupt tuple; instead, we try to produce
+ * some kind of meaningful text representation and let the user sort it out.
+ * This is not perfect; a defective tuple can certainly cause a crash here,
+ * especially if tupleDesc is not NULL, but it helps.
+ */
+char *
+robert_print_tuple(RobertTuple tuple, TupleDesc tupleDesc)
+{
+ RobertTupleHeader td = tuple->r_data;
+ unsigned char *s = (unsigned char *) td;
+ uint32 offp = 0;
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+
+ /* Decode tuple header. */
+ if (tuple->r_len >= SizeofRobertTupleHeader)
+ {
+ UndoLogNumber ulogno = UndoRecPtrGetLogNo(td->r_undoptr);
+ UndoLogOffset uoffset = UndoRecPtrGetOffset(td->r_undoptr);
+
+ appendStringInfo(&buf, "undo:%06X.%010" INT64_MODIFIER
+ "X flags:%04X hoff:%u", ulogno, uoffset,
+ td->r_flags, td->r_hoff);
+ offp = SizeofRobertTupleHeader;
+ }
+
+ /* Decode any null bitmap. */
+ if (tuple->r_len >= td->r_hoff && td->r_hoff > SizeofRobertTupleHeader)
+ {
+ Size nullbytes = td->r_hoff - SizeofRobertTupleHeader;
+ Size i;
+
+ appendStringInfoString(&buf, " nulls:");
+ for (i = 0; i < nullbytes; ++i)
+ appendStringInfo(&buf, "%02x", s[i]);
+ offp = td->r_hoff;
+ }
+
+ /* Decode attributes, if a tupleDesc was provided. */
+ if (tupleDesc != NULL)
+ {
+ Datum *values;
+ bool *isnull;
+ int attnum;
+
+ values = palloc(tupleDesc->natts * sizeof(Datum));
+ isnull = palloc(tupleDesc->natts * sizeof(bool));
+
+ /*
+ * Normally, we want to deform all attributes at once for efficiency,
+ * but here we deform the one by one so that we can learn the byte
+ * position of each attribute in the tuple.
+ */
+ for (attnum = 0; attnum < tupleDesc->natts; ++attnum)
+ {
+ uint32 prev_offp = offp;
+ Size i;
+
+ robert_deform_tuple(tupleDesc, tuple, values, isnull, attnum + 1,
+ attnum, &offp);
+ if (isnull[attnum])
+ continue;
+ appendStringInfo(&buf, " %d(%d):", attnum + 1, offp - prev_offp);
+ for (i = prev_offp; i < offp; ++i)
+ appendStringInfo(&buf, "%02x", s[i]);
+ }
+
+ pfree(values);
+ pfree(isnull);
+ }
+
+ /* Dump any remaining bytes. Unexpected if a tupleDesc was specified. */
+ if (offp < tuple->r_len)
+ {
+ Size i;
+
+ appendStringInfoString(&buf, " rest:");
+ for (i = offp; i < tuple->r_len; ++i)
+ appendStringInfo(&buf, "%02x", s[i]);
+ }
+
+ return buf.data;
+}
+
+/*
+ * robert_insert_padding
+ *
+ * Insert enough padding into a tuple so that the next field will start
+ * aligned on an appropriate boundary. Update *data to point to the first
+ * byte following the inserted padding bytes.
+ */
+static void
+robert_fill_padding(RobertTupleHeader td, char **data, char typalign)
+{
+ char *start = (char *) td;
+ int curbytes = *data - start;
+ int padbytes = att_align_nominal(curbytes, typalign) - curbytes;
+
+ memset(*data, 0, padbytes);
+ *data += padbytes;
+}
+
+/*
+ * robert_tuple_bitmap_size
+ *
+ * Work out the number of entries that we'll need for the null bitmap. Note
+ * that the return value is the number of entries, not the number of bytes.
+ */
+static int
+robert_tuple_bitmap_size(TupleDesc tupleDesc, bool *isnull)
+{
+ int nullBitmapEntriesNeeded = 0;
+ int i;
+
+ /*
+ * We'll only store enough bytes in the null bitmap to represent the nulls
+ * actually present in the tuple, so we need to find the last null actually
+ * present.
+ */
+ for (i = tupleDesc->natts; i >= 0; i--)
+ {
+ if (isnull[i])
+ {
+ nullBitmapEntriesNeeded = i + 1;
+ break;
+ }
+ }
+
+ return nullBitmapEntriesNeeded;
+}
--- /dev/null
+/*
+ * robert_xlog.c
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "access/robert_xlog.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "utils/rel.h"
+
+static void robert_undo_buffer(Buffer buf, int nrecords, UndoRecInfo *records);
+
+void
+robert_redo(XLogReaderState *record)
+{
+}
+
+/*
+ * robert_undo
+ *
+ * Toplevel undo handler for this AM.
+ */
+void
+robert_undo(int nrecords, UndoRecInfo *records)
+{
+ Relation rel = NULL;
+ int i,
+ j;
+
+ Assert(nrecords > 0);
+
+ for (i = 0; i < nrecords; i = j)
+ {
+ UnpackedUndoRecord *record = records[i].uur;
+ Buffer buf;
+
+ /*
+ * If this undo record is for the same relation as the previous undo
+ * record, we just keep the same relation open. If it's for a
+ * different relation, close the old one and open the new one.
+ *
+ * XXX. It might be better to keep the lock on the old relation if
+ * there's more undo pending for that relation in this transaction,
+ * to avoid deadlock risk. But how would we arrange to close the
+ * relation at the end? Also, if this is the last chunk of undo then
+ * there shouldn't be any more records for this relation, so it would
+ * probably be better to release the lock right away. And also, if
+ * we're already holding on to tons of locks, it might be better to
+ * drop some of them to free up space in the lock table so we don't
+ * just fail. For now, do the easy thing.
+ */
+ if (rel == NULL || RelationGetRelid(rel) != record->uur_reloid)
+ {
+ elog(LOG, "robert_undo: select relation %u", record->uur_reloid);
+ if (rel != NULL)
+ relation_close(rel, RowExclusiveLock);
+ rel = relation_open(record->uur_reloid, RowExclusiveLock);
+ }
+
+ /* Figure out where the records for this block stop. */
+ for (j = i + 1; j < nrecords; ++j)
+ {
+ UnpackedUndoRecord *otherrec = records[j].uur;
+
+ /*
+ * XXX. This should also check uur_dbid, but right now that doesn't
+ * work properly.
+ */
+ if (record->uur_reloid != otherrec->uur_reloid ||
+ record->uur_fork != otherrec->uur_fork ||
+ record->uur_block != otherrec->uur_block)
+ break;
+ }
+
+ /* Read the target block. */
+ elog(LOG, "robert_undo: block %u records start at %d, end at %d",
+ record->uur_block, i, j);
+ buf = ReadBuffer(rel, record->uur_block);
+
+ /*
+ * Process all records for the target block. This function also
+ * releases the pin.
+ */
+ robert_undo_buffer(buf, j - i, records + i);
+ }
+
+ /* Clean up. */
+ if (rel != NULL)
+ relation_close(rel, RowExclusiveLock);
+}
+
+/*
+ * robert_undo_buffer
+ *
+ * Apply a group of undo actions that all relate to a particular buffer.
+ * On entry, the buffer should be pinned; this function will release the
+ * pin before returning.
+ */
+static void
+robert_undo_buffer(Buffer buf, int nrecords, UndoRecInfo *records)
+{
+ Page page = BufferGetPage(buf);
+ int i;
+
+ /* Take an exclusive lock on the buffer. */
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * XXX. This is an obvious crock that only works for inserts. For a
+ * an in-place update, we'd need to fetch the old tuple from undo and
+ * put it back. For a non-in-place update or delete, we only need to
+ * remove the delete-mark unless the page has been pruned.
+ */
+ for (i = 0; i < nrecords; ++i)
+ {
+ UnpackedUndoRecord *record = records[i].uur;
+ ItemId iid = PageGetItemId((PageHeader) page, record->uur_offset);
+
+ if (record->uur_type == ROBERT_UNDO_INSERT)
+ ItemIdSetDead(iid);
+ else if (record->uur_type == ROBERT_UNDO_DELETE)
+ ItemIdSetNormal(iid, ItemIdGetOffset(iid),
+ ItemIdGetLength(iid));
+ else
+ elog(NOTICE, "robert_undo: unknown type %d", record->uur_type);
+ }
+
+ /* All done. Note that caller expects us to drop the pin. */
+ UnlockReleaseBuffer(buf);
+}
+
+void
+robert_undo_desc(StringInfo buf, UnpackedUndoRecord *record)
+{
+ elog(LOG, "robert_undo_desc");
+}
--- /dev/null
+/*
+ * robertam.c
+ */
+
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "access/robertam.h"
+#include "access/robert_scan.h"
+#include "access/robert_slot.h"
+#include "utils/builtins.h"
+
+extern Datum robert_tableam_handler(PG_FUNCTION_ARGS);
+
+static const TableAmRoutine robertam_methods = {
+ .type = T_TableAmRoutine,
+
+ .slot_callbacks = robert_slot_callbacks,
+
+ .scan_begin = robert_scan_begin,
+ .scan_end = robert_scan_end,
+ .scan_rescan = robert_scan_rescan,
+ .scan_getnextslot = robert_scan_getnextslot,
+
+ .parallelscan_estimate = table_block_parallelscan_estimate,
+ .parallelscan_initialize = table_block_parallelscan_initialize,
+ .parallelscan_reinitialize = table_block_parallelscan_reinitialize,
+
+ .index_fetch_begin = robert_index_fetch_begin,
+ .index_fetch_reset = robert_index_fetch_reset,
+ .index_fetch_end = robert_index_fetch_end,
+ .index_fetch_tuple = robert_index_fetch_tuple,
+
+ .tuple_fetch_row_version = robert_tuple_fetch_row_version,
+ .tuple_tid_valid = robert_tuple_tid_valid,
+ .tuple_get_latest_tid = robert_tuple_get_latest_tid,
+ .tuple_satisfies_snapshot = robert_tuple_satisfies_snapshot,
+ .compute_xid_horizon_for_tuples = robert_compute_xid_horizon_for_tuples,
+
+ .tuple_insert = robert_tuple_insert,
+ .tuple_insert_speculative = robert_tuple_insert_speculative,
+ .tuple_complete_speculative = robert_tuple_complete_speculative,
+ .multi_insert = robert_multi_insert,
+ .tuple_delete = robert_tuple_delete,
+ .tuple_update = robert_tuple_update,
+ .tuple_lock = robert_tuple_lock,
+ .finish_bulk_insert = robert_finish_bulk_insert,
+
+ .relation_set_new_filenode = robert_relation_set_new_filenode,
+ .relation_nontransactional_truncate = robert_relation_nontransactional_truncate,
+ .relation_copy_data = robert_relation_copy_data,
+ .relation_copy_for_cluster = robert_relation_copy_for_cluster,
+ .relation_vacuum = robert_relation_vacuum,
+ .scan_analyze_next_block = robert_scan_analyze_next_block,
+ .scan_analyze_next_tuple = robert_scan_analyze_next_tuple,
+ .index_build_range_scan = robert_index_build_range_scan,
+ .index_validate_scan = robert_index_validate_scan,
+
+ .relation_size = table_block_relation_size,
+ .relation_needs_toast_table = robert_relation_needs_toast_table,
+ .relation_toast_am = robert_relation_toast_am,
+ .toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE,
+
+ .relation_estimate_size = robert_relation_estimate_size,
+
+ .scan_bitmap_next_block = robert_scan_bitmap_next_block,
+ .scan_bitmap_next_tuple = robert_scan_bitmap_next_tuple,
+
+ .scan_sample_next_block = robert_scan_sample_next_block,
+ .scan_sample_next_tuple = robert_scan_sample_next_tuple
+};
+
+Datum
+robert_tableam_handler(PG_FUNCTION_ARGS)
+{
+ PG_RETURN_POINTER(&robertam_methods);
+}
+
+
+bool
+robert_tuple_fetch_row_version(Relation rel, ItemPointer tid,
+ Snapshot snapshot, TupleTableSlot *slot)
+{
+ return false;
+}
+
+bool
+robert_tuple_tid_valid(TableScanDesc sscan, ItemPointer tid)
+{
+ RobertScanDesc scan = (RobertScanDesc) sscan;
+
+ return ItemPointerIsValid(tid) &&
+ ItemPointerGetBlockNumber(tid) < scan->rrs_nblocks;
+}
+
+void
+robert_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
+{
+}
+
+bool
+robert_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
+ Snapshot snapshot)
+{
+ return false;
+}
+
+TransactionId
+robert_compute_xid_horizon_for_tuples(Relation rel,
+ ItemPointerData *items,
+ int nitems)
+{
+ return InvalidTransactionId;
+}
+
+/*
+ * Check to see whether the table needs a TOAST table. It does only if
+ * (1) there are any toastable attributes, and (2) the maximum length
+ * of a tuple could exceed TOAST_TUPLE_THRESHOLD. (We don't want to
+ * create a toast table for something like "f1 varchar(20)".)
+ */
+bool
+robert_relation_needs_toast_table(Relation rel)
+{
+ int32 data_length = 0;
+ bool maxlength_unknown = false;
+ bool has_toastable_attrs = false;
+ TupleDesc tupdesc = rel->rd_att;
+ int32 tuple_length;
+ int i;
+
+ for (i = 0; i < tupdesc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupdesc, i);
+
+ if (att->attisdropped)
+ continue;
+
+ /* we don't align if it's pass-by-value */
+ if (!att->attbyval)
+ data_length = att_align_nominal(data_length, att->attalign);
+
+ if (att->attlen > 0)
+ {
+ /* Fixed-length types are never toastable */
+ data_length += att->attlen;
+ }
+ else
+ {
+ int32 maxlen = type_maximum_size(att->atttypid,
+ att->atttypmod);
+
+ if (maxlen < 0)
+ maxlength_unknown = true;
+ else
+ data_length += maxlen;
+ if (att->attstorage != 'p')
+ has_toastable_attrs = true;
+ }
+ }
+ if (!has_toastable_attrs)
+ return false; /* nothing to toast? */
+ if (maxlength_unknown)
+ return true; /* any unlimited-length attrs? */
+ tuple_length = SizeofRobertTupleHeader + BITMAPLEN(tupdesc->natts) +
+ data_length;
+ return (tuple_length > TOAST_TUPLE_THRESHOLD);
+}
+
+/*
+ * robert_relation_toast_am
+ *
+ * TOAST tables for robert relations are just robert relations.
+ */
+Oid
+robert_relation_toast_am(Relation rel)
+{
+ return rel->rd_rel->relam;
+}
+
+/*
+ * robert_relation_estimate_size
+ *
+ * Each tuple involves an item pointer and an (unaligned) RobertTupleHeader.
+ *
+ * We don't currently have any special space, so only the size of the page
+ * header needs to be subtracted from the number of usable bytes per page.
+ */
+void
+robert_relation_estimate_size(Relation rel, int32 *attr_widths,
+ BlockNumber *pages, double *tuples,
+ double *allvisfrac)
+{
+ const Size overhead_bytes_per_tuple =
+ SizeofRobertTupleHeader + sizeof(ItemIdData);
+ const Size usable_bytes_per_page = BLCKSZ - SizeOfPageHeaderData;
+
+ table_block_relation_estimate_size(rel, attr_widths, pages, tuples,
+ allvisfrac,
+ overhead_bytes_per_tuple,
+ usable_bytes_per_page);
+}
#include "access/brin_xlog.h"
#include "access/multixact.h"
#include "access/nbtxlog.h"
+#include "access/robert_xlog.h"
#include "access/spgxlog.h"
#include "access/undoaction_xlog.h"
#include "access/undolog_xlog.h"
DecodeLogicalMsgOp(ctx, &buf);
break;
+ case RM_ROBERT_ID:
+ /* XXX. We sure need to do something better here. */
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+ buf.origptr);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
#include "access/multixact.h"
#include "access/nbtxlog.h"
#include "access/rmgr.h"
+#include "access/robert_xlog.h"
#include "access/spgxlog.h"
#include "access/undoaction_xlog.h"
#include "access/undolog_xlog.h"
PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL, NULL)
PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL, NULL)
PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_ROBERT_ID, "Robert", robert_redo, robert_desc, robert_identify, NULL, NULL, NULL, robert_undo, NULL, robert_undo_desc)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * robert_page.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef ROBERT_PAGE_H
+#define ROBERT_PAGE_H
+
+#include "access/robert_tuple.h"
+#include "access/undolog.h"
+#include "storage/block.h"
+#include "storage/itemptr.h"
+#include "storage/off.h"
+
+/* External function prototypes. */
+extern OffsetNumber robert_page_free_offset(Page page, Size size);
+extern void robert_page_add_item(Page page, OffsetNumber offnum,
+ RobertTuple tuple);
+
+#endif
--- /dev/null
+/*
+ * robert_scan.h
+ */
+
+#ifndef ROBERT_SCAN_H
+#define ROBERT_SCAN_H
+
+#include "access/relscan.h"
+#include "access/sdir.h"
+#include "access/skey.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+#include "nodes/tidbitmap.h"
+#include "utils/snapshot.h"
+
+typedef enum RobertScanState
+{
+ ROBERT_SCAN_NOT_STARTED,
+ ROBERT_SCAN_BLOCK_DONE,
+ ROBERT_SCAN_TUPLE_DONE,
+ ROBERT_SCAN_READY
+} RobertScanState;
+
+typedef struct RobertScanDescData
+{
+ TableScanDescData rrs_base; /* AM independent part of the descriptor */
+ BlockNumber rrs_nblocks;
+ BlockNumber rrs_startblock;
+ BlockNumber rrs_numblocks;
+ RobertScanState rrs_state;
+ BlockNumber rrs_cblock;
+ OffsetNumber rrs_coffset;
+ OffsetNumber rrs_lastoffset;
+ int rrs_tupindex;
+ BufferAccessStrategy rrs_strategy;
+ Page rrs_cpage;
+} RobertScanDescData;
+
+typedef RobertScanDescData *RobertScanDesc;
+
+/* Table scans. */
+extern TableScanDesc robert_scan_begin(Relation rel, Snapshot snapshot,
+ int nkeys, ScanKeyData *key,
+ ParallelTableScanDesc pscan, uint32 flags);
+extern void robert_scan_end(TableScanDesc sscan);
+extern void robert_scan_rescan(TableScanDesc sscan, ScanKeyData *key,
+ bool set_params, bool allow_strat,
+ bool allow_sync, bool allow_pagemode);
+extern bool robert_scan_getnextslot(TableScanDesc sscan,
+ ScanDirection direction,
+ TupleTableSlot *slot);
+
+/* Index scans. */
+extern IndexFetchTableData *robert_index_fetch_begin(Relation rel);
+extern void robert_index_fetch_reset(IndexFetchTableData *data);
+extern void robert_index_fetch_end(IndexFetchTableData *data);
+extern bool robert_index_fetch_tuple(IndexFetchTableData *data,
+ ItemPointer tid, Snapshot snapshot,
+ TupleTableSlot *slot,
+ bool *call_again, bool *all_dead);
+
+/* Bitmap scans. */
+extern bool robert_scan_bitmap_next_block(TableScanDesc sscan,
+ TBMIterateResult *tbmres);
+extern bool robert_scan_bitmap_next_tuple(TableScanDesc sscan,
+ TBMIterateResult *tbmres,
+ TupleTableSlot *slot);
+
+/* Sample scans. */
+extern bool robert_scan_sample_next_block(TableScanDesc scan,
+ SampleScanState *scanstate);
+extern bool robert_scan_sample_next_tuple(TableScanDesc scan,
+ SampleScanState *scanstate,
+ TupleTableSlot *slot);
+
+/* Internal functions */
+extern BlockNumber robert_scan_get_blocks_done(RobertScanDesc scan);
+
+#endif
--- /dev/null
+/*
+ * robert_slot.h
+ */
+
+#ifndef ROBERT_SLOT_H
+#define ROBERT_SLOT_H
+
+#include "access/robert_tuple.h"
+#include "executor/tuptable.h"
+#include "utils/rel.h"
+
+typedef struct RobertTupleTableSlot
+{
+ TupleTableSlot base;
+ RobertTuple tuple;
+ uint32 off;
+} RobertTupleTableSlot;
+
+extern PGDLLIMPORT const TupleTableSlotOps TTSOpsRobert;
+
+extern const TupleTableSlotOps *robert_slot_callbacks(Relation relation);
+extern void robert_slot_store(TupleTableSlot *slot, RobertTupleHeader td,
+ Size len, Oid tableOid, BlockNumber blkno,
+ OffsetNumber offset);
+extern bool robert_slot_store_visible(TupleTableSlot *slot,
+ RobertTupleHeader td, Size len,
+ Oid tableOid, BlockNumber blkno,
+ OffsetNumber offset, Snapshot snapshot,
+ bool item_is_dead);
+
+#endif
--- /dev/null
+/*
+ * robert_tuple.h
+ */
+
+#ifndef ROBERT_TUPLE_H
+#define ROBERT_TUPLE_H
+
+#include "access/transam.h"
+#include "access/tupdesc.h"
+#include "access/undolog.h"
+#include "storage/itemptr.h"
+#include "utils/relcache.h"
+
+struct TupleTableSlot;
+
+typedef struct RobertTupleHeaderData
+{
+ UndoRecPtr r_undoptr;
+ uint16 r_flags;
+ uint8 r_hoff;
+ bits8 r_bits[FLEXIBLE_ARRAY_MEMBER];
+} RobertTupleHeaderData;
+
+#define SizeofRobertTupleHeader offsetof(RobertTupleHeaderData, r_bits)
+
+/* Flags for use in r_flags. */
+#define ROBERT_NATTS_MASK 0x07FF /* 11 bits */
+#define ROBERT_HASEXTERNAL 0x0800
+
+typedef RobertTupleHeaderData *RobertTupleHeader;
+
+typedef struct RobertTupleData
+{
+ uint32 r_len;
+ RobertTupleHeader r_data;
+ Oid r_tableOid;
+ ItemPointerData r_self;
+} RobertTupleData;
+
+typedef RobertTupleData *RobertTuple;
+
+#define ROBERTTUPLESIZE MAXALIGN(sizeof(RobertTupleData))
+
+extern Size robert_compute_data_size(TupleDesc tupleDesc, Datum *values,
+ bool *isnull, uint8 hoff);
+extern void robert_fill_null_bitmap(bits8 *bits, int entries_needed,
+ bool *isnull);
+extern void robert_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull,
+ RobertTupleHeader td, Size len);
+extern RobertTuple robert_form_tuple(TupleDesc tupleDesc, Datum *values,
+ bool *isnull);
+extern void robert_deform_tuple(TupleDesc tupleDesc, RobertTuple tuple,
+ Datum *values, bool *isnull, int natts, int oldnatts,
+ uint32 *offp);
+extern RobertTuple robert_toast_tuple(Relation rel,
+ struct TupleTableSlot *slot,
+ int options);
+extern char *robert_print_tuple(RobertTuple tuple, TupleDesc tupleDesc);
+
+#endif
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * robert_xlog.h
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/robert_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ROBERT_XLOG_H
+#define ROBERT_XLOG_H
+
+#include "access/undoaccess.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+#define ROBERT_UNDO_INSERT 1
+#define ROBERT_UNDO_DELETE 2
+
+extern void robert_redo(XLogReaderState *record);
+extern void robert_desc(StringInfo buf, XLogReaderState *record);
+extern const char *robert_identify(uint8 info);
+
+extern void robert_undo(int nrecords, UndoRecInfo *records);
+extern void robert_undo_desc(StringInfo buf, UnpackedUndoRecord *record);
+
+#endif /* ROBERT_XLOG_H */
--- /dev/null
+/*
+ * robertam.h
+ */
+
+#ifndef ROBERTAM_H
+#define ROBERTAM_H
+
+#include "access/hio.h"
+#include "access/skey.h"
+#include "access/tableam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "nodes/execnodes.h"
+
+/* MVCC. */
+extern bool robert_tuple_fetch_row_version(Relation rel, ItemPointer tid,
+ Snapshot snapshot, TupleTableSlot *slot);
+extern bool robert_tuple_tid_valid(TableScanDesc sscan, ItemPointer tid);
+extern void robert_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid);
+extern bool robert_tuple_satisfies_snapshot(Relation rel,
+ TupleTableSlot *slot,
+ Snapshot snapshot);
+extern TransactionId robert_compute_xid_horizon_for_tuples(Relation rel,
+ ItemPointerData *items,
+ int nitems);
+
+/* DML */
+extern void robert_tuple_insert(Relation rel, TupleTableSlot *slot,
+ CommandId cid, int options,
+ BulkInsertStateData *bistate);
+extern void robert_tuple_insert_speculative(Relation rel, TupleTableSlot *slot,
+ CommandId cid, int options,
+ BulkInsertStateData *bistate,
+ uint32 specToken);
+extern void robert_tuple_complete_speculative(Relation rel,
+ TupleTableSlot *slot, uint32 specToken,
+ bool succeeded);
+extern void robert_multi_insert(Relation rel,
+ TupleTableSlot **slots, int nslots,
+ CommandId cid, int options,
+ BulkInsertStateData *bistate);
+extern TM_Result robert_tuple_delete(Relation rel, ItemPointer tid,
+ CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+ bool wait, TM_FailureData *tmfd, bool changingPart);
+extern TM_Result robert_tuple_update(Relation rel, ItemPointer otid,
+ TupleTableSlot *slot, CommandId cid, Snapshot snapshot,
+ Snapshot crosscheck, bool wait, TM_FailureData *tmfd,
+ LockTupleMode *lockmode, bool *update_indexes);
+extern TM_Result robert_tuple_lock(Relation rel, ItemPointer tid,
+ Snapshot snapshot, TupleTableSlot *slot, CommandId cid,
+ LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags,
+ TM_FailureData *tmfd);
+extern void robert_finish_bulk_insert(Relation rel, int options);
+
+
+/* DDL. */
+extern void robert_relation_set_new_filenode(Relation rel,
+ const RelFileNode *newrnode,
+ char persistence,
+ TransactionId *freezeXid,
+ MultiXactId *minmulti);
+extern void robert_relation_nontransactional_truncate(Relation rel);
+extern void robert_relation_copy_data(Relation rel,
+ const RelFileNode *newrnode);
+extern void robert_relation_copy_for_cluster(Relation NewHeap,
+ Relation OldHeap, Relation OldIndex,
+ bool use_sort, TransactionId OldestXmin,
+ TransactionId *xid_cutoff,
+ MultiXactId *multi_cutoff,
+ double *num_tuples, double *tups_vacuumed,
+ double *tups_recently_dead);
+extern void robert_relation_vacuum(Relation onerel, VacuumParams *params,
+ BufferAccessStrategy bstrategy);
+extern bool robert_scan_analyze_next_block(TableScanDesc scan,
+ BlockNumber blockno,
+ BufferAccessStrategy bstrategy);
+extern bool robert_scan_analyze_next_tuple(TableScanDesc scan,
+ TransactionId OldestXmin,
+ double *liverows, double *deadrows,
+ TupleTableSlot *slot);
+extern double robert_index_build_range_scan(Relation heap_rel,
+ Relation index_rel, IndexInfo *index_nfo,
+ bool allow_sync, bool anyvisible, bool progress,
+ BlockNumber start_blockno,
+ BlockNumber end_blockno,
+ IndexBuildCallback callback,
+ void *callback_state,
+ TableScanDesc scan);
+extern void robert_index_validate_scan(Relation heap_rel, Relation index_rel,
+ IndexInfo *index_info, Snapshot snapshot,
+ ValidateIndexState *state);
+
+/* Miscellaneous. */
+extern uint64 robert_relation_size(Relation rel, ForkNumber forkNumber);
+extern bool robert_relation_needs_toast_table(Relation rel);
+extern Oid robert_relation_toast_am(Relation rel);
+
+/* Planner. */
+extern void robert_relation_estimate_size(Relation rel, int32 *attr_widths,
+ BlockNumber *pages, double *tuples,
+ double *allvisfrac);
+
+#endif
{ oid => '3580', oid_symbol => 'BRIN_AM_OID',
descr => 'block range index (BRIN) access method',
amname => 'brin', amhandler => 'brinhandler', amtype => 'i' },
+{ oid => '8192', oid_symbol => 'ROBERT_HEAP_TABLE_AM_OID',
+ descr => 'robert table access method',
+ amname => 'robert', amhandler => 'robert_tableam_handler', amtype => 't' },
]
proname => 'heap_tableam_handler', provolatile => 'v',
prorettype => 'table_am_handler', proargtypes => 'internal',
prosrc => 'heap_tableam_handler' },
+{ oid => '8193', oid_symbol => 'ROBERT_TABLE_AM_HANDLER_OID',
+ descr => 'robert table access method handler',
+ proname => 'robert_tableam_handler', provolatile => 'v',
+ prorettype => 'table_am_handler', proargtypes => 'internal',
+ prosrc => 'robert_tableam_handler' },
# Index access method handlers
{ oid => '330', descr => 'btree index access method handler',
CREATE ACCESS METHOD bogus TYPE TABLE HANDLER bthandler;
ERROR: function bthandler must return type table_am_handler
SELECT amname, amhandler, amtype FROM pg_am where amtype = 't' ORDER BY 1, 2;
- amname | amhandler | amtype
---------+----------------------+--------
- heap | heap_tableam_handler | t
- heap2 | heap_tableam_handler | t
-(2 rows)
+ amname | amhandler | amtype
+--------+------------------------+--------
+ heap | heap_tableam_handler | t
+ heap2 | heap_tableam_handler | t
+ robert | robert_tableam_handler | t
+(3 rows)
-- First create tables employing the new AM using USING
-- plain CREATE TABLE
z_stream
z_streamp
zic_t
+RobertTuple
+RobertTupleData
+RobertTupleHeader
+RobertTupleHeaderData
+RobertTupleTableSlot