
SERVER-2531: added REDUCE mode for M/R to sharded output collection

agirbal 2011-06-26 18:59:37 -07:00
parent 5a2f4ceb93
commit 1388ecbdb7
7 changed files with 163 additions and 108 deletions
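For orientation, the client-facing shape of what this commit enables looks roughly like the sketch below. This is illustrative only, written against the C++ driver of that era and not taken from the commit: the host, database, collection names and JS functions are made up, and the exact spelling of the out options (out.reduce plus out.sharded) should be checked against the mapReduce documentation for the release this shipped in.

// Illustrative sketch only: request map/reduce with REDUCE output mode into a
// sharded output collection by going through mongos. All names are hypothetical.
#include <iostream>
#include <string>
#include "client/dbclient.h"

using namespace mongo;

int main() {
    DBClientConnection conn;
    std::string errmsg;
    if ( !conn.connect( "mongos.example.net:27017" , errmsg ) ) {   // talk to mongos, not a shard
        std::cout << "connect failed: " << errmsg << std::endl;
        return 1;
    }

    const char* mapFn    = "function() { emit( this.cust_id , this.amount ); }";
    const char* reduceFn = "function( key , values ) { return Array.sum( values ); }";

    // out: { reduce: <coll>, sharded: true } asks for the new results to be folded
    // into the existing sharded output collection with the reduce function,
    // instead of replacing or merging its contents.
    BSONObj cmd = BSON( "mapreduce" << "orders"
                        << "map" << mapFn
                        << "reduce" << reduceFn
                        << "out" << BSON( "reduce" << "order_totals" << "sharded" << true ) );

    BSONObj info;
    bool ok = conn.runCommand( "test" , cmd , info );
    std::cout << ( ok ? info.toString() : "mapreduce failed: " + info.toString() ) << std::endl;
    return ok ? 0 : 1;
}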

View File

@@ -1116,102 +1116,113 @@ namespace mongo {
         virtual LockType locktype() const { return NONE; }
         bool run(const string& dbname , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
             string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
+            string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe();
+            bool postProcessOnly = !postProcessCollection.empty();

             Config config( dbname , cmdObj.firstElement().embeddedObjectUserCheck() );
+            if (postProcessOnly) {
+                // the temp collection has been decided by mongos
+                config.tempLong = dbname + "." + postProcessCollection;
+            }
+            // no need for incremental collection because records are already sorted
             config.incLong = config.tempLong;

-            set<ServerAndQuery> servers;
+            State state(config);

             BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
             BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
             BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();

-            vector< auto_ptr<DBClientCursor> > shardCursors;
-
-            {
-                // parse per shard results
-                BSONObjIterator i( shards );
-                while ( i.more() ) {
-                    BSONElement e = i.next();
-                    string shard = e.fieldName();
-
-                    BSONObj res = e.embeddedObjectUserCheck();
-
-                    uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
-                    servers.insert( shard );
-                }
-            }
-
-            State state(config);
-            state.prepTempCollection();
-
-            {
-                // reduce from each stream
-                BSONObj sortKey = BSON( "_id" << 1 );
-
-                ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
-                                                    Query().sort( sortKey ) );
-                cursor.init();
-                state.init();
-
-                BSONList values;
-                if (!config.outDB.empty()) {
-                    BSONObjBuilder loc;
-                    if ( !config.outDB.empty())
-                        loc.append( "db" , config.outDB );
-                    if ( !config.finalShort.empty() )
-                        loc.append( "collection" , config.finalShort );
-                    result.append("result", loc.obj());
-                }
-                else {
-                    if ( !config.finalShort.empty() )
-                        result.append( "result" , config.finalShort );
-                }
-
-                while ( cursor.more() ) {
-                    BSONObj t = cursor.next().getOwned();
-
-                    if ( values.size() == 0 ) {
-                        values.push_back( t );
-                        continue;
-                    }
-
-                    if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
-                        values.push_back( t );
-                        continue;
-                    }
-
-                    BSONObj res = config.reducer->finalReduce( values , config.finalizer.get());
-                    if (state.isOnDisk())
-                        state.insertToInc(res);
-                    else
-                        state.emit(res);
-                    values.clear();
-                    values.push_back( t );
-                }
-
-                if ( values.size() ) {
-                    BSONObj res = config.reducer->finalReduce( values , config.finalizer.get() );
-                    if (state.isOnDisk())
-                        state.insertToInc(res);
-                    else
-                        state.emit(res);
-                }
-            }
+            if (postProcessOnly) {
+                // this is usually for reduce mode
+                if (!state._db.exists(config.tempLong)) {
+                    // nothing to do
+                    return 1;
+                }
+            } else {
+                set<ServerAndQuery> servers;
+                vector< auto_ptr<DBClientCursor> > shardCursors;
+                {
+                    // parse per shard results
+                    BSONObjIterator i( shards );
+                    while ( i.more() ) {
+                        BSONElement e = i.next();
+                        string shard = e.fieldName();
+
+                        BSONObj res = e.embeddedObjectUserCheck();
+
+                        uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
+                        servers.insert( shard );
+                    }
+                }
+
+                state.prepTempCollection();
+
+                {
+                    // reduce from each stream
+                    BSONObj sortKey = BSON( "_id" << 1 );
+
+                    ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
+                                                        Query().sort( sortKey ) );
+                    cursor.init();
+                    state.init();
+
+                    BSONList values;
+                    if (!config.outDB.empty()) {
+                        BSONObjBuilder loc;
+                        if ( !config.outDB.empty())
+                            loc.append( "db" , config.outDB );
+                        if ( !config.finalShort.empty() )
+                            loc.append( "collection" , config.finalShort );
+                        result.append("result", loc.obj());
+                    }
+                    else {
+                        if ( !config.finalShort.empty() )
+                            result.append( "result" , config.finalShort );
+                    }
+
+                    while ( cursor.more() || !values.empty() ) {
+                        BSONObj t;
+                        if (cursor.more()) {
+                            t = cursor.next().getOwned();
+
+                            if ( values.size() == 0 ) {
+                                values.push_back( t );
+                                continue;
+                            }
+
+                            if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
+                                values.push_back( t );
+                                continue;
+                            }
+                        }
+
+                        BSONObj res = config.reducer->finalReduce( values , config.finalizer.get());
+                        if (state.isOnDisk())
+                            state.insertToInc(res);
+                        else
+                            state.emit(res);
+                        values.clear();
+                        if (!t.isEmpty())
+                            values.push_back( t );
+                    }
+                }
+
+                for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+                    ScopedDbConnection conn( i->_server );
+                    conn->dropCollection( dbname + "." + shardedOutputCollection );
+                    conn.done();
+                }
+
+                result.append( "shardCounts" , shardCounts );
+            }

             long long finalCount = state.postProcessCollection();
             state.appendResults( result );

-            for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
-                ScopedDbConnection conn( i->_server );
-                conn->dropCollection( dbname + "." + shardedOutputCollection );
-                conn.done();
-            }
-
-            result.append( "shardCounts" , shardCounts );
-
             // fix the global counts
             BSONObjBuilder countsB(32);
             BSONObjIterator j(counts);

View File

@@ -283,6 +283,7 @@ namespace mongo {
             void bailFromJS();

             const Config& _config;
+            DBDirectClient _db;

         protected:
@@ -291,8 +292,6 @@ namespace mongo {
             scoped_ptr<Scope> _scope;
             bool _onDisk; // if the end result of this map reduce is disk or not

-            DBDirectClient _db;
-
             scoped_ptr<InMemory> _temp;
             long _size; // bytes in _temp
             long _dupCount; // number of duplicate key entries

View File

@@ -1003,9 +1003,7 @@ namespace mongo {
                 // so we allocate them in our thread
                 // and hand off
                 // Note: why not use pooled connections? This has been reported to create too many connections
-
                 vector< shared_ptr<ShardConnection> > shardConns;
-
                 list< shared_ptr<Future::CommandResult> > futures;
                 for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
@@ -1100,9 +1098,7 @@ namespace mongo {
             mr_shard::State state(config);
             log(1) << "mr sharded output ns: " << config.ns << endl;

-            // for now we only support replace output collection in sharded mode
-            if (config.outType != mr_shard::Config::REPLACE &&
-                    config.outType != mr_shard::Config::MERGE) {
+            if (config.outType == mr_shard::Config::INMEMORY) {
                 errmsg = "This Map Reduce mode is not supported with sharded output";
                 return false;
             }
@@ -1111,18 +1107,22 @@ namespace mongo {
                 BSONObjBuilder loc;
                 if ( !config.outDB.empty())
                     loc.append( "db" , config.outDB );
-                if ( !config.finalShort.empty() )
-                    loc.append( "collection" , config.finalShort );
+                loc.append( "collection" , config.finalShort );
                 result.append("result", loc.obj());
             }
             else {
                 if ( !config.finalShort.empty() )
                     result.append( "result" , config.finalShort );
             }

             string outns = config.finalLong;
-            bool merge = (config.outType == mr_shard::Config::MERGE);
-            if (!merge) {
+            string tempns;
+
+            if (config.outType == mr_shard::Config::REDUCE) {
+                // result will be inserted into a temp collection to post process
+                const string postProcessCollection = getTmpName( collection );
+                cout << "post process collection is " << postProcessCollection << endl;
+                tempns = dbName + "." + postProcessCollection;
+            } else if (config.outType == mr_shard::Config::REPLACE) {
                 // drop previous collection
                 BSONObj dropColCmd = BSON("drop" << config.finalShort);
                 BSONObjBuilder dropColResult(32);
@@ -1134,14 +1134,20 @@ namespace mongo {
                 }
             }

-            // create the sharded collection
             BSONObj sortKey = BSON( "_id" << 1 );
-            BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
-            BSONObjBuilder shardColResult(32);
-            bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
-            if (!res) {
-                errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
-                return false;
+            if (!conf->isSharded(outns)) {
+                // create the sharded collection
+                BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
+                BSONObjBuilder shardColResult(32);
+                bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
+                if (!res) {
+                    errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
+                    return false;
+                }
+                // since it's new collection, use replace mode always
+                config.outType = mr_shard::Config::REPLACE;
             }

             ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection ,
@@ -1152,7 +1158,7 @@ namespace mongo {
             mr_shard::BSONList values;
             Strategy* s = SHARDED;
             long long finalCount = 0;
-            while ( cursor.more() || values.size() > 0 ) {
+            while ( cursor.more() || !values.empty() ) {
                 BSONObj t;
                 if ( cursor.more() ) {
                     t = cursor.next().getOwned();
@@ -1169,11 +1175,14 @@ namespace mongo {
                 }

                 BSONObj final = config.reducer->finalReduce(values, config.finalizer.get());
-                if (merge) {
+                if (config.outType == mr_shard::Config::MERGE) {
                     BSONObj id = final["_id"].wrap();
-                    s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert);
+                    s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true);
+                } else if (config.outType == mr_shard::Config::REDUCE) {
+                    // insert into temp collection, but final collection's sharding
+                    s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str());
                 } else {
-                    s->insertSharded(conf, outns.c_str(), final, 0);
+                    s->insertSharded(conf, outns.c_str(), final, 0, true);
                 }
                 ++finalCount;
                 values.clear();
@@ -1181,6 +1190,36 @@ namespace mongo {
                     values.push_back( t );
             }

+            if (config.outType == mr_shard::Config::REDUCE) {
+                // results were written to temp collection, need final reduce
+                vector< shared_ptr<ShardConnection> > shardConns;
+                list< shared_ptr<Future::CommandResult> > futures;
+                BSONObj finalCmdObj = finalCmd.obj();
+                for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
+                    shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) );
+                    futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , temp->get() ) );
+                    shardConns.push_back( temp );
+                }
+
+                // now wait for the result of all shards
+                bool failed = false;
+                for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+                    shared_ptr<Future::CommandResult> res = *i;
+                    if ( ! res->join() ) {
+                        error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
+                        result.append( "cause" , res->result() );
+                        errmsg = "mongod mr failed: ";
+                        errmsg += res->result().toString();
+                        failed = true;
+                        continue;
+                    }
+                    BSONObj result = res->result();
+                }
+
+                for ( unsigned i=0; i<shardConns.size(); i++ )
+                    shardConns[i]->done();
+            }
+
             for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
                 ScopedDbConnection conn( i->_server );
                 conn->dropCollection( dbName + "." + shardedOutputCollection );
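The dispatch above (MERGE upserts each reduced document into the final collection; REDUCE stages the rows into a temp collection and then asks every shard for a final reduce pass) implements the documented contract of the reduce output mode: when a key already exists in the output collection, the stored value and the newly computed value are combined with the user's reduce function instead of being overwritten. A standalone toy illustration of that contract, not MongoDB code:

// Toy illustration of "reduce" output semantics; plain STL, no MongoDB types.
#include <iostream>
#include <map>
#include <string>
#include <vector>

typedef double Value;

// stand-in for the user's reduce function (here: a sum reducer)
Value reduceFn( const std::string& /*key*/ , const std::vector<Value>& vals ) {
    Value sum = 0;
    for ( size_t i = 0; i < vals.size(); i++ )
        sum += vals[i];
    return sum;
}

int main() {
    std::map<std::string, Value> outputColl;   // stands in for the sharded output collection
    outputColl["a"] = 3;                       // left over from a previous map/reduce run

    std::map<std::string, Value> newResults;   // results of the current run
    newResults["a"] = 2;
    newResults["b"] = 7;

    for ( std::map<std::string, Value>::const_iterator it = newResults.begin();
          it != newResults.end(); ++it ) {
        std::vector<Value> vals;
        if ( outputColl.count( it->first ) )
            vals.push_back( outputColl[it->first] );   // existing value, if any
        vals.push_back( it->second );                  // newly computed value
        outputColl[it->first] = reduceFn( it->first , vals );
    }

    // "a" is re-reduced to 5, "b" is simply inserted as 7
    std::cout << "a=" << outputColl["a"] << " b=" << outputColl["b"] << std::endl;
    return 0;
}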

View File

@@ -67,17 +67,19 @@ namespace mongo {
         dbcon.done();
     }

-    void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags ) {
+    void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags, bool safe ) {
         ShardConnection dbcon( shard , ns );
         if ( dbcon.setVersion() ) {
             dbcon.done();
             throw StaleConfigException( ns , "for insert" );
         }
         dbcon->insert( ns , obj , flags);
+        if (safe)
+            dbcon->getLastError();
         dbcon.done();
     }

-    void Strategy::update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags ) {
+    void Strategy::update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags, bool safe ) {
         bool upsert = flags & UpdateOption_Upsert;
         bool multi = flags & UpdateOption_Multi;
@@ -87,6 +89,8 @@ namespace mongo {
             throw StaleConfigException( ns , "for insert" );
         }
         dbcon->update( ns , query , toupdate, upsert, multi);
+        if (safe)
+            dbcon->getLastError();
         dbcon.done();
     }
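The two hunks above turn fire-and-forget writes into checked ones: when the new safe flag is set, the insert or update is followed by getLastError on the same connection, so mongos sees a failure instead of silently dropping map/reduce output. A minimal sketch of that same pattern with the old C++ driver; the host and namespace are made up, and the helper name is hypothetical:

#include <iostream>
#include <string>
#include "client/dbclient.h"

using namespace mongo;

// insert one document and wait for the server to acknowledge it
bool safeInsert( DBClientConnection& conn , const std::string& ns , const BSONObj& obj ) {
    conn.insert( ns , obj );                 // fire the insert
    std::string err = conn.getLastError();   // blocks until the server reports the outcome
    if ( !err.empty() ) {
        std::cout << "insert failed: " << err << std::endl;
        return false;
    }
    return true;
}

int main() {
    DBClientConnection conn;
    std::string errmsg;
    if ( !conn.connect( "shard1.example.net:27018" , errmsg ) )
        return 1;
    return safeInsert( conn , "test.order_totals" , BSON( "_id" << "a" << "value" << 5 ) ) ? 0 : 1;
}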

View File

@@ -32,15 +32,15 @@ namespace mongo {
         virtual void getMore( Request& r ) = 0;
         virtual void writeOp( int op , Request& r ) = 0;

-        virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) = 0;
-        virtual void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) = 0;
+        virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe=false, const char* nsChunkLookup=0 ) = 0;
+        virtual void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe=false ) = 0;

     protected:
         void doWrite( int op , Request& r , const Shard& shard , bool checkVersion = true );
         void doQuery( Request& r , const Shard& shard );

-        void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0 );
-        void update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags=0 );
+        void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0 , bool safe=false );
+        void update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags=0, bool safe=false );
     };

View File

@@ -191,8 +191,10 @@ namespace mongo {
             }
         }

-        void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) {
-            ChunkManagerPtr manager = conf->getChunkManager(ns);
+        void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe, const char* nsChunkLookup ) {
+            if (!nsChunkLookup)
+                nsChunkLookup = ns;
+            ChunkManagerPtr manager = conf->getChunkManager(nsChunkLookup);
             if ( ! manager->hasShardKey( o ) ) {
                 bool bad = true;
@@ -206,7 +208,7 @@ namespace mongo {
                 }

                 if ( bad ) {
-                    log() << "tried to insert object without shard key: " << ns << "  " << o << endl;
+                    log() << "tried to insert object without shard key: " << nsChunkLookup << "  " << o << endl;
                     uasserted( 14842 , "tried to insert object without shard key" );
                 }
@@ -221,7 +223,7 @@ namespace mongo {
                 try {
                     ChunkPtr c = manager->findChunk( o );
                     log(4) << " server:" << c->getShard().toString() << " " << o << endl;
-                    insert( c->getShard() , ns , o , flags);
+                    insert( c->getShard() , ns , o , flags, safe);

                     // r.gotInsert();
                     // if ( r.getClientInfo()->autoSplitOk() )
@@ -344,7 +346,7 @@ namespace mongo {
             }
         }

-        void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) {
+        void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe ) {
             ChunkManagerPtr manager = conf->getChunkManager(ns);
             BSONObj chunkFinder = query;
@@ -410,7 +412,7 @@ namespace mongo {
                 // int * x = (int*)(r.d().afterNS());
                 // x[0] |= UpdateOption_Broadcast;
                 for ( set<Shard>::iterator i=shards.begin(); i!=shards.end(); i++) {
-                    update(*i, ns, query, toupdate, flags);
+                    update(*i, ns, query, toupdate, flags, safe);
                 }
             }
             else {

View File

@@ -262,11 +262,11 @@ namespace mongo {
             return true;
         }

-        void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) {
+        void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe, const char* nsChunkLookup ) {
             // only useful for shards
         }

-        void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) {
+        void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe ) {
             // only useful for shards
         }