mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-44734 Implement large document optimization for agg MR
This commit is contained in:
parent
a42bd2546b
commit
0ac857c06f
@ -6,6 +6,7 @@ selector:
|
||||
roots:
|
||||
- jstests/core/commands_namespace_parsing.js
|
||||
- jstests/core/map_reduce_validation.js
|
||||
- jstests/core/mr_bigobject.js
|
||||
- jstests/core/mr_preserve_indexes.js
|
||||
- jstests/core/mr_merge.js
|
||||
- jstests/core/mr_mutable_properties.js
|
||||
|
@ -79,7 +79,7 @@ for (let i = 0; i < nDocs; i++) {
|
||||
bulk.insert({word: "hello", val: "a".repeat(longStringLength)});
|
||||
}
|
||||
assert.commandWorked(bulk.execute());
|
||||
assert.commandFailedWithCode(db.runCommand(command), [ErrorCodes.BSONObjectTooLarge, 16493]);
|
||||
assert.commandFailedWithCode(db.runCommand(command), [31392, 16493]);
|
||||
|
||||
//
|
||||
// Test that the accumulator correctly fails for invalid arguments.
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Tests that mapReduce's map function fails if it outputs objects that are too large.
|
||||
// Confirms that the mapReduce reduce function will process data sets larger than 16MB.
|
||||
// @tags: [
|
||||
// # mapReduce does not support afterClusterTime.
|
||||
// does_not_support_causal_consistency,
|
||||
@ -20,28 +20,23 @@ for (let i = 0; i < 5; i++)
|
||||
bulk.insert({_id: i, s: largeString});
|
||||
assert.commandWorked(bulk.execute());
|
||||
|
||||
let mapFn = function() {
|
||||
emit(1, this.s + this.s);
|
||||
// MapReduce succeeds when the reduce function processes single-key data sets larger than 16MB.
|
||||
const mapFn = function() {
|
||||
emit(1, this.s);
|
||||
};
|
||||
|
||||
let reduceFn = function(k, v) {
|
||||
return 1;
|
||||
};
|
||||
|
||||
assert.throws(function() {
|
||||
coll.mapReduce(mapFn, reduceFn, outputColl.getName());
|
||||
}, [], "emit should fail");
|
||||
|
||||
mapFn = function() {
|
||||
emit(1, this.s);
|
||||
};
|
||||
assert.commandWorked(coll.mapReduce(mapFn, reduceFn, outputColl.getName()));
|
||||
assert.commandWorked(coll.mapReduce(mapFn, reduceFn, {out: {"merge": outputColl.getName()}}));
|
||||
assert.eq([{_id: 1, value: 1}], outputColl.find().toArray());
|
||||
|
||||
// The reduce function processes the expected amount of data.
|
||||
reduceFn = function(k, v) {
|
||||
total = 0;
|
||||
for (var i = 0; i < v.length; i++) {
|
||||
var x = v[i];
|
||||
for (let i = 0; i < v.length; i++) {
|
||||
const x = v[i];
|
||||
if (typeof (x) == "number")
|
||||
total += x;
|
||||
else
|
||||
@ -50,6 +45,6 @@ reduceFn = function(k, v) {
|
||||
return total;
|
||||
};
|
||||
|
||||
assert.commandWorked(coll.mapReduce(mapFn, reduceFn, outputColl.getName()));
|
||||
assert.commandWorked(coll.mapReduce(mapFn, reduceFn, {out: {"merge": outputColl.getName()}}));
|
||||
assert.eq([{_id: 1, value: coll.count() * largeString.length}], outputColl.find().toArray());
|
||||
}());
|
||||
|
@ -9,6 +9,9 @@
|
||||
// in Map, Reduce and Finalize functions
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
load("jstests/aggregation/extras/utils.js"); // For assertArrayEq.
|
||||
|
||||
const collection = db.mrMutableReceiver;
|
||||
collection.drop();
|
||||
collection.insert({a: 1});
|
||||
@ -60,17 +63,14 @@ const finalize = function(key, values) {
|
||||
return values;
|
||||
};
|
||||
|
||||
const mr = collection.mapReduce(map, reduce, {finalize: finalize, out: {inline: 1}});
|
||||
printjson(mr);
|
||||
const cmdResult = collection.mapReduce(map, reduce, {finalize: finalize, out: {inline: 1}});
|
||||
|
||||
// verify mutated properties exist (order dictated by emit sequence and properties added)
|
||||
assert.eq(mr.results[0].value.food[0].beef, 1);
|
||||
assert.eq(mr.results[0].value.food[1].cake, 1);
|
||||
assert.eq(mr.results[0].value.food[2].beat, 1);
|
||||
assert.eq(mr.results[0].value.food[3].mochi, 1);
|
||||
assert.eq(mr.results[0].value.food[4].ice, 1);
|
||||
assert.eq(mr.results[0].value.food[5].cream, 1);
|
||||
mr.results[0].value.food.forEach(function(val) {
|
||||
assert.eq(val.mod, 1);
|
||||
});
|
||||
assertArrayEq(cmdResult.results[0].value.food, [
|
||||
{"cake": 1, "mod": 1},
|
||||
{"beef": 1, "mod": 1},
|
||||
{"beat": 1, "mod": 1},
|
||||
{"mochi": 1, "mod": 1},
|
||||
{"ice": 1, "mod": 1},
|
||||
{"cream": 1, "mod": 1}
|
||||
]);
|
||||
}());
|
||||
|
@ -14,6 +14,8 @@
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
load("jstests/aggregation/extras/utils.js"); // For assertArrayEq.
|
||||
|
||||
// Use a unique database name to avoid conflicts with other tests that directly modify
|
||||
// system.js.
|
||||
const testDB = db.getSiblingDB("mr_stored");
|
||||
@ -58,8 +60,8 @@ const out = testDB.mr_stored_out;
|
||||
|
||||
function assertCorrect(results) {
|
||||
assert.eq(2, Object.keySet(results).length);
|
||||
assert.eq([9, 11, 30], results["1"].stats);
|
||||
assert.eq([9, 41, 41], results["2"].stats);
|
||||
assertArrayEq([9, 11, 30], results["1"].stats);
|
||||
assertArrayEq([9, 41, 41], results["2"].stats);
|
||||
}
|
||||
|
||||
// Stored Map.
|
||||
|
@ -111,36 +111,60 @@ void AccumulatorInternalJsReduce::processInternal(const Value& input, bool mergi
|
||||
}
|
||||
|
||||
Value AccumulatorInternalJsReduce::getValue(bool toBeMerged) {
|
||||
auto val = [&]() {
|
||||
if (_values.size() < 1) {
|
||||
return Value{};
|
||||
}
|
||||
if (_values.size() < 1) {
|
||||
return Value{};
|
||||
}
|
||||
|
||||
Value result;
|
||||
// Keep reducing until we have exactly one value.
|
||||
while (true) {
|
||||
BSONArrayBuilder bsonValues;
|
||||
for (const auto& val : _values) {
|
||||
size_t numLeft = _values.size();
|
||||
for (; numLeft > 0; numLeft--) {
|
||||
Value val = _values[numLeft - 1];
|
||||
|
||||
// Do not insert if doing so would exceed the maximum allowed BSONObj size.
|
||||
if (bsonValues.len() + _key.getApproximateSize() + val.getApproximateSize() >
|
||||
BSONObjMaxUserSize) {
|
||||
// If we have reached the threshold for maximum allowed BSONObj size and only have a
|
||||
// single value then no progress will be made on reduce. We must fail when this
|
||||
// scenario is encountered.
|
||||
size_t numNextReduce = _values.size() - numLeft;
|
||||
uassert(31392, "Value too large to reduce", numNextReduce > 1);
|
||||
break;
|
||||
}
|
||||
bsonValues << val;
|
||||
}
|
||||
|
||||
auto expCtx = getExpressionContext();
|
||||
auto reduceFunc = makeJsFunc(expCtx, _funcSource.toString());
|
||||
|
||||
|
||||
// Function signature: reduce(key, values).
|
||||
BSONObj params = BSON_ARRAY(_key << bsonValues.arr());
|
||||
// For reduce, the key and values are both passed as 'params' so there's no need to set
|
||||
// 'this'.
|
||||
BSONObj thisObj;
|
||||
return expCtx->getJsExecWithScope()->callFunction(reduceFunc, params, thisObj);
|
||||
}();
|
||||
Value reduceResult =
|
||||
expCtx->getJsExecWithScope()->callFunction(reduceFunc, params, thisObj);
|
||||
if (numLeft == 0) {
|
||||
result = reduceResult;
|
||||
break;
|
||||
} else {
|
||||
// Remove all values which have been reduced.
|
||||
_values.resize(numLeft);
|
||||
// Include most recent result in the set of values to be reduced.
|
||||
_values.push_back(reduceResult);
|
||||
}
|
||||
}
|
||||
|
||||
// If we're merging after this, wrap the value in the same format it was inserted in.
|
||||
if (toBeMerged) {
|
||||
MutableDocument output;
|
||||
output.addField("k", _key);
|
||||
output.addField("v", val);
|
||||
output.addField("v", result);
|
||||
return Value(output.freeze());
|
||||
} else {
|
||||
return val;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user