Infrastructure to execute pending undo actions.
authorAmit Kapila <[email protected]>
Thu, 13 Jun 2019 09:57:58 +0000 (15:27 +0530)
committerKuntal Ghosh <[email protected]>
Fri, 19 Jul 2019 08:48:51 +0000 (14:18 +0530)
To apply the undo actions, we collect the undo records in bulk and try to
process them together.  We ensure to update the transaction's progress at
regular intervals so that after a crash we can skip already applied undo.

This provides a way for users to register a callback for processing the
undo records based on resource manager.

Dilip Kumar, Amit Kapila, Thomas Munro and Kuntal Ghosh with inputs from
Robert Haas

16 files changed:
src/backend/access/rmgrdesc/Makefile
src/backend/access/rmgrdesc/undoactiondesc.c [new file with mode: 0644]
src/backend/access/transam/rmgr.c
src/backend/access/undo/Makefile
src/backend/access/undo/undoaccess.c
src/backend/access/undo/undoaction.c [new file with mode: 0644]
src/backend/access/undo/undoactionxlog.c [new file with mode: 0644]
src/backend/replication/logical/decode.c
src/bin/pg_rewind/parsexlog.c
src/bin/pg_waldump/rmgrdesc.c
src/include/access/rmgr.h
src/include/access/rmgrlist.h
src/include/access/undoaccess.h
src/include/access/undoaction_xlog.h [new file with mode: 0644]
src/include/access/undorequest.h
src/include/access/xlog_internal.h

index 91ad1ef8a3da1b76bc70dbc64533a56f5ce1abd3..640d37f37a38d9e16ebd03d9ddec7752cbecde8d 100644 (file)
@@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
           gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
           mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
-          smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undologdesc.o xactdesc.o xlogdesc.o
+          smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \
+          undologdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/undoactiondesc.c b/src/backend/access/rmgrdesc/undoactiondesc.c
new file mode 100644 (file)
index 0000000..c396582
--- /dev/null
@@ -0,0 +1,47 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoactiondesc.c
+ *       rmgr descriptor routines for access/undo/undoactionxlog.c
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/access/rmgrdesc/undoactiondesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/undoaction_xlog.h"
+
+void
+undoaction_desc(StringInfo buf, XLogReaderState *record)
+{
+       char       *rec = XLogRecGetData(record);
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       if (info == XLOG_UNDO_APPLY_PROGRESS)
+       {
+               xl_undoapply_progress *xlrec = (xl_undoapply_progress *) rec;
+
+               appendStringInfo(buf, "urec_ptr %lu progress %u",
+                                                xlrec->urec_ptr, xlrec->progress);
+       }
+}
+
+const char *
+undoaction_identify(uint8 info)
+{
+       const char *id = NULL;
+
+       switch (info & ~XLR_INFO_MASK)
+       {
+               case XLOG_UNDO_APPLY_PROGRESS:
+                       id = "UNDO_APPLY_PROGRESS";
+                       break;
+       }
+
+       return id;
+}
index 8b0537405a9c3e7c7b937ff899d49f71dad5a026..c57eca240f5b91f6cf536b0298cf12d8ec7c0a30 100644 (file)
@@ -18,6 +18,7 @@
 #include "access/multixact.h"
 #include "access/nbtxlog.h"
 #include "access/spgxlog.h"
+#include "access/undoaction_xlog.h"
 #include "access/undolog_xlog.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
@@ -31,8 +32,8 @@
 #include "utils/relmapper.h"
 
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
-       { name, redo, desc, identify, startup, cleanup, mask },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \
+       { name, redo, desc, identify, startup, cleanup, mask, undo, undo_status, undo_desc },
 
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
index 73275028be9b655136896f33ec60ca30b417f5d1..68696bc81a8402f5d1ece9b0f466a35be74b77bf 100644 (file)
@@ -12,6 +12,7 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undoaccess.o undolog.o undorecord.o undorequest.o
+OBJS = undoaccess.o undoaction.o undoactionxlog.o undolog.o undorecord.o \
+          undorequest.o
 
 include $(top_srcdir)/src/backend/common.mk
index 4ffec58c5b50ecfdcbbb672e2a7bc51b8364743c..66c3175d0323ad0730696c902058a089f42a83fd 100644 (file)
@@ -84,8 +84,6 @@ static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
                                                                                        Buffer *prevbuf);
 static int     UndoRecordPrepareTransInfo(UndoRecordInsertContext *context,
                                                                           UndoRecPtr xact_urp, int size, int offset);
-static void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context,
-                                                                         int idx);
 static void UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
                                                                                UndoRecPtr urecptr, UndoRecPtr xact_urp);
 static int     UndoGetBufferSlot(UndoRecordInsertContext *context,
@@ -284,6 +282,41 @@ UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
        LWLockRelease(&slot->discard_update_lock);
 }
 
+/*
+ * Prepare to update the undo apply progress in the transaction header.
+ */
+void
+UndoRecordPrepareApplyProgress(UndoRecordInsertContext *context,
+                                                          UndoRecPtr xact_urp, BlockNumber progress)
+{
+       int                     index = 0;
+       int                     offset;
+
+       Assert(UndoRecPtrIsValid(xact_urp));
+
+       /*
+        * Temporary undo logs are discarded on transaction commit so we don't
+        * need to do anything.
+        */
+       if (UndoRecPtrGetCategory(xact_urp) == UNDO_TEMP)
+               return;
+
+       /* It shouldn't be discarded. */
+       Assert(!UndoRecPtrIsDiscarded(xact_urp));
+
+       /* Compute the offset of the uur_next in the undo record. */
+       offset = SizeOfUndoRecordHeader +
+                                       offsetof(UndoRecordTransaction, urec_progress);
+
+       index = UndoRecordPrepareTransInfo(context, xact_urp,
+                                                                          sizeof(UndoRecPtr), offset);
+       /*
+        * Set the undo action progress in xact_urec_info, this will be overwritten
+        * in actual undo record during update phase.
+        */
+       context->xact_urec_info[index].progress = progress;
+}
+
 /*
  * Overwrite the first undo record of the previous transaction to update its
  * next pointer.
@@ -292,7 +325,7 @@ UndoRecordPrepareUpdateNext(UndoRecordInsertContext *context,
  * This must be called under the critical section.  This will just overwrite the
  * header of the undo record.
  */
-static void
+void
 UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx)
 {
        Page            page = NULL;
diff --git a/src/backend/access/undo/undoaction.c b/src/backend/access/undo/undoaction.c
new file mode 100644 (file)
index 0000000..96766ee
--- /dev/null
@@ -0,0 +1,522 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoaction.c
+ *       execute undo actions
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoaction.c
+ *
+ * To apply the undo actions, we collect the undo records in bulk and try to
+ * process them together.  We ensure to update the transaction's progress at
+ * regular intervals so that after a crash we can skip already applied undo.
+ * The undo apply progress is updated in terms of the number of blocks
+ * processed.  Undo apply progress value XACT_APPLY_PROGRESS_COMPLETED
+ * indicates that all the undo is applied, XACT_APPLY_PROGRESS_NOT_STARTED
+ * indicates that no undo action has been applied yet and any other value
+ * indicates that we have applied undo partially and after crash recovery, we
+ * need to start processing the undo from the same location.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "access/undoaction_xlog.h"
+#include "access/undolog.h"
+#include "access/undorequest.h"
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "access/xlog_internal.h"
+#include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/bufmgr.h"
+#include "utils/relfilenodemap.h"
+#include "utils/syscache.h"
+#include "miscadmin.h"
+#include "storage/shmem.h"
+
+static void UpdateUndoApplyProgress(UndoRecPtr last_log_start_urec_ptr,
+                                                 BlockNumber block_num);
+static bool UndoAlreadyApplied(FullTransactionId full_xid,
+                                               UndoRecPtr to_urecptr);
+static void ApplyUndo(UndoRecInfo *urecinfo, int nrecords);
+static void ProcessAndApplyUndo(FullTransactionId full_xid,
+                               UndoRecPtr from_urecptr, UndoRecPtr to_urecptr,
+                               UndoRecPtr last_log_start_urec_ptr, bool complete_xact);
+
+/*
+ * undo_record_comparator
+ *
+ * qsort comparator to handle undo record for applying undo actions of the
+ * transaction.
+ */
+static int
+undo_record_comparator(const void *left, const void *right)
+{
+       UnpackedUndoRecord *luur = ((UndoRecInfo *) left)->uur;
+       UnpackedUndoRecord *ruur = ((UndoRecInfo *) right)->uur;
+
+       if (luur->uur_rmid < ruur->uur_rmid)
+               return -1;
+       else if (luur->uur_rmid > ruur->uur_rmid)
+               return 1;
+       else if (luur->uur_reloid < ruur->uur_reloid)
+               return -1;
+       else if (luur->uur_reloid > ruur->uur_reloid)
+               return 1;
+       else if (luur->uur_block < ruur->uur_block)
+               return -1;
+       else if (luur->uur_block > ruur->uur_block)
+               return 1;
+       else if (luur->uur_offset < ruur->uur_offset)
+               return -1;
+       else if (luur->uur_offset > ruur->uur_offset)
+               return 1;
+       else if (((UndoRecInfo *) left)->index < ((UndoRecInfo *) right)->index)
+       {
+               /*
+                * If records are for the same block and offset, then maintain their
+                * existing order by comparing their index in the array.
+                */
+               return -1;
+       }
+       else
+               return 1;
+}
+
+/*
+ * UpdateUndoApplyProgress - Updates how far undo actions from a particular
+ * log have been applied while rolling back a transaction.  This progress is
+ * measured in terms of undo block number of the undo log till which the
+ * undo actions have been applied.
+ */
+static void
+UpdateUndoApplyProgress(UndoRecPtr progress_urec_ptr,
+                                               BlockNumber block_num)
+{
+       UndoLogCategory category;
+       UndoRecordInsertContext context = {{0}};
+
+       category =
+               UndoLogNumberGetCategory(UndoRecPtrGetLogNo(progress_urec_ptr));
+
+       /*
+        * We don't need to update the progress for temp tables as they get
+        * discraded after startup.
+        */
+       if (category == UNDO_TEMP)
+               return;
+
+       BeginUndoRecordInsert(&context, category, 1, NULL);
+
+       /*
+        * Prepare and update the undo apply progress in the transaction header.
+        */
+       UndoRecordPrepareApplyProgress(&context, progress_urec_ptr, block_num);
+
+       START_CRIT_SECTION();
+
+       /* Update the progress in the transaction header. */
+       UndoRecordUpdateTransInfo(&context, 0);
+
+       /* WAL log the undo apply progress. */
+       {
+               XLogRecPtr      lsn;
+               xl_undoapply_progress xlrec;
+
+               xlrec.urec_ptr = progress_urec_ptr;
+               xlrec.progress = block_num;
+
+               XLogBeginInsert();
+               XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+               RegisterUndoLogBuffers(&context, 1);
+               lsn = XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS);
+               UndoLogBuffersSetLSN(&context, lsn);
+       }
+
+       END_CRIT_SECTION();
+
+       /* Release undo buffers. */
+       FinishUndoRecordInsert(&context);
+}
+
+/*
+ * UndoAlreadyApplied - Retruns true, if the actions are already applied,
+ *     false, otherwise.
+ */
+static bool
+UndoAlreadyApplied(FullTransactionId full_xid, UndoRecPtr to_urecptr)
+{
+       UnpackedUndoRecord *uur = NULL;
+       UndoRecordFetchContext  context;
+
+       /* Fetch the undo record. */
+       BeginUndoFetch(&context);
+       uur = UndoFetchRecord(&context, to_urecptr);
+       FinishUndoFetch(&context);
+
+       /* already processed and discarded */
+       if (uur == NULL)
+       {
+               /*
+                * Undo action is already applied, so delete the hash table entry
+                * if exists.
+                */
+               RollbackHTRemoveEntry(full_xid, to_urecptr);
+               return true;
+       }
+
+       /* already processed */
+       if (IsXactApplyProgressCompleted(uur->uur_txn->urec_progress))
+       {
+               /*
+                * Undo action is already applied, so delete the hash table entry
+                * if exists.
+                */
+               RollbackHTRemoveEntry(full_xid, to_urecptr);
+               UndoRecordRelease(uur);
+               return true;
+       }
+
+       Assert(FullTransactionIdEquals(full_xid, uur->uur_fxid));
+
+       UndoRecordRelease(uur);
+
+       return false;
+}
+
+/*
+ * ApplyUndo - Invode rmgr specific undo apply functions.
+ *
+ * urecinfo - An array of undo records sorted in the rmgr order.
+ * nrecords - number of records in this array.
+ */
+static void
+ApplyUndo(UndoRecInfo *urecinfo, int nrecords)
+{
+       int                     rmgr_start_idx = 0;
+       int                     rmgr_nrecords = 0;
+       int                     prev_rmid = -1;
+       int                     i;
+
+       /* Apply the undo action for each rmgr. */
+       for (i = 0; i < nrecords; i++)
+       {
+               UnpackedUndoRecord *uur = urecinfo[i].uur;
+
+               Assert(uur->uur_rmid >= 0);
+
+               /*
+                * If this undo is not for the same rmgr then apply all undo
+                * actions for the previous rmgr.
+                */
+               if (prev_rmid >= 0 &&
+                       prev_rmid != uur->uur_rmid)
+               {
+                       Assert(urecinfo[rmgr_start_idx].uur->uur_rmid == prev_rmid);
+                       RmgrTable[prev_rmid].rm_undo(rmgr_nrecords,
+                                                                                &urecinfo[rmgr_start_idx]);
+
+                       rmgr_start_idx = i;
+                       rmgr_nrecords = 0;
+               }
+
+               rmgr_nrecords++;
+               prev_rmid = uur->uur_rmid;
+       }
+
+       /* Apply the last set of the actions. */
+       Assert(urecinfo[rmgr_start_idx].uur->uur_rmid == prev_rmid);
+       RmgrTable[prev_rmid].rm_undo(rmgr_nrecords, &urecinfo[rmgr_start_idx]);
+}
+
+/*
+ * ProcessAndApplyUndo - Fetch undo records and apply actions.
+ *
+ * We always process the undo of the last log when the undo for a transaction
+ * spans across multiple logs.  Then from there onwards the previous undo logs
+ * for the same transaction are processed.
+ *
+ * We also update the undo apply progress in the transaction header so that
+ * after recovery we don't need to process the records that are already
+ * processed.  As we update the progress only after one batch of records,
+ * the crash in-between can cause us to read/apply part of undo records
+ * again but this will never be more than one-batch.  We can further optimize
+ * it by marking the progress in each record, but that has its own downsides
+ * like it will generate more WAL and I/O corresponding to dirty undo buffers.
+ */
+static void
+ProcessAndApplyUndo(FullTransactionId full_xid, UndoRecPtr from_urecptr,
+                                       UndoRecPtr to_urecptr, UndoRecPtr last_log_start_urec_ptr,
+                                       bool complete_xact)
+{
+       UndoRecInfo *urecinfo;
+       UndoRecPtr      urec_ptr = from_urecptr;
+       int                     undo_apply_size;
+
+       /*
+        * We choose maintenance_work_mem to collect the undo records for
+        * rollbacks as most of the large rollback requests are done by
+        * background worker which can be considered as maintainence operation.
+        * However, we can introduce a new guc for this as well.
+        */
+       undo_apply_size = maintenance_work_mem * 1024L;
+
+       /*
+        * Fetch the multiple undo records that can fit into undo_apply_size; sort
+        * them and then rmgr specific callback to process them.  Repeat this
+        * until we process all the records for the transaction being rolled back.
+        */
+       do
+       {
+               BlockNumber     progress_block_num = InvalidBlockNumber;
+               int                     i;
+               int                     nrecords;
+               bool            log_switched = false;
+               bool            rollback_completed = false;
+               bool            update_progress = false;
+               UndoRecPtr      progress_urec_ptr = InvalidUndoRecPtr;
+               UndoRecInfo     *first_urecinfo;
+               UndoRecInfo     *last_urecinfo;
+
+               /*
+                * Fetch multiple undo records at once.
+                *
+                * At a time, we only fetch the undo records from a single undo log.
+                * Once, we process all the undo records from one undo log, we update
+                * the last_log_start_urec_ptr and proceed to the previous undo log.
+                */
+               urecinfo = UndoBulkFetchRecord(&urec_ptr, last_log_start_urec_ptr,
+                                                                          undo_apply_size, &nrecords, false);
+
+               /*
+                * Since the rollback of this transaction is in-progress, there will be
+                * at least one undo record which is not yet discarded.
+                */
+               Assert(nrecords > 0);
+
+               /*
+                * Get the required information from first and last undo record before
+                * we sort all the records.
+                */
+               first_urecinfo = &urecinfo[0];
+               last_urecinfo = &urecinfo[nrecords - 1];
+               if (last_urecinfo->uur->uur_info & UREC_INFO_LOGSWITCH)
+               {
+                       UndoRecordLogSwitch *logswitch = last_urecinfo->uur->uur_logswitch;
+
+                       /*
+                        * We have crossed the log boundary.  The rest of the undo for
+                        * this transaction is in some other log, the location of which
+                        * can be found from this record.  See commets atop undoaccess.c.
+                        */
+                       log_switched = true;
+
+                       /*
+                        * We need to save the undo record pointer of the last record from
+                        * previous undo log.  We will use the same as from location in
+                        * next iteration of bulk fetch.
+                        */
+                       Assert(UndoRecPtrIsValid(logswitch->urec_prevurp));
+                       urec_ptr = logswitch->urec_prevurp;
+
+                       /*
+                        * The last fetched undo record corresponds to the first undo
+                        * record of the current log.  Once, the undo actions are performed
+                        * from this log, we've to mark the progress as completed.
+                        */
+                       progress_urec_ptr = last_urecinfo->urp;
+
+                       /*
+                        * We also need to save the start location of this transaction in
+                        * previous log.  This will be used in the next iteration of bulk
+                        * fetch and updating progress location.
+                        */
+                       if (complete_xact)
+                       {
+                               Assert(UndoRecPtrIsValid(logswitch->urec_prevlogstart));
+                               last_log_start_urec_ptr = logswitch->urec_prevlogstart;
+                       }
+
+                       /* We've to update the progress for the current log as completed. */
+                       update_progress = true;
+               }
+               else if (complete_xact)
+               {
+                       if (UndoRecPtrIsValid(urec_ptr))
+                       {
+                               /*
+                                * There are still some undo actions pending in this log.  So,
+                                * just update the progress block number.
+                                */
+                               progress_block_num = UndoRecPtrGetBlockNum(last_urecinfo->urp);
+
+                               /*
+                                * If we've not fetched undo records for more than one undo
+                                * block, we can't update the progress block number.  Because,
+                                * there can still be undo records in this block that needs to
+                                * be applied for rolling back this transaction.
+                                */
+                               if (UndoRecPtrGetBlockNum(first_urecinfo->urp) > progress_block_num)
+                               {
+                                       update_progress = true;
+                                       progress_urec_ptr = last_log_start_urec_ptr;
+                               }
+                       }
+                       else
+                       {
+                               /*
+                                * Invalid urec_ptr indicates that we have executed all the undo
+                                * actions for this transaction.  So, mark current log header
+                                * as complete.
+                                */
+                               Assert(last_log_start_urec_ptr == to_urecptr);
+                               rollback_completed = true;
+                               update_progress = true;
+                               progress_urec_ptr = last_log_start_urec_ptr;
+                       }
+               }
+
+               /*
+                * The undo records must belong to the transaction that is being
+                * rolled back.
+                */
+               Assert(FullTransactionIdEquals(full_xid, urecinfo[0].uur->uur_fxid));
+
+               /* Sort the undo record array in order of target blocks. */
+               qsort((void *) urecinfo, nrecords, sizeof(UndoRecInfo),
+                         undo_record_comparator);
+
+               /* Call resource manager specific callbacks to apply actions. */
+               ApplyUndo(urecinfo, nrecords);
+
+               /* Set undo action apply progress if required. */
+               if (update_progress)
+               {
+                       Assert(UndoRecPtrIsValid(progress_urec_ptr));
+
+                       if (log_switched || rollback_completed)
+                       {
+                               /*
+                                * We have crossed the log boundary or executed all the undo
+                                * actions for the main transaction.  So, mark current log
+                                * header as complete and set the next progress location in
+                                * the previous log.
+                                */
+                               UpdateUndoApplyProgress(progress_urec_ptr,
+                                                                               XACT_APPLY_PROGRESS_COMPLETED);
+                       }
+                       else
+                       {
+                               /*
+                                * Update the progress block number.  We increase the block
+                                * number by one since the current block might have some undo
+                                * records that are yet to be applied.  But, all undo records
+                                * from the next block must have been applied.
+                                */
+                               UpdateUndoApplyProgress(progress_urec_ptr,
+                                                                               progress_block_num + 1);
+                       }
+               }
+
+               /* Free all undo records. */
+               for (i = 0; i < nrecords; i++)
+                       UndoRecordRelease(urecinfo[i].uur);
+
+               /* Free urp array for the current batch of undo records. */
+               pfree(urecinfo);
+
+               /*
+                * Invalid urec_ptr indicates that we have executed all the undo
+                * actions for this transaction.
+                */
+               if (!UndoRecPtrIsValid(urec_ptr))
+                       break;
+       } while (true);
+}
+
+/*
+ * execute_undo_actions - Execute the undo actions
+ *
+ * full_xid - Transaction id that is getting rolled back.
+ * from_urecptr - undo record pointer from where to start applying undo
+ *                             actions.
+ * to_urecptr  - undo record pointer up to which the undo actions need to be
+ *                             applied.
+ * complete_xact       - true if rollback is for complete transaction.
+ */
+void
+execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr,
+                                        UndoRecPtr to_urecptr, bool complete_xact)
+{
+       UndoRecPtr last_log_start_urec_ptr = to_urecptr;
+
+       /* 'from' and 'to' pointers must be valid. */
+       Assert(from_urecptr != InvalidUndoRecPtr);
+       Assert(to_urecptr != InvalidUndoRecPtr);
+
+       /*
+        * Here we compute the last log start urp which is used for fetching the
+        * undo records and updating the undo action progress.
+        *
+        * For rollbacks of subtransaction, we won't be able to calculate the last
+        * log start urp since we don't have the start urp of the top xid and hence
+        * we won't be able to follow the transaction chains to find the last log.
+        */
+       if (complete_xact)
+       {
+               if (UndoRecPtrGetCategory(to_urecptr) == UNDO_TEMP)
+               {
+                       UndoRecPtr end_urec_ptr = from_urecptr;
+
+                       /*
+                        * For temporary tables, we don't push the rollback request in the
+                        * rollback hash table so we can't directly get the last log start
+                        * urp from there.  Instead, we need to compute it now.
+                        */
+                       (void) FindUndoEndLocationAndSize(to_urecptr, &end_urec_ptr,
+                                                                                         &last_log_start_urec_ptr,
+                                                                                         full_xid);
+               }
+               else
+               {
+                       /*
+                        * It is important here to fetch the latest undo record and validate if
+                        * the actions are already executed.  The reason is that it is possible
+                        * that discard worker or backend might try to execute the rollback
+                        * request which is already executed.  For ex., after discard worker
+                        * fetches the record and found that this transaction need to be
+                        * rolledback, backend might concurrently execute the actions and
+                        * remove the request from rollback hash table.
+                        *
+                        * The other case where this will be required is when the transactions
+                        * records span across multiple logs.  Say, we register the
+                        * transaction from the first log and then we encounter the same
+                        * transaction in the second log where its status is still not marked
+                        * as done.  Now, before we try to register the request for the second
+                        * log, the undo worker came along rolled back the previous request
+                        * and removed its hash entry.  In this case, we will successfully
+                        * register the request from the second log and it should be detected
+                        * here.
+                        */
+                       if (UndoAlreadyApplied(full_xid, to_urecptr))
+                               return;
+
+                       last_log_start_urec_ptr =
+                               RollbackHTGetLastLogStartUrp(full_xid, to_urecptr);
+               }
+       }
+
+       ProcessAndApplyUndo(full_xid, from_urecptr, to_urecptr,
+                                               last_log_start_urec_ptr, complete_xact);
+
+       /*
+        * Undo actions are applied so delete the hash table entry.
+        */
+       RollbackHTRemoveEntry(full_xid, to_urecptr);
+}
diff --git a/src/backend/access/undo/undoactionxlog.c b/src/backend/access/undo/undoactionxlog.c
new file mode 100644 (file)
index 0000000..8d4ff7f
--- /dev/null
@@ -0,0 +1,60 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoactionxlog.c
+ *       WAL replay logic for undo actions.
+ *
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/access/undo/undoactionxlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/undoaction_xlog.h"
+#include "access/undoaccess.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+
+/*
+ * Replay of undo apply progress.
+ */
+static void
+undo_xlog_apply_progress(XLogReaderState *record)
+{
+       xl_undoapply_progress *xlrec = (xl_undoapply_progress *) XLogRecGetData(record);
+       UndoLogCategory category;
+       UndoRecordInsertContext context = {{0}};
+
+       category =
+               UndoLogNumberGetCategory(UndoRecPtrGetLogNo(xlrec->urec_ptr));
+
+       BeginUndoRecordInsert(&context, category, 1, record);
+
+       /* Update the undo apply progress in the transaction header. */
+       UndoRecordPrepareApplyProgress(&context, xlrec->urec_ptr,
+                                                                  xlrec->progress);
+
+       UndoRecordUpdateTransInfo(&context, 0);
+
+       /* Release undo buffers. */
+       FinishUndoRecordInsert(&context);
+}
+
+void
+undoaction_redo(XLogReaderState *record)
+{
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       switch (info)
+       {
+               case XLOG_UNDO_APPLY_PROGRESS:
+                       undo_xlog_apply_progress(record);
+                       break;
+               default:
+                       elog(PANIC, "undoaction_redo: unknown op code %u", info);
+       }
+}
index d3a9c4d64c6650b5e3c0ffc51efbf37559a239f3..272edcbcba38ee4741809e6f6d532af7c53738ec 100644 (file)
@@ -155,6 +155,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
                case RM_REPLORIGIN_ID:
                case RM_GENERIC_ID:
                case RM_UNDOLOG_ID:
+               case RM_UNDOACTION_ID:
                        /* just deal with xid, and done */
                        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
                                                                        buf.origptr);
index 287af60c4e720affbeaf558c3c4a4452f8c4882a..b26c45e4a28e7cd5c604f86690eea10ab58efcb4 100644 (file)
@@ -28,7 +28,7 @@
  * RmgrNames is an array of resource manager names, to make error messages
  * a bit nicer.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \
   name,
 
 static const char *RmgrNames[RM_MAX_ID + 1] = {
index 938150dd915e2ab223c914e5dbb41a8e97133d52..976f80e9c30d79c74ab9b7bc638c67a39ab868ef 100644 (file)
@@ -20,6 +20,7 @@
 #include "access/nbtxlog.h"
 #include "access/rmgr.h"
 #include "access/spgxlog.h"
+#include "access/undoaction_xlog.h"
 #include "access/undolog_xlog.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
@@ -33,7 +34,7 @@
 #include "storage/standbydefs.h"
 #include "utils/relmapper.h"
 
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \
        { name, desc, identify},
 
 const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
index c9b5c56a4c601535b9396d1ebeddeb2638225044..0a3794a44e593a82a81a863654b85bb6c3d5109f 100644 (file)
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
  * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
  * file format.
  */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_status,undo_desc) \
        symname,
 
 typedef enum RmgrIds
index 6945e3e9504c6ef254c38edd82e859ea9ca0b0e4..6da5930e0b7f0fdf0cb63a0c58d14fc376c60bb7 100644 (file)
  */
 
 /* symbol name, textual name, redo, desc, identify, startup, cleanup */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
-PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, NULL, NULL, NULL)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, NULL, NULL, NULL)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask, NULL, NULL, NULL)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL, NULL)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL, NULL)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL, NULL)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL, NULL)
+PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL, NULL)
index 24ea97b8d3888d6d34fa62ce6803e9de9f1838fc..7c31332993e0337bf76abfad474a3f775df3ec07 100644 (file)
@@ -13,6 +13,7 @@
 #ifndef UNDOACCESS_H
 #define UNDOACCESS_H
 
+#include "access/transam.h"
 #include "access/undolog.h"
 #include "access/undorecord.h"
 #include "access/xlogdefs.h"
@@ -94,6 +95,9 @@ typedef struct UndoRecordFetchContext
        UndoRecPtr      urp;                    /* Previous undo record pointer. */
 } UndoRecordFetchContext;
 
+extern void UndoRecordPrepareApplyProgress(UndoRecordInsertContext *context,
+                                       UndoRecPtr urecptr, BlockNumber progress);
+extern void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx);
 extern void BeginUndoRecordInsert(UndoRecordInsertContext *context,
                                                                  UndoLogCategory category,
                                                                  int nprepared,
diff --git a/src/include/access/undoaction_xlog.h b/src/include/access/undoaction_xlog.h
new file mode 100644 (file)
index 0000000..b9e65d1
--- /dev/null
@@ -0,0 +1,39 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoaction_xlog.h
+ *       undo action XLOG definitions
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoaction_xlog.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOACTION_XLOG_H
+#define UNDOACTION_XLOG_H
+
+#include "access/undolog.h"
+#include "access/xlogreader.h"
+#include "lib/stringinfo.h"
+#include "storage/off.h"
+
+/*
+ * WAL record definitions for undoactions.c's WAL operations
+ */
+#define XLOG_UNDO_APPLY_PROGRESS       0x00
+
+/* This is what we need to know about undo apply progress */
+typedef struct xl_undoapply_progress
+{
+       UndoRecPtr      urec_ptr;
+       uint32          progress;
+} xl_undoapply_progress;
+
+#define SizeOfUndoActionProgress       (offsetof(xl_undoapply_progress, progress) + sizeof(uint32))
+
+extern void undoaction_redo(XLogReaderState *record);
+extern void undoaction_desc(StringInfo buf, XLogReaderState *record);
+extern const char *undoaction_identify(uint8 info);
+
+#endif                                                 /* UNDOACTION_XLOG_H */
index defc810d967533b30ab2b0c650206919a9b4d34a..e197aef965abb21eb7f44af1105ddb94071c97e2 100644 (file)
@@ -223,8 +223,5 @@ extern FullTransactionId RollbackHTGetOldestFullXid(FullTransactionId oldestXmin
 /* functions exposed from undoaction.c */
 extern void execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr,
                                         UndoRecPtr to_urecptr, bool nopartial);
-extern bool execute_undo_actions_page(UndoRecInfo *urp_array, int first_idx,
-                                                 int last_idx, Oid reloid, FullTransactionId full_xid,
-                                                 BlockNumber blkno, bool blk_chain_complete);
 
 #endif                                                 /* _UNDOREQUEST_H */
index b986a385756fd10a2eccb0bfe6193eccd34a9566..35aff69492312ea1bff8e810eb428a6d97825ee0 100644 (file)
 #ifndef XLOG_INTERNAL_H
 #define XLOG_INTERNAL_H
 
+#include "access/transam.h"
+#include "access/undoaccess.h"
+#include "access/undorecord.h"
 #include "access/xlogdefs.h"
 #include "access/xlogreader.h"
 #include "datatype/timestamp.h"
 #include "lib/stringinfo.h"
+#include "nodes/pg_list.h"
 #include "pgtime.h"
 #include "storage/block.h"
 #include "storage/relfilenode.h"
@@ -270,6 +274,15 @@ typedef enum
        RECOVERY_TARGET_ACTION_SHUTDOWN
 }                      RecoveryTargetAction;
 
+/*
+ * Return values for undo status callback functions.
+ */
+typedef enum UndoStatus
+{
+       UNDO_STATUS_WAIT_XMIN,          /* wait until the xmin passes an xid */
+       UNDO_STATUS_DISCARD                     /* the record set should be discarded */
+} UndoStatus;
+
 /*
  * Method table for resource managers.
  *
@@ -295,9 +308,12 @@ typedef struct RmgrData
        void            (*rm_startup) (void);
        void            (*rm_cleanup) (void);
        void            (*rm_mask) (char *pagedata, BlockNumber blkno);
+       void            (*rm_undo) (int nrecords, UndoRecInfo *records);
+       UndoStatus      (*rm_undo_status) (UnpackedUndoRecord *record, TransactionId *xid);
+       void            (*rm_undo_desc) (StringInfo buf, UnpackedUndoRecord *record);
 } RmgrData;
 
-extern const RmgrData RmgrTable[];
+extern PGDLLIMPORT const RmgrData RmgrTable[];
 
 /*
  * Exported to support xlog switching from checkpointer