2009-02-20 16:46:42 +01:00
// strategy.cpp
# include "stdafx.h"
# include "request.h"
2009-04-17 23:11:32 +02:00
# include "../util/background.h"
2009-02-20 16:46:42 +01:00
# include "../client/connpool.h"
# include "../db/commands.h"
2009-03-25 22:35:38 +01:00
# include "shard.h"
2009-04-17 23:11:32 +02:00
# include "server.h"
2009-02-20 16:46:42 +01:00
namespace mongo {
2009-02-24 03:47:25 +01:00
// ----- Strategy ------
2009-02-20 16:46:42 +01:00
void Strategy : : doWrite ( int op , Request & r , string server ) {
ScopedDbConnection dbcon ( server ) ;
DBClientBase & _c = dbcon . conn ( ) ;
/* TODO FIX - do not case and call DBClientBase::say() */
DBClientConnection & c = dynamic_cast < DBClientConnection & > ( _c ) ;
c . port ( ) . say ( r . m ( ) ) ;
dbcon . done ( ) ;
}
2009-02-22 05:39:41 +01:00
void Strategy : : doQuery ( Request & r , string server ) {
try {
ScopedDbConnection dbcon ( server ) ;
DBClientBase & _c = dbcon . conn ( ) ;
2009-03-25 22:35:38 +01:00
checkShardVersion ( _c , r . getns ( ) ) ;
2009-04-07 21:19:27 +02:00
2009-02-22 05:39:41 +01:00
// TODO: This will not work with Paired connections. Fix.
DBClientConnection & c = dynamic_cast < DBClientConnection & > ( _c ) ;
Message response ;
bool ok = c . port ( ) . call ( r . m ( ) , response ) ;
2009-04-07 21:19:27 +02:00
{
QueryResult * qr = ( QueryResult * ) response . data ;
if ( qr - > resultFlags ( ) & QueryResult : : ResultFlag_ShardConfigStale ) {
2009-04-13 04:19:41 +02:00
throw StaleConfigException ( r . getns ( ) , " Strategy::doQuery " ) ;
2009-04-07 21:19:27 +02:00
}
}
2009-02-22 05:39:41 +01:00
uassert ( " mongos: error calling db " , ok ) ;
r . reply ( response ) ;
dbcon . done ( ) ;
}
catch ( AssertionException & e ) {
BSONObjBuilder err ;
err . append ( " $err " , string ( " mongos: " ) + ( e . msg . empty ( ) ? " assertion during query " : e . msg ) ) ;
BSONObj errObj = err . done ( ) ;
replyToQuery ( QueryResult : : ResultFlag_ErrSet , r . p ( ) , r . m ( ) , errObj ) ;
}
}
2009-02-20 19:46:57 +01:00
2009-02-20 16:46:42 +01:00
void Strategy : : insert ( string server , const char * ns , const BSONObj & obj ) {
ScopedDbConnection dbcon ( server ) ;
2009-04-19 03:55:34 +02:00
checkShardVersion ( dbcon . conn ( ) , ns ) ;
2009-02-20 16:46:42 +01:00
dbcon - > insert ( ns , obj ) ;
2009-02-20 19:46:57 +01:00
dbcon . done ( ) ;
2009-02-20 16:46:42 +01:00
}
2009-02-24 03:47:25 +01:00
2009-03-30 16:50:10 +02:00
// Keyed by connection pointer: the shard-manager sequence number recorded the
// last time checkShardVersion() successfully set the shard version on that
// connection. checkShardVersion() compares the manager's current sequence
// number against this entry and skips the setShardVersion round trip when
// they are equal. A connection seen for the first time gets a
// default-inserted entry of 0.
// NOTE(review): no locking is visible around this map in this file - confirm
// how concurrent callers are serialized.
map < DBClientBase * , unsigned long long > checkShardVersionLastSequence ;
2009-04-17 23:11:32 +02:00
/**
 * Background job that repeatedly issues the "writebacklisten" admin command
 * against one server address and re-processes any write-back message the
 * server returns.
 *
 * Instances are created only through init(), which keeps one listener per
 * server address in a static cache.
 */
class WriteBackListener : public BackgroundJob {
protected:

    /** @param addr server address this listener connects to */
    WriteBackListener( const string& addr ) : _addr( addr ) {
        cout << " creating WriteBackListener for: " << addr << endl;
    }

    /**
     * Listener loop; never returns.
     *
     * Each pass opens a scoped connection, runs the writebacklisten command
     * and, when the returned data has "writeBack" set, rebuilds the original
     * message and processes it as a Request. Exceptions are logged and
     * swallowed so the loop keeps running. After every pass that reaches the
     * bottom of the loop it sleeps for an increasing number of seconds; the
     * counter is reset to 0 after a pass without exceptions and whenever it
     * exceeds 10.
     */
    void run() {
        int secsToSleep = 0;
        while ( 1 ) {
            try {
                ScopedDbConnection conn( _addr );

                BSONObj result;
                {
                    BSONObjBuilder cmd;
                    cmd.appendOID( " writebacklisten " , &serverID );
                    if ( ! conn->runCommand( " admin " , cmd.obj() , result ) ) {
                        log() << " writebacklisten command failed! " << result << endl;
                        // NOTE(review): this retries immediately - it skips both
                        // conn.done() and the back-off sleep below. Confirm
                        // whether a tight retry loop is intended here.
                        continue;
                    }
                }

                log( 1 ) << " writebacklisten result: " << result << endl;

                BSONObj data = result.getObjectField( " data " );
                if ( data.getBoolField( " writeBack " ) ) {
                    string ns = data[ " ns " ].valuestrsafe();

                    int len = 0; // filled in by binData()
                    Message m( (void*) data[ " msg " ].binData( len ) , false );
                    massert( " invalid writeback message " , m.data->valid() );

                    // getDBConfig() is treated as nullable by checkShardVersion(),
                    // so guard the dereference; a failed assertion is caught and
                    // logged below instead of crashing the listener thread.
                    DBConfig* conf = grid.getDBConfig( ns );
                    massert( "no db config for writeback namespace" , conf != 0 );
                    conf->getShardManager( ns , true );

                    Request r( m , 0 );
                    r.process();
                }
                else {
                    log() << " unknown writeBack result: " << result << endl;
                }

                conn.done();
                secsToSleep = 0;
            }
            // Catch by const reference: catching std::exception by value
            // slices derived exceptions, so what() would report the base
            // class message rather than the real one.
            catch ( const std::exception& e ) {
                log() << " WriteBackListener exception : " << e.what() << endl;
            }
            catch ( ... ) {
                log() << " WriteBackListener uncaught exception! " << endl;
            }

            secsToSleep++;
            sleepsecs( secsToSleep );
            if ( secsToSleep > 10 )
                secsToSleep = 0;
        }
    }

private:
    string _addr;                                   // server address to listen on
    static map<string, WriteBackListener*> _cache;  // one listener per server address

public:
    /**
     * Ensures a listener exists for the server behind `conn`; creates and
     * starts one ( go() ) the first time an address is seen.
     */
    static void init( DBClientBase& conn ) {
        WriteBackListener*& l = _cache[ conn.getServerAddress() ];
        if ( l )
            return;
        l = new WriteBackListener( conn.getServerAddress() );
        l->go();
    }
};

map<string, WriteBackListener*> WriteBackListener::_cache;
2009-03-27 21:55:26 +01:00
/**
 * Makes sure the server behind `conn` has been told the current shard version
 * for `ns`.
 *
 * Also ensures a WriteBackListener exists for the connection's server.
 * Returns without contacting the server when there is no DBConfig for `ns`,
 * when `ns` is not sharded, or when the shard manager's sequence number
 * matches the one recorded for this connection. If setShardVersion fails in
 * non-authoritative mode, the check is repeated once in authoritative mode;
 * a failure in authoritative mode ends in massert.
 *
 * @param conn          connection to check
 * @param ns            namespace whose shard version is checked
 * @param authoritative forwarded to getShardManager() and setShardVersion()
 */
void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ) {
    // TODO: cache, optimize, etc...

    WriteBackListener::init( conn );

    DBConfig* dbConfig = grid.getDBConfig( ns );
    if ( ! dbConfig || ! dbConfig->sharded( ns ) )
        return;

    ShardManager* shardManager = dbConfig->getShardManager( ns , authoritative );

    // operator[] default-inserts 0 for a connection seen for the first time.
    unsigned long long& lastSeq = checkShardVersionLastSequence[ &conn ];
    if ( shardManager->getSequenceNumber() == lastSeq )
        return;

    log( 2 ) << " have to set shard version for conn: " << &conn << " ns: " << ns << " my last seq: " << lastSeq << " current: " << shardManager->getSequenceNumber() << endl;

    ServerShardVersion shardVersion = shardManager->getVersion( conn.getServerAddress() );

    BSONObj reply;
    const bool applied = setShardVersion( conn , ns , shardVersion , authoritative , reply );
    if ( applied ) {
        // success! remember the sequence number so the next call can skip the round trip
        lastSeq = shardManager->getSequenceNumber();
        return;
    }

    log( 1 ) << " setShardVersion failed! \n " << reply << endl;

    if ( reply.getBoolField( " need_authoritative " ) )
        massert( " need_authoritative set but in authoritative mode already " , ! authoritative );

    if ( authoritative ) {
        // Already authoritative: nothing left to retry with.
        log( 1 ) << " setShardVersion failed: " << reply << endl;
        massert( " setShardVersion failed! " , 0 );
    }
    else {
        // Retry once in authoritative mode.
        checkShardVersion( conn , ns , true );
        return;
    }
}
2009-03-27 21:55:26 +01:00
/**
 * Builds a setShardVersion command for `ns` and runs it on `conn` against the
 * admin database.
 *
 * The command carries the namespace, the config server's modelServer() value,
 * the version as a timestamp, this process's serverID and, only when
 * requested, an "authoritative" flag.
 *
 * @param conn          connection the command is run on
 * @param ns            namespace the version applies to
 * @param version       version value appended as a timestamp
 * @param authoritative when true, the authoritative flag is added to the command
 * @param result        receives the command's reply document
 * @return whatever runCommand() returns for the command
 */
bool setShardVersion( DBClientBase& conn , const string& ns , ServerShardVersion version , bool authoritative , BSONObj& result ) {
    BSONObjBuilder b;

    b.append( " setShardVersion " , ns.c_str() );
    b.append( " configdb " , configServer.modelServer() );
    b.appendTimestamp( " version " , version );
    b.appendOID( " serverID " , &serverID );
    if ( authoritative ) {
        b.appendBool( " authoritative " , true );
    }

    BSONObj command = b.obj();

    log( 1 ) << " setShardVersion " << conn.getServerAddress() << " " << ns << " " << command << " " << &conn << endl;

    return conn.runCommand( " admin " , command , result );
}
2009-04-03 20:21:00 +02:00
bool lockNamespaceOnServer ( const string & server , const string & ns ) {
ScopedDbConnection conn ( server ) ;
bool res = lockNamespaceOnServer ( conn . conn ( ) , ns ) ;
conn . done ( ) ;
return res ;
}
/**
 * Issues an authoritative setShardVersion for `ns` on `conn`, using the
 * grid's next op time as the version. The command's reply document is
 * discarded.
 *
 * @param conn connection the command is run on
 * @param ns   namespace to lock
 * @return result of setShardVersion()
 */
bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ) {
    BSONObj discardedReply;
    const bool authoritative = true;
    return setShardVersion( conn , ns , grid.getNextOpTime() , authoritative , discardedReply );
}
2009-04-17 23:11:32 +02:00
2009-02-20 16:46:42 +01:00
}