0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 01:21:03 +01:00
mongodb/s/strategy_shard.cpp

166 lines
6.0 KiB
C++
Raw Normal View History

2009-02-20 16:46:42 +01:00
// strategy_shard.cpp
#include "stdafx.h"
#include "request.h"
#include "shard.h"
2009-02-25 06:57:31 +01:00
#include "cursors.h"
2009-02-20 16:46:42 +01:00
#include "../client/connpool.h"
#include "../db/commands.h"
namespace mongo {
2009-02-20 16:46:42 +01:00
class ShardStrategy : public Strategy {
// Handles a query against a sharded collection.
//
// Asks the request's ShardManager which shards match the query filter and
// collapses them to the distinct servers involved.  If exactly one server is
// involved the request is forwarded with doQuery().  Otherwise a ShardedCursor
// is chosen based on the requested sort:
//   1. no sort                          -> SerialServerShardedCursor over the servers
//   2. sort the shard key can order     -> SerialServerShardedCursor over one bucket per shard
//   3. any other sort                   -> ParallelSortShardedCursor
// The first batch is sent immediately.  The cursor is stored in cursorCache
// only when sendNextBatch() returns true; otherwise it is deleted here.
//
// Throws UserException when the message looks like a command
// (ntoreturn == 1 and ".$cmd" appears in the namespace).
virtual void queryOp( Request& r ){
    QueryMessage q( r.d() );

    log(3) << "shard query: " << q.ns << " " << q.query << endl;

    if ( q.ntoreturn == 1 && strstr(q.ns, ".$cmd") )
        throw UserException( "something is wrong, shouldn't see a command here" );

    ShardManager * info = r.getShardManager();
    assert( info );

    Query query( q.query );

    vector<Shard*> shards;
    info->getShardsForQuery( shards , query.getFilter() );

    // Distinct servers to contact, plus how many of the matching shards each server holds.
    set<ServerAndQuery> servers;
    map<string,int> serverCounts;
    for ( vector<Shard*>::iterator i = shards.begin(); i != shards.end(); ++i ){
        servers.insert( (*i)->getServer() );
        serverCounts[(*i)->getServer()]++;
    }

    if ( servers.size() == 1 ){
        doQuery( r , servers.begin()->_server );
        return;
    }

    ShardedCursor * cursor = 0;

    BSONObj sort = query.getSort();

    if ( sort.isEmpty() ){
        // 1. no sort, can just hit them in serial
        cursor = new SerialServerShardedCursor( servers , q );
    }
    else {
        int shardKeyOrder = info->getShardKey().canOrder( sort );
        if ( shardKeyOrder ){
            // 2. sort on shard key, can do in serial intelligently
            set<ServerAndQuery> buckets;
            for ( vector<Shard*>::iterator i = shards.begin(); i != shards.end(); ++i ){
                Shard * s = *i;
                BSONObj extra = emptyObj;
                if ( serverCounts[s->getServer()] > 1 ){
                    // Several matching shards share this server: attach this shard's own
                    // filter (presumably so each bucket only covers its shard's range).
                    BSONObjBuilder b;
                    s->getFilter( b );
                    extra = b.obj();
                }
                buckets.insert( ServerAndQuery( s->getServer() , extra , s->getMin() ) );
            }
            cursor = new SerialServerShardedCursor( buckets , q , shardKeyOrder );
        }
        else {
            // 3. sort on non-sharded key, pull back a portion from each server and iterate slowly
            cursor = new ParallelSortShardedCursor( servers , q , sort );
        }
    }

    assert( cursor );

    // Nothing else owns the cursor yet, so it must be released here if sending throws.
    bool keepCursor;
    try {
        keepCursor = cursor->sendNextBatch( r );
    }
    catch ( ... ){
        delete cursor;
        throw;
    }

    if ( ! keepCursor ){
        delete cursor;
        return;
    }

    log(6) << "storing cursor : " << cursor->getId() << endl;
    cursorCache.store( cursor );
}
// Handles a getMore request for a cursor previously stored by queryOp().
//
// Reads the number of documents to return and the cursor id from the message,
// in that order.  If the id is not in cursorCache, replies with
// ResultFlag_CursorNotFound.  Otherwise sends the next batch.  The cursor stays
// cached while sendNextBatch() returns true; once it returns false the cursor
// is dropped from the cache and deleted.
virtual void getMore( Request& r ){
    int ntoreturn = r.d().pullInt();
    long long id = r.d().pullInt64();

    log(6) << "want cursor : " << id << endl;

    ShardedCursor * cursor = cursorCache.get( id );
    if ( ! cursor ){
        log(6) << "\t invalid cursor :(" << endl;
        replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
        return;
    }

    if ( cursor->sendNextBatch( r , ntoreturn ) ){
        // The cursor is kept for a later getMore, so leave it in the cache.
        return;
    }

    log(6) << "\t cursor finished: " << id << endl;

    // Remove from the cache before freeing so the cache never holds a dangling pointer.
    cursorCache.remove( id );
    delete cursor;
}
2009-02-26 21:02:18 +01:00
// Routes every document carried by an insert message to the shard that owns it.
//
// Documents are processed one at a time in message order: each one is checked
// for the shard key, its shard is looked up through the manager, and it is
// inserted on that shard's server.  The first document lacking the shard key
// is logged and aborts processing with a UserException; documents handled
// before it have already been inserted.
void receivedInsert( Request& r , DbMessage& d, ShardManager* manager ){
    for ( ; d.moreJSObjs() ; ){
        BSONObj doc = d.nextJsObj();

        if ( manager->hasShardKey( doc ) ){
            Shard& target = manager->findShard( doc );
            log(4) << " server:" << target.getServer() << " " << doc << endl;
            insert( target.getServer() , r.getns() , doc );
            continue;
        }

        log() << "tried to insert object without shard key: " << r.getns() << " " << doc << endl;
        throw UserException( "tried to insert object without shard key" );
    }
}
// Validates an update message and forwards it to the single shard that owns
// the targeted document.
//
// The message is read as: flags, query document, update document.  Bit 0 of
// flags is treated as the upsert flag.
//
// Throws UserException when:
//   - an upsert's update document lacks the shard key,
//   - the query lacks the shard key,
//   - the update document has the shard key and compares unequal to the query
//     on it (the update would move the document to another shard).
// uasserts "invalid update" when the message has no update document.
void receivedUpdate( Request& r , DbMessage& d, ShardManager* manager ){
    int flags = d.pullInt();

    BSONObj query = d.nextJsObj();
    uassert( "invalid update" , d.moreJSObjs() );
    BSONObj toupdate = d.nextJsObj();

    bool upsert = flags & 1;
    if ( upsert && ! manager->hasShardKey( toupdate ) )
        throw UserException( "can't upsert something without shard key" );

    if ( ! manager->hasShardKey( query ) )
        throw UserException( "can't do update with query that doesn't have the shard key" );

    if ( manager->hasShardKey( toupdate ) && manager->getShardKey().compare( query , toupdate ) )
        throw UserException( "change would move shards!" );

    // Route by the query: it is the only document guaranteed at this point to
    // contain the shard key.  A non-upsert update document may carry no shard
    // key at all, and when it does carry one the check above has already
    // required it to compare equal to the query's.
    Shard& s = manager->findShard( query );
    doWrite( dbUpdate , r , s.getServer() );
}
2009-02-20 16:46:42 +01:00
virtual void writeOp( int op , Request& r ){
const char *ns = r.getns();
log(3) << "write: " << ns << endl;
DbMessage& d = r.d();
ShardManager * info = r.getShardManager();
2009-02-20 16:46:42 +01:00
assert( info );
if ( op == dbInsert ){
2009-02-26 21:02:18 +01:00
receivedInsert( r , d , info );
2009-02-20 16:46:42 +01:00
}
else if ( op == dbUpdate ){
2009-02-26 21:02:18 +01:00
receivedUpdate( r , d , info );
2009-02-20 16:46:42 +01:00
}
else {
log() << "sharding can't do write op: " << op << endl;
throw UserException( "can't do this write op on sharded collection" );
}
}
};
// Namespace-scope ShardStrategy instance, exposed through a Strategy base pointer.
// It is allocated with new when this definition is initialized and is not deleted in this file.
Strategy * SHARDED = new ShardStrategy();
}