mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
fix cursor generation in mongos and clean up killCursor logging
This commit is contained in:
parent
cf576485f4
commit
1b194a97c5
@ -380,7 +380,7 @@ namespace mongo {
|
||||
int n = *x++;
|
||||
uassert( 13004 , "sent 0 cursors to kill" , n >= 1 );
|
||||
if ( n > 2000 ) {
|
||||
problem() << "Assertion failure, receivedKillCursors, n=" << n << endl;
|
||||
log( n < 30000 ? LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl;
|
||||
assert( n < 30000 );
|
||||
}
|
||||
killCursors(n, (long long *) x);
|
||||
|
@ -36,11 +36,7 @@ namespace mongo {
|
||||
_totalSent = 0;
|
||||
_done = false;
|
||||
|
||||
do {
|
||||
// TODO: only create _id when needed
|
||||
_id = security.getNonce();
|
||||
} while ( _id == 0 );
|
||||
|
||||
_id = 0;
|
||||
}
|
||||
|
||||
ShardedClientCursor::~ShardedClientCursor(){
|
||||
@ -49,6 +45,14 @@ namespace mongo {
|
||||
_cursor = 0;
|
||||
}
|
||||
|
||||
long long ShardedClientCursor::getId(){
|
||||
if ( _id <= 0 ){
|
||||
_id = cursorCache.genId();
|
||||
assert( _id >= 0 );
|
||||
}
|
||||
return _id;
|
||||
}
|
||||
|
||||
bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){
|
||||
uassert( 10191 , "cursor already done" , ! _done );
|
||||
|
||||
@ -81,12 +85,17 @@ namespace mongo {
|
||||
sendMore = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if ( ntoreturn == 0 && _totalSent == 0 && num > 100 ){
|
||||
// first batch should be max 100 unless batch size specified
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool hasMore = sendMore && _cursor->more();
|
||||
log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl;
|
||||
log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << getId() << " totalSent: " << _totalSent << endl;
|
||||
|
||||
replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 );
|
||||
replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? getId() : 0 );
|
||||
_totalSent += num;
|
||||
_done = ! hasMore;
|
||||
|
||||
@ -95,7 +104,7 @@ namespace mongo {
|
||||
|
||||
|
||||
CursorCache::CursorCache()
|
||||
:_mutex( "CursorCache" ){
|
||||
:_mutex( "CursorCache" ), _shardedTotal(0){
|
||||
}
|
||||
|
||||
CursorCache::~CursorCache(){
|
||||
@ -120,31 +129,65 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void CursorCache::store( ShardedClientCursorPtr cursor ){
|
||||
assert( cursor->getId() );
|
||||
scoped_lock lk( _mutex );
|
||||
_cursors[cursor->getId()] = cursor;
|
||||
_shardedTotal++;
|
||||
}
|
||||
void CursorCache::remove( long long id ){
|
||||
assert( id );
|
||||
scoped_lock lk( _mutex );
|
||||
_cursors.erase( id );
|
||||
}
|
||||
|
||||
void CursorCache::storeRef( const string& server , long long id ){
|
||||
assert( id );
|
||||
scoped_lock lk( _mutex );
|
||||
_refs[id] = server;
|
||||
}
|
||||
|
||||
long long CursorCache::genId(){
|
||||
while ( true ){
|
||||
long long x = security.getNonce();
|
||||
if ( x == 0 )
|
||||
continue;
|
||||
if ( x < 0 )
|
||||
x *= -1;
|
||||
|
||||
scoped_lock lk( _mutex );
|
||||
MapSharded::iterator i = _cursors.find( x );
|
||||
if ( i != _cursors.end() )
|
||||
continue;
|
||||
|
||||
MapNormal::iterator j = _refs.find( x );
|
||||
if ( j != _refs.end() )
|
||||
continue;
|
||||
|
||||
return x;
|
||||
}
|
||||
}
|
||||
|
||||
void CursorCache::gotKillCursors(Message& m ){
|
||||
int *x = (int *) m.singleData()->_data;
|
||||
x++; // reserved
|
||||
int n = *x++;
|
||||
|
||||
|
||||
if ( n > 2000 ){
|
||||
log( n < 30000 ? LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl;
|
||||
}
|
||||
|
||||
|
||||
uassert( 13286 , "sent 0 cursors to kill" , n >= 1 );
|
||||
uassert( 13287 , "too many cursors to kill" , n < 10000 );
|
||||
uassert( 13287 , "too many cursors to kill" , n < 30000 );
|
||||
|
||||
long long * cursors = (long long *)x;
|
||||
for ( int i=0; i<n; i++ ){
|
||||
long long id = cursors[i];
|
||||
|
||||
if ( ! id ){
|
||||
log( LL_WARNING ) << " got cursor id of 0 to kill" << endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
string server;
|
||||
{
|
||||
scoped_lock lk( _mutex );
|
||||
@ -157,7 +200,7 @@ namespace mongo {
|
||||
|
||||
MapNormal::iterator j = _refs.find( id );
|
||||
if ( j == _refs.end() ){
|
||||
log() << "can't find cursor: " << id << endl;
|
||||
log( LL_WARNING ) << "can't find cursor: " << id << endl;
|
||||
continue;
|
||||
}
|
||||
server = j->second;
|
||||
@ -174,6 +217,7 @@ namespace mongo {
|
||||
void CursorCache::appendInfo( BSONObjBuilder& result ){
|
||||
scoped_lock lk( _mutex );
|
||||
result.append( "sharded" , (int)_cursors.size() );
|
||||
result.appendNumber( "shardedEver" , _shardedTotal );
|
||||
result.append( "refs" , (int)_refs.size() );
|
||||
result.append( "totalOpen" , (int)(_cursors.size() + _refs.size() ) );
|
||||
}
|
||||
|
@ -29,12 +29,12 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class ShardedClientCursor {
|
||||
class ShardedClientCursor : boost::noncopyable {
|
||||
public:
|
||||
ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor );
|
||||
virtual ~ShardedClientCursor();
|
||||
|
||||
long long getId(){ return _id; }
|
||||
long long getId();
|
||||
|
||||
/**
|
||||
* @return whether there is more data left
|
||||
@ -76,10 +76,14 @@ namespace mongo {
|
||||
|
||||
void appendInfo( BSONObjBuilder& result );
|
||||
|
||||
long long genId();
|
||||
private:
|
||||
mutex _mutex;
|
||||
|
||||
MapSharded _cursors;
|
||||
MapNormal _refs;
|
||||
|
||||
long long _shardedTotal;
|
||||
};
|
||||
|
||||
extern CursorCache cursorCache;
|
||||
|
Loading…
Reference in New Issue
Block a user