mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-47645 Must invalidate all sessions on stepdown
This commit is contained in:
parent
7b910aba3c
commit
6cbc99ea55
@ -11,6 +11,7 @@ selector:
|
||||
- jstests/replsets/initial_sync_drop_collection.js
|
||||
- jstests/replsets/step_down_chaining_disabled.js
|
||||
- jstests/replsets/apply_prepare_txn_write_conflict_robustness.js
|
||||
- jstests/replsets/invalidate_sessions_on_stepdown.js
|
||||
executor:
|
||||
config:
|
||||
shell_options:
|
||||
|
@ -36,6 +36,8 @@ concurrency_sharded_replication_multiversion:
|
||||
replica_sets_multiversion:
|
||||
- ticket: SERVER-49471
|
||||
test_file: jstests/replsets/apply_prepare_txn_write_conflict_robustness.js
|
||||
- ticket: SERVER-47645
|
||||
test_file: jstests/replsets/invalidate_sessions_on_stepdown.js
|
||||
|
||||
replica_sets_jscore_multiversion_passthrough:
|
||||
- ticket: SERVER-47773
|
||||
|
67
jstests/replsets/invalidate_sessions_on_stepdown.js
Normal file
67
jstests/replsets/invalidate_sessions_on_stepdown.js
Normal file
@ -0,0 +1,67 @@
|
||||
/**
|
||||
* Tests that a txnNumber for a transaction that was aborted in-memory can be reused by a new
|
||||
* primary. Mongos does this as an optimization.
|
||||
* @tags: [uses_transactions, uses_prepare_transaction]
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
const rst = new ReplSetTest({name: "invalidate_sessions_on_stepdown", nodes: 2});
|
||||
rst.startSet();
|
||||
rst.initiateWithHighElectionTimeout();
|
||||
|
||||
const dbName = "test";
|
||||
const collName = "coll";
|
||||
const node0 = rst.getPrimary();
|
||||
const node1 = rst.getSecondary();
|
||||
const node0Session = node0.startSession();
|
||||
const sessionId = node0Session.getSessionId();
|
||||
const node0SessionDB = node0Session.getDatabase(dbName);
|
||||
|
||||
assert.commandWorked(node0SessionDB.coll.insert({a: 1}, {writeConcern: {"w": "majority"}}));
|
||||
|
||||
jsTestLog("Run a transaction with txnNumber 0 on the primary.");
|
||||
assert.commandWorked(node0SessionDB.runCommand({
|
||||
insert: collName,
|
||||
documents: [{b: 1}],
|
||||
txnNumber: NumberLong(0),
|
||||
startTransaction: true,
|
||||
autocommit: false
|
||||
}));
|
||||
|
||||
jsTestLog("Step up the secondary. The primary will abort the transaction when it steps down.");
|
||||
assert.commandWorked(node1.adminCommand({replSetStepUp: 1}));
|
||||
assert.eq(node1, rst.getPrimary());
|
||||
|
||||
const node1DB = node1.getDB(dbName);
|
||||
|
||||
jsTestLog("Run a transaction with txnNumber 0 and the same session ID on the new primary.");
|
||||
assert.commandWorked(node1DB.runCommand({
|
||||
insert: collName,
|
||||
documents: [{c: 1}],
|
||||
lsid: sessionId,
|
||||
txnNumber: NumberLong(0),
|
||||
startTransaction: true,
|
||||
autocommit: false
|
||||
}));
|
||||
let res = assert.commandWorked(node1DB.adminCommand({
|
||||
prepareTransaction: 1,
|
||||
lsid: sessionId,
|
||||
txnNumber: NumberLong(0),
|
||||
autocommit: false,
|
||||
writeConcern: {w: "majority"}
|
||||
}));
|
||||
assert.commandWorked(node1DB.adminCommand({
|
||||
commitTransaction: 1,
|
||||
commitTimestamp: res.prepareTimestamp,
|
||||
lsid: sessionId,
|
||||
txnNumber: NumberLong(0),
|
||||
autocommit: false,
|
||||
writeConcern: {w: "majority"}
|
||||
}));
|
||||
assert.eq(2, node0SessionDB.coll.find().itcount());
|
||||
assert.eq(0, node0SessionDB.coll.find({b: 1}).itcount());
|
||||
assert.eq(1, node0SessionDB.coll.find({c: 1}).itcount());
|
||||
|
||||
rst.stopSet();
|
||||
})();
|
@ -218,4 +218,24 @@ void yieldLocksForPreparedTransactions(OperationContext* opCtx) {
|
||||
ErrorCodes::InterruptedDueToReplStateChange);
|
||||
}
|
||||
|
||||
void invalidateSessionsForStepdown(OperationContext* opCtx) {
|
||||
// It is illegal to invalidate the sessions if the operation has a session checked out.
|
||||
invariant(!OperationContextSession::get(opCtx));
|
||||
|
||||
SessionKiller::Matcher matcherAllSessions(
|
||||
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
|
||||
killSessionsAction(opCtx,
|
||||
matcherAllSessions,
|
||||
[](const ObservableSession& session) {
|
||||
return !TransactionParticipant::get(session).transactionIsPrepared();
|
||||
},
|
||||
[](OperationContext* killerOpCtx, const SessionToKill& session) {
|
||||
auto txnParticipant = TransactionParticipant::get(session);
|
||||
if (!txnParticipant.transactionIsPrepared()) {
|
||||
txnParticipant.invalidate(killerOpCtx);
|
||||
}
|
||||
},
|
||||
ErrorCodes::InterruptedDueToReplStateChange);
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -71,4 +71,10 @@ void killSessionsAbortAllPreparedTransactions(OperationContext* opCtx);
|
||||
*/
|
||||
void yieldLocksForPreparedTransactions(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Invalidates sessions that do not have prepared transactions, since txnNumbers for transactions
|
||||
* that were aborted in-memory may be reused on the new primary.
|
||||
*/
|
||||
void invalidateSessionsForStepdown(OperationContext* opCtx);
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -2692,6 +2692,7 @@ void ReplicationCoordinatorImpl::stepDown(OperationContext* opCtx,
|
||||
lk.unlock();
|
||||
|
||||
yieldLocksForPreparedTransactions(opCtx);
|
||||
invalidateSessionsForStepdown(opCtx);
|
||||
|
||||
lk.lock();
|
||||
|
||||
@ -3554,6 +3555,7 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx,
|
||||
lk.unlock();
|
||||
|
||||
yieldLocksForPreparedTransactions(opCtx);
|
||||
invalidateSessionsForStepdown(opCtx);
|
||||
|
||||
lk.lock();
|
||||
|
||||
|
@ -509,6 +509,7 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
|
||||
lk.unlock();
|
||||
|
||||
yieldLocksForPreparedTransactions(opCtx.get());
|
||||
invalidateSessionsForStepdown(opCtx.get());
|
||||
|
||||
lk.lock();
|
||||
|
||||
@ -785,6 +786,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(
|
||||
lk.unlock();
|
||||
|
||||
yieldLocksForPreparedTransactions(opCtx.get());
|
||||
invalidateSessionsForStepdown(opCtx.get());
|
||||
|
||||
lk.lock();
|
||||
|
||||
|
@ -590,6 +590,9 @@ void TransactionParticipant::Participant::beginOrContinueTransactionUnconditiona
|
||||
|
||||
if (o().activeTxnNumber != txnNumber) {
|
||||
_beginMultiDocumentTransaction(opCtx, txnNumber);
|
||||
} else {
|
||||
invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared),
|
||||
str::stream() << "Current state: " << o().txnState);
|
||||
}
|
||||
|
||||
// Assume we need to write an abort if we abort this transaction. This method is called only
|
||||
|
@ -53,8 +53,8 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1},
|
||||
{"_recvChunkStart", 1}};
|
||||
|
||||
// Commands that can be sent with session info but should not check out a session.
|
||||
const StringMap<int> skipSessionCheckoutList = {{"coordinateCommitTransaction", 1},
|
||||
{"_recvChunkStart", 1}};
|
||||
const StringMap<int> skipSessionCheckoutList = {
|
||||
{"coordinateCommitTransaction", 1}, {"_recvChunkStart", 1}, {"replSetStepDown", 1}};
|
||||
|
||||
const StringMap<int> transactionCommands = {{"commitTransaction", 1},
|
||||
{"coordinateCommitTransaction", 1},
|
||||
|
Loading…
Reference in New Issue
Block a user