0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 17:10:48 +01:00
mongodb/s/strategy.cpp

236 lines
8.2 KiB
C++
Raw Normal View History

/*
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2009-02-20 16:46:42 +01:00
// strategy.cpp
#include "stdafx.h"
#include "request.h"
#include "../util/background.h"
2009-02-20 16:46:42 +01:00
#include "../client/connpool.h"
#include "../db/commands.h"
#include "server.h"
2009-02-20 16:46:42 +01:00
namespace mongo {
2009-02-24 03:47:25 +01:00
// ----- Strategy ------
2009-02-20 16:46:42 +01:00
void Strategy::doWrite( int op , Request& r , string server ){
2010-04-19 22:55:16 +02:00
ShardConnection dbcon( server );
2009-02-20 16:46:42 +01:00
DBClientBase &_c = dbcon.conn();
/* TODO FIX - do not case and call DBClientBase::say() */
DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
c.port().say( r.m() );
dbcon.done();
}
/**
 * Forwards the query held by `r` to `server` and relays the reply back
 * through `r`.
 *
 * Throws StaleConfigException when the reply carries
 * QueryResult::ResultFlag_ShardConfigStale. Any AssertionException raised
 * inside the try block is converted into a "$err" reply instead of
 * propagating to the caller.
 */
void Strategy::doQuery( Request& r , string server ){
    try{
        ShardConnection dbcon( server );
        DBClientBase &c = dbcon.conn();

        checkShardVersion( c , r.getns() );

        Message response;
        bool ok = c.call( r.m(), response);

        // Validate the call result BEFORE touching the reply: when call()
        // fails nothing guarantees that response.data points at a reply,
        // so inspecting it first could dereference an invalid pointer.
        uassert( 10200 , "mongos: error calling db", ok);

        {
            QueryResult *qr = (QueryResult *) response.data;
            if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){
                // release the connection before reporting the stale config
                dbcon.done();
                throw StaleConfigException( r.getns() , "Strategy::doQuery" );
            }
        }

        r.reply( response );
        dbcon.done();
    }
    catch ( AssertionException& e ) {
        // Report the failure to the client as a query error document.
        BSONObjBuilder err;
        err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
        BSONObj errObj = err.done();
        replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
    }
}
2009-02-20 19:46:57 +01:00
2009-02-20 16:46:42 +01:00
/**
 * Inserts `obj` into namespace `ns` on `server`, running checkShardVersion()
 * on the connection first.
 */
void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
    ShardConnection shardConn( server );

    // run the shard version check on this connection before writing
    checkShardVersion( shardConn.conn() , ns );

    shardConn->insert( ns , obj );
    shardConn.done();
}
2009-02-24 03:47:25 +01:00
2009-03-30 16:50:10 +02:00
// Per-connection record (keyed by connection address) of the sequence number
// stored by checkShardVersion() after its last successful setShardVersion call.
map<DBClientBase*,unsigned long long> checkShardVersionLastSequence;
class WriteBackListener : public BackgroundJob {
protected:
WriteBackListener( const string& addr ) : _addr( addr ){
cout << "creating WriteBackListener for: " << addr << endl;
}
void run(){
int secsToSleep = 0;
while ( 1 ){
try {
2010-04-19 22:55:16 +02:00
ShardConnection conn( _addr );
BSONObj result;
{
BSONObjBuilder cmd;
cmd.appendOID( "writebacklisten" , &serverID );
if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
log() << "writebacklisten command failed! " << result << endl;
conn.done();
continue;
}
}
log(1) << "writebacklisten result: " << result << endl;
BSONObj data = result.getObjectField( "data" );
if ( data.getBoolField( "writeBack" ) ){
string ns = data["ns"].valuestrsafe();
int len;
Message m( (void*)data["msg"].binData( len ) , false );
massert( 10427 , "invalid writeback message" , m.data->valid() );
grid.getDBConfig( ns )->getChunkManager( ns , true );
Request r( m , 0 );
r.process();
}
else {
log() << "unknown writeBack result: " << result << endl;
}
conn.done();
secsToSleep = 0;
}
catch ( std::exception e ){
log() << "WriteBackListener exception : " << e.what() << endl;
}
catch ( ... ){
log() << "WriteBackListener uncaught exception!" << endl;
}
secsToSleep++;
2009-04-20 15:16:47 +02:00
sleepsecs(secsToSleep);
if ( secsToSleep > 10 )
secsToSleep = 0;
}
}
private:
string _addr;
static map<string,WriteBackListener*> _cache;
public:
static void init( DBClientBase& conn ){
WriteBackListener*& l = _cache[conn.getServerAddress()];
if ( l )
return;
l = new WriteBackListener( conn.getServerAddress() );
l->go();
}
};
map<string,WriteBackListener*> WriteBackListener::_cache;
2009-03-27 21:55:26 +01:00
/**
 * Brings the shard version known to `conn` for namespace `ns` up to date.
 *
 * Returns without contacting the server when no DBConfig exists for `ns`, or
 * when the sequence number recorded for this connection already equals the
 * current one. When a non-authoritative attempt fails, the function retries
 * once in authoritative mode; a failure in authoritative mode trips a massert.
 */
void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){
    // TODO: cache, optimize, etc...

    WriteBackListener::init( conn );

    DBConfig * dbConfig = grid.getDBConfig( ns );
    if ( ! dbConfig )
        return;

    // values used when the namespace is not sharded
    ShardChunkVersion wantedVersion = 0;
    unsigned long long currentSeq = 0;

    if ( dbConfig->isSharded( ns ) ){
        ChunkManager * chunks = dbConfig->getChunkManager( ns , authoritative );
        currentSeq = chunks->getSequenceNumber();
        wantedVersion = chunks->getVersion( conn.getServerAddress() );
    }

    // reference into the per-connection record, updated in place on success
    unsigned long long & lastSeq = checkShardVersionLastSequence[ &conn ];
    if ( lastSeq == currentSeq )
        return;

    log(2) << " have to set shard version for conn: " << &conn
           << " ns:" << ns
           << " my last seq: " << lastSeq
           << " current: " << currentSeq << endl;

    BSONObj result;
    const bool versionSet = setShardVersion( conn , ns , wantedVersion , authoritative , result );
    if ( versionSet ){
        // success!
        log(1) << " setShardVersion success!" << endl;
        lastSeq = currentSeq;
        return;
    }

    log(1) << " setShardVersion failed!\n" << result << endl;

    // The server asking for authoritative mode while we already are in it is
    // an error.
    const bool needAuthoritative = result.getBoolField( "need_authoritative" );
    massert( 10428 , "need_authoritative set but in authoritative mode already" , ! needAuthoritative || ! authoritative );

    if ( authoritative ){
        // nothing stronger left to try
        log(1) << " setShardVersion failed: " << result << endl;
        massert( 10429 , "setShardVersion failed!" , 0 );
        return;
    }

    // retry once, this time authoritatively
    checkShardVersion( conn , ns , true );
}
2009-03-27 21:55:26 +01:00
/**
 * Builds a "setShardVersion" command for `ns` and runs it against the
 * "admin" database over `conn`.
 *
 * @param version        value sent in the "version" field
 * @param authoritative  when true, an "authoritative" flag is added
 * @param result         receives the server's reply
 * @return whatever runCommand() returns
 */
bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
    BSONObjBuilder b;

    // field order is part of the command document; keep it as is
    b.append( "setShardVersion" , ns.c_str() );
    b.append( "configdb" , configServer.modelServer() );
    b.appendTimestamp( "version" , version );
    b.appendOID( "serverID" , &serverID );
    if ( authoritative ){
        b.appendBool( "authoritative" , 1 );
    }

    BSONObj command = b.obj();

    log(1) << " setShardVersion " << conn.getServerAddress()
           << " " << ns
           << " " << command
           << " " << &conn << endl;

    return conn.runCommand( "admin" , command , result );
}
2009-04-03 20:21:00 +02:00
bool lockNamespaceOnServer( const string& server , const string& ns ){
2010-04-19 22:55:16 +02:00
ShardConnection conn( server );
2009-04-03 20:21:00 +02:00
bool res = lockNamespaceOnServer( conn.conn() , ns );
conn.done();
return res;
}
/**
 * Issues an authoritative setShardVersion for `ns` on `conn`, using the
 * grid's next op time as the version. The server's reply is discarded.
 * @return the result of setShardVersion()
 */
bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){
    BSONObj ignoredReply;
    const bool authoritative = true;
    return setShardVersion( conn , ns , grid.getNextOpTime() , authoritative , ignoredReply );
}
2009-02-20 16:46:42 +01:00
}