0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 00:56:44 +01:00
mongodb/s/cursors.cpp
2009-02-25 00:57:31 -05:00

233 lines
6.7 KiB
C++

// cursors.cpp
#include "cursors.h"
#include "../client/connpool.h"
#include "../db/queryutil.h"
namespace mongo {
// -------- ShardedCursor -----------
/**
 * Captures the pieces of the client's query message that are needed to
 * re-issue it against individual servers: namespace, a copy of the query
 * object, query options, skip and ntoreturn. When the message carries a
 * field set, it is converted into a { name : 1 , ... } selector object.
 * The cursor id is drawn from security.getNonce() and is never 0.
 */
ShardedCursor::ShardedCursor( QueryMessage& q ){
    _ns = q.ns;
    _query = q.query.copy();
    _options = q.queryOptions;
    _skip = q.ntoskip;
    _ntoreturn = q.ntoreturn;
    _totalSent = 0;

    if ( q.fields.get() ){
        BSONObjBuilder selector;
        set<string>::iterator name = q.fields->begin();
        while ( name != q.fields->end() ){
            selector.append( name->c_str() , 1 );
            name++;
        }
        _fields = selector.obj();
    }

    // sendNextBatch replies with id 0 when there is nothing more to fetch,
    // so keep drawing nonces until we get a non-zero one
    _id = security.getNonce();
    while ( _id == 0 )
        _id = security.getNonce();
}
// Intentionally empty: no explicit cleanup is performed here.
ShardedCursor::~ShardedCursor(){}
/**
 * Runs this cursor's query against a single server.
 *
 * @param server  address handed to ScopedDbConnection
 * @param num     passed through as the number-to-return argument of the remote query
 * @param extra   additional filter merged into the stored query via concatQuery();
 *                ignored when empty
 * @return the cursor produced by the remote query
 */
auto_ptr<DBClientCursor> ShardedCursor::query( const string& server , int num , BSONObj extra ){
    BSONObj q = _query;
    if ( ! extra.isEmpty() ){
        q = concatQuery( q , extra );
    }

    ScopedDbConnection conn( server );
    log(5) << "ShardedCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << endl;

    // Send the merged query (q), not the raw _query; otherwise the per-server
    // extra filter is computed and logged but never applied.
    auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );

    conn.done();
    return cursor;
}
/**
 * Merges extraFilter into a query object.
 *
 * If the object has no "query" field it is treated as a bare filter and
 * merged directly. Otherwise every element is copied through unchanged
 * except "query", whose embedded object is replaced by the merged filter.
 */
BSONObj ShardedCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
    if ( query.hasField( "query" ) ){
        BSONObjBuilder wrapped;
        for ( BSONObjIterator it( query ); it.more(); ){
            BSONElement elem = it.next();
            if ( elem.eoo() )
                break;

            if ( strcmp( elem.fieldName() , "query" ) == 0 )
                wrapped.append( "query" , _concatFilter( elem.embeddedObjectUserCheck() , extraFilter ) );
            else
                wrapped.append( elem );
        }
        return wrapped.obj();
    }

    return _concatFilter( query , extraFilter );
}
/**
 * Appends the elements of filter followed by those of extra into one object
 * and returns the simplifiedQuery() of a FieldBoundSet built from it.
 * NOTE(review): the first FieldBoundSet argument is the literal "wrong";
 * presumably a placeholder - confirm what that parameter is meant to be.
 */
BSONObj ShardedCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){
    BSONObjBuilder merged;
    merged.appendElements( filter );
    merged.appendElements( extra );
    return FieldBoundSet( "wrong" , merged.obj() ).simplifiedQuery();
}
/**
 * Pulls documents from more()/next(), packs them into one reply buffer and
 * sends that reply for request r.
 *
 * A batch ends when the buffer exceeds the size cap (1MB for the first
 * batch, 3MB afterwards), when _ntoreturn documents have been packed into
 * this batch (soft limit), or when the hard limit on the total is reached.
 *
 * @return true if more data remains and the reply carried this cursor's id;
 *         false if the reply carried id 0
 */
bool ShardedCursor::sendNextBatch( Request& r ){
    int maxSize = 1024 * 1024;
    if ( _totalSent > 0 )
        maxSize *= 3;

    BufBuilder b(32768);
    int num = 0;
    bool sendMore = true;

    // temporary debug output, kept as-is
    cout << "TEMP: ShardedCursor " << _ns << "\t" << _query << " ntoreturn: " << _ntoreturn << endl;
    while ( more() ){
        BSONObj o = next();
        cout << "\t" << o << endl;

        b.append( (void*)o.objdata() , o.objsize() );
        num++;

        if ( b.len() > maxSize )
            break;

        if ( num == _ntoreturn ){
            // soft limit aka batch size
            break;
        }

        // The hard limit is compared as a negated count against _ntoreturn.
        // It must cover everything sent so far plus this batch, hence the
        // parentheses: the previous "-1 * num + _totalSent" evaluated to
        // "_totalSent - num" and was only right for the very first batch.
        if ( -1 * ( num + _totalSent ) == _ntoreturn ){
            // hard limit - total to send
            sendMore = false;
            break;
        }
    }

    bool hasMore = sendMore && more();
    log() << " hasMore:" << hasMore << " id:" << _id << endl;

    // id 0 in the reply means there is nothing further to fetch
    replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , 0 , hasMore ? _id : 0 );
    _totalSent += num;

    return hasMore;
}
// -------- SerialServerShardedCursor -----------
/**
 * Builds a cursor that visits the given servers one after another.
 *
 * @param servers    servers (with their extra filters) to query
 * @param q          the client's query message, forwarded to ShardedCursor
 * @param sortOrder  > 0 sorts the server list ascending, < 0 descending
 *                   (both by ServerAndQuery's ordering); 0 leaves it in the
 *                   set's iteration order
 */
SerialServerShardedCursor::SerialServerShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ShardedCursor( q ){
    _servers.insert( _servers.end() , servers.begin() , servers.end() );

    if ( sortOrder < 0 ){
        // sorting through reverse iterators leaves the list in descending order
        sort( _servers.rbegin() , _servers.rend() );
    }
    else if ( sortOrder > 0 ){
        sort( _servers.begin() , _servers.end() );
    }

    _serverIndex = 0;
}
/**
 * Reports whether another document is available, moving on to the next
 * server(s) when the current one is exhausted.
 *
 * @return true if the current cursor has data, or if some not-yet-queried
 *         server returns a cursor with data; false once every server has
 *         been tried
 */
bool SerialServerShardedCursor::more(){
    if ( _current.get() && _current->more() )
        return true;

    // Keep advancing until a server actually has results. Stopping after a
    // single empty server would report "no more" while later servers still
    // hold data.
    while ( _serverIndex < _servers.size() ){
        ServerAndQuery& sq = _servers[_serverIndex++];
        _current = query( sq._server , 0 , sq._extra );
        // same null guard as above, in case no cursor came back
        if ( _current.get() && _current->more() )
            return true;
    }

    return false;
}
/**
 * Returns the next document from the current server's cursor.
 * Raises a user assertion ("no more items") when more() reports nothing left.
 */
BSONObj SerialServerShardedCursor::next(){
    // more() also switches _current over to the next server when needed
    const bool available = more();
    uassert( "no more items" , available );
    return _current->next();
}
// -------- ParallelSortShardedCursor -----------
/**
 * Opens one cursor per server up front so results can later be merged
 * according to sortKey.
 *
 * Allocates two arrays of _numServers entries: _cursors (one remote cursor
 * per server) and _nexts (one pending-document slot per server).
 */
ParallelSortShardedCursor::ParallelSortShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ShardedCursor( q ) , _servers( servers ){
    _numServers = servers.size();
    _sortKey = sortKey;

    _cursors = new auto_ptr<DBClientCursor>[_numServers];
    _nexts = new BSONObj[_numServers];

    // TODO: parallelize
    int slot = 0;
    set<ServerAndQuery>::iterator it = servers.begin();
    while ( it != servers.end() ){
        _cursors[slot] = query( it->_server , 0 , it->_extra );
        slot++;
        it++;
    }
}
/**
 * Releases the per-server arrays allocated in the constructor.
 * Both were created with new[], so they must be released with delete[];
 * scalar delete on an array is undefined behaviour. _nexts was previously
 * never freed at all.
 */
ParallelSortShardedCursor::~ParallelSortShardedCursor(){
    delete[] _cursors;
    delete[] _nexts;
}
/**
 * Returns true if any server still has something to offer: either a
 * document already waiting in its _nexts slot, or a non-null cursor that
 * reports more data.
 */
bool ParallelSortShardedCursor::more(){
    int idx = 0;
    while ( idx < _numServers ){
        const bool buffered = ! _nexts[idx].isEmpty();
        if ( buffered || ( _cursors[idx].get() && _cursors[idx]->more() ) )
            return true;
        idx++;
    }
    return false;
}
/**
 * Returns the next document in merged order.
 *
 * First refills every empty per-server slot via advance(), then picks among
 * the pending documents using woSortOrder() with _sortKey, clears the slot
 * the pick came from, and returns it. Raises a user assertion
 * ("no more elements") when no slot holds a document.
 */
BSONObj ParallelSortShardedCursor::next(){
    advance();

    int winner = -1;
    BSONObj chosen = emptyObj;
    for ( int slot = 0; slot < _numServers; slot++ ){
        if ( _nexts[slot].isEmpty() )
            continue;

        // The first pending document is taken unconditionally; after that the
        // current pick is replaced unless it compares strictly below the
        // candidate (so on a tie the later slot wins).
        if ( chosen.isEmpty() || chosen.woSortOrder( _nexts[slot] , _sortKey ) >= 0 ){
            chosen = _nexts[slot];
            winner = slot;
        }
    }

    uassert( "no more elements" , ! chosen.isEmpty() );
    _nexts[winner] = emptyObj;
    return chosen;
}
/**
 * Fills every empty slot in _nexts with the next document from the matching
 * server cursor, when that cursor has one. Slots that already hold a
 * document are left alone.
 */
void ParallelSortShardedCursor::advance(){
    for ( int i=0; i<_numServers; i++ ){
        if ( ! _nexts[i].isEmpty() ){
            // already have a good object there
            continue;
        }

        // more() treats a null cursor as "nothing available"; do the same
        // here instead of dereferencing it
        if ( ! _cursors[i].get() || ! _cursors[i]->more() ){
            // cursor is missing or dead, oh well
            continue;
        }

        _nexts[i] = _cursors[i]->next();
    }
}
}