0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00
mongodb/s/chunk.cpp
2010-09-10 23:27:10 -04:00

1273 lines
45 KiB
C++

// shard.cpp
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pch.h"
#include "chunk.h"
#include "config.h"
#include "grid.h"
#include "../util/unittest.h"
#include "../client/connpool.h"
#include "../db/queryutil.h"
#include "cursors.h"
#include "strategy.h"
namespace mongo {
inline bool allOfType(BSONType type, const BSONObj& o){
BSONObjIterator it(o);
while(it.more()){
if (it.next().type() != type)
return false;
}
return true;
}
// ------- Shard --------
int Chunk::MaxChunkSize = 1024 * 1024 * 200;
Chunk::Chunk( ChunkManager * manager )
: _manager(manager),
_lastmod(0), _modified(false), _dataWritten(0)
{}
Chunk::Chunk(ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard)
: _manager(info), _min(min), _max(max), _shard(shard),
_lastmod(0), _modified(false), _dataWritten(0)
{}
string Chunk::getns() const {
assert( _manager );
return _manager->getns();
}
void Chunk::setShard( const Shard& s ){
_shard = s;
_manager->_migrationNotification(this);
_markModified();
}
bool Chunk::contains( const BSONObj& obj ) const{
return
_manager->getShardKey().compare( getMin() , obj ) <= 0 &&
_manager->getShardKey().compare( obj , getMax() ) < 0;
}
bool ChunkRange::contains(const BSONObj& obj) const {
// same as Chunk method
return
_manager->getShardKey().compare( getMin() , obj ) <= 0 &&
_manager->getShardKey().compare( obj , getMax() ) < 0;
}
bool Chunk::minIsInf() const {
return _manager->getShardKey().globalMin().woCompare( getMin() ) == 0;
}
bool Chunk::maxIsInf() const {
return _manager->getShardKey().globalMax().woCompare( getMax() ) == 0;
}
BSONObj Chunk::pickSplitPoint() const{
int sort = 0;
if ( minIsInf() ){
sort = 1;
}
else if ( maxIsInf() ){
sort = -1;
}
if ( sort ){
ShardConnection conn( getShard().getConnString() , _manager->getns() );
Query q;
if ( sort == 1 )
q.sort( _manager->getShardKey().key() );
else {
BSONObj k = _manager->getShardKey().key();
BSONObjBuilder r;
BSONObjIterator i(k);
while( i.more() ) {
BSONElement e = i.next();
uassert( 10163 , "can only handle numbers here - which i think is correct" , e.isNumber() );
r.append( e.fieldName() , -1 * e.number() );
}
q.sort( r.obj() );
}
BSONObj end = conn->findOne( _manager->getns() , q );
conn.done();
if ( ! end.isEmpty() )
return _manager->getShardKey().extractKey( end );
}
BSONObj cmd = BSON( "medianKey" << _manager->getns()
<< "keyPattern" << _manager->getShardKey().key()
<< "min" << getMin()
<< "max" << getMax() );
ScopedDbConnection conn( getShard().getConnString() );
BSONObj result;
if ( ! conn->runCommand( "admin" , cmd , result ) ){
stringstream ss;
ss << "medianKey command failed: " << result;
uassert( 10164 , ss.str() , 0 );
}
BSONObj median = result.getObjectField( "median" ).getOwned();
if (median == getMin()){
Query q;
q.minKey(_min).maxKey(_max);
q.sort(_manager->getShardKey().key());
median = conn->findOne(_manager->getns(), q);
median = _manager->getShardKey().extractKey( median );
}
conn.done();
if ( median < getMin() || median >= getMax() ){
stringstream ss;
ss << "medianKey returned value out of range. "
<< " cmd: " << cmd
<< " result: " << result;
uasserted( 13394 , ss.str() );
}
return median;
}
void Chunk::pickSplitVector( vector<BSONObj>* splitPoints , int chunkSize /* in bytes */) const {
// Ask the mongod holding this chunk to figure out the split points.
ScopedDbConnection conn( getShard().getConnString() );
BSONObj result;
BSONObjBuilder cmd;
cmd.append( "splitVector" , _manager->getns() );
cmd.append( "keyPattern" , _manager->getShardKey().key() );
cmd.append( "min" , getMin() );
cmd.append( "max" , getMax() );
cmd.append( "maxChunkSize" , chunkSize / (1<<20) /* in MBs */ );
BSONObj cmdObj = cmd.obj();
if ( ! conn->runCommand( "admin" , cmdObj , result )){
ostringstream os;
os << "splitVector command failed: " << result;
uassert( 13345 , os.str() , 0 );
}
BSONObjIterator it( result.getObjectField( "splitKeys" ) );
while ( it.more() ){
splitPoints->push_back( it.next().Obj().getOwned() );
}
conn.done();
}
ChunkPtr Chunk::split(){
vector<BSONObj> splitPoints;
splitPoints.push_back( pickSplitPoint() );
return multiSplit( splitPoints );
}
ChunkPtr Chunk::multiSplit( const vector<BSONObj>& m ){
dist_lock_try dlk( &_manager->_nsLock , string("split-") + toString() );
uassert( 10166 , "locking namespace failed" , dlk.got() );
return multiSplit_inlock( m );
}
ChunkPtr Chunk::multiSplit_inlock( const vector<BSONObj>& m ){
const size_t maxSplitPoints = 256;
uassert( 10165 , "can't split as shard doesn't have a manager" , _manager );
uassert( 13332 , "need a split key to split chunk" , !m.empty() );
uassert( 13333 , "can't split a chunk in that many parts", m.size() < maxSplitPoints );
uassert( 13003 , "can't split a chunk with only one distinct value" , _min.woCompare(_max) );
{
ShardChunkVersion onServer = getVersionOnConfigServer();
ShardChunkVersion mine = _lastmod;
if ( onServer > mine ){
stringstream ss;
ss << "mulitSplit failing because config not up to date"
<< " onServer: " << onServer.toString()
<< " mine: " << mine.toString();
//reload config
grid.getDBConfig(_manager->_ns)->getChunkManager(_manager->_ns, true);
uasserted( 13387 , ss.str() );
}
}
// Now that we're sure to have the freshest data about the shard, sanity check the split
// keys. Recall we get the split keys without any lock. It is possible that another split
// started in front of us and made the split vector used here invalid.
if ( ( _min.woCompare( m.front() ) > 0 ) || ( _max.woCompare( m.back() ) <= 0 ) ){
stringstream ss;
ss << "attempt to split on stale range (other split got through first?)"
<< " min / max on split request: " << m.front() << " / " << m.back()
<< " min / max on shard: " << _min << " / " << _max;
uasserted( 13442 , ss.str() );
}
BSONObjBuilder detail;
appendShortVersion( "before" , detail );
log(1) << "before split on " << m.size() << " points " << toString() << endl;
// Iterate over the split points in 'm', splitting off a new chunk per entry. That chunk's range
// covers until the next entry in 'm' or _max .
vector<ChunkPtr> newChunks;
vector<BSONObj>::const_iterator i = m.begin();
BSONObj nextPoint = i->getOwned();
_markModified();
do {
BSONObj splitPoint = nextPoint;
log(4) << "splitPoint: " << splitPoint << endl;
nextPoint = (++i != m.end()) ? i->getOwned() : _max.getOwned();
log(4) << "nextPoint: " << nextPoint << endl;
if ( nextPoint <= splitPoint) {
stringstream ss;
ss << "multiSplit failing because keys min: " << splitPoint << " and max: " << nextPoint
<< " do not define a valid chunk";
uasserted( 13395, ss.str() );
}
ChunkPtr s( new Chunk( _manager, splitPoint , nextPoint , _shard) );
s->_markModified();
newChunks.push_back(s);
} while ( i != m.end() );
// Have the chunk manager reflect the key change for the first chunk and create an entry for every
// new chunk spawned by it.
{
rwlock lk( _manager->_lock , true );
setMax(m[0].getOwned());
DEV assert( shared_from_this() );
_manager->_chunkMap[_max] = shared_from_this();
for ( vector<ChunkPtr>::const_iterator it = newChunks.begin(); it != newChunks.end(); ++it ){
ChunkPtr s = *it;
_manager->_chunkMap[s->getMax()] = s;
}
}
log(1) << "after split adjusted range: " << toString() << endl;
for ( vector<ChunkPtr>::const_iterator it = newChunks.begin(); it != newChunks.end(); ++it ){
ChunkPtr s = *it;
log(1) << "after split created new chunk: " << s->toString() << endl;
}
// Save the new key boundaries in the configDB.
_manager->save( false );
// Log all these changes in the configDB's log. We log a simple split differently than a multi-split.
if ( newChunks.size() == 1) {
appendShortVersion( "left" , detail );
newChunks[0]->appendShortVersion( "right" , detail );
configServer.logChange( "split" , _manager->getns(), detail.obj() );
} else {
BSONObj beforeDetailObj = detail.obj();
BSONObj firstDetailObj = beforeDetailObj.getOwned();
const int newChunksSize = newChunks.size();
BSONObjBuilder firstDetail;
firstDetail.appendElements( beforeDetailObj );
firstDetail.append( "number" , 0 );
firstDetail.append( "of" , newChunksSize );
appendShortVersion( "chunk" , firstDetail );
configServer.logChange( "multi-split" , _manager->getns() , firstDetail.obj() );
for ( int i=0; i < newChunksSize; i++ ){
BSONObjBuilder chunkDetail;
chunkDetail.appendElements( beforeDetailObj );
chunkDetail.append( "number", i+1 );
chunkDetail.append( "of" , newChunksSize );
newChunks[i]->appendShortVersion( "chunk" , chunkDetail );
configServer.logChange( "multi-split" , _manager->getns() , chunkDetail.obj() );
}
}
return newChunks[0];
}
bool Chunk::moveAndCommit( const Shard& to , string& errmsg ){
uassert( 10167 , "can't move shard to its current location!" , getShard() != to );
log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
Shard from = _shard;
ScopedDbConnection fromconn( from);
BSONObj res;
bool worked = fromconn->runCommand( "admin" ,
BSON( "moveChunk" << _manager->getns() <<
"from" << from.getConnString() <<
"to" << to.getConnString() <<
"min" << _min <<
"max" << _max <<
"shardId" << genID() <<
"configdb" << configServer.modelServer()
) ,
res
);
fromconn.done();
if ( worked ){
_manager->_reload();
return true;
}
errmsg = res["errmsg"].String();
errmsg += " " + res.toString();
return false;
}
bool Chunk::splitIfShould( long dataWritten ){
LastError::Disabled d( lastError.get() );
try {
return _splitIfShould( dataWritten );
}
catch ( std::exception& e ){
error() << "splitIfShould failed: " << e.what() << endl;
return false;
}
}
bool Chunk::_splitIfShould( long dataWritten ){
_dataWritten += dataWritten;
// split faster in early chunks helps spread out an initial load better
int splitThreshold;
const int minChunkSize = 1 << 20; // 1 MBytes
int numChunks = getManager()->numChunks();
if ( numChunks < 10 ){
splitThreshold = max( MaxChunkSize / 4 , minChunkSize );
} else if ( numChunks < 20 ){
splitThreshold = max( MaxChunkSize / 2 , minChunkSize );
} else {
splitThreshold = max( MaxChunkSize , minChunkSize );
}
if ( minIsInf() || maxIsInf() ){
splitThreshold = (int) ((double)splitThreshold * .9);
}
if ( _dataWritten < splitThreshold / 5 )
return false;
ChunkPtr newShard;
{
// putting this in its own scope so moveIfShould is done outside
dist_lock_try dlk( &_manager->_nsLock , string("split-") + toString() );
if ( ! dlk.got() )
return false;
log(3) << "\t splitIfShould : " << *this << endl;
_dataWritten = 0;
BSONObj splitPoint = pickSplitPoint();
if ( splitPoint.isEmpty() || _min == splitPoint || _max == splitPoint) {
error() << "want to split chunk, but can't find split point "
<< " chunk: " << toString() << " got: " << splitPoint << endl;
return false;
}
long size = getPhysicalSize();
if ( size < splitThreshold )
return false;
log() << "autosplitting " << _manager->getns() << " size: " << size << " shard: " << toString()
<< " on: " << splitPoint << "(splitThreshold " << splitThreshold << ")" << endl;
vector<BSONObj> splitPoints;
splitPoints.push_back( splitPoint );
newShard = multiSplit_inlock( splitPoints );
}
// since a chunk move is done by the donor shard, it should be responsible for locking the ns
moveIfShould( newShard );
return true;
}
bool Chunk::moveIfShould( ChunkPtr newChunk ){
ChunkPtr toMove;
if ( newChunk->countObjects(2) <= 1 ){
toMove = newChunk;
}
else if ( this->countObjects(2) <= 1 ){
DEV assert( shared_from_this() );
toMove = shared_from_this();
}
else {
// moving middle shards is handled by balancer
return false;
}
assert( toMove );
Shard newLocation = Shard::pick( getShard() );
if ( getShard() == newLocation ){
// if this is the best shard, then we shouldn't do anything (Shard::pick already logged our shard).
log(1) << "recently split chunk: " << toString() << "already in the best shard" << endl;
return 0;
}
log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation.toString() << " #objects: " << toMove->countObjects() << endl;
string errmsg;
massert( 10412 , (string)"moveAndCommit failed: " + errmsg ,
toMove->moveAndCommit( newLocation , errmsg ) );
return true;
}
long Chunk::getPhysicalSize() const{
ScopedDbConnection conn( getShard().getConnString() );
BSONObj result;
uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" ,
BSON( "datasize" << _manager->getns()
<< "keyPattern" << _manager->getShardKey().key()
<< "min" << getMin()
<< "max" << getMax()
<< "maxSize" << ( MaxChunkSize + 1 )
<< "estimate" << true
) , result ) );
conn.done();
return (long)result["size"].number();
}
int Chunk::countObjects(int maxCount) const {
static const BSONObj fields = BSON("_id" << 1 );
ShardConnection conn( getShard() , _manager->getns() );
// not using regular count as this is more flexible and supports $min/$max
Query q = Query().minKey(_min).maxKey(_max);
int n;
{
auto_ptr<DBClientCursor> c = conn->query(_manager->getns(), q, maxCount, 0, &fields);
assert( c.get() );
n = c->itcount();
}
conn.done();
return n;
}
void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ){
BSONObjBuilder bb( b.subobjStart( name ) );
bb.append( "min" , _min );
bb.append( "max" , _max );
bb.done();
}
bool Chunk::operator==( const Chunk& s ) const{
return
_manager->getShardKey().compare( _min , s._min ) == 0 &&
_manager->getShardKey().compare( _max , s._max ) == 0
;
}
void Chunk::serialize(BSONObjBuilder& to,ShardChunkVersion myLastMod){
to.append( "_id" , genID( _manager->getns() , _min ) );
if ( myLastMod.isSet() ){
to.appendTimestamp( "lastmod" , myLastMod );
}
else if ( _lastmod.isSet() ){
assert( _lastmod > 0 && _lastmod < 1000 );
to.appendTimestamp( "lastmod" , _lastmod );
}
else {
assert(0);
}
to << "ns" << _manager->getns();
to << "min" << _min;
to << "max" << _max;
to << "shard" << _shard.getName();
}
string Chunk::genID( const string& ns , const BSONObj& o ) {
StringBuilder buf( ns.size() + o.objsize() + 16 );
buf << ns << "-";
BSONObjIterator i(o);
while ( i.more() ){
BSONElement e = i.next();
buf << e.fieldName() << "_" << e.toString(false, true);
}
return buf.str();
}
void Chunk::unserialize(const BSONObj& from){
string ns = from.getStringField( "ns" );
_shard.reset( from.getStringField( "shard" ) );
_lastmod = from["lastmod"];
assert( _lastmod > 0 );
BSONElement e = from["minDotted"];
if (e.eoo()){
_min = from.getObjectField( "min" ).getOwned();
_max = from.getObjectField( "max" ).getOwned();
}
else { // TODO delete this case after giving people a chance to migrate
_min = e.embeddedObject().getOwned();
_max = from.getObjectField( "maxDotted" ).getOwned();
}
uassert( 10170 , "Chunk needs a ns" , ! ns.empty() );
uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() );
uassert( 10171 , "Chunk needs a server" , _shard.ok() );
uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
}
string Chunk::modelServer() const {
// TODO: this could move around?
return configServer.modelServer();
}
ShardChunkVersion Chunk::getVersionOnConfigServer() const {
ScopedDbConnection conn( modelServer() );
BSONObj o = conn->findOne( ShardNS::chunk , BSON( "_id" << genID() ) );
conn.done();
return o["lastmod"];
}
void Chunk::_markModified(){
_modified = true;
}
string Chunk::toString() const {
stringstream ss;
ss << "ns:" << _manager->getns() << " at: " << _shard.toString() << " lastmod: " << _lastmod.toString() << " min: " << _min << " max: " << _max;
return ss.str();
}
ShardKeyPattern Chunk::skey() const{
return _manager->getShardKey();
}
// ------- ChunkManager --------
AtomicUInt ChunkManager::NextSequenceNumber = 1;
ChunkManager::ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique ) :
_config( config ) , _ns( ns ) ,
_key( pattern ) , _unique( unique ) ,
_sequenceNumber( ++NextSequenceNumber ),
_lock("rw:ChunkManager"), _nsLock( ConnectionString( configServer.modelServer() , ConnectionString::SYNC ) , ns )
{
_reload_inlock();
if ( _chunkMap.empty() ){
ChunkPtr c( new Chunk(this, _key.globalMin(), _key.globalMax(), config->getPrimary()) );
c->_markModified();
_chunkMap[c->getMax()] = c;
_chunkRanges.reloadAll(_chunkMap);
_shards.insert(c->getShard());
save_inlock( true );
log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl;
}
}
ChunkManager::~ChunkManager(){
_chunkMap.clear();
_chunkRanges.clear();
_shards.clear();
}
void ChunkManager::_reload(){
rwlock lk( _lock , true );
_reload_inlock();
}
void ChunkManager::_reload_inlock(){
int tries = 3;
while (tries--){
_chunkMap.clear();
_chunkRanges.clear();
_shards.clear();
_load();
if (_isValid()){
_chunkRanges.reloadAll(_chunkMap);
return;
}
if (_chunkMap.size() < 10){
_printChunks();
}
sleepmillis(10 * (3-tries));
sleepsecs(10);
}
msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 tries. Giving up");
}
void ChunkManager::_load(){
static Chunk temp(0);
ScopedDbConnection conn( temp.modelServer() );
auto_ptr<DBClientCursor> cursor = conn->query(temp.getNS(), QUERY("ns" << _ns).sort("lastmod",1), 0, 0, 0, 0,
(DEBUG_BUILD ? 2 : 1000000)); // batch size. Try to induce potential race conditions in debug builds
assert( cursor.get() );
while ( cursor->more() ){
BSONObj d = cursor->next();
if ( d["isMaxMarker"].trueValue() ){
continue;
}
ChunkPtr c( new Chunk( this ) );
c->unserialize( d );
_chunkMap[c->getMax()] = c;
_shards.insert(c->getShard());
}
conn.done();
}
bool ChunkManager::_isValid() const {
#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0)
if (_chunkMap.empty())
return true;
// Check endpoints
ENSURE(allOfType(MinKey, _chunkMap.begin()->second->getMin()));
ENSURE(allOfType(MaxKey, prior(_chunkMap.end())->second->getMax()));
// Make sure there are no gaps or overlaps
for (ChunkMap::const_iterator it=boost::next(_chunkMap.begin()), end=_chunkMap.end(); it != end; ++it){
ChunkMap::const_iterator last = prior(it);
if (!(it->second->getMin() == last->second->getMax())){
PRINT(it->second->toString());
PRINT(it->second->getMin());
PRINT(last->second->getMax());
}
ENSURE(it->second->getMin() == last->second->getMax());
}
return true;
#undef ENSURE
}
void ChunkManager::_printChunks() const {
for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) {
log() << *it->second << endl;
}
}
bool ChunkManager::hasShardKey( const BSONObj& obj ){
return _key.hasShardKey( obj );
}
ChunkPtr ChunkManager::findChunk( const BSONObj & obj , bool retry ){
BSONObj key = _key.extractKey(obj);
{
rwlock lk( _lock , false );
BSONObj foo;
ChunkPtr c;
{
ChunkMap::iterator it = _chunkMap.upper_bound(key);
if (it != _chunkMap.end()){
foo = it->first;
c = it->second;
}
}
if ( c ){
if ( c->contains( obj ) )
return c;
PRINT(foo);
PRINT(*c);
PRINT(key);
_reload_inlock();
massert(13141, "Chunk map pointed to incorrect chunk", false);
}
}
if ( retry ){
stringstream ss;
ss << "couldn't find a chunk aftry retry which should be impossible extracted: " << key;
throw UserException( 8070 , ss.str() );
}
log() << "ChunkManager: couldn't find chunk for: " << key << " going to retry" << endl;
_reload_inlock();
return findChunk( obj , true );
}
ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const {
rwlock lk( _lock , false );
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
ChunkPtr c = i->second;
if ( c->getShard() == shard )
return c;
}
return ChunkPtr();
}
void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ){
rwlock lk( _lock , false );
DEV PRINT(query);
//TODO look into FieldRangeSetOr
FieldRangeOrSet fros(_ns.c_str(), query, false);
uassert(13088, "no support for special queries yet", fros.getSpecial().empty());
do {
boost::scoped_ptr<FieldRangeSet> frs (fros.topFrs());
{
// special case if most-significant field isn't in query
FieldRange range = frs->range(_key.key().firstElement().fieldName());
if ( !range.nontrivial() ){
DEV PRINT(range.nontrivial());
getAllShards(shards);
return;
}
}
BoundList ranges = frs->indexBounds(_key.key(), 1);
for (BoundList::const_iterator it=ranges.begin(), end=ranges.end(); it != end; ++it){
BSONObj minObj = it->first.replaceFieldNames(_key.key());
BSONObj maxObj = it->second.replaceFieldNames(_key.key());
DEV PRINT(minObj);
DEV PRINT(maxObj);
ChunkRangeMap::const_iterator min, max;
min = _chunkRanges.upper_bound(minObj);
max = _chunkRanges.upper_bound(maxObj);
assert(min != _chunkRanges.ranges().end());
// make max non-inclusive like end iterators
if(max != _chunkRanges.ranges().end())
++max;
for (ChunkRangeMap::const_iterator it=min; it != max; ++it){
shards.insert(it->second->getShard());
}
// once we know we need to visit all shards no need to keep looping
//if (shards.size() == _shards.size())
//return;
}
if (fros.moreOrClauses())
fros.popOrClause();
} while (fros.moreOrClauses());
}
void ChunkManager::getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max){
uassert(13405, "min must have shard key", hasShardKey(min));
uassert(13406, "max must have shard key", hasShardKey(max));
ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min);
ChunkRangeMap::const_iterator end = _chunkRanges.lower_bound(max);
for (; it!=end; ++ it){
shards.insert(it->second->getShard());
// once we know we need to visit all shards no need to keep looping
if (shards.size() == _shards.size())
break;
}
}
void ChunkManager::getAllShards( set<Shard>& all ){
rwlock lk( _lock , false );
all.insert(_shards.begin(), _shards.end());
}
void ChunkManager::ensureIndex_inlock(){
//TODO in parallel?
for ( set<Shard>::const_iterator i=_shards.begin(); i!=_shards.end(); ++i ){
ScopedDbConnection conn( i->getConnString() );
conn->ensureIndex( getns() , getShardKey().key() , _unique );
conn.done();
}
}
void ChunkManager::drop( ChunkManagerPtr me ){
rwlock lk( _lock , true );
configServer.logChange( "dropCollection.start" , _ns , BSONObj() );
dist_lock_try dlk( &_nsLock , "drop" );
uassert( 13331 , "collection's metadata is undergoing changes. Please try again." , dlk.got() );
uassert( 10174 , "config servers not all up" , configServer.allUp() );
set<Shard> seen;
log(1) << "ChunkManager::drop : " << _ns << endl;
// lock all shards so no one can do a split/migrate
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
ChunkPtr c = i->second;
seen.insert( c->getShard() );
}
log(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;
// wipe my meta-data
_chunkMap.clear();
_chunkRanges.clear();
_shards.clear();
// delete data from mongod
for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ){
ScopedDbConnection conn( *i );
conn->dropCollection( _ns );
conn.done();
}
log(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl;
// clean up database meta-data
uassert( 10176 , "no sharding data?" , _config->removeSharding( _ns ) );
// remove chunk data
static Chunk temp(0);
ScopedDbConnection conn( temp.modelServer() );
conn->remove( temp.getNS() , BSON( "ns" << _ns ) );
conn.done();
log(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;
for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ){
ScopedDbConnection conn( *i );
BSONObj res;
if ( ! setShardVersion( conn.conn() , _ns , 0 , true , res ) )
throw UserException( 8071 , (string)"OH KNOW, cleaning up after drop failed: " + res.toString() );
conn.done();
}
log(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;
configServer.logChange( "dropCollection" , _ns , BSONObj() );
}
void ChunkManager::save( bool major ){
rwlock lk( _lock , true );
save_inlock( major );
}
void ChunkManager::save_inlock( bool major ){
ShardChunkVersion version = getVersion_inlock();
assert( version > 0 || _chunkMap.size() <= 1 );
ShardChunkVersion nextChunkVersion = version;
nextChunkVersion.inc( major );
vector<ChunkPtr> toFix;
vector<ShardChunkVersion> newVersions;
// Instead of upating the 'chunks' collection directly, we use the 'applyOps' command. It allows us
// (a) to serialize the changes to that collection and (b) to only actually perform the update if this
// ChunkManager has the proper ShardChunkVersion.
BSONObjBuilder cmdBuilder;
BSONArrayBuilder updates( cmdBuilder.subarrayStart( "applyOps" ) );
int numOps = 0;
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
ChunkPtr c = i->second;
if ( ! c->_modified )
continue;
numOps++;
_sequenceNumber = ++NextSequenceNumber;
ShardChunkVersion myVersion = nextChunkVersion;
nextChunkVersion.incMinor();
toFix.push_back( c );
newVersions.push_back( myVersion );
// build an update operation against the chunks collection of the config database with
// upsert true
BSONObjBuilder op;
op.append( "op" , "u" );
op.appendBool( "b" , true );
op.append( "ns" , ShardNS::chunk );
// add the modified (new) chunk infomation as the update object
BSONObjBuilder n( op.subobjStart( "o" ) );
c->serialize( n , myVersion ); // n will get full 'c' info plus version
n.done();
// add the chunk's _id as the query part of the update statement
BSONObjBuilder q( op.subobjStart( "o2" ) );
q.append( "_id" , c->genID() );
q.done();
updates.append( op.obj() );
}
if ( numOps == 0 )
return;
updates.done();
if ( version > 0 || _chunkMap.size() > 1 ){
BSONArrayBuilder temp( cmdBuilder.subarrayStart( "preCondition" ) );
BSONObjBuilder b;
b.append( "ns" , ShardNS::chunk );
b.append( "q" , BSON( "query" << BSON( "ns" << _ns ) << "orderby" << BSON( "lastmod" << -1 ) ) );
{
BSONObjBuilder bb( b.subobjStart( "res" ) );
bb.appendTimestamp( "lastmod" , version );
bb.done();
}
temp.append( b.obj() );
temp.done();
}
// TODO preCondition for initial chunk or starting collection
BSONObj cmd = cmdBuilder.obj();
log(7) << "ChunkManager::save update: " << cmd << endl;
ScopedDbConnection conn( Chunk(0).modelServer() );
BSONObj res;
bool ok = conn->runCommand( "config" , cmd , res );
conn.done();
if ( ! ok ){
stringstream ss;
ss << "saving chunks failed. cmd: " << cmd << " result: " << res;
log( LL_ERROR ) << ss.str() << endl;
msgasserted( 13327 , ss.str() );
}
for ( unsigned i=0; i<toFix.size(); i++ ){
toFix[i]->_lastmod = newVersions[i];
}
massert( 10417 , "how did version get smalled" , getVersion_inlock() >= version );
ensureIndex_inlock(); // TODO: this is too aggressive - but not really sooo bad
}
void ChunkManager::maybeChunkCollection() {
uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) );
ChunkPtr soleChunk = _chunkMap.begin()->second;
vector<BSONObj> splitPoints;
soleChunk->pickSplitVector( &splitPoints , Chunk::MaxChunkSize );
if ( splitPoints.empty() ){
log(1) << "not enough data to warrant chunking " << getns() << endl;
return;
}
soleChunk->multiSplit( splitPoints );
}
ShardChunkVersion ChunkManager::getVersionOnConfigServer() const {
static Chunk temp(0);
ScopedDbConnection conn( temp.modelServer() );
auto_ptr<DBClientCursor> cursor = conn->query(temp.getNS(), QUERY("ns" << _ns).sort("lastmod",1), 1 );
assert( cursor.get() );
BSONObj o;
if ( cursor->more() )
o = cursor->next();
conn.done();
return o["lastmod"];
}
ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const{
rwlock lk( _lock , false );
// TODO: cache or something?
ShardChunkVersion max = 0;
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
ChunkPtr c = i->second;
DEV assert( c );
if ( c->getShard() != shard )
continue;
if ( c->_lastmod > max )
max = c->_lastmod;
}
return max;
}
ShardChunkVersion ChunkManager::getVersion() const{
rwlock lk( _lock , false );
return getVersion_inlock();
}
ShardChunkVersion ChunkManager::getVersion_inlock() const{
ShardChunkVersion max = 0;
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
ChunkPtr c = i->second;
if ( c->_lastmod > max )
max = c->_lastmod;
}
return max;
}
string ChunkManager::toString() const {
rwlock lk( _lock , false );
stringstream ss;
ss << "ChunkManager: " << _ns << " key:" << _key.toString() << '\n';
for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
const ChunkPtr c = i->second;
ss << "\t" << c->toString() << '\n';
}
return ss.str();
}
void ChunkManager::_migrationNotification(Chunk* c){
_chunkRanges.reloadRange(_chunkMap, c->getMin(), c->getMax());
_shards.insert(c->getShard());
}
void ChunkRangeManager::assertValid() const{
if (_ranges.empty())
return;
try {
// No Nulls
for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it){
assert(it->second);
}
// Check endpoints
assert(allOfType(MinKey, _ranges.begin()->second->getMin()));
assert(allOfType(MaxKey, prior(_ranges.end())->second->getMax()));
// Make sure there are no gaps or overlaps
for (ChunkRangeMap::const_iterator it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it){
ChunkRangeMap::const_iterator last = prior(it);
assert(it->second->getMin() == last->second->getMax());
}
// Check Map keys
for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it){
assert(it->first == it->second->getMax());
}
// Make sure we match the original chunks
const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap;
for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ){
const ChunkPtr chunk = i->second;
ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin());
ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax());
assert(min != _ranges.end());
assert(max != _ranges.end());
assert(min == max);
assert(min->second->getShard() == chunk->getShard());
assert(min->second->contains( chunk->getMin() ));
assert(min->second->contains( chunk->getMax() ) || (min->second->getMax() == chunk->getMax()));
}
} catch (...) {
log( LL_ERROR ) << "\t invalid ChunkRangeMap! printing ranges:" << endl;
for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it)
cout << it->first << ": " << *it->second << endl;
throw;
}
}
void ChunkRangeManager::reloadRange(const ChunkMap& chunks, const BSONObj& min, const BSONObj& max){
if (_ranges.empty()){
reloadAll(chunks);
return;
}
ChunkRangeMap::iterator low = _ranges.upper_bound(min);
ChunkRangeMap::iterator high = _ranges.lower_bound(max);
assert(low != _ranges.end());
assert(high != _ranges.end());
assert(low->second);
assert(high->second);
ChunkMap::const_iterator begin = chunks.upper_bound(low->second->getMin());
ChunkMap::const_iterator end = chunks.lower_bound(high->second->getMax());
assert(begin != chunks.end());
assert(end != chunks.end());
// C++ end iterators are one-past-last
++high;
++end;
// update ranges
_ranges.erase(low, high); // invalidates low
_insertRange(begin, end);
assert(!_ranges.empty());
DEV assertValid();
// merge low-end if possible
low = _ranges.upper_bound(min);
assert(low != _ranges.end());
if (low != _ranges.begin()){
shared_ptr<ChunkRange> a = prior(low)->second;
shared_ptr<ChunkRange> b = low->second;
if (a->getShard() == b->getShard()){
shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b));
_ranges.erase(prior(low));
_ranges.erase(low); // invalidates low
_ranges[cr->getMax()] = cr;
}
}
DEV assertValid();
// merge high-end if possible
high = _ranges.lower_bound(max);
if (high != prior(_ranges.end())){
shared_ptr<ChunkRange> a = high->second;
shared_ptr<ChunkRange> b = boost::next(high)->second;
if (a->getShard() == b->getShard()){
shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b));
_ranges.erase(boost::next(high));
_ranges.erase(high); //invalidates high
_ranges[cr->getMax()] = cr;
}
}
DEV assertValid();
}
void ChunkRangeManager::reloadAll(const ChunkMap& chunks){
_ranges.clear();
_insertRange(chunks.begin(), chunks.end());
DEV assertValid();
}
void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end){
while (begin != end){
ChunkMap::const_iterator first = begin;
Shard shard = first->second->getShard();
while (begin != end && (begin->second->getShard() == shard))
++begin;
shared_ptr<ChunkRange> cr (new ChunkRange(first, begin));
_ranges[cr->getMax()] = cr;
}
}
class ChunkObjUnitTest : public UnitTest {
public:
void runShard(){
ChunkPtr c;
assert( ! c );
c.reset( new Chunk( 0 ) );
assert( c );
}
void runShardChunkVersion(){
vector<ShardChunkVersion> all;
all.push_back( ShardChunkVersion(1,1) );
all.push_back( ShardChunkVersion(1,2) );
all.push_back( ShardChunkVersion(2,1) );
all.push_back( ShardChunkVersion(2,2) );
for ( unsigned i=0; i<all.size(); i++ ){
for ( unsigned j=i+1; j<all.size(); j++ ){
assert( all[i] < all[j] );
}
}
}
void run(){
runShard();
runShardChunkVersion();
log(1) << "shardObjTest passed" << endl;
}
} shardObjTest;
// ----- to be removed ---
extern OID serverID;
bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
BSONObjBuilder cmdBuilder;
cmdBuilder.append( "setShardVersion" , ns.c_str() );
cmdBuilder.append( "configdb" , configServer.modelServer() );
cmdBuilder.appendTimestamp( "version" , version.toLong() );
cmdBuilder.appendOID( "serverID" , &serverID );
if ( authoritative )
cmdBuilder.appendBool( "authoritative" , 1 );
Shard s = Shard::make( conn.getServerAddress() );
cmdBuilder.append( "shard" , s.getName() );
cmdBuilder.append( "shardHost" , s.getConnString() );
BSONObj cmd = cmdBuilder.obj();
log(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
return conn.runCommand( "admin" , cmd , result );
}
} // namespace mongo