0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-24 00:17:37 +01:00
mongodb/jstests/sharding/resharding_change_stream_internal_ops.js
Nick Zolnierz 668a6f4e9e SERVER-94971 Add query ownership for files under jstests/libs (#27763)
GitOrigin-RevId: 1cd8a1cdb3d45876003ad3ccddd4d466cd9fb66c
2024-10-03 16:08:45 +00:00

128 lines
5.2 KiB
JavaScript

// Tests that internal resharding ops are exposed as change stream events during a
// resharding operation. Internal resharding ops include:
// 1. reshardBegin
// 2. reshardDoneCatchUp
//
// @tags: [
// requires_majority_read_concern,
// uses_change_streams,
// uses_atclustertime,
//
// ]
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
import {
assertChangeStreamEventEq,
ChangeStreamTest
} from "jstests/libs/query/change_stream_util.js";
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
// Periodic noop writes are enabled with a one-second interval to speed up the test.
const reshardingTest = new ReshardingTest({
    numDonors: 2,
    numRecipients: 1,
    reshardInPlace: false,
    periodicNoopIntervalSecs: 1,
    writePeriodicNoops: true,
});
reshardingTest.setup();

const kDbName = "reshardingDb";
const collName = "coll";
const ns = `${kDbName}.${collName}`;
const {donorShardNames, recipientShardNames} = reshardingTest;

// Shard the source collection on 'oldKey', placing one chunk on each of the two donor shards.
const sourceCollection = reshardingTest.createShardedCollection({
    ns,
    shardKeyPattern: {oldKey: 1},
    chunks: [
        {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
        {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
    ],
    primaryShardName: donorShardNames[0],
});

const mongos = sourceCollection.getMongo();
const topology = DiscoverTopology.findConnectedNodes(mongos);

// Connects to the node listed as primary for the named shard in the discovered topology.
const connectToShardPrimary = (shardName) => new Mongo(topology.shards[shardName].primary);

// Wraps the test database of the given connection in a ChangeStreamTest.
const makeChangeStreamTest = (conn) => new ChangeStreamTest(conn.getDB(kDbName));

// Starts watching the source collection with 'showMigrationEvents' enabled. A fresh spec object
// is built on every call so that no pipeline is shared between cursors.
const watchSourceCollection = (cst) => cst.startWatchingChanges({
    pipeline: [{$changeStream: {showMigrationEvents: true}}],
    collection: collName,
});

// Change streams on both donors are opened before the resharding operation is started.
const donor0 = connectToShardPrimary(donorShardNames[0]);
const cstDonor0 = makeChangeStreamTest(donor0);
const changeStreamsCursorDonor0 = watchSourceCollection(cstDonor0);

const donor1 = connectToShardPrimary(donorShardNames[1]);
const cstDonor1 = makeChangeStreamTest(donor1);
const changeStreamsCursorDonor1 = watchSourceCollection(cstDonor1);

const recipient0 = connectToShardPrimary(recipientShardNames[0]);
const cstRecipient0 = makeChangeStreamTest(recipient0);

// Both of these are assigned later, from inside the callback that runs while resharding is in
// progress.
let reshardingUUID;
let changeStreamsCursorRecipient0;
// If a donor is also a recipient, the donor state machine runs renameCollection with
// {dropTarget: true} instead of running drop and leaving the rename to the recipient state
// machine at the end of the resharding operation. The shard layout is meant to let this test
// verify that neither the rename with {dropTarget: true} nor the drop command is picked up by
// the change stream cursors.
// NOTE(review): the original comment states that only one donor shard is also a recipient, but
// the fixture is constructed with 'reshardInPlace: false' -- confirm which layout is in effect.
const reshardingOptions = {
    newShardKeyPattern: {newKey: 1},
    newChunks: [
        {min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]},
    ],
};

// Runs while the resharding operation is in progress: records the operation's UUID, opens the
// recipient's change stream on the temporary collection, and checks that each donor reports a
// 'reshardBegin' event.
const duringReshardingFn = (tempNs) => {
    // Wait until participants are aware of the resharding operation.
    reshardingTest.awaitCloneTimestampChosen();

    reshardingUUID = mongos.getCollection("config.reshardingOperations").findOne()._id;

    // Everything after the first '.' in the temporary namespace is the collection name.
    const tempCollName = tempNs.substring(tempNs.indexOf(".") + 1);
    changeStreamsCursorRecipient0 = cstRecipient0.startWatchingChanges({
        pipeline: [{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true}}],
        collection: tempCollName,
    });

    // Both donors must report the same reshardBegin event.
    const expectedReshardBeginEvent = {
        reshardingUUID,
        operationType: "reshardBegin",
        ns: {db: kDbName, coll: collName},
    };
    const donorStreams = [
        {cst: cstDonor0, cursor: changeStreamsCursorDonor0},
        {cst: cstDonor1, cursor: changeStreamsCursorDonor1},
    ];
    for (const {cst, cursor} of donorStreams) {
        const donorEvents = cst.getNextChanges(cursor, 1, false /* skipFirstBatch */);
        assertChangeStreamEventEq(donorEvents[0], expectedReshardBeginEvent);
    }
};

// Runs from the 'postDecisionPersistedFn' hook: checks that the recipient reports a
// 'reshardDoneCatchUp' event for the temporary resharding collection.
const postDecisionPersistedFn = () => {
    const expectedReshardDoneCatchUpEvent = {
        reshardingUUID,
        operationType: "reshardDoneCatchUp",
    };

    const recipientEvents = cstRecipient0.getNextChanges(
        changeStreamsCursorRecipient0, 1, false /* skipFirstBatch */);
    const reshardDoneCatchUpEvent = recipientEvents[0];

    // The 'reshardDoneCatchUp' event must carry an 'ns' field of the form
    // {db: kDbName, coll: "system.resharding.<...>"}.
    assert(reshardDoneCatchUpEvent.ns, reshardDoneCatchUpEvent);
    const eventNs = reshardDoneCatchUpEvent.ns;
    assert.eq(eventNs.db, kDbName, reshardDoneCatchUpEvent);
    assert(eventNs.coll.startsWith("system.resharding."), reshardDoneCatchUpEvent);

    // The collection suffix is only checked by prefix above, so 'ns' is removed before the
    // remaining fields are compared against the expected event.
    delete reshardDoneCatchUpEvent.ns;
    assertChangeStreamEventEq(reshardDoneCatchUpEvent, expectedReshardDoneCatchUpEvent);
};

reshardingTest.withReshardingInBackground(
    reshardingOptions, duringReshardingFn, {postDecisionPersistedFn});

reshardingTest.teardown();