Provide interfaces to store and fetch undo records.
authorDilip Kumar <[email protected]>
Thu, 2 May 2019 05:58:13 +0000 (11:28 +0530)
committerDilip Kumar <[email protected]>
Wed, 17 Jul 2019 11:55:48 +0000 (17:25 +0530)
Add the capability to form undo records and store them in undo logs.  We
also provide the capability to fetch the undo records.  This layer will use
undo-log-storage to reserve the space for the undo records and buffer
management routines to write and read the undo records.

Undo records are stored in sequential order in the undo log.  Each undo
record consists of a variable length header, tuple data, and payload
information.  The undo records are stored without any sort of alignment
padding and a undo record can span across multiple pages.  The undo records
for a transaction can span across multiple undo logs.

Author: Dilip Kumar with contributions from Robert Haas, Amit Kapila,
        Thomas Munro, Vignesh C and Rafia Sabih
Reviewed-by: Earlier version of this patch is reviewed by Amit Kapila
Tested-by: Neha Sharma
Discussion: https://www.postgresql.org/message-id/CAFiTN-uVxxopn0UZ64%3DF-sydbETBbGjWapnBikNo1%3DXv78UeFw%40mail.gmail.com

src/backend/access/undo/Makefile
src/backend/access/undo/README.undointerface [new file with mode: 0644]
src/backend/access/undo/undoaccess.c [new file with mode: 0644]
src/backend/access/undo/undorecord.c [new file with mode: 0755]
src/backend/storage/page/bufpage.c
src/include/access/transam.h
src/include/access/undoaccess.h [new file with mode: 0644]
src/include/access/undolog.h
src/include/access/undorecord.h [new file with mode: 0644]
src/include/storage/bufpage.h

index 219c6963cf8a7c0e6cdb38284e12e59e81216840..049a416f0714d8b380476d7d5d51c30e2f5cc02c 100644 (file)
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undoaccess.o undolog.o undorecord.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/README.undointerface b/src/backend/access/undo/README.undointerface
new file mode 100644 (file)
index 0000000..41e2c0e
--- /dev/null
@@ -0,0 +1,29 @@
+Undo record interface layer
+---------------------------
+This is the next layer which sits on top of the undo log storage, which will
+provide an interface for prepare, insert, or fetch the undo records.  This
+layer will use undo-log-storage to reserve the space for the undo records
+and buffer management routine to write and read the undo records.
+
+Writing an undo record
+----------------------
+To prepare an undo record, first, it will allocate required space using
+undo log storage module.  Next, it will pin and lock the required buffers and
+return an undo record pointer where it will insert the record.  Finally, it
+calls the Insert routine for final insertion of prepared record.  Additionally,
+there is a mechanism for multi-insert, wherein multiple records are prepared
+and inserted at a time.
+
+Fetching and undo record
+------------------------
+To fetch an undo record, a caller must provide a valid undo record pointer.
+Optionally, the caller can provide a callback function with the information of
+the block and offset, which will help in faster retrieval of undo record,
+otherwise, it has to traverse the undo-chain.
+
+There is also an interface to bulk fetch the undo records.  Where the caller
+can provide a TO and FROM undo record pointer and the memory limit for storing
+the undo records.  This API will return all the undo record between FROM and TO
+undo record pointers if they can fit into provided memory limit otherwise, it
+return whatever can fit into the memory limit.  And, the caller can call it
+repeatedly until it fetches all the records.
diff --git a/src/backend/access/undo/undoaccess.c b/src/backend/access/undo/undoaccess.c
new file mode 100644 (file)
index 0000000..a8cf469
--- /dev/null
@@ -0,0 +1,1754 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoaccess.c
+ *       entry points for inserting/fetching undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoaccess.c
+ *
+ * NOTES:
+ * Undo record layout:
+ *
+ * Undo records are stored in sequential order in the undo log.  Each undo
+ * record consists of a variable length header, tuple data, and payload
+ * information.  The first undo record of each transaction contains a
+ * transaction header that points to the next transaction's start header.
+ * This allows us to discard the entire transaction's log at one-shot rather
+ * than record-by-record.  The callers are not aware of transaction header,
+ * this is entirely maintained and used by undo record layer.   See
+ * undorecord.h for detailed information about undo record header.
+ *
+ * Multiple logs:
+ *
+ * It is possible that the undo records for a transaction spans multiple undo
+ * logs.  We need some special handling while inserting them to ensure that
+ * discard and rollbacks can work sanely.
+ *
+ * When the undo record for a transaction gets inserted in the next log then we
+ * add a transaction header for the first record of the transaction in the new
+ * log and connect this undo record to the first record of the transaction in
+ * the next log by updating the "uur_next" field.
+ *
+ * We will also keep a previous undo record pointer to the first and last undo
+ * record of the transaction in the previous log.  The last undo record
+ * location is used find the previous undo record pointer during rollback.
+ * The first undo record location is used to find the previous transaction
+ * header which is required to update the undo apply progress.
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/undorecord.h"
+#include "access/undoaccess.h"
+#include "access/undolog_xlog.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablecmds.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+
+/*
+ * Information for compression of the undo records on a page so that we can
+ * avoid duplicating same values across multiple undo records on a page.
+ *
+ * The cid/xid/reloid/rmid information will be added in the undo record header
+ * in the following cases:
+ * a) The first undo record of the transaction.
+ * b) First undo record of the page.
+ * c) All subsequent record for the transaction which is not the first
+ *       transaction on the page.
+ * Except above cases,  If the rmid/reloid/xid/cid is same in the subsequent
+ * records this information will not be stored in the record, these information
+ * will be retrieved from the first undo record of that page.
+ * If any of the member rmid/reloid/xid/cid has changed, the changed information
+ * will be stored in the undo record and the remaining information will be
+ * retrieved from the first complete undo record of the page
+ */
+UndoCompressionInfo undo_compression_info[UndoLogCategories];
+
+/* Prototypes for static functions. */
+static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+                                                                                       UndoRecPtr urp, RelFileNode rnode,
+                                                                                       UndoLogCategory category,
+                                                                                       Buffer *prevbuf);
+static int     UndoRecordPrepareTransInfo(UndoRecordInsertContext *context,
+                                                                          UndoRecPtr xact_urp, int size, int offset);
+static void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context,
+                                                                         int idx);
+static void UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
+                                                                               UndoRecPtr urecptr, UndoRecPtr xact_urp);
+static int     UndoGetBufferSlot(UndoRecordInsertContext *context,
+                                                         RelFileNode rnode, BlockNumber blk,
+                                                         ReadBufferMode rbm);
+static uint16 UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+                                                                  UndoLogCategory category);
+static bool UndoSetCommonInfo(UndoCompressionInfo *compressioninfo,
+                                                         UnpackedUndoRecord *urec, UndoRecPtr urp,
+                                                         Buffer buffer);
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+struct PreparedUndoSpace
+{
+       UndoRecPtr      urp;                    /* undo record pointer */
+       UnpackedUndoRecord *urec;       /* undo record */
+       uint16          size;                   /* undo record size */
+       int                     undo_buffer_idx[MAX_BUFFER_PER_UNDO];   /* undo_buffer array
+                                                                                                                * index */
+};
+
+/*
+ * This holds undo buffers information required for PreparedUndoSpace during
+ * prepare undo time.  Basically, during prepare time which is called outside
+ * the critical section we will acquire all necessary undo buffers pin and lock.
+ * Later, during insert phase we will write actual records into thse buffers.
+ */
+struct PreparedUndoBuffer
+{
+       UndoLogNumber logno;            /* Undo log number */
+       BlockNumber blk;                        /* block number */
+       Buffer          buf;                    /* buffer allocated for the block */
+       bool            zero;                   /* new block full of zeroes */
+};
+
+/*
+ * Compute the size of the partial record on the undo page.
+ *
+ * Compute the complete record size by uur_info and variable field length
+ * stored in the page header and then subtract the offset of the record so that
+ * we can get the exact size of partial record in this page.
+ */
+static inline Size
+UndoPagePartialRecSize(UndoPageHeader phdr)
+{
+       Size            size = UndoRecordHeaderSize(phdr->uur_info);
+
+       /*
+        * Add length of the variable part and undo length. Now, we know the
+        * complete length of the undo record.
+        */
+       size += phdr->tuple_len + phdr->payload_len + sizeof(uint16);
+
+       /*
+        * Subtract the size which is stored in the previous page to get the
+        * partial record size stored in this page.
+        */
+       size -= phdr->record_offset;
+
+       return size;
+}
+
+/*
+ * Prepare to update the transaction header
+ *
+ * It's a helper function for PrepareUpdateNext and
+ * PrepareUpdateUndoActionProgress
+ *
+ * xact_urp  - undo record pointer to be updated.
+ * size - number of bytes to be updated.
+ * offset - offset in undo record where to start update.
+ */
+static int
+UndoRecordPrepareTransInfo(UndoRecordInsertContext *context,
+                                                  UndoRecPtr xact_urp, int size, int offset)
+{
+       BlockNumber cur_blk;
+       RelFileNode rnode;
+       int                     starting_byte;
+       int                     bufidx;
+       int                     index = 0;
+       int                     remaining_bytes;
+       XactUndoRecordInfo *xact_info;
+
+       xact_info = &context->xact_urec_info[context->nxact_urec_info];
+
+       UndoRecPtrAssignRelFileNode(rnode, xact_urp);
+       cur_blk = UndoRecPtrGetBlockNum(xact_urp);
+       starting_byte = UndoRecPtrGetPageOffset(xact_urp);
+
+       /* Remaining bytes on the current block. */
+       remaining_bytes = BLCKSZ - starting_byte;
+
+       /*
+        * Is there some byte of the urec_next on the current block, if not then
+        * start from the next block.
+        */
+       if (remaining_bytes <= offset)
+       {
+               cur_blk++;
+               starting_byte = UndoLogBlockHeaderSize;
+               starting_byte += (offset - remaining_bytes);
+       }
+       else
+               starting_byte += offset;
+
+       /*
+        * Set the offset in the first block where we need to start writing,
+        * during the prepare phase so that during update phase we need not to
+        * compute it again.
+        */
+       xact_info->offset = starting_byte;
+
+       /* Loop until we have fetched all the buffers in which we need to write. */
+       while (size > 0)
+       {
+               bufidx = UndoGetBufferSlot(context, rnode, cur_blk, RBM_NORMAL);
+
+               Assert(index < MAX_BUFFER_PER_UNDO);
+
+               xact_info->idx_undo_buffers[index++] = bufidx;
+               size -= (BLCKSZ - starting_byte);
+               starting_byte = UndoLogBlockHeaderSize;
+               cur_blk++;
+       }
+
+       xact_info->next = InvalidUndoRecPtr;
+       xact_info->progress = 0;
+       xact_info->urecptr = xact_urp;
+       context->nxact_urec_info++;
+
+       return (context->nxact_urec_info - 1);
+}
+
+/*
+ * Prepare to update the transaction's next undo pointer.
+ *
+ * Lock necessary buffer for updating the next of the transaction header.  This
+ * function is called for
+ * a. Updating the current transaction's start undo record pointer in previous
+ * transaction's start header.
+ * b. For multi-log transaction update the start undo record pointer of the
+ * current log in the same transaction's start undo record pointer in the
+ * previous log.
+ *
+ * xact_urp - undo record pointer to be updated
+ * urecptr - current transaction's undo record pointer which need to be set in
+ *                      the previous transaction's header.
+ */
+static void
+UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
+                                                       UndoRecPtr urecptr, UndoRecPtr xact_urp)
+{
+       UndoLogSlot *slot;
+       int                     index = 0;
+       int                     offset;
+
+       /*
+        * The absence of previous transaction's undo indicate that this backend
+        * is preparing its first undo in which case we have nothing to update.
+        */
+       if (!UndoRecPtrIsValid(xact_urp))
+               return;
+
+       slot = UndoLogGetSlot(UndoRecPtrGetLogNo(xact_urp), false);
+
+       /*
+        * Acquire the discard lock before reading the undo record so that discard
+        * worker doesn't remove the record while we are in process of reading it.
+        */
+       LWLockAcquire(&slot->discard_update_lock, LW_SHARED);
+       /* Check if it is already discarded. */
+       if (UndoRecPtrIsDiscarded(xact_urp))
+       {
+               /* Release lock and return. */
+               LWLockRelease(&slot->discard_update_lock);
+               return;
+       }
+
+       /* Compute the offset of the uur_next in the undo record. */
+       offset = SizeOfUndoRecordHeader +
+               offsetof(UndoRecordTransaction, urec_next);
+
+       index = UndoRecordPrepareTransInfo(context, xact_urp,
+                                                                          sizeof(UndoRecPtr), offset);
+
+       /*
+        * Set the next pointer in xact_urec_info, this will be overwritten in
+        * actual undo record during update phase.
+        */
+       context->xact_urec_info[index].next = urecptr;
+
+       /* We can now release the discard lock as we have read the undo record. */
+       LWLockRelease(&slot->discard_update_lock);
+}
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer.
+ *
+ * This will insert the already prepared record by UndoRecordPrepareTransInfo.
+ * This must be called under the critical section.  This will just overwrite the
+ * header of the undo record.
+ */
+static void
+UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx)
+{
+       Page            page = NULL;
+       int                     i = 0;
+       int                     write_bytes;
+       int                     write_offset;
+       char       *sourceptr;
+       XactUndoRecordInfo *xact_info = &context->xact_urec_info[idx];
+
+       /* Whether to update the next or undo apply progress. */
+       if (UndoRecPtrIsValid(xact_info->next))
+       {
+               sourceptr = (char *) &xact_info->next;
+               write_bytes = sizeof(xact_info->next);
+       }
+       else
+       {
+               sourceptr = (char *) &xact_info->progress;
+               write_bytes = sizeof(xact_info->progress);
+       }
+
+       /* Where to start writing in the current block. */
+       write_offset = xact_info->offset;
+
+       /*
+        * Start writing directly from the write offset calculated during prepare
+        * phase.  And, loop until we write required bytes.
+        */
+       while (write_bytes > 0)
+       {
+               Buffer          buffer;
+               int                     buf_idx;
+               int                     can_write;
+               char       *writeptr;
+
+               buf_idx = xact_info->idx_undo_buffers[i];
+               buffer = context->prepared_undo_buffers[buf_idx].buf;
+
+               /* How may bytes can be written in the current page. */
+               can_write = Min((BLCKSZ - write_offset), write_bytes);
+
+               /*
+                * If buffer is valid then write it otherwise just skip writing it but
+                * compute the variable for writing into the next block.
+                */
+               if (BufferIsValid(buffer))
+               {
+                       page = BufferGetPage(buffer);
+
+                       /* Compute the write pointer. */
+                       writeptr = (char *) page + write_offset;
+
+                       /* Copy the bytes we can write. */
+                       memcpy(writeptr, sourceptr, can_write);
+                       MarkBufferDirty(buffer);
+               }
+
+               /* Update bookkeeping information. */
+               write_bytes -= can_write;
+               sourceptr += can_write;
+               write_offset = UndoLogBlockHeaderSize;
+               i++;
+       }
+}
+
+/*
+ * Find the block number in undo buffer array
+ *
+ * If it is present then just return its index otherwise search the buffer and
+ * insert an entry and lock the buffer in exclusive mode.
+ *
+ * Undo log insertions are append-only.  If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point.  In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block.  In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+UndoGetBufferSlot(UndoRecordInsertContext *context,
+                                 RelFileNode rnode,
+                                 BlockNumber blk,
+                                 ReadBufferMode rbm)
+{
+       int                     i;
+       Buffer          buffer;
+       XLogRedoAction action = BLK_NEEDS_REDO;
+       PreparedUndoBuffer *prepared_buffer;
+       UndoLogCategory category = context->alloc_context.category;
+
+       /* Don't do anything, if we already have a buffer pinned for the block. */
+       for (i = 0; i < context->nprepared_undo_buffer; i++)
+       {
+               prepared_buffer = &context->prepared_undo_buffers[i];
+
+               /*
+                * It's not enough to just compare the block number because the
+                * undo_buffer might holds the undo from different undo logs (e.g when
+                * previous transaction start header is in previous undo log) so
+                * compare (logno + blkno).
+                */
+               if ((blk == prepared_buffer->blk) &&
+                       (prepared_buffer->logno == rnode.relNode))
+               {
+                       /* caller must hold exclusive lock on buffer */
+                       Assert(BufferIsLocal(prepared_buffer->buf) ||
+                                  LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+                                                                                                                                          GetBufferDescriptor(prepared_buffer->buf - 1)),
+                                                                               LW_EXCLUSIVE));
+                       return i;
+               }
+       }
+
+       /*
+        * We did not find the block so allocate the buffer and insert into the
+        * undo buffer array.
+        */
+       if (InRecovery)
+               action = XLogReadBufferForRedoBlock(context->alloc_context.xlog_record,
+                                                                                       rnode,
+                                                                                       UndoLogForkNum,
+                                                                                       blk,
+                                                                                       rbm,
+                                                                                       false,
+                                                                                       &buffer);
+       else
+       {
+               buffer = ReadBufferWithoutRelcache(rnode,
+                                                                                  UndoLogForkNum,
+                                                                                  blk,
+                                                                                  rbm,
+                                                                                  NULL,
+                                                                                  RelPersistenceForUndoLogCategory(category));
+
+               /* Lock the buffer */
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+       }
+
+       prepared_buffer =
+               &context->prepared_undo_buffers[context->nprepared_undo_buffer];
+
+       if (action == BLK_NOTFOUND)
+       {
+               Assert(InRecovery);
+
+               prepared_buffer->buf = InvalidBuffer;
+               prepared_buffer->blk = InvalidBlockNumber;
+       }
+       else
+       {
+               prepared_buffer->buf = buffer;
+               prepared_buffer->blk = blk;
+               prepared_buffer->logno = rnode.relNode;
+               prepared_buffer->zero = rbm == RBM_ZERO;
+       }
+
+       context->nprepared_undo_buffer++;
+
+       return i;
+}
+
+/*
+ * Exclude the common info in undo record flag and also set the compression
+ * info in the context.
+ *
+ * This function will check what information we need to include in the current
+ * undo record based on the undo compression information.  And, it will also
+ * update the compression info if we are writing the first undo record on the
+ * page.
+ */
+static bool
+UndoSetCommonInfo(UndoCompressionInfo *compressioninfo,
+                                 UnpackedUndoRecord *urec, UndoRecPtr urp,
+                                 Buffer buffer)
+{
+       bool            record_updated = false;
+       bool            first_complete_undo = false;
+       UndoRecPtr      lasturp = compressioninfo->last_urecptr;
+
+       /*
+        * If we have valid compression info and the for the same transaction and
+        * the current undo record is on the same block as the last undo record
+        * then exclude the common information which are same as first complete
+        * record on the page.
+        */
+       if (compressioninfo->valid &&
+               FullTransactionIdEquals(compressioninfo->fxid, urec->uur_fxid) &&
+               UndoRecPtrGetBlockNum(urp) == UndoRecPtrGetBlockNum(lasturp))
+       {
+               urec->uur_info &= ~UREC_INFO_XID;
+
+               /* Don't include rmid if it's same. */
+               if (urec->uur_rmid == compressioninfo->rmid)
+                       urec->uur_info &= ~UREC_INFO_RMID;
+
+               /* Don't include reloid if it's same. */
+               if (urec->uur_reloid == compressioninfo->reloid)
+                       urec->uur_info &= ~UREC_INFO_RELOID;
+
+               /* Don't include cid if it's same. */
+               if (urec->uur_cid == compressioninfo->cid)
+                       urec->uur_info &= ~UREC_INFO_CID;
+
+               record_updated = true;
+       }
+
+       /*
+        * If the undo record is starting just after the undo page header then
+        * this is the first complete undo on the page.
+        */
+       if (UndoRecPtrGetPageOffset(urp) == SizeOfUndoPageHeaderData)
+               first_complete_undo = true;
+       else if (UndoRecPtrIsValid(lasturp))
+       {
+               /*
+                * If we already have the valid last undo record pointer which
+                * inserted undo in this log then we can identify whether this is the
+                * first undo of the page by checking the block number of the previous
+                * record and the current record.
+                */
+               if (UndoRecPtrGetBlockNum(urp) != UndoRecPtrGetBlockNum(lasturp))
+                       first_complete_undo = true;
+       }
+       else
+       {
+               Page            page = BufferGetPage(buffer);
+               uint16          offset;
+               UndoPageHeader phdr = (UndoPageHeader) page;
+
+               /*
+                * We need to compute the offset of the first complete record of the
+                * page and if this undo record starting from that page then this is
+                * the first complete record on the page.
+                */
+               offset = SizeOfUndoPageHeaderData + UndoPagePartialRecSize(phdr);
+               if (UndoRecPtrGetPageOffset(urp) == offset)
+                       first_complete_undo = true;
+       }
+
+       /*
+        * If we are writing first undo record for the page the we can set the
+        * compression so that subsequent records from the same transaction can
+        * avoid including common information in the undo records.
+        */
+       if (first_complete_undo)
+       {
+               /* Set this information. */
+               compressioninfo->rmid = urec->uur_rmid;
+               compressioninfo->reloid = urec->uur_reloid;
+               compressioninfo->fxid = urec->uur_fxid;
+               compressioninfo->cid = urec->uur_cid;
+
+               /* Set that we have valid compression info. */
+               compressioninfo->valid = true;
+       }
+
+       return record_updated;
+}
+
+/*
+ * This function must be called before all the undo records which are going to
+ * get inserted under a single WAL record.
+ *
+ * nprepared - This defines the max number of undo records that can be
+ * prepared before inserting them.
+ */
+void
+BeginUndoRecordInsert(UndoRecordInsertContext *context,
+                                         UndoLogCategory category,
+                                         int nprepared,
+                                         XLogReaderState *xlog_record)
+{
+       uint32          nbuffers;
+
+       /* At least one prepared record should be there. */
+       if (nprepared <= 0)
+               elog(ERROR, "at least one undo record should be prepared");
+
+       /* Initialize undo log context. */
+       UndoLogBeginInsert(&context->alloc_context, category, xlog_record);
+
+       /* Initialize undo insert context. */
+       context->max_prepared_undo = nprepared;
+       context->nprepared_undo = 0;
+       context->nprepared_undo_buffer = 0;
+       context->nxact_urec_info = 0;
+
+       /* Allocate memory for prepared undo record space. */
+       context->prepared_undo = (PreparedUndoSpace *) palloc(nprepared *
+                                                                                                                 sizeof(PreparedUndoSpace));
+
+       /* Compute number of buffers. */
+       nbuffers = (nprepared + MAX_XACT_UNDO_INFO) * MAX_BUFFER_PER_UNDO;
+
+       /*
+        * Copy the compression global compression info to our context before
+        * starting prepare because this value might get updated multiple time in
+        * case of multi-prepare but the global value should be updated only after
+        * we have successfully inserted the undo record.
+        */
+       memcpy(&context->undo_compression_info[category],
+                  &undo_compression_info[category], sizeof(UndoCompressionInfo));
+
+       /* Allocate memory for the prepared buffers. */
+       context->prepared_undo_buffers =
+               palloc(nbuffers * sizeof(PreparedUndoBuffer));
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record you
+ * intended to insert.  Upon return, the necessary undo buffers are pinned and
+ * locked.
+ *
+ * This should be done before any critical section is established, since it
+ * can fail.
+ */
+UndoRecPtr
+PrepareUndoInsert(UndoRecordInsertContext *context,
+                                 UnpackedUndoRecord *urec,
+                                 Oid dbid)
+{
+       UndoRecordSize size;
+       UndoRecPtr      urecptr = InvalidUndoRecPtr;
+       RelFileNode rnode;
+       UndoRecordSize cur_size = 0;
+       BlockNumber cur_blk;
+       FullTransactionId fxid;
+       int                     starting_byte;
+       int                     index = 0;
+       int                     bufidx;
+       bool            logswitched = false;
+       bool            resize = false;
+       ReadBufferMode rbm;
+       bool            need_xact_header;
+       UndoRecPtr      last_xact_start;
+       UndoRecPtr      prevlog_xact_start = InvalidUndoRecPtr;
+       UndoRecPtr      prevlog_insert_urp = InvalidUndoRecPtr;
+       UndoRecPtr      prevlogurp = InvalidUndoRecPtr;
+       PreparedUndoSpace *prepared_undo;
+       UndoCompressionInfo *compression_info =
+       &context->undo_compression_info[context->alloc_context.category];
+
+       /* Already reached maximum prepared limit. */
+       if (context->nprepared_undo == context->max_prepared_undo)
+               elog(ERROR, "already reached the maximum prepared limit");
+
+       /* Extract the full transaction id from the input undo record. */
+       fxid = urec->uur_fxid;
+       Assert(FullTransactionIdIsValid(fxid));
+
+       /*
+        * We don't yet know if this record needs a transaction header (ie is the
+        * first undo record for a given transaction in a given undo log), because
+        * you can only find out by allocating.  We'll resolve this circularity by
+        * allocating enough space for a transaction header.  Similarly log switch
+        * will only be detected after allocation so include the log switch header
+        * and common information because in case of log switch we need to include
+        * log switch header and also we need to include common header.  After
+        * allocation We'll only advance by as many bytes as we turn out to need.
+        */
+       UndoRecordSetInfo(urec);
+       urec->uur_info |= UREC_INFO_TRANSACTION;
+       urec->uur_info |= UREC_INFO_LOGSWITCH;
+       urec->uur_info |= UREC_INFO_PAGE_COMMON;
+
+       size = UndoRecordExpectedSize(urec);
+
+       /* Allocate space for the record. */
+       if (InRecovery)
+       {
+               /*
+                * We'll figure out where the space needs to be allocated by
+                * inspecting the xlog_record.  We don't expect to see temporary or
+                * unlogged undo data here.
+                */
+               Assert(context->alloc_context.category != UNDO_TEMP &&
+                          context->alloc_context.category != UNDO_UNLOGGED);
+               urecptr = UndoLogAllocateInRecovery(&context->alloc_context,
+                                                                                       XidFromFullTransactionId(fxid),
+                                                                                       size,
+                                                                                       &need_xact_header,
+                                                                                       &last_xact_start,
+                                                                                       &prevlog_xact_start,
+                                                                                       &prevlogurp);
+       }
+       else
+       {
+               /* Allocate space for writing the undo record. */
+               urecptr = UndoLogAllocate(&context->alloc_context,
+                                                                 size,
+                                                                 &need_xact_header, &last_xact_start,
+                                                                 &prevlog_xact_start, &prevlog_insert_urp);
+
+               /*
+                * If prevlog_xact_start is a valid undo record pointer that means
+                * this transaction's undo records are split across undo logs.
+                */
+               if (UndoRecPtrIsValid(prevlog_xact_start))
+               {
+                       uint16          prevlen;
+
+                       /*
+                        * If undo log is switch during transaction then we must get a
+                        * valid insert location in the previous undo log so that we can
+                        * compute the undo record pointer of the transaction's last
+                        * record in the previous undo log.
+                        */
+                       Assert(UndoRecPtrIsValid(prevlog_insert_urp));
+
+                       /* Fetch length of the last undo record of the previous log. */
+                       prevlen = UndoGetPrevRecordLen(prevlog_insert_urp, InvalidBuffer,
+                                                                                  context->alloc_context.category);
+
+                       /*
+                        * If the undo log got switched during the transaction then for
+                        * collecting all the undo record for the transaction during bulk
+                        * fetch,  we  can not read the prevlen from the end of the record
+                        * as we will not know what was the previous undo log.  So during
+                        * log switch we will directly store the last undo record pointer
+                        * of the transaction into transaction's first record of the next
+                        * undo log.
+                        *
+                        * TODO:  instead of storing this in the transaction header we can
+                        * have separate undo log switch header and store it there.
+                        */
+                       prevlogurp =
+                               MakeUndoRecPtr(UndoRecPtrGetLogNo(prevlog_insert_urp),
+                                                          (UndoRecPtrGetOffset(prevlog_insert_urp) - prevlen));
+
+                       /*
+                        * Undo log switched so set prevlog info in current undo log.
+                        *
+                        * XXX can we do this directly in UndoLogAllocate ? but for that
+                        * the UndoLogAllocate might need to read the length of the last
+                        * undo record from the previous undo log but for that it might
+                        * use callback?
+                        */
+                       UndoLogSwitchSetPrevLogInfo(UndoRecPtrGetLogNo(urecptr),
+                                                                               prevlog_xact_start, prevlogurp);
+               }
+       }
+
+       /*
+        * If undo log is switched then set the logswitch flag and also reset the
+        * compression info because we can use same compression info for the new
+        * undo log.
+        */
+       if (UndoRecPtrIsValid(prevlog_xact_start))
+       {
+               logswitched = true;
+               compression_info->valid = false;
+               compression_info->last_urecptr = InvalidUndoRecPtr;
+       }
+
+       /*
+        * If we need a transaction header then allocate memory for it and
+        * initialize it.
+        */
+       if (need_xact_header)
+       {
+               urec->uur_txn = palloc(SizeOfUndoRecordTransaction);
+               urec->uur_txn->urec_dbid = dbid;
+               urec->uur_txn->urec_progress = InvalidBlockNumber;
+               urec->uur_txn->urec_next = InvalidUndoRecPtr;
+       }
+       else
+       {
+               /* We don't need a transaction header after all. */
+               urec->uur_info &= ~UREC_INFO_TRANSACTION;
+               resize = true;
+               urec->uur_txn = NULL;
+       }
+
+       /*
+        * If undo log got switched then initialize the log switch header
+        * otherwise reset it in uur_info and recalculate the size.
+        */
+       if (logswitched)
+       {
+               urec->uur_logswitch = palloc(SizeOfUndoRecordLogSwitch);
+               urec->uur_logswitch->urec_prevurp = prevlogurp;
+               urec->uur_logswitch->urec_prevlogstart = prevlog_xact_start;
+       }
+       else
+       {
+               /* We don't need a log transaction header after all. */
+               urec->uur_info &= ~UREC_INFO_LOGSWITCH;
+               resize = true;
+               urec->uur_logswitch = NULL;
+       }
+
+       /*
+        * If there is a physically preceding transaction in this undo log, and we
+        * are writing the first record for this transaction that is in this undo
+        * log (not necessarily the first ever for the transaction, because we
+        * could have switched logs), then we need to update the size of the
+        * preceding transaction.
+        */
+       if (need_xact_header &&
+               UndoRecPtrGetOffset(urecptr) > UndoLogBlockHeaderSize)
+               UndoRecordPrepareUpdateNext(context, urecptr, last_xact_start);
+
+       cur_blk = UndoRecPtrGetBlockNum(urecptr);
+       UndoRecPtrAssignRelFileNode(rnode, urecptr);
+       starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+       /*
+        * If we happen to be writing the very first byte into this page, then
+        * there is no need to read from disk.
+        */
+       if (starting_byte == UndoLogBlockHeaderSize)
+               rbm = RBM_ZERO;
+       else
+               rbm = RBM_NORMAL;
+
+       prepared_undo = &context->prepared_undo[context->nprepared_undo];
+
+       do
+       {
+               bufidx = UndoGetBufferSlot(context, rnode, cur_blk, rbm);
+               if (cur_size == 0)
+                       cur_size = BLCKSZ - starting_byte;
+               else
+                       cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+               /* undo record can't use buffers more than MAX_BUFFER_PER_UNDO. */
+               Assert(index < MAX_BUFFER_PER_UNDO);
+
+               /* Keep the track of the buffers we have pinned and locked. */
+               prepared_undo->undo_buffer_idx[index++] = bufidx;
+
+               /*
+                * If we need more pages they'll be all new so we can definitely skip
+                * reading from disk.
+                */
+               rbm = RBM_ZERO;
+               cur_blk++;
+       } while (cur_size < size);
+
+       /*
+        * Set/overwrite compression info if required and also exclude the common
+        * fields from the undo record if possible.
+        */
+       if (UndoSetCommonInfo(compression_info, urec, urecptr,
+                                                 context->prepared_undo_buffers[prepared_undo->undo_buffer_idx[0]].buf))
+               resize = true;
+
+       if (resize)
+               size = UndoRecordExpectedSize(urec);
+
+       /*
+        * If the transaction's undo records are split across the undo logs.  So
+        * we need to  update our own transaction header in the previous log.
+        */
+       if (logswitched)
+       {
+               Assert(UndoRecPtrIsValid(prevlogurp));
+               UndoRecordPrepareUpdateNext(context, urecptr, prevlog_xact_start);
+       }
+
+       UndoLogAdvance(&context->alloc_context, size);
+
+       /*
+        * Save prepared undo record information into the context which will be
+        * used by InsertPreparedUndo to insert the undo record.
+        */
+       prepared_undo->urec = urec;
+       prepared_undo->urp = urecptr;
+       prepared_undo->size = size;
+
+       /* Set the current undo pointer in the compression info. */
+       compression_info->last_urecptr = urecptr;
+
+       context->nprepared_undo++;
+
+       return urecptr;
+}
+
+/*
+ * Insert a previously-prepared undo records.
+ *
+ * This function will write the actual undo record into the buffers which are
+ * already pinned and locked in PreparedUndoInsert, and mark them dirty.  This
+ * step should be performed inside a critical section.
+ */
+void
+InsertPreparedUndo(UndoRecordInsertContext *context)
+{
+       UndoPackContext ucontext = {{0}};
+       PreparedUndoSpace *prepared_undo;
+       Page            page = NULL;
+       int                     starting_byte;
+       int                     bufidx = 0;
+       int                     idx;
+       int                     i;
+
+       /* There must be at least one prepared undo record. */
+       Assert(context->nprepared_undo > 0);
+
+       /*
+        * This must be called under a critical section or we must be in recovery.
+        */
+       Assert(InRecovery || CritSectionCount > 0);
+
+       for (idx = 0; idx < context->nprepared_undo; idx++)
+       {
+               prepared_undo = &context->prepared_undo[idx];
+
+               Assert(prepared_undo->size ==
+                          UndoRecordExpectedSize(prepared_undo->urec));
+
+               bufidx = 0;
+
+               /*
+                * Compute starting offset of the page where to start inserting undo
+                * record.
+                */
+               starting_byte = UndoRecPtrGetPageOffset(prepared_undo->urp);
+
+               /* Initiate inserting the undo record. */
+               BeginInsertUndo(&ucontext, prepared_undo->urec);
+
+               /* Main loop for writing the undo record. */
+               do
+               {
+                       Buffer          buffer;
+
+                       buffer = context->prepared_undo_buffers[
+                                                                                                       prepared_undo->undo_buffer_idx[bufidx]].buf;
+
+                       /*
+                        * During recovery, there might be some blocks which are already
+                        * deleted due to some discard command so we can just skip
+                        * inserting into those blocks.
+                        */
+                       if (!BufferIsValid(buffer))
+                       {
+                               Assert(InRecovery);
+
+                               /*
+                                * Skip actual writing just update the context so that we have
+                                * write offset for inserting into next blocks.
+                                */
+                               SkipInsertingUndoData(&ucontext, BLCKSZ - starting_byte);
+                               if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+                                       break;
+                       }
+                       else
+                       {
+                               page = BufferGetPage(buffer);
+
+                               /*
+                                * Initialize the page whenever we try to write the first
+                                * record in page.  We start writing immediately after the
+                                * block header.
+                                */
+                               if (starting_byte == UndoLogBlockHeaderSize)
+                                       UndoPageInit(page, BLCKSZ, prepared_undo->urec->uur_info,
+                                                                ucontext.already_processed,
+                                                                prepared_undo->urec->uur_tuple.len,
+                                                                prepared_undo->urec->uur_payload.len);
+
+                               /*
+                                * Try to insert the record into the current page. If it
+                                * doesn't succeed then recall the routine with the next page.
+                                */
+                               InsertUndoData(&ucontext, page, starting_byte);
+                               if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+                               {
+                                       MarkBufferDirty(buffer);
+                                       break;
+                               }
+                               MarkBufferDirty(buffer);
+                       }
+
+                       /* Insert remaining record in next block. */
+                       starting_byte = UndoLogBlockHeaderSize;
+                       bufidx++;
+
+                       /* undo record can't use buffers more than MAX_BUFFER_PER_UNDO. */
+                       Assert(bufidx < MAX_BUFFER_PER_UNDO);
+               } while (true);
+
+               /* Advance the insert pointer past this record. */
+               UndoLogAdvanceFinal(prepared_undo->urp, prepared_undo->size);
+       }
+
+       /* Update previously prepared transaction headers. */
+       for (i = 0; i < context->nxact_urec_info; i++)
+               UndoRecordUpdateTransInfo(context, i);
+
+       /*
+        * We have successfully inserted prepared undo records so overwrite the
+        * global compression.
+        */
+       memcpy(&undo_compression_info[context->alloc_context.category],
+                  &context->undo_compression_info[context->alloc_context.category],
+                  sizeof(UndoCompressionInfo));
+
+}
+
+/*
+ * Release all the memory and buffer pins hold for inserting the undo records.
+ */
+void
+FinishUndoRecordInsert(UndoRecordInsertContext *context)
+{
+       int                     i;
+
+       /* Release buffer pins and lock. */
+       for (i = 0; i < context->nprepared_undo_buffer; i++)
+       {
+               if (BufferIsValid(context->prepared_undo_buffers[i].buf))
+                       UnlockReleaseBuffer(context->prepared_undo_buffers[i].buf);
+       }
+
+       /*
+        * Release memory for the transaction header and log switch header if we
+        * have allocated it in the prepare time.
+        */
+       for (i = 0; i < context->nprepared_undo; i++)
+       {
+               if (context->prepared_undo[i].urec->uur_txn)
+                       pfree(context->prepared_undo[i].urec->uur_txn);
+               if (context->prepared_undo[i].urec->uur_logswitch)
+                       pfree(context->prepared_undo[i].urec->uur_logswitch);
+       }
+
+       /* Free memory allocated for the prepare undo and prepared buffers. */
+       pfree(context->prepared_undo_buffers);
+       pfree(context->prepared_undo);
+}
+
+/*
+ * Helper function for UndoGetOneRecord
+ *
+ * If any of  rmid/reloid/xid/cid is not available in the undo record, then
+ * it will get the information from the first complete undo record in the
+ * page.
+ */
+static void
+GetCommonUndoRecInfo(UndoPackContext *ucontext, UndoRecPtr urp,
+                                        RelFileNode rnode, UndoLogCategory category, Buffer buffer)
+{
+       /*
+        * If any of the common header field is not available in the current undo
+        * record then we must read it from the first complete record of the page.
+        */
+       if ((ucontext->urec_hd.urec_info & UREC_INFO_PAGE_COMMON) !=
+               UREC_INFO_PAGE_COMMON)
+       {
+               UnpackedUndoRecord first_uur = {0};
+               Page            page = BufferGetPage(buffer);
+               UndoPageHeader undo_phdr = (UndoPageHeader) page;
+               UndoRecPtr      first_urp = InvalidUndoRecPtr;
+               Size            partial_rec_size = SizeOfUndoPageHeaderData;
+               BlockNumber blkno = UndoRecPtrGetBlockNum(urp);
+
+               /*
+                * If there is a partial record in the page then compute the size of
+                * it so that we can compute the undo record pointer of the first
+                * complete undo record of the page.
+                */
+               if (undo_phdr->record_offset != 0)
+                       partial_rec_size += UndoPagePartialRecSize(undo_phdr);
+
+               /*
+                * Compute the undo record pointer of the first complete record of the
+                * page.
+                */
+               first_urp = MakeUndoRecPtr(rnode.relNode,
+                                                                  UndoRecPageOffsetGetRecPtr(partial_rec_size, blkno));
+
+               /* Fetch the first undo record of the page. */
+               UndoGetOneRecord(&first_uur, first_urp, rnode, category, &buffer);
+
+               /*
+                * Get all missing common header information from the first undo
+                * records.
+                */
+               if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) == 0)
+                       ucontext->urec_rmid = first_uur.uur_rmid;
+
+               if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) == 0)
+                       ucontext->urec_reloid = first_uur.uur_reloid;
+
+               if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) == 0)
+                       ucontext->urec_fxid = first_uur.uur_fxid;
+
+               if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) == 0)
+                       ucontext->urec_cid = first_uur.uur_cid;
+
+               ucontext->urec_hd.urec_info |= UREC_INFO_PAGE_COMMON;
+       }
+}
+
+/*
+ * Helper function for UndoFetchRecord and UndoBulkFetchRecord
+ *
+ * curbuf - If an input buffer is valid then this function will not release the
+ * pin on that buffer.  If the buffer is not valid then it will assign curbuf
+ * with the first buffer of the current undo record and also it will keep the
+ * pin and lock on that buffer in a hope that while traversing the undo chain
+ * the caller might want to read the previous undo record from the same block.
+ */
+static UnpackedUndoRecord *
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+                                UndoLogCategory category, Buffer *curbuf)
+{
+       Page            page;
+       int                     starting_byte = UndoRecPtrGetPageOffset(urp);
+       BlockNumber cur_blk;
+       UndoPackContext ucontext = {{0}};
+       Buffer          buffer = *curbuf;
+
+       cur_blk = UndoRecPtrGetBlockNum(urp);
+
+       /* Initiate unpacking one undo record. */
+       BeginUnpackUndo(&ucontext);
+
+       while (true)
+       {
+               /* If we already have a buffer then no need to allocate a new one. */
+               if (!BufferIsValid(buffer))
+               {
+                       buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk,
+                                                                                          RBM_NORMAL, NULL,
+                                                                                          RelPersistenceForUndoLogCategory(category));
+
+                       /*
+                        * Remember the first buffer where this undo started as next undo
+                        * record what we fetch might fall on the same buffer.
+                        */
+                       if (!BufferIsValid(*curbuf))
+                               *curbuf = buffer;
+               }
+
+               /* Acquire shared lock on the buffer before reading undo from it. */
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+               page = BufferGetPage(buffer);
+
+               UnpackUndoData(&ucontext, page, starting_byte);
+
+               /*
+                * We are done if we have reached to the done stage otherwise move to
+                * next block and continue reading from there.
+                */
+               if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+               {
+                       if (buffer != *curbuf)
+                               UnlockReleaseBuffer(buffer);
+
+                       /*
+                        * Get any of the missing fields from the first record of the
+                        * page.
+                        */
+                       GetCommonUndoRecInfo(&ucontext, urp, rnode, category, *curbuf);
+                       break;
+               }
+
+               /*
+                * The record spans more than a page so we would have copied it (see
+                * UnpackUndoRecord).  In such cases, we can release the buffer.
+                */
+               if (buffer != *curbuf)
+                       UnlockReleaseBuffer(buffer);
+               buffer = InvalidBuffer;
+
+               /* Go to next block. */
+               cur_blk++;
+               starting_byte = UndoLogBlockHeaderSize;
+       }
+
+       /* Final step of unpacking. */
+       FinishUnpackUndo(&ucontext, urec);
+
+       /* Unlock the buffer but keep the pin. */
+       LockBuffer(*curbuf, BUFFER_LOCK_UNLOCK);
+
+       return urec;
+}
+
+/*
+ * BeginUndoFetch to fetch undo record.
+ */
+void
+BeginUndoFetch(UndoRecordFetchContext *context)
+{
+       context->buffer = InvalidBuffer;
+       context->urp = InvalidUndoRecPtr;
+}
+
+/*
+ * Fetch the undo record for given undo record pointer.
+ *
+ * This will internally allocate the memory for the unpacked undo record which
+ * intern will hold the pointers to the optional headers and the variable data.
+ * The undo record should be freed by the caller by calling ReleaseUndoRecord.
+ * This function will old the pin on the buffer where we read the previous undo
+ * record so that when this function is called repeatedly with the same context
+ * then it can be benefitted by avoid reading the buffer again if the current
+ * undo record is in the same buffer.
+ */
+UnpackedUndoRecord *
+UndoFetchRecord(UndoRecordFetchContext *context, UndoRecPtr urp)
+{
+       RelFileNode rnode;
+       int                     logno;
+       UndoLogSlot *slot;
+       UnpackedUndoRecord *uur = NULL;
+
+       logno = UndoRecPtrGetLogNo(urp);
+       slot = UndoLogGetSlot(logno, true);
+
+       /*
+        * If slot is NULL that means undo log number is unknown.  Presumably it
+        * has been entirely discarded.
+        */
+       if (slot == NULL)
+               return NULL;
+
+       /*
+        * Prevent UndoDiscardOneLog() from discarding data while we try to read
+        * it.  Usually we would acquire log->mutex to read log->meta members, but
+        * in this case we know that discard can't move without also holding
+        * log->discard_lock.
+        *
+        * In Hot Standby mode log->oldest_data is never initialized because it's
+        * get updated by undo discard worker whereas in HotStandby undo logs are
+        * getting discarded using discard WAL.  So in HotStandby we can directly
+        * check whether the undo record pointer is discarded or not.  But, we can
+        * not do same for normal case because discard worker can concurrently
+        * discard the undo logs.
+        *
+        * XXX We can avoid this check by always initializing log->oldest_data in
+        * HotStandby mode as well whenever we apply discard WAL.  But, for doing
+        * that we need to acquire discard lock just for setting this variable?
+        */
+       if (InHotStandby)
+       {
+               if (UndoRecPtrIsDiscarded(urp))
+                       return NULL;
+       }
+       else
+       {
+               LWLockAcquire(&slot->discard_lock, LW_SHARED);
+               if (slot->logno != logno || urp < slot->oldest_data)
+               {
+                       /*
+                        * The slot has been recycled because the undo log was entirely
+                        * discarded, or the pointer is before the oldest data.
+                        */
+                       LWLockRelease(&slot->discard_lock);
+                       return NULL;
+               }
+       }
+
+       /*
+        * Allocate memory for holding the undo record, caller should be
+        * responsible for freeing this memory by calling UndoRecordRelease.
+        */
+       uur = palloc0(sizeof(UnpackedUndoRecord));
+       UndoRecPtrAssignRelFileNode(rnode, urp);
+
+       /*
+        * Before fetching the next record check whether we have a valid buffer in
+        * the context.  If so and if we are reading the current record from the
+        * same then pass that buffer to fetch the undo record, otherwise release
+        * the buffer.
+        */
+       if (BufferIsValid(context->buffer) &&
+               (UndoRecPtrGetLogNo(context->urp) != UndoRecPtrGetLogNo(urp) ||
+                UndoRecPtrGetBlockNum(context->urp) != UndoRecPtrGetBlockNum(urp)))
+       {
+               ReleaseBuffer(context->buffer);
+               context->buffer = InvalidBuffer;
+       }
+
+       /* Fetch the current undo record. */
+       UndoGetOneRecord(uur, urp, rnode, slot->meta.category, &context->buffer);
+
+       /* Release the discard lock after fetching the record. */
+       if (!InHotStandby)
+               LWLockRelease(&slot->discard_lock);
+
+       context->urp = urp;
+
+       return uur;
+}
+
+/*
+ * Finish undo record fetch.
+ */
+void
+FinishUndoFetch(UndoRecordFetchContext *context)
+{
+       if (BufferIsValid(context->buffer))
+               ReleaseBuffer(context->buffer);
+}
+
+/*
+ * Release the memory of the undo record allocated by UndoFetchRecord and
+ * UndoBulkFetchRecord.
+ */
+void
+UndoRecordRelease(UnpackedUndoRecord *urec)
+{
+       /* Release the memory of payload data if we allocated it. */
+       if (urec->uur_payload.data)
+               pfree(urec->uur_payload.data);
+
+       /* Release memory of tuple data if we allocated it. */
+       if (urec->uur_tuple.data)
+               pfree(urec->uur_tuple.data);
+
+       /* Release memory of the transaction header if we allocated it. */
+       if (urec->uur_txn)
+               pfree(urec->uur_txn);
+
+       /* Release memory of the logswitch header if we allocated it. */
+       if (urec->uur_logswitch)
+               pfree(urec->uur_logswitch);
+
+       /* Release the memory of the undo record. */
+       pfree(urec);
+}
+
+/*
+ * Prefetch undo pages, if prefetch_pages are behind prefetch_target
+ */
+static void
+PrefetchUndoPages(RelFileNode rnode, int prefetch_target, int *prefetch_pages,
+                                 BlockNumber to_blkno, BlockNumber from_blkno,
+                                 UndoLogCategory category)
+{
+       int                     nprefetch;
+       BlockNumber startblock;
+       BlockNumber lastprefetched;
+
+       /* Calculate last prefetched page in the previous iteration. */
+       lastprefetched = from_blkno - *prefetch_pages;
+
+       /* We have already prefetched all the pages of the transaction's undo. */
+       if (lastprefetched <= to_blkno)
+               return;
+
+       /* Calculate number of blocks to be prefetched. */
+       nprefetch =
+               Min(prefetch_target - *prefetch_pages, lastprefetched - to_blkno);
+
+       /* Where to start prefetch. */
+       startblock = lastprefetched - nprefetch;
+
+       while (nprefetch--)
+       {
+               PrefetchBufferWithoutRelcache(rnode, MAIN_FORKNUM, startblock++,
+                                                                         category == UNDO_TEMP);
+               (*prefetch_pages)++;
+       }
+}
+
+/*
+ * Read undo records of the transaction in bulk
+ *
+ * Read undo records between from_urecptr and to_urecptr until we exhaust the
+ * the memory size specified by undo_apply_size.  If we could not read all the
+ * records till to_urecptr then the caller should consume current set of records
+ * and call this function again.
+ *
+ * from_urecptr                - Where to start fetching the undo records.  If we can not
+ *                                       read all the records because of memory limit then this
+ *                                       will be set to the previous undo record pointer from where
+ *                                       we need to start fetching on next call. Otherwise it will
+ *                                       be set to InvalidUndoRecPtr.
+ * to_urecptr          - Last undo record pointer to be fetched.
+ * undo_apply_size     - Memory segment limit to collect undo records.
+ * nrecords                    - Number of undo records read.
+ * one_page                    - Caller is applying undo only for one block not for
+ *                                       complete transaction.  If this is set true then instead
+ *                                       of following transaction undo chain using prevlen we will
+ *                                       follow the block prev chain of the block so that we can
+ *                                       avoid reading many unnecessary undo records of the
+ *                                       transaction.
+ */
+UndoRecInfo *
+UndoBulkFetchRecord(UndoRecPtr *from_urecptr, UndoRecPtr to_urecptr,
+                                       int undo_apply_size, int *nrecords, bool one_page)
+{
+       RelFileNode rnode;
+       UndoRecPtr      urecptr,
+                               prev_urec_ptr;
+       BlockNumber blkno;
+       BlockNumber to_blkno;
+       Buffer          buffer = InvalidBuffer;
+       UnpackedUndoRecord *uur = NULL;
+       UndoRecInfo *urp_array;
+       int                     urp_array_size = 1024;
+       int                     urp_index = 0;
+       int                     prefetch_target = 0;
+       int                     prefetch_pages = 0;
+       Size            total_size = 0;
+       FullTransactionId fxid = InvalidFullTransactionId;
+
+       /*
+        * In one_page mode we are fetching undo only for one page instead of
+        * fetching all the undo of the transaction.  Basically, we are fetching
+        * interleaved undo records.  So it does not make sense to do any prefetch
+        * in that case.  Also, if we are fetching undo records from more than one
+        * log, we don't know the boundaries for prefetching.  Hence, we can't use
+        * prefetching in this case.
+        */
+       if (!one_page &&
+               (UndoRecPtrGetLogNo(*from_urecptr) == UndoRecPtrGetLogNo(to_urecptr)))
+               prefetch_target = target_prefetch_pages;
+
+       /*
+        * Allocate initial memory to hold the undo record info, we can expand it
+        * if needed.
+        */
+       urp_array = (UndoRecInfo *) palloc(sizeof(UndoRecInfo) * urp_array_size);
+       urecptr = *from_urecptr;
+
+       prev_urec_ptr = InvalidUndoRecPtr;
+       *from_urecptr = InvalidUndoRecPtr;
+
+       /* Read undo chain backward until we reach to the first undo record.  */
+       while (1)
+       {
+               BlockNumber from_blkno;
+               UndoLogSlot *slot;
+               UndoLogCategory category;
+               int                     size;
+               int                     logno;
+
+               logno = UndoRecPtrGetLogNo(urecptr);
+               slot = UndoLogGetSlot(logno, true);
+               if (slot == NULL)
+               {
+                       if (BufferIsValid(buffer))
+                               ReleaseBuffer(buffer);
+                       return NULL;
+               }
+               category = slot->meta.category;
+
+               UndoRecPtrAssignRelFileNode(rnode, urecptr);
+               to_blkno = UndoRecPtrGetBlockNum(to_urecptr);
+               from_blkno = UndoRecPtrGetBlockNum(urecptr);
+
+               /* Allocate memory for next undo record. */
+               uur = palloc0(sizeof(UnpackedUndoRecord));
+
+               /*
+                * If next undo record pointer to be fetched is not on the same block
+                * then release the old buffer and reduce the prefetch_pages count by
+                * one as we have consumed one page. Otherwise, just set the old
+                * buffer into the new undo record so that UndoGetOneRecord don't read
+                * the buffer again.
+                */
+               blkno = UndoRecPtrGetBlockNum(urecptr);
+               if (!UndoRecPtrIsValid(prev_urec_ptr) ||
+                       UndoRecPtrGetLogNo(prev_urec_ptr) != logno ||
+                       UndoRecPtrGetBlockNum(prev_urec_ptr) != blkno)
+               {
+                       /* Release the previous buffer */
+                       if (BufferIsValid(buffer))
+                       {
+                               ReleaseBuffer(buffer);
+                               buffer = InvalidBuffer;
+                       }
+
+                       if (prefetch_pages > 0)
+                               prefetch_pages--;
+               }
+
+               /*
+                * If prefetch_pages are half of the prefetch_target then it's time to
+                * prefetch again.
+                */
+               if (prefetch_pages < prefetch_target / 2)
+                       PrefetchUndoPages(rnode, prefetch_target, &prefetch_pages, to_blkno,
+                                                         from_blkno, category);
+
+               /*
+                * In one_page mode it's possible that the undo of the transaction
+                * might have been applied by worker and undo got discarded. Prevent
+                * discard worker from discarding undo data while we are reading it.
+                * See detail comment in UndoFetchRecord.  In normal mode we are
+                * holding transaction undo action lock so it can not be discarded.
+                */
+               if (one_page)
+               {
+                       /* Refer comments in UndoFetchRecord. */
+                       if (InHotStandby)
+                       {
+                               if (UndoRecPtrIsDiscarded(urecptr))
+                                       break;
+                       }
+                       else
+                       {
+                               LWLockAcquire(&slot->discard_lock, LW_SHARED);
+                               if (slot->logno != logno || urecptr < slot->oldest_data)
+                               {
+                                       /*
+                                        * The undo log slot has been recycled because it was
+                                        * entirely discarded, or the data has been discarded
+                                        * already.
+                                        */
+                                       LWLockRelease(&slot->discard_lock);
+                                       break;
+                               }
+                       }
+
+                       /* Read the undo record. */
+                       UndoGetOneRecord(uur, urecptr, rnode, category, &buffer);
+
+                       /* Release the discard lock after fetching the record. */
+                       if (!InHotStandby)
+                               LWLockRelease(&slot->discard_lock);
+               }
+               else
+                       UndoGetOneRecord(uur, urecptr, rnode, category, &buffer);
+
+               /*
+                * As soon as the transaction id is changed we can stop fetching the
+                * undo record.  Ideally, to_urecptr should control this but while
+                * reading undo only for a page we don't know what is the end undo
+                * record pointer for the transaction.
+                */
+               if (one_page)
+               {
+                       if (!FullTransactionIdIsValid(fxid))
+                               fxid = uur->uur_fxid;
+                       else if (!FullTransactionIdEquals(fxid, uur->uur_fxid))
+                               break;
+               }
+
+               /* Remember the previous undo record pointer. */
+               prev_urec_ptr = urecptr;
+
+               /*
+                * Calculate the previous undo record pointer of the transaction.  If
+                * we are reading undo only for a page then follow the blkprev chain
+                * of the page.  Otherwise, calculate the previous undo record pointer
+                * using transaction's current undo record pointer and the prevlen. If
+                * undo record has a valid uur_prevurp, this is the case of log switch
+                * during the transaction so we can directly use uur_prevurp as our
+                * previous undo record pointer of the transaction.
+                */
+               if (one_page)
+                       urecptr = uur->uur_prevundo;
+               else if (uur->uur_logswitch)
+                       urecptr = uur->uur_logswitch->urec_prevurp;
+               else if (prev_urec_ptr == to_urecptr ||
+                                uur->uur_info & UREC_INFO_TRANSACTION)
+                       urecptr = InvalidUndoRecPtr;
+               else
+                       urecptr = UndoGetPrevUndoRecptr(prev_urec_ptr, buffer, category);
+
+               /* We have consumed all elements of the urp_array so expand its size. */
+               if (urp_index >= urp_array_size)
+               {
+                       urp_array_size *= 2;
+                       urp_array =
+                               repalloc(urp_array, sizeof(UndoRecInfo) * urp_array_size);
+               }
+
+               /* Add entry in the urp_array */
+               urp_array[urp_index].index = urp_index;
+               urp_array[urp_index].urp = prev_urec_ptr;
+               urp_array[urp_index].uur = uur;
+               urp_index++;
+
+               /* We have fetched all the undo records for the transaction. */
+               if (!UndoRecPtrIsValid(urecptr) || (prev_urec_ptr == to_urecptr))
+                       break;
+
+               size = UnpackedUndoRecordSize(uur);
+               total_size += size;
+
+               /*
+                * Including current record, if we have crossed the memory limit or
+                * undo log got switched then stop processing more records.  Remember
+                * to set the from_urecptr so that on next call we can resume fetching
+                * undo records where we left it.
+                */
+               if (total_size >= undo_apply_size || uur->uur_logswitch)
+               {
+                       *from_urecptr = urecptr;
+                       break;
+               }
+       }
+
+       /* Release the last buffer. */
+       if (BufferIsValid(buffer))
+               ReleaseBuffer(buffer);
+
+       *nrecords = urp_index;
+
+       return urp_array;
+}
+
+/*
+ * Register the undo buffers.
+ */
+void
+RegisterUndoLogBuffers(UndoRecordInsertContext *context, uint8 first_block_id)
+{
+       int                     idx;
+       int                     flags;
+
+       for (idx = 0; idx < context->nprepared_undo_buffer; idx++)
+       {
+               flags = context->prepared_undo_buffers[idx].zero
+                       ? REGBUF_KEEP_DATA_AFTER_CP | REGBUF_WILL_INIT
+                       : REGBUF_KEEP_DATA_AFTER_CP;
+               XLogRegisterBuffer(first_block_id + idx,
+                                                  context->prepared_undo_buffers[idx].buf, flags);
+               UndoLogRegister(&context->alloc_context, first_block_id + idx,
+                                               context->prepared_undo_buffers[idx].logno);
+       }
+}
+
+/*
+ * Set LSN on undo page.
+*/
+void
+UndoLogBuffersSetLSN(UndoRecordInsertContext *context, XLogRecPtr recptr)
+{
+       int                     idx;
+
+       for (idx = 0; idx < context->nprepared_undo_buffer; idx++)
+               PageSetLSN(BufferGetPage(context->prepared_undo_buffers[idx].buf),
+                                  recptr);
+}
+
+/*
+ * Read length of the previous undo record.
+ *
+ * This function will take an undo record pointer as an input and read the
+ * length of the previous undo record which is stored at the end of the previous
+ * undo record.  If the undo record is split then this will add the undo block
+ * header size in the total length.
+ */
+static uint16
+UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+                                        UndoLogCategory category)
+{
+       UndoLogOffset page_offset = UndoRecPtrGetPageOffset(urp);
+       BlockNumber cur_blk = UndoRecPtrGetBlockNum(urp);
+       Buffer          buffer = input_buffer;
+       Page            page = NULL;
+       char       *pagedata = NULL;
+       char            prevlen[2];
+       RelFileNode rnode;
+       int                     byte_to_read = sizeof(uint16);
+       char            persistence;
+       uint16          prev_rec_len = 0;
+
+       /* Get relfilenode. */
+       UndoRecPtrAssignRelFileNode(rnode, urp);
+       persistence = RelPersistenceForUndoLogCategory(category);
+
+       if (BufferIsValid(buffer))
+       {
+               page = BufferGetPage(buffer);
+               pagedata = (char *) page;
+       }
+
+       /*
+        * Length if the previous undo record is store at the end of that record
+        * so just fetch last 2 bytes.
+        */
+       while (byte_to_read > 0)
+       {
+               /* Read buffer if the current buffer is not valid. */
+               if (!BufferIsValid(buffer))
+               {
+                       buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum,
+                                                                                          cur_blk, RBM_NORMAL, NULL,
+                                                                                          persistence);
+
+                       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+                       page = BufferGetPage(buffer);
+                       pagedata = (char *) page;
+               }
+
+               page_offset -= 1;
+
+               /*
+                * Read current prevlen byte from current block if page_offset hasn't
+                * reach to undo block header.  Otherwise, go to the previous block
+                * and continue reading from there.
+                */
+               if (page_offset >= UndoLogBlockHeaderSize)
+               {
+                       prevlen[byte_to_read - 1] = pagedata[page_offset];
+                       byte_to_read -= 1;
+               }
+               else
+               {
+                       /*
+                        * Release the current buffer if it is not provide by the caller.
+                        */
+                       if (input_buffer != buffer)
+                               UnlockReleaseBuffer(buffer);
+
+                       /*
+                        * Could not read complete prevlen from the current block so go to
+                        * the previous block and start reading from end of the block.
+                        */
+                       cur_blk -= 1;
+                       page_offset = BLCKSZ;
+
+                       /*
+                        * Reset buffer so that we can read it again for the previous
+                        * block.
+                        */
+                       buffer = InvalidBuffer;
+               }
+       }
+
+       prev_rec_len = *(uint16 *) (prevlen);
+
+       /*
+        * If previous undo record is not completely stored in this page then add
+        * UndoLogBlockHeaderSize in total length so that the call can use this
+        * length to compute the undo record pointer of the previous undo record.
+        */
+       if (UndoRecPtrGetPageOffset(urp) - UndoLogBlockHeaderSize < prev_rec_len)
+               prev_rec_len += UndoLogBlockHeaderSize;
+
+       /* Release the buffer if we have locally read it. */
+       if (input_buffer != buffer)
+               UnlockReleaseBuffer(buffer);
+
+       return prev_rec_len;
+}
+
+/*
+ * Calculate the previous undo record pointer for the transaction.
+ *
+ * This will take current undo record pointer of the transaction as an input
+ * and calculate the previous undo record pointer of the transaction.
+ */
+UndoRecPtr
+UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
+                                         UndoLogCategory category)
+{
+       UndoLogNumber logno = UndoRecPtrGetLogNo(urp);
+       UndoLogOffset offset = UndoRecPtrGetOffset(urp);
+       uint16          prevlen;
+
+       /* Read length of the previous undo record. */
+       prevlen = UndoGetPrevRecordLen(urp, buffer, category);
+
+       /* calculate the previous undo record pointer */
+       return MakeUndoRecPtr(logno, offset - prevlen);
+}
diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c
new file mode 100755 (executable)
index 0000000..a44bfbd
--- /dev/null
@@ -0,0 +1,1051 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.c
+ *       encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorecord.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/subtrans.h"
+#include "access/undorecord.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+
+/* Prototypes for static functions. */
+static bool InsertUndoBytes(char *sourceptr, int sourcelen,
+                                                       char **writeptr, char *endptr,
+                                                       int *total_bytes_written, int *partial_write);
+static bool ReadUndoBytes(char *destptr, int readlen,
+                                                 char **readptr, char *endptr,
+                                                 int *total_bytes_read, int *partial_read);
+
+ /*
+  * Compute the header size of the undo record.
+  */
+Size
+UndoRecordHeaderSize(uint16 uur_info)
+{
+       Size            size;
+
+       /* Add fixed header size. */
+       size = SizeOfUndoRecordHeader;
+
+       /* Add size of transaction header if it presets. */
+       if ((uur_info & UREC_INFO_TRANSACTION) != 0)
+               size += SizeOfUndoRecordTransaction;
+
+       /* Add size of rmid if it presets. */
+       if ((uur_info & UREC_INFO_RMID) != 0)
+               size += sizeof(RmgrId);
+
+       /* Add size of reloid if it presets. */
+       if ((uur_info & UREC_INFO_RELOID) != 0)
+               size += sizeof(Oid);
+
+       /* Add size of fxid if it presets. */
+       if ((uur_info & UREC_INFO_XID) != 0)
+               size += sizeof(FullTransactionId);
+
+       /* Add size of cid if it presets. */
+       if ((uur_info & UREC_INFO_CID) != 0)
+               size += sizeof(CommandId);
+
+       /* Add size of forknum if it presets. */
+       if ((uur_info & UREC_INFO_FORK) != 0)
+               size += sizeof(ForkNumber);
+
+       /* Add size of prevundo if it presets. */
+       if ((uur_info & UREC_INFO_PREVUNDO) != 0)
+               size += sizeof(UndoRecPtr);
+
+       /* Add size of the block header if it presets. */
+       if ((uur_info & UREC_INFO_BLOCK) != 0)
+               size += SizeOfUndoRecordBlock;
+
+       /* Add size of the log switch header if it presets. */
+       if ((uur_info & UREC_INFO_LOGSWITCH) != 0)
+               size += SizeOfUndoRecordLogSwitch;
+
+       /* Add size of the payload header if it presets. */
+       if ((uur_info & UREC_INFO_PAYLOAD) != 0)
+               size += SizeOfUndoRecordPayload;
+
+       return size;
+}
+
+/*
+ * Compute and return the expected size of an undo record.
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+       Size            size;
+
+       /* Header size. */
+       size = UndoRecordHeaderSize(uur->uur_info);
+
+       /* Payload data size. */
+       if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+       {
+               size += uur->uur_payload.len;
+               size += uur->uur_tuple.len;
+       }
+
+       /* Add undo record length size. */
+       size += sizeof(uint16);
+
+       return size;
+}
+
+/*
+ * Calculate the size of the undo record stored on the page.
+ */
+static inline Size
+UndoRecordSizeOnPage(char *page_ptr)
+{
+       uint16          uur_info = ((UndoRecordHeader *) page_ptr)->urec_info;
+       Size            size;
+
+       /* Header size. */
+       size = UndoRecordHeaderSize(uur_info);
+
+       /* Payload data size. */
+       if ((uur_info & UREC_INFO_PAYLOAD) != 0)
+       {
+               UndoRecordPayload *payload = (UndoRecordPayload *) (page_ptr + size);
+
+               size += payload->urec_payload_len;
+               size += payload->urec_tuple_len;
+       }
+
+       return size;
+}
+
+/*
+ * Compute size of the Unpacked undo record in memory
+ */
+Size
+UnpackedUndoRecordSize(UnpackedUndoRecord *uur)
+{
+       Size            size;
+
+       size = sizeof(UnpackedUndoRecord);
+
+       /* Add payload size if record contains payload data. */
+       if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+       {
+               size += uur->uur_payload.len;
+               size += uur->uur_tuple.len;
+       }
+
+       return size;
+}
+
+/*
+ * Initiate inserting an undo record.
+ *
+ * This function will initialize the context for inserting and undo record
+ * which will be inserted by calling InsertUndoData.
+ */
+void
+BeginInsertUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
+{
+       ucontext->stage = UNDO_PACK_STAGE_HEADER;
+       ucontext->already_processed = 0;
+       ucontext->partial_bytes = 0;
+
+       /* Copy undo record header. */
+       ucontext->urec_hd.urec_type = uur->uur_type;
+       ucontext->urec_hd.urec_info = uur->uur_info;
+
+       /* Copy undo record transaction header if it is present. */
+       if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+               memcpy(&ucontext->urec_txn, uur->uur_txn, SizeOfUndoRecordTransaction);
+
+       /* Copy rmid if present. */
+       if ((uur->uur_info & UREC_INFO_RMID) != 0)
+               ucontext->urec_rmid = uur->uur_rmid;
+
+       /* Copy reloid if present. */
+       if ((uur->uur_info & UREC_INFO_RELOID) != 0)
+               ucontext->urec_reloid = uur->uur_reloid;
+
+       /* Copy fxid if present. */
+       if ((uur->uur_info & UREC_INFO_XID) != 0)
+               ucontext->urec_fxid = uur->uur_fxid;
+
+       /* Copy cid if present. */
+       if ((uur->uur_info & UREC_INFO_CID) != 0)
+               ucontext->urec_cid = uur->uur_cid;
+
+       /* Copy undo record relation header if it is present. */
+       if ((uur->uur_info & UREC_INFO_FORK) != 0)
+               ucontext->urec_fork = uur->uur_fork;
+
+       /* Copy prev undo record pointer if it is present. */
+       if ((uur->uur_info & UREC_INFO_PREVUNDO) != 0)
+               ucontext->urec_prevundo = uur->uur_prevundo;
+
+       /* Copy undo record block header if it is present. */
+       if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+       {
+               ucontext->urec_blk.urec_block = uur->uur_block;
+               ucontext->urec_blk.urec_offset = uur->uur_offset;
+       }
+
+       /* Copy undo record log switch header if it is present. */
+       if ((uur->uur_info & UREC_INFO_LOGSWITCH) != 0)
+               memcpy(&ucontext->urec_logswitch, uur->uur_logswitch,
+                          SizeOfUndoRecordLogSwitch);
+
+       /* Copy undo record payload header and data if it is present. */
+       if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+       {
+               ucontext->urec_payload.urec_payload_len = uur->uur_payload.len;
+               ucontext->urec_payload.urec_tuple_len = uur->uur_tuple.len;
+               ucontext->urec_payloaddata = uur->uur_payload.data;
+               ucontext->urec_tupledata = uur->uur_tuple.data;
+       }
+       else
+       {
+               ucontext->urec_payload.urec_payload_len = 0;
+               ucontext->urec_payload.urec_tuple_len = 0;
+       }
+
+       /* Compute undo record expected size and store in the context. */
+       ucontext->undo_len = UndoRecordExpectedSize(uur);
+}
+
+/*
+ * Insert the undo record into the input page from the unpack undo context.
+ *
+ * Caller can  call this function multiple times until desired stage is reached.
+ * This will write the undo record into the page.
+ */
+void
+InsertUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
+{
+       char       *writeptr = (char *) page + starting_byte;
+       char       *endptr = (char *) page + BLCKSZ;
+
+       switch (ucontext->stage)
+       {
+               case UNDO_PACK_STAGE_HEADER:
+                       /* Insert undo record header. */
+                       if (!InsertUndoBytes((char *) &ucontext->urec_hd,
+                                                                SizeOfUndoRecordHeader, &writeptr, endptr,
+                                                                &ucontext->already_processed,
+                                                                &ucontext->partial_bytes))
+                               return;
+                       ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_TRANSACTION:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+                       {
+                               /* Insert undo record transaction header. */
+                               if (!InsertUndoBytes((char *) &ucontext->urec_txn,
+                                                                        SizeOfUndoRecordTransaction,
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_RMID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_RMID:
+                       /* Write rmid(if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) != 0)
+                       {
+                               if (!InsertUndoBytes((char *) &(ucontext->urec_rmid), sizeof(RmgrId),
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_RELOID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_RELOID:
+                       /* Write reloid(if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) != 0)
+                       {
+                               if (!InsertUndoBytes((char *) &(ucontext->urec_reloid), sizeof(Oid),
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_XID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_XID:
+                       /* Write xid(if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) != 0)
+                       {
+                               if (!InsertUndoBytes((char *) &(ucontext->urec_fxid), sizeof(FullTransactionId),
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_CID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_CID:
+                       /* Write cid(if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) != 0)
+                       {
+                               if (!InsertUndoBytes((char *) &(ucontext->urec_cid), sizeof(CommandId),
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_FORKNUM:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+                       {
+                               /* Insert undo record fork number. */
+                               if (!InsertUndoBytes((char *) &ucontext->urec_fork,
+                                                                        sizeof(ForkNumber),
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_PREVUNDO:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_PREVUNDO) != 0)
+                       {
+                               /* Insert undo record blkprev. */
+                               if (!InsertUndoBytes((char *) &ucontext->urec_prevundo,
+                                                                        sizeof(UndoRecPtr),
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_BLOCK:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+                       {
+                               /* Insert undo record block header. */
+                               if (!InsertUndoBytes((char *) &ucontext->urec_blk,
+                                                                        SizeOfUndoRecordBlock,
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_LOGSWITCH:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
+                       {
+                               /* Insert undo record transaction header. */
+                               if (!InsertUndoBytes((char *) &ucontext->urec_logswitch,
+                                                                        SizeOfUndoRecordLogSwitch,
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_PAYLOAD:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+                       {
+                               /* Insert undo record payload header. */
+                               if (!InsertUndoBytes((char *) &ucontext->urec_payload,
+                                                                        SizeOfUndoRecordPayload,
+                                                                        &writeptr, endptr,
+                                                                        &ucontext->already_processed,
+                                                                        &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_PAYLOAD_DATA:
+                       {
+                               int                     len = ucontext->urec_payload.urec_payload_len;
+
+                               if (len > 0)
+                               {
+                                       /* Insert payload data. */
+                                       if (!InsertUndoBytes((char *) ucontext->urec_payloaddata,
+                                                                                len, &writeptr, endptr,
+                                                                                &ucontext->already_processed,
+                                                                                &ucontext->partial_bytes))
+                                               return;
+                               }
+                               ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+                       }
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_TUPLE_DATA:
+                       {
+                               int                     len = ucontext->urec_payload.urec_tuple_len;
+
+                               if (len > 0)
+                               {
+                                       /* Insert tuple data. */
+                                       if (!InsertUndoBytes((char *) ucontext->urec_tupledata,
+                                                                                len, &writeptr, endptr,
+                                                                                &ucontext->already_processed,
+                                                                                &ucontext->partial_bytes))
+                                               return;
+                               }
+                               ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
+                       }
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_UNDO_LENGTH:
+                       /* Insert undo length. */
+                       if (!InsertUndoBytes((char *) &ucontext->undo_len,
+                                                                sizeof(uint16), &writeptr, endptr,
+                                                                &ucontext->already_processed,
+                                                                &ucontext->partial_bytes))
+                               return;
+
+                       ucontext->stage = UNDO_PACK_STAGE_DONE;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_DONE:
+                       /* Nothing to be done. */
+                       break;
+
+               default:
+                       Assert(0);                      /* Invalid stage */
+       }
+}
+
+/*
+ * Skip inserting undo record
+ *
+ * Don't insert the actual undo record instead just update the context data
+ * so that if we need to insert the remaining partial record to the next
+ * block then we have right context.
+ */
+void
+SkipInsertingUndoData(UndoPackContext *ucontext, int bytes_to_skip)
+{
+       switch (ucontext->stage)
+       {
+               case UNDO_PACK_STAGE_HEADER:
+                       if (bytes_to_skip < SizeOfUndoRecordHeader)
+                       {
+                               ucontext->partial_bytes = bytes_to_skip;
+                               return;
+                       }
+                       bytes_to_skip -= SizeOfUndoRecordHeader;
+                       ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_TRANSACTION:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+                       {
+                               if (bytes_to_skip < SizeOfUndoRecordTransaction)
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= SizeOfUndoRecordTransaction;
+                       }
+
+                       ucontext->stage = UNDO_PACK_STAGE_RMID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_RMID:
+                       /* Write rmid (if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UNDO_PACK_STAGE_RMID) != 0)
+                       {
+                               if (bytes_to_skip < (sizeof(RmgrId)))
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= sizeof(RmgrId);
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_RELOID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_RELOID:
+                       /* Write reloid (if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UNDO_PACK_STAGE_RELOID) != 0)
+                       {
+                               if (bytes_to_skip < sizeof(Oid))
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= sizeof(Oid);
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_XID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_XID:
+                       /* Write xid (if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UNDO_PACK_STAGE_XID) != 0)
+                       {
+                               if (bytes_to_skip < (sizeof(TransactionId)))
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= sizeof(TransactionId);
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_CID;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_CID:
+                       /* Write cid (if needed and not already done). */
+                       if ((ucontext->urec_hd.urec_info & UNDO_PACK_STAGE_CID) != 0)
+                       {
+                               if (bytes_to_skip < sizeof(CommandId))
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= sizeof(CommandId);
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_FORKNUM:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+                       {
+                               if (bytes_to_skip < sizeof(ForkNumber))
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= sizeof(ForkNumber);
+                       }
+
+                       ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_PREVUNDO:
+                       if ((ucontext->urec_hd.urec_info & UNDO_PACK_STAGE_PREVUNDO) != 0)
+                       {
+                               if (bytes_to_skip < sizeof(UndoRecPtr))
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= sizeof(UndoRecPtr);
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_BLOCK:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+                       {
+                               if (bytes_to_skip < SizeOfUndoRecordBlock)
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= SizeOfUndoRecordBlock;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_LOGSWITCH:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
+                       {
+                               if (bytes_to_skip < SizeOfUndoRecordLogSwitch)
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= SizeOfUndoRecordLogSwitch;
+                       }
+
+                       ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_PAYLOAD:
+                       /* Skip payload header. */
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+                       {
+                               if (bytes_to_skip < SizeOfUndoRecordPayload)
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= SizeOfUndoRecordPayload;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_PAYLOAD_DATA:
+                       if (ucontext->urec_payload.urec_payload_len > 0)
+                       {
+                               if (bytes_to_skip < ucontext->urec_payload.urec_payload_len)
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= ucontext->urec_payload.urec_payload_len;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_TUPLE_DATA:
+                       if (ucontext->urec_payload.urec_tuple_len > 0)
+                       {
+                               if (bytes_to_skip < ucontext->urec_payload.urec_tuple_len)
+                               {
+                                       ucontext->partial_bytes = bytes_to_skip;
+                                       return;
+                               }
+                               bytes_to_skip -= ucontext->urec_payload.urec_tuple_len;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_UNDO_LENGTH:
+                       ucontext->stage = UNDO_PACK_STAGE_DONE;
+                        /* fall through */ ;
+
+               case UNDO_PACK_STAGE_DONE:
+                       /* Nothing to be done. */
+                       break;
+
+               default:
+                       Assert(0);                      /* Invalid stage */
+       }
+}
+
+/*
+ * Write undo bytes from a particular source, but only to the extent that
+ * they weren't written previously and will fit.
+ *
+ * 'sourceptr' points to the source data, and 'sourcelen' is the length of
+ * that data in bytes.
+ *
+ * 'writeptr' points to the insertion point for these bytes, and is updated
+ * for whatever we write.  The insertion point must not pass 'endptr', which
+ * represents the end of the buffer into which we are writing.
+ *
+ * 'my_bytes_written' is a pointer to the count of previous-written bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.
+ *
+ * 'total_bytes_written' points to the count of all previously-written bytes,
+ * and must it must be updated for the bytes we write.
+ *
+ * The return value is false if we ran out of space before writing all
+ * the bytes, and otherwise true.
+ */
+static bool
+InsertUndoBytes(char *sourceptr, int sourcelen, char **writeptr, char *endptr,
+                               int *total_bytes_written, int *partial_write)
+{
+       int                     can_write;
+       int                     remaining;
+
+       /* Compute number of bytes we can write. */
+       remaining = sourcelen - *partial_write;
+       can_write = Min(remaining, endptr - *writeptr);
+
+       /* Bail out if no bytes can be written. */
+       if (can_write == 0)
+               return false;
+
+       /* Copy the bytes we can write. */
+       memcpy(*writeptr, sourceptr + *partial_write, can_write);
+
+       /* Update bookkeeping information. */
+       *writeptr += can_write;
+       *total_bytes_written += can_write;
+
+       /* Could not read whole data so set the partial_read. */
+       if (can_write < remaining)
+       {
+               *partial_write += can_write;
+               return false;
+       }
+
+       /* Return true only if we wrote the whole thing. */
+       *partial_write = 0;
+       return true;
+}
+
+/*
+ * Initiate unpacking an undo record.
+ *
+ * This function will initialize the context for unpacking the undo record which
+ * will be unpacked by calling UnpackUndoData.
+ */
+void
+BeginUnpackUndo(UndoPackContext *ucontext)
+{
+       ucontext->stage = UNDO_PACK_STAGE_HEADER;
+       ucontext->already_processed = 0;
+       ucontext->partial_bytes = 0;
+}
+
+/*
+ * Read the undo record from the input page to the unpack undo context.
+ *
+ * Caller can  call this function multiple times until desired stage is reached.
+ * This will read the undo record from the page and store the data into unpack
+ * undo context, which can be later copied to unpacked undo record by calling
+ * FinishUnpackUndo.
+ */
+void
+UnpackUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
+{
+       char       *readptr = (char *) page + starting_byte;
+       char       *endptr = (char *) page + BLCKSZ;
+
+       switch (ucontext->stage)
+       {
+               case UNDO_PACK_STAGE_HEADER:
+                       if (!ReadUndoBytes((char *) &ucontext->urec_hd,
+                                                          SizeOfUndoRecordHeader, &readptr, endptr,
+                                                          &ucontext->already_processed,
+                                                          &ucontext->partial_bytes))
+                               return;
+                       ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+                       /* fall through */
+               case UNDO_PACK_STAGE_TRANSACTION:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_txn,
+                                                                  SizeOfUndoRecordTransaction,
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_RMID;
+                       /* fall through */
+               case UNDO_PACK_STAGE_RMID:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_rmid,
+                                                                  sizeof(RmgrId),
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_RELOID;
+                       /* fall through */
+               case UNDO_PACK_STAGE_RELOID:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_reloid,
+                                                                  sizeof(Oid),
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_XID;
+                       /* fall through */
+               case UNDO_PACK_STAGE_XID:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_fxid,
+                                                                  sizeof(FullTransactionId),
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_CID;
+                       /* fall through */
+               case UNDO_PACK_STAGE_CID:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_cid,
+                                                                  sizeof(CommandId),
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+                       /* fall through */
+               case UNDO_PACK_STAGE_FORKNUM:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_fork,
+                                                                  sizeof(ForkNumber),
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
+                       /* fall through */
+               case UNDO_PACK_STAGE_PREVUNDO:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_PREVUNDO) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_prevundo,
+                                                                  sizeof(UndoRecPtr),
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+                       /* fall through */
+
+               case UNDO_PACK_STAGE_BLOCK:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_blk,
+                                                                  SizeOfUndoRecordBlock,
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
+                       /* fall through */
+               case UNDO_PACK_STAGE_LOGSWITCH:
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_logswitch,
+                                                                  SizeOfUndoRecordLogSwitch,
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+                       /* fall through */
+               case UNDO_PACK_STAGE_PAYLOAD:
+                       /* Read payload header. */
+                       if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+                       {
+                               if (!ReadUndoBytes((char *) &ucontext->urec_payload,
+                                                                  SizeOfUndoRecordPayload,
+                                                                  &readptr, endptr, &ucontext->already_processed,
+                                                                  &ucontext->partial_bytes))
+                                       return;
+                       }
+                       ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+                       /* fall through */
+               case UNDO_PACK_STAGE_PAYLOAD_DATA:
+                       {
+                               int                     len = ucontext->urec_payload.urec_payload_len;
+
+                               /* Allocate memory for the payload data if not already done. */
+                               if (len > 0)
+                               {
+                                       if (ucontext->urec_payloaddata == NULL)
+                                               ucontext->urec_payloaddata = (char *) palloc(len);
+
+                                       /* Read payload data. */
+                                       if (!ReadUndoBytes((char *) ucontext->urec_payloaddata, len,
+                                                                          &readptr, endptr, &ucontext->already_processed,
+                                                                          &ucontext->partial_bytes))
+                                               return;
+                               }
+                               ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+                               /* fall through */
+                       }
+               case UNDO_PACK_STAGE_TUPLE_DATA:
+                       {
+                               int                     len = ucontext->urec_payload.urec_tuple_len;
+
+                               /* Allocate memory for the tuple data if not already done. */
+                               if (len > 0)
+                               {
+                                       if (ucontext->urec_tupledata == NULL)
+                                               ucontext->urec_tupledata = (char *) palloc(len);
+
+                                       /* Read tuple data. */
+                                       if (!ReadUndoBytes((char *) ucontext->urec_tupledata, len,
+                                                                          &readptr, endptr, &ucontext->already_processed,
+                                                                          &ucontext->partial_bytes))
+                                               return;
+                               }
+
+                               ucontext->stage = UNDO_PACK_STAGE_DONE;
+                               /* fall through */
+                       }
+               case UNDO_PACK_STAGE_DONE:
+                       /* Nothing to be done. */
+                       break;
+               default:
+                       Assert(0);                      /* Invalid stage */
+       }
+
+       return;
+}
+
+/*
+ * Final step of unpacking the undo record.
+ *
+ * Copy the undo record data from the unpack undo context to the input unpacked
+ * undo record.
+ */
+void
+FinishUnpackUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
+{
+       /* Copy undo record header. */
+       uur->uur_type = ucontext->urec_hd.urec_type;
+       uur->uur_info = ucontext->urec_hd.urec_info;
+
+       /* Copy undo record transaction header if it is present. */
+       if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+       {
+               uur->uur_txn = palloc(SizeOfUndoRecordTransaction);
+               memcpy(uur->uur_txn, &ucontext->urec_txn, SizeOfUndoRecordTransaction);
+       }
+
+       /*
+        * Copy the common field.  All of these field must present in the final
+        * unpacked undo record.
+        */
+       Assert((uur->uur_info & UREC_INFO_PAGE_COMMON) == UREC_INFO_PAGE_COMMON);
+
+       uur->uur_rmid = ucontext->urec_rmid;
+       uur->uur_reloid = ucontext->urec_reloid;
+       uur->uur_fxid = ucontext->urec_fxid;
+       uur->uur_cid = ucontext->urec_cid;
+
+       /* Copy undo record relation header if it is present. */
+       if ((uur->uur_info & UREC_INFO_FORK) != 0)
+               uur->uur_fork = ucontext->urec_fork;
+
+       /* Copy previous undo record pointer if it is present. */
+       if ((uur->uur_info & UREC_INFO_PREVUNDO) != 0)
+               uur->uur_prevundo = ucontext->urec_prevundo;
+
+       /* Copy undo record block header if it is present. */
+       if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+       {
+               uur->uur_block = ucontext->urec_blk.urec_block;
+               uur->uur_offset = ucontext->urec_blk.urec_offset;
+       }
+
+       /* Copy undo record log switch header if it is present. */
+       if ((uur->uur_info & UREC_INFO_LOGSWITCH) != 0)
+       {
+               uur->uur_logswitch = palloc(SizeOfUndoRecordLogSwitch);
+               memcpy(uur->uur_logswitch, &ucontext->urec_logswitch,
+                          SizeOfUndoRecordLogSwitch);
+       }
+
+       /* Copy undo record payload header and data if it is present. */
+       if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+       {
+               uur->uur_payload.len = ucontext->urec_payload.urec_payload_len;
+               uur->uur_tuple.len = ucontext->urec_payload.urec_tuple_len;
+
+               /* Read payload data if its length is not 0. */
+               if (uur->uur_payload.len != 0)
+                       uur->uur_payload.data = ucontext->urec_payloaddata;
+
+               /* Read tuple data if its length is not 0. */
+               if (uur->uur_tuple.len != 0)
+                       uur->uur_tuple.data = ucontext->urec_tupledata;
+       }
+}
+
+/*
+ * Read undo bytes into a particular destination,
+ *
+ * 'destptr' points to the source data, and 'readlen' is the length of
+ * that data to be read in bytes.
+ *
+ * 'readptr' points to the read point for these bytes, and is updated
+ * for how much we read.  The read point must not pass 'endptr', which
+ * represents the end of the buffer from which we are reading.
+ *
+ * 'partial_read' is a pointer to the count of previous partial read bytes
+ *
+ * 'total_bytes_read' points to the count of all previously-read bytes,
+ * and must likewise be updated for the bytes we read.
+ *
+ * nocopy if this flag is set true then it will just skip the readlen
+ * size in undo but it will not copy into the buffer.
+ *
+ * The return value is false if we ran out of space before read all
+ * the bytes, and otherwise true.
+ */
+static bool
+ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr,
+                         int *total_bytes_read, int *partial_read)
+{
+       int                     can_read;
+       int                     remaining;
+
+       /* Compute number of bytes we can read. */
+       remaining = readlen - *partial_read;
+       can_read = Min(remaining, endptr - *readptr);
+
+       /* Bail out if no bytes can be read. */
+       if (can_read == 0)
+               return false;
+
+       /* Copy the bytes we can read. */
+       memcpy(destptr + *partial_read, *readptr, can_read);
+
+       /* Update bookkeeping information. */
+       *readptr += can_read;
+       *total_bytes_read += can_read;
+
+       /* Could not read whole data so set the partial_read. */
+       if (can_read < remaining)
+       {
+               *partial_read += can_read;
+               return false;
+       }
+
+       /* Return true only if we wrote the whole thing. */
+       *partial_read = 0;
+
+       return true;
+}
+
+/*
+ * Set uur_info for an UnpackedUndoRecord appropriately based on which
+ * fields are set.
+ *
+ * Other flags i.e UREC_INFO_TRANSACTION, UREC_INFO_PAGE_COMMON and,
+ * UREC_INFO_LOGSWITCH are directly set by the PrepareUndoInsert function.
+ */
+void
+UndoRecordSetInfo(UnpackedUndoRecord *uur)
+{
+       /*
+        * If fork number is not the main fork then we need to store it in the
+        * undo record so set the flag.
+        */
+       if (uur->uur_fork != MAIN_FORKNUM)
+               uur->uur_info |= UREC_INFO_FORK;
+
+       /* If prevundo is valid undo record pointer then set the flag. */
+       if (uur->uur_prevundo != InvalidUndoRecPtr)
+               uur->uur_info |= UREC_INFO_PREVUNDO;
+
+       /* If the block number is valid then set the flag for the block header. */
+       if (uur->uur_block != InvalidBlockNumber)
+               uur->uur_info |= UREC_INFO_BLOCK;
+
+       /*
+        * Either of the payload or the tuple length is non-zero then we need the
+        * payload header.
+        */
+       if (uur->uur_payload.len || uur->uur_tuple.len)
+               uur->uur_info |= UREC_INFO_PAYLOAD;
+}
index 6b49810e37802851c9abb6746d5b097d288db5d3..f5133c78e86783d6bc67fc83545a30553a03aca1 100644 (file)
@@ -59,6 +59,34 @@ PageInit(Page page, Size pageSize, Size specialSize)
        /* p->pd_prune_xid = InvalidTransactionId;              done by above MemSet */
 }
 
+/*
+ * UndoPageInit
+ *             Initializes the contents of an undo page.
+ *             Note that we don't calculate an initial checksum here; that's not done
+ *             until it's time to write.
+ */
+void
+UndoPageInit(Page page, Size pageSize, uint16 uur_info, uint16 record_offset,
+                        uint16 tuple_len, uint16 payload_len)
+{
+       UndoPageHeader  p = (UndoPageHeader) page;
+
+       Assert(pageSize == BLCKSZ);
+
+       /* Make sure all fields of page are zero, as well as unused space. */
+       MemSet(p, 0, pageSize);
+
+       p->pd_flags = 0;
+       p->pd_lower = SizeOfUndoPageHeaderData;
+       p->pd_upper = pageSize;
+       p->pd_special = pageSize;
+       p->uur_info = uur_info;
+       p->record_offset = record_offset;
+       p->tuple_len = tuple_len;
+       p->payload_len = payload_len;
+       PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
+}
+
 
 /*
  * PageIsVerified
index 33fd052156f86bb7f71a4212b5df5c7a386cd742..cc005096991f428220eda16210998b8ebac37301 100644 (file)
@@ -47,6 +47,7 @@
 #define EpochFromFullTransactionId(x)  ((uint32) ((x).value >> 32))
 #define XidFromFullTransactionId(x)            ((uint32) (x).value)
 #define U64FromFullTransactionId(x)            ((x).value)
+#define FullTransactionIdEquals(a, b)  ((a).value == (b).value)
 #define FullTransactionIdPrecedes(a, b)        ((a).value < (b).value)
 #define FullTransactionIdIsValid(x)            TransactionIdIsValid(XidFromFullTransactionId(x))
 #define InvalidFullTransactionId               FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
diff --git a/src/include/access/undoaccess.h b/src/include/access/undoaccess.h
new file mode 100644 (file)
index 0000000..f7cfa9f
--- /dev/null
@@ -0,0 +1,121 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoaccess.h
+ *       entry points for inserting/fetching undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoaccess.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOACCESS_H
+#define UNDOACCESS_H
+
+#include "access/undolog.h"
+#include "access/undorecord.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+
+/*
+ * XXX Do we want to support undo tuple size which is more than the BLCKSZ
+ * if not than undo record can spread across 2 buffers at the max.
+ */
+#define MAX_BUFFER_PER_UNDO    2
+
+/*
+ * Maximum number of the XactUndoRecordInfo for updating the transaction header.
+ * Usually it's 1 for updating next link of previous transaction's header
+ * if we are starting a new transaction.  But, in some cases where the same
+ * transaction is spilled to the next log, we update our own transaction's
+ * header in previous undo log as well as the header of the previous transaction
+ * in the new log.
+ */
+#define MAX_XACT_UNDO_INFO     2
+
+typedef struct PreparedUndoSpace PreparedUndoSpace;
+typedef struct PreparedUndoBuffer PreparedUndoBuffer;
+
+/*
+ * Undo record element.  Used for storing the group of undo record in a array
+ * using UndoBulkFetchRecord.
+ */
+typedef struct UndoRecInfo
+{
+       int                     index;                  /* Index of the element.  For stable qsort. */
+       UndoRecPtr      urp;                    /* undo recptr (undo record location). */
+       UnpackedUndoRecord *uur;        /* actual undo record. */
+} UndoRecInfo;
+
+/*
+ * This structure holds the informations for updating the transaction's undo
+ * record header (first undo record of the transaction).  We need to update the
+ * transaction header for various purposes a) updating the next undo record
+ * pointer for maintaining the transactions chain inside a undo log
+ * b) updating the undo apply progress in the transaction header.  During
+ * prepare phase we will keep all the information handy in this structure and
+ * that will be used for updating the actual record inside the critical section.
+ */
+typedef struct XactUndoRecordInfo
+{
+       UndoRecPtr      urecptr;                /* Undo record pointer to be updated. */
+       uint32          offset;                 /* offset in page where to start updating. */
+       UndoRecPtr      next;                   /* first urp of the next transaction which is
+                                                                * be updated in transaction header */
+       BlockNumber progress;           /* undo apply action progress. */
+       int                     idx_undo_buffers[MAX_BUFFER_PER_UNDO];
+} XactUndoRecordInfo;
+
+/*
+ * Context for preparing and inserting undo records..
+ */
+typedef struct UndoRecordInsertContext
+{
+       UndoLogAllocContext alloc_context;
+       PreparedUndoSpace *prepared_undo;       /* prepared undo. */
+       PreparedUndoBuffer *prepared_undo_buffers;      /* Buffers for prepared undo. */
+       XactUndoRecordInfo xact_urec_info[MAX_XACT_UNDO_INFO];  /* Information for
+                                                                                                                        * Updating transaction
+                                                                                                                        * header. */
+       UndoCompressionInfo undo_compression_info[UndoLogCategories];   /* Compression info. */
+       int                     nprepared_undo; /* Number of prepared undo records. */
+       int                     max_prepared_undo;      /* Max prepared undo for this operation. */
+       int                     nprepared_undo_buffer;  /* Number of undo buffers. */
+       int                     nxact_urec_info;        /* Number of previous xact info. */
+} UndoRecordInsertContext;
+
+/*
+ * Context for fetching the required undo record.
+ */
+typedef struct UndoRecordFetchContext
+{
+       Buffer          buffer;                 /* Previous undo record pinned buffer. */
+       UndoRecPtr      urp;                    /* Previous undo record pointer. */
+} UndoRecordFetchContext;
+
+extern void BeginUndoRecordInsert(UndoRecordInsertContext *context,
+                                                                 UndoLogCategory category,
+                                                                 int nprepared,
+                                                                 XLogReaderState *xlog_record);
+extern UndoRecPtr PrepareUndoInsert(UndoRecordInsertContext *context,
+                                                                       UnpackedUndoRecord *urec, Oid dbid);
+extern void InsertPreparedUndo(UndoRecordInsertContext *context);
+extern void FinishUndoRecordInsert(UndoRecordInsertContext *context);
+extern void BeginUndoFetch(UndoRecordFetchContext *context);
+extern UnpackedUndoRecord *UndoFetchRecord(UndoRecordFetchContext *context,
+                                                                                  UndoRecPtr urp);
+extern void FinishUndoFetch(UndoRecordFetchContext *context);
+extern void UndoRecordRelease(UnpackedUndoRecord *urec);
+extern UndoRecInfo *UndoBulkFetchRecord(UndoRecPtr *from_urecptr,
+                                                                               UndoRecPtr to_urecptr,
+                                                                               int undo_apply_size, int *nrecords,
+                                                                               bool one_page);
+extern void RegisterUndoLogBuffers(UndoRecordInsertContext *context,
+                                                                  uint8 first_block_id);
+extern void UndoLogBuffersSetLSN(UndoRecordInsertContext *context,
+                                                                XLogRecPtr recptr);
+extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
+                                                                               UndoLogCategory category);
+
+#endif                                                 /* UNDOINSERT_H */
index 1c79c1ff32df20cf86d9a8beb717434a96057b1a..7ec4cb0ec1447de4060811ad1fdd9bee2cf32013 100644 (file)
@@ -139,7 +139,7 @@ typedef int UndoLogNumber;
        (((uint64) (logno) << UndoLogOffsetBits) | (offset))
 
 /* The number of unusable bytes in the header of each block. */
-#define UndoLogBlockHeaderSize SizeOfPageHeaderData
+#define UndoLogBlockHeaderSize SizeOfUndoPageHeaderData
 
 /* The number of usable bytes we can store per block. */
 #define UndoLogUsableBytesPerPage (BLCKSZ - UndoLogBlockHeaderSize)
@@ -169,6 +169,10 @@ typedef int UndoLogNumber;
 #define UndoRecPtrGetPageOffset(urp)                   \
        (UndoRecPtrGetOffset(urp) % BLCKSZ)
 
+/* Compute the undo record pointer offset given the undo rec page offset and the block number. */
+#define UndoRecPageOffsetGetRecPtr(offset, blkno)             \
+       ((blkno * BLCKSZ) + offset)
+
 /* Compare two undo checkpoint files to find the oldest file. */
 #define UndoCheckPointFilenamePrecedes(file1, file2)   \
        (strcmp(file1, file2) < 0)
@@ -207,7 +211,7 @@ typedef int UndoLogNumber;
  */
 typedef struct UndoLogUnloggedMetaData
 {
-       UndoLogOffset insert;                   /* next insertion point (head) */
+       UndoLogOffset insert;           /* next insertion point (head) */
        UndoLogOffset last_xact_start;  /* last transaction's first byte in this log */
        UndoLogOffset this_xact_start;  /* this transaction's first byte in this log */
        TransactionId xid;                              /* currently attached/writing xid */
@@ -352,7 +356,7 @@ extern PGDLLIMPORT undologtable_hash *undologtable_cache;
 static pg_attribute_always_inline UndoLogTableEntry *
 UndoLogGetTableEntry(UndoLogNumber logno)
 {
-       UndoLogTableEntry  *entry;
+       UndoLogTableEntry *entry;
 
        /* Fast path. */
        entry = undologtable_lookup(undologtable_cache, logno);
diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h
new file mode 100644 (file)
index 0000000..0653267
--- /dev/null
@@ -0,0 +1,279 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.h
+ *       encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorecord.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDORECORD_H
+#define UNDORECORD_H
+
+#include "access/undolog.h"
+#include "access/transam.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/buf.h"
+#include "storage/off.h"
+
+/*
+ * The below common information will be stored in the first undo record of the page.
+ * Every subsequent undo record will not store this information, if required this information
+ * will be retrieved from the first undo record of the page.
+ */
+typedef struct UndoCompressionInfo
+{
+       bool            valid;                  /* Undo compression info is valid ? */
+       UndoRecPtr      last_urecptr;   /* last undo rec */
+       FullTransactionId fxid;         /* transaction id */
+       RmgrId          rmid;                   /* rmgr ID */
+       Oid                     reloid;                 /* relation OID */
+       CommandId       cid;                    /* command id */
+} UndoCompressionInfo;
+
+/*
+ * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure
+ * follows.
+ * If UREC_INFO_RMID is set, rmgr id follows.
+ * if UREC_INFO_RELOID is set, relation oid follows.
+ * If UREC_INFO_XID    is set, full transaction id follows.
+ * If UREC_INFO_CID    is set, command id follows.
+ * If UREC_INFO_FORK is set, fork number follows.
+ * If UREC_INFO_PREVUNDO is set, previous undo record pointer follows.
+ * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows.
+ * If UREC_INFO_LOGSWITCH is set, an UndoRecordLogSwitch structure follows.
+ * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows.
+ *
+ * When (as will often be the case) multiple structures are present, they
+ * appear in the same order in which the constants are defined here.  That is,
+ * UndoRecordTransaction appears first.
+ */
+#define UREC_INFO_TRANSACTION                          0x001
+#define UREC_INFO_RMID                                         0x002
+#define UREC_INFO_RELOID                                       0x004
+#define UREC_INFO_XID                                          0x008
+#define UREC_INFO_CID                                          0x010
+#define UREC_INFO_FORK                                         0x020
+#define UREC_INFO_PREVUNDO                                     0x040
+#define UREC_INFO_BLOCK                                                0x080
+#define UREC_INFO_LOGSWITCH                                    0x100
+#define UREC_INFO_PAYLOAD                                      0x200
+
+#define UREC_INFO_PAGE_COMMON  (UREC_INFO_RMID | UREC_INFO_RELOID | UREC_INFO_XID | UREC_INFO_CID)
+
+/*
+ * Every undo record begins with an UndoRecordHeader structure, which is
+ * followed by the additional structures indicated by the contents of
+ * urec_info.  All structures are packed into the alignment without padding
+ * bytes, and the undo record itself need not be aligned either, so care
+ * must be taken when reading the header.
+ */
+typedef struct UndoRecordHeader
+{
+       uint8           urec_type;              /* record type code */
+       uint16          urec_info;              /* flag bits */
+} UndoRecordHeader;
+
+#define SizeOfUndoRecordHeader \
+       (offsetof(UndoRecordHeader, urec_info) + sizeof(uint16))
+
+/*
+ * Information for a transaction to which this undo belongs.  This
+ * also stores the dbid and the progress of the undo apply during rollback.
+ */
+typedef struct UndoRecordTransaction
+{
+       /*
+        * Undo block number where we need to start reading the undo for applying
+        * the undo action.   InvalidBlockNumber means undo applying hasn't
+        * started for the transaction and MaxBlockNumber mean undo completely
+        * applied. And, any other block number means we have applied partial undo
+        * so next we can start from this block.
+        */
+       BlockNumber urec_progress;
+       Oid                     urec_dbid;              /* database id */
+       UndoRecPtr      urec_next;              /* urec pointer of the next transaction */
+} UndoRecordTransaction;
+
+#define SizeOfUndoRecordTransaction \
+       (offsetof(UndoRecordTransaction, urec_next) + sizeof(UndoRecPtr))
+
+/*
+ *  Information for a block to which this record pertains.
+ */
+typedef struct UndoRecordBlock
+{
+       BlockNumber urec_block;         /* block number */
+       OffsetNumber urec_offset;       /* offset number */
+} UndoRecordBlock;
+
+#define SizeOfUndoRecordBlock \
+       (offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber))
+
+/*
+ * Information of the transaction's undo in the previous log.  If a transaction
+ * is split across the undo logs then this header will be included in the first
+ * undo record of the transaction in next log.
+ */
+typedef struct UndoRecordLogSwitch
+{
+       UndoRecPtr      urec_prevurp;   /* Transaction's last undo record pointer in
+                                                                * the previous undo log. */
+       UndoRecPtr      urec_prevlogstart;      /* Transaction's first undo record pointer
+                                                                        * in previous undo log. */
+} UndoRecordLogSwitch;
+
+#define SizeOfUndoRecordLogSwitch \
+       (offsetof(UndoRecordLogSwitch, urec_prevlogstart) + sizeof(UndoRecPtr))
+
+/*
+ * Information about the amount of payload data and tuple data present
+ * in this record.  The payload bytes immediately follow the structures
+ * specified by flag bits in urec_info, and the tuple bytes follow the
+ * payload bytes.
+ */
+typedef struct UndoRecordPayload
+{
+       uint16          urec_payload_len;       /* # of payload bytes */
+       uint16          urec_tuple_len; /* # of tuple bytes */
+} UndoRecordPayload;
+
+#define SizeOfUndoRecordPayload \
+       (offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16))
+
+typedef enum UndoPackStage
+{
+       UNDO_PACK_STAGE_HEADER,         /* We have not yet processed even the record
+                                                                * header; we need to do that next. */
+       UNDO_PACK_STAGE_TRANSACTION,    /* The next thing to be processed is the
+                                                                        * transaction details, if present. */
+       UNDO_PACK_STAGE_RMID,           /* The next thing to be processed is the rmid
+                                                                * if present */
+
+       UNDO_PACK_STAGE_RELOID,         /* The next thing to be processed is the
+                                                                * reloid if present */
+
+       UNDO_PACK_STAGE_XID,            /* The next thing to be processed is the xid
+                                                                * if present */
+
+       UNDO_PACK_STAGE_CID,            /* The next thing to be processed is the cid
+                                                                * if present */
+
+       UNDO_PACK_STAGE_FORKNUM,        /* The next thing to be processed is the
+                                                                * relation fork number, if present. */
+       UNDO_PACK_STAGE_PREVUNDO,       /* The next thing to be processed is the prev
+                                                                * undo info. */
+
+       UNDO_PACK_STAGE_BLOCK,          /* The next thing to be processed is the block
+                                                                * details, if present. */
+       UNDO_PACK_STAGE_LOGSWITCH,      /* The next thing to be processed is the log
+                                                                * switch details. */
+       UNDO_PACK_STAGE_PAYLOAD,        /* The next thing to be processed is the
+                                                                * payload details, if present */
+       UNDO_PACK_STAGE_PAYLOAD_DATA,   /* The next thing to be processed is the
+                                                                        * payload data */
+       UNDO_PACK_STAGE_TUPLE_DATA, /* The next thing to be processed is the tuple
+                                                                * data */
+       UNDO_PACK_STAGE_UNDO_LENGTH,    /* Next thing to processed is undo length. */
+
+       UNDO_PACK_STAGE_DONE            /* complete */
+} UndoPackStage;
+
+/*
+ * Undo record context for inserting/unpacking undo record.  This will hold
+ * intermediate state of undo record processed so far.
+ */
+typedef struct UndoPackContext
+{
+       UndoRecordHeader urec_hd;       /* Main header */
+       UndoRecordTransaction urec_txn; /* Transaction header */
+
+       RmgrId          urec_rmid;              /* rmgrid */
+       Oid                     urec_reloid;    /* relation OID */
+
+       /*
+        * Transaction id that has modified the tuple for which this undo record
+        * is written.  We use this to skip the undo records.  See comments atop
+        * function UndoFetchRecord.
+        */
+       FullTransactionId urec_fxid;    /* Transaction id */
+       CommandId       urec_cid;               /* command id */
+
+       ForkNumber      urec_fork;              /* Relation fork number */
+       UndoRecPtr      urec_prevundo;  /* Block prev */
+       UndoRecordBlock urec_blk;       /* Block header */
+       UndoRecordLogSwitch urec_logswitch; /* Log switch header */
+       UndoRecordPayload urec_payload; /* Payload data */
+       char       *urec_payloaddata;
+       char       *urec_tupledata;
+       uint16          undo_len;               /* Length of the undo record. */
+       int                     already_processed;      /* Number of bytes read/written so far */
+       int                     partial_bytes;  /* Number of partial bytes read/written */
+       UndoPackStage stage;            /* Undo pack stage */
+} UndoPackContext;
+
+/*
+ * Information that can be used to create an undo record or that can be
+ * extracted from one previously created.  The raw undo record format is
+ * difficult to manage, so this structure provides a convenient intermediate
+ * form that is easier for callers to manage.
+ *
+ * When creating an undo record from an UnpackedUndoRecord, caller should
+ * set uur_info to 0.  It will be initialized by the first call to
+ * UndoRecordSetInfo or InsertUndoRecord.  We do set it in
+ * UndoRecordAllocate for transaction specific header information.
+ *
+ * When an undo record is decoded into an UnpackedUndoRecord, all fields
+ * will be initialized, but those for which no information is available
+ * will be set to invalid or default values, as appropriate.
+ */
+typedef struct UnpackedUndoRecord
+{
+       RmgrId          uur_rmid;               /* rmgr ID */
+       uint8           uur_type;               /* record type code */
+       uint16          uur_info;               /* flag bits */
+       Oid                     uur_reloid;             /* relation OID */
+       CommandId       uur_cid;                /* command id */
+       ForkNumber      uur_fork;               /* fork number */
+       UndoRecPtr      uur_prevundo;   /* byte offset of previous undo for block */
+       BlockNumber uur_block;          /* block number */
+       OffsetNumber uur_offset;        /* offset number */
+       FullTransactionId uur_fxid; /* transaction id */
+       StringInfoData uur_payload; /* payload bytes */
+       StringInfoData uur_tuple;       /* tuple bytes */
+
+       /*
+        * Below header will be internally set by the undo layer.  Above this all
+        * information should be set by the caller.
+        */
+       UndoRecordTransaction *uur_txn; /* Transaction header, included in the
+                                                                        * first record of the transaction in a
+                                                                        * undo log. */
+       UndoRecordLogSwitch *uur_logswitch; /* Log switch header, included in the
+                                                                                * first record of the transaction
+                                                                                * only after undo log is switched
+                                                                                * during a transaction. */
+} UnpackedUndoRecord;
+
+extern Size UndoRecordHeaderSize(uint16 uur_info);
+extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur);
+extern Size UnpackedUndoRecordSize(UnpackedUndoRecord *uur);
+extern void BeginInsertUndo(UndoPackContext *ucontext,
+                                                       UnpackedUndoRecord *uur);
+extern void InsertUndoData(UndoPackContext *ucontext, Page page,
+                                                  int starting_byte);
+extern void SkipInsertingUndoData(UndoPackContext *ucontext,
+                                                                 int bytes_to_skip);
+extern void BeginUnpackUndo(UndoPackContext *ucontext);
+extern void UnpackUndoData(UndoPackContext *ucontext, Page page,
+                                                  int starting_byte);
+extern void FinishUnpackUndo(UndoPackContext *ucontext,
+                                                        UnpackedUndoRecord *uur);
+extern void UndoRecordSetInfo(UnpackedUndoRecord *uur);
+
+#endif                                                 /* UNDORECORD_H */
index 34b68ad0e0027a117b6647a663c2575c820b6042..93d108c207dfa082094238939a85b88b23ecd758 100644 (file)
@@ -215,6 +215,43 @@ typedef PageHeaderData *PageHeader;
  */
 #define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp))
 
+/*
+ * Same as PageHeaderData + some additional information to detect partial
+ * undo record on a undo page.
+ *
+ * FIXME : for undo page do we need to keep all the information which is
+ * required for the PageHeaderData e.g. pd_lower, pd_upper, pd_special?
+ */
+typedef struct UndoPageHeaderData
+{
+       /* XXX LSN is member of *any* block, not only page-organized ones */
+       PageXLogRecPtr pd_lsn;          /* LSN: next byte after last byte of xlog
+                                                                * record for last change to this page */
+       uint16          pd_checksum;    /* checksum */
+       uint16          pd_flags;               /* flag bits, see below */
+       LocationIndex pd_lower;         /* offset to start of free space */
+       LocationIndex pd_upper;         /* offset to end of free space */
+       LocationIndex pd_special;       /* offset to start of special space */
+       uint16          pd_pagesize_version;
+
+       /*
+        * Below fields required for computing the offset of the first complete
+        * record on a undo page, which will be used for the undo record compression
+        * and undo page consistency checking.
+        */
+       uint16          uur_info;               /* uur_info field of the partial record. */
+       uint16          record_offset;  /* offset of the partial undo record. */
+       uint16          tuple_len;              /* Length of the tuple data in the partial
+                                                                * record. */
+       uint16          payload_len;    /* Length of the payload data in the partial
+                                                                * record. */
+} UndoPageHeaderData;
+
+typedef UndoPageHeaderData *UndoPageHeader;
+
+#define SizeOfUndoPageHeaderData (offsetof(UndoPageHeaderData, payload_len) + \
+                                                                 sizeof(uint16))
+
 /*
  * PageIsEmpty
  *             returns true iff no itemid has been allocated on the page
@@ -419,6 +456,9 @@ do { \
                                                ((is_heap) ? PAI_IS_HEAP : 0))
 
 extern void PageInit(Page page, Size pageSize, Size specialSize);
+extern void UndoPageInit(Page page, Size pageSize, uint16 uur_info,
+                                                uint16 record_offset, uint16 tuple_len,
+                                                uint16 payload_len);
 extern bool PageIsVerified(Page page, BlockNumber blkno);
 extern OffsetNumber PageAddItemExtended(Page page, Item item, Size size,
                                                                                OffsetNumber offsetNumber, int flags);