0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

Sharding: finish write back on bad shard

This commit is contained in:
Eliot Horowitz 2009-04-18 21:55:34 -04:00
parent 98522371c4
commit 4a53d46653
4 changed files with 37 additions and 9 deletions

View File

@ -40,6 +40,7 @@ print( "* A" );
assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 1" );
s2.getDB( "test" ).foo.save( { num : 2 } );
//assert.eq( 8 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" );
sleep( 200 ); // give the write back time to happen
assert.eq( 8 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" );
s.stop();

View File

@ -46,7 +46,7 @@ namespace mongo {
string shardConfigServer;
boost::thread_specific_ptr<OID> clientServerIds;
map< string , BlockingQueue<int>* > clientQueues;
map< string , BlockingQueue<BSONObj>* > clientQueues;
unsigned long long getVersion( BSONElement e , string& errmsg ){
if ( e.eoo() ){
@ -93,12 +93,12 @@ namespace mongo {
dbtemprelease unlock;
if ( ! clientQueues[id.str()] )
clientQueues[id.str()] = new BlockingQueue<int>();
clientQueues[id.str()] = new BlockingQueue<BSONObj>();
int z = clientQueues[id.str()]->blockingPop();
log() << "WriteBackCommand got : " << z << endl;
BSONObj z = clientQueues[id.str()]->blockingPop();
log(1) << "WriteBackCommand got : " << z << endl;
result.append( "msg" , z );
result.append( "data" , z );
return true;
}
@ -153,7 +153,7 @@ namespace mongo {
clientServerIds.reset( nid );
if ( ! clientQueues[s] )
clientQueues[s] = new BlockingQueue<int>();
clientQueues[s] = new BlockingQueue<BSONObj>();
}
else if ( clientId != *clientServerIds.get() ){
errmsg = "server id has changed!";
@ -470,7 +470,14 @@ namespace mongo {
massert( "write with bad shard config and no server id!" , clientID );
log() << "got write with an old config - writing back" << endl;
clientQueues[clientID->str()]->push( 5 );
BSONObjBuilder b;
b.appendBool( "writeBack" , true );
b.append( "ns" , ns );
b.appendBinData( "msg" , m.data->len , bdtCustom , (char*)(m.data) );
log() << "writing back msg with len: " << m.data->len << " op: " << m.data->_operation << endl;
clientQueues[clientID->str()]->push( b.obj() );
return true;
}

View File

@ -50,6 +50,7 @@ namespace mongo {
}
_m.data->id = _id;
}
string Request::singleServerName(){

View File

@ -56,6 +56,7 @@ namespace mongo {
void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
ScopedDbConnection dbcon( server );
checkShardVersion( dbcon.conn() , ns );
dbcon->insert( ns , obj );
dbcon.done();
}
@ -87,7 +88,25 @@ namespace mongo {
}
cout << "writebacklisten result: " << result << endl;
log(1) << "writebacklisten result: " << result << endl;
BSONObj data = result.getObjectField( "data" );
if ( data.getBoolField( "writeBack" ) ){
string ns = data["ns"].valuestrsafe();
int len;
Message m( (void*)data["msg"].binData( len ) , false );
massert( "invalid writeback message" , m.data->valid() );
grid.getDBConfig( ns )->getShardManager( ns , true );
Request r( m , 0 );
r.process();
}
else {
log() << "unknown writeBack result: " << result << endl;
}
conn.done();
secsToSleep = 0;