// Test the change stream-related shell helpers and the options passed to them. Note that, while
// we only call the DBCollection.watch helper in this file, it will be redirected to the DB.watch
// or Mongo.watch equivalents in the whole_db and whole_cluster passthroughs.
//
// This test expects each change stream result to have an operationTime based on the clusterTime in
// the oplog entry. When operations get bundled into a transaction, their operationTime is instead
// based on the commit oplog entry, which would cause this test to fail.
// @tags: [change_stream_does_not_expect_txns]
(function() {
"use strict";

load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js");           // For FixtureHelpers.
load("jstests/libs/change_stream_util.js");        // For assertChangeStreamEventEq and assertInvalidateOp.

const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper");

assert.commandWorked(db.adminCommand({"setParameter": 1, "logLevel": 5}));

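// Waits for the next event on 'cursor', asserts via assertChangeStreamEventEq that it matches
// 'expected', and returns the full event so callers can inspect generated fields such as the
// resume token (_id) and clusterTime.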
function checkNextChange(cursor, expected) {
    assert.soon(() => cursor.hasNext());
    const nextObj = cursor.next();
    assertChangeStreamEventEq(nextObj, expected);
    return nextObj;
}

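// Runs 'testFunc' while temporarily replacing Mongo.prototype.runCommand with a spy that records
// the last command object the shell sends, then passes that command object to 'checkFunc'. This
// lets the test verify which options the watch() helpers forward to the server.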
function testCommandIsCalled(testFunc, checkFunc) {
    const mongoRunCommandOriginal = Mongo.prototype.runCommand;

    const sentinel = {};
    let cmdObjSeen = sentinel;

    Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) {
        cmdObjSeen = cmdObj;
        return mongoRunCommandOriginal.apply(this, arguments);
    };

    try {
        assert.doesNotThrow(testFunc);
    } finally {
        Mongo.prototype.runCommand = mongoRunCommandOriginal;
    }

    if (cmdObjSeen === sentinel) {
        throw new Error("Mongo.prototype.runCommand() was never called: " + testFunc.toString());
    }

    checkFunc(cmdObjSeen);
}

jsTestLog("Testing watch() without options");
|
|
let changeStreamCursor = coll.watch();
|
|
|
|
assert(!changeStreamCursor.hasNext());
|
|
|
|
// Write the first document into the collection. We will save the resume token from this change.
|
|
assert.writeOK(coll.insert({_id: 0, x: 1}));
|
|
let resumeToken;
|
|
|
|
// Test that each of the change stream cursors picks up the change.
|
|
assert.soon(() => changeStreamCursor.hasNext());
|
|
let change = changeStreamCursor.next();
|
|
assert(!changeStreamCursor.hasNext());
|
|
let expected = {
|
|
documentKey: {_id: 0},
|
|
fullDocument: {_id: 0, x: 1},
|
|
ns: {db: "test", coll: coll.getName()},
|
|
operationType: "insert",
|
|
};
|
|
assert("_id" in change, "Got unexpected change: " + tojson(change));
|
|
// Remember the _id of the first op to resume the stream.
|
|
resumeToken = change._id;
|
|
// Remove the fields we cannot predict, then test that the change is as expected.
|
|
delete change._id;
|
|
delete change.clusterTime;
|
|
assert.docEq(change, expected);
|
|
|
|
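// The $project stage below reshapes each event to contain only the resume token (_id, which
// $project keeps by default), the clusterTime, and a computed 'docId' field, so the checks that
// follow compare on 'docId'.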
jsTestLog("Testing watch() with pipeline");
|
|
changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]);
|
|
|
|
// Store the cluster time of the insert as the timestamp to start from.
|
|
const resumeTime =
|
|
assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]}))
|
|
.operationTime;
|
|
jsTestLog("Insert of document with _id 1 got operationTime " + tojson(resumeTime));
|
|
|
|
const changeForInsert = checkNextChange(changeStreamCursor, {docId: 1});
|
|
jsTestLog("Change stream event for document with _id 1 reports clusterTime " +
|
|
tojson(changeForInsert.clusterTime));
|
|
|
|
// We expect the clusterTime returned by the change stream event and the operationTime returned
|
|
// by the insert to be the same.
|
|
assert.eq(changeForInsert.clusterTime, resumeTime);
|
|
|
|
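// Each of the next three tests reopens the stream from a point just after the first insert:
// 'resumeAfter' and 'startAfter' resume from the saved resume token, while 'startAtOperationTime'
// starts at the cluster time of the second insert (inclusive, as the checks confirm). In all three
// cases the first event seen should be the insert of {_id: 1}.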
jsTestLog("Testing watch() with pipeline and resumeAfter");
|
|
changeStreamCursor =
|
|
coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
|
|
checkNextChange(changeStreamCursor, {docId: 1});
|
|
|
|
jsTestLog("Testing watch() with pipeline and startAfter");
|
|
changeStreamCursor =
|
|
coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken});
|
|
checkNextChange(changeStreamCursor, {docId: 1});
|
|
|
|
jsTestLog("Testing watch() with pipeline and startAtOperationTime");
|
|
changeStreamCursor =
|
|
coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime});
|
|
checkNextChange(changeStreamCursor, {docId: 1});
|
|
|
|
jsTestLog("Testing watch() with updateLookup");
|
|
changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"});
|
|
|
|
assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}}));
|
|
expected = {
|
|
documentKey: {_id: 0},
|
|
fullDocument: {_id: 0, x: 10},
|
|
ns: {db: "test", coll: coll.getName()},
|
|
operationType: "update",
|
|
updateDescription: {removedFields: [], updatedFields: {x: 10}},
|
|
};
|
|
checkNextChange(changeStreamCursor, expected);
|
|
|
|
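// With five updates and batchSize: 2, the stream should deliver the events in batches of 2, 2,
// and 1: the initial aggregate batch, one getMore batch, and a further getMore for the remainder.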
jsTestLog("Testing watch() with batchSize");
|
|
// Only test mongod because mongos uses batch size 0 for aggregate commands internally to
|
|
// establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992.
|
|
const isMongos = FixtureHelpers.isMongos(db);
|
|
if (!isMongos) {
|
|
// Increase a field by 5 times and verify the batch size is respected.
|
|
for (let i = 0; i < 5; i++) {
|
|
assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}}));
|
|
}
|
|
|
|
// Only watch the "update" changes of the specific doc since the beginning.
|
|
changeStreamCursor = coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}],
|
|
{resumeAfter: resumeToken, batchSize: 2});
|
|
|
|
    // Check the first batch.
    assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
    // Consume the first batch.
    assert(changeStreamCursor.hasNext());
    changeStreamCursor.next();
    assert(changeStreamCursor.hasNext());
    changeStreamCursor.next();
    // Confirm that the batch is empty.
    assert.eq(changeStreamCursor.objsLeftInBatch(), 0);

    // Check the batch returned by getMore.
    assert(changeStreamCursor.hasNext());
    assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
    changeStreamCursor.next();
    assert(changeStreamCursor.hasNext());
    changeStreamCursor.next();
    assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
    // There are more changes coming, just not in the batch.
    assert(changeStreamCursor.hasNext());
}

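// The shell is expected to translate the maxAwaitTimeMS watch() option into a maxTimeMS field on
// the getMore commands it issues, which the spy below verifies.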
jsTestLog("Testing watch() with maxAwaitTimeMS");
|
|
changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500});
|
|
testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => {
|
|
assert.eq(
|
|
"getMore", Object.keys(cmdObj)[0], "expected getMore command, but was: " + tojson(cmdObj));
|
|
assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj));
|
|
assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj));
|
|
});
|
|
|
|
jsTestLog("Testing the cursor gets closed when the collection gets dropped");
|
|
changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]);
|
|
assert.writeOK(coll.insert({_id: 2, x: 1}));
|
|
expected = {
|
|
documentKey: {_id: 2},
|
|
fullDocument: {_id: 2, x: 1},
|
|
ns: {db: "test", coll: coll.getName()},
|
|
operationType: "insert",
|
|
};
|
|
checkNextChange(changeStreamCursor, expected);
|
|
assert(!changeStreamCursor.hasNext());
|
|
assert(!changeStreamCursor.isClosed());
|
|
assert(!changeStreamCursor.isExhausted());
|
|
|
|
// Dropping the collection should trigger a drop notification.
|
|
assertDropCollection(db, coll.getName());
|
|
assert.soon(() => changeStreamCursor.hasNext());
|
|
assert(!changeStreamCursor.isExhausted());
|
|
expected = {
|
|
operationType: "drop",
|
|
ns: {db: db.getName(), coll: coll.getName()}
|
|
};
|
|
checkNextChange(changeStreamCursor, expected);
|
|
// For single collection change streams, the drop should invalidate the stream.
|
|
const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
|
|
|
|
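// Unlike 'resumeAfter', the 'startAfter' option can reopen a stream from an invalidate event's
// resume token, which is what the block below exercises. In passthroughs where a collection drop
// does not invalidate the stream (e.g. whole_db and whole_cluster), assertInvalidateOp may return
// nothing, in which case this block is skipped.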
if (invalidateDoc) {
    jsTestLog("Testing using the 'startAfter' option from the invalidate entry");
    assert.commandWorked(coll.insert({_id: "After drop"}));
    let resumedFromInvalidate =
        coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}});

    // We should see the new insert after starting over. However, in sharded cluster
    // passthroughs we may see more drop and invalidate notifications before we see the insert.
    let firstChangeAfterDrop;
    assert.soon(() => {
        if (!resumedFromInvalidate.hasNext()) {
            return false;
        }
        const next = resumedFromInvalidate.next();
        if (next.operationType == "invalidate") {
            // Start again!
            resumedFromInvalidate =
                coll.watch([], {startAfter: next._id, collation: {locale: "simple"}});
            return false;
        }
        if (next.operationType == "drop") {
            return false;
        }
        // This is the change we wanted.
        firstChangeAfterDrop = next;
        return true;
    });

    assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(firstChangeAfterDrop));
}
}());