mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-64319 Fix the name hiding issue for usages of ShardingCatalogManager::withTransaction
This commit is contained in:
parent
32192ca71d
commit
c6e976d9f6
@ -1716,15 +1716,16 @@ void ShardingCatalogManager::bumpMultipleCollectionVersionsAndChangeMetadataInTx
|
||||
// migrations
|
||||
Lock::ExclusiveLock lk(opCtx, opCtx->lockState(), _kChunkOpLock);
|
||||
|
||||
withTransaction(opCtx,
|
||||
NamespaceString::kConfigReshardingOperationsNamespace,
|
||||
[&](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
for (const auto& nss : collNames) {
|
||||
bumpCollectionMinorVersion(opCtx, nss, txnNumber);
|
||||
}
|
||||
changeMetadataFunc(opCtx, txnNumber);
|
||||
},
|
||||
writeConcern);
|
||||
withTransaction(
|
||||
opCtx,
|
||||
NamespaceString::kConfigReshardingOperationsNamespace,
|
||||
[&collNames, &changeMetadataFunc](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
for (const auto& nss : collNames) {
|
||||
bumpCollectionMinorVersion(opCtx, nss, txnNumber);
|
||||
}
|
||||
changeMetadataFunc(opCtx, txnNumber);
|
||||
},
|
||||
writeConcern);
|
||||
}
|
||||
|
||||
void ShardingCatalogManager::splitOrMarkJumbo(OperationContext* opCtx,
|
||||
@ -1832,7 +1833,10 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(
|
||||
|
||||
cm.getAllShardIds(&shardsIds);
|
||||
withTransaction(
|
||||
opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
opCtx,
|
||||
CollectionType::ConfigNS,
|
||||
[this, allowMigrations, &nss, &collectionUUID](OperationContext* opCtx,
|
||||
TxnNumber txnNumber) {
|
||||
// Update the 'allowMigrations' field. An unset 'allowMigrations' field implies
|
||||
// 'true'. To ease backwards compatibility we omit 'allowMigrations' instead of
|
||||
// setting it explicitly to 'true'.
|
||||
|
@ -325,79 +325,80 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
|
||||
Timestamp newTimestamp = now.clusterTime().asTimestamp();
|
||||
collType.setTimestamp(newTimestamp);
|
||||
|
||||
auto updateCollectionAndChunksFn = [&](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
// Update the config.collections entry for the given namespace.
|
||||
updateShardingCatalogEntryForCollectionInTxn(
|
||||
opCtx, nss, collType, false /* upsert */, txnNumber);
|
||||
auto updateCollectionAndChunksFn =
|
||||
[this, &nss, &collType, &timers, &newFields](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
// Update the config.collections entry for the given namespace.
|
||||
updateShardingCatalogEntryForCollectionInTxn(
|
||||
opCtx, nss, collType, false /* upsert */, txnNumber);
|
||||
|
||||
LOGV2(21933,
|
||||
"refineCollectionShardKey updated collection entry for {namespace}: took "
|
||||
"{durationMillis} ms. Total time taken: {totalTimeMillis} ms.",
|
||||
"refineCollectionShardKey updated collection entry",
|
||||
"namespace"_attr = nss.ns(),
|
||||
"durationMillis"_attr = timers->executionTimer.millis(),
|
||||
"totalTimeMillis"_attr = timers->totalTimer.millis());
|
||||
timers->executionTimer.reset();
|
||||
LOGV2(21933,
|
||||
"refineCollectionShardKey updated collection entry for {namespace}: took "
|
||||
"{durationMillis} ms. Total time taken: {totalTimeMillis} ms.",
|
||||
"refineCollectionShardKey updated collection entry",
|
||||
"namespace"_attr = nss.ns(),
|
||||
"durationMillis"_attr = timers->executionTimer.millis(),
|
||||
"totalTimeMillis"_attr = timers->totalTimer.millis());
|
||||
timers->executionTimer.reset();
|
||||
|
||||
if (MONGO_unlikely(hangRefineCollectionShardKeyBeforeUpdatingChunks.shouldFail())) {
|
||||
LOGV2(21934, "Hit hangRefineCollectionShardKeyBeforeUpdatingChunks failpoint");
|
||||
hangRefineCollectionShardKeyBeforeUpdatingChunks.pauseWhileSet(opCtx);
|
||||
}
|
||||
if (MONGO_unlikely(hangRefineCollectionShardKeyBeforeUpdatingChunks.shouldFail())) {
|
||||
LOGV2(21934, "Hit hangRefineCollectionShardKeyBeforeUpdatingChunks failpoint");
|
||||
hangRefineCollectionShardKeyBeforeUpdatingChunks.pauseWhileSet(opCtx);
|
||||
}
|
||||
|
||||
auto [chunkUpdates, tagUpdates] = makeChunkAndTagUpdatesForRefine(newFields);
|
||||
auto [chunkUpdates, tagUpdates] = makeChunkAndTagUpdatesForRefine(newFields);
|
||||
|
||||
// Update all config.chunks entries for the given namespace by setting (i) their bounds for
|
||||
// each new field in the refined key to MinKey (except for the global max chunk where the
|
||||
// max bounds are set to MaxKey), and unsetting (ii) their jumbo field.
|
||||
const auto chunksQuery = BSON(ChunkType::collectionUUID << collType.getUuid());
|
||||
writeToConfigDocumentInTxn(
|
||||
opCtx,
|
||||
ChunkType::ConfigNS,
|
||||
BatchedCommandRequest::buildPipelineUpdateOp(ChunkType::ConfigNS,
|
||||
chunksQuery,
|
||||
chunkUpdates,
|
||||
false, // upsert
|
||||
true // useMultiUpdate
|
||||
),
|
||||
txnNumber);
|
||||
// Update all config.chunks entries for the given namespace by setting (i) their bounds
|
||||
// for each new field in the refined key to MinKey (except for the global max chunk
|
||||
// where the max bounds are set to MaxKey), and unsetting (ii) their jumbo field.
|
||||
const auto chunksQuery = BSON(ChunkType::collectionUUID << collType.getUuid());
|
||||
writeToConfigDocumentInTxn(
|
||||
opCtx,
|
||||
ChunkType::ConfigNS,
|
||||
BatchedCommandRequest::buildPipelineUpdateOp(ChunkType::ConfigNS,
|
||||
chunksQuery,
|
||||
chunkUpdates,
|
||||
false, // upsert
|
||||
true // useMultiUpdate
|
||||
),
|
||||
txnNumber);
|
||||
|
||||
LOGV2(21935,
|
||||
"refineCollectionShardKey: updated chunk entries for {namespace}: took "
|
||||
"{durationMillis} ms. Total time taken: {totalTimeMillis} ms.",
|
||||
"refineCollectionShardKey: updated chunk entries",
|
||||
"namespace"_attr = nss.ns(),
|
||||
"durationMillis"_attr = timers->executionTimer.millis(),
|
||||
"totalTimeMillis"_attr = timers->totalTimer.millis());
|
||||
timers->executionTimer.reset();
|
||||
LOGV2(21935,
|
||||
"refineCollectionShardKey: updated chunk entries for {namespace}: took "
|
||||
"{durationMillis} ms. Total time taken: {totalTimeMillis} ms.",
|
||||
"refineCollectionShardKey: updated chunk entries",
|
||||
"namespace"_attr = nss.ns(),
|
||||
"durationMillis"_attr = timers->executionTimer.millis(),
|
||||
"totalTimeMillis"_attr = timers->totalTimer.millis());
|
||||
timers->executionTimer.reset();
|
||||
|
||||
// Update all config.tags entries for the given namespace by setting their bounds for
|
||||
// each new field in the refined key to MinKey (except for the global max tag where the
|
||||
// max bounds are set to MaxKey).
|
||||
writeToConfigDocumentInTxn(
|
||||
opCtx,
|
||||
TagsType::ConfigNS,
|
||||
BatchedCommandRequest::buildPipelineUpdateOp(TagsType::ConfigNS,
|
||||
BSON("ns" << nss.ns()),
|
||||
tagUpdates,
|
||||
false, // upsert
|
||||
true // useMultiUpdate
|
||||
),
|
||||
txnNumber);
|
||||
// Update all config.tags entries for the given namespace by setting their bounds for
|
||||
// each new field in the refined key to MinKey (except for the global max tag where the
|
||||
// max bounds are set to MaxKey).
|
||||
writeToConfigDocumentInTxn(
|
||||
opCtx,
|
||||
TagsType::ConfigNS,
|
||||
BatchedCommandRequest::buildPipelineUpdateOp(TagsType::ConfigNS,
|
||||
BSON("ns" << nss.ns()),
|
||||
tagUpdates,
|
||||
false, // upsert
|
||||
true // useMultiUpdate
|
||||
),
|
||||
txnNumber);
|
||||
|
||||
|
||||
LOGV2(21936,
|
||||
"refineCollectionShardKey: updated zone entries for {namespace}: took "
|
||||
"{durationMillis} ms. Total time taken: {totalTimeMillis} ms.",
|
||||
"refineCollectionShardKey: updated zone entries",
|
||||
"namespace"_attr = nss.ns(),
|
||||
"durationMillis"_attr = timers->executionTimer.millis(),
|
||||
"totalTimeMillis"_attr = timers->totalTimer.millis());
|
||||
LOGV2(21936,
|
||||
"refineCollectionShardKey: updated zone entries for {namespace}: took "
|
||||
"{durationMillis} ms. Total time taken: {totalTimeMillis} ms.",
|
||||
"refineCollectionShardKey: updated zone entries",
|
||||
"namespace"_attr = nss.ns(),
|
||||
"durationMillis"_attr = timers->executionTimer.millis(),
|
||||
"totalTimeMillis"_attr = timers->totalTimer.millis());
|
||||
|
||||
if (MONGO_unlikely(hangRefineCollectionShardKeyBeforeCommit.shouldFail())) {
|
||||
LOGV2(21937, "Hit hangRefineCollectionShardKeyBeforeCommit failpoint");
|
||||
hangRefineCollectionShardKeyBeforeCommit.pauseWhileSet(opCtx);
|
||||
}
|
||||
};
|
||||
if (MONGO_unlikely(hangRefineCollectionShardKeyBeforeCommit.shouldFail())) {
|
||||
LOGV2(21937, "Hit hangRefineCollectionShardKeyBeforeCommit failpoint");
|
||||
hangRefineCollectionShardKeyBeforeCommit.pauseWhileSet(opCtx);
|
||||
}
|
||||
};
|
||||
|
||||
auto updateCollectionAndChunksWithAPIFn =
|
||||
[collType, newFields, nss, timers](const txn_api::TransactionClient& txnClient,
|
||||
@ -619,26 +620,27 @@ void ShardingCatalogManager::configureCollectionBalancing(
|
||||
// migrations
|
||||
Lock::ExclusiveLock lk(opCtx, opCtx->lockState(), _kChunkOpLock);
|
||||
|
||||
withTransaction(
|
||||
opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
const auto query = BSON(CollectionType::kNssFieldName << nss.ns());
|
||||
const auto res = writeToConfigDocumentInTxn(
|
||||
opCtx,
|
||||
CollectionType::ConfigNS,
|
||||
BatchedCommandRequest::buildUpdateOp(CollectionType::ConfigNS,
|
||||
query,
|
||||
update /* update */,
|
||||
false /* upsert */,
|
||||
false /* multi */),
|
||||
txnNumber);
|
||||
const auto numDocsModified = UpdateOp::parseResponse(res).getN();
|
||||
uassert(ErrorCodes::ConflictingOperationInProgress,
|
||||
str::stream() << "Expected to match one doc for query " << query
|
||||
<< " but matched " << numDocsModified,
|
||||
numDocsModified == 1);
|
||||
withTransaction(opCtx,
|
||||
CollectionType::ConfigNS,
|
||||
[this, &nss, &update](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
const auto query = BSON(CollectionType::kNssFieldName << nss.ns());
|
||||
const auto res = writeToConfigDocumentInTxn(
|
||||
opCtx,
|
||||
CollectionType::ConfigNS,
|
||||
BatchedCommandRequest::buildUpdateOp(CollectionType::ConfigNS,
|
||||
query,
|
||||
update /* update */,
|
||||
false /* upsert */,
|
||||
false /* multi */),
|
||||
txnNumber);
|
||||
const auto numDocsModified = UpdateOp::parseResponse(res).getN();
|
||||
uassert(ErrorCodes::ConflictingOperationInProgress,
|
||||
str::stream() << "Expected to match one doc for query " << query
|
||||
<< " but matched " << numDocsModified,
|
||||
numDocsModified == 1);
|
||||
|
||||
bumpCollectionMinorVersionInTxn(opCtx, nss, txnNumber);
|
||||
});
|
||||
bumpCollectionMinorVersionInTxn(opCtx, nss, txnNumber);
|
||||
});
|
||||
// Now any migrations that change the list of shards will see the results of the transaction
|
||||
// during refresh, so it is safe to release the chunk lock.
|
||||
}
|
||||
@ -705,7 +707,9 @@ void ShardingCatalogManager::updateTimeSeriesGranularity(OperationContext* opCtx
|
||||
cm.getAllShardIds(&shardIds);
|
||||
|
||||
withTransaction(
|
||||
opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
opCtx,
|
||||
CollectionType::ConfigNS,
|
||||
[this, &nss, granularity, &shardIds](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
// Update granularity value in config.collections.
|
||||
auto granularityFieldName = CollectionType::kTimeseriesFieldsFieldName + "." +
|
||||
TypeCollectionTimeseriesFields::kGranularityFieldName;
|
||||
|
@ -710,21 +710,24 @@ void writeDecisionPersistedState(OperationContext* opCtx,
|
||||
Timestamp newCollectionTimestamp) {
|
||||
|
||||
// No need to bump originalNss version because its epoch will be changed.
|
||||
executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
// Update the config.reshardingOperations entry
|
||||
writeToCoordinatorStateNss(opCtx, metrics, coordinatorDoc, txnNumber);
|
||||
executeMetadataChangesInTxn(
|
||||
opCtx,
|
||||
[&metrics, &coordinatorDoc, &newCollectionEpoch, &newCollectionTimestamp](
|
||||
OperationContext* opCtx, TxnNumber txnNumber) {
|
||||
// Update the config.reshardingOperations entry
|
||||
writeToCoordinatorStateNss(opCtx, metrics, coordinatorDoc, txnNumber);
|
||||
|
||||
// Remove the config.collections entry for the temporary collection
|
||||
writeToConfigCollectionsForTempNss(
|
||||
opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
|
||||
// Remove the config.collections entry for the temporary collection
|
||||
writeToConfigCollectionsForTempNss(
|
||||
opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
|
||||
|
||||
// Update the config.collections entry for the original namespace to reflect the new
|
||||
// shard key, new epoch, and new UUID
|
||||
updateConfigCollectionsForOriginalNss(
|
||||
opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
|
||||
// Update the config.collections entry for the original namespace to reflect the new
|
||||
// shard key, new epoch, and new UUID
|
||||
updateConfigCollectionsForOriginalNss(
|
||||
opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
|
||||
|
||||
updateChunkAndTagsDocsForTempNss(opCtx, coordinatorDoc, newCollectionEpoch, txnNumber);
|
||||
});
|
||||
updateChunkAndTagsDocsForTempNss(opCtx, coordinatorDoc, newCollectionEpoch, txnNumber);
|
||||
});
|
||||
}
|
||||
|
||||
void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx,
|
||||
|
Loading…
Reference in New Issue
Block a user