mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
can use sharding with SyncClusterConnection for 2 phase commit across 3 servers SHARDING-39
This commit is contained in:
parent
ced7756639
commit
1e6780f844
@ -21,6 +21,7 @@
|
||||
#include "stdafx.h"
|
||||
#include "connpool.h"
|
||||
#include "../db/commands.h"
|
||||
#include "syncclusterconnection.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
@ -33,11 +34,13 @@ namespace mongo {
|
||||
if ( p == 0 )
|
||||
p = new PoolForHost();
|
||||
if ( p->pool.empty() ) {
|
||||
string errmsg;
|
||||
int numCommas = DBClientBase::countCommas( host );
|
||||
DBClientBase *c;
|
||||
if( host.find(',') == string::npos ) {
|
||||
|
||||
if( numCommas == 0 ) {
|
||||
DBClientConnection *cc = new DBClientConnection(true);
|
||||
log(2) << "creating new connection for pool to:" << host << endl;
|
||||
string errmsg;
|
||||
if ( !cc->connect(host.c_str(), errmsg) ) {
|
||||
delete cc;
|
||||
uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false);
|
||||
@ -46,7 +49,7 @@ namespace mongo {
|
||||
c = cc;
|
||||
onCreate( c );
|
||||
}
|
||||
else {
|
||||
else if ( numCommas == 1 ) {
|
||||
DBClientPaired *p = new DBClientPaired();
|
||||
if( !p->connect(host) ) {
|
||||
delete p;
|
||||
@ -55,6 +58,12 @@ namespace mongo {
|
||||
}
|
||||
c = p;
|
||||
}
|
||||
else if ( numCommas == 2 ) {
|
||||
c = new SyncClusterConnection( host );
|
||||
}
|
||||
else {
|
||||
uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 );
|
||||
}
|
||||
return c;
|
||||
}
|
||||
DBClientBase *c = p->pool.top();
|
||||
@ -105,6 +114,15 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
ScopedDbConnection::~ScopedDbConnection() {
|
||||
if ( _conn && ! _conn->isFailed() ) {
|
||||
/* see done() comments above for why we log this line */
|
||||
log() << "~ScopedDBConnection: _conn != null" << endl;
|
||||
kill();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class PoolFlushCmd : public Command {
|
||||
public:
|
||||
PoolFlushCmd() : Command( "connpoolsync" ){}
|
||||
|
@ -122,14 +122,9 @@ namespace mongo {
|
||||
pool.release(host, _conn);
|
||||
_conn = 0;
|
||||
}
|
||||
|
||||
~ScopedDbConnection();
|
||||
|
||||
~ScopedDbConnection() {
|
||||
if ( _conn && ! _conn->isFailed() ) {
|
||||
/* see done() comments above for why we log this line */
|
||||
log() << "~ScopedDBConnection: _conn != null" << endl;
|
||||
kill();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -824,7 +824,6 @@ namespace mongo {
|
||||
return serverAddress;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual bool call( Message &toSend, Message &response, bool assertOk = true );
|
||||
virtual void say( Message &toSend );
|
||||
virtual void sayPiggyBack( Message &toSend );
|
||||
|
@ -18,12 +18,14 @@
|
||||
|
||||
#include "stdafx.h"
|
||||
#include "syncclusterconnection.h"
|
||||
#include "../db/dbmessage.h"
|
||||
|
||||
// error codes 8000-8009
|
||||
|
||||
namespace mongo {
|
||||
|
||||
SyncClusterConnection::SyncClusterConnection( string commaSeperated ){
|
||||
_address = commaSeperated;
|
||||
string::size_type idx;
|
||||
while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){
|
||||
string h = commaSeperated.substr( 0 , idx );
|
||||
@ -35,12 +37,17 @@ namespace mongo {
|
||||
}
|
||||
|
||||
SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){
|
||||
_address = a + "," + b + "," + c;
|
||||
// connect to all even if not working
|
||||
_connect( a );
|
||||
_connect( b );
|
||||
_connect( c );
|
||||
}
|
||||
|
||||
SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){
|
||||
assert(0);
|
||||
}
|
||||
|
||||
SyncClusterConnection::~SyncClusterConnection(){
|
||||
for ( size_t i=0; i<_conns.size(); i++ )
|
||||
delete _conns[i];
|
||||
@ -111,7 +118,7 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void SyncClusterConnection::_connect( string host ){
|
||||
log() << "SyncClusterConnection connecting to: " << host << endl;
|
||||
log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
|
||||
DBClientConnection * c = new DBClientConnection( true );
|
||||
string errmsg;
|
||||
if ( ! c->connect( host , errmsg ) )
|
||||
@ -138,7 +145,7 @@ namespace mongo {
|
||||
lockType = i->second;
|
||||
}
|
||||
|
||||
uassert( 13054 , "write $cmd not supported in SyncClusterConnection" , lockType <= 0 );
|
||||
uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 );
|
||||
}
|
||||
|
||||
return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
|
||||
@ -193,25 +200,47 @@ namespace mongo {
|
||||
uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0);
|
||||
}
|
||||
|
||||
void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ assert(0); }
|
||||
void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){
|
||||
assert(0);
|
||||
}
|
||||
|
||||
void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ assert(0); }
|
||||
void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){
|
||||
string errmsg;
|
||||
if ( ! prepare( errmsg ) )
|
||||
throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
|
||||
|
||||
for ( size_t i=0; i<_conns.size(); i++ ){
|
||||
_conns[i]->update( ns , query , obj , upsert , multi );
|
||||
}
|
||||
|
||||
_checkLast();
|
||||
}
|
||||
|
||||
string SyncClusterConnection::_toString() const {
|
||||
stringstream ss;
|
||||
ss << "SyncClusterConnection [";
|
||||
for ( size_t i=0; i<_conns.size(); i++ ){
|
||||
if ( i > 0 )
|
||||
ss << ",";
|
||||
ss << _conns[i]->toString();
|
||||
}
|
||||
ss << "]";
|
||||
ss << "SyncClusterConnection [" << _address << "]";
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk ){
|
||||
assert(0);
|
||||
return false;
|
||||
uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" ,
|
||||
toSend.operation() == dbQuery );
|
||||
|
||||
DbMessage d( toSend );
|
||||
uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 );
|
||||
|
||||
for ( size_t i=0; i<_conns.size(); i++ ){
|
||||
try {
|
||||
bool ok = _conns[i]->call( toSend , response , assertOk );
|
||||
if ( ok )
|
||||
return ok;
|
||||
log() << "call failed to: " << _conns[i]->toString() << " no data" << endl;
|
||||
}
|
||||
catch ( ... ){
|
||||
log() << "call failed to: " << _conns[i]->toString() << " exception" << endl;
|
||||
}
|
||||
}
|
||||
throw UserException( 8008 , "all servers down!" );
|
||||
}
|
||||
|
||||
void SyncClusterConnection::say( Message &toSend ){
|
||||
|
@ -68,9 +68,7 @@ namespace mongo {
|
||||
virtual void say( Message &toSend );
|
||||
virtual void sayPiggyBack( Message &toSend );
|
||||
|
||||
virtual string getServerAddress() const {
|
||||
return _toString();
|
||||
}
|
||||
virtual string getServerAddress() const { return _address; }
|
||||
|
||||
virtual bool isFailed() const {
|
||||
return false;
|
||||
@ -78,6 +76,8 @@ namespace mongo {
|
||||
|
||||
private:
|
||||
|
||||
SyncClusterConnection( SyncClusterConnection& prev );
|
||||
|
||||
string _toString() const;
|
||||
|
||||
bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
|
||||
@ -90,8 +90,9 @@ namespace mongo {
|
||||
void _checkLast();
|
||||
|
||||
void _connect( string host );
|
||||
|
||||
string _address;
|
||||
vector<DBClientConnection*> _conns;
|
||||
|
||||
map<string,int> _lockTypes;
|
||||
};
|
||||
|
||||
|
@ -437,9 +437,10 @@ namespace mongo {
|
||||
virtual bool slaveOk() {
|
||||
return true;
|
||||
}
|
||||
virtual LockType locktype(){ return WRITE; }
|
||||
virtual LockType locktype(){ return NONE; }
|
||||
CmdGetOpTime() : Command("getoptime") { }
|
||||
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
|
||||
writelock l( "" );
|
||||
result.appendDate("optime", OpTime::now().asDate());
|
||||
return true;
|
||||
}
|
||||
|
48
jstests/sharding/sync2.js
Normal file
48
jstests/sharding/sync2.js
Normal file
@ -0,0 +1,48 @@
|
||||
// sync2.js
|
||||
|
||||
s = new ShardingTest( "sync2" , 3 , 50 , 2 , { sync : true } );
|
||||
|
||||
s2 = s._mongos[1];
|
||||
|
||||
s.adminCommand( { enablesharding : "test" } );
|
||||
s.adminCommand( { shardcollection : "test.foo" , key : { num : 1 } } );
|
||||
|
||||
s.getDB( "test" ).foo.save( { num : 1 } );
|
||||
s.getDB( "test" ).foo.save( { num : 2 } );
|
||||
s.getDB( "test" ).foo.save( { num : 3 } );
|
||||
s.getDB( "test" ).foo.save( { num : 4 } );
|
||||
s.getDB( "test" ).foo.save( { num : 5 } );
|
||||
s.getDB( "test" ).foo.save( { num : 6 } );
|
||||
s.getDB( "test" ).foo.save( { num : 7 } );
|
||||
|
||||
assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal A" );
|
||||
assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other A" );
|
||||
|
||||
s.adminCommand( { split : "test.foo" , middle : { num : 4 } } );
|
||||
s.adminCommand( { movechunk : "test.foo" , find : { num : 3 } , to : s.getFirstOther( s.getServer( "test" ) ).name } );
|
||||
|
||||
assert( s._connections[0].getDB( "test" ).foo.find().toArray().length > 0 , "blah 1" );
|
||||
assert( s._connections[1].getDB( "test" ).foo.find().toArray().length > 0 , "blah 2" );
|
||||
assert.eq( 7 , s._connections[0].getDB( "test" ).foo.find().toArray().length +
|
||||
s._connections[1].getDB( "test" ).foo.find().toArray().length , "blah 3" );
|
||||
|
||||
assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B" );
|
||||
assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B" );
|
||||
|
||||
s.adminCommand( { split : "test.foo" , middle : { num : 2 } } );
|
||||
s.printChunks();
|
||||
|
||||
print( "* A" );
|
||||
|
||||
assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 1" );
|
||||
assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" );
|
||||
print( "* B" );
|
||||
assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 3" );
|
||||
assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B 4" );
|
||||
|
||||
for ( var i=0; i<10; i++ ){
|
||||
print( "* C " + i );
|
||||
assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B " + i );
|
||||
}
|
||||
|
||||
s.stop();
|
11
s/config.cpp
11
s/config.cpp
@ -371,13 +371,18 @@ namespace mongo {
|
||||
}
|
||||
ourHostname = hn;
|
||||
|
||||
stringstream fullString;
|
||||
|
||||
set<string> hosts;
|
||||
for ( size_t i=0; i<configHosts.size(); i++ ){
|
||||
string host = configHosts[i];
|
||||
hosts.insert( getHost( host , false ) );
|
||||
configHosts[i] = getHost( host , true );
|
||||
if ( i > 0 )
|
||||
fullString << ",";
|
||||
fullString << configHosts[i];
|
||||
}
|
||||
|
||||
|
||||
for ( set<string>::iterator i=hosts.begin(); i!=hosts.end(); i++ ){
|
||||
string host = *i;
|
||||
bool ok = false;
|
||||
@ -393,8 +398,8 @@ namespace mongo {
|
||||
return false;
|
||||
}
|
||||
|
||||
uassert( 10188 , "can only hand 1 config db right now" , configHosts.size() == 1 );
|
||||
_primary = configHosts[0];
|
||||
_primary = fullString.str();
|
||||
log(1) << " config string : " << fullString.str() << endl;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -41,14 +41,12 @@ namespace mongo {
|
||||
void Strategy::doQuery( Request& r , string server ){
|
||||
try{
|
||||
ScopedDbConnection dbcon( server );
|
||||
DBClientBase &_c = dbcon.conn();
|
||||
DBClientBase &c = dbcon.conn();
|
||||
|
||||
checkShardVersion( _c , r.getns() );
|
||||
checkShardVersion( c , r.getns() );
|
||||
|
||||
// TODO: This will not work with Paired connections. Fix.
|
||||
DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
|
||||
Message response;
|
||||
bool ok = c.port().call( r.m(), response);
|
||||
bool ok = c.call( r.m(), response);
|
||||
|
||||
{
|
||||
QueryResult *qr = (QueryResult *) response.data;
|
||||
|
1
s/util.h
1
s/util.h
@ -35,6 +35,7 @@ namespace mongo {
|
||||
stringstream s;
|
||||
s << "StaleConfigException ns: " << ns << " " << msg;
|
||||
_msg = s.str();
|
||||
log(1) << _msg << endl;
|
||||
}
|
||||
|
||||
virtual ~StaleConfigException() throw(){}
|
||||
|
@ -226,6 +226,7 @@ namespace mongo {
|
||||
|
||||
auto_ptr<DBClientCursor> cursor = conn->query( ns , q , nToReturn , nToSkip , f.nFields() ? &f : 0 , slaveOk ? QueryOption_SlaveOk : 0 , batchSize );
|
||||
if ( ! cursor.get() ){
|
||||
log() << "query failed : " << ns << " " << q << " to: " << conn->toString() << endl;
|
||||
JS_ReportError( cx , "error doing query: failed" );
|
||||
return JS_FALSE;
|
||||
}
|
||||
|
@ -136,15 +136,24 @@ ShardingTest = function( testName , numServers , verboseLevel , numMongos , othe
|
||||
if ( ! otherParams )
|
||||
otherParams = {}
|
||||
this._connections = [];
|
||||
|
||||
if ( otherParams.sync && numServers < 3 )
|
||||
throw "if you want sync, you need at least 3 servers";
|
||||
|
||||
for ( var i=0; i<numServers; i++){
|
||||
var conn = startMongodTest( 30000 + i , testName + i );
|
||||
this._connections.push( conn );
|
||||
}
|
||||
|
||||
this._configDB = "localhost:30000";
|
||||
this._connections[0].getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } );
|
||||
|
||||
if ( otherParams.sync ){
|
||||
this._configDB = "localhost:30000,localhost:30001,localhost:30002";
|
||||
this._configConnection = new Mongo( this._configDB );
|
||||
this._configConnection.getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } );
|
||||
}
|
||||
else {
|
||||
this._configDB = "localhost:30000";
|
||||
this._connections[0].getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } );
|
||||
}
|
||||
|
||||
this._mongos = [];
|
||||
var startMongosPort = 31000;
|
||||
@ -195,6 +204,14 @@ ShardingTest.prototype.getOther = function( one ){
|
||||
return this._connections[0];
|
||||
}
|
||||
|
||||
ShardingTest.prototype.getFirstOther = function( one ){
|
||||
for ( var i=0; i<this._connections.length; i++ ){
|
||||
if ( this._connections[i] != one )
|
||||
return this._connections[i];
|
||||
}
|
||||
throw "impossible";
|
||||
}
|
||||
|
||||
ShardingTest.prototype.stop = function(){
|
||||
for ( var i=0; i<this._mongos.length; i++ ){
|
||||
stopMongoProgram( 31000 - i );
|
||||
|
@ -168,10 +168,14 @@ namespace mongo {
|
||||
~Message() {
|
||||
reset();
|
||||
}
|
||||
|
||||
|
||||
SockAddr from;
|
||||
MsgData *data;
|
||||
|
||||
int operation() const {
|
||||
return data->operation();
|
||||
}
|
||||
|
||||
Message& operator=(Message& r) {
|
||||
assert( data == 0 );
|
||||
data = r.data;
|
||||
|
Loading…
Reference in New Issue
Block a user