"tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; "
"oldest xid %u in DB %u; oldest multi %u in DB %u; "
"oldest/newest commit timestamp xid: %u/%u; "
- "oldest running xid %u; %s",
+ "oldest running xid %u; "
+ "oldest full xid having unapplied undo " UINT64_FORMAT "; %s",
(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
checkpoint->ThisTimeLineID,
checkpoint->PrevTimeLineID,
checkpoint->oldestCommitTsXid,
checkpoint->newestCommitTsXid,
checkpoint->oldestActiveXid,
+ U64FromFullTransactionId(checkpoint->oldestFullXidHavingUnappliedUndo),
(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
}
else if (info == XLOG_NEXTOID)
errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
oldest_datname),
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
- "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+ "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+ "increase max_undo_workers to allow execution of pending undo.")));
else
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
oldest_datoid),
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
- "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+ "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+ "increase max_undo_workers to allow execution of pending undo.")));
}
else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
{
oldest_datname,
xidWrapLimit - xid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
- "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+ "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+ "increase max_undo_workers to allow execution of pending undo.")));
else
ereport(WARNING,
(errmsg("database with OID %u must be vacuumed within %u transactions",
oldest_datoid,
xidWrapLimit - xid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
- "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+ "You might also need to commit or roll back old prepared transactions, or drop stale replication slots or\n"
+ "increase max_undo_workers to allow execution of pending undo.")));
}
/* Re-acquire lock and start over */
TransactionId xidStopLimit;
TransactionId xidWrapLimit;
TransactionId curXid;
+ TransactionId oldestXidHavingUndo;
+ FullTransactionId oldestFullXidHavingUndo;
Assert(TransactionIdIsNormal(oldest_datfrozenxid));
+ /*
+ * To determine the last safe xid that can be allocated, we need to
+ * consider oldestXidHavingUnappliedUndo because this is the oldest xid
+ * whose undo is not yet discarded, so it is still a valid xid in the
+ * system.
+ */
+ oldestFullXidHavingUndo =
+ FullTransactionIdFromU64(pg_atomic_read_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo));
+ oldestXidHavingUndo = XidFromFullTransactionId(oldestFullXidHavingUndo);
+ if (TransactionIdIsValid(oldestXidHavingUndo))
+ oldest_datfrozenxid = Min(oldest_datfrozenxid, oldestXidHavingUndo);
+
/*
* The place where we actually get into deep trouble is halfway around
* from the oldest potentially-existing XID. (This calculation is
* Note: it's also possible that get_database_name fails and returns
* NULL, for example because the database just got dropped. We'll
* still warn, even though the warning might now be unnecessary.
+ *
+ * XXX Can we easily distinguish whether the problem is due to unapplied
+ * undo or some old open transactions?
*/
if (IsTransactionState())
oldest_datname = get_database_name(oldest_datoid);
oldest_datname,
xidWrapLimit - curXid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
- "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+ "You might also need to commit or roll back old prepared transactions, or drop stale replication slots, or\n"
+ "increase max_undo_workers to allow execution of pending undo.")));
else
ereport(WARNING,
(errmsg("database with OID %u must be vacuumed within %u transactions",
oldest_datoid,
xidWrapLimit - curXid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
- "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
+ "You might also need to commit or roll back old prepared transactions, or drop stale replication slots, or\n"
+ "increase max_undo_workers to allow execution of pending undo.")));
}
}
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
+#include "access/undodiscard.h"
#include "access/undorequest.h"
#include "access/xact.h"
#include "access/xlog.h"
AtEOXact_ApplyLauncher(true);
pgstat_report_xact_timestamp(0);
+ /* In single-user mode, discard all the undo logs once committed. */
+ if (!IsUnderPostmaster)
+ UndoLogDiscardAll();
+
CurrentResourceOwner = NULL;
ResourceOwnerDelete(TopTransactionResourceOwner);
s->curTransactionOwner = NULL;
checkPoint.newestCommitTsXid = InvalidTransactionId;
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;
+ checkPoint.oldestFullXidHavingUnappliedUndo = InvalidFullTransactionId;
ShmemVariableCache->nextFullXid = checkPoint.nextFullXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
(errmsg_internal("commit timestamp Xid oldest/newest: %u/%u",
checkPoint.oldestCommitTsXid,
checkPoint.newestCommitTsXid)));
+ ereport(DEBUG1,
+ (errmsg_internal("oldest xid with epoch having undo: " UINT64_FORMAT,
+ U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo))));
if (!TransactionIdIsNormal(XidFromFullTransactionId(checkPoint.nextFullXid)))
ereport(PANIC,
(errmsg("invalid next transaction ID")));
checkPoint.newestCommitTsXid);
XLogCtl->ckptFullXid = checkPoint.nextFullXid;
+ /*
+ * Read the oldest xid having unapplied undo from the checkpoint and
+ * set it in ProcGlobal.
+ */
+ pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+ U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+
/*
* Initialize replication slots, before there's a chance to remove
* required resources.
* end-of-recovery steps fail.
*/
if (InRecovery)
+ {
ResetUnloggedRelations(UNLOGGED_RELATION_INIT);
+ ResetUndoLogs(UNDO_UNLOGGED);
+ }
+
+ /* Always reset temporary undo logs. */
+ ResetUndoLogs(UNDO_TEMP);
/*
* We don't need the latch anymore. It's not strictly necessary to disown
checkPoint.nextOid += ShmemVariableCache->oidCount;
LWLockRelease(OidGenLock);
+ checkPoint.oldestFullXidHavingUnappliedUndo =
+ FullTransactionIdFromU64(pg_atomic_read_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo));
+
MultiXactGetCheckptMulti(shutdown,
&checkPoint.nextMulti,
&checkPoint.nextMultiOffset,
MultiXactAdvanceOldest(checkPoint.oldestMulti,
checkPoint.oldestMultiDB);
+ pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+ U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+
/*
* No need to set oldestClogXid here as well; it'll be set when we
* redo an xl_clog_truncate if it changed since initialization.
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
ControlFile->checkPointCopy.nextFullXid = checkPoint.nextFullXid;
+ ControlFile->checkPointCopy.oldestFullXidHavingUnappliedUndo =
+ checkPoint.oldestFullXidHavingUnappliedUndo;
/* Update shared-memory copy of checkpoint XID/epoch */
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->ckptFullXid = checkPoint.nextFullXid;
SpinLockRelease(&XLogCtl->info_lck);
+ ControlFile->checkPointCopy.oldestFullXidHavingUnappliedUndo =
+ checkPoint.oldestFullXidHavingUnappliedUndo;
+
/*
* We should've already switched to the new TLI before replaying this
* record.
MultiXactAdvanceNextMXact(checkPoint.nextMulti,
checkPoint.nextMultiOffset);
+ pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+ U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+
/*
* NB: This may perform multixact truncation when replaying WAL
* generated by an older primary.
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = undoaccess.o undoaction.o undoactionxlog.o undolog.o undorecord.o \
- undorequest.o
+OBJS = discardworker.o undoaccess.o undoaction.o undoactionxlog.o undodiscard.o \
+ undolog.o undorecord.o undorequest.o undoworker.o
include $(top_srcdir)/src/backend/common.mk
postpone applying undo actions for subtransactions as the modifications
made by aborted subtransaction must not be visible even if the main transaction
commits.
+
+Undo Requests and Undo Workers
+-------------------------------
+To improve the efficiency of rollbacks, we create three queues and a hash
+table for the rollback requests. An xid-based priority queue allows us to
+process the requests of older transactions first and helps us move
+oldestXidHavingUnappliedUndo (an xid-horizon below which all transactions
+are visible) forward. A size-based queue helps us perform the rollbacks of
+larger aborts in a timely fashion, so that we don't get stuck on them while
+discarding the logs. An error queue holds the requests for transactions
+that failed to apply their undo. The rollback hash table is used to avoid
+duplicate undo requests by backends and the discard worker. The table must
+be able to accommodate all active undo requests. An undo request must
+appear in both the xid and size queues, or in neither. As of now, we
+process the requests from these queues in a round-robin fashion to give
+equal priority to all three types of requests.
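+
+As an illustration, the round-robin selection could look like the following
+sketch (the type and helper names here are hypothetical, not the actual
+functions in this patch):
+
+    /* Sketch: pick the next request, cycling through the three queues. */
+    static int  next_queue = 0;     /* 0 = xid, 1 = size, 2 = error */
+
+    static RollbackHashEntry *
+    GetNextUndoRequest(void)
+    {
+        RollbackHashEntry *req = NULL;
+        int     tries;
+
+        /* Try each queue at most once, starting where we left off. */
+        for (tries = 0; tries < 3 && req == NULL; tries++)
+        {
+            req = PeekQueue(next_queue);    /* hypothetical helper */
+            next_queue = (next_queue + 1) % 3;
+        }
+        return req;
+    }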
+
+Note that if the request queues are full, we put backpressure on backends
+to complete the requests by themselves. There is one exception: when the
+error queue becomes full, we just mark the request as 'invalid' and
+continue to process other requests, if any. The discard worker will find
+such an errored transaction at a later point and add it to the request
+queues again.
+
+We have a hard limit (proportional to the size of the rollback hash table)
+on the number of transactions that can have pending undo. This helps us
+compute the value of oldestXidHavingUnappliedUndo and keeps us from
+accumulating pending undo for a long time, which would eventually block the
+discard of undo.
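+
+The corresponding check at undo-preparation time (condensed from the
+undoaccess.c change later in this patch) looks like this:
+
+    /* Keep MaxBackends slots reserved so each backend can register once. */
+    if (ProcGlobal->xactsHavingPendingUndo >
+        (UndoRollbackHashTableSize() - MaxBackends))
+        ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                 errmsg("maximum limit for pending rollback requests has been reached, wait for some time and try again")));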
+
+In a running system, scanning the rollback hash table gives us the value of
+oldestXidHavingUnappliedUndo; however, after startup, we first need to scan
+all the undo logs and populate the rollback hash table. After startup, we
+allow connections, but don't allow transactions that want to write undo
+until the rollback hash table is initialized.
+
+To process a request, we take it from one of the queues, search for it in
+the hash table, mark it as in-progress, and then remove it from the
+respective queue. After that, we perform the request, which means applying
+the undo actions, and remove it from the hash table.
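+
+In outline, the lifecycle looks like the sketch below (execute_undo_actions
+and UNDO_REQUEST_INPROGRESS are actual names from this patch; the other
+helpers and fields are hypothetical):
+
+    req = DequeueRequest();                 /* hypothetical: take from a queue */
+    rh = RollbackHTLookup(req);             /* hypothetical: find the HT entry */
+    rh->status = UNDO_REQUEST_INPROGRESS;   /* mark as in-progress */
+    RemoveFromQueue(req);                   /* hypothetical */
+    execute_undo_actions(req->full_xid, req->end_urec_ptr,
+                         req->start_urec_ptr, true);
+    RollbackHTRemoveEntry(rh);              /* hypothetical: drop the entry */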
+
+To apply the undo actions, we collect the undo records in bulk and try to
+process them together. We make sure to update the transaction's progress
+at regular intervals so that after a crash we can skip already-applied
+undo. The undo apply progress is tracked in terms of the number of blocks
+processed. A progress value of XACT_APPLY_PROGRESS_COMPLETED indicates
+that all the undo is applied, XACT_APPLY_PROGRESS_NOT_STARTED indicates
+that no undo action has been applied yet, and any other value indicates
+that we have applied the undo partially, so after crash recovery we need
+to resume processing the undo from that location.
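+
+For example, a recovery-time decision based on the progress value could
+look like this sketch (only the progress constants are from this patch):
+
+    if (progress == XACT_APPLY_PROGRESS_COMPLETED)
+        ;                               /* all undo applied, nothing to do */
+    else if (progress == XACT_APPLY_PROGRESS_NOT_STARTED)
+        apply_from_block = 0;           /* apply undo from the beginning */
+    else
+        apply_from_block = progress;    /* resume from the recorded block */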
+
+The undo launcher is responsible for launching workers only if there is
+some work available in one of the work queues and more workers are
+available. A worker is launched to handle requests for a particular
+database. Each undo worker then starts reading requests for that
+particular database from one of the queues. A worker peeks into each queue
+for requests from a particular database if it would need to switch
+databases in less than undo_worker_quantum ms (10s by default) after
+starting. Also, if there is no work, it lingers for UNDO_WORKER_LINGER_MS
+(20s by default). This avoids restarting the workers too frequently.
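+
+A rough sketch of the lingering behavior in the worker's main loop (the
+found_work flag and wait-event name here are illustrative):
+
+    if (!found_work)
+    {
+        UndoWorkerIsLingering(true);
+        WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                  UNDO_WORKER_LINGER_MS, WAIT_EVENT_UNDO_LAUNCHER_MAIN);
+        UndoWorkerIsLingering(false);
+    }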
+
+Discard Worker
+---------------
+The discard worker is responsible for discarding the undo logs of
+transactions that are committed and all-visible, or are rolled back. It
+also registers requests for aborted transactions in the work queues. It
+iterates through all the active logs one-by-one and tries to discard the
+transactions that are old enough to matter.
+
+For transactions that span multiple logs, the undo of committed and
+all-visible transactions is discarded separately for each log. This is
+possible because transactions that span logs have a separate transaction
+header in each log. For aborted transactions, we try to process the
+actions of the entire transaction in one shot, as we need to perform the
+actions starting from the end location back to the start location.
+However, the later portion of a transaction that has overflowed into a
+separate log can be processed separately if we encounter the corresponding
+log first. If we wanted, we could combine the logs for processing in that
+case as well, but there is no clear advantage in doing so.
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * discardworker.c
+ * The undo discard worker for asynchronous undo management.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/discardworker.c
+ *
+ * The main responsibility of the discard worker is to discard the undo log
+ * of transactions that are committed and all-visible, or are rolled back.
+ * It also registers requests for aborted transactions in the work queues.
+ * To know more about the work queues, see undorequest.c. It iterates
+ * through all the active logs one-by-one and tries to discard the
+ * transactions that are old enough to matter.
+ *
+ * For transactions that span multiple logs, the undo of committed and
+ * all-visible transactions is discarded separately for each log. This is
+ * possible because transactions that span logs have a separate transaction
+ * header in each log. For aborted transactions, we try to process the
+ * actions of the entire transaction in one shot, as we need to perform the
+ * actions starting from the end location back to the start location.
+ * However, the later portion of a transaction that has overflowed into a
+ * separate log can be processed separately if we encounter the
+ * corresponding log first. If we wanted, we could combine the logs for
+ * processing in that case as well, but there is no clear advantage in
+ * doing so.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <unistd.h>
+
+#include "access/undodiscard.h"
+#include "access/discardworker.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "tcop/tcopprot.h"
+#include "utils/guc.h"
+#include "utils/resowner.h"
+
+static void undoworker_sigterm_handler(SIGNAL_ARGS);
+
+/* minimum and maximum sleep time for discard worker */
+#define MIN_NAPTIME_PER_CYCLE 100L
+#define DELAYED_NAPTIME 10 * MIN_NAPTIME_PER_CYCLE
+#define MAX_NAPTIME_PER_CYCLE 100 * MIN_NAPTIME_PER_CYCLE
+
+static volatile sig_atomic_t got_SIGTERM = false;
+static bool hibernate = false;
+static long wait_time = MIN_NAPTIME_PER_CYCLE;
+static bool am_discard_worker = false;
+
+/* SIGTERM: set flag to exit at next convenient time */
+static void
+undoworker_sigterm_handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGTERM = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * DiscardWorkerRegister -- Register an undo discard worker.
+ */
+void
+DiscardWorkerRegister(void)
+{
+ BackgroundWorker bgw;
+
+ memset(&bgw, 0, sizeof(bgw));
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ snprintf(bgw.bgw_name, BGW_MAXLEN, "discard worker");
+ snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "DiscardWorkerMain");
+ bgw.bgw_restart_time = 5;
+ bgw.bgw_notify_pid = 0;
+ bgw.bgw_main_arg = (Datum) 0;
+
+ RegisterBackgroundWorker(&bgw);
+}
+
+/*
+ * DiscardWorkerMain -- Main loop for the undo discard worker.
+ */
+void
+DiscardWorkerMain(Datum main_arg)
+{
+ ereport(LOG,
+ (errmsg("discard worker started")));
+
+ /* Establish signal handlers. */
+ pqsignal(SIGTERM, undoworker_sigterm_handler);
+ BackgroundWorkerUnblockSignals();
+
+ am_discard_worker = true;
+
+ /* Make it easy to identify our processes. */
+ SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
+ PGC_USERSET, PGC_S_SESSION);
+
+ /* Establish connection to nailed catalogs. */
+ BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+ /*
+ * Scan all the undo logs and initialize the rollback hash table with all
+ * the pending rollback requests. This needs to be done as a first step
+ * because only after this will transactions be allowed to write new
+ * undo. See comments atop UndoLogProcess.
+ */
+ UndoLogProcess();
+
+ /* Enter main loop */
+ while (!got_SIGTERM)
+ {
+ TransactionId OldestXmin,
+ oldestXidHavingUndo;
+ FullTransactionId oldestFullXidHavingUndo;
+ int rc;
+
+ /*
+ * It is okay to ignore vacuum transactions here, as we can discard the
+ * undo of a vacuuming transaction once that transaction is committed.
+ * We don't need to hold its undo for visibility purposes.
+ */
+ OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_AUTOVACUUM |
+ PROCARRAY_FLAGS_VACUUM);
+
+ oldestFullXidHavingUndo =
+ FullTransactionIdFromU64(pg_atomic_read_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo));
+ oldestXidHavingUndo = XidFromFullTransactionId(oldestFullXidHavingUndo);
+
+ /*
+ * Call the discard routine if oldestXidHavingUndo is lagging behind
+ * OldestXmin.
+ */
+ if (OldestXmin != InvalidTransactionId &&
+ TransactionIdPrecedes(oldestXidHavingUndo, OldestXmin))
+ {
+ UndoDiscard(OldestXmin, &hibernate);
+
+ /*
+ * If we got some undo logs to discard or discarded something,
+ * then reset the wait_time, as we have work to do. Note that if
+ * there are some undo logs that cannot be discarded, the above
+ * condition will remain unsatisfied for as long as oldestXmin
+ * remains unchanged, and the wait_time will not be reset in that
+ * case.
+ */
+ if (!hibernate)
+ wait_time = MIN_NAPTIME_PER_CYCLE;
+ }
+
+ /* Wait for more work. */
+ rc = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ wait_time,
+ WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(&MyProc->procLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /*
+ * Increase the wait_time based on the length of inactivity. If
+ * wait_time is within one second, then increment it by 100 ms at a
+ * time. Thereafter, increment it one second at a time, until it
+ * reaches ten seconds. Never increase the wait_time beyond ten
+ * seconds; otherwise the wait would be too long.
+ */
+ if (rc & WL_TIMEOUT && hibernate)
+ {
+ wait_time += (wait_time < DELAYED_NAPTIME ?
+ MIN_NAPTIME_PER_CYCLE : DELAYED_NAPTIME);
+ if (wait_time > MAX_NAPTIME_PER_CYCLE)
+ wait_time = MAX_NAPTIME_PER_CYCLE;
+ }
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+ }
+
+ /* Normal exit from discard worker */
+ ereport(LOG,
+ (errmsg("discard worker shutting down")));
+
+ proc_exit(0);
+}
+
+/*
+ * IsDiscardProcess -- Tells whether the current process is a discard worker
+ * process.
+ */
+bool
+IsDiscardProcess(void)
+{
+ return am_discard_worker;
+}
#include "storage/buf.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
+#include "storage/proc.h"
#include "miscadmin.h"
/*
*/
UndoCompressionInfo undo_compression_info[UndoLogCategories];
+/*
+ * Defines the number of times we try to wait for the rollback hash table
+ * to be initialized. After this many attempts, an error is raised and the
+ * user can retry the operation.
+ */
+#define ROLLBACK_HT_INIT_WAIT_TRY 60
+
/* Prototypes for static functions. */
static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
UndoRecPtr urp, RelFileNode rnode,
UndoCompressionInfo *compression_info =
&context->undo_compression_info[context->alloc_context.category];
+ if (!InRecovery && IsUnderPostmaster)
+ {
+ int try_count = 0;
+
+ /*
+ * If we are not in recovery and not in single-user mode, then undo
+ * generation should not be allowed until we have scanned all the undo
+ * logs and initialized the hash table with all the aborted
+ * transaction entries. See detailed comments in UndoLogProcess.
+ */
+ while (!ProcGlobal->rollbackHTInitialized)
+ {
+ /* Error out after trying for one minute. */
+ if (try_count > ROLLBACK_HT_INIT_WAIT_TRY)
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_E_MODIFYING_SQL_DATA_NOT_PERMITTED),
+ errmsg("rollback hash table is not yet initialized, wait for sometime and try again")));
+
+ /*
+ * Rollback hash table is not yet initialized, sleep for 1 second
+ * and try again.
+ */
+ pg_usleep(1000000L);
+ try_count++;
+ }
+ }
+
+ /*
+ * If the rollback hash table is already full (excluding one additional
+ * slot reserved for each backend), then don't allow any new undo to be
+ * generated until we apply some of the pending requests and create some
+ * space in the hash table to accept new rollback requests. Leave enough
+ * slots in the hash table so that there is space for all the backends to
+ * register at least one request each. This is to protect against the
+ * situation where one backend keeps consuming slots reserved for the
+ * other backends and suddenly there is a concurrent undo request from
+ * all the backends. So we always keep space reserved for MaxBackends.
+ */
+ if (ProcGlobal->xactsHavingPendingUndo >
+ (UndoRollbackHashTableSize() - MaxBackends))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("max limit for pending rollback request has reached, wait for sometime and try again")));
+
/* Already reached maximum prepared limit. */
if (context->nprepared_undo == context->max_prepared_undo)
elog(ERROR, "already reached the maximum prepared limit");
LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = BufferGetPage(buffer);
- phdr = (UndoPageHeader)page;
+ phdr = (UndoPageHeader) page;
/* Calculate the size of the partial record. */
partial_rec_size = UndoRecordHeaderSize(phdr->uur_info) +
- phdr->tuple_len + phdr->payload_len -
- phdr->record_offset;
+ phdr->tuple_len + phdr->payload_len -
+ phdr->record_offset;
/* calculate the offset in current log. */
offset_cur_page = SizeOfUndoPageHeaderData + partial_rec_size;
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undodiscard.c
+ * discard undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undodiscard.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/undolog.h"
+#include "access/undodiscard.h"
+#include "access/undorequest.h"
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/resowner.h"
+
+/*
+ * Discard as many record sets as we can from the undo log occupying a given
+ * slot, considering the given xmin horizon. If we encounter a record set
+ * that needs to be rolled back, register a rollback request. Set *hibernate
+ * to false if work was done.
+ */
+static void
+UndoDiscardOneLog(UndoLogSlot *slot, TransactionId xmin, bool *hibernate)
+{
+ UndoRecPtr undo_recptr, next_insert;
+ UndoRecPtr next_urecptr = InvalidUndoRecPtr;
+ UnpackedUndoRecord *uur = NULL;
+ bool need_discard = false;
+ FullTransactionId undofxid = InvalidFullTransactionId;
+ TransactionId latest_discardxid = InvalidTransactionId;
+ UndoLogNumber logno;
+
+ /*
+ * Currently we expect only one discard worker to be active at any time,
+ * but in future we might have more than one, and superuser maintenance
+ * functions might also discard data concurrently. So we have to
+ * assume that the given slot could be recycled underneath us any time we
+ * don't hold one of the locks that prevents that. We'll detect that by
+ * the log number changing.
+ */
+ LWLockAcquire(&slot->discard_lock, LW_SHARED);
+ logno = slot->logno;
+ if (UndoRecPtrIsValid(slot->oldest_data))
+ {
+ undo_recptr = slot->oldest_data;
+ LWLockRelease(&slot->discard_lock);
+ }
+ else
+ {
+ LWLockRelease(&slot->discard_lock);
+ undo_recptr = UndoLogGetOldestRecord(logno, NULL);
+ }
+
+ /* There might not be any undo log and hibernation might be needed. */
+ *hibernate = true;
+
+ StartTransactionCommand();
+
+ /* Loop until we run out of discardable transactions in the given log. */
+ do
+ {
+ TransactionId wait_xid = InvalidTransactionId;
+ bool pending_abort = false;
+ bool request_rollback = false;
+ UndoStatus status;
+ UndoRecordFetchContext context;
+
+ next_insert = UndoLogGetNextInsertPtr(logno);
+
+ /* There must be some undo data for a transaction. */
+ Assert(next_insert != undo_recptr);
+
+ /* Fetch the undo record for the given undo_recptr. */
+ BeginUndoFetch(&context);
+ uur = UndoFetchRecord(&context, undo_recptr);
+ FinishUndoFetch(&context);
+
+ if (uur != NULL)
+ {
+ if (UndoRecPtrGetCategory(undo_recptr) == UNDO_SHARED)
+ {
+ /*
+ * For the "shared" category, we only discard when the
+ * rm_undo_status callback tells us we can.
+ */
+ status = RmgrTable[uur->uur_rmid].rm_undo_status(uur, &wait_xid);
+
+ Assert((status == UNDO_STATUS_WAIT_XMIN &&
+ TransactionIdIsValid(wait_xid)) ^
+ (status == UNDO_STATUS_DISCARD &&
+ !TransactionIdIsValid(wait_xid)));
+ }
+ else
+ {
+ TransactionId xid = XidFromFullTransactionId(uur->uur_fxid);
+
+ /*
+ * Otherwise we use the CLOG and xmin to decide whether to
+ * wait, discard or roll back.
+ *
+ * XXX: We've added the transaction-in-progress check to
+ * avoid xids of in-progress autovacuum transactions, as those
+ * are not included in the oldestxmin calculation. See
+ * DiscardWorkerMain.
+ */
+ if (TransactionIdDidCommit(xid))
+ {
+ /*
+ * If this record set's xid isn't before the xmin
+ * horizon, we'll have to wait before we can discard
+ * it.
+ */
+ if (TransactionIdFollowsOrEquals(xid, xmin))
+ wait_xid = xid;
+
+ }
+ else if (!TransactionIdIsInProgress(xid))
+ {
+ /*
+ * If it hasn't been applied already, then we'll ask
+ * for it to be applied now. Otherwise it'll be
+ * discarded.
+ */
+ if (!IsXactApplyProgressCompleted(uur->uur_txn->urec_progress))
+ request_rollback = true;
+ }
+ else
+ {
+ /*
+ * It's either in progress or isn't yet before the
+ * xmin horizon, so we'll have to wait.
+ */
+ wait_xid = XidFromFullTransactionId(uur->uur_fxid);
+ }
+ }
+
+ /*
+ * Add the aborted transaction to the rollback request queues.
+ *
+ * We can ignore the abort for transactions whose corresponding
+ * database doesn't exist.
+ */
+ if (request_rollback && dbid_exists(uur->uur_txn->urec_dbid))
+ {
+ (void) RegisterRollbackReq(InvalidUndoRecPtr,
+ undo_recptr,
+ uur->uur_txn->urec_dbid,
+ uur->uur_fxid);
+
+ pending_abort = true;
+ }
+
+ next_urecptr = uur->uur_txn->urec_next;
+ undofxid = uur->uur_fxid;
+
+ UndoRecordRelease(uur);
+ uur = NULL;
+ }
+
+ /*
+ * We can discard up to this point when one of the following conditions
+ * is met: (a) we need to wait for a transaction first, (b) there is no
+ * more log to process, (c) the transaction undo in the current log is
+ * finished, or (d) there is a pending abort.
+ */
+ if (TransactionIdIsValid(wait_xid) ||
+ next_urecptr == InvalidUndoRecPtr ||
+ UndoRecPtrGetLogNo(next_urecptr) != logno ||
+ pending_abort)
+ {
+ /* We found some undo log to discard, so we cannot hibernate now. */
+ *hibernate = false;
+
+ /*
+ * If we don't need to wait for this transaction and this is not
+ * an aborted transaction, then we can discard it as well.
+ */
+ if (!TransactionIdIsValid(wait_xid) && !pending_abort)
+ {
+ /*
+ * It is safe to use next_insert as the location till which we
+ * want to discard in this case. If something new has been
+ * added after we have fetched this transaction's record, it
+ * won't be considered in this pass of discard.
+ */
+ undo_recptr = next_insert;
+ latest_discardxid = XidFromFullTransactionId(undofxid);
+ need_discard = true;
+
+ /* We don't have anything more to discard. */
+ undofxid = InvalidFullTransactionId;
+ }
+
+ /* Update the shared memory state. */
+ LWLockAcquire(&slot->discard_lock, LW_EXCLUSIVE);
+
+ /*
+ * If the slot has been recycled while we were thinking about it,
+ * we have to abandon the operation.
+ */
+ if (slot->logno != logno)
+ {
+ LWLockRelease(&slot->discard_lock);
+ break;
+ }
+
+ /* Update the slot information for the next pass of discard. */
+ slot->wait_fxmin = undofxid;
+ slot->oldest_data = undo_recptr;
+
+ LWLockRelease(&slot->discard_lock);
+
+ if (need_discard)
+ {
+ LWLockAcquire(&slot->discard_update_lock, LW_EXCLUSIVE);
+ UndoLogDiscard(undo_recptr, latest_discardxid);
+ LWLockRelease(&slot->discard_update_lock);
+ }
+
+ break;
+ }
+
+ /*
+ * This transaction's xid is smaller than the xmin, so let's jump to
+ * the next transaction.
+ */
+ undo_recptr = next_urecptr;
+ latest_discardxid = XidFromFullTransactionId(undofxid);
+
+ Assert(uur == NULL);
+
+ /* If we reach here, this means there is something to discard. */
+ need_discard = true;
+ } while (true);
+
+ CommitTransactionCommand();
+}
+
+/*
+ * Scan all the undo logs and register the aborted transactions. This is
+ * called as the first function from the discard worker, and only after this
+ * pass over the undo logs is complete is new undo allowed to be written in
+ * the system. This is required because after crash recovery we don't know
+ * the exact number of aborted transactions whose rollback requests are
+ * pending, and we cannot allow new undo requests if the number of requests
+ * is already equal to the hash table size. So before we start allowing any
+ * new transaction to write undo, we need to know the exact number of
+ * pending requests.
+ */
+void
+UndoLogProcess(void)
+{
+ UndoLogSlot *slot = NULL;
+
+ /*
+ * We need to perform this in a transaction because (a) we need a
+ * resource owner to scan the logs and (b) TransactionIdIsInProgress
+ * requires us to be in a transaction.
+ */
+ StartTransactionCommand();
+
+ /*
+ * Loop through all the valid undo logs and scan them transaction by
+ * transaction to find non-committed transactions, if any, and register them
+ * in the rollback hash table.
+ */
+ while ((slot = UndoLogNextSlot(slot)))
+ {
+ UndoRecPtr undo_recptr;
+ UnpackedUndoRecord *uur = NULL;
+
+ /* We do not execute shared (non-transactional) undo records. */
+ if (slot->meta.category == UNDO_SHARED)
+ continue;
+
+ /* Start scanning the log from the last discard point. */
+ undo_recptr = UndoLogGetOldestRecord(slot->logno, NULL);
+
+ /* Loop until we have scanned the complete log. */
+ while (1)
+ {
+ TransactionId xid;
+ UndoRecordFetchContext context;
+
+ /* Done with this log. */
+ if (!UndoRecPtrIsValid(undo_recptr))
+ break;
+
+ /* Fetch the undo record for the given undo_recptr. */
+ BeginUndoFetch(&context);
+ uur = UndoFetchRecord(&context, undo_recptr);
+ FinishUndoFetch(&context);
+
+ Assert(uur != NULL);
+
+ xid = XidFromFullTransactionId(uur->uur_fxid);
+
+ /*
+ * Register the rollback request for all uncommitted and
+ * not-in-progress transactions whose undo apply progress is still not
+ * completed. Even though we don't allow any new transactions to
+ * write undo until this first pass is completed, there might be
+ * some prepared transactions which are still in progress, so we
+ * don't include such transactions.
+ */
+ if (!TransactionIdDidCommit(xid) &&
+ !TransactionIdIsInProgress(xid) &&
+ !IsXactApplyProgressCompleted(uur->uur_txn->urec_progress))
+ {
+ (void) RegisterRollbackReq(InvalidUndoRecPtr, undo_recptr,
+ uur->uur_txn->urec_dbid, uur->uur_fxid);
+ }
+
+ /*
+ * Go to the next transaction in the same log. If urec_next points
+ * to an undo record pointer in a different log, then we are done
+ * with this log, so just set undo_recptr to InvalidUndoRecPtr.
+ */
+ if (UndoRecPtrGetLogNo(undo_recptr) ==
+ UndoRecPtrGetLogNo(uur->uur_txn->urec_next))
+ undo_recptr = uur->uur_txn->urec_next;
+ else
+ undo_recptr = InvalidUndoRecPtr;
+
+ /* Release memory for the current record. */
+ UndoRecordRelease(uur);
+ }
+ }
+
+ CommitTransactionCommand();
+
+ /* Allow the transactions to start writing undo. */
+ ProcGlobal->rollbackHTInitialized = true;
+}
+
+/*
+ * Discard the undo for all the transactions whose xid is smaller than
+ * oldestXmin
+ */
+void
+UndoDiscard(TransactionId oldestXmin, bool *hibernate)
+{
+ FullTransactionId oldestXidHavingUndo;
+ UndoLogSlot *slot = NULL;
+ uint32 epoch;
+
+ /*
+ * If all the undo logs are discarded, then oldestXidHavingUndo should be
+ * oldestXmin. As of now, we don't allow more than 2 billion xids in the
+ * system, so we can rely on the epoch retrieved with GetEpochForXid.
+ */
+ epoch = GetEpochForXid(oldestXmin);
+ oldestXidHavingUndo = FullTransactionIdFromEpochAndXid(epoch, oldestXmin);
+
+ /*
+ * Iterate through all the active logs and one-by-one try to discard the
+ * transactions that are old enough to matter.
+ *
+ * XXX Ideally we can arrange undo logs so that we can efficiently find
+ * those with oldest_xid < oldestXmin, but for now we'll just scan all of
+ * them.
+ */
+ while ((slot = UndoLogNextSlot(slot)))
+ {
+ /*
+ * If the log is already discarded, then we are done. It is important
+ * to check this first to ensure that the tablespace containing this log
+ * doesn't get dropped concurrently.
+ */
+ LWLockAcquire(&slot->mutex, LW_SHARED);
+ /*
+ * We don't have to worry about slot recycling and check the logno
+ * here, since we don't care about the identity of this slot; we're
+ * visiting all of them.
+ */
+ if (slot->meta.discard == slot->meta.unlogged.insert)
+ {
+ LWLockRelease(&slot->mutex);
+ continue;
+ }
+ LWLockRelease(&slot->mutex);
+
+ /* We can't process temporary undo logs. */
+ if (slot->meta.category == UNDO_TEMP)
+ continue;
+
+ /*
+ * If the first xid of the undo log is smaller than the xmin then try
+ * to discard the undo log.
+ */
+ if (!FullTransactionIdIsValid(slot->wait_fxmin) ||
+ FullTransactionIdPrecedes(slot->wait_fxmin, oldestXidHavingUndo))
+ {
+ /* Process the undo log. */
+ UndoDiscardOneLog(slot, oldestXmin, hibernate);
+ }
+ }
+
+ /* Get the smallest of 'xid having pending undo' and 'oldestXmin' */
+ oldestXidHavingUndo = RollbackHTGetOldestFullXid(oldestXidHavingUndo);
+
+ /*
+ * Update the oldestFullXidHavingUnappliedUndo in the shared memory.
+ *
+ * XXX: In future, if multiple workers can perform discard then we may
+ * need to use compare and swap for updating the shared memory value.
+ */
+ if (FullTransactionIdIsValid(oldestXidHavingUndo))
+ pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+ U64FromFullTransactionId(oldestXidHavingUndo));
+}
+
+/*
+ * Discard all the logs. This is particularly required in single-user mode,
+ * where we discard all the undo logs at commit time.
+ */
+void
+UndoLogDiscardAll(void)
+{
+ UndoLogSlot *slot = NULL;
+
+ Assert(!IsUnderPostmaster);
+
+ /*
+ * No locks are required for discard, since this is called only in
+ * single-user mode.
+ */
+ while ((slot = UndoLogNextSlot(slot)))
+ {
+ /* If the log is already discarded, then we are done. */
+ if (slot->meta.discard == slot->meta.unlogged.insert)
+ continue;
+
+ /* Process the undo log. */
+ UndoLogDiscard(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert),
+ InvalidTransactionId);
+ }
+}
+
+/*
+ * Discard the undo logs for temp tables.
+ */
+void
+TempUndoDiscard(UndoLogNumber logno)
+{
+ UndoLogSlot *slot = UndoLogGetSlot(logno, false);
+
+ /*
+ * Discard the undo log for temp table only. Ensure that there is
+ * something to be discarded there.
+ */
+ Assert(slot->meta.category == UNDO_TEMP);
+
+ /*
+ * If the log is already discarded, then we are done. It is important
+ * to check this first to ensure that the tablespace containing this log
+ * doesn't get dropped concurrently.
+ */
+ LWLockAcquire(&slot->mutex, LW_SHARED);
+ if (slot->meta.discard == slot->meta.unlogged.insert)
+ {
+ LWLockRelease(&slot->mutex);
+ return;
+ }
+ LWLockRelease(&slot->mutex);
+
+ /* Process the undo log. */
+ UndoLogDiscard(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert),
+ InvalidTransactionId);
+}
LWTRANCHE_UNDOLOG);
LWLockInitialize(&UndoLogShared->slots[i].discard_lock,
LWTRANCHE_UNDODISCARD);
+ LWLockInitialize(&UndoLogShared->slots[i].discard_update_lock,
+ LWTRANCHE_DISCARD_UPDATE);
}
}
else
#include "postgres.h"
#include "miscadmin.h"
+#include "access/discardworker.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/transam.h"
#include "access/undorequest.h"
+#include "access/undoworker.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_database.h"
return sz;
}
+/*
+ * Fetch the start urec pointer for the transaction and the undo request size.
+ *
+ * start_urecptr_inout - This is an INOUT parameter. If a transaction has
+ * overflowed to multiple undo logs, the caller can set start_urecptr_inout
+ * to a location of any of the undo logs where the transaction has written its
+ * first record for that particular log. Given that, this function calculates
+ * the undo location where the transaction has inserted its first undo record.
+ * If a transaction hasn't overflowed to multiple undo logs, the value of this
+ * parameter remains unchanged.
+ *
+ * The first record of a transaction in each undo log contains a reference to
+ * the first record of this transaction in the previous log. This function
+ * finds the initial location by moving backward in the undo chain of this
+ * transaction across undo logs. While doing so, it also calculates the
+ * undo size between the input and output start undo record pointer values.
+ */
+static uint64
+FindUndoStartLocationAndSize(UndoRecPtr *start_urecptr_inout,
+ FullTransactionId full_xid)
+{
+ UnpackedUndoRecord *uur = NULL;
+ UndoLogSlot *slot = NULL;
+ UndoRecPtr urecptr = InvalidUndoRecPtr;
+ uint64 sz = 0;
+
+ Assert(start_urecptr_inout);
+
+ urecptr = *start_urecptr_inout;
+ Assert(urecptr != InvalidUndoRecPtr);
+
+ /*
+ * A backend always sets the start undo record pointer to the first undo
+ * record inserted by this transaction. Hence, we don't have to proceed
+ * further.
+ */
+ if (!IsDiscardProcess())
+ return sz;
+
+ /*
+ * Since the discard worker processes the undo logs sequentially, it's
+ * possible that the start undo record pointer doesn't refer to the actual
+ * start of the transaction. Instead, it may refer to the start location
+ * of the transaction in any of the subsequent logs. In that case, we have
+ * to find the actual start location of the transaction by going backwards
+ * in the chain.
+ */
+ while (true)
+ {
+ UndoLogOffset next_insert;
+ UndoRecordFetchContext context;
+
+ /*
+ * Fetch the log and undo record corresponding to the current undo
+ * pointer.
+ */
+ if ((slot == NULL) || (UndoRecPtrGetLogNo(urecptr) != slot->logno))
+ slot = UndoLogGetSlot(UndoRecPtrGetLogNo(urecptr), false);
+
+ Assert(slot != NULL);
+
+ /* The corresponding log's insert location must be ahead of urecptr. */
+ Assert(MakeUndoRecPtr(slot->logno, slot->meta.unlogged.insert) >= urecptr);
+
+ /* Fetch the undo record. */
+ BeginUndoFetch(&context);
+ uur = UndoFetchRecord(&context, urecptr);
+ FinishUndoFetch(&context);
+
+ /*
+ * Since the rollback isn't completed for this transaction, this undo
+ * record can't be discarded.
+ */
+ Assert(uur != NULL);
+
+ /* The undo record must belong to the same transaction. */
+ Assert(FullTransactionIdEquals(full_xid, uur->uur_fxid));
+
+ /*
+ * Since this is the first undo record of this transaction in this
+ * log, this must include the transaction header.
+ */
+ Assert(uur->uur_info & UREC_INFO_TRANSACTION);
+
+ /*
+ * If this is the first undo record of this transaction, return from
+ * here.
+ */
+ if ((uur->uur_info & UREC_INFO_LOGSWITCH) == 0)
+ {
+ UndoRecordRelease(uur);
+ break;
+ }
+
+ /*
+ * This is the start of an overflowed transaction's header, so it must
+ * have a valid pointer to the previous log's start transaction header.
+ */
+ Assert(UndoRecPtrIsValid(uur->uur_logswitch->urec_prevlogstart));
+
+ /*
+ * Find the previous log from which the transaction overflowed
+ * into the current log.
+ */
+ urecptr = uur->uur_logswitch->urec_prevlogstart;
+ slot = UndoLogGetSlot(UndoRecPtrGetLogNo(urecptr), false);
+
+ /*
+ * When a transaction overflows to a new undo log, it's guaranteed
+ * that this transaction will be the last transaction in the previous
+ * log, and we mark that log as full so that no other transaction can
+ * write to that log any further. See UndoLogAllocate for details.
+ *
+ * So, to find the undo size in the previous log, we have to find the
+ * next insert location of the previous log and subtract the current
+ * transaction's start location in the previous log from it.
+ */
+ next_insert = UndoLogGetNextInsertPtr(slot->logno);
+ Assert(UndoRecPtrIsValid(next_insert));
+
+ sz += (next_insert - urecptr);
+
+ UndoRecordRelease(uur);
+ uur = NULL;
+ }
+
+ *start_urecptr_inout = urecptr;
+
+ return sz;
+}
+
/*
* Returns true if we can push the rollback request to undo workers, false
* otherwise.
/*
* We normally push the rollback request to undo workers if the size of
- * same is above a certain threshold.
+ * same is above a certain threshold. However, the discard worker is
+ * allowed to push a request of any size provided there is space in the
+ * rollback request queue. This is mainly because the discard worker can
+ * be processing the rollback requests after crash recovery when no
+ * backend is alive.
+ *
+ * We have a race condition where the discard worker can process a
+ * request before the backend which has aborted the transaction, in
+ * which case the backend won't do anything. Normally, this won't happen
+ * because backends try to apply the undo actions immediately after
+ * marking the transaction as aborted in the clog. One way to avoid this
+ * race condition is to have the backend register the request in the
+ * hash table, but not in the rollback queues, before marking the abort
+ * in the clog, and then later add it to the rollback queues. However,
+ * we are not sure how important it is to avoid such a race, as it won't
+ * lead to any problem, and OTOH, we might need some more trickery in
+ * the code to avoid such a race condition.
*/
- if (req_size >= rollback_overflow_size * 1024 * 1024)
+ if (req_size >= rollback_overflow_size * 1024 * 1024 ||
+ IsDiscardProcess())
{
if (GetXidQueueSize() >= pending_undo_queue_size ||
GetSizeQueueSize() >= pending_undo_queue_size)
if ((GetXidQueueSize() < pending_undo_queue_size))
{
Assert(GetSizeQueueSize() < pending_undo_queue_size);
-
- /*
- * XXX - Here, we should return true once we have background
- * worker facility.
- */
- return false;
+ return true;
}
}
Assert(UndoRecPtrIsValid(start_urec_ptr));
Assert(dbid != InvalidOid);
+ /*
+ * The discard worker can only send the start undo record pointer of a
+ * transaction. It doesn't set the end_urec_ptr.
+ */
+ Assert(IsDiscardProcess() || UndoRecPtrIsValid(end_urec_ptr));
+
/*
* Find the rollback request size and the end_urec_ptr (in case of discard
* worker only).
if (!UndoRecPtrIsValid(end_urec_ptr))
return false;
+ /*
+ * For registering a rollback request, we always store the full transaction
+ * ID and the first undo record pointer inserted by this transaction. This
+ * ensures that backends and the discard worker don't register the same
+ * request twice.
+ */
+ req_size += FindUndoStartLocationAndSize(&start_urec_ptr, full_xid);
+
LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
/*
HASH_ENTER_NULL, &found);
/*
- * It can only fail, if the value of pending_undo_queue_size or
+ * Except during the first pass over the undo logs by the discard worker,
+ * the hash table can never be full.
+ */
+ Assert(!ProcGlobal->rollbackHTInitialized || (rh != NULL));
+
+ /*
+ * It can only fail if the value of pending_undo_queue_size or
* max_connections guc is reduced after restart of the server.
*/
if (rh == NULL)
}
/*
* The request can't be pushed into the undo worker queue. The
- * backends will try executing by itself.
+ * backends will try executing the request themselves. The discard
+ * worker will keep the entry in the rollback hash table with
+ * UNDO_REQUEST_INVALID status. Such requests will be added to the
+ * undo worker queues in subsequent passes over the undo logs by the
+ * discard worker.
*/
- else
+ else if (!IsDiscardProcess())
rh->status = UNDO_REQUEST_INPROGRESS;
+ else
+ rh->status = UNDO_REQUEST_INVALID;
}
else if (!UndoRequestIsValid(rh) && can_push)
{
LWLockRelease(RollbackRequestLock);
+ /*
+ * If we are able to successfully push the request, wake up the undo
+ * worker so that it can process the request in a timely fashion.
+ */
+ if (pushed)
+ WakeupUndoWorker(dbid);
+
return pushed;
}
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undoworker.c
+ * undo launcher and undo worker process.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/undoworker.c
+ *
+ * The undo launcher is responsible for launching workers only if there is
+ * some work available in one of the work queues and more workers are
+ * available. To know more about the work queues, see undorequest.c. A
+ * worker is launched to handle requests for a particular database.
+ *
+ * Each undo worker then starts reading requests for that particular
+ * database from one of the queues. A worker peeks into each queue for
+ * requests from a particular database if it would need to switch databases
+ * in less than undo_worker_quantum ms after starting. Also, if there is no
+ * work, it lingers for UNDO_WORKER_LINGER_MS. This avoids restarting
+ * the workers too frequently.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+
+#include "access/genam.h"
+#include "access/table.h"
+#include "access/xact.h"
+#include "access/undorequest.h"
+#include "access/undoworker.h"
+
+#include "libpq/pqsignal.h"
+
+#include "postmaster/bgworker.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/postmaster.h"
+
+#include "replication/slot.h"
+#include "replication/worker_internal.h"
+
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+
+#include "tcop/tcopprot.h"
+
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+
+/*
+ * GUC parameters
+ */
+int max_undo_workers = 4;
+
+/*
+ * If a worker would need to switch databases in less than undo_worker_quantum
+ * (10s as default) after starting, it peeks a few entries deep into each
+ * queue to see whether there's work for that database.
+ */
+int undo_worker_quantum_ms = 10000;
+
+/* max sleep time between cycles (100 milliseconds) */
+#define DEFAULT_NAPTIME_PER_CYCLE 100L
+
+/*
+ * Time for which an undo worker can linger if there is no work, in
+ * milliseconds. This has to be more than UNDO_FAILURE_RETRY_DELAY_MS;
+ * otherwise, a worker can exit before retrying the failed requests.
+ */
+#define UNDO_WORKER_LINGER_MS 20000
+
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+
+static TimestampTz last_xact_processed_at;
+
+typedef struct UndoApplyWorker
+{
+ /* Indicates if this slot is used or free. */
+ bool in_use;
+
+ /* Increased every time the slot is taken by new worker. */
+ uint16 generation;
+
+ /* Pointer to proc array. NULL if not running. */
+ PGPROC *proc;
+
+ /* Database id this worker is connected to. */
+ Oid dbid;
+
+ /* This tells whether the worker is lingering. */
+ bool lingering;
+
+ /*
+ * This tells the undo worker from which undo worker queue it should start
+ * processing.
+ */
+ UndoWorkerQueueType undo_worker_queue;
+} UndoApplyWorker;
+
+UndoApplyWorker *MyUndoWorker = NULL;
+
+typedef struct UndoApplyCtxStruct
+{
+ /* Supervisor process. */
+ pid_t launcher_pid;
+
+ /* latch to wake up undo launcher. */
+ Latch *undo_launcher_latch;
+
+ /* Background workers. */
+ UndoApplyWorker workers[FLEXIBLE_ARRAY_MEMBER];
+} UndoApplyCtxStruct;
+
+UndoApplyCtxStruct *UndoApplyCtx;
+
+static void UndoWorkerOnExit(int code, Datum arg);
+static void UndoWorkerCleanup(UndoApplyWorker *worker);
+static void UndoWorkerIsLingering(bool sleep);
+static void UndoWorkerGetSlotInfo(int slot, UndoRequestInfo *urinfo);
+static void UndoworkerSigtermHandler(SIGNAL_ARGS);
+
+/*
+ * Cleanup function for undo worker launcher.
+ *
+ * Called on undo worker launcher exit.
+ */
+static void
+UndoLauncherOnExit(int code, Datum arg)
+{
+ UndoApplyCtx->launcher_pid = 0;
+ UndoApplyCtx->undo_launcher_latch = NULL;
+}
+
+/* SIGTERM: set flag to exit at next convenient time */
+static void
+UndoworkerSigtermHandler(SIGNAL_ARGS)
+{
+ got_SIGTERM = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+}
+
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+UndoLauncherSighup(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+
+ /* Waken anything waiting on the process latch */
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * Wait for a background worker to start up and attach to the shmem context.
+ *
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
+ */
+static void
+WaitForUndoWorkerAttach(UndoApplyWorker *worker,
+ uint16 generation,
+ BackgroundWorkerHandle *handle)
+{
+ BgwHandleStatus status;
+ int rc;
+
+ for (;;)
+ {
+ pid_t pid;
+
+ CHECK_FOR_INTERRUPTS();
+
+ LWLockAcquire(UndoWorkerLock, LW_SHARED);
+
+ /* Worker either died or has started; no need to do anything. */
+ if (!worker->in_use || worker->proc)
+ {
+ LWLockRelease(UndoWorkerLock);
+ return;
+ }
+
+ LWLockRelease(UndoWorkerLock);
+
+ /* Check if worker has died before attaching, and clean up after it. */
+ status = GetBackgroundWorkerPid(handle, &pid);
+
+ if (status == BGWH_STOPPED)
+ {
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+ /* Ensure that this was indeed the worker we waited for. */
+ if (generation == worker->generation)
+ UndoWorkerCleanup(worker);
+ LWLockRelease(UndoWorkerLock);
+ return;
+ }
+
+ /*
+ * We need a timeout because we generally don't get notified via the
+ * latch about the worker attaching. But we don't expect to have to
+ * wait long.
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ 10L, WAIT_EVENT_BGWORKER_STARTUP);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+ }
+}
+
+/*
+ * Attach to a slot.
+ */
+static void
+UndoWorkerAttach(int slot)
+{
+ /* Block concurrent access. */
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ MyUndoWorker = &UndoApplyCtx->workers[slot];
+
+ if (!MyUndoWorker->in_use)
+ {
+ LWLockRelease(UndoWorkerLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("undo worker slot %d is empty, cannot attach",
+ slot)));
+ }
+
+ if (MyUndoWorker->proc)
+ {
+ LWLockRelease(UndoWorkerLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("undo worker slot %d is already used by "
+ "another worker, cannot attach", slot)));
+ }
+
+ MyUndoWorker->proc = MyProc;
+ before_shmem_exit(UndoWorkerOnExit, (Datum) 0);
+
+ LWLockRelease(UndoWorkerLock);
+}
+
+/*
+ * Returns whether an undo worker is available.
+ */
+static bool
+IsUndoWorkerAvailable(void)
+{
+ int i;
+ int alive_workers = 0;
+
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ /* Search for attached workers. */
+ for (i = 0; i < max_undo_workers; i++)
+ {
+ UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+ if (w->in_use)
+ alive_workers++;
+ }
+
+ LWLockRelease(UndoWorkerLock);
+
+ return (alive_workers < max_undo_workers);
+}
+
+/* Sets the worker's lingering status. */
+static void
+UndoWorkerIsLingering(bool sleep)
+{
+ /* Block concurrent access. */
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ MyUndoWorker->lingering = sleep;
+
+ LWLockRelease(UndoWorkerLock);
+}
+
+/* Get the dbid and undo worker queue set by the undo launcher. */
+static void
+UndoWorkerGetSlotInfo(int slot, UndoRequestInfo *urinfo)
+{
+ /* Block concurrent access. */
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ MyUndoWorker = &UndoApplyCtx->workers[slot];
+
+ if (!MyUndoWorker->in_use)
+ {
+ LWLockRelease(UndoWorkerLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("undo worker slot %d is empty",
+ slot)));
+ }
+
+ urinfo->dbid = MyUndoWorker->dbid;
+ urinfo->undo_worker_queue = MyUndoWorker->undo_worker_queue;
+
+ LWLockRelease(UndoWorkerLock);
+}
+
+/*
+ * Start a new undo apply background worker if possible; otherwise return false.
+ */
+static bool
+UndoWorkerLaunch(UndoRequestInfo urinfo)
+{
+ BackgroundWorker bgw;
+ BackgroundWorkerHandle *bgw_handle;
+ uint16 generation;
+ int i;
+ int slot = 0;
+ UndoApplyWorker *worker = NULL;
+
+ /*
+ * We need to modify the shared memory under the lock so that we have a
+ * consistent view.
+ */
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ /* Find unused worker slot. */
+ for (i = 0; i < max_undo_workers; i++)
+ {
+ UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+ if (!w->in_use)
+ {
+ worker = w;
+ slot = i;
+ break;
+ }
+ }
+
+ /* We must not try to start a worker if there are no available workers. */
+ Assert(worker != NULL);
+
+ /* Prepare the worker slot. */
+ worker->in_use = true;
+ worker->proc = NULL;
+ worker->dbid = urinfo.dbid;
+ worker->lingering = false;
+ worker->undo_worker_queue = urinfo.undo_worker_queue;
+ worker->generation++;
+
+ generation = worker->generation;
+ LWLockRelease(UndoWorkerLock);
+
+ /* Register the new dynamic worker. */
+ memset(&bgw, 0, sizeof(bgw));
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoWorkerMain");
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "undo apply worker");
+ snprintf(bgw.bgw_name, BGW_MAXLEN, "undo apply worker");
+
+ bgw.bgw_restart_time = BGW_NEVER_RESTART;
+ bgw.bgw_notify_pid = MyProcPid;
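+ /* Pass the slot index so the worker can look up its shared slot info. */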
+ bgw.bgw_main_arg = Int32GetDatum(slot);
+
+ if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+ {
+ /* Failed to start worker, so clean up the worker slot. */
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+ UndoWorkerCleanup(worker);
+ LWLockRelease(UndoWorkerLock);
+
+ return false;
+ }
+
+ /* Now wait until it attaches. */
+ WaitForUndoWorkerAttach(worker, generation, bgw_handle);
+
+ return true;
+}
+
+/*
+ * Detach the worker (cleans up the worker info).
+ */
+static void
+UndoWorkerDetach(void)
+{
+ /* Block concurrent access. */
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ UndoWorkerCleanup(MyUndoWorker);
+
+ LWLockRelease(UndoWorkerLock);
+}
+
+/*
+ * Clean up worker info.
+ */
+static void
+UndoWorkerCleanup(UndoApplyWorker *worker)
+{
+ Assert(LWLockHeldByMeInMode(UndoWorkerLock, LW_EXCLUSIVE));
+
+ worker->in_use = false;
+ worker->proc = NULL;
+ worker->dbid = InvalidOid;
+ worker->lingering = false;
+ worker->undo_worker_queue = InvalidUndoWorkerQueue;
+}
+
+/*
+ * Cleanup function.
+ *
+ * Called on undo worker exit.
+ */
+static void
+UndoWorkerOnExit(int code, Datum arg)
+{
+ UndoWorkerDetach();
+}
+
+/*
+ * Perform a rollback request. The worker must already be connected to a
+ * database, because we access system tables while performing undo actions.
+ */
+static void
+UndoWorkerPerformRequest(UndoRequestInfo *urinfo)
+{
+ bool error = false;
+
+ /* We must be connected to a database. */
+ Assert(MyDatabaseId != InvalidOid);
+
+ StartTransactionCommand();
+ PG_TRY();
+ {
+ execute_undo_actions(urinfo->full_xid, urinfo->end_urec_ptr,
+ urinfo->start_urec_ptr, true);
+ }
+ PG_CATCH();
+ {
+ error = true;
+
+ /*
+ * Register the unprocessed request in the error queue, so that it can be
+ * retried in a timely fashion. If we fail to add the request to the
+ * error queue, mark the entry status invalid; the request will be added
+ * back to the queue later by the discard worker.
+ */
+ if (!InsertRequestIntoErrorUndoQueue(urinfo))
+ RollbackHTMarkEntryInvalid(urinfo->full_xid,
+ urinfo->start_urec_ptr);
+
+ /* Prevent interrupts while cleaning up. */
+ HOLD_INTERRUPTS();
+
+ /* Send the error only to server log. */
+ err_out_to_client(false);
+ EmitErrorReport();
+
+ /*
+ * Abort the transaction and continue processing pending undo requests.
+ */
+ AbortOutOfAnyTransaction();
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+ }
+ PG_END_TRY();
+
+ if (!error)
+ CommitTransactionCommand();
+}
+
+/*
+ * UndoLauncherShmemSize
+ * Compute space needed for undo launcher shared memory
+ */
+Size
+UndoLauncherShmemSize(void)
+{
+ Size size;
+
+ /*
+ * Need the fixed struct and the array of UndoApplyWorker.
+ */
+ size = sizeof(UndoApplyCtxStruct);
+ size = MAXALIGN(size);
+ size = add_size(size, mul_size(max_undo_workers,
+ sizeof(UndoApplyWorker)));
+ return size;
+}
+
+/*
+ * UndoLauncherShmemInit
+ * Allocate and initialize undo worker launcher shared memory
+ */
+void
+UndoLauncherShmemInit(void)
+{
+ bool found;
+
+ UndoApplyCtx = (UndoApplyCtxStruct *)
+ ShmemInitStruct("Undo Worker Launcher Data",
+ UndoLauncherShmemSize(),
+ &found);
+
+ if (!found)
+ memset(UndoApplyCtx, 0, UndoLauncherShmemSize());
+}
+
+/*
+ * UndoLauncherRegister
+ * Register a background worker running the undo worker launcher.
+ */
+void
+UndoLauncherRegister(void)
+{
+ BackgroundWorker bgw;
+
+ if (max_undo_workers == 0)
+ return;
+
+ memset(&bgw, 0, sizeof(bgw));
+ bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoLauncherMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "undo worker launcher");
+ snprintf(bgw.bgw_type, BGW_MAXLEN,
+ "undo worker launcher");
+ bgw.bgw_restart_time = 5;
+ bgw.bgw_notify_pid = 0;
+ bgw.bgw_main_arg = (Datum) 0;
+
+ RegisterBackgroundWorker(&bgw);
+}
+
+/*
+ * Main loop for the undo worker launcher process.
+ */
+void
+UndoLauncherMain(Datum main_arg)
+{
+ UndoRequestInfo urinfo;
+
+ ereport(DEBUG1,
+ (errmsg("undo launcher started")));
+
+ before_shmem_exit(UndoLauncherOnExit, (Datum) 0);
+
+ Assert(UndoApplyCtx->launcher_pid == 0);
+ UndoApplyCtx->launcher_pid = MyProcPid;
+
+ /* Establish signal handlers. */
+ pqsignal(SIGHUP, UndoLauncherSighup);
+ pqsignal(SIGTERM, UndoworkerSigtermHandler);
+ BackgroundWorkerUnblockSignals();
+
+ /* Establish connection to nailed catalogs. */
+ BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+ /*
+ * Advertise our latch so that an undo request enqueuer can wake us up
+ * while we're sleeping.
+ */
+ UndoApplyCtx->undo_launcher_latch = &MyProc->procLatch;
+
+ /* Enter main loop */
+ while (!got_SIGTERM)
+ {
+ int rc;
+
+ CHECK_FOR_INTERRUPTS();
+
+ ResetUndoRequestInfo(&urinfo);
+
+ if (UndoGetWork(false, false, &urinfo, NULL) &&
+ IsUndoWorkerAvailable())
+ UndoWorkerLaunch(urinfo);
+
+ /* Wait for more work. */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ DEFAULT_NAPTIME_PER_CYCLE,
+ WAIT_EVENT_UNDO_LAUNCHER_MAIN);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+ }
+
+ /* Normal exit from undo launcher main */
+ ereport(LOG,
+ (errmsg("undo launcher shutting down")));
+ proc_exit(0);
+}
+
+/*
+ * UndoWorkerMain -- Main loop for the undo apply worker.
+ */
+void
+UndoWorkerMain(Datum main_arg)
+{
+ UndoRequestInfo urinfo;
+ int worker_slot = DatumGetInt32(main_arg);
+ bool in_other_db;
+ bool found_work;
+ TimestampTz started_at;
+
+ /* Setup signal handling */
+ pqsignal(SIGTERM, UndoworkerSigtermHandler);
+ BackgroundWorkerUnblockSignals();
+
+ ResetUndoRequestInfo(&urinfo);
+ started_at = GetCurrentTimestamp();
+
+ /*
+ * Get the dbid that the worker should connect to, and the worker request
+ * queue from which the worker should start looking for an undo request.
+ */
+ UndoWorkerGetSlotInfo(worker_slot, &urinfo);
+
+ /* Connect to the requested database. */
+ BackgroundWorkerInitializeConnectionByOid(urinfo.dbid, 0, 0);
+
+ /*
+ * Set the undo worker request queue from which the undo worker starts
+ * looking for work.
+ */
+ SetUndoWorkerQueueStart(urinfo.undo_worker_queue);
+
+ /*
+ * Before attaching the worker, fetch and remove the undo request for
+ * which the undo launcher has launched this worker. This prevents the
+ * undo launcher from launching multiple workers for the same request.
+ * However, it's possible that the undo request has already been processed
+ * by some other in-progress undo worker. In that case, we enter the undo
+ * worker main loop and fetch the next request.
+ */
+ found_work = UndoGetWork(false, true, &urinfo, &in_other_db);
+
+ /* Attach to slot */
+ UndoWorkerAttach(worker_slot);
+
+ if (found_work && !in_other_db)
+ {
+ /* We must have got the pending undo request. */
+ Assert(FullTransactionIdIsValid(urinfo.full_xid));
+ UndoWorkerPerformRequest(&urinfo);
+ last_xact_processed_at = GetCurrentTimestamp();
+ }
+
+ while (!got_SIGTERM)
+ {
+ int rc;
+ bool allow_peek;
+
+ CHECK_FOR_INTERRUPTS();
+
+ allow_peek = !TimestampDifferenceExceeds(started_at,
+ GetCurrentTimestamp(),
+ undo_worker_quantum_ms);
+
+ found_work = UndoGetWork(allow_peek, true, &urinfo, &in_other_db);
+
+ if (found_work && in_other_db)
+ {
+ proc_exit(0);
+ }
+ else if (found_work)
+ {
+ /* We must have got the pending undo request. */
+ Assert(FullTransactionIdIsValid(urinfo.full_xid));
+ UndoWorkerPerformRequest(&urinfo);
+ last_xact_processed_at = GetCurrentTimestamp();
+ }
+ else
+ {
+ TimestampTz timeout = 0;
+
+ timeout = TimestampTzPlusMilliseconds(last_xact_processed_at,
+ UNDO_WORKER_LINGER_MS);
+
+ /*
+ * We don't need to linger if UNDO_WORKER_LINGER_MS has already elapsed
+ * since the last transaction was processed.
+ */
+ if (timeout <= GetCurrentTimestamp())
+ {
+ proc_exit(0);
+ }
+
+ /*
+ * Update the shared state to reflect that this worker is
+ * lingering, so that if a new work request arrives, the
+ * requester can wake us up.
+ */
+ UndoWorkerIsLingering(true);
+
+ /* Wait for more work. */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ DEFAULT_NAPTIME_PER_CYCLE,
+ WAIT_EVENT_UNDO_WORKER_MAIN);
+
+ /* reset the shared state. */
+ UndoWorkerIsLingering(false);
+
+ /* emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+ }
+ }
+
+ /* Normal exit from undo worker main */
+ proc_exit(0);
+}
+
+/*
+ * Wake up an undo worker so that undo requests can be processed in a
+ * timely fashion.
+ *
+ * We first try to wake up a lingering worker in the given database. If we
+ * find one, we are done.
+ *
+ * Next, we try to stop lingering workers that don't belong to the given
+ * database. A lingering worker has no pending work, so it is fine to stop
+ * it when we know there is going to be work in another database.
+ *
+ * Finally, we wake up the launcher so that it can either restart a worker
+ * we have stopped or find some other worker to take up this request.
+ */
+void
+WakeupUndoWorker(Oid dbid)
+{
+ int i;
+
+ LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE);
+
+ /* Wake up a lingering worker in the given database. */
+ for (i = 0; i < max_undo_workers; i++)
+ {
+ UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+ if (w->in_use && w->lingering && w->dbid == dbid)
+ {
+ SetLatch(&w->proc->procLatch);
+
+ LWLockRelease(UndoWorkerLock);
+ return;
+ }
+ }
+
+ /*
+ * Stop any lingering workers that are not processing requests in the
+ * given database.
+ */
+ for (i = 0; i < max_undo_workers; i++)
+ {
+ UndoApplyWorker *w = &UndoApplyCtx->workers[i];
+
+ if (w->in_use && w->lingering && w->dbid != dbid)
+ kill(w->proc->pid, SIGTERM);
+ }
+
+ if (UndoApplyCtx->undo_launcher_latch)
+ SetLatch(UndoApplyCtx->undo_launcher_latch);
+
+ LWLockRelease(UndoWorkerLock);
+
+ return;
+}
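+
+/*
+ * Illustrative only, not part of the patch: a backend that has just queued
+ * an undo request for a database would typically follow the enqueue with a
+ * wakeup along these lines (RegisterUndoRequest is a hypothetical enqueue
+ * function):
+ *
+ *    if (RegisterUndoRequest(full_xid, dbid))
+ *        WakeupUndoWorker(dbid);
+ */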
#include "access/sysattr.h"
#include "access/tableam.h"
#include "access/tupconvert.h"
+#include "access/undodiscard.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
case ONCOMMIT_DROP:
oids_to_drop = lappend_oid(oids_to_drop, oc->relid);
break;
+ case ONCOMMIT_TEMP_DISCARD:
+ /* Discard the undo logs for temp tables. */
+ TempUndoDiscard(oc->relid);
+ break;
}
}
#include <unistd.h>
#include "libpq/pqsignal.h"
+#include "access/discardworker.h"
#include "access/parallel.h"
+#include "access/undoworker.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
},
{
"ApplyWorkerMain", ApplyWorkerMain
+ },
+ {
+ "UndoLauncherMain", UndoLauncherMain
+ },
+ {
+ "UndoWorkerMain", UndoWorkerMain
+ },
+ {
+ "DiscardWorkerMain", DiscardWorkerMain
}
};
case WAIT_EVENT_WAL_WRITER_MAIN:
event_name = "WalWriterMain";
break;
+ case WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN:
+ event_name = "UndoDiscardWorkerMain";
+ break;
+ case WAIT_EVENT_UNDO_LAUNCHER_MAIN:
+ event_name = "UndoLauncherMain";
+ break;
+ case WAIT_EVENT_UNDO_WORKER_MAIN:
+ event_name = "UndoWorkerMain";
+ break;
/* no default case, so that compiler will warn */
}
#include <pthread.h>
#endif
+#include "access/discardworker.h"
#include "access/transam.h"
+#include "access/undoworker.h"
#include "access/xlog.h"
#include "bootstrap/bootstrap.h"
#include "catalog/pg_control.h"
char *bonjour_name;
bool restart_after_crash = true;
+bool enable_undo_launcher = true;
+
/* PIDs of special child processes; 0 when not running */
static pid_t StartupPID = 0,
BgWriterPID = 0,
*/
ApplyLauncherRegister();
+ /* Register the Undo worker launcher. */
+ if (enable_undo_launcher)
+ UndoLauncherRegister();
+
+ /* Register the Undo Discard worker. */
+ DiscardWorkerRegister();
+
/*
* process any libraries that should be preloaded at postmaster start
*/
#include "access/subtrans.h"
#include "access/twophase.h"
#include "access/undolog.h"
+#include "access/undorequest.h"
+#include "access/undoworker.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, PendingUndoShmemSize());
+ size = add_size(size, UndoLauncherShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
SUBTRANSShmemInit();
MultiXactShmemInit();
InitBufferPool();
+ PendingUndoShmemInit();
/*
* Set up lock manager
WalSndShmemInit();
WalRcvShmemInit();
ApplyLauncherShmemInit();
+ UndoLauncherShmemInit();
/*
* Set up other modules that need some shared memory space
CLogTruncationLock 44
UndoLogLock 45
RollbackRequestLock 46
+UndoWorkerLock 47
ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
SpinLockInit(ProcStructLock);
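+
+ /*
+ * Start at zero (an invalid FullTransactionId); the real value is
+ * installed from the checkpoint record during startup.
+ */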
+ pg_atomic_init_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo, 0);
ProcGlobal->xactsHavingPendingUndo = 0;
+ ProcGlobal->rollbackHTInitialized = false;
}
/*
#include "access/transam.h"
#include "access/twophase.h"
#include "access/undorequest.h"
+#include "access/undoworker.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/namespace.h"
NULL, NULL, NULL
},
+ {
+ {"enable_undo_launcher", PGC_POSTMASTER, DEVELOPER_OPTIONS,
+ gettext_noop("Decides whether to launch an undo worker."),
+ NULL,
+ GUC_NOT_IN_SAMPLE
+ },
+ &enable_undo_launcher,
+ true,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
NULL, NULL, NULL
},
+ {
+ {"max_undo_workers", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+ gettext_noop("Maximum number of undo worker processes."),
+ NULL,
+ },
+ &max_undo_workers,
+ 4, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
+
{
{"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM,
gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."),
# requests are pushed to undo workers
#pending_undo_queue_size = 1024 # size of queue used to register undo
# requests
+#max_undo_workers = 4 # maximum undo workers
#------------------------------------------------------------------------------
# CLIENT CONNECTION DEFAULTS
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * discardworker.h
+ * Exports from access/undo/discardworker.c.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/discardworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _DISCARDWORKER_H
+#define _DISCARDWORKER_H
+
+extern void DiscardWorkerRegister(void);
+extern void DiscardWorkerMain(Datum main_arg) pg_attribute_noreturn();
+extern bool IsDiscardProcess(void);
+
+#endif /* _DISCARDWORKER_H */
return result;
}
+static inline FullTransactionId
+FullTransactionIdFromU64(uint64 fxid)
+{
+ FullTransactionId result;
+
+ result.value = fxid;
+
+ return result;
+}
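+
+/*
+ * Illustrative only, not part of the patch: FullTransactionIdFromU64() is
+ * the inverse of the existing U64FromFullTransactionId(), so the
+ * conversion round-trips:
+ *
+ *    FullTransactionId fxid = FullTransactionIdFromU64(UINT64CONST(42));
+ *    Assert(U64FromFullTransactionId(fxid) == UINT64CONST(42));
+ */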
+
/* advance a transaction ID variable, handling wraparound correctly */
#define TransactionIdAdvance(dest) \
do { \
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undodiscard.h
+ * undo discard definitions
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undodiscard.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDODISCARD_H
+#define UNDODISCARD_H
+
+#include "access/undolog.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+#include "storage/lwlock.h"
+
+extern void UndoDiscard(TransactionId xmin, bool *hibernate);
+extern void UndoLogDiscardAll(void);
+extern void TempUndoDiscard(UndoLogNumber);
+extern void UndoLogProcess(void);
+
+#endif /* UNDODISCARD_H */
/*
* The in-memory control object for an undo log. We have a fixed-sized array
* of these.
+ *
+ * The following two locks are used to manage the discard process:
+ *
+ * discard_lock - must be held to read undo data, protecting it from being
+ * discarded; the discard worker acquires it to update oldest_data.
+ *
+ * discard_update_lock - acquired in exclusive mode by the discard worker
+ * for the duration of the discard process, and in shared mode to update
+ * next_urp in the previous transaction's start header.
+ *
+ * Two different locks are used so that readers are not blocked during the
+ * actual discard, but only while the shared memory variable that
+ * influences visibility decisions is updated; updaters, however, must be
+ * blocked for
+ * the entire discard process to ensure proper ordering of WAL records.
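+ *
+ * A minimal sketch of the protocol, for illustration only (it assumes the
+ * two LWLocks are members of this struct and that "slot" points to an
+ * UndoLogSlot; names and placement in the actual code may differ):
+ *
+ *    Reader:
+ *        LWLockAcquire(&slot->discard_lock, LW_SHARED);
+ *        ... read undo records ...
+ *        LWLockRelease(&slot->discard_lock);
+ *
+ *    Discard worker:
+ *        LWLockAcquire(&slot->discard_update_lock, LW_EXCLUSIVE);
+ *        ... WAL-log and perform the discard ...
+ *        LWLockAcquire(&slot->discard_lock, LW_EXCLUSIVE);
+ *        slot->oldest_data = new_oldest_data;
+ *        LWLockRelease(&slot->discard_lock);
+ *        LWLockRelease(&slot->discard_update_lock);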
*/
typedef struct UndoLogSlot
{
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * undoworker.h
+ * Exports from undoworker.c.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _UNDOWORKER_H
+#define _UNDOWORKER_H
+
+/* GUC options */
+extern int max_undo_workers;
+
+/* undo worker sleep time between rounds */
+extern int UndoWorkerDelay;
+
+extern Size UndoLauncherShmemSize(void);
+extern void UndoLauncherShmemInit(void);
+extern void UndoLauncherRegister(void);
+extern void UndoLauncherMain(Datum main_arg);
+extern void UndoWorkerMain(Datum main_arg) pg_attribute_noreturn();
+extern void WakeupUndoWorker(Oid dbid);
+
+#endif /* _UNDOWORKER_H */
* set to InvalidTransactionId.
*/
TransactionId oldestActiveXid;
+
+ /*
+ * Oldest full transaction id that still has unapplied undo. We include
+ * this value in the checkpoint record so that whenever the server
+ * restarts we can use it to initialize the server-wide value of the same
+ * variable. Any XID prior to this should be all-visible; if this is not
+ * set correctly, scans might try to fetch undo unnecessarily, which hurts
+ * performance.
+ */
+ FullTransactionId oldestFullXidHavingUnappliedUndo;
} CheckPoint;
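+
+/*
+ * Illustrative only, not part of the patch: once the checkpoint record has
+ * been read at startup, the server-wide value can be re-initialized from
+ * this field along these lines ("checkPoint" stands for the CheckPoint
+ * just read; where exactly this happens is up to the implementation):
+ *
+ *    pg_atomic_write_u64(&ProcGlobal->oldestFullXidHavingUnappliedUndo,
+ *                        U64FromFullTransactionId(checkPoint.oldestFullXidHavingUnappliedUndo));
+ */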
/* XLOG info values for XLOG rmgr */
ONCOMMIT_NOOP, /* No ON COMMIT clause (do nothing) */
ONCOMMIT_PRESERVE_ROWS, /* ON COMMIT PRESERVE ROWS (do nothing) */
ONCOMMIT_DELETE_ROWS, /* ON COMMIT DELETE ROWS */
- ONCOMMIT_DROP /* ON COMMIT DROP */
+ ONCOMMIT_DROP, /* ON COMMIT DROP */
+ ONCOMMIT_TEMP_DISCARD /* ON COMMIT discard temp table undo logs */
} OnCommitAction;
/*
WAIT_EVENT_SYSLOGGER_MAIN,
WAIT_EVENT_WAL_RECEIVER_MAIN,
WAIT_EVENT_WAL_SENDER_MAIN,
- WAIT_EVENT_WAL_WRITER_MAIN
+ WAIT_EVENT_WAL_WRITER_MAIN,
+ WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN,
+ WAIT_EVENT_UNDO_LAUNCHER_MAIN,
+ WAIT_EVENT_UNDO_WORKER_MAIN
} WaitEventActivity;
/* ----------
extern bool enable_bonjour;
extern char *bonjour_name;
extern bool restart_after_crash;
+extern bool enable_undo_launcher;
#ifdef WIN32
extern HANDLE PostmasterHandle;
LWTRANCHE_UNDOLOG,
LWTRANCHE_UNDODISCARD,
LWTRANCHE_REWIND,
+ LWTRANCHE_DISCARD_UPDATE,
LWTRANCHE_FIRST_USER_DEFINED,
} BuiltinTrancheIds;
int startupProcPid;
/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
int startupBufferPinWaitBufId;
+ /* Oldest full transaction id that still has unapplied undo. */
+ pg_atomic_uint64 oldestFullXidHavingUnappliedUndo;
/* Number of aborted transactions with pending undo actions. */
int xactsHavingPendingUndo;
+ /* Has the rollback hash table been initialized since startup? */
+ bool rollbackHTInitialized;
} PROC_HDR;
extern PGDLLIMPORT PROC_HDR *ProcGlobal;
* to avoid forcing to include proc.h when including procarray.h. So if you modify
* PROC_XXX flags, you need to modify these flags.
*/
+#define PROCARRAY_AUTOVACUUM_FLAG 0x01 /* currently running
+ * autovacuum */
#define PROCARRAY_VACUUM_FLAG 0x02 /* currently running lazy
* vacuum */
#define PROCARRAY_ANALYZE_FLAG 0x04 /* currently running
* PGXACT->vacuumFlags. Other flags are used for different purposes and
* have no corresponding PROC flag equivalent.
*/
-#define PROCARRAY_PROC_FLAGS_MASK (PROCARRAY_VACUUM_FLAG | \
+#define PROCARRAY_PROC_FLAGS_MASK (PROCARRAY_AUTOVACUUM_FLAG | \
+ PROCARRAY_VACUUM_FLAG | \
PROCARRAY_ANALYZE_FLAG | \
PROCARRAY_LOGICAL_DECODING_FLAG)
#define PROCARRAY_FLAGS_DEFAULT PROCARRAY_LOGICAL_DECODING_FLAG
/* Ignore vacuum backends */
#define PROCARRAY_FLAGS_VACUUM PROCARRAY_FLAGS_DEFAULT | PROCARRAY_VACUUM_FLAG
+/* Ignore autovacuum worker and backends running vacuum */
+#define PROCARRAY_FLAGS_AUTOVACUUM PROCARRAY_FLAGS_DEFAULT | PROCARRAY_AUTOVACUUM_FLAG
/* Ignore analyze backends */
#define PROCARRAY_FLAGS_ANALYZE PROCARRAY_FLAGS_DEFAULT | PROCARRAY_ANALYZE_FLAG
/* Ignore both vacuum and analyze backends */
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(17 rows)
+ enable_undo_launcher | on
+(18 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail