0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 01:21:03 +01:00

SERVER-940 Balancer now drains chunks off a shard that's being removed

This commit is contained in:
Alberto Lerner 2010-06-23 19:25:03 -07:00
parent 1dc2b9cd17
commit 30b1524a77
8 changed files with 274 additions and 80 deletions

View File

@ -44,6 +44,11 @@ function diff(){
return Math.max( x.shard0 , x.shard1 ) - Math.min( x.shard0 , x.shard1 );
}
function sum(){
var x = dist();
return x.shard0 + x.shard1;
}
assert.lt( 20 , diff() );
print( diff() )
@ -52,5 +57,14 @@ assert.soon( function(){
return d < 5;
} , "balance didn't happen" , 1000 * 60 * 3 , 5000 );
var chunkCount = sum();
host = s.config.shards.findOne({_id : "shard0" }).host;
s.adminCommand( { removeshard: host , forTestingOnly : true } );
assert.soon( function(){
printjson(dist());
s.config.shards.find().forEach(function(z){printjson(z);});
return chunkCount == s.config.chunks.count({shard: "shard1"});
} , "removeshard didn't happen" , 1000 * 60 * 3 , 5000 );
s.stop();

View File

@ -244,8 +244,12 @@ namespace mongo {
for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ){
const Shard& s = *it;
ShardStatus status = s.getStatus();
BSONObj limitsObj = BSON( "maxSize" << s.getMaxSize() << "currSize" << status.mapped() );
shardLimitsMap[s.getName()] = limitsObj;
BSONObj limitsObj = BSON( ShardFields::maxSize( s.getMaxSize() ) <<
ShardFields::currSize( status.mapped() ) <<
ShardFields::draining( s.isDraining()) );
shardLimitsMap[ s.getName() ] = limitsObj;
}
//

View File

@ -19,6 +19,7 @@
#include "pch.h"
#include "config.h"
#include "../client/dbclient.h"
#include "../util/unittest.h"
@ -27,56 +28,82 @@
namespace mongo {
BalancerPolicy::ChunkInfo* BalancerPolicy::balance( const string& ns,
const map< string, BSONObj>& shardLimitsMap,
const map< string,vector<BSONObj> >& shardToChunksMap,
const ShardToLimitsMap& shardToLimitsMap,
const ShardToChunksMap& shardToChunksMap,
int balancedLastTime ){
pair<string,unsigned> min("",9999999);
pair<string,unsigned> min("",numeric_limits<unsigned>::max());
pair<string,unsigned> max("",0);
for ( map< string,vector<BSONObj> >::const_iterator i=shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ){
vector<string> drainingShards;
for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ){
// Find whether this shard has reached its size cap or whether it is being removed.
const string& shard = i->first;
unsigned size = i->second.size();
BSONObj shardLimits;
map< string,BSONObj >::const_iterator it = shardLimitsMap.find( shard );
if ( it != shardLimitsMap.end() ) {
shardLimits = it->second;
}
ShardToLimitsIter it = shardToLimitsMap.find( shard );
if ( it != shardToLimitsMap.end() ) shardLimits = it->second;
const bool maxedOut = isSizeMaxed( shardLimits );
const bool draining = isDraining( shardLimits );
if ( size < min.second ){
if ( isReceiver( shardLimits ) ){
min.first = shard;
min.second = size;
} else {
log() << "balancer: shard can't receive any more chunks (" << shard << ")" << endl;
// Check whether this shard is a better chunk receiver than the current one.
// Maxed out shards or draining shards cannot be considered receivers.
const unsigned size = i->second.size();
if ( ! maxedOut && ! draining ){
if ( size < min.second ){
min = make_pair( shard , size );
}
}
if ( isDraining( shardLimits ) || ( size > max.second )){
max.first = shard;
max.second = size;
// Check whether this shard is a better chunk donor than the current one.
// Draining shards take a lower priority than overloaded shards.
if ( size > max.second ){
max = make_pair( shard , size );
}
if ( draining && (size > 0)){
drainingShards.push_back( shard );
}
}
// If there is no candidate chunk receiver -- they may have all been maxed out,
// draining, ... -- there's not much that the policy can do.
if ( min.second == numeric_limits<unsigned>::max() ){
log() << "balancer: no availalable shards to take chunks" << endl;
return NULL;
}
log(4) << "min: " << min.first << "\t" << min.second << endl;
log(4) << "max: " << max.first << "\t" << max.second << endl;
if( (int)( max.second - min.second) < ( balancedLastTime ? 2 : 8 ) )
log(4) << "draining: " << ! drainingShards.empty() << "(" << drainingShards.size() << ")" << endl;
// Solving imbalances takes a higher priority than draining shards. Many shards can
// be draining at once but we choose only one of them to cater to per round.
const int imbalance = max.second - min.second;
const int threshold = balancedLastTime ? 2 : 8;
string from, to;
if ( imbalance >= threshold ){
from = max.first;
to = min.first;
} else if ( ! drainingShards.empty() ){
from = drainingShards[ random() % drainingShards.size() ];
to = min.first;
} else {
// Everything is balanced here!
return NULL;
}
const string from = max.first;
const string to = min.first;
const vector<BSONObj>& chunksFrom = shardToChunksMap.find( from )->second;
const vector<BSONObj>& chunksTo = shardToChunksMap.find( to )->second;
const vector<BSONObj>& chunksTo = shardToChunksMap.find( to )->second;
BSONObj chunkToMove = pickChunk( chunksFrom , chunksTo );
log() << "balancer: chose chunk from [" << from << "] to [" << to << "] " << chunkToMove << endl;
log() << "balancer: chose [" << from << "] to [" << to << "] " << chunkToMove << endl;
return new ChunkInfo( ns, to, from, chunkToMove );
}
BSONObj BalancerPolicy::pickChunk( const vector<BSONObj>& from, const vector<BSONObj>& to ){
assert( from.size() > to.size() );
// It is possible for a donor ('from') shard to have fewer chunks than a receiver one ('to')
// if the donor is in draining mode.
if ( to.size() == 0 )
return from[0];
@ -90,36 +117,29 @@ namespace mongo {
return from[0];
}
bool BalancerPolicy::isReceiver( BSONObj limits ){
// A draining shard can never be a receiver
if ( isDraining( limits ) ){
return false;
}
bool BalancerPolicy::isSizeMaxed( BSONObj limits ){
// If there's no limit information for the shard, assume it can be a chunk receiver
// (i.e., there's no bound on space utilization)
if ( limits.isEmpty() ){
return true;
return false;
}
long long maxUsage = limits["maxSize"].Long();
long long maxUsage = limits[ ShardFields::maxSize.name() ].Long();
if ( maxUsage == 0 ){
return true;
return false;
}
long long currUsage = limits["currSize"].Long();
long long currUsage = limits[ ShardFields::currSize.name() ].Long();
if ( currUsage < maxUsage ){
return true;
return false;
}
return false;
return true;
}
bool BalancerPolicy::isDraining( BSONObj limits ){
// If there's no entry saying it is draining, it isn't.
if ( limits.isEmpty() || limits[ "draining" ].eoo() ){
BSONElement draining = limits[ ShardFields::draining.name() ];
if ( draining.eoo() || ! draining.Bool() ){
return false;
}
@ -128,33 +148,141 @@ namespace mongo {
class PolicyObjUnitTest : public UnitTest {
public:
void maxSizeForShard(){
BSONObj shard0 = BSON( "maxSize" << 0LL << "currSize" << 0LL );
assert( BalancerPolicy::isReceiver( shard0 ) );
BSONObj shard1 = BSON( "maxSize" << 100LL << "currSize" << 80LL );
assert( BalancerPolicy::isReceiver( shard1 ) );
typedef ShardFields sf; // convenience alias
BSONObj shard2 = BSON( "maxSize" << 100LL << "currSize" << 110LL );
assert( ! BalancerPolicy::isReceiver( shard2 ) );
void caseSizeMaxedShard(){
BSONObj shard0 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) );
assert( ! BalancerPolicy::isSizeMaxed( shard0 ) );
BSONObj shard3 = BSON( "draining" << true );
assert( ! BalancerPolicy::isReceiver( shard3 ) );
BSONObj shard1 = BSON( sf::maxSize(100LL) << sf::currSize(80LL) );
assert( ! BalancerPolicy::isSizeMaxed( shard1 ) );
BSONObj shard2 = BSON( sf::maxSize(100LL) << sf::currSize(110LL) );
assert( BalancerPolicy::isSizeMaxed( shard2 ) );
BSONObj empty;
assert( BalancerPolicy::isReceiver( empty ) );
assert( ! BalancerPolicy::isSizeMaxed( empty ) );
}
void drainingShard(){
BSONObj shard0 = BSON( "draining" << true );
void caseDrainingShard(){
BSONObj shard0 = BSON( sf::draining(true) );
assert( BalancerPolicy::isDraining( shard0 ) );
BSONObj shard1 = BSON( sf::draining(false) );
assert( ! BalancerPolicy::isDraining( shard1 ) );
BSONObj empty;
assert( ! BalancerPolicy::isDraining( empty ) );
}
void caseBalanceNormal(){
// 2 chunks and 0 chunk shards
BalancerPolicy::ShardToChunksMap chunkMap;
vector<BSONObj> chunks;
chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
"max" << BSON( "x" << 49 )));
chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
"max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
chunkMap["shard0"] = chunks;
chunks.clear();
chunkMap["shard1"] = chunks;
// no limits
BalancerPolicy::ShardToLimitsMap limitsMap;
BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(false) );
BSONObj limits1 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) << sf::draining(false) );
limitsMap["shard0"] = limits0;
limitsMap["shard1"] = limits1;
BalancerPolicy::ChunkInfo* c = NULL;
c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 1 );
assert( c != NULL );
}
void caseBalanceDraining(){
// one normal, one draining
// 2 chunks and 0 chunk shards
BalancerPolicy::ShardToChunksMap chunkMap;
vector<BSONObj> chunks;
chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
"max" << BSON( "x" << 49 )));
chunkMap["shard0"] = chunks;
chunks.clear();
chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
"max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
chunkMap["shard1"] = chunks;
// shard0 is draining
BalancerPolicy::ShardToLimitsMap limitsMap;
BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(true) );
BSONObj limits1 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) << sf::draining(false) );
limitsMap["shard0"] = limits0;
limitsMap["shard1"] = limits1;
BalancerPolicy::ChunkInfo* c = NULL;
c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 0 );
assert( c != NULL );
assert( c->to == "shard1" );
assert( c->from == "shard0" );
assert( ! c->chunk.isEmpty() );
}
void caseBalanceEndedDraining(){
// 2 chunks and 0 chunk (drain completed) shards
BalancerPolicy::ShardToChunksMap chunkMap;
vector<BSONObj> chunks;
chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
"max" << BSON( "x" << 49 )));
chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
"max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
chunkMap["shard0"] = chunks;
chunks.clear();
chunkMap["shard1"] = chunks;
// no limits
BalancerPolicy::ShardToLimitsMap limitsMap;
BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(false) );
BSONObj limits1 = BSON( sf::maxSize(0LL) << sf::currSize(0LL) << sf::draining(true) );
limitsMap["shard0"] = limits0;
limitsMap["shard1"] = limits1;
BalancerPolicy::ChunkInfo* c = NULL;
c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 0 );
assert( c == NULL );
}
void caseBalanceImpasse(){
// one maxed out, one draining
// 2 chunks and 0 chunk shards
BalancerPolicy::ShardToChunksMap chunkMap;
vector<BSONObj> chunks;
chunks.push_back(BSON( "min" << BSON( "x" << BSON( "$minKey"<<1) ) <<
"max" << BSON( "x" << 49 )));
chunkMap["shard0"] = chunks;
chunks.clear();
chunks.push_back(BSON( "min" << BSON( "x" << 49 ) <<
"max" << BSON( "x" << BSON( "$maxkey"<<1 ))));
chunkMap["shard1"] = chunks;
// shard0 is draining, shard1 is maxed out
BalancerPolicy::ShardToLimitsMap limitsMap;
BSONObj limits0 = BSON( sf::maxSize(0LL) << sf::currSize(2LL) << sf::draining(true) );
BSONObj limits1 = BSON( sf::maxSize(1LL) << sf::currSize(1LL) << sf::draining(false) );
limitsMap["shard0"] = limits0;
limitsMap["shard1"] = limits1;
BalancerPolicy::ChunkInfo* c = NULL;
c = BalancerPolicy::balance( "ns", limitsMap, chunkMap, 0 );
assert( c == NULL );
}
void run(){
maxSizeForShard();
caseSizeMaxedShard();
caseDrainingShard();
caseBalanceNormal();
caseBalanceDraining();
caseBalanceImpasse();
log(1) << "policyObjUnitTest passed" << endl;
}
} policyObjUnitTest;

View File

@ -40,24 +40,33 @@ namespace mongo {
* @param balancedLastTime is the number of chunks effectively moved in the last round.
* @returns NULL or ChunkInfo of the best move to make towards balancing the collection.
*/
static ChunkInfo* balance( const string& ns, const map< string, BSONObj>& shardLimitsMap,
const map< string,vector<BSONObj> >& shardToChunksMap, int balancedLastTime );
typedef map< string,BSONObj > ShardToLimitsMap;
typedef map< string,vector<BSONObj> > ShardToChunksMap;
static ChunkInfo* balance( const string& ns, const ShardToLimitsMap& shardToLimitsMap,
const ShardToChunksMap& shardToChunksMap, int balancedLastTime );
// below exposed for testing purposes only -- treat it as private --
static BSONObj pickChunk( const vector<BSONObj>& from, const vector<BSONObj>& to );
/**
* Returns true if a shard can receive new chunks based on that shard's 'shardLimits' information.
* Expects the optional fields "maxSize", a size cap in MB, and "currSize", the currently used size in MB,
* on 'shardLimits'.
* Returns true if a shard cannot receive any new chunks because it reached 'shardLimits'.
* Expects the optional fields "maxSize", a size cap in MB, and "currSize", the currently used size
* in MB, on 'shardLimits'.
*/
static bool isReceiver( BSONObj shardLimits );
static bool isSizeMaxed( BSONObj shardLimits );
/**
* Returns true if 'shardLimits' contains a field "draining".
* Returns true if 'shardLimits' contains a field "draining". Expects the optional field
* "draining" on 'shardLimits'.
*/
static bool isDraining( BSONObj shardLimits );
private:
// Convenience types
typedef ShardToChunksMap::const_iterator ShardToChunksIter;
typedef ShardToLimitsMap::const_iterator ShardToLimitsIter;
};
struct BalancerPolicy::ChunkInfo {

View File

@ -589,7 +589,6 @@ namespace mongo {
bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
ScopedDbConnection conn( configServer.getPrimary() );
string host = cmdObj["addshard"].valuestrsafe();
if ( host == "localhost" || host.find( "localhost:" ) == 0 ||
@ -624,8 +623,8 @@ namespace mongo {
BSONObjBuilder b;
b.append( "_id" , name );
b.append( "host" , host );
if ( cmdObj["maxSize"].isNumber() )
b.append( cmdObj["maxSize"] );
if ( cmdObj[ ShardFields::maxSize.name() ].isNumber() )
b.append( cmdObj[ ShardFields::maxSize.name() ] );
shard = b.obj();
}
@ -669,20 +668,47 @@ namespace mongo {
public:
RemoveShardCmd() : GridAdminCmd("removeshard") { }
virtual void help( stringstream& help ) const {
help << "remove a shard to the system.\nshard must be empty or command will return an error.";
help << "remove a shard to the system.";
}
bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
if ( 1 ){
if ( ! cmdObj["forTestingOnly"].trueValue() ){
errmsg = "removeshard not yet implemented";
return 0;
return false;
}
string shard = cmdObj["removeshard"].valuestrsafe();
if ( ! grid.knowAboutShard( shard ) ){
errmsg = "that server isn't known to me";
return false;
}
ScopedDbConnection conn( configServer.getPrimary() );
BSONObj server = BSON( "host" << cmdObj["removeshard"].valuestrsafe() );
conn->remove( "config.shards" , server );
BSONObj drainingDoc = BSON( "host" << shard << ShardFields::draining(true) );
BSONObj old = conn->findOne( "config.shards", drainingDoc );
if ( ! old.isEmpty() ){
result.append( "msg" , "shard already being removed" );
conn.done();
return false;
}
// TODO: prevent moving chunks to this shard.
log() << "going to start draining shard: " << shard << endl;
BSONObj shardDoc = BSON( "host" << shard );
BSONObj newStatus = BSON( "$set" << BSON( ShardFields::draining(true) ) );
conn->update( "config.shards" , shardDoc , newStatus, false /* do no upsert */);
errmsg = conn->getLastError();
if ( errmsg.size() ){
log() << "error removing shard at: " << shard << " err: " << errmsg << endl;
return false;
}
result.append( "started draining" , shard );
conn.done();
Shard::reloadShardInfo();
return true;
}
} removeShardCmd;

View File

@ -39,6 +39,11 @@ namespace mongo {
string ShardNS::mongos = "config.mongos";
string ShardNS::settings = "config.settings";
BSONField<bool> ShardFields::draining("draining");
BSONField<long long> ShardFields::maxSize ("maxSize");
BSONField<long long> ShardFields::currSize("currSize");
/* --- DBConfig --- */
string DBConfig::modelServer() {

View File

@ -39,7 +39,15 @@ namespace mongo {
static string settings;
};
/**
* Field names used in the 'shards' collection.
*/
struct ShardFields {
static BSONField<bool> draining;
static BSONField<long long> maxSize;
static BSONField<long long> currSize;
};
class Grid;
class ConfigServer;
@ -65,7 +73,7 @@ namespace mongo {
/**
* top level grid configuration for an entire database
* TODO: use shared_ptr for ChunkManager
*/
*/
class DBConfig : public Model {
public:
@ -223,5 +231,5 @@ namespace mongo {
private:
string getHost( string name , bool withPort );
};
} // namespace mongo

View File

@ -47,13 +47,13 @@ namespace mongo {
string host = o["host"].String();
long long maxSize = 0;
BSONElement maxSizeElem = o["maxSize"];
BSONElement maxSizeElem = o[ ShardFields::maxSize.name() ];
if ( ! maxSizeElem.eoo() ){
maxSize = maxSizeElem.numberLong();
}
bool isDraining = false;
BSONElement isDrainingElem = o["isDraining"];
BSONElement isDrainingElem = o[ ShardFields::draining.name() ];
if ( ! isDrainingElem.eoo() ){
isDraining = isDrainingElem.Bool();
}