From 4a53d4665312c6dad801bb9b3652b56c8da3a3a4 Mon Sep 17 00:00:00 2001 From: Eliot Horowitz Date: Sat, 18 Apr 2009 21:55:34 -0400 Subject: [PATCH] Sharding: finish write back on bad shard --- jstests/sharding/shard5.js | 3 ++- s/d_logic.cpp | 21 ++++++++++++++------- s/request.cpp | 1 + s/strategy.cpp | 21 ++++++++++++++++++++- 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/jstests/sharding/shard5.js b/jstests/sharding/shard5.js index 02da3a9054d..0d519c068ca 100644 --- a/jstests/sharding/shard5.js +++ b/jstests/sharding/shard5.js @@ -40,6 +40,7 @@ print( "* A" ); assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 1" ); s2.getDB( "test" ).foo.save( { num : 2 } ); -//assert.eq( 8 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" ); +sleep( 200 ); // give the write back time to happen +assert.eq( 8 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" ); s.stop(); diff --git a/s/d_logic.cpp b/s/d_logic.cpp index cb859f2b30c..75e42bf3b4f 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -46,7 +46,7 @@ namespace mongo { string shardConfigServer; boost::thread_specific_ptr clientServerIds; - map< string , BlockingQueue* > clientQueues; + map< string , BlockingQueue* > clientQueues; unsigned long long getVersion( BSONElement e , string& errmsg ){ if ( e.eoo() ){ @@ -93,12 +93,12 @@ namespace mongo { dbtemprelease unlock; if ( ! clientQueues[id.str()] ) - clientQueues[id.str()] = new BlockingQueue(); + clientQueues[id.str()] = new BlockingQueue(); - int z = clientQueues[id.str()]->blockingPop(); - log() << "WriteBackCommand got : " << z << endl; + BSONObj z = clientQueues[id.str()]->blockingPop(); + log(1) << "WriteBackCommand got : " << z << endl; - result.append( "msg" , z ); + result.append( "data" , z ); return true; } @@ -153,7 +153,7 @@ namespace mongo { clientServerIds.reset( nid ); if ( ! clientQueues[s] ) - clientQueues[s] = new BlockingQueue(); + clientQueues[s] = new BlockingQueue(); } else if ( clientId != *clientServerIds.get() ){ errmsg = "server id has changed!"; @@ -470,7 +470,14 @@ namespace mongo { massert( "write with bad shard config and no server id!" , clientID ); log() << "got write with an old config - writing back" << endl; - clientQueues[clientID->str()]->push( 5 ); + + BSONObjBuilder b; + b.appendBool( "writeBack" , true ); + b.append( "ns" , ns ); + b.appendBinData( "msg" , m.data->len , bdtCustom , (char*)(m.data) ); + log() << "writing back msg with len: " << m.data->len << " op: " << m.data->_operation << endl; + clientQueues[clientID->str()]->push( b.obj() ); + return true; } diff --git a/s/request.cpp b/s/request.cpp index e73eba23d4c..ff031fdb70c 100644 --- a/s/request.cpp +++ b/s/request.cpp @@ -50,6 +50,7 @@ namespace mongo { } _m.data->id = _id; + } string Request::singleServerName(){ diff --git a/s/strategy.cpp b/s/strategy.cpp index 4c895dc4ba8..80c5c945404 100644 --- a/s/strategy.cpp +++ b/s/strategy.cpp @@ -56,6 +56,7 @@ namespace mongo { void Strategy::insert( string server , const char * ns , const BSONObj& obj ){ ScopedDbConnection dbcon( server ); + checkShardVersion( dbcon.conn() , ns ); dbcon->insert( ns , obj ); dbcon.done(); } @@ -87,7 +88,25 @@ namespace mongo { } - cout << "writebacklisten result: " << result << endl; + log(1) << "writebacklisten result: " << result << endl; + + BSONObj data = result.getObjectField( "data" ); + if ( data.getBoolField( "writeBack" ) ){ + string ns = data["ns"].valuestrsafe(); + + int len; + + Message m( (void*)data["msg"].binData( len ) , false ); + massert( "invalid writeback message" , m.data->valid() ); + + grid.getDBConfig( ns )->getShardManager( ns , true ); + + Request r( m , 0 ); + r.process(); + } + else { + log() << "unknown writeBack result: " << result << endl; + } conn.done(); secsToSleep = 0;