mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 01:21:03 +01:00
432 lines
21 KiB
JavaScript
432 lines
21 KiB
JavaScript
/**
 * Tests that config.transactions documents are correctly modified on FCV upgrade/downgrade and that
 * retryability is preserved for transactions that don't use prepare after upgrade and always for
 * retryable writes.
 *
 * @tags: [uses_transactions, uses_prepare_transaction]
 */

(function() {
|
|
"use strict";
|
|
load("jstests/libs/feature_compatibility_version.js");
|
|
load('jstests/sharding/libs/sharded_transactions_helpers.js');
|
|
|
|
// Database and collection every helper in this test reads from and writes to.
const dbName = "test";
const collName = "config_transactions_set_fcv";

// Define autocommit as a variable so it can be used in object literals w/o an explicit value.
const autocommit = false;

// Start a replica set with an odd number of members to verify nodes outside the majority behave
// correctly around setFeatureCompatibilityVersion, which uses majority writes to update the FCV
// document. The primary isn't expected to change, so each secondary is given priority 0.
const rst =
    new ReplSetTest({nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}]});
rst.startSet();
rst.initiate();

// Handles to the primary's databases. Declared with `let` because runTest() reassigns them after
// restarting the replica set.
let testDB = rst.getPrimary().getDB(dbName);
let adminDB = rst.getPrimary().getDB("admin");

// Create the collection up front with a majority write concern.
assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}}));

// Starts a dummy transaction, commits or aborts it with or without prepare, then returns the
// commit or abort response. Returns the response from prepare if it fails.
//
// First argument: the txnId, an object of the form {lsid, txnNumber}.
// Second argument: options
//   commit    - commit the transaction when truthy, abort it otherwise.
//   prepare   - run prepareTransaction before committing or aborting.
//   leaveOpen - return the response of the first statement without finishing the transaction.
//
// The response of the first statement is also returned when that statement itself fails.
function runTxn({lsid, txnNumber}, {commit, prepare, leaveOpen}) {
    const startTransactionRes = testDB.runCommand({
        insert: collName,
        documents: [{x: "dummy_txn"}],
        txnNumber: NumberLong(txnNumber),
        startTransaction: true,
        lsid,
        autocommit,
    });
    if (!startTransactionRes.ok || leaveOpen) {
        return startTransactionRes;
    }

    let prepareRes;
    if (prepare) {
        prepareRes = testDB.adminCommand({
            prepareTransaction: 1,
            txnNumber: NumberLong(txnNumber),
            lsid,
            autocommit,
            writeConcern: {w: "majority"}
        });
        if (!prepareRes.ok) {
            return prepareRes;
        }
    }

    // Aborting uses the same command whether or not the transaction was prepared.
    if (!commit) {
        return testDB.adminCommand(
            {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
    }

    if (!prepare) {
        return testDB.adminCommand(
            {commitTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
    }

    // Add 1 to the increment so that the commitTimestamp is after the prepareTimestamp.
    const commitTimestamp = Timestamp(prepareRes.prepareTimestamp.getTime(),
                                      prepareRes.prepareTimestamp.getInc() + 1);
    return testDB.adminCommand({
        commitTransaction: 1,
        commitTimestamp,
        txnNumber: NumberLong(txnNumber),
        lsid,
        autocommit
    });
}

// Retries commitTransaction for the given txnId ({lsid, txnNumber}), returning the raw command
// response so the caller can assert on success or on a specific error code.
function retryCommit({lsid, txnNumber}) {
    return testDB.adminCommand(
        {commitTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
}

// Asserts aborting the given txnId ({lsid, txnNumber}) returns NoSuchTransaction, which is how
// this test recognizes a transaction that has already been aborted.
function assertTransactionAborted({lsid, txnNumber}) {
    const abortRes = testDB.adminCommand(
        {abortTransaction: 1, txnNumber: NumberLong(txnNumber), lsid, autocommit});
    assert.commandFailedWithCode(abortRes, ErrorCodes.NoSuchTransaction);
}

// Global counter for the number of retryable writes completed. Used to verify retried retryable
// writes aren't double applied.
let numRetryableWrites = 0;

// Runs a dummy retryable write for the given txnId ({lsid, txnNumber}) and increments the
// retryable write counter. The counter is bumped before the insert is asserted on, so a failed
// insert throws with the counter already incremented.
function assertRetryableWriteWorked({lsid, txnNumber}) {
    numRetryableWrites += 1;
    assert.commandWorked(testDB.runCommand({
        insert: collName,
        documents: [{fromRetryableWrite: true}],
        txnNumber: NumberLong(txnNumber),
        lsid
    }));
}

// Verifies a txnId ({lsid, txnNumber}) has already been used for a retryable write by running a
// dummy retryable write and asserting the write isn't applied: the number of documents written
// by retryable writes must still equal the global numRetryableWrites counter afterwards.
function assertRetryableWriteCanBeRetried({lsid, txnNumber}) {
    assert.commandWorked(testDB.runCommand({
        insert: collName,
        documents: [{fromRetryableWrite: true}],
        txnNumber: NumberLong(txnNumber),
        lsid
    }));

    const numDocsFromRetryableWrites = testDB[collName].find({fromRetryableWrite: true}).itcount();
    assert.eq(numRetryableWrites, numDocsFromRetryableWrites);
}

// Searches config.transactions for an entry for the given txnId on each node in the replica
// set, verifying the entry does / does not exist and has the expected state, if specified.
//
// rst                   - the replica set to check; replication is awaited before any node is read.
// {lsid, txnNumber}     - the txnId to look for.
// {hasEntry, expectedState}
//   hasEntry      - when falsy, each node must have no entry for the session, or only an entry
//                   with a lower txnNum. When truthy, each node must have an entry whose txnNum
//                   equals txnNumber.
//   expectedState - when given, the entry's "state" field must equal it; when omitted, the entry
//                   must not have a "state" field at all.
function checkConfigTransactionEntry(rst, {lsid, txnNumber}, {hasEntry, expectedState}) {
    rst.awaitReplication();
    rst.nodes.forEach((node) => {
        // Search for id since we don't know the uid, which is generated by the server.
        const entry = node.getDB("config").transactions.findOne({"_id.id": lsid.id});
        const context = "entry: " + tojson(entry) + ", node: " + tojson(node);

        if (!hasEntry) {
            // There should be no entry for this session or it should be for an earlier
            // operation. A missing entry needs no further checks.
            if (entry) {
                assert.gt(
                    txnNumber, entry.txnNum, "expected entry to have lower txnNumber, " + context);
            }
            return;
        }

        // Fail with a descriptive message instead of a TypeError when the entry is missing.
        assert.neq(null, entry, "expected entry to exist, " + context);
        assert.eq(
            txnNumber, entry.txnNum, "expected entry to have the same txnNumber, " + context);

        if (expectedState) {
            assert.eq(expectedState, entry.state, context);
        } else {
            assert(!entry.hasOwnProperty("state"),
                   "expected entry to not have state, " + context);
        }
    });
}

// Restarts every node of the replica set after logging the given message, then refreshes the
// testDB and adminDB handles so they point at the primary of the restarted set.
function restartReplSetAndReconnect(logMessage) {
    jsTestLog(logMessage);
    for (let i = 0; i < rst.nodes.length; i++) {
        rst.restart(i);
    }
    testDB = rst.getPrimary().getDB(dbName);
    adminDB = rst.getPrimary().getDB("admin");
}

// Sets the named failpoint to the given mode ("alwaysOn" / "off") on the current primary.
function setFailPointOnPrimary(failPointName, mode) {
    assert.commandWorked(
        rst.getPrimary().adminCommand({configureFailPoint: failPointName, mode: mode}));
}

// Runs one full pass of the test: exercises retryable writes and transactions in the latest FCV,
// downgrades to the last-stable FCV, exercises them again, then upgrades back to the latest FCV.
// Around each step it checks the config.transactions entries on every node and whether the
// completed operations are still retryable.
//
// shouldRestart - when truthy, the whole replica set is restarted right before the downgrade and
//                 right before the upgrade, so the sessions are not in memory when setFCV runs.
function runTest({shouldRestart}) {
    // The test waits for failpoints to log a message when hit, so clear the program output
    // before starting so messages from previous iterations aren't in it.
    clearRawMongoProgramOutput();

    const txnIds = {
        write: {lsid: {id: UUID()}, txnNumber: 0},          // Retryable write.
        commit: {lsid: {id: UUID()}, txnNumber: 0},         // Committed transaction w/o prepare.
        commitPrepare: {lsid: {id: UUID()}, txnNumber: 0},  // Committed transaction w/ prepare.
        abort: {lsid: {id: UUID()}, txnNumber: 0},          // Aborted transaction w/o prepare.
        abortPrepare: {lsid: {id: UUID()}, txnNumber: 0},   // Aborted transaction after prepare.
        concurrentTxn: {lsid: {id: UUID()}, txnNumber: 0},  // Transaction concurrent w/ setFCV.
        concurrentWrite:
            {lsid: {id: UUID()}, txnNumber: 0},  // Retryable write concurrent w/ setFCV.
        upgradingTxn:
            {lsid: {id: UUID()}, txnNumber: 0},  // Transaction started during FCV upgrade.
    };

    //
    // In the latest FCV, verify the expected updates are made to config.transactions for each
    // case and the successful operations are retryable.
    //
    checkFCV(adminDB, latestFCV);

    assertRetryableWriteWorked(txnIds.write);
    assert.commandWorked(runTxn(txnIds.commit, {commit: true, prepare: false}));
    assert.commandWorked(runTxn(txnIds.commitPrepare, {commit: true, prepare: true}));
    assert.commandWorked(runTxn(txnIds.abort, {commit: false, prepare: false}));
    assert.commandWorked(runTxn(txnIds.abortPrepare, {commit: false, prepare: true}));

    checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
    checkConfigTransactionEntry(
        rst, txnIds.commit, {hasEntry: true, expectedState: "committed"});
    checkConfigTransactionEntry(
        rst, txnIds.commitPrepare, {hasEntry: true, expectedState: "committed"});
    checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
    checkConfigTransactionEntry(
        rst, txnIds.abortPrepare, {hasEntry: true, expectedState: "aborted"});

    // The retryable write and the commit of both committed transactions should be retryable.
    // The aborted transactions should still be aborted.
    assertRetryableWriteCanBeRetried(txnIds.write);
    assert.commandWorked(retryCommit(txnIds.commit));
    assert.commandWorked(retryCommit(txnIds.commitPrepare));
    assertTransactionAborted(txnIds.abort);
    assertTransactionAborted(txnIds.abortPrepare);

    //
    // Downgrade to the last-stable FCV and verify config.transactions was updated as expected
    // for previously completed operations and operations concurrent with the downgrade.
    //

    if (shouldRestart) {
        // Restart to verify config.transactions entries for sessions not in-memory at the
        // beginning of FCV downgrade are updated correctly.
        restartReplSetAndReconnect(
            "Restarting replica set before downgrading the featureCompatibilityVersion.");
    }

    // Make setFCV pause in the downgrading state after getting the list of sessions to
    // potentially modify.
    setFailPointOnPrimary("pauseBeforeDowngradingSessions", "alwaysOn");

    // Downgrade FCV in a parallel shell and wait until it blocks at the failpoint above.
    const awaitDowngradeFCV = startParallelShell(() => {
        load("jstests/libs/feature_compatibility_version.js");
        jsTestLog("Downgrade the featureCompatibilityVersion in a parallel shell.");
        assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastStableFCV}));
    }, rst.getPrimary().port);
    waitForFailpoint("Hit pauseBeforeDowngradingSessions failpoint", 1 /*numTimes*/);

    // Concurrent transactions that use prepare will fail.
    assert.commandFailedWithCode(runTxn(txnIds.concurrentTxn, {commit: true, prepare: true}),
                                 ErrorCodes.CommandNotSupported);
    txnIds.concurrentTxn.txnNumber += 1;

    // Concurrent transactions that do not use prepare and retryable writes succeed.
    assert.commandWorked(runTxn(txnIds.concurrentTxn, {commit: true, prepare: false}));
    assertRetryableWriteWorked(txnIds.concurrentWrite);

    // Unset the failpoint and wait for the downgrade to finish.
    setFailPointOnPrimary("pauseBeforeDowngradingSessions", "off");

    awaitDowngradeFCV();
    checkFCV(adminDB, lastStableFCV);

    // The successful concurrent operations should have entries without state and be retryable.
    checkConfigTransactionEntry(rst, txnIds.concurrentTxn, {hasEntry: true});
    assert.commandWorked(retryCommit(txnIds.concurrentTxn));
    checkConfigTransactionEntry(rst, txnIds.concurrentWrite, {hasEntry: true});
    assertRetryableWriteCanBeRetried(txnIds.concurrentWrite);

    // Only the retryable write entry should remain.
    checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
    checkConfigTransactionEntry(rst, txnIds.commit, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false});

    // The retryable write can be retried.
    assertRetryableWriteCanBeRetried(txnIds.write);

    // Neither of the commits can be retried.
    assert.commandFailedWithCode(retryCommit(txnIds.commit), ErrorCodes.NoSuchTransaction);
    assert.commandFailedWithCode(retryCommit(txnIds.commitPrepare),
                                 ErrorCodes.NoSuchTransaction);

    //
    // In the last-stable FCV, verify the expected updates are made to config.transactions for
    // each case and the successful operations are retryable.
    //

    // Reset each txnId to test upgrade with a clean slate.
    Object.keys(txnIds).forEach((txnIdKey) => {
        txnIds[txnIdKey].lsid = {id: UUID()};
        txnIds[txnIdKey].txnNumber = 0;
    });

    // Prepare can't be used in FCV 4.0, so only commit, abort, and retryable write should
    // succeed.
    assertRetryableWriteWorked(txnIds.write);
    assert.commandWorked(runTxn(txnIds.commit, {commit: true, prepare: false}));
    assert.commandFailedWithCode(runTxn(txnIds.commitPrepare, {commit: true, prepare: true}),
                                 ErrorCodes.CommandNotSupported);
    assert.commandWorked(runTxn(txnIds.abort, {commit: false, prepare: false}));
    assert.commandFailedWithCode(runTxn(txnIds.abortPrepare, {commit: false, prepare: true}),
                                 ErrorCodes.CommandNotSupported);

    // Only the retryable write and transaction that committed without prepare should have an
    // entry. Neither should have state.
    checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
    checkConfigTransactionEntry(rst, txnIds.commit, {hasEntry: true});
    checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false});

    // The retryable write and successful commit can be retried.
    assertRetryableWriteCanBeRetried(txnIds.write);
    assert.commandWorked(retryCommit(txnIds.commit));

    if (shouldRestart) {
        // Restart to verify config.transactions entries for sessions not in-memory at the
        // beginning of FCV upgrade are updated correctly.
        restartReplSetAndReconnect(
            "Restarting replica set before upgrading the featureCompatibilityVersion.");
    }

    //
    // Upgrade to the latest FCV and verify config.transactions was updated as expected for
    // previously completed operations and operations concurrent with the upgrade.
    //

    // Run a retryable write on the session that will be used during upgrade so it has a
    // transaction table entry and will be checked out by the upgrade.
    assertRetryableWriteWorked(txnIds.upgradingTxn);
    txnIds.upgradingTxn.txnNumber += 1;

    // Make setFCV pause in the upgrading state after getting the list of sessions to
    // potentially modify.
    setFailPointOnPrimary("pauseBeforeUpgradingSessions", "alwaysOn");

    // Upgrade FCV in a parallel shell and wait until it blocks at the failpoint above.
    const awaitUpgradeFCV = startParallelShell(() => {
        load("jstests/libs/feature_compatibility_version.js");
        jsTestLog("Upgrade the featureCompatibilityVersion in a parallel shell.");
        assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
    }, rst.getPrimary().port);
    waitForFailpoint("Hit pauseBeforeUpgradingSessions failpoint", 1 /*numTimes*/);

    // Concurrent transactions that use prepare will fail.
    assert.commandFailedWithCode(runTxn(txnIds.concurrentTxn, {commit: true, prepare: true}),
                                 ErrorCodes.CommandNotSupported);
    txnIds.concurrentTxn.txnNumber += 1;

    // Concurrent transactions that do not use prepare and retryable writes succeed.
    assert.commandWorked(runTxn(txnIds.concurrentTxn, {commit: true, prepare: false}));
    assertRetryableWriteWorked(txnIds.concurrentWrite);

    // Start a transaction in the upgrading state and verify that it doesn't get aborted by the
    // rest of the upgrade. Note that all sessions are killed and their transactions aborted for
    // writes to the FCV document except when it is set to the fully upgraded state, so this
    // can't be tested for downgrade.
    assert.commandWorked(runTxn(txnIds.upgradingTxn, {leaveOpen: true}));

    // Unset the failpoint and wait for the upgrade to finish.
    setFailPointOnPrimary("pauseBeforeUpgradingSessions", "off");

    awaitUpgradeFCV();
    checkFCV(adminDB, latestFCV);

    // The transaction started while upgrading shouldn't have been killed and can be committed.
    assert.commandWorked(testDB.adminCommand({
        commitTransaction: 1,
        lsid: txnIds.upgradingTxn.lsid,
        txnNumber: NumberLong(txnIds.upgradingTxn.txnNumber),
        autocommit
    }));

    // The successful concurrent transaction should have "committed" state and be retryable, and
    // the concurrent retryable write should not have state and also be retryable.
    checkConfigTransactionEntry(
        rst, txnIds.concurrentTxn, {hasEntry: true, expectedState: "committed"});
    assert.commandWorked(retryCommit(txnIds.concurrentTxn));
    checkConfigTransactionEntry(rst, txnIds.concurrentWrite, {hasEntry: true});
    assertRetryableWriteCanBeRetried(txnIds.concurrentWrite);

    // There should still only be entries for the committed transaction and retryable write. The
    // committed transaction should now have a "state" field.
    checkConfigTransactionEntry(rst, txnIds.write, {hasEntry: true});
    checkConfigTransactionEntry(
        rst, txnIds.commit, {hasEntry: true, expectedState: "committed"});
    checkConfigTransactionEntry(rst, txnIds.commitPrepare, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.abort, {hasEntry: false});
    checkConfigTransactionEntry(rst, txnIds.abortPrepare, {hasEntry: false});

    // The retryable write and successful commit can be retried.
    assertRetryableWriteCanBeRetried(txnIds.write);
    assert.commandWorked(retryCommit(txnIds.commit));
}

// Run the full scenario twice: once with the sessions still in memory when setFCV runs, and once
// with the replica set restarted before each FCV change.
runTest({shouldRestart: false});
runTest({shouldRestart: true});

//
// Verify setFCV is interruptible between modifying sessions.
//
clearRawMongoProgramOutput();
checkFCV(adminDB, latestFCV);

// Construct a config.transactions entry that would be modified by downgrade.
const txnIds = {interrupt: {lsid: {id: UUID()}, txnNumber: 0}};
assert.commandWorked(runTxn(txnIds.interrupt, {commit: true, prepare: true}));
checkConfigTransactionEntry(
    rst, txnIds.interrupt, {hasEntry: true, expectedState: "committed"});

// Pause setFCV before it would modify the entry.
assert.commandWorked(rst.getPrimary().adminCommand(
    {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "alwaysOn"}));

// Run the downgrade under a known session id (shared with the parallel shell through TestData)
// so that session can be killed below; the downgrade is expected to fail with Interrupted.
TestData.setFCVLsid = {id: UUID()};
const awaitDowngradeFCV = startParallelShell(() => {
    load("jstests/libs/feature_compatibility_version.js");
    assert.commandFailedWithCode(
        db.adminCommand(
            {setFeatureCompatibilityVersion: lastStableFCV, lsid: TestData.setFCVLsid}),
        ErrorCodes.Interrupted);
}, rst.getPrimary().port);
waitForFailpoint("Hit pauseBeforeDowngradingSessions failpoint", 1 /*numTimes*/);

// Kill the session running setFCV.
assert.commandWorked(rst.getPrimary().adminCommand({killSessions: [TestData.setFCVLsid]}));

// Unpause the failpoint and verify setFCV returns without modifying config.transactions.
assert.commandWorked(rst.getPrimary().adminCommand(
    {configureFailPoint: "pauseBeforeDowngradingSessions", mode: "off"}));

awaitDowngradeFCV();
checkConfigTransactionEntry(
    rst, txnIds.interrupt, {hasEntry: true, expectedState: "committed"});

rst.stopSet();
}());
|