0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-28 07:59:02 +01:00

SERVER-17496 Move cluster count command into a separate file

This commit is contained in:
Kaloian Manassiev 2015-03-17 11:17:07 -04:00
parent db982c182c
commit bf52637fde
9 changed files with 382 additions and 234 deletions

View File

@ -6,6 +6,8 @@ env.Library(
target='cluster_commands',
source=[
'cluster_add_shard_cmd.cpp',
'cluster_commands_common.cpp',
'cluster_count_cmd.cpp',
'cluster_enable_sharding_cmd.cpp',
'cluster_explain_cmd.cpp',
'cluster_find_cmd.cpp',

View File

@ -0,0 +1,64 @@
/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/s/commands/cluster_commands_common.h"
namespace mongo {
int getUniqueCodeFromCommandResults(const std::vector<Strategy::CommandResult>& results) {
int commonErrCode = -1;
for (std::vector<Strategy::CommandResult>::const_iterator it = results.begin();
it != results.end();
++it) {
// Only look at shards with errors.
if (!it->result["ok"].trueValue()) {
int errCode = it->result["code"].numberInt();
if (commonErrCode == -1) {
commonErrCode = errCode;
}
else if (commonErrCode != errCode) {
// At least two shards with errors disagree on the error code
commonErrCode = 0;
}
}
}
// If no error encountered or shards with errors disagree on the error code, return 0
if (commonErrCode == -1 || commonErrCode == 0) {
return 0;
}
// Otherwise, shards with errors agree on the error code; return that code
return commonErrCode;
}
} // namespace mongo

View File

@ -0,0 +1,45 @@
/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <vector>
#include "mongo/s/strategy.h"
namespace mongo {
/**
* Utility function to compute a single error code from a vector of command results.
*
* @return If there is an error code common to all of the error results, returns that error
* code; otherwise, returns 0.
*/
int getUniqueCodeFromCommandResults(const std::vector<Strategy::CommandResult>& results);
} // namespace mongo

View File

@ -0,0 +1,248 @@
/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include <vector>
#include "mongo/db/commands.h"
#include "mongo/s/cluster_explain.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/strategy.h"
#include "mongo/util/timer.h"
namespace mongo {
using std::string;
using std::vector;
namespace {
long long applySkipLimit(long long num, const BSONObj& cmd) {
BSONElement s = cmd["skip"];
BSONElement l = cmd["limit"];
if (s.isNumber()) {
num = num - s.numberLong();
if (num < 0) {
num = 0;
}
}
if (l.isNumber()) {
long long limit = l.numberLong();
if (limit < 0){
limit = -limit;
}
// 0 limit means no limit
if (limit < num && limit != 0) {
num = limit;
}
}
return num;
}
class ClusterCountCmd : public Command {
public:
ClusterCountCmd() : Command("count", false) { }
virtual bool slaveOk() const {
return true;
}
virtual bool adminOnly() const {
return false;
}
virtual bool isWriteCommandForConfigServer() const {
return false;
}
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
virtual bool run(OperationContext* txn,
const std::string& dbname,
BSONObj& cmdObj,
int options,
std::string& errmsg,
BSONObjBuilder& result,
bool fromRepl) {
long long skip = 0;
if (cmdObj["skip"].isNumber()) {
skip = cmdObj["skip"].numberLong();
if (skip < 0) {
errmsg = "skip value is negative in count query";
return false;
}
}
else if (cmdObj["skip"].ok()) {
errmsg = "skip value is not a valid number";
return false;
}
const string collection = cmdObj.firstElement().valuestrsafe();
const string fullns = dbname + "." + collection;
BSONObjBuilder countCmdBuilder;
countCmdBuilder.append("count", collection);
BSONObj filter;
if (cmdObj["query"].isABSONObj()) {
countCmdBuilder.append("query", cmdObj["query"].Obj());
filter = cmdObj["query"].Obj();
}
if (cmdObj["limit"].isNumber()) {
long long limit = cmdObj["limit"].numberLong();
// We only need to factor in the skip value when sending to the shards if we
// have a value for limit, otherwise, we apply it only once we have collected all
// counts.
if (limit != 0 && cmdObj["skip"].isNumber()) {
if (limit > 0)
limit += skip;
else
limit -= skip;
}
countCmdBuilder.append("limit", limit);
}
if (cmdObj.hasField("hint")) {
countCmdBuilder.append(cmdObj["hint"]);
}
if (cmdObj.hasField("$queryOptions")) {
countCmdBuilder.append(cmdObj["$queryOptions"]);
}
if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
countCmdBuilder.append(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
}
vector<Strategy::CommandResult> countResult;
STRATEGY->commandOp(dbname,
countCmdBuilder.done(),
options,
fullns,
filter,
&countResult);
long long total = 0;
BSONObjBuilder shardSubTotal(result.subobjStart("shards"));
for (vector<Strategy::CommandResult>::const_iterator iter = countResult.begin();
iter != countResult.end();
++iter) {
const string& shardName = iter->shardTarget.getName();
if (iter->result["ok"].trueValue()) {
long long shardCount = iter->result["n"].numberLong();
shardSubTotal.appendNumber(shardName, shardCount);
total += shardCount;
}
else {
shardSubTotal.doneFast();
errmsg = "failed on : " + shardName;
result.append("cause", iter->result);
// Add "code" to the top-level response, if the failure of the sharded command
// can be accounted to a single error
int code = getUniqueCodeFromCommandResults(countResult);
if (code != 0) {
result.append("code", code);
}
return false;
}
}
shardSubTotal.doneFast();
total = applySkipLimit(total, cmdObj);
result.appendNumber("n", total);
return true;
}
virtual Status explain(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
ExplainCommon::Verbosity verbosity,
BSONObjBuilder* out) const {
const string fullns = parseNs(dbname, cmdObj);
// Extract the targeting query.
BSONObj targetingQuery;
if (Object == cmdObj["query"].type()) {
targetingQuery = cmdObj["query"].Obj();
}
BSONObjBuilder explainCmdBob;
ClusterExplain::wrapAsExplain(cmdObj, verbosity, &explainCmdBob);
// We will time how long it takes to run the commands on the shards
Timer timer;
vector<Strategy::CommandResult> shardResults;
STRATEGY->commandOp(dbname,
explainCmdBob.obj(),
0,
fullns,
targetingQuery,
&shardResults);
long long millisElapsed = timer.millis();
const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults,
cmdObj);
return ClusterExplain::buildExplainResult(shardResults,
mongosStageName,
millisElapsed,
out);
}
} clusterCountCmd;
} // namespace
} // namespace mongo

View File

@ -175,7 +175,7 @@ namespace {
BSONObj res;
{
list<BSONObj> all = conn->getCollectionInfos(
config->getName(),
config->name(),
BSON("name" << nsToCollectionSubstring(ns)));
if (!all.empty()) {
res = all.front().getOwned();

View File

@ -59,6 +59,7 @@
#include "mongo/s/client_info.h"
#include "mongo/s/cluster_explain.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/commands/cluster_commands_common.h"
#include "mongo/s/config.h"
#include "mongo/s/cursors.h"
#include "mongo/s/distlock.h"
@ -90,39 +91,6 @@ namespace mongo {
namespace {
/**
* Utility function to compute a single error code from a vector of command results. If
* there is an error code common to all of the error results, returns that error code;
* otherwise, returns 0.
*/
int getUniqueCode( const vector<Strategy::CommandResult>& results ) {
int commonErrCode = -1;
for ( vector<Strategy::CommandResult>::const_iterator it = results.begin();
it != results.end();
it++ ) {
// Only look at shards with errors.
if ( !it->result["ok"].trueValue() ) {
int errCode = it->result["code"].numberInt();
if ( commonErrCode == -1 ) {
commonErrCode = errCode;
}
else if ( commonErrCode != errCode ) {
// At least two shards with errors disagree on the error code.
commonErrCode = 0;
}
}
}
// If no error encountered or shards with errors disagree on the error code, return
// 0.
if ( commonErrCode == -1 || commonErrCode == 0 ) {
return 0;
}
// Otherwise, shards with errors agree on the error code; return that code.
return commonErrCode;
}
/**
* Utility function to parse a cursor command response and save the cursor in the
* CursorCache "refs" container. Returns Status::OK() if the cursor was successfully
@ -178,14 +146,14 @@ namespace mongo {
protected:
bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
return _passthrough(conf->getName(), conf, cmdObj, 0, result);
return _passthrough(conf->name(), conf, cmdObj, 0, result);
}
bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
return _passthrough("admin", conf, cmdObj, 0, result);
}
bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) {
return _passthrough(conf->getName(), conf, cmdObj, options, result);
return _passthrough(conf->name(), conf, cmdObj, options, result);
}
bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) {
return _passthrough("admin", conf, cmdObj, options, result);
@ -804,7 +772,7 @@ namespace mongo {
// intermediate version of the info
//
DBConfig confCopy( conf->getName() );
DBConfig confCopy( conf->name() );
if( ! confCopy.load() ){
errmsg = "could not load database info to drop";
return false;
@ -907,181 +875,6 @@ namespace mongo {
return Status::OK();
}
class CountCmd : public PublicGridCommand {
public:
CountCmd() : PublicGridCommand( "count" ) { }
virtual bool passOptions() const { return true; }
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
ActionSet actions;
actions.addAction(ActionType::find);
out->push_back(Privilege(parseResourcePattern(dbname, cmdObj), actions));
}
long long applySkipLimit( long long num , const BSONObj& cmd ) {
BSONElement s = cmd["skip"];
BSONElement l = cmd["limit"];
if ( s.isNumber() ) {
num = num - s.numberLong();
if ( num < 0 ) {
num = 0;
}
}
if ( l.isNumber() ) {
long long limit = l.numberLong();
if( limit < 0 ){
limit = -limit;
}
if ( limit < num && limit != 0 ) { // 0 limit means no limit
num = limit;
}
}
return num;
}
bool run(OperationContext* txn, const string& dbName,
BSONObj& cmdObj,
int options,
string& errmsg,
BSONObjBuilder& result,
bool ){
long long skip = 0;
if( cmdObj["skip"].isNumber() ){
skip = cmdObj["skip"].numberLong();
if( skip < 0 ){
errmsg = "skip value is negative in count query";
return false;
}
}
else if( cmdObj["skip"].ok() ){
errmsg = "skip value is not a valid number";
return false;
}
const string collection = cmdObj.firstElement().valuestrsafe();
const string fullns = dbName + "." + collection;
BSONObjBuilder countCmdBuilder;
countCmdBuilder.append( "count", collection );
BSONObj filter;
if( cmdObj["query"].isABSONObj() ){
countCmdBuilder.append( "query", cmdObj["query"].Obj() );
filter = cmdObj["query"].Obj();
}
if( cmdObj["limit"].isNumber() ){
long long limit = cmdObj["limit"].numberLong();
/* We only need to factor in the skip value when sending to
* the shards if we have a value for limit, otherwise, we
* apply it only once we have collected all counts.
*/
if( limit != 0 && cmdObj["skip"].isNumber() ){
if ( limit > 0 )
limit += skip;
else
limit -= skip;
}
countCmdBuilder.append( "limit", limit );
}
if (cmdObj.hasField("hint")) {
countCmdBuilder.append(cmdObj["hint"]);
}
if (cmdObj.hasField("$queryOptions")) {
countCmdBuilder.append(cmdObj["$queryOptions"]);
}
if (cmdObj.hasField(LiteParsedQuery::cmdOptionMaxTimeMS)) {
countCmdBuilder.append(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
}
vector<Strategy::CommandResult> countResult;
STRATEGY->commandOp( dbName, countCmdBuilder.done(),
options, fullns, filter, &countResult );
long long total = 0;
BSONObjBuilder shardSubTotal( result.subobjStart( "shards" ));
for( vector<Strategy::CommandResult>::const_iterator iter = countResult.begin();
iter != countResult.end(); ++iter ){
const string& shardName = iter->shardTarget.getName();
if( iter->result["ok"].trueValue() ){
long long shardCount = iter->result["n"].numberLong();
shardSubTotal.appendNumber( shardName, shardCount );
total += shardCount;
}
else {
shardSubTotal.doneFast();
errmsg = "failed on : " + shardName;
result.append( "cause", iter->result );
// Add "code" to the top-level response, if the failure of the sharded
// command can be accounted to a single error.
int code = getUniqueCode( countResult );
if ( code != 0 ) {
result.append( "code", code );
}
return false;
}
}
shardSubTotal.doneFast();
total = applySkipLimit( total , cmdObj );
result.appendNumber( "n" , total );
return true;
}
Status explain(OperationContext* txn,
const std::string& dbname,
const BSONObj& cmdObj,
ExplainCommon::Verbosity verbosity,
BSONObjBuilder* out) const {
const string fullns = parseNs(dbname, cmdObj);
// Extract the targeting query.
BSONObj targetingQuery;
if (Object == cmdObj["query"].type()) {
targetingQuery = cmdObj["query"].Obj();
}
BSONObjBuilder explainCmdBob;
ClusterExplain::wrapAsExplain(cmdObj, verbosity, &explainCmdBob);
// We will time how long it takes to run the commands on the shards.
Timer timer;
vector<Strategy::CommandResult> shardResults;
STRATEGY->commandOp(dbname,
explainCmdBob.obj(),
0,
fullns,
targetingQuery,
&shardResults);
long long millisElapsed = timer.millis();
const char* mongosStageName = ClusterExplain::getStageNameForReadOp(shardResults,
cmdObj);
return ClusterExplain::buildExplainResult(shardResults,
mongosStageName,
millisElapsed,
out);
}
} countCmd;
class CollectionStats : public PublicGridCommand {
public:
CollectionStats() : PublicGridCommand("collStats", "collstats") { }
@ -1288,7 +1081,7 @@ namespace mongo {
ChunkPtr chunk = cm->findIntersectingChunk(shardKey);
ShardConnection conn( chunk->getShard() , fullns );
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
bool ok = conn->runCommand( conf->name() , cmdObj , res );
conn.done();
if (!ok && res.getIntField("code") == RecvStaleConfigCode) { // code for RecvStaleConfigException
@ -1360,7 +1153,7 @@ namespace mongo {
for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) {
ScopedDbConnection conn(i->getConnString());
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
bool ok = conn->runCommand( conf->name() , cmdObj , res );
conn.done();
if ( ! ok ) {
@ -1530,7 +1323,7 @@ namespace mongo {
for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) {
ShardConnection conn( *i , fullns );
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res, options );
bool ok = conn->runCommand( conf->name() , cmdObj , res, options );
conn.done();
if ( ! ok ) {
@ -1660,7 +1453,7 @@ namespace mongo {
return false;
}
uassert(16246, "Shard " + conf->getName() + " is too old to support GridFS sharded by {files_id:1, n:1}",
uassert(16246, "Shard " + conf->name() + " is too old to support GridFS sharded by {files_id:1, n:1}",
res.hasField("md5state"));
lastResult = res;
@ -2082,7 +1875,7 @@ namespace mongo {
errmsg += singleResult.toString();
// Add "code" to the top-level response, if the failure of the sharded command
// can be accounted to a single error.
int code = getUniqueCode( results );
int code = getUniqueCodeFromCommandResults( results );
if ( code != 0 ) {
result.append( "code", code );
}
@ -2580,7 +2373,7 @@ namespace mongo {
// If the failure of the sharded command can be accounted to a single error,
// throw a UserException with that error code; otherwise, throw with a
// location uassert code.
int errCode = getUniqueCode( shardResults );
int errCode = getUniqueCodeFromCommandResults( shardResults );
if ( errCode == 0 ) {
errCode = 17022;
}
@ -2720,7 +2513,7 @@ namespace mongo {
// Temporary hack. See comment on declaration for details.
ShardConnection conn( conf->getPrimary() , "" );
BSONObj result = aggRunCommand(conn.get(), conf->getName(), cmd, queryOptions);
BSONObj result = aggRunCommand(conn.get(), conf->name(), cmd, queryOptions);
conn.done();
bool ok = result["ok"].trueValue();

View File

@ -60,7 +60,7 @@ namespace mongo {
DBConfig(std::string name);
virtual ~DBConfig();
std::string getName() const { return _name; };
const std::string& name() const { return _name; };
/**
* @return if anything in this db is partitioned or not

View File

@ -194,21 +194,17 @@ namespace mongo {
}
void Grid::removeDBIfExists( const DBConfig& database ) {
void Grid::removeDBIfExists(const DBConfig& database) {
boost::lock_guard<boost::mutex> l(_lock);
boost::lock_guard<boost::mutex> l( _lock );
map<string,DBConfigPtr>::iterator it = _databases.find( database.getName() );
if( it != _databases.end() && it->second.get() == &database ){
_databases.erase( it );
log() << "erased database " << database.getName() << " from local registry" << endl;
map<string, DBConfigPtr>::iterator it = _databases.find(database.name());
if (it != _databases.end() && it->second.get() == &database) {
_databases.erase(it);
log() << "erased database " << database.name() << " from local registry";
}
else{
log() << database.getName() << "already erased from local registry" << endl;
else {
log() << database.name() << "already erased from local registry";
}
}
bool Grid::allowLocalHost() const {

View File

@ -371,8 +371,8 @@ namespace mongo {
if ( result["reloadConfig"].trueValue() ) {
if( result["version"].timestampTime() == 0 ){
warning() << "reloading full configuration for " << conf->getName()
<< ", connection state indicates significant version changes" << endl;
warning() << "reloading full configuration for " << conf->name()
<< ", connection state indicates significant version changes";
// reload db
conf->reload();