/**
 * Test that bulkWrite emits the same metrics as corresponding calls to update, delete and insert.
 *
 * @tags: [requires_fcv_80, featureFlagTimeseriesUpdatesSupport]
 */

import {BulkWriteMetricChecker} from "jstests/libs/bulk_write_utils.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";

function runTest(isMongos, cluster, bulkWrite, retryCount, timeseries) {
    // We are ok with the randomness here since we clearly log the state.
    const errorsOnly = Math.random() < 0.5;
    print(`Running on a ${isMongos ? "ShardingTest" : "ReplSetTest"} with bulkWrite = ${
        bulkWrite}, errorsOnly = ${errorsOnly} and timeseries = ${timeseries}.`);

    const dbName = "testDB";
    const collName1 = "testColl1";
    const collName2 = "testColl2";
    const namespace1 = `${dbName}.${collName1}`;
    const namespace2 = `${dbName}.${collName2}`;
    const session = isMongos ? cluster.s.startSession() : cluster.getPrimary().startSession();
    const testDB = session.getDatabase(dbName);

    // The chunks below are [$minKey, key1) on shard0, [key1, key2) on shard1 and [key2, $maxKey)
    // on shard2.
    const key0 = ISODate("2024-01-02T00:00:00.00Z");  // On shard0.
    const key1 = ISODate("2024-01-05T00:00:00.00Z");  // On shard1.
    const key2 = ISODate("2024-01-20T00:00:00.00Z");  // On shard2.

    if (timeseries) {
        assert.commandWorked(testDB.createCollection(collName1, {
            timeseries: {timeField: "timestamp", bucketMaxSpanSeconds: 1, bucketRoundingSeconds: 1}
        }));
    }

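    // Note: with bucketMaxSpanSeconds/bucketRoundingSeconds of 1, the day-granularity timestamps
    // used in this test always land in separate buckets, so the bucket-level splits below should
    // partition the data the same way document-level splits would.
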
    if (isMongos) {
        assert.commandWorked(testDB.adminCommand({'enableSharding': dbName}));
        assert.commandWorked(
            testDB.adminCommand({shardCollection: namespace1, key: {timestamp: 1}}));
        assert.commandWorked(
            testDB.adminCommand({shardCollection: namespace2, key: {timestamp: 1}}));

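        // For a sharded timeseries collection the routing metadata lives on the underlying
        // system.buckets namespace, so splits and chunk migrations must target that namespace,
        // keyed on the bucket-level control.min.timestamp field.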
        const splitNs = timeseries ? `${dbName}.system.buckets.${collName1}` : namespace1;
        const splitKey = timeseries ? "control.min.timestamp" : "timestamp";

        assert.commandWorked(testDB.adminCommand({split: splitNs, middle: {[splitKey]: key1}}));
        assert.commandWorked(testDB.adminCommand({split: splitNs, middle: {[splitKey]: key2}}));

        // Move chunks so each shard has one chunk.
        assert.commandWorked(testDB.adminCommand({
            moveChunk: splitNs,
            find: {[splitKey]: key0},
            to: cluster.shard0.shardName,
            _waitForDelete: true
        }));
        assert.commandWorked(testDB.adminCommand({
            moveChunk: splitNs,
            find: {[splitKey]: key1},
            to: cluster.shard1.shardName,
            _waitForDelete: true
        }));
        assert.commandWorked(testDB.adminCommand({
            moveChunk: splitNs,
            find: {[splitKey]: key2},
            to: cluster.shard2.shardName,
            _waitForDelete: true
        }));
    }

    const coll = testDB[collName1];
    const coll2 = testDB[collName2];

    const metricChecker =
        new BulkWriteMetricChecker(testDB,
                                   [namespace1, namespace2],
                                   bulkWrite,
                                   isMongos,
                                   false /*fle*/,
                                   errorsOnly,
                                   retryCount,
                                   timeseries,
                                   ISODate("2021-05-18T00:00:00.000Z") /* defaultTimestamp */);

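    // Judging from its usage below, checkMetrics(description, bulkWriteOps, equivalentCommands,
    // expectedMetrics) runs the ops either as a single bulkWrite or as the listed plain commands
    // (depending on the bulkWrite flag) and asserts that the server-side metrics changed by
    // expectedMetrics either way.
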
    // Simplifies implementation of checkBulkWriteMetrics:
    // totals["testDB.testColl"] will not be undefined on first top call below.
    metricChecker.executeCommand({insert: collName1, documents: [{_id: 99}]});
    metricChecker.executeCommand({insert: collName2, documents: [{_id: 99}]});

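    // In the bulkWrite ops below, the numeric value of insert/update/delete is an index into the
    // command's nsInfo array (presumably assembled by the checker from the namespaces passed to
    // its constructor): 0 refers to namespace1 and 1 to namespace2.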
metricChecker.checkMetrics("Simple insert",
|
|
[{insert: 0, document: {_id: 0}}],
|
|
[{insert: collName1, documents: [{_id: 0}]}],
|
|
{inserted: 1});
|
|
|
|
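    // The next check passes u as an array, i.e. an aggregation-pipeline update, which should bump
    // updatePipeline in addition to updated.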
metricChecker.checkMetrics("Update with pipeline",
|
|
[{update: 0, filter: {_id: 0}, updateMods: [{$set: {x: 1}}]}],
|
|
[{update: collName1, updates: [{q: {_id: 0}, u: [{$set: {x: 1}}]}]}],
|
|
{updated: 1, updatePipeline: 1});
|
|
|
|
    metricChecker.executeCommand(
        {insert: collName1, documents: [{_id: 1, a: [{b: 5}, {b: 1}, {b: 2}]}]});

    metricChecker.checkMetrics(
        "Update with arrayFilters",
        [{
            update: 0,
            filter: {_id: 1},
            updateMods: {$set: {"a.$[i].b": 6}},
            arrayFilters: [{"i.b": 5}]
        }],
        [{
            update: collName1,
            updates: [{q: {_id: 1}, u: {$set: {"a.$[i].b": 6}}, arrayFilters: [{"i.b": 5}]}]
        }],
        {updated: 1, updateArrayFilters: 1});

metricChecker.checkMetrics("Simple delete",
|
|
[{delete: 0, filter: {_id: 0}}],
|
|
[{delete: collName1, deletes: [{q: {_id: 0}, limit: 1}]}],
|
|
{deleted: 1});
|
|
|
|
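    // checkMetricsWithRetries issues the write as a retryable write (the last two arguments are
    // the session id and, presumably, the transaction number) and repeats it retryCount times;
    // only the first attempt modifies data, so later attempts should only bump retry counters.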
metricChecker.checkMetricsWithRetries("Simple insert with retry",
|
|
[{insert: 0, document: {_id: 3}}],
|
|
{
|
|
insert: collName1,
|
|
documents: [{_id: 3}],
|
|
},
|
|
{inserted: 1, retriedInsert: retryCount - 1},
|
|
session.getSessionId(),
|
|
NumberLong(10));
|
|
|
|
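    // Each retryable statement needs a fresh transaction number on the session (10 above, 11 and
    // 12 below); otherwise the server would treat the next check as a retry of the previous one.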
    metricChecker.checkMetricsWithRetries(
        "Simple update with retry",
        [{
            update: 0,
            filter: {_id: 1},
            updateMods: {$set: {"a.$[i].b": 7}},
            arrayFilters: [{"i.b": 6}]
        }],
        {
            update: collName1,
            updates: [{q: {_id: 1}, u: {$set: {"a.$[i].b": 7}}, arrayFilters: [{"i.b": 6}]}]
        },
        {
            updated: 1,
            updateArrayFilters: retryCount  // This is incremented even on a retry.
        },
        session.getSessionId(),
        NumberLong(11));

    // This one is set up so that the 2 updates each target a single but different shard, and the
    // 3 inserts each target a different shard (covering all three). As a whole, the bulkWrite
    // therefore counts as 1 in update.manyShards and 1 in insert.allShards.
    let insertShardField = bulkWrite ? "allShards" : "oneShard";
    let updateShardField = bulkWrite ? "manyShards" : "oneShard";

    const key3 = ISODate("2024-01-10T00:00:00.00Z");  // On shard1.
    const key4 = ISODate("2024-01-30T00:00:00.00Z");  // On shard2.

    metricChecker.checkMetrics(
        "Multiple operations",
        [
            {insert: 0, document: {_id: 4, timestamp: key0}},
            {update: 0, filter: {timestamp: key0}, updateMods: {$set: {x: 2}}},
            {insert: 0, document: {timestamp: key3}},
            {update: 0, filter: {timestamp: key3}, updateMods: {$set: {x: 2}}},
            {insert: 0, document: {timestamp: key4}},
            {delete: 0, filter: {_id: 4, timestamp: key0}}
        ],
        [
            {insert: collName1, documents: [{_id: 4, timestamp: key0}]},
            {update: collName1, updates: [{q: {timestamp: key0}, u: {$set: {x: 2}}}]},
            {insert: collName1, documents: [{timestamp: key3}]},
            {update: collName1, updates: [{q: {timestamp: key3}, u: {$set: {x: 2}}}]},
            {insert: collName1, documents: [{timestamp: key4}]},
            {delete: collName1, deletes: [{q: {_id: 4, timestamp: key0}, limit: 1}]}
        ],
        {
            updated: 2,
            inserted: 3,
            deleted: 1,
            singleUpdateForBulkWrite: true,
            singleInsertForBulkWrite: true,
            insertShardField: insertShardField,
            updateShardField: updateShardField,
            deleteShardField: "oneShard"
        });

    if (isMongos) {
        // An update that changes a document's owning shard requires a transaction or retryable
        // write; we do not want actual retries here.
        metricChecker.retryCount = 1;
        metricChecker.checkMetricsWithRetries(
            "Update modifying owning shard",
            [
                {update: 0, filter: {timestamp: key3}, updateMods: {$set: {timestamp: key4}}},
            ],
            {update: collName1, updates: [{q: {timestamp: key3}, u: {$set: {timestamp: key4}}}]},
            {updated: 1, updateShardField: "manyShards"},
            session.getSessionId(),
            NumberLong(12));
    } else {
        // Insert an equivalent document so the replica set ends up with the same documents as the
        // sharded cluster for the next test.
        metricChecker.executeCommand({insert: collName1, documents: [{timestamp: key4, x: 2}]});
    }

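    // In the next check, updated counts documents modified (both documents matching key4), while
    // updateCount counts update operations, hence updated: 2 but updateCount: 1.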
    metricChecker.checkMetrics(
        "Simple update with multi: true",
        [{update: 0, filter: {timestamp: key4}, updateMods: {$set: {x: 3}}, multi: true}],
        [{update: collName1, updates: [{q: {timestamp: key4}, u: {$set: {x: 3}}, multi: true}]}],
        {updated: 2, updateCount: 1, updateShardField: "oneShard"});

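    // The next check uses both nsInfo entries in one bulkWrite; perNamespaceMetrics additionally
    // asserts that the totals are attributed to the correct namespace.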
    metricChecker.checkMetrics(
        "Multiple namespaces",
        [
            {insert: 0, document: {_id: 5, timestamp: key0}},
            {insert: 1, document: {_id: 6, timestamp: key0}},
            {update: 1, filter: {timestamp: key0}, updateMods: {$set: {x: 2}}},
            {delete: 0, filter: {_id: 5, timestamp: key0}}
        ],
        [
            {insert: collName1, documents: [{_id: 5, timestamp: key0}]},
            {insert: collName2, documents: [{_id: 6, timestamp: key0}]},
            {update: collName2, updates: [{q: {timestamp: key0}, u: {$set: {x: 2}}}]},
            {delete: collName1, deletes: [{q: {_id: 5, timestamp: key0}, limit: 1}]}
        ],
        {
            inserted: 2,
            updated: 1,
            deleted: 1,
            updateShardField: "oneShard",
            deleteShardField: "oneShard",
            perNamespaceMetrics: {
                [namespace1]: {inserted: 1, deleted: 1},
                [namespace2]: {inserted: 1, updateCount: 1}
            }
        });

    coll.drop();
    coll2.drop();
}

{
    const testName = jsTestName();
    const replTest = new ReplSetTest({
        name: testName,
        nodes: [{}, {rsConfig: {priority: 0}}],
        nodeOptions: {
            setParameter: {
                // Required for serverStatus() to have opWriteConcernCounters.
                reportOpWriteConcernCountersInServerStatus: true
            }
        }
    });

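    // The second node has priority 0, so it can never be elected primary and the session in
    // runTest always targets a stable primary.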
    replTest.startSet();
    replTest.initiate();

    const retryCount = 3;
    for (const bulkWrite of [false, true]) {
        for (const timeseries of [false, true]) {
            runTest(false /* isMongos */, replTest, bulkWrite, retryCount, timeseries);
        }
    }

    replTest.stopSet();
}

{
    const st = new ShardingTest({mongos: 1, shards: 3, rs: {nodes: 1}, config: 1});

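    // Three shards so that runTest can place one chunk of the sharded collection on each shard.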
    const retryCount = 3;
    for (const bulkWrite of [false, true]) {
        for (const timeseries of [false, true]) {
            runTest(true /* isMongos */, st, bulkWrite, retryCount, timeseries);
        }
    }

    st.stop();
}