0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

sharded cursor framework, serial unordered access works

This commit is contained in:
Eliot Horowitz 2009-02-23 22:55:58 -05:00
parent 8a87e7123b
commit 6198d911e5
4 changed files with 60 additions and 19 deletions

View File

@ -72,7 +72,10 @@ assert.eq( "eliot" , db.foo.findOne( { num : 1 } ).name );
assert.eq( "funny man" , db.foo.findOne( { num : -2 } ).name );
// TODO: getAll
//assert.eq( 3 , db.foo.find().length() );
assert.eq( 6 , db.foo.find().length() );
var sum = 0;
db.foo.find().toArray().forEach( function(z){ sum += z.num; } );
assert.eq( 3 , sum );
// TODO: sort by num

View File

@ -60,9 +60,44 @@ namespace mongo {
b.append( i->c_str() , 1 );
_fields = b.obj();
}
do {
_id = security.getNonce();
} while ( _id == 0 );
}
ShardedCursor::~ShardedCursor(){
}
auto_ptr<DBClientCursor> ShardedCursor::query( const string& server , int num ){
ScopedDbConnection conn( server );
auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , _query , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );
conn.done();
return cursor;
}
void ShardedCursor::sendNextBatch( Request& r ){
BufBuilder b(32768);
int num = 0;
cout << "TEMP: ShardedCursor " << _ns << "\t" << _query << endl;
while ( more() ){
BSONObj o = next();
cout << "\t" << o << endl;
b.append( (void*)o.objdata() , o.objsize() );
num++;
if ( b.len() > 2097152 ){
// TEMP
break;
}
}
uassert( "can't handle getMore with sharding yet" , ! more() );
replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num );
}
}

View File

@ -25,10 +25,13 @@ namespace mongo {
ShardedCursor( QueryMessage& q );
virtual ~ShardedCursor();
virtual void sendNextBatch( Request& r ) = 0;
virtual bool more() = 0;
virtual BSONObj next() = 0;
void sendNextBatch( Request& r );
protected:
auto_ptr<DBClientCursor> query( const string& server , int num );
auto_ptr<DBClientCursor> query( const string& server , int num = 0 );
string _ns;
int _options;

View File

@ -8,33 +8,33 @@
namespace mongo {
class DownstreamServerState {
public:
DownstreamServerState( string name ) : _name( name ) , _used(0){
}
string _name;
bool _used;
long long _cursor;
};
class SerialServerShardedCursor : public ShardedCursor {
public:
SerialServerShardedCursor( set<string> servers , QueryMessage& q ) : ShardedCursor( q ){
for ( set<string>::iterator i = servers.begin(); i!=servers.end(); i++ )
_servers.push_back( DownstreamServerState( *i ) );
_servers.push_back( *i );
_serverIndex = 0;
}
virtual void sendNextBatch( Request& r ){
while ( _serverIndex < _servers.size() ){
}
throw UserException( "SerialServerShardedCursor doesn't work yet" );
virtual bool more(){
if ( _current.get() && _current->more() )
return true;
if ( _serverIndex >= _servers.size() )
return false;
_current = query( _servers[_serverIndex++] );
return _current->more();
}
virtual BSONObj next(){
uassert( "no more items" , more() );
return _current->next();
}
private:
vector<DownstreamServerState> _servers;
vector<string> _servers;
int _serverIndex;
auto_ptr<DBClientCursor> _current;