mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-44150 Enable MR tests for output mode 'reduce' with a non-trivial reduce function
This commit is contained in:
parent
4722a18440
commit
0bed872351
@ -29,10 +29,6 @@ selector:
|
||||
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
|
||||
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
|
||||
|
||||
# This test is not compatible with the other mapReduce FSM tests since it requires the
|
||||
# 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
|
||||
- jstests/concurrency/fsm_workloads/map_reduce_reduce.js
|
||||
|
||||
exclude_with_any_tags:
|
||||
- uses_transactions
|
||||
- requires_replication
|
||||
|
@ -44,10 +44,6 @@ selector:
|
||||
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
|
||||
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
|
||||
|
||||
# This test is not compatible with the other mapReduce FSM tests since it requires the
|
||||
# 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
|
||||
- jstests/concurrency/fsm_workloads/map_reduce_reduce.js
|
||||
|
||||
exclude_with_any_tags:
|
||||
- requires_sharding
|
||||
|
||||
|
@ -44,10 +44,6 @@ selector:
|
||||
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
|
||||
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
|
||||
|
||||
# This test is not compatible with the other mapReduce FSM tests since it requires the
|
||||
# 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
|
||||
- jstests/concurrency/fsm_workloads/map_reduce_reduce.js
|
||||
|
||||
exclude_with_any_tags:
|
||||
- requires_sharding
|
||||
|
||||
|
@ -44,10 +44,6 @@ selector:
|
||||
- jstests/concurrency/fsm_workloads/collmod_writeconflict.js
|
||||
- jstests/concurrency/fsm_workloads/reindex_writeconflict.js
|
||||
|
||||
# This test is not compatible with the other mapReduce FSM tests since it requires the
|
||||
# 'internalQueryUseAggMapReduce' knob to be false while the other tests explicitly set it to true.
|
||||
- jstests/concurrency/fsm_workloads/map_reduce_reduce.js
|
||||
|
||||
exclude_with_any_tags:
|
||||
- requires_sharding
|
||||
|
||||
|
@ -31,12 +31,6 @@ var $config = extendWorkload($config, function($config, $super) {
|
||||
}
|
||||
|
||||
$config.states.init = function init(db, collName) {
|
||||
// TODO SERVER-44150: Cannot run MR with output 'reduce' in agg since the 'whenMatched'
|
||||
// pipeline will always run which can cause unexpected failures in the user-specified reduce
|
||||
// function.
|
||||
assert.commandWorked(
|
||||
db.adminCommand({setParameter: 1, internalQueryUseAggMapReduce: false}));
|
||||
|
||||
$super.states.init.apply(this, arguments);
|
||||
|
||||
this.outCollName = uniqueCollectionName(prefix, this.tid);
|
||||
|
@ -68,10 +68,11 @@ function runValidMrTests(coll) {
|
||||
}
|
||||
|
||||
function assertResultsValid(results, expectedCount) {
|
||||
assert.gt(results.length, 0);
|
||||
assert.lte(results.length, expectedCount);
|
||||
assert.gt(results.length, 0, tojson(results));
|
||||
assert.lte(results.length, expectedCount, tojson(results));
|
||||
results.map(resultDoc => assert.eq(resultDoc.value.avgAge,
|
||||
resultDoc.value.total / resultDoc.value.count));
|
||||
resultDoc.value.total / resultDoc.value.count,
|
||||
tojson(results)));
|
||||
}
|
||||
|
||||
// Inline output.
|
||||
@ -102,18 +103,14 @@ function runValidMrTests(coll) {
|
||||
// Cache a sample result document to ensure that re-reducing actually occurs below.
|
||||
let sampleDoc = mergeColl.findOne();
|
||||
|
||||
// TODO SERVER-44150: Enable the following tests once the new implementation is able to
|
||||
// support re-reducing against an existing collection.
|
||||
|
||||
// Output mode "reduce" to an existing unsharded collection.
|
||||
// assert.commandWorked(coll.mapReduce(
|
||||
// map,
|
||||
// reduce,
|
||||
// {finalize: fin, out: {reduce: mergeColl.getName(), db:
|
||||
// mergeColl.getDB().getName()}}));
|
||||
// res = mergeColl.find().toArray();
|
||||
// assertResultsValid(res, states.length);
|
||||
// assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
|
||||
assert.commandWorked(coll.mapReduce(
|
||||
map,
|
||||
reduce,
|
||||
{finalize: fin, out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName()}}));
|
||||
res = mergeColl.find().toArray();
|
||||
assertResultsValid(res, states.length);
|
||||
assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
|
||||
|
||||
// Drop and recreate the target collection as sharded.
|
||||
mergeColl.drop();
|
||||
@ -142,17 +139,14 @@ function runValidMrTests(coll) {
|
||||
// Cache a sample result document to ensure that re-reducing actually occurs below.
|
||||
sampleDoc = mergeColl.findOne({_id: {$not: {$in: ["AL", "PA"]}}});
|
||||
|
||||
// TODO SERVER-44150: Enable the following tests once the new implementation is able to
|
||||
// support re-reducing against an existing collection.
|
||||
|
||||
// Output mode "reduce" to an existing sharded collection.
|
||||
// assert.commandWorked(coll.mapReduce(map, reduce, {
|
||||
// finalize: fin,
|
||||
// out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName(), sharded: true}
|
||||
// }));
|
||||
// res = mergeColl.find().toArray();
|
||||
// assertResultsValid(res, states.length + 2);
|
||||
// assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
|
||||
assert.commandWorked(coll.mapReduce(map, reduce, {
|
||||
finalize: fin,
|
||||
out: {reduce: mergeColl.getName(), db: mergeColl.getDB().getName(), sharded: true}
|
||||
}));
|
||||
res = mergeColl.find().toArray();
|
||||
assertResultsValid(res, states.length + 2);
|
||||
assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
|
||||
}
|
||||
|
||||
// Test merge to a collection in the same database as the source collection.
|
||||
|
@ -83,16 +83,13 @@ function runValidMrTests(db, coll) {
|
||||
res = mergeColl.find().toArray();
|
||||
assertResultsValid(res);
|
||||
|
||||
// TODO SERVER-44150: Enable the following tests once the new implementation is able to
|
||||
// support re-reducing against an existing collection.
|
||||
|
||||
// Cache a sample result document to ensure that re-reducing actually occurs below.
|
||||
// const sampleDoc = mergeColl.findOne();
|
||||
const sampleDoc = mergeColl.findOne();
|
||||
|
||||
// // Output mode "reduce" to an existing collection.
|
||||
// assert.commandWorked(
|
||||
// coll.mapReduce(map, reduce, {finalize: fin, out: {reduce: mergeColl.getName()}}));
|
||||
// assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
|
||||
// Output mode "reduce" to an existing collection.
|
||||
assert.commandWorked(
|
||||
coll.mapReduce(map, reduce, {finalize: fin, out: {reduce: mergeColl.getName()}}));
|
||||
assert.gte(mergeColl.findOne({_id: sampleDoc._id}).value.avgAge, sampleDoc.value.avgAge);
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -176,18 +176,31 @@ auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, Namespace
|
||||
|
||||
auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
|
||||
NamespaceString targetNss,
|
||||
std::string code) {
|
||||
std::string reduceCode,
|
||||
boost::optional<MapReduceJavascriptCode> finalizeCode) {
|
||||
// Because of communication for sharding, $merge must hold on to a serializable BSON object
|
||||
// at the moment so we reparse here. Note that the reduce function signature expects 2
|
||||
// arguments, the first being the key and the second being the array of values to reduce.
|
||||
auto reduceObj = BSON("args" << BSON_ARRAY("$_id" << BSON_ARRAY("$value"
|
||||
<< "$$new.value"))
|
||||
<< "eval" << code);
|
||||
<< "eval" << reduceCode);
|
||||
|
||||
auto finalProjectSpec =
|
||||
auto reduceSpec =
|
||||
BSON(DocumentSourceProject::kStageName
|
||||
<< BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj)));
|
||||
auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec});
|
||||
auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{reduceSpec});
|
||||
|
||||
// Build finalize $project stage if given.
|
||||
if (finalizeCode) {
|
||||
auto finalizeObj = BSON("args" << BSON_ARRAY("$_id"
|
||||
<< "$value")
|
||||
<< "eval" << finalizeCode->getCode());
|
||||
auto finalizeSpec =
|
||||
BSON(DocumentSourceProject::kStageName
|
||||
<< BSON("value" << BSON(ExpressionInternalJs::kExpressionName << finalizeObj)));
|
||||
pipelineSpec->emplace_back(std::move(finalizeSpec));
|
||||
}
|
||||
|
||||
return DocumentSourceMerge::create(targetNss,
|
||||
expCtx,
|
||||
MergeWhenMatchedModeEnum::kPipeline,
|
||||
@ -201,14 +214,16 @@ auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
|
||||
auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
|
||||
const OutputType outputType,
|
||||
NamespaceString targetNss,
|
||||
std::string reduceCode) {
|
||||
std::string reduceCode,
|
||||
boost::optional<MapReduceJavascriptCode> finalizeCode) {
|
||||
switch (outputType) {
|
||||
case OutputType::Replace:
|
||||
return boost::make_optional(translateOutReplace(expCtx, targetNss));
|
||||
case OutputType::Merge:
|
||||
return boost::make_optional(translateOutMerge(expCtx, targetNss));
|
||||
case OutputType::Reduce:
|
||||
return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode));
|
||||
return boost::make_optional(translateOutReduce(
|
||||
expCtx, targetNss, std::move(reduceCode), std::move(finalizeCode)));
|
||||
case OutputType::InMemory:;
|
||||
}
|
||||
return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
|
||||
@ -382,7 +397,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
|
||||
parsedMr.getFinalize().map([&](auto&& finalize) {
|
||||
return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
|
||||
}),
|
||||
translateOut(expCtx, outType, std::move(outNss), parsedMr.getReduce().getCode())),
|
||||
translateOut(expCtx,
|
||||
outType,
|
||||
std::move(outNss),
|
||||
parsedMr.getReduce().getCode(),
|
||||
parsedMr.getFinalize())),
|
||||
expCtx));
|
||||
pipeline->optimizePipeline();
|
||||
return pipeline;
|
||||
|
Loading…
Reference in New Issue
Block a user