2009-09-21 21:55:24 +02:00
|
|
|
// mr.cpp
|
|
|
|
|
|
|
|
/**
|
2009-09-24 16:51:27 +02:00
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU Affero General Public License, version 3,
|
|
|
|
* as published by the Free Software Foundation.
|
|
|
|
*
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU Affero General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
2009-09-21 21:55:24 +02:00
|
|
|
|
2009-10-14 20:52:01 +02:00
|
|
|
#include "stdafx.h"
|
2009-09-21 21:55:24 +02:00
|
|
|
#include "db.h"
|
|
|
|
#include "instance.h"
|
|
|
|
#include "commands.h"
|
|
|
|
#include "../scripting/engine.h"
|
2009-11-03 16:35:48 +01:00
|
|
|
#include "../client/dbclient.h"
|
|
|
|
#include "../client/connpool.h"
|
2009-11-03 17:40:00 +01:00
|
|
|
#include "../client/parallel.h"
|
2010-03-30 17:01:02 +02:00
|
|
|
#include "queryoptimizer.h"
|
|
|
|
#include "matcher.h"
|
|
|
|
#include "clientcursor.h"
|
2009-09-21 21:55:24 +02:00
|
|
|
|
|
|
|
namespace mongo {
|
2009-09-24 16:51:27 +02:00
|
|
|
|
|
|
|
namespace mr {
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2010-03-19 14:55:24 +01:00
|
|
|
typedef vector<BSONObj> BSONList;
|
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
class MyCmp {
|
|
|
|
public:
|
|
|
|
MyCmp(){}
|
|
|
|
bool operator()( const BSONObj &l, const BSONObj &r ) const {
|
|
|
|
return l.firstElement().woCompare( r.firstElement() ) < 0;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2009-10-01 01:07:01 +02:00
|
|
|
typedef pair<BSONObj,BSONObj> Data;
|
2009-10-07 18:57:45 +02:00
|
|
|
//typedef list< Data > InMemory;
|
2010-03-19 14:55:24 +01:00
|
|
|
typedef map< BSONObj,BSONList,MyCmp > InMemory;
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2010-03-19 14:55:24 +01:00
|
|
|
/**
 * Runs the user's reduce function over a group of objects that share one key
 * and returns a single reduced object.
 *
 * Every element of `values` is read positionally: its first field is the key
 * and its second field is the value (field names are not consulted). The key
 * is taken from values[0].
 *
 * The reduce function is invoked with { <key element> , values : [ ... ] }.
 * If adding the next value would push the argument past ~4MB, only the prefix
 * gathered so far is reduced; that partial result is appended to the remaining
 * values and the function recurses on the shorter list.
 *
 * @param values   non-empty list of objects sharing one key (uasserts 10074 if empty)
 * @param s        scope in which `reduce` / `finalize` were created
 * @param reduce   compiled reduce function
 * @param final    true  -> result is { _id : key , value : <reduced> }
 *                 false -> result is { 0 : key , 1 : <reduced> }
 * @param finalize optional finalize function (0 for none); when non-zero it is
 *                 invoked with { _id : key , value : <reduced> } and its return
 *                 value becomes the stored value
 */
BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
    uassert( 10074 , "need values" , values.size() );

    int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128;

    BSONObjBuilder reduceArgs( sizeEstimate );

    // The key comes from the first object. Building the "values" array on the
    // stack (instead of via new/delete) means nothing leaks if a uassert below throws.
    BSONElement keyE = values[0].firstElement();
    reduceArgs.append( keyE );
    BSONObj key = keyE.wrap();
    BSONArrayBuilder valueBuilder( reduceArgs.subarrayStart( "values" ) );
    int sizeSoFar = 5 + keyE.size();

    unsigned n = 0;
    for ( ; n<values.size(); n++ ){
        BSONObjIterator j(values[n]);
        j.next(); // skip the key element
        BSONElement ee = j.next();

        uassert( 13070 , "value to large to reduce" , ee.size() < ( 2 * 1024 * 1024 ) );

        if ( sizeSoFar + ee.size() > ( 4 * 1024 * 1024 ) ){
            assert( n > 1 ); // if not, inf. loop
            break;
        }

        valueBuilder.append( ee );
        sizeSoFar += ee.size();
    }
    valueBuilder.done();
    BSONObj args = reduceArgs.obj();

    s->invokeSafe( reduce , args );
    if ( s->type( "return" ) == Array ){
        uassert( 10075 , "reduce -> multiple not supported yet",0);
        return BSONObj();
    }

    int endSizeEstimate = key.objsize() + ( args.objsize() / values.size() );

    if ( n < values.size() ){
        // Only a prefix fit in one call: carry the remaining values plus the
        // partial result ({ key , 1 : <reduced> }) into another round.
        BSONList x;
        x.reserve( values.size() - n + 1 );
        for ( ; n < values.size(); n++ ){
            x.push_back( values[n] );
        }
        BSONObjBuilder temp( endSizeEstimate );
        temp.append( key.firstElement() );
        s->append( temp , "1" , "return" );
        x.push_back( temp.obj() );
        return reduceValues( x , s , reduce , final , finalize );
    }

    if ( finalize ){
        BSONObjBuilder b(endSizeEstimate);
        b.appendAs( key.firstElement() , "_id" );
        s->append( b , "value" , "return" );
        s->invokeSafe( finalize , b.obj() );
    }

    BSONObjBuilder b(endSizeEstimate);
    b.appendAs( key.firstElement() , final ? "_id" : "0" );
    s->append( b , final ? "value" : "1" , "return" );
    return b.obj();
}
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
class MRSetup {
|
|
|
|
public:
|
|
|
|
MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){
|
|
|
|
static int jobNumber = 1;
|
|
|
|
|
|
|
|
dbname = _dbname;
|
|
|
|
ns = dbname + "." + cmdObj.firstElement().valuestr();
|
|
|
|
|
|
|
|
verbose = cmdObj["verbose"].trueValue();
|
|
|
|
keeptemp = cmdObj["keeptemp"].trueValue();
|
|
|
|
|
|
|
|
{ // setup names
|
|
|
|
stringstream ss;
|
|
|
|
if ( ! keeptemp )
|
|
|
|
ss << "tmp.";
|
|
|
|
ss << "mr." << cmdObj.firstElement().fieldName() << "_" << time(0) << "_" << jobNumber++;
|
|
|
|
tempShort = ss.str();
|
|
|
|
tempLong = dbname + "." + tempShort;
|
2009-11-04 23:11:17 +01:00
|
|
|
incLong = tempLong + "_inc";
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
if ( ! keeptemp && markAsTemp )
|
|
|
|
cc().addTempCollection( tempLong );
|
|
|
|
|
2010-03-15 17:29:14 +01:00
|
|
|
replicate = keeptemp;
|
|
|
|
|
|
|
|
if ( cmdObj["out"].type() == String ){
|
2009-11-04 21:25:45 +01:00
|
|
|
finalShort = cmdObj["out"].valuestr();
|
2010-03-15 17:29:14 +01:00
|
|
|
replicate = true;
|
|
|
|
}
|
2009-11-04 21:25:45 +01:00
|
|
|
else
|
|
|
|
finalShort = tempShort;
|
|
|
|
|
|
|
|
finalLong = dbname + "." + finalShort;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
{ // code
|
|
|
|
mapCode = cmdObj["map"].ascode();
|
|
|
|
reduceCode = cmdObj["reduce"].ascode();
|
|
|
|
if ( cmdObj["finalize"].type() ){
|
|
|
|
finalizeCode = cmdObj["finalize"].ascode();
|
|
|
|
}
|
2010-02-24 04:01:07 +01:00
|
|
|
checkCodeWScope( "map" , cmdObj );
|
|
|
|
checkCodeWScope( "reduce" , cmdObj );
|
|
|
|
checkCodeWScope( "finalize" , cmdObj );
|
2009-11-04 21:25:45 +01:00
|
|
|
|
2009-11-10 21:43:51 +01:00
|
|
|
if ( cmdObj["mapparams"].type() == Array ){
|
|
|
|
mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
|
|
|
|
}
|
2009-12-03 17:50:28 +01:00
|
|
|
|
|
|
|
if ( cmdObj["scope"].type() == Object ){
|
|
|
|
scopeSetup = cmdObj["scope"].embeddedObjectUserCheck();
|
2009-11-10 21:43:51 +01:00
|
|
|
}
|
2009-12-03 17:50:28 +01:00
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
{ // query options
|
|
|
|
if ( cmdObj["query"].type() == Object ){
|
|
|
|
filter = cmdObj["query"].embeddedObjectUserCheck();
|
|
|
|
q = filter;
|
|
|
|
}
|
|
|
|
|
|
|
|
if ( cmdObj["sort"].type() == Object )
|
|
|
|
q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
|
2009-11-04 23:34:59 +01:00
|
|
|
|
|
|
|
if ( cmdObj["limit"].isNumber() )
|
|
|
|
limit = cmdObj["limit"].numberLong();
|
|
|
|
else
|
|
|
|
limit = 0;
|
2009-11-04 21:25:45 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2010-02-24 04:01:07 +01:00
|
|
|
void checkCodeWScope( const char * field , const BSONObj& o ){
|
|
|
|
BSONElement e = o[field];
|
|
|
|
if ( e.type() != CodeWScope )
|
|
|
|
return;
|
|
|
|
BSONObj x = e.codeWScopeObject();
|
2010-02-24 04:38:09 +01:00
|
|
|
uassert( 13035 , (string)"can't use CodeWScope with map/reduce function: " + field , x.isEmpty() );
|
2010-02-24 04:01:07 +01:00
|
|
|
}
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
/**
|
|
|
|
@return number objects in collection
|
|
|
|
*/
|
|
|
|
long long renameIfNeeded( DBDirectClient& db ){
|
|
|
|
if ( finalLong != tempLong ){
|
|
|
|
db.dropCollection( finalLong );
|
2009-11-08 03:00:15 +01:00
|
|
|
if ( db.count( tempLong ) ){
|
|
|
|
BSONObj info;
|
2009-12-28 22:43:43 +01:00
|
|
|
uassert( 10076 , "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
|
2009-11-08 03:00:15 +01:00
|
|
|
}
|
2009-11-04 21:25:45 +01:00
|
|
|
}
|
|
|
|
return db.count( finalLong );
|
|
|
|
}
|
|
|
|
|
|
|
|
string dbname;
|
|
|
|
string ns;
|
2009-11-04 23:34:59 +01:00
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
// options
|
|
|
|
bool verbose;
|
|
|
|
bool keeptemp;
|
2010-03-15 17:29:14 +01:00
|
|
|
bool replicate;
|
2009-11-04 21:25:45 +01:00
|
|
|
|
|
|
|
// query options
|
|
|
|
|
|
|
|
BSONObj filter;
|
|
|
|
Query q;
|
2009-11-04 23:34:59 +01:00
|
|
|
long long limit;
|
2009-11-04 21:25:45 +01:00
|
|
|
|
|
|
|
// functions
|
|
|
|
|
|
|
|
string mapCode;
|
|
|
|
string reduceCode;
|
|
|
|
string finalizeCode;
|
2009-11-10 21:43:51 +01:00
|
|
|
|
|
|
|
BSONObj mapparams;
|
2009-12-03 17:50:28 +01:00
|
|
|
BSONObj scopeSetup;
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
// output tables
|
2009-11-04 23:11:17 +01:00
|
|
|
string incLong;
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
string tempShort;
|
|
|
|
string tempLong;
|
|
|
|
|
|
|
|
string finalShort;
|
|
|
|
string finalLong;
|
2009-11-04 23:11:17 +01:00
|
|
|
|
|
|
|
}; // end MRsetup
|
2009-11-04 21:25:45 +01:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
class MRState {
|
2009-10-01 01:07:01 +02:00
|
|
|
public:
|
2009-11-04 23:11:17 +01:00
|
|
|
MRState( MRSetup& s ) : setup(s){
|
|
|
|
scope = globalScriptEngine->getPooledScope( setup.dbname );
|
|
|
|
scope->localConnect( setup.dbname.c_str() );
|
|
|
|
|
|
|
|
map = scope->createFunction( setup.mapCode.c_str() );
|
2009-12-21 16:58:20 +01:00
|
|
|
if ( ! map )
|
2009-12-28 23:06:07 +01:00
|
|
|
throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
|
2009-12-21 16:58:20 +01:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
reduce = scope->createFunction( setup.reduceCode.c_str() );
|
2009-12-21 16:58:20 +01:00
|
|
|
if ( ! reduce )
|
2009-12-28 23:06:07 +01:00
|
|
|
throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
|
2009-12-21 16:58:20 +01:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
if ( setup.finalizeCode.size() )
|
|
|
|
finalize = scope->createFunction( setup.finalizeCode.c_str() );
|
|
|
|
else
|
|
|
|
finalize = 0;
|
|
|
|
|
2009-12-03 17:50:28 +01:00
|
|
|
if ( ! setup.scopeSetup.isEmpty() )
|
|
|
|
scope->init( &setup.scopeSetup );
|
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
db.dropCollection( setup.tempLong );
|
|
|
|
db.dropCollection( setup.incLong );
|
|
|
|
|
2009-11-09 21:21:37 +01:00
|
|
|
writelock l( setup.incLong );
|
2010-01-30 04:32:44 +01:00
|
|
|
Client::Context ctx( setup.incLong );
|
2009-11-04 23:11:17 +01:00
|
|
|
string err;
|
|
|
|
assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
|
|
|
|
|
2009-10-01 01:07:01 +02:00
|
|
|
}
|
2009-09-21 21:55:24 +02:00
|
|
|
|
2010-03-19 14:55:24 +01:00
|
|
|
void finalReduce( BSONList& values ){
|
2009-11-04 23:11:17 +01:00
|
|
|
if ( values.size() == 0 )
|
|
|
|
return;
|
|
|
|
|
|
|
|
BSONObj key = values.begin()->firstElement().wrap( "_id" );
|
|
|
|
BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
|
|
|
|
|
2009-11-09 21:21:37 +01:00
|
|
|
writelock l( setup.tempLong );
|
2010-01-30 04:32:44 +01:00
|
|
|
Client::Context ctx( setup.incLong );
|
2010-03-15 17:29:14 +01:00
|
|
|
if ( setup.replicate )
|
|
|
|
theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false );
|
|
|
|
else
|
|
|
|
theDataFileMgr.insert( setup.tempLong.c_str() , res , false );
|
2009-09-21 21:55:24 +02:00
|
|
|
}
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
MRSetup& setup;
|
|
|
|
auto_ptr<Scope> scope;
|
|
|
|
DBDirectClient db;
|
2009-09-24 16:51:27 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
ScriptingFunction map;
|
|
|
|
ScriptingFunction reduce;
|
|
|
|
ScriptingFunction finalize;
|
|
|
|
|
|
|
|
};
|
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
class MRTL {
|
|
|
|
public:
|
2009-11-04 23:11:17 +01:00
|
|
|
MRTL( MRState& state ) : _state( state ){
|
2009-10-01 01:07:01 +02:00
|
|
|
_temp = new InMemory();
|
2009-11-04 23:11:17 +01:00
|
|
|
_size = 0;
|
|
|
|
numEmits = 0;
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
~MRTL(){
|
|
|
|
delete _temp;
|
|
|
|
}
|
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
void reduceInMemory(){
|
|
|
|
|
2009-10-01 01:07:01 +02:00
|
|
|
InMemory * old = _temp;
|
|
|
|
InMemory * n = new InMemory();
|
2009-09-24 16:51:27 +02:00
|
|
|
_temp = n;
|
|
|
|
_size = 0;
|
|
|
|
|
2009-10-01 01:07:01 +02:00
|
|
|
for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
|
2009-09-24 16:51:27 +02:00
|
|
|
BSONObj key = i->first;
|
2010-03-19 14:55:24 +01:00
|
|
|
BSONList& all = i->second;
|
2009-09-24 16:51:27 +02:00
|
|
|
|
|
|
|
if ( all.size() == 1 ){
|
2009-11-04 23:11:17 +01:00
|
|
|
// this key has low cardinality, so just write to db
|
2009-11-09 21:21:37 +01:00
|
|
|
writelock l(_state.setup.incLong);
|
2010-03-30 17:01:02 +02:00
|
|
|
Client::Context ctx(_state.setup.incLong.c_str());
|
2009-11-04 23:11:17 +01:00
|
|
|
write( *(all.begin()) );
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
else if ( all.size() > 1 ){
|
2009-11-04 23:11:17 +01:00
|
|
|
BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
|
|
|
|
insert( res );
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
delete( old );
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
void dump(){
|
2009-11-09 21:21:37 +01:00
|
|
|
writelock l(_state.setup.incLong);
|
2010-01-30 05:06:17 +01:00
|
|
|
Client::Context ctx(_state.setup.incLong);
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-10-01 01:07:01 +02:00
|
|
|
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
|
2010-03-19 14:55:24 +01:00
|
|
|
BSONList& all = i->second;
|
2009-10-07 18:57:45 +02:00
|
|
|
if ( all.size() < 1 )
|
|
|
|
continue;
|
|
|
|
|
2010-03-19 14:55:24 +01:00
|
|
|
for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
|
2009-11-04 23:11:17 +01:00
|
|
|
write( *j );
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
_temp->clear();
|
|
|
|
_size = 0;
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
void insert( const BSONObj& a ){
|
2010-03-19 14:55:24 +01:00
|
|
|
BSONList& all = (*_temp)[a];
|
2009-11-04 23:11:17 +01:00
|
|
|
all.push_back( a );
|
|
|
|
_size += a.objsize() + 16;
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
void checkSize(){
|
2009-11-04 23:11:17 +01:00
|
|
|
if ( _size < 1024 * 5 )
|
2009-09-24 16:51:27 +02:00
|
|
|
return;
|
|
|
|
|
|
|
|
long before = _size;
|
|
|
|
reduceInMemory();
|
|
|
|
log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
|
|
|
|
|
|
|
|
if ( _size < 1024 * 15 )
|
|
|
|
return;
|
2009-10-07 18:57:45 +02:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
dump();
|
|
|
|
log(1) << " mr: dumping to db" << endl;
|
2009-09-21 21:55:24 +02:00
|
|
|
}
|
2009-11-09 21:21:37 +01:00
|
|
|
|
|
|
|
private:
|
|
|
|
void write( BSONObj& o ){
|
|
|
|
theDataFileMgr.insert( _state.setup.incLong.c_str() , o , true );
|
|
|
|
}
|
2009-09-21 21:55:24 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
MRState& _state;
|
2009-09-24 16:51:27 +02:00
|
|
|
|
2009-10-01 01:07:01 +02:00
|
|
|
InMemory * _temp;
|
2009-09-24 16:51:27 +02:00
|
|
|
long _size;
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-11-09 21:21:37 +01:00
|
|
|
public:
|
2009-11-04 23:11:17 +01:00
|
|
|
long long numEmits;
|
2009-09-24 16:51:27 +02:00
|
|
|
};
|
2009-09-21 21:55:24 +02:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
boost::thread_specific_ptr<MRTL> _tlmr;
|
|
|
|
|
|
|
|
BSONObj fast_emit( const BSONObj& args ){
|
2010-03-19 14:55:24 +01:00
|
|
|
uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 );
|
|
|
|
uassert( 13069 , "an emit can't be more than 2mb" , args.objsize() < ( 2 * 1024 * 1024 ) );
|
2009-11-04 23:11:17 +01:00
|
|
|
_tlmr->insert( args );
|
|
|
|
_tlmr->numEmits++;
|
2009-10-07 18:57:45 +02:00
|
|
|
return BSONObj();
|
|
|
|
}
|
2009-11-03 17:40:00 +01:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
class MapReduceCommand : public Command {
|
|
|
|
public:
|
|
|
|
MapReduceCommand() : Command("mapreduce"){}
|
2010-04-23 21:50:49 +02:00
|
|
|
virtual bool slaveOk() const { return true; }
|
2009-09-24 16:51:27 +02:00
|
|
|
|
|
|
|
virtual void help( stringstream &help ) const {
|
|
|
|
help << "see http://www.mongodb.org/display/DOCS/MapReduce";
|
|
|
|
}
|
2010-04-23 21:50:49 +02:00
|
|
|
virtual LockType locktype() const { return NONE; }
|
2009-11-04 21:25:45 +01:00
|
|
|
bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
|
2009-09-24 16:51:27 +02:00
|
|
|
Timer t;
|
2009-10-23 17:13:08 +02:00
|
|
|
Client::GodScope cg;
|
2010-03-15 16:26:56 +01:00
|
|
|
Client& client = cc();
|
|
|
|
CurOp * op = client.curop();
|
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
MRSetup mr( nsToDatabase( dbname ) , cmd );
|
2009-09-22 05:47:09 +02:00
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
log(1) << "mr ns: " << mr.ns << endl;
|
2009-10-01 23:15:44 +02:00
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
if ( ! db.exists( mr.ns ) ){
|
2009-10-01 23:15:44 +02:00
|
|
|
errmsg = "ns doesn't exist";
|
|
|
|
return false;
|
|
|
|
}
|
2009-09-30 23:11:56 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
bool shouldHaveData = false;
|
|
|
|
|
2009-10-14 22:19:13 +02:00
|
|
|
long long num = 0;
|
2009-10-15 17:26:15 +02:00
|
|
|
long long inReduce = 0;
|
2009-11-04 23:11:17 +01:00
|
|
|
|
2009-10-15 17:26:15 +02:00
|
|
|
BSONObjBuilder countsBuilder;
|
|
|
|
BSONObjBuilder timingBuilder;
|
2009-09-24 16:51:27 +02:00
|
|
|
try {
|
2009-10-07 18:57:45 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
MRState state( mr );
|
|
|
|
state.scope->injectNative( "emit" , fast_emit );
|
2009-10-07 18:57:45 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
MRTL * mrtl = new MRTL( state );
|
2009-09-24 16:51:27 +02:00
|
|
|
_tlmr.reset( mrtl );
|
|
|
|
|
2010-03-15 16:26:56 +01:00
|
|
|
ProgressMeter & pm = op->setMessage( "m/r: (1/3) emit phase" , db.count( mr.ns , mr.filter ) );
|
2009-10-15 21:44:50 +02:00
|
|
|
long long mapTime = 0;
|
2010-03-30 17:01:02 +02:00
|
|
|
{
|
|
|
|
readlock lock( mr.ns );
|
|
|
|
Client::Context ctx( mr.ns );
|
2009-10-15 17:26:15 +02:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
auto_ptr<Cursor> temp = QueryPlanSet(mr.ns.c_str() , mr.filter , BSONObj() ).getBestGuess()->newCursor();
|
|
|
|
auto_ptr<ClientCursor> cursor( new ClientCursor( QueryOption_NoCursorTimeout , temp , mr.ns.c_str() ) );
|
|
|
|
|
|
|
|
if ( ! mr.filter.isEmpty() )
|
|
|
|
cursor->matcher.reset( new CoveredIndexMatcher( mr.filter , cursor->indexKeyPattern() ) );
|
2009-12-04 22:20:13 +01:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
Timer mt;
|
|
|
|
while ( cursor->ok() ){
|
|
|
|
|
|
|
|
if ( ! cursor->currentMatches() ){
|
|
|
|
cursor->advance();
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
BSONObj o = cursor->current();
|
|
|
|
cursor->advance();
|
|
|
|
|
|
|
|
if ( mr.verbose ) mt.reset();
|
|
|
|
|
|
|
|
state.scope->setThis( &o );
|
|
|
|
if ( state.scope->invoke( state.map , state.setup.mapparams , 0 , true ) )
|
|
|
|
throw UserException( 9014, (string)"map invoke failed: " + state.scope->getError() );
|
|
|
|
|
|
|
|
if ( mr.verbose ) mapTime += mt.micros();
|
|
|
|
|
|
|
|
num++;
|
|
|
|
if ( num % 100 == 0 ){
|
|
|
|
ClientCursor::YieldLock yield = cursor->yieldHold();
|
|
|
|
Timer t;
|
|
|
|
mrtl->checkSize();
|
|
|
|
inReduce += t.micros();
|
|
|
|
|
|
|
|
if ( ! yield.stillOk() ){
|
|
|
|
cursor.release();
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
killCurrentOp.checkForInterrupt();
|
|
|
|
}
|
|
|
|
pm.hit();
|
|
|
|
|
|
|
|
if ( mr.limit && num >= mr.limit )
|
|
|
|
break;
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
2009-09-22 05:47:09 +02:00
|
|
|
}
|
2010-03-15 16:26:56 +01:00
|
|
|
pm.finished();
|
2009-09-24 16:51:27 +02:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
killCurrentOp.checkForInterrupt();
|
|
|
|
|
2010-03-16 05:09:52 +01:00
|
|
|
countsBuilder.appendNumber( "input" , num );
|
|
|
|
countsBuilder.appendNumber( "emit" , mrtl->numEmits );
|
2009-11-04 23:11:17 +01:00
|
|
|
if ( mrtl->numEmits )
|
|
|
|
shouldHaveData = true;
|
2009-10-15 17:26:15 +02:00
|
|
|
|
|
|
|
timingBuilder.append( "mapTime" , mapTime / 1000 );
|
|
|
|
timingBuilder.append( "emitLoop" , t.millis() );
|
2009-10-07 18:57:45 +02:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
// final reduce
|
2010-03-15 16:26:56 +01:00
|
|
|
op->setMessage( "m/r: (2/3) final reduce in memory" );
|
2009-09-24 16:51:27 +02:00
|
|
|
mrtl->reduceInMemory();
|
|
|
|
mrtl->dump();
|
2009-10-19 17:00:53 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
BSONObj sortKey = BSON( "0" << 1 );
|
|
|
|
db.ensureIndex( mr.incLong , sortKey );
|
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
{
|
|
|
|
writelock lock( mr.tempLong.c_str() );
|
|
|
|
Client::Context ctx( mr.tempLong.c_str() );
|
|
|
|
assert( userCreateNS( mr.tempLong.c_str() , BSONObj() , errmsg , mr.replicate ) );
|
|
|
|
}
|
2010-03-15 17:29:14 +01:00
|
|
|
|
2009-12-04 22:20:13 +01:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
{
|
|
|
|
readlock rl(mr.incLong.c_str());
|
|
|
|
Client::Context ctx( mr.incLong );
|
|
|
|
|
|
|
|
BSONObj prev;
|
|
|
|
BSONList all;
|
|
|
|
|
|
|
|
pm = op->setMessage( "m/r: (3/3) final reduce to collection" , db.count( mr.incLong ) );
|
2010-03-19 16:59:51 +01:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
auto_ptr<Cursor> temp = QueryPlanSet(mr.incLong.c_str() , BSONObj() , sortKey ).getBestGuess()->newCursor();
|
|
|
|
auto_ptr<ClientCursor> cursor( new ClientCursor( QueryOption_NoCursorTimeout , temp , mr.incLong.c_str() ) );
|
|
|
|
|
|
|
|
while ( cursor->ok() ){
|
|
|
|
BSONObj o = cursor->current().getOwned();
|
|
|
|
cursor->advance();
|
|
|
|
|
|
|
|
pm.hit();
|
|
|
|
|
|
|
|
if ( o.woSortOrder( prev , sortKey ) == 0 ){
|
|
|
|
all.push_back( o );
|
|
|
|
if ( pm.hits() % 1000 == 0 ){
|
|
|
|
if ( ! cursor->yield() ){
|
|
|
|
cursor.release();
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
killCurrentOp.checkForInterrupt();
|
|
|
|
}
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
ClientCursor::YieldLock yield = cursor->yieldHold();
|
|
|
|
|
|
|
|
state.finalReduce( all );
|
|
|
|
|
|
|
|
all.clear();
|
|
|
|
prev = o;
|
2009-09-24 16:51:27 +02:00
|
|
|
all.push_back( o );
|
2010-03-30 17:01:02 +02:00
|
|
|
|
|
|
|
if ( ! yield.stillOk() ){
|
|
|
|
cursor.release();
|
|
|
|
break;
|
2010-03-19 19:00:40 +01:00
|
|
|
}
|
2010-03-30 17:01:02 +02:00
|
|
|
|
|
|
|
killCurrentOp.checkForInterrupt();
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
2010-03-30 17:01:02 +02:00
|
|
|
|
|
|
|
{
|
|
|
|
dbtemprelease tl;
|
|
|
|
state.finalReduce( all );
|
|
|
|
}
|
|
|
|
|
|
|
|
pm.finished();
|
2009-09-21 21:55:24 +02:00
|
|
|
}
|
2010-03-30 17:01:02 +02:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
_tlmr.reset( 0 );
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
catch ( ... ){
|
|
|
|
log() << "mr failed, removing collection" << endl;
|
2009-11-04 21:25:45 +01:00
|
|
|
db.dropCollection( mr.tempLong );
|
2009-11-13 17:08:42 +01:00
|
|
|
db.dropCollection( mr.incLong );
|
2009-09-24 16:51:27 +02:00
|
|
|
throw;
|
2009-09-21 21:55:24 +02:00
|
|
|
}
|
2009-09-30 23:11:56 +02:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
long long finalCount = 0;
|
|
|
|
{
|
|
|
|
dblock lock;
|
|
|
|
db.dropCollection( mr.incLong );
|
2009-09-30 23:11:56 +02:00
|
|
|
|
2010-03-30 17:01:02 +02:00
|
|
|
finalCount = mr.renameIfNeeded( db );
|
|
|
|
}
|
2009-09-30 23:11:56 +02:00
|
|
|
|
2009-10-15 17:26:15 +02:00
|
|
|
timingBuilder.append( "total" , t.millis() );
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
result.append( "result" , mr.finalShort );
|
2009-09-24 16:51:27 +02:00
|
|
|
result.append( "timeMillis" , t.millis() );
|
2010-03-16 05:09:52 +01:00
|
|
|
countsBuilder.appendNumber( "output" , finalCount );
|
2009-11-04 21:25:45 +01:00
|
|
|
if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() );
|
2009-10-15 17:26:15 +02:00
|
|
|
result.append( "counts" , countsBuilder.obj() );
|
2010-01-16 14:01:38 +01:00
|
|
|
|
|
|
|
if ( finalCount == 0 && shouldHaveData ){
|
|
|
|
result.append( "cmd" , cmd );
|
|
|
|
errmsg = "there were emits but no data!";
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2009-10-01 23:15:44 +02:00
|
|
|
return true;
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
DBDirectClient db;
|
2009-09-21 21:55:24 +02:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
} mapReduceCommand;
|
2009-11-03 16:35:48 +01:00
|
|
|
|
|
|
|
class MapReduceFinishCommand : public Command {
|
|
|
|
public:
|
|
|
|
MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){}
|
2010-04-23 21:50:49 +02:00
|
|
|
virtual bool slaveOk() const { return true; }
|
2010-02-26 20:38:51 +01:00
|
|
|
|
2010-04-23 21:50:49 +02:00
|
|
|
virtual LockType locktype() const { return WRITE; }
|
2009-11-03 16:35:48 +01:00
|
|
|
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
|
2010-01-30 04:32:44 +01:00
|
|
|
string dbname = cc().database()->name; // this has to come before dbtemprelease
|
|
|
|
dbtemprelease temprelease; // we don't touch the db directly
|
|
|
|
|
2009-11-03 17:40:00 +01:00
|
|
|
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
|
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false );
|
2009-11-03 17:40:00 +01:00
|
|
|
|
|
|
|
set<ServerAndQuery> servers;
|
2009-11-03 20:44:41 +01:00
|
|
|
|
|
|
|
BSONObjBuilder shardCounts;
|
|
|
|
map<string,long long> counts;
|
|
|
|
|
2009-11-03 16:35:48 +01:00
|
|
|
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
|
|
|
|
vector< auto_ptr<DBClientCursor> > shardCursors;
|
|
|
|
BSONObjIterator i( shards );
|
|
|
|
while ( i.more() ){
|
|
|
|
BSONElement e = i.next();
|
|
|
|
string shard = e.fieldName();
|
|
|
|
|
2009-11-03 17:40:00 +01:00
|
|
|
BSONObj res = e.embeddedObjectUserCheck();
|
|
|
|
|
2009-12-28 22:43:43 +01:00
|
|
|
uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
|
2009-11-03 17:40:00 +01:00
|
|
|
servers.insert( shard );
|
2009-11-03 20:44:41 +01:00
|
|
|
shardCounts.appendAs( res["counts"] , shard.c_str() );
|
|
|
|
|
|
|
|
BSONObjIterator j( res["counts"].embeddedObjectUserCheck() );
|
|
|
|
while ( j.more() ){
|
|
|
|
BSONElement temp = j.next();
|
|
|
|
counts[temp.fieldName()] += temp.numberLong();
|
|
|
|
}
|
|
|
|
|
2009-11-03 16:35:48 +01:00
|
|
|
}
|
2009-11-03 17:40:00 +01:00
|
|
|
|
|
|
|
BSONObj sortKey = BSON( "_id" << 1 );
|
|
|
|
|
|
|
|
ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
|
|
|
|
Query().sort( sortKey ) );
|
2009-11-03 16:35:48 +01:00
|
|
|
|
2009-11-03 17:40:00 +01:00
|
|
|
|
|
|
|
auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
|
2009-11-04 21:25:45 +01:00
|
|
|
ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
|
2009-11-04 23:11:17 +01:00
|
|
|
ScriptingFunction finalizeFunction = 0;
|
|
|
|
if ( mr.finalizeCode.size() )
|
|
|
|
finalizeFunction = s->createFunction( mr.finalizeCode.c_str() );
|
2009-11-03 17:40:00 +01:00
|
|
|
|
2010-03-19 14:55:24 +01:00
|
|
|
BSONList values;
|
2009-11-03 17:40:00 +01:00
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
result.append( "result" , mr.finalShort );
|
2009-11-03 16:35:48 +01:00
|
|
|
|
2009-11-03 17:40:00 +01:00
|
|
|
DBDirectClient db;
|
|
|
|
|
|
|
|
while ( cursor.more() ){
|
2010-01-27 02:59:12 +01:00
|
|
|
BSONObj t = cursor.next().getOwned();
|
2009-11-03 17:40:00 +01:00
|
|
|
|
|
|
|
if ( values.size() == 0 ){
|
|
|
|
values.push_back( t );
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){
|
|
|
|
values.push_back( t );
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2010-01-16 14:01:38 +01:00
|
|
|
|
2009-11-04 23:11:17 +01:00
|
|
|
db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
|
2009-11-03 17:40:00 +01:00
|
|
|
values.clear();
|
|
|
|
values.push_back( t );
|
|
|
|
}
|
|
|
|
|
|
|
|
if ( values.size() )
|
2009-11-04 23:11:17 +01:00
|
|
|
db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
|
2009-11-03 17:40:00 +01:00
|
|
|
|
2009-11-04 21:25:45 +01:00
|
|
|
long long finalCount = mr.renameIfNeeded( db );
|
|
|
|
log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl;
|
|
|
|
|
2009-11-03 18:17:56 +01:00
|
|
|
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
|
|
|
|
ScopedDbConnection conn( i->_server );
|
|
|
|
conn->dropCollection( dbname + "." + shardedOutputCollection );
|
|
|
|
}
|
2009-11-03 17:40:00 +01:00
|
|
|
|
2009-11-03 20:44:41 +01:00
|
|
|
result.append( "shardCounts" , shardCounts.obj() );
|
|
|
|
|
|
|
|
{
|
|
|
|
BSONObjBuilder c;
|
|
|
|
for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ){
|
|
|
|
c.append( i->first , i->second );
|
|
|
|
}
|
|
|
|
result.append( "counts" , c.obj() );
|
|
|
|
}
|
|
|
|
|
2009-11-03 17:40:00 +01:00
|
|
|
return 1;
|
2009-11-03 16:35:48 +01:00
|
|
|
}
|
|
|
|
} mapReduceFinishCommand;
|
2009-09-21 21:55:24 +02:00
|
|
|
|
2009-09-24 16:51:27 +02:00
|
|
|
}
|
2009-09-21 21:55:24 +02:00
|
|
|
|
|
|
|
}
|
|
|
|
|