diff --git a/jstests/slow/sharding_balance1.js b/jstests/slow/sharding_balance1.js
index 184864a2f9f..1fff0dcbe39 100644
--- a/jstests/slow/sharding_balance1.js
+++ b/jstests/slow/sharding_balance1.js
@@ -44,6 +44,11 @@ function diff(){
     return Math.max( x.shard0 , x.shard1 ) - Math.min( x.shard0 , x.shard1 );
 }
 
+function sum(){
+    var x = dist();
+    return x.shard0 + x.shard1;
+}
+
 assert.lt( 20 , diff() );
 print( diff() )
 
@@ -52,5 +57,14 @@ assert.soon( function(){
     return d < 5;
 } , "balance didn't happen" , 1000 * 60 * 3 , 5000 );
 
+var chunkCount = sum();
+host = s.config.shards.findOne({_id : "shard0" }).host;
+s.adminCommand( { removeshard: host , forTestingOnly : true } );
+
+assert.soon( function(){
+    printjson(dist());
+    s.config.shards.find().forEach(function(z){printjson(z);});
+    return chunkCount == s.config.chunks.count({shard: "shard1"});
+} , "removeshard didn't happen" , 1000 * 60 * 3 , 5000 );
+
 s.stop();
diff --git a/s/balance.cpp b/s/balance.cpp
index 6b08bf3d382..03328b033ba 100644
--- a/s/balance.cpp
+++ b/s/balance.cpp
@@ -244,8 +244,12 @@ namespace mongo {
         for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ){
             const Shard& s = *it;
             ShardStatus status = s.getStatus();
-            BSONObj limitsObj = BSON( "maxSize" << s.getMaxSize() << "currSize" << status.mapped() );
-            shardLimitsMap[s.getName()] = limitsObj;
+
+            BSONObj limitsObj = BSON( ShardFields::maxSize( s.getMaxSize() ) <<
+                                      ShardFields::currSize( status.mapped() ) <<
+                                      ShardFields::draining( s.isDraining()) );
+
+            shardLimitsMap[ s.getName() ] = limitsObj;
         }
 
         //
diff --git a/s/balancer_policy.cpp b/s/balancer_policy.cpp
index f92cb41aaf6..bc68fe719a7 100644
--- a/s/balancer_policy.cpp
+++ b/s/balancer_policy.cpp
@@ -19,6 +19,7 @@
 
 #include "pch.h"
 #include "config.h"
+
 #include "../client/dbclient.h"
 #include "../util/unittest.h"
 
@@ -27,56 +28,82 @@ namespace mongo {
 
     BalancerPolicy::ChunkInfo* BalancerPolicy::balance( const string& ns,
-                                                        const map< string, BSONObj>& shardLimitsMap,
-                                                        const map< string,vector<BSONObj> >& shardToChunksMap,
+                                                        const ShardToLimitsMap& shardToLimitsMap,
+                                                        const ShardToChunksMap& shardToChunksMap,
                                                         int balancedLastTime ){
-        pair<string,unsigned> min("",9999999);
+        pair<string,unsigned> min("",numeric_limits<unsigned>::max());
         pair<string,unsigned> max("",0);
-
-        for ( map< string,vector<BSONObj> >::const_iterator i=shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ){
+        vector<string> drainingShards;
+
+        for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ){
+
+            // Find whether this shard has reached its size cap or whether it is being removed.
             const string& shard = i->first;
-            unsigned size = i->second.size();
-
             BSONObj shardLimits;
-            map< string,BSONObj >::const_iterator it = shardLimitsMap.find( shard );
-            if ( it != shardLimitsMap.end() ) {
-                shardLimits = it->second;
-            }
+            ShardToLimitsIter it = shardToLimitsMap.find( shard );
+            if ( it != shardToLimitsMap.end() ) shardLimits = it->second;
+            const bool maxedOut = isSizeMaxed( shardLimits );
+            const bool draining = isDraining( shardLimits );
 
-            if ( size < min.second ){
-                if ( isReceiver( shardLimits ) ){
-                    min.first = shard;
-                    min.second = size;
-                } else {
-                    log() << "balancer: shard can't receive any more chunks (" << shard << ")" << endl;
+            // Check whether this shard is a better chunk receiver than the current one.
+            // Maxed out shards or draining shards cannot be considered receivers.
+            const unsigned size = i->second.size();
+            if ( ! maxedOut && ! draining ){
+                if ( size < min.second ){
+                    min = make_pair( shard , size );
                 }
             }
-
-            if ( isDraining( shardLimits ) || ( size > max.second )){
-                max.first = shard;
-                max.second = size;
+
+            // Check whether this shard is a better chunk donor than the current one.
+            // Draining shards take a lower priority than overloaded shards.
+            if ( size > max.second ){
+                max = make_pair( shard , size );
             }
+
+            if ( draining && (size > 0)){
+                drainingShards.push_back( shard );
+            }
+        }
+
+        // If there is no candidate chunk receiver -- they may have all been maxed out,
+        // draining, ... -- there's not much that the policy can do.
+        if ( min.second == numeric_limits<unsigned>::max() ){
+            log() << "balancer: no available shards to take chunks" << endl;
+            return NULL;
         }
 
         log(4) << "min: " << min.first << "\t" << min.second << endl;
         log(4) << "max: " << max.first << "\t" << max.second << endl;
-
-        if( (int)( max.second - min.second) < ( balancedLastTime ? 2 : 8 ) )
+        log(4) << "draining: " << ! drainingShards.empty() << "(" << drainingShards.size() << ")" << endl;
+
+        // Solving imbalances takes a higher priority than draining shards. Many shards can
+        // be draining at once, but we cater to only one of them per round.
+        const int imbalance = max.second - min.second;
+        const int threshold = balancedLastTime ? 2 : 8;
+        string from, to;
+        if ( imbalance >= threshold ){
+            from = max.first;
+            to = min.first;
+
+        } else if ( ! drainingShards.empty() ){
+            from = drainingShards[ random() % drainingShards.size() ];
+            to = min.first;
+
+        } else {
+            // Everything is balanced here!
             return NULL;
+        }
 
-        const string from = max.first;
-        const string to = min.first;
         const vector<BSONObj>& chunksFrom = shardToChunksMap.find( from )->second;
-        const vector<BSONObj>& chunksTo   = shardToChunksMap.find( to )->second;
+        const vector<BSONObj>& chunksTo = shardToChunksMap.find( to )->second;
         BSONObj chunkToMove = pickChunk( chunksFrom , chunksTo );
-
-        log() << "balancer: chose chunk from [" << from << "] to [" << to << "] " << chunkToMove << endl;
+        log() << "balancer: chose [" << from << "] to [" << to << "] " << chunkToMove << endl;
 
         return new ChunkInfo( ns, to, from, chunkToMove );
     }
 
     BSONObj BalancerPolicy::pickChunk( const vector<BSONObj>& from, const vector<BSONObj>& to ){
-        assert( from.size() > to.size() );
+        // It is possible for a donor ('from') shard to have fewer chunks than a receiver ('to')
+        // shard if the donor is in draining mode.
 
         if ( to.size() == 0 )
             return from[0];
@@ -90,36 +117,29 @@ namespace mongo {
         return from[0];
     }
 
-    bool BalancerPolicy::isReceiver( BSONObj limits ){
-
-        // A draining shard can never be a receiver
-        if ( isDraining( limits ) ){
-            return false;
-        }
-
+    bool BalancerPolicy::isSizeMaxed( BSONObj limits ){
         // If there's no limit information for the shard, assume it can be a chunk receiver
        // (i.e., there's not bound on space utilization)
         if ( limits.isEmpty() ){
-            return true;
+            return false;
         }
 
-        long long maxUsage = limits["maxSize"].Long();
+        long long maxUsage = limits[ ShardFields::maxSize.name() ].Long();
         if ( maxUsage == 0 ){
-            return true;
+            return false;
         }
 
-        long long currUsage = limits["currSize"].Long();
+        long long currUsage = limits[ ShardFields::currSize.name() ].Long();
         if ( currUsage < maxUsage ){
-            return true;
+            return false;
         }
 
-        return false;
+        return true;
     }
 
     bool BalancerPolicy::isDraining( BSONObj limits ){
-
-        // If there's no entry saying it is draining, it isn't.
-        if ( limits.isEmpty() || limits[ "draining" ].eoo() ){
+        BSONElement draining = limits[ ShardFields::draining.name() ];
+        if ( draining.eoo() || ! draining.Bool() ){
             return false;
         }
 
@@ -128,33 +148,141 @@ namespace mongo {
 
     class PolicyObjUnitTest : public UnitTest {
     public:
-        void maxSizeForShard(){
-            BSONObj shard0 = BSON( "maxSize" << 0LL << "currSize" << 0LL );
-            assert( BalancerPolicy::isReceiver( shard0 ) );
 
-            BSONObj shard1 = BSON( "maxSize" << 100LL << "currSize" << 80LL );
-            assert( BalancerPolicy::isReceiver( shard1 ) );
+        typedef ShardFields sf; // convenience alias
 
-            BSONObj shard2 = BSON( "maxSize" << 100LL << "currSize" << 110LL );
-            assert( ! BalancerPolicy::isReceiver( shard2 ) );
+        void caseSizeMaxedShard(){
+            BSONObj shard0 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) );
+            assert( ! BalancerPolicy::isSizeMaxed( shard0 ) );
 
-            BSONObj shard3 = BSON( "draining" << true );
-            assert( ! BalancerPolicy::isReceiver( shard3 ) );
+            BSONObj shard1 = BSON( sf::maxSize(100LL) << sf::currSize(80LL) );
+            assert( ! BalancerPolicy::isSizeMaxed( shard1 ) );
+
+            BSONObj shard2 = BSON( sf::maxSize(100LL) << sf::currSize(110LL) );
+            assert( BalancerPolicy::isSizeMaxed( shard2 ) );
 
             BSONObj empty;
-            assert( BalancerPolicy::isReceiver( empty ) );
+            assert( ! BalancerPolicy::isSizeMaxed( empty ) );
         }
 
-        void drainingShard(){
-            BSONObj shard0 = BSON( "draining" << true );
+        void caseDrainingShard(){
+            BSONObj shard0 = BSON( sf::draining(true) );
             assert( BalancerPolicy::isDraining( shard0 ) );
 
+            BSONObj shard1 = BSON( sf::draining(false) );
+            assert( ! BalancerPolicy::isDraining( shard1 ) );
+
             BSONObj empty;
             assert( ! BalancerPolicy::isDraining( empty ) );
         }
 
+        void caseBalanceNormal(){
+            // 2 chunks and 0 chunk shards
+            BalancerPolicy::ShardToChunksMap chunkMap;
+            vector<BSONObj> chunks;
+            chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
+                                   "max" << BSON( "x" << 49 )));
+            chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
+                                   "max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
+            chunkMap["shard0"] = chunks;
+            chunks.clear();
+            chunkMap["shard1"] = chunks;
+
+            // no limits
+            BalancerPolicy::ShardToLimitsMap limitsMap;
+            BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(false) );
+            BSONObj limits1 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) << sf::draining(false) );
+            limitsMap["shard0"] = limits0;
+            limitsMap["shard1"] = limits1;
+
+            BalancerPolicy::ChunkInfo* c = NULL;
+            c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 1 );
+            assert( c != NULL );
+        }
+
+        void caseBalanceDraining(){
+            // one normal, one draining
+            // 2 chunks and 0 chunk shards
+            BalancerPolicy::ShardToChunksMap chunkMap;
+            vector<BSONObj> chunks;
+            chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
+                                   "max" << BSON( "x" << 49 )));
+            chunkMap["shard0"] = chunks;
+            chunks.clear();
+            chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
+                                   "max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
+            chunkMap["shard1"] = chunks;
+
+            // shard0 is draining
+            BalancerPolicy::ShardToLimitsMap limitsMap;
+            BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(true) );
+            BSONObj limits1 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) << sf::draining(false) );
+            limitsMap["shard0"] = limits0;
+            limitsMap["shard1"] = limits1;
+
+            BalancerPolicy::ChunkInfo* c = NULL;
+            c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 0 );
+            assert( c != NULL );
+            assert( c->to == "shard1" );
+            assert( c->from == "shard0" );
+            assert( ! c->chunk.isEmpty() );
+        }
+
+        void caseBalanceEndedDraining(){
+            // 2 chunks and 0 chunk (drain completed) shards
+            BalancerPolicy::ShardToChunksMap chunkMap;
+            vector<BSONObj> chunks;
+            chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
+                                   "max" << BSON( "x" << 49 )));
+            chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
+                                   "max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
+            chunkMap["shard0"] = chunks;
+            chunks.clear();
+            chunkMap["shard1"] = chunks;
+
+            // no limits
+            BalancerPolicy::ShardToLimitsMap limitsMap;
+            BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(false) );
+            BSONObj limits1 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) << sf::draining(true) );
+            limitsMap["shard0"] = limits0;
+            limitsMap["shard1"] = limits1;
+
+            BalancerPolicy::ChunkInfo* c = NULL;
+            c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 0 );
+            assert( c == NULL );
+        }
+
+        void caseBalanceImpasse(){
+            // one maxed out, one draining
+            // 2 chunks and 0 chunk shards
+            BalancerPolicy::ShardToChunksMap chunkMap;
+            vector<BSONObj> chunks;
+            chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
+                                   "max" << BSON( "x" << 49 )));
+            chunkMap["shard0"] = chunks;
+            chunks.clear();
+            chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
+                                   "max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
+            chunkMap["shard1"] = chunks;
+
+            // shard0 is draining, shard1 is maxed out
+            BalancerPolicy::ShardToLimitsMap limitsMap;
+            BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(true) );
+            BSONObj limits1 = BSON( sf::maxSize(1LL) << sf::currSize(1LL) << sf::draining(false) );
+            limitsMap["shard0"] = limits0;
+            limitsMap["shard1"] = limits1;
+
+            BalancerPolicy::ChunkInfo* c = NULL;
+            c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 0 );
+            assert( c == NULL );
+        }
+
         void run(){
-            maxSizeForShard();
+            caseSizeMaxedShard();
+            caseDrainingShard();
+            caseBalanceNormal();
+            caseBalanceDraining();
+            caseBalanceImpasse();
             log(1) << "policyObjUnitTest passed" << endl;
         }
     } policyObjUnitTest;
diff --git a/s/balancer_policy.h b/s/balancer_policy.h
index b78e7da54ac..3622edcf7d4 100644
--- a/s/balancer_policy.h
+++ b/s/balancer_policy.h
@@ -40,24 +40,33 @@ namespace mongo {
          * @param balancedLastTime is the number of chunks effectively moved in the last round.
          * @returns NULL or ChunkInfo of the best move to make towards balacing the collection.
          */
-        static ChunkInfo* balance( const string& ns, const map< string, BSONObj>& shardLimitsMap,
-                                   const map< string,vector<BSONObj> >& shardToChunksMap, int balancedLastTime );
+        typedef map< string,BSONObj > ShardToLimitsMap;
+        typedef map< string,vector<BSONObj> > ShardToChunksMap;
+
+        static ChunkInfo* balance( const string& ns, const ShardToLimitsMap& shardToLimitsMap,
+                                   const ShardToChunksMap& shardToChunksMap, int balancedLastTime );
 
         // below exposed for testing purposes only -- treat it as private --
         static BSONObj pickChunk( const vector<BSONObj>& from, const vector<BSONObj>& to );
 
         /**
-         * Returns true if a shard can receive new chunks based on that shard's 'shardLimits' information.
-         * Expects the optional fields "maxSize", can in size in MB, and "usedSize", currently used size in MB,
-         * on 'shardLimits'.
+         * Returns true if a shard cannot receive any new chunks because it has reached 'shardLimits'.
+         * Expects the optional fields "maxSize", the cap size in MB, and "currSize", the currently used size
+         * in MB, on 'shardLimits'.
          */
-        static bool isReceiver( BSONObj shardLimits );
+        static bool isSizeMaxed( BSONObj shardLimits );
 
         /**
-         * Returns true if 'shardLimist' contains a field "draining".
+         * Returns true if 'shardLimits' indicates the shard is being removed. Expects the optional
+         * field "draining" on 'shardLimits'.
          */
         static bool isDraining( BSONObj shardLimits );
+
+    private:
+        // Convenience types
+        typedef ShardToChunksMap::const_iterator ShardToChunksIter;
+        typedef ShardToLimitsMap::const_iterator ShardToLimitsIter;
+
     };
 
     struct BalancerPolicy::ChunkInfo {
diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp
index 4e56f9aaa01..84bd33bd602 100644
--- a/s/commands_admin.cpp
+++ b/s/commands_admin.cpp
@@ -589,7 +589,6 @@ namespace mongo {
         bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
             ScopedDbConnection conn( configServer.getPrimary() );
-
             string host = cmdObj["addshard"].valuestrsafe();
             if ( host == "localhost" || host.find( "localhost:" ) == 0 ||
@@ -624,8 +623,8 @@ namespace mongo {
                 BSONObjBuilder b;
                 b.append( "_id" , name );
                 b.append( "host" , host );
-                if ( cmdObj["maxSize"].isNumber() )
-                    b.append( cmdObj["maxSize"] );
+                if ( cmdObj[ ShardFields::maxSize.name() ].isNumber() )
+                    b.append( cmdObj[ ShardFields::maxSize.name() ] );
                 shard = b.obj();
             }
 
@@ -669,20 +668,47 @@ namespace mongo {
     public:
         RemoveShardCmd() : GridAdminCmd("removeshard") { }
         virtual void help( stringstream& help ) const {
-            help << "remove a shard to the system.\nshard must be empty or command will return an error.";
+            help << "remove a shard from the system.";
         }
         bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-            if ( 1 ){
+            if ( ! cmdObj["forTestingOnly"].trueValue() ){
                 errmsg = "removeshard not yet implemented";
-                return 0;
+                return false;
+            }
+
+            string shard = cmdObj["removeshard"].valuestrsafe();
+            if ( ! grid.knowAboutShard( shard ) ){
+                errmsg = "that server isn't known to me";
+                return false;
             }
 
             ScopedDbConnection conn( configServer.getPrimary() );
 
-            BSONObj server = BSON( "host" << cmdObj["removeshard"].valuestrsafe() );
-            conn->remove( "config.shards" , server );
+            BSONObj drainingDoc = BSON( "host" << shard << ShardFields::draining(true) );
+            BSONObj old = conn->findOne( "config.shards", drainingDoc );
+            if ( ! old.isEmpty() ){
+                result.append( "msg" , "shard already being removed" );
+                conn.done();
+                return false;
+            }
+
+            // TODO prevent move chunks to this shard.
+
+            log() << "going to start draining shard: " << shard << endl;
+            BSONObj shardDoc = BSON( "host" << shard );
+            BSONObj newStatus = BSON( "$set" << BSON( ShardFields::draining(true) ) );
+            conn->update( "config.shards" , shardDoc , newStatus, false /* do not upsert */);
+            errmsg = conn->getLastError();
+            if ( errmsg.size() ){
+                log() << "error removing shard at: " << shard << " err: " << errmsg << endl;
+                return false;
+            }
+
+            result.append( "started draining" , shard );
             conn.done();
+
+            Shard::reloadShardInfo();
             return true;
         }
     } removeShardCmd;
diff --git a/s/config.cpp b/s/config.cpp
index 41fb4cec6c7..50dbb29b21a 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -39,6 +39,11 @@ namespace mongo {
     string ShardNS::mongos = "config.mongos";
     string ShardNS::settings = "config.settings";
 
+    BSONField<bool>      ShardFields::draining("draining");
+    BSONField<long long> ShardFields::maxSize ("maxSize");
+    BSONField<long long> ShardFields::currSize("currSize");
+
+
     /* --- DBConfig --- */
 
     string DBConfig::modelServer() {
diff --git a/s/config.h b/s/config.h
index 7be22271130..b447233582b 100644
--- a/s/config.h
+++ b/s/config.h
@@ -39,7 +39,15 @@ namespace mongo {
         static string settings;
     };
 
-    
+    /**
+     * Field names used in the 'shards' collection.
+     */
+    struct ShardFields {
+        static BSONField<bool>      draining;
+        static BSONField<long long> maxSize;
+        static BSONField<long long> currSize;
+    };
+
     class Grid;
     class ConfigServer;
 
@@ -65,7 +73,7 @@ namespace mongo {
 
     /**
     * top level grid configuration for an entire database
     * TODO: use shared_ptr for ChunkManager
-    */    
+    */
    class DBConfig : public Model {
    public:
@@ -223,5 +231,5 @@ namespace mongo {
     private:
         string getHost( string name , bool withPort );
     };
-    
+
 } // namespace mongo
diff --git a/s/shard.cpp b/s/shard.cpp
index cec5ea53c4d..4633e335795 100644
--- a/s/shard.cpp
+++ b/s/shard.cpp
@@ -47,13 +47,13 @@ namespace mongo {
                 string host = o["host"].String();
 
                 long long maxSize = 0;
-                BSONElement maxSizeElem = o["maxSize"];
+                BSONElement maxSizeElem = o[ ShardFields::maxSize.name() ];
                 if ( ! maxSizeElem.eoo() ){
                     maxSize = maxSizeElem.numberLong();
                 }
 
                 bool isDraining = false;
-                BSONElement isDrainingElem = o["isDraining"];
+                BSONElement isDrainingElem = o[ ShardFields::draining.name() ];
                 if ( ! isDrainingElem.eoo() ){
                     isDraining = isDrainingElem.Bool();
                 }
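
For reference, a rough mongos shell session exercising the draining flow added by this patch might look like the sketch below. This is a hypothetical example, not part of the patch: the shard _id "shard0" is a placeholder, reading config.shards and config.chunks directly through the mongos is an assumption about the test setup, and forTestingOnly: true is required only while RemoveShardCmd is still gated as above.

    // hypothetical session against a two-shard test cluster (shard _id is a placeholder)
    var host = db.getSisterDB("config").shards.findOne({ _id: "shard0" }).host;
    // starts draining only; the reply should carry a "started draining" field per the command above
    db.adminCommand({ removeshard: host, forTestingOnly: true });
    // chunks migrate off the draining shard on later balancer rounds, so poll for completion
    assert.soon(function() {
        return db.getSisterDB("config").chunks.count({ shard: "shard0" }) == 0;
    }, "draining did not finish", 1000 * 60 * 3, 5000);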