mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-30 00:56:44 +01:00
getMore checkpoint
This commit is contained in:
parent
7b815a4a98
commit
4c98640678
@ -16,6 +16,7 @@ namespace mongo {
|
||||
_ntoreturn = q.ntoreturn;
|
||||
|
||||
_totalSent = 0;
|
||||
_done = false;
|
||||
|
||||
if ( q.fields.get() ){
|
||||
BSONObjBuilder b;
|
||||
@ -77,7 +78,9 @@ namespace mongo {
|
||||
return s.simplifiedQuery();
|
||||
}
|
||||
|
||||
bool ShardedCursor::sendNextBatch( Request& r ){
|
||||
bool ShardedCursor::sendNextBatch( Request& r , int ntoreturn ){
|
||||
uassert( "cursor already done" , ! _done );
|
||||
|
||||
int maxSize = 1024 * 1024;
|
||||
if ( _totalSent > 0 )
|
||||
maxSize *= 3;
|
||||
@ -87,7 +90,7 @@ namespace mongo {
|
||||
int num = 0;
|
||||
bool sendMore = true;
|
||||
|
||||
cout << "TEMP: ShardedCursor " << _ns << "\t" << _query << " ntoreturn: " << _ntoreturn << endl;
|
||||
cout << "TEMP: ShardedCursor " << _ns << "\t" << _query << " ntoreturn: " << ntoreturn << endl;
|
||||
while ( more() ){
|
||||
BSONObj o = next();
|
||||
cout << "\t" << o << endl;
|
||||
@ -98,12 +101,12 @@ namespace mongo {
|
||||
if ( b.len() > maxSize )
|
||||
break;
|
||||
|
||||
if ( num == _ntoreturn ){
|
||||
if ( num == ntoreturn ){
|
||||
// soft limit aka batch size
|
||||
break;
|
||||
}
|
||||
|
||||
if ( ( -1 * num + _totalSent ) == _ntoreturn ){
|
||||
if ( ( -1 * num + _totalSent ) == ntoreturn ){
|
||||
// hard limit - total to send
|
||||
sendMore = false;
|
||||
break;
|
||||
@ -114,6 +117,7 @@ namespace mongo {
|
||||
log() << " hasMore:" << hasMore << " id:" << _id << endl;
|
||||
replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , 0 , hasMore ? _id : 0 );
|
||||
_totalSent += num;
|
||||
_done = ! hasMore;
|
||||
return hasMore;
|
||||
}
|
||||
|
||||
@ -229,4 +233,28 @@ namespace mongo {
|
||||
|
||||
}
|
||||
|
||||
CursorCache::CursorCache(){
|
||||
}
|
||||
|
||||
CursorCache::~CursorCache(){
|
||||
// TODO: delete old cursors?
|
||||
}
|
||||
|
||||
ShardedCursor* CursorCache::get( long long id ){
|
||||
map<long long,ShardedCursor*>::iterator i = _cursors.find( id );
|
||||
if ( i == _cursors.end() ){
|
||||
OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl;
|
||||
return 0;
|
||||
}
|
||||
return i->second;
|
||||
}
|
||||
|
||||
void CursorCache::store( ShardedCursor * cursor ){
|
||||
_cursors[cursor->getId()] = cursor;
|
||||
}
|
||||
void CursorCache::remove( long long id ){
|
||||
_cursors.erase( id );
|
||||
}
|
||||
|
||||
CursorCache cursorCache;
|
||||
}
|
||||
|
21
s/cursors.h
21
s/cursors.h
@ -19,11 +19,14 @@ namespace mongo {
|
||||
|
||||
virtual bool more() = 0;
|
||||
virtual BSONObj next() = 0;
|
||||
|
||||
long long getId(){ return _id; }
|
||||
|
||||
/**
|
||||
* @return whether there is more data left
|
||||
*/
|
||||
bool sendNextBatch( Request& r );
|
||||
bool sendNextBatch( Request& r ){ return sendNextBatch( r , _ntoreturn ); }
|
||||
bool sendNextBatch( Request& r , int ntoreturn );
|
||||
|
||||
protected:
|
||||
auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = emptyObj );
|
||||
@ -42,6 +45,7 @@ namespace mongo {
|
||||
long long _id;
|
||||
|
||||
int _totalSent;
|
||||
bool _done;
|
||||
};
|
||||
|
||||
|
||||
@ -95,4 +99,19 @@ namespace mongo {
|
||||
auto_ptr<DBClientCursor> * _cursors;
|
||||
BSONObj * _nexts;
|
||||
};
|
||||
|
||||
class CursorCache {
|
||||
public:
|
||||
CursorCache();
|
||||
~CursorCache();
|
||||
|
||||
ShardedCursor* get( long long id );
|
||||
void store( ShardedCursor* cursor );
|
||||
void remove( long long id );
|
||||
|
||||
private:
|
||||
map<long long,ShardedCursor*> _cursors;
|
||||
};
|
||||
|
||||
extern CursorCache cursorCache;
|
||||
}
|
||||
|
@ -76,12 +76,24 @@ namespace mongo {
|
||||
delete( cursor );
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: store cursor
|
||||
cout << "STORING CURSOR : " << cursor->getId() << endl;
|
||||
cursorCache.store( cursor );
|
||||
}
|
||||
|
||||
virtual void getMore( Request& r ){
|
||||
throw UserException( "shard getMore doesn't work yet" );
|
||||
int ntoreturn = r.d().pullInt();
|
||||
long long id = r.d().pullInt64();
|
||||
|
||||
ShardedCursor * cursor = cursorCache.get( id );
|
||||
uassert( "can't find cursor" , cursor );
|
||||
|
||||
cout << "GOT CURSOR : " << id << endl;
|
||||
|
||||
if ( cursor->sendNextBatch( r , ntoreturn ) )
|
||||
return;
|
||||
|
||||
delete( cursor );
|
||||
cursorCache.remove( id );
|
||||
}
|
||||
|
||||
virtual void writeOp( int op , Request& r ){
|
||||
|
Loading…
Reference in New Issue
Block a user