mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-54821 Change window function syntax
This commit is contained in:
parent
1f8978300d
commit
c7f704e3f5
@ -71,7 +71,7 @@ function testAccumAgainstGroup(coll, accum) {
|
||||
$setWindowFields: {
|
||||
partitionBy: partition,
|
||||
sortBy: {_id: 1},
|
||||
output: {res: {[accum]: {input: "$price", documents: bounds}}}
|
||||
output: {res: {[accum]: "$price", window: {documents: bounds}}}
|
||||
},
|
||||
},
|
||||
])
|
||||
@ -99,6 +99,7 @@ function testAccumAgainstGroup(coll, accum) {
|
||||
upper: upperBound
|
||||
});
|
||||
}
|
||||
|
||||
assert.eq(groupRes,
|
||||
wfResults[index].res,
|
||||
"Window function result for index " + index + ": " + tojson(wfRes));
|
||||
|
@ -32,8 +32,8 @@ let results =
|
||||
sortBy: {_id: 1},
|
||||
partitionBy: "$ticker",
|
||||
output: {
|
||||
runningAvg: {$avg: {input: "$price", documents: ["unbounded", "current"]}},
|
||||
runningAvgLead: {$avg: {input: "$price", documents: ["unbounded", 3]}},
|
||||
runningAvg: {$avg: "$price", window: {documents: ["unbounded", "current"]}},
|
||||
runningAvgLead: {$avg: "$price", window: {documents: ["unbounded", 3]}},
|
||||
}
|
||||
},
|
||||
},
|
||||
|
@ -19,20 +19,21 @@ coll.drop();
|
||||
assert.commandWorked(coll.insert({_id: 0, ticker: "MDB", price: 1000, ts: new Date()}));
|
||||
|
||||
// Over a non-existent field.
|
||||
let wfSpec = {input: "$noField", documents: ["unbounded", "current"]};
|
||||
let results = coll.aggregate([{
|
||||
$setWindowFields: {
|
||||
sortBy: {ts: 1},
|
||||
output: {
|
||||
defaultStdDev: {$stdDevPop: wfSpec},
|
||||
defaultSum: {$sum: wfSpec},
|
||||
defaultAvg: {$avg: wfSpec},
|
||||
defaultMin: {$min: wfSpec},
|
||||
defaultMax: {$max: wfSpec},
|
||||
}
|
||||
}
|
||||
}])
|
||||
.toArray();
|
||||
let results =
|
||||
coll.aggregate([{
|
||||
$setWindowFields: {
|
||||
sortBy: {ts: 1},
|
||||
output: {
|
||||
defaultStdDev:
|
||||
{$stdDevPop: "$noField", window: {documents: ["unbounded", "current"]}},
|
||||
defaultSum: {$sum: "$noField", window: {documents: ["unbounded", "current"]}},
|
||||
defaultAvg: {$avg: "$noField", window: {documents: ["unbounded", "current"]}},
|
||||
defaultMin: {$min: "$noField", window: {documents: ["unbounded", "current"]}},
|
||||
defaultMax: {$max: "$noField", window: {documents: ["unbounded", "current"]}},
|
||||
}
|
||||
}
|
||||
}])
|
||||
.toArray();
|
||||
assert.eq(null, results[0].defaultStdDev);
|
||||
assert.eq(null, results[0].defaultAvg);
|
||||
assert.eq(null, results[0].defaultMin);
|
||||
@ -41,23 +42,20 @@ assert.eq(null, results[0].defaultMax);
|
||||
assert.eq(0, results[0].defaultSum);
|
||||
|
||||
// Over a window with no documents.
|
||||
wfSpec = {
|
||||
input: "$price",
|
||||
documents: ["unbounded", -1]
|
||||
};
|
||||
results = coll.aggregate([{
|
||||
$setWindowFields: {
|
||||
sortBy: {ts: 1},
|
||||
output: {
|
||||
defaultStdDev: {$stdDevPop: wfSpec},
|
||||
defaultSum: {$sum: wfSpec},
|
||||
defaultAvg: {$avg: wfSpec},
|
||||
defaultMin: {$min: wfSpec},
|
||||
defaultMax: {$max: wfSpec},
|
||||
}
|
||||
}
|
||||
}])
|
||||
.toArray();
|
||||
results =
|
||||
coll.aggregate([{
|
||||
$setWindowFields: {
|
||||
sortBy: {ts: 1},
|
||||
output: {
|
||||
defaultStdDev: {$stdDevPop: "$price", window: {documents: ["unbounded", -1]}},
|
||||
defaultSum: {$sum: "$price", window: {documents: ["unbounded", -1]}},
|
||||
defaultAvg: {$avg: "$price", window: {documents: ["unbounded", -1]}},
|
||||
defaultMin: {$min: "$price", window: {documents: ["unbounded", -1]}},
|
||||
defaultMax: {$max: "$price", window: {documents: ["unbounded", -1]}},
|
||||
}
|
||||
}
|
||||
}])
|
||||
.toArray();
|
||||
assert.eq(null, results[0].defaultStdDev);
|
||||
assert.eq(null, results[0].defaultAvg);
|
||||
assert.eq(null, results[0].defaultMin);
|
||||
|
@ -36,8 +36,8 @@ let results =
|
||||
$setWindowFields: {
|
||||
sortBy: {ts: 1},
|
||||
output: {
|
||||
minStr: {$min: {input: "$str", documents: ["unbounded", "current"]}},
|
||||
maxStr: {$max: {input: "$str", documents: ["unbounded", "current"]}},
|
||||
minStr: {$min: "$str", window: {documents: ["unbounded", "current"]}},
|
||||
maxStr: {$max: "$str", window: {documents: ["unbounded", "current"]}},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -57,14 +57,14 @@ assert.commandWorked(run({
|
||||
|
||||
function runSum(spec) {
|
||||
// Include a single-field sortBy in this helper to allow all kinds of bounds.
|
||||
return run({$setWindowFields: {sortBy: {ts: 1}, output: {v: {$sum: spec}}}});
|
||||
return run({$setWindowFields: {sortBy: {ts: 1}, output: {v: spec}}});
|
||||
}
|
||||
|
||||
// The most basic case: $sum everything.
|
||||
assert.commandWorked(runSum({input: "$a"}));
|
||||
assert.commandWorked(runSum({$sum: "$a"}));
|
||||
|
||||
// That's equivalent to bounds of [unbounded, unbounded].
|
||||
assert.commandWorked(runSum({input: "$a", documents: ['unbounded', 'unbounded']}));
|
||||
assert.commandWorked(runSum({$sum: "$a", window: {documents: ['unbounded', 'unbounded']}}));
|
||||
|
||||
// Extra arguments to a window function are rejected.
|
||||
assert.commandFailedWithCode(runSum({abcde: 1}),
|
||||
@ -72,38 +72,41 @@ assert.commandFailedWithCode(runSum({abcde: 1}),
|
||||
'Window function $sum found an unknown argument: abcde');
|
||||
|
||||
// Bounds can be bounded, or bounded on one side.
|
||||
assert.commandFailedWithCode(runSum({input: "$a", documents: [-2, +4]}), 5461500);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", documents: [-3, 'unbounded']}), 5461500);
|
||||
assert.commandWorked(runSum({input: "$a", documents: ['unbounded', +5]}));
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {documents: [-2, +4]}}), 5461500);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {documents: [-3, 'unbounded']}}),
|
||||
5461500);
|
||||
assert.commandWorked(runSum({"$sum": "$a", window: {documents: ['unbounded', +5]}}));
|
||||
|
||||
// Range-based bounds:
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: ['unbounded', 'unbounded']}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [-2, +4]}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [-3, 'unbounded']}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: ['unbounded', +5]}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [NumberDecimal('1.42'), NumberLong(5)]}),
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {range: ['unbounded', 'unbounded']}}),
|
||||
5397901);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {range: [-2, +4]}}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {range: [-3, 'unbounded']}}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {range: ['unbounded', +5]}}), 5397901);
|
||||
assert.commandFailedWithCode(
|
||||
runSum({"$sum": "$a", window: {range: [NumberDecimal('1.42'), NumberLong(5)]}}), 5397901);
|
||||
|
||||
// Time-based bounds:
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [-3, 'unbounded'], unit: 'hour'}),
|
||||
5397902);
|
||||
assert.commandFailedWithCode(
|
||||
runSum({"$sum": "$a", window: {range: [-3, 'unbounded'], unit: 'hour'}}), 5397902);
|
||||
|
||||
// Numeric bounds can be a constant expression:
|
||||
let expr = {$add: [2, 2]};
|
||||
assert.commandFailedWithCode(runSum({input: "$a", documents: [expr, expr]}), 5461500);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [expr, expr]}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [expr, expr], unit: 'hour'}), 5397902);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {documents: [expr, expr]}}), 5461500);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {range: [expr, expr]}}), 5397901);
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {range: [expr, expr], unit: 'hour'}}),
|
||||
5397902);
|
||||
// But 'current' and 'unbounded' are not expressions: they're more like keywords.
|
||||
assert.commandFailedWithCode(runSum({input: "$a", documents: [{$const: 'current'}, 3]}),
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", window: {documents: [{$const: 'current'}, 3]}}),
|
||||
ErrorCodes.FailedToParse,
|
||||
'Numeric document-based bounds must be an integer');
|
||||
assert.commandFailedWithCode(runSum({input: "$a", range: [{$const: 'current'}, 3]}),
|
||||
assert.commandFailedWithCode(runSum({"$sum": "$a", range: [{$const: 'current'}, 3]}),
|
||||
ErrorCodes.FailedToParse,
|
||||
'Range-based bounds expression must be a number');
|
||||
|
||||
// Bounds must not be backwards.
|
||||
function badBounds(bounds) {
|
||||
assert.commandFailedWithCode(runSum(Object.merge({input: "$a"}, bounds)),
|
||||
assert.commandFailedWithCode(runSum(Object.merge({"$sum": "$a"}, {window: bounds})),
|
||||
5339900,
|
||||
'Lower bound must not exceed upper bound');
|
||||
}
|
||||
@ -122,38 +125,39 @@ badBounds({range: [+1, 'current'], unit: 'day'});
|
||||
// Any bound besides [unbounded, unbounded] requires a sort:
|
||||
// - document-based
|
||||
assert.commandWorked(run({
|
||||
$setWindowFields: {output: {v: {$sum: {input: "$a", documents: ['unbounded', 'unbounded']}}}}
|
||||
$setWindowFields:
|
||||
{output: {v: {$sum: "$a", window: {documents: ['unbounded', 'unbounded']}}}}
|
||||
}));
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields:
|
||||
{output: {v: {$sum: {input: "$a", documents: ['unbounded', 'current']}}}}
|
||||
{output: {v: {$sum: "$a", window: {documents: ['unbounded', 'current']}}}}
|
||||
}),
|
||||
5339901,
|
||||
'Document-based bounds require a sortBy');
|
||||
// - range-based
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields: {output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded']}}}}
|
||||
$setWindowFields: {output: {v: {$sum: "$a", window: {range: ['unbounded', 'unbounded']}}}}
|
||||
}),
|
||||
5397901);
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields: {
|
||||
sortBy: {a: 1, b: 1},
|
||||
output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded']}}}
|
||||
output: {v: {$sum: "$a", window: {range: ['unbounded', 'unbounded']}}}
|
||||
}
|
||||
}),
|
||||
5397901);
|
||||
assert.commandFailedWithCode(
|
||||
run({$setWindowFields: {output: {v: {$sum: {input: "$a", range: ['unbounded', 'current']}}}}}),
|
||||
run({$setWindowFields: {output: {v: {$sum: "$a", window: {range: ['unbounded', 'current']}}}}}),
|
||||
5339902,
|
||||
'Range-based bounds require a sortBy a single field');
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields: {
|
||||
sortBy: {a: 1, b: 1},
|
||||
output: {v: {$sum: {input: "$a", range: ['unbounded', 'current']}}}
|
||||
output: {v: {$sum: "$a", window: {range: ['unbounded', 'current']}}}
|
||||
}
|
||||
}),
|
||||
5339902,
|
||||
@ -162,21 +166,21 @@ assert.commandFailedWithCode(
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields:
|
||||
{output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded'], unit: 'second'}}}}
|
||||
{output: {v: {$sum: "$a", window: {range: ['unbounded', 'unbounded'], unit: 'second'}}}}
|
||||
}),
|
||||
5397902);
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields: {
|
||||
sortBy: {a: 1, b: 1},
|
||||
output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded'], unit: 'second'}}}
|
||||
output: {v: {$sum: "$a", window: {range: ['unbounded', 'unbounded'], unit: 'second'}}}
|
||||
}
|
||||
}),
|
||||
5397902);
|
||||
assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields:
|
||||
{output: {v: {$sum: {input: "$a", range: ['unbounded', 'current'], unit: 'second'}}}}
|
||||
{output: {v: {$sum: "$a", window: {range: ['unbounded', 'current'], unit: 'second'}}}}
|
||||
}),
|
||||
5339902,
|
||||
'Range-based bounds require sortBy a single field');
|
||||
@ -184,7 +188,7 @@ assert.commandFailedWithCode(
|
||||
run({
|
||||
$setWindowFields: {
|
||||
sortBy: {a: 1, b: 1},
|
||||
output: {v: {$sum: {input: "$a", range: ['unbounded', 'current'], unit: 'second'}}}
|
||||
output: {v: {$sum: "$a", window: {range: ['unbounded', 'current'], unit: 'second'}}}
|
||||
}
|
||||
}),
|
||||
5339902,
|
||||
@ -194,29 +198,29 @@ assert.commandFailedWithCode(
|
||||
assert.commandWorked(run({
|
||||
$setWindowFields:
|
||||
{sortBy: {ts: 1},
|
||||
output: {v: {$sum: {input: "$a", documents: ['unbounded', 'current']}}}}
|
||||
output: {v: {$sum: "$a", window: {documents: ['unbounded', 'current']}}}}
|
||||
}));
|
||||
assert.commandWorked(run({
|
||||
$setWindowFields:
|
||||
{sortBy: {ts: 1},
|
||||
output: {v: {$avg: {input: "$a", documents: ['unbounded', 'current']}}}}
|
||||
output: {v: {$avg: "$a", window: {documents: ['unbounded', 'current']}}}}
|
||||
}));
|
||||
assert.commandWorked(run({
|
||||
$setWindowFields:
|
||||
{sortBy: {ts: 1},
|
||||
output: {v: {$max: {input: "$a", documents: ['unbounded', 'current']}}}}
|
||||
output: {v: {$max: "$a", window: {documents: ['unbounded', 'current']}}}}
|
||||
}));
|
||||
assert.commandWorked(run({
|
||||
$setWindowFields:
|
||||
{sortBy: {ts: 1},
|
||||
output: {v: {$min: {input: "$a", documents: ['unbounded', 'current']}}}}
|
||||
output: {v: {$min: "$a", window: {documents: ['unbounded', 'current']}}}}
|
||||
}));
|
||||
|
||||
// Not every accumulator is automatically a window function.
|
||||
assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$mergeObjects: {input: "$a"}}}}}),
|
||||
assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$mergeObjects: "$a"}}}}),
|
||||
ErrorCodes.FailedToParse,
|
||||
'No such window function: $mergeObjects');
|
||||
assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$accumulator: {input: "$a"}}}}}),
|
||||
assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$accumulator: "$a"}}}}),
|
||||
ErrorCodes.FailedToParse,
|
||||
'No such window function: $accumulator');
|
||||
})();
|
||||
|
@ -35,9 +35,9 @@ let results =
|
||||
sortBy: {ts: 1},
|
||||
output: {
|
||||
stdDevPop:
|
||||
{$stdDevPop: {input: "$str", documents: ["unbounded", "current"]}},
|
||||
{$stdDevPop: "$str", window: {documents: ["unbounded", "current"]}},
|
||||
stdDevSamp:
|
||||
{$stdDevSamp: {input: "$str", documents: ["unbounded", "current"]}},
|
||||
{$stdDevSamp: "$str", window: {documents: ["unbounded", "current"]}},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -52,16 +52,17 @@ const sortStage = {
|
||||
};
|
||||
|
||||
// Test using $sum to count.
|
||||
let result = coll.aggregate([
|
||||
sortStage,
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {a: {$sum: {input: 1, documents: ["unbounded", "current"]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
.toArray();
|
||||
let result =
|
||||
coll.aggregate([
|
||||
sortStage,
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {a: {$sum: 1, window: {documents: ["unbounded", "current"]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
.toArray();
|
||||
verifyResults(result, function(num, baseObj) {
|
||||
baseObj.a = num + 1;
|
||||
return baseObj;
|
||||
@ -74,7 +75,7 @@ result =
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {a: {$sum: {input: "$one", documents: ["unbounded", "current"]}}}
|
||||
output: {a: {$sum: "$one", window: {documents: ["unbounded", "current"]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
@ -91,8 +92,8 @@ result = coll.aggregate([
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {
|
||||
a: {$sum: {input: "$one", documents: ["unbounded", "current"]}},
|
||||
b: {$sum: {input: "$two", documents: ["unbounded", "current"]}}
|
||||
a: {$sum: "$one", window: {documents: ["unbounded", "current"]}},
|
||||
b: {$sum: "$two", window: {documents: ["unbounded", "current"]}}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -111,7 +112,7 @@ result =
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {one: {$sum: {input: "$one", documents: ["unbounded", "current"]}}}
|
||||
output: {one: {$sum: "$one", window: {documents: ["unbounded", "current"]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
@ -128,9 +129,9 @@ result =
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {
|
||||
"docArr.a": {$sum: {input: "$one", documents: ["unbounded", "current"]}}
|
||||
}
|
||||
output:
|
||||
{"docArr.a":
|
||||
{$sum: "$one", window: {documents: ["unbounded", "current"]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
@ -149,10 +150,10 @@ result =
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {
|
||||
"docArr.1": {$sum: {input: "$one", documents: ["unbounded", "current"]}},
|
||||
"docArr.2": {$sum: {input: "$one", documents: ["unbounded", "current"]}},
|
||||
"docArr.1": {$sum: "$one", window: {documents: ["unbounded", "current"]}},
|
||||
"docArr.2": {$sum: "$one", window: {documents: ["unbounded", "current"]}},
|
||||
"simpleArr.1":
|
||||
{$sum: {input: "$one", documents: ["unbounded", "current"]}}
|
||||
{$sum: "$one", window: {documents: ["unbounded", "current"]}}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -175,7 +176,8 @@ result =
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {"a.b": {$sum: {input: "$one", documents: ["unbounded", "current"]}}}
|
||||
output:
|
||||
{"a.b": {$sum: "$one", window: {documents: ["unbounded", "current"]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
@ -193,12 +195,12 @@ result =
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {
|
||||
"a": {$sum: {input: "$one", documents: ["unbounded", "current"]}},
|
||||
"newField.a": {$sum: {input: "$two", documents: ["unbounded", "current"]}},
|
||||
"a": {$sum: "$one", window: {documents: ["unbounded", "current"]}},
|
||||
"newField.a": {$sum: "$two", window: {documents: ["unbounded", "current"]}},
|
||||
"simpleArr.0.b":
|
||||
{$sum: {input: "$one", documents: ["unbounded", "current"]}},
|
||||
{$sum: "$one", window: {documents: ["unbounded", "current"]}},
|
||||
"nestedDoc.1.2.a":
|
||||
{$sum: {input: "$one", documents: ["unbounded", "current"]}}
|
||||
{$sum: "$one", window: {documents: ["unbounded", "current"]}}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -219,7 +221,7 @@ result = coll.aggregate([
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {one: {$sum: {input: "$one", documents: ["unbounded", 1]}}}
|
||||
output: {one: {$sum: "$one", window: {documents: ["unbounded", 1]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
@ -237,7 +239,7 @@ result = coll.aggregate([
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {one: 1},
|
||||
output: {one: {$sum: {input: "$one", documents: ["unbounded", -1]}}}
|
||||
output: {one: {$sum: "$one", window: {documents: ["unbounded", -1]}}}
|
||||
}
|
||||
}
|
||||
])
|
||||
@ -257,7 +259,7 @@ result = coll.aggregate([
|
||||
sortBy: {one: 1},
|
||||
output: {
|
||||
mixedTypeSum:
|
||||
{$sum: {input: "$mixed", documents: ["unbounded", "current"]}}
|
||||
{$sum: "$mixed", window: {documents: ["unbounded", "current"]}}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -99,12 +99,11 @@ WindowFunctionStatement WindowFunctionStatement::parse(BSONElement elem,
|
||||
ExpressionContext* expCtx) {
|
||||
// 'elem' is a statement like 'v: {$sum: {...}}', whereas the expression is '$sum: {...}'.
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "The field '" << elem.fieldName()
|
||||
<< "' must be a window-function object",
|
||||
elem.type() == BSONType::Object && elem.Obj().nFields() == 1);
|
||||
str::stream() << "The field '" << elem.fieldName() << "' must be an object",
|
||||
elem.type() == BSONType::Object);
|
||||
return WindowFunctionStatement(
|
||||
elem.fieldName(),
|
||||
window_function::Expression::parse(elem.Obj().firstElement(), sortBy, expCtx));
|
||||
window_function::Expression::parse(elem.embeddedObject(), sortBy, expCtx));
|
||||
}
|
||||
void WindowFunctionStatement::serialize(MutableDocument& outputFields,
|
||||
boost::optional<ExplainOptions::Verbosity> explain) const {
|
||||
|
@ -81,10 +81,30 @@ TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseInvalidArgumentTypes) {
|
||||
40415);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseIfArgumentsAreRepeated) {
|
||||
auto spec = fromjson(R"(
|
||||
{$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum:
|
||||
{$sum: '$pop', $max: '$pop', window: {documents: [-10, 0]}}}}})");
|
||||
ASSERT_THROWS_CODE(
|
||||
DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
|
||||
AssertionException,
|
||||
ErrorCodes::FailedToParse);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseIfWindowFieldHasExtraArgument) {
|
||||
auto spec = fromjson(R"(
|
||||
{$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum:
|
||||
{$sum: '$pop', window: {documents: [0, 10], document: [0,8]} }}}})");
|
||||
ASSERT_THROWS_CODE(
|
||||
DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
|
||||
AssertionException,
|
||||
ErrorCodes::FailedToParse);
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceSetWindowFieldsTest, SuccessfullyParsesAndReserializes) {
|
||||
auto spec = fromjson(R"(
|
||||
{$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum:
|
||||
{input: '$pop', documents: [-10, 0]}}}}})");
|
||||
{$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum:
|
||||
{$sum: '$pop', window: {documents: [-10, 0]}}}}})");
|
||||
auto parsedStage =
|
||||
DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx());
|
||||
std::vector<Value> serializedArray;
|
||||
@ -94,8 +114,8 @@ TEST_F(DocumentSourceSetWindowFieldsTest, SuccessfullyParsesAndReserializes) {
|
||||
|
||||
TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseIfFeatureFlagDisabled) {
|
||||
auto spec = fromjson(R"(
|
||||
{$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum:
|
||||
{input: '$pop', documents: [-10, 0]}}}}})");
|
||||
{$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum:
|
||||
{$sum: '$pop', window: {documents: [-10, 0]}}}}})");
|
||||
// By default, the unit test will have the feature flag disabled.
|
||||
ASSERT_THROWS_CODE(
|
||||
Pipeline::parse(std::vector<BSONObj>({spec}), getExpCtx()), AssertionException, 16436);
|
||||
|
@ -133,7 +133,12 @@ WindowBounds WindowBounds::parse(BSONObj args,
|
||||
}
|
||||
|
||||
if (!range && !documents) {
|
||||
return WindowBounds{DocumentBased{Unbounded{}, Unbounded{}}};
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "'window' field can only contain '" << kArgDocuments
|
||||
<< "' as the only argument or '" << kArgRange
|
||||
<< "' with an optional '" << kArgUnit << "' field",
|
||||
args.nFields() == 0);
|
||||
return defaultBounds();
|
||||
}
|
||||
|
||||
auto unpack = [](BSONElement e) -> std::pair<BSONElement, BSONElement> {
|
||||
@ -144,7 +149,12 @@ WindowBounds WindowBounds::parse(BSONObj args,
|
||||
auto upper = e.Obj()[1];
|
||||
return {lower, upper};
|
||||
};
|
||||
|
||||
if (documents) {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "'window' field that specifies " << kArgDocuments
|
||||
<< " cannot have other fields",
|
||||
args.nFields() == 1);
|
||||
// Parse document-based bounds.
|
||||
auto [lowerElem, upperElem] = unpack(documents);
|
||||
|
||||
@ -165,6 +175,10 @@ WindowBounds WindowBounds::parse(BSONObj args,
|
||||
WindowBounds bounds;
|
||||
if (unit) {
|
||||
// Parse time-based bounds (range-based, with a unit).
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "'window' field that specifies " << kArgUnit
|
||||
<< " cannot have other fields besides 'range'",
|
||||
args.nFields() == 2);
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "'" << kArgUnit << "' must be a string",
|
||||
unit.type() == BSONType::String);
|
||||
@ -182,6 +196,10 @@ WindowBounds WindowBounds::parse(BSONObj args,
|
||||
bounds = WindowBounds{TimeBased{lower, upper, parseTimeUnit(unit.str())}};
|
||||
} else {
|
||||
// Parse range-based bounds.
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "'window' field that specifies " << kArgRange
|
||||
<< " cannot have other fields besides 'unit'",
|
||||
args.nFields() == 1);
|
||||
auto parseNumber = [](Value v) -> Value {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"Range-based bounds expression must be a number",
|
||||
|
@ -89,6 +89,10 @@ struct WindowBounds {
|
||||
|
||||
stdx::variant<DocumentBased, RangeBased, TimeBased> bounds;
|
||||
|
||||
static WindowBounds defaultBounds() {
|
||||
return WindowBounds{DocumentBased{Unbounded{}, Unbounded{}}};
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if these bounds are unbounded on both ends.
|
||||
* This case is special because it means you don't need a sortBy to interpret the bounds:
|
||||
@ -102,11 +106,11 @@ struct WindowBounds {
|
||||
*
|
||||
* {$setWindowFields: {
|
||||
* output: {
|
||||
* v: {$sum: {input: "$x", range: [-1, +1], unit: 'seconds'}},
|
||||
* v: {$sum: "$x", window: {range: [-1, +1], unit: 'seconds'}},
|
||||
* }
|
||||
* }}
|
||||
*
|
||||
* 'args' would be {input: "$x", range: [-1, +1], unit: 'seconds'}.
|
||||
* 'args' would be {range: [-1, +1], unit: 'seconds'}.
|
||||
*
|
||||
* If the BSON doesn't specify bounds, we default to:
|
||||
*
|
||||
|
@ -46,17 +46,19 @@ namespace mongo::window_function {
|
||||
|
||||
StringMap<Expression::Parser> Expression::parserMap;
|
||||
|
||||
intrusive_ptr<Expression> Expression::parse(BSONElement elem,
|
||||
intrusive_ptr<Expression> Expression::parse(BSONObj obj,
|
||||
const optional<SortPattern>& sortBy,
|
||||
ExpressionContext* expCtx) {
|
||||
auto parser = parserMap.find(elem.fieldNameStringData());
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "No such window function: " << elem.fieldName(),
|
||||
parser != parserMap.end());
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "Window function " << elem.fieldName() << " requires an object.",
|
||||
elem.type() == BSONType::Object);
|
||||
return parser->second(elem, sortBy, expCtx);
|
||||
boost::optional<Expression::Parser> parser;
|
||||
for (const auto& field : obj) {
|
||||
// Found one valid window function. If there are multiple window functions they will be
|
||||
// caught as invalid arguments to the Expression parser later.
|
||||
auto parser = parserMap.find(field.fieldNameStringData());
|
||||
if (parser != parserMap.end()) {
|
||||
return parser->second(obj, sortBy, expCtx);
|
||||
}
|
||||
}
|
||||
uasserted(ErrorCodes::FailedToParse, "Unrecognized window function");
|
||||
}
|
||||
|
||||
void Expression::registerParser(std::string functionName, Parser parser) {
|
||||
|
@ -58,38 +58,39 @@ namespace mongo::window_function {
|
||||
*
|
||||
* {$setWindowFields: {
|
||||
* output: {
|
||||
* totalCost: {$sum: {input: "$price"}},
|
||||
* totalCost: {$sum: "$price"},
|
||||
* numItems: {$count: {}},
|
||||
* }
|
||||
* }}
|
||||
*
|
||||
* the two window-function expressions are {$sum: {input: "$price"}} and {$count: {}}.
|
||||
* the two window-function expressions are {$sum: "$price"} and {$count: {}}.
|
||||
*
|
||||
* Because this class is part of a syntax tree, it does not hold any execution state:
|
||||
* instead it lets you create new instances of a window-function state.
|
||||
*/
|
||||
class Expression : public RefCountable {
|
||||
public:
|
||||
static constexpr StringData kWindowArg = "window"_sd;
|
||||
/**
|
||||
* Parses a single window-function expression. The BSONElement's key is the function name,
|
||||
* and the value is the spec: for example, the whole BSONElement might be '$sum: {input:
|
||||
* "$x"}'.
|
||||
* Parses a single window-function expression. One of the BSONObj's keys is the function
|
||||
* name, and the other (optional) key is 'window': for example, the whole BSONObj might be
|
||||
* {$sum: "$x"} or {$sum: "$x", window: {documents: [2,3]}}.
|
||||
*
|
||||
* 'sortBy' is from the sortBy argument of $setWindowFields. Some window functions require
|
||||
* a sort spec, or require a one-field sort spec; they use this argument to enforce those
|
||||
* requirements.
|
||||
*
|
||||
* If the window function accepts bounds, parse() parses them, just like other arguments
|
||||
* such as 'input' or 'default'. For window functions like $rank, which don't accept bounds,
|
||||
* parse() is responsible for throwing a parse error, just like other unexpected arguments.
|
||||
* If the window function accepts bounds, parse() parses them, from the window field. For window
|
||||
* functions like $rank, which don't accept bounds, parse() is responsible for throwing a parse
|
||||
* error, just like other unexpected arguments.
|
||||
*/
|
||||
static boost::intrusive_ptr<Expression> parse(BSONElement elem,
|
||||
static boost::intrusive_ptr<Expression> parse(BSONObj obj,
|
||||
const boost::optional<SortPattern>& sortBy,
|
||||
ExpressionContext* expCtx);
|
||||
|
||||
/**
|
||||
* A Parser has the same signature as parse(). The BSONElement is the whole expression, such
|
||||
* as '$sum: {input: "$x"}', because some parsers need to switch on the function name.
|
||||
* A Parser has the same signature as parse(). The BSONObj is the whole expression, as
|
||||
* described above, because some parsers need to switch on the function name.
|
||||
*/
|
||||
using Parser = std::function<decltype(parse)>;
|
||||
static void registerParser(std::string functionName, Parser parser);
|
||||
@ -119,55 +120,60 @@ public:
|
||||
|
||||
virtual std::unique_ptr<WindowFunctionState> buildRemovable() const = 0;
|
||||
|
||||
virtual Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
|
||||
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
|
||||
MutableDocument args;
|
||||
|
||||
args["input"] = _input->serialize(static_cast<bool>(explain));
|
||||
_bounds.serialize(args);
|
||||
|
||||
return Value{Document{
|
||||
{_accumulatorName, args.freezeToValue()},
|
||||
}};
|
||||
args[_accumulatorName] = _input->serialize(static_cast<bool>(explain));
|
||||
MutableDocument windowField;
|
||||
_bounds.serialize(windowField);
|
||||
args[kWindowArg] = windowField.freezeToValue();
|
||||
return args.freezeToValue();
|
||||
}
|
||||
|
||||
|
||||
protected:
|
||||
/*
|
||||
* Used during parsing to reject extra arguments in window function specifications.
|
||||
*/
|
||||
static void checkAllowedFields(BSONElement elem) {
|
||||
static const StringSet allowedFields = {"input", "documents", "range", "unit"};
|
||||
for (auto&& arg : elem.embeddedObject()) {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
str::stream() << "Window function found an unknown argument: "
|
||||
<< arg.fieldNameStringData(),
|
||||
allowedFields.find(arg.fieldNameStringData()) != allowedFields.end());
|
||||
}
|
||||
}
|
||||
ExpressionContext* _expCtx;
|
||||
std::string _accumulatorName;
|
||||
boost::intrusive_ptr<::mongo::Expression> _input;
|
||||
WindowBounds _bounds;
|
||||
|
||||
private:
|
||||
static StringMap<Parser> parserMap;
|
||||
};
|
||||
|
||||
template <typename NonRemovableType>
|
||||
class ExpressionFromAccumulator : public Expression {
|
||||
public:
|
||||
static boost::intrusive_ptr<Expression> parse(BSONElement elem,
|
||||
static boost::intrusive_ptr<Expression> parse(BSONObj obj,
|
||||
const boost::optional<SortPattern>& sortBy,
|
||||
ExpressionContext* expCtx) {
|
||||
// 'elem' is something like '$sum: {input: E, ...}'
|
||||
std::string accumulatorName = elem.fieldName();
|
||||
boost::intrusive_ptr<::mongo::Expression> input = ::mongo::Expression::parseOperand(
|
||||
expCtx, elem.embeddedObject()["input"], expCtx->variablesParseState);
|
||||
auto bounds = WindowBounds::parse(elem.Obj(), sortBy, expCtx);
|
||||
// Reject extra arguments.
|
||||
checkAllowedFields(elem);
|
||||
// 'obj' is something like '{$func: <args>, window: {...}}'
|
||||
boost::optional<StringData> accumulatorName;
|
||||
WindowBounds bounds = WindowBounds::defaultBounds();
|
||||
boost::intrusive_ptr<::mongo::Expression> input;
|
||||
for (const auto& arg : obj) {
|
||||
auto argName = arg.fieldNameStringData();
|
||||
if (argName == kWindowArg) {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"'window' field must be an object",
|
||||
arg.type() == BSONType::Object);
|
||||
bounds = WindowBounds::parse(arg.embeddedObject(), sortBy, expCtx);
|
||||
} else if (parserMap.find(argName) != parserMap.end()) {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"Cannot specify two functions in window function spec",
|
||||
!accumulatorName);
|
||||
accumulatorName = argName;
|
||||
input = ::mongo::Expression::parseOperand(expCtx, arg, expCtx->variablesParseState);
|
||||
} else {
|
||||
uasserted(ErrorCodes::FailedToParse,
|
||||
str::stream()
|
||||
<< "Window function found an unknown argument: " << argName);
|
||||
}
|
||||
}
|
||||
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"Must specify a window function in output field",
|
||||
accumulatorName);
|
||||
return make_intrusive<ExpressionFromAccumulator<NonRemovableType>>(
|
||||
expCtx, std::move(accumulatorName), std::move(input), std::move(bounds));
|
||||
expCtx, accumulatorName->toString(), std::move(input), std::move(bounds));
|
||||
}
|
||||
|
||||
boost::intrusive_ptr<AccumulatorState> buildAccumulatorOnly() const final {
|
||||
@ -189,18 +195,38 @@ public:
|
||||
template <typename NonRemovableType, typename RemovableType>
|
||||
class ExpressionRemovable : public Expression {
|
||||
public:
|
||||
static boost::intrusive_ptr<Expression> parse(BSONElement elem,
|
||||
static boost::intrusive_ptr<Expression> parse(BSONObj obj,
|
||||
const boost::optional<SortPattern>& sortBy,
|
||||
ExpressionContext* expCtx) {
|
||||
// 'elem' is something like '$sum: {input: E, ...}'
|
||||
std::string accumulatorName = elem.fieldName();
|
||||
boost::intrusive_ptr<::mongo::Expression> input = ::mongo::Expression::parseOperand(
|
||||
expCtx, elem.embeddedObject()["input"], expCtx->variablesParseState);
|
||||
auto bounds = WindowBounds::parse(elem.Obj(), sortBy, expCtx);
|
||||
// Reject extra arguments.
|
||||
checkAllowedFields(elem);
|
||||
// 'obj' is something like '{$func: <args>, window: {...}}'
|
||||
boost::optional<StringData> accumulatorName;
|
||||
WindowBounds bounds = WindowBounds::defaultBounds();
|
||||
boost::intrusive_ptr<::mongo::Expression> input;
|
||||
for (const auto& arg : obj) {
|
||||
auto argName = arg.fieldNameStringData();
|
||||
if (argName == kWindowArg) {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"'window' field must be an object",
|
||||
obj[kWindowArg].type() == BSONType::Object);
|
||||
bounds = WindowBounds::parse(arg.embeddedObject(), sortBy, expCtx);
|
||||
} else if (parserMap.find(argName) != parserMap.end()) {
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"Cannot specify two functions in window function spec",
|
||||
!accumulatorName);
|
||||
accumulatorName = argName;
|
||||
input = ::mongo::Expression::parseOperand(expCtx, arg, expCtx->variablesParseState);
|
||||
} else {
|
||||
uasserted(ErrorCodes::FailedToParse,
|
||||
str::stream()
|
||||
<< "Window function found an unknown argument: " << argName);
|
||||
}
|
||||
}
|
||||
|
||||
uassert(ErrorCodes::FailedToParse,
|
||||
"Must specify a window function in output field",
|
||||
accumulatorName);
|
||||
return make_intrusive<ExpressionRemovable<NonRemovableType, RemovableType>>(
|
||||
expCtx, std::move(accumulatorName), std::move(input), std::move(bounds));
|
||||
expCtx, accumulatorName->toString(), std::move(input), std::move(bounds));
|
||||
}
|
||||
|
||||
ExpressionRemovable(ExpressionContext* expCtx,
|
||||
|
Loading…
Reference in New Issue
Block a user