mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
Merge branch 'master' of git@github.com:mongodb/mongo
This commit is contained in:
commit
25cbcbcba6
@ -38,8 +38,9 @@ namespace mongo {
|
||||
#pragma pack(1)
|
||||
struct QueryResult : public MsgData {
|
||||
enum ResultFlagType {
|
||||
ResultFlag_CursorNotFound = 1, /* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */
|
||||
ResultFlag_ErrSet = 2 /* { $err : ... } is being returned */
|
||||
ResultFlag_CursorNotFound = 1, /* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */
|
||||
ResultFlag_ErrSet = 2, /* { $err : ... } is being returned */
|
||||
ResultFlag_ShardConfigStale = 4 /* have to update config from the server, usually $err is also set */
|
||||
};
|
||||
|
||||
long long cursorId;
|
||||
|
30
jstests/sharding/shard3.js
Normal file
30
jstests/sharding/shard3.js
Normal file
@ -0,0 +1,30 @@
|
||||
// shard3.js
|
||||
|
||||
s = new ShardingTest( "shard3" , 2 , 50 , 2 );
|
||||
|
||||
s2 = s._mongos[1];
|
||||
|
||||
s.adminCommand( { partition : "test" } );
|
||||
s.adminCommand( { shard : "test.foo" , key : { num : 1 } } );
|
||||
|
||||
s.getDB( "test" ).foo.save( { num : 1 } );
|
||||
s.getDB( "test" ).foo.save( { num : 2 } );
|
||||
s.getDB( "test" ).foo.save( { num : 3 } );
|
||||
|
||||
assert.eq( 3 , s.getDB( "test" ).foo.find().toArray().length , "normal A" );
|
||||
assert.eq( 3 , s2.getDB( "test" ).foo.find().toArray().length , "other A" );
|
||||
|
||||
s.adminCommand( { split : "test.foo" , middle : { num : 2 } } );
|
||||
s.adminCommand( { moveshard : "test.foo" , find : { num : 3 } , to : s.getOther( s.getServer( "test" ) ).name } );
|
||||
|
||||
assert( s._connections[0].getDB( "test" ).foo.find().toArray().length > 0 , "blah 1" );
|
||||
assert( s._connections[1].getDB( "test" ).foo.find().toArray().length > 0 , "blah 2" );
|
||||
assert.eq( 3 , s._connections[0].getDB( "test" ).foo.find().toArray().length +
|
||||
s._connections[1].getDB( "test" ).foo.find().toArray().length , "blah 3" );
|
||||
|
||||
assert.eq( 3 , s.getDB( "test" ).foo.find().toArray().length , "normal B" );
|
||||
print( "HI" );
|
||||
assert.eq( 3 , s2.getDB( "test" ).foo.find().toArray().length , "other B" );
|
||||
|
||||
|
||||
s.stop();
|
@ -365,7 +365,9 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
errmsg = "your version is too old!";
|
||||
errmsg = "your version is too old. ";
|
||||
errmsg += " ns: " + ns;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -390,7 +392,7 @@ namespace mongo {
|
||||
}
|
||||
|
||||
QueryResult *qr = (QueryResult*)b.buf();
|
||||
qr->resultFlags() = QueryResult::ResultFlag_ErrSet;
|
||||
qr->resultFlags() = QueryResult::ResultFlag_ErrSet | QueryResult::ResultFlag_ShardConfigStale;
|
||||
qr->len = b.len();
|
||||
qr->setOperation( opReply );
|
||||
qr->cursorId = 0;
|
||||
|
@ -34,15 +34,20 @@ namespace mongo {
|
||||
Request::Request( Message& m, AbstractMessagingPort* p ) : _m(m) , _d( m ) , _p(p){
|
||||
assert( _d.getns() );
|
||||
_id = _m.data->id;
|
||||
|
||||
reset();
|
||||
}
|
||||
|
||||
void Request::reset( bool reload ){
|
||||
_config = grid.getDBConfig( getns() );
|
||||
|
||||
if ( _config->sharded( getns() ) ){
|
||||
_shardInfo = _config->getShardManager( getns() );
|
||||
_shardInfo = _config->getShardManager( getns() , reload );
|
||||
uassert( (string)"no shard info for: " + getns() , _shardInfo );
|
||||
}
|
||||
else {
|
||||
_shardInfo = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
string Request::singleServerName(){
|
||||
@ -56,7 +61,7 @@ namespace mongo {
|
||||
return s;
|
||||
}
|
||||
|
||||
void Request::process(){
|
||||
void Request::process( int attempt ){
|
||||
|
||||
int op = _m.data->operation();
|
||||
assert( op > dbMsg );
|
||||
@ -93,7 +98,18 @@ namespace mongo {
|
||||
}
|
||||
|
||||
if ( op == dbQuery ) {
|
||||
s->queryOp( *this );
|
||||
try {
|
||||
s->queryOp( *this );
|
||||
}
|
||||
catch ( StaleConfigException& staleConfig ){
|
||||
log() << "got stale config exception ns:" << getns() << " attempt: " << attempt << endl;
|
||||
uassert( "too many attempts to update config, failing" , attempt < 5 );
|
||||
|
||||
sleep( attempt );
|
||||
reset( true );
|
||||
process( attempt + 1 );
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if ( op == dbGetMore ) {
|
||||
s->getMore( *this );
|
||||
|
12
s/request.h
12
s/request.h
@ -57,9 +57,12 @@ namespace mongo {
|
||||
DbMessage& d(){ return _d; }
|
||||
AbstractMessagingPort* p(){ return _p; }
|
||||
|
||||
void process();
|
||||
|
||||
void process( int attempt = 0 );
|
||||
|
||||
private:
|
||||
|
||||
void reset( bool reload=true );
|
||||
|
||||
Message& _m;
|
||||
DbMessage _d;
|
||||
AbstractMessagingPort* _p;
|
||||
@ -68,7 +71,10 @@ namespace mongo {
|
||||
DBConfig * _config;
|
||||
ShardManager * _shardInfo;
|
||||
};
|
||||
|
||||
|
||||
class StaleConfigException : public std::exception {
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
#include "strategy.h"
|
||||
|
@ -27,11 +27,19 @@ namespace mongo {
|
||||
DBClientBase &_c = dbcon.conn();
|
||||
|
||||
checkShardVersion( _c , r.getns() );
|
||||
|
||||
|
||||
// TODO: This will not work with Paired connections. Fix.
|
||||
DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
|
||||
Message response;
|
||||
bool ok = c.port().call( r.m(), response);
|
||||
|
||||
{
|
||||
QueryResult *qr = (QueryResult *) response.data;
|
||||
if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){
|
||||
throw StaleConfigException();
|
||||
}
|
||||
}
|
||||
|
||||
uassert("mongos: error calling db", ok);
|
||||
r.reply( response );
|
||||
dbcon.done();
|
||||
|
Loading…
Reference in New Issue
Block a user