0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00
mongodb/s/shard.cpp

424 lines
13 KiB
C++
Raw Normal View History

2008-11-09 23:49:37 +01:00
// shard.cpp
2008-09-15 15:14:42 +02:00
/**
2009-02-18 16:10:39 +01:00
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2008-09-15 15:14:42 +02:00
#include "stdafx.h"
2008-11-09 23:49:37 +01:00
#include "shard.h"
2009-02-13 03:09:06 +01:00
#include "config.h"
2009-02-18 16:10:39 +01:00
#include "../util/unittest.h"
#include "../client/connpool.h"
2009-04-03 20:21:00 +02:00
#include "strategy.h"
2008-09-15 15:14:42 +02:00
2009-01-14 23:09:51 +01:00
namespace mongo {
// ------- Shard --------
2009-02-03 23:10:44 +01:00
/**
 * Creates an unmodified shard attached to the given manager.
 * @param manager the owning ShardManager; stored as-is (ShardManager's constructor passes 0 for a temporary)
 */
Shard::Shard( ShardManager * manager ) : _manager( manager ){
    // a zero _lastmod makes serialize() append a timestamp field instead of a date
    _lastmod = 0;
    _modified = false;
}
2009-02-20 19:46:57 +01:00
/**
 * Points this shard at a different server and flags it as modified.
 * @param s name of the server that now holds this shard
 */
void Shard::setServer( string s ){
    this->_server = s;
    this->_markModified();
}
/**
 * Tests whether obj falls inside this shard's key range.
 * The range is half open: getMin() <= obj < getMax(), compared with the manager's shard key.
 * @param obj object to test
 * @return true if obj belongs to this shard
 */
bool Shard::contains( const BSONObj& obj ){
    // below the lower bound -> not ours
    if ( _manager->getShardKey().compare( getMin() , obj ) > 0 )
        return false;

    // the upper bound is exclusive
    return _manager->getShardKey().compare( obj , getMax() ) < 0;
}
2009-03-30 20:33:40 +02:00
/**
 * Chooses the key at which this shard should be split.
 *
 * If this shard's min equals the shard key's global min (or its max equals the global max),
 * the first document of the namespace in ascending (resp. descending) shard key order is
 * fetched from this shard's server and its extracted key is returned.  If the shard touches
 * neither extreme, or that lookup returns an empty object, the "medianKey" command is run
 * against the shard's server and its "median" field is returned.
 *
 * @return the key to split on
 * @throws (via uassert) if a shard key field is not numeric or the medianKey command fails
 */
BSONObj Shard::pickSplitPoint(){
    // 1: shard starts at the global min, -1: shard ends at the global max, 0: neither
    int direction = 0;
    if ( _manager->getShardKey().globalMin().woCompare( getMin() ) == 0 )
        direction = 1;
    else if ( _manager->getShardKey().globalMax().woCompare( getMax() ) == 0 )
        direction = -1;

    if ( direction != 0 ){
        ScopedDbConnection conn( getServer() );

        Query q;
        if ( direction == 1 ){
            q.sort( _manager->getShardKey().key() );
        }
        else {
            // negate every field of the key pattern to sort in the opposite direction
            BSONObj pattern = _manager->getShardKey().key();
            BSONObjBuilder reversed;

            BSONObjIterator it( pattern );
            while ( it.more() ){
                BSONElement field = it.next();
                if ( field.eoo() )
                    break;
                uassert( "can only handle numbers here - which i think is correct" , field.isNumber() );
                reversed.append( field.fieldName() , -1 * field.number() );
            }

            q.sort( reversed.obj() );
        }

        BSONObj edge = conn->findOne( _ns , q );
        conn.done();

        if ( ! edge.isEmpty() )
            return _manager->getShardKey().extractKey( edge );
    }

    // general case: ask the server for the median key of our range
    ScopedDbConnection conn( getServer() );
    BSONObj result;
    BSONObj cmd = BSON( "medianKey" << _ns
                        << "keyPattern" << _manager->getShardKey().key()
                        << "min" << getMin()
                        << "max" << getMax()
                        );
    bool ok = conn->runCommand( "admin" , cmd , result );
    uassert( "medianKey failed!" , ok );
    conn.done();

    return result.getObjectField( "median" ).getOwned();
}
/**
 * Splits this shard at the key chosen by pickSplitPoint().
 * @return the newly created shard, as returned by split( const BSONObj& )
 */
Shard * Shard::split(){
    BSONObj splitPoint = pickSplitPoint();
    return split( splitPoint );
}
2009-02-27 20:23:52 +01:00
/**
 * Splits this shard in two at key m.
 *
 * This shard keeps [_min, m); a new shard on the same server and namespace takes [m, old _max).
 * Both shards are marked modified and the new one is appended to the manager's shard list.
 * Nothing is saved here.
 *
 * @param m the split key
 * @return the new (right hand) shard; the pointer is also stored in the manager's _shards
 * @throws (via uassert) if this shard has no manager
 */
Shard * Shard::split( const BSONObj& m ){
    uassert( "can't split as shard that doesn't have a manager" , _manager );

    log(1) << " before split on: " << m << "\n"
           << "\t self : " << toString() << endl;

    // the right half starts out as a copy of this shard, beginning at the split key
    Shard * right = new Shard( _manager );
    right->_server = _server;
    right->_ns = _ns;
    right->_max = _max;
    right->_min = m.getOwned();

    right->_markModified();
    _markModified();

    _manager->_shards.push_back( right );

    // shrink ourselves only after the old _max has been handed to the right half
    _max = m.getOwned();

    log(1) << " after split:\n"
           << "\t left : " << toString() << "\n"
           << "\t right: "<< right->toString() << endl;

    return right;
}
2009-04-03 19:52:06 +02:00
bool Shard::moveAndCommit( const string& to , string& errmsg ){
log() << "moving shard ns: " << _ns << " moving shard: " << toString() << " " << _server << " -> " << to << endl;
string from = _server;
2009-04-03 20:21:00 +02:00
lockNamespaceOnServer( from , _ns );
2009-04-03 19:52:06 +02:00
// copyCollection
ScopedDbConnection toconn( to );
BSONObj cloneRes;
BSONObj filter;
{
BSONObjBuilder b;
getFilter( b );
filter = b.obj();
}
bool worked = toconn->runCommand( _manager->_config->getName().c_str() ,
BSON( "cloneCollection" << _ns <<
"from" << from <<
"query" << filter
) ,
cloneRes
);
toconn.done();
if ( ! worked ){
errmsg = (string)"cloneCollection failed: " + cloneRes.toString();
return false;
}
// update config db
setServer( to );
// need to increment version # for old server
Shard * randomShardOnOldServer = _manager->findShardOnServer( from );
if ( randomShardOnOldServer )
randomShardOnOldServer->_markModified();
_manager->save();
// delete old data
ScopedDbConnection fromconn( from );
fromconn->remove( _ns.c_str() , filter );
string removeerror = fromconn->getLastError();
fromconn.done();
if ( removeerror.size() ){
errmsg = (string)"error removing old data:" + removeerror;
return false;
}
return true;
}
/**
 * Two shards are equal when both their min and their max bounds compare equal
 * under this shard's manager's shard key.  Namespace and server are not compared.
 */
bool Shard::operator==( const Shard& s ){
    if ( _manager->getShardKey().compare( _min , s._min ) != 0 )
        return false;

    return _manager->getShardKey().compare( _max , s._max ) == 0;
}
2009-02-20 19:46:57 +01:00
/**
 * Appends to b the filter that the manager's key pattern builds from this shard's bounds.
 * Presumably a range condition selecting documents in [_min, _max); see the key pattern's getFilter.
 * @param b builder that receives the filter
 */
void Shard::getFilter( BSONObjBuilder& b ){
    _manager->_key.getFilter( b , this->_min , this->_max );
}
2009-02-27 18:51:49 +01:00
/**
 * Writes this shard's fields into a builder.
 * Field order: lastmod, ns, min, max, server.
 * @param to builder that receives the fields
 */
void Shard::serialize(BSONObjBuilder& to){
    // without a known modification time only a timestamp field is appended, with no value supplied here
    if ( ! _lastmod )
        to.appendTimestamp( "lastmod" );
    else
        to.appendDate( "lastmod" , _lastmod );

    to << "ns" << _ns
       << "min" << _min
       << "max" << _max
       << "server" << _server;
}
2009-02-27 18:51:49 +01:00
2009-02-27 16:37:13 +01:00
void Shard::unserialize(const BSONObj& from){
_ns = from.getStringField( "ns" );
_min = from.getObjectField( "min" ).getOwned();
_max = from.getObjectField( "max" ).getOwned();
_server = from.getStringField( "server" );
2009-02-27 18:51:49 +01:00
_lastmod = from.hasField( "lastmod" ) ? from["lastmod"].date() : 0;
uassert( "Shard needs a ns" , ! _ns.empty() );
uassert( "Shard needs a server" , ! _ns.empty() );
2009-02-20 19:46:57 +01:00
uassert( "Shard needs a min" , ! _min.isEmpty() );
uassert( "Shard needs a max" , ! _max.isEmpty() );
}
/**
 * @return the server on which shard documents are stored, as reported by configServer
 */
string Shard::modelServer() {
    // TODO: this could move around?
    string server = configServer.modelServer();
    return server;
}
2009-02-27 18:51:49 +01:00
/**
 * Flags this shard as changed so that ShardManager::save() writes it out.
 *
 * _lastmod is reset to 0 rather than set to the current time: serialize() then appends a
 * timestamp field instead of a date, and Shard::save() reloads the shard after saving to
 * pick up the new value.  (A local that computed time(0)*1000 but was never used has been
 * removed; it wrongly suggested the current time was stored.)
 */
void Shard::_markModified(){
    _modified = true;
    _lastmod = 0;
}
2009-03-27 21:55:26 +01:00
void Shard::save( bool check ){
cout << "HERE: " << _id << endl;
bool reload = ! _lastmod;
Model::save( check );
cout << "\t" << _id << endl;
if ( reload ){
// need to do this so that we get the new _lastMod and therefore version number
massert( "_id has to be filled in already" , ! _id.isEmpty() );
string b = toString();
BSONObj q = _id.copy();
massert( "how could load fail?" , load( q ) );
cout << "before: " << q << "\t" << b << endl;
cout << "after : " << _id << "\t" << toString() << endl;
massert( "shard reload changed content!" , b == toString() );
massert( "id changed!" , q["_id"] == _id["_id"] );
}
}
void Shard::ensureIndex(){
ScopedDbConnection conn( getServer() );
conn->ensureIndex( _ns , _manager->getShardKey().key() );
conn.done();
}
2009-03-27 21:55:26 +01:00
/**
 * @return a one line description of this shard: namespace, server, min and max
 */
string Shard::toString() const {
    stringstream out;
    out << "shard ns:" << _ns;
    out << " server: " << _server;
    out << " min: " << _min;
    out << " max: " << _max;
    return out.str();
}
// ------- ShardManager --------
2009-03-30 16:50:10 +02:00
// Static counter behind ShardManager::_sequenceNumber: it is pre-incremented and copied
// whenever a manager is constructed and each time save() writes a modified shard.
unsigned long long ShardManager::NextSequenceNumber = 1;
/**
 * Loads every stored shard of a namespace, or creates the initial shard if none exist.
 *
 * The shard documents whose "ns" equals ns are queried from the model server.  If there are
 * none, a single shard spanning the key's global min to global max is created on the
 * config's primary and marked modified (it is not saved here).
 *
 * @param config  database configuration this manager belongs to
 * @param ns      namespace being sharded
 * @param pattern shard key pattern for the namespace
 */
ShardManager::ShardManager( DBConfig * config , string ns , ShardKeyPattern pattern ) : _config( config ) , _ns( ns ) , _key( pattern ){
    // manager-less shard, used only to find out where and under which ns shards are stored
    Shard temp(0);

    ScopedDbConnection conn( temp.modelServer() );

    auto_ptr<DBClientCursor> cursor = conn->query( temp.getNS() , BSON( "ns" << ns ) );
    while ( cursor->more() ){
        Shard * loaded = new Shard( this );
        BSONObj doc = cursor->next();
        loaded->unserialize( doc );
        loaded->_id = doc["_id"].wrap().getOwned();
        _shards.push_back( loaded );
    }
    conn.done();

    if ( _shards.empty() ){
        Shard * first = new Shard( this );
        first->_ns = ns;
        first->_server = config->getPrimary();
        first->_min = _key.globalMin();
        first->_max = _key.globalMax();
        first->_markModified();

        _shards.push_back( first );

        log() << "no shards for:" << ns << " so creating first: " << first->toString() << endl;
    }

    _sequenceNumber = ++NextSequenceNumber;
}
/**
 * Deletes every shard held in _shards and empties the list.
 */
ShardManager::~ShardManager(){
    for ( size_t idx = 0; idx < _shards.size(); idx++ )
        delete _shards[idx];

    _shards.clear();
}
bool ShardManager::hasShardKey( const BSONObj& obj ){
2009-02-20 16:46:42 +01:00
return _key.hasShardKey( obj );
}
/**
 * Finds the shard whose range contains obj.
 * @param obj object to locate
 * @return the first shard, in list order, for which contains( obj ) is true
 * @throws UserException if no shard contains obj
 */
Shard& ShardManager::findShard( const BSONObj & obj ){
    for ( size_t idx = 0; idx < _shards.size(); idx++ ){
        Shard * candidate = _shards[idx];
        if ( candidate->contains( obj ) )
            return *candidate;
    }

    throw UserException( "couldn't find a shard which should be impossible" );
}
2009-03-27 21:55:26 +01:00
/**
 * @param server server name to look for
 * @return the first shard, in list order, located on server, or 0 if there is none
 */
Shard* ShardManager::findShardOnServer( const string& server ) const {
    for ( size_t idx = 0; idx < _shards.size(); idx++ ){
        Shard * candidate = _shards[idx];
        if ( candidate->getServer() == server )
            return candidate;
    }

    return 0;
}
/**
 * Collects the shards that the key pattern considers relevant for a query.
 * @param shards matching shards are appended here; existing entries are left alone
 * @param query  the query to route
 * @return the number of shards appended
 */
int ShardManager::getShardsForQuery( vector<Shard*>& shards , const BSONObj& query ){
    int matched = 0;

    for ( size_t idx = 0; idx < _shards.size(); idx++ ){
        Shard * candidate = _shards[idx];
        if ( ! _key.relevantForQuery( query , candidate ) )
            continue;

        shards.push_back( candidate );
        matched++;
    }

    return matched;
}
2009-03-25 22:35:38 +01:00
void ShardManager::ensureIndex(){
set<string> seen;
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
Shard* s = *i;
if ( seen.count( s->getServer() ) )
continue;
seen.insert( s->getServer() );
s->ensureIndex();
}
}
void ShardManager::save(){
2009-03-27 21:55:26 +01:00
ServerShardVersion a = getVersion();
for ( vector<Shard*>::const_iterator i=_shards.begin(); i!=_shards.end(); i++ ){
Shard* s = *i;
if ( ! s->_modified )
continue;
s->save( true );
2009-03-30 16:50:10 +02:00
_sequenceNumber = ++NextSequenceNumber;
2009-02-17 21:34:52 +01:00
}
2009-03-27 21:55:26 +01:00
massert( "how did version get smalled" , getVersion() >= a );
ensureIndex(); // TODO: this is too aggressive - but not really sooo bad
2009-03-27 21:55:26 +01:00
}
2009-03-25 22:35:38 +01:00
/**
 * @param server server name to restrict to
 * @return the largest _lastmod among the shards located on server, or 0 if there are none
 */
ServerShardVersion ShardManager::getVersion( const string& server ) const{
    // TODO: cache or something?

    ServerShardVersion newest = 0;

    for ( size_t idx = 0; idx < _shards.size(); idx++ ){
        Shard * shard = _shards[idx];
        if ( shard->getServer() == server && shard->_lastmod > newest )
            newest = shard->_lastmod;
    }

    return newest;
}
2009-03-27 21:55:26 +01:00
/**
 * @return the largest _lastmod among all shards of this manager, or 0 if there are none
 */
ServerShardVersion ShardManager::getVersion() const{
    ServerShardVersion newest = 0;

    for ( size_t idx = 0; idx < _shards.size(); idx++ ){
        Shard * shard = _shards[idx];
        if ( shard->_lastmod > newest )
            newest = shard->_lastmod;
    }

    return newest;
}
/**
 * @return a multi line description: a header with the namespace and key,
 *         followed by one tab-indented line per shard
 */
string ShardManager::toString() const {
    stringstream out;
    out << "ShardManager: " << _ns << " key:" << _key.toString() << "\n";

    for ( size_t idx = 0; idx < _shards.size(); idx++ ){
        const Shard * shard = _shards[idx];
        out << "\t" << shard->toString() << "\n";
    }

    return out.str();
}
2009-02-18 16:10:39 +01:00
class ShardObjUnitTest : public UnitTest {
public:
void runShard(){
}
void run(){
runShard();
2009-02-18 16:10:39 +01:00
log(1) << "shardObjTest passed" << endl;
}
} shardObjTest;
2009-01-14 23:09:51 +01:00
} // namespace mongo