Infrastructure to register and fetch undo action requests.
author Amit Kapila <[email protected]>
Thu, 13 Jun 2019 09:40:06 +0000 (15:10 +0530)
committer Kuntal Ghosh <[email protected]>
Fri, 19 Jul 2019 08:48:44 +0000 (14:18 +0530)
This infrastructure provides a way to allow execution of undo actions.  One
might think that we can always execute undo actions on error or on explicit
rollback by the user; however, there are cases when that is not possible.
For example, (a) if the system crashes while an operation is in progress,
then after startup we need a way to perform the undo actions; (b) if we get
an error while performing the undo actions themselves.

Apart from this, when there are large rollback requests, it is quite
inefficient to perform all the undo actions before returning control to the
user.

To allow efficient execution of the undo actions, we create three queues
and a hash table for the rollback requests.  An XID-based priority queue
allows us to process the requests of older transactions first and helps us
move oldestXidHavingUnappliedUndo (the xid-horizon below which all
transactions are visible) forward.  A size-based queue helps us perform the
rollbacks of larger aborts in a timely fashion so that we don't get stuck
while processing them during discard of the logs.  An error queue holds the
requests for transactions that failed to apply their undo.  The rollback
hash table is used to avoid duplicate undo requests from backends and the
discard worker.
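
As an illustration only (not part of the patch), the following standalone
sketch models the round-robin pull from the three queues and the
(full_xid, start_urec_ptr) key used to deduplicate requests; queue_has_work
and RequestKey are hypothetical stand-ins for the real shared-memory
machinery:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef enum { XID_QUEUE = 0, SIZE_QUEUE, ERROR_QUEUE, N_QUEUES } QueueType;

    typedef struct
    {
        uint64_t full_xid;          /* 64-bit transaction id */
        uint64_t start_urec_ptr;    /* start of the transaction's undo */
    } RequestKey;                   /* key that prevents duplicate requests */

    /* Hypothetical check; the real code consults the binary heaps. */
    static bool queue_has_work(QueueType q) { return q == SIZE_QUEUE; }

    int main(void)
    {
        static unsigned cur = 0;

        /* Rotate over the queues so no request type starves the others. */
        for (int i = 0; i < N_QUEUES; i++)
        {
            QueueType q = (QueueType) (cur++ % N_QUEUES);

            if (queue_has_work(q))
            {
                printf("picked work from queue %d\n", (int) q);
                break;
            }
        }
        return 0;
    }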

Amit Kapila and Kuntal Ghosh, design idea by Andres Freund.

13 files changed:
src/backend/access/undo/Makefile
src/backend/access/undo/undoaccess.c
src/backend/access/undo/undorequest.c [new file with mode: 0644]
src/backend/storage/lmgr/lwlocknames.txt
src/backend/storage/lmgr/proc.c
src/backend/utils/init/postinit.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/access/transam.h
src/include/access/undoaccess.h
src/include/access/undorequest.h [new file with mode: 0644]
src/include/miscadmin.h
src/include/storage/proc.h

index 049a416f0714d8b380476d7d5d51c30e2f5cc02c..73275028be9b655136896f33ec60ca30b417f5d1 100644 (file)
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undoaccess.o undolog.o undorecord.o
+OBJS = undoaccess.o undolog.o undorecord.o undorequest.o
 
 include $(top_srcdir)/src/backend/common.mk
index a8cf46964ee907bc4fa6fbe84940c15b7bab8af0..4ffec58c5b50ecfdcbbb672e2a7bc51b8364743c 100644 (file)
@@ -46,6 +46,7 @@
 #include "access/undorecord.h"
 #include "access/undoaccess.h"
 #include "access/undolog_xlog.h"
+#include "access/undorequest.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xlogutils.h"
@@ -754,7 +755,7 @@ PrepareUndoInsert(UndoRecordInsertContext *context,
        {
                urec->uur_txn = palloc(SizeOfUndoRecordTransaction);
                urec->uur_txn->urec_dbid = dbid;
-               urec->uur_txn->urec_progress = InvalidBlockNumber;
+               urec->uur_txn->urec_progress = XACT_APPLY_PROGRESS_NOT_STARTED;
                urec->uur_txn->urec_next = InvalidUndoRecPtr;
        }
        else
@@ -1752,3 +1753,50 @@ UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
        /* calculate the previous undo record pointer */
        return MakeUndoRecPtr(logno, offset - prevlen);
 }
+
+/*
+ * Returns the undo record pointer corresponding to first record in the given
+ * block.
+ */
+UndoRecPtr
+UndoBlockGetFirstUndoRecord(BlockNumber blkno, UndoRecPtr urec_ptr,
+                                                       UndoLogCategory category)
+{
+       Buffer buffer;
+       Page page;
+       UndoPageHeader  phdr;
+       RelFileNode             rnode;
+       UndoLogOffset   log_cur_off;
+       Size                    partial_rec_size;
+       int                             offset_cur_page;
+
+       if (!BlockNumberIsValid(blkno))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("invalid undo block number")));
+
+       UndoRecPtrAssignRelFileNode(rnode, urec_ptr);
+
+       buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, blkno,
+                                                                          RBM_NORMAL, NULL,
+                                                                          RelPersistenceForUndoLogCategory(category));
+
+       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+       page = BufferGetPage(buffer);
+       phdr = (UndoPageHeader)page;
+
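+       /*
+        * The page may begin with the continuation of a record that started on
+        * an earlier page.  The page header describes that partial record, so
+        * we can compute how many of its bytes spill onto this page and skip
+        * them to reach the first complete record.
+        */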
+       /* Calculate the size of the partial record. */
+       partial_rec_size = UndoRecordHeaderSize(phdr->uur_info) +
+                                               phdr->tuple_len + phdr->payload_len -
+                                               phdr->record_offset;
+
+       /* calculate the offset in current log. */
+       offset_cur_page = SizeOfUndoPageHeaderData + partial_rec_size;
+       log_cur_off = (blkno * BLCKSZ) + offset_cur_page;
+
+       UnlockReleaseBuffer(buffer);
+
+       /* calculate the undo record pointer based on current offset in log. */
+       return MakeUndoRecPtr(UndoRecPtrGetLogNo(urec_ptr), log_cur_off);
+}
diff --git a/src/backend/access/undo/undorequest.c b/src/backend/access/undo/undorequest.c
new file mode 100644 (file)
index 0000000..916d879
--- /dev/null
@@ -0,0 +1,1641 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorequest.c
+ *       This contains routines to register and fetch undo action requests.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorequest.c
+ *
+ * To increase the efficiency of the rollbacks, we create three queues and
+ * a hash table for the rollback requests.  An XID-based priority queue
+ * allows us to process the requests of older transactions first and helps
+ * us move oldestXidHavingUndo forward.  A size-based queue helps us perform
+ * the rollbacks of larger aborts in a timely fashion, so that we don't get
+ * stuck while processing them during discard of the logs.  An error queue
+ * holds the requests for transactions that failed to apply their undo.  The
+ * rollback hash table is used to avoid duplicate undo requests from backends
+ * and the discard worker.  The table must be able to accommodate all active
+ * undo requests.  An undo request must appear in both the xid and size
+ * request queues, or in neither.  As of now, we process the requests from
+ * these queues in a round-robin fashion to give equal priority to all three
+ * types of requests.
+ *
+ * The rollback requests exceeding a certain threshold are pushed into both
+ * xid and size based queues.  They are also registered in the hash table.
+ *
+ * To ensure that a backend and the discard worker don't register the same
+ * request twice, we always register the request in the hash table with the
+ * full_xid and the transaction's start pointer as the key.  A backend always
+ * remembers the start pointer, but the discard worker doesn't know the
+ * actual start location when a transaction's undo spans multiple logs,
+ * because it might first encounter a log that only contains overflowed undo
+ * records of the transaction.  In such cases, we need to compute the actual
+ * start position.  The first record of a transaction in each undo log
+ * contains a reference to the first record of this transaction in the
+ * previous log.  By following the previous-log chain of this transaction, we
+ * find the initial location, which is used to register the request.
+ *
+ * To process a request, we take it from one of the queues, search for it in
+ * the hash table, mark it as in-progress, and then remove it from the
+ * respective queue.  Once we have processed all of its actions, the request
+ * is removed from the hash table.  If a worker finds a request in a queue
+ * but the request is not present in the hash table, or is marked as
+ * in-progress, it can ignore that request (and remove it from the queue), as
+ * it must already have been processed or be in the process of being applied.
+ *
+ * Also note that if the work queues are full, we put back-pressure on
+ * backends to complete the requests by themselves.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/undorequest.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_database.h"
+#include "lib/binaryheap.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "utils/fmgroids.h"
+
+#define        MAX_UNDO_WORK_QUEUES    3
+#define UNDO_PEEK_DEPTH                10
+#define UNDO_FAILURE_RETRY_DELAY_MS 10000
+
+int                    rollback_overflow_size = 64;
+int                    pending_undo_queue_size = 1024;
+
+/* Each worker queue is a binary heap. */
+typedef struct
+{
+       binaryheap *bh;
+       union
+       {
+               UndoXidQueue *xid_elems;
+               UndoSizeQueue *size_elems;
+               UndoErrorQueue *error_elems;
+       }                       q_choice;
+} UndoWorkerQueue;
+
+/* This is the hash table to store all the rollback requests. */
+static HTAB *RollbackHT;
+static UndoWorkerQueue UndoWorkerQueues[MAX_UNDO_WORK_QUEUES];
+
+static uint32 cur_undo_queue = 0;
+
+/* Different operations for XID queue */
+#define InitXidQueue(bh, elems) \
+( \
+       UndoWorkerQueues[XID_QUEUE].bh = bh, \
+       UndoWorkerQueues[XID_QUEUE].q_choice.xid_elems = elems \
+)
+
+#define XidQueueIsEmpty() \
+       (binaryheap_empty(UndoWorkerQueues[XID_QUEUE].bh))
+
+#define GetXidQueueSize() \
+       (binaryheap_cur_size(UndoWorkerQueues[XID_QUEUE].bh))
+
+#define GetXidQueueElem(elem) \
+       (UndoWorkerQueues[XID_QUEUE].q_choice.xid_elems[elem])
+
+#define GetXidQueueTopElem() \
+( \
+       AssertMacro(!binaryheap_empty(UndoWorkerQueues[XID_QUEUE].bh)), \
+       DatumGetPointer(binaryheap_first(UndoWorkerQueues[XID_QUEUE].bh)) \
+)
+
+#define GetXidQueueNthElem(n) \
+( \
+       AssertMacro(!XidQueueIsEmpty()), \
+       DatumGetPointer(binaryheap_nth(UndoWorkerQueues[XID_QUEUE].bh, n)) \
+)
+
+#define SetXidQueueElem(elem, e_dbid, e_full_xid, e_start_urec_ptr) \
+( \
+       GetXidQueueElem(elem).dbid = e_dbid, \
+       GetXidQueueElem(elem).full_xid = e_full_xid, \
+       GetXidQueueElem(elem).start_urec_ptr = e_start_urec_ptr \
+)
+
+/* Different operations for SIZE queue */
+#define InitSizeQueue(bh, elems) \
+( \
+       UndoWorkerQueues[SIZE_QUEUE].bh = bh, \
+       UndoWorkerQueues[SIZE_QUEUE].q_choice.size_elems = elems \
+)
+
+#define SizeQueueIsEmpty() \
+       (binaryheap_empty(UndoWorkerQueues[SIZE_QUEUE].bh))
+
+#define GetSizeQueueSize() \
+       (binaryheap_cur_size(UndoWorkerQueues[SIZE_QUEUE].bh))
+
+#define GetSizeQueueElem(elem) \
+       (UndoWorkerQueues[SIZE_QUEUE].q_choice.size_elems[elem])
+
+#define GetSizeQueueTopElem() \
+( \
+       AssertMacro(!SizeQueueIsEmpty()), \
+       DatumGetPointer(binaryheap_first(UndoWorkerQueues[SIZE_QUEUE].bh)) \
+)
+
+#define GetSizeQueueNthElem(n) \
+( \
+       AssertMacro(!SizeQueueIsEmpty()), \
+       DatumGetPointer(binaryheap_nth(UndoWorkerQueues[SIZE_QUEUE].bh, n)) \
+)
+
+#define SetSizeQueueElem(elem, e_dbid, e_full_xid, e_size, e_start_urec_ptr) \
+( \
+       GetSizeQueueElem(elem).dbid = e_dbid, \
+       GetSizeQueueElem(elem).full_xid = e_full_xid, \
+       GetSizeQueueElem(elem).request_size = e_size, \
+       GetSizeQueueElem(elem).start_urec_ptr = e_start_urec_ptr \
+)
+
+/* Different operations for Error queue */
+#define InitErrorQueue(bh, elems) \
+( \
+       UndoWorkerQueues[ERROR_QUEUE].bh = bh, \
+       UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems = elems \
+)
+
+#define ErrorQueueIsEmpty() \
+       (binaryheap_empty(UndoWorkerQueues[ERROR_QUEUE].bh))
+
+#define GetErrorQueueSize() \
+       (binaryheap_cur_size(UndoWorkerQueues[ERROR_QUEUE].bh))
+
+#define GetErrorQueueElem(elem) \
+       (UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems[elem])
+
+#define GetErrorQueueTopElem() \
+( \
+       AssertMacro(!binaryheap_empty(UndoWorkerQueues[ERROR_QUEUE].bh)), \
+       DatumGetPointer(binaryheap_first(UndoWorkerQueues[ERROR_QUEUE].bh)) \
+)
+
+#define GetErrorQueueNthElem(n) \
+( \
+       AssertMacro(!ErrorQueueIsEmpty()), \
+       DatumGetPointer(binaryheap_nth(UndoWorkerQueues[ERROR_QUEUE].bh, n)) \
+)
+
+#define SetErrorQueueElem(elem, e_dbid, e_full_xid, e_retry_at, e_occurred_at) \
+( \
+       GetErrorQueueElem(elem).dbid = e_dbid, \
+       GetErrorQueueElem(elem).full_xid = e_full_xid, \
+       GetErrorQueueElem(elem).next_retry_at = e_retry_at, \
+       GetErrorQueueElem(elem).err_occurred_at = e_occurred_at \
+)
+
+/*
+ * Binary heap comparison function to compare the age of transactions.
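+ * Returning 1 when 'a' is the older transaction keeps the oldest transaction
+ * at the top of the heap, since binaryheap_first() returns the element that
+ * compares greatest.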
+ */
+static int
+undo_age_comparator(Datum a, Datum b, void *arg)
+{
+       UndoXidQueue *xidQueueElem1 = (UndoXidQueue *) DatumGetPointer(a);
+       UndoXidQueue *xidQueueElem2 = (UndoXidQueue *) DatumGetPointer(b);
+
+       if (FullTransactionIdPrecedes(xidQueueElem1->full_xid,
+                                                                 xidQueueElem2->full_xid))
+               return 1;
+       else if (FullTransactionIdFollows(xidQueueElem1->full_xid,
+                                                                         xidQueueElem2->full_xid))
+               return -1;
+       return 0;
+}
+
+/*
+ * Binary heap comparison function to compare the size of transactions.
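+ * Larger requests compare greater, so the biggest pending rollback sits at
+ * the top of the heap.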
+ */
+static int
+undo_size_comparator(Datum a, Datum b, void *arg)
+{
+       UndoSizeQueue *sizeQueueElem1 = (UndoSizeQueue *) DatumGetPointer(a);
+       UndoSizeQueue *sizeQueueElem2 = (UndoSizeQueue *) DatumGetPointer(b);
+
+       if (sizeQueueElem1->request_size > sizeQueueElem2->request_size)
+               return 1;
+       else if (sizeQueueElem1->request_size < sizeQueueElem2->request_size)
+               return -1;
+       return 0;
+}
+
+/*
+ * Binary heap comparison function to compare the time at which an error
+ * occurred for transactions.
+ *
+ * The error queue is sorted by next_retry_at and err_occurred_at.  Currently,
+ * next_retry_at uses a constant delay (see PushErrorQueueElem), so it doesn't
+ * make much sense to sort by both values.  However, if in the future we use a
+ * different algorithm for next_retry_at, the ordering will still work
+ * seamlessly.
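+ *
+ * Note that this comparator returns 1 when 'a' has the earlier retry time,
+ * so the request that becomes eligible first sits at the top of the heap.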
+ */
+static int
+undo_err_time_comparator(Datum a, Datum b, void *arg)
+{
+       UndoErrorQueue *errQueueElem1 = (UndoErrorQueue *) DatumGetPointer(a);
+       UndoErrorQueue *errQueueElem2 = (UndoErrorQueue *) DatumGetPointer(b);
+
+       if (errQueueElem1->next_retry_at < errQueueElem2->next_retry_at)
+               return 1;
+       else if (errQueueElem1->next_retry_at > errQueueElem2->next_retry_at)
+               return -1;
+       if (errQueueElem1->err_occurred_at < errQueueElem2->err_occurred_at)
+               return 1;
+       else if (errQueueElem1->err_occurred_at > errQueueElem2->err_occurred_at)
+               return -1;
+       return 0;
+}
+
+/* Returns the size of xid based queue. */
+static int
+UndoXidQueueElemsShmSize(void)
+{
+       return mul_size(pending_undo_queue_size, sizeof(UndoXidQueue));
+}
+
+/* Returns the size of rollback request size based queue. */
+static int
+UndoSizeQueueElemsShmSize(void)
+{
+       return mul_size(pending_undo_queue_size, sizeof(UndoSizeQueue));
+}
+
+/* Returns the size of error queue. */
+static int
+UndoErrorQueueElemsShmSize(void)
+{
+       return mul_size(pending_undo_queue_size, sizeof(UndoErrorQueue));
+}
+
+/* Returns the size of rollback hash table. */
+int
+UndoRollbackHashTableSize()
+{
+       /*
+        * The rollback hash table is used to avoid duplicate undo requests by
+        * backends and the discard worker.  The table must be able to
+        * accommodate all active undo requests.  An undo request must appear in
+        * both the xid and size request queues, or in neither.  For the same
+        * transaction, there can be two requests: one for logged relations and
+        * another for unlogged relations.  So, the rollback hash table should be
+        * sized for two request queues, an error queue (currently the same size
+        * as a request queue), and MaxBackends.  This ensures that it won't get
+        * filled.
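+        *
+        * For example, with the default pending_undo_queue_size of 1024 and,
+        * say, MaxBackends = 100, the table is sized for 3 * 1024 + 100 = 3172
+        * requests.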
+        */
+       return ((2 * pending_undo_queue_size) + pending_undo_queue_size +
+                       MaxBackends);
+}
+
+/* Get the first free element of xid based request array. */
+static int
+UndoXidQueueGetFreeElem(void)
+{
+       int                     i;
+
+       for (i = 0; i < pending_undo_queue_size; i++)
+       {
+               if (FullTransactionIdEquals(GetXidQueueElem(i).full_xid,
+                                                                       InvalidFullTransactionId))
+                       return i;
+       }
+
+       /* we should never call this function when the request queue is full. */
+       Assert(false);
+
+       /* silence compiler. */
+       return -1;
+}
+
+/* Push an element in the xid based request queue. */
+static void
+PushXidQueueElem(UndoRequestInfo * urinfo)
+{
+       int                     elem = UndoXidQueueGetFreeElem();
+
+       SetXidQueueElem(elem, urinfo->dbid, urinfo->full_xid,
+                                       urinfo->start_urec_ptr);
+
+       binaryheap_add(UndoWorkerQueues[XID_QUEUE].bh,
+                                  PointerGetDatum(&GetXidQueueElem(elem)));
+}
+
+/* Pop nth element from the xid based request queue. */
+static UndoXidQueue *
+PopXidQueueNthElem(int n)
+{
+       Datum           elem;
+
+       Assert(!XidQueueIsEmpty());
+       elem = binaryheap_remove_nth(UndoWorkerQueues[XID_QUEUE].bh, n);
+
+       return (UndoXidQueue *) (DatumGetPointer(elem));
+}
+
+/* Get the first free element of size based request array. */
+static int
+UndoSizeQueueGetFreeElem(void)
+{
+       int                     i;
+
+       for (i = 0; i < pending_undo_queue_size; i++)
+       {
+               if (FullTransactionIdEquals(GetSizeQueueElem(i).full_xid,
+                                                                       InvalidFullTransactionId))
+                       return i;
+       }
+
+       /* we should never call this function when the request queue is full. */
+       Assert(false);
+
+       /* silence compiler. */
+       return -1;
+}
+
+/*
+ * Traverse the queue and remove dangling entries, if any.  The queue
+ * entry is considered dangling if the hash table doesn't contain the
+ * corresponding entry.
+ */
+static int
+RemoveOldElemsFromXidQueue()
+{
+       int                     nCleaned = 0;
+       int                     i = 0;
+
+       Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+
+       while (i < GetXidQueueSize())
+       {
+               RollbackHashEntry *rh;
+               RollbackHashKey hkey;
+               UndoXidQueue *elem = (UndoXidQueue *) GetXidQueueNthElem(i);
+
+               hkey.full_xid = elem->full_xid;
+               hkey.start_urec_ptr = elem->start_urec_ptr;
+               rh = (RollbackHashEntry *) hash_search(RollbackHT,
+                                                                                          (void *) &hkey,
+                                                                                          HASH_FIND, NULL);
+
+               /*
+                * If some undo worker is already processing the rollback request or
+                * it is already processed, then we drop that request from the queue.
+                */
+               if (!rh || UndoRequestIsInProgress(rh))
+               {
+                       elem->dbid = InvalidOid;
+                       elem->full_xid = InvalidFullTransactionId;
+                       nCleaned++;
+                       binaryheap_remove_nth_unordered(UndoWorkerQueues[XID_QUEUE].bh, i);
+
+                       continue;
+               }
+
+               /*
+                * The request that is present in any queue must be a valid request
+                * and its status must be in_queue.
+                */
+               Assert(UndoRequestIsValid(rh));
+               Assert(UndoRequestIsInQueue(rh));
+
+               i++;
+       }
+
+       binaryheap_build(UndoWorkerQueues[XID_QUEUE].bh);
+
+       return nCleaned;
+}
+
+/* Push an element in the size based request queue */
+static void
+PushSizeQueueElem(UndoRequestInfo * urinfo)
+{
+       int                     elem = UndoSizeQueueGetFreeElem();
+
+       SetSizeQueueElem(elem, urinfo->dbid, urinfo->full_xid,
+                                        urinfo->request_size, urinfo->start_urec_ptr);
+
+       binaryheap_add(UndoWorkerQueues[SIZE_QUEUE].bh,
+                                  PointerGetDatum(&GetSizeQueueElem(elem)));
+}
+
+/* Pop nth element from the size based request queue */
+static UndoSizeQueue *
+PopSizeQueueNthElem(int n)
+{
+       Datum           elem;
+
+       Assert(!binaryheap_empty(UndoWorkerQueues[SIZE_QUEUE].bh));
+       elem = binaryheap_remove_nth(UndoWorkerQueues[SIZE_QUEUE].bh, n);
+
+       return (UndoSizeQueue *) DatumGetPointer(elem);
+}
+
+/*
+ * Traverse the queue and remove dangling entries, if any.  The queue
+ * entry is considered dangling if the hash table doesn't contain the
+ * corresponding entry.
+ */
+static int
+RemoveOldElemsFromSizeQueue()
+{
+       int                     nCleaned = 0;
+       int                     i = 0;
+
+       Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+
+       while (i < GetSizeQueueSize())
+       {
+               RollbackHashEntry *rh;
+               RollbackHashKey hkey;
+               UndoSizeQueue *elem = (UndoSizeQueue *) GetSizeQueueNthElem(i);
+
+               hkey.full_xid = elem->full_xid;
+               hkey.start_urec_ptr = elem->start_urec_ptr;
+               rh = (RollbackHashEntry *) hash_search(RollbackHT,
+                                                                                          (void *) &hkey,
+                                                                                          HASH_FIND, NULL);
+
+               /*
+                * If some undo worker is already processing the rollback request or
+                * it is already processed, then we drop that request from the queue.
+                */
+               if (!rh || UndoRequestIsInProgress(rh))
+               {
+                       elem->dbid = InvalidOid;
+                       elem->full_xid = InvalidFullTransactionId;
+                       elem->request_size = 0;
+                       binaryheap_remove_nth_unordered(UndoWorkerQueues[SIZE_QUEUE].bh, i);
+                       nCleaned++;
+                       continue;
+               }
+
+               /*
+                * The request that is present in any queue must be a valid request
+                * and its status must be in_queue.
+                */
+               Assert(UndoRequestIsValid(rh));
+               Assert(UndoRequestIsInQueue(rh));
+
+               i++;
+       }
+
+       binaryheap_build(UndoWorkerQueues[SIZE_QUEUE].bh);
+
+       return nCleaned;
+}
+
+/* Get the first free element of error time based request array. */
+static int
+UndoErrorQueueGetFreeElem(void)
+{
+       int                     i;
+
+       for (i = 0; i < pending_undo_queue_size; i++)
+       {
+               if (FullTransactionIdEquals(GetErrorQueueElem(i).full_xid,
+                                                                       InvalidFullTransactionId))
+                       return i;
+       }
+
+       /* we should never call this function when the request queue is full. */
+       Assert(false);
+
+       /* silence compiler. */
+       return -1;
+}
+
+/* Push an element in the error time based request queue */
+static void
+PushErrorQueueElem(volatile UndoRequestInfo *urinfo)
+{
+       int                     elem = UndoErrorQueueGetFreeElem();
+       TimestampTz now = GetCurrentTimestamp();
+       TimestampTz next_retry;
+
+       /*
+        * We want to retry this failed request after some constant amount of
+        * time rather than immediately; otherwise, in some cases (e.g., when all
+        * the pending requests are failed requests) the worker would keep
+        * retrying them constantly.
+        *
+        * In the future, we might want a more sophisticated back-off algorithm
+        * to delay the execution of such requests.
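+        *
+        * With UNDO_FAILURE_RETRY_DELAY_MS of 10000, for example, a failed
+        * request becomes eligible for retry 10 seconds after it is pushed here.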
+        */
+       next_retry = TimestampTzPlusMilliseconds(now, UNDO_FAILURE_RETRY_DELAY_MS);
+       SetErrorQueueElem(elem, urinfo->dbid, urinfo->full_xid, next_retry, now);
+
+       binaryheap_add(UndoWorkerQueues[ERROR_QUEUE].bh,
+                                  PointerGetDatum(&GetErrorQueueElem(elem)));
+}
+
+/* Pop nth element from the error time based request queue */
+static UndoErrorQueue *
+PopErrorQueueNthElem(int n)
+{
+       Datum           elem;
+
+       Assert(!ErrorQueueIsEmpty());
+       elem = binaryheap_remove_nth(UndoWorkerQueues[ERROR_QUEUE].bh, n);
+
+       return (UndoErrorQueue *) (DatumGetPointer(elem));
+}
+
+/*
+ * Traverse the queue and remove dangling entries, if any.  The queue
+ * entry is considered dangling if the hash table doesn't contain the
+ * corresponding entry.
+ */
+static int
+RemoveOldElemsFromErrorQueue()
+{
+       int                     nCleaned = 0;
+       int                     i = 0;
+
+       Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+
+       while (i < GetErrorQueueSize())
+       {
+               RollbackHashEntry *rh;
+               RollbackHashKey hkey;
+               UndoErrorQueue *elem = (UndoErrorQueue *) GetErrorQueueNthElem(i);
+
+               hkey.full_xid = elem->full_xid;
+               hkey.start_urec_ptr = elem->start_urec_ptr;
+               rh = (RollbackHashEntry *) hash_search(RollbackHT,
+                                                                                          (void *) &hkey,
+                                                                                          HASH_FIND, NULL);
+
+               /*
+                * If some undo worker is already processing the rollback request or
+                * it is already processed, then we drop that request from the queue.
+                */
+               if (!rh || UndoRequestIsInProgress(rh))
+               {
+                       elem->dbid = InvalidOid;
+                       elem->full_xid = InvalidFullTransactionId;
+                       elem->next_retry_at = 0;
+                       elem->err_occurred_at = 0;
+                       binaryheap_remove_nth_unordered(UndoWorkerQueues[ERROR_QUEUE].bh, i);
+                       nCleaned++;
+                       continue;
+               }
+
+               /*
+                * The request that is present in any queue must be a valid request
+                * and its status must be in_queue.
+                */
+               Assert(UndoRequestIsValid(rh));
+               Assert(UndoRequestIsInQueue(rh));
+
+               i++;
+       }
+
+       binaryheap_build(UndoWorkerQueues[ERROR_QUEUE].bh);
+
+       return nCleaned;
+}
+
+/*
+ * Remove the nth work item from the given queue and clear the corresponding
+ * array element as well.
+ */
+static void
+RemoveRequestFromQueue(UndoWorkerQueueType type, int n)
+{
+       if (type == XID_QUEUE)
+       {
+               UndoXidQueue *uXidQueueElem = (UndoXidQueue *) PopXidQueueNthElem(n);
+
+               Assert(FullTransactionIdIsValid(uXidQueueElem->full_xid));
+               uXidQueueElem->dbid = InvalidOid;
+               uXidQueueElem->full_xid = InvalidFullTransactionId;
+       }
+       else if (type == SIZE_QUEUE)
+       {
+               UndoSizeQueue *uSizeQueueElem = (UndoSizeQueue *) PopSizeQueueNthElem(n);
+
+               Assert(FullTransactionIdIsValid(uSizeQueueElem->full_xid));
+               uSizeQueueElem->dbid = InvalidOid;
+               uSizeQueueElem->full_xid = InvalidFullTransactionId;
+               uSizeQueueElem->request_size = 0;
+       }
+       else
+       {
+               UndoErrorQueue *uErrorQueueElem = (UndoErrorQueue *) PopErrorQueueNthElem(n);
+
+               Assert(type == ERROR_QUEUE);
+               Assert(FullTransactionIdIsValid(uErrorQueueElem->full_xid));
+               uErrorQueueElem->dbid = InvalidOid;
+               uErrorQueueElem->full_xid = InvalidFullTransactionId;
+               uErrorQueueElem->next_retry_at = 0;
+               uErrorQueueElem->err_occurred_at = 0;
+       }
+}
+
+/*
+ * Returns true if there is some valid request in the given queue, false
+ * otherwise.
+ *
+ * On success, fills hkey with the hash key corresponding to the nth element
+ * of the specified queue.
+ */
+static bool
+GetRollbackHashKeyFromQueue(UndoWorkerQueueType cur_queue, int n,
+                                                       RollbackHashKey *hkey)
+{
+       if (cur_queue == XID_QUEUE)
+       {
+               UndoXidQueue *elem;
+
+               /* check whether there is work in this queue */
+               if (GetXidQueueSize() <= n)
+                       return false;
+
+               elem = (UndoXidQueue *) GetXidQueueNthElem(n);
+               hkey->full_xid = elem->full_xid;
+               hkey->start_urec_ptr = elem->start_urec_ptr;
+       }
+       else if (cur_queue == SIZE_QUEUE)
+       {
+               UndoSizeQueue *elem;
+
+               /* check whether there is work in this queue */
+               if (GetSizeQueueSize() <= n)
+                       return false;
+
+               elem = (UndoSizeQueue *) GetSizeQueueNthElem(n);
+               hkey->full_xid = elem->full_xid;
+               hkey->start_urec_ptr = elem->start_urec_ptr;
+       }
+       else
+       {
+               UndoErrorQueue *elem;
+
+               /* It must be an error queue. */
+               Assert(cur_queue == ERROR_QUEUE);
+
+               /* check whether there is work in this queue */
+               if (GetErrorQueueSize() <= n)
+                       return false;
+
+               elem = (UndoErrorQueue *) GetErrorQueueNthElem(n);
+
+               /*
+                * If it is too early to try the error request again, then check the
+                * work in some other queue.
+                */
+               if (GetCurrentTimestamp() < elem->next_retry_at)
+                       return false;
+
+               hkey->full_xid = elem->full_xid;
+               hkey->start_urec_ptr = elem->start_urec_ptr;
+       }
+
+       return true;
+}
+
+/*
+ * Fetch the end urec pointer for the transaction and the undo request size.
+ *
+ * end_urecptr_out - This is an INOUT parameter.  If the end undo pointer is
+ * specified, we use it to calculate the size; otherwise, we calculate the
+ * end undo pointer and return it.
+ *
+ * last_log_start_urecptr_out - This is an OUT parameter.  If a transaction
+ * writes undo records in multiple undo logs, this is set to the start undo
+ * record pointer of this transaction in the last log.  If the transaction
+ * writes undo records in only a single undo log, it is set to start_urec_ptr.
+ * This value is used to update the rollback progress of the transaction in
+ * the last log.  Once we have the start location in the last log, the start
+ * locations in all the previous logs can be computed.  See
+ * execute_undo_actions for more details.
+ *
+ * XXX: We don't calculate the exact undo size.  We always skip the size of
+ * the last undo record (if not already discarded) from the calculation.  This
+ * optimization allows us to skip fetching an undo record for the most
+ * frequent case where the end pointer and the current start pointer belong to
+ * the same log: a simple subtraction between them gives us the size.  In the
+ * future, this function can be modified if someone needs the exact undo size.
+ * As of now, we use it to calculate the undo size of pending undo requests
+ * inserted into the undo worker's size queue.
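+ *
+ * A typical call when sizing a rollback request looks like this (sketch
+ * only; the surrounding bookkeeping is omitted):
+ *
+ *             end_urecptr = InvalidUndoRecPtr;
+ *             last_log_start_urecptr = InvalidUndoRecPtr;
+ *             size = FindUndoEndLocationAndSize(start_urecptr, &end_urecptr,
+ *                                               &last_log_start_urecptr, full_xid);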
+ */
+uint64
+FindUndoEndLocationAndSize(UndoRecPtr start_urecptr,
+                                                  UndoRecPtr *end_urecptr_out,
+                                                  UndoRecPtr *last_log_start_urecptr_out,
+                                                  FullTransactionId full_xid)
+{
+       UnpackedUndoRecord *uur = NULL;
+       UndoLogSlot *slot = NULL;
+       UndoRecPtr      urecptr = start_urecptr;
+       UndoRecPtr      end_urecptr = InvalidUndoRecPtr;
+       UndoRecPtr      last_log_start_urecptr = InvalidUndoRecPtr;
+       uint64          sz = 0;
+       UndoLogCategory category;
+
+       Assert(urecptr != InvalidUndoRecPtr);
+
+       while (true)
+       {
+               UndoRecPtr      next_urecptr = InvalidUndoRecPtr;
+               UndoLogOffset next_insert;
+               UndoRecordFetchContext  context;
+
+               if (*end_urecptr_out != InvalidUndoRecPtr)
+               {
+                       /*
+                        * Check whether the end pointer and the current pointer belong to
+                        * the same log.  In that case, we can get the size easily.
+                        */
+                       if (UndoRecPtrGetLogNo(urecptr) == UndoRecPtrGetLogNo(*end_urecptr_out))
+                       {
+                               last_log_start_urecptr = urecptr;
+                               sz += (*end_urecptr_out - urecptr);
+                               break;
+                       }
+               }
+
+               /*
+                * Fetch the log and undo record corresponding to the current undo
+                * pointer.
+                */
+               if ((slot == NULL) || (UndoRecPtrGetLogNo(urecptr) != slot->logno))
+                       slot = UndoLogGetSlot(UndoRecPtrGetLogNo(urecptr), false);
+
+               Assert(slot != NULL);
+               category = slot->meta.category;
+
+               next_insert = UndoLogGetNextInsertPtr(slot->logno);
+
+               /* The log's insert location must be at or ahead of urecptr. */
+               Assert(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert) >= urecptr);
+
+               /* Fetch the undo record. */
+               BeginUndoFetch(&context);
+               uur = UndoFetchRecord(&context, urecptr);
+               FinishUndoFetch(&context);
+
+               /*
+                * If the corresponding undo record got rolled back and discarded as
+                * well, we return from here.
+                */
+               if (uur == NULL)
+                       break;
+
+               /* The undo record must belong to the same transaction. */
+               Assert(FullTransactionIdEquals(full_xid, uur->uur_fxid));
+
+               /*
+                * Since this is the first undo record of this transaction in this
+                * log, this must include the transaction header.
+                */
+               Assert(uur->uur_info & UREC_INFO_TRANSACTION);
+
+               /*
+                * Case 1: Check whether any undo actions have already been applied
+                * from this log.  If so, we have to find the undo location up to
+                * which they have been applied.
+                */
+               if (!IsXactApplyProgressNotStarted(uur->uur_txn->urec_progress))
+               {
+                       /*
+                        * If all the undo records of this transaction in this log have
+                        * been applied, we return from here.
+                        */
+                       if (IsXactApplyProgressCompleted(uur->uur_txn->urec_progress))
+                               break;
+
+                       /*
+                        * Find the first undo record in the block indicated by
+                        * urec_progress.  We'll set end_urecptr to that undo record.
+                        */
+                       end_urecptr = UndoBlockGetFirstUndoRecord(uur->uur_txn->urec_progress,
+                                                                                                         urecptr, category);
+
+                       /*
+                        * Since rollbacks from this undo log are in progress, all undo
+                        * records from subsequent undo logs must have been applied.
+                        * Hence, this is the last log, so we set last_log_start_urecptr
+                        * to the start undo record pointer of this transaction in the
+                        * current log.
+                        */
+                       last_log_start_urecptr = urecptr;
+                       sz += (end_urecptr - urecptr);
+                       break;
+               }
+
+               next_urecptr = uur->uur_txn->urec_next;
+
+               /*
+                * Case 2: If this is the last transaction in the log, then calculate
+                * the latest urec pointer using the next insert location of the undo
+                * log.
+                *
+                * Even if some new undo got inserted after we fetched this
+                * transaction's undo record, the next_insert location still gives us
+                * the right point to compute end_urecptr.
+                */
+               if (!UndoRecPtrIsValid(next_urecptr))
+               {
+                       last_log_start_urecptr = urecptr;
+                       end_urecptr = UndoGetPrevUndoRecptr(next_insert, InvalidBuffer, category);
+                       sz += (end_urecptr - urecptr);
+                       Assert(UndoRecPtrIsValid(end_urecptr));
+                       break;
+               }
+
+               /*
+                * Case 3: The transaction ended in the same undo log, but this is not
+                * the last transaction.
+                */
+               if (UndoRecPtrGetLogNo(next_urecptr) == slot->logno)
+               {
+                       last_log_start_urecptr = urecptr;
+                       end_urecptr =
+                               UndoGetPrevUndoRecptr(next_urecptr, InvalidBuffer, category);
+                       sz += (end_urecptr - urecptr);
+                       Assert(UndoRecPtrIsValid(end_urecptr));
+                       break;
+               }
+
+               /*
+                * Case 4: The transaction overflowed to a different undo log that has
+                * already been discarded.  This means that the undo actions of this
+                * transaction that are in the next log have already been executed.
+                */
+               if (UndoRecPtrIsDiscarded(next_urecptr))
+               {
+                       UndoLogOffset next_insert;
+
+                       next_insert = UndoLogGetNextInsertPtr(slot->logno);
+                       Assert(UndoRecPtrIsValid(next_insert));
+
+                       last_log_start_urecptr = urecptr;
+                       end_urecptr = UndoGetPrevUndoRecptr(next_insert, InvalidBuffer, category);
+                       sz += (next_insert - urecptr);
+                       Assert(UndoRecPtrIsValid(end_urecptr));
+                       break;
+               }
+
+               /*
+                * Case 5: The transaction overflowed to a different log, so restart
+                * the processing from the next log, but before that, account for this
+                * log in the request size computation.
+                */
+               {
+                       UndoLogOffset next_insert;
+
+                       next_insert = UndoLogGetNextInsertPtr(slot->logno);
+                       Assert(UndoRecPtrIsValid(next_insert));
+
+                       last_log_start_urecptr = urecptr;
+                       end_urecptr = UndoGetPrevUndoRecptr(next_insert, InvalidBuffer, category);
+                       sz += (next_insert - urecptr);
+
+                       UndoRecordRelease(uur);
+                       uur = NULL;
+               }
+
+               /* Follow the undo chain */
+               urecptr = next_urecptr;
+       }
+
+       if (uur != NULL)
+               UndoRecordRelease(uur);
+
+       if (end_urecptr_out && (*end_urecptr_out == InvalidUndoRecPtr))
+               *end_urecptr_out = end_urecptr;
+       if (last_log_start_urecptr_out &&
+               (*last_log_start_urecptr_out == InvalidUndoRecPtr))
+               *last_log_start_urecptr_out = last_log_start_urecptr;
+
+       return sz;
+}
+
+/*
+ * Returns true if we can push the rollback request to undo workers, false
+ * otherwise.
+ */
+static bool
+CanPushReqToUndoWorker(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr,
+                                          uint64 req_size)
+{
+       /*
+        * This must be called after acquiring RollbackRequestLock as we will
+        * check the binary heaps, which can change.
+        */
+       Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+
+       /*
+        * We normally push the rollback request to undo workers if its size is
+        * above a certain threshold (rollback_overflow_size, in MB).
+        */
+       if (req_size >= rollback_overflow_size * 1024 * 1024)
+       {
+               if (GetXidQueueSize() >= pending_undo_queue_size ||
+                       GetSizeQueueSize() >= pending_undo_queue_size)
+               {
+                       /*
+                        * If one of the queues is full, traverse both queues and remove
+                        * dangling entries, if any.  A queue entry is considered dangling
+                        * if the hash table doesn't contain the corresponding entry.  This
+                        * can happen for two reasons: (a) we have processed the entry from
+                        * one of the queues, but not from the other; (b) the corresponding
+                        * database has been dropped, due to which we have removed the
+                        * entries from the hash table, but not from the queues.  This is
+                        * just lazy cleanup; if we wanted, we could remove the entries
+                        * from the queues when we detect that the database is dropped and
+                        * remove the corresponding entries from the hash table.
+                        */
+                       if (GetXidQueueSize() >= pending_undo_queue_size)
+                               RemoveOldElemsFromXidQueue();
+                       if (GetSizeQueueSize() >= pending_undo_queue_size)
+                               RemoveOldElemsFromSizeQueue();
+               }
+
+               if ((GetXidQueueSize() < pending_undo_queue_size))
+               {
+                       Assert(GetSizeQueueSize() < pending_undo_queue_size);
+
+                       /*
+                        * XXX - Here, we should return true once we have background
+                        * worker facility.
+                        */
+                       return false;
+               }
+       }
+
+       return false;
+}
+
+/*
+ * Compute the shared memory size needed for the rollback request queues and
+ * the hash table.
+ */
+int
+PendingUndoShmemSize(void)
+{
+       Size            size;
+
+       size = hash_estimate_size(UndoRollbackHashTableSize(), sizeof(RollbackHashEntry));
+       size = add_size(size, mul_size(MAX_UNDO_WORK_QUEUES,
+                                                                  binaryheap_shmem_size(pending_undo_queue_size)));
+       size = add_size(size, UndoXidQueueElemsShmSize());
+       size = add_size(size, UndoSizeQueueElemsShmSize());
+       size = add_size(size, UndoErrorQueueElemsShmSize());
+
+       return size;
+}
+
+/*
+ * Initialize the hash-table and priority heap based queues for rollback
+ * requests in shared memory.
+ */
+void
+PendingUndoShmemInit(void)
+{
+       HASHCTL         info;
+       bool            foundXidQueue = false;
+       bool            foundSizeQueue = false;
+       bool            foundErrorQueue = false;
+       binaryheap *bh;
+       UndoXidQueue *xid_elems;
+       UndoSizeQueue *size_elems;
+       UndoErrorQueue *error_elems;
+
+       MemSet(&info, 0, sizeof(info));
+
+       info.keysize = sizeof(RollbackHashKey);
+       info.entrysize = sizeof(RollbackHashEntry);
+       info.hash = tag_hash;
+
+       RollbackHT = ShmemInitHash("Undo Actions Lookup Table",
+                                                          UndoRollbackHashTableSize(),
+                                                          UndoRollbackHashTableSize(), &info,
+                                                          HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
+
+       bh = binaryheap_allocate_shm("Undo Xid Binary Heap",
+                                                                pending_undo_queue_size,
+                                                                undo_age_comparator,
+                                                                NULL);
+
+       xid_elems = (UndoXidQueue *) ShmemInitStruct("Undo Xid Queue Elements",
+                                                                                                UndoXidQueueElemsShmSize(),
+                                                                                                &foundXidQueue);
+
+       Assert(foundXidQueue || !IsUnderPostmaster);
+
+       if (!IsUnderPostmaster)
+               memset(xid_elems, 0, UndoXidQueueElemsShmSize());
+
+       InitXidQueue(bh, xid_elems);
+
+       bh = binaryheap_allocate_shm("Undo Size Binary Heap",
+                                                                pending_undo_queue_size,
+                                                                undo_size_comparator,
+                                                                NULL);
+       size_elems = (UndoSizeQueue *) ShmemInitStruct("Undo Size Queue Elements",
+                                                                                                  UndoSizeQueueElemsShmSize(),
+                                                                                                  &foundSizeQueue);
+       Assert(foundSizeQueue || !IsUnderPostmaster);
+
+       if (!IsUnderPostmaster)
+               memset(size_elems, 0, UndoSizeQueueElemsShmSize());
+
+       InitSizeQueue(bh, size_elems);
+
+       bh = binaryheap_allocate_shm("Undo Error Binary Heap",
+                                                                pending_undo_queue_size,
+                                                                undo_err_time_comparator,
+                                                                NULL);
+
+       error_elems = (UndoErrorQueue *) ShmemInitStruct("Undo Error Queue Elements",
+                                                                                                        UndoErrorQueueElemsShmSize(),
+                                                                                                        &foundErrorQueue);
+       Assert(foundErrorQueue || !IsUnderPostmaster);
+
+       if (!IsUnderPostmaster)
+               memset(error_elems, 0, UndoErrorQueueElemsShmSize());
+
+       InitErrorQueue(bh, error_elems);
+}
+
+/*
+ * Returns true if there is no pending undo apply work, false otherwise.
+ */
+bool
+UndoWorkerQueuesEmpty(void)
+{
+       if (XidQueueIsEmpty() && SizeQueueIsEmpty())
+               return true;
+
+       return false;
+}
+
+/* Insert the request in both xid and size based queues. */
+void
+InsertRequestIntoUndoQueues(UndoRequestInfo * urinfo)
+{
+       /*
+        * This must be called after acquiring RollbackRequestLock as we will
+        * insert into the binary heaps, which can change.
+        */
+       Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
+       PushXidQueueElem(urinfo);
+       PushSizeQueueElem(urinfo);
+
+       elog(DEBUG1, "Undo action pushed Xid: " UINT64_FORMAT ", Size: " UINT64_FORMAT ", "
+                "Start: " UndoRecPtrFormat ", End: " UndoRecPtrFormat "",
+                U64FromFullTransactionId(urinfo->full_xid), urinfo->request_size,
+                urinfo->start_urec_ptr, urinfo->end_urec_ptr);
+}
+
+/* Insert the request into an error queue. */
+bool
+InsertRequestIntoErrorUndoQueue(volatile UndoRequestInfo * urinfo)
+{
+       RollbackHashEntry *rh;
+
+       LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+       /* We can't insert into an error queue if it is already full. */
+       if (GetErrorQueueSize() >= pending_undo_queue_size)
+       {
+               int                     num_removed = 0;
+
+               /* Try to remove a few dangling elements. */
+               num_removed = RemoveOldElemsFromErrorQueue();
+
+               if (num_removed == 0)
+               {
+                       LWLockRelease(RollbackRequestLock);
+                       return false;
+               }
+       }
+
+       /*
+        * Mark the undo request in the hash table as UNDO_REQUEST_INQUEUE so
+        * that the undo launcher or another undo worker can process this
+        * request.
+        */
+       rh = (RollbackHashEntry *) hash_search(RollbackHT, (void *) &urinfo->full_xid,
+                                                                                  HASH_FIND, NULL);
+       rh->status = UNDO_REQUEST_INQUEUE;
+
+       /* Insert the request into error queue for processing it later. */
+       PushErrorQueueElem(urinfo);
+       LWLockRelease(RollbackRequestLock);
+
+       elog(DEBUG1, "Undo action pushed(error) Xid: " UINT64_FORMAT ", Size: " UINT64_FORMAT ", "
+                "Start: " UndoRecPtrFormat ", End: " UndoRecPtrFormat "",
+                U64FromFullTransactionId(urinfo->full_xid), urinfo->request_size,
+                urinfo->start_urec_ptr, urinfo->end_urec_ptr);
+
+       return true;
+}
+
+/*
+ * Set the undo worker queue from which the undo worker should start looking
+ * for work.
+ */
+void
+SetUndoWorkerQueueStart(UndoWorkerQueueType undo_worker_queue)
+{
+       cur_undo_queue = undo_worker_queue;
+}
+
+/*
+ * Get the next pending rollback request for an undo worker.
+ *
+ * allow_peek - if true, peeks at a few elements from each queue to check
+ * whether any request matches the current dbid.
+ * remove_from_queue - if true, picks an element whose dbid matches the
+ * current dbid and removes it from the queue before returning it to the
+ * caller.
+ * urinfo - this is an OUT parameter that returns the details of the undo
+ * request whose undo actions are still pending.
+ * in_other_db_out - this is an OUT parameter.  If we've not found any work
+ * for the current database, but there is work for some other database, we
+ * set this parameter to true.
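+ *
+ * Sketch of a caller (e.g., an undo worker loop; illustrative only):
+ *
+ *             UndoRequestInfo urinfo;
+ *             bool in_other_db;
+ *
+ *             if (UndoGetWork(false, true, &urinfo, &in_other_db))
+ *                     ... apply the undo actions described by urinfo ...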
+ */
+bool
+UndoGetWork(bool allow_peek, bool remove_from_queue, UndoRequestInfo *urinfo,
+                       bool *in_other_db_out)
+{
+       int                     i;
+       bool            found_work = false;
+       bool            in_other_db = false;
+
+       /* Reset the undo request info */
+       ResetUndoRequestInfo(urinfo);
+
+       /* Search the queues under lock as they can be modified concurrently. */
+       LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+       /* Here, we check each of the work queues in a round-robin way. */
+       for (i = 0; i < MAX_UNDO_WORK_QUEUES; i++)
+       {
+               RollbackHashKey hkey;
+               RollbackHashEntry *rh;
+               int                     cur_queue = (int) (cur_undo_queue % MAX_UNDO_WORK_QUEUES);
+
+               if (!GetRollbackHashKeyFromQueue(cur_queue, 0, &hkey))
+               {
+                       cur_undo_queue++;
+                       continue;
+               }
+
+               rh = (RollbackHashEntry *) hash_search(RollbackHT,
+                                                                                          (void *) &hkey,
+                                                                                          HASH_FIND, NULL);
+
+               /*
+                * If some undo worker is already processing the rollback request or
+                * it is already processed, then we drop that request from the queue
+                * and fetch the next entry from the queue.
+                */
+               if (!rh || UndoRequestIsInProgress(rh))
+               {
+                       RemoveRequestFromQueue(cur_queue, 0);
+                       cur_undo_queue++;
+                       continue;
+               }
+
+               /*
+                * The request that is present in any queue must be a valid request
+                * and its status must be in_queue.
+                */
+               Assert(UndoRequestIsValid(rh));
+               Assert(UndoRequestIsInQueue(rh));
+
+               found_work = true;
+
+               /*
+                * We've found work for some database.  If we don't want to remove
+                * the request, we return from here so that the caller can spawn a
+                * worker process to apply it.
+                */
+               if (!remove_from_queue)
+               {
+                       bool            exists;
+
+                       StartTransactionCommand();
+                       exists = dbid_exists(rh->dbid);
+                       CommitTransactionCommand();
+
+                       /*
+                        * If the database doesn't exist, just remove the request since we
+                        * no longer need to apply the undo actions.
+                        */
+                       if (!exists)
+                       {
+                               RemoveRequestFromQueue(cur_queue, 0);
+                               RollbackHTRemoveEntry(rh->full_xid, rh->start_urec_ptr);
+                               cur_undo_queue++;
+                               continue;
+                       }
+
+                       /* set the undo request info to process */
+                       SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
+
+                       cur_undo_queue++;
+                       LWLockRelease(RollbackRequestLock);
+                       return true;
+               }
+
+               /*
+                * The worker can perform this request if it is either not connected
+                * to any database or the request belongs to the same database to
+                * which it is connected.
+                */
+               if ((MyDatabaseId == InvalidOid) ||
+                       (MyDatabaseId != InvalidOid && MyDatabaseId == rh->dbid))
+               {
+                       /* found work for the current database */
+                       if (in_other_db_out)
+                               *in_other_db_out = false;
+
+                       /*
+                        * Mark the undo request in the hash table as in_progress so that
+                        * no other undo worker picks the same entry for rollback.
+                        */
+                       rh->status = UNDO_REQUEST_INPROGRESS;
+
+                       /* set the undo request info to process */
+                       SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
+
+                       /*
+                        * Remove the request from the queue so that no other undo worker
+                        * processes the same entry.
+                        */
+                       RemoveRequestFromQueue(cur_queue, 0);
+
+                       cur_undo_queue++;
+                       LWLockRelease(RollbackRequestLock);
+                       return true;
+               }
+               else
+                       in_other_db = true;
+
+               cur_undo_queue++;
+       }
+
+       /*
+        * Iff a worker would need to switch databases in less than
+        * undo_worker_quantum ms after starting, it peeks a few entries deep into
+        * each queue to see whether there's work for that database.  This ensures
+        * that one worker doesn't have to restart quickly to switch databases.
+        */
+       if (allow_peek)
+       {
+               int                     depth,
+                                       cur_queue;
+               RollbackHashKey hkey;
+               RollbackHashEntry *rh;
+
+               /*
+                * We shouldn't have come here if we had already found work above for
+                * our database.
+                */
+               Assert(!found_work || in_other_db);
+
+               for (depth = 0; depth < UNDO_PEEK_DEPTH; depth++)
+               {
+                       for (cur_queue = 0; cur_queue < MAX_UNDO_WORK_QUEUES; cur_queue++)
+                       {
+                               if (!GetRollbackHashKeyFromQueue(cur_queue, depth, &hkey))
+                                       continue;
+
+                               rh = (RollbackHashEntry *) hash_search(RollbackHT,
+                                                                                                          (void *) &hkey,
+                                                                                                          HASH_FIND, NULL);
+
+                               /*
+                                * If some undo worker is already processing the rollback
+                                * request or it has already been processed, then fetch the
+                                * next entry from the queue.
+                                */
+                               if (!rh || UndoRequestIsInProgress(rh))
+                                       continue;
+
+                               /*
+                                * A request that is present in any queue must be valid and
+                                * its status must be in_queue.
+                                */
+                               Assert(UndoRequestIsValid(rh));
+                               Assert(UndoRequestIsInQueue(rh));
+
+                               found_work = true;
+
+                               /*
+                                * The worker can perform this request if it is either not
+                                * connected to any database or the request belongs to the
+                                * same database to which it is connected.
+                                */
+                               if ((MyDatabaseId == InvalidOid) ||
+                                       (MyDatabaseId != InvalidOid && MyDatabaseId == rh->dbid))
+                               {
+                                       /* found work for the current database */
+                                       if (in_other_db_out)
+                                               *in_other_db_out = false;
+
+                                       /*
+                                        * Mark the undo request in the hash table as in_progress
+                                        * so that no other undo worker picks the same entry for
+                                        * rollback.
+                                        */
+                                       rh->status = UNDO_REQUEST_INPROGRESS;
+
+                                       /* set the undo request info to process */
+                                       SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
+
+                                       /*
+                                        * Remove the request from the queue so that no other undo
+                                        * worker processes the same entry.
+                                        */
+                                       RemoveRequestFromQueue(cur_queue, depth);
+                                       LWLockRelease(RollbackRequestLock);
+                                       return true;
+                               }
+                               else
+                                       in_other_db = true;
+                       }
+               }
+       }
+
+       LWLockRelease(RollbackRequestLock);
+
+       if (in_other_db_out)
+               *in_other_db_out = in_other_db;
+
+       return found_work;
+}
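
For illustration only, here is a minimal sketch of how an undo worker's main
loop might drive UndoGetWork().  The function name, the exit policy, and the
argument order passed to execute_undo_actions() (end towards start) are
assumptions made for this sketch, not part of the patch:

    #include "access/undorequest.h"

    static void
    undo_worker_body_sketch(void)
    {
        UndoRequestInfo urinfo;
        bool        in_other_db;

        for (;;)
        {
            ResetUndoRequestInfo(&urinfo);

            /* allow_peek = true, is_undo_launcher = false */
            if (!UndoGetWork(true, false, &urinfo, &in_other_db))
                break;          /* no pending request found */

            if (in_other_db)
                break;          /* work exists, but for another database */

            /* apply the actions, assumed to run backwards from end to start */
            execute_undo_actions(urinfo.full_xid, urinfo.end_urec_ptr,
                                 urinfo.start_urec_ptr, true);
        }
    }

When UndoGetWork() returns true with in_other_db set, the pending work belongs
to a database this worker is not connected to, so the launcher rather than
this worker has to act on it.
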
+
+/*
+ * This function registers the rollback requests.
+ *
+ * Returns true if the request is registered and will be processed by an undo
+ * worker at some later point in time; returns false otherwise, in which case
+ * the caller can process the undo request by itself.
+ *
+ * The caller may execute undo actions itself if the request is not already
+ * present in the rollback hash table and can't be pushed to the pending undo
+ * request queues.  The two reasons why a request can't be pushed are (a) the
+ * size of the request is smaller than a threshold and the request is not from
+ * the discard worker, (b) the undo request queues are full.
+ *
+ * It is not advisable to apply the undo actions of a very large transaction
+ * in the foreground as that can lead to a delay in returning control to the
+ * user after an abort.
+ */
+bool
+RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
+                                       Oid dbid, FullTransactionId full_xid)
+{
+       bool            found = false;
+       bool            can_push;
+       bool            pushed = false;
+       RollbackHashEntry *rh;
+       uint64          req_size = 0;
+       UndoRecPtr      last_log_start_urec_ptr = InvalidUndoRecPtr;
+       RollbackHashKey hkey;
+
+       Assert(UndoRecPtrIsValid(start_urec_ptr));
+       Assert(dbid != InvalidOid);
+
+       /*
+        * Find the rollback request size and, in the case of the discard worker,
+        * also the end_urec_ptr.
+        */
+       req_size = FindUndoEndLocationAndSize(start_urec_ptr, &end_urec_ptr,
+                                                                                 &last_log_start_urec_ptr, full_xid);
+
+       /* Do not push any rollback request if working in single-user mode. */
+       if (!IsUnderPostmaster)
+               return false;
+
+       /* The transaction got rolled back. */
+       if (!UndoRecPtrIsValid(end_urec_ptr))
+               return false;
+
+       LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+       /*
+        * Check whether we can push the rollback request to the undo worker. This
+        * must be done under lock, see CanPushReqToUndoWorker.
+        */
+       can_push = CanPushReqToUndoWorker(start_urec_ptr, end_urec_ptr, req_size);
+
+       hkey.full_xid = full_xid;
+       hkey.start_urec_ptr = start_urec_ptr;
+
+       rh = (RollbackHashEntry *) hash_search(RollbackHT, &hkey,
+                                                                                  HASH_ENTER_NULL, &found);
+
+       /*
+        * It can fail only if the value of the pending_undo_queue_size or
+        * max_connections GUC has been reduced after a restart of the server.
+        */
+       if (rh == NULL)
+       {
+               Assert(RollbackHTIsFull());
+
+               ereport(PANIC,
+                               (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                                errmsg("rollback hash table is full, try running with a higher value of pending_undo_queue_size")));
+       }
+
+       /* We shouldn't try to add the same rollback request again. */
+       if (!found)
+       {
+               rh->start_urec_ptr = start_urec_ptr;
+               rh->end_urec_ptr = end_urec_ptr;
+               rh->last_log_start_urec_ptr = last_log_start_urec_ptr;
+               rh->dbid = dbid;
+               rh->full_xid = full_xid;
+
+               /* Increment the pending request counter. */
+               ProcGlobal->xactsHavingPendingUndo++;
+
+               if (can_push)
+               {
+                       UndoRequestInfo urinfo;
+
+                       ResetUndoRequestInfo(&urinfo);
+
+                       urinfo.full_xid = rh->full_xid;
+                       urinfo.start_urec_ptr = rh->start_urec_ptr;
+                       urinfo.end_urec_ptr = rh->end_urec_ptr;
+                       urinfo.last_log_start_urec_ptr = rh->last_log_start_urec_ptr;
+                       urinfo.dbid = rh->dbid;
+                       urinfo.request_size = req_size;
+
+                       InsertRequestIntoUndoQueues(&urinfo);
+
+                       /*
+                        * Indicates that the request will be processed by an undo worker.
+                        */
+                       rh->status = UNDO_REQUEST_INQUEUE;
+                       pushed = true;
+               }
+               /*
+                * The request can't be pushed into the undo worker queues.  The
+                * backend will try executing it by itself.
+                */
+               else
+                       rh->status = UNDO_REQUEST_INPROGRESS;
+       }
+       else if (!UndoRequestIsValid(rh) && can_push)
+       {
+               /*
+                * If we found an existing request that is neither in a queue nor in
+                * progress, then add it to the queues, provided we can push it.
+                */
+               UndoRequestInfo urinfo;
+
+               ResetUndoRequestInfo(&urinfo);
+
+               urinfo.full_xid = rh->full_xid;
+               urinfo.start_urec_ptr = rh->start_urec_ptr;
+               urinfo.end_urec_ptr = rh->end_urec_ptr;
+               urinfo.last_log_start_urec_ptr = rh->last_log_start_urec_ptr;
+               urinfo.dbid = rh->dbid;
+               urinfo.request_size = req_size;
+
+               InsertRequestIntoUndoQueues(&urinfo);
+
+               /* Indicates that the request will be processed by an undo worker. */
+               rh->status = UNDO_REQUEST_INQUEUE;
+               pushed = true;
+       }
+
+       LWLockRelease(RollbackRequestLock);
+
+       return pushed;
+}
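
For illustration only, one plausible shape of the abort path that consumes the
return value of RegisterRollbackReq().  The wrapper name, the nopartial flag,
the execute_undo_actions() argument order, and the trailing
RollbackHTRemoveEntry() cleanup are assumptions made for this sketch:

    #include "access/undorequest.h"

    static void
    perform_or_push_undo_sketch(FullTransactionId fxid, Oid dbid,
                                UndoRecPtr start_urec_ptr,
                                UndoRecPtr end_urec_ptr)
    {
        /* Hand the request over to the undo worker machinery if possible. */
        if (RegisterRollbackReq(end_urec_ptr, start_urec_ptr, dbid, fxid))
            return;             /* an undo worker will apply it later */

        /* Small request (or queues full): apply the actions ourselves ... */
        execute_undo_actions(fxid, end_urec_ptr, start_urec_ptr, true);

        /* ... and drop the pending-undo entry once they are applied. */
        RollbackHTRemoveEntry(fxid, start_urec_ptr);
    }

The split matches the rationale in the function's header comment: large
requests are pushed to the background so that control returns to the user
quickly, while small ones are applied in the foreground.
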
+
+/*
+ * Remove the rollback request entry from the rollback hash table.
+ */
+void
+RollbackHTRemoveEntry(FullTransactionId full_xid, UndoRecPtr start_urec_ptr)
+{
+       RollbackHashKey hkey;
+
+       hkey.full_xid = full_xid;
+       hkey.start_urec_ptr = start_urec_ptr;
+
+       LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+       hash_search(RollbackHT, &hkey, HASH_REMOVE, NULL);
+
+       /* Decrement the pending request counter. */
+       ProcGlobal->xactsHavingPendingUndo--;
+       LWLockRelease(RollbackRequestLock);
+}
+
+/*
+ * Mark the entry status as invalid in the rollback hash table.
+ */
+void
+RollbackHTMarkEntryInvalid(FullTransactionId full_xid,
+                                                  UndoRecPtr start_urec_ptr)
+{
+       RollbackHashKey hkey;
+       RollbackHashEntry *rh;
+
+       hkey.full_xid = full_xid;
+       hkey.start_urec_ptr = start_urec_ptr;
+
+       LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+       rh = (RollbackHashEntry *) hash_search(RollbackHT, &hkey, HASH_FIND, NULL);
+       Assert(rh != NULL);
+       rh->status = UNDO_REQUEST_INVALID;
+
+       LWLockRelease(RollbackRequestLock);
+}
+
+/*
+ * Returns the start undo record pointer of the last undo log that the
+ * transaction spans.  This will be different from start_urec_ptr only when
+ * the undo of a transaction spans multiple undo logs.
+ */
+UndoRecPtr
+RollbackHTGetLastLogStartUrp(FullTransactionId full_xid,
+                                                        UndoRecPtr start_urec_ptr)
+{
+       RollbackHashKey hkey;
+       RollbackHashEntry *rh;
+       UndoRecPtr      last_log_start_urecptr;
+
+       hkey.full_xid = full_xid;
+       hkey.start_urec_ptr = start_urec_ptr;
+
+       LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
+
+       rh = (RollbackHashEntry *) hash_search(RollbackHT, &hkey, HASH_FIND, NULL);
+       Assert(rh != NULL);
+       last_log_start_urecptr = rh->last_log_start_urec_ptr;
+       LWLockRelease(RollbackRequestLock);
+
+       return last_log_start_urecptr;
+}
+
+/*
+ * Returns true if the rollback hash table is full, false otherwise.
+ */
+bool
+RollbackHTIsFull(void)
+{
+       bool            result = false;
+
+       LWLockAcquire(RollbackRequestLock, LW_SHARED);
+
+       if (hash_get_num_entries(RollbackHT) >= UndoRollbackHashTableSize())
+               result = true;
+
+       LWLockRelease(RollbackRequestLock);
+
+       return result;
+}
+
+/*
+ * Return the smaller of the oldest xid having pending undo and 'oldestXmin'.
+ */
+FullTransactionId
+RollbackHTGetOldestFullXid(FullTransactionId oldestXmin)
+{
+       RollbackHashEntry   *rh;
+       FullTransactionId       oldestXid = oldestXmin;
+       HASH_SEQ_STATUS         status;
+
+       /* Fetch the pending undo requests */
+       LWLockAcquire(RollbackRequestLock, LW_SHARED);
+
+       Assert(hash_get_num_entries(RollbackHT) <= UndoRollbackHashTableSize());
+       hash_seq_init(&status, RollbackHT);
+       while (RollbackHT != NULL &&
+                  (rh = (RollbackHashEntry *) hash_seq_search(&status)) != NULL)
+       {
+               if (!FullTransactionIdIsValid(oldestXid) ||
+                       FullTransactionIdPrecedes(rh->full_xid, oldestXid))
+                       oldestXid = rh->full_xid;
+       }
+
+       LWLockRelease(RollbackRequestLock);
+
+       return oldestXid;
+}
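
As a usage note, a horizon computation (for example in the discard worker) can
clamp oldestXmin by the aborted transactions that still have unapplied undo
with a single call; the helper below is only a sketch:

    #include "access/undorequest.h"

    static FullTransactionId
    undo_aware_horizon_sketch(FullTransactionId oldestXmin)
    {
        /* Never returns a value newer than oldestXmin. */
        return RollbackHTGetOldestFullXid(oldestXmin);
    }
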
index 4b42a1cf0ba16e75d1ad189433ee9679d809ca51..19e4f1f0c4add678c1bf8414dd7b2dda4d4a3212 100644 (file)
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock                                42
 LogicalRepWorkerLock                           43
 CLogTruncationLock                                     44
 UndoLogLock                                      45
+RollbackRequestLock                                    46
index 498373fd0ee1b4b40f3abeeffd149bfa1ddaa1dc..884fa2af52f50d4815681c0977a22e03099420f8 100644 (file)
@@ -296,6 +296,8 @@ InitProcGlobal(void)
        /* Create ProcStructLock spinlock, too */
        ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
        SpinLockInit(ProcStructLock);
+
+       ProcGlobal->xactsHavingPendingUndo = 0;
 }
 
 /*
index 5d9af8903675d3eebd285c240f82d2e569f08348..ede9c510ea0582ebbd65e353bbd5d7617bbb9bc2 100644 (file)
@@ -1085,6 +1085,20 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
                CommitTransactionCommand();
 }
 
+/*
+ * Check whether a database with the given OID exists.
+ */
+bool
+dbid_exists(Oid dboid)
+{
+       bool            result = false;
+
+       Assert(IsTransactionState());
+       result = (GetDatabaseTupleByOid(dboid) != NULL);
+
+       return result;
+}
+
 /*
  * Process any command-line switches and any additional GUC variable
  * settings passed in the startup packet.
index 296fb771668e5ece82827c6d869e8857d6eab115..a7d1db52493ea71ffe09874155f992f2a7819a43 100644 (file)
@@ -32,6 +32,7 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/undorequest.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/namespace.h"
@@ -3040,6 +3041,27 @@ static struct config_int ConfigureNamesInt[] =
                check_autovacuum_work_mem, NULL, NULL
        },
 
+       {
+               {"rollback_overflow_size", PGC_USERSET, RESOURCES_MEM,
+                       gettext_noop("Rollbacks greater than this size are performed lazily by undo workers."),
+                       NULL,
+                       GUC_UNIT_MB
+               },
+               &rollback_overflow_size,
+               64, 0, MAX_KILOBYTES,
+               NULL, NULL, NULL
+       },
+
+       {
+               {"pending_undo_queue_size", PGC_POSTMASTER, RESOURCES_MEM,
+                       gettext_noop("Sets the size of the queue used to register undo requests."),
+                       NULL
+               },
+               &pending_undo_queue_size,
+               1024, 0, INT_MAX,
+               NULL, NULL, NULL
+       },
+
        {
                {"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
                        gettext_noop("Time before a snapshot is too old to read pages changed after the snapshot was taken."),
index cfad86c02a070264dde3dc54634c90c1f021ccd9..592f6e1b4aa0fb73757ce486d065081dca6bbf52 100644 (file)
                                        # autovacuum, -1 means use
                                        # vacuum_cost_limit
 
+#------------------------------------------------------------------------------
+# UNDO options
+#------------------------------------------------------------------------------
+
+#rollback_overflow_size = 64   # size (MB) above which undo requests
+                                       # are pushed to undo workers
+#pending_undo_queue_size = 1024        # size of the queue used to register
+                                       # undo requests
+                                       # (change requires restart)
 
 #------------------------------------------------------------------------------
 # CLIENT CONNECTION DEFAULTS
index cc005096991f428220eda16210998b8ebac37301..01f248a41eb73ade450664a933a5549313648390 100644 (file)
@@ -49,6 +49,7 @@
 #define U64FromFullTransactionId(x)            ((x).value)
 #define FullTransactionIdEquals(a, b)  ((a).value == (b).value)
 #define FullTransactionIdPrecedes(a, b)        ((a).value < (b).value)
+#define FullTransactionIdFollows(a, b) ((a).value > (b).value)
 #define FullTransactionIdIsValid(x)            TransactionIdIsValid(XidFromFullTransactionId(x))
 #define InvalidFullTransactionId               FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
 
index f7cfa9f6becc38926dbdaf1864f637b1233994c6..24ea97b8d3888d6d34fa62ce6803e9de9f1838fc 100644 (file)
@@ -117,5 +117,8 @@ extern void UndoLogBuffersSetLSN(UndoRecordInsertContext *context,
                                                                 XLogRecPtr recptr);
 extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
                                                                                UndoLogCategory category);
+extern UndoRecPtr UndoBlockGetFirstUndoRecord(BlockNumber blkno,
+                                                                                         UndoRecPtr urec_ptr,
+                                                                                         UndoLogCategory category);
 
 #endif                                                 /* UNDOINSERT_H */
diff --git a/src/include/access/undorequest.h b/src/include/access/undorequest.h
new file mode 100644 (file)
index 0000000..defc810
--- /dev/null
@@ -0,0 +1,230 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorequest.h
+ *       Exports from undo/undorequest.c.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ *
+ * src/include/access/undorequest.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _UNDOREQUEST_H
+#define _UNDOREQUEST_H
+
+#include "access/transam.h"
+#include "access/undoaccess.h"
+#include "datatype/timestamp.h"
+#include "utils/relcache.h"
+
+
+/* Different types of undo worker queues. */
+typedef enum
+{
+       XID_QUEUE = 0,
+       SIZE_QUEUE = 1,
+       ERROR_QUEUE
+} UndoWorkerQueueType;
+
+#define InvalidUndoWorkerQueue -1
+
+extern PGDLLIMPORT int rollback_overflow_size;
+extern PGDLLIMPORT int pending_undo_queue_size;
+
+/*
+ * Current status of the undo request in the hash table.
+ */
+typedef enum
+{
+       /*
+        * Request is present in the rollback hash table, but not present in any
+        * of the queues.  In this state, the undo actions can't be executed.
+        *
+        * The request will be marked with this status if (a) the discard worker
+        * finds that there is no space in the undo worker queues for inserting
+        * the undo request, or (b) there is an error while a backend or undo
+        * worker is executing undo actions and there is no space in the error
+        * queue.
+        *
+        * Later, when the discard worker finds such an entry and there is
+        * sufficient space in the undo worker queues, the request will be added
+        * to them and the status will be changed to UNDO_REQUEST_INQUEUE.
+        *
+        * It is important to keep the request in the hash table with this
+        * status instead of removing it, so that the corresponding xid is still
+        * considered when computing oldestXidHavingUnappliedUndo.
+        */
+       UNDO_REQUEST_INVALID,
+
+       /*
+        * When a backend or the discard worker pushes the request to the undo
+        * worker queues, the status will be set to this.  Undo workers pull such
+        * requests from the queues, change the state to UNDO_REQUEST_INPROGRESS,
+        * and process the undo actions.
+        */
+       UNDO_REQUEST_INQUEUE,
+
+       /*
+        * Undo action execution is in progress, either by a backend or by an
+        * undo worker.
+        */
+       UNDO_REQUEST_INPROGRESS
+} UndoRequestStatus;
+
+/*
+ * UndoRequestIsValid
+ *             True iff undo request status is not invalid.
+ */
+#define UndoRequestIsValid(rh) \
+       ((bool) ((rh->status) != UNDO_REQUEST_INVALID))
+
+/*
+ * UndoRequestIsInProgress
+ *             True iff undo request status is in progress.
+ */
+#define UndoRequestIsInProgress(rh) \
+       ((bool) ((rh->status) == UNDO_REQUEST_INPROGRESS))
+
+/*
+ * UndoRequestIsInQueue
+ *             True iff undo request status is in queue.
+ */
+#define UndoRequestIsInQueue(rh) \
+       ((bool) ((rh->status) == UNDO_REQUEST_INQUEUE))
+
+/* This is the data structure for each hash table entry for rollbacks. */
+typedef struct RollbackHashEntry
+{
+       FullTransactionId full_xid; /* must be first entry */
+       UndoRecPtr      start_urec_ptr;
+       UndoRecPtr      end_urec_ptr;
+       UndoRecPtr      last_log_start_urec_ptr;
+       Oid                     dbid;
+       UndoRequestStatus       status; /* current state of the entry. */
+} RollbackHashEntry;
+
+/*
+ * This is the data structure for each hash table key for rollbacks.  We need
+ * to keep start_urec_ptr as a key element because in the same transaction,
+ * there could be rollback requests for both logged and unlogged relations.
+ */
+typedef struct RollbackHashKey
+{
+       FullTransactionId full_xid;
+       UndoRecPtr      start_urec_ptr;
+} RollbackHashKey;
+
+/* This is an entry for the undo request queue that is sorted by xid. */
+typedef struct UndoXidQueue
+{
+       FullTransactionId full_xid;
+       UndoRecPtr      start_urec_ptr;
+       Oid                     dbid;
+} UndoXidQueue;
+
+/* This is an entry for the undo request queue that is sorted by size. */
+typedef struct UndoSizeQueue
+{
+       FullTransactionId full_xid;
+       UndoRecPtr      start_urec_ptr;
+       Oid                     dbid;
+       uint64          request_size;
+} UndoSizeQueue;
+
+/*
+ * This is an entry for the undo request queue that is sorted by the time at
+ * which an error occurred.
+ */
+typedef struct UndoErrorQueue
+{
+       FullTransactionId full_xid;
+       UndoRecPtr      start_urec_ptr;
+       Oid                     dbid;
+       TimestampTz next_retry_at;
+       TimestampTz err_occurred_at;
+} UndoErrorQueue;
+
+/* undo request information */
+typedef struct UndoRequestInfo
+{
+       FullTransactionId full_xid;
+       UndoRecPtr      start_urec_ptr;
+       UndoRecPtr      end_urec_ptr;
+       UndoRecPtr      last_log_start_urec_ptr;
+       Oid                     dbid;
+       uint64          request_size;
+       UndoWorkerQueueType undo_worker_queue;
+} UndoRequestInfo;
+
+/* Reset the undo request info */
+#define ResetUndoRequestInfo(urinfo) \
+( \
+       (urinfo)->full_xid = InvalidFullTransactionId, \
+       (urinfo)->start_urec_ptr = InvalidUndoRecPtr, \
+       (urinfo)->end_urec_ptr = InvalidUndoRecPtr, \
+       (urinfo)->last_log_start_urec_ptr = InvalidUndoRecPtr, \
+       (urinfo)->dbid = InvalidOid, \
+       (urinfo)->request_size = 0, \
+       (urinfo)->undo_worker_queue = InvalidUndoWorkerQueue \
+)
+
+/* set the undo request info from the rollback request */
+#define SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue) \
+( \
+       urinfo->full_xid = rh->full_xid, \
+       urinfo->start_urec_ptr = rh->start_urec_ptr, \
+       urinfo->end_urec_ptr = rh->end_urec_ptr, \
+       urinfo->last_log_start_urec_ptr = rh->last_log_start_urec_ptr, \
+       urinfo->dbid = rh->dbid, \
+       urinfo->undo_worker_queue = cur_queue \
+)
+
+/*
+ * If all the undo actions of a particular transaction have been applied in an
+ * undo log, we set the uur_progress of the transaction's header in that undo
+ * log to MaxBlockNumber.  If none of the undo actions have been applied yet,
+ * it is set to InvalidBlockNumber.
+ */
+#define XACT_APPLY_PROGRESS_COMPLETED MaxBlockNumber
+#define XACT_APPLY_PROGRESS_NOT_STARTED InvalidBlockNumber
+
+#define IsXactApplyProgressCompleted(uur_progress) \
+       (uur_progress == XACT_APPLY_PROGRESS_COMPLETED)
+
+#define IsXactApplyProgressNotStarted(uur_progress) \
+       (uur_progress == XACT_APPLY_PROGRESS_NOT_STARTED)
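
For illustration only, a helper showing how a caller walking undo records
might consult these markers.  The record type UnpackedUndoRecord is assumed to
match the one used by PrepareUndoInsert() in undoaccess.c, which initializes
urec_progress to XACT_APPLY_PROGRESS_NOT_STARTED:

    #include "access/undorecord.h"
    #include "access/undorequest.h"

    static inline bool
    xact_undo_fully_applied_sketch(UnpackedUndoRecord *urec)
    {
        /* uur_txn is assumed to be set only on a transaction's first record. */
        return urec->uur_txn != NULL &&
               IsXactApplyProgressCompleted(urec->uur_txn->urec_progress);
    }
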
+
+/* Exposed functions for rollback request queues. */
+extern int     PendingUndoShmemSize(void);
+extern void PendingUndoShmemInit(void);
+extern bool UndoWorkerQueuesEmpty(void);
+extern void InsertRequestIntoUndoQueues(UndoRequestInfo *urinfo);
+extern bool InsertRequestIntoErrorUndoQueue(volatile UndoRequestInfo *urinfo);
+extern void SetUndoWorkerQueueStart(UndoWorkerQueueType undo_worker_queue);
+extern bool UndoGetWork(bool allow_peek, bool is_undo_launcher,
+                       UndoRequestInfo *urinfo, bool *in_other_db);
+
+/* Exposed functions for rollback hash table. */
+extern uint64 FindUndoEndLocationAndSize(UndoRecPtr start_urecptr,
+                                                       UndoRecPtr *end_urecptr_out,
+                                                       UndoRecPtr *last_log_start_urecptr_out,
+                                                       FullTransactionId full_xid);
+extern bool RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
+                                       Oid dbid, FullTransactionId full_xid);
+extern void RollbackHTRemoveEntry(FullTransactionId full_xid, UndoRecPtr start_urec_ptr);
+extern bool RollbackHTIsFull(void);
+extern int UndoRollbackHashTableSize(void);
+extern void RollbackHTMarkEntryInvalid(FullTransactionId full_xid,
+                                                               UndoRecPtr start_urec_ptr);
+extern UndoRecPtr RollbackHTGetLastLogStartUrp(FullTransactionId full_xid,
+                                                                                          UndoRecPtr start_urec_ptr);
+extern FullTransactionId RollbackHTGetOldestFullXid(FullTransactionId oldestXmin);
+
+/* functions exposed from undoaction.c */
+extern void execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr,
+                                        UndoRecPtr to_urecptr, bool nopartial);
+extern bool execute_undo_actions_page(UndoRecInfo *urp_array, int first_idx,
+                                                 int last_idx, Oid reloid, FullTransactionId full_xid,
+                                                 BlockNumber blkno, bool blk_chain_complete);
+
+#endif                                                 /* _UNDOREQUEST_H */
index 61a24c2e3c67722a6eea5eca570b6c9ad6e40a08..1afc4d3b5d39ed256df68259ce946de4fc3049b3 100644 (file)
@@ -426,6 +426,7 @@ extern void pg_split_opts(char **argv, int *argcp, const char *optstr);
 extern void InitializeMaxBackends(void);
 extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
                                                 Oid useroid, char *out_dbname, bool override_allow_connections);
+extern bool dbid_exists(Oid dboid);
 extern void BaseInit(void);
 
 /* in utils/init/miscinit.c */
index ac7ee72952e18ca04e5b36380adc5e0d1f355136..824f6bf232fbb6b702de8ae3009edb5d5da8d5d3 100644 (file)
@@ -272,6 +272,8 @@ typedef struct PROC_HDR
        int                     startupProcPid;
        /* Buffer id of the buffer that Startup process waits for pin on, or -1 */
        int                     startupBufferPinWaitBufId;
+       /* Number of aborted transactions with pending undo actions. */
+       int                     xactsHavingPendingUndo;
 } PROC_HDR;
 
 extern PGDLLIMPORT PROC_HDR *ProcGlobal;