Dummy AM for experimentation.

author    Robert Haas <[email protected]>    Thu, 1 Aug 2019 18:56:54 +0000 (14:56 -0400)
committer Robert Haas <[email protected]>    Mon, 5 Aug 2019 13:26:48 +0000 (09:26 -0400)
26 files changed:
src/backend/access/Makefile
src/backend/access/rmgrdesc/Makefile
src/backend/access/rmgrdesc/robertdesc.c [new file with mode: 0644]
src/backend/access/robert/Makefile [new file with mode: 0644]
src/backend/access/robert/robert_ddl.c [new file with mode: 0644]
src/backend/access/robert/robert_dml.c [new file with mode: 0644]
src/backend/access/robert/robert_page.c [new file with mode: 0644]
src/backend/access/robert/robert_scan.c [new file with mode: 0644]
src/backend/access/robert/robert_slot.c [new file with mode: 0644]
src/backend/access/robert/robert_tuple.c [new file with mode: 0644]
src/backend/access/robert/robert_xlog.c [new file with mode: 0644]
src/backend/access/robert/robertam.c [new file with mode: 0644]
src/backend/access/transam/rmgr.c
src/backend/replication/logical/decode.c
src/bin/pg_waldump/rmgrdesc.c
src/include/access/rmgrlist.h
src/include/access/robert_page.h [new file with mode: 0644]
src/include/access/robert_scan.h [new file with mode: 0644]
src/include/access/robert_slot.h [new file with mode: 0644]
src/include/access/robert_tuple.h [new file with mode: 0644]
src/include/access/robert_xlog.h [new file with mode: 0644]
src/include/access/robertam.h [new file with mode: 0644]
src/include/catalog/pg_am.dat
src/include/catalog/pg_proc.dat
src/test/regress/expected/create_am.out
src/tools/pgindent/typedefs.list

diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile
index bf6d3fa1bd05900ccabf71d11b133d26fbf63129..8219b715774a5ac50bdf1f0abd48ade456ee2fe9 100644 (file)
@@ -9,6 +9,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS            = brin common gin gist hash heap index nbtree rmgrdesc spgist \
-                         table tablesample transam undo
+SUBDIRS            = brin common gin gist hash heap index nbtree rmgrdesc robert \
+                         spgist table tablesample transam undo
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index 640d37f37a38d9e16ebd03d9ddec7752cbecde8d..a18dc717cfeab09eaeb4f52e8096f3984c239bf4 100644 (file)
@@ -10,8 +10,8 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
           gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
-          mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
-          smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \
-          undologdesc.o xactdesc.o xlogdesc.o
+          mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o robertdesc.o \
+          seqdesc.o smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o \
+          undoactiondesc.o undologdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/robertdesc.c b/src/backend/access/rmgrdesc/robertdesc.c
new file mode 100644 (file)
index 0000000..5d03008
--- /dev/null
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * robertdesc.c
+ *       rmgr descriptor routines for access/robert
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/access/rmgrdesc/robertdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/robert_xlog.h"
+
+void
+robert_desc(StringInfo buf, XLogReaderState *record)
+{
+#if 0
+       char       *rec = XLogRecGetData(record);
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       info &= XLOG_HEAP_OPMASK;
+#endif
+}
+
+const char *
+robert_identify(uint8 info)
+{
+       const char *id = NULL;
+
+       switch (info & ~XLR_INFO_MASK)
+       {
+               default:
+                       id = "robert";
+       }
+
+       return id;
+}
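
These callbacks only take effect once the resource manager is registered. The commit's change to src/include/access/rmgrlist.h is not shown in this excerpt, but the entry it presumably adds would look roughly like the sketch below: robert_desc and robert_identify come from the file above, while the slot position and the robert_redo name are assumptions, and the undo-enabled branch this commit targets may carry extra PG_RMGR columns beyond stock PostgreSQL 12's eight.

/* Hypothetical rmgrlist.h entry; robert_redo and the ID position are assumed. */
PG_RMGR(RM_ROBERT_ID, "Robert", robert_redo, robert_desc, robert_identify, NULL, NULL, NULL)
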
diff --git a/src/backend/access/robert/Makefile b/src/backend/access/robert/Makefile
new file mode 100644 (file)
index 0000000..1f2f905
--- /dev/null
@@ -0,0 +1,18 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for access/robert
+#
+# IDENTIFICATION
+#    src/backend/access/robert/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/robert
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = robert_ddl.o robert_dml.o robert_page.o robert_scan.o \
+       robert_slot.o robert_tuple.o robert_xlog.o robertam.o
+
+include $(top_srcdir)/src/backend/common.mk
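
robertam.c is listed among the new files but its contents are not shown here. By analogy with heapam_handler.c, the AM would be exposed through a handler function returning its TableAmRoutine, hooked up via the pg_proc.dat and pg_am.dat changes in this commit. A minimal sketch, with robert_tableam_handler and robert_methods as assumed names:

/* Hypothetical skeleton of robertam.c; both names are assumptions. */
#include "postgres.h"

#include "access/tableam.h"
#include "fmgr.h"

static const TableAmRoutine robert_methods = {
	.type = T_TableAmRoutine,
	/* ... callback assignments, e.g. those sketched after robert_ddl.c ... */
};

Datum
robert_tableam_handler(PG_FUNCTION_ARGS)
{
	PG_RETURN_POINTER(&robert_methods);
}
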
diff --git a/src/backend/access/robert/robert_ddl.c b/src/backend/access/robert/robert_ddl.c
new file mode 100644 (file)
index 0000000..14f38b4
--- /dev/null
@@ -0,0 +1,366 @@
+/*
+ * robert_ddl.c
+ */
+
+#include "postgres.h"
+
+#include "access/multixact.h"
+#include "access/robert_scan.h"
+#include "access/robertam.h"
+#include "catalog/catalog.h"
+#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
+#include "commands/progress.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/procarray.h"
+
+/*
+ * robert_relation_set_new_filenode
+ *
+ * We do everything using FullTransactionIds and do not use MultiXactIds, so no
+ * freezing is required.
+ */
+void
+robert_relation_set_new_filenode(Relation rel,
+                                                                const RelFileNode *newrnode,
+                                                                char persistence,
+                                                                TransactionId *freezeXid,
+                                                                MultiXactId *minmulti)
+{
+       SMgrRelation srel;
+
+       /* No freezing required. */
+       *freezeXid = InvalidTransactionId;
+       *minmulti = InvalidMultiXactId;
+
+       /* Create the main fork. */
+       srel = RelationCreateStorage(*newrnode, persistence);
+
+       /* If required, set up an init fork for an unlogged table. */
+       if (persistence == RELPERSISTENCE_UNLOGGED)
+       {
+               smgrcreate(srel, INIT_FORKNUM, false);
+               log_smgrcreate(&srel->smgr_rnode.node, INIT_FORKNUM);
+       }
+}
+
+/*
+ * robert_relation_nontransactional_truncate
+ */
+void
+robert_relation_nontransactional_truncate(Relation rel)
+{
+       RelationTruncate(rel, 0);
+}
+
+void
+robert_relation_copy_data(Relation rel, const RelFileNode *newrnode)
+{
+}
+
+void
+robert_relation_copy_for_cluster(Relation NewHeap,
+                                                                Relation OldHeap, Relation OldIndex,
+                                                                bool use_sort, TransactionId OldestXmin,
+                                                                TransactionId *xid_cutoff,
+                                                                MultiXactId *multi_cutoff,
+                                                                double *num_tuples, double *tups_vacuumed,
+                                                                double *tups_recently_dead)
+{
+}
+
+void
+robert_relation_vacuum(Relation onerel, VacuumParams *params,
+                                          BufferAccessStrategy bstrategy)
+{
+}
+
+bool
+robert_scan_analyze_next_block(TableScanDesc scan,
+                                                          BlockNumber blockno,
+                                                          BufferAccessStrategy bstrategy)
+{
+       return false;
+}
+
+bool
+robert_scan_analyze_next_tuple(TableScanDesc scan,
+                                                          TransactionId OldestXmin,
+                                                          double *liverows, double *deadrows,
+                                                          TupleTableSlot *slot)
+{
+       return false;
+}
+
+/*
+ * robert_index_build_range_scan
+ *
+ * XXX. There is a HUGE amount of duplication with the heap here.
+ */
+double
+robert_index_build_range_scan(Relation heapRelation,
+                                                         Relation indexRelation, IndexInfo *indexInfo,
+                                                         bool allow_sync, bool anyvisible, bool progress,
+                                                         BlockNumber start_blockno,
+                                                         BlockNumber numblocks,
+                                                         IndexBuildCallback callback,
+                                                         void *callback_state,
+                                                         TableScanDesc scan)
+{
+       RobertScanDesc hscan;
+       bool            is_system_catalog;
+       bool            checking_uniqueness;
+       Datum           values[INDEX_MAX_KEYS];
+       bool            isnull[INDEX_MAX_KEYS];
+       double          reltuples;
+       ExprState  *predicate;
+       TupleTableSlot *slot;
+       EState     *estate;
+       ExprContext *econtext;
+       Snapshot        snapshot;
+       bool            need_unregister_snapshot = false;
+       TransactionId OldestXmin;
+       BlockNumber     previous_blkno = InvalidBlockNumber;
+
+       /*
+        * sanity checks
+        */
+       Assert(OidIsValid(indexRelation->rd_rel->relam));
+
+       /* Remember if it's a system catalog */
+       is_system_catalog = IsSystemRelation(heapRelation);
+
+       /* See whether we're verifying uniqueness/exclusion properties */
+       checking_uniqueness = (indexInfo->ii_Unique ||
+                                                  indexInfo->ii_ExclusionOps != NULL);
+
+       /*
+        * "Any visible" mode is not compatible with uniqueness checks; make sure
+        * only one of those is requested.
+        */
+       Assert(!(anyvisible && checking_uniqueness));
+
+       /*
+        * Need an EState for evaluation of index expressions and partial-index
+        * predicates.  Also a slot to hold the current tuple.
+        */
+       estate = CreateExecutorState();
+       econtext = GetPerTupleExprContext(estate);
+       slot = table_slot_create(heapRelation, NULL);
+
+       /* Arrange for econtext's scan tuple to be the tuple under test */
+       econtext->ecxt_scantuple = slot;
+
+       /* Set up execution state for predicate, if any. */
+       predicate = ExecPrepareQual(indexInfo->ii_Predicate, estate);
+
+       /*
+        * Prepare for scan of the base relation.  In a normal index build, we use
+        * SnapshotAny because we must retrieve all tuples and do our own time
+        * qual checks (because we have to index RECENTLY_DEAD tuples). In a
+        * concurrent build, or during bootstrap, we take a regular MVCC snapshot
+        * and index whatever's live according to that.
+        */
+       OldestXmin = InvalidTransactionId;
+
+       /* okay to ignore lazy VACUUMs here */
+       if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+               OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+
+       if (!scan)
+       {
+               /*
+                * Serial index build.
+                *
+                * Must begin our own heap scan in this case.  We may also need to
+                * register a snapshot whose lifetime is under our direct control.
+                */
+               if (!TransactionIdIsValid(OldestXmin))
+               {
+                       snapshot = RegisterSnapshot(GetTransactionSnapshot());
+                       need_unregister_snapshot = true;
+               }
+               else
+                       snapshot = SnapshotAny;
+
+               scan = table_beginscan_strat(heapRelation,      /* relation */
+                                                                        snapshot,      /* snapshot */
+                                                                        0, /* number of keys */
+                                                                        NULL,  /* scan key */
+                                                                        true,  /* buffer access strategy OK */
+                                                                        allow_sync);   /* syncscan OK? */
+       }
+       else
+       {
+               /*
+                * Parallel index build.
+                *
+                * Parallel case never registers/unregisters own snapshot.  Snapshot
+                * is taken from parallel heap scan, and is SnapshotAny or an MVCC
+                * snapshot, based on same criteria as serial case.
+                */
+               Assert(!IsBootstrapProcessingMode());
+               Assert(allow_sync);
+               snapshot = scan->rs_snapshot;
+       }
+
+       hscan = (RobertScanDesc) scan;
+
+       /* Publish number of blocks to scan */
+       if (progress)
+       {
+               BlockNumber             nblocks;
+
+               if (hscan->rrs_base.rs_parallel != NULL)
+               {
+                       ParallelBlockTableScanDesc pbscan;
+
+                       pbscan = (ParallelBlockTableScanDesc) hscan->rrs_base.rs_parallel;
+                       nblocks = pbscan->phs_nblocks;
+               }
+               else
+                       nblocks = hscan->rrs_nblocks;
+
+               pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_TOTAL,
+                                                                        nblocks);
+       }
+
+       /*
+        * Must call GetOldestXmin() with SnapshotAny.  Should never call
+        * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+        * this for parallel builds, since ambuild routines that support parallel
+        * builds must work these details out for themselves.)
+        */
+       Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+       Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+                  !TransactionIdIsValid(OldestXmin));
+       Assert(snapshot == SnapshotAny || !anyvisible);
+
+       /* set our scan endpoints */
+       if (!allow_sync)
+       {
+               hscan->rrs_startblock = start_blockno;
+               hscan->rrs_numblocks = numblocks;
+       }
+       else
+       {
+               /* syncscan can only be requested on whole relation */
+               Assert(start_blockno == 0);
+               Assert(numblocks == InvalidBlockNumber);
+       }
+
+       reltuples = 0;
+
+       /*
+        * Scan all tuples in the base relation.
+        */
+       while (robert_scan_getnextslot(scan, ForwardScanDirection, slot))
+       {
+               bool            tupleIsAlive;
+               HeapTupleData   htdata;
+               BlockNumber     blocks_done = robert_scan_get_blocks_done(hscan);
+
+               CHECK_FOR_INTERRUPTS();
+
+               /* Report scan progress, if asked to. */
+               if (progress && blocks_done != previous_blkno)
+               {
+                       pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
+                                                                                blocks_done);
+                       previous_blkno = blocks_done;
+               }
+
+               /* do our own time qual check */
+               /* XXX. This just completely ignores MVCC considerations. */
+               tupleIsAlive = true;
+               reltuples += 1;
+
+               MemoryContextReset(econtext->ecxt_per_tuple_memory);
+
+               /*
+                * In a partial index, discard tuples that don't satisfy the
+                * predicate.
+                */
+               if (predicate != NULL)
+               {
+                       if (!ExecQual(predicate, econtext))
+                               continue;
+               }
+
+               /*
+                * For the current heap tuple, extract all the attributes we use in
+                * this index, and note which are null.  This also performs evaluation
+                * of any expressions needed.
+                */
+               FormIndexDatum(indexInfo,
+                                          slot,
+                                          estate,
+                                          values,
+                                          isnull);
+
+               /*
+                * Call the AM's callback routine to process the tuple
+                *
+                * You'd think we should go ahead and build the index tuple here, but
+                * some index AMs want to do further processing on the data first.  So
+                * pass the values[] and isnull[] arrays, instead.
+                *
+                * XXX. Why the heck does this callback accept a HeapTuple?
+                */
+               htdata.t_data = NULL;
+               htdata.t_len = 0;
+               ItemPointerCopy(&slot->tts_tid, &htdata.t_self);
+               htdata.t_tableOid = RelationGetRelid(heapRelation);
+               callback(indexRelation, &htdata, values, isnull, tupleIsAlive,
+                                callback_state);
+
+               /* Stop if a block limit was specified and has been reached. */
+               if (numblocks != InvalidBlockNumber && blocks_done >= numblocks)
+                       break;
+       }
+
+       /* Report scan progress one last time. */
+       if (progress)
+       {
+               BlockNumber             blks_done;
+
+               if (hscan->rrs_base.rs_parallel != NULL)
+               {
+                       ParallelBlockTableScanDesc pbscan;
+
+                       pbscan = (ParallelBlockTableScanDesc) hscan->rrs_base.rs_parallel;
+                       blks_done = pbscan->phs_nblocks;
+               }
+               else
+                       blks_done = hscan->rrs_nblocks;
+
+               pgstat_progress_update_param(PROGRESS_SCAN_BLOCKS_DONE,
+                                                                        blks_done);
+       }
+
+       table_endscan(scan);
+
+       /* we can now forget our snapshot, if set and registered by us */
+       if (need_unregister_snapshot)
+               UnregisterSnapshot(snapshot);
+
+       ExecDropSingleTupleTableSlot(slot);
+
+       FreeExecutorState(estate);
+
+       /* These may have been pointing to the now-gone estate */
+       indexInfo->ii_ExpressionsState = NIL;
+       indexInfo->ii_PredicateState = NULL;
+
+       return reltuples;
+}
+
+void
+robert_index_validate_scan(Relation heap_rel, Relation index_rel,
+                                                  IndexInfo *index_info, Snapshot snapshot,
+                                                  ValidateIndexState *state)
+{
+}
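
For orientation, the functions in this file fill the DDL-related slots of the TableAmRoutine. A sketch of the wiring robertam.c presumably contains (the field names are from PostgreSQL 12's tableam.h; the wiring itself is an assumption, since robertam.c is not shown in this excerpt):

/* Presumed designated-initializer entries inside robert_methods. */
.relation_set_new_filenode = robert_relation_set_new_filenode,
.relation_nontransactional_truncate = robert_relation_nontransactional_truncate,
.relation_copy_data = robert_relation_copy_data,
.relation_copy_for_cluster = robert_relation_copy_for_cluster,
.relation_vacuum = robert_relation_vacuum,
.scan_analyze_next_block = robert_scan_analyze_next_block,
.scan_analyze_next_tuple = robert_scan_analyze_next_tuple,
.index_build_range_scan = robert_index_build_range_scan,
.index_validate_scan = robert_index_validate_scan,
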
diff --git a/src/backend/access/robert/robert_dml.c b/src/backend/access/robert/robert_dml.c
new file mode 100644 (file)
index 0000000..258a6b1
--- /dev/null
@@ -0,0 +1,409 @@
+/*
+ * robert_dml.c
+ */
+
+#include "postgres.h"
+
+#include "access/robert_page.h"
+#include "access/robert_slot.h"
+#include "access/robert_xlog.h"
+#include "access/robertam.h"
+#include "access/undoaccess.h"
+#include "access/undorecord.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/freespace.h"
+#include "storage/procarray.h"
+
+typedef enum
+{
+       CHOOSE_BLOCK_WITH_FREESPACE,
+       CHOOSE_LAST_BLOCK,
+       EXTEND_RELATION
+} BufferForTupleStrategy;
+
+static BlockNumber
+robert_choose_block(Relation rel, int len, BufferForTupleStrategy *strategy)
+{
+       BlockNumber             blkno;
+
+       switch (*strategy)
+       {
+               case CHOOSE_BLOCK_WITH_FREESPACE:
+                       blkno = GetPageWithFreeSpace(rel, len + 0.04 * BLCKSZ); /* XXX */
+                       if (blkno != InvalidBlockNumber)
+                               return blkno;
+                       *strategy = CHOOSE_LAST_BLOCK;
+                       /* FALLTHROUGH */
+
+               case CHOOSE_LAST_BLOCK:
+                       blkno = RelationGetNumberOfBlocks(rel);
+                       *strategy = EXTEND_RELATION;
+                       if (blkno > 0)
+                               return blkno - 1;
+                       /* FALLTHROUGH */
+
+               case EXTEND_RELATION:
+                       return P_NEW;
+       }
+
+       pg_unreachable();
+}
+
+/*
+ * Attempt to insert a tuple into a specified block of a relation.
+ *
+ * If the page is too full, either in terms of storage or of line pointers,
+ * then this function will do nothing and return false. Otherwise, it will
+ * insert the tuple and return true.
+ */
+static bool
+robert_tuple_try_insert(Relation rel, BlockNumber blkno, RobertTuple tuple,
+                                               CommandId cid, int options)
+{
+       Buffer          buffer = ReadBuffer(rel, blkno);
+       Page            page = BufferGetPage(buffer);
+       OffsetNumber    offnum;
+       UndoRecordInsertContext context = {{0}};
+       UnpackedUndoRecord      undorecord;
+       FullTransactionId       fxid;
+
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+       /*
+        * Choose an offset number at which to insert this tuple.
+        *
+        * If this is a new page, always use FirstOffsetNumber. Otherwise, let
+        * robert_page_free_offset find us a usable offset number.
+        */
+       if (blkno == P_NEW)
+       {
+               /* Initialize new page. */
+               PageInit(page, BufferGetPageSize(buffer), 0);
+
+               /* Find real page number. */
+               blkno = BufferGetBlockNumber(buffer);
+
+               /* All offsets available, so use the first one. */
+               offnum = FirstOffsetNumber;
+       }
+       else
+       {
+               /* Look for a usable offset. */
+               offnum = robert_page_free_offset(page, tuple->r_len);
+               if (offnum == InvalidOffsetNumber)
+               {
+                       /* This page is not usable for this tuple; e.g. too full. */
+                       UnlockReleaseBuffer(buffer);
+                       return false;
+               }
+       }
+
+       /* Prepare undo record. */
+       fxid = GetTopFullTransactionId();
+       undorecord.uur_rmid = RM_ROBERT_ID;
+       undorecord.uur_type = ROBERT_UNDO_INSERT;
+       undorecord.uur_info = 0;
+       undorecord.uur_reloid = RelationGetRelid(rel);
+       undorecord.uur_cid = GetCurrentCommandId(true);
+       undorecord.uur_fork = MAIN_FORKNUM;
+       undorecord.uur_prevundo = InvalidUndoRecPtr;
+       undorecord.uur_block = blkno;
+       undorecord.uur_offset = offnum;
+       undorecord.uur_fxid = fxid;
+       undorecord.uur_payload.len = 0;
+       undorecord.uur_tuple.len = 0;
+       undorecord.uur_txn = NULL;
+       undorecord.uur_logswitch = NULL;
+
+       /* Prepare to insert the undo record and update the tuple control data. */
+       BeginUndoRecordInsert(&context, UndoLogCategoryForRelation(rel), 1, NULL);
+       tuple->r_tableOid = undorecord.uur_reloid;
+       ItemPointerSet(&tuple->r_self, blkno, offnum);
+       tuple->r_data->r_undoptr = PrepareUndoInsert(&context, &undorecord,
+                                                                                                MyDatabaseId);
+
+       /* Perform the actual insert. */
+       START_CRIT_SECTION();
+       robert_page_add_item(page, offnum, tuple);
+       MarkBufferDirty(buffer);
+       InsertPreparedUndo(&context);
+       END_CRIT_SECTION();
+
+       /* All done. */
+       FinishUndoRecordInsert(&context);
+       UnlockReleaseBuffer(buffer);
+       return true;
+}
+
+/*
+ * robert_tuple_insert
+ *
+ * Insert a single tuple.
+ */
+void
+robert_tuple_insert(Relation rel, TupleTableSlot *slot,
+                                       CommandId cid, int options,
+                                       BulkInsertStateData *bistate)
+{
+       RobertTuple             tuple;
+       bool                    done = false;
+       BufferForTupleStrategy strategy = CHOOSE_BLOCK_WITH_FREESPACE;
+
+       tuple = robert_toast_tuple(rel, slot, options);
+
+       while (!done)
+       {
+               BlockNumber             blkno;
+
+               blkno = robert_choose_block(rel, tuple->r_len, &strategy);
+               done = robert_tuple_try_insert(rel, blkno, tuple, cid, options);
+       }
+
+       elog(NOTICE, "insert %u (%u,%u): %s",
+                RelationGetRelid(rel),
+                ItemPointerGetBlockNumber(&tuple->r_self),
+                ItemPointerGetOffsetNumber(&tuple->r_self),
+                robert_print_tuple(tuple, slot->tts_tupleDescriptor));
+}
+
+void
+robert_tuple_insert_speculative(Relation rel, TupleTableSlot *slot,
+                                                               CommandId cid, int options,
+                                                               BulkInsertStateData *bistate,
+                                                               uint32 specToken)
+{
+}
+
+void
+robert_tuple_complete_speculative(Relation rel, TupleTableSlot *slot,
+                                                                 uint32 specToken, bool succeeded)
+{
+}
+
+void
+robert_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+                                       CommandId cid, int options,
+                                       BulkInsertStateData *bistate)
+{
+}
+
+/*
+ * Check whether it's OK to modify a tuple.
+ *
+ * This function determines whether or not it's OK for us to modify a given
+ * tuple, similar to HeapTupleSatisfiesUpdate.
+ */
+static TM_Result
+robert_check_modify(UndoRecPtr undoptr, Snapshot snapshot, CommandId cid)
+{
+       UndoRecordFetchContext context;
+       UnpackedUndoRecord *uur;
+       TM_Result       result = TM_Ok;
+       /* XXX bool             recheck_for_invisible = false; */
+       TransactionId   uxid;
+       CommandId       ucid;
+       uint8           utype;
+
+       BeginUndoFetch(&context);
+
+       uur = UndoFetchRecord(&context, undoptr);
+       utype = uur->uur_type;
+       uxid = XidFromFullTransactionId(uur->uur_fxid);
+       ucid = uur->uur_cid;
+       UndoRecordRelease(uur);
+
+       if (TransactionIdIsCurrentTransactionId(uxid))
+       {
+               /* XXX. Not always. */
+               result = TM_SelfModified;
+       }
+       else if (TransactionIdIsInProgress(uxid))
+       {
+               /* XXX Do stuff. */
+       }
+       else if (TransactionIdDidCommit(uxid))
+       {
+               /*
+                * XXX. Is the snapshot guaranteed to be an MVCC snapshot?  If not,
+                * what are we supposed to do in that case?
+                */
+               Assert(IsMVCCSnapshot(snapshot));
+               /* XXX. If XID is visible, then do stuff, otherwise different stuff? */
+       }
+       else
+       {
+               /* It must have aborted but not yet been cleaned up. */
+       }
+
+       /*
+        * XXX. We need to look up uur->uur_prevundo here, but we probably
+        * don't want to do that while holding the buffer lock.
+        *
+        * We need an equivalent of HeapTupleSatisfiesUpdate. Some notes on
+        * that:
+        *
+        * If the undo record to which uur->uur_prevundo points doesn't exist any
+        * more or if the XID/CID of the returned record are visible to our
+        * snapshot, then TM_Ok.  If the XID is our XID and the tuple isn't
+        * visible because the CID is too new, then TM_SelfModified (unless we
+        * couldn't see the old version of the row either, in which case
+        * TM_Invisible).  Otherwise, if the XID is still in progress, then
+        * TM_BeingModified (unless we couldn't see the old version of the row
+        * either, in which case TM_Invisible). If it's aborted, then do
+        * page-at-a-time undo and try again.
+        *
+        * Otherwise, it's committed but not visible to our snapshot. In that
+        * case, the answer must be TM_Invisible if the operation was an insert
+        * (and we should throw an error instead of returning TM_Invisible to the
+        * caller), TM_Updated if it was an update, or TM_Deleted if it was a
+        * delete.
+        *
+        * The discussion above only considers insert, update, and delete. If
+        * there were locks, we'd need to return TM_BeingModified if any existing
+        * lock taken by an XID other than our own conflicts with the lock
+        * required by the current operation.
+        */
+
+       return result;
+}
+
+/*
+ * robert_tuple_delete
+ *
+ * Delete a single tuple.
+ */
+TM_Result
+robert_tuple_delete(Relation rel, ItemPointer tid,
+                                       CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+                                       bool wait, TM_FailureData *tmfd, bool changingPart)
+{
+       Buffer          buffer;
+       ItemId          iid;
+       UndoRecordInsertContext context = {{0}};
+       UnpackedUndoRecord      undorecord;
+       FullTransactionId       fxid;
+       UndoRecPtr      undoptr;
+       UndoRecPtr      checked_undoptr = InvalidUndoRecPtr;
+       RobertTupleHeader       td;
+       Page            page;
+
+       /*
+        * Prepare undo record contents, except for the previous undo pointer,
+        * which we can't finalize without taking the buffer lock.
+        */
+       fxid = GetTopFullTransactionId();
+       undorecord.uur_rmid = RM_ROBERT_ID;
+       undorecord.uur_type = ROBERT_UNDO_DELETE;
+       undorecord.uur_info = 0;
+       undorecord.uur_reloid = RelationGetRelid(rel);
+       undorecord.uur_cid = GetCurrentCommandId(true);
+       undorecord.uur_fork = MAIN_FORKNUM;
+       undorecord.uur_prevundo = InvalidUndoRecPtr;
+       undorecord.uur_block = ItemPointerGetBlockNumber(tid);
+       undorecord.uur_offset = ItemPointerGetOffsetNumber(tid);
+       undorecord.uur_fxid = fxid;
+       undorecord.uur_payload.len = 0;
+       undorecord.uur_tuple.len = 0;
+       undorecord.uur_txn = NULL;
+       undorecord.uur_logswitch = NULL;
+
+       /* Pin the target buffer. */
+       buffer = ReadBuffer(rel, undorecord.uur_block);
+       page = BufferGetPage(buffer);
+
+       while (1)
+       {
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+               /*
+                * Prepare undo chain.
+                *
+                * The tuple's old undo pointer needs to be stored in the undo record
+                * we're about to insert, and the undo pointer we're about to insert
+                * will need to be stored into the page.
+                */
+               iid = PageGetItemId((PageHeader) page, undorecord.uur_offset);
+               td = (RobertTupleHeader) PageGetItem(page, iid);
+               memcpy(&undorecord.uur_prevundo, &td->r_undoptr, sizeof(UndoRecPtr));
+
+               /*
+                * Before actually performing the deletion, we must check whether it's
+                * really OK: somebody else could have updated the tuple and, if so,
+                * the version visible to our snapshot might no longer be the latest
+                * version.
+                *
+                * XXX. This code doesn't know anything about tuple locks, and if there
+                * are any of those, we need to wait for them, too.
+                */
+               if (undorecord.uur_prevundo != checked_undoptr &&
+                       !UndoRecPtrIsDiscarded(undorecord.uur_prevundo))
+               {
+                       TM_Result       result;
+
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       result = robert_check_modify(undorecord.uur_prevundo, snapshot,
+                                                                                cid);
+                       if (result != TM_Ok)
+                       {
+                               ReleaseBuffer(buffer);
+                               return result;
+                       }
+                       checked_undoptr = undorecord.uur_prevundo;
+                       continue;
+               }
+
+               /* Looks OK, so proceed with deletion. */
+               break;
+       }
+
+       /*
+        * Before entering the critical section, prepare for 1 undo insertion
+        * and stage the record to be inserted.
+        */
+       BeginUndoRecordInsert(&context, UndoLogCategoryForRelation(rel), 1, NULL);
+       undoptr = PrepareUndoInsert(&context, &undorecord, MyDatabaseId);
+
+       /*
+        * Perform the actual delete.
+        *
+        * Note that we use ItemIdMarkDead here, not ItemIdSetDead. The storage
+        * has to remain, because the tuple is still visible to concurrent
+        * transactions.
+        */
+       START_CRIT_SECTION();
+       ItemIdMarkDead(iid);
+       MarkBufferDirty(buffer);
+       memcpy(&td->r_undoptr, &undoptr, sizeof(UndoRecPtr));
+       InsertPreparedUndo(&context);
+       END_CRIT_SECTION();
+
+       /* All done. */
+       FinishUndoRecordInsert(&context);
+       UnlockReleaseBuffer(buffer);
+       return TM_Ok;
+}
+
+TM_Result
+robert_tuple_update(Relation rel, ItemPointer otid,
+                                       TupleTableSlot *slot, CommandId cid, Snapshot snapshot,
+                                       Snapshot crosscheck, bool wait, TM_FailureData *tmfd,
+                                       LockTupleMode *lockmode, bool *update_indexes)
+{
+       return TM_Ok;
+}
+
+TM_Result
+robert_tuple_lock(Relation rel, ItemPointer tid,
+                                 Snapshot snapshot, TupleTableSlot *slot, CommandId cid,
+                                 LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags,
+                                 TM_FailureData *tmfd)
+{
+       return TM_Ok;
+}
+
+void
+robert_finish_bulk_insert(Relation rel, int options)
+{
+}
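
The long comment inside robert_check_modify spells out the HeapTupleSatisfiesUpdate-equivalent decision table that remains unimplemented. A condensed sketch of that table, under the comment's own assumptions (MVCC snapshot, no tuple locks): ROBERT_UNDO_UPDATE and the helper itself are hypothetical, while XidInMVCCSnapshot is the stock snapshot primitive (true when the XID is not yet visible to the snapshot).

/* Hypothetical condensation of the decision table described above. */
static TM_Result
robert_modify_verdict(uint8 utype, TransactionId uxid, CommandId ucid,
					  Snapshot snapshot, CommandId cid)
{
	if (TransactionIdIsCurrentTransactionId(uxid))
		return ucid >= cid ? TM_SelfModified : TM_Ok;

	if (TransactionIdIsInProgress(uxid))
		return TM_BeingModified;

	if (TransactionIdDidCommit(uxid))
	{
		if (!XidInMVCCSnapshot(uxid, snapshot))
			return TM_Ok;			/* committed and visible */
		if (utype == ROBERT_UNDO_INSERT)
			return TM_Invisible;	/* caller should raise an error */
		return utype == ROBERT_UNDO_DELETE ? TM_Deleted : TM_Updated;
	}

	/* Aborted: perform page-at-a-time undo and retry (not sketched). */
	return TM_Ok;
}
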
diff --git a/src/backend/access/robert/robert_page.c b/src/backend/access/robert/robert_page.c
new file mode 100644 (file)
index 0000000..fe826e6
--- /dev/null
@@ -0,0 +1,90 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_page.c
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/access/robert/robert_page.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/robert_page.h"
+#include "access/robert_tuple.h"
+
+/*
+ * robert_page_add_item
+ *
+ * Insert a tuple into a page.
+ */
+void
+robert_page_add_item(Page page, OffsetNumber offnum, RobertTuple tuple)
+{
+       Item                    item = (Item) tuple->r_data;
+       Size                    size = tuple->r_len;
+       PageHeader              phdr = (PageHeader) page;
+       ItemId                  iid = PageGetItemId(phdr, offnum);
+       OffsetNumber    maxoff = PageGetMaxOffsetNumber(page) + 1;
+       Size                    lower = phdr->pd_lower;
+       Size                    upper = (Size) phdr->pd_upper - size;
+
+       Assert(offnum <= maxoff);
+
+       if (offnum == maxoff)
+               lower += sizeof(ItemIdData);
+
+       Assert(lower <= upper);
+
+       memcpy((char *) page + upper, item, size);
+       phdr->pd_lower = lower;
+       phdr->pd_upper = upper;
+       ItemIdSetNormal(iid, upper, size);
+}
+
+/*
+ * robert_page_free_offset
+ *
+ * Returns an offset at which a tuple of a given size can be inserted into the
+ * given page.  If there are no unused line pointers and no more can be added,
+ * or if the item isn't going to fit on the page, returns InvalidOffsetNumber.
+ */
+OffsetNumber
+robert_page_free_offset(Page page, Size size)
+{
+       PageHeader              phdr = (PageHeader) page;
+       OffsetNumber    maxoff = PageGetMaxOffsetNumber(page);
+       OffsetNumber    offnum;
+       Size                    avail = phdr->pd_upper - phdr->pd_lower;
+
+       if (unlikely(phdr->pd_upper > BLCKSZ || phdr->pd_lower > phdr->pd_upper))
+               ereport(ERROR,
+                               (errcode(ERRCODE_DATA_CORRUPTED),
+                                errmsg("corrupted page header")));
+
+       if (avail < size)
+               return InvalidOffsetNumber;
+
+       for (offnum = FirstOffsetNumber; offnum <= maxoff;
+                offnum = OffsetNumberNext(offnum))
+       {
+               ItemId  iid = PageGetItemId(phdr, offnum);
+
+               if (!ItemIdIsUsed(iid))
+                       return offnum;
+       }
+
+       /*
+        * XXX. It's just plain crummy that we have to use a heap-specific
+        * constant for this, but there is plenty of code that thinks this is a
+        * universal limit rather than a heap-specific one.
+        */
+       if (avail < sizeof(ItemIdData) + size || offnum >= MaxHeapTuplesPerPage)
+               return InvalidOffsetNumber;
+
+       return offnum;
+}
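
Together these two helpers define the page-level insertion protocol used by robert_tuple_try_insert in robert_dml.c: reserve an offset first, then place the tuple, all while holding an exclusive lock on the buffer. A condensed caller-side sketch:

/* Sketch of the caller-side protocol from robert_tuple_try_insert. */
OffsetNumber offnum = robert_page_free_offset(page, tuple->r_len);

if (offnum == InvalidOffsetNumber)
	return false;					/* page full: caller tries another block */
robert_page_add_item(page, offnum, tuple);	/* guaranteed to fit now */
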
diff --git a/src/backend/access/robert/robert_scan.c b/src/backend/access/robert/robert_scan.c
new file mode 100644 (file)
index 0000000..346a482
--- /dev/null
@@ -0,0 +1,632 @@
+/*
+ * robert_scan.c
+ */
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/robert_scan.h"
+#include "access/robert_slot.h"
+#include "access/tableam.h"
+#include "access/skey.h"
+#include "access/undoaccess.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "storage/predicate.h"
+
+typedef struct IndexFetchRobertData
+{
+       IndexFetchTableData xs_base;
+       Buffer          xs_cbuf;                /* current buffer, if any */
+} IndexFetchRobertData;
+
+static void robert_scan_initialize(RobertScanDesc scan, ScanKey key,
+                                                                  bool keep_startblock);
+static void robert_scan_getnextblock(RobertScanDesc scan,
+                                                                        ScanDirection direction,
+                                                                        BlockNumber blkno);
+static bool robert_scan_getnexttuple(RobertScanDesc scan,
+                                                                        TupleTableSlot *slot);
+
+/*
+ * Prepare to scan a robert table.
+ */
+TableScanDesc
+robert_scan_begin(Relation rel, Snapshot snapshot,
+                                 int nkeys, ScanKeyData *key,
+                                 ParallelTableScanDesc pscan, uint32 flags)
+{
+       RobertScanDesc scan;
+
+       /* Out of sheer paranoia, hold a ref count while scanning relation. */
+       RelationIncrementReferenceCount(rel);
+
+       /* Allocate and initialize scan descriptor. */
+       scan = palloc(sizeof(RobertScanDescData));
+       scan->rrs_base.rs_rd = rel;
+       scan->rrs_base.rs_snapshot = snapshot;
+       scan->rrs_base.rs_nkeys = nkeys;
+       if (key == NULL)
+               scan->rrs_base.rs_key = NULL;
+       else
+       {
+               scan->rrs_base.rs_key = palloc(nkeys * sizeof(ScanKeyData));
+               memcpy(scan->rrs_base.rs_key, key, nkeys * sizeof(ScanKeyData));
+       }
+       scan->rrs_base.rs_parallel = pscan;
+       scan->rrs_base.rs_flags = flags;
+       scan->rrs_cpage = palloc(BLCKSZ);
+       scan->rrs_strategy = NULL;
+       robert_scan_initialize(scan, key, false);
+
+       /* For sequential and sample scans, just predicate-lock the whole thing. */
+       if ((scan->rrs_base.rs_flags & (SO_TYPE_SEQSCAN | SO_TYPE_SAMPLESCAN)) != 0)
+       {
+               Assert(snapshot);
+               PredicateLockRelation(rel, snapshot);
+       }
+
+       return &scan->rrs_base;
+}
+
+/*
+ * Clean up after completing a scan of a robert table.
+ */
+void
+robert_scan_end(TableScanDesc sscan)
+{
+       RobertScanDesc scan = (RobertScanDesc) sscan;
+
+       RelationDecrementReferenceCount(scan->rrs_base.rs_rd);
+
+       if (scan->rrs_base.rs_key)
+               pfree(scan->rrs_base.rs_key);
+       if (scan->rrs_base.rs_flags & SO_TEMP_SNAPSHOT)
+               UnregisterSnapshot(scan->rrs_base.rs_snapshot);
+       pfree(scan->rrs_cpage);
+       pfree(scan);
+}
+
+void
+robert_scan_rescan(TableScanDesc sscan, ScanKeyData *key,
+                                  bool set_params, bool allow_strat,
+                                  bool allow_sync, bool allow_pagemode)
+{
+}
+
+/*
+ * robert_scan_initialize
+ *
+ * Perform common initialization required by both robert_scan_begin and
+ * robert_scan_rescan.
+ *
+ * XXX. An awful lot of this logic duplicates initscan().  Instead of copying
+ * the code, we should try to have common code that can be used by any
+ * block-based table AM.
+ */
+static void
+robert_scan_initialize(RobertScanDesc scan, ScanKey key, bool keep_startblock)
+{
+       ParallelBlockTableScanDesc bpscan = NULL;
+       bool            allow_strat;
+       bool            allow_sync;
+
+       /*
+        * Determine the number of blocks we need to scan.
+        *
+        * If the relation is extended, the new tuples won't be visible to this
+        * scan anyway (except for non-MVCC scans, which are unstable anyway).
+        */
+       if (scan->rrs_base.rs_parallel != NULL)
+       {
+               bpscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel;
+               scan->rrs_nblocks = bpscan->phs_nblocks;
+       }
+       else
+               scan->rrs_nblocks = RelationGetNumberOfBlocks(scan->rrs_base.rs_rd);
+
+       /*
+        * This heuristic may not be the best, but it is the one used by the
+        * 'heap' table AM, so we'll stick with it for the sake of tradition.
+        */
+       if (!RelationUsesLocalBuffers(scan->rrs_base.rs_rd) &&
+               scan->rrs_nblocks > NBuffers / 4)
+       {
+               allow_strat = (scan->rrs_base.rs_flags & SO_ALLOW_STRAT) != 0;
+               allow_sync = (scan->rrs_base.rs_flags & SO_ALLOW_SYNC) != 0;
+       }
+       else
+               allow_strat = allow_sync = false;
+
+       if (allow_strat)
+       {
+               /* During a rescan, keep the previous strategy object. */
+               if (scan->rrs_strategy == NULL)
+                       scan->rrs_strategy = GetAccessStrategy(BAS_BULKREAD);
+       }
+       else
+       {
+               if (scan->rrs_strategy != NULL)
+                       FreeAccessStrategy(scan->rrs_strategy);
+               scan->rrs_strategy = NULL;
+       }
+
+       if (scan->rrs_base.rs_parallel != NULL)
+       {
+               /* For parallel scan, believe whatever ParallelTableScanDesc says. */
+               if (scan->rrs_base.rs_parallel->phs_syncscan)
+                       scan->rrs_base.rs_flags |= SO_ALLOW_SYNC;
+               else
+                       scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+       }
+       else if (keep_startblock)
+       {
+               /*
+                * When rescanning, we want to keep the previous startblock setting,
+                * so that rewinding a cursor doesn't generate surprising results.
+                * Reset the active syncscan setting, though.
+                */
+               if (allow_sync && synchronize_seqscans)
+                       scan->rrs_base.rs_flags |= SO_ALLOW_SYNC;
+               else
+                       scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+       }
+       else if (allow_sync && synchronize_seqscans)
+       {
+               scan->rrs_base.rs_flags |= SO_ALLOW_SYNC;
+               /* XXX ss_get_location is in heapam.h */
+               scan->rrs_startblock =
+                       ss_get_location(scan->rrs_base.rs_rd, scan->rrs_nblocks);
+       }
+       else
+       {
+               scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+               scan->rrs_startblock = 0;
+       }
+
+       scan->rrs_numblocks = InvalidBlockNumber;
+       scan->rrs_state = ROBERT_SCAN_NOT_STARTED;
+       scan->rrs_cblock = InvalidBlockNumber;
+}
+
+/*
+ * robert_scan_getnextslot
+ *
+ * Get the next tuple from a sequential scan and store it into the given slot.
+ *
+ * XXX. The name of this method does not make it abundantly clear that it only
+ * applies to sequential scans, but an examination of the heap code shows that
+ * to be the case.
+ */
+bool
+robert_scan_getnextslot(TableScanDesc sscan,
+                                               ScanDirection direction,
+                                               TupleTableSlot *slot)
+{
+       RobertScanDesc scan = (RobertScanDesc) sscan;
+
+       /* Initialize the scan if that's not yet done. */
+       if (scan->rrs_state == ROBERT_SCAN_NOT_STARTED)
+       {
+               BlockNumber blkno;
+
+               /* If there's nothing to scan, give up immediately. */
+               if (scan->rrs_nblocks == 0 || scan->rrs_numblocks == 0)
+               {
+                       ExecClearTuple(slot);
+                       return false;
+               }
+
+               /* Figure out which block to read first. */
+               if (ScanDirectionIsBackward(direction))
+               {
+                       /* don't report syncscans when scanning backwards */
+                       scan->rrs_base.rs_flags &= ~SO_ALLOW_SYNC;
+
+                       /* start from last page of the scan */
+                       if (scan->rrs_startblock > 0)
+                               blkno = scan->rrs_startblock - 1;
+                       else
+                               blkno = scan->rrs_nblocks - 1;
+               }
+               else if (ScanDirectionIsNoMovement(direction))
+               {
+                       /* no prior tuple, so refetch yields nothing */
+                       ExecClearTuple(slot);
+                       return false;
+               }
+               else if (scan->rrs_base.rs_parallel != NULL)
+               {
+                       ParallelBlockTableScanDesc pbscan;
+
+                       pbscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel;
+                       table_block_parallelscan_startblock_init(scan->rrs_base.rs_rd,
+                                                                                                        pbscan);
+                       blkno = table_block_parallelscan_nextpage(scan->rrs_base.rs_rd,
+                                                                                                         pbscan);
+                       if (blkno == InvalidBlockNumber)
+                       {
+                               /* other participants already finished scan */
+                               ExecClearTuple(slot);
+                               return false;
+                       }
+               }
+               else
+                       blkno = scan->rrs_startblock;
+
+               /* Read the chosen block. */
+               robert_scan_getnextblock(scan, direction, blkno);
+       }
+
+       while (1)
+       {
+               if (scan->rrs_state == ROBERT_SCAN_TUPLE_DONE)
+               {
+                       /* Advance to next tuple. */
+                       if (ScanDirectionIsBackward(direction))
+                       {
+                               if (scan->rrs_coffset == FirstOffsetNumber)
+                                       scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+                               else
+                               {
+                                       scan->rrs_coffset = OffsetNumberPrev(scan->rrs_coffset);
+                                       scan->rrs_state = ROBERT_SCAN_READY;
+                               }
+                       }
+                       else if (ScanDirectionIsForward(direction))
+                       {
+                               if (scan->rrs_coffset == scan->rrs_lastoffset)
+                                       scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+                               else
+                               {
+                                       scan->rrs_coffset = OffsetNumberNext(scan->rrs_coffset);
+                                       scan->rrs_state = ROBERT_SCAN_READY;
+                               }
+                       }
+                       else
+                       {
+                               Assert(ScanDirectionIsNoMovement(direction));
+                               /* nothing to do */
+                       }
+               }
+               else if (scan->rrs_state == ROBERT_SCAN_BLOCK_DONE)
+               {
+                       bool            finished;
+                       BlockNumber blkno;
+
+                       /* Select new block. */
+                       if (ScanDirectionIsBackward(direction))
+                       {
+                               finished = (scan->rrs_cblock == scan->rrs_startblock) ||
+                                       (scan->rrs_numblocks != InvalidBlockNumber ?
+                                        --scan->rrs_numblocks == 0 : false);
+                               blkno = (scan->rrs_cblock == 0) ? scan->rrs_nblocks - 1 :
+                                       scan->rrs_cblock - 1;
+                       }
+                       else if (scan->rrs_base.rs_parallel != NULL)
+                       {
+                               ParallelBlockTableScanDesc pbscan;
+
+                               pbscan = (ParallelBlockTableScanDesc)
+                                       scan->rrs_base.rs_parallel;
+                               blkno = table_block_parallelscan_nextpage(scan->rrs_base.rs_rd,
+                                                                                                                 pbscan);
+                               finished = (blkno == InvalidBlockNumber);
+                       }
+                       else
+                       {
+                               blkno = scan->rrs_cblock + 1;
+                               if (blkno >= scan->rrs_nblocks)
+                                       blkno = 0;
+                               finished = (blkno == scan->rrs_startblock) ||
+                                       (scan->rrs_numblocks != InvalidBlockNumber ?
+                                        --scan->rrs_numblocks == 0 : false);
+                               if (scan->rrs_base.rs_flags & SO_ALLOW_SYNC)
+                                       ss_report_location(scan->rrs_base.rs_rd, blkno);
+                       }
+
+                       /* Scan is done if no blocks remain. */
+                       if (finished)
+                       {
+                               scan->rrs_cblock = InvalidBlockNumber;
+                               scan->rrs_state = ROBERT_SCAN_NOT_STARTED;
+                               ExecClearTuple(slot);
+                               return false;
+                       }
+
+                       /* Read the chosen block. */
+                       robert_scan_getnextblock(scan, direction, blkno);
+               }
+               else
+               {
+                       Assert(scan->rrs_state == ROBERT_SCAN_READY);
+                       scan->rrs_state = ROBERT_SCAN_TUPLE_DONE;
+
+                       if (!robert_scan_getnexttuple(scan, slot))
+                               continue;
+                       pgstat_count_heap_getnext(scan->rrs_base.rs_rd);
+                       return true;
+               }
+       }
+}
+
+/*
+ * robert_scan_getnextblock
+ *
+ * Read the indicated block of the relation and update the scan state
+ * accordingly.
+ */
+static void
+robert_scan_getnextblock(RobertScanDesc scan, ScanDirection direction,
+                                                BlockNumber blkno)
+{
+       Buffer          buffer;
+
+       /* Copy the new page and remember the page number. */
+       buffer = ReadBufferExtended(scan->rrs_base.rs_rd, MAIN_FORKNUM, blkno,
+                                                               RBM_NORMAL, scan->rrs_strategy);
+       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+       memcpy(scan->rrs_cpage, BufferGetPage(buffer), BLCKSZ);
+       UnlockReleaseBuffer(buffer);
+       scan->rrs_cblock = blkno;
+
+       /* Determine how many tuples there may be on the page. */
+       scan->rrs_lastoffset = PageGetMaxOffsetNumber(scan->rrs_cpage);
+
+       /* Determine starting scan position. */
+       if (ScanDirectionIsBackward(direction))
+               scan->rrs_coffset = scan->rrs_lastoffset;
+       else
+               scan->rrs_coffset = FirstOffsetNumber;
+
+       /* If no tuples, tell caller to request the next block. */
+       if (scan->rrs_lastoffset == 0)
+               scan->rrs_state = ROBERT_SCAN_BLOCK_DONE;
+       else
+               scan->rrs_state = ROBERT_SCAN_READY;
+}
+
+/*
+ * robert_scan_getnexttuple
+ *
+ * The scan contains a page (scan->rrs_cpage) and identifies a particular
+ * tuple of interest (scan->rrs_coffset). Extract that tuple and store it
+ * into the given slot.
+ */
+static bool
+robert_scan_getnexttuple(RobertScanDesc scan, TupleTableSlot *slot)
+{
+       ItemId          iid = PageGetItemId(scan->rrs_cpage, scan->rrs_coffset);
+       RobertTupleHeader td;
+       Size            len;
+
+       /* If the item has no storage, it is definitely not visible. */
+       if (!ItemIdHasStorage(iid))
+               return false;
+
+       /* Extract tuple pointer and length. */
+       td = (RobertTupleHeader) PageGetItem(scan->rrs_cpage, iid);
+       len = ItemIdGetLength(iid);
+
+       /* Return visible version of tuple. */
+       return robert_slot_store_visible(slot, td, len, slot->tts_tableOid,
+                                                                        scan->rrs_cblock, scan->rrs_coffset,
+                                                                        scan->rrs_base.rs_snapshot,
+                                                                        ItemIdIsDead(iid));
+}
+
+/*
+ * robert_scan_get_blocks_done
+ *
+ * Return the number of blocks that have been read by this scan since
+ * starting. For a non-parallel scan, this should be completely accurate.
+ * For a parallel scan, it does not account for blocks read by other backends.
+ */
+BlockNumber
+robert_scan_get_blocks_done(RobertScanDesc scan)
+{
+       ParallelBlockTableScanDesc bpscan = NULL;
+       BlockNumber startblock;
+       BlockNumber blocks_done;
+
+       if (scan->rrs_base.rs_parallel != NULL)
+       {
+               bpscan = (ParallelBlockTableScanDesc) scan->rrs_base.rs_parallel;
+               startblock = bpscan->phs_startblock;
+       }
+       else
+               startblock = scan->rrs_startblock;
+
+       /*
+        * Might have wrapped around the end of the relation, if startblock was
+        * not zero.
+        */
+       if (scan->rrs_cblock > startblock)
+               blocks_done = scan->rrs_cblock - startblock;
+       else
+       {
+               BlockNumber nblocks;
+
+               nblocks = bpscan != NULL ? bpscan->phs_nblocks : scan->rrs_nblocks;
+               blocks_done = nblocks - startblock + scan->rrs_cblock;
+       }
+
+       /* If we're also done with the current block, add one. */
+       if (scan->rrs_state == ROBERT_SCAN_BLOCK_DONE)
+               blocks_done++;
+
+       return blocks_done;
+}
+
+/*
+ * robert_index_fetch_begin
+ *
+ * Prepare for index fetches by allocating a new IndexFetchRobertData.
+ */
+IndexFetchTableData *
+robert_index_fetch_begin(Relation rel)
+{
+       IndexFetchRobertData *scan = palloc0(sizeof(IndexFetchRobertData));
+
+       scan->xs_base.rel = rel;
+       scan->xs_cbuf = InvalidBuffer;
+
+       return &scan->xs_base;
+}
+
+/*
+ * robert_index_fetch_reset
+ *
+ * Release any buffer pin held from a previous index fetch.
+ */
+void
+robert_index_fetch_reset(IndexFetchTableData *data)
+{
+       IndexFetchRobertData *scan = (IndexFetchRobertData *) data;
+
+       if (BufferIsValid(scan->xs_cbuf))
+       {
+               ReleaseBuffer(scan->xs_cbuf);
+               scan->xs_cbuf = InvalidBuffer;
+       }
+}
+
+/*
+ * robert_index_fetch_end
+ *
+ * Clean up when finished with index fetches.
+ */
+void
+robert_index_fetch_end(IndexFetchTableData *data)
+{
+       robert_index_fetch_reset(data);
+       pfree(data);
+}
+
+/*
+ * robert_index_fetch_tuple
+ *
+ * XXX. This function is missing MVCC handling.
+ */
+bool
+robert_index_fetch_tuple(IndexFetchTableData *data,
+                                                ItemPointer tid, Snapshot snapshot,
+                                                TupleTableSlot *slot,
+                                                bool *call_again, bool *all_dead)
+{
+       IndexFetchRobertData *scan = (IndexFetchRobertData *) data;
+       Oid                     reloid = RelationGetRelid(scan->xs_base.rel);
+       BlockNumber blkno = ItemPointerGetBlockNumber(tid);
+       OffsetNumber offnum = ItemPointerGetOffsetNumber(tid);
+       Page            page;
+       bool            result = false;
+
+       scan->xs_cbuf =
+               ReleaseAndReadBuffer(scan->xs_cbuf, scan->xs_base.rel, blkno);
+       *call_again = false;
+       page = BufferGetPage(scan->xs_cbuf);
+
+       LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
+       if (offnum >= FirstOffsetNumber && offnum <= PageGetMaxOffsetNumber(page))
+       {
+               ItemId          iid = PageGetItemId(page, offnum);
+
+               if (ItemIdHasStorage(iid))
+               {
+                       RobertTupleHeader td = (RobertTupleHeader) PageGetItem(page, iid);
+                       Size            len = ItemIdGetLength(iid);
+
+                       robert_slot_store(slot, td, len, reloid, blkno, offnum);
+                       result = true;
+               }
+       }
+       LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+
+       return result;
+}
+
+/*
+ * robert_scan_bitmap_next_block
+ *
+ * Just as in a sequential scan, we choose to copy the entire page.  This is
+ * a more dubious strategy here, because it could turn out to be inefficient
+ * if we're only fetching a single tuple from the page.  However, we don't
+ * want to do visibility checks while holding the buffer lock, since that
+ * might involve visiting an arbitrarily large number of undo buffers, so it's
+ * not clear how to do better.
+ *
+ * XXX. It's possible that it would pay to do the visibility checks here
+ * rather than leaving all that work until robert_scan_bitmap_next_tuple
+ * is called; it would likely improve instruction cache locality.  It would
+ * also be more complex, so for right now we don't.
+ */
+bool
+robert_scan_bitmap_next_block(TableScanDesc sscan, TBMIterateResult *tbmres)
+{
+       RobertScanDesc scan = (RobertScanDesc) sscan;
+       BlockNumber blkno = tbmres->blockno;
+
+       /*
+        * Ignore any entries past our notion of where the relation ends; it may
+        * have been extended, but any new entries won't be visible to us.
+        */
+       if (blkno >= scan->rrs_nblocks)
+               return false;
+
+       /* Read the new block. */
+       robert_scan_getnextblock(scan, NoMovementScanDirection, blkno);
+
+       /* Reset the tuple index; will be ignored if page is lossy. */
+       scan->rrs_tupindex = 0;
+
+       /* False if block has no tuples; otherwise, true. */
+       return (scan->rrs_state != ROBERT_SCAN_BLOCK_DONE);
+}
+
+/*
+ * robert_scan_bitmap_next_tuple
+ *
+ * Return the next tuple from the current block that is visible to the
+ * scan's snapshot, storing it in 'slot'; return false if there are none.
+ */
+bool
+robert_scan_bitmap_next_tuple(TableScanDesc sscan,
+                                                         TBMIterateResult *tbmres,
+                                                         TupleTableSlot *slot)
+{
+       RobertScanDesc scan = (RobertScanDesc) sscan;
+
+       /* Loop until we find a visible tuple version or exhaust the page. */
+       for (;;)
+       {
+               if (tbmres->ntuples == 0)
+               {
+                       /* Page is lossy; just try the next offset. */
+                       scan->rrs_coffset = OffsetNumberNext(scan->rrs_coffset);
+                       if (scan->rrs_coffset == scan->rrs_lastoffset)
+                               return false;
+               }
+               else
+               {
+                       /* Page is exact; try the next indicated offset. */
+                       if (scan->rrs_tupindex >= tbmres->ntuples)
+                               return false;
+                       scan->rrs_coffset = tbmres->offsets[scan->rrs_tupindex];
+                       scan->rrs_tupindex++;
+               }
+
+               /* Skip tuple versions that are not visible to our snapshot. */
+               if (robert_scan_getnexttuple(scan, slot))
+                       break;
+       }
+
+       pgstat_count_heap_fetch(scan->rrs_base.rs_rd);
+       return true;
+}
+
+bool
+robert_scan_sample_next_block(TableScanDesc scan,
+                                                         SampleScanState *scanstate)
+{
+       elog(NOTICE, "robert_scan_sample_next_block");
+       return false;
+}
+
+bool
+robert_scan_sample_next_tuple(TableScanDesc scan,
+                                                         SampleScanState *scanstate,
+                                                         TupleTableSlot *slot)
+{
+       elog(NOTICE, "robert_scan_sample_next_tuple");
+       return false;
+}
diff --git a/src/backend/access/robert/robert_slot.c b/src/backend/access/robert/robert_slot.c
new file mode 100644 (file)
index 0000000..bd87d9b
--- /dev/null
@@ -0,0 +1,492 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_slot.c
+ *       Slots for storing Robert tuples. To facilitate inplace updates, we
+ *       always copy the tuple rather than pointing to the original buffer,
+ *       so this is like HeapTupleTableSlot, not BufferHeapTupleTableSlot,
+ *       but with changes because of our different on-disk tuple format.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/access/robert/robert_slot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/robert_slot.h"
+#include "access/robert_xlog.h"
+#include "access/undoaccess.h"
+#include "access/undorecord.h"
+#include "access/xact.h"
+#include "storage/procarray.h"
+#include "utils/snapmgr.h"
+
+
+static void tts_robert_init(TupleTableSlot *slot);
+static void tts_robert_release(TupleTableSlot *slot);
+static void tts_robert_clear(TupleTableSlot *slot);
+static void tts_robert_getsomeattrs(TupleTableSlot *slot, int natts);
+static Datum tts_robert_getsysattr(TupleTableSlot *slot, int attnum,
+                                         bool *isnull);
+static void tts_robert_materialize(TupleTableSlot *slot);
+static void tts_robert_copyslot(TupleTableSlot *dstslot,
+                                       TupleTableSlot *srcslot);
+static HeapTuple tts_robert_copy_heap_tuple(TupleTableSlot *slot);
+static MinimalTuple tts_robert_copy_minimal_tuple(TupleTableSlot *slot);
+
+static bool robert_snapshot_test(Snapshot snapshot, FullTransactionId fxid,
+                                        CommandId cid, bool xmin_test, uint32 specToken);
+
+const TupleTableSlotOps TTSOpsRobert = {
+       .base_slot_size = sizeof(RobertTupleTableSlot),
+       .init = tts_robert_init,
+       .release = tts_robert_release,
+       .clear = tts_robert_clear,
+       .getsomeattrs = tts_robert_getsomeattrs,
+       .getsysattr = tts_robert_getsysattr,
+       .materialize = tts_robert_materialize,
+       .copyslot = tts_robert_copyslot,
+       .get_heap_tuple = NULL,
+       .get_minimal_tuple = NULL,
+       .copy_heap_tuple = tts_robert_copy_heap_tuple,
+       .copy_minimal_tuple = tts_robert_copy_minimal_tuple
+};
+
+/*
+ * robert_slot_callbacks
+ *
+ * We only use one kind of slot, so this is very simple.
+ */
+const TupleTableSlotOps *
+robert_slot_callbacks(Relation relation)
+{
+       return &TTSOpsRobert;
+}
+
+/*
+ * robert_slot_store
+ *
+ * Create a new robert tuple and store it in a robert slot.
+ *
+ * XXX. It would be nice to optimize this by reusing the previously-allocated
+ * chunk of memory, if there is one and if it's big enough.  With the current
+ * design, we'll often free a chunk and then allocate a new chunk of about the
+ * same size.  Note that if we did this, tts_robert_release might need to
+ * free any chunk that might be left lying around by tts_robert_clear.
+ */
+void
+robert_slot_store(TupleTableSlot *slot, RobertTupleHeader td,
+                                 Size len, Oid tableOid, BlockNumber blkno,
+                                 OffsetNumber offset)
+{
+       RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+       RobertTuple             tuple;
+
+       Assert(slot->tts_ops == &TTSOpsRobert);
+       tts_robert_clear(slot);
+
+       tuple = MemoryContextAlloc(slot->tts_mcxt, ROBERTTUPLESIZE + len);
+       tuple->r_len = len;
+       tuple->r_data = (RobertTupleHeader) ((char *) tuple + ROBERTTUPLESIZE);
+       tuple->r_tableOid = tableOid;
+       ItemPointerSet(&tuple->r_self, blkno, offset);
+       memcpy(tuple->r_data, td, len);
+
+       rslot->tuple = tuple;
+       ItemPointerSet(&slot->tts_tid, blkno, offset);
+       rslot->off = td->r_hoff;
+       slot->tts_flags &= ~TTS_FLAG_EMPTY;
+       slot->tts_flags |= TTS_FLAG_SHOULDFREE;
+}
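+
+/*
+ * The single allocation made by robert_slot_store holds both the control
+ * structure and the tuple body:
+ *
+ *     [RobertTuple control data | tuple body, len bytes]
+ *     ^ tuple                    ^ tuple->r_data = tuple + ROBERTTUPLESIZE
+ */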
+
+/*
+ * robert_slot_store_visible
+ *
+ * Figure out which version of a tuple is visible and store that version
+ * into a robert slot; then, return true.  If no version is visible,
+ * return false.
+ */
+bool
+robert_slot_store_visible(TupleTableSlot *slot, RobertTupleHeader td,
+                                                 Size len, Oid tableOid, BlockNumber blkno,
+                                                 OffsetNumber offset, Snapshot snapshot,
+                                                 bool item_is_dead)
+{
+       UndoRecPtr      current_recptr;
+       UnpackedUndoRecord *current = NULL;
+       UnpackedUndoRecord *previous = NULL;
+       bool            result = true;
+       UndoRecordFetchContext context;
+
+       /* It's not necessarily aligned, so we use memcpy. */
+       memcpy(&current_recptr, &td->r_undoptr, sizeof(UndoRecPtr));
+
+       /*
+        * Loop until we find the oldest undo record whose effects are not
+        * visible to us.
+        */
+       BeginUndoFetch(&context);
+       while (current_recptr != InvalidUndoRecPtr)
+       {
+               current = UndoFetchRecord(&context, current_recptr);
+
+               /*
+                * XXX. If there's a specToken, we should be passing that here instead
+                * of passing 0.
+                */
+               if (current == NULL ||
+                       robert_snapshot_test(snapshot, current->uur_fxid, current->uur_cid,
+                                                                true, 0))
+                       break;
+               current_recptr = current->uur_prevundo;
+               if (previous != NULL)
+                       UndoRecordRelease(previous);
+               previous = current;
+               current = NULL;
+       }
+       FinishUndoFetch(&context);
+
+       /*
+        * Release current record, if any.  This is the newest undo record whose
+        * effects are visible to our snapshot; we don't need it for anything.
+        */
+       if (current != NULL)
+       {
+               UndoRecordRelease(current);
+               current = NULL;
+       }
+
+       /* Core visibility logic. */
+       /* XXX. This probably ought to store some XID information in the slot. */
+       if (previous == NULL)
+       {
+               /* No undo records, or all changes visible to us. */
+               if (item_is_dead)
+                       result = false;
+               else
+                       robert_slot_store(slot, td, len, tableOid, blkno, offset);
+       }
+       else
+       {
+               switch (previous->uur_type)
+               {
+                       case ROBERT_UNDO_INSERT:
+                               /* Insert is not visible. Can't see tuple. */
+                               result = false;
+                               break;
+                       case ROBERT_UNDO_DELETE: /* XXX and non-in-place update */
+                               /* Delete is not visible. Can see current tuple. */
+                               robert_slot_store(slot, td, len, tableOid, blkno, offset);
+                               break;
+#if 0
+                       case ROBERT_UNDO_UPDATE_INPLACE:
+                               /*
+                                * In-place update is not visible. Can see tuple from undo
+                                * record.
+                                */
+                               /* XXX */
+                               robert_slot_store(slot,
+                                                                 previous->uur_tuple.data,
+                                                                 previous->uur_tuple.len,
+                                                                 tableOid, blkno, offset);
+                               break;
+#endif
+               }
+
+               /* Done with previous record. */
+               UndoRecordRelease(previous);
+               previous = NULL;
+       }
+
+       /* If not visible, we are all done. */
+       if (!result)
+               return false;
+
+       /* XXX other stuff like serializability */
+
+       return true;
+}
+
+/*
+ * tts_robert_init
+ *
+ * No special initialization is required when creating a slot.
+ */
+static void
+tts_robert_init(TupleTableSlot *slot)
+{
+       /* nothing */
+}
+
+/*
+ * tts_robert_release
+ *
+ * No special cleanup is required when dropping a slot.
+ */
+static void
+tts_robert_release(TupleTableSlot *slot)
+{
+       /* nothing */
+}
+
+/*
+ * tts_robert_clear
+ */
+static void
+tts_robert_clear(TupleTableSlot *slot)
+{
+       RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+
+       /* Free memory for the tuple if appropriate. */
+       if (TTS_SHOULDFREE(slot))
+       {
+               pfree(rslot->tuple);
+               slot->tts_flags &= ~TTS_FLAG_SHOULDFREE;
+       }
+
+       slot->tts_nvalid = 0;
+       slot->tts_flags |= TTS_FLAG_EMPTY;
+       ItemPointerSetInvalid(&slot->tts_tid);
+       rslot->tuple = NULL;
+       rslot->off = 0;
+}
+
+/*
+ * tts_robert_getsomeattrs
+ */
+static void
+tts_robert_getsomeattrs(TupleTableSlot *slot, int natts)
+{
+       RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+
+       robert_deform_tuple(slot->tts_tupleDescriptor, rslot->tuple,
+                                               slot->tts_values, slot->tts_isnull, natts,
+                                               slot->tts_nvalid, &rslot->off);
+       slot->tts_nvalid = natts;
+}
+
+/*
+ * tts_robert_getsysattr
+ *
+ * slot_getsysattr handles tableoid and ctid, so this function need only
+ * handle requests for cmin, cmax, xmin, and xmax. In contrast to the heap,
+ * we store that information in the undo log, not the tuple.
+ */
+static Datum
+tts_robert_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
+{
+       /*
+        * XXX. It's necessary to do undo log lookups to get this information,
+        * and in some cases we may not have it at all.  We should cache whatever
+        * we find out in the slot, so that if multiple attributes are requested
+        * (or the same ones are requested more than once?) we don't repeat the
+        * lookups.
+        */
+       *isnull = true;
+       return (Datum) 0;
+}
+
+/*
+ * tts_robert_materialize
+ *
+ * Make the contents of this slot independent of any external resources.
+ * The only thing we need to worry about is the possibility that entries in
+ * tts_values might point to data in some other memory context.
+ */
+static void
+tts_robert_materialize(TupleTableSlot *slot)
+{
+       RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) slot;
+       MemoryContext   oldcontext;
+
+       if (TTS_SHOULDFREE(slot))
+               return;
+
+       oldcontext = MemoryContextSwitchTo(slot->tts_mcxt);
+       rslot->tuple = robert_form_tuple(slot->tts_tupleDescriptor,
+                                                                        slot->tts_values,
+                                                                        slot->tts_isnull);
+       MemoryContextSwitchTo(oldcontext);
+
+       rslot->off = rslot->tuple->r_data->r_hoff;
+       slot->tts_flags |= TTS_FLAG_SHOULDFREE;
+       slot->tts_nvalid = 0;
+}
+
+/*
+ * tts_robert_copyslot
+ *
+ * It's not sufficient to just copy the tts_values and tts_isnull arrays
+ * from the source slot, because any pass-by-reference datums in the source
+ * slot will be stored in that slot's memory context, not ours. So, construct
+ * and store a tuple built from those arrays.
+ */
+static void
+tts_robert_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
+{
+       RobertTupleTableSlot *rslot = (RobertTupleTableSlot *) dstslot;
+       MemoryContext   oldcontext;
+
+       tts_robert_clear(dstslot);
+       slot_getallattrs(srcslot);
+
+       oldcontext = MemoryContextSwitchTo(dstslot->tts_mcxt);
+       rslot->tuple = robert_form_tuple(srcslot->tts_tupleDescriptor,
+                                                                        srcslot->tts_values,
+                                                                        srcslot->tts_isnull);
+       MemoryContextSwitchTo(oldcontext);
+
+       rslot->off = rslot->tuple->r_data->r_hoff;
+       dstslot->tts_flags = (dstslot->tts_flags | TTS_FLAG_SHOULDFREE)
+               & ~TTS_FLAG_EMPTY;
+}
+
+/*
+ * tts_robert_copy_heap_tuple
+ *
+ * Build a heap tuple representing the contents of this slot.
+ */
+static HeapTuple
+tts_robert_copy_heap_tuple(TupleTableSlot *slot)
+{
+       slot_getallattrs(slot);
+
+       return heap_form_tuple(slot->tts_tupleDescriptor,
+                                                  slot->tts_values, slot->tts_isnull);
+}
+
+/*
+ * tts_robert_copy_minimal_tuple
+ *
+ * Build a minimal tuple representing the contents of this slot.
+ */
+static MinimalTuple
+tts_robert_copy_minimal_tuple(TupleTableSlot *slot)
+{
+       slot_getallattrs(slot);
+
+       return heap_form_minimal_tuple(slot->tts_tupleDescriptor,
+                                                                  slot->tts_values, slot->tts_isnull);
+}
+
+/*
+ * robert_snapshot_test
+ *
+ * Test whether the effects of a change made by a given XID/CID should be
+ * visible to a process using a certain snapshot.
+ *
+ * 'snapshot' is the snapshot to be used for the visibility test.
+ *
+ * 'fxid' and 'cid' are the values for the transaction that made the change.
+ *
+ * 'xmin_test' is true if we are testing a tuple's xmin and false if we are
+ * testing 'xmax'. For many snapshot types this doesn't matter, but for
+ * SNAPSHOT_DIRTY and SNAPSHOT_NON_VACUUMABLE it does.
+ *
+ * 'specToken' is only used for SNAPSHOT_DIRTY.
+ *
+ * Perhaps this function should be given a more generic name and moved to
+ * a location where anyone could use it.
+ */
+static bool
+robert_snapshot_test(Snapshot snapshot, FullTransactionId fxid, CommandId cid,
+                                        bool xmin_test, uint32 specToken)
+{
+       TransactionId   xid = XidFromFullTransactionId(fxid);
+
+       switch (snapshot->snapshot_type)
+       {
+               case SNAPSHOT_MVCC:
+               case SNAPSHOT_HISTORIC_MVCC:
+                       /*
+                        * For an MVCC snapshot, we should see effects of the current
+                        * transaction unless they are from a newer command.  We should
+                        * see other transactions if they are committed and not in our
+                        * MVCC snapshot.
+                        */
+                       if (TransactionIdIsCurrentTransactionId(xid))
+                       {
+                               if (snapshot->curcid <= cid)
+                                       return false;
+                       }
+                       else if (XidInMVCCSnapshot(xid, snapshot))
+                               return false;
+                       else if (!TransactionIdDidCommit(xid))
+                               return false;
+                       break;
+
+               case SNAPSHOT_SELF:
+                       /*
+                        * We should always see the effects of our own transaction, and
+                        * we should see other transactions if they committed.
+                        */
+                       if (!TransactionIdIsCurrentTransactionId(xid))
+                       {
+                               if (TransactionIdIsInProgress(xid))
+                                       return false;
+                               else if (!TransactionIdDidCommit(xid))
+                                       return false;
+                       }
+                       break;
+
+               case SNAPSHOT_ANY:
+               case SNAPSHOT_TOAST:
+                       /*
+                        * We should see everything.
+                        */
+                       break;
+
+               case SNAPSHOT_DIRTY:
+                       /*
+                        * We should see everything except for transactions that are
+                        * aborted.  However, if we see an in progress transaction, we
+                        * need to stash the XID and possibly the specToken in the
+                        * snapshot, due to the awful, hacky way that SnapshotDirty works.
+                        */
+                       if (!TransactionIdIsCurrentTransactionId(xid))
+                       {
+                               if (TransactionIdIsInProgress(xid))
+                               {
+                                       if (xmin_test)
+                                       {
+                                               snapshot->xmin = xid;
+                                               snapshot->speculativeToken = specToken;
+                                       }
+                                       else
+                                               snapshot->xmax = xid;
+                               }
+                               else if (TransactionIdDidCommit(xid))
+                                       return false;
+                       }
+                       break;
+
+               case SNAPSHOT_NON_VACUUMABLE:
+                       /*
+                        * This type of snapshot has asymmetric rules for xmin and xmax.
+                        * For xmin, we should see everything except for aborted
+                        * transactions. For xmax, we should see only committed XIDs
+                        * that precede snapshot->xmin.
+                        */
+                       if (xmin_test)
+                       {
+                               if (!TransactionIdIsCurrentTransactionId(xid) &&
+                                       !TransactionIdIsInProgress(xid) &&
+                                       !TransactionIdDidCommit(xid))
+                                       return false;
+                       }
+                       else
+                       {
+                               if (TransactionIdIsCurrentTransactionId(xid) ||
+                                       TransactionIdIsInProgress(xid) ||
+                                       !TransactionIdDidCommit(xid) ||
+                                       !TransactionIdPrecedes(xid, snapshot->xmin))
+                                       return false;
+                       }
+                       break;
+       }
+
+       return true;
+}
diff --git a/src/backend/access/robert/robert_tuple.c b/src/backend/access/robert/robert_tuple.c
new file mode 100644 (file)
index 0000000..a6c73d8
--- /dev/null
@@ -0,0 +1,771 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_tuple.c
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/access/robert/robert_tuple.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "access/robert_tuple.h"
+#include "access/toast_helper.h"
+#include "executor/tuptable.h"
+#include "utils/expandeddatum.h"
+
+static void robert_fill_padding(RobertTupleHeader td, char **data,
+                                       char typalign);
+static int robert_tuple_bitmap_size(TupleDesc tupleDesc, bool *isnull);
+
+/*
+ * robert_compute_data_size
+ *
+ * Compute the amount of space needed to store a robert tuple's data.  The
+ * caller must pass the amount of space required for the tuple header and
+ * null bitmap via the 'hoff' argument; the returned value is the total space
+ * required to store the tuple.
+ */
+Size
+robert_compute_data_size(TupleDesc tupleDesc, Datum *values, bool *isnull,
+                                                uint8 hoff)
+{
+       Size            data_length = hoff;
+       int                     i;
+       int                     numberOfAttributes = tupleDesc->natts;
+
+       Assert(numberOfAttributes <= MaxTupleAttributeNumber);
+
+       for (i = 0; i < numberOfAttributes; i++)
+       {
+               Datum           datum;
+               Form_pg_attribute att;
+
+               if (isnull[i])
+                       continue;
+
+               datum = values[i];
+               att = TupleDescAttr(tupleDesc, i);
+
+               if (att->attbyval)
+               {
+                       /* we store attbyval attributes without alignment padding */
+                       data_length += att->attlen;
+               }
+               else if (att->attlen == -1)
+               {
+                       Pointer         val = DatumGetPointer(datum);
+
+                       if (att->attstorage != 'p' && VARATT_CAN_MAKE_SHORT(val))
+                               data_length += VARATT_CONVERTED_SHORT_SIZE(val);
+                       else if (VARATT_IS_EXTERNAL(val))
+                       {
+                               if (VARATT_IS_EXTERNAL_EXPANDED(val))
+                               {
+                                       /*
+                                        * we want to flatten the expanded value so that the
+                                        * constructed tuple doesn't depend on it
+                                        */
+                                       data_length =
+                                               att_align_nominal(data_length, att->attalign);
+                                       data_length += EOH_get_flat_size(DatumGetEOHP(datum));
+                               }
+                               else
+                                       data_length += VARSIZE_EXTERNAL(val);
+                       }
+                       else if (VARATT_IS_SHORT(val))
+                               data_length += VARSIZE_SHORT(val);
+                       else
+                       {
+                               data_length = att_align_nominal(data_length, att->attalign);
+                               data_length += VARSIZE(val);
+                       }
+               }
+               else
+               {
+                       /* fixed-length passed by reference, and cstrings */
+                       data_length = att_align_nominal(data_length, att->attalign);
+                       data_length = att_addlength_datum(data_length, att->attlen, datum);
+               }
+       }
+
+       return data_length;
+}
+
+/*
+ * robert_fill_null_bitmap
+ *
+ * We start out by setting all the bits, and then clear those that correspond
+ * to a null attribute.
+ */
+void
+robert_fill_null_bitmap(bits8 *bits, int entries_needed, bool *isnull)
+{
+       int                     i;
+
+       memset(bits, 0xff, BITMAPLEN(entries_needed));
+
+       for (i = 0; i < entries_needed; ++i)
+               if (isnull[i])
+                       bits[i / BITS_PER_BYTE] &= ~(1 << (i % BITS_PER_BYTE));
+}
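+
+/*
+ * For example, with entries_needed = 10 and only attribute 3 (isnull[2])
+ * true, BITMAPLEN(10) = 2 bytes are first set to 0xff, and bit 2 of the
+ * first byte is then cleared, leaving 0xfb 0xff.
+ */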
+
+/*
+ * robert_fill_tuple
+ *
+ * Fill in the contents of a robert tuple from values/isnull arrays.  On
+ * entry, the tuple header should already have been initialized, especially
+ * r_hoff. On exit, the tuple data area will have been filled in, and the
+ * r_flags field may also be updated.
+ */
+void
+robert_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull,
+                                 RobertTupleHeader td, Size len)
+{
+       int                     i;
+       int                     numberOfAttributes = tupleDesc->natts;
+       char       *data = ((char *) td) + td->r_hoff;
+
+       td->r_flags &= ~ROBERT_HASEXTERNAL;
+
+       for (i = 0; i < numberOfAttributes; i++)
+       {
+               Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
+               Datum           datum = values[i];
+               Size            data_length;
+
+               if (isnull[i])
+                       continue;
+
+               if (att->attbyval)
+               {
+                       switch (att->attlen)
+                       {
+                               case sizeof(char):
+                                       {
+                                               char            c = DatumGetChar(datum);
+
+                                               memcpy(data, &c, sizeof(char));
+                                               break;
+                                       }
+                               case sizeof(int16):
+                                       {
+                                               int16           s = DatumGetInt16(datum);
+
+                                               memcpy(data, &s, sizeof(int16));
+                                               break;
+                                       }
+                               case sizeof(int32):
+                                       {
+                                               int32           i = DatumGetInt32(datum);
+
+                                               memcpy(data, &i, sizeof(int32));
+                                               break;
+                                       }
+                               case sizeof(Datum):
+                                       memcpy(data, &datum, sizeof(Datum));
+                                       break;
+                               default:
+                                       elog(ERROR, "unsupported byval length: %d", att->attlen);
+                                       break;
+                       }
+
+                       data_length = att->attlen;
+               }
+               else if (att->attlen == -1)
+               {
+                       /* varlena */
+                       Pointer         val = DatumGetPointer(datum);
+
+                       if (VARATT_IS_EXTERNAL(val))
+                       {
+                               if (VARATT_IS_EXTERNAL_EXPANDED(val))
+                               {
+                                       /*
+                                        * we want to flatten the expanded value so that the
+                                        * constructed tuple doesn't depend on it
+                                        */
+                                       ExpandedObjectHeader *eoh = DatumGetEOHP(datum);
+
+                                       robert_fill_padding(td, &data, att->attalign);
+                                       data_length = EOH_get_flat_size(eoh);
+                                       EOH_flatten_into(eoh, data, data_length);
+                               }
+                               else
+                               {
+                                       td->r_flags |= ROBERT_HASEXTERNAL;
+                                       /* no alignment, since it's short by definition */
+                                       data_length = VARSIZE_EXTERNAL(val);
+                                       memcpy(data, val, data_length);
+                               }
+                       }
+                       else if (VARATT_IS_SHORT(val))
+                       {
+                               /* no alignment for short varlenas */
+                               data_length = VARSIZE_SHORT(val);
+                               memcpy(data, val, data_length);
+                       }
+                       else if (att->attstorage != 'p' && VARATT_CAN_MAKE_SHORT(val))
+                       {
+                               /* convert to short varlena -- no alignment */
+                               data_length = VARATT_CONVERTED_SHORT_SIZE(val);
+                               SET_VARSIZE_SHORT(data, data_length);
+                               memcpy(data + 1, VARDATA(val), data_length - 1);
+                       }
+                       else
+                       {
+                               /* full 4-byte header varlena */
+                               robert_fill_padding(td, &data, att->attalign);
+                               data_length = VARSIZE(val);
+                               memcpy(data, val, data_length);
+                       }
+               }
+               else if (att->attlen == -2)
+               {
+                       /* cstring ... never needs alignment */
+                       Assert(att->attalign == 'c');
+                       data_length = strlen(DatumGetCString(datum)) + 1;
+                       memcpy(data, DatumGetPointer(datum), data_length);
+               }
+               else
+               {
+                       /* fixed-length pass-by-reference */
+                       robert_fill_padding(td, &data, att->attalign);
+                       Assert(att->attlen > 0);
+                       data_length = att->attlen;
+                       memcpy(data, DatumGetPointer(datum), data_length);
+               }
+
+               data += data_length;
+       }
+
+       Assert(data == ((char *) td) + len);
+}
+
+/*
+ * robert_form_tuple
+ *
+ * Construct a tuple from the given values[] and isnull[] arrays, which are of
+ * the length indicated by tupleDesc->natts.
+ *
+ * The result is allocated in the current memory context.
+ */
+RobertTuple
+robert_form_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull)
+{
+       RobertTuple tuple;                      /* return tuple */
+       RobertTupleHeader td;
+       Size            len;
+       uint8           hoff;
+       int                     nullBitmapEntriesNeeded;
+
+       /*
+        * Compute required space.  Note that, unlike the heap, there is no
+        * padding between the end of the null bitmap and the beginning of the
+        * data.
+        */
+       nullBitmapEntriesNeeded = robert_tuple_bitmap_size(tupleDesc, isnull);
+       hoff = offsetof(RobertTupleHeaderData, r_bits)
+               + BITMAPLEN(nullBitmapEntriesNeeded);
+       len = robert_compute_data_size(tupleDesc, values, isnull, hoff);
+
+       /* Allocate space. */
+       tuple = palloc(ROBERTTUPLESIZE + len);
+       td = (RobertTupleHeader) ((char *) tuple + ROBERTTUPLESIZE);
+
+       /* Fill in control information. */
+       tuple->r_len = len;
+       tuple->r_data = td;
+       tuple->r_tableOid = InvalidOid;
+       ItemPointerSetInvalid(&tuple->r_self);
+
+       /* Fill in tuple header information. */
+       td->r_undoptr = InvalidUndoRecPtr;
+       Assert(tupleDesc->natts <= ROBERT_NATTS_MASK);
+       td->r_flags = tupleDesc->natts;
+       td->r_hoff = hoff;
+
+       /* Fill null bitmap. */
+       robert_fill_null_bitmap(td->r_bits, nullBitmapEntriesNeeded, isnull);
+
+       /* Fill tuple data, maybe adjusting flags. */
+       robert_fill_tuple(tupleDesc, values, isnull, td, len);
+
+       return tuple;
+}
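+
+/*
+ * A minimal usage sketch (tupdesc, values and isnull are assumed to have
+ * been set up by the caller):
+ *
+ *     RobertTuple tuple = robert_form_tuple(tupdesc, values, isnull);
+ *     ... use tuple->r_data and tuple->r_len ...
+ *     pfree(tuple);           /* one chunk covers header and data */
+ */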
+
+/*
+ * robert_deform_tuple
+ *
+ * Partially or completely deform a robert tuple using the provided tupleDesc.
+ * Attribute numbers greater than or equal to oldnatts and less than natts
+ * are deformed.  *offp must point to the starting location of the first
+ * attribute to be deformed.
+ *
+ * The idea is that callers will initially pass oldnatts = 0 and *offp =
+ * tuple->r_hoff.  If, later, more columns are to be deformed, the previous
+ * value of natts should be passed as oldnatts, and *offp should have the value
+ * to which it was set by the previous call.
+ */
+void
+robert_deform_tuple(TupleDesc tupleDesc, RobertTuple tuple, Datum *values,
+                                       bool *isnull, int natts, int oldnatts, uint32 *offp)
+{
+       RobertTupleHeader tup = tuple->r_data;
+       int                     attnum;
+       uint32          off = *offp;
+       bits8      *bp = tup->r_bits;
+       int                     null_limit;
+
+       /* We can only fetch as many attributes as the tuple has. */
+       natts = Min((tuple->r_data->r_flags & ROBERT_NATTS_MASK), natts);
+
+       /* Attributes beyond the end of the null bitmap are not null. */
+       null_limit = (tup->r_hoff - SizeofRobertTupleHeader) * BITS_PER_BYTE;
+
+       for (attnum = oldnatts; attnum < natts; attnum++)
+       {
+               Form_pg_attribute att = TupleDescAttr(tupleDesc, attnum);
+               Datum      *value = &values[attnum];
+               char       *tp = (char *) tup + off;
+
+               if (attnum < null_limit && att_isnull(attnum, bp))
+               {
+                       *value = (Datum) 0;
+                       isnull[attnum] = true;
+                       continue;
+               }
+
+               isnull[attnum] = false;
+
+               if (att->attbyval)
+               {
+                       switch (att->attlen)
+                       {
+                               case sizeof(char):
+                                       {
+                                               char            c;
+
+                                               memcpy(&c, tp, sizeof(char));
+                                               *value = CharGetDatum(c);
+                                               break;
+                                       }
+                               case sizeof(int16):
+                                       {
+                                               int16           s;
+
+                                               memcpy(&s, tp, sizeof(int16));
+                                               *value = Int16GetDatum(s);
+                                               break;
+                                       }
+                               case sizeof(int32):
+                                       {
+                                               int32           i;
+
+                                               memcpy(&i, tp, sizeof(int32));
+                                               *value = Int32GetDatum(i);
+                                               break;
+                                       }
+                               case sizeof(Datum):
+                                       memcpy(value, tp, sizeof(Datum));
+                                       break;
+                               default:
+                                       elog(ERROR, "unsupported byval length: %d", att->attlen);
+                                       break;
+                       }
+
+                       off += att->attlen;
+               }
+               else if (att->attlen == -1)
+               {
+                       if (VARATT_NOT_PAD_BYTE(tp))
+                       {
+                               /* potentially unaligned varlena */
+                               *value = PointerGetDatum(tp);
+                               off += VARSIZE_ANY(*value);
+                       }
+                       else
+                       {
+                               /* we have at least one pad byte, so must be aligned varlena */
+                               off = att_align_nominal(off, att->attalign);
+                               tp = (char *) tup + off;
+                               *value = PointerGetDatum(tp);
+                               off += VARSIZE(*value);
+                       }
+               }
+               else if (att->attlen == -2)
+               {
+                       /* cstring */
+                       Assert(att->attalign == 'c');
+                       *value = PointerGetDatum(tp);
+                       off += strlen(tp) + 1;  /* include the terminating NUL */
+               }
+               else
+               {
+                       /* fixed-length pass-by-reference; skip any pad bytes */
+                       Assert(att->attlen > 0);
+                       off = att_align_nominal(off, att->attalign);
+                       tp = (char *) tup + off;
+                       *value = PointerGetDatum(tp);
+                       off += att->attlen;
+               }
+       }
+
+       /* Update caller-provided offset. */
+       *offp = off;
+}
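+
+/*
+ * A sketch of the incremental protocol described above (desc, tuple,
+ * values and isnull are the caller's):
+ *
+ *     uint32  off = tuple->r_data->r_hoff;
+ *
+ *     robert_deform_tuple(desc, tuple, values, isnull, 3, 0, &off);
+ *     ... later, fetch the remaining columns ...
+ *     robert_deform_tuple(desc, tuple, values, isnull, desc->natts, 3, &off);
+ */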
+
+/*
+ * robert_toast_tuple
+ *
+ * Compress tuple attributes or store them externally as necessary to get the
+ * size of the tuple down to something acceptable.
+ *
+ * This function uses MaxHeapAttributeNumber as the maximum number of columns
+ * in the tuple to be inserted. That limit is actually sensible because our
+ * storage format and heap have similar constraints; however, even if they
+ * didn't, this is used by other parts of the system as if it were a global
+ * limit rather than an AM-specific property. Perhaps that can be cleaned up
+ * someday.
+ */
+RobertTuple
+robert_toast_tuple(Relation rel, TupleTableSlot *slot, int options)
+{
+       ToastTupleContext       ttc;
+       TupleDesc       tupleDesc = slot->tts_tupleDescriptor;
+       int                     natts = tupleDesc->natts;
+       Datum           toast_values[MaxHeapAttributeNumber];
+       ToastAttrInfo   toast_attr[MaxHeapAttributeNumber];
+       uint8           hoff;
+       Size            maxLen;
+       bool            done;
+       bool            for_compression = true;
+       RobertTuple     result;
+
+       /* Get all tuple attributes. */
+       slot_getallattrs(slot);
+
+       /*
+        * If this is neither a relation nor a materialized view, it should not
+        * require any TOAST work; it's presumably a TOAST table.  We can save
+        * some overhead by forming and returning the necessary tuple at once.
+        */
+       if (rel->rd_rel->relkind != RELKIND_RELATION &&
+               rel->rd_rel->relkind != RELKIND_MATVIEW)
+       {
+               RobertTuple     tuple;
+
+               tuple = robert_form_tuple(tupleDesc, slot->tts_values,
+                                                                 slot->tts_isnull);
+               Assert((tuple->r_data->r_flags & ROBERT_HASEXTERNAL) == 0);
+               return tuple;
+       }
+
+       /*
+        * We're going to scribble on the values array, so copy it into our
+        * scratch space.  The tts_isnull array will not be changed, so we don't
+        * need to copy it.
+        */
+       Assert(natts <= MaxHeapAttributeNumber);
+       memcpy(toast_values, slot->tts_values, sizeof(Datum) * natts);
+
+       /* Prepare for toasting. */
+       ttc.ttc_rel = rel;
+       ttc.ttc_values = toast_values;
+       ttc.ttc_isnull = slot->tts_isnull;
+       ttc.ttc_oldvalues = NULL;
+       ttc.ttc_oldisnull = NULL;
+       ttc.ttc_toastrel = NULL;
+       ttc.ttc_toastslot = NULL;
+       ttc.ttc_attr = toast_attr;
+       toast_tuple_init(&ttc);
+
+       /* Compute header overhead. */
+       hoff = offsetof(RobertTupleHeaderData, r_bits);
+       if ((ttc.ttc_flags & TOAST_HAS_NULLS) != 0)
+       {
+               int             nullBitmapEntriesNeeded;
+
+               nullBitmapEntriesNeeded =
+                       robert_tuple_bitmap_size(slot->tts_tupleDescriptor,
+                                                                        slot->tts_isnull);
+               hoff += BITMAPLEN(nullBitmapEntriesNeeded);
+       }
+
+       /*
+        * Compute maximum tuple length.  It's not really correct to use
+        * TOAST_TUPLE_TARGET here, because that's a heap-specific property, but
+        * it's also not exactly clear what value would be better.
+        */
+       maxLen = RelationGetToastTupleTarget(rel, TOAST_TUPLE_TARGET);
+
+       /*
+        * Compress attributes with attstorage 'x', and store large attributes
+        * with attstorage 'x' or 'e' externally.  If that isn't enough, make
+        * additional attributes with attstorage 'x' or 'e' external.  (However,
+        * if there's no TOAST table, then we can't make anything external.)
+        */
+       while (1)
+       {
+               int             biggest_attno;
+
+               /* See whether the tuple will be too large. */
+               done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+                                                                                                 slot->tts_isnull, hoff);
+               if (done)
+                       break;
+
+               /*
+                * Find the largest attribute with attstorage 'x' or 'e'.  If
+                * for_compression is true, it must also be potentially compressible.
+                */
+               biggest_attno =
+                       toast_tuple_find_biggest_attribute(&ttc, for_compression, false);
+               if (biggest_attno < 0)
+               {
+                       /*
+                        * No suitable attribute was found.  If we were looking for
+                        * compressible attributes, we can still try looking for
+                        * non-compressible attributes, provided that we have a TOAST
+                        * table to which to push them.
+                        */
+                       if (!for_compression || !OidIsValid(rel->rd_rel->reltoastrelid))
+                               break;
+                       for_compression = false;
+                       continue;
+               }
+
+               if (for_compression)
+               {
+                       /* attempt to compress it inline if it has attstorage 'x' */
+                       if (TupleDescAttr(tupleDesc, biggest_attno)->attstorage == 'x')
+                               toast_tuple_try_compression(&ttc, biggest_attno);
+                       else
+                       {
+                               /* attstorage 'e', so flag incompressible */
+                               toast_attr[biggest_attno].tai_colflags |=
+                                       TOASTCOL_INCOMPRESSIBLE;
+                       }
+
+                       /*
+                        * If it's really big, push it out to the TOAST table immediately.
+                        * This avoids uselessly compressing other fields in the common
+                        * case where we have one long field and several short ones.
+                        */
+                       if (toast_attr[biggest_attno].tai_size > maxLen &&
+                               OidIsValid(rel->rd_rel->reltoastrelid))
+                               toast_tuple_externalize(&ttc, biggest_attno, options,
+                                                                               TOAST_MAX_CHUNK_SIZE);
+               }
+               else
+               {
+                       /*
+                        * XXX. We really should not be using TOAST_MAX_CHUNK_SIZE here,
+                        * since that is a heap-specific value.
+                        */
+                       toast_tuple_externalize(&ttc, biggest_attno, options,
+                                                                       TOAST_MAX_CHUNK_SIZE);
+               }
+       }
+
+       /* Try compressing attributes with attstorage 'm'. */
+       while (!done)
+       {
+               int             biggest_attno;
+
+               biggest_attno = toast_tuple_find_biggest_attribute(&ttc, true, true);
+               if (biggest_attno < 0)
+                       break;
+               toast_tuple_try_compression(&ttc, biggest_attno);
+
+               /* See whether we've sufficiently shrunk the tuple. */
+               done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+                                                                                                 slot->tts_isnull, hoff);
+       }
+
+       /*
+        * If we're not yet done and if there is a TOAST table, we can try storing
+        * attributes with attstorage 'm' externally.
+        */
+       if (OidIsValid(rel->rd_rel->reltoastrelid))
+       {
+               /* Only do this if it's *really* necessary. */
+               maxLen = TOAST_TUPLE_TARGET_MAIN;
+
+               while (!done)
+               {
+                       int             biggest_attno;
+
+                       biggest_attno =
+                               toast_tuple_find_biggest_attribute(&ttc, false, true);
+                       if (biggest_attno < 0)
+                               break;
+                       toast_tuple_externalize(&ttc, biggest_attno, options,
+                                                                       TOAST_MAX_CHUNK_SIZE);
+
+                       /* See whether we've sufficiently shrunk the tuple. */
+                       done = maxLen >= robert_compute_data_size(tupleDesc, toast_values,
+                                                                                                         slot->tts_isnull, hoff);
+               }
+       }
+
+       /*
+        * XXX. If the source slot is a robert tuple and no changes got made, we
+        * can optimize this.
+        */
+       result = robert_form_tuple(tupleDesc, toast_values, slot->tts_isnull);
+
+       toast_tuple_cleanup(&ttc, true);
+
+       return result;
+}
+
+/*
+ * robert_print_tuple
+ *
+ * Dump a tuple as a printable string. The caller may pfree the returned
+ * string, if desired.
+ *
+ * If the tupleDesc is passed as NULL, the data portion of the tuple will be
+ * dumped as one long string of bytes.  Otherwise, the tupleDesc will be
+ * used to deform the tuple, and the bytes for each attribute will be dumped
+ * individually.
+ *
+ * This is intended for debugging purposes. Where it's not excessively
+ * burdensome to do so, we try to guard against the possibility that this
+ * function might be passed a corrupt tuple; instead, we try to produce
+ * some kind of meaningful text representation and let the user sort it out.
+ * This is not perfect; a defective tuple can certainly cause a crash here,
+ * especially if tupleDesc is not NULL, but it helps.
+ */
+char *
+robert_print_tuple(RobertTuple tuple, TupleDesc tupleDesc)
+{
+       RobertTupleHeader       td = tuple->r_data;
+       unsigned char *s = (unsigned char *) td;
+       uint32  offp = 0;
+       StringInfoData  buf;
+
+       initStringInfo(&buf);
+
+       /* Decode tuple header. */
+       if (tuple->r_len >= SizeofRobertTupleHeader)
+       {
+               UndoLogNumber   ulogno = UndoRecPtrGetLogNo(td->r_undoptr);
+               UndoLogOffset   uoffset = UndoRecPtrGetOffset(td->r_undoptr);
+
+               appendStringInfo(&buf, "undo:%06X.%010" INT64_MODIFIER
+                                                "X flags:%04X hoff:%u", ulogno, uoffset,
+                                                td->r_flags, td->r_hoff);
+               offp = SizeofRobertTupleHeader;
+       }
+
+       /* Decode any null bitmap. */
+       if (tuple->r_len >= td->r_hoff && td->r_hoff > SizeofRobertTupleHeader)
+       {
+               Size    nullbytes = td->r_hoff - SizeofRobertTupleHeader;
+               Size    i;
+
+               appendStringInfoString(&buf, " nulls:");
+               for (i = 0; i < nullbytes; ++i)
+                       appendStringInfo(&buf, "%02x", s[SizeofRobertTupleHeader + i]);
+               offp = td->r_hoff;
+       }
+
+       /* Decode attributes, if a tupleDesc was provided. */
+       if (tupleDesc != NULL)
+       {
+               Datum  *values;
+               bool   *isnull;
+               int             attnum;
+
+               values = palloc(tupleDesc->natts * sizeof(Datum));
+               isnull = palloc(tupleDesc->natts * sizeof(bool));
+
+               /*
+                * Normally, we want to deform all attributes at once for efficiency,
+                * but here we deform them one by one so that we can learn the byte
+                * position of each attribute in the tuple.
+                */
+               for (attnum = 0; attnum < tupleDesc->natts; ++attnum)
+               {
+                       uint32  prev_offp = offp;
+                       Size    i;
+
+                       robert_deform_tuple(tupleDesc, tuple, values, isnull, attnum + 1,
+                                                               attnum, &offp);
+                       if (isnull[attnum])
+                               continue;
+                       appendStringInfo(&buf, " %d(%d):", attnum + 1, offp - prev_offp);
+                       for (i = prev_offp; i < offp; ++i)
+                               appendStringInfo(&buf, "%02x", s[i]);
+               }
+
+               pfree(values);
+               pfree(isnull);
+       }
+
+       /* Dump any remaining bytes. Unexpected if a tupleDesc was specified. */
+       if (offp < tuple->r_len)
+       {
+               Size    i;
+
+               appendStringInfoString(&buf, " rest:");
+               for (i = offp; i < tuple->r_len; ++i)
+                       appendStringInfo(&buf, "%02x", s[i]);
+       }
+
+       return buf.data;
+}
+
+/*
+ * robert_fill_padding
+ *
+ * Insert enough padding into a tuple so that the next field will start
+ * aligned on an appropriate boundary. Update *data to point to the first
+ * byte following the inserted padding bytes.
+ */
+static void
+robert_fill_padding(RobertTupleHeader td, char **data, char typalign)
+{
+       char *start = (char *) td;
+       int curbytes = *data - start;
+       int     padbytes = att_align_nominal(curbytes, typalign) - curbytes;
+
+       memset(*data, 0, padbytes);
+       *data += padbytes;
+}
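+
+/*
+ * For example, if *data is 13 bytes past the start of the tuple header and
+ * typalign is 'i' (4-byte alignment), att_align_nominal(13, 'i') = 16, so
+ * three zero bytes are written and *data advances by three.
+ */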
+
+/*
+ * robert_tuple_bitmap_size
+ *
+ * Work out the number of entries that we'll need for the null bitmap.  Note
+ * that the return value is the number of entries, not the number of bytes.
+ */
+static int
+robert_tuple_bitmap_size(TupleDesc tupleDesc, bool *isnull)
+{
+       int                     nullBitmapEntriesNeeded = 0;
+       int                     i;
+
+       /*
+        * We'll only store enough bytes in the null bitmap to represent the nulls
+        * actually present in the tuple, so we need to find the last null actually
+        * present.
+        */
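+       /*
+        * For example, with natts = 8 and the only null in attribute 5
+        * (isnull[4]), the last null is at index 4, so five entries are
+        * needed; callers' BITMAPLEN() then rounds that up to one byte.
+        */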
+       for (i = tupleDesc->natts - 1; i >= 0; i--)
+       {
+               if (isnull[i])
+               {
+                       nullBitmapEntriesNeeded = i + 1;
+                       break;
+               }
+       }
+
+       return nullBitmapEntriesNeeded;
+}
diff --git a/src/backend/access/robert/robert_xlog.c b/src/backend/access/robert/robert_xlog.c
new file mode 100644 (file)
index 0000000..2f3c54a
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * robert_xlog.c
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "access/robert_xlog.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "utils/rel.h"
+
+static void robert_undo_buffer(Buffer buf, int nrecords, UndoRecInfo *records);
+
+void
+robert_redo(XLogReaderState *record)
+{
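+       /* Nothing to do here yet; redo is unimplemented in this experimental AM. */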
+}
+
+/*
+ * robert_undo
+ *
+ * Toplevel undo handler for this AM.
+ */
+void
+robert_undo(int nrecords, UndoRecInfo *records)
+{
+       Relation rel = NULL;
+       int             i,
+                       j;
+
+       Assert(nrecords > 0);
+
+       for (i = 0; i < nrecords; i = j)
+       {
+               UnpackedUndoRecord *record = records[i].uur;
+               Buffer  buf;
+
+               /*
+                * If this undo record is for the same relation as the previous undo
+                * record, we just keep the same relation open.  If it's for a
+                * different relation, close the old one and open the new one.
+                *
+                * XXX. It might be better to keep the lock on the old relation if
+                * there's more undo pending for that relation in this transaction,
+                * to avoid deadlock risk. But how would we arrange to close the
+                * relation at the end? Also, if this is the last chunk of undo then
+                * there shouldn't be any more records for this relation, so it would
+                * probably be better to release the lock right away.  And also, if
+                * we're already holding on to tons of locks, it might be better to
+                * drop some of them to free up space in the lock table so we don't
+                * just fail. For now, do the easy thing.
+                */
+               if (rel == NULL || RelationGetRelid(rel) != record->uur_reloid)
+               {
+                       elog(LOG, "robert_undo: select relation %u", record->uur_reloid);
+                       if (rel != NULL)
+                               relation_close(rel, RowExclusiveLock);
+                       rel = relation_open(record->uur_reloid, RowExclusiveLock);
+               }
+
+               /* Figure out where the records for this block stop. */
+               for (j = i + 1; j < nrecords; ++j)
+               {
+                       UnpackedUndoRecord *otherrec = records[j].uur;
+
+                       /*
+                        * XXX. This should also check uur_dbid, but right now that doesn't
+                        * work properly.
+                        */
+                       if (record->uur_reloid != otherrec->uur_reloid ||
+                               record->uur_fork != otherrec->uur_fork ||
+                               record->uur_block != otherrec->uur_block)
+                               break;
+               }
+
+               /* Read the target block. */
+               elog(LOG, "robert_undo: block %u records start at %d, end at %d",
+                        record->uur_block, i, j);
+               buf = ReadBuffer(rel, record->uur_block);
+
+               /*
+                * Process all records for the target block.  This function also
+                * releases the pin.
+                */
+               robert_undo_buffer(buf, j - i, records + i);
+       }
+
+       /* Clean up. */
+       if (rel != NULL)
+               relation_close(rel, RowExclusiveLock);
+}
+
+/*
+ * robert_undo_buffer
+ *
+ * Apply a group of undo actions that all relate to a particular buffer.
+ * On entry, the buffer should be pinned; this function will release the
+ * pin before returning.
+ */
+static void
+robert_undo_buffer(Buffer buf, int nrecords, UndoRecInfo *records)
+{
+       Page    page = BufferGetPage(buf);
+       int             i;
+
+       /* Take an exclusive lock on the buffer. */
+       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+       /*
+        * XXX. This is an obvious crock that only works for inserts.  For
+        * an in-place update, we'd need to fetch the old tuple from undo and
+        * put it back. For a non-in-place update or delete, we only need to
+        * remove the delete-mark unless the page has been pruned.
+        */
+       for (i = 0; i < nrecords; ++i)
+       {
+               UnpackedUndoRecord *record = records[i].uur;
+               ItemId iid = PageGetItemId((PageHeader) page, record->uur_offset);
+
+               if (record->uur_type == ROBERT_UNDO_INSERT)
+                       ItemIdSetDead(iid);
+               else if (record->uur_type == ROBERT_UNDO_DELETE)
+                       ItemIdSetNormal(iid, ItemIdGetOffset(iid),
+                                                       ItemIdGetLength(iid));
+               else
+                       elog(NOTICE, "robert_undo_buffer: unknown type %d",
+                                record->uur_type);
+       }
+
+       /* All done. Note that caller expects us to drop the pin. */
+       UnlockReleaseBuffer(buf);
+}
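One inference worth making explicit, since the DML side isn't shown in this file: restoring a deleted item with ItemIdSetNormal(iid, ItemIdGetOffset(iid), ItemIdGetLength(iid)) only works if the delete-mark preserved lp_off and lp_len. That is the behavior of ItemIdMarkDead, which flips lp_flags to LP_DEAD and leaves the rest of the line pointer intact, as opposed to ItemIdSetDead, which also zeroes lp_off and lp_len (both macros are in storage/itemid.h). Presumably the delete path uses the former.

    ItemIdMarkDead(iid);  /* lp_flags = LP_DEAD; lp_off and lp_len preserved */
    ItemIdSetDead(iid);   /* lp_flags = LP_DEAD; lp_off = 0; lp_len = 0 */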
+
+void
+robert_undo_desc(StringInfo buf, UnpackedUndoRecord *record)
+{
+       elog(LOG, "robert_undo_desc");
+}
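The i/j loop in robert_undo above partitions the (already ordered) record array into maximal runs that share a relation, fork, and block, handing each run to robert_undo_buffer in one call. The same pattern in isolation, as a standalone sketch with a simplified record type (hypothetical, for illustration):

    #include <stdio.h>

    typedef struct { unsigned reloid, block; } Rec;

    int
    main(void)
    {
        Rec recs[] = {{1, 7}, {1, 7}, {1, 9}, {2, 3}};
        int nrecs = 4, i, j;

        for (i = 0; i < nrecs; i = j)
        {
            /* find where the run that starts at i ends */
            for (j = i + 1; j < nrecs; ++j)
                if (recs[j].reloid != recs[i].reloid ||
                    recs[j].block != recs[i].block)
                    break;
            printf("rel %u block %u: %d record(s)\n",
                   recs[i].reloid, recs[i].block, j - i);
        }
        return 0;
    }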
diff --git a/src/backend/access/robert/robertam.c b/src/backend/access/robert/robertam.c
new file mode 100644 (file)
index 0000000..49045cd
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * robertam.c
+ */
+
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "access/robertam.h"
+#include "access/robert_scan.h"
+#include "access/robert_slot.h"
+#include "utils/builtins.h"
+
+extern Datum robert_tableam_handler(PG_FUNCTION_ARGS);
+
+static const TableAmRoutine robertam_methods = {
+       .type = T_TableAmRoutine,
+
+       .slot_callbacks = robert_slot_callbacks,
+
+       .scan_begin = robert_scan_begin,
+       .scan_end = robert_scan_end,
+       .scan_rescan = robert_scan_rescan,
+       .scan_getnextslot = robert_scan_getnextslot,
+
+       .parallelscan_estimate = table_block_parallelscan_estimate,
+       .parallelscan_initialize = table_block_parallelscan_initialize,
+       .parallelscan_reinitialize = table_block_parallelscan_reinitialize,
+
+       .index_fetch_begin = robert_index_fetch_begin,
+       .index_fetch_reset = robert_index_fetch_reset,
+       .index_fetch_end = robert_index_fetch_end,
+       .index_fetch_tuple = robert_index_fetch_tuple,
+
+       .tuple_fetch_row_version = robert_tuple_fetch_row_version,
+       .tuple_tid_valid = robert_tuple_tid_valid,
+       .tuple_get_latest_tid = robert_tuple_get_latest_tid,
+       .tuple_satisfies_snapshot = robert_tuple_satisfies_snapshot,
+       .compute_xid_horizon_for_tuples = robert_compute_xid_horizon_for_tuples,
+
+       .tuple_insert = robert_tuple_insert,
+       .tuple_insert_speculative = robert_tuple_insert_speculative,
+       .tuple_complete_speculative = robert_tuple_complete_speculative,
+       .multi_insert = robert_multi_insert,
+       .tuple_delete = robert_tuple_delete,
+       .tuple_update = robert_tuple_update,
+       .tuple_lock = robert_tuple_lock,
+       .finish_bulk_insert = robert_finish_bulk_insert,
+
+       .relation_set_new_filenode = robert_relation_set_new_filenode,
+       .relation_nontransactional_truncate = robert_relation_nontransactional_truncate,
+       .relation_copy_data = robert_relation_copy_data,
+       .relation_copy_for_cluster = robert_relation_copy_for_cluster,
+       .relation_vacuum = robert_relation_vacuum,
+       .scan_analyze_next_block = robert_scan_analyze_next_block,
+       .scan_analyze_next_tuple = robert_scan_analyze_next_tuple,
+       .index_build_range_scan = robert_index_build_range_scan,
+       .index_validate_scan = robert_index_validate_scan,
+
+       .relation_size = table_block_relation_size,
+       .relation_needs_toast_table = robert_relation_needs_toast_table,
+       .relation_toast_am = robert_relation_toast_am,
+       .toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE,
+
+       .relation_estimate_size = robert_relation_estimate_size,
+
+       .scan_bitmap_next_block = robert_scan_bitmap_next_block,
+       .scan_bitmap_next_tuple = robert_scan_bitmap_next_tuple,
+
+       .scan_sample_next_block = robert_scan_sample_next_block,
+       .scan_sample_next_tuple = robert_scan_sample_next_tuple
+};
+
+Datum
+robert_tableam_handler(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_POINTER(&robertam_methods);
+}
+
+
+bool
+robert_tuple_fetch_row_version(Relation rel, ItemPointer tid,
+                                                          Snapshot snapshot, TupleTableSlot *slot)
+{
+       return false;
+}
+
+bool
+robert_tuple_tid_valid(TableScanDesc sscan, ItemPointer tid)
+{
+       RobertScanDesc scan = (RobertScanDesc) sscan;
+
+       return ItemPointerIsValid(tid) &&
+               ItemPointerGetBlockNumber(tid) < scan->rrs_nblocks;
+}
+
+void
+robert_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
+{
+}
+
+bool
+robert_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
+                                                               Snapshot snapshot)
+{
+       return false;
+}
+
+TransactionId
+robert_compute_xid_horizon_for_tuples(Relation rel,
+                                                                         ItemPointerData *items,
+                                                                         int nitems)
+{
+       return InvalidTransactionId;
+}
+
+/*
+ * Check to see whether the table needs a TOAST table.  It does only if
+ * (1) there are any toastable attributes, and (2) the maximum length
+ * of a tuple could exceed TOAST_TUPLE_THRESHOLD.  (We don't want to
+ * create a toast table for something like "f1 varchar(20)".)
+ */
+bool
+robert_relation_needs_toast_table(Relation rel)
+{
+       int32           data_length = 0;
+       bool            maxlength_unknown = false;
+       bool            has_toastable_attrs = false;
+       TupleDesc       tupdesc = rel->rd_att;
+       int32           tuple_length;
+       int                     i;
+
+       for (i = 0; i < tupdesc->natts; i++)
+       {
+               Form_pg_attribute att = TupleDescAttr(tupdesc, i);
+
+               if (att->attisdropped)
+                       continue;
+
+               /* we don't align if it's pass-by-value */
+               if (!att->attbyval)
+                       data_length = att_align_nominal(data_length, att->attalign);
+
+               if (att->attlen > 0)
+               {
+                       /* Fixed-length types are never toastable */
+                       data_length += att->attlen;
+               }
+               else
+               {
+                       int32           maxlen = type_maximum_size(att->atttypid,
+                                                                                                  att->atttypmod);
+
+                       if (maxlen < 0)
+                               maxlength_unknown = true;
+                       else
+                               data_length += maxlen;
+                       if (att->attstorage != 'p')
+                               has_toastable_attrs = true;
+               }
+       }
+       if (!has_toastable_attrs)
+               return false;                   /* nothing to toast? */
+       if (maxlength_unknown)
+               return true;                    /* any unlimited-length attrs? */
+       tuple_length = SizeofRobertTupleHeader + BITMAPLEN(tupdesc->natts) +
+               data_length;
+       return (tuple_length > TOAST_TUPLE_THRESHOLD);
+}
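A worked example of the threshold test (assuming default 8kB pages, where TOAST_TUPLE_THRESHOLD comes out to roughly 2kB): for a table whose only column is f1 varchar(20), type_maximum_size reports a known maximum of about 24 bytes in a single-byte encoding, so the estimated tuple length stays far below the threshold and no TOAST table is created even though varchar is a toastable type. Change the column to f1 text and type_maximum_size returns -1, maxlength_unknown becomes true, and the function returns true.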
+
+/*
+ * robert_relation_toast_am
+ *
+ * TOAST tables for robert relations are just robert relations.
+ */
+Oid
+robert_relation_toast_am(Relation rel)
+{
+       return rel->rd_rel->relam;
+}
+
+/*
+ * robert_relation_estimate_size
+ *
+ * Each tuple involves an item pointer and an (unaligned) RobertTupleHeader.
+ *
+ * We don't currently have any special space, so only the size of the page
+ * header needs to be subtracted from the number of usable bytes per page.
+ */
+void
+robert_relation_estimate_size(Relation rel, int32 *attr_widths,
+                                                         BlockNumber *pages, double *tuples,
+                                                         double *allvisfrac)
+{
+       const Size overhead_bytes_per_tuple =
+               SizeofRobertTupleHeader + sizeof(ItemIdData);
+       const Size usable_bytes_per_page = BLCKSZ - SizeOfPageHeaderData;
+
+       table_block_relation_estimate_size(rel, attr_widths, pages, tuples,
+                                                                          allvisfrac,
+                                                                          overhead_bytes_per_tuple,
+                                                                          usable_bytes_per_page);
+}
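For intuition (assuming default 8kB pages and an 8-byte UndoRecPtr): usable_bytes_per_page is 8192 - 24 = 8168, and overhead_bytes_per_tuple is 11 bytes of RobertTupleHeader plus a 4-byte line pointer, or 15 bytes. table_block_relation_estimate_size prices each tuple at its estimated data width plus that overhead, so tuples averaging 40 bytes of data give a density of about 8168 / 55, roughly 148 tuples per page.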
index c57eca240f5b91f6cf536b0298cf12d8ec7c0a30..10f8226a1e054d45dd10cf6a95b3ff97839df892 100644 (file)
@@ -17,6 +17,7 @@
 #include "access/brin_xlog.h"
 #include "access/multixact.h"
 #include "access/nbtxlog.h"
+#include "access/robert_xlog.h"
 #include "access/spgxlog.h"
 #include "access/undoaction_xlog.h"
 #include "access/undolog_xlog.h"
index 272edcbcba38ee4741809e6f6d532af7c53738ec..3bf3e8dcd97ce54826eefee402b07cecbc146f45 100644 (file)
@@ -133,6 +133,12 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
                        DecodeLogicalMsgOp(ctx, &buf);
                        break;
 
+               case RM_ROBERT_ID:
+                       /* XXX. We surely need to do something better here. */
+                       ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+                                                                       buf.origptr);
+                       break;
+
                        /*
                         * Rmgrs irrelevant for logical decoding; they describe stuff not
                         * represented in logical decoding. Add new rmgrs in rmgrlist.h's
index 976f80e9c30d79c74ab9b7bc638c67a39ab868ef..b8177ab2fdf4d7787388e4d3c0d34cabe5498a0e 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/multixact.h"
 #include "access/nbtxlog.h"
 #include "access/rmgr.h"
+#include "access/robert_xlog.h"
 #include "access/spgxlog.h"
 #include "access/undoaction_xlog.h"
 #include "access/undolog_xlog.h"
index 6da5930e0b7f0fdf0cb63a0c58d14fc376c60bb7..b6e6754447c9952b461bb7eae532d98d61c9eb28 100644 (file)
@@ -49,3 +49,4 @@ PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify,
 PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL, NULL)
 PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_ROBERT_ID, "Robert", robert_redo, robert_desc, robert_identify, NULL, NULL, NULL, robert_undo, NULL, robert_undo_desc)
diff --git a/src/include/access/robert_page.h b/src/include/access/robert_page.h
new file mode 100644 (file)
index 0000000..cf94de9
--- /dev/null
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_page.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef ROBERT_PAGE_H
+#define ROBERT_PAGE_H
+
+#include "access/robert_tuple.h"
+#include "access/undolog.h"
+#include "storage/block.h"
+#include "storage/itemptr.h"
+#include "storage/off.h"
+
+/* External function prototypes. */
+extern OffsetNumber robert_page_free_offset(Page page, Size size);
+extern void robert_page_add_item(Page page, OffsetNumber offnum,
+                                        RobertTuple tuple);
+
+#endif
diff --git a/src/include/access/robert_scan.h b/src/include/access/robert_scan.h
new file mode 100644 (file)
index 0000000..d47d2b5
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * robert_scan.h
+ */
+
+#ifndef ROBERT_SCAN_H
+#define ROBERT_SCAN_H
+
+#include "access/relscan.h"
+#include "access/sdir.h"
+#include "access/skey.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+#include "nodes/tidbitmap.h"
+#include "utils/snapshot.h"
+
+typedef enum RobertScanState
+{
+       ROBERT_SCAN_NOT_STARTED,
+       ROBERT_SCAN_BLOCK_DONE,
+       ROBERT_SCAN_TUPLE_DONE,
+       ROBERT_SCAN_READY
+} RobertScanState;
+
+typedef struct RobertScanDescData
+{
+       TableScanDescData rrs_base;     /* AM independent part of the descriptor */
+       BlockNumber rrs_nblocks;
+       BlockNumber rrs_startblock;
+       BlockNumber rrs_numblocks;
+       RobertScanState rrs_state;
+       BlockNumber     rrs_cblock;
+       OffsetNumber rrs_coffset;
+       OffsetNumber rrs_lastoffset;
+       int                     rrs_tupindex;
+       BufferAccessStrategy    rrs_strategy;
+       Page            rrs_cpage;
+} RobertScanDescData;
+
+typedef RobertScanDescData *RobertScanDesc;
+
+/* Table scans. */
+extern TableScanDesc robert_scan_begin(Relation rel, Snapshot snapshot,
+                                 int nkeys, ScanKeyData *key,
+                                 ParallelTableScanDesc pscan, uint32 flags);
+extern void robert_scan_end(TableScanDesc sscan);
+extern void robert_scan_rescan(TableScanDesc sscan, ScanKeyData *key,
+                                  bool set_params, bool allow_strat,
+                                  bool allow_sync, bool allow_pagemode);
+extern bool robert_scan_getnextslot(TableScanDesc sscan,
+                                               ScanDirection direction,
+                                               TupleTableSlot *slot);
+
+/* Index scans. */
+extern IndexFetchTableData *robert_index_fetch_begin(Relation rel);
+extern void robert_index_fetch_reset(IndexFetchTableData *data);
+extern void robert_index_fetch_end(IndexFetchTableData *data);
+extern bool robert_index_fetch_tuple(IndexFetchTableData *data,
+                                                ItemPointer tid, Snapshot snapshot,
+                                                TupleTableSlot *slot,
+                                                bool *call_again, bool *all_dead);
+
+/* Bitmap scans. */
+extern bool robert_scan_bitmap_next_block(TableScanDesc sscan,
+                                                         TBMIterateResult *tbmres);
+extern bool robert_scan_bitmap_next_tuple(TableScanDesc sscan,
+                                                         TBMIterateResult *tbmres,
+                                                         TupleTableSlot *slot);
+
+/* Sample scans. */
+extern bool robert_scan_sample_next_block(TableScanDesc scan,
+                                                         SampleScanState *scanstate);
+extern bool robert_scan_sample_next_tuple(TableScanDesc scan,
+                                                         SampleScanState *scanstate,
+                                                         TupleTableSlot *slot);
+
+/* Internal functions */
+extern BlockNumber robert_scan_get_blocks_done(RobertScanDesc scan);
+
+#endif
diff --git a/src/include/access/robert_slot.h b/src/include/access/robert_slot.h
new file mode 100644 (file)
index 0000000..8e7585b
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * robert_slot.h
+ */
+
+#ifndef ROBERT_SLOT_H
+#define ROBERT_SLOT_H
+
+#include "access/robert_tuple.h"
+#include "executor/tuptable.h"
+#include "utils/rel.h"
+
+typedef struct RobertTupleTableSlot
+{
+       TupleTableSlot  base;
+       RobertTuple             tuple;
+       uint32                  off;
+} RobertTupleTableSlot;
+
+extern PGDLLIMPORT const TupleTableSlotOps TTSOpsRobert;
+
+extern const TupleTableSlotOps *robert_slot_callbacks(Relation relation);
+extern void robert_slot_store(TupleTableSlot *slot, RobertTupleHeader td,
+                                 Size len, Oid tableOid, BlockNumber blkno,
+                                 OffsetNumber offset);
+extern bool robert_slot_store_visible(TupleTableSlot *slot,
+                                                                         RobertTupleHeader td, Size len,
+                                                                         Oid tableOid, BlockNumber blkno,
+                                                                         OffsetNumber offset, Snapshot snapshot,
+                                                                         bool item_is_dead);
+
+#endif
diff --git a/src/include/access/robert_tuple.h b/src/include/access/robert_tuple.h
new file mode 100644 (file)
index 0000000..28bbc90
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * robert_tuple.h
+ */
+
+#ifndef ROBERT_TUPLE_H
+#define ROBERT_TUPLE_H
+
+#include "access/transam.h"
+#include "access/tupdesc.h"
+#include "access/undolog.h"
+#include "storage/itemptr.h"
+#include "utils/relcache.h"
+
+struct TupleTableSlot;
+
+typedef struct RobertTupleHeaderData
+{
+       UndoRecPtr      r_undoptr;
+       uint16          r_flags;
+       uint8           r_hoff;
+       bits8           r_bits[FLEXIBLE_ARRAY_MEMBER];
+} RobertTupleHeaderData;
+
+#define SizeofRobertTupleHeader                offsetof(RobertTupleHeaderData, r_bits)
+
+/* Flags for use in r_flags. */
+#define ROBERT_NATTS_MASK                      0x07FF          /* 11 bits */
+#define        ROBERT_HASEXTERNAL                      0x0800
+
+typedef RobertTupleHeaderData *RobertTupleHeader;
+
+typedef struct RobertTupleData
+{
+       uint32          r_len;
+       RobertTupleHeader r_data;
+       Oid                     r_tableOid;
+       ItemPointerData r_self;
+} RobertTupleData;
+
+typedef RobertTupleData *RobertTuple;
+
+#define ROBERTTUPLESIZE   MAXALIGN(sizeof(RobertTupleData))
+
+extern Size robert_compute_data_size(TupleDesc tupleDesc, Datum *values,
+                                                bool *isnull, uint8 hoff);
+extern void robert_fill_null_bitmap(bits8 *bits, int entries_needed,
+                                               bool *isnull);
+extern void robert_fill_tuple(TupleDesc tupleDesc, Datum *values, bool *isnull,
+                                 RobertTupleHeader td, Size len);
+extern RobertTuple robert_form_tuple(TupleDesc tupleDesc, Datum *values,
+                                 bool *isnull);
+extern void robert_deform_tuple(TupleDesc tupleDesc, RobertTuple tuple,
+                                       Datum *values, bool *isnull, int natts, int oldnatts,
+                                       uint32 *offp);
+extern RobertTuple robert_toast_tuple(Relation rel,
+                                  struct TupleTableSlot *slot,
+                                  int options);
+extern char *robert_print_tuple(RobertTuple tuple, TupleDesc tupleDesc);
+
+#endif
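Since r_flags packs the attribute count into its low 11 bits (ROBERT_NATTS_MASK allows up to 2047 attributes) alongside flag bits like ROBERT_HASEXTERNAL, decoding would look roughly like the following hypothetical accessors (assumptions for illustration, not defined anywhere in this patch):

    /* hypothetical accessors, not part of the patch */
    #define RobertTupleHeaderGetNatts(tup) \
        ((int) ((tup)->r_flags & ROBERT_NATTS_MASK))
    #define RobertTupleHeaderHasExternal(tup) \
        (((tup)->r_flags & ROBERT_HASEXTERNAL) != 0)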
diff --git a/src/include/access/robert_xlog.h b/src/include/access/robert_xlog.h
new file mode 100644 (file)
index 0000000..67496ff
--- /dev/null
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * robert_xlog.h
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/robert_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef ROBERT_XLOG_H
+#define ROBERT_XLOG_H
+
+#include "access/undoaccess.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+
+#define        ROBERT_UNDO_INSERT                      1
+#define ROBERT_UNDO_DELETE                     2
+
+extern void robert_redo(XLogReaderState *record);
+extern void robert_desc(StringInfo buf, XLogReaderState *record);
+extern const char *robert_identify(uint8 info);
+
+extern void    robert_undo(int nrecords, UndoRecInfo *records);
+extern void    robert_undo_desc(StringInfo buf, UnpackedUndoRecord *record);
+
+#endif                                                 /* ROBERT_XLOG_H */
diff --git a/src/include/access/robertam.h b/src/include/access/robertam.h
new file mode 100644 (file)
index 0000000..b6c3bc4
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * robertam.h
+ */
+
+#ifndef ROBERTAM_H
+#define ROBERTAM_H
+
+#include "access/hio.h"
+#include "access/skey.h"
+#include "access/tableam.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "nodes/execnodes.h"
+
+/* MVCC. */
+extern bool robert_tuple_fetch_row_version(Relation rel, ItemPointer tid,
+                                                          Snapshot snapshot, TupleTableSlot *slot);
+extern bool robert_tuple_tid_valid(TableScanDesc sscan, ItemPointer tid);
+extern void robert_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid);
+extern bool robert_tuple_satisfies_snapshot(Relation rel,
+                                                               TupleTableSlot *slot,
+                                                               Snapshot snapshot);
+extern TransactionId robert_compute_xid_horizon_for_tuples(Relation rel,
+                                                                         ItemPointerData *items,
+                                                                         int nitems);
+
+/* DML */
+extern void robert_tuple_insert(Relation rel, TupleTableSlot *slot,
+                                       CommandId cid, int options,
+                                       BulkInsertStateData *bistate);
+extern void robert_tuple_insert_speculative(Relation rel, TupleTableSlot *slot,
+                                                               CommandId cid, int options,
+                                                               BulkInsertStateData *bistate,
+                                                               uint32 specToken);
+extern void robert_tuple_complete_speculative(Relation rel,
+                                                                 TupleTableSlot *slot, uint32 specToken,
+                                                                 bool succeeded);
+extern void robert_multi_insert(Relation rel,
+                                       TupleTableSlot **slots, int nslots,
+                                       CommandId cid, int options,
+                                       BulkInsertStateData *bistate);
+extern TM_Result robert_tuple_delete(Relation rel, ItemPointer tid,
+                                       CommandId cid, Snapshot snapshot, Snapshot crosscheck,
+                                       bool wait, TM_FailureData *tmfd, bool changingPart);
+extern TM_Result robert_tuple_update(Relation rel, ItemPointer otid,
+                                       TupleTableSlot *slot, CommandId cid, Snapshot snapshot,
+                                       Snapshot crosscheck, bool wait, TM_FailureData *tmfd,
+                                       LockTupleMode *lockmode, bool *update_indexes);
+extern TM_Result robert_tuple_lock(Relation rel, ItemPointer tid,
+                                 Snapshot snapshot, TupleTableSlot *slot, CommandId cid,
+                                 LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags,
+                                 TM_FailureData *tmfd);
+extern void robert_finish_bulk_insert(Relation rel, int options);
+
+
+/* DDL. */
+extern void robert_relation_set_new_filenode(Relation rel,
+                                                                const RelFileNode *newrnode,
+                                                                char persistence,
+                                                                TransactionId *freezeXid,
+                                                                MultiXactId *minmulti);
+extern void robert_relation_nontransactional_truncate(Relation rel);
+extern void robert_relation_copy_data(Relation rel,
+                                                 const RelFileNode *newrnode);
+extern void robert_relation_copy_for_cluster(Relation NewHeap,
+                                                                Relation OldHeap, Relation OldIndex,
+                                                                bool use_sort, TransactionId OldestXmin,
+                                                                TransactionId *xid_cutoff,
+                                                                MultiXactId *multi_cutoff,
+                                                                double *num_tuples, double *tups_vacuumed,
+                                                                double *tups_recently_dead);
+extern void robert_relation_vacuum(Relation onerel, VacuumParams *params,
+                                          BufferAccessStrategy bstrategy);
+extern bool robert_scan_analyze_next_block(TableScanDesc scan,
+                                                          BlockNumber blockno,
+                                                          BufferAccessStrategy bstrategy);
+extern bool robert_scan_analyze_next_tuple(TableScanDesc scan,
+                                                          TransactionId OldestXmin,
+                                                          double *liverows, double *deadrows,
+                                                          TupleTableSlot *slot);
+extern double robert_index_build_range_scan(Relation heap_rel,
+                                                         Relation index_rel, IndexInfo *index_info,
+                                                         bool allow_sync, bool anyvisible, bool progress,
+                                                         BlockNumber start_blockno,
+                                                         BlockNumber end_blockno,
+                                                         IndexBuildCallback callback,
+                                                         void *callback_state,
+                                                         TableScanDesc scan);
+extern void robert_index_validate_scan(Relation heap_rel, Relation index_rel,
+                                                  IndexInfo *index_info, Snapshot snapshot,
+                                                  ValidateIndexState *state);
+
+/* Miscellaneous. */
+extern uint64 robert_relation_size(Relation rel, ForkNumber forkNumber);
+extern bool    robert_relation_needs_toast_table(Relation rel);
+extern Oid robert_relation_toast_am(Relation rel);
+
+/* Planner. */
+extern void robert_relation_estimate_size(Relation rel, int32 *attr_widths,
+                                                         BlockNumber *pages, double *tuples,
+                                                         double *allvisfrac);
+
+#endif
index 393b41dd684b81606972641d1caf0dd1353352f6..d855db86d4087eb93fc5bd2f5b095bb8d9353214 100644 (file)
@@ -33,5 +33,8 @@
 { oid => '3580', oid_symbol => 'BRIN_AM_OID',
   descr => 'block range index (BRIN) access method',
   amname => 'brin', amhandler => 'brinhandler', amtype => 'i' },
+{ oid => '8192', oid_symbol => 'ROBERT_HEAP_TABLE_AM_OID',
+  descr => 'robert table access method',
+  amname => 'robert', amhandler => 'robert_tableam_handler', amtype => 't' },
 
 ]
index de475cb2d4c2d72a7f1e3a46d48eb9e6cf21fa14..3417efc7aa954d5c8d169ac3816a820e51a128b5 100644 (file)
   proname => 'heap_tableam_handler', provolatile => 'v',
   prorettype => 'table_am_handler', proargtypes => 'internal',
   prosrc => 'heap_tableam_handler' },
+{ oid => '8193', oid_symbol => 'ROBERT_TABLE_AM_HANDLER_OID',
+  descr => 'robert table access method handler',
+  proname => 'robert_tableam_handler', provolatile => 'v',
+  prorettype => 'table_am_handler', proargtypes => 'internal',
+  prosrc => 'robert_tableam_handler' },
 
 # Index access method handlers
 { oid => '330', descr => 'btree index access method handler',
index 84da403afc5db8832012056dafc8a7d68b0f37e0..f9301388ee5a22b05da193af90c0eb2958fa334f 100644 (file)
@@ -126,11 +126,12 @@ ERROR:  function int4in(internal) does not exist
 CREATE ACCESS METHOD bogus TYPE TABLE HANDLER bthandler;
 ERROR:  function bthandler must return type table_am_handler
 SELECT amname, amhandler, amtype FROM pg_am where amtype = 't' ORDER BY 1, 2;
- amname |      amhandler       | amtype 
---------+----------------------+--------
- heap   | heap_tableam_handler | t
- heap2  | heap_tableam_handler | t
-(2 rows)
+ amname |       amhandler        | amtype 
+--------+------------------------+--------
+ heap   | heap_tableam_handler   | t
+ heap2  | heap_tableam_handler   | t
+ robert | robert_tableam_handler | t
+(3 rows)
 
 -- First create tables employing the new AM using USING
 -- plain CREATE TABLE
index 432d2d812e517d3fdbd35f94cb1bca6cf51cabb2..d792df52365b6e53986465c8f94d3335cce96547 100644 (file)
@@ -3459,3 +3459,8 @@ yyscan_t
 z_stream
 z_streamp
 zic_t
+RobertTuple
+RobertTupleData
+RobertTupleHeader
+RobertTupleHeaderData
+RobertTupleTableSlot