mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-44720 Send createIndexes, dropIndexes, and collMod with shard versions
This commit is contained in:
parent
7dbc33b755
commit
a42bd2546b
@ -115,10 +115,11 @@ assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 0}, to: st.shar
|
||||
checkShardIndexes("idx1", [st.shard0, st.shard1], [st.shard2]);
|
||||
checkShardCollOption("validator", validationOption1, [st.shard0, st.shard1], [st.shard2]);
|
||||
|
||||
// Though some shards don't own data for the sharded collection, createIndex, reIndex,
|
||||
// dropIndex, and collMod (which are broadcast to all shards) report overall success (that is,
|
||||
// NamespaceNotFound-type errors from shards are ignored, and they are not included in the 'raw'
|
||||
// shard responses).
|
||||
// Starting in v4.4, createIndex, reIndex, dropIndex, collMod only target the primary shard and
|
||||
// the shards that own chunks for the collection (as supposed to all shards in the previous
|
||||
// versions). The commands will retry on shard version errors, and only report overall success.
|
||||
// That is, IndexNotFound errors from shards are ignored, and not included in the 'raw' shard
|
||||
// responses.
|
||||
|
||||
var res;
|
||||
|
||||
@ -127,17 +128,26 @@ res = st.s.getDB(dbName).getCollection(collName).createIndex({"idx2": 1});
|
||||
assert.commandWorked(res);
|
||||
assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
|
||||
assert.eq(res.raw[st.shard1.host].ok, 1, tojson(res));
|
||||
assert.eq(undefined,
|
||||
res.raw[st.shard2.host],
|
||||
tojson(res)); // CannotImplicitlyCreateCollection is ignored
|
||||
assert.eq(undefined, res.raw[st.shard2.host], tojson(res));
|
||||
checkShardIndexes("idx2", [st.shard0, st.shard1], [st.shard2]);
|
||||
|
||||
// dropIndex
|
||||
res = st.s.getDB(dbName).getCollection(collName).dropIndex("idx1_1");
|
||||
assert.commandWorked(res);
|
||||
assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
|
||||
// TODO SERVER-44719: Once createIndex is made to check shard versions, after the createIndex
|
||||
// above, each shard should have refreshed its cache. So the mongos will not need to retry the
|
||||
// dropIndex here and will not get IndexNotFound from shard0 (which is ignored and causes the
|
||||
// response from shard0 to be empty).
|
||||
if (jsTestOptions().mongosBinVersion == "last-stable") {
|
||||
assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
|
||||
} else {
|
||||
// dropIndexes checks shard versions so causes the first try to succeed on shard0 but not
|
||||
// on shard1. When it retries after the refresh, it fails with IndexNotFound on shard0
|
||||
// so the response from shard0 is empty.
|
||||
assert.eq(undefined, res.raw[st.shard0.host], tojson(res));
|
||||
}
|
||||
assert.eq(res.raw[st.shard1.host].ok, 1, tojson(res));
|
||||
assert.eq(undefined, res.raw[st.shard2.host], tojson(res)); // NamespaceNotFound is ignored
|
||||
assert.eq(undefined, res.raw[st.shard2.host], tojson(res));
|
||||
checkShardIndexes("idx1", [], [st.shard0, st.shard1, st.shard2]);
|
||||
|
||||
// collMod
|
||||
@ -153,7 +163,7 @@ res = st.s.getDB(dbName).runCommand({
|
||||
assert.commandWorked(res);
|
||||
assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
|
||||
assert.eq(res.raw[st.shard1.host].ok, 1, tojson(res));
|
||||
assert.eq(undefined, res.raw[st.shard2.host], tojson(res)); // NamespaceNotFound is ignored
|
||||
assert.eq(undefined, res.raw[st.shard2.host], tojson(res));
|
||||
checkShardCollOption("validator", validationOption2, [st.shard0, st.shard1], [st.shard2]);
|
||||
|
||||
// Check that errors from shards are aggregated correctly.
|
||||
@ -171,13 +181,18 @@ assert.neq(null, res.errmsg, tojson(res));
|
||||
res = st.s.getDB(dbName).getCollection(collName).createIndex({});
|
||||
assert.eq(res.raw[st.shard0.host].ok, 0, tojson(res));
|
||||
assert.eq(res.raw[st.shard1.host].ok, 0, tojson(res));
|
||||
assert.eq(res.raw[st.shard2.host].ok, 0, tojson(res));
|
||||
assert.eq(res.code, res.raw[st.shard0.host].code, tojson(res));
|
||||
assert.eq(res.code, res.raw[st.shard1.host].code, tojson(res));
|
||||
assert.eq(res.code, res.raw[st.shard2.host].code, tojson(res));
|
||||
assert.eq(res.codeName, res.raw[st.shard0.host].codeName, tojson(res));
|
||||
assert.eq(res.codeName, res.raw[st.shard1.host].codeName, tojson(res));
|
||||
assert.eq(res.codeName, res.raw[st.shard2.host].codeName, tojson(res));
|
||||
if (jsTestOptions().mongosBinVersion == "last-stable") {
|
||||
assert.eq(res.raw[st.shard2.host].ok, 0, tojson(res));
|
||||
assert.eq(res.code, res.raw[st.shard2.host].code, tojson(res));
|
||||
assert.eq(res.codeName, res.raw[st.shard2.host].codeName, tojson(res));
|
||||
} else {
|
||||
// If the mongos version is "latest", createIndex does not target shard2.
|
||||
assert.eq(undefined, res.raw[st.shard2.host], tojson(res));
|
||||
}
|
||||
assert.eq(res.code, ErrorCodes.CannotCreateIndex, tojson(res));
|
||||
assert.eq("CannotCreateIndex", res.codeName, tojson(res));
|
||||
assert.neq(null, res.errmsg, tojson(res));
|
||||
|
170
jstests/sharding/index_commands_shard_targeting.js
Normal file
170
jstests/sharding/index_commands_shard_targeting.js
Normal file
@ -0,0 +1,170 @@
|
||||
/*
|
||||
* Test that the index commands send shard versions, and only target the primary
|
||||
* shard and the shards that have chunks for the collection.
|
||||
* @tags: [requires_fcv_44]
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
/*
|
||||
* Returns the metadata for the collection in the shard's catalog cache.
|
||||
*/
|
||||
function getMetadataOnShard(shard, ns) {
|
||||
let res = shard.adminCommand({getShardVersion: ns, fullMetadata: true});
|
||||
assert.commandWorked(res);
|
||||
return res.metadata;
|
||||
}
|
||||
|
||||
/*
|
||||
* Asserts that the shard version of the shard in its catalog cache is equal to the
|
||||
* given shard version.
|
||||
*/
|
||||
function assertShardVersionEquals(shard, ns, shardVersion) {
|
||||
assert.eq(getMetadataOnShard(shard, ns).shardVersion, shardVersion);
|
||||
}
|
||||
|
||||
/*
|
||||
* Asserts that the shard has an index for the collection with the given index key.
|
||||
*/
|
||||
function assertIndexExistsOnShard(shard, dbName, collName, targetIndexKey) {
|
||||
let res = shard.getDB(dbName).runCommand({listIndexes: collName});
|
||||
assert.commandWorked(res);
|
||||
|
||||
let indexesOnShard = res.cursor.firstBatch;
|
||||
const isTargetIndex = (index) => bsonWoCompare(index.key, targetIndexKey) === 0;
|
||||
assert(indexesOnShard.some(isTargetIndex));
|
||||
}
|
||||
|
||||
/*
|
||||
* Asserts that the shard does not have an index for the collection with the given index key.
|
||||
*/
|
||||
function assertIndexDoesNotExistOnShard(shard, dbName, collName, targetIndexKey) {
|
||||
let res = shard.getDB(dbName).runCommand({listIndexes: collName});
|
||||
if (!res.ok && res.code === ErrorCodes.NamespaceNotFound) {
|
||||
// The collection does not exist on the shard, neither does the target index.
|
||||
return;
|
||||
}
|
||||
assert.commandWorked(res);
|
||||
|
||||
let indexesOnShard = res.cursor.firstBatch;
|
||||
indexesOnShard.forEach(function(index) {
|
||||
assert.neq(0, bsonWoCompare(index.key, targetIndexKey));
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Performs chunk operations to make the primary shard (shard0) not own any chunks for collection,
|
||||
* and only a subset of non-primary shards (shard1 and shard2) own chunks for collection.
|
||||
*/
|
||||
function setUpShards(st, ns) {
|
||||
// Move the initial chunk out of the primary shard.
|
||||
assert.commandWorked(st.s.adminCommand(
|
||||
{moveChunk: ns, find: {_id: MinKey}, to: st.shard1.shardName, _waitForDelete: true}));
|
||||
|
||||
// Split the chunk to create two chunks on shard1. Move one of the chunks to shard2.
|
||||
assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
|
||||
assert.commandWorked(st.s.adminCommand(
|
||||
{moveChunk: ns, find: {_id: 0}, to: st.shard2.shardName, _waitForDelete: true}));
|
||||
|
||||
// Assert that primary shard does not have any chunks for the collection.
|
||||
assertShardVersionEquals(st.shard0, ns, Timestamp(0, 0));
|
||||
}
|
||||
|
||||
const numShards = 4;
|
||||
const st = new ShardingTest({shards: numShards});
|
||||
const allShards = [];
|
||||
for (let i = 0; i < numShards; i++) {
|
||||
allShards.push(st["shard" + i]);
|
||||
}
|
||||
|
||||
const dbName = "test";
|
||||
const testDB = st.s.getDB(dbName);
|
||||
const shardKey = {
|
||||
_id: 1
|
||||
};
|
||||
const index = {
|
||||
key: {x: 1},
|
||||
name: "x_1"
|
||||
};
|
||||
|
||||
const expectedTargetedShards = new Set([st.shard0, st.shard1, st.shard2]);
|
||||
assert.lt(expectedTargetedShards.size, numShards);
|
||||
|
||||
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
|
||||
st.ensurePrimaryShard(dbName, st.shard0.shardName);
|
||||
|
||||
jsTest.log("Test createIndexes command...");
|
||||
|
||||
(() => {
|
||||
let testColl = testDB.testCreateIndexes;
|
||||
let collName = testColl.getName();
|
||||
let ns = testColl.getFullName();
|
||||
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
|
||||
setUpShards(st, ns);
|
||||
assert.commandWorked(testDB.runCommand({createIndexes: collName, indexes: [index]}));
|
||||
|
||||
// Assert that the index exists on the targeted shards but not on the untargeted shards.
|
||||
allShards.forEach(function(shard) {
|
||||
if (expectedTargetedShards.has(shard)) {
|
||||
assertIndexExistsOnShard(shard, dbName, collName, index.key);
|
||||
} else {
|
||||
assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
|
||||
}
|
||||
});
|
||||
})();
|
||||
|
||||
jsTest.log("Test dropIndexes command...");
|
||||
|
||||
(() => {
|
||||
let testColl = testDB.testDropIndexes;
|
||||
let collName = testColl.getName();
|
||||
let ns = testColl.getFullName();
|
||||
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
|
||||
setUpShards(st, ns);
|
||||
|
||||
// Create the index directly on all the shards.
|
||||
allShards.forEach(function(shard) {
|
||||
assert.commandWorked(
|
||||
shard.getDB(dbName).runCommand({createIndexes: collName, indexes: [index]}));
|
||||
});
|
||||
|
||||
// Drop the index.
|
||||
assert.commandWorked(testDB.runCommand({dropIndexes: collName, index: index.name}));
|
||||
|
||||
// Assert that the index no longer exists on the targeted shards but still exists on the
|
||||
// untargeted shards.
|
||||
allShards.forEach(function(shard) {
|
||||
if (expectedTargetedShards.has(shard)) {
|
||||
assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
|
||||
} else {
|
||||
assertIndexExistsOnShard(shard, dbName, collName, index.key);
|
||||
}
|
||||
});
|
||||
})();
|
||||
|
||||
jsTest.log("Test collMod command...");
|
||||
|
||||
(() => {
|
||||
let testColl = testDB.testCollMod;
|
||||
let collName = testColl.getName();
|
||||
let ns = testColl.getFullName();
|
||||
|
||||
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
|
||||
setUpShards(st, ns);
|
||||
assert.commandWorked(testDB.runCommand({collMod: collName, validator: {x: {$type: "string"}}}));
|
||||
|
||||
// Assert that the targeted shards do document validation, and the untargeted shards do not.
|
||||
allShards.forEach(function(shard) {
|
||||
if (expectedTargetedShards.has(shard)) {
|
||||
assert.commandFailedWithCode(shard.getCollection(ns).insert({x: 1}),
|
||||
ErrorCodes.DocumentValidationFailure);
|
||||
} else {
|
||||
assert.commandWorked(shard.getCollection(ns).insert({x: 1}));
|
||||
}
|
||||
});
|
||||
})();
|
||||
|
||||
st.stop();
|
||||
})();
|
@ -32,7 +32,6 @@ function validateTestCase(testCase) {
|
||||
"implicitlyCreatesCollection",
|
||||
"whenNamespaceIsViewFailsWith",
|
||||
"doesNotCheckShardVersion",
|
||||
"doesNotSendShardVersionIfTracked",
|
||||
"command",
|
||||
"conditional"
|
||||
].includes(key),
|
||||
@ -81,7 +80,6 @@ let testCases = {
|
||||
clearLog: {skip: "executes locally on mongos (not sent to any remote node)"},
|
||||
collMod: {
|
||||
whenNamespaceDoesNotExistFailsWith: ErrorCodes.NamespaceNotFound,
|
||||
doesNotSendShardVersionIfTracked: true,
|
||||
command: collName => {
|
||||
return {collMod: collName};
|
||||
},
|
||||
@ -109,7 +107,6 @@ let testCases = {
|
||||
implicitlyCreatesCollection: true,
|
||||
whenNamespaceIsViewFailsWith: ErrorCodes.CommandNotSupportedOnView,
|
||||
doesNotCheckShardVersion: true,
|
||||
doesNotSendShardVersionIfTracked: true,
|
||||
command: collName => {
|
||||
return {createIndexes: collName, indexes: [{key: {a: 1}, name: "index"}]};
|
||||
},
|
||||
@ -140,7 +137,6 @@ let testCases = {
|
||||
dropIndexes: {
|
||||
whenNamespaceDoesNotExistFailsWith: ErrorCodes.NamespaceNotFound,
|
||||
whenNamespaceIsViewFailsWith: ErrorCodes.CommandNotSupportedOnView,
|
||||
doesNotSendShardVersionIfTracked: true,
|
||||
command: collName => {
|
||||
return {dropIndexes: collName, index: "*"};
|
||||
},
|
||||
@ -379,8 +375,7 @@ for (let command of Object.keys(res.commands)) {
|
||||
assert.commandWorked(st.s.getDB(dbName).runCommand(testCase.command(collName)));
|
||||
}
|
||||
|
||||
if (testCase.implicitlyCreatesCollection &&
|
||||
!(testCase.doesNotCheckShardVersion || testCase.doesNotSendShardVersionIfTracked)) {
|
||||
if (testCase.implicitlyCreatesCollection && !testCase.doesNotCheckShardVersion) {
|
||||
expectShardsCachedShardVersionToBe(st.shard0, ns, Timestamp(1, 0));
|
||||
} else {
|
||||
expectShardsCachedShardVersionToBe(st.shard0, ns, "UNKNOWN");
|
||||
@ -444,7 +439,7 @@ for (let command of Object.keys(res.commands)) {
|
||||
assert.commandWorked(st.s.adminCommand({flushRouterConfig: dbName}));
|
||||
assert.commandWorked(st.s.getDB(dbName).runCommand(testCase.command(collName)));
|
||||
|
||||
if (testCase.doesNotCheckShardVersion || testCase.doesNotSendShardVersionIfTracked) {
|
||||
if (testCase.doesNotCheckShardVersion) {
|
||||
expectShardsCachedShardVersionToBe(st.shard0, ns, "UNKNOWN");
|
||||
} else {
|
||||
expectShardsCachedShardVersionToBe(st.shard0, ns, Timestamp(1, 0));
|
||||
|
@ -78,15 +78,36 @@ void SessionsCollectionConfigServer::_shardCollectionIfNeeded(OperationContext*
|
||||
}
|
||||
|
||||
void SessionsCollectionConfigServer::_generateIndexesIfNeeded(OperationContext* opCtx) {
|
||||
const auto nss = NamespaceString::kLogicalSessionsNamespace;
|
||||
auto routingInfo =
|
||||
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
|
||||
|
||||
try {
|
||||
dispatchCommandAssertCollectionExistsOnAtLeastOneShard(
|
||||
opCtx,
|
||||
NamespaceString::kLogicalSessionsNamespace,
|
||||
SessionsCollection::generateCreateIndexesCmd());
|
||||
for (int tries = 0;; ++tries) {
|
||||
const bool canRetry = tries < kMaxNumStaleVersionRetries - 1;
|
||||
try {
|
||||
scatterGatherVersionedTargetPrimaryShardAndByRoutingTable(
|
||||
opCtx,
|
||||
nss.db(),
|
||||
nss,
|
||||
routingInfo,
|
||||
SessionsCollection::generateCreateIndexesCmd(),
|
||||
ReadPreferenceSetting::get(opCtx),
|
||||
Shard::RetryPolicy::kNoRetry,
|
||||
BSONObj() /* query */,
|
||||
BSONObj() /* collation */);
|
||||
return;
|
||||
} catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
|
||||
log() << "Attempt " << tries << " to generate TTL index for " << nss
|
||||
<< " received StaleShardVersion error" << causedBy(ex);
|
||||
if (canRetry) {
|
||||
continue;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
} catch (DBException& ex) {
|
||||
ex.addContext(str::stream()
|
||||
<< "Failed to generate TTL index for "
|
||||
<< NamespaceString::kLogicalSessionsNamespace << " on all shards");
|
||||
ex.addContext(str::stream() << "Failed to generate TTL index for " << nss);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
@ -135,7 +135,8 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard
|
||||
const CachedCollectionRoutingInfo& routingInfo,
|
||||
const BSONObj& cmdObj,
|
||||
const BSONObj& query,
|
||||
const BSONObj& collation) {
|
||||
const BSONObj& collation,
|
||||
const bool alwaysIncludePrimaryShard = false) {
|
||||
|
||||
auto cmdToSend = cmdObj;
|
||||
if (!cmdToSend.hasField(kAllowImplicitCollectionCreation)) {
|
||||
@ -163,6 +164,19 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard
|
||||
std::set<ShardId> shardIds;
|
||||
routingInfo.cm()->getShardIdsForQuery(opCtx, query, collation, &shardIds);
|
||||
|
||||
if (alwaysIncludePrimaryShard && routingInfo.db().primaryId() != "config") {
|
||||
// TODO (SERVER-44949): The only sharded collection which has the config server as
|
||||
// the primary shard is config.system.sessions. Unfortunately, the config server
|
||||
// calls this code path to create indexes on the sessions collection, and the code
|
||||
// it uses to create the indexes would currently invariant if one of the targeted
|
||||
// shards was the config server itself. To get around this, we skip targeting the
|
||||
// config shard here, but as a result, listIndexes on config.system.sessions will
|
||||
// not return the correct indexes. This bug existed prior to 4.4 as well, since
|
||||
// the old code to create indexes on the sessions collection also did not build
|
||||
// the index on the config server.
|
||||
shardIds.insert(routingInfo.db().primaryId());
|
||||
}
|
||||
|
||||
for (const ShardId& shardId : shardIds) {
|
||||
requests.emplace_back(shardId,
|
||||
appendShardVersion(cmdToSend, routingInfo.cm()->getVersion(shardId)));
|
||||
@ -381,6 +395,23 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting
|
||||
return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests);
|
||||
}
|
||||
|
||||
std::vector<AsyncRequestsSender::Response>
|
||||
scatterGatherVersionedTargetPrimaryShardAndByRoutingTable(
|
||||
OperationContext* opCtx,
|
||||
StringData dbName,
|
||||
const NamespaceString& nss,
|
||||
const CachedCollectionRoutingInfo& routingInfo,
|
||||
const BSONObj& cmdObj,
|
||||
const ReadPreferenceSetting& readPref,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
const BSONObj& query,
|
||||
const BSONObj& collation) {
|
||||
const auto requests = buildVersionedRequestsForTargetedShards(
|
||||
opCtx, nss, routingInfo, cmdObj, query, collation, true);
|
||||
|
||||
return gatherResponses(opCtx, dbName, readPref, retryPolicy, requests);
|
||||
}
|
||||
|
||||
std::vector<AsyncRequestsSender::Response> scatterGatherOnlyVersionIfUnsharded(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
@ -658,27 +689,4 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
|
||||
return catalogCache->getCollectionRoutingInfoAt(opCtx, nss, atClusterTime.asTimestamp());
|
||||
}
|
||||
|
||||
std::vector<AsyncRequestsSender::Response> dispatchCommandAssertCollectionExistsOnAtLeastOneShard(
|
||||
OperationContext* opCtx, const NamespaceString& nss, const BSONObj& cmdObj) {
|
||||
auto shardResponses = scatterGatherOnlyVersionIfUnsharded(
|
||||
opCtx,
|
||||
nss,
|
||||
CommandHelpers::filterCommandRequestForPassthrough(cmdObj),
|
||||
ReadPreferenceSetting::get(opCtx),
|
||||
Shard::RetryPolicy::kNoRetry,
|
||||
{ErrorCodes::CannotImplicitlyCreateCollection});
|
||||
|
||||
if (std::all_of(shardResponses.begin(), shardResponses.end(), [](const auto& response) {
|
||||
return response.swResponse.getStatus().isOK() &&
|
||||
getStatusFromCommandResult(response.swResponse.getValue().data) ==
|
||||
ErrorCodes::CannotImplicitlyCreateCollection;
|
||||
})) {
|
||||
// Propagate the ExtraErrorInfo from the first response.
|
||||
uassertStatusOK(
|
||||
getStatusFromCommandResult(shardResponses.front().swResponse.getValue().data));
|
||||
}
|
||||
|
||||
return shardResponses;
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -153,6 +153,22 @@ std::vector<AsyncRequestsSender::Response> scatterGatherVersionedTargetByRouting
|
||||
const BSONObj& query,
|
||||
const BSONObj& collation);
|
||||
|
||||
/**
|
||||
* Similar to scatterGatherVersionedTargetByRoutingTable but always targets the primary
|
||||
* shard for the namespace.
|
||||
*/
|
||||
std::vector<AsyncRequestsSender::Response>
|
||||
scatterGatherVersionedTargetPrimaryShardAndByRoutingTable(
|
||||
OperationContext* opCtx,
|
||||
StringData dbName,
|
||||
const NamespaceString& nss,
|
||||
const CachedCollectionRoutingInfo& routingInfo,
|
||||
const BSONObj& cmdObj,
|
||||
const ReadPreferenceSetting& readPref,
|
||||
Shard::RetryPolicy retryPolicy,
|
||||
const BSONObj& query,
|
||||
const BSONObj& collation);
|
||||
|
||||
/**
|
||||
* Utility for dispatching commands on a namespace, but with special hybrid versioning:
|
||||
* - If the namespace is unsharded, a version is attached (so this node can find out if its routing
|
||||
@ -257,12 +273,4 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
|
||||
StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
|
||||
OperationContext* opCtx, const NamespaceString& nss);
|
||||
|
||||
/**
|
||||
* Utility for dispatching a command on a namespace that is sent to all shards. If all shards return
|
||||
* CannotImplicitlyCreateCollection, will throw. If at least one shard succeeds, will ignore
|
||||
* CannotImplicitlyCreateCollection errors.
|
||||
*/
|
||||
std::vector<AsyncRequestsSender::Response> dispatchCommandAssertCollectionExistsOnAtLeastOneShard(
|
||||
OperationContext* opCtx, const NamespaceString& nss, const BSONObj& cmdObj);
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "mongo/db/auth/authorization_session.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/s/cluster_commands_helpers.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/util/log.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -70,15 +71,20 @@ public:
|
||||
const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj));
|
||||
LOG(1) << "collMod: " << nss << " cmd:" << redact(cmdObj);
|
||||
|
||||
auto shardResponses = scatterGatherOnlyVersionIfUnsharded(
|
||||
auto routingInfo =
|
||||
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
|
||||
auto shardResponses = scatterGatherVersionedTargetPrimaryShardAndByRoutingTable(
|
||||
opCtx,
|
||||
nss.db(),
|
||||
nss,
|
||||
routingInfo,
|
||||
applyReadWriteConcern(
|
||||
opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)),
|
||||
ReadPreferenceSetting::get(opCtx),
|
||||
Shard::RetryPolicy::kNoRetry);
|
||||
return appendRawResponses(
|
||||
opCtx, &errmsg, &output, std::move(shardResponses), {ErrorCodes::NamespaceNotFound});
|
||||
Shard::RetryPolicy::kNoRetry,
|
||||
BSONObj() /* query */,
|
||||
BSONObj() /* collation */);
|
||||
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
|
||||
}
|
||||
|
||||
} collectionModCmd;
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/rpc/get_status_from_command_result.h"
|
||||
#include "mongo/s/cluster_commands_helpers.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/util/log.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -73,14 +74,21 @@ public:
|
||||
|
||||
createShardDatabase(opCtx, dbName);
|
||||
|
||||
auto shardResponses = dispatchCommandAssertCollectionExistsOnAtLeastOneShard(
|
||||
opCtx, nss, applyReadWriteConcern(opCtx, this, cmdObj));
|
||||
auto routingInfo =
|
||||
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
|
||||
auto shardResponses = scatterGatherVersionedTargetPrimaryShardAndByRoutingTable(
|
||||
opCtx,
|
||||
nss.db(),
|
||||
nss,
|
||||
routingInfo,
|
||||
CommandHelpers::filterCommandRequestForPassthrough(
|
||||
applyReadWriteConcern(opCtx, this, cmdObj)),
|
||||
ReadPreferenceSetting::get(opCtx),
|
||||
Shard::RetryPolicy::kNoRetry,
|
||||
BSONObj() /* query */,
|
||||
BSONObj() /* collation */);
|
||||
|
||||
return appendRawResponses(opCtx,
|
||||
&errmsg,
|
||||
&output,
|
||||
std::move(shardResponses),
|
||||
{ErrorCodes::CannotImplicitlyCreateCollection});
|
||||
return appendRawResponses(opCtx, &errmsg, &output, std::move(shardResponses));
|
||||
}
|
||||
|
||||
} createIndexesCmd;
|
||||
|
@ -33,6 +33,7 @@
|
||||
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/s/cluster_commands_helpers.h"
|
||||
#include "mongo/s/grid.h"
|
||||
#include "mongo/util/log.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -70,25 +71,24 @@ public:
|
||||
const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbName, cmdObj));
|
||||
LOG(1) << "dropIndexes: " << nss << " cmd:" << redact(cmdObj);
|
||||
|
||||
// If the collection is sharded, we target all shards rather than just shards that own
|
||||
// chunks for the collection, because some shard may have previously owned chunks but no
|
||||
// longer does (and so, may have the index). However, we ignore NamespaceNotFound errors
|
||||
// from individual shards, because some shards may have never owned chunks for the
|
||||
// collection. We additionally ignore IndexNotFound errors, because the index may not have
|
||||
// been built on a shard if the earlier createIndexes command coincided with the shard
|
||||
// receiving its first chunk for the collection (see SERVER-31715).
|
||||
auto shardResponses = scatterGatherOnlyVersionIfUnsharded(
|
||||
// If the collection is sharded, we target only the primary shard and the shards that own
|
||||
// chunks for the collection. We ignore IndexNotFound errors, because the index may have
|
||||
// been dropped on an earlier attempt.
|
||||
auto routingInfo =
|
||||
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
|
||||
auto shardResponses = scatterGatherVersionedTargetPrimaryShardAndByRoutingTable(
|
||||
opCtx,
|
||||
nss.db(),
|
||||
nss,
|
||||
routingInfo,
|
||||
applyReadWriteConcern(
|
||||
opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)),
|
||||
ReadPreferenceSetting::get(opCtx),
|
||||
Shard::RetryPolicy::kNotIdempotent);
|
||||
return appendRawResponses(opCtx,
|
||||
&errmsg,
|
||||
&output,
|
||||
std::move(shardResponses),
|
||||
{ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound});
|
||||
Shard::RetryPolicy::kNotIdempotent,
|
||||
BSONObj() /* query */,
|
||||
BSONObj() /* collation */);
|
||||
return appendRawResponses(
|
||||
opCtx, &errmsg, &output, std::move(shardResponses), {ErrorCodes::IndexNotFound});
|
||||
}
|
||||
|
||||
} dropIndexesCmd;
|
||||
|
Loading…
Reference in New Issue
Block a user