// jstests/sharding/timeseries_insert.js
/**
 * Tests inserts into a sharded time-series collection.
 *
 * @tags: [
 *   # TODO (SERVER-70605): Remove this tag once the time-series always compressed buckets feature
 *   # flag can be removed.
 *   multiversion_incompatible,
 * ]
 */
import {TimeseriesTest} from "jstests/core/timeseries/libs/timeseries.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
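
// Seed the random number generator used by the TimeseriesTest helpers below.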
Random.setRandomSeed();

const dbName = 'testDB';
const collName = 'testColl';
const timeField = 'time';
const metaField = 'hostid';

// Connections.
const st = new ShardingTest({shards: 2, rs: {nodes: 2}});
const mongos = st.s0;

// Databases and collections.
assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
const mainDB = mongos.getDB(dbName);

// Helpers.
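// Monotonically increasing _id generator, so inserted documents can be sorted deterministically.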
let currentId = 0;
function generateId() {
    return currentId++;
}
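
// Generates 'size' documents with distinct meta values and timestamps a decade apart
// (2000-01-01, 2010-01-01, ...), so each document ends up in its own bucket.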
function generateBatch(size) {
    return TimeseriesTest.generateHosts(size).map((host, index) => Object.assign(host, {
        _id: generateId(),
        [metaField]: index,
        [timeField]: ISODate(`20${index}0-01-01`),
    }));
}
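
// Asserts that the raw buckets on 'shard' match 'expectedBuckets': counts must be equal, and each
// expected bucket (meta value plus min/max time) must match a distinct bucket document.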
function verifyBucketsOnShard(shard, expectedBuckets) {
    const buckets =
        shard.getDB(dbName).getCollection(`system.buckets.${collName}`).find({}).toArray();
    assert.eq(buckets.length, expectedBuckets.length, tojson(buckets));

    const usedBucketIds = new Set();
    for (const expectedBucket of expectedBuckets) {
        let found = false;
        for (const bucket of buckets) {
            if (!usedBucketIds.has(bucket._id) && expectedBucket.meta === bucket.meta &&
                expectedBucket.minTime.toString() === bucket.control.min[timeField].toString() &&
                expectedBucket.maxTime.toString() === bucket.control.max[timeField].toString()) {
                found = true;
                usedBucketIds.add(bucket._id);
                break;
            }
        }
        assert(
            found,
            "Failed to find bucket " + tojson(expectedBucket) + " in the list " + tojson(buckets));
    }
}
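
// Runs the insert test for one shard key pattern. 'getShardKey(meta, time)' builds the shard key
// (and later the query predicate) from the given values; 'insert' is the insert helper supplied
// by TimeseriesTest.run.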
function runTest(getShardKey, insert) {
    assert.commandWorked(mainDB.createCollection(
        collName, {timeseries: {timeField: timeField, metaField: metaField}}));
    const coll = mainDB.getCollection(collName);

    // The 'isTimeseriesNamespace' parameter is not allowed on mongos.
    assert.commandFailedWithCode(mainDB.runCommand({
        insert: `system.buckets.${collName}`,
        documents: [{[timeField]: ISODate()}],
        isTimeseriesNamespace: true
    }),
                                 [5916401, 7934201]);

    // On a mongod node, 'isTimeseriesNamespace' can only be used on a time-series buckets
    // namespace.
    assert.commandFailedWithCode(
        st.shard0.getDB(dbName).runCommand(
            {insert: collName, documents: [{[timeField]: ISODate()}], isTimeseriesNamespace: true}),
        [5916400, 7934201]);

    // Shard the time-series collection.
    const shardKey = getShardKey(1, 1);
    assert.commandWorked(coll.createIndex(shardKey));
    assert.commandWorked(mongos.adminCommand({
        shardCollection: `${dbName}.${collName}`,
        key: shardKey,
    }));

    // Insert the initial set of documents.
    const numDocs = 4;
    const firstBatch = generateBatch(numDocs);
    assert.commandWorked(insert(coll, firstBatch));
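
    // On the underlying buckets namespace the meta field is stored as 'meta', and a shard key on
    // the time field maps to 'control.min.<timeField>'; the split point below is expressed in
    // those terms.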
    // Manually split the data into two chunks.
    const splitIndex = numDocs / 2;
    const splitPoint = {};
    if (shardKey.hasOwnProperty(metaField)) {
        splitPoint.meta = firstBatch[splitIndex][metaField];
    }
    if (shardKey.hasOwnProperty(timeField)) {
        splitPoint[`control.min.${timeField}`] = firstBatch[splitIndex][timeField];
    }
    assert.commandWorked(
        mongos.adminCommand({split: `${dbName}.system.buckets.${collName}`, middle: splitPoint}));

    // Ensure that both chunks currently reside on the primary shard.
    let counts = st.chunkCounts(`system.buckets.${collName}`, dbName);
    const primaryShard = st.getPrimaryShard(dbName);
    assert.eq(2, counts[primaryShard.shardName], counts);

    // Move one of the chunks to the second shard.
    const otherShard = st.getOther(primaryShard);
    assert.commandWorked(mongos.adminCommand({
        moveChunk: `${dbName}.system.buckets.${collName}`,
        find: splitPoint,
        to: otherShard.name,
        _waitForDelete: true
    }));
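
    // '_waitForDelete: true' above makes the donor shard finish range deletion before returning,
    // so the raw bucket scans below do not see orphaned documents.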

    // Ensure that each shard owns one chunk.
    counts = st.chunkCounts(`system.buckets.${collName}`, dbName);
    assert.eq(1, counts[primaryShard.shardName], counts);
    assert.eq(1, counts[otherShard.shardName], counts);

    // Build the expected bucket list for each shard. Every document has a distinct meta value, so
    // each shard should own exactly two single-measurement buckets (minTime equals maxTime).
    const primaryBuckets = [];
    const otherBuckets = [];
    for (let index = 0; index < numDocs; index++) {
        const doc = firstBatch[index];
        const bucket = {
            meta: doc[metaField],
            minTime: doc[timeField],
            maxTime: doc[timeField],
        };
        if (index < splitIndex) {
            primaryBuckets.push(bucket);
        } else {
            otherBuckets.push(bucket);
        }
    }
    verifyBucketsOnShard(primaryShard, primaryBuckets);
    verifyBucketsOnShard(otherShard, otherBuckets);

    // Ensure that after the chunk migration all documents are still available.
    assert.docEq(firstBatch, coll.find().sort({_id: 1}).toArray());

    // Insert two more batches with the same meta values and timestamps (only the _id values are
    // new). With bucket reopening enabled, these measurements land in the existing buckets even
    // after the chunk migration, so the set of buckets on each shard is unchanged.
    const secondBatch = generateBatch(numDocs);
    assert.commandWorked(insert(coll, secondBatch));
    const thirdBatch = generateBatch(numDocs);
    assert.commandWorked(insert(coll, thirdBatch));
    verifyBucketsOnShard(primaryShard, primaryBuckets);
    verifyBucketsOnShard(otherShard, otherBuckets);

    // Check that both the old and the newly inserted documents are available.
    const allDocuments = firstBatch.concat(secondBatch).concat(thirdBatch);
    assert.docEq(allDocuments, coll.find().sort({_id: 1}).toArray());

    // Check queries by shard key: each shard key value matches exactly one document from each of
    // the three batches.
    for (let index = 0; index < numDocs; index++) {
        const expectedDocuments = [firstBatch[index], secondBatch[index], thirdBatch[index]];
        const actualDocuments =
            coll.find(getShardKey(firstBatch[index][metaField], firstBatch[index][timeField]))
                .sort({_id: 1})
                .toArray();
        assert.docEq(expectedDocuments, actualDocuments);
    }

    assert(coll.drop());
}
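
// Run the test once for each supported shard key pattern: meta-only, time-only, and compound
// (meta, time).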
try {
    TimeseriesTest.run((insert) => {
        function metaShardKey(meta, _) {
            return {[metaField]: meta};
        }
        runTest(metaShardKey, insert);

        function timeShardKey(_, time) {
            return {[timeField]: time};
        }
        runTest(timeShardKey, insert);

        function timeAndMetaShardKey(meta, time) {
            return {[metaField]: meta, [timeField]: time};
        }
        runTest(timeAndMetaShardKey, insert);
    }, mainDB);
} finally {
    st.stop();
}