mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-30 09:06:21 +01:00
faster map/reduce impl
This commit is contained in:
parent
cfb8512886
commit
a517b84d7f
114
db/mr.cpp
114
db/mr.cpp
@ -24,9 +24,12 @@
|
||||
namespace mongo {
|
||||
|
||||
namespace mr {
|
||||
|
||||
|
||||
typedef pair<BSONObj,BSONObj> Data;
|
||||
typedef list< Data > InMemory;
|
||||
//typedef list< Data > InMemory;
|
||||
typedef map< BSONObj,list<BSONObj>,BSONObjCmp > InMemory;
|
||||
typedef map< BSONObj,int,BSONObjCmp > KeyNums;
|
||||
|
||||
|
||||
class MyCmp {
|
||||
public:
|
||||
@ -83,10 +86,7 @@ namespace mongo {
|
||||
|
||||
void reduceInMemory(){
|
||||
BSONObj prevKey;
|
||||
list<BSONObj> all;
|
||||
|
||||
_temp->sort( MyCmp() );
|
||||
|
||||
InMemory * old = _temp;
|
||||
InMemory * n = new InMemory();
|
||||
_temp = n;
|
||||
@ -94,32 +94,15 @@ namespace mongo {
|
||||
|
||||
for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
|
||||
BSONObj key = i->first;
|
||||
BSONObj value = i->second;
|
||||
|
||||
if ( key.woCompare( prevKey ) == 0 ){
|
||||
all.push_back( value );
|
||||
continue;
|
||||
}
|
||||
list<BSONObj>& all = i->second;
|
||||
|
||||
if ( all.size() == 1 ){
|
||||
insert( prevKey , *(all.begin()) );
|
||||
all.clear();
|
||||
insert( key , *(all.begin()) );
|
||||
}
|
||||
else if ( all.size() > 1 ){
|
||||
BSONObj res = reduceValues( all , _scope , _reduce );
|
||||
insert( prevKey , res );
|
||||
all.clear();
|
||||
insert( key , res );
|
||||
}
|
||||
prevKey = key.getOwned();
|
||||
all.push_back( value );
|
||||
}
|
||||
|
||||
if ( all.size() == 1 ){
|
||||
insert( prevKey , *(all.begin()) );
|
||||
}
|
||||
else if ( all.size() > 1 ){
|
||||
BSONObj res = reduceValues( all , _scope , _reduce );
|
||||
insert( prevKey , res );
|
||||
}
|
||||
|
||||
delete( old );
|
||||
@ -128,7 +111,13 @@ namespace mongo {
|
||||
void dump(){
|
||||
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
|
||||
BSONObj key = i->first;
|
||||
BSONObj value = i->second;
|
||||
list<BSONObj>& all = i->second;
|
||||
if ( all.size() < 1 )
|
||||
continue;
|
||||
assert( all.size() == 1 );
|
||||
|
||||
BSONObj value = *(all.begin());
|
||||
|
||||
BSONObjBuilder b;
|
||||
b.appendElements( value );
|
||||
|
||||
@ -140,10 +129,11 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void insert( const BSONObj& key , const BSONObj& value ){
|
||||
_temp->push_back( pair<BSONObj,BSONObj>( key , value ) );
|
||||
list<BSONObj>& all = (*_temp)[key];
|
||||
all.push_back( value );
|
||||
_size += key.objsize() + value.objsize() + 32;
|
||||
}
|
||||
|
||||
|
||||
void checkSize(){
|
||||
if ( _size < 1024 * 10 )
|
||||
return;
|
||||
@ -154,11 +144,24 @@ namespace mongo {
|
||||
|
||||
if ( _size < 1024 * 15 )
|
||||
return;
|
||||
|
||||
|
||||
dump();
|
||||
log(1) << " mr: dumping to db" << endl;
|
||||
}
|
||||
|
||||
int getNum( const BSONObj& key ){
|
||||
KeyNums::iterator i = _nums.find( key );
|
||||
if ( i != _nums.end() )
|
||||
return i->second;
|
||||
int n = _nums.size() + 1;
|
||||
_nums[key] = n;
|
||||
return n;
|
||||
}
|
||||
|
||||
void resetNum(){
|
||||
_nums.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
DBDirectClient * _db;
|
||||
string _coll;
|
||||
@ -168,6 +171,8 @@ namespace mongo {
|
||||
InMemory * _temp;
|
||||
|
||||
long _size;
|
||||
|
||||
map<BSONObj,int,BSONObjCmp> _nums;
|
||||
};
|
||||
|
||||
boost::thread_specific_ptr<MRTL> _tlmr;
|
||||
@ -195,6 +200,15 @@ namespace mongo {
|
||||
_tlmr->insert( key , value );
|
||||
return BSON( "x" << 1 );
|
||||
}
|
||||
|
||||
BSONObj get_num( const BSONObj& args ){
|
||||
return BSON( "" << _tlmr->getNum( args ) );
|
||||
}
|
||||
|
||||
BSONObj reset_num( const BSONObj& args ){
|
||||
_tlmr->resetNum();
|
||||
return BSONObj();
|
||||
}
|
||||
|
||||
class MapReduceCommand : public Command {
|
||||
public:
|
||||
@ -217,7 +231,7 @@ namespace mongo {
|
||||
return;
|
||||
|
||||
BSONObj res = reduceValues( values , s , reduce );
|
||||
db.insert( resultColl , res );
|
||||
db.update( resultColl , res.extractFields( BSON( "_id" << 1 ) ) , res , true );
|
||||
}
|
||||
|
||||
void finalReduce( const string& resultColl , list<BSONObj>& values , Scope * s , ScriptingFunction reduce ){
|
||||
@ -264,22 +278,31 @@ namespace mongo {
|
||||
|
||||
try {
|
||||
dbtemprelease temprlease;
|
||||
|
||||
|
||||
s->execSetup( (string)"tempcoll = db[\"" + resultCollShort + "\"];" , "tempcoll1" );
|
||||
if ( s->type( "emit" ) == 6 ){
|
||||
s->injectNative( "emit" , fast_emit );
|
||||
}
|
||||
s->execSetup( "MR.init()" );
|
||||
|
||||
s->injectNative( "get_num" , get_num );
|
||||
s->injectNative( "reset_num" , reset_num );
|
||||
|
||||
ScriptingFunction mapFunction = s->createFunction( cmdObj["map"].ascode().c_str() );
|
||||
ScriptingFunction reduceFunction = s->createFunction( cmdObj["reduce"].ascode().c_str() );
|
||||
|
||||
s->execSetup( (string)"$reduce = " + cmdObj["reduce"].ascode() );
|
||||
|
||||
MRTL * mrtl = new MRTL( &db , resultColl , s.get() , reduceFunction );
|
||||
_tlmr.reset( mrtl );
|
||||
|
||||
BSONObj q;
|
||||
if ( cmdObj["query"].type() == Object )
|
||||
q = cmdObj["query"].embeddedObjectUserCheck();
|
||||
Query q;
|
||||
BSONObj filter;
|
||||
if ( cmdObj["query"].type() == Object ){
|
||||
filter = cmdObj["query"].embeddedObjectUserCheck();
|
||||
q = filter;
|
||||
}
|
||||
|
||||
if ( cmdObj["sort"].type() == Object )
|
||||
q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
|
||||
|
||||
ProgressMeter pm( db.count( ns , filter ) );
|
||||
auto_ptr<DBClientCursor> cursor = db.query( ns , q );
|
||||
while ( cursor->more() ){
|
||||
BSONObj o = cursor->next();
|
||||
@ -290,18 +313,24 @@ namespace mongo {
|
||||
|
||||
num++;
|
||||
if ( num % 100 == 0 ){
|
||||
mrtl->checkSize();
|
||||
s->exec( "MR.check();" , "reduce-i" , false , true , true );
|
||||
//mrtl->checkSize();
|
||||
}
|
||||
pm.hit();
|
||||
}
|
||||
|
||||
|
||||
result.append( "timeMillis.emit" , t.millis() );
|
||||
|
||||
|
||||
// final reduce
|
||||
|
||||
mrtl->reduceInMemory();
|
||||
mrtl->dump();
|
||||
|
||||
|
||||
s->exec( "MR.doReduce(true)" , "reduce" , false , true , true );
|
||||
s->execSetup( "MR.cleanup()" );
|
||||
_tlmr.reset( 0 );
|
||||
/*
|
||||
BSONObj prev;
|
||||
list<BSONObj> all;
|
||||
BSONObj sortKey = BSON( "_id" << 1 );
|
||||
@ -321,8 +350,9 @@ namespace mongo {
|
||||
prev = o;
|
||||
all.push_back( o );
|
||||
}
|
||||
|
||||
|
||||
finalReduce( resultColl , all , s.get() , reduceFunction );
|
||||
*/
|
||||
}
|
||||
catch ( ... ){
|
||||
log() << "mr failed, removing collection" << endl;
|
||||
|
@ -9,6 +9,11 @@ t.save( { x : 4 , tags : [ "b" , "c" ] } );
|
||||
|
||||
emit = printjson;
|
||||
|
||||
ks = "_id";
|
||||
if ( db.version() == "1.1.1" )
|
||||
ks = "key";
|
||||
|
||||
|
||||
m = function(){
|
||||
this.tags.forEach(
|
||||
function(z){
|
||||
@ -19,20 +24,21 @@ m = function(){
|
||||
|
||||
r = function( key , values ){
|
||||
var total = 0;
|
||||
for ( var i=0; i<values.length; i++ )
|
||||
for ( var i=0; i<values.length; i++ ){
|
||||
total += values[i].count;
|
||||
}
|
||||
return { count : total };
|
||||
};
|
||||
|
||||
res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
|
||||
assert( res.ok , "not ok" );
|
||||
if ( ks == "_id" ) assert( res.ok , "not ok" );
|
||||
assert.eq( 4 , res.numObjects , "A" );
|
||||
x = db[res.result];
|
||||
|
||||
assert.eq( 3 , x.find().count() , "B" );
|
||||
x.find().forEach( printjson );
|
||||
z = {};
|
||||
x.find().forEach( function(a){ z[a._id] = a.value.count; } );
|
||||
x.find().forEach( function(a){ z[a[ks]] = a.value.count; } );
|
||||
printjson( z );
|
||||
assert.eq( 3 , z.keySet().length , "C" );
|
||||
assert.eq( 2 , z.a , "D" );
|
||||
@ -44,7 +50,7 @@ res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , query : { x :
|
||||
assert.eq( 2 , res.numObjects , "B" );
|
||||
x = db[res.result];
|
||||
z = {};
|
||||
x.find().forEach( function(a){ z[a._id] = a.value.count; } );
|
||||
x.find().forEach( function(a){ z[a[ks]] = a.value.count; } );
|
||||
assert.eq( 1 , z.a , "C1" );
|
||||
assert.eq( 1 , z.b , "C2" );
|
||||
assert.eq( 2 , z.c , "C3" );
|
||||
@ -55,7 +61,7 @@ assert.eq( 2 , res.numObjects , "B2" );
|
||||
assert.eq( "foo" , res.result , "B2-c" );
|
||||
x = db[res.result];
|
||||
z = {};
|
||||
x.find().forEach( function(a){ z[a._id] = a.value.count; } );
|
||||
x.find().forEach( function(a){ z[a[ks]] = a.value.count; } );
|
||||
assert.eq( 1 , z.a , "C1a" );
|
||||
assert.eq( 1 , z.b , "C2a" );
|
||||
assert.eq( 2 , z.c , "C3a" );
|
||||
@ -73,15 +79,25 @@ assert.eq( 999 , res.numObjects , "Z1" );
|
||||
x = db[res.result];
|
||||
x.find().forEach( printjson )
|
||||
assert.eq( 4 , x.find().count() , "Z2" );
|
||||
assert.eq( "a,b,c,d" , x.distinct( "_id" ) , "Z3" );
|
||||
assert.eq( 2 , x.findOne( { _id : "a" } ).value.count , "ZA" );
|
||||
assert.eq( 998 , x.findOne( { _id : "b" } ).value.count , "ZB" );
|
||||
assert.eq( 3 , x.findOne( { _id : "c" } ).value.count , "ZC" );
|
||||
assert.eq( 995 , x.findOne( { _id : "d" } ).value.count , "ZD" );
|
||||
assert.eq( "a,b,c,d" , x.distinct( ks ) , "Z3" );
|
||||
|
||||
function getk( k ){
|
||||
var o = {};
|
||||
o[ks] = k;
|
||||
return x.findOne( o );
|
||||
}
|
||||
|
||||
assert.eq( 2 , getk( "a" ).value.count , "ZA" );
|
||||
assert.eq( 998 , getk( "b" ).value.count , "ZB" );
|
||||
assert.eq( 3 , getk( "c" ).value.count , "ZC" );
|
||||
assert.eq( 995 , getk( "d" ).value.count , "ZD" );
|
||||
x.drop();
|
||||
|
||||
print( Date.timeFunc(
|
||||
function(){
|
||||
db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
|
||||
var out = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
|
||||
if ( ks == "_id" ) assert( out.ok , "XXX" );
|
||||
db[out.result].drop();
|
||||
} , 10 ) );
|
||||
|
||||
|
||||
@ -90,3 +106,28 @@ print( Date.timeFunc(
|
||||
res = db.runCommand( { mapreduce : "lasjdlasjdlasjdjasldjalsdj12e" , map : m , reduce : r } );
|
||||
assert( ! res.ok , "should be not ok" );
|
||||
|
||||
if ( true ){
|
||||
correct = {};
|
||||
|
||||
for ( i=0; i<20000; i++ ){
|
||||
k = "Z" + i % 10000;
|
||||
if ( correct[k] )
|
||||
correct[k]++;
|
||||
else
|
||||
correct[k] = 1;
|
||||
t.save( { x : i , tags : [ k ] } );
|
||||
}
|
||||
|
||||
res = db.runCommand( { mapreduce : "mr1" , out : "foo" , map : m , reduce : r } );
|
||||
printjson( res );
|
||||
x = db[res.result];
|
||||
z = {};
|
||||
x.find().forEach( function(a){ z[a[ks]] = a.value.count; } );
|
||||
for ( zz in z ){
|
||||
if ( zz.indexOf( "Z" ) == 0 ){
|
||||
assert.eq( correct[zz] , z[zz] , "ZZ : " + zz );
|
||||
}
|
||||
}
|
||||
x.drop();
|
||||
}
|
||||
|
||||
|
78
shell/mr.js
Normal file
78
shell/mr.js
Normal file
@ -0,0 +1,78 @@
|
||||
// mr.js
|
||||
|
||||
MR = {};
|
||||
|
||||
MR.init = function(){
|
||||
$max = 0;
|
||||
$arr = [];
|
||||
emit = MR.emit;
|
||||
gc(); // this is just so that keep memory size sane
|
||||
}
|
||||
|
||||
MR.cleanup = function(){
|
||||
MR.init();
|
||||
gc();
|
||||
}
|
||||
|
||||
MR.emit = function(k,v){
|
||||
var num = get_num( k );
|
||||
var data = $arr[num];
|
||||
if ( ! data ){
|
||||
data = { key : k , values : [] };
|
||||
$arr[num] = data;
|
||||
}
|
||||
data.values.push( v );
|
||||
$max = Math.max( $max , data.values.length );
|
||||
}
|
||||
|
||||
MR.doReduce = function( useDB ){
|
||||
$max = 0;
|
||||
for ( var i=0; i<$arr.length; i++){
|
||||
var data = $arr[i];
|
||||
if ( ! data )
|
||||
continue;
|
||||
|
||||
if ( useDB ){
|
||||
var x = tempcoll.findOne( { _id : data.key } );
|
||||
if ( x ){
|
||||
data.values.push( x.value );
|
||||
}
|
||||
}
|
||||
|
||||
var r = $reduce( data.key , data.values );
|
||||
if ( r.length && r[0] ){
|
||||
data.values = r;
|
||||
}
|
||||
else{
|
||||
data.values = [ r ];
|
||||
}
|
||||
|
||||
$max = Math.max( $max , data.values.length );
|
||||
|
||||
if ( useDB ){
|
||||
if ( data.values.length == 1 ){
|
||||
tempcoll.save( { _id : data.key , value : data.values[0] } );
|
||||
}
|
||||
else {
|
||||
tempcoll.save( { _id : data.key , value : data.values } );
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MR.check = function(){
|
||||
if ( $max < 2000 && $arr.length < 1000 ){
|
||||
return 0;
|
||||
}
|
||||
MR.doReduce();
|
||||
if ( $max < 2000 && $arr.length < 1000 ){
|
||||
return 1;
|
||||
}
|
||||
MR.doReduce( true );
|
||||
$arr = [];
|
||||
$max = 0;
|
||||
reset_num();
|
||||
gc();
|
||||
return 2;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user