0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 01:21:03 +01:00
mongodb/db/mr.cpp

511 lines
19 KiB
C++
Raw Normal View History

2009-09-21 21:55:24 +02:00
// mr.cpp
/**
2009-09-24 16:51:27 +02:00
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2009-09-21 21:55:24 +02:00
#include "stdafx.h"
2009-09-21 21:55:24 +02:00
#include "db.h"
#include "instance.h"
#include "commands.h"
#include "../scripting/engine.h"
2009-11-03 16:35:48 +01:00
#include "../client/dbclient.h"
#include "../client/connpool.h"
#include "../client/parallel.h"
2009-09-21 21:55:24 +02:00
namespace mongo {
2009-09-24 16:51:27 +02:00
namespace mr {
2009-10-07 18:57:45 +02:00
// A pair of BSON objects; MyCmp (below) orders these by comparing `first`.
// NOTE(review): presumably (key, value) for an emitted record -- confirm against users of Data.
typedef pair<BSONObj,BSONObj> Data;
2009-10-07 18:57:45 +02:00
// Previous definition of InMemory, left here for reference:
//typedef list< Data > InMemory;
// Maps a BSON key object to the list of BSON objects stored under that key,
// with keys ordered by BSONObjCmp.
typedef map< BSONObj,list<BSONObj>,BSONObjCmp > InMemory;
// Maps a BSON key object to an int, with keys ordered by BSONObjCmp
// (MRTL::getNum looks keys up through KeyNums::iterator).
typedef map< BSONObj,int,BSONObjCmp > KeyNums;
/**
 * Strict-weak-ordering functor over Data pairs.
 * Only the `first` member of each pair takes part in the comparison.
 */
class MyCmp {
public:
    MyCmp(){}

    /** @return true when l.first sorts strictly before r.first per woCompare. */
    bool operator()( const Data &l, const Data &r ) const {
        const int order = l.first.woCompare( r.first );
        return order < 0;
    }
};
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
/**
 * Runs the user's reduce function over a group of documents that share a key.
 *
 * The "_id" element of the first document is passed as the first argument to
 * the reduce function, followed by an array named "values" built from the
 * "value" element of every document (array field names are "0", "1", ...).
 *
 * @param values  documents to reduce; must be non-empty (uasserts otherwise)
 * @param s       scripting scope in which the reduce function is invoked
 * @param reduce  handle of the reduce function inside `s`
 * @return an object holding the "_id" of the first document plus a "value"
 *         field taken from the scope's "return" slot
 */
BSONObj reduceValues( list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){
    uassert( "need values" , values.size() );

    BSONObj keyObj;
    BSONObjBuilder argsBuilder;
    BSONObjBuilder arrayBuilder;

    int index = 0;
    list<BSONObj>::iterator it = values.begin();
    while ( it != values.end() ){
        BSONObj doc = *it;

        if ( index == 0 ){
            // the first document supplies the key, both as the leading reduce
            // argument and as the "_id" of the object we return
            argsBuilder.append( doc["_id"] );

            BSONObjBuilder keyBuilder;
            keyBuilder.append( doc["_id"] );
            keyObj = keyBuilder.obj();
        }

        arrayBuilder.appendAs( doc["value"] , BSONObjBuilder::numStr( index ).c_str() );
        ++index;
        ++it;
    }

    argsBuilder.appendArray( "values" , arrayBuilder.obj() );
    BSONObj invokeArgs = argsBuilder.obj();

    s->invokeSafe( reduce , invokeArgs );

    const bool returnedArray = ( s->type( "return" ) == Array );
    if ( returnedArray ){
        uassert("reduce -> multiple not supported yet",0);
        return BSONObj();
    }

    BSONObjBuilder reduced;
    reduced.append( keyObj["_id"] );
    s->append( reduced , "value" , "return" );
    return reduced.obj();
}
class MRTL {
public:
MRTL( DBDirectClient * db , string coll , Scope * s , ScriptingFunction reduce ) :
_db( db ) , _coll( coll ) , _scope( s ) , _reduce( reduce ) , _size(0){
_temp = new InMemory();
2009-09-24 16:51:27 +02:00
}
~MRTL(){
delete _temp;
}
void reduceInMemory(){
BSONObj prevKey;
InMemory * old = _temp;
InMemory * n = new InMemory();
2009-09-24 16:51:27 +02:00
_temp = n;
_size = 0;
for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
2009-09-24 16:51:27 +02:00
BSONObj key = i->first;
2009-10-07 18:57:45 +02:00
list<BSONObj>& all = i->second;
2009-09-24 16:51:27 +02:00
if ( all.size() == 1 ){
2009-10-07 18:57:45 +02:00
insert( key , *(all.begin()) );
2009-09-24 16:51:27 +02:00
}
else if ( all.size() > 1 ){
BSONObj res = reduceValues( all , _scope , _reduce );
2009-10-07 18:57:45 +02:00
insert( key , res );
2009-09-24 16:51:27 +02:00
}
}
delete( old );
}
void dump(){
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
2009-09-24 16:51:27 +02:00
BSONObj key = i->first;
2009-10-07 18:57:45 +02:00
list<BSONObj>& all = i->second;
if ( all.size() < 1 )
continue;
assert( all.size() == 1 );
BSONObj value = *(all.begin());
2009-09-24 16:51:27 +02:00
BSONObjBuilder b;
b.appendElements( value );
BSONObj o = b.obj();
_db->insert( _coll , o );
}
_temp->clear();
_size = 0;
}
void insert( const BSONObj& key , const BSONObj& value ){
2009-10-07 18:57:45 +02:00
list<BSONObj>& all = (*_temp)[key];
all.push_back( value );
2009-09-24 16:51:27 +02:00
_size += key.objsize() + value.objsize() + 32;
}
2009-10-07 18:57:45 +02:00
2009-09-24 16:51:27 +02:00
void checkSize(){
if ( _size < 1024 * 10 )
2009-09-24 16:51:27 +02:00
return;
long before = _size;
reduceInMemory();
log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
if ( _size < 1024 * 15 )
return;
2009-10-07 18:57:45 +02:00
2009-09-24 16:51:27 +02:00
dump();
log(1) << " mr: dumping to db" << endl;
2009-09-21 21:55:24 +02:00
}
2009-10-07 18:57:45 +02:00
int getNum( const BSONObj& key ){
KeyNums::iterator i = _nums.find( key );
if ( i != _nums.end() )
return i->second;
int n = _nums.size() + 1;
_nums[key] = n;
return n;
}
void resetNum(){
_nums.clear();
}
2009-09-24 16:51:27 +02:00
private:
DBDirectClient * _db;
string _coll;
Scope * _scope;
ScriptingFunction _reduce;
InMemory * _temp;
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
long _size;
2009-10-07 18:57:45 +02:00
map<BSONObj,int,BSONObjCmp> _nums;
2009-09-24 16:51:27 +02:00
};
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
// Per-thread MRTL instance. MapReduceCommand::run installs one with reset()
// before mapping and clears it with reset( 0 ) when the job finishes; the
// native helpers fast_emit / get_num / reset_num dereference it.
boost::thread_specific_ptr<MRTL> _tlmr;
/**
 * Native emit helper: records one (key, value) pair in the current thread's
 * MRTL accumulator.
 *
 * @param args  must contain exactly two elements; the first becomes "_id",
 *              the second becomes "value" (asserts otherwise)
 * @return the object { x : 1 }
 */
BSONObj fast_emit( const BSONObj& args ){
    BSONObjIterator argIt( args );

    // first argument -> { _id : <key> }
    assert( argIt.more() );
    BSONObjBuilder keyBuilder;
    keyBuilder.appendAs( argIt.next() , "_id" );
    BSONObj emitKey = keyBuilder.obj();

    // second argument -> { _id : <key> , value : <value> }
    assert( argIt.more() );
    BSONObjBuilder docBuilder;
    docBuilder.append( emitKey.firstElement() );
    docBuilder.appendAs( argIt.next() , "value" );
    BSONObj emitDoc = docBuilder.obj();

    assert( ! argIt.more() );

    _tlmr->insert( emitKey , emitDoc );
    return BSON( "x" << 1 );
}
2009-10-07 18:57:45 +02:00
/**
 * Native helper: looks up (or assigns) the number the current thread's MRTL
 * associates with `args`, treating the whole argument object as the key.
 *
 * @return an object with a single unnamed field holding that number
 */
BSONObj get_num( const BSONObj& args ){
    int keyNumber = _tlmr->getNum( args );
    return BSON( "" << keyNumber );
}
/**
 * Native helper: clears every key number held by the current thread's MRTL.
 * `args` is ignored.
 *
 * @return an empty object
 */
BSONObj reset_num( const BSONObj& args ){
    MRTL * state = _tlmr.get();
    state->resetNum();
    return BSONObj();
}
/**
 * Builds a fresh, fully qualified namespace for map/reduce output:
 *   <current db>.[tmp.]mr.<first field name of cmd>_<time(0)>_<counter>
 * The "tmp." segment is omitted when cmd["keeptemp"] is true.
 *
 * NOTE(review): the counter is a plain function-local static that is
 * incremented without synchronization -- confirm all callers hold a lock.
 */
string tempCollectionName( const BSONObj& cmd ){
    static int inc = 1;

    const bool keepTemp = cmd["keeptemp"].trueValue();

    stringstream name;
    name << cc().database()->name << ".";
    if ( ! keepTemp ){
        name << "tmp.";
    }
    name << "mr.";
    name << cmd.firstElement().fieldName();
    name << "_" << time(0);
    name << "_" << inc++;

    return name.str();
}
2009-09-24 16:51:27 +02:00
class MapReduceCommand : public Command {
public:
MapReduceCommand() : Command("mapreduce"){}
virtual bool slaveOk() { return true; }
virtual void help( stringstream &help ) const {
help << "see http://www.mongodb.org/display/DOCS/MapReduce";
}
void doReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){
if ( values.size() == 0 )
return;
BSONObj res = reduceValues( values , s , reduce );
2009-10-07 18:57:45 +02:00
db.update( resultColl , res.extractFields( BSON( "_id" << 1 ) ) , res , true );
2009-09-24 16:51:27 +02:00
}
void finalReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){
if ( values.size() == 0 )
return;
BSONObj key = values.begin()->extractFields( BSON( "_id" << 1 ) );
2009-09-24 16:51:27 +02:00
if ( values.size() == 1 ){
assert( db.count( resultColl , key ) == 1 );
return;
}
db.remove( resultColl , key );
doReduce( resultColl , values , s , reduce );
}
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
bool run(const char *dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
Timer t;
2009-10-23 17:13:08 +02:00
Client::GodScope cg;
2009-10-15 17:26:15 +02:00
bool verboseOutput = cmdObj["verbose"].trueValue();
2009-09-22 05:47:09 +02:00
string ns = cc().database()->name + '.' + cmdObj.firstElement().valuestr();
2009-09-24 16:51:27 +02:00
log(1) << "mr ns: " << ns << endl;
if ( ! db.exists( ns ) ){
errmsg = "ns doesn't exist";
return false;
}
2009-09-24 16:51:27 +02:00
auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
s->localConnect( cc().database()->name.c_str() );
2009-09-30 23:11:56 +02:00
bool istemp = ! cmdObj["keeptemp"].trueValue();
string resultColl = tempCollectionName( cmdObj );
if ( istemp )
currentClient->addTempCollection( resultColl );
2009-09-30 23:11:56 +02:00
string finalOutput = resultColl;
2009-09-30 05:37:13 +02:00
if ( cmdObj["out"].type() == String )
finalOutput = cc().database()->name + "." + cmdObj["out"].valuestr();
2009-09-30 23:11:56 +02:00
string resultCollShort = resultColl.substr( cc().database()->name.size() + 1 );
string finalOutputShort = finalOutput.substr( cc().database()->name.size() + 1 );
2009-09-24 16:51:27 +02:00
log(1) << "\t resultColl: " << resultColl << " short: " << resultCollShort << endl;
db.dropCollection( resultColl );
2009-09-21 21:55:24 +02:00
long long num = 0;
2009-10-15 17:26:15 +02:00
long long inReduce = 0;
2009-10-23 05:48:12 +02:00
long long numEmits = 0;
2009-10-15 17:26:15 +02:00
BSONObjBuilder countsBuilder;
BSONObjBuilder timingBuilder;
2009-09-24 16:51:27 +02:00
try {
2009-09-30 23:11:56 +02:00
dbtemprelease temprlease;
2009-10-07 18:57:45 +02:00
2009-10-23 05:48:12 +02:00
s->execSetup( (string)"tempcoll = db[\"" + resultCollShort + "\"]; db.getMongo().setSlaveOk();" , "tempcoll1" );
2009-10-07 18:57:45 +02:00
s->execSetup( "MR.init()" );
2009-09-24 16:51:27 +02:00
2009-10-07 18:57:45 +02:00
s->injectNative( "get_num" , get_num );
s->injectNative( "reset_num" , reset_num );
2009-09-24 16:51:27 +02:00
ScriptingFunction mapFunction = s->createFunction( cmdObj["map"].ascode().c_str() );
ScriptingFunction reduceFunction = s->createFunction( cmdObj["reduce"].ascode().c_str() );
2009-10-07 18:57:45 +02:00
s->execSetup( (string)"$reduce = " + cmdObj["reduce"].ascode() );
2009-09-24 16:51:27 +02:00
MRTL * mrtl = new MRTL( &db , resultColl , s.get() , reduceFunction );
_tlmr.reset( mrtl );
2009-10-07 18:57:45 +02:00
Query q;
BSONObj filter;
if ( cmdObj["query"].type() == Object ){
filter = cmdObj["query"].embeddedObjectUserCheck();
q = filter;
}
if ( cmdObj["sort"].type() == Object )
q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
2009-09-29 18:59:16 +02:00
2009-10-07 18:57:45 +02:00
ProgressMeter pm( db.count( ns , filter ) );
2009-09-24 16:51:27 +02:00
auto_ptr<DBClientCursor> cursor = db.query( ns , q );
2009-10-15 21:44:50 +02:00
long long mapTime = 0;
2009-10-15 17:26:15 +02:00
Timer mt;
2009-09-24 16:51:27 +02:00
while ( cursor->more() ){
BSONObj o = cursor->next();
2009-09-21 21:55:24 +02:00
2009-10-15 17:26:15 +02:00
if ( verboseOutput ) mt.reset();
s->setThis( &o );
2009-09-24 16:51:27 +02:00
if ( s->invoke( mapFunction , BSONObj() , 0 , true ) )
throw UserException( (string)"map invoke failed: " + s->getError() );
2009-10-15 17:26:15 +02:00
if ( verboseOutput ) mapTime += mt.micros();
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
num++;
if ( num % 100 == 0 ){
2009-10-15 17:26:15 +02:00
Timer t;
2009-10-07 18:57:45 +02:00
s->exec( "MR.check();" , "reduce-i" , false , true , true );
2009-10-15 17:26:15 +02:00
inReduce += t.micros();
2009-10-07 18:57:45 +02:00
//mrtl->checkSize();
2009-09-24 16:51:27 +02:00
}
2009-10-07 18:57:45 +02:00
pm.hit();
2009-09-22 05:47:09 +02:00
}
2009-09-24 16:51:27 +02:00
2009-10-15 17:26:15 +02:00
countsBuilder.append( "input" , num );
2009-10-24 02:59:38 +02:00
numEmits = (long long)(s->getNumber( "$numEmits" ));
2009-10-23 05:48:12 +02:00
countsBuilder.append( "emit" , numEmits );
2009-10-15 17:26:15 +02:00
timingBuilder.append( "mapTime" , mapTime / 1000 );
timingBuilder.append( "emitLoop" , t.millis() );
2009-10-07 18:57:45 +02:00
2009-09-24 16:51:27 +02:00
// final reduce
2009-10-15 17:26:15 +02:00
2009-09-24 16:51:27 +02:00
mrtl->reduceInMemory();
mrtl->dump();
2009-10-15 17:26:15 +02:00
{
Timer t;
s->exec( "MR.doReduce(true)" , "reduce" , false , true , true );
inReduce += t.micros();
timingBuilder.append( "reduce" , inReduce / 1000 );
}
if ( verboseOutput ){
countsBuilder.append( "reduces" , s->getNumber( "$numReduces" ) );
countsBuilder.append( "reducesToDB" , s->getNumber( "$numReducesToDB" ) );
}
if ( cmdObj["finalize"].type() ){
s->execSetup( (string)"$finalize = " + cmdObj["finalize"].ascode() );
s->execSetup( "MR.finalize()" );
}
2009-10-07 18:57:45 +02:00
s->execSetup( "MR.cleanup()" );
_tlmr.reset( 0 );
/*
2009-09-24 16:51:27 +02:00
BSONObj prev;
list<BSONObj> all;
BSONObj sortKey = BSON( "_id" << 1 );
2009-09-21 21:55:24 +02:00
cursor = db.query( resultColl, Query().sort( sortKey ) );
2009-09-24 16:51:27 +02:00
while ( cursor->more() ){
BSONObj o = cursor->next().getOwned();
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
if ( o.woSortOrder( prev , sortKey ) == 0 ){
all.push_back( o );
continue;
}
finalReduce( resultColl , all , s.get() , reduceFunction );
all.clear();
prev = o;
2009-09-21 21:55:24 +02:00
all.push_back( o );
}
2009-10-07 18:57:45 +02:00
2009-09-24 16:51:27 +02:00
finalReduce( resultColl , all , s.get() , reduceFunction );
2009-10-07 18:57:45 +02:00
*/
2009-09-24 16:51:27 +02:00
}
catch ( ... ){
log() << "mr failed, removing collection" << endl;
db.dropCollection( resultColl );
throw;
2009-09-21 21:55:24 +02:00
}
2009-09-30 23:11:56 +02:00
if ( finalOutput != resultColl ){
// need to do this with the full dblock, that's why its after the try/catch
db.dropCollection( finalOutput );
BSONObj info;
uassert( "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << resultColl << "to" << finalOutput ) , info ) );
}
2009-10-23 05:48:12 +02:00
if ( db.count( finalOutput ) == 0 && numEmits > 0 ){
errmsg = "there were emits but no data!";
return false;
}
2009-10-15 17:26:15 +02:00
timingBuilder.append( "total" , t.millis() );
2009-09-30 23:11:56 +02:00
result.append( "result" , finalOutputShort );
2009-09-24 16:51:27 +02:00
result.append( "timeMillis" , t.millis() );
2009-10-15 17:26:15 +02:00
countsBuilder.append( "output" , (long long)(db.count( finalOutput )) );
if ( verboseOutput ) result.append( "timing" , timingBuilder.obj() );
result.append( "counts" , countsBuilder.obj() );
return true;
2009-09-24 16:51:27 +02:00
}
private:
DBDirectClient db;
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
} mapReduceCommand;
2009-11-03 16:35:48 +01:00
class MapReduceFinishCommand : public Command {
public:
MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){}
virtual bool slaveOk() { return true; }
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
dbtemprelease temprlease; // we don't touch the db directly
2009-11-03 16:35:48 +01:00
string dbname = cc().database()->name;
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
2009-11-03 16:35:48 +01:00
BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck();
set<ServerAndQuery> servers;
2009-11-03 16:35:48 +01:00
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
vector< auto_ptr<DBClientCursor> > shardCursors;
BSONObjIterator i( shards );
while ( i.more() ){
BSONElement e = i.next();
string shard = e.fieldName();
BSONObj res = e.embeddedObjectUserCheck();
uassert( "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
servers.insert( shard );
2009-11-03 16:35:48 +01:00
}
BSONObj sortKey = BSON( "_id" << 1 );
ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
Query().sort( sortKey ) );
2009-11-03 16:35:48 +01:00
auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
ScriptingFunction reduceFunction = s->createFunction( origCmd["reduce"].ascode().c_str() );
list<BSONObj> values;
string output = origCmd["out"].valuestrsafe();
if ( output.size() == 0 )
output = tempCollectionName( origCmd );
string fulloutput = dbname + "." + output;
result.append( "result" , output );
2009-11-03 16:35:48 +01:00
DBDirectClient db;
while ( cursor.more() ){
BSONObj t = cursor.next();
if ( values.size() == 0 ){
values.push_back( t );
continue;
}
if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){
values.push_back( t );
continue;
}
db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
values.clear();
values.push_back( t );
}
if ( values.size() )
db.insert( fulloutput , reduceValues( values , s.get() , reduceFunction ) );
return 1;
2009-11-03 16:35:48 +01:00
}
} mapReduceFinishCommand;
2009-09-21 21:55:24 +02:00
2009-09-24 16:51:27 +02:00
}
2009-09-21 21:55:24 +02:00
}