mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
starting sharding name cleanup SHARDING-25
This commit is contained in:
parent
72d68a4ffd
commit
75e16447a4
@ -290,7 +290,7 @@ else:
|
||||
nojni = True
|
||||
|
||||
coreShardFiles = []
|
||||
shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/cursors.cpp" , "s/server.cpp" ] + [ "s/shard.cpp" , "s/shardkey.cpp" , "s/config.cpp" ]
|
||||
shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/chunk.cpp" , "s/shardkey.cpp" , "s/config.cpp" ]
|
||||
serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" ]
|
||||
|
||||
allClientFiles = commonFiles + coreDbFiles + [ "client/clientOnly.cpp" , "client/gridfs.cpp" ];
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
|
||||
#include "stdafx.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
#include "config.h"
|
||||
#include "../util/unittest.h"
|
||||
#include "../client/connpool.h"
|
||||
@ -27,26 +27,26 @@ namespace mongo {
|
||||
|
||||
// ------- Shard --------
|
||||
|
||||
long Shard::MaxShardSize = 1024 * 1204 * 50;
|
||||
long Chunk::MaxChunkSize = 1024 * 1204 * 50;
|
||||
|
||||
Shard::Shard( ShardManager * manager ) : _manager( manager ){
|
||||
Chunk::Chunk( ChunkManager * manager ) : _manager( manager ){
|
||||
_modified = false;
|
||||
_lastmod = 0;
|
||||
_dataWritten = 0;
|
||||
}
|
||||
|
||||
void Shard::setServer( string s ){
|
||||
void Chunk::setServer( string s ){
|
||||
_server = s;
|
||||
_markModified();
|
||||
}
|
||||
|
||||
bool Shard::contains( const BSONObj& obj ){
|
||||
bool Chunk::contains( const BSONObj& obj ){
|
||||
return
|
||||
_manager->getShardKey().compare( getMin() , obj ) <= 0 &&
|
||||
_manager->getShardKey().compare( obj , getMax() ) < 0;
|
||||
}
|
||||
|
||||
BSONObj Shard::pickSplitPoint(){
|
||||
BSONObj Chunk::pickSplitPoint(){
|
||||
int sort = 0;
|
||||
|
||||
if ( _manager->getShardKey().globalMin().woCompare( getMin() ) == 0 ){
|
||||
@ -93,11 +93,11 @@ namespace mongo {
|
||||
return result.getObjectField( "median" ).getOwned();
|
||||
}
|
||||
|
||||
Shard * Shard::split(){
|
||||
Chunk * Chunk::split(){
|
||||
return split( pickSplitPoint() );
|
||||
}
|
||||
|
||||
Shard * Shard::split( const BSONObj& m ){
|
||||
Chunk * Chunk::split( const BSONObj& m ){
|
||||
uassert( "can't split as shard that doesn't have a manager" , _manager );
|
||||
|
||||
log(1) << " before split on: " << m << "\n"
|
||||
@ -105,7 +105,7 @@ namespace mongo {
|
||||
|
||||
uassert( "locking namespace on server failed" , lockNamespaceOnServer( getServer() , _ns ) );
|
||||
|
||||
Shard * s = new Shard( _manager );
|
||||
Chunk * s = new Chunk( _manager );
|
||||
s->_ns = _ns;
|
||||
s->_server = _server;
|
||||
s->_min = m.getOwned();
|
||||
@ -114,7 +114,7 @@ namespace mongo {
|
||||
s->_markModified();
|
||||
_markModified();
|
||||
|
||||
_manager->_shards.push_back( s );
|
||||
_manager->_chunks.push_back( s );
|
||||
|
||||
_max = m.getOwned();
|
||||
|
||||
@ -128,13 +128,13 @@ namespace mongo {
|
||||
return s;
|
||||
}
|
||||
|
||||
bool Shard::moveAndCommit( const string& to , string& errmsg ){
|
||||
bool Chunk::moveAndCommit( const string& to , string& errmsg ){
|
||||
uassert( "can't move shard to its current location!" , to != getServer() );
|
||||
|
||||
log() << "moving shard ns: " << _ns << " moving shard: " << toString() << " " << _server << " -> " << to << endl;
|
||||
|
||||
string from = _server;
|
||||
ServerShardVersion oldVersion = _manager->getVersion( from );
|
||||
ShardChunkVersion oldVersion = _manager->getVersion( from );
|
||||
|
||||
BSONObj filter;
|
||||
{
|
||||
@ -164,16 +164,16 @@ namespace mongo {
|
||||
setServer( to );
|
||||
|
||||
// need to increment version # for old server
|
||||
Shard * randomShardOnOldServer = _manager->findShardOnServer( from );
|
||||
if ( randomShardOnOldServer )
|
||||
randomShardOnOldServer->_markModified();
|
||||
Chunk * randomChunkOnOldServer = _manager->findChunkOnServer( from );
|
||||
if ( randomChunkOnOldServer )
|
||||
randomChunkOnOldServer->_markModified();
|
||||
|
||||
_manager->save();
|
||||
|
||||
BSONObj finishRes;
|
||||
{
|
||||
|
||||
ServerShardVersion newVersion = _manager->getVersion( from );
|
||||
ShardChunkVersion newVersion = _manager->getVersion( from );
|
||||
uassert( "version has to be higher" , newVersion > oldVersion );
|
||||
|
||||
BSONObjBuilder b;
|
||||
@ -196,10 +196,10 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Shard::splitIfShould( long dataWritten ){
|
||||
bool Chunk::splitIfShould( long dataWritten ){
|
||||
_dataWritten += dataWritten;
|
||||
|
||||
if ( _dataWritten < MaxShardSize / 5 )
|
||||
if ( _dataWritten < MaxChunkSize / 5 )
|
||||
return false;
|
||||
|
||||
_dataWritten = 0;
|
||||
@ -210,22 +210,22 @@ namespace mongo {
|
||||
}
|
||||
|
||||
long size = getPhysicalSize();
|
||||
if ( size < MaxShardSize )
|
||||
if ( size < MaxChunkSize )
|
||||
return false;
|
||||
|
||||
log() << "autosplitting " << _ns << " size: " << size << " shard: " << toString() << endl;
|
||||
Shard * newShard = split();
|
||||
Chunk * newShard = split();
|
||||
|
||||
moveIfShould( newShard );
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Shard::moveIfShould( Shard * newShard ){
|
||||
Shard * toMove = 0;
|
||||
bool Chunk::moveIfShould( Chunk * newChunk ){
|
||||
Chunk * toMove = 0;
|
||||
|
||||
if ( newShard->countObjects() <= 1 ){
|
||||
toMove = newShard;
|
||||
if ( newChunk->countObjects() <= 1 ){
|
||||
toMove = newChunk;
|
||||
}
|
||||
else if ( this->countObjects() <= 1 ){
|
||||
toMove = this;
|
||||
@ -253,7 +253,7 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
long Shard::getPhysicalSize(){
|
||||
long Chunk::getPhysicalSize(){
|
||||
ScopedDbConnection conn( getServer() );
|
||||
|
||||
BSONObj result;
|
||||
@ -268,7 +268,7 @@ namespace mongo {
|
||||
}
|
||||
|
||||
|
||||
long Shard::countObjects(){
|
||||
long Chunk::countObjects(){
|
||||
ScopedDbConnection conn( getServer() );
|
||||
|
||||
|
||||
@ -279,18 +279,18 @@ namespace mongo {
|
||||
return (long)n;
|
||||
}
|
||||
|
||||
bool Shard::operator==( const Shard& s ){
|
||||
bool Chunk::operator==( const Chunk& s ){
|
||||
return
|
||||
_manager->getShardKey().compare( _min , s._min ) == 0 &&
|
||||
_manager->getShardKey().compare( _max , s._max ) == 0
|
||||
;
|
||||
}
|
||||
|
||||
void Shard::getFilter( BSONObjBuilder& b ){
|
||||
void Chunk::getFilter( BSONObjBuilder& b ){
|
||||
_manager->_key.getFilter( b , _min , _max );
|
||||
}
|
||||
|
||||
void Shard::serialize(BSONObjBuilder& to){
|
||||
void Chunk::serialize(BSONObjBuilder& to){
|
||||
if ( _lastmod )
|
||||
to.appendDate( "lastmod" , _lastmod );
|
||||
else
|
||||
@ -302,26 +302,26 @@ namespace mongo {
|
||||
to << "server" << _server;
|
||||
}
|
||||
|
||||
void Shard::unserialize(const BSONObj& from){
|
||||
void Chunk::unserialize(const BSONObj& from){
|
||||
_ns = from.getStringField( "ns" );
|
||||
_min = from.getObjectField( "min" ).getOwned();
|
||||
_max = from.getObjectField( "max" ).getOwned();
|
||||
_server = from.getStringField( "server" );
|
||||
_lastmod = from.hasField( "lastmod" ) ? from["lastmod"].date() : 0;
|
||||
|
||||
uassert( "Shard needs a ns" , ! _ns.empty() );
|
||||
uassert( "Shard needs a server" , ! _ns.empty() );
|
||||
uassert( "Chunk needs a ns" , ! _ns.empty() );
|
||||
uassert( "Chunk needs a server" , ! _ns.empty() );
|
||||
|
||||
uassert( "Shard needs a min" , ! _min.isEmpty() );
|
||||
uassert( "Shard needs a max" , ! _max.isEmpty() );
|
||||
uassert( "Chunk needs a min" , ! _min.isEmpty() );
|
||||
uassert( "Chunk needs a max" , ! _max.isEmpty() );
|
||||
}
|
||||
|
||||
string Shard::modelServer() {
|
||||
string Chunk::modelServer() {
|
||||
// TODO: this could move around?
|
||||
return configServer.modelServer();
|
||||
}
|
||||
|
||||
void Shard::_markModified(){
|
||||
void Chunk::_markModified(){
|
||||
_modified = true;
|
||||
|
||||
unsigned long long t = time(0);
|
||||
@ -329,14 +329,11 @@ namespace mongo {
|
||||
_lastmod = 0;
|
||||
}
|
||||
|
||||
void Shard::save( bool check ){
|
||||
cout << "HERE: " << _id << endl;
|
||||
void Chunk::save( bool check ){
|
||||
bool reload = ! _lastmod;
|
||||
Model::save( check );
|
||||
cout << "\t" << _id << endl;
|
||||
if ( reload ){
|
||||
// need to do this so that we get the new _lastMod and therefore version number
|
||||
|
||||
massert( "_id has to be filled in already" , ! _id.isEmpty() );
|
||||
|
||||
string b = toString();
|
||||
@ -344,129 +341,129 @@ namespace mongo {
|
||||
massert( "how could load fail?" , load( q ) );
|
||||
cout << "before: " << q << "\t" << b << endl;
|
||||
cout << "after : " << _id << "\t" << toString() << endl;
|
||||
massert( "shard reload changed content!" , b == toString() );
|
||||
massert( "chunk reload changed content!" , b == toString() );
|
||||
massert( "id changed!" , q["_id"] == _id["_id"] );
|
||||
}
|
||||
}
|
||||
|
||||
void Shard::ensureIndex(){
|
||||
void Chunk::ensureIndex(){
|
||||
ScopedDbConnection conn( getServer() );
|
||||
conn->ensureIndex( _ns , _manager->getShardKey().key() );
|
||||
conn.done();
|
||||
}
|
||||
|
||||
string Shard::toString() const {
|
||||
string Chunk::toString() const {
|
||||
stringstream ss;
|
||||
ss << "shard ns:" << _ns << " server: " << _server << " min: " << _min << " max: " << _max;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
||||
ShardKeyPattern Shard::skey(){
|
||||
ShardKeyPattern Chunk::skey(){
|
||||
return _manager->getShardKey();
|
||||
}
|
||||
|
||||
// ------- ShardManager --------
|
||||
// ------- ChunkManager --------
|
||||
|
||||
unsigned long long ShardManager::NextSequenceNumber = 1;
|
||||
unsigned long long ChunkManager::NextSequenceNumber = 1;
|
||||
|
||||
ShardManager::ShardManager( DBConfig * config , string ns , ShardKeyPattern pattern ) : _config( config ) , _ns( ns ) , _key( pattern ){
|
||||
Shard temp(0);
|
||||
ChunkManager::ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern ) : _config( config ) , _ns( ns ) , _key( pattern ){
|
||||
Chunk temp(0);
|
||||
|
||||
ScopedDbConnection conn( temp.modelServer() );
|
||||
auto_ptr<DBClientCursor> cursor = conn->query( temp.getNS() , BSON( "ns" << ns ) );
|
||||
while ( cursor->more() ){
|
||||
Shard * s = new Shard( this );
|
||||
Chunk * c = new Chunk( this );
|
||||
BSONObj d = cursor->next();
|
||||
s->unserialize( d );
|
||||
_shards.push_back( s );
|
||||
s->_id = d["_id"].wrap().getOwned();
|
||||
c->unserialize( d );
|
||||
_chunks.push_back( c );
|
||||
c->_id = d["_id"].wrap().getOwned();
|
||||
}
|
||||
conn.done();
|
||||
|
||||
if ( _shards.size() == 0 ){
|
||||
Shard * s = new Shard( this );
|
||||
s->_ns = ns;
|
||||
s->_min = _key.globalMin();
|
||||
s->_max = _key.globalMax();
|
||||
s->_server = config->getPrimary();
|
||||
s->_markModified();
|
||||
if ( _chunks.size() == 0 ){
|
||||
Chunk * c = new Chunk( this );
|
||||
c->_ns = ns;
|
||||
c->_min = _key.globalMin();
|
||||
c->_max = _key.globalMax();
|
||||
c->_server = config->getPrimary();
|
||||
c->_markModified();
|
||||
|
||||
_shards.push_back( s );
|
||||
_chunks.push_back( c );
|
||||
|
||||
log() << "no shards for:" << ns << " so creating first: " << s->toString() << endl;
|
||||
log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl;
|
||||
}
|
||||
|
||||
_sequenceNumber = ++NextSequenceNumber;
|
||||
}
|
||||
|
||||
ShardManager::~ShardManager(){
|
||||
for ( vector<Shard*>::iterator i=_shards.begin(); i != _shards.end(); i++ ){
|
||||
ChunkManager::~ChunkManager(){
|
||||
for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
|
||||
delete( *i );
|
||||
}
|
||||
_shards.clear();
|
||||
_chunks.clear();
|
||||
}
|
||||
|
||||
bool ShardManager::hasShardKey( const BSONObj& obj ){
|
||||
bool ChunkManager::hasShardKey( const BSONObj& obj ){
|
||||
return _key.hasShardKey( obj );
|
||||
}
|
||||
|
||||
Shard& ShardManager::findShard( const BSONObj & obj ){
|
||||
Chunk& ChunkManager::findChunk( const BSONObj & obj ){
|
||||
|
||||
for ( vector<Shard*>::iterator i=_shards.begin(); i != _shards.end(); i++ ){
|
||||
Shard * s = *i;
|
||||
if ( s->contains( obj ) )
|
||||
return *s;
|
||||
for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
|
||||
Chunk * c = *i;
|
||||
if ( c->contains( obj ) )
|
||||
return *c;
|
||||
}
|
||||
stringstream ss;
|
||||
ss << "couldn't find a shard which should be impossible extracted: " << _key.extractKey( obj );
|
||||
ss << "couldn't find a chunk which should be impossible extracted: " << _key.extractKey( obj );
|
||||
throw UserException( ss.str() );
|
||||
}
|
||||
|
||||
Shard* ShardManager::findShardOnServer( const string& server ) const {
|
||||
Chunk* ChunkManager::findChunkOnServer( const string& server ) const {
|
||||
|
||||
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
|
||||
Shard* s = *i;
|
||||
if ( s->getServer() == server )
|
||||
return s;
|
||||
for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
|
||||
Chunk * c = *i;
|
||||
if ( c->getServer() == server )
|
||||
return c;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ShardManager::getShardsForQuery( vector<Shard*>& shards , const BSONObj& query ){
|
||||
int ChunkManager::getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query ){
|
||||
int added = 0;
|
||||
|
||||
for ( vector<Shard*>::iterator i=_shards.begin(); i != _shards.end(); i++ ){
|
||||
Shard* s = *i;
|
||||
if ( _key.relevantForQuery( query , s ) ){
|
||||
shards.push_back( s );
|
||||
for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
|
||||
Chunk * c = *i;
|
||||
if ( _key.relevantForQuery( query , c ) ){
|
||||
chunks.push_back( c );
|
||||
added++;
|
||||
}
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
void ShardManager::ensureIndex(){
|
||||
void ChunkManager::ensureIndex(){
|
||||
set<string> seen;
|
||||
|
||||
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
|
||||
Shard* s = *i;
|
||||
if ( seen.count( s->getServer() ) )
|
||||
for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
|
||||
Chunk * c = *i;
|
||||
if ( seen.count( c->getServer() ) )
|
||||
continue;
|
||||
seen.insert( s->getServer() );
|
||||
s->ensureIndex();
|
||||
seen.insert( c->getServer() );
|
||||
c->ensureIndex();
|
||||
}
|
||||
}
|
||||
|
||||
void ShardManager::save(){
|
||||
ServerShardVersion a = getVersion();
|
||||
void ChunkManager::save(){
|
||||
ShardChunkVersion a = getVersion();
|
||||
|
||||
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
|
||||
Shard* s = *i;
|
||||
if ( ! s->_modified )
|
||||
for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
|
||||
Chunk* c = *i;
|
||||
if ( ! c->_modified )
|
||||
continue;
|
||||
s->save( true );
|
||||
c->save( true );
|
||||
_sequenceNumber = ++NextSequenceNumber;
|
||||
}
|
||||
|
||||
@ -475,47 +472,47 @@ namespace mongo {
|
||||
ensureIndex(); // TODO: this is too aggressive - but not really sooo bad
|
||||
}
|
||||
|
||||
ServerShardVersion ShardManager::getVersion( const string& server ) const{
|
||||
ShardChunkVersion ChunkManager::getVersion( const string& server ) const{
|
||||
// TODO: cache or something?
|
||||
|
||||
ServerShardVersion max = 0;
|
||||
ShardChunkVersion max = 0;
|
||||
|
||||
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
|
||||
Shard* s = *i;
|
||||
if ( s->getServer() != server )
|
||||
for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
|
||||
Chunk* c = *i;
|
||||
if ( c->getServer() != server )
|
||||
continue;
|
||||
|
||||
if ( s->_lastmod > max )
|
||||
max = s->_lastmod;
|
||||
if ( c->_lastmod > max )
|
||||
max = c->_lastmod;
|
||||
}
|
||||
|
||||
return max;
|
||||
}
|
||||
|
||||
ServerShardVersion ShardManager::getVersion() const{
|
||||
ServerShardVersion max = 0;
|
||||
ShardChunkVersion ChunkManager::getVersion() const{
|
||||
ShardChunkVersion max = 0;
|
||||
|
||||
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
|
||||
Shard* s = *i;
|
||||
if ( s->_lastmod > max )
|
||||
max = s->_lastmod;
|
||||
for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
|
||||
Chunk* c = *i;
|
||||
if ( c->_lastmod > max )
|
||||
max = c->_lastmod;
|
||||
}
|
||||
|
||||
return max;
|
||||
}
|
||||
|
||||
string ShardManager::toString() const {
|
||||
string ChunkManager::toString() const {
|
||||
stringstream ss;
|
||||
ss << "ShardManager: " << _ns << " key:" << _key.toString() << "\n";
|
||||
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
|
||||
const Shard* s = *i;
|
||||
ss << "\t" << s->toString() << "\n";
|
||||
ss << "ChunkManager: " << _ns << " key:" << _key.toString() << "\n";
|
||||
for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
|
||||
const Chunk* c = *i;
|
||||
ss << "\t" << c->toString() << "\n";
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
||||
class ShardObjUnitTest : public UnitTest {
|
||||
class ChunkObjUnitTest : public UnitTest {
|
||||
public:
|
||||
void runShard(){
|
||||
|
@ -34,22 +34,22 @@
|
||||
namespace mongo {
|
||||
|
||||
class DBConfig;
|
||||
class ShardManager;
|
||||
class ShardObjUnitTest;
|
||||
class ChunkManager;
|
||||
class ChunkObjUnitTest;
|
||||
|
||||
typedef unsigned long long ServerShardVersion;
|
||||
typedef unsigned long long ShardChunkVersion;
|
||||
|
||||
/**
|
||||
config.shard
|
||||
config.chunks
|
||||
{ ns : "alleyinsider.fs.chunks" , min : {} , max : {} , server : "localhost:30001" }
|
||||
|
||||
x is in a shard iff
|
||||
min <= x < max
|
||||
*/
|
||||
class Shard : public Model , boost::noncopyable {
|
||||
class Chunk : public Model , boost::noncopyable {
|
||||
public:
|
||||
|
||||
Shard( ShardManager * info );
|
||||
Chunk( ChunkManager * info );
|
||||
|
||||
BSONObj& getMin(){
|
||||
return _min;
|
||||
@ -68,9 +68,9 @@ namespace mongo {
|
||||
string toString() const;
|
||||
operator string() const { return toString(); }
|
||||
|
||||
bool operator==(const Shard& s);
|
||||
bool operator==(const Chunk& s);
|
||||
|
||||
bool operator!=(const Shard& s){
|
||||
bool operator!=(const Chunk& s){
|
||||
return ! ( *this == s );
|
||||
}
|
||||
|
||||
@ -79,8 +79,8 @@ namespace mongo {
|
||||
|
||||
|
||||
BSONObj pickSplitPoint();
|
||||
Shard * split();
|
||||
Shard * split( const BSONObj& middle );
|
||||
Chunk * split();
|
||||
Chunk * split( const BSONObj& middle );
|
||||
|
||||
/**
|
||||
* @return size of shard in bytes
|
||||
@ -101,11 +101,11 @@ namespace mongo {
|
||||
* moves either this shard or newShard if it makes sense too
|
||||
* @return whether or not a shard was moved
|
||||
*/
|
||||
bool moveIfShould( Shard * newShard = 0 );
|
||||
bool moveIfShould( Chunk * newShard = 0 );
|
||||
|
||||
bool moveAndCommit( const string& to , string& errmsg );
|
||||
|
||||
virtual const char * getNS(){ return "config.shard"; }
|
||||
virtual const char * getNS(){ return "config.shard"; } // XXX
|
||||
virtual void serialize(BSONObjBuilder& to);
|
||||
virtual void unserialize(const BSONObj& from);
|
||||
virtual string modelServer();
|
||||
@ -116,20 +116,20 @@ namespace mongo {
|
||||
|
||||
void _markModified();
|
||||
|
||||
static long MaxShardSize;
|
||||
static long MaxChunkSize;
|
||||
|
||||
private:
|
||||
|
||||
// main shard info
|
||||
|
||||
ShardManager * _manager;
|
||||
ChunkManager * _manager;
|
||||
ShardKeyPattern skey();
|
||||
|
||||
string _ns;
|
||||
BSONObj _min;
|
||||
BSONObj _max;
|
||||
string _server;
|
||||
ServerShardVersion _lastmod;
|
||||
ShardChunkVersion _lastmod;
|
||||
|
||||
bool _modified;
|
||||
|
||||
@ -141,7 +141,7 @@ namespace mongo {
|
||||
|
||||
void _split( BSONObj& middle );
|
||||
|
||||
friend class ShardManager;
|
||||
friend class ChunkManager;
|
||||
friend class ShardObjUnitTest;
|
||||
};
|
||||
|
||||
@ -151,21 +151,21 @@ namespace mongo {
|
||||
shards: [ { min: 1, max: 100, server: a } , { min: 101, max: 200 , server : b } ]
|
||||
}
|
||||
*/
|
||||
class ShardManager {
|
||||
class ChunkManager {
|
||||
public:
|
||||
|
||||
ShardManager( DBConfig * config , string ns ,ShardKeyPattern pattern );
|
||||
virtual ~ShardManager();
|
||||
ChunkManager( DBConfig * config , string ns ,ShardKeyPattern pattern );
|
||||
virtual ~ChunkManager();
|
||||
|
||||
string getns(){
|
||||
return _ns;
|
||||
}
|
||||
|
||||
int numShards(){ return _shards.size(); }
|
||||
int numChunks(){ return _chunks.size(); }
|
||||
bool hasShardKey( const BSONObj& obj );
|
||||
|
||||
Shard& findShard( const BSONObj& obj );
|
||||
Shard* findShardOnServer( const string& server ) const;
|
||||
Chunk& findChunk( const BSONObj& obj );
|
||||
Chunk* findChunkOnServer( const string& server ) const;
|
||||
|
||||
ShardKeyPattern& getShardKey(){ return _key; }
|
||||
|
||||
@ -175,20 +175,20 @@ namespace mongo {
|
||||
void ensureIndex();
|
||||
|
||||
/**
|
||||
* @return number of shards added to the vector
|
||||
* @return number of Chunk added to the vector
|
||||
*/
|
||||
int getShardsForQuery( vector<Shard*>& shards , const BSONObj& query );
|
||||
int getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query );
|
||||
|
||||
void save();
|
||||
|
||||
string toString() const;
|
||||
operator string() const { return toString(); }
|
||||
|
||||
ServerShardVersion getVersion( const string& server ) const;
|
||||
ServerShardVersion getVersion() const;
|
||||
ShardChunkVersion getVersion( const string& server ) const;
|
||||
ShardChunkVersion getVersion() const;
|
||||
|
||||
/**
|
||||
* this is just an increasing number of how many shard managers we have so we know if something has been updated
|
||||
* this is just an increasing number of how many ChunkManagers we have so we know if something has been updated
|
||||
*/
|
||||
unsigned long long getSequenceNumber(){
|
||||
return _sequenceNumber;
|
||||
@ -199,11 +199,11 @@ namespace mongo {
|
||||
string _ns;
|
||||
ShardKeyPattern _key;
|
||||
|
||||
vector<Shard*> _shards;
|
||||
vector<Chunk*> _chunks;
|
||||
|
||||
unsigned long long _sequenceNumber;
|
||||
|
||||
friend class Shard;
|
||||
friend class Chunk;
|
||||
static unsigned long long NextSequenceNumber;
|
||||
};
|
||||
|
@ -32,7 +32,7 @@
|
||||
#include "../db/commands.h"
|
||||
|
||||
#include "config.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
#include "strategy.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -277,7 +277,7 @@ namespace mongo {
|
||||
;
|
||||
}
|
||||
|
||||
virtual bool _split( BSONObjBuilder& result , string&errmsg , const string& ns , ShardManager * manager , Shard& old , BSONObj middle ) = 0;
|
||||
virtual bool _split( BSONObjBuilder& result , string&errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ) = 0;
|
||||
|
||||
bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
|
||||
string ns = cmdObj[_name.c_str()].valuestrsafe();
|
||||
@ -304,8 +304,8 @@ namespace mongo {
|
||||
return false;
|
||||
}
|
||||
|
||||
ShardManager * info = config->getShardManager( ns );
|
||||
Shard& old = info->findShard( find );
|
||||
ChunkManager * info = config->getChunkManager( ns );
|
||||
Chunk& old = info->findChunk( find );
|
||||
|
||||
|
||||
return _split( result , errmsg , ns , info , old , cmdObj.getObjectField( "middle" ) );
|
||||
@ -318,7 +318,7 @@ namespace mongo {
|
||||
class SplitValueCommand : public SplitCollectionHelper {
|
||||
public:
|
||||
SplitValueCommand() : SplitCollectionHelper( "splitvalue" ){}
|
||||
virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ShardManager * manager , Shard& old , BSONObj middle ){
|
||||
virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ){
|
||||
|
||||
result << "shardinfo" << old.toString();
|
||||
|
||||
@ -340,7 +340,7 @@ namespace mongo {
|
||||
class SplitCollection : public SplitCollectionHelper {
|
||||
public:
|
||||
SplitCollection() : SplitCollectionHelper( "split" ){}
|
||||
virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ShardManager * manager , Shard& old , BSONObj middle ){
|
||||
virtual bool _split( BSONObjBuilder& result , string& errmsg , const string& ns , ChunkManager * manager , Chunk& old , BSONObj middle ){
|
||||
|
||||
log() << "splitting: " << ns << " shard: " << old << endl;
|
||||
|
||||
@ -387,8 +387,8 @@ namespace mongo {
|
||||
return false;
|
||||
}
|
||||
|
||||
ShardManager * info = config->getShardManager( ns );
|
||||
Shard& s = info->findShard( find );
|
||||
ChunkManager * info = config->getChunkManager( ns );
|
||||
Chunk& s = info->findChunk( find );
|
||||
string from = s.getServer();
|
||||
|
||||
if ( s.getServer() == to ){
|
||||
|
@ -24,7 +24,7 @@
|
||||
#include "../db/commands.h"
|
||||
|
||||
#include "config.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
#include "strategy.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -66,16 +66,16 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
ShardManager * sm = conf->getShardManager( fullns );
|
||||
massert( "how could shard manager be null!" , sm );
|
||||
ChunkManager * cm = conf->getChunkManager( fullns );
|
||||
massert( "how could chunk manager be null!" , cm );
|
||||
|
||||
vector<Shard*> shards;
|
||||
sm->getShardsForQuery( shards , filter );
|
||||
vector<Chunk*> chunks;
|
||||
cm->getChunksForQuery( chunks , filter );
|
||||
|
||||
unsigned long long total = 0;
|
||||
for ( vector<Shard*>::iterator i = shards.begin() ; i != shards.end() ; i++ ){
|
||||
Shard * s = *i;
|
||||
total += s->countObjects();
|
||||
for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){
|
||||
Chunk * c = *i;
|
||||
total += c->countObjects();
|
||||
}
|
||||
|
||||
result.append( "n" , (double)total );
|
||||
|
16
s/config.cpp
16
s/config.cpp
@ -25,7 +25,7 @@
|
||||
|
||||
#include "server.h"
|
||||
#include "config.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
@ -53,11 +53,11 @@ namespace mongo {
|
||||
_partitioned = true;
|
||||
}
|
||||
|
||||
ShardManager* DBConfig::turnOnSharding( const string& ns , ShardKeyPattern fieldsAndOrder ){
|
||||
ChunkManager* DBConfig::turnOnSharding( const string& ns , ShardKeyPattern fieldsAndOrder ){
|
||||
if ( ! _partitioned )
|
||||
throw UserException( "not partitioned" );
|
||||
|
||||
ShardManager * info = _shards[ns];
|
||||
ChunkManager * info = _shards[ns];
|
||||
if ( info )
|
||||
return info;
|
||||
|
||||
@ -66,21 +66,21 @@ namespace mongo {
|
||||
|
||||
_sharded[ns] = fieldsAndOrder;
|
||||
|
||||
info = new ShardManager( this , ns , fieldsAndOrder );
|
||||
info = new ChunkManager( this , ns , fieldsAndOrder );
|
||||
_shards[ns] = info;
|
||||
return info;
|
||||
|
||||
}
|
||||
|
||||
ShardManager* DBConfig::getShardManager( const string& ns , bool reload ){
|
||||
ShardManager* m = _shards[ns];
|
||||
ChunkManager* DBConfig::getChunkManager( const string& ns , bool reload ){
|
||||
ChunkManager* m = _shards[ns];
|
||||
if ( m && ! reload )
|
||||
return m;
|
||||
|
||||
uassert( (string)"not sharded:" + ns , sharded( ns ) );
|
||||
if ( m && reload )
|
||||
log() << "reloading shard info for: " << ns << endl;
|
||||
m = new ShardManager( this , ns , _sharded[ ns ] );
|
||||
m = new ChunkManager( this , ns , _sharded[ ns ] );
|
||||
_shards[ns] = m;
|
||||
return m;
|
||||
}
|
||||
@ -118,7 +118,7 @@ namespace mongo {
|
||||
|
||||
void DBConfig::save( bool check ){
|
||||
Model::save( check );
|
||||
for ( map<string,ShardManager*>::iterator i=_shards.begin(); i != _shards.end(); i++)
|
||||
for ( map<string,ChunkManager*>::iterator i=_shards.begin(); i != _shards.end(); i++)
|
||||
i->second->save();
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ namespace mongo {
|
||||
extern ConfigServer configServer;
|
||||
extern Grid grid;
|
||||
|
||||
class ShardManager;
|
||||
class ChunkManager;
|
||||
|
||||
/**
|
||||
top level grid configuration for an entire database
|
||||
@ -55,14 +55,14 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void turnOnPartitioning();
|
||||
ShardManager* turnOnSharding( const string& ns , ShardKeyPattern fieldsAndOrder );
|
||||
ChunkManager* turnOnSharding( const string& ns , ShardKeyPattern fieldsAndOrder );
|
||||
|
||||
/**
|
||||
* @return whether or not this partition is partitioned
|
||||
*/
|
||||
bool sharded( const string& ns );
|
||||
|
||||
ShardManager* getShardManager( const string& ns , bool reload = false );
|
||||
ChunkManager* getChunkManager( const string& ns , bool reload = false );
|
||||
|
||||
/**
|
||||
* @return the correct for machine for the ns
|
||||
@ -97,7 +97,7 @@ namespace mongo {
|
||||
bool _partitioned;
|
||||
|
||||
map<string,ShardKeyPattern> _sharded; // { "alleyinsider.blog.posts" : { ts : 1 } , ... ] - all ns that are sharded
|
||||
map<string,ShardManager*> _shards; // this will only have entries for things that have been looked at
|
||||
map<string,ChunkManager*> _shards; // this will only have entries for things that have been looked at
|
||||
|
||||
friend class Grid;
|
||||
};
|
||||
|
@ -27,7 +27,7 @@
|
||||
|
||||
#include "request.h"
|
||||
#include "config.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
@ -42,7 +42,7 @@ namespace mongo {
|
||||
_config = grid.getDBConfig( getns() );
|
||||
|
||||
if ( _config->sharded( getns() ) ){
|
||||
_shardInfo = _config->getShardManager( getns() , reload );
|
||||
_shardInfo = _config->getChunkManager( getns() , reload );
|
||||
uassert( (string)"no shard info for: " + getns() , _shardInfo );
|
||||
}
|
||||
else {
|
||||
@ -55,9 +55,9 @@ namespace mongo {
|
||||
|
||||
string Request::singleServerName(){
|
||||
if ( _shardInfo ){
|
||||
if ( _shardInfo->numShards() > 1 )
|
||||
if ( _shardInfo->numChunks() > 1 )
|
||||
throw UserException( "can't call singleServerName on a sharded collection" );
|
||||
return _shardInfo->findShard( _shardInfo->getShardKey().globalMin() ).getServer();
|
||||
return _shardInfo->findChunk( _shardInfo->getShardKey().globalMin() ).getServer();
|
||||
}
|
||||
string s = _config->getServer( getns() );
|
||||
uassert( "can't call singleServerName on a sharded collection!" , s.size() > 0 );
|
||||
|
@ -34,7 +34,7 @@ namespace mongo {
|
||||
return _config;
|
||||
}
|
||||
|
||||
ShardManager * getShardManager(){
|
||||
ChunkManager * getChunkManager(){
|
||||
return _shardInfo;
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ namespace mongo {
|
||||
|
||||
MSGID _id;
|
||||
DBConfig * _config;
|
||||
ShardManager * _shardInfo;
|
||||
ChunkManager * _shardInfo;
|
||||
};
|
||||
|
||||
class StaleConfigException : public std::exception {
|
||||
|
@ -25,7 +25,7 @@
|
||||
#include "server.h"
|
||||
#include "request.h"
|
||||
#include "config.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
|
@ -17,7 +17,7 @@
|
||||
*/
|
||||
|
||||
#include "stdafx.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
#include "../db/jsobj.h"
|
||||
#include "../util/unittest.h"
|
||||
|
||||
@ -336,17 +336,11 @@ normal:
|
||||
return L.woCompare(q) <= 0 && R.woCompare(q) > 0;
|
||||
}
|
||||
|
||||
bool ShardKeyPattern::relevantForQuery( const BSONObj& query , Shard * shard ){
|
||||
/* if ( ! hasShardKey( query ) ){
|
||||
// if the shard key isn't in the query, then we have to go everywhere
|
||||
// therefore this shard is relevant
|
||||
return true;
|
||||
}
|
||||
*/
|
||||
bool ShardKeyPattern::relevantForQuery( const BSONObj& query , Chunk * chunk ){
|
||||
massert("not done for compound patterns", patternfields.size() == 1);
|
||||
|
||||
bool rel = relevant(query, shard->getMin(), shard->getMax());
|
||||
if( !hasShardKey( query ) )
|
||||
bool rel = relevant(query, chunk->getMin(), chunk->getMax());
|
||||
if( ! hasShardKey( query ) )
|
||||
assert(rel);
|
||||
|
||||
return rel;
|
||||
@ -464,15 +458,15 @@ normal:
|
||||
void rfq() {
|
||||
ShardKeyPattern k( BSON( "key" << 1 ) );
|
||||
BSONObj q = BSON( "key" << 3 );
|
||||
Shard s(0);
|
||||
Chunk c(0);
|
||||
BSONObj z = fromjson("{ ns : \"alleyinsider.fs.chunks\" , min : {key:2} , max : {key:20} , server : \"localhost:30001\" }");
|
||||
s.unserialize(z);
|
||||
assert( k.relevantForQuery(q, &s) );
|
||||
assert( k.relevantForQuery(fromjson("{foo:9,key:4}"), &s) );
|
||||
assert( !k.relevantForQuery(fromjson("{foo:9,key:43}"), &s) );
|
||||
assert( k.relevantForQuery(fromjson("{foo:9,key:{$gt:10}}"), &s) );
|
||||
assert( !k.relevantForQuery(fromjson("{foo:9,key:{$gt:22}}"), &s) );
|
||||
assert( k.relevantForQuery(fromjson("{foo:9}"), &s) );
|
||||
c.unserialize(z);
|
||||
assert( k.relevantForQuery(q, &c) );
|
||||
assert( k.relevantForQuery(fromjson("{foo:9,key:4}"), &c) );
|
||||
assert( !k.relevantForQuery(fromjson("{foo:9,key:43}"), &c) );
|
||||
assert( k.relevantForQuery(fromjson("{foo:9,key:{$gt:10}}"), &c) );
|
||||
assert( !k.relevantForQuery(fromjson("{foo:9,key:{$gt:22}}"), &c) );
|
||||
assert( k.relevantForQuery(fromjson("{foo:9}"), &c) );
|
||||
}
|
||||
void getfilt() {
|
||||
ShardKeyPattern k( BSON( "key" << 1 ) );
|
||||
|
@ -22,7 +22,7 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class Shard;
|
||||
class Chunk;
|
||||
|
||||
/* A ShardKeyPattern is a pattern indicating what data to extract from the object to make the shard key from.
|
||||
Analogous to an index key pattern.
|
||||
@ -84,7 +84,7 @@ namespace mongo {
|
||||
s: x:2..x:7
|
||||
-> true
|
||||
*/
|
||||
bool relevantForQuery( const BSONObj& q , Shard * s );
|
||||
bool relevantForQuery( const BSONObj& q , Chunk * s );
|
||||
|
||||
/**
|
||||
Returns if the given sort pattern can be ordered by the shard key pattern.
|
||||
|
@ -5,7 +5,6 @@
|
||||
#include "../util/background.h"
|
||||
#include "../client/connpool.h"
|
||||
#include "../db/commands.h"
|
||||
#include "shard.h"
|
||||
#include "server.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -99,7 +98,7 @@ namespace mongo {
|
||||
Message m( (void*)data["msg"].binData( len ) , false );
|
||||
massert( "invalid writeback message" , m.data->valid() );
|
||||
|
||||
grid.getDBConfig( ns )->getShardManager( ns , true );
|
||||
grid.getDBConfig( ns )->getChunkManager( ns , true );
|
||||
|
||||
Request r( m , 0 );
|
||||
r.process();
|
||||
@ -155,7 +154,7 @@ namespace mongo {
|
||||
return;
|
||||
|
||||
|
||||
ShardManager * manager = conf->getShardManager( ns , authoritative );
|
||||
ChunkManager * manager = conf->getChunkManager( ns , authoritative );
|
||||
|
||||
unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ];
|
||||
if ( manager->getSequenceNumber() == sequenceNumber )
|
||||
@ -163,7 +162,7 @@ namespace mongo {
|
||||
|
||||
log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << manager->getSequenceNumber() << endl;
|
||||
|
||||
ServerShardVersion version = manager->getVersion( conn.getServerAddress() );
|
||||
ShardChunkVersion version = manager->getVersion( conn.getServerAddress() );
|
||||
|
||||
BSONObj result;
|
||||
if ( setShardVersion( conn , ns , version , authoritative , result ) ){
|
||||
@ -186,7 +185,7 @@ namespace mongo {
|
||||
massert( "setShardVersion failed!" , 0 );
|
||||
}
|
||||
|
||||
bool setShardVersion( DBClientBase & conn , const string& ns , ServerShardVersion version , bool authoritative , BSONObj& result ){
|
||||
bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
|
||||
|
||||
BSONObjBuilder cmdBuilder;
|
||||
cmdBuilder.append( "setShardVersion" , ns.c_str() );
|
||||
|
@ -3,7 +3,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "../stdafx.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
#include "request.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -29,7 +29,7 @@ namespace mongo {
|
||||
|
||||
void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false );
|
||||
|
||||
bool setShardVersion( DBClientBase & conn , const string& ns , ServerShardVersion version , bool authoritative , BSONObj& result );
|
||||
bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result );
|
||||
|
||||
bool lockNamespaceOnServer( const string& server , const string& ns );
|
||||
bool lockNamespaceOnServer( DBClientBase& conn , const string& ns );
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
#include "stdafx.h"
|
||||
#include "request.h"
|
||||
#include "shard.h"
|
||||
#include "chunk.h"
|
||||
#include "cursors.h"
|
||||
#include "../client/connpool.h"
|
||||
#include "../db/commands.h"
|
||||
@ -19,17 +19,17 @@ namespace mongo {
|
||||
if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") )
|
||||
throw UserException( "something is wrong, shouldn't see a command here" );
|
||||
|
||||
ShardManager * info = r.getShardManager();
|
||||
ChunkManager * info = r.getChunkManager();
|
||||
assert( info );
|
||||
|
||||
Query query( q.query );
|
||||
|
||||
vector<Shard*> shards;
|
||||
info->getShardsForQuery( shards , query.getFilter() );
|
||||
vector<Chunk*> shards;
|
||||
info->getChunksForQuery( shards , query.getFilter() );
|
||||
|
||||
set<ServerAndQuery> servers;
|
||||
map<string,int> serverCounts;
|
||||
for ( vector<Shard*>::iterator i = shards.begin(); i != shards.end(); i++ ){
|
||||
for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){
|
||||
servers.insert( (*i)->getServer() );
|
||||
int& num = serverCounts[(*i)->getServer()];
|
||||
num++;
|
||||
@ -48,8 +48,8 @@ namespace mongo {
|
||||
if ( shardKeyOrder ){
|
||||
// 2. sort on shard key, can do in serial intelligently
|
||||
set<ServerAndQuery> buckets;
|
||||
for ( vector<Shard*>::iterator i = shards.begin(); i != shards.end(); i++ ){
|
||||
Shard * s = *i;
|
||||
for ( vector<Chunk*>::iterator i = shards.begin(); i != shards.end(); i++ ){
|
||||
Chunk * s = *i;
|
||||
BSONObj extra = BSONObj();
|
||||
if ( serverCounts[s->getServer()] > 1 ){
|
||||
BSONObjBuilder b;
|
||||
@ -98,7 +98,7 @@ namespace mongo {
|
||||
cursorCache.remove( id );
|
||||
}
|
||||
|
||||
void _insert( Request& r , DbMessage& d, ShardManager* manager ){
|
||||
void _insert( Request& r , DbMessage& d, ChunkManager* manager ){
|
||||
|
||||
while ( d.moreJSObjs() ){
|
||||
BSONObj o = d.nextJsObj();
|
||||
@ -107,15 +107,15 @@ namespace mongo {
|
||||
throw UserException( "tried to insert object without shard key" );
|
||||
}
|
||||
|
||||
Shard& s = manager->findShard( o );
|
||||
log(4) << " server:" << s.getServer() << " " << o << endl;
|
||||
insert( s.getServer() , r.getns() , o );
|
||||
Chunk& c = manager->findChunk( o );
|
||||
log(4) << " server:" << c.getServer() << " " << o << endl;
|
||||
insert( c.getServer() , r.getns() , o );
|
||||
|
||||
s.splitIfShould( o.objsize() );
|
||||
c.splitIfShould( o.objsize() );
|
||||
}
|
||||
}
|
||||
|
||||
void _update( Request& r , DbMessage& d, ShardManager* manager ){
|
||||
void _update( Request& r , DbMessage& d, ChunkManager* manager ){
|
||||
int flags = d.pullInt();
|
||||
|
||||
BSONObj query = d.nextJsObj();
|
||||
@ -138,13 +138,13 @@ namespace mongo {
|
||||
throw UserException( "change would move shards!" );
|
||||
}
|
||||
|
||||
Shard& s = manager->findShard( toupdate );
|
||||
doWrite( dbUpdate , r , s.getServer() );
|
||||
Chunk& c = manager->findChunk( toupdate );
|
||||
doWrite( dbUpdate , r , c.getServer() );
|
||||
|
||||
s.splitIfShould( d.msg().data->dataLen() );
|
||||
c.splitIfShould( d.msg().data->dataLen() );
|
||||
}
|
||||
|
||||
void _delete( Request& r , DbMessage& d, ShardManager* manager ){
|
||||
void _delete( Request& r , DbMessage& d, ChunkManager* manager ){
|
||||
|
||||
int flags = d.pullInt();
|
||||
bool justOne = flags & 1;
|
||||
@ -153,24 +153,24 @@ namespace mongo {
|
||||
BSONObj pattern = d.nextJsObj();
|
||||
|
||||
if ( manager->hasShardKey( pattern ) ){
|
||||
Shard& s = manager->findShard( pattern );
|
||||
doWrite( dbDelete , r , s.getServer() );
|
||||
Chunk& c = manager->findChunk( pattern );
|
||||
doWrite( dbDelete , r , c.getServer() );
|
||||
return;
|
||||
}
|
||||
|
||||
if ( ! justOne && ! pattern.hasField( "_id" ) )
|
||||
throw UserException( "can only delete with a non-shard key pattern if can delete as many as we find" );
|
||||
|
||||
vector<Shard*> shards;
|
||||
manager->getShardsForQuery( shards , pattern );
|
||||
vector<Chunk*> chunks;
|
||||
manager->getChunksForQuery( chunks , pattern );
|
||||
|
||||
set<string> seen;
|
||||
for ( vector<Shard*>::iterator i=shards.begin(); i!=shards.end(); i++){
|
||||
Shard * s = *i;
|
||||
if ( seen.count( s->getServer() ) )
|
||||
for ( vector<Chunk*>::iterator i=chunks.begin(); i!=chunks.end(); i++){
|
||||
Chunk * c = *i;
|
||||
if ( seen.count( c->getServer() ) )
|
||||
continue;
|
||||
seen.insert( s->getServer() );
|
||||
doWrite( dbDelete , r , s->getServer() );
|
||||
seen.insert( c->getServer() );
|
||||
doWrite( dbDelete , r , c->getServer() );
|
||||
}
|
||||
}
|
||||
|
||||
@ -179,7 +179,7 @@ namespace mongo {
|
||||
log(3) << "write: " << ns << endl;
|
||||
|
||||
DbMessage& d = r.d();
|
||||
ShardManager * info = r.getShardManager();
|
||||
ChunkManager * info = r.getChunkManager();
|
||||
assert( info );
|
||||
|
||||
if ( op == dbInsert ){
|
||||
|
Loading…
Reference in New Issue
Block a user