mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
client id work and make getLastError work non sharded-collections in sharding, but with connection pooling SHARDING-16
This commit is contained in:
parent
1811d1e47f
commit
335d92c443
12
db/db.cpp
12
db/db.cpp
@ -145,18 +145,6 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
class JniMessagingPort : public AbstractMessagingPort {
|
||||
public:
|
||||
JniMessagingPort(Message& _container) : container(_container) { }
|
||||
void reply(Message& received, Message& response, MSGID) {
|
||||
container = response;
|
||||
}
|
||||
void reply(Message& received, Message& response) {
|
||||
container = response;
|
||||
}
|
||||
Message & container;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
#include "lasterror.h"
|
||||
|
@ -487,9 +487,12 @@ namespace mongo {
|
||||
void reply(Message& received, Message& response) {
|
||||
container = response;
|
||||
}
|
||||
unsigned remotePort(){
|
||||
return 1;
|
||||
}
|
||||
Message & container;
|
||||
};
|
||||
|
||||
|
||||
/* a call from java/js to the database locally.
|
||||
|
||||
m - inbound message
|
||||
|
15
jstests/sharding/error1.js
Normal file
15
jstests/sharding/error1.js
Normal file
@ -0,0 +1,15 @@
|
||||
|
||||
s = new ShardingTest( "error1" , 2 , 1 , 1 );
|
||||
s.adminCommand( { enablesharding : "test" } );
|
||||
|
||||
// ---- simple getLastError ----
|
||||
|
||||
db = s.getDB( "test" );
|
||||
db.foo.insert( { _id : 1 } );
|
||||
assert.isnull( db.getLastError() , "gle 1" );
|
||||
db.foo.insert( { _id : 1 } );
|
||||
assert( db.getLastError() , "gle21" );
|
||||
|
||||
|
||||
// ----
|
||||
s.stop();
|
@ -582,10 +582,18 @@ namespace mongo {
|
||||
help << "check for an error on the last command executed";
|
||||
}
|
||||
CmdShardingGetLastError() : Command("getlasterror") { }
|
||||
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
|
||||
errmsg += "getlasterror not working yet for sharded environments";
|
||||
result << "ok" << 0;
|
||||
return false;
|
||||
virtual bool run(const char *nsraw, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
|
||||
string dbName = nsraw;
|
||||
dbName = dbName.substr( 0 , dbName.size() - 5 );
|
||||
|
||||
DBConfig * conf = grid.getDBConfig( dbName , false );
|
||||
|
||||
ScopedDbConnection conn( conf->getPrimary() );
|
||||
BSONObj res;
|
||||
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
|
||||
result.appendElements( res );
|
||||
conn.done();
|
||||
return ok;
|
||||
}
|
||||
} cmdGetLastError;
|
||||
|
||||
|
@ -45,12 +45,13 @@ namespace mongo {
|
||||
string getDBName( string ns ){
|
||||
return ns.substr( 0 , ns.size() - 5 );
|
||||
}
|
||||
|
||||
|
||||
bool passthrough( DBConfig * conf, const BSONObj& cmdObj , BSONObjBuilder& result ){
|
||||
ScopedDbConnection conn( conf->getPrimary() );
|
||||
BSONObj res;
|
||||
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
|
||||
result.appendElements( res );
|
||||
conn.done();
|
||||
return ok;
|
||||
}
|
||||
};
|
||||
|
10
s/server.cpp
10
s/server.cpp
@ -58,6 +58,13 @@ namespace mongo {
|
||||
// out() << " in our hostname with \"-grid\".\n";
|
||||
out() << endl;
|
||||
}
|
||||
|
||||
class ShardingConnectionHook : public DBConnectionHook {
|
||||
public:
|
||||
virtual void onCreate( DBClientBase * conn ){
|
||||
conn->simpleCommand( "admin" , 0 , "switchtoclienterrors" );
|
||||
}
|
||||
} shardingConnectionHook;
|
||||
|
||||
class ShardedMessageHandler : public MessageHandler {
|
||||
public:
|
||||
@ -65,6 +72,7 @@ namespace mongo {
|
||||
virtual void process( Message& m , AbstractMessagingPort* p ){
|
||||
Request r( m , p );
|
||||
try {
|
||||
setClientId( p->remotePort() << 16 );
|
||||
r.process();
|
||||
}
|
||||
catch ( DBException& e ){
|
||||
@ -149,6 +157,8 @@ int main(int argc, char* argv[], char *envp[] ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
pool.addHook( &shardingConnectionHook );
|
||||
|
||||
if ( argc <= 1 ) {
|
||||
usage( argv );
|
||||
return 3;
|
||||
|
@ -135,14 +135,6 @@ namespace mongo {
|
||||
char * _cur;
|
||||
};
|
||||
|
||||
MSGID NextMsgId;
|
||||
struct MsgStart {
|
||||
MsgStart() {
|
||||
NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
|
||||
assert(MsgDataHeaderSize == 16);
|
||||
}
|
||||
} msgstart;
|
||||
|
||||
class Ports {
|
||||
set<MessagingPort*>& ports;
|
||||
boost::mutex& m;
|
||||
@ -380,9 +372,7 @@ again:
|
||||
|
||||
void MessagingPort::say(Message& toSend, int responseTo) {
|
||||
mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
|
||||
MSGID msgid = NextMsgId;
|
||||
++NextMsgId;
|
||||
toSend.data->id = msgid;
|
||||
toSend.data->id = nextMessageId();
|
||||
toSend.data->responseTo = responseTo;
|
||||
|
||||
int x = -100;
|
||||
@ -418,9 +408,7 @@ again:
|
||||
}
|
||||
|
||||
// we're going to be storing this, so need to set it up
|
||||
MSGID msgid = NextMsgId;
|
||||
++NextMsgId;
|
||||
toSend.data->id = msgid;
|
||||
toSend.data->id = nextMessageId();
|
||||
toSend.data->responseTo = responseTo;
|
||||
|
||||
if ( ! piggyBackData )
|
||||
@ -429,14 +417,41 @@ again:
|
||||
piggyBackData->append( toSend );
|
||||
}
|
||||
|
||||
unsigned MessagingPort::remotePort(){
|
||||
return farEnd.getPort();
|
||||
}
|
||||
|
||||
MSGID NextMsgId;
|
||||
bool usingClientIds = 0;
|
||||
ThreadLocalInt clientId;
|
||||
|
||||
struct MsgStart {
|
||||
MsgStart() {
|
||||
NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
|
||||
assert(MsgDataHeaderSize == 16);
|
||||
}
|
||||
} msgstart;
|
||||
|
||||
MSGID nextMessageId(){
|
||||
MSGID msgid = NextMsgId;
|
||||
++NextMsgId;
|
||||
|
||||
if ( usingClientIds ){
|
||||
msgid = msgid & 0xFFFF;
|
||||
msgid = msgid | clientId.get();
|
||||
}
|
||||
|
||||
return msgid;
|
||||
}
|
||||
|
||||
bool doesOpGetAResponse( int op ){
|
||||
return op == dbQuery || op == dbGetMore;
|
||||
}
|
||||
|
||||
|
||||
void setClientId( int id ){
|
||||
usingClientIds = true;
|
||||
id = id & 0xFFFF0000;
|
||||
massert( "invalid id" , id );
|
||||
clientId.reset( id );
|
||||
}
|
||||
} // namespace mongo
|
||||
|
@ -48,16 +48,18 @@ namespace mongo {
|
||||
virtual ~AbstractMessagingPort() { }
|
||||
virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available
|
||||
virtual void reply(Message& received, Message& response) = 0;
|
||||
|
||||
virtual unsigned remotePort() = 0 ;
|
||||
};
|
||||
|
||||
class MessagingPort : public AbstractMessagingPort {
|
||||
public:
|
||||
MessagingPort(int sock, SockAddr& farEnd);
|
||||
MessagingPort();
|
||||
~MessagingPort();
|
||||
virtual ~MessagingPort();
|
||||
|
||||
void shutdown();
|
||||
|
||||
|
||||
bool connect(SockAddr& farEnd);
|
||||
|
||||
/* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's.
|
||||
@ -71,6 +73,7 @@ namespace mongo {
|
||||
|
||||
void piggyBack( Message& toSend , int responseTo = -1 );
|
||||
|
||||
virtual unsigned remotePort();
|
||||
private:
|
||||
int sock;
|
||||
PiggyBackData * piggyBackData;
|
||||
@ -199,4 +202,5 @@ namespace mongo {
|
||||
|
||||
MSGID nextMessageId();
|
||||
|
||||
void setClientId( int id );
|
||||
} // namespace mongo
|
||||
|
Loading…
Reference in New Issue
Block a user