Mirror of https://github.com/mongodb/mongo.git (synced 2024-12-01 09:32:32 +01:00)
SERVER-74438 Update BucketMetadata to use time-series metadata value
This commit is contained in: parent ca5d2ca126, commit 1e217fd954
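In brief: bucket documents always store their metadata under the canonical 'meta' field, regardless of the collection's user-facing metaField name. This commit passes the true metaField name through to BucketMetadata (and on to normalizeMetadata), and makes bucket reopening read the canonical 'meta' field from the bucket document instead of looking it up by the user-facing name, which only matched when the metaField happened to be called 'meta'. The test changes below rename the fixture's metaField from 'meta' to 'tag' to exercise exactly that case.

A minimal shell sketch of the storage layout this relies on (collection name and values are hypothetical):

    // Hypothetical collection whose metaField is 'tag', not 'meta'.
    db.createCollection("ts", {timeseries: {timeField: "time", metaField: "tag"}});
    db.ts.insert({time: ISODate(), tag: 42, a: 1});
    // The backing bucket document keeps the metadata under the canonical
    // 'meta' field; there is no 'tag' field at the bucket level:
    //   db.getCollection("system.buckets.ts").findOne()
    //   -> { _id: ..., control: {...}, meta: 42, data: {...} }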
jstests/noPassthrough/validate_timeseries_bucket_reopening.js (new file, 113 lines)
@@ -0,0 +1,113 @@
/**
 * Validate the state of time-series collections after inserting measurements into reopened
 * buckets.
 *
 * We set the 'timeseriesIdleBucketExpiryMemoryUsageThreshold' to a low value and configure the
 * 'alwaysUseSameBucketCatalogStripe' failpoint to expedite bucket closures and increase the number
 * of buckets we reopen to insert into.
 *
 * @tags: [requires_replication]
 */
(function() {
"use strict";

load("jstests/core/timeseries/libs/timeseries.js");  // For 'TimeseriesTest'.
load("jstests/libs/fail_point_util.js");

const rst = new ReplSetTest({nodes: 1});
rst.startSet({setParameter: {timeseriesIdleBucketExpiryMemoryUsageThreshold: 1024}});
rst.initiate();

const db = rst.getPrimary().getDB(jsTestName());
assert.commandWorked(db.dropDatabase());

if (!TimeseriesTest.timeseriesScalabilityImprovementsEnabled(db)) {
    rst.stopSet();
    jsTestLog(
        'Skipping test because the TimeseriesScalabilityImprovements feature flag is disabled.');
    return;
}

const collNamePrefix = db.validate_timeseries_bucket_reopening_;
const timeFieldName = 'time';
const metaFieldName1 = 'meta';
const metaFieldName2 = 'tag';
const valueFieldName = 'value';
let testCaseId = 0;

const validateBucketReopening = function(metaFieldName = null) {
    // Create a collection with the metaField passed in.
    let timeseriesOptions = {timeField: timeFieldName};
    if (metaFieldName != null) {
        timeseriesOptions = Object.merge(timeseriesOptions, {metaField: metaFieldName});
    }
    jsTestLog("Running validateBucketReopening() with timeseriesOptions = " +
              tojson(timeseriesOptions));

    const coll = db.getCollection(collNamePrefix + testCaseId++);
    const bucketsColl = db.getCollection('system.buckets.' + coll.getName());
    coll.drop();
    assert.commandWorked(db.createCollection(coll.getName(), {timeseries: timeseriesOptions}));
    assert.contains(bucketsColl.getName(), db.getCollectionNames());

    // Insert documents with varying metaField values (if 'metaFieldName' is specified).
    const distinctMetaValues = 10;
    const numOfPasses = 100;
    for (let i = 0; i < numOfPasses; ++i) {
        for (let j = 0; j < distinctMetaValues; ++j) {
            if (metaFieldName != null) {
                assert.commandWorked(coll.insert(
                    {[timeFieldName]: ISODate(), [metaFieldName]: j, [valueFieldName]: "a"}));
            } else {
                assert.commandWorked(
                    coll.insert({[timeFieldName]: ISODate(), [valueFieldName]: "a"}));
            }
        }
    }

    // Verify, through the time-series stats, that we inserted measurements into reopened buckets.
    const stats = assert.commandWorked(coll.stats());
    assert(stats.timeseries);
    if (metaFieldName != null) {
        // The idea of this test is to insert measurements into reopened buckets, so we expect
        // to reopen more buckets than we create.
        assert.lt(stats.timeseries['bucketCount'],
                  stats.timeseries['numBucketsReopened'],
                  "Timeseries stats: " + tojson(stats));
        assert.lt(stats.timeseries['numBucketsOpenedDueToMetadata'],
                  stats.timeseries['numBucketsReopened'],
                  "Timeseries stats: " + tojson(stats));

        // The number of bucket inserts should be less than the number of bucket updates.
        assert.lt(stats.timeseries['numBucketInserts'],
                  stats.timeseries['numBucketUpdates'],
                  "Timeseries stats: " + tojson(stats));
    } else {
        // When no metaField is specified, all measurements fit in one bucket since there is no
        // need to close buckets due to metadata.
        assert.eq(stats.timeseries['bucketCount'], 1, "Timeseries stats: " + tojson(stats));
    }

    // Finally, validate the collection and ensure there are no inconsistencies.
    const res = assert.commandWorked(coll.validate());
    assert.eq(res.valid, true);
    assert.eq(res.nNonCompliantDocuments, 0);
    assert.eq(res.nInvalidDocuments, 0);
    assert.eq(res.errors.length, 0);
    assert.eq(res.warnings.length, 0);
};

// Activate the failpoint to place all buckets in the same stripe in the BucketCatalog.
let fpSameStripe = configureFailPoint(db, "alwaysUseSameBucketCatalogStripe");

// Validate results with no metaField.
validateBucketReopening();

// Validate results with metaField == 'meta'.
validateBucketReopening(metaFieldName1);

// Validate results with metaField == 'tag'.
validateBucketReopening(metaFieldName2);

fpSameStripe.off();
rst.stopSet();
})();
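For reference, a hedged shell sketch of inspecting the counters this test asserts on (the collection name here is hypothetical; the stats fields are the ones used above):

    const stats = db.getCollection("validate_timeseries_bucket_reopening_0").stats();
    printjson({
        bucketCount: stats.timeseries.bucketCount,
        numBucketsReopened: stats.timeseries.numBucketsReopened,
        numBucketsOpenedDueToMetadata: stats.timeseries.numBucketsOpenedDueToMetadata,
        numBucketInserts: stats.timeseries.numBucketInserts,
        numBucketUpdates: stats.timeseries.numBucketUpdates
    });
    // When reopening works, numBucketsReopened exceeds bucketCount and
    // numBucketUpdates exceeds numBucketInserts.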
@@ -242,9 +242,9 @@ Status BucketCatalog::reopenBucket(OperationContext* opCtx,
     BSONElement metadata;
     auto metaFieldName = options->getMetaField();
     if (metaFieldName) {
-        metadata = bucketDoc.getField(*metaFieldName);
+        metadata = bucketDoc.getField(kBucketMetaFieldName);
     }
-    auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator()}};
+    auto key = BucketKey{ns, BucketMetadata{metadata, coll->getDefaultCollator(), metaFieldName}};

     // Validate the bucket document against the schema.
     auto validator = [&](OperationContext * opCtx, const BSONObj& bucketDoc) -> auto {
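Note the two halves of the fix above: the metadata element is now read from the bucket document's canonical 'meta' field (kBucketMetaFieldName) rather than by the user-facing field name, and that user-facing metaFieldName is passed into BucketMetadata so the resulting BucketKey matches the key built on the insert path (compare the expectedKey check in the _rehydrateBucket hunk below).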
@@ -628,7 +628,7 @@ StatusWith<std::pair<BucketKey, Date_t>> BucketCatalog::_extractBucketingParameters(

     // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
     // bucket to a stripe by hashing the BucketKey.
-    auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
+    auto key = BucketKey{ns, BucketMetadata{metadata, comparator, options.getMetaField()}};

     return {std::make_pair(key, time)};
 }
@@ -807,7 +807,7 @@ StatusWith<std::unique_ptr<Bucket>> BucketCatalog::_rehydrateBucket(

     // Buckets are spread across independently-lockable stripes to improve parallelism. We map a
     // bucket to a stripe by hashing the BucketKey.
-    auto key = BucketKey{ns, BucketMetadata{metadata, comparator}};
+    auto key = BucketKey{ns, BucketMetadata{metadata, comparator, options.getMetaField()}};
     if (expectedKey.has_value() && key != expectedKey.value()) {
         return {ErrorCodes::BadValue, "Bucket metadata does not match (hash collision)"};
     }
@@ -90,7 +90,7 @@ protected:
     BucketCatalog* _bucketCatalog;

     StringData _timeField = "time";
-    StringData _metaField = "meta";
+    StringData _metaField = "tag";

     NamespaceString _ns1 =
         NamespaceString::createNamespaceString_forTest("bucket_catalog_test_1", "t_1");
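Renaming the fixture's _metaField from "meta" to "tag" is what gives these tests their coverage: with the old code, looking the metadata up in a bucket document by the user-facing field name only worked by coincidence when the metaField was literally named "meta".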
@@ -1306,6 +1306,56 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
     _bucketCatalog->finish(batch, {});
 }

+TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurementWithMeta) {
+    RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
+                                                     true};
+    // Bucket document to reopen.
+    BSONObj bucketDoc = ::mongo::fromjson(
+        R"({"_id":{"$oid":"629e1e680958e279dc29a642"},
+            "control":{"version":1,"min":{"time":{"$date":"2022-06-06T15:34:00.000Z"},"a":1,"b":1},
+                       "max":{"time":{"$date":"2022-06-06T15:34:30.000Z"},"a":3,"b":3}},
+            "meta": 42,
+            "data":{"time":{"0":{"$date":"2022-06-06T15:34:30.000Z"},
+                            "1":{"$date":"2022-06-06T15:34:30.000Z"},
+                            "2":{"$date":"2022-06-06T15:34:30.000Z"}},
+                    "a":{"0":1,"1":2,"2":3},
+                    "b":{"0":1,"1":2,"2":3}}})");
+
+    AutoGetCollection autoColl(_opCtx, _ns1.makeTimeseriesBucketsNamespace(), MODE_IX);
+    Status status = _bucketCatalog->reopenBucket(_opCtx, autoColl.getCollection(), bucketDoc);
+    ASSERT_OK(status);
+
+    // Insert a measurement that is compatible with the reopened bucket.
+    auto result = _bucketCatalog->insert(
+        _opCtx,
+        _ns1,
+        _getCollator(_ns1),
+        _getTimeseriesOptions(_ns1),
+        ::mongo::fromjson(R"({"time":{"$date":"2022-06-06T15:34:40.000Z"},"tag":42,
+                              "a":-100,"b":100})"),
+        BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
+
+    // No buckets are closed.
+    ASSERT(result.getValue().closedBuckets.empty());
+    ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges));
+
+    auto batch = result.getValue().batch;
+    ASSERT(claimWriteBatchCommitRights(*batch));
+    ASSERT_OK(_bucketCatalog->prepareCommit(batch));
+    ASSERT_EQ(batch->measurements.size(), 1);
+
+    // The reopened bucket already contains three committed measurements.
+    ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 3);
+
+    // Verify that the min and max are updated correctly when inserting new measurements.
+    ASSERT_BSONOBJ_BINARY_EQ(batch->min, BSON("u" << BSON("a" << -100)));
+    ASSERT_BSONOBJ_BINARY_EQ(
+        batch->max,
+        BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100)));
+
+    _bucketCatalog->finish(batch, {});
+}
+
 TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasurement) {
     RAIIServerParameterControllerForTest featureFlag{"featureFlagTimeseriesScalabilityImprovements",
                                                      true};
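A note on the min/max assertions in the new test: batch->min and batch->max hold diff-style documents in which 'u' appears to mark control fields updated by the insert. The new measurement lowers control.min.a to -100 and raises control.max.time (1654529680000 ms is 2022-06-06T15:34:40Z) and control.max.b to 100, matching the inserted {a: -100, b: 100} measurement.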
@@ -1618,7 +1668,7 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
         _ns1,
         _getCollator(_ns1),
         _getTimeseriesOptions(_ns1),
-        ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}, "meta": "foo"})"),
+        ::mongo::fromjson(R"({"time":{"$date":"2022-06-07T15:34:40.000Z"}, "tag": "foo"})"),
         BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
     ASSERT_OK(result.getStatus());
     ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
@@ -34,13 +34,15 @@

 namespace mongo::timeseries::bucket_catalog {

-BucketMetadata::BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator)
+BucketMetadata::BucketMetadata(BSONElement elem,
+                               const StringData::ComparatorInterface* comparator,
+                               boost::optional<StringData> trueMetaFieldName)
     : _metadataElement(elem), _comparator(comparator) {
     if (_metadataElement) {
         BSONObjBuilder objBuilder;
         // We will get an object of equal size, just with reordered fields.
         objBuilder.bb().reserveBytes(_metadataElement.size());
-        normalizeMetadata(&objBuilder, _metadataElement, boost::none);
+        normalizeMetadata(&objBuilder, _metadataElement, trueMetaFieldName);
         _metadata = objBuilder.obj();
     }
     // Updates the BSONElement to refer to the copied BSONObj.
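The normalizeMetadata call above canonicalizes the metadata (an object of the same size with fields reordered) before it feeds into the bucket key. A hedged shell sketch of the observable effect (collection name hypothetical):

    db.createCollection("ts2", {timeseries: {timeField: "time", metaField: "m"}});
    // Metadata values that differ only in field order normalize to the same
    // key, so both measurements land in one bucket:
    db.ts2.insert({time: ISODate(), m: {a: 1, b: 2}});
    db.ts2.insert({time: ISODate(), m: {b: 2, a: 1}});
    assert.eq(1, db.getCollection("system.buckets.ts2").count());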
@@ -41,7 +41,9 @@ namespace mongo::timeseries::bucket_catalog {
 struct BucketMetadata {
 public:
     BucketMetadata() = default;
-    BucketMetadata(BSONElement elem, const StringData::ComparatorInterface* comparator);
+    BucketMetadata(BSONElement elem,
+                   const StringData::ComparatorInterface* comparator,
+                   boost::optional<StringData> trueMetaFieldName);

     bool operator==(const BucketMetadata& other) const;
     bool operator!=(const BucketMetadata& other) const;
@@ -86,7 +86,7 @@ public:
     NamespaceString ns2 = NamespaceString::createNamespaceString_forTest("db.test2");
     NamespaceString ns3 = NamespaceString::createNamespaceString_forTest("db.test3");
     BSONElement elem;
-    BucketMetadata bucketMetadata{elem, nullptr};
+    BucketMetadata bucketMetadata{elem, nullptr, boost::none};
     BucketKey bucketKey1{ns1, bucketMetadata};
     BucketKey bucketKey2{ns2, bucketMetadata};
     BucketKey bucketKey3{ns3, bucketMetadata};