0
0
Mirror of https://github.com/mongodb/mongo.git, synced 2024-12-01 09:32:32 +01:00

SERVER-42299 Batch chunk and tag upgrade/downgrade modification transactions

This commit is contained in:
Jack Mulrow 2019-10-16 19:50:41 +00:00 committed by evergreen
parent 98676262fd
commit 71fd21e91f
6 changed files with 185 additions and 166 deletions

View File

@ -138,5 +138,36 @@ function runInProgressSetFCVTest(st, {initialFCV, desiredFCV}) {
runInProgressSetFCVTest(st, {initialFCV: latestFCV, desiredFCV: lastStableFCV});
runInProgressSetFCVTest(st, {initialFCV: lastStableFCV, desiredFCV: latestFCV});
//
// Test setFCV with many chunks and tags.
//
// Set up collections with the same number of chunks and zones as the batch limit for the
// transactions used to modify chunks and zones documents and with more than the limit to verify the
// batching logic in both cases.
const txnBatchSize = 100;
setUpCollectionWithManyChunksAndZones(
st, dbName + ".many_at_batch_size", txnBatchSize /* numChunks */, txnBatchSize /* numZones */);
setUpCollectionWithManyChunksAndZones(st,
dbName + ".many_over_batch_size",
txnBatchSize + 5 /* numChunks */,
txnBatchSize + 5 /* numZones */);
checkFCV(configPrimary.getDB("admin"), latestFCV);
verifyChunks(st, {expectNewFormat: true});
jsTestLog("Downgrading FCV to last stable with many chunks and zones");
assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: lastStableFCV}));
checkFCV(configPrimary.getDB("admin"), lastStableFCV);
verifyChunks(st, {expectNewFormat: false});
jsTestLog("Upgrading FCV to latest with many chunks and zones");
assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
checkFCV(configPrimary.getDB("admin"), latestFCV);
verifyChunks(st, {expectNewFormat: true});
st.stop();
}());

View File

@ -53,6 +53,17 @@ setUpCollectionForZoneTesting(st, zoneNs);
// for every sharded collection.
setUpExtraShardedCollections(st, "extra_db" /* dbName */);
// Set up collections with the same number of chunks and zones as the batch limit for the
// transactions used to modify chunks and zones documents and with more than the limit to verify the
// batching logic in both cases.
const txnBatchSize = 100;
setUpCollectionWithManyChunksAndZones(
st, dbName + ".many_at_batch_size", txnBatchSize /* numChunks */, txnBatchSize /* numZones */);
setUpCollectionWithManyChunksAndZones(st,
dbName + ".many_over_batch_size",
txnBatchSize + 5 /* numChunks */,
txnBatchSize + 5 /* numZones */);
//
// Upgrade back to the latest version.
//

View File

@ -24,6 +24,22 @@ function setUpCollectionForZoneTesting(st, ns) {
st.s.adminCommand({updateZoneKeyRange: ns, min: {_id: 0}, max: {_id: 50}, zone: "zone1"}));
}
// Creates a sharded collection on 'ns' with 'numChunks' chunks and 'numZones'
// zones. Every zone is assigned to shard0 and covers a one-unit {_id} range.
function setUpCollectionWithManyChunksAndZones(st, ns, numChunks, numZones) {
    // Small helper: run an admin command through mongos and assert success.
    const runAdmin = (cmd) => assert.commandWorked(st.s.adminCommand(cmd));

    runAdmin({shardCollection: ns, key: {_id: 1}});

    // numChunks - 1 split points produce numChunks chunks in total.
    for (let splitPoint = 0; splitPoint < numChunks - 1; splitPoint++) {
        runAdmin({split: ns, middle: {_id: splitPoint}});
    }

    for (let zoneIdx = 0; zoneIdx < numZones; zoneIdx++) {
        const zoneName = "many_zones-" + zoneIdx;
        runAdmin({addShardToZone: st.shard0.shardName, zone: zoneName});
        runAdmin({updateZoneKeyRange: ns, min: {_id: zoneIdx}, max: {_id: zoneIdx + 1}, zone: zoneName});
    }
}
function setUpExtraShardedCollections(st, dbName) {
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard1.shardName);

View File

@ -199,7 +199,8 @@ public:
log() << "Hit pauseBeforeUpgradingConfigMetadata";
pauseBeforeUpgradingConfigMetadata.pauseWhileSet(opCtx);
}
ShardingCatalogManager::get(opCtx)->upgradeChunksAndTags(opCtx);
ShardingCatalogManager::get(opCtx)->upgradeOrDowngradeChunksAndTags(
opCtx, ShardingCatalogManager::ConfigUpgradeType::kUpgrade);
}
FeatureCompatibilityVersion::unsetTargetUpgradeOrDowngrade(opCtx, requestedVersion);
@ -255,7 +256,8 @@ public:
log() << "Hit pauseBeforeDowngradingConfigMetadata";
pauseBeforeDowngradingConfigMetadata.pauseWhileSet(opCtx);
}
ShardingCatalogManager::get(opCtx)->downgradeChunksAndTags(opCtx);
ShardingCatalogManager::get(opCtx)->upgradeOrDowngradeChunksAndTags(
opCtx, ShardingCatalogManager::ConfigUpgradeType::kDowngrade);
}
FeatureCompatibilityVersion::unsetTargetUpgradeOrDowngrade(opCtx, requestedVersion);

View File

@ -44,11 +44,13 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_lockpings.h"
#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/database_version_gen.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@ -405,171 +407,134 @@ Lock::ExclusiveLock ShardingCatalogManager::lockZoneMutex(OperationContext* opCt
return lk;
}
void ShardingCatalogManager::upgradeChunksAndTags(OperationContext* opCtx) {
// Upgrade each chunk document by deleting and re-inserting with the 4.4 _id format.
{
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
// TODO SERVER-44034: Remove this function.
void deleteAndInsertChunk(OperationContext* opCtx,
const BSONObj& chunkDoc,
bool startTransaction,
TxnNumber txnNumber,
ShardingCatalogManager::ConfigUpgradeType upgradeType) {
auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc));
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
ChunkType::ConfigNS,
{},
{},
boost::none /* limit */));
removeDocumentsInLocalTxn(
opCtx,
ChunkType::ConfigNS,
BSON(ChunkType::ns(chunk.getNS().ns()) << ChunkType::min(chunk.getMin())),
startTransaction,
txnNumber);
AlternativeSessionRegion asr(opCtx);
AuthorizationSession::get(asr.opCtx()->getClient())
->grantInternalAuthorization(asr.opCtx()->getClient());
TxnNumber txnNumber = 0;
for (const auto& chunkObj : findResponse.docs) {
auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkObj));
insertDocumentsInLocalTxn(
opCtx,
ChunkType::ConfigNS,
{upgradeType == ShardingCatalogManager::ConfigUpgradeType::kUpgrade
// Note that ChunkType::toConfigBSON() will not include an _id if one hasn't been set,
// which will be the case for chunks written in the 4.2 format because parsing ignores
// _ids in the 4.2 format, so the insert path will generate one for us.
? chunk.toConfigBSON()
: chunk.toConfigBSONLegacyID()},
false /* startTransaction */,
txnNumber);
}
removeDocumentsInLocalTxn(
asr.opCtx(),
ChunkType::ConfigNS,
BSON(ChunkType::ns(chunk.getNS().ns()) << ChunkType::min(chunk.getMin())),
true /* startTransaction */,
txnNumber);
// TODO SERVER-44034: Remove this function.
void deleteAndInsertTag(OperationContext* opCtx,
const BSONObj& tagDoc,
bool startTransaction,
TxnNumber txnNumber,
ShardingCatalogManager::ConfigUpgradeType upgradeType) {
auto tag = uassertStatusOK(TagsType::fromBSON(tagDoc));
// Note that ChunkType::toConfigBSON() will not include an _id if one hasn't been set,
// which will be the case for chunks written in the 4.2 format because parsing ignores
// _ids in the 4.2 format, so the insert path will generate one for us.
insertDocumentsInLocalTxn(asr.opCtx(),
ChunkType::ConfigNS,
{chunk.toConfigBSON()},
false /* startTransaction */,
txnNumber);
removeDocumentsInLocalTxn(
opCtx,
TagsType::ConfigNS,
BSON(TagsType::ns(tag.getNS().ns()) << TagsType::min(tag.getMinKey())),
startTransaction,
txnNumber);
insertDocumentsInLocalTxn(opCtx,
TagsType::ConfigNS,
{upgradeType == ShardingCatalogManager::ConfigUpgradeType::kUpgrade
// Note that TagsType::toBSON() will not include an _id, so the
// insert path will generate one for us.
? tag.toBSON()
: tag.toBSONLegacyID()},
false /* startTransaction */,
txnNumber);
}
// TODO SERVER-44034: Remove this function and type.
using ConfigDocModFunction = std::function<void(
OperationContext*, BSONObj, bool, TxnNumber, ShardingCatalogManager::ConfigUpgradeType)>;
void forEachConfigDocInBatchedTransactions(OperationContext* opCtx,
const NamespaceString& configNss,
const NamespaceString& shardedCollNss,
ConfigDocModFunction configDocModFn,
ShardingCatalogManager::ConfigUpgradeType upgradeType) {
const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
configNss,
BSON("ns" << shardedCollNss.ns()),
{},
boost::none /* limit */));
AlternativeSessionRegion asr(opCtx);
AuthorizationSession::get(asr.opCtx()->getClient())
->grantInternalAuthorization(asr.opCtx()->getClient());
TxnNumber txnNumber = 0;
const auto batchSizeLimit = 100;
auto currentBatchSize = 0;
for (const auto& doc : findResponse.docs) {
auto startTransaction = currentBatchSize == 0;
configDocModFn(asr.opCtx(), doc, startTransaction, txnNumber, upgradeType);
currentBatchSize += 1;
if (currentBatchSize == batchSizeLimit) {
commitLocalTxn(asr.opCtx(), txnNumber);
txnNumber += 1;
currentBatchSize = 0;
}
}
// Upgrade each tag document by deleting and re-inserting with the 4.4 _id format.
{
Lock::ExclusiveLock lk(opCtx->lockState(), _kZoneOpLock);
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
TagsType::ConfigNS,
{},
{},
boost::none /* limit */));
AlternativeSessionRegion asr(opCtx);
AuthorizationSession::get(asr.opCtx()->getClient())
->grantInternalAuthorization(asr.opCtx()->getClient());
TxnNumber txnNumber = 0;
for (const auto& tagObj : findResponse.docs) {
auto tag = uassertStatusOK(TagsType::fromBSON(tagObj));
removeDocumentsInLocalTxn(
asr.opCtx(),
TagsType::ConfigNS,
BSON(TagsType::ns(tag.getNS().ns()) << TagsType::min(tag.getMinKey())),
true /* startTransaction */,
txnNumber);
// Note that TagsType::toBSON() will not include an _id, so the insert path will
// generate one for us.
insertDocumentsInLocalTxn(asr.opCtx(),
TagsType::ConfigNS,
{tag.toBSON()},
false /* startTransaction */,
txnNumber);
commitLocalTxn(asr.opCtx(), txnNumber);
txnNumber += 1;
}
if (currentBatchSize != 0) {
commitLocalTxn(asr.opCtx(), txnNumber);
}
}
void ShardingCatalogManager::downgradeChunksAndTags(OperationContext* opCtx) {
// Downgrade each chunk document by deleting and re-inserting with the 4.2 _id format.
{
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
void ShardingCatalogManager::upgradeOrDowngradeChunksAndTags(OperationContext* opCtx,
ConfigUpgradeType upgradeType) {
const auto grid = Grid::get(opCtx);
auto allDbs = uassertStatusOK(grid->catalogClient()->getAllDBs(
opCtx, repl::ReadConcernLevel::kLocalReadConcern))
.value;
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
ChunkType::ConfigNS,
{},
{},
boost::none /* limit */));
// The 'config' database contains the sharded 'config.system.sessions' collection but does not
// have an entry in config.databases.
allDbs.emplace_back("config", ShardId("config"), true, DatabaseVersion());
AlternativeSessionRegion asr(opCtx);
AuthorizationSession::get(asr.opCtx()->getClient())
->grantInternalAuthorization(asr.opCtx()->getClient());
TxnNumber txnNumber = 0;
for (const auto& chunkObj : findResponse.docs) {
auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkObj));
for (const auto& db : allDbs) {
auto collections = uassertStatusOK(grid->catalogClient()->getCollections(
opCtx, &db.getName(), nullptr, repl::ReadConcernLevel::kLocalReadConcern));
removeDocumentsInLocalTxn(
asr.opCtx(),
ChunkType::ConfigNS,
BSON(ChunkType::ns(chunk.getNS().ns()) << ChunkType::min(chunk.getMin())),
true /* startTransaction */,
txnNumber);
for (const auto& coll : collections) {
if (coll.getDropped()) {
continue;
}
insertDocumentsInLocalTxn(asr.opCtx(),
ChunkType::ConfigNS,
{chunk.toConfigBSONLegacyID()},
false /* startTransaction */,
txnNumber);
{
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
forEachConfigDocInBatchedTransactions(
opCtx, ChunkType::ConfigNS, coll.getNs(), deleteAndInsertChunk, upgradeType);
}
commitLocalTxn(asr.opCtx(), txnNumber);
txnNumber += 1;
}
}
// Downgrade each tag document by deleting and re-inserting with the 4.2 _id format.
{
Lock::ExclusiveLock lk(opCtx->lockState(), _kZoneOpLock);
auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto findResponse = uassertStatusOK(
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
TagsType::ConfigNS,
{},
{},
boost::none /* limit */));
AlternativeSessionRegion asr(opCtx);
AuthorizationSession::get(asr.opCtx()->getClient())
->grantInternalAuthorization(asr.opCtx()->getClient());
TxnNumber txnNumber = 0;
for (const auto& tagObj : findResponse.docs) {
auto tag = uassertStatusOK(TagsType::fromBSON(tagObj));
removeDocumentsInLocalTxn(
asr.opCtx(),
TagsType::ConfigNS,
BSON(TagsType::ns(tag.getNS().ns()) << TagsType::min(tag.getMinKey())),
true /* startTransaction */,
txnNumber);
insertDocumentsInLocalTxn(asr.opCtx(),
TagsType::ConfigNS,
{tag.toBSONLegacyID()},
false /* startTransaction */,
txnNumber);
commitLocalTxn(asr.opCtx(), txnNumber);
txnNumber += 1;
{
Lock::ExclusiveLock lk(opCtx->lockState(), _kZoneOpLock);
forEachConfigDocInBatchedTransactions(
opCtx, TagsType::ConfigNS, coll.getNs(), deleteAndInsertTag, upgradeType);
}
}
}
}

View File

@ -78,6 +78,14 @@ public:
std::unique_ptr<executor::TaskExecutor> addShardExecutor);
~ShardingCatalogManager();
/**
* Indicates the desired modification to the config.chunks and config.tags collections during
* setFeatureCompatibilityVersion.
*
* TODO SERVER-44034: Remove this enum.
*/
enum class ConfigUpgradeType { kUpgrade, kDowngrade };
/**
* Instantiates an instance of the sharding catalog manager and installs it on the specified
* service context. This method is not thread-safe and must be called only once when the service
@ -417,26 +425,12 @@ public:
static void clearForTests(ServiceContext* serviceContext);
/**
* Changes the _id format of all documents in config.chunks and config.tags to use the format
* introduced in 4.4.
* Changes the _id format of all documents in config.chunks and config.tags to use either the
* format introduced in 4.4 or the format expected by a 4.2 binary.
*
* TODO SERVER-44034: Remove this method.
*
* TODO SERVER-42299: Optimize this method by batching inserts and deletes into larger
* transactions.
*/
void upgradeChunksAndTags(OperationContext* opCtx);
/**
* Changes the _id format of all documents in config.chunks and config.tags to use the format
* expected by a 4.2 binary.
*
* TODO SERVER-44034: Remove this method.
*
* TODO SERVER-42299: Optimize this method by batching inserts and deletes into larger
* transactions.
*/
void downgradeChunksAndTags(OperationContext* opCtx);
void upgradeOrDowngradeChunksAndTags(OperationContext* opCtx, ConfigUpgradeType upgradeType);
Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx);