static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
ReorderBufferIterTXNState *state);
-static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
/*
* ---------------------------------------
pfree(change->data.msg.message);
change->data.msg.message = NULL;
break;
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ if (change->data.inval.invalidations)
+ pfree(change->data.inval.invalidations);
+ change->data.inval.invalidations = NULL;
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
if (change->data.snapshot)
{
ReorderBufferApplyMessage(rb, txn, change, streaming);
break;
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ /* Execute the invalidation messages locally */
+ ReorderBufferExecuteInvalidations(
+ change->data.inval.ninvalidations,
+ change->data.inval.invalidations);
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
/* get rid of the old */
TeardownHistoricSnapshot(false);
TeardownHistoricSnapshot(false);
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
-
- /*
- * Every time the CommandId is incremented, we could
- * see new catalog contents, so execute all
- * invalidations.
- */
- ReorderBufferExecuteInvalidations(rb, txn);
}
break;
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(rb, txn);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
AbortCurrentTransaction();
/* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(rb, txn);
+ ReorderBufferExecuteInvalidations(txn->ninvalidations,
+ txn->invalidations);
if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
* Setup the invalidation of the toplevel transaction.
*
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
- * accumulates all the invalidation messages in the toplevel transaction.
- * This is required because in some cases where we skip processing the
- * transaction (see ReorderBufferForget), we need to execute all the
- * invalidations together.
+ * accumulates all the invalidation messages in the toplevel transaction as
+ * well as in the form of change in reorder buffer. We require to record it in
+ * form of the change so that we can execute only the required invalidations
+ * instead of executing all the invalidations on each CommandId increment. We
+ * also need to accumulate these in the toplevel transaction because in some
+ * cases we skip processing the transaction (see ReorderBufferForget), we need
+ * to execute all the invalidations together.
*/
void
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
SharedInvalidationMessage *msgs)
{
ReorderBufferTXN *txn;
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
/*
- * We collect all the invalidations under the top transaction so that we
- * can execute them all together.
+ * Collect all the invalidations under the top transaction so that we can
+ * execute them all together. See comment atop this function
*/
if (txn->toptxn)
txn = txn->toptxn;
{
txn->ninvalidations = nmsgs;
txn->invalidations = (SharedInvalidationMessage *)
- MemoryContextAlloc(rb->context,
- sizeof(SharedInvalidationMessage) * nmsgs);
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
memcpy(txn->invalidations, msgs,
sizeof(SharedInvalidationMessage) * nmsgs);
}
nmsgs * sizeof(SharedInvalidationMessage));
txn->ninvalidations += nmsgs;
}
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+ change->data.inval.ninvalidations = nmsgs;
+ change->data.inval.invalidations = (SharedInvalidationMessage *)
+ palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(change->data.inval.invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
}
/*
* in the changestream but we don't know which those are.
*/
static void
-ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
{
int i;
- for (i = 0; i < txn->ninvalidations; i++)
- LocalExecuteInvalidationMessage(&txn->invalidations[i]);
+ for (i = 0; i < nmsgs; i++)
+ LocalExecuteInvalidationMessage(&msgs[i]);
}
/*
change->data.msg.message_size);
data += change->data.msg.message_size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ {
+ char *data;
+ Size inval_size = sizeof(SharedInvalidationMessage) *
+ change->data.inval.ninvalidations;
+
+ sz += inval_size;
+
+ ReorderBufferSerializeReserve(rb, sz);
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+ memcpy(data, change->data.inval.invalidations, inval_size);
+ data += inval_size;
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
break;
}
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ {
+ sz += sizeof(SharedInvalidationMessage) *
+ change->data.inval.ninvalidations;
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot snap;
change->data.msg.message_size);
data += change->data.msg.message_size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_INVALIDATION:
+ {
+ Size inval_size = sizeof(SharedInvalidationMessage) *
+ change->data.inval.ninvalidations;
+
+ change->data.inval.invalidations =
+ MemoryContextAlloc(rb->context, inval_size);
+
+ /* read the message */
+ memcpy(change->data.inval.invalidations, data, inval_size);
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: