Allow execution and discard of undo by background workers.
authorAmit Kapila <[email protected]>
Thu, 13 Jun 2019 10:33:49 +0000 (16:03 +0530)
committerKuntal Ghosh <[email protected]>
Fri, 19 Jul 2019 08:48:51 +0000 (14:18 +0530)
Undo launcher is responsible for launching the workers iff there is some
work available in one of the work queues and there are more workers
available.  The worker is launched to handle requests for a particular
database.

The discard worker is responsible for discarding the undo log of
transactions that are committed and all-visible or are rolled-back.  It
also registers the request for aborted transactions in the work queues.
It iterates through all the active logs one-by-one and tries to discard the
transactions that are old enough to matter.

We don't allow any transaction older than 2^31 to have pending undo actions.
Also, we have a hard limit on the number of transactions that can have
pending undo which is proportional to pending_undo_queue_size.

Amit Kapila, Dilip Kumar and Kuntal Ghosh with inputs from Andres Freund,
Robert Haas and Thomas Munro.

34 files changed:
src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/transam/varsup.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/access/undo/Makefile
src/backend/access/undo/README.UndoProcessing
src/backend/access/undo/discardworker.c [new file with mode: 0644]
src/backend/access/undo/undoaccess.c
src/backend/access/undo/undodiscard.c [new file with mode: 0644]
src/backend/access/undo/undolog.c
src/backend/access/undo/undorequest.c
src/backend/access/undo/undoworker.c [new file with mode: 0644]
src/backend/commands/tablecmds.c
src/backend/postmaster/bgworker.c
src/backend/postmaster/pgstat.c
src/backend/postmaster/postmaster.c
src/backend/storage/ipc/ipci.c
src/backend/storage/lmgr/lwlocknames.txt
src/backend/storage/lmgr/proc.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/access/discardworker.h [new file with mode: 0644]
src/include/access/transam.h
src/include/access/undodiscard.h [new file with mode: 0644]
src/include/access/undolog.h
src/include/access/undoworker.h [new file with mode: 0644]
src/include/catalog/pg_control.h
src/include/nodes/primnodes.h
src/include/pgstat.h
src/include/postmaster/postmaster.h
src/include/storage/lwlock.h
src/include/storage/proc.h
src/include/storage/procarray.h
src/test/regress/expected/sysviews.out

index 33060f30429b0a7b700b848e41114d15679d46e4..4b00d7d83f7a12fd5b7e031f9d5ad42e8c87f543 100644 (file)
@@ -48,7 +48,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
                                                 "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; "
                                                 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
                                                 "oldest/newest commit timestamp xid: %u/%u; "
-                                                "oldest running xid %u; %s",
+                                                "oldest running xid %u; "
+                                                "oldest full xid having unapplied undo " UINT64_FORMAT "; %s",
                                                 (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
                                                 checkpoint->ThisTimeLineID,
                                                 checkpoint->PrevTimeLineID,
@@ -65,6 +66,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
                                                 checkpoint->oldestCommitTsXid,
                                                 checkpoint->newestCommitTsXid,
                                                 checkpoint->oldestActiveXid,
+                                                U64FromFullTransactionId(checkpoint->oldestFullXidHavingUnappliedUndo),
                                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
        }
        else if (info == XLOG_NEXTOID)
index fd019893020b3193057618504232eee041d900e6..e74155d1627fbec110c2520220412cfa0bef66a7 100644 (file)
@@ -127,14 +127,16 @@ GetNewTransactionId(bool isSubXact)
                                                 errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                                                                oldest_datname),
                                                 errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
-                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+                                                                "increase max_undo_workers to allow execution of pending undo.")));
                        else
                                ereport(ERROR,
                                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                                                 errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
                                                                oldest_datoid),
                                                 errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
-                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+                                                                "increase max_undo_workers to allow execution of pending undo.")));
                }
                else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
                {
@@ -147,14 +149,16 @@ GetNewTransactionId(bool isSubXact)
                                                                oldest_datname,
                                                                xidWrapLimit - xid),
                                                 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
-                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+                                                                "increase max_undo_workers to allow execution of pending undo.")));
                        else
                                ereport(WARNING,
                                                (errmsg("database with OID %u must be vacuumed within %u transactions",
                                                                oldest_datoid,
                                                                xidWrapLimit - xid),
                                                 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
-                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+                                                                "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+                                                                "increase max_undo_workers to allow execution of pending undo.")));
                }
 
                /* Re-acquire lock and start over */
@@ -334,9 +338,23 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
        TransactionId xidStopLimit;
        TransactionId xidWrapLimit;
        TransactionId curXid;
+       TransactionId oldestXidHavingUndo;
+       FullTransactionId oldestFullXidHavingUndo;
 
        Assert(TransactionIdIsNormal(oldest_datfrozenxid));
 
+       /*
+        * To determine the last safe xid that can be allocated, we need to
+        * consider oldestXidHavingUnapplied Undo because this is the oldest xid
+        * whose undo is not yet discarded so this is still a valid xid in the
+        * system.
+        */
+       oldestFullXidHavingUndo =
+               FullTransactionIdFromU64(pg_atomic_read_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo));
+       oldestXidHavingUndo = XidFromFullTransactionId(oldestFullXidHavingUndo);
+       if (TransactionIdIsValid(oldestXidHavingUndo))
+               oldest_datfrozenxid = Min(oldest_datfrozenxid, oldestXidHavingUndo);
+
        /*
         * The place where we actually get into deep trouble is halfway around
         * from the oldest potentially-existing XID.  (This calculation is
@@ -433,6 +451,9 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
                 * Note: it's also possible that get_database_name fails and returns
                 * NULL, for example because the database just got dropped.  We'll
                 * still warn, even though the warning might now be unnecessary.
+                *
+                * XXX Can we easily distinguish that the problem is due to unapplied
+                * undo or some old open transactions?
                 */
                if (IsTransactionState())
                        oldest_datname = get_database_name(oldest_datoid);
@@ -445,14 +466,16 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
                                                        oldest_datname,
                                                        xidWrapLimit - curXid),
                                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
-                                                        "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+                                                        "You might also need to commit or roll back old prepared transactions, or drop stale replication slots, or\n"
+                                                        "increase max_undo_workers to allow execution of pending undo.")));
                else
                        ereport(WARNING,
                                        (errmsg("database with OID %u must be vacuumed within %u transactions",
                                                        oldest_datoid,
                                                        xidWrapLimit - curXid),
                                         errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
-                                                        "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+                                                        "You might also need to commit or roll back old prepared transactions, or drop stale replication slots, or\n"
+                                                        "increase max_undo_workers to allow execution of pending undo.")));
        }
 }
 
index d760796475ff1be211b92f42df9c6fda20ac527d..91ad62cde1d964c791c49b0250bf49ad1080b1ee 100644 (file)
@@ -26,6 +26,7 @@
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/undodiscard.h"
 #include "access/undorequest.h"
 #include "access/xact.h"
 #include "access/xlog.h"
@@ -2274,6 +2275,10 @@ CommitTransaction(void)
        AtEOXact_ApplyLauncher(true);
        pgstat_report_xact_timestamp(0);
 
+       /* In single user mode, discard all the undo logs, once committed. */
+       if (!IsUnderPostmaster)
+               UndoLogDiscardAll();
+
        CurrentResourceOwner = NULL;
        ResourceOwnerDelete(TopTransactionResourceOwner);
        s->curTransactionOwner = NULL;
index 5dbe485af238b050300f1cadf430dbec82b4016d..99e4322e9b8b5c318b9d2fd680cb2a9dc679282c 100644 (file)
@@ -5159,6 +5159,7 @@ BootStrapXLOG(void)
        checkPoint.newestCommitTsXid = InvalidTransactionId;
        checkPoint.time = (pg_time_t) time(NULL);
        checkPoint.oldestActiveXid = InvalidTransactionId;
+       checkPoint.oldestFullXidHavingUnappliedUndo = InvalidFullTransactionId;
 
        ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
        ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -6622,6 +6623,9 @@ StartupXLOG(void)
                        (errmsg_internal("commit timestamp Xid oldest/newest: %u/%u",
                                                         checkPoint.oldestCommitTsXid,
                                                         checkPoint.newestCommitTsXid)));
+       ereport(DEBUG1,
+                       (errmsg_internal("oldest xid with epoch having undo: " UINT64_FORMAT,
+                                                        U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo))));
        if (!TransactionIdIsNormal(XidFromFullTransactionId(checkPoint.nextFullXid)))
                ereport(PANIC,
                                (errmsg("invalid next transaction ID")));
@@ -6638,6 +6642,10 @@ StartupXLOG(void)
                                         checkPoint.newestCommitTsXid);
        XLogCtl->ckptFullXid = checkPoint.nextFullXid;
 
+       /* Read oldest xid having undo from checkpoint and set in proc global. */
+       pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+               U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+
        /*
         * Initialize replication slots, before there's a chance to remove
         * required resources.
@@ -7326,7 +7334,13 @@ StartupXLOG(void)
         * end-of-recovery steps fail.
         */
        if (InRecovery)
+       {
                ResetUnloggedRelations(UNLOGGED_RELATION_INIT);
+               ResetUndoLogs(UNDO_UNLOGGED);
+       }
+
+       /* Always reset temporary undo logs. */
+       ResetUndoLogs(UNDO_TEMP);
 
        /*
         * We don't need the latch anymore. It's not strictly necessary to disown
@@ -8723,6 +8737,10 @@ CreateCheckPoint(int flags)
                checkPoint.nextOid += ShmemVariableCache->oidCount;
        LWLockRelease(OidGenLock);
 
+       checkPoint.oldestFullXidHavingUnappliedUndo =
+               FullTransactionIdFromU64(pg_atomic_read_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo));
+
+
        MultiXactGetCheckptMulti(shutdown,
                                                         &checkPoint.nextMulti,
                                                         &checkPoint.nextMultiOffset,
@@ -9635,6 +9653,9 @@ xlog_redo(XLogReaderState *record)
                MultiXactAdvanceOldest(checkPoint.oldestMulti,
                                                           checkPoint.oldestMultiDB);
 
+               pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+                       U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+
                /*
                 * No need to set oldestClogXid here as well; it'll be set when we
                 * redo an xl_clog_truncate if it changed since initialization.
@@ -9692,12 +9713,17 @@ xlog_redo(XLogReaderState *record)
 
                /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
                ControlFile->checkPointCopy.nextFullXid = checkPoint.nextFullXid;
+               ControlFile->checkPointCopy.oldestFullXidHavingUnappliedUndo =
+                       checkPoint.oldestFullXidHavingUnappliedUndo;
 
                /* Update shared-memory copy of checkpoint XID/epoch */
                SpinLockAcquire(&XLogCtl->info_lck);
                XLogCtl->ckptFullXid = checkPoint.nextFullXid;
                SpinLockRelease(&XLogCtl->info_lck);
 
+               ControlFile->checkPointCopy.oldestFullXidHavingUnappliedUndo =
+                       checkPoint.oldestFullXidHavingUnappliedUndo;
+
                /*
                 * We should've already switched to the new TLI before replaying this
                 * record.
@@ -9737,6 +9763,9 @@ xlog_redo(XLogReaderState *record)
                MultiXactAdvanceNextMXact(checkPoint.nextMulti,
                                                                  checkPoint.nextMultiOffset);
 
+               pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+                       U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+
                /*
                 * NB: This may perform multixact truncation when replaying WAL
                 * generated by an older primary.
index 68696bc81a8402f5d1ece9b0f466a35be74b77bf..b4e7bab7d401f72cd1e080fd6d7818193d34a25b 100644 (file)
@@ -12,7 +12,7 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undoaccess.o undoaction.o undoactionxlog.o undolog.o undorecord.o \
-          undorequest.o
+OBJS = discardworker.o undoaccess.o undoaction.o undoactionxlog.o undodiscard.o \
+          undolog.o undorecord.o undorequest.o undoworker.o
 
 include $(top_srcdir)/src/backend/common.mk
index e0caf9efeb629bd915fb163944ca325cd94fbaaa..b7817cf71206e9ad26b6648034f7817c66541c93 100644 (file)
@@ -37,3 +37,81 @@ tables are only accessible in the backend that has created them.  We can't
 postpone applying undo actions for subtransactions as the modifications
 made by aborted subtransaction must not be visible even if the main transaction
 commits.
+
+Undo Requests and Undo workers
+-------------------------------
+To improve the efficiency of the rollbacks, we create three queues and a hash
+table for the rollback requests.  A Xid based priority queue will allow us to
+process the requests of older transactions and help us to move
+oldesdXidHavingUnappliedUndo (this is a xid-horizon below which all the
+transactions are visible) forward.  A size-based queue which will help us to
+perform the rollbacks of larger aborts in a timely fashion, so that we don't get
+stuck while processing them during discard of the logs.  An error queue to hold
+the requests for transactions that failed to apply its undo.  The rollback hash
+table is used to avoid duplicate undo requests by backends and discard worker.
+The table must be able to accommodate all active undo requests.  The undo
+requests must appear in both xid and size requests queues or neither.  As of now,
+we process the requests from these queues in a round-robin fashion to give equal
+priority to all three types of requests.
+
+Note that, if the request queues are full, then we put backpressure on backends
+to complete the requests by themselves.  There is an exception to it where when
+error queue becomes full, we just mark the request as 'invalid' and continue to
+process other requests if any.  The discard worker will find this errored
+transaction at later point of time and again add it to the request queues.
+
+We have the hard limit (proportional to the size of the rollback hash table)
+for the number of transactions that can have pending undo.  This can help us
+in computing the value of oldestXidHavingUnappliedUndo and allowing us not to
+accumulate pending undo for a long time which will eventually block the
+discard of undo.
+
+In a running system, scanning the rollback hash table will give us the value of
+oldestXidHavingUnappliedUndo, however, after startup, we need to once scan all
+the undo logs and populate the rollback hash table.  After startup, we allow
+connections, but don't allow transactions that want to write undo till the
+rollback hash table is initialized.
+
+To process the request, we get the request from one of the queues, search it in
+hash table and mark it as in-progress and then remove from the respective queue.
+After that, we perform the request which means apply the undo actions and
+remove it from the hash table.
+
+To apply the undo actions, we collect the undo records in bulk and try to
+process them together.  We ensure to update the transaction's progress at
+regular intervals so that after a crash we can skip already applied undo.  The
+undo apply progress is updated in terms of the number of blocks processed.
+Undo apply progress value XACT_APPLY_PROGRESS_COMPLETED indicates that all the
+undo is applied, XACT_APPLY_PROGRESS_NOT_STARTED indicates that no undo action
+has been applied yet and any other value indicates that we have applied undo
+partially and after crash recovery, we need to start processing the undo from
+the same location.
+
+Undo launcher is responsible for launching the workers iff there is some work
+available in one of the work queues and there are more workers available.  The
+worker is launched to handle requests for a particular database.  Each undo
+worker then start reading from one of the queues the requests for that
+particular database.  A worker would peek into each queue for the requests from
+a particular database, if it needs to switch a database in less than
+undo_worker_quantum ms (10s as default) after starting.  Also, if there is no
+work, it lingers for UNDO_WORKER_LINGER_MS (10s as default).  This avoids
+restarting the workers too frequently.
+
+Discard Worker
+---------------
+The discard worker is responsible for discarding the undo log of transactions
+that are committed and all-visible or are rolled-back.  It also registers the
+request for aborted transactions in the work queues.  It iterates through all
+the active logs one-by-one and try to discard the transactions that are old
+enough to matter.
+
+For transactions that span across multiple logs, the log for committed and
+all-visible transactions are discarded separately for each log.  This is
+possible as the transactions that span across logs have separate transaction
+header for each log.  For aborted transactions, we try to process the actions
+of the entire transaction at one-shot as we need to perform the actions
+starting from end location to start location.  However, it is possible that the
+later portion of the transaction that is overflowed into a separate log can be
+processed separately if we encounter the corresponding log first.  If we want
+we can combine the log for processing in that case as well, but there is no
+clear advantage of the same.
diff --git a/src/backend/access/undo/discardworker.c b/src/backend/access/undo/discardworker.c
new file mode 100644 (file)
index 0000000..6fb9c70
--- /dev/null
@@ -0,0 +1,215 @@
+/*-------------------------------------------------------------------------
+ *
+ * discardworker.c
+ *       The undo discard worker for asynchronous undo management.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/postmaster/discardworker.c
+ *
+ * The main responsibility of the discard worker is to discard the undo log
+ * of transactions that are committed and all-visible or are rolledback.  It
+ * also registers the request for aborted transactions in the work queues.
+ * To know more about work queues, see undorequest.c.  It iterates through all
+ * the active logs one-by-one and try to discard the transactions that are old
+ * enough to matter.
+ *
+ * For tranasctions that spans across multiple logs, the log for committed and
+ * all-visible transactions are discarded seprately for each log.  This is
+ * possible as the transactions that span across logs have separate transaction
+ * header for each log.  For aborted transactions, we try to process the actions
+ * of entire transaction at one-shot as we need to perform the actions starting
+ * from end location to start location.  However, it is possbile that the later
+ * portion of transaction that is overflowed into a separate log can be processed
+ * separately if we encounter the corresponding log first.  If we want we can
+ * combine the log for processing in that case as well, but there is no clear
+ * advantage of the same.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <unistd.h>
+
+#include "access/undodiscard.h"
+#include "access/discardworker.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "tcop/tcopprot.h"
+#include "utils/guc.h"
+#include "utils/resowner.h"
+
+static void undoworker_sigterm_handler(SIGNAL_ARGS);
+
+/* minimum and maximum sleep time for discard worker */
+#define MIN_NAPTIME_PER_CYCLE 100L
+#define DELAYED_NAPTIME 10 * MIN_NAPTIME_PER_CYCLE
+#define MAX_NAPTIME_PER_CYCLE 100 * MIN_NAPTIME_PER_CYCLE
+
+static volatile sig_atomic_t got_SIGTERM = false;
+static bool hibernate = false;
+static long wait_time = MIN_NAPTIME_PER_CYCLE;
+static bool am_discard_worker = false;
+
+/* SIGTERM: set flag to exit at next convenient time */
+static void
+undoworker_sigterm_handler(SIGNAL_ARGS)
+{
+       int                     save_errno = errno;
+
+       got_SIGTERM = true;
+
+       /* Waken anything waiting on the process latch */
+       SetLatch(MyLatch);
+
+       errno = save_errno;
+}
+
+/*
+ * DiscardWorkerRegister -- Register a undo discard worker.
+ */
+void
+DiscardWorkerRegister(void)
+{
+       BackgroundWorker bgw;
+
+       memset(&bgw, 0, sizeof(bgw));
+       bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+               BGWORKER_BACKEND_DATABASE_CONNECTION;
+       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       snprintf(bgw.bgw_name, BGW_MAXLEN, "discard worker");
+       sprintf(bgw.bgw_library_name, "postgres");
+       sprintf(bgw.bgw_function_name, "DiscardWorkerMain");
+       bgw.bgw_restart_time = 5;
+       bgw.bgw_notify_pid = 0;
+       bgw.bgw_main_arg = (Datum) 0;
+
+       RegisterBackgroundWorker(&bgw);
+}
+
+/*
+ * DiscardWorkerMain -- Main loop for the undo discard worker.
+ */
+void
+DiscardWorkerMain(Datum main_arg)
+{
+       ereport(LOG,
+                       (errmsg("discard worker started")));
+
+       /* Establish signal handlers. */
+       pqsignal(SIGTERM, undoworker_sigterm_handler);
+       BackgroundWorkerUnblockSignals();
+
+       am_discard_worker = true;
+
+       /* Make it easy to identify our processes. */
+       SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
+                                       PGC_USERSET, PGC_S_SESSION);
+
+       /* Establish connection to nailed catalogs. */
+       BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+       /*
+        * Scan all the undo logs and intialize the rollback hash table with all
+        * the pending rollback requests.  This need to be done as a first step
+        * because only after this the transactions will be allowed to write new
+        * undo.  See comments atop UndoLogProcess.
+        */
+       UndoLogProcess();
+
+       /* Enter main loop */
+       while (!got_SIGTERM)
+       {
+               TransactionId OldestXmin,
+                                       oldestXidHavingUndo;
+               FullTransactionId oldestFullXidHavingUndo;
+               int                     rc;
+
+               /*
+                * It is okay to ignore vacuum transaction here, as we can discard the
+                * undo of the vacuuming transaction if the transaction is committed.
+                * We don't need to hold its undo for the visibility purpose.
+                */
+               OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_AUTOVACUUM |
+                                                                  PROCARRAY_FLAGS_VACUUM);
+
+               oldestFullXidHavingUndo =
+                       FullTransactionIdFromU64(pg_atomic_read_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo));
+               oldestXidHavingUndo = XidFromFullTransactionId(oldestFullXidHavingUndo);
+
+               /*
+                * Call the discard routine if oldestXidHavingUndo is lagging behind
+                * OldestXmin.
+                */
+               if (OldestXmin != InvalidTransactionId &&
+                       TransactionIdPrecedes(oldestXidHavingUndo, OldestXmin))
+               {
+                       UndoDiscard(OldestXmin, &hibernate);
+
+                       /*
+                        * If we got some undo logs to discard or discarded something,
+                        * then reset the wait_time as we have got work to do.  Note that
+                        * if there are some undologs that cannot be discarded, then above
+                        * condition will remain unsatisfied till oldestXmin remains
+                        * unchanged and the wait_time will not reset in that case.
+                        */
+                       if (!hibernate)
+                               wait_time = MIN_NAPTIME_PER_CYCLE;
+               }
+
+               /* Wait for more work. */
+               rc = WaitLatch(&MyProc->procLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                          wait_time,
+                                          WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN);
+
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(&MyProc->procLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               /*
+                * Increase the wait_time based on the length of inactivity.  If
+                * wait_time is within one second, then increment it by 100 ms at a
+                * time.  Henceforth, increment it one second at a time, till it
+                * reaches ten seconds.  Never increase the wait_time more than ten
+                * seconds, it will be too much of waiting otherwise.
+                */
+               if (rc & WL_TIMEOUT && hibernate)
+               {
+                       wait_time += (wait_time < DELAYED_NAPTIME ?
+                                                 MIN_NAPTIME_PER_CYCLE : DELAYED_NAPTIME);
+                       if (wait_time > MAX_NAPTIME_PER_CYCLE)
+                               wait_time = MAX_NAPTIME_PER_CYCLE;
+               }
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+       }
+
+       /* Normal exit from discard worker */
+       ereport(LOG,
+                       (errmsg("discard worker shutting down")));
+
+       proc_exit(0);
+}
+
+/*
+ * IsDiscardProcess -- Tells whether the current process is a discard worker
+ *     process.
+ */
+bool
+IsDiscardProcess(void)
+{
+       return am_discard_worker;
+}
index 55f51169b3a6eb73a3b0ddcf414e3be3f22f9826..5ab9e775db7b389de3cc37f07b9152c9400b7621 100644 (file)
@@ -56,6 +56,7 @@
 #include "storage/buf.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
+#include "storage/proc.h"
 #include "miscadmin.h"
 
 /*
  */
 UndoCompressionInfo undo_compression_info[UndoLogCategories];
 
+/*
+ * Defines the number of times we try to wait for rollback hash table to get
+ * initialized.  After these many attempts it will return error and the user
+ * can retry the operation.
+ */
+#define ROLLBACK_HT_INIT_WAIT_TRY      60
+
 /* Prototypes for static functions. */
 static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
                                                                                        UndoRecPtr urp, RelFileNode rnode,
@@ -668,6 +676,50 @@ PrepareUndoInsert(UndoRecordInsertContext *context,
        UndoCompressionInfo *compression_info =
        &context->undo_compression_info[context->alloc_context.category];
 
+       if (!InRecovery && IsUnderPostmaster)
+       {
+               int try_count = 0;
+
+               /*
+                * If we are not in a recovery and not in a single-user-mode, then undo
+                * generation should not be allowed until we have scanned all the undo
+                * logs and initialized the hash table with all the aborted
+                * transaction entries.  See detailed comments in UndoLogProcess.
+                */
+               while (!ProcGlobal->rollbackHTInitialized)
+               {
+                       /* Error out after trying for one minute. */
+                       if (try_count > ROLLBACK_HT_INIT_WAIT_TRY)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_E_R_E_MODIFYING_SQL_DATA_NOT_PERMITTED),
+                                                errmsg("rollback hash table is not yet initialized, wait for sometime and try again")));
+
+                       /*
+                        * Rollback hash table is not yet intialized, sleep for 1 second
+                        * and try again.
+                        */
+                       pg_usleep(1000000L);
+                       try_count++;
+               }
+       }
+
+       /*
+        * If the rollback hash table is already full (excluding one additional
+        * space for each backend) then don't allow to generate any new undo until
+        * we apply some of the pending requests and create some space in the hash
+        * table to accept new rollback requests.  Leave the enough slots in the
+        * hash table so that there is space for all the backends to register at
+        * least one request.  This is to protect the situation where one backend
+        * keep consuming slots reserve for the other backends and suddenly there
+        * is concurrent undo request from all the backends.  So we always keep
+        * the space reserve for MaxBackends.
+        */
+       if (ProcGlobal->xactsHavingPendingUndo >
+               (UndoRollbackHashTableSize() - MaxBackends))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                                errmsg("max limit for pending rollback request has reached, wait for sometime and try again")));
+
        /* Already reached maximum prepared limit. */
        if (context->nprepared_undo == context->max_prepared_undo)
                elog(ERROR, "already reached the maximum prepared limit");
@@ -1824,12 +1876,12 @@ UndoBlockGetFirstUndoRecord(BlockNumber blkno, UndoRecPtr urec_ptr,
        LockBuffer(buffer, BUFFER_LOCK_SHARE);
 
        page = BufferGetPage(buffer);
-       phdr = (UndoPageHeader)page;
+       phdr = (UndoPageHeader) page;
 
        /* Calculate the size of the partial record. */
        partial_rec_size = UndoRecordHeaderSize(phdr->uur_info) +
-                                               phdr->tuple_len + phdr->payload_len -
-                                               phdr->record_offset;
+                                          phdr->tuple_len + phdr->payload_len -
+                                          phdr->record_offset;
 
        /* calculate the offset in current log. */
        offset_cur_page = SizeOfUndoPageHeaderData + partial_rec_size;
diff --git a/src/backend/access/undo/undodiscard.c b/src/backend/access/undo/undodiscard.c
new file mode 100644 (file)
index 0000000..2badb8d
--- /dev/null
@@ -0,0 +1,488 @@
+/*-------------------------------------------------------------------------
+ *
+ * undodiscard.c
+ *       discard undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undodiscard.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/undolog.h"
+#include "access/undodiscard.h"
+#include "access/undorequest.h"
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/resowner.h"
+
+/*
+ * Discard as many record sets as we can from the undo log occupying a given
+ * slot, considering the given xmin horizon.  If we encounter a record set
+ * that needs to be rolled back, register a rollback request.  Set *hibernate
+ * to false if work was done.
+ */
+static void
+UndoDiscardOneLog(UndoLogSlot *slot, TransactionId xmin, bool *hibernate)
+{
+       UndoRecPtr      undo_recptr, next_insert;
+       UndoRecPtr      next_urecptr = InvalidUndoRecPtr;
+       UnpackedUndoRecord      *uur = NULL;
+       bool    need_discard = false;
+       FullTransactionId       undofxid = InvalidFullTransactionId;
+       TransactionId   latest_discardxid = InvalidTransactionId;
+       UndoLogNumber logno;
+
+       /*
+        * Currently we expect only one discard worker to be active at any time,
+        * but in future we might have more than one, and superuser maintenance
+        * functions might also discard data concurrently.  So we we have to
+        * assume that the given slot could be recycled underneath us any time we
+        * don't hold one of the locks that prevents that.  We'll detect that by
+        * the log number changing.
+        */
+       LWLockAcquire(&slot->discard_lock, LW_SHARED);
+       logno = slot->logno;
+       if (UndoRecPtrIsValid(slot->oldest_data))
+       {
+               undo_recptr = slot->oldest_data;
+               LWLockRelease(&slot->discard_lock);
+       }
+       else
+       {
+               LWLockRelease(&slot->discard_lock);
+               undo_recptr = UndoLogGetOldestRecord(logno, NULL);
+       }
+
+       /* There might not be any undo log and hibernation might be needed. */
+       *hibernate = true;
+
+       StartTransactionCommand();
+
+       /* Loop until we run out of discardable transactions in the given log. */
+       do
+       {
+               TransactionId wait_xid = InvalidTransactionId;
+               bool pending_abort = false;
+               bool request_rollback = false;
+               UndoStatus status;
+               UndoRecordFetchContext  context;
+
+               next_insert = UndoLogGetNextInsertPtr(logno);
+
+               /* There must be some undo data for a transaction. */
+               Assert(next_insert != undo_recptr);
+
+               /* Fetch the undo record for the given undo_recptr. */
+               BeginUndoFetch(&context);
+               uur = UndoFetchRecord(&context, undo_recptr);
+               FinishUndoFetch(&context);
+
+               if (uur != NULL)
+               {
+                       if (UndoRecPtrGetCategory(undo_recptr) == UNDO_SHARED)
+                       {
+                               /*
+                                * For the "shared" category, we only discard when the
+                                * rm_undo_status callback tells us we can.
+                                */
+                               status = RmgrTable[uur->uur_rmid].rm_undo_status(uur, &wait_xid);
+
+                               Assert((status == UNDO_STATUS_WAIT_XMIN &&
+                                               TransactionIdIsValid(wait_xid)) ^
+                                               (status == UNDO_STATUS_DISCARD &&
+                                               !TransactionIdIsValid(wait_xid)));
+                       }
+                       else
+                       {
+                               TransactionId xid = XidFromFullTransactionId(uur->uur_fxid);
+
+                               /*
+                                * Otherwise we use the CLOG and xmin to decide whether to
+                                * wait, discard or roll back.
+                                *
+                                * XXX: We've added the transaction-in-progress check to
+                                * avoid xids of in-progress autovacuum as those are not
+                                * computed for oldestxmin calculation.  See
+                                * DiscardWorkerMain.
+                                */
+                               if (TransactionIdDidCommit(xid))
+                               {
+                                       /*
+                                        * If this record set's xid isn't before the xmin
+                                        * horizon, we'll have to wait before we can discard
+                                        * it.
+                                        */
+                                       if (TransactionIdFollowsOrEquals(xid, xmin))
+                                               wait_xid = xid;
+
+                               }
+                               else if (!TransactionIdIsInProgress(xid))
+                               {
+                                       /*
+                                        * If it hasn't been applied already, then we'll ask
+                                        * for it to be applied now.  Otherwise it'll be
+                                        * discarded.
+                                        */
+                                       if (!IsXactApplyProgressCompleted(uur->uur_txn->urec_progress))
+                                               request_rollback = true;
+                               }
+                               else
+                               {
+                                       /*
+                                        * It's either in progress or isn't yet before the
+                                        * xmin horizon, so we'll have to wait.
+                                        */
+                                       wait_xid = XidFromFullTransactionId(uur->uur_fxid);
+                               }
+                       }
+
+                       /*
+                        * Add the aborted transaction to the rollback request queues.
+                        *
+                        * We can ignore the abort for transactions whose corresponding
+                        * database doesn't exist.
+                        */
+                       if (request_rollback && dbid_exists(uur->uur_txn->urec_dbid))
+                       {
+                               (void) RegisterRollbackReq(InvalidUndoRecPtr,
+                                                                                  undo_recptr,
+                                                                                  uur->uur_txn->urec_dbid,
+                                                                                  uur->uur_fxid);
+
+                               pending_abort = true;
+                       }
+
+                       next_urecptr = uur->uur_txn->urec_next;
+                       undofxid = uur->uur_fxid;
+
+                       UndoRecordRelease(uur);
+                       uur = NULL;
+               }
+
+               /*
+                * We can discard upto this point when one of following conditions is
+                * met: (a) we need to wait for a transaction first. (b) there is no
+                * more log to process. (c) the transaction undo in current log is
+                * finished. (d) there is a pending abort.
+                */
+               if (TransactionIdIsValid(wait_xid) ||
+                       next_urecptr == InvalidUndoRecPtr ||
+                       UndoRecPtrGetLogNo(next_urecptr) != logno ||
+                       pending_abort)
+               {
+                       /* Hey, I got some undo log to discard, can not hibernate now. */
+                       *hibernate = false;
+
+                       /*
+                        * If we don't need to wait for this transaction and this is not
+                        * an aborted transaction, then we can discard it as well.
+                        */
+                       if (!TransactionIdIsValid(wait_xid) && !pending_abort)
+                       {
+                               /*
+                                * It is safe to use next_insert as the location till which we
+                                * want to discard in this case.  If something new has been
+                                * added after we have fetched this transaction's record, it
+                                * won't be considered in this pass of discard.
+                                */
+                               undo_recptr = next_insert;
+                               latest_discardxid = XidFromFullTransactionId(undofxid);
+                               need_discard = true;
+
+                               /* We don't have anything more to discard. */
+                               undofxid = InvalidFullTransactionId;
+                       }
+
+                       /* Update the shared memory state. */
+                       LWLockAcquire(&slot->discard_lock, LW_EXCLUSIVE);
+
+                       /*
+                        * If the slot has been recycling while we were thinking about it,
+                        * we have to abandon the operation.
+                        */
+                       if (slot->logno != logno)
+                       {
+                               LWLockRelease(&slot->discard_lock);
+                               break;
+                       }
+
+                       /* Update the slot information for the next pass of discard. */
+                       slot->wait_fxmin = undofxid;
+                       slot->oldest_data = undo_recptr;
+
+                       LWLockRelease(&slot->discard_lock);
+
+                       if (need_discard)
+                       {
+                               LWLockAcquire(&slot->discard_update_lock, LW_EXCLUSIVE);
+                               UndoLogDiscard(undo_recptr, latest_discardxid);
+                               LWLockRelease(&slot->discard_update_lock);
+                       }
+
+                       break;
+               }
+
+               /*
+                * This transaction is smaller than the xmin so lets jump to the next
+                * transaction.
+                */
+               undo_recptr = next_urecptr;
+               latest_discardxid = XidFromFullTransactionId(undofxid);
+
+               Assert(uur == NULL);
+
+               /* If we reach here, this means there is something to discard. */
+               need_discard = true;
+       } while (true);
+
+       CommitTransactionCommand();
+}
+
+/*
+ * Scan all the undo logs and register the aborted transactions.  This is
+ * called as a first function from the discard worker and only after this pass
+ * over undo logs is complete, new undo can is allowed to be written in the
+ * system.  This is required because after crash recovery we don't know the
+ * exact number of aborted transactions whose rollback request is pending and
+ * we can not allow new undo request if we already have the request equal to
+ * hash table size.  So before start allowing any new transaction to write the
+ * undo we need to make sure that we know exact number of pending requests.
+ */
+void
+UndoLogProcess()
+{
+       UndoLogSlot *slot = NULL;
+
+       /*
+        * We need to perform this in a transaction because (a) we need resource
+        * owner to scan the logs and (b) TransactionIdIsInProgress requires us to
+        * be in transaction.
+        */
+       StartTransactionCommand();
+
+       /*
+        * Loop through all the valid undo logs and scan them transaction by
+        * transaction to find non-commited transactions if any and register them
+        * in the rollback hash table.
+        */
+       while ((slot = UndoLogNextSlot(slot)))
+       {
+               UndoRecPtr      undo_recptr;
+               UnpackedUndoRecord      *uur = NULL;
+
+               /* We do not execute shared (non-transactional) undo records. */
+               if (slot->meta.category == UNDO_SHARED)
+                       continue;
+
+               /* Start scanning the log from the last discard point. */
+               undo_recptr = UndoLogGetOldestRecord(slot->logno, NULL);
+
+               /* Loop until we scan complete log. */
+               while (1)
+               {
+                       TransactionId xid;
+                       UndoRecordFetchContext  context;
+
+                       /* Done with this log. */
+                       if (!UndoRecPtrIsValid(undo_recptr))
+                               break;
+
+                       /* Fetch the undo record for the given undo_recptr. */
+                       BeginUndoFetch(&context);
+                       uur = UndoFetchRecord(&context, undo_recptr);
+                       FinishUndoFetch(&context);
+
+                       Assert(uur != NULL);
+
+                       xid = XidFromFullTransactionId(uur->uur_fxid);
+
+                       /*
+                        * Register the rollback request for all uncommitted and not in
+                        * progress transactions whose undo apply progress is still not
+                        * completed.  Even though we don't allow any new transactions to
+                        * write undo until this first pass is completed, there might be
+                        * some prepared transactions which are still in progress, so we
+                        * don't include such transactions.
+                        */
+                       if (!TransactionIdDidCommit(xid) &&
+                               !TransactionIdIsInProgress(xid) &&
+                               !IsXactApplyProgressCompleted(uur->uur_txn->urec_progress))
+                       {
+                               (void) RegisterRollbackReq(InvalidUndoRecPtr, undo_recptr,
+                                                                               uur->uur_txn->urec_dbid, uur->uur_fxid);
+                       }
+
+                       /*
+                        * Go to the next transaction in the same log.  If uur_next is
+                        * point to the undo record pointer in the different log then we are
+                        * done with this log so just set undo_recptr to InvalidUndoRecPtr.
+                        */
+                       if (UndoRecPtrGetLogNo(undo_recptr) ==
+                               UndoRecPtrGetLogNo(uur->uur_txn->urec_next))
+                               undo_recptr = uur->uur_txn->urec_next;
+                       else
+                               undo_recptr = InvalidUndoRecPtr;
+
+                       /* Release memory for the current record. */
+                       UndoRecordRelease(uur);
+               }
+       }
+
+       CommitTransactionCommand();
+
+       /* Allow the transactions to start writting undo. */
+       ProcGlobal->rollbackHTInitialized = true;
+}
+
+/*
+ * Discard the undo for all the transactions whose xid is smaller than
+ * oldestXmin
+ */
+void
+UndoDiscard(TransactionId oldestXmin, bool *hibernate)
+{
+       FullTransactionId oldestXidHavingUndo;
+       UndoLogSlot *slot = NULL;
+       uint32  epoch;
+
+       /*
+        * If all the undo logs are discarded, then oldestXidHavingUndo should be
+        * oldestXmin.  As of now, we don't allow more than 2 billion xids in the
+        * system, so we can rely on the epoch retrieved with GetEpochForXid.
+        */
+       epoch = GetEpochForXid(oldestXmin);
+       oldestXidHavingUndo = FullTransactionIdFromEpochAndXid(epoch, oldestXmin);
+
+       /*
+        * Iterate through all the active logs and one-by-one try to discard the
+        * transactions that are old enough to matter.
+        *
+        * XXX Ideally we can arrange undo logs so that we can efficiently find
+        * those with oldest_xid < oldestXmin, but for now we'll just scan all of
+        * them.
+        */
+       while ((slot = UndoLogNextSlot(slot)))
+       {
+               /*
+                * If the log is already discarded, then we are done.  It is important
+                * to first check this to ensure that tablespace containing this log
+                * doesn't get dropped concurrently.
+                */
+               LWLockAcquire(&slot->mutex, LW_SHARED);
+               /*
+                * We don't have to worry about slot recycling and check the logno
+                * here, since we don't care about the identity of this slot, we're
+                * visiting all of them.
+                */
+               if (slot->meta.discard == slot->meta.unlogged.insert)
+               {
+                       LWLockRelease(&slot->mutex);
+                       continue;
+               }
+               LWLockRelease(&slot->mutex);
+
+               /* We can't process temporary undo logs. */
+               if (slot->meta.category == UNDO_TEMP)
+                       continue;
+
+               /*
+                * If the first xid of the undo log is smaller than the xmin then try
+                * to discard the undo log.
+                */
+               if (!FullTransactionIdIsValid(slot->wait_fxmin) ||
+                       FullTransactionIdPrecedes(slot->wait_fxmin, oldestXidHavingUndo))
+               {
+                       /* Process the undo log. */
+                       UndoDiscardOneLog(slot, oldestXmin, hibernate);
+               }
+       }
+
+       /* Get the smallest of 'xid having pending undo' and 'oldestXmin' */
+       oldestXidHavingUndo = RollbackHTGetOldestFullXid(oldestXidHavingUndo);
+
+       /*
+        * Update the oldestFullXidHavingUnappliedUndo in the shared memory.
+        *
+        * XXX: In future, if multiple workers can perform discard then we may
+        * need to use compare and swap for updating the shared memory value.
+        */
+       if (FullTransactionIdIsValid(oldestXidHavingUndo))
+               pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+                                                       U64FromFullTransactionId(oldestXidHavingUndo));
+}
+
+/*
+ * Discard all the logs.  This is particularly required in single user mode
+ * where at the commit time we discard all the undo logs.
+ */
+void
+UndoLogDiscardAll(void)
+{
+       UndoLogSlot *slot = NULL;
+
+       Assert(!IsUnderPostmaster);
+
+       /*
+        * No locks are required for discard, since this called only in single
+        * user mode.
+        */
+       while ((slot = UndoLogNextSlot(slot)))
+       {
+               /* If the log is already discarded, then we are done. */
+               if (slot->meta.discard == slot->meta.unlogged.insert)
+                       continue;
+
+               /*
+                * Process the undo log.
+                */
+               UndoLogDiscard(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert),
+                                          InvalidTransactionId);
+       }
+
+}
+
+/*
+ * Discard the undo logs for temp tables.
+ */
+void
+TempUndoDiscard(UndoLogNumber logno)
+{
+       UndoLogSlot *slot = UndoLogGetSlot(logno, false);
+
+       /*
+        * Discard the undo log for temp table only. Ensure that there is
+        * something to be discarded there.
+        */
+       Assert (slot->meta.category == UNDO_TEMP);
+
+       /*
+        * If the log is already discarded, then we are done.  It is important
+        * to first check this to ensure that tablespace containing this log
+        * doesn't get dropped concurrently.
+        */
+       LWLockAcquire(&slot->mutex, LW_SHARED);
+       if (slot->meta.discard == slot->meta.unlogged.insert)
+       {
+               LWLockRelease(&slot->mutex);
+               return;
+       }
+       LWLockRelease(&slot->mutex);
+
+       /* Process the undo log. */
+       UndoLogDiscard(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert),
+                                  InvalidTransactionId);
+}
index b5502f2417cb32c14f94ae4eaf42d32012dfd19f..09d9b3050addf3fcb5ca712fe9eeb1c6d3ee70ef 100644 (file)
@@ -145,6 +145,8 @@ UndoLogShmemInit(void)
                                                         LWTRANCHE_UNDOLOG);
                        LWLockInitialize(&UndoLogShared->slots[i].discard_lock,
                                                         LWTRANCHE_UNDODISCARD);
+                       LWLockInitialize(&UndoLogShared->slots[i].discard_update_lock,
+                                                        LWTRANCHE_DISCARD_UPDATE);
                }
        }
        else
index 916d87910f7b1956c858c78744db7956c88ab362..c04591ed783fb89aecd7af10bec349b65b6344d6 100644 (file)
 #include "postgres.h"
 #include "miscadmin.h"
 
+#include "access/discardworker.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/transam.h"
 #include "access/undorequest.h"
+#include "access/undoworker.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
 #include "catalog/pg_database.h"
@@ -925,6 +927,138 @@ FindUndoEndLocationAndSize(UndoRecPtr start_urecptr,
        return sz;
 }
 
+/*
+ * Fetch the start urec pointer for the transaction and the undo request size.
+ *
+ * start_urecptr_inout - This is an INOUT parameter.  If a transaction has
+ * overflowed to multiple undo logs, the caller can set start_urecptr_inout
+ * to a location of any of the undo logs where the transaction has written its
+ * first record for that particular log.  Given that, this function calculates
+ * the undo location where the transaction has inserted its first undo record.
+ * If a transaction hasn't overflowed to multiple undo logs, the value of this
+ * parameter remains unchanged.
+ *
+ * The first record of a transaction in each undo log contains a reference to
+ * the first record of this transaction in the previous log.  It finds the
+ * initial location by moving backward in the undo chain of this transaction
+ * across undo logs.  While doing the same, it also calculates the undo size
+ * between the input and output start undo record pointer value.
+ */
+static uint64
+FindUndoStartLocationAndSize(UndoRecPtr *start_urecptr_inout,
+                                                        FullTransactionId full_xid)
+{
+       UnpackedUndoRecord *uur = NULL;
+       UndoLogSlot *slot = NULL;
+       UndoRecPtr      urecptr = InvalidUndoRecPtr;
+       uint64          sz = 0;
+
+       Assert(start_urecptr_inout);
+
+       urecptr = *start_urecptr_inout;
+       Assert(urecptr != InvalidUndoRecPtr);
+
+       /*
+        * A backend always set the start undo record pointer to the first undo
+        * record inserted by this transaction.  Hence, we don't have to proceed
+        * further.
+        */
+       if (!IsDiscardProcess())
+               return sz;
+
+       /*
+        * Since the discard worker processes the undo logs sequentially, it's
+        * possible that start undo record pointer doesn't refer to the actual
+        * start of the transaction.  Instead, it may refer to the start location
+        * of the transaction in any of the subsequent logs.  In that case, we've
+        * to find the actual start location of the transaction by going backwards
+        * in the chain.
+        */
+       while (true)
+       {
+               UndoLogOffset next_insert;
+               UndoRecordFetchContext  context;
+
+               /*
+                * Fetch the log and undo record corresponding to the current undo
+                * pointer.
+                */
+               if ((slot == NULL) || (UndoRecPtrGetLogNo(urecptr) != slot->logno))
+                       slot = UndoLogGetSlot(UndoRecPtrGetLogNo(urecptr), false);
+
+               Assert(slot != NULL);
+
+               /* The corresponding log must be ahead urecptr. */
+               Assert(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert) >= urecptr);
+
+               /* Fetch the undo record. */
+               BeginUndoFetch(&context);
+               uur = UndoFetchRecord(&context, urecptr);
+               FinishUndoFetch(&context);
+
+               /*
+                * Since the rollback isn't completed for this transaction, this undo
+                * record can't be discarded.
+                */
+               Assert (uur != NULL);
+
+               /* The undo must belongs to a same transaction. */
+               Assert(FullTransactionIdEquals(full_xid, uur->uur_fxid));
+
+               /*
+                * Since this is the first undo record of this transaction in this
+                * log, this must include the transaction header.
+                */
+               Assert(uur->uur_info & UREC_INFO_TRANSACTION);
+
+               /*
+                * If this is the first undo record of this transaction, return from
+                * here.
+                */
+               if ((uur->uur_info & UREC_INFO_LOGSWITCH) == 0)
+               {
+                       UndoRecordRelease(uur);
+                       break;
+               }
+
+               /*
+                * This is a start of a overflowed transaction header, so it must have
+                * a valid pointer to previous log's start transaction header.
+                */
+               Assert(UndoRecPtrIsValid(uur->uur_logswitch->urec_prevlogstart));
+
+
+               /*
+                * Find the previous log from which the transaction is overflowed
+                * to current log.
+                */
+               urecptr = uur->uur_logswitch->urec_prevlogstart;
+               slot = UndoLogGetSlot(UndoRecPtrGetLogNo(urecptr), false);
+
+               /*
+                * When a transaction overflows to a new undo log, it's guaranteed
+                * that this transaction will be the last transaction in the previous
+                * log and we mark that log as full so that no other transaction can
+                * write in that log further.  Check UndoLogAllocate for details.
+                *
+                * So, to find the undo size in the previous log, we've to find the
+                * next insert location of the previous log and subtract current
+                * transaction's start location in the previous log from it.
+                */
+               next_insert = UndoLogGetNextInsertPtr(slot->logno);
+               Assert(UndoRecPtrIsValid(next_insert));
+
+               sz += (next_insert - urecptr);
+
+               UndoRecordRelease(uur);
+               uur = NULL;
+       }
+
+       *start_urecptr_inout = urecptr;
+
+       return sz;
+}
+
 /*
  * Returns true, if we can push the rollback request to undo wrokers, false,
  * otherwise.
@@ -941,9 +1075,24 @@ CanPushReqToUndoWorker(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr,
 
        /*
         * We normally push the rollback request to undo workers if the size of
-        * same is above a certain threshold.
+        * same is above a certain threshold.  However, discard worker is allowed
+        * to push any size request provided there is a space in rollback request
+        * queue.  This is mainly because discard worker can be processing the
+        * rollback requests after crash recovery when no backend is alive.
+        *
+        * We have a race condition where discard worker can process the request
+        * before the backend which has aborted the transaction in which case
+        * backend won't do anything.  Normally, this won't happen because
+        * backends try to apply the undo actions immediately after marking the
+        * transaction as aborted in the clog.  One way to avoid this race
+        * condition is that we register the request by backend in hash table but
+        * not in rollback queues before marking abort in clog and then later add
+        * them in rollback queues.  However, we are not sure how important it is
+        * avoid such a race as this won't lead to any problem and OTOH, we might
+        * need some more trickery in the code to avoid such a race condition.
         */
-       if (req_size >= rollback_overflow_size * 1024 * 1024)
+       if (req_size >= rollback_overflow_size * 1024 * 1024 ||
+               IsDiscardProcess())
        {
                if (GetXidQueueSize() >= pending_undo_queue_size ||
                        GetSizeQueueSize() >= pending_undo_queue_size)
@@ -969,12 +1118,7 @@ CanPushReqToUndoWorker(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr,
                if ((GetXidQueueSize() < pending_undo_queue_size))
                {
                        Assert(GetSizeQueueSize() < pending_undo_queue_size);
-
-                       /*
-                        * XXX - Here, we should return true once we have background
-                        * worker facility.
-                        */
-                       return false;
+                       return true;
                }
        }
 
@@ -1414,6 +1558,12 @@ RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
        Assert(UndoRecPtrIsValid(start_urec_ptr));
        Assert(dbid != InvalidOid);
 
+       /*
+        * The discard worker can only send the start undo record pointer of a
+        * transaction.  It doesn't set the end_urec_ptr.
+        */
+       Assert(IsDiscardProcess() || UndoRecPtrIsValid(end_urec_ptr));
+
        /*
         * Find the rollback request size and the end_urec_ptr (in case of discard
         * worker only).
@@ -1429,6 +1579,14 @@ RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
        if (!UndoRecPtrIsValid(end_urec_ptr))
                return false;
 
+       /*
+        * For registering a rollback request, we always store the full transaction
+        * ID and the first undo record pointer inserted by this transaction.  This
+        * ensures that backends and discard worker don't register the same request
+        * twice.
+        */
+       req_size += FindUndoStartLocationAndSize(&start_urec_ptr, full_xid);
+
        LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
 
        /*
@@ -1444,7 +1602,13 @@ RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
                                                                                   HASH_ENTER_NULL, &found);
 
        /*
-        * It can only fail, if the value of pending_undo_queue_size or
+        * Except the first pass over the undo logs by discard worker, the hash
+        * table can never be full.
+        */
+       Assert(!ProcGlobal->rollbackHTInitialized || (rh != NULL));
+
+       /*
+        * It can only fail, if  the value of pending_undo_queue_size or
         * max_connections guc is reduced after restart of the server.
         */
        if (rh == NULL)
@@ -1492,10 +1656,16 @@ RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
                }
                /*
                 * The request can't be pushed into the undo worker queue.  The
-                * backends will try executing by itself.
+                * backends will try executing by itself.  The discard worker will
+                * keep the entry into the rollback hash table with
+                * UNDO_REQUEST_INVALID status.  Such requests will be added in the
+                * undo worker queues in the subsequent passes over undo logs by
+                * discard worker.
                 */
-               else
+               else if (!IsDiscardProcess())
                        rh->status = UNDO_REQUEST_INPROGRESS;
+               else
+                       rh->status = UNDO_REQUEST_INVALID;
        }
        else if (!UndoRequestIsValid(rh) && can_push)
        {
@@ -1523,6 +1693,13 @@ RegisterRollbackReq(UndoRecPtr end_urec_ptr, UndoRecPtr start_urec_ptr,
 
        LWLockRelease(RollbackRequestLock);
 
+       /*
+        * If we are able to successfully push the request, wakeup the undo worker
+        * so that it can be processed in a timely fashion.
+        */
+       if (pushed)
+               WakeupUndoWorker(dbid);
+
        return pushed;
 }
 
diff --git a/src/backend/access/undo/undoworker.c b/src/backend/access/undo/undoworker.c
new file mode 100644 (file)
index 0000000..4f6927b
--- /dev/null
@@ -0,0 +1,828 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoworker.c
+ *       undo launcher and undo worker process.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/postmaster/undoworker.c
+ *
+ * Undo launcher is responsible for launching the workers iff there is some
+ * work available in one of work queues and there are more workers available.
+ * To know more about work queues, see undorequest.c.  The worker is launched
+ * to handle requests for a particular database.
+ *
+ * Each undo worker then start reading from one of the queue the requests for
+ * that particular database.  A worker would peek into each queue for the
+ * requests from a particular database, if it needs to switch a database in
+ * less than undo_worker_quantum ms after starting.  Also, if there is no
+ * work, it lingers for UNDO_WORKER_LINGER_MS.  This avoids restarting
+ * the workers too frequently.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+
+#include "access/genam.h"
+#include "access/table.h"
+#include "access/xact.h"
+#include "access/undorequest.h"
+#include "access/undoworker.h"
+
+#include "libpq/pqsignal.h"
+
+#include "postmaster/bgworker.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/postmaster.h"
+
+#include "replication/slot.h"
+#include "replication/worker_internal.h"
+
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+
+#include "tcop/tcopprot.h"
+
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+
+/*
+ * GUC parameters
+ */
+int                    max_undo_workers = 4;
+
+/*
+ * If a worker would need to switch databases in less than undo_worker_quantum
+ * (10s as default) after starting, it peeks a few entries deep into each
+ * queue to see whether there's work for that database.
+ */
+int                    undo_worker_quantum_ms = 10000;
+
+/* max sleep time between cycles (100 milliseconds) */
+#define DEFAULT_NAPTIME_PER_CYCLE 100L
+
+/*
+ * Time for which undo worker can linger if there is no work, in
+ * milliseconds.  This has to be more than UNDO_FAILURE_RETRY_DELAY_MS,
+ * otherwise, worker can exit before retrying the failed requests.
+ */
+#define UNDO_WORKER_LINGER_MS 20000
+
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+
+static TimestampTz last_xact_processed_at;
+
+typedef struct UndoApplyWorker
+{
+       /* Indicates if this slot is used or free. */
+       bool            in_use;
+
+       /* Increased every time the slot is taken by new worker. */
+       uint16          generation;
+
+       /* Pointer to proc array. NULL if not running. */
+       PGPROC     *proc;
+
+       /* Database id this worker is connected to. */
+       Oid                     dbid;
+
+       /* this tells whether worker is lingering. */
+       bool            lingering;
+
+       /*
+        * This tells the undo worker from which undo worker queue it should start
+        * processing.
+        */
+       UndoWorkerQueueType undo_worker_queue;
+} UndoApplyWorker;
+
+UndoApplyWorker *MyUndoWorker = NULL;
+
+typedef struct UndoApplyCtxStruct
+{
+       /* Supervisor process. */
+       pid_t           launcher_pid;
+
+       /* latch to wake up undo launcher. */
+       Latch      *undo_launcher_latch;
+
+       /* Background workers. */
+       UndoApplyWorker workers[FLEXIBLE_ARRAY_MEMBER];
+} UndoApplyCtxStruct;
+
+UndoApplyCtxStruct *UndoApplyCtx;
+
+static void UndoWorkerOnExit(int code, Datum arg);
+static void UndoWorkerCleanup(UndoApplyWorker *worker);
+static void UndoWorkerIsLingering(bool sleep);
+static void UndoWorkerGetSlotInfo(int slot, UndoRequestInfo *urinfo);
+static void UndoworkerSigtermHandler(SIGNAL_ARGS);
+
+/*
+ * Cleanup function for undo worker launcher.
+ *
+ * Called on undo worker launcher exit.
+ */
+static void
+UndoLauncherOnExit(int code, Datum arg)
+{
+       UndoApplyCtx->launcher_pid = 0;
+       UndoApplyCtx->undo_launcher_latch = NULL;
+}
+
+/* SIGTERM: set flag to exit at next convenient time */
+static void
+UndoworkerSigtermHandler(SIGNAL_ARGS)
+{
+       got_SIGTERM = true;
+
+       /* Waken anything waiting on the process latch */
+       SetLatch(MyLatch);
+}
+
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+UndoLauncherSighup(SIGNAL_ARGS)
+{
+       int                     save_errno = errno;
+
+       got_SIGHUP = true;
+
+       /* Waken anything waiting on the process latch */
+       SetLatch(MyLatch);
+
+       errno = save_errno;
+}
+
+/*
+ * Wait for a background worker to start up and attach to the shmem context.
+ *
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
+ */
+static void
+WaitForUndoWorkerAttach(UndoApplyWorker * worker,
+                                               uint16 generation,
+                                               BackgroundWorkerHandle *handle)
+{
+       BgwHandleStatus status;
+       int                     rc;
+
+       for (;;)
+       {
+               pid_t           pid;
+
+               CHECK_FOR_INTERRUPTS();
+
+               LWLockAcquire(UndoWorkerLock, LW_SHARED);
+
+               /* Worker either died or has started; no need to do anything. */
+               if (!worker->in_use || worker->proc)
+               {
+                       LWLockRelease(UndoWorkerLock);
+                       return;
+               }
+
+               LWLockRelease(UndoWorkerLock);
+
+               /* Check if worker has died before attaching, and clean up after it. */
+               status = GetBackgroundWorkerPid(handle, &pid);
+
+               if (status == BGWH_STOPPED)
+               {
+                       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+                       /* Ensure that this was indeed the worker we waited for. */
+                       if (generation == worker->generation)
+                               UndoWorkerCleanup(worker);
+                       LWLockRelease(UndoWorkerLock);
+                       return;
+               }
+
+               /*
+                * We need timeout because we generally don't get notified via latch
+                * about the worker attach.  But we don't expect to have to wait long.
+                */
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                          10L, WAIT_EVENT_BGWORKER_STARTUP);
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+       }
+
+       return;
+}
+
+/*
+ * Attach to a slot.
+ */
+static void
+UndoWorkerAttach(int slot)
+{
+       /* Block concurrent access. */
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       MyUndoWorker = &UndoApplyCtx->workers[slot];
+
+       if (!MyUndoWorker->in_use)
+       {
+               LWLockRelease(UndoWorkerLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("undo worker slot %d is empty, cannot attach",
+                                               slot)));
+       }
+
+       if (MyUndoWorker->proc)
+       {
+               LWLockRelease(UndoWorkerLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("undo worker slot %d is already used by "
+                                               "another worker, cannot attach", slot)));
+       }
+
+       MyUndoWorker->proc = MyProc;
+       before_shmem_exit(UndoWorkerOnExit, (Datum) 0);
+
+       LWLockRelease(UndoWorkerLock);
+}
+
+/*
+ * Returns whether an undo worker is available.
+ */
+static int
+IsUndoWorkerAvailable(void)
+{
+       int                     i;
+       int                     alive_workers = 0;
+
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       /* Search for attached workers. */
+       for (i = 0; i < max_undo_workers; i++)
+       {
+               UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+               if (w->in_use)
+                       alive_workers++;
+       }
+
+       LWLockRelease(UndoWorkerLock);
+
+       return (alive_workers < max_undo_workers);
+}
+
+/* Sets the worker's lingering status. */
+static void
+UndoWorkerIsLingering(bool sleep)
+{
+       /* Block concurrent access. */
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       MyUndoWorker->lingering = sleep;
+
+       LWLockRelease(UndoWorkerLock);
+}
+
+/* Get the dbid and undo worker queue set by the undo launcher. */
+static void
+UndoWorkerGetSlotInfo(int slot, UndoRequestInfo *urinfo)
+{
+       /* Block concurrent access. */
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       MyUndoWorker = &UndoApplyCtx->workers[slot];
+
+       if (!MyUndoWorker->in_use)
+       {
+               LWLockRelease(UndoWorkerLock);
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("undo worker slot %d is empty",
+                                               slot)));
+       }
+
+       urinfo->dbid = MyUndoWorker->dbid;
+       urinfo->undo_worker_queue = MyUndoWorker->undo_worker_queue;
+
+       LWLockRelease(UndoWorkerLock);
+}
+
+/*
+ * Start new undo apply background worker, if possible otherwise return false.
+ */
+static bool
+UndoWorkerLaunch(UndoRequestInfo urinfo)
+{
+       BackgroundWorker bgw;
+       BackgroundWorkerHandle *bgw_handle;
+       uint16          generation;
+       int                     i;
+       int                     slot = 0;
+       UndoApplyWorker *worker = NULL;
+
+       /*
+        * We need to do the modification of the shared memory under lock so that
+        * we have consistent view.
+        */
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       /* Find unused worker slot. */
+       for (i = 0; i < max_undo_workers; i++)
+       {
+               UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+               if (!w->in_use)
+               {
+                       worker = w;
+                       slot = i;
+                       break;
+               }
+       }
+
+       /* We must not try to start a worker if there are no available workers. */
+       Assert(worker != NULL);
+
+       /* Prepare the worker slot. */
+       worker->in_use = true;
+       worker->proc = NULL;
+       worker->dbid = urinfo.dbid;
+       worker->lingering = false;
+       worker->undo_worker_queue = urinfo.undo_worker_queue;
+       worker->generation++;
+
+       generation = worker->generation;
+       LWLockRelease(UndoWorkerLock);
+
+       /* Register the new dynamic worker. */
+       memset(&bgw, 0, sizeof(bgw));
+       bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+               BGWORKER_BACKEND_DATABASE_CONNECTION;
+       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+       snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoWorkerMain");
+       snprintf(bgw.bgw_type, BGW_MAXLEN, "undo apply worker");
+       snprintf(bgw.bgw_name, BGW_MAXLEN, "undo apply worker");
+
+       bgw.bgw_restart_time = BGW_NEVER_RESTART;
+       bgw.bgw_notify_pid = MyProcPid;
+       bgw.bgw_main_arg = Int32GetDatum(slot);
+
+       if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+       {
+               /* Failed to start worker, so clean up the worker slot. */
+               LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+               UndoWorkerCleanup(worker);
+               LWLockRelease(UndoWorkerLock);
+
+               return false;
+       }
+
+       /* Now wait until it attaches. */
+       WaitForUndoWorkerAttach(worker, generation, bgw_handle);
+
+       return true;
+}
+
+/*
+ * Detach the worker (cleans up the worker info).
+ */
+static void
+UndoWorkerDetach(void)
+{
+       /* Block concurrent access. */
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       UndoWorkerCleanup(MyUndoWorker);
+
+       LWLockRelease(UndoWorkerLock);
+}
+
+/*
+ * Clean up worker info.
+ */
+static void
+UndoWorkerCleanup(UndoApplyWorker * worker)
+{
+       Assert(LWLockHeldByMeInMode(UndoWorkerLock, LW_EXCLUSIVE));
+
+       worker->in_use = false;
+       worker->proc = NULL;
+       worker->dbid = InvalidOid;
+       worker->lingering = false;
+       worker->undo_worker_queue = InvalidUndoWorkerQueue;
+}
+
+/*
+ * Cleanup function.
+ *
+ * Called on undo worker exit.
+ */
+static void
+UndoWorkerOnExit(int code, Datum arg)
+{
+       UndoWorkerDetach();
+}
+
+/*
+ * Perform rollback request.  We need to connect to the database for first
+ * request and that is required because we access system tables while
+ * performing undo actions.
+ */
+static void
+UndoWorkerPerformRequest(UndoRequestInfo * urinfo)
+{
+       bool error = false;
+
+       /* must be connected to the database. */
+       Assert(MyDatabaseId != InvalidOid);
+
+       StartTransactionCommand();
+       PG_TRY();
+       {
+               execute_undo_actions(urinfo->full_xid, urinfo->end_urec_ptr,
+                                                        urinfo->start_urec_ptr, true);
+       }
+       PG_CATCH();
+       {
+               error = true;
+
+               /*
+                * Register the unprocessed request in an error queue, so that it can
+                * be processed in a timely fashion.  If we fail to add the request in
+                * an error queue, then mark the entry status invalid.  This request
+                * will be later added back to the queue by the discard worker.
+                */
+               if (!InsertRequestIntoErrorUndoQueue(urinfo))
+                       RollbackHTMarkEntryInvalid(urinfo->full_xid,
+                                                                          urinfo->start_urec_ptr);
+
+               /* Prevent interrupts while cleaning up. */
+               HOLD_INTERRUPTS();
+
+               /* Send the error only to server log. */
+               err_out_to_client(false);
+               EmitErrorReport();
+
+               /*
+                * Abort the transaction and continue processing pending undo requests.
+                */
+               AbortOutOfAnyTransaction();
+               FlushErrorState();
+
+               RESUME_INTERRUPTS();
+       }
+       PG_END_TRY();
+
+       if (!error)
+               CommitTransactionCommand();
+}
+
+/*
+ * UndoLauncherShmemSize
+ *             Compute space needed for undo launcher shared memory
+ */
+Size
+UndoLauncherShmemSize(void)
+{
+       Size            size;
+
+       /*
+        * Need the fixed struct and the array of LogicalRepWorker.
+        */
+       size = sizeof(UndoApplyCtxStruct);
+       size = MAXALIGN(size);
+       size = add_size(size, mul_size(max_undo_workers,
+                                                                  sizeof(UndoApplyWorker)));
+       return size;
+}
+
+/*
+ * UndoLauncherShmemInit
+ *             Allocate and initialize undo worker launcher shared memory
+ */
+void
+UndoLauncherShmemInit(void)
+{
+       bool            found;
+
+       UndoApplyCtx = (UndoApplyCtxStruct *)
+               ShmemInitStruct("Undo Worker Launcher Data",
+                                               UndoLauncherShmemSize(),
+                                               &found);
+
+       if (!found)
+               memset(UndoApplyCtx, 0, UndoLauncherShmemSize());
+}
+
+/*
+ * UndoLauncherRegister
+ *             Register a background worker running the undo worker launcher.
+ */
+void
+UndoLauncherRegister(void)
+{
+       BackgroundWorker bgw;
+
+       if (max_undo_workers == 0)
+               return;
+
+       memset(&bgw, 0, sizeof(bgw));
+       bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+               BGWORKER_BACKEND_DATABASE_CONNECTION;
+       bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+       snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+       snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoLauncherMain");
+       snprintf(bgw.bgw_name, BGW_MAXLEN,
+                        "undo worker launcher");
+       snprintf(bgw.bgw_type, BGW_MAXLEN,
+                        "undo worker launcher");
+       bgw.bgw_restart_time = 5;
+       bgw.bgw_notify_pid = 0;
+       bgw.bgw_main_arg = (Datum)0;
+
+       RegisterBackgroundWorker(&bgw);
+}
+
+/*
+ * Main loop for the undo worker launcher process.
+ */
+void
+UndoLauncherMain(Datum main_arg)
+{
+       UndoRequestInfo urinfo;
+
+       ereport(DEBUG1,
+                       (errmsg("undo launcher started")));
+
+       before_shmem_exit(UndoLauncherOnExit, (Datum) 0);
+
+       Assert(UndoApplyCtx->launcher_pid == 0);
+       UndoApplyCtx->launcher_pid = MyProcPid;
+
+       /* Establish signal handlers. */
+       pqsignal(SIGHUP, UndoLauncherSighup);
+       pqsignal(SIGTERM, UndoworkerSigtermHandler);
+       BackgroundWorkerUnblockSignals();
+
+       /* Establish connection to nailed catalogs. */
+       BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+       /*
+        * Advertise our latch that undo request enqueuer can use to wake us up
+        * while we're sleeping.
+        */
+       UndoApplyCtx->undo_launcher_latch = &MyProc->procLatch;
+
+       /* Enter main loop */
+       while (!got_SIGTERM)
+       {
+               int                     rc;
+
+               CHECK_FOR_INTERRUPTS();
+
+               ResetUndoRequestInfo(&urinfo);
+
+               if (UndoGetWork(false, false, &urinfo, NULL) &&
+                       IsUndoWorkerAvailable())
+                       UndoWorkerLaunch(urinfo);
+
+               /* Wait for more work. */
+               rc = WaitLatch(MyLatch,
+                                          WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                          DEFAULT_NAPTIME_PER_CYCLE,
+                                          WAIT_EVENT_UNDO_LAUNCHER_MAIN);
+
+               /* emergency bailout if postmaster has died */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       CHECK_FOR_INTERRUPTS();
+               }
+
+               if (got_SIGHUP)
+               {
+                       got_SIGHUP = false;
+                       ProcessConfigFile(PGC_SIGHUP);
+               }
+       }
+
+       /* Normal exit from undo launcher main */
+       ereport(LOG,
+                       (errmsg("undo launcher shutting down")));
+       proc_exit(0);
+}
+
+/*
+ * UndoWorkerMain -- Main loop for the undo apply worker.
+ */
+void
+UndoWorkerMain(Datum main_arg)
+{
+       UndoRequestInfo urinfo;
+       int                     worker_slot = DatumGetInt32(main_arg);
+       bool            in_other_db;
+       bool            found_work;
+       TimestampTz started_at;
+
+       /* Setup signal handling */
+       pqsignal(SIGTERM, UndoworkerSigtermHandler);
+       BackgroundWorkerUnblockSignals();
+
+       ResetUndoRequestInfo(&urinfo);
+       started_at = GetCurrentTimestamp();
+
+       /*
+        * Get the dbid where the wroker should connect to and get the worker
+        * request queue from which the worker should start looking for an undo
+        * request.
+        */
+       UndoWorkerGetSlotInfo(worker_slot, &urinfo);
+
+       /* Connect to the requested database. */
+       BackgroundWorkerInitializeConnectionByOid(urinfo.dbid, 0, 0);
+
+       /*
+        * Set the undo worker request queue from which the undo worker start
+        * looking for a work.
+        */
+       SetUndoWorkerQueueStart(urinfo.undo_worker_queue);
+
+       /*
+        * Before attaching the worker, fetch and remove the undo request for
+        * which the undo launcher has launched this worker.  This restricts the
+        * undo launcher from launching multiple workers for the same request.
+        * But, it's possible that the undo request has already been processed by
+        * other in-progress undo worker.  In that case, we enter the undo worker
+        * main loop and fetch the next request.
+        */
+       found_work = UndoGetWork(false, true, &urinfo, &in_other_db);
+
+       /* Attach to slot */
+       UndoWorkerAttach(worker_slot);
+
+       if (found_work && !in_other_db)
+       {
+               /* We must have got the pending undo request. */
+               Assert(FullTransactionIdIsValid(urinfo.full_xid));
+               UndoWorkerPerformRequest(&urinfo);
+               last_xact_processed_at = GetCurrentTimestamp();
+       }
+
+       while (!got_SIGTERM)
+       {
+               int                     rc;
+               bool            allow_peek;
+
+               CHECK_FOR_INTERRUPTS();
+
+               allow_peek = !TimestampDifferenceExceeds(started_at,
+                                                                                                GetCurrentTimestamp(),
+                                                                                                undo_worker_quantum_ms);
+
+               found_work = UndoGetWork(allow_peek, true, &urinfo, &in_other_db);
+
+               if (found_work && in_other_db)
+               {
+                       proc_exit(0);
+               }
+               else if (found_work)
+               {
+                       /* We must have got the pending undo request. */
+                       Assert(FullTransactionIdIsValid(urinfo.full_xid));
+                       UndoWorkerPerformRequest(&urinfo);
+                       last_xact_processed_at = GetCurrentTimestamp();
+               }
+               else
+               {
+                       TimestampTz timeout = 0;
+
+                       timeout = TimestampTzPlusMilliseconds(last_xact_processed_at,
+                                                                                                 UNDO_WORKER_LINGER_MS);
+
+                       /*
+                        * We don't need to linger if we have already spent
+                        * UNDO_WORKER_LINGER_MS since last transaction has processed.
+                        */
+                       if (timeout <= GetCurrentTimestamp())
+                       {
+                               proc_exit(0);
+                       }
+
+                       /*
+                        * Update the shared state to reflect that this worker is
+                        * lingering so that if there is new work request, requester can
+                        * wake us up.
+                        */
+                       UndoWorkerIsLingering(true);
+
+                       /* Wait for more work. */
+                       rc = WaitLatch(MyLatch,
+                                                  WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                                                  DEFAULT_NAPTIME_PER_CYCLE,
+                                                  WAIT_EVENT_UNDO_WORKER_MAIN);
+
+                       /* reset the shared state. */
+                       UndoWorkerIsLingering(false);
+
+                       /* emergency bailout if postmaster has died */
+                       if (rc & WL_POSTMASTER_DEATH)
+                               proc_exit(1);
+
+                       if (rc & WL_LATCH_SET)
+                       {
+                               ResetLatch(MyLatch);
+                               CHECK_FOR_INTERRUPTS();
+                       }
+
+                       if (got_SIGHUP)
+                       {
+                               got_SIGHUP = false;
+                               ProcessConfigFile(PGC_SIGHUP);
+                       }
+               }
+       }
+
+       /* Normal exit from undo worker main */
+       proc_exit(0);
+}
+
+/*
+ * Wake up undo worker so that undo requests can be processed in a timely
+ * fashion.
+ *
+ * We first try to wake up the lingering worker in the given database.  If we
+ * found even one such worker, we are done.
+ *
+ * Next, we try to stop some worker which is lingering, but doesn't belong to
+ * the given database.  We know that any worker which is lingering doesn't have
+ * any pending work, so it is fine to stop it when we know that there is going
+ * to be some work in the other database.
+ *
+ * Finally, we wakeup launcher so that it can either restart the worker we have
+ * stopped or find some other worker who can take up this request.
+ */
+void
+WakeupUndoWorker(Oid dbid)
+{
+       int                     i;
+
+       LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+       /* wake up lingering worker in the given database. */
+       for (i = 0; i < max_undo_workers; i++)
+       {
+               UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+               if (w->in_use && w->lingering && w->dbid == dbid)
+               {
+                       SetLatch(&w->proc->procLatch);
+
+                       LWLockRelease(UndoWorkerLock);
+                       return;
+               }
+       }
+
+       /*
+        * Stop one of the lingering worker which is not processing the requests
+        * in the given database.
+        */
+       for (i = 0; i < max_undo_workers; i++)
+       {
+               UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+               if (w->in_use && w->lingering && w->dbid != dbid)
+                       kill(w->proc->pid, SIGTERM);
+       }
+
+       if (UndoApplyCtx->undo_launcher_latch)
+               SetLatch(UndoApplyCtx->undo_launcher_latch);
+
+       LWLockRelease(UndoWorkerLock);
+
+       return;
+}
index 0c0ddd58c54a2126fafeece05a3141dd99d42f9f..e774c5569493e6a5399d1e1047a4a6cca7023880 100644 (file)
@@ -24,6 +24,7 @@
 #include "access/sysattr.h"
 #include "access/tableam.h"
 #include "access/tupconvert.h"
+#include "access/undodiscard.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
@@ -14547,6 +14548,10 @@ PreCommit_on_commit_actions(void)
                        case ONCOMMIT_DROP:
                                oids_to_drop = lappend_oid(oids_to_drop, oc->relid);
                                break;
+                       case ONCOMMIT_TEMP_DISCARD:
+                               /* Discard temp table undo logs for temp tables. */
+                               TempUndoDiscard(oc->relid);
+                               break;
                }
        }
 
index b66b517aca9edfc4337e64ffdedfe3f1daee9cae..b3db6fbf9d2a3d6adcac54a646e1d987f46b7528 100644 (file)
@@ -15,7 +15,9 @@
 #include <unistd.h>
 
 #include "libpq/pqsignal.h"
+#include "access/discardworker.h"
 #include "access/parallel.h"
+#include "access/undoworker.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
@@ -129,6 +131,15 @@ static const struct
        },
        {
                "ApplyWorkerMain", ApplyWorkerMain
+       },
+       {
+               "UndoLauncherMain", UndoLauncherMain
+       },
+       {
+               "UndoWorkerMain", UndoWorkerMain
+       },
+       {
+               "DiscardWorkerMain", DiscardWorkerMain
        }
 };
 
index d8dc0cc547f502af1104ff258a8c0c11bfda4da4..4c906f6efad8d6aa50aef75d888101c9a9e2c9df 100644 (file)
@@ -3679,6 +3679,15 @@ pgstat_get_wait_activity(WaitEventActivity w)
                case WAIT_EVENT_WAL_WRITER_MAIN:
                        event_name = "WalWriterMain";
                        break;
+               case WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN:
+                       event_name = "UndoDiscardWorkerMain";
+                       break;
+               case WAIT_EVENT_UNDO_LAUNCHER_MAIN:
+                       event_name = "UndoLauncherMain";
+                       break;
+               case WAIT_EVENT_UNDO_WORKER_MAIN:
+                       event_name = "UndoWorkerMain";
+                       break;
                        /* no default case, so that compiler will warn */
        }
 
index 3339804be914e92152276030ceb12d8dbc59de3b..6521efa09e0c6ee60bf82159473497c39103f468 100644 (file)
@@ -93,7 +93,9 @@
 #include <pthread.h>
 #endif
 
+#include "access/discardworker.h"
 #include "access/transam.h"
+#include "access/undoworker.h"
 #include "access/xlog.h"
 #include "bootstrap/bootstrap.h"
 #include "catalog/pg_control.h"
@@ -246,6 +248,8 @@ bool                enable_bonjour = false;
 char      *bonjour_name;
 bool           restart_after_crash = true;
 
+bool           enable_undo_launcher = true;
+
 /* PIDs of special child processes; 0 when not running */
 static pid_t StartupPID = 0,
                        BgWriterPID = 0,
@@ -982,6 +986,13 @@ PostmasterMain(int argc, char *argv[])
         */
        ApplyLauncherRegister();
 
+       /* Register the Undo worker launcher. */
+       if (enable_undo_launcher)
+               UndoLauncherRegister();
+
+       /* Register the Undo Discard worker. */
+       DiscardWorkerRegister();
+
        /*
         * process any libraries that should be preloaded at postmaster start
         */
index 12c324925ca90a1524a8f14a3109c3eeb04ee186..56b5d087b03bbd35f03f2cd1227da5ebab3e4af2 100644 (file)
@@ -22,6 +22,8 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "access/undolog.h"
+#include "access/undorequest.h"
+#include "access/undoworker.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -149,6 +151,8 @@ CreateSharedMemoryAndSemaphores(int port)
                size = add_size(size, BTreeShmemSize());
                size = add_size(size, SyncScanShmemSize());
                size = add_size(size, AsyncShmemSize());
+               size = add_size(size, PendingUndoShmemSize());
+               size = add_size(size, UndoLauncherShmemSize());
 #ifdef EXEC_BACKEND
                size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -220,6 +224,7 @@ CreateSharedMemoryAndSemaphores(int port)
        SUBTRANSShmemInit();
        MultiXactShmemInit();
        InitBufferPool();
+       PendingUndoShmemInit();
 
        /*
         * Set up lock manager
@@ -258,6 +263,7 @@ CreateSharedMemoryAndSemaphores(int port)
        WalSndShmemInit();
        WalRcvShmemInit();
        ApplyLauncherShmemInit();
+       UndoLauncherShmemInit();
 
        /*
         * Set up other modules that need some shared memory space
index 19e4f1f0c4add678c1bf8414dd7b2dda4d4a3212..1f650561cf61cd60cfe4f8e42c02bb332bb2026a 100644 (file)
@@ -51,3 +51,4 @@ LogicalRepWorkerLock                          43
 CLogTruncationLock                                     44
 UndoLogLock                                      45
 RollbackRequestLock                                    46
+UndoWorkerLock                                         47
index 884fa2af52f50d4815681c0977a22e03099420f8..61406cff8fd3b3f608c0dd1695d543aa3ddaef9e 100644 (file)
@@ -297,7 +297,9 @@ InitProcGlobal(void)
        ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
        SpinLockInit(ProcStructLock);
 
+       pg_atomic_init_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo, 0);
        ProcGlobal->xactsHavingPendingUndo = 0;
+       ProcGlobal->rollbackHTInitialized = false;
 }
 
 /*
index a7d1db52493ea71ffe09874155f992f2a7819a43..8b4aa0cdab65f60042557e30b799d30caf13108f 100644 (file)
@@ -33,6 +33,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/undorequest.h"
+#include "access/undoworker.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/namespace.h"
@@ -1955,6 +1956,17 @@ static struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"enable_undo_launcher", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+                       gettext_noop("Decides whether to launch an undo worker."),
+                       NULL,
+                       GUC_NOT_IN_SAMPLE
+                },
+                &enable_undo_launcher,
+                true,
+                NULL, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3030,6 +3042,16 @@ static struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"max_undo_workers", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+                       gettext_noop("Maximum number of undo worker processes."),
+                       NULL,
+               },
+               &max_undo_workers,
+               4, 0, MAX_BACKENDS,
+               NULL, NULL, NULL
+       },
+
        {
                {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
                        gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
index 592f6e1b4aa0fb73757ce486d065081dca6bbf52..e86507c0893f86e007085a20fbcbaec59e4830f9 100644 (file)
                                        # requests are pushed to undo workers
 #pending_undo_queue_size = 1024        # size of queue used to register undo
                                        # requests
+#max_undo_workers = 4  # maximum undo workers
 
 #------------------------------------------------------------------------------
 # CLIENT CONNECTION DEFAULTS
diff --git a/src/include/access/discardworker.h b/src/include/access/discardworker.h
new file mode 100644 (file)
index 0000000..5b065bf
--- /dev/null
@@ -0,0 +1,20 @@
+/*-------------------------------------------------------------------------
+ *
+ * discardworker.h
+ *       Exports from access/undo/discardworker.c.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/discardworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _DISCARDWORKER_H
+#define _DISCARDWORKER_H
+
+extern void DiscardWorkerRegister(void);
+extern void DiscardWorkerMain(Datum main_arg) pg_attribute_noreturn();
+extern bool IsDiscardProcess(void);
+
+#endif                                                 /* _DISCARDWORKER_H */
index 7796f7248c4e4a2b436aa0918eb6ac74ff3887a7..e68e74342ac88a8c577e84c842e8ef193f89339a 100644 (file)
@@ -73,6 +73,16 @@ FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid)
        return result;
 }
 
+static inline FullTransactionId
+FullTransactionIdFromU64(uint64 fxid)
+{
+       FullTransactionId result;
+
+       result.value = fxid;
+
+       return result;
+}
+
 /* advance a transaction ID variable, handling wraparound correctly */
 #define TransactionIdAdvance(dest)     \
        do { \
diff --git a/src/include/access/undodiscard.h b/src/include/access/undodiscard.h
new file mode 100644 (file)
index 0000000..282afc0
--- /dev/null
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.h
+ *       undo discard definitions
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undodiscard.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDODISCARD_H
+#define UNDODISCARD_H
+
+#include "access/undolog.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+#include "storage/lwlock.h"
+
+extern void UndoDiscard(TransactionId xmin, bool *hibernate);
+extern void UndoLogDiscardAll(void);
+extern void TempUndoDiscard(UndoLogNumber);
+extern void UndoLogProcess(void);
+
+#endif                                                 /* UNDODISCARD_H */
index 7ec4cb0ec1447de4060811ad1fdd9bee2cf32013..b19b9314165e8ba19b8098536df537fd7c8a8375 100644 (file)
@@ -279,6 +279,19 @@ typedef struct UndoLogAllocContext
 /*
  * The in-memory control object for an undo log.  We have a fixed-sized array
  * of these.
+ *
+ * The following two locks are used to manage the discard process
+ * discard_lock - should be acquired for undo read to protect it from discard and
+ * discard worker will acquire this lock to update oldest_data.
+ *
+ * discard_update_lock - This lock will be acquired in exclusive mode by discard
+ * worker during the discard process and in shared mode to update the
+ * next_urp in previous transaction's start header.
+ *
+ * Two different locks are used so that the readers are not blocked during the
+ * actual discard but only during the update of shared memory variable which
+ * influences the visibility decision but the updaters need to be blocked for
+ * the entire discard process to ensure proper ordering of WAL records.
  */
 typedef struct UndoLogSlot
 {
diff --git a/src/include/access/undoworker.h b/src/include/access/undoworker.h
new file mode 100644 (file)
index 0000000..1bdc31f
--- /dev/null
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoworker.h
+ *       Exports from undoworker.c.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _UNDOWORKER_H
+#define _UNDOWORKER_H
+
+/* GUC options */
+extern int max_undo_workers;
+
+/* undo worker sleep time between rounds */
+extern int     UndoWorkerDelay;
+
+extern Size UndoLauncherShmemSize(void);
+extern void UndoLauncherShmemInit(void);
+extern void UndoLauncherRegister(void);
+extern void UndoLauncherMain(Datum main_arg);
+extern void UndoWorkerMain(Datum main_arg) pg_attribute_noreturn();
+extern void WakeupUndoWorker(Oid dbid);
+
+#endif                                                 /* _UNDOWORKER_H */
index ff98d9e91a8b12cee111dd667680341d44aca8df..babcc6b1dc0b8b3197d6c42d8542b52a5289f383 100644 (file)
@@ -61,6 +61,15 @@ typedef struct CheckPoint
         * set to InvalidTransactionId.
         */
        TransactionId oldestActiveXid;
+
+       /*
+        * Oldest full transaction id which is having unapplied undo.  We include
+        * this value in the checkpoint record so that whenever server re-starts
+        * we can use this to initialize the server-wide value for same variable.
+        * Any Xid prior to this should be all-visible, so if this is not set,
+        * then the scans might try to fetch undo which can suck the performance.
+        */
+       FullTransactionId               oldestFullXidHavingUnappliedUndo;
 } CheckPoint;
 
 /* XLOG info values for XLOG rmgr */
index 860a84de7c00838ce20159073303907bc35b1c2f..cca575bb3be9469bd3389c83be087488a423e04f 100644 (file)
@@ -49,7 +49,8 @@ typedef enum OnCommitAction
        ONCOMMIT_NOOP,                          /* No ON COMMIT clause (do nothing) */
        ONCOMMIT_PRESERVE_ROWS,         /* ON COMMIT PRESERVE ROWS (do nothing) */
        ONCOMMIT_DELETE_ROWS,           /* ON COMMIT DELETE ROWS */
-       ONCOMMIT_DROP                           /* ON COMMIT DROP */
+       ONCOMMIT_DROP,                          /* ON COMMIT DROP */
+       ONCOMMIT_TEMP_DISCARD           /* ON COMMIT discard temp table undo logs */
 } OnCommitAction;
 
 /*
index 2fff6734fc330265d96f75a820d3f9890844135b..f9dc0bb9797903ccc5723b72b79159af03104253 100644 (file)
@@ -785,7 +785,10 @@ typedef enum
        WAIT_EVENT_SYSLOGGER_MAIN,
        WAIT_EVENT_WAL_RECEIVER_MAIN,
        WAIT_EVENT_WAL_SENDER_MAIN,
-       WAIT_EVENT_WAL_WRITER_MAIN
+       WAIT_EVENT_WAL_WRITER_MAIN,
+       WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN,
+       WAIT_EVENT_UNDO_LAUNCHER_MAIN,
+       WAIT_EVENT_UNDO_WORKER_MAIN
 } WaitEventActivity;
 
 /* ----------
index b692d8be110938d59fc2d214e51198c820fd24ed..b9af96deeea8f6995878bb9ac195517452c38a9e 100644 (file)
@@ -29,6 +29,7 @@ extern bool log_hostname;
 extern bool enable_bonjour;
 extern char *bonjour_name;
 extern bool restart_after_crash;
+extern bool enable_undo_launcher;
 
 #ifdef WIN32
 extern HANDLE PostmasterHandle;
index 4abb344aec66747c073b409fadfd378611bd3a76..b220a051cc0e47b4fd099b0e9b890062d451693f 100644 (file)
@@ -223,6 +223,7 @@ typedef enum BuiltinTrancheIds
        LWTRANCHE_UNDOLOG,
        LWTRANCHE_UNDODISCARD,
        LWTRANCHE_REWIND,
+       LWTRANCHE_DISCARD_UPDATE,
        LWTRANCHE_FIRST_USER_DEFINED,
 }                      BuiltinTrancheIds;
 
index 824f6bf232fbb6b702de8ae3009edb5d5da8d5d3..bcee3e1b041d655ae3cf68f90d29e51399562e6f 100644 (file)
@@ -272,8 +272,12 @@ typedef struct PROC_HDR
        int                     startupProcPid;
        /* Buffer id of the buffer that Startup process waits for pin on, or -1 */
        int                     startupBufferPinWaitBufId;
+       /* Oldest transaction id which is having undo. */
+       pg_atomic_uint64 oldestFullXidHavingUnappliedUndo;
        /* Number of aborted transactions with pending undo actions. */
        int                     xactsHavingPendingUndo;
+       /* Whether the rollback hash table is initialized after the startup? */
+       bool            rollbackHTInitialized;
 } PROC_HDR;
 
 extern PGDLLIMPORT PROC_HDR *ProcGlobal;
index da8b672096f3d90e95b06b8337d4456976b90125..1d12994d7d2da4e78c2a3c935763a69090b01bf3 100644 (file)
@@ -27,6 +27,8 @@
  * to avoid forcing to include proc.h when including procarray.h. So if you modify
  * PROC_XXX flags, you need to modify these flags.
  */
+#define                PROCARRAY_AUTOVACUUM_FLAG               0x01    /* currently running
+                                                                                                        * autovacuum */
 #define                PROCARRAY_VACUUM_FLAG                   0x02    /* currently running lazy
                                                                                                         * vacuum */
 #define                PROCARRAY_ANALYZE_FLAG                  0x04    /* currently running
@@ -41,7 +43,8 @@
  * PGXACT->vacuumFlags. Other flags are used for different purposes and
  * have no corresponding PROC flag equivalent.
  */
-#define                PROCARRAY_PROC_FLAGS_MASK       (PROCARRAY_VACUUM_FLAG | \
+#define                PROCARRAY_PROC_FLAGS_MASK       (PROCARRAY_AUTOVACUUM_FLAG | \
+                                                                                PROCARRAY_VACUUM_FLAG | \
                                                                                 PROCARRAY_ANALYZE_FLAG | \
                                                                                 PROCARRAY_LOGICAL_DECODING_FLAG)
 
@@ -50,6 +53,8 @@
 #define                PROCARRAY_FLAGS_DEFAULT                 PROCARRAY_LOGICAL_DECODING_FLAG
 /* Ignore vacuum backends */
 #define                PROCARRAY_FLAGS_VACUUM                  PROCARRAY_FLAGS_DEFAULT | PROCARRAY_VACUUM_FLAG
+/* Ignore autovacuum worker and backends running vacuum */
+#define                PROCARRAY_FLAGS_AUTOVACUUM              PROCARRAY_FLAGS_DEFAULT | PROCARRAY_AUTOVACUUM_FLAG
 /* Ignore analyze backends */
 #define                PROCARRAY_FLAGS_ANALYZE                 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_ANALYZE_FLAG
 /* Ignore both vacuum and analyze backends */
index a1c90eb9057550c6862bb76a195eb0aff068c69a..6034c5e41ad67a3588d536129566881eeab950eb 100644 (file)
@@ -89,7 +89,8 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(17 rows)
+ enable_undo_launcher           | on
+(18 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail