mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
139 lines
5.0 KiB
JavaScript
139 lines
5.0 KiB
JavaScript
/**
|
|
* Test that a change stream on the primary node survives stepdown.
|
|
*
|
|
* Change streams are only supported on WiredTiger.
|
|
* @tags: [requires_wiredtiger]
|
|
*/
|
|
(function() {
|
|
"use strict";
|
|
|
|
load("jstests/libs/write_concern_util.js"); // for [stop|restart]ServerReplication.
|
|
|
|
const name = "change_stream_stepdown";
|
|
const replTest = new ReplSetTest({name: name, nodes: [{}, {}]});
|
|
replTest.startSet();
|
|
replTest.initiate();
|
|
|
|
const dbName = name;
|
|
const collName = "change_stream_stepdown";
|
|
const changeStreamComment = collName + "_comment";
|
|
|
|
const primary = replTest.getPrimary();
|
|
const secondary = replTest.getSecondary();
|
|
const primaryDb = primary.getDB(dbName);
|
|
const secondaryDb = secondary.getDB(dbName);
|
|
const primaryColl = primaryDb[collName];
|
|
|
|
// Tell the secondary to stay secondary until we say otherwise.
|
|
assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 999999}));
|
|
|
|
// Open a change stream.
|
|
let res = primaryDb.runCommand({
|
|
aggregate: collName,
|
|
pipeline: [{$changeStream: {}}],
|
|
cursor: {},
|
|
comment: changeStreamComment,
|
|
maxTimeMS: 5000
|
|
});
|
|
assert.commandWorked(res);
|
|
let cursorId = res.cursor.id;
|
|
|
|
// Insert several documents on primary and let them majority commit.
|
|
assert.commandWorked(
|
|
primaryColl.insert([{_id: 1}, {_id: 2}, {_id: 3}], {writeConcern: {w: "majority"}}));
|
|
replTest.awaitReplication();
|
|
|
|
jsTestLog("Testing that changestream survives stepdown between find and getmore");
|
|
// Step down.
|
|
assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true}));
|
|
replTest.waitForState(primary, ReplSetTest.State.SECONDARY);
|
|
|
|
// Receive the first change event. This tests stepdown between find and getmore.
|
|
res = assert.commandWorked(
|
|
primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
|
|
let changes = res.cursor.nextBatch;
|
|
assert.eq(changes.length, 1);
|
|
assert.eq(changes[0]["fullDocument"], {_id: 1});
|
|
assert.eq(changes[0]["operationType"], "insert");
|
|
|
|
jsTestLog("Testing that changestream survives step-up");
|
|
// Step back up and wait for primary.
|
|
assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0}));
|
|
replTest.getPrimary();
|
|
|
|
// Get the next one. This tests that changestreams survives a step-up.
|
|
res = assert.commandWorked(
|
|
primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
|
|
changes = res.cursor.nextBatch;
|
|
assert.eq(changes.length, 1);
|
|
assert.eq(changes[0]["fullDocument"], {_id: 2});
|
|
assert.eq(changes[0]["operationType"], "insert");
|
|
|
|
jsTestLog("Testing that changestream survives stepdown between two getmores");
|
|
// Step down again.
|
|
assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true}));
|
|
replTest.waitForState(primary, ReplSetTest.State.SECONDARY);
|
|
|
|
// Get the next one. This tests that changestreams survives a step down between getmores.
|
|
res = assert.commandWorked(
|
|
primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
|
|
changes = res.cursor.nextBatch;
|
|
assert.eq(changes.length, 1);
|
|
assert.eq(changes[0]["fullDocument"], {_id: 3});
|
|
assert.eq(changes[0]["operationType"], "insert");
|
|
|
|
// Step back up and wait for primary.
|
|
assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0}));
|
|
replTest.getPrimary();
|
|
|
|
jsTestLog("Testing that changestream waiting on old primary sees docs inserted on new primary");
|
|
|
|
replTest.awaitReplication(); // Ensure secondary is up to date and can win an election.
|
|
TestData.changeStreamComment = changeStreamComment;
|
|
TestData.secondaryHost = secondary.host;
|
|
TestData.dbName = dbName;
|
|
TestData.collName = collName;
|
|
let waitForShell = startParallelShell(function() {
|
|
// Wait for the getMore to be in progress.
|
|
assert.soon(
|
|
() => db.getSiblingDB("admin")
|
|
.aggregate([
|
|
{'$currentOp': {}},
|
|
{
|
|
'$match': {
|
|
op: 'getmore',
|
|
'cursor.originatingCommand.comment': TestData.changeStreamComment
|
|
}
|
|
}
|
|
])
|
|
.itcount() == 1);
|
|
|
|
const secondary = new Mongo(TestData.secondaryHost);
|
|
const secondaryDb = secondary.getDB(TestData.dbName);
|
|
// Step down the old primary and wait for new primary.
|
|
assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 0}));
|
|
assert.commandWorked(secondaryDb.adminCommand({replSetStepUp: 1, skipDryRun: true}));
|
|
jsTestLog("Waiting for new primary");
|
|
assert.soon(() => secondaryDb.adminCommand({isMaster: 1}).ismaster);
|
|
|
|
jsTestLog("Inserting document on new primary");
|
|
assert.commandWorked(secondaryDb[TestData.collName].insert({_id: 4}),
|
|
{writeConcern: {w: "majority"}});
|
|
}, primary.port);
|
|
|
|
res = assert.commandWorked(primaryDb.runCommand({
|
|
getMore: cursorId,
|
|
collection: collName,
|
|
batchSize: 1,
|
|
maxTimeMS: ReplSetTest.kDefaultTimeoutMS
|
|
}));
|
|
changes = res.cursor.nextBatch;
|
|
assert.eq(changes.length, 1);
|
|
assert.eq(changes[0]["fullDocument"], {_id: 4});
|
|
assert.eq(changes[0]["operationType"], "insert");
|
|
|
|
waitForShell();
|
|
|
|
replTest.stopSet();
|
|
})();
|