mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-24 00:17:37 +01:00
668a6f4e9e
GitOrigin-RevId: 1cd8a1cdb3d45876003ad3ccddd4d466cd9fb66c
128 lines
5.2 KiB
JavaScript
128 lines
5.2 KiB
JavaScript
// Tests that internal resharding ops are exposed as change stream events during a
|
|
// resharding operation. Internal resharding ops include:
|
|
// 1. reshardBegin
|
|
// 2. reshardDoneCatchUp
|
|
//
|
|
// @tags: [
|
|
// requires_majority_read_concern,
|
|
// uses_change_streams,
|
|
// uses_atclustertime,
|
|
//
|
|
// ]
|
|
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
|
import {
|
|
assertChangeStreamEventEq,
|
|
ChangeStreamTest
|
|
} from "jstests/libs/query/change_stream_util.js";
|
|
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
|
|
|
// Use a higher frequency for periodic noops to speed up the test.
|
|
const reshardingTest = new ReshardingTest({
|
|
numDonors: 2,
|
|
numRecipients: 1,
|
|
reshardInPlace: false,
|
|
periodicNoopIntervalSecs: 1,
|
|
writePeriodicNoops: true
|
|
});
|
|
reshardingTest.setup();
|
|
|
|
const kDbName = "reshardingDb";
|
|
const collName = "coll";
|
|
const ns = kDbName + "." + collName;
|
|
|
|
const donorShardNames = reshardingTest.donorShardNames;
|
|
const recipientShardNames = reshardingTest.recipientShardNames;
|
|
const sourceCollection = reshardingTest.createShardedCollection({
|
|
ns: ns,
|
|
shardKeyPattern: {oldKey: 1},
|
|
chunks: [
|
|
{min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
|
|
{min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}
|
|
],
|
|
primaryShardName: donorShardNames[0]
|
|
});
|
|
|
|
const mongos = sourceCollection.getMongo();
|
|
const topology = DiscoverTopology.findConnectedNodes(mongos);
|
|
|
|
const donor0 = new Mongo(topology.shards[donorShardNames[0]].primary);
|
|
const cstDonor0 = new ChangeStreamTest(donor0.getDB(kDbName));
|
|
let changeStreamsCursorDonor0 = cstDonor0.startWatchingChanges(
|
|
{pipeline: [{$changeStream: {showMigrationEvents: true}}], collection: collName});
|
|
|
|
const donor1 = new Mongo(topology.shards[donorShardNames[1]].primary);
|
|
const cstDonor1 = new ChangeStreamTest(donor1.getDB(kDbName));
|
|
let changeStreamsCursorDonor1 = cstDonor1.startWatchingChanges(
|
|
{pipeline: [{$changeStream: {showMigrationEvents: true}}], collection: collName});
|
|
|
|
const recipient0 = new Mongo(topology.shards[recipientShardNames[0]].primary);
|
|
const cstRecipient0 = new ChangeStreamTest(recipient0.getDB(kDbName));
|
|
|
|
let reshardingUUID;
|
|
let changeStreamsCursorRecipient0;
|
|
|
|
reshardingTest.withReshardingInBackground(
|
|
{
|
|
// If a donor is also a recipient, the donor state machine will run renameCollection with
|
|
// {dropTarget : true} rather than running drop and letting the recipient state machine run
|
|
// rename at the end of the resharding operation. So, we ensure that only one of the donor
|
|
// shards will also be a recipient shard in order to verify that neither the rename with
|
|
// {dropTarget : true} nor the drop command are picked up by the change streams cursor.
|
|
newShardKeyPattern: {newKey: 1},
|
|
newChunks: [
|
|
{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]},
|
|
],
|
|
},
|
|
(tempNs) => {
|
|
// Wait until participants are aware of the resharding operation.
|
|
reshardingTest.awaitCloneTimestampChosen();
|
|
|
|
const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne();
|
|
reshardingUUID = coordinatorDoc._id;
|
|
|
|
changeStreamsCursorRecipient0 = cstRecipient0.startWatchingChanges({
|
|
pipeline: [{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true}}],
|
|
collection: tempNs.substring(tempNs.indexOf('.') + 1)
|
|
});
|
|
|
|
// Check for reshardBegin event on both donors.
|
|
const expectedReshardBeginEvent = {
|
|
reshardingUUID: reshardingUUID,
|
|
operationType: "reshardBegin",
|
|
ns: {db: kDbName, coll: collName},
|
|
};
|
|
|
|
const reshardBeginDonor0Event =
|
|
cstDonor0.getNextChanges(changeStreamsCursorDonor0, 1, false /* skipFirstBatch */);
|
|
|
|
assertChangeStreamEventEq(reshardBeginDonor0Event[0], expectedReshardBeginEvent);
|
|
|
|
const reshardBeginDonor1Event =
|
|
cstDonor1.getNextChanges(changeStreamsCursorDonor1, 1, false /* skipFirstBatch */);
|
|
assertChangeStreamEventEq(reshardBeginDonor1Event[0], expectedReshardBeginEvent);
|
|
},
|
|
{
|
|
postDecisionPersistedFn: () => {
|
|
// Check for reshardDoneCatchUp event on the recipient.
|
|
const expectedReshardDoneCatchUpEvent = {
|
|
reshardingUUID: reshardingUUID,
|
|
operationType: "reshardDoneCatchUp",
|
|
};
|
|
|
|
const reshardDoneCatchUpEvent = cstRecipient0.getNextChanges(
|
|
changeStreamsCursorRecipient0, 1, false /* skipFirstBatch */)[0];
|
|
|
|
// Ensure that the 'reshardingDoneCatchUp' event has an 'ns' field of the format
|
|
// '{ns: kDbName, coll: "system.resharding.<>"}.
|
|
assert(reshardDoneCatchUpEvent.ns, reshardDoneCatchUpEvent);
|
|
assert.eq(reshardDoneCatchUpEvent.ns.db, kDbName, reshardDoneCatchUpEvent);
|
|
assert(reshardDoneCatchUpEvent.ns.coll.startsWith("system.resharding."),
|
|
reshardDoneCatchUpEvent);
|
|
delete reshardDoneCatchUpEvent.ns;
|
|
|
|
assertChangeStreamEventEq(reshardDoneCatchUpEvent, expectedReshardDoneCatchUpEvent);
|
|
}
|
|
});
|
|
|
|
reshardingTest.teardown();
|