mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 01:21:03 +01:00
SERVER-42251 Timestamp multikey writes with the prepare timestamp during replication recovery
Now that we execute multikey catalog updates in a side transaction, we need to give them some suitable timestamp. In normal replication, we can grab the latest value of the LogicalClock. In startup recovery, though, we may replay a prepared transaction that does a multikey write, but the LogicalClock may not have been initialized yet. Thus, we use the prepare timestamp of the transaction for the multikey write, since that timestamp is guaranteed to be less than or equal to the commit timestamp of the transaction.
This commit is contained in:
parent
d7154f5b3c
commit
7d687264de
43
jstests/replsets/recover_prepared_txn_with_multikey_write.js
Normal file
43
jstests/replsets/recover_prepared_txn_with_multikey_write.js
Normal file
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Test that replication recovery can reconstruct a prepared transaction that includes a write that
|
||||
* sets the multikey flag.
|
||||
*
|
||||
* @tags: [uses_transactions, uses_prepare_transaction]
|
||||
*/
|
||||
(function() {
|
||||
const rst = new ReplSetTest({
|
||||
nodes: [
|
||||
{},
|
||||
{
|
||||
// Disallow elections on secondary.
|
||||
rsConfig: {
|
||||
priority: 0,
|
||||
votes: 0,
|
||||
}
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
rst.startSet();
|
||||
rst.initiate();
|
||||
|
||||
const primary = rst.getPrimary();
|
||||
|
||||
const session = primary.getDB("test").getMongo().startSession();
|
||||
const sessionDB = session.getDatabase("test");
|
||||
const sessionColl = sessionDB.getCollection("coll");
|
||||
|
||||
// Create an index that will later be made multikey.
|
||||
sessionColl.createIndex({x: 1});
|
||||
session.startTransaction();
|
||||
|
||||
// Make the index multikey.
|
||||
sessionColl.insert({x: [1, 2, 3]});
|
||||
assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1}));
|
||||
|
||||
// Do an unclean shutdown so we don't force a checkpoint, and then restart.
|
||||
rst.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
|
||||
rst.restart(0);
|
||||
|
||||
rst.stopSet();
|
||||
}());
|
@ -296,7 +296,18 @@ void IndexCatalogEntryImpl::setMultikey(OperationContext* opCtx,
|
||||
TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
|
||||
writeConflictRetry(opCtx, "set index multikey", _ns.ns(), [&] {
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
auto writeTs = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
|
||||
|
||||
// If we have a prepare optime for recovery, then we always use that. During recovery of
|
||||
// prepared transactions, the logical clock may not yet be initialized, so we use the
|
||||
// prepare timestamp of the transaction for this write. This is safe since the prepare
|
||||
// timestamp is always <= the commit timestamp of a transaction, which satisfies the
|
||||
// correctness requirement for multikey writes i.e. they must occur at or before the
|
||||
// first write that set the multikey flag.
|
||||
auto recoveryPrepareOpTime = txnParticipant.getPrepareOpTimeForRecovery();
|
||||
Timestamp writeTs = recoveryPrepareOpTime.isNull()
|
||||
? LogicalClock::get(opCtx)->getClusterTime().asTimestamp()
|
||||
: recoveryPrepareOpTime.getTimestamp();
|
||||
|
||||
auto status = opCtx->recoveryUnit()->setTimestamp(writeTs);
|
||||
if (status.code() == ErrorCodes::BadValue) {
|
||||
log() << "Temporarily could not timestamp the multikey catalog write, retrying. "
|
||||
|
@ -350,6 +350,12 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
|
||||
auto transaction = TransactionParticipant::get(opCtx);
|
||||
transaction.unstashTransactionResources(opCtx, "prepareTransaction");
|
||||
|
||||
// Set this in case the application of any ops need to use the prepare timestamp of this
|
||||
// transaction. It should be cleared automatically when the transaction finishes.
|
||||
if (mode == repl::OplogApplication::Mode::kRecovering) {
|
||||
transaction.setPrepareOpTimeForRecovery(opCtx, entry.getOpTime());
|
||||
}
|
||||
|
||||
auto status = _applyOperationsForTransaction(opCtx, ops, mode);
|
||||
fassert(31137, status);
|
||||
|
||||
|
@ -1100,6 +1100,16 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
|
||||
return prepareOplogSlot.getTimestamp();
|
||||
}
|
||||
|
||||
void TransactionParticipant::Participant::setPrepareOpTimeForRecovery(OperationContext* opCtx,
|
||||
repl::OpTime prepareOpTime) {
|
||||
stdx::lock_guard<Client> lk(*opCtx->getClient());
|
||||
o(lk).recoveryPrepareOpTime = prepareOpTime;
|
||||
}
|
||||
|
||||
const repl::OpTime TransactionParticipant::Participant::getPrepareOpTimeForRecovery() const {
|
||||
return o().recoveryPrepareOpTime;
|
||||
}
|
||||
|
||||
void TransactionParticipant::Participant::addTransactionOperation(
|
||||
OperationContext* opCtx, const repl::ReplOperation& operation) {
|
||||
|
||||
@ -2024,6 +2034,7 @@ void TransactionParticipant::Participant::_resetTransactionState(
|
||||
p().transactionOperationBytes = 0;
|
||||
p().transactionOperations.clear();
|
||||
o(wl).prepareOpTime = repl::OpTime();
|
||||
o(wl).recoveryPrepareOpTime = repl::OpTime();
|
||||
p().multikeyPathInfo.clear();
|
||||
p().autoCommit = boost::none;
|
||||
|
||||
|
@ -454,6 +454,16 @@ public:
|
||||
Timestamp prepareTransaction(OperationContext* opCtx,
|
||||
boost::optional<repl::OpTime> prepareOptime);
|
||||
|
||||
/**
|
||||
* Sets the prepare optime used for recovery.
|
||||
*/
|
||||
void setPrepareOpTimeForRecovery(OperationContext* opCtx, repl::OpTime prepareOpTime);
|
||||
|
||||
/**
|
||||
* Gets the prepare optime used for recovery. Returns a null optime if unset.
|
||||
*/
|
||||
const repl::OpTime getPrepareOpTimeForRecovery() const;
|
||||
|
||||
/**
|
||||
* Commits the transaction, including committing the write unit of work and updating
|
||||
* transaction state.
|
||||
@ -878,6 +888,12 @@ private:
|
||||
// Track the prepareOpTime, the OpTime of the 'prepare' oplog entry for a transaction.
|
||||
repl::OpTime prepareOpTime;
|
||||
|
||||
// The prepare optime of the transaction. This is exposed to consumers who may need to know
|
||||
// the optime of the prepare oplog entry during replication recovery. It is stored
|
||||
// separately from the 'prepareOpTime' since it serves a different purpose and may be
|
||||
// updated at different times.
|
||||
repl::OpTime recoveryPrepareOpTime;
|
||||
|
||||
// Tracks and updates transaction metrics upon the appropriate transaction event.
|
||||
TransactionMetricsObserver transactionMetricsObserver;
|
||||
} _o;
|
||||
|
Loading…
Reference in New Issue
Block a user