mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-30 17:10:48 +01:00
SERVER-41770 Multiversion test for change stream support of large txns
This commit is contained in:
parent
ac6c8db70d
commit
c68b233864
130
jstests/multiVersion/change_streams_multi_version_transaction.js
Normal file
130
jstests/multiVersion/change_streams_multi_version_transaction.js
Normal file
@ -0,0 +1,130 @@
|
||||
// Test that resume tokens from a replica set running the "last-stable" mongod can be used to resume
|
||||
// a change stream after upgrading the replica set to the "latest" mongod, even when the change
|
||||
// stream includes multi-statement transactions.
|
||||
//
|
||||
// @tags: [uses_change_streams, uses_transactions, requires_replication]
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
load("jstests/libs/feature_compatibility_version.js");
|
||||
load('jstests/multiVersion/libs/multi_rs.js'); // For upgradeSet.
|
||||
load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
|
||||
|
||||
const rst = new ReplSetTest({
|
||||
nodes: 2,
|
||||
nopeOptions: {binVersion: "last-stable"},
|
||||
});
|
||||
|
||||
if (!startSetIfSupportsReadMajority(rst)) {
|
||||
jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
|
||||
rst.stopSet();
|
||||
return;
|
||||
}
|
||||
|
||||
rst.initiate();
|
||||
|
||||
const dbName = jsTestName();
|
||||
const watchedCollName = "change_stream_watched";
|
||||
const unwatchedCollName = "change_stream_unwatched";
|
||||
|
||||
rst.getPrimary().getDB(dbName).createCollection(watchedCollName);
|
||||
rst.getPrimary().getDB(dbName).createCollection(unwatchedCollName);
|
||||
|
||||
// Calls next() on a change stream cursor 'n' times and returns an array with the results.
|
||||
function getChangeStreamResults(cursor, n) {
|
||||
let results = [];
|
||||
for (let i = 0; i < n; ++i) {
|
||||
assert.soon(() => cursor.hasNext(), "Timed out waiting for change stream result " + i);
|
||||
results.push(cursor.next());
|
||||
}
|
||||
assert(!cursor.hasNext()); // The change stream should always have exactly 'n' results.
|
||||
return results;
|
||||
}
|
||||
|
||||
// Compare expected changes with output from a change stream, failing an assertion if they do
|
||||
// not match.
|
||||
function compareChanges(expectedChanges, observedChanges) {
|
||||
assert.eq(expectedChanges.length, observedChanges.length);
|
||||
for (let i = 0; i < expectedChanges.length; ++i) {
|
||||
assert.eq(expectedChanges[i].operationType, observedChanges[i].operationType);
|
||||
if (expectedChanges[i].hasOwnProperty("fullDocument")) {
|
||||
assert.eq(expectedChanges[i].fullDocument, observedChanges[i].fullDocument);
|
||||
}
|
||||
if (expectedChanges[i].hasOwnProperty("updateDescription")) {
|
||||
assert.eq(expectedChanges[i].updateDescription,
|
||||
observedChanges[i].updateDescription);
|
||||
}
|
||||
if (expectedChanges[i].hasOwnProperty("documentKey")) {
|
||||
assert.eq(expectedChanges[i].documentKey, observedChanges[i].documentKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Opens a $changeStream and then performs inserts, deletes, updates both within a transaction
|
||||
// and outside the transaction. Leaves all collections empty when done.
|
||||
function performDBOps(mongod) {
|
||||
const session = mongod.startSession();
|
||||
session.startTransaction();
|
||||
|
||||
const watchedColl = session.getDatabase(dbName)[watchedCollName];
|
||||
assert.commandWorked(watchedColl.insert({_id: 1}));
|
||||
assert.commandWorked(watchedColl.updateOne({_id: 1}, {$set: {a: 1}}));
|
||||
assert.commandWorked(watchedColl.remove({_id: 1}));
|
||||
|
||||
const unwatchedColl = session.getDatabase(dbName)[unwatchedCollName];
|
||||
assert.commandWorked(unwatchedColl.insert({_id: 1}));
|
||||
assert.commandWorked(unwatchedColl.remove({_id: 1}));
|
||||
|
||||
const watchedCollNoTxn = mongod.getDB(dbName)[watchedCollName];
|
||||
assert.commandWorked(watchedCollNoTxn.insert({_id: 2}));
|
||||
assert.commandWorked(watchedCollNoTxn.remove({_id: 2}));
|
||||
|
||||
session.commitTransaction();
|
||||
}
|
||||
|
||||
// Resume a change stream from each of the resume tokens in the 'changeStreamDocs' array and
|
||||
// verify that we always see the same set of changes.
|
||||
function resumeChangeStreamFromEachToken(mongod, changeStreamDocs, expectedChanges) {
|
||||
changeStreamDocs.forEach(function(changeDoc, i) {
|
||||
const testDB = mongod.getDB(dbName);
|
||||
const resumedCursor = testDB[watchedCollName].watch([], {resumeAfter: changeDoc._id});
|
||||
|
||||
// Resuming from document 'i' should return all the documents from 'i' + 1 to the end of
|
||||
// the array.
|
||||
const expectedChangesAfterResumeToken = expectedChanges.slice(i + 1);
|
||||
compareChanges(
|
||||
expectedChangesAfterResumeToken,
|
||||
getChangeStreamResults(resumedCursor, expectedChangesAfterResumeToken.length));
|
||||
});
|
||||
}
|
||||
|
||||
const expectedChanges = [
|
||||
{operationType: "insert", fullDocument: {_id: 2}},
|
||||
{operationType: "delete", documentKey: {_id: 2}},
|
||||
{operationType: "insert", fullDocument: {_id: 1}},
|
||||
{operationType: "update", updateDescription: {updatedFields: {a: 1}, removedFields: []}},
|
||||
{operationType: "delete", documentKey: {_id: 1}},
|
||||
];
|
||||
|
||||
// Create the original change stream, verify it gives us the changes we expect, and verify that
|
||||
// we can correctly resume from any resume token.
|
||||
const changeStreamCursor = rst.getPrimary().getDB(dbName)[watchedCollName].watch();
|
||||
performDBOps(rst.getPrimary());
|
||||
const changeStreamDocs = getChangeStreamResults(changeStreamCursor, expectedChanges.length);
|
||||
compareChanges(expectedChanges, changeStreamDocs);
|
||||
resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
|
||||
|
||||
// Upgrade the replica set (while leaving featureCompatibilityVersion as it is) and verify that
|
||||
// we can correctly resume from any resume token.
|
||||
rst.upgradeSet({binVersion: "latest"});
|
||||
resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
|
||||
|
||||
// Upgrade the featureCompatibilityVersion and verify that we can correctly resume from any
|
||||
// resume token.
|
||||
assert.commandWorked(
|
||||
rst.getPrimary().adminCommand({setFeatureCompatibilityVersion: latestFCV}));
|
||||
checkFCV(rst.getPrimary().getDB("admin"), latestFCV);
|
||||
resumeChangeStreamFromEachToken(rst.getPrimary(), changeStreamDocs, expectedChanges);
|
||||
|
||||
rst.stopSet();
|
||||
}());
|
Loading…
Reference in New Issue
Block a user