/**
 * Test that an aggregation with a $merge stage obeys the maxTimeMS.
 * @tags: [requires_sharding, requires_replication]
 */
(function() {
load("jstests/aggregation/extras/merge_helpers.js");  // For withEachMergeMode().
load("jstests/libs/fixture_helpers.js");              // For isMongos().
load("jstests/libs/profiler.js");  // For profilerHasSingleMatchingEntryOrThrow.

const kDBName = "test";
const kSourceCollName = "merge_max_time_ms_source";
const kDestCollName = "merge_max_time_ms_dest";
const nDocs = 10;

/**
 * Helper for populating the collection.
 */
function insertDocs(coll) {
    for (let i = 0; i < nDocs; i++) {
        assert.commandWorked(coll.insert({_id: i}, {writeConcern: {w: "majority"}}));
    }
}

/**
 * Wait until the server sets its CurOp "failpointMsg" to the failpoint name, indicating that it's
 * hanging.
 */
function waitUntilServerHangsOnFailPoint(conn, fpName) {
    // Be sure that the server is hanging on the failpoint.
    assert.soon(function() {
        const filter = {"failpointMsg": fpName};
        const ops = conn.getDB("admin")
                        .aggregate([{$currentOp: {allUsers: true}}, {$match: filter}])
                        .toArray();
        return ops.length == 1;
    });
}

/**
 * Given $merge parameters and mongod connections, run a $merge aggregation against 'conn'. Set
 * the provided failpoint on the node specified by 'failPointConn' in order to hang during the
 * aggregate. Ensure that the $merge's maxTimeMS expires on the node specified by 'maxTimeMsConn'.
 */
function forceAggregationToHangAndCheckMaxTimeMsExpires(
    whenMatched, whenNotMatched, failPointName, conn, failPointConn, maxTimeMsConn) {
    // Use a short maxTimeMS so that the test completes in a reasonable amount of time. We will
    // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not prematurely
    // time out.
    const maxTimeMS = 1000 * 2;

    // Enable a failpoint so that the write will hang.
    const failpointCommand = {
        configureFailPoint: failPointName,
        mode: "alwaysOn",
        data: {nss: kDBName + "." + kDestCollName}
    };

    assert.commandWorked(failPointConn.getDB("admin").runCommand(failpointCommand));

    // Make sure we don't run out of time on either of the involved nodes before the failpoint is
    // hit.
    assert.commandWorked(conn.getDB("admin").runCommand(
        {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));
    assert.commandWorked(maxTimeMsConn.getDB("admin").runCommand(
        {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));

    // Build the parallel shell function.
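    // The parallel shell runs in a separate mongo shell process with its own scope, so any
    // values it needs are baked directly into the script string.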
    let shellStr = `const testDB = db.getSiblingDB('${kDBName}');`;
    shellStr += `const sourceColl = testDB['${kSourceCollName}'];`;
    shellStr += `const destColl = testDB['${kDestCollName}'];`;
    shellStr += `const maxTimeMS = ${maxTimeMS};`;
    shellStr += `const whenMatched = ${tojson(whenMatched)};`;
    shellStr += `const whenNotMatched = '${whenNotMatched}';`;
    const runAggregate = function() {
        const pipeline = [{
            $merge:
                {into: destColl.getName(), whenMatched: whenMatched, whenNotMatched: whenNotMatched}
        }];
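        // Issue the aggregate with a secondary read preference so that it is allowed to run
        // when 'conn' is a secondary node.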
        const err = assert.throws(
            () => sourceColl.aggregate(
                pipeline, {maxTimeMS: maxTimeMS, $readPreference: {mode: "secondary"}}));
        assert.eq(err.code, ErrorCodes.MaxTimeMSExpired, "expected aggregation to fail");
    };
    shellStr += `(${runAggregate.toString()})();`;
    const awaitShell = startParallelShell(shellStr, conn.port);

    waitUntilServerHangsOnFailPoint(failPointConn, failPointName);
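
    // The aggregation is now hanging on the failpoint, so re-enable maxTimeMS enforcement on the
    // node that is expected to time out.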
    assert.commandWorked(maxTimeMsConn.getDB("admin").runCommand(
        {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}));

    // The aggregation running in the parallel shell will hang on the failpoint, burning its
    // time. Wait until the maxTimeMS has definitely expired.
    sleep(maxTimeMS + 2000);

    // Now drop the failpoint, allowing the aggregation to proceed. It should hit an interrupt
    // check and terminate immediately.
    assert.commandWorked(
        failPointConn.getDB("admin").runCommand({configureFailPoint: failPointName, mode: "off"}));

    // Wait for the parallel shell to finish.
    assert.eq(awaitShell(), 0);
}

/**
 * Run a $merge aggregate against the node specified by 'conn' with primary 'primaryConn' (these
 * may be the same node). Verify that maxTimeMS properly times out the aggregate on the node
 * specified by 'maxTimeMsConn', both while hanging on the insert/update on 'primaryConn' and
 * while hanging on the batch being built on 'conn'.
 */
function runUnshardedTest(whenMatched, whenNotMatched, conn, primaryConn, maxTimeMsConn) {
    jsTestLog("Running unsharded test with whenMatched: " + whenMatched +
              " whenNotMatched: " + whenNotMatched);
    // The target collection will always be empty, so there is no need to test
    // 'whenNotMatched: fail', which would always fail.
    if (whenNotMatched == "fail") {
        return;
    }

    const sourceColl = conn.getDB(kDBName)[kSourceCollName];
    const destColl = primaryConn.getDB(kDBName)[kDestCollName];
    assert.commandWorked(destColl.remove({}));

    // Be sure we're able to read from a cursor with a maxTimeMS set on it.
    (function() {
        // Use a long maxTimeMS, since we expect the operation to finish.
        const maxTimeMS = 1000 * 600;
        const pipeline = [{
            $merge:
                {into: destColl.getName(), whenMatched: whenMatched, whenNotMatched: whenNotMatched}
        }];
        assert.doesNotThrow(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS}));
    })();

    assert.commandWorked(destColl.remove({}));

    // Force the aggregation to hang while the batch is being written. The failpoint changes
    // depending on the mode: if 'whenMatched' is set to "fail" then the implementation will end
    // up issuing insert commands instead of updates.
    const kFailPointName =
        whenMatched === "fail" ? "hangDuringBatchInsert" : "hangDuringBatchUpdate";
    forceAggregationToHangAndCheckMaxTimeMsExpires(
        whenMatched, whenNotMatched, kFailPointName, conn, primaryConn, maxTimeMsConn);

    assert.commandWorked(destColl.remove({}));

    // Force the aggregation to hang while the batch is being built.
    forceAggregationToHangAndCheckMaxTimeMsExpires(
        whenMatched, whenNotMatched, "hangWhileBuildingDocumentSourceMergeBatch", conn, conn, conn);
}

// Run on a standalone.
(function() {
const conn = MongoRunner.runMongod({});
assert.neq(null, conn, 'mongod was unable to start up');
insertDocs(conn.getDB(kDBName)[kSourceCollName]);
withEachMergeMode(
    (mode) => runUnshardedTest(mode.whenMatchedMode, mode.whenNotMatchedMode, conn, conn, conn));
MongoRunner.stopMongod(conn);
})();

// Run on the primary and the secondary of a replica set.
(function() {
const replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
replTest.initiate();
replTest.awaitReplication();
const primary = replTest.getPrimary();
const secondary = replTest.getSecondary();
insertDocs(primary.getDB(kDBName)[kSourceCollName]);
withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
    // Run the $merge on the primary and test that the maxTimeMS times out on the primary.
    runUnshardedTest(whenMatchedMode, whenNotMatchedMode, primary, primary, primary);
    // Run the $merge on the secondary and test that the maxTimeMS times out on the primary.
    runUnshardedTest(whenMatchedMode, whenNotMatchedMode, secondary, primary, primary);
    // Run the $merge on the secondary and test that the maxTimeMS times out on the secondary.
    runUnshardedTest(whenMatchedMode, whenNotMatchedMode, secondary, primary, secondary);
});
replTest.stopSet();
})();

// Runs a $merge against 'mongosConn' and verifies that the maxTimeMS value is included in the
// command sent to mongod. Since the timeout may unreliably fire in mongos before the command even
// reaches the shard, we instead set a very large timeout and verify that the command sent to
// mongod includes the maxTimeMS.
function runShardedTest(whenMatched, whenNotMatched, mongosConn, mongodConn, comment) {
    jsTestLog("Running sharded test with whenMatched: " + whenMatched +
              " whenNotMatched: " + whenNotMatched);
    // The target collection will always be empty, so there is no need to test
    // 'whenNotMatched: fail', which would always fail.
    if (whenNotMatched == "fail") {
        return;
    }

    // Set a large timeout since we expect the command to finish.
    const maxTimeMS = 1000 * 20;

    const sourceColl = mongosConn.getDB(kDBName)[kSourceCollName];
    const destColl = mongosConn.getDB(kDBName)[kDestCollName];
    assert.commandWorked(destColl.remove({}));

    // Make sure we don't time out in mongos before even reaching the shards.
    assert.commandWorked(mongosConn.getDB("admin").runCommand(
        {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));
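
    // Run the $merge. All of its output goes to 'destColl', so the cursor returned to the client
    // is expected to be empty.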
    const cursor = sourceColl.aggregate(
        [{
            $merge:
                {into: destColl.getName(), whenMatched: whenMatched, whenNotMatched: whenNotMatched}
        }],
        {maxTimeMS: maxTimeMS, comment: comment});
    assert(!cursor.hasNext());

    // Filter the profiler entries on the existence of $merge, since aggregations through mongos
    // will include an extra aggregation with an empty pipeline to establish cursors on the
    // shards.
    assert.soon(function() {
        return mongodConn.getDB(kDBName)
                   .system.profile
                   .find({
                       "command.aggregate": kSourceCollName,
                       "command.pipeline.$merge": {"$exists": true},
                       "command.comment": comment,
                       "command.maxTimeMS": maxTimeMS,
                   })
                   .itcount() == 1;
    });
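
    // Restore normal maxTimeMS enforcement on mongos.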
    assert.commandWorked(mongosConn.getDB("admin").runCommand(
        {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}));
}

// Run on a sharded cluster.
(function() {
const st = new ShardingTest({shards: 2});

// Ensure shard 0 is the primary shard. This is so that the $merge stage is guaranteed to run
// on it.
assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: kDBName}));
st.ensurePrimaryShard(kDBName, st.shard0.name);

// Set up the source collection to be sharded in a way such that each node will have some
// documents for the remainder of the test:
// shard 0: [MinKey, 5]
// shard 1: [5, MaxKey]
st.shardColl(kSourceCollName,
             {_id: 1},  // key
             {_id: 5},  // split
             {_id: 6},  // move
             kDBName);
insertDocs(st.s.getDB(kDBName)[kSourceCollName]);

// Start the profiler on each shard so that we can examine the $merge's maxTimeMS.
assert.commandWorked(st.shard0.getDB(kDBName).setProfilingLevel(2));
assert.commandWorked(st.shard1.getDB(kDBName).setProfilingLevel(2));

// Run the test with 'destColl' unsharded.
withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode,
                                           mode.whenNotMatchedMode,
                                           st.s,
                                           st.shard0,
                                           tojson(mode) + "_unshardedDest"));

// Run the test with 'destColl' sharded. This means that writes will be sent to both shards, and
// if either one hangs, the maxTimeMS will expire.
// Shard the destination collection.
st.shardColl(kDestCollName,
             {_id: 1},  // key
             {_id: 5},  // split
             {_id: 6},  // move
             kDBName);

jsTestLog("Running test forcing shard " + st.shard0.name + " to hang");
withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode,
                                           mode.whenNotMatchedMode,
                                           st.s,
                                           st.shard0,
                                           tojson(mode) + "_shardedDest_" + st.shard0.name));

jsTestLog("Running test forcing shard " + st.shard1.name + " to hang");
withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode,
                                           mode.whenNotMatchedMode,
                                           st.s,
                                           st.shard1,
                                           tojson(mode) + "_shardedDest_" + st.shard1.name));

st.stop();
})();
})();