// parallel.cpp /* * Copyright 2010 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "stdafx.h" #include "parallel.h" #include "connpool.h" #include "../db/queryutil.h" #include "../db/dbmessage.h" #include "../s/util.h" namespace mongo { // -------- ClusteredCursor ----------- ClusteredCursor::ClusteredCursor( QueryMessage& q ){ _ns = q.ns; _query = q.query.copy(); _options = q.queryOptions; _fields = q.fields; _done = false; } ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){ _ns = ns; _query = q.getOwned(); _options = options; _fields = fields.getOwned(); _done = false; } ClusteredCursor::~ClusteredCursor(){ _done = true; // just in case } auto_ptr ClusteredCursor::query( const string& server , int num , BSONObj extra ){ uassert( 10017 , "cursor already done" , ! _done ); BSONObj q = _query; if ( ! extra.isEmpty() ){ q = concatQuery( q , extra ); } ScopedDbConnection conn( server ); checkShardVersion( conn.conn() , _ns ); if ( logLevel >= 5 ){ log(5) << "ClusteredCursor::query (" << type() << ") server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl; } auto_ptr cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options ); if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) ) throw StaleConfigException( _ns , "ClusteredCursor::query" ); conn.done(); return cursor; } BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ if ( ! query.hasField( "query" ) ) return _concatFilter( query , extraFilter ); BSONObjBuilder b; BSONObjIterator i( query ); while ( i.more() ){ BSONElement e = i.next(); if ( strcmp( e.fieldName() , "query" ) ){ b.append( e ); continue; } b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) ); } return b.obj(); } BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){ BSONObjBuilder b; b.appendElements( filter ); b.appendElements( extra ); return b.obj(); // TODO: should do some simplification here if possibl ideally } // -------- FilteringClientCursor ----------- FilteringClientCursor::FilteringClientCursor( const BSONObj filter ) : _matcher( filter ) , _done( false ){ } FilteringClientCursor::FilteringClientCursor( auto_ptr cursor , const BSONObj filter ) : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ){ } FilteringClientCursor::~FilteringClientCursor(){ } void FilteringClientCursor::reset( auto_ptr cursor ){ _cursor = cursor; _next = BSONObj(); _done = _cursor.get() == 0; } bool FilteringClientCursor::more(){ if ( ! _next.isEmpty() ) return true; if ( _done ) return false; _advance(); return ! _next.isEmpty(); } BSONObj FilteringClientCursor::next(){ assert( ! _next.isEmpty() ); assert( ! _done ); BSONObj ret = _next; _next = BSONObj(); _advance(); return ret; } BSONObj FilteringClientCursor::peek(){ if ( _next.isEmpty() ) _advance(); return _next; } void FilteringClientCursor::_advance(){ assert( _next.isEmpty() ); if ( ! _cursor.get() || _done ) return; while ( _cursor->more() ){ _next = _cursor->next(); if ( _matcher.matches( _next ) ){ if ( ! _cursor->moreInCurrentBatch() ) _next = _next.getOwned(); return; } _next = BSONObj(); } _done = true; } // -------- SerialServerClusteredCursor ----------- SerialServerClusteredCursor::SerialServerClusteredCursor( const set& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){ for ( set::const_iterator i = servers.begin(); i!=servers.end(); i++ ) _servers.push_back( *i ); if ( sortOrder > 0 ) sort( _servers.begin() , _servers.end() ); else if ( sortOrder < 0 ) sort( _servers.rbegin() , _servers.rend() ); _serverIndex = 0; _needToSkip = q.ntoskip; } bool SerialServerClusteredCursor::more(){ // TODO: optimize this by sending on first query and then back counting // tricky in case where 1st server doesn't have any after // need it to send n skipped while ( _needToSkip > 0 && _current.more() ){ _current.next(); _needToSkip--; } if ( _current.more() ) return true; if ( _serverIndex >= _servers.size() ){ return false; } ServerAndQuery& sq = _servers[_serverIndex++]; _current.reset( query( sq._server , 0 , sq._extra ) ); return more(); } BSONObj SerialServerClusteredCursor::next(){ uassert( 10018 , "no more items" , more() ); return _current.next(); } // -------- ParallelSortClusteredCursor ----------- ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set& servers , QueryMessage& q , const BSONObj& sortKey ) : ClusteredCursor( q ) , _servers( servers ){ _sortKey = sortKey.getOwned(); _needToSkip = q.ntoskip; _init(); } ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set& servers , const string& ns , const Query& q , int options , const BSONObj& fields ) : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){ _sortKey = q.getSort().copy(); _needToSkip = 0; _init(); } void ParallelSortClusteredCursor::_init(){ _numServers = _servers.size(); _cursors = new FilteringClientCursor[_numServers]; // TODO: parellize int num = 0; for ( set::iterator i = _servers.begin(); i!=_servers.end(); i++ ){ const ServerAndQuery& sq = *i; _cursors[num++].reset( query( sq._server , 0 , sq._extra ) ); } } ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){ delete [] _cursors; } bool ParallelSortClusteredCursor::more(){ if ( _needToSkip > 0 ){ int n = _needToSkip; _needToSkip = 0; while ( n > 0 && more() ){ next(); n--; } } for ( int i=0; i<_numServers; i++ ){ if ( _cursors[i].more() ) return true; } return false; } BSONObj ParallelSortClusteredCursor::next(){ BSONObj best = BSONObj(); int bestFrom = -1; for ( int i=0; i<_numServers; i++){ if ( ! _cursors[i].more() ) continue; BSONObj me = _cursors[i].peek(); if ( best.isEmpty() ){ best = me; bestFrom = i; continue; } int comp = best.woSortOrder( me , _sortKey ); if ( comp < 0 ) continue; best = me; bestFrom = i; } uassert( 10019 , "no more elements" , ! best.isEmpty() ); _cursors[bestFrom].next(); return best; } // ----------------- // ---- Future ----- // ----------------- Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd ){ _server = server; _db = db; _cmd = cmd; _done = false; } bool Future::CommandResult::join(){ while ( ! _done ) sleepmicros( 50 ); return _ok; } void Future::commandThread(){ assert( _grab ); shared_ptr res = *_grab; _grab = 0; ScopedDbConnection conn( res->_server ); res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); res->_done = true; } shared_ptr Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){ shared_ptr res; res.reset( new Future::CommandResult( server , db , cmd ) ); _grab = &res; boost::thread thr( Future::commandThread ); while ( _grab ) sleepmicros(2); return res; } shared_ptr * Future::_grab; }