diff --git a/s/cursors.cpp b/s/cursors.cpp index 0bd2487bf28..745501cc444 100644 --- a/s/cursors.cpp +++ b/s/cursors.cpp @@ -16,6 +16,7 @@ namespace mongo { _ntoreturn = q.ntoreturn; _totalSent = 0; + _done = false; if ( q.fields.get() ){ BSONObjBuilder b; @@ -77,7 +78,9 @@ namespace mongo { return s.simplifiedQuery(); } - bool ShardedCursor::sendNextBatch( Request& r ){ + bool ShardedCursor::sendNextBatch( Request& r , int ntoreturn ){ + uassert( "cursor already done" , ! _done ); + int maxSize = 1024 * 1024; if ( _totalSent > 0 ) maxSize *= 3; @@ -87,7 +90,7 @@ namespace mongo { int num = 0; bool sendMore = true; - cout << "TEMP: ShardedCursor " << _ns << "\t" << _query << " ntoreturn: " << _ntoreturn << endl; + cout << "TEMP: ShardedCursor " << _ns << "\t" << _query << " ntoreturn: " << ntoreturn << endl; while ( more() ){ BSONObj o = next(); cout << "\t" << o << endl; @@ -98,12 +101,12 @@ namespace mongo { if ( b.len() > maxSize ) break; - if ( num == _ntoreturn ){ + if ( num == ntoreturn ){ // soft limit aka batch size break; } - if ( ( -1 * num + _totalSent ) == _ntoreturn ){ + if ( ( -1 * num + _totalSent ) == ntoreturn ){ // hard limit - total to send sendMore = false; break; @@ -114,6 +117,7 @@ namespace mongo { log() << " hasMore:" << hasMore << " id:" << _id << endl; replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , 0 , hasMore ? _id : 0 ); _totalSent += num; + _done = ! hasMore; return hasMore; } @@ -229,4 +233,28 @@ namespace mongo { } + CursorCache::CursorCache(){ + } + + CursorCache::~CursorCache(){ + // TODO: delete old cursors? + } + + ShardedCursor* CursorCache::get( long long id ){ + map::iterator i = _cursors.find( id ); + if ( i == _cursors.end() ){ + OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl; + return 0; + } + return i->second; + } + + void CursorCache::store( ShardedCursor * cursor ){ + _cursors[cursor->getId()] = cursor; + } + void CursorCache::remove( long long id ){ + _cursors.erase( id ); + } + + CursorCache cursorCache; } diff --git a/s/cursors.h b/s/cursors.h index 36ff514c842..eb748b318aa 100644 --- a/s/cursors.h +++ b/s/cursors.h @@ -19,11 +19,14 @@ namespace mongo { virtual bool more() = 0; virtual BSONObj next() = 0; + + long long getId(){ return _id; } /** * @return whether there is more data left */ - bool sendNextBatch( Request& r ); + bool sendNextBatch( Request& r ){ return sendNextBatch( r , _ntoreturn ); } + bool sendNextBatch( Request& r , int ntoreturn ); protected: auto_ptr query( const string& server , int num = 0 , BSONObj extraFilter = emptyObj ); @@ -42,6 +45,7 @@ namespace mongo { long long _id; int _totalSent; + bool _done; }; @@ -95,4 +99,19 @@ namespace mongo { auto_ptr * _cursors; BSONObj * _nexts; }; + + class CursorCache { + public: + CursorCache(); + ~CursorCache(); + + ShardedCursor* get( long long id ); + void store( ShardedCursor* cursor ); + void remove( long long id ); + + private: + map _cursors; + }; + + extern CursorCache cursorCache; } diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index 799016284c5..cf496d963a4 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -76,12 +76,24 @@ namespace mongo { delete( cursor ); return; } - - // TODO: store cursor + cout << "STORING CURSOR : " << cursor->getId() << endl; + cursorCache.store( cursor ); } virtual void getMore( Request& r ){ - throw UserException( "shard getMore doesn't work yet" ); + int ntoreturn = r.d().pullInt(); + long long id = r.d().pullInt64(); + + ShardedCursor * cursor = cursorCache.get( id ); + uassert( "can't find cursor" , cursor ); + + cout << "GOT CURSOR : " << id << endl; + + if ( cursor->sendNextBatch( r , ntoreturn ) ) + return; + + delete( cursor ); + cursorCache.remove( id ); } virtual void writeOp( int op , Request& r ){