top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = undolog.o
+OBJS = undoaccess.o undolog.o undorecord.o
include $(top_srcdir)/src/backend/common.mk
--- /dev/null
+Undo record interface layer
+---------------------------
+This is the next layer, which sits on top of the undo log storage; it
+provides an interface to prepare, insert, and fetch undo records. This
+layer uses the undo log storage to reserve space for the undo records and
+the buffer management routines to write and read the undo records.
+
+Writing an undo record
+----------------------
+To write an undo record, a caller first prepares it: the required space is
+allocated using the undo log storage module, the required buffers are pinned
+and locked, and an undo record pointer is returned indicating where the
+record will be inserted. Finally, the caller invokes the insert routine for
+the final insertion of the prepared record. Additionally, there is a
+multi-insert mechanism, wherein multiple records are prepared and inserted
+at a time.
+
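+As a rough sketch (assuming the UNDO_PERMANENT category from the undo log
+storage layer and a caller that has already filled in an UnpackedUndoRecord
+named urec), the insertion flow looks like this:
+
+    UndoRecordInsertContext context;
+    UndoRecPtr urp;
+
+    BeginUndoRecordInsert(&context, UNDO_PERMANENT, 1, NULL);
+    urp = PrepareUndoInsert(&context, &urec, MyDatabaseId);
+
+    START_CRIT_SECTION();
+    InsertPreparedUndo(&context);
+    /* WAL-log the change; RegisterUndoLogBuffers() and
+     * UndoLogBuffersSetLSN() hook the undo buffers into that WAL record. */
+    END_CRIT_SECTION();
+
+    FinishUndoRecordInsert(&context);
+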
+Fetching an undo record
+-----------------------
+To fetch an undo record, a caller must provide a valid undo record pointer.
+Optionally, the caller can provide a callback function with the block and
+offset information, which helps in faster retrieval of the undo record;
+otherwise, the undo chain has to be traversed.
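+
+A minimal sketch of a point fetch (urp is assumed to be a valid undo record
+pointer obtained elsewhere):
+
+    UndoRecordFetchContext ctx;
+    UnpackedUndoRecord *uur;
+
+    BeginUndoFetch(&ctx);
+    uur = UndoFetchRecord(&ctx, urp);    /* NULL if already discarded */
+    if (uur != NULL)
+        UndoRecordRelease(uur);
+    FinishUndoFetch(&ctx);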
+
+There is also an interface to bulk fetch undo records, where the caller
+provides FROM and TO undo record pointers and a memory limit for storing
+the undo records. This API returns all the undo records between the FROM
+and TO undo record pointers if they fit into the provided memory limit;
+otherwise, it returns whatever fits. The caller can call it repeatedly
+until it has fetched all the records.
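+
+A sketch of a bulk-fetch loop (from_urp, to_urp, and undo_apply_size are
+assumed to be supplied by the caller):
+
+    UndoRecInfo *recs;
+    int          nrecords;
+
+    do
+    {
+        recs = UndoBulkFetchRecord(&from_urp, to_urp, undo_apply_size,
+                                   &nrecords, false);
+        /* apply recs[0 .. nrecords - 1], then release each record with
+         * UndoRecordRelease() and free recs with pfree() */
+    } while (UndoRecPtrIsValid(from_urp));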
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undoaccess.c
+ * entry points for inserting/fetching undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoaccess.c
+ *
+ * NOTES:
+ * Undo record layout:
+ *
+ * Undo records are stored in sequential order in the undo log. Each undo
+ * record consists of a variable length header, tuple data, and payload
+ * information. The first undo record of each transaction contains a
+ * transaction header that points to the next transaction's start header.
+ * This allows us to discard the entire transaction's log in one shot rather
+ * than record-by-record. Callers are not aware of the transaction header;
+ * it is maintained and used entirely by the undo record layer. See
+ * undorecord.h for detailed information about the undo record header.
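+ *
+ * As a rough sketch of the on-disk sequence (optional parts are present
+ * only when the corresponding uur_info bit is set; see
+ * UndoRecordExpectedSize in undorecord.c):
+ *
+ *   +--------------+------------------+--------------+------------+--------------+
+ *   | fixed header | optional headers | payload data | tuple data | len (uint16) |
+ *   +--------------+------------------+--------------+------------+--------------+
+ *
+ * The trailing length field is used to walk the record chain backwards
+ * (see UndoGetPrevRecordLen).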
+ *
+ * Multiple logs:
+ *
+ * It is possible that the undo records for a transaction span multiple undo
+ * logs. We need some special handling while inserting them to ensure that
+ * discard and rollback can work sanely.
+ *
+ * When a transaction's undo records get inserted into the next log, we add
+ * a transaction header to the transaction's first record in the new log and
+ * connect the transaction's first record in the previous log to it by
+ * updating the "uur_next" field.
+ *
+ * We also keep a previous undo record pointer to the first and last undo
+ * record of the transaction in the previous log. The last undo record
+ * location is used to find the previous undo record pointer during rollback.
+ * The first undo record location is used to find the previous transaction
+ * header, which is required to update the undo apply progress.
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/undorecord.h"
+#include "access/undoaccess.h"
+#include "access/undolog_xlog.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablecmds.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+
+/*
+ * Information for compression of the undo records on a page, so that we can
+ * avoid duplicating the same values across multiple undo records on a page.
+ *
+ * The cid/xid/reloid/rmid information will be added in the undo record
+ * header in the following cases:
+ * a) The first undo record of the transaction.
+ * b) The first undo record of the page.
+ * c) All subsequent records for a transaction which is not the first
+ * transaction on the page.
+ * Except in the above cases, if the rmid/reloid/xid/cid of a subsequent
+ * record is the same, that information is not stored in the record; it is
+ * instead retrieved from the first complete undo record of the page. If
+ * any of rmid/reloid/xid/cid has changed, the changed values are stored in
+ * the undo record and the remaining ones are retrieved from the first
+ * complete undo record of the page.
+ */
+UndoCompressionInfo undo_compression_info[UndoLogCategories];
+
+/* Prototypes for static functions. */
+static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+ UndoRecPtr urp, RelFileNode rnode,
+ UndoLogCategory category,
+ Buffer *prevbuf);
+static int UndoRecordPrepareTransInfo(UndoRecordInsertContext *context,
+ UndoRecPtr xact_urp, int size, int offset);
+static void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context,
+ int idx);
+static void UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
+ UndoRecPtr urecptr, UndoRecPtr xact_urp);
+static int UndoGetBufferSlot(UndoRecordInsertContext *context,
+ RelFileNode rnode, BlockNumber blk,
+ ReadBufferMode rbm);
+static uint16 UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+ UndoLogCategory category);
+static bool UndoSetCommonInfo(UndoCompressionInfo *compressioninfo,
+ UnpackedUndoRecord *urec, UndoRecPtr urp,
+ Buffer buffer);
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+struct PreparedUndoSpace
+{
+ UndoRecPtr urp; /* undo record pointer */
+ UnpackedUndoRecord *urec; /* undo record */
+ uint16 size; /* undo record size */
+ int undo_buffer_idx[MAX_BUFFER_PER_UNDO]; /* undo_buffer array
+ * index */
+};
+
+/*
+ * This holds the undo buffer information required by PreparedUndoSpace at
+ * undo prepare time. During the prepare phase, which happens outside the
+ * critical section, we pin and lock all the necessary undo buffers. Later,
+ * during the insert phase, we write the actual records into these buffers.
+ */
+struct PreparedUndoBuffer
+{
+ UndoLogNumber logno; /* Undo log number */
+ BlockNumber blk; /* block number */
+ Buffer buf; /* buffer allocated for the block */
+ bool zero; /* new block full of zeroes */
+};
+
+/*
+ * Compute the size of the partial record on the undo page.
+ *
+ * Compute the complete record size from uur_info and the variable field
+ * lengths stored in the page header, and then subtract the offset of the
+ * record to get the exact size of the partial record on this page.
+ */
+static inline Size
+UndoPagePartialRecSize(UndoPageHeader phdr)
+{
+ Size size = UndoRecordHeaderSize(phdr->uur_info);
+
+ /*
+ * Add length of the variable part and undo length. Now, we know the
+ * complete length of the undo record.
+ */
+ size += phdr->tuple_len + phdr->payload_len + sizeof(uint16);
+
+ /*
+ * Subtract the size which is stored in the previous page to get the
+ * partial record size stored in this page.
+ */
+ size -= phdr->record_offset;
+
+ return size;
+}
+
+/*
+ * Prepare to update the transaction header
+ *
+ * This is a helper function for UndoRecordPrepareUpdateNext and
+ * PrepareUpdateUndoActionProgress.
+ *
+ * xact_urp - undo record pointer to be updated.
+ * size - number of bytes to be updated.
+ * offset - offset within the undo record at which to start the update.
+ */
+static int
+UndoRecordPrepareTransInfo(UndoRecordInsertContext *context,
+ UndoRecPtr xact_urp, int size, int offset)
+{
+ BlockNumber cur_blk;
+ RelFileNode rnode;
+ int starting_byte;
+ int bufidx;
+ int index = 0;
+ int remaining_bytes;
+ XactUndoRecordInfo *xact_info;
+
+ xact_info = &context->xact_urec_info[context->nxact_urec_info];
+
+ UndoRecPtrAssignRelFileNode(rnode, xact_urp);
+ cur_blk = UndoRecPtrGetBlockNum(xact_urp);
+ starting_byte = UndoRecPtrGetPageOffset(xact_urp);
+
+ /* Remaining bytes on the current block. */
+ remaining_bytes = BLCKSZ - starting_byte;
+
+	/*
+	 * Is any byte of the urec_next on the current block? If not, then
+	 * start from the next block.
+	 */
+ if (remaining_bytes <= offset)
+ {
+ cur_blk++;
+ starting_byte = UndoLogBlockHeaderSize;
+ starting_byte += (offset - remaining_bytes);
+ }
+ else
+ starting_byte += offset;
+
+	/*
+	 * Set the offset in the first block where we need to start writing
+	 * during the prepare phase, so that during the update phase we need
+	 * not compute it again.
+	 */
+ xact_info->offset = starting_byte;
+
+ /* Loop until we have fetched all the buffers in which we need to write. */
+ while (size > 0)
+ {
+ bufidx = UndoGetBufferSlot(context, rnode, cur_blk, RBM_NORMAL);
+
+ Assert(index < MAX_BUFFER_PER_UNDO);
+
+ xact_info->idx_undo_buffers[index++] = bufidx;
+ size -= (BLCKSZ - starting_byte);
+ starting_byte = UndoLogBlockHeaderSize;
+ cur_blk++;
+ }
+
+ xact_info->next = InvalidUndoRecPtr;
+ xact_info->progress = 0;
+ xact_info->urecptr = xact_urp;
+ context->nxact_urec_info++;
+
+ return (context->nxact_urec_info - 1);
+}
+
+/*
+ * Prepare to update the transaction's next undo pointer.
+ *
+ * Lock the necessary buffers for updating the "next" field in the
+ * transaction header. This function is called for
+ * a. Updating the current transaction's start undo record pointer in the
+ * previous transaction's start header.
+ * b. For a multi-log transaction, updating the start undo record pointer
+ * of the current log in the same transaction's start header in the
+ * previous log.
+ *
+ * xact_urp - undo record pointer to be updated
+ * urecptr - current transaction's undo record pointer which needs to be
+ * set in the previous transaction's header.
+ */
+static void
+UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
+ UndoRecPtr urecptr, UndoRecPtr xact_urp)
+{
+ UndoLogSlot *slot;
+ int index = 0;
+ int offset;
+
+	/*
+	 * The absence of the previous transaction's undo indicates that this
+	 * backend is preparing its first undo, in which case we have nothing
+	 * to update.
+	 */
+ if (!UndoRecPtrIsValid(xact_urp))
+ return;
+
+ slot = UndoLogGetSlot(UndoRecPtrGetLogNo(xact_urp), false);
+
+	/*
+	 * Acquire the discard lock before reading the undo record so that the
+	 * discard worker doesn't remove the record while we are reading it.
+	 */
+ LWLockAcquire(&slot->discard_update_lock, LW_SHARED);
+ /* Check if it is already discarded. */
+ if (UndoRecPtrIsDiscarded(xact_urp))
+ {
+ /* Release lock and return. */
+ LWLockRelease(&slot->discard_update_lock);
+ return;
+ }
+
+ /* Compute the offset of the uur_next in the undo record. */
+ offset = SizeOfUndoRecordHeader +
+ offsetof(UndoRecordTransaction, urec_next);
+
+ index = UndoRecordPrepareTransInfo(context, xact_urp,
+ sizeof(UndoRecPtr), offset);
+
+	/*
+	 * Set the next pointer in xact_urec_info; this will be written into
+	 * the actual undo record during the update phase.
+	 */
+ context->xact_urec_info[index].next = urecptr;
+
+ /* We can now release the discard lock as we have read the undo record. */
+ LWLockRelease(&slot->discard_update_lock);
+}
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer.
+ *
+ * This writes out the data already prepared by UndoRecordPrepareTransInfo.
+ * It must be called inside a critical section, and it only overwrites the
+ * header portion of the undo record.
+ */
+static void
+UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx)
+{
+ Page page = NULL;
+ int i = 0;
+ int write_bytes;
+ int write_offset;
+ char *sourceptr;
+ XactUndoRecordInfo *xact_info = &context->xact_urec_info[idx];
+
+ /* Whether to update the next or undo apply progress. */
+ if (UndoRecPtrIsValid(xact_info->next))
+ {
+ sourceptr = (char *) &xact_info->next;
+ write_bytes = sizeof(xact_info->next);
+ }
+ else
+ {
+ sourceptr = (char *) &xact_info->progress;
+ write_bytes = sizeof(xact_info->progress);
+ }
+
+ /* Where to start writing in the current block. */
+ write_offset = xact_info->offset;
+
+	/*
+	 * Start writing directly from the write offset calculated during the
+	 * prepare phase, and loop until we have written the required bytes.
+	 */
+ while (write_bytes > 0)
+ {
+ Buffer buffer;
+ int buf_idx;
+ int can_write;
+ char *writeptr;
+
+ buf_idx = xact_info->idx_undo_buffers[i];
+ buffer = context->prepared_undo_buffers[buf_idx].buf;
+
+		/* How many bytes can be written on the current page. */
+ can_write = Min((BLCKSZ - write_offset), write_bytes);
+
+		/*
+		 * If the buffer is valid then write into it; otherwise skip the
+		 * write, but still compute the variables needed for writing into
+		 * the next block.
+		 */
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+
+ /* Compute the write pointer. */
+ writeptr = (char *) page + write_offset;
+
+ /* Copy the bytes we can write. */
+ memcpy(writeptr, sourceptr, can_write);
+ MarkBufferDirty(buffer);
+ }
+
+ /* Update bookkeeping information. */
+ write_bytes -= can_write;
+ sourceptr += can_write;
+ write_offset = UndoLogBlockHeaderSize;
+ i++;
+ }
+}
+
+/*
+ * Find the block number in undo buffer array
+ *
+ * If it is present then just return its index; otherwise read the buffer,
+ * insert an entry into the array, and lock the buffer in exclusive mode.
+ *
+ * Undo log insertions are append-only. If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point. In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block. In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+UndoGetBufferSlot(UndoRecordInsertContext *context,
+ RelFileNode rnode,
+ BlockNumber blk,
+ ReadBufferMode rbm)
+{
+ int i;
+ Buffer buffer;
+ XLogRedoAction action = BLK_NEEDS_REDO;
+ PreparedUndoBuffer *prepared_buffer;
+ UndoLogCategory category = context->alloc_context.category;
+
+ /* Don't do anything, if we already have a buffer pinned for the block. */
+ for (i = 0; i < context->nprepared_undo_buffer; i++)
+ {
+ prepared_buffer = &context->prepared_undo_buffers[i];
+
+		/*
+		 * It's not enough to just compare the block number because the
+		 * undo buffers might hold undo from different undo logs (e.g. when
+		 * the previous transaction's start header is in the previous undo
+		 * log), so compare (logno + blkno).
+		 */
+ if ((blk == prepared_buffer->blk) &&
+ (prepared_buffer->logno == rnode.relNode))
+ {
+ /* caller must hold exclusive lock on buffer */
+ Assert(BufferIsLocal(prepared_buffer->buf) ||
+ LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+ GetBufferDescriptor(prepared_buffer->buf - 1)),
+ LW_EXCLUSIVE));
+ return i;
+ }
+ }
+
+ /*
+ * We did not find the block so allocate the buffer and insert into the
+ * undo buffer array.
+ */
+ if (InRecovery)
+ action = XLogReadBufferForRedoBlock(context->alloc_context.xlog_record,
+ rnode,
+ UndoLogForkNum,
+ blk,
+ rbm,
+ false,
+ &buffer);
+ else
+ {
+ buffer = ReadBufferWithoutRelcache(rnode,
+ UndoLogForkNum,
+ blk,
+ rbm,
+ NULL,
+ RelPersistenceForUndoLogCategory(category));
+
+ /* Lock the buffer */
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ }
+
+ prepared_buffer =
+ &context->prepared_undo_buffers[context->nprepared_undo_buffer];
+
+ if (action == BLK_NOTFOUND)
+ {
+ Assert(InRecovery);
+
+ prepared_buffer->buf = InvalidBuffer;
+ prepared_buffer->blk = InvalidBlockNumber;
+ }
+ else
+ {
+ prepared_buffer->buf = buffer;
+ prepared_buffer->blk = blk;
+ prepared_buffer->logno = rnode.relNode;
+ prepared_buffer->zero = rbm == RBM_ZERO;
+ }
+
+ context->nprepared_undo_buffer++;
+
+ return i;
+}
+
+/*
+ * Exclude the common info from the undo record flags and also set the
+ * compression info in the context.
+ *
+ * This function checks what information we need to include in the current
+ * undo record based on the undo compression information, and it also
+ * updates the compression info if we are writing the first undo record on
+ * the page.
+ */
+static bool
+UndoSetCommonInfo(UndoCompressionInfo *compressioninfo,
+ UnpackedUndoRecord *urec, UndoRecPtr urp,
+ Buffer buffer)
+{
+ bool record_updated = false;
+ bool first_complete_undo = false;
+ UndoRecPtr lasturp = compressioninfo->last_urecptr;
+
+	/*
+	 * If we have valid compression info, the record belongs to the same
+	 * transaction, and the current undo record is on the same block as the
+	 * last undo record, then exclude the common information that is the
+	 * same as in the first complete record on the page.
+	 */
+ if (compressioninfo->valid &&
+ FullTransactionIdEquals(compressioninfo->fxid, urec->uur_fxid) &&
+ UndoRecPtrGetBlockNum(urp) == UndoRecPtrGetBlockNum(lasturp))
+ {
+ urec->uur_info &= ~UREC_INFO_XID;
+
+ /* Don't include rmid if it's same. */
+ if (urec->uur_rmid == compressioninfo->rmid)
+ urec->uur_info &= ~UREC_INFO_RMID;
+
+ /* Don't include reloid if it's same. */
+ if (urec->uur_reloid == compressioninfo->reloid)
+ urec->uur_info &= ~UREC_INFO_RELOID;
+
+ /* Don't include cid if it's same. */
+ if (urec->uur_cid == compressioninfo->cid)
+ urec->uur_info &= ~UREC_INFO_CID;
+
+ record_updated = true;
+ }
+
+	/*
+	 * If the undo record starts just after the undo page header then this
+	 * is the first complete undo record on the page.
+	 */
+ if (UndoRecPtrGetPageOffset(urp) == SizeOfUndoPageHeaderData)
+ first_complete_undo = true;
+ else if (UndoRecPtrIsValid(lasturp))
+ {
+		/*
+		 * If we already have a valid last undo record pointer that
+		 * inserted undo into this log, then we can identify whether this is
+		 * the first undo record of the page by comparing the block numbers
+		 * of the previous record and the current record.
+		 */
+ if (UndoRecPtrGetBlockNum(urp) != UndoRecPtrGetBlockNum(lasturp))
+ first_complete_undo = true;
+ }
+ else
+ {
+ Page page = BufferGetPage(buffer);
+ uint16 offset;
+ UndoPageHeader phdr = (UndoPageHeader) page;
+
+		/*
+		 * We need to compute the offset of the first complete record of
+		 * the page; if this undo record starts at that offset then it is
+		 * the first complete record on the page.
+		 */
+ offset = SizeOfUndoPageHeaderData + UndoPagePartialRecSize(phdr);
+ if (UndoRecPtrGetPageOffset(urp) == offset)
+ first_complete_undo = true;
+ }
+
+	/*
+	 * If we are writing the first undo record of the page then we can set
+	 * the compression info, so that subsequent records from the same
+	 * transaction can avoid including the common information.
+	 */
+ if (first_complete_undo)
+ {
+ /* Set this information. */
+ compressioninfo->rmid = urec->uur_rmid;
+ compressioninfo->reloid = urec->uur_reloid;
+ compressioninfo->fxid = urec->uur_fxid;
+ compressioninfo->cid = urec->uur_cid;
+
+ /* Set that we have valid compression info. */
+ compressioninfo->valid = true;
+ }
+
+ return record_updated;
+}
+
+/*
+ * This function must be called before preparing the undo records that are
+ * going to be inserted under a single WAL record.
+ *
+ * nprepared - This defines the maximum number of undo records that can be
+ * prepared before inserting them.
+ */
+void
+BeginUndoRecordInsert(UndoRecordInsertContext *context,
+ UndoLogCategory category,
+ int nprepared,
+ XLogReaderState *xlog_record)
+{
+ uint32 nbuffers;
+
+ /* At least one prepared record should be there. */
+ if (nprepared <= 0)
+ elog(ERROR, "at least one undo record should be prepared");
+
+ /* Initialize undo log context. */
+ UndoLogBeginInsert(&context->alloc_context, category, xlog_record);
+
+ /* Initialize undo insert context. */
+ context->max_prepared_undo = nprepared;
+ context->nprepared_undo = 0;
+ context->nprepared_undo_buffer = 0;
+ context->nxact_urec_info = 0;
+
+ /* Allocate memory for prepared undo record space. */
+ context->prepared_undo = (PreparedUndoSpace *) palloc(nprepared *
+ sizeof(PreparedUndoSpace));
+
+ /* Compute number of buffers. */
+ nbuffers = (nprepared + MAX_XACT_UNDO_INFO) * MAX_BUFFER_PER_UNDO;
+
+	/*
+	 * Copy the global compression info to our context before starting the
+	 * prepare phase, because this value might get updated multiple times
+	 * in the multi-prepare case, but the global value should be updated
+	 * only after we have successfully inserted the undo records.
+	 */
+ memcpy(&context->undo_compression_info[category],
+ &undo_compression_info[category], sizeof(UndoCompressionInfo));
+
+ /* Allocate memory for the prepared buffers. */
+ context->prepared_undo_buffers =
+ palloc(nbuffers * sizeof(PreparedUndoBuffer));
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record
+ * you intend to insert. Upon return, the necessary undo buffers are pinned
+ * and locked.
+ *
+ * This should be done before any critical section is established, since it
+ * can fail.
+ */
+UndoRecPtr
+PrepareUndoInsert(UndoRecordInsertContext *context,
+ UnpackedUndoRecord *urec,
+ Oid dbid)
+{
+ UndoRecordSize size;
+ UndoRecPtr urecptr = InvalidUndoRecPtr;
+ RelFileNode rnode;
+ UndoRecordSize cur_size = 0;
+ BlockNumber cur_blk;
+ FullTransactionId fxid;
+ int starting_byte;
+ int index = 0;
+ int bufidx;
+ bool logswitched = false;
+ bool resize = false;
+ ReadBufferMode rbm;
+ bool need_xact_header;
+ UndoRecPtr last_xact_start;
+ UndoRecPtr prevlog_xact_start = InvalidUndoRecPtr;
+ UndoRecPtr prevlog_insert_urp = InvalidUndoRecPtr;
+ UndoRecPtr prevlogurp = InvalidUndoRecPtr;
+ PreparedUndoSpace *prepared_undo;
+ UndoCompressionInfo *compression_info =
+ &context->undo_compression_info[context->alloc_context.category];
+
+ /* Already reached maximum prepared limit. */
+ if (context->nprepared_undo == context->max_prepared_undo)
+ elog(ERROR, "already reached the maximum prepared limit");
+
+ /* Extract the full transaction id from the input undo record. */
+ fxid = urec->uur_fxid;
+ Assert(FullTransactionIdIsValid(fxid));
+
+	/*
+	 * We don't yet know if this record needs a transaction header (i.e. is
+	 * the first undo record for a given transaction in a given undo log),
+	 * because we can only find out by allocating. We'll resolve this
+	 * circularity by allocating enough space for a transaction header.
+	 * Similarly, a log switch will only be detected after allocation, so
+	 * also include the log switch header and the common information, both
+	 * of which are needed in the log switch case. After allocation we'll
+	 * only advance by as many bytes as we turn out to need.
+	 */
+ UndoRecordSetInfo(urec);
+ urec->uur_info |= UREC_INFO_TRANSACTION;
+ urec->uur_info |= UREC_INFO_LOGSWITCH;
+ urec->uur_info |= UREC_INFO_PAGE_COMMON;
+
+ size = UndoRecordExpectedSize(urec);
+
+ /* Allocate space for the record. */
+ if (InRecovery)
+ {
+ /*
+ * We'll figure out where the space needs to be allocated by
+ * inspecting the xlog_record. We don't expect to see temporary or
+ * unlogged undo data here.
+ */
+ Assert(context->alloc_context.category != UNDO_TEMP &&
+ context->alloc_context.category != UNDO_UNLOGGED);
+ urecptr = UndoLogAllocateInRecovery(&context->alloc_context,
+ XidFromFullTransactionId(fxid),
+ size,
+ &need_xact_header,
+ &last_xact_start,
+ &prevlog_xact_start,
+ &prevlogurp);
+ }
+ else
+ {
+ /* Allocate space for writing the undo record. */
+ urecptr = UndoLogAllocate(&context->alloc_context,
+ size,
+ &need_xact_header, &last_xact_start,
+ &prevlog_xact_start, &prevlog_insert_urp);
+
+		/*
+		 * If prevlog_xact_start is a valid undo record pointer, then this
+		 * transaction's undo records are split across undo logs.
+		 */
+ if (UndoRecPtrIsValid(prevlog_xact_start))
+ {
+ uint16 prevlen;
+
+			/*
+			 * If the undo log switched during the transaction then we must
+			 * get a valid insert location in the previous undo log, so that
+			 * we can compute the undo record pointer of the transaction's
+			 * last record in the previous undo log.
+			 */
+ Assert(UndoRecPtrIsValid(prevlog_insert_urp));
+
+ /* Fetch length of the last undo record of the previous log. */
+ prevlen = UndoGetPrevRecordLen(prevlog_insert_urp, InvalidBuffer,
+ context->alloc_context.category);
+
+			/*
+			 * If the undo log got switched during the transaction, then
+			 * while collecting all the transaction's undo records during a
+			 * bulk fetch we cannot read the prevlen from the end of the
+			 * record, as we would not know which undo log was the previous
+			 * one. So on a log switch we directly store the transaction's
+			 * last undo record pointer in the transaction's first record of
+			 * the next undo log.
+			 *
+			 * TODO: instead of storing this in the transaction header we
+			 * could have a separate undo log switch header and store it
+			 * there.
+			 */
+ prevlogurp =
+ MakeUndoRecPtr(UndoRecPtrGetLogNo(prevlog_insert_urp),
+ (UndoRecPtrGetOffset(prevlog_insert_urp) - prevlen));
+
+			/*
+			 * The undo log switched, so set the prevlog info in the current
+			 * undo log.
+			 *
+			 * XXX Could we do this directly in UndoLogAllocate? For that,
+			 * UndoLogAllocate might need to read the length of the last
+			 * undo record from the previous undo log, perhaps via a
+			 * callback.
+			 */
+ UndoLogSwitchSetPrevLogInfo(UndoRecPtrGetLogNo(urecptr),
+ prevlog_xact_start, prevlogurp);
+ }
+ }
+
+	/*
+	 * If the undo log switched then set the logswitch flag and also reset
+	 * the compression info, because we cannot reuse the old compression
+	 * info in the new undo log.
+	 */
+ if (UndoRecPtrIsValid(prevlog_xact_start))
+ {
+ logswitched = true;
+ compression_info->valid = false;
+ compression_info->last_urecptr = InvalidUndoRecPtr;
+ }
+
+ /*
+ * If we need a transaction header then allocate memory for it and
+ * initialize it.
+ */
+ if (need_xact_header)
+ {
+ urec->uur_txn = palloc(SizeOfUndoRecordTransaction);
+ urec->uur_txn->urec_dbid = dbid;
+ urec->uur_txn->urec_progress = InvalidBlockNumber;
+ urec->uur_txn->urec_next = InvalidUndoRecPtr;
+ }
+ else
+ {
+ /* We don't need a transaction header after all. */
+ urec->uur_info &= ~UREC_INFO_TRANSACTION;
+ resize = true;
+ urec->uur_txn = NULL;
+ }
+
+	/*
+	 * If the undo log got switched then initialize the log switch header;
+	 * otherwise clear its bit in uur_info and recalculate the size.
+	 */
+ if (logswitched)
+ {
+ urec->uur_logswitch = palloc(SizeOfUndoRecordLogSwitch);
+ urec->uur_logswitch->urec_prevurp = prevlogurp;
+ urec->uur_logswitch->urec_prevlogstart = prevlog_xact_start;
+ }
+ else
+ {
+		/* We don't need a log switch header after all. */
+ urec->uur_info &= ~UREC_INFO_LOGSWITCH;
+ resize = true;
+ urec->uur_logswitch = NULL;
+ }
+
+	/*
+	 * If there is a physically preceding transaction in this undo log, and
+	 * we are writing the first record for this transaction that is in this
+	 * undo log (not necessarily the first ever for the transaction, because
+	 * we could have switched logs), then we need to update the preceding
+	 * transaction's start header to point to our start location.
+	 */
+ if (need_xact_header &&
+ UndoRecPtrGetOffset(urecptr) > UndoLogBlockHeaderSize)
+ UndoRecordPrepareUpdateNext(context, urecptr, last_xact_start);
+
+ cur_blk = UndoRecPtrGetBlockNum(urecptr);
+ UndoRecPtrAssignRelFileNode(rnode, urecptr);
+ starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+ /*
+ * If we happen to be writing the very first byte into this page, then
+ * there is no need to read from disk.
+ */
+ if (starting_byte == UndoLogBlockHeaderSize)
+ rbm = RBM_ZERO;
+ else
+ rbm = RBM_NORMAL;
+
+ prepared_undo = &context->prepared_undo[context->nprepared_undo];
+
+ do
+ {
+ bufidx = UndoGetBufferSlot(context, rnode, cur_blk, rbm);
+ if (cur_size == 0)
+ cur_size = BLCKSZ - starting_byte;
+ else
+ cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+		/* An undo record can't use more than MAX_BUFFER_PER_UNDO buffers. */
+ Assert(index < MAX_BUFFER_PER_UNDO);
+
+		/* Keep track of the buffers we have pinned and locked. */
+ prepared_undo->undo_buffer_idx[index++] = bufidx;
+
+		/*
+		 * If we need more pages, they'll all be new, so we can definitely
+		 * skip reading from disk.
+		 */
+ rbm = RBM_ZERO;
+ cur_blk++;
+ } while (cur_size < size);
+
+ /*
+ * Set/overwrite compression info if required and also exclude the common
+ * fields from the undo record if possible.
+ */
+ if (UndoSetCommonInfo(compression_info, urec, urecptr,
+ context->prepared_undo_buffers[prepared_undo->undo_buffer_idx[0]].buf))
+ resize = true;
+
+ if (resize)
+ size = UndoRecordExpectedSize(urec);
+
+	/*
+	 * If the transaction's undo records are split across undo logs then we
+	 * need to update our own transaction header in the previous log.
+	 */
+ if (logswitched)
+ {
+ Assert(UndoRecPtrIsValid(prevlogurp));
+ UndoRecordPrepareUpdateNext(context, urecptr, prevlog_xact_start);
+ }
+
+ UndoLogAdvance(&context->alloc_context, size);
+
+ /*
+ * Save prepared undo record information into the context which will be
+ * used by InsertPreparedUndo to insert the undo record.
+ */
+ prepared_undo->urec = urec;
+ prepared_undo->urp = urecptr;
+ prepared_undo->size = size;
+
+ /* Set the current undo pointer in the compression info. */
+ compression_info->last_urecptr = urecptr;
+
+ context->nprepared_undo++;
+
+ return urecptr;
+}
+
+/*
+ * Insert previously-prepared undo records.
+ *
+ * This function writes the actual undo records into the buffers that were
+ * already pinned and locked by PrepareUndoInsert, and marks them dirty.
+ * This step should be performed inside a critical section.
+ */
+void
+InsertPreparedUndo(UndoRecordInsertContext *context)
+{
+ UndoPackContext ucontext = {{0}};
+ PreparedUndoSpace *prepared_undo;
+ Page page = NULL;
+ int starting_byte;
+ int bufidx = 0;
+ int idx;
+ int i;
+
+ /* There must be at least one prepared undo record. */
+ Assert(context->nprepared_undo > 0);
+
+ /*
+ * This must be called under a critical section or we must be in recovery.
+ */
+ Assert(InRecovery || CritSectionCount > 0);
+
+ for (idx = 0; idx < context->nprepared_undo; idx++)
+ {
+ prepared_undo = &context->prepared_undo[idx];
+
+ Assert(prepared_undo->size ==
+ UndoRecordExpectedSize(prepared_undo->urec));
+
+ bufidx = 0;
+
+		/*
+		 * Compute the offset within the page at which to start inserting
+		 * the undo record.
+		 */
+ starting_byte = UndoRecPtrGetPageOffset(prepared_undo->urp);
+
+ /* Initiate inserting the undo record. */
+ BeginInsertUndo(&ucontext, prepared_undo->urec);
+
+ /* Main loop for writing the undo record. */
+ do
+ {
+ Buffer buffer;
+
+ buffer = context->prepared_undo_buffers[
+ prepared_undo->undo_buffer_idx[bufidx]].buf;
+
+			/*
+			 * During recovery, some blocks might already have been removed
+			 * by a discard, so we can just skip inserting into those
+			 * blocks.
+			 */
+ if (!BufferIsValid(buffer))
+ {
+ Assert(InRecovery);
+
+				/*
+				 * Skip the actual write, but still update the context so
+				 * that we have the write offset for inserting into the
+				 * following blocks.
+				 */
+ SkipInsertingUndoData(&ucontext, BLCKSZ - starting_byte);
+ if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+ break;
+ }
+ else
+ {
+ page = BufferGetPage(buffer);
+
+				/*
+				 * Initialize the page whenever we write the first record on
+				 * a page. We start writing immediately after the block
+				 * header.
+				 */
+ if (starting_byte == UndoLogBlockHeaderSize)
+ UndoPageInit(page, BLCKSZ, prepared_undo->urec->uur_info,
+ ucontext.already_processed,
+ prepared_undo->urec->uur_tuple.len,
+ prepared_undo->urec->uur_payload.len);
+
+				/*
+				 * Try to insert the record into the current page. If it
+				 * doesn't fit completely, we loop around and call the
+				 * routine again with the next page.
+				 */
+				InsertUndoData(&ucontext, page, starting_byte);
+				MarkBufferDirty(buffer);
+				if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+					break;
+ }
+
+ /* Insert remaining record in next block. */
+ starting_byte = UndoLogBlockHeaderSize;
+ bufidx++;
+
+			/* An undo record can't use more than MAX_BUFFER_PER_UNDO buffers. */
+ Assert(bufidx < MAX_BUFFER_PER_UNDO);
+ } while (true);
+
+ /* Advance the insert pointer past this record. */
+ UndoLogAdvanceFinal(prepared_undo->urp, prepared_undo->size);
+ }
+
+ /* Update previously prepared transaction headers. */
+ for (i = 0; i < context->nxact_urec_info; i++)
+ UndoRecordUpdateTransInfo(context, i);
+
+	/*
+	 * We have successfully inserted the prepared undo records, so overwrite
+	 * the global compression info.
+	 */
+ memcpy(&undo_compression_info[context->alloc_context.category],
+ &context->undo_compression_info[context->alloc_context.category],
+ sizeof(UndoCompressionInfo));
+}
+
+/*
+ * Release all the memory and buffer pins held for inserting the undo records.
+ */
+void
+FinishUndoRecordInsert(UndoRecordInsertContext *context)
+{
+ int i;
+
+	/* Release buffer pins and locks. */
+ for (i = 0; i < context->nprepared_undo_buffer; i++)
+ {
+ if (BufferIsValid(context->prepared_undo_buffers[i].buf))
+ UnlockReleaseBuffer(context->prepared_undo_buffers[i].buf);
+ }
+
+	/*
+	 * Release the memory for the transaction header and log switch header
+	 * if we allocated them at prepare time.
+	 */
+ for (i = 0; i < context->nprepared_undo; i++)
+ {
+ if (context->prepared_undo[i].urec->uur_txn)
+ pfree(context->prepared_undo[i].urec->uur_txn);
+ if (context->prepared_undo[i].urec->uur_logswitch)
+ pfree(context->prepared_undo[i].urec->uur_logswitch);
+ }
+
+	/* Free the memory allocated for the prepared undo records and buffers. */
+ pfree(context->prepared_undo_buffers);
+ pfree(context->prepared_undo);
+}
+
+/*
+ * Helper function for UndoGetOneRecord
+ *
+ * If any of rmid/reloid/xid/cid is not available in the undo record, then
+ * this gets the information from the first complete undo record on the
+ * page.
+ */
+static void
+GetCommonUndoRecInfo(UndoPackContext *ucontext, UndoRecPtr urp,
+ RelFileNode rnode, UndoLogCategory category, Buffer buffer)
+{
+	/*
+	 * If any of the common header fields are not available in the current
+	 * undo record then we must read them from the first complete record of
+	 * the page.
+	 */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PAGE_COMMON) !=
+ UREC_INFO_PAGE_COMMON)
+ {
+ UnpackedUndoRecord first_uur = {0};
+ Page page = BufferGetPage(buffer);
+ UndoPageHeader undo_phdr = (UndoPageHeader) page;
+ UndoRecPtr first_urp = InvalidUndoRecPtr;
+ Size partial_rec_size = SizeOfUndoPageHeaderData;
+ BlockNumber blkno = UndoRecPtrGetBlockNum(urp);
+
+		/*
+		 * If there is a partial record on the page then compute its size,
+		 * so that we can compute the undo record pointer of the first
+		 * complete undo record of the page.
+		 */
+ if (undo_phdr->record_offset != 0)
+ partial_rec_size += UndoPagePartialRecSize(undo_phdr);
+
+ /*
+ * Compute the undo record pointer of the first complete record of the
+ * page.
+ */
+ first_urp = MakeUndoRecPtr(rnode.relNode,
+ UndoRecPageOffsetGetRecPtr(partial_rec_size, blkno));
+
+ /* Fetch the first undo record of the page. */
+ UndoGetOneRecord(&first_uur, first_urp, rnode, category, &buffer);
+
+		/*
+		 * Get all the missing common header information from the first
+		 * undo record of the page.
+		 */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) == 0)
+ ucontext->urec_rmid = first_uur.uur_rmid;
+
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) == 0)
+ ucontext->urec_reloid = first_uur.uur_reloid;
+
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) == 0)
+ ucontext->urec_fxid = first_uur.uur_fxid;
+
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) == 0)
+ ucontext->urec_cid = first_uur.uur_cid;
+
+ ucontext->urec_hd.urec_info |= UREC_INFO_PAGE_COMMON;
+ }
+}
+
+/*
+ * Helper function for UndoFetchRecord and UndoBulkFetchRecord
+ *
+ * curbuf - If an input buffer is valid then this function will not release
+ * the pin on that buffer. If the buffer is not valid then it will assign
+ * curbuf with the first buffer of the current undo record, and it keeps a
+ * pin on that buffer in the hope that while traversing the undo chain the
+ * caller might want to read the previous undo record from the same block.
+ */
+static UnpackedUndoRecord *
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+ UndoLogCategory category, Buffer *curbuf)
+{
+ Page page;
+ int starting_byte = UndoRecPtrGetPageOffset(urp);
+ BlockNumber cur_blk;
+ UndoPackContext ucontext = {{0}};
+ Buffer buffer = *curbuf;
+
+ cur_blk = UndoRecPtrGetBlockNum(urp);
+
+ /* Initiate unpacking one undo record. */
+ BeginUnpackUndo(&ucontext);
+
+ while (true)
+ {
+ /* If we already have a buffer then no need to allocate a new one. */
+ if (!BufferIsValid(buffer))
+ {
+ buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk,
+ RBM_NORMAL, NULL,
+ RelPersistenceForUndoLogCategory(category));
+
+			/*
+			 * Remember the first buffer where this undo record started, as
+			 * the next undo record we fetch might fall on the same buffer.
+			 */
+ if (!BufferIsValid(*curbuf))
+ *curbuf = buffer;
+ }
+
+ /* Acquire shared lock on the buffer before reading undo from it. */
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+ page = BufferGetPage(buffer);
+
+ UnpackUndoData(&ucontext, page, starting_byte);
+
+		/*
+		 * We are done if we have reached the done stage; otherwise move to
+		 * the next block and continue reading from there.
+		 */
+ if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+ {
+ if (buffer != *curbuf)
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * Get any of the missing fields from the first record of the
+ * page.
+ */
+ GetCommonUndoRecInfo(&ucontext, urp, rnode, category, *curbuf);
+ break;
+ }
+
+		/*
+		 * The record spans more than one page, so we would have copied it
+		 * (see UnpackUndoData). In such cases, we can release the buffer.
+		 */
+ if (buffer != *curbuf)
+ UnlockReleaseBuffer(buffer);
+ buffer = InvalidBuffer;
+
+ /* Go to next block. */
+ cur_blk++;
+ starting_byte = UndoLogBlockHeaderSize;
+ }
+
+ /* Final step of unpacking. */
+ FinishUnpackUndo(&ucontext, urec);
+
+ /* Unlock the buffer but keep the pin. */
+ LockBuffer(*curbuf, BUFFER_LOCK_UNLOCK);
+
+ return urec;
+}
+
+/*
+ * Initialize the context for fetching undo records.
+ */
+void
+BeginUndoFetch(UndoRecordFetchContext *context)
+{
+ context->buffer = InvalidBuffer;
+ context->urp = InvalidUndoRecPtr;
+}
+
+/*
+ * Fetch the undo record for the given undo record pointer.
+ *
+ * This will internally allocate memory for the unpacked undo record, which
+ * in turn holds pointers to the optional headers and the variable data.
+ * The undo record should be freed by the caller by calling
+ * UndoRecordRelease. This function holds the pin on the buffer where we
+ * read the previous undo record, so that when it is called repeatedly with
+ * the same context it can benefit by avoiding a re-read of the buffer when
+ * the current undo record is in the same buffer.
+ */
+UnpackedUndoRecord *
+UndoFetchRecord(UndoRecordFetchContext *context, UndoRecPtr urp)
+{
+ RelFileNode rnode;
+ int logno;
+ UndoLogSlot *slot;
+ UnpackedUndoRecord *uur = NULL;
+
+ logno = UndoRecPtrGetLogNo(urp);
+ slot = UndoLogGetSlot(logno, true);
+
+	/*
+	 * If the slot is NULL then the undo log number is unknown. Presumably
+	 * it has been entirely discarded.
+	 */
+ if (slot == NULL)
+ return NULL;
+
+	/*
+	 * Prevent UndoDiscardOneLog() from discarding data while we try to
+	 * read it. Usually we would acquire log->mutex to read log->meta
+	 * members, but in this case we know that discard can't move without
+	 * also holding log->discard_lock.
+	 *
+	 * In Hot Standby mode log->oldest_data is never initialized, because it
+	 * is normally updated by the undo discard worker, whereas in Hot Standby
+	 * undo logs are discarded by replaying discard WAL records. So in Hot
+	 * Standby we can directly check whether the undo record pointer is
+	 * discarded. We cannot do the same in the normal case because the
+	 * discard worker can concurrently discard the undo logs.
+	 *
+	 * XXX We could avoid this check by initializing log->oldest_data in Hot
+	 * Standby mode as well, whenever we apply discard WAL. But for that we
+	 * would need to acquire the discard lock just for setting this variable.
+	 */
+ if (InHotStandby)
+ {
+ if (UndoRecPtrIsDiscarded(urp))
+ return NULL;
+ }
+ else
+ {
+ LWLockAcquire(&slot->discard_lock, LW_SHARED);
+ if (slot->logno != logno || urp < slot->oldest_data)
+ {
+ /*
+ * The slot has been recycled because the undo log was entirely
+ * discarded, or the pointer is before the oldest data.
+ */
+ LWLockRelease(&slot->discard_lock);
+ return NULL;
+ }
+ }
+
+	/*
+	 * Allocate memory for holding the undo record; the caller is
+	 * responsible for freeing this memory by calling UndoRecordRelease.
+	 */
+ uur = palloc0(sizeof(UnpackedUndoRecord));
+ UndoRecPtrAssignRelFileNode(rnode, urp);
+
+	/*
+	 * Before fetching the next record, check whether we have a valid buffer
+	 * in the context. If so, and if we are reading the current record from
+	 * the same block, then pass that buffer to fetch the undo record;
+	 * otherwise release the buffer.
+	 */
+ if (BufferIsValid(context->buffer) &&
+ (UndoRecPtrGetLogNo(context->urp) != UndoRecPtrGetLogNo(urp) ||
+ UndoRecPtrGetBlockNum(context->urp) != UndoRecPtrGetBlockNum(urp)))
+ {
+ ReleaseBuffer(context->buffer);
+ context->buffer = InvalidBuffer;
+ }
+
+ /* Fetch the current undo record. */
+ UndoGetOneRecord(uur, urp, rnode, slot->meta.category, &context->buffer);
+
+ /* Release the discard lock after fetching the record. */
+ if (!InHotStandby)
+ LWLockRelease(&slot->discard_lock);
+
+ context->urp = urp;
+
+ return uur;
+}
+
+/*
+ * Finish undo record fetch.
+ */
+void
+FinishUndoFetch(UndoRecordFetchContext *context)
+{
+ if (BufferIsValid(context->buffer))
+ ReleaseBuffer(context->buffer);
+}
+
+/*
+ * Release the memory of the undo record allocated by UndoFetchRecord and
+ * UndoBulkFetchRecord.
+ */
+void
+UndoRecordRelease(UnpackedUndoRecord *urec)
+{
+ /* Release the memory of payload data if we allocated it. */
+ if (urec->uur_payload.data)
+ pfree(urec->uur_payload.data);
+
+ /* Release memory of tuple data if we allocated it. */
+ if (urec->uur_tuple.data)
+ pfree(urec->uur_tuple.data);
+
+ /* Release memory of the transaction header if we allocated it. */
+ if (urec->uur_txn)
+ pfree(urec->uur_txn);
+
+ /* Release memory of the logswitch header if we allocated it. */
+ if (urec->uur_logswitch)
+ pfree(urec->uur_logswitch);
+
+ /* Release the memory of the undo record. */
+ pfree(urec);
+}
+
+/*
+ * Prefetch undo pages if prefetch_pages is behind prefetch_target.
+ */
+static void
+PrefetchUndoPages(RelFileNode rnode, int prefetch_target, int *prefetch_pages,
+ BlockNumber to_blkno, BlockNumber from_blkno,
+ UndoLogCategory category)
+{
+ int nprefetch;
+ BlockNumber startblock;
+ BlockNumber lastprefetched;
+
+ /* Calculate last prefetched page in the previous iteration. */
+ lastprefetched = from_blkno - *prefetch_pages;
+
+ /* We have already prefetched all the pages of the transaction's undo. */
+ if (lastprefetched <= to_blkno)
+ return;
+
+ /* Calculate number of blocks to be prefetched. */
+ nprefetch =
+ Min(prefetch_target - *prefetch_pages, lastprefetched - to_blkno);
+
+ /* Where to start prefetch. */
+ startblock = lastprefetched - nprefetch;
+
+ while (nprefetch--)
+ {
+ PrefetchBufferWithoutRelcache(rnode, MAIN_FORKNUM, startblock++,
+ category == UNDO_TEMP);
+ (*prefetch_pages)++;
+ }
+}
+
+/*
+ * Read undo records of the transaction in bulk
+ *
+ * Read undo records between from_urecptr and to_urecptr until we exhaust
+ * the memory size specified by undo_apply_size. If we could not read all
+ * the records up to to_urecptr then the caller should consume the current
+ * set of records and call this function again.
+ *
+ * from_urecptr - Where to start fetching the undo records. If we cannot
+ *			read all the records because of the memory limit then this
+ *			will be set to the previous undo record pointer from where
+ *			we need to start fetching on the next call. Otherwise it
+ *			will be set to InvalidUndoRecPtr.
+ * to_urecptr - Last undo record pointer to be fetched.
+ * undo_apply_size - Memory limit for collecting undo records.
+ * nrecords - Number of undo records read.
+ * one_page - Caller is applying undo only for one block, not for the
+ *			complete transaction. If this is set to true then instead of
+ *			following the transaction undo chain using prevlen we follow
+ *			the block-prev chain of the block, so that we can avoid
+ *			reading many unnecessary undo records of the transaction.
+ */
+UndoRecInfo *
+UndoBulkFetchRecord(UndoRecPtr *from_urecptr, UndoRecPtr to_urecptr,
+ int undo_apply_size, int *nrecords, bool one_page)
+{
+ RelFileNode rnode;
+ UndoRecPtr urecptr,
+ prev_urec_ptr;
+ BlockNumber blkno;
+ BlockNumber to_blkno;
+ Buffer buffer = InvalidBuffer;
+ UnpackedUndoRecord *uur = NULL;
+ UndoRecInfo *urp_array;
+ int urp_array_size = 1024;
+ int urp_index = 0;
+ int prefetch_target = 0;
+ int prefetch_pages = 0;
+ Size total_size = 0;
+ FullTransactionId fxid = InvalidFullTransactionId;
+
+ /*
+ * In one_page mode we are fetching undo only for one page instead of
+ * fetching all the undo of the transaction. Basically, we are fetching
+ * interleaved undo records. So it does not make sense to do any prefetch
+ * in that case. Also, if we are fetching undo records from more than one
+ * log, we don't know the boundaries for prefetching. Hence, we can't use
+ * prefetching in this case.
+ */
+ if (!one_page &&
+ (UndoRecPtrGetLogNo(*from_urecptr) == UndoRecPtrGetLogNo(to_urecptr)))
+ prefetch_target = target_prefetch_pages;
+
+	/*
+	 * Allocate initial memory to hold the undo record info; we can expand
+	 * it if needed.
+	 */
+ urp_array = (UndoRecInfo *) palloc(sizeof(UndoRecInfo) * urp_array_size);
+ urecptr = *from_urecptr;
+
+ prev_urec_ptr = InvalidUndoRecPtr;
+ *from_urecptr = InvalidUndoRecPtr;
+
+	/* Read the undo chain backwards until we reach the first undo record. */
+ while (1)
+ {
+ BlockNumber from_blkno;
+ UndoLogSlot *slot;
+ UndoLogCategory category;
+ int size;
+ int logno;
+
+ logno = UndoRecPtrGetLogNo(urecptr);
+ slot = UndoLogGetSlot(logno, true);
+ if (slot == NULL)
+ {
+ if (BufferIsValid(buffer))
+ ReleaseBuffer(buffer);
+ return NULL;
+ }
+ category = slot->meta.category;
+
+ UndoRecPtrAssignRelFileNode(rnode, urecptr);
+ to_blkno = UndoRecPtrGetBlockNum(to_urecptr);
+ from_blkno = UndoRecPtrGetBlockNum(urecptr);
+
+ /* Allocate memory for next undo record. */
+ uur = palloc0(sizeof(UnpackedUndoRecord));
+
+		/*
+		 * If the next undo record pointer to be fetched is not on the same
+		 * block, then release the old buffer and reduce the prefetch_pages
+		 * count by one, as we have consumed one page. Otherwise, just reuse
+		 * the old buffer, so that UndoGetOneRecord doesn't read the buffer
+		 * again.
+		 */
+ blkno = UndoRecPtrGetBlockNum(urecptr);
+ if (!UndoRecPtrIsValid(prev_urec_ptr) ||
+ UndoRecPtrGetLogNo(prev_urec_ptr) != logno ||
+ UndoRecPtrGetBlockNum(prev_urec_ptr) != blkno)
+ {
+ /* Release the previous buffer */
+ if (BufferIsValid(buffer))
+ {
+ ReleaseBuffer(buffer);
+ buffer = InvalidBuffer;
+ }
+
+ if (prefetch_pages > 0)
+ prefetch_pages--;
+ }
+
+		/*
+		 * If prefetch_pages has dropped below half of prefetch_target then
+		 * it's time to prefetch again.
+		 */
+ if (prefetch_pages < prefetch_target / 2)
+ PrefetchUndoPages(rnode, prefetch_target, &prefetch_pages, to_blkno,
+ from_blkno, category);
+
+		/*
+		 * In one_page mode it's possible that the transaction's undo has
+		 * already been applied by the worker and then discarded. Prevent
+		 * the discard worker from discarding the undo data while we are
+		 * reading it. See the detailed comment in UndoFetchRecord. In
+		 * normal mode we are holding the transaction undo action lock, so
+		 * it cannot be discarded.
+		 */
+ if (one_page)
+ {
+ /* Refer comments in UndoFetchRecord. */
+ if (InHotStandby)
+ {
+ if (UndoRecPtrIsDiscarded(urecptr))
+ break;
+ }
+ else
+ {
+ LWLockAcquire(&slot->discard_lock, LW_SHARED);
+ if (slot->logno != logno || urecptr < slot->oldest_data)
+ {
+ /*
+ * The undo log slot has been recycled because it was
+ * entirely discarded, or the data has been discarded
+ * already.
+ */
+ LWLockRelease(&slot->discard_lock);
+ break;
+ }
+ }
+
+ /* Read the undo record. */
+ UndoGetOneRecord(uur, urecptr, rnode, category, &buffer);
+
+ /* Release the discard lock after fetching the record. */
+ if (!InHotStandby)
+ LWLockRelease(&slot->discard_lock);
+ }
+ else
+ UndoGetOneRecord(uur, urecptr, rnode, category, &buffer);
+
+		/*
+		 * As soon as the transaction id changes we can stop fetching undo
+		 * records. Ideally, to_urecptr should control this, but while
+		 * reading undo only for a page we don't know the end undo record
+		 * pointer for the transaction.
+		 */
+ if (one_page)
+ {
+ if (!FullTransactionIdIsValid(fxid))
+ fxid = uur->uur_fxid;
+ else if (!FullTransactionIdEquals(fxid, uur->uur_fxid))
+ break;
+ }
+
+ /* Remember the previous undo record pointer. */
+ prev_urec_ptr = urecptr;
+
+		/*
+		 * Calculate the previous undo record pointer of the transaction.
+		 * If we are reading undo only for a page then follow the blkprev
+		 * chain of the page. Otherwise, calculate the previous undo record
+		 * pointer using the transaction's current undo record pointer and
+		 * the prevlen. If the undo record has a valid log switch header,
+		 * this is the case of a log switch during the transaction, so we
+		 * can directly use its urec_prevurp as the transaction's previous
+		 * undo record pointer.
+		 */
+ if (one_page)
+ urecptr = uur->uur_prevundo;
+ else if (uur->uur_logswitch)
+ urecptr = uur->uur_logswitch->urec_prevurp;
+ else if (prev_urec_ptr == to_urecptr ||
+ uur->uur_info & UREC_INFO_TRANSACTION)
+ urecptr = InvalidUndoRecPtr;
+ else
+ urecptr = UndoGetPrevUndoRecptr(prev_urec_ptr, buffer, category);
+
+ /* We have consumed all elements of the urp_array so expand its size. */
+ if (urp_index >= urp_array_size)
+ {
+ urp_array_size *= 2;
+ urp_array =
+ repalloc(urp_array, sizeof(UndoRecInfo) * urp_array_size);
+ }
+
+ /* Add entry in the urp_array */
+ urp_array[urp_index].index = urp_index;
+ urp_array[urp_index].urp = prev_urec_ptr;
+ urp_array[urp_index].uur = uur;
+ urp_index++;
+
+ /* We have fetched all the undo records for the transaction. */
+ if (!UndoRecPtrIsValid(urecptr) || (prev_urec_ptr == to_urecptr))
+ break;
+
+ size = UnpackedUndoRecordSize(uur);
+ total_size += size;
+
+		/*
+		 * If, including the current record, we have crossed the memory
+		 * limit or the undo log got switched, then stop processing more
+		 * records. Remember to set from_urecptr so that on the next call
+		 * we can resume fetching undo records where we left off.
+		 */
+ if (total_size >= undo_apply_size || uur->uur_logswitch)
+ {
+ *from_urecptr = urecptr;
+ break;
+ }
+ }
+
+ /* Release the last buffer. */
+ if (BufferIsValid(buffer))
+ ReleaseBuffer(buffer);
+
+ *nrecords = urp_index;
+
+ return urp_array;
+}
+
+/*
+ * Register the undo buffers.
+ */
+void
+RegisterUndoLogBuffers(UndoRecordInsertContext *context, uint8 first_block_id)
+{
+ int idx;
+ int flags;
+
+ for (idx = 0; idx < context->nprepared_undo_buffer; idx++)
+ {
+ flags = context->prepared_undo_buffers[idx].zero
+ ? REGBUF_KEEP_DATA_AFTER_CP | REGBUF_WILL_INIT
+ : REGBUF_KEEP_DATA_AFTER_CP;
+ XLogRegisterBuffer(first_block_id + idx,
+ context->prepared_undo_buffers[idx].buf, flags);
+ UndoLogRegister(&context->alloc_context, first_block_id + idx,
+ context->prepared_undo_buffers[idx].logno);
+ }
+}
+
+/*
+ * Set the LSN on the undo pages.
+ */
+void
+UndoLogBuffersSetLSN(UndoRecordInsertContext *context, XLogRecPtr recptr)
+{
+ int idx;
+
+ for (idx = 0; idx < context->nprepared_undo_buffer; idx++)
+ PageSetLSN(BufferGetPage(context->prepared_undo_buffers[idx].buf),
+ recptr);
+}
+
+/*
+ * Read the length of the previous undo record.
+ *
+ * This function takes an undo record pointer as input and reads the length
+ * of the previous undo record, which is stored at the end of that record.
+ * If the previous undo record is split across pages then the undo block
+ * header size is added to the total length.
+ */
+static uint16
+UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+ UndoLogCategory category)
+{
+ UndoLogOffset page_offset = UndoRecPtrGetPageOffset(urp);
+ BlockNumber cur_blk = UndoRecPtrGetBlockNum(urp);
+ Buffer buffer = input_buffer;
+ Page page = NULL;
+ char *pagedata = NULL;
+ char prevlen[2];
+ RelFileNode rnode;
+ int byte_to_read = sizeof(uint16);
+ char persistence;
+ uint16 prev_rec_len = 0;
+
+ /* Get relfilenode. */
+ UndoRecPtrAssignRelFileNode(rnode, urp);
+ persistence = RelPersistenceForUndoLogCategory(category);
+
+ if (BufferIsValid(buffer))
+ {
+ page = BufferGetPage(buffer);
+ pagedata = (char *) page;
+ }
+
+	/*
+	 * The length of the previous undo record is stored at the end of that
+	 * record, so just fetch the last 2 bytes.
+	 */
+ while (byte_to_read > 0)
+ {
+ /* Read buffer if the current buffer is not valid. */
+ if (!BufferIsValid(buffer))
+ {
+ buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum,
+ cur_blk, RBM_NORMAL, NULL,
+ persistence);
+
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+ page = BufferGetPage(buffer);
+ pagedata = (char *) page;
+ }
+
+ page_offset -= 1;
+
+		/*
+		 * Read the current prevlen byte from the current block if
+		 * page_offset hasn't reached the undo block header. Otherwise, go
+		 * to the previous block and continue reading from there.
+		 */
+ if (page_offset >= UndoLogBlockHeaderSize)
+ {
+ prevlen[byte_to_read - 1] = pagedata[page_offset];
+ byte_to_read -= 1;
+ }
+ else
+ {
+			/*
+			 * Release the current buffer if it was not provided by the
+			 * caller.
+			 */
+ if (input_buffer != buffer)
+ UnlockReleaseBuffer(buffer);
+
+			/*
+			 * We could not read the complete prevlen from the current
+			 * block, so go to the previous block and start reading from
+			 * the end of that block.
+			 */
+ cur_blk -= 1;
+ page_offset = BLCKSZ;
+
+ /*
+ * Reset buffer so that we can read it again for the previous
+ * block.
+ */
+ buffer = InvalidBuffer;
+ }
+ }
+
+ prev_rec_len = *(uint16 *) (prevlen);
+
+	/*
+	 * If the previous undo record is not completely stored on this page
+	 * then add UndoLogBlockHeaderSize to the total length, so that the
+	 * caller can use this length to compute the undo record pointer of the
+	 * previous undo record.
+	 */
+ if (UndoRecPtrGetPageOffset(urp) - UndoLogBlockHeaderSize < prev_rec_len)
+ prev_rec_len += UndoLogBlockHeaderSize;
+
+ /* Release the buffer if we have locally read it. */
+ if (input_buffer != buffer)
+ UnlockReleaseBuffer(buffer);
+
+ return prev_rec_len;
+}
+
+/*
+ * Calculate the previous undo record pointer for the transaction.
+ *
+ * This takes the transaction's current undo record pointer as input and
+ * calculates the transaction's previous undo record pointer.
+ */
+UndoRecPtr
+UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
+ UndoLogCategory category)
+{
+ UndoLogNumber logno = UndoRecPtrGetLogNo(urp);
+ UndoLogOffset offset = UndoRecPtrGetOffset(urp);
+ uint16 prevlen;
+
+ /* Read length of the previous undo record. */
+ prevlen = UndoGetPrevRecordLen(urp, buffer, category);
+
+ /* calculate the previous undo record pointer */
+ return MakeUndoRecPtr(logno, offset - prevlen);
+}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.c
+ * encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorecord.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/subtrans.h"
+#include "access/undorecord.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+
+/* Prototypes for static functions. */
+static bool InsertUndoBytes(char *sourceptr, int sourcelen,
+ char **writeptr, char *endptr,
+ int *total_bytes_written, int *partial_write);
+static bool ReadUndoBytes(char *destptr, int readlen,
+ char **readptr, char *endptr,
+ int *total_bytes_read, int *partial_read);
+
+/*
+ * Compute the header size of the undo record.
+ */
+Size
+UndoRecordHeaderSize(uint16 uur_info)
+{
+ Size size;
+
+ /* Add fixed header size. */
+ size = SizeOfUndoRecordHeader;
+
+	/* Add the size of the transaction header if present. */
+ if ((uur_info & UREC_INFO_TRANSACTION) != 0)
+ size += SizeOfUndoRecordTransaction;
+
+	/* Add the size of rmid if present. */
+ if ((uur_info & UREC_INFO_RMID) != 0)
+ size += sizeof(RmgrId);
+
+	/* Add the size of reloid if present. */
+ if ((uur_info & UREC_INFO_RELOID) != 0)
+ size += sizeof(Oid);
+
+	/* Add the size of fxid if present. */
+ if ((uur_info & UREC_INFO_XID) != 0)
+ size += sizeof(FullTransactionId);
+
+ /* Add size of cid if it presets. */
+ if ((uur_info & UREC_INFO_CID) != 0)
+ size += sizeof(CommandId);
+
+ /* Add size of forknum if it presets. */
+ if ((uur_info & UREC_INFO_FORK) != 0)
+ size += sizeof(ForkNumber);
+
+ /* Add size of prevundo if it presets. */
+ if ((uur_info & UREC_INFO_PREVUNDO) != 0)
+ size += sizeof(UndoRecPtr);
+
+ /* Add size of the block header if it presets. */
+ if ((uur_info & UREC_INFO_BLOCK) != 0)
+ size += SizeOfUndoRecordBlock;
+
+ /* Add size of the log switch header if it presets. */
+ if ((uur_info & UREC_INFO_LOGSWITCH) != 0)
+ size += SizeOfUndoRecordLogSwitch;
+
+ /* Add size of the payload header if it presets. */
+ if ((uur_info & UREC_INFO_PAYLOAD) != 0)
+ size += SizeOfUndoRecordPayload;
+
+ return size;
+}
+
+/*
+ * Compute and return the expected size of an undo record.
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+ Size size;
+
+ /* Header size. */
+ size = UndoRecordHeaderSize(uur->uur_info);
+
+ /* Payload data size. */
+ if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ size += uur->uur_payload.len;
+ size += uur->uur_tuple.len;
+ }
+
+ /* Add undo record length size. */
+ size += sizeof(uint16);
+
+ return size;
+}
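+
+/*
+ * For example (illustrative only), a record whose uur_info is
+ * UREC_INFO_PAGE_COMMON | UREC_INFO_BLOCK and which carries no payload has
+ * an expected size of
+ *
+ *		SizeOfUndoRecordHeader +
+ *		sizeof(RmgrId) + sizeof(Oid) +
+ *		sizeof(FullTransactionId) + sizeof(CommandId) +
+ *		SizeOfUndoRecordBlock +
+ *		sizeof(uint16)
+ *
+ * where the trailing uint16 is the undo record length.
+ */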
+
+/*
+ * Calculate the size of the undo record stored on the page.
+ */
+static inline Size
+UndoRecordSizeOnPage(char *page_ptr)
+{
+ uint16 uur_info = ((UndoRecordHeader *) page_ptr)->urec_info;
+ Size size;
+
+ /* Header size. */
+ size = UndoRecordHeaderSize(uur_info);
+
+ /* Payload data size. */
+ if ((uur_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ UndoRecordPayload *payload = (UndoRecordPayload *) (page_ptr + size);
+
+ size += payload->urec_payload_len;
+ size += payload->urec_tuple_len;
+ }
+
+ return size;
+}
+
+/*
+ * Compute the size of an unpacked undo record in memory.
+ */
+Size
+UnpackedUndoRecordSize(UnpackedUndoRecord *uur)
+{
+ Size size;
+
+ size = sizeof(UnpackedUndoRecord);
+
+ /* Add payload size if record contains payload data. */
+ if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ size += uur->uur_payload.len;
+ size += uur->uur_tuple.len;
+ }
+
+ return size;
+}
+
+/*
+ * Initiate inserting an undo record.
+ *
+ * This function initializes the context for inserting an undo record,
+ * which will be inserted by calling InsertUndoData.
+ */
+void
+BeginInsertUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
+{
+ ucontext->stage = UNDO_PACK_STAGE_HEADER;
+ ucontext->already_processed = 0;
+ ucontext->partial_bytes = 0;
+
+ /* Copy undo record header. */
+ ucontext->urec_hd.urec_type = uur->uur_type;
+ ucontext->urec_hd.urec_info = uur->uur_info;
+
+ /* Copy undo record transaction header if it is present. */
+ if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+ memcpy(&ucontext->urec_txn, uur->uur_txn, SizeOfUndoRecordTransaction);
+
+ /* Copy rmid if present. */
+ if ((uur->uur_info & UREC_INFO_RMID) != 0)
+ ucontext->urec_rmid = uur->uur_rmid;
+
+ /* Copy reloid if present. */
+ if ((uur->uur_info & UREC_INFO_RELOID) != 0)
+ ucontext->urec_reloid = uur->uur_reloid;
+
+ /* Copy fxid if present. */
+ if ((uur->uur_info & UREC_INFO_XID) != 0)
+ ucontext->urec_fxid = uur->uur_fxid;
+
+ /* Copy cid if present. */
+ if ((uur->uur_info & UREC_INFO_CID) != 0)
+ ucontext->urec_cid = uur->uur_cid;
+
+ /* Copy undo record relation header if it is present. */
+ if ((uur->uur_info & UREC_INFO_FORK) != 0)
+ ucontext->urec_fork = uur->uur_fork;
+
+ /* Copy prev undo record pointer if it is present. */
+ if ((uur->uur_info & UREC_INFO_PREVUNDO) != 0)
+ ucontext->urec_prevundo = uur->uur_prevundo;
+
+ /* Copy undo record block header if it is present. */
+ if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+ {
+ ucontext->urec_blk.urec_block = uur->uur_block;
+ ucontext->urec_blk.urec_offset = uur->uur_offset;
+ }
+
+ /* Copy undo record log switch header if it is present. */
+ if ((uur->uur_info & UREC_INFO_LOGSWITCH) != 0)
+ memcpy(&ucontext->urec_logswitch, uur->uur_logswitch,
+ SizeOfUndoRecordLogSwitch);
+
+ /* Copy undo record payload header and data if it is present. */
+ if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ ucontext->urec_payload.urec_payload_len = uur->uur_payload.len;
+ ucontext->urec_payload.urec_tuple_len = uur->uur_tuple.len;
+ ucontext->urec_payloaddata = uur->uur_payload.data;
+ ucontext->urec_tupledata = uur->uur_tuple.data;
+ }
+ else
+ {
+ ucontext->urec_payload.urec_payload_len = 0;
+ ucontext->urec_payload.urec_tuple_len = 0;
+ }
+
+ /* Compute undo record expected size and store in the context. */
+ ucontext->undo_len = UndoRecordExpectedSize(uur);
+}
+
+/*
+ * Insert the undo record into the input page from the unpack undo context.
+ *
+ * The caller can call this function multiple times until the desired stage
+ * is reached. Each call writes as much of the undo record into the page as
+ * will fit.
+ */
+void
+InsertUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
+{
+ char *writeptr = (char *) page + starting_byte;
+ char *endptr = (char *) page + BLCKSZ;
+
+ switch (ucontext->stage)
+ {
+ case UNDO_PACK_STAGE_HEADER:
+ /* Insert undo record header. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_hd,
+ SizeOfUndoRecordHeader, &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_TRANSACTION:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+ {
+ /* Insert undo record transaction header. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_txn,
+ SizeOfUndoRecordTransaction,
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_RMID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_RMID:
+ /* Write rmid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) != 0)
+ {
+ if (!InsertUndoBytes((char *) &(ucontext->urec_rmid), sizeof(RmgrId),
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_RELOID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_RELOID:
+ /* Write reloid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) != 0)
+ {
+ if (!InsertUndoBytes((char *) &(ucontext->urec_reloid), sizeof(Oid),
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_XID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_XID:
+ /* Write xid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) != 0)
+ {
+ if (!InsertUndoBytes((char *) &(ucontext->urec_fxid), sizeof(FullTransactionId),
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_CID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_CID:
+ /* Write cid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) != 0)
+ {
+ if (!InsertUndoBytes((char *) &(ucontext->urec_cid), sizeof(CommandId),
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_FORKNUM:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+ {
+ /* Insert undo record fork number. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_fork,
+ sizeof(ForkNumber),
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_PREVUNDO:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PREVUNDO) != 0)
+ {
+ /* Insert previous undo record pointer. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_prevundo,
+ sizeof(UndoRecPtr),
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_BLOCK:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+ {
+ /* Insert undo record block header. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_blk,
+ SizeOfUndoRecordBlock,
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_LOGSWITCH:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
+ {
+ /* Insert undo record log switch header. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_logswitch,
+ SizeOfUndoRecordLogSwitch,
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_PAYLOAD:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ /* Insert undo record payload header. */
+ if (!InsertUndoBytes((char *) &ucontext->urec_payload,
+ SizeOfUndoRecordPayload,
+ &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_PAYLOAD_DATA:
+ {
+ int len = ucontext->urec_payload.urec_payload_len;
+
+ if (len > 0)
+ {
+ /* Insert payload data. */
+ if (!InsertUndoBytes((char *) ucontext->urec_payloaddata,
+ len, &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+ }
+ /* fall through */
+
+ case UNDO_PACK_STAGE_TUPLE_DATA:
+ {
+ int len = ucontext->urec_payload.urec_tuple_len;
+
+ if (len > 0)
+ {
+ /* Insert tuple data. */
+ if (!InsertUndoBytes((char *) ucontext->urec_tupledata,
+ len, &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
+ }
+ /* fall through */
+
+ case UNDO_PACK_STAGE_UNDO_LENGTH:
+ /* Insert undo length. */
+ if (!InsertUndoBytes((char *) &ucontext->undo_len,
+ sizeof(uint16), &writeptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+
+ ucontext->stage = UNDO_PACK_STAGE_DONE;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_DONE:
+ /* Nothing to be done. */
+ break;
+
+ default:
+ Assert(0); /* Invalid stage */
+ }
+}
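+
+/*
+ * Illustrative sketch of a caller driving InsertUndoData across a page
+ * boundary.  This is an assumption of how the loop could look, not code
+ * from this patch: buffer lookup, locking and WAL logging are elided, and
+ * GetNextUndoPage() and the initial starting_byte are hypothetical.
+ *
+ *		BeginInsertUndo(&ucontext, uur);
+ *		do
+ *		{
+ *			Page	page = GetNextUndoPage();
+ *
+ *			InsertUndoData(&ucontext, page, starting_byte);
+ *
+ *			(continuation pages start just past the page header)
+ *			starting_byte = SizeOfUndoPageHeaderData;
+ *		} while (ucontext.stage != UNDO_PACK_STAGE_DONE);
+ */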
+
+/*
+ * Skip inserting an undo record.
+ *
+ * Don't insert the actual undo record; instead, just update the context data
+ * so that if we need to insert the remaining partial record into the next
+ * block then we have the right context.
+ */
+void
+SkipInsertingUndoData(UndoPackContext *ucontext, int bytes_to_skip)
+{
+ switch (ucontext->stage)
+ {
+ case UNDO_PACK_STAGE_HEADER:
+ if (bytes_to_skip < SizeOfUndoRecordHeader)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= SizeOfUndoRecordHeader;
+ ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_TRANSACTION:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+ {
+ if (bytes_to_skip < SizeOfUndoRecordTransaction)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= SizeOfUndoRecordTransaction;
+ }
+
+ ucontext->stage = UNDO_PACK_STAGE_RMID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_RMID:
+ /* Skip rmid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) != 0)
+ {
+ if (bytes_to_skip < (sizeof(RmgrId)))
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= sizeof(RmgrId);
+ }
+ ucontext->stage = UNDO_PACK_STAGE_RELOID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_RELOID:
+ /* Skip reloid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) != 0)
+ {
+ if (bytes_to_skip < sizeof(Oid))
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= sizeof(Oid);
+ }
+ ucontext->stage = UNDO_PACK_STAGE_XID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_XID:
+ /* Skip xid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) != 0)
+ {
+ if (bytes_to_skip < sizeof(FullTransactionId))
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= sizeof(FullTransactionId);
+ }
+ ucontext->stage = UNDO_PACK_STAGE_CID;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_CID:
+ /* Skip cid (if needed and not already done). */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) != 0)
+ {
+ if (bytes_to_skip < sizeof(CommandId))
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= sizeof(CommandId);
+ }
+ ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_FORKNUM:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+ {
+ if (bytes_to_skip < sizeof(ForkNumber))
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= sizeof(ForkNumber);
+ }
+
+ ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_PREVUNDO:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PREVUNDO) != 0)
+ {
+ if (bytes_to_skip < sizeof(UndoRecPtr))
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= sizeof(UndoRecPtr);
+ }
+ ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_BLOCK:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+ {
+ if (bytes_to_skip < SizeOfUndoRecordBlock)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= SizeOfUndoRecordBlock;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_LOGSWITCH:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
+ {
+ if (bytes_to_skip < SizeOfUndoRecordLogSwitch)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= SizeOfUndoRecordLogSwitch;
+ }
+
+ ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_PAYLOAD:
+ /* Skip payload header. */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ if (bytes_to_skip < SizeOfUndoRecordPayload)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= SizeOfUndoRecordPayload;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_PAYLOAD_DATA:
+ if (ucontext->urec_payload.urec_payload_len > 0)
+ {
+ if (bytes_to_skip < ucontext->urec_payload.urec_payload_len)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= ucontext->urec_payload.urec_payload_len;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_TUPLE_DATA:
+ if (ucontext->urec_payload.urec_tuple_len > 0)
+ {
+ if (bytes_to_skip < ucontext->urec_payload.urec_tuple_len)
+ {
+ ucontext->partial_bytes = bytes_to_skip;
+ return;
+ }
+ bytes_to_skip -= ucontext->urec_payload.urec_tuple_len;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_UNDO_LENGTH:
+ ucontext->stage = UNDO_PACK_STAGE_DONE;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_DONE:
+ /* Nothing to be done. */
+ break;
+
+ default:
+ Assert(0); /* Invalid stage */
+ }
+}
+
+/*
+ * Write undo bytes from a particular source, but only to the extent that
+ * they weren't written previously and will fit.
+ *
+ * 'sourceptr' points to the source data, and 'sourcelen' is the length of
+ * that data in bytes.
+ *
+ * 'writeptr' points to the insertion point for these bytes, and is updated
+ * for whatever we write. The insertion point must not pass 'endptr', which
+ * represents the end of the buffer into which we are writing.
+ *
+ * 'partial_write' is a pointer to the count of bytes of this structure
+ * already written by a previous call; it is updated to reflect any partial
+ * write performed here.
+ *
+ * 'total_bytes_written' points to the count of all previously-written bytes,
+ * and it must be updated for the bytes we write.
+ *
+ * The return value is false if we ran out of space before writing all
+ * the bytes, and otherwise true.
+ */
+static bool
+InsertUndoBytes(char *sourceptr, int sourcelen, char **writeptr, char *endptr,
+ int *total_bytes_written, int *partial_write)
+{
+ int can_write;
+ int remaining;
+
+ /* Compute number of bytes we can write. */
+ remaining = sourcelen - *partial_write;
+ can_write = Min(remaining, endptr - *writeptr);
+
+ /* Bail out if no bytes can be written. */
+ if (can_write == 0)
+ return false;
+
+ /* Copy the bytes we can write. */
+ memcpy(*writeptr, sourceptr + *partial_write, can_write);
+
+ /* Update bookkeeping information. */
+ *writeptr += can_write;
+ *total_bytes_written += can_write;
+
+ /* Could not write the whole data, so remember the partial write. */
+ if (can_write < remaining)
+ {
+ *partial_write += can_write;
+ return false;
+ }
+
+ /* Return true only if we wrote the whole thing. */
+ *partial_write = 0;
+ return true;
+}
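+
+/*
+ * For example (illustrative only): writing an 8-byte structure when only 5
+ * bytes remain before 'endptr' copies 5 bytes, sets *partial_write to 5 and
+ * returns false; the next call, given a fresh page, copies the remaining 3
+ * bytes, resets *partial_write to 0 and returns true.
+ */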
+
+/*
+ * Initiate unpacking an undo record.
+ *
+ * This function initializes the context for unpacking an undo record, which
+ * will be unpacked by calling UnpackUndoData.
+ */
+void
+BeginUnpackUndo(UndoPackContext *ucontext)
+{
+ ucontext->stage = UNDO_PACK_STAGE_HEADER;
+ ucontext->already_processed = 0;
+ ucontext->partial_bytes = 0;
+}
+
+/*
+ * Read the undo record from the input page to the unpack undo context.
+ *
+ * The caller can call this function multiple times until the desired stage
+ * is reached. This reads the undo record from the page and stores the data
+ * in the unpack undo context, which can later be copied into an unpacked
+ * undo record by calling FinishUnpackUndo.
+ */
+void
+UnpackUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
+{
+ char *readptr = (char *) page + starting_byte;
+ char *endptr = (char *) page + BLCKSZ;
+
+ switch (ucontext->stage)
+ {
+ case UNDO_PACK_STAGE_HEADER:
+ if (!ReadUndoBytes((char *) &ucontext->urec_hd,
+ SizeOfUndoRecordHeader, &readptr, endptr,
+ &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+ /* fall through */
+ case UNDO_PACK_STAGE_TRANSACTION:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_txn,
+ SizeOfUndoRecordTransaction,
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_RMID;
+ /* fall through */
+ case UNDO_PACK_STAGE_RMID:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_rmid,
+ sizeof(RmgrId),
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_RELOID;
+ /* fall through */
+ case UNDO_PACK_STAGE_RELOID:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_reloid,
+ sizeof(Oid),
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_XID;
+ /* fall through */
+ case UNDO_PACK_STAGE_XID:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_fxid,
+ sizeof(FullTransactionId),
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_CID;
+ /* fall through */
+ case UNDO_PACK_STAGE_CID:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_cid,
+ sizeof(CommandId),
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+ /* fall through */
+ case UNDO_PACK_STAGE_FORKNUM:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_fork,
+ sizeof(ForkNumber),
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
+ /* fall through */
+ case UNDO_PACK_STAGE_PREVUNDO:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PREVUNDO) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_prevundo,
+ sizeof(UndoRecPtr),
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+ /* fall through */
+
+ case UNDO_PACK_STAGE_BLOCK:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_blk,
+ SizeOfUndoRecordBlock,
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
+ /* fall through */
+ case UNDO_PACK_STAGE_LOGSWITCH:
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_logswitch,
+ SizeOfUndoRecordLogSwitch,
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+ /* fall through */
+ case UNDO_PACK_STAGE_PAYLOAD:
+ /* Read payload header. */
+ if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ if (!ReadUndoBytes((char *) &ucontext->urec_payload,
+ SizeOfUndoRecordPayload,
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+ /* fall through */
+ case UNDO_PACK_STAGE_PAYLOAD_DATA:
+ {
+ int len = ucontext->urec_payload.urec_payload_len;
+
+ /* Allocate memory for the payload data if not already done. */
+ if (len > 0)
+ {
+ if (ucontext->urec_payloaddata == NULL)
+ ucontext->urec_payloaddata = (char *) palloc(len);
+
+ /* Read payload data. */
+ if (!ReadUndoBytes((char *) ucontext->urec_payloaddata, len,
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+ ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+ /* fall through */
+ }
+ case UNDO_PACK_STAGE_TUPLE_DATA:
+ {
+ int len = ucontext->urec_payload.urec_tuple_len;
+
+ /* Allocate memory for the tuple data if not already done. */
+ if (len > 0)
+ {
+ if (ucontext->urec_tupledata == NULL)
+ ucontext->urec_tupledata = (char *) palloc(len);
+
+ /* Read tuple data. */
+ if (!ReadUndoBytes((char *) ucontext->urec_tupledata, len,
+ &readptr, endptr, &ucontext->already_processed,
+ &ucontext->partial_bytes))
+ return;
+ }
+
+ ucontext->stage = UNDO_PACK_STAGE_DONE;
+ /* fall through */
+ }
+ case UNDO_PACK_STAGE_DONE:
+ /* Nothing to be done. */
+ break;
+ default:
+ Assert(0); /* Invalid stage */
+ }
+}
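+
+/*
+ * Illustrative sketch of unpacking a record that may span pages.  This is
+ * an assumption of how the loop could look, not code from this patch:
+ * buffer lookup and locking are elided, and GetNextUndoPage() and the
+ * initial starting_byte are hypothetical.
+ *
+ *		UnpackedUndoRecord uur;
+ *		UndoPackContext ucontext = {0};
+ *
+ *		BeginUnpackUndo(&ucontext);
+ *		do
+ *		{
+ *			Page	page = GetNextUndoPage();
+ *
+ *			UnpackUndoData(&ucontext, page, starting_byte);
+ *			starting_byte = SizeOfUndoPageHeaderData;
+ *		} while (ucontext.stage != UNDO_PACK_STAGE_DONE);
+ *		FinishUnpackUndo(&ucontext, &uur);
+ */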
+
+/*
+ * Final step of unpacking the undo record.
+ *
+ * Copy the undo record data from the unpack undo context to the input unpacked
+ * undo record.
+ */
+void
+FinishUnpackUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
+{
+ /* Copy undo record header. */
+ uur->uur_type = ucontext->urec_hd.urec_type;
+ uur->uur_info = ucontext->urec_hd.urec_info;
+
+ /* Copy undo record transaction header if it is present. */
+ if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+ {
+ uur->uur_txn = palloc(SizeOfUndoRecordTransaction);
+ memcpy(uur->uur_txn, &ucontext->urec_txn, SizeOfUndoRecordTransaction);
+ }
+
+ /*
+ * Copy the common fields. All of these fields must be present in the
+ * final unpacked undo record.
+ */
+ Assert((uur->uur_info & UREC_INFO_PAGE_COMMON) == UREC_INFO_PAGE_COMMON);
+
+ uur->uur_rmid = ucontext->urec_rmid;
+ uur->uur_reloid = ucontext->urec_reloid;
+ uur->uur_fxid = ucontext->urec_fxid;
+ uur->uur_cid = ucontext->urec_cid;
+
+ /* Copy undo record relation header if it is present. */
+ if ((uur->uur_info & UREC_INFO_FORK) != 0)
+ uur->uur_fork = ucontext->urec_fork;
+
+ /* Copy previous undo record pointer if it is present. */
+ if ((uur->uur_info & UREC_INFO_PREVUNDO) != 0)
+ uur->uur_prevundo = ucontext->urec_prevundo;
+
+ /* Copy undo record block header if it is present. */
+ if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+ {
+ uur->uur_block = ucontext->urec_blk.urec_block;
+ uur->uur_offset = ucontext->urec_blk.urec_offset;
+ }
+
+ /* Copy undo record log switch header if it is present. */
+ if ((uur->uur_info & UREC_INFO_LOGSWITCH) != 0)
+ {
+ uur->uur_logswitch = palloc(SizeOfUndoRecordLogSwitch);
+ memcpy(uur->uur_logswitch, &ucontext->urec_logswitch,
+ SizeOfUndoRecordLogSwitch);
+ }
+
+ /* Copy undo record payload header and data if it is present. */
+ if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+ {
+ uur->uur_payload.len = ucontext->urec_payload.urec_payload_len;
+ uur->uur_tuple.len = ucontext->urec_payload.urec_tuple_len;
+
+ /* Point to the payload data if its length is not 0. */
+ if (uur->uur_payload.len != 0)
+ uur->uur_payload.data = ucontext->urec_payloaddata;
+
+ /* Point to the tuple data if its length is not 0. */
+ if (uur->uur_tuple.len != 0)
+ uur->uur_tuple.data = ucontext->urec_tupledata;
+ }
+}
+
+/*
+ * Read undo bytes into a particular destination.
+ *
+ * 'destptr' points to the destination buffer, and 'readlen' is the number
+ * of bytes to be read.
+ *
+ * 'readptr' points to the read point for these bytes, and is updated
+ * for how much we read. The read point must not pass 'endptr', which
+ * represents the end of the buffer from which we are reading.
+ *
+ * 'partial_read' is a pointer to the count of bytes of this structure
+ * already read by a previous call; it is updated to reflect any partial
+ * read performed here.
+ *
+ * 'total_bytes_read' points to the count of all previously-read bytes,
+ * and must likewise be updated for the bytes we read.
+ *
+ * The return value is false if we ran out of data before reading all
+ * the bytes, and otherwise true.
+ */
+static bool
+ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr,
+ int *total_bytes_read, int *partial_read)
+{
+ int can_read;
+ int remaining;
+
+ /* Compute number of bytes we can read. */
+ remaining = readlen - *partial_read;
+ can_read = Min(remaining, endptr - *readptr);
+
+ /* Bail out if no bytes can be read. */
+ if (can_read == 0)
+ return false;
+
+ /* Copy the bytes we can read. */
+ memcpy(destptr + *partial_read, *readptr, can_read);
+
+ /* Update bookkeeping information. */
+ *readptr += can_read;
+ *total_bytes_read += can_read;
+
+ /* Could not read whole data so set the partial_read. */
+ if (can_read < remaining)
+ {
+ *partial_read += can_read;
+ return false;
+ }
+
+ /* Return true only if we read the whole thing. */
+ *partial_read = 0;
+
+ return true;
+}
+
+/*
+ * Set uur_info for an UnpackedUndoRecord appropriately based on which
+ * fields are set.
+ *
+ * Other flags, i.e. UREC_INFO_TRANSACTION, UREC_INFO_PAGE_COMMON and
+ * UREC_INFO_LOGSWITCH, are set directly by the PrepareUndoInsert function.
+ */
+void
+UndoRecordSetInfo(UnpackedUndoRecord *uur)
+{
+ /*
+ * If the fork number is not the main fork then we need to store it in
+ * the undo record, so set the flag.
+ */
+ if (uur->uur_fork != MAIN_FORKNUM)
+ uur->uur_info |= UREC_INFO_FORK;
+
+ /* If prevundo is a valid undo record pointer then set the flag. */
+ if (uur->uur_prevundo != InvalidUndoRecPtr)
+ uur->uur_info |= UREC_INFO_PREVUNDO;
+
+ /* If the block number is valid then set the flag for the block header. */
+ if (uur->uur_block != InvalidBlockNumber)
+ uur->uur_info |= UREC_INFO_BLOCK;
+
+ /*
+ * If either the payload or the tuple length is non-zero then we need
+ * the payload header.
+ */
+ if (uur->uur_payload.len || uur->uur_tuple.len)
+ uur->uur_info |= UREC_INFO_PAYLOAD;
+}
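+
+/*
+ * For example (illustrative only; the values are assumptions): for a
+ * zero-initialized record that touches block 10 of the main fork and
+ * carries no payload, only UREC_INFO_BLOCK gets set here; the transaction
+ * and common page flags are added later by PrepareUndoInsert.
+ *
+ *		UnpackedUndoRecord uur = {0};
+ *
+ *		uur.uur_fork = MAIN_FORKNUM;
+ *		uur.uur_prevundo = InvalidUndoRecPtr;
+ *		uur.uur_block = 10;
+ *		UndoRecordSetInfo(&uur);
+ *
+ * afterwards uur.uur_info == UREC_INFO_BLOCK.
+ */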
/* p->pd_prune_xid = InvalidTransactionId; done by above MemSet */
}
+/*
+ * UndoPageInit
+ * Initializes the contents of an undo page.
+ * Note that we don't calculate an initial checksum here; that's not done
+ * until it's time to write.
+ */
+void
+UndoPageInit(Page page, Size pageSize, uint16 uur_info, uint16 record_offset,
+ uint16 tuple_len, uint16 payload_len)
+{
+ UndoPageHeader p = (UndoPageHeader) page;
+
+ Assert(pageSize == BLCKSZ);
+
+ /* Make sure all fields of page are zero, as well as unused space. */
+ MemSet(p, 0, pageSize);
+
+ p->pd_flags = 0;
+ p->pd_lower = SizeOfUndoPageHeaderData;
+ p->pd_upper = pageSize;
+ p->pd_special = pageSize;
+ p->uur_info = uur_info;
+ p->record_offset = record_offset;
+ p->tuple_len = tuple_len;
+ p->payload_len = payload_len;
+ PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
+}
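+
+/*
+ * For example (illustrative only): a fresh undo page into which no partial
+ * record spills would presumably be initialized with all four
+ * partial-record fields set to zero:
+ *
+ *		UndoPageInit(page, BLCKSZ, 0, 0, 0, 0);
+ */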
+
/*
* PageIsVerified
#define EpochFromFullTransactionId(x) ((uint32) ((x).value >> 32))
#define XidFromFullTransactionId(x) ((uint32) (x).value)
#define U64FromFullTransactionId(x) ((x).value)
+#define FullTransactionIdEquals(a, b) ((a).value == (b).value)
#define FullTransactionIdPrecedes(a, b) ((a).value < (b).value)
#define FullTransactionIdIsValid(x) TransactionIdIsValid(XidFromFullTransactionId(x))
#define InvalidFullTransactionId FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undoaccess.h
+ * entry points for inserting/fetching undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoaccess.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOACCESS_H
+#define UNDOACCESS_H
+
+#include "access/undolog.h"
+#include "access/undorecord.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+
+/*
+ * XXX Do we want to support an undo tuple size which is more than BLCKSZ?
+ * If not, then an undo record can spread across 2 buffers at the most.
+ */
+#define MAX_BUFFER_PER_UNDO 2
+
+/*
+ * Maximum number of XactUndoRecordInfo entries for updating transaction
+ * headers. Usually it's 1, for updating the next link of the previous
+ * transaction's header when we are starting a new transaction. But in some
+ * cases, where the same transaction is spilled to the next log, we update
+ * our own transaction's header in the previous undo log as well as the
+ * header of the previous transaction in the new log.
+ */
+#define MAX_XACT_UNDO_INFO 2
+
+typedef struct PreparedUndoSpace PreparedUndoSpace;
+typedef struct PreparedUndoBuffer PreparedUndoBuffer;
+
+/*
+ * Undo record element. Used for storing a group of undo records in an
+ * array by UndoBulkFetchRecord.
+ */
+typedef struct UndoRecInfo
+{
+ int index; /* Index of the element. For stable qsort. */
+ UndoRecPtr urp; /* undo recptr (undo record location). */
+ UnpackedUndoRecord *uur; /* actual undo record. */
+} UndoRecInfo;
+
+/*
+ * This structure holds the information for updating the transaction's undo
+ * record header (the first undo record of the transaction). We need to
+ * update the transaction header for various purposes: a) updating the next
+ * undo record pointer for maintaining the transaction chain inside an undo
+ * log, and b) updating the undo apply progress in the transaction header.
+ * During the prepare phase we keep all the information handy in this
+ * structure, and it is used for updating the actual record inside the
+ * critical section.
+ */
+typedef struct XactUndoRecordInfo
+{
+ UndoRecPtr urecptr; /* Undo record pointer to be updated. */
+ uint32 offset; /* offset in page where to start updating. */
+ UndoRecPtr next; /* first urp of the next transaction, to be
+ * updated in the transaction header */
+ BlockNumber progress; /* undo apply action progress. */
+ int idx_undo_buffers[MAX_BUFFER_PER_UNDO];
+} XactUndoRecordInfo;
+
+/*
+ * Context for preparing and inserting undo records.
+ */
+typedef struct UndoRecordInsertContext
+{
+ UndoLogAllocContext alloc_context;
+ PreparedUndoSpace *prepared_undo; /* prepared undo. */
+ PreparedUndoBuffer *prepared_undo_buffers; /* Buffers for prepared undo. */
+ XactUndoRecordInfo xact_urec_info[MAX_XACT_UNDO_INFO]; /* Information for
+ * updating transaction
+ * header. */
+ UndoCompressionInfo undo_compression_info[UndoLogCategories]; /* Compression info. */
+ int nprepared_undo; /* Number of prepared undo records. */
+ int max_prepared_undo; /* Max prepared undo for this operation. */
+ int nprepared_undo_buffer; /* Number of undo buffers. */
+ int nxact_urec_info; /* Number of previous xact info. */
+} UndoRecordInsertContext;
+
+/*
+ * Context for fetching the required undo record.
+ */
+typedef struct UndoRecordFetchContext
+{
+ Buffer buffer; /* Previous undo record pinned buffer. */
+ UndoRecPtr urp; /* Previous undo record pointer. */
+} UndoRecordFetchContext;
+
+extern void BeginUndoRecordInsert(UndoRecordInsertContext *context,
+ UndoLogCategory category,
+ int nprepared,
+ XLogReaderState *xlog_record);
+extern UndoRecPtr PrepareUndoInsert(UndoRecordInsertContext *context,
+ UnpackedUndoRecord *urec, Oid dbid);
+extern void InsertPreparedUndo(UndoRecordInsertContext *context);
+extern void FinishUndoRecordInsert(UndoRecordInsertContext *context);
+extern void BeginUndoFetch(UndoRecordFetchContext *context);
+extern UnpackedUndoRecord *UndoFetchRecord(UndoRecordFetchContext *context,
+ UndoRecPtr urp);
+extern void FinishUndoFetch(UndoRecordFetchContext *context);
+extern void UndoRecordRelease(UnpackedUndoRecord *urec);
+extern UndoRecInfo *UndoBulkFetchRecord(UndoRecPtr *from_urecptr,
+ UndoRecPtr to_urecptr,
+ int undo_apply_size, int *nrecords,
+ bool one_page);
+extern void RegisterUndoLogBuffers(UndoRecordInsertContext *context,
+ uint8 first_block_id);
+extern void UndoLogBuffersSetLSN(UndoRecordInsertContext *context,
+ XLogRecPtr recptr);
+extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
+ UndoLogCategory category);
+
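+/*
+ * Typical call sequences (an illustrative sketch, not a definitive
+ * contract; critical-section handling, WAL logging and error paths are
+ * elided, and "uur", "dbid" and the UNDO_PERMANENT category name are
+ * assumptions of this example):
+ *
+ * Writing:
+ *		UndoRecordInsertContext context;
+ *		UndoRecPtr	urecptr;
+ *
+ *		BeginUndoRecordInsert(&context, UNDO_PERMANENT, 1, NULL);
+ *		urecptr = PrepareUndoInsert(&context, &uur, dbid);
+ *		InsertPreparedUndo(&context);
+ *		FinishUndoRecordInsert(&context);
+ *
+ * Fetching:
+ *		UndoRecordFetchContext fcontext;
+ *		UnpackedUndoRecord *fetched;
+ *
+ *		BeginUndoFetch(&fcontext);
+ *		fetched = UndoFetchRecord(&fcontext, urecptr);
+ *		FinishUndoFetch(&fcontext);
+ *		if (fetched != NULL)
+ *			UndoRecordRelease(fetched);
+ */
+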
+#endif /* UNDOACCESS_H */
(((uint64) (logno) << UndoLogOffsetBits) | (offset))
/* The number of unusable bytes in the header of each block. */
-#define UndoLogBlockHeaderSize SizeOfPageHeaderData
+#define UndoLogBlockHeaderSize SizeOfUndoPageHeaderData
/* The number of usable bytes we can store per block. */
#define UndoLogUsableBytesPerPage (BLCKSZ - UndoLogBlockHeaderSize)
#define UndoRecPtrGetPageOffset(urp) \
(UndoRecPtrGetOffset(urp) % BLCKSZ)
+/* Compute the undo record pointer offset from the page offset and the block number. */
+#define UndoRecPageOffsetGetRecPtr(offset, blkno) \
+ (((blkno) * BLCKSZ) + (offset))
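+
+/*
+ * For example (illustrative only, assuming BLCKSZ is 8192): block 3 at
+ * page offset 100 maps to log offset 3 * 8192 + 100 = 24676.
+ */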
+
/* Compare two undo checkpoint files to find the oldest file. */
#define UndoCheckPointFilenamePrecedes(file1, file2) \
(strcmp(file1, file2) < 0)
*/
typedef struct UndoLogUnloggedMetaData
{
- UndoLogOffset insert; /* next insertion point (head) */
+ UndoLogOffset insert; /* next insertion point (head) */
UndoLogOffset last_xact_start; /* last transaction's first byte in this log */
UndoLogOffset this_xact_start; /* this transaction's first byte in this log */
TransactionId xid; /* currently attached/writing xid */
static pg_attribute_always_inline UndoLogTableEntry *
UndoLogGetTableEntry(UndoLogNumber logno)
{
- UndoLogTableEntry *entry;
+ UndoLogTableEntry *entry;
/* Fast path. */
entry = undologtable_lookup(undologtable_cache, logno);
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.h
+ * encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorecord.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDORECORD_H
+#define UNDORECORD_H
+
+#include "access/undolog.h"
+#include "access/transam.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/buf.h"
+#include "storage/off.h"
+
+/*
+ * The common information below is stored in the first undo record of the
+ * page. Subsequent undo records do not store this information; if required,
+ * it is retrieved from the first undo record of the page.
+typedef struct UndoCompressionInfo
+{
+ bool valid; /* Undo compression info is valid ? */
+ UndoRecPtr last_urecptr; /* last undo rec */
+ FullTransactionId fxid; /* transaction id */
+ RmgrId rmid; /* rmgr ID */
+ Oid reloid; /* relation OID */
+ CommandId cid; /* command id */
+} UndoCompressionInfo;
+
+/*
+ * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure
+ * follows.
+ * If UREC_INFO_RMID is set, rmgr id follows.
+ * if UREC_INFO_RELOID is set, relation oid follows.
+ * If UREC_INFO_XID is set, full transaction id follows.
+ * If UREC_INFO_CID is set, command id follows.
+ * If UREC_INFO_FORK is set, fork number follows.
+ * If UREC_INFO_PREVUNDO is set, previous undo record pointer follows.
+ * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows.
+ * If UREC_INFO_LOGSWITCH is set, an UndoRecordLogSwitch structure follows.
+ * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows.
+ *
+ * When (as will often be the case) multiple structures are present, they
+ * appear in the same order in which the constants are defined here. That is,
+ * UndoRecordTransaction appears first.
+ */
+#define UREC_INFO_TRANSACTION 0x001
+#define UREC_INFO_RMID 0x002
+#define UREC_INFO_RELOID 0x004
+#define UREC_INFO_XID 0x008
+#define UREC_INFO_CID 0x010
+#define UREC_INFO_FORK 0x020
+#define UREC_INFO_PREVUNDO 0x040
+#define UREC_INFO_BLOCK 0x080
+#define UREC_INFO_LOGSWITCH 0x100
+#define UREC_INFO_PAYLOAD 0x200
+
+#define UREC_INFO_PAGE_COMMON (UREC_INFO_RMID | UREC_INFO_RELOID | UREC_INFO_XID | UREC_INFO_CID)
+
+/*
+ * Every undo record begins with an UndoRecordHeader structure, which is
+ * followed by the additional structures indicated by the contents of
+ * urec_info. All structures are packed without alignment padding bytes,
+ * and the undo record itself need not be aligned either, so care
+ * must be taken when reading the header.
+ */
+typedef struct UndoRecordHeader
+{
+ uint8 urec_type; /* record type code */
+ uint16 urec_info; /* flag bits */
+} UndoRecordHeader;
+
+#define SizeOfUndoRecordHeader \
+ (offsetof(UndoRecordHeader, urec_info) + sizeof(uint16))
+
+/*
+ * Information about the transaction to which this undo belongs. This
+ * also stores the dbid and the progress of the undo apply during rollback.
+ */
+typedef struct UndoRecordTransaction
+{
+ /*
+ * Undo block number from which we need to start reading the undo for
+ * applying the undo actions. InvalidBlockNumber means undo apply hasn't
+ * started for the transaction, and MaxBlockNumber means the undo is
+ * completely applied. Any other block number means we have applied
+ * partial undo, so next we can start from this block.
+ */
+ BlockNumber urec_progress;
+ Oid urec_dbid; /* database id */
+ UndoRecPtr urec_next; /* urec pointer of the next transaction */
+} UndoRecordTransaction;
+
+#define SizeOfUndoRecordTransaction \
+ (offsetof(UndoRecordTransaction, urec_next) + sizeof(UndoRecPtr))
+
+/*
+ * Information for a block to which this record pertains.
+ */
+typedef struct UndoRecordBlock
+{
+ BlockNumber urec_block; /* block number */
+ OffsetNumber urec_offset; /* offset number */
+} UndoRecordBlock;
+
+#define SizeOfUndoRecordBlock \
+ (offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber))
+
+/*
+ * Information about the transaction's undo in the previous log. If a
+ * transaction is split across undo logs then this header is included in the
+ * first undo record of the transaction in the next log.
+ */
+typedef struct UndoRecordLogSwitch
+{
+ UndoRecPtr urec_prevurp; /* Transaction's last undo record pointer in
+ * the previous undo log. */
+ UndoRecPtr urec_prevlogstart; /* Transaction's first undo record pointer
+ * in previous undo log. */
+} UndoRecordLogSwitch;
+
+#define SizeOfUndoRecordLogSwitch \
+ (offsetof(UndoRecordLogSwitch, urec_prevlogstart) + sizeof(UndoRecPtr))
+
+/*
+ * Information about the amount of payload data and tuple data present
+ * in this record. The payload bytes immediately follow the structures
+ * specified by flag bits in urec_info, and the tuple bytes follow the
+ * payload bytes.
+ */
+typedef struct UndoRecordPayload
+{
+ uint16 urec_payload_len; /* # of payload bytes */
+ uint16 urec_tuple_len; /* # of tuple bytes */
+} UndoRecordPayload;
+
+#define SizeOfUndoRecordPayload \
+ (offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16))
+
+typedef enum UndoPackStage
+{
+ UNDO_PACK_STAGE_HEADER, /* We have not yet processed even the record
+ * header; we need to do that next. */
+ UNDO_PACK_STAGE_TRANSACTION, /* The next thing to be processed is the
+ * transaction details, if present. */
+ UNDO_PACK_STAGE_RMID, /* The next thing to be processed is the rmid
+ * if present */
+
+ UNDO_PACK_STAGE_RELOID, /* The next thing to be processed is the
+ * reloid if present */
+
+ UNDO_PACK_STAGE_XID, /* The next thing to be processed is the xid
+ * if present */
+
+ UNDO_PACK_STAGE_CID, /* The next thing to be processed is the cid
+ * if present */
+
+ UNDO_PACK_STAGE_FORKNUM, /* The next thing to be processed is the
+ * relation fork number, if present. */
+ UNDO_PACK_STAGE_PREVUNDO, /* The next thing to be processed is the prev
+ * undo info. */
+
+ UNDO_PACK_STAGE_BLOCK, /* The next thing to be processed is the block
+ * details, if present. */
+ UNDO_PACK_STAGE_LOGSWITCH, /* The next thing to be processed is the log
+ * switch details. */
+ UNDO_PACK_STAGE_PAYLOAD, /* The next thing to be processed is the
+ * payload details, if present */
+ UNDO_PACK_STAGE_PAYLOAD_DATA, /* The next thing to be processed is the
+ * payload data */
+ UNDO_PACK_STAGE_TUPLE_DATA, /* The next thing to be processed is the tuple
+ * data */
+ UNDO_PACK_STAGE_UNDO_LENGTH, /* Next thing to be processed is the undo length. */
+
+ UNDO_PACK_STAGE_DONE /* complete */
+} UndoPackStage;
+
+/*
+ * Undo record context for inserting/unpacking an undo record. This holds
+ * the intermediate state of the undo record processed so far.
+ */
+typedef struct UndoPackContext
+{
+ UndoRecordHeader urec_hd; /* Main header */
+ UndoRecordTransaction urec_txn; /* Transaction header */
+
+ RmgrId urec_rmid; /* rmgrid */
+ Oid urec_reloid; /* relation OID */
+
+ /*
+ * Transaction id that modified the tuple for which this undo record is
+ * written. We use this to skip undo records. See the comments atop the
+ * function UndoFetchRecord.
+ */
+ FullTransactionId urec_fxid; /* Transaction id */
+ CommandId urec_cid; /* command id */
+
+ ForkNumber urec_fork; /* Relation fork number */
+ UndoRecPtr urec_prevundo; /* Block prev */
+ UndoRecordBlock urec_blk; /* Block header */
+ UndoRecordLogSwitch urec_logswitch; /* Log switch header */
+ UndoRecordPayload urec_payload; /* Payload data */
+ char *urec_payloaddata;
+ char *urec_tupledata;
+ uint16 undo_len; /* Length of the undo record. */
+ int already_processed; /* Number of bytes read/written so far */
+ int partial_bytes; /* Number of partial bytes read/written */
+ UndoPackStage stage; /* Undo pack stage */
+} UndoPackContext;
+
+/*
+ * Information that can be used to create an undo record or that can be
+ * extracted from one previously created. The raw undo record format is
+ * difficult to manage, so this structure provides a convenient intermediate
+ * form that is easier for callers to manage.
+ *
+ * When creating an undo record from an UnpackedUndoRecord, caller should
+ * set uur_info to 0. It will be initialized by the first call to
+ * UndoRecordSetInfo or InsertUndoRecord. We do set it in
+ * UndoRecordAllocate for transaction specific header information.
+ *
+ * When an undo record is decoded into an UnpackedUndoRecord, all fields
+ * will be initialized, but those for which no information is available
+ * will be set to invalid or default values, as appropriate.
+ */
+typedef struct UnpackedUndoRecord
+{
+ RmgrId uur_rmid; /* rmgr ID */
+ uint8 uur_type; /* record type code */
+ uint16 uur_info; /* flag bits */
+ Oid uur_reloid; /* relation OID */
+ CommandId uur_cid; /* command id */
+ ForkNumber uur_fork; /* fork number */
+ UndoRecPtr uur_prevundo; /* byte offset of previous undo for block */
+ BlockNumber uur_block; /* block number */
+ OffsetNumber uur_offset; /* offset number */
+ FullTransactionId uur_fxid; /* transaction id */
+ StringInfoData uur_payload; /* payload bytes */
+ StringInfoData uur_tuple; /* tuple bytes */
+
+ /*
+ * The headers below are set internally by the undo layer. Everything
+ * above this point should be set by the caller.
+ */
+ UndoRecordTransaction *uur_txn; /* Transaction header, included in the
+ * first record of the transaction in an
+ * undo log. */
+ UndoRecordLogSwitch *uur_logswitch; /* Log switch header, included in the
+ * first record of the transaction
+ * only after undo log is switched
+ * during a transaction. */
+} UnpackedUndoRecord;
+
+extern Size UndoRecordHeaderSize(uint16 uur_info);
+extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur);
+extern Size UnpackedUndoRecordSize(UnpackedUndoRecord *uur);
+extern void BeginInsertUndo(UndoPackContext *ucontext,
+ UnpackedUndoRecord *uur);
+extern void InsertUndoData(UndoPackContext *ucontext, Page page,
+ int starting_byte);
+extern void SkipInsertingUndoData(UndoPackContext *ucontext,
+ int bytes_to_skip);
+extern void BeginUnpackUndo(UndoPackContext *ucontext);
+extern void UnpackUndoData(UndoPackContext *ucontext, Page page,
+ int starting_byte);
+extern void FinishUnpackUndo(UndoPackContext *ucontext,
+ UnpackedUndoRecord *uur);
+extern void UndoRecordSetInfo(UnpackedUndoRecord *uur);
+
+#endif /* UNDORECORD_H */
*/
#define SizeOfPageHeaderData (offsetof(PageHeaderData, pd_linp))
+/*
+ * Same as PageHeaderData, plus some additional information to detect a
+ * partial undo record on an undo page.
+ *
+ * FIXME: for an undo page do we need to keep all the information which is
+ * required for the PageHeaderData, e.g. pd_lower, pd_upper, pd_special?
+ */
+typedef struct UndoPageHeaderData
+{
+ /* XXX LSN is member of *any* block, not only page-organized ones */
+ PageXLogRecPtr pd_lsn; /* LSN: next byte after last byte of xlog
+ * record for last change to this page */
+ uint16 pd_checksum; /* checksum */
+ uint16 pd_flags; /* flag bits, see below */
+ LocationIndex pd_lower; /* offset to start of free space */
+ LocationIndex pd_upper; /* offset to end of free space */
+ LocationIndex pd_special; /* offset to start of special space */
+ uint16 pd_pagesize_version;
+
+ /*
+ * The fields below are required for computing the offset of the first
+ * complete record on an undo page, which is used for undo record
+ * compression and undo page consistency checking.
+ */
+ uint16 uur_info; /* uur_info field of the partial record. */
+ uint16 record_offset; /* offset of the partial undo record. */
+ uint16 tuple_len; /* Length of the tuple data in the partial
+ * record. */
+ uint16 payload_len; /* Length of the payload data in the partial
+ * record. */
+} UndoPageHeaderData;
+
+typedef UndoPageHeaderData *UndoPageHeader;
+
+#define SizeOfUndoPageHeaderData (offsetof(UndoPageHeaderData, payload_len) + \
+ sizeof(uint16))
+
/*
* PageIsEmpty
* returns true iff no itemid has been allocated on the page
((is_heap) ? PAI_IS_HEAP : 0))
extern void PageInit(Page page, Size pageSize, Size specialSize);
+extern void UndoPageInit(Page page, Size pageSize, uint16 uur_info,
+ uint16 record_offset, uint16 tuple_len,
+ uint16 payload_len);
extern bool PageIsVerified(Page page, BlockNumber blkno);
extern OffsetNumber PageAddItemExtended(Page page, Item item, Size size,
OffsetNumber offsetNumber, int flags);