0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 00:56:44 +01:00

Merge branch 'master' of git@github.com:mongodb/mongo

This commit is contained in:
Aaron 2009-03-02 13:45:26 -05:00
commit 67d39935dc
5 changed files with 115 additions and 78 deletions

View File

@ -134,7 +134,8 @@ if os.path.exists( "util/processinfo_" + os.sys.platform + ".cpp" ):
else:
commonFiles += [ "util/processinfo_none.cpp" ]
coreDbFiles = Split( "" )
coreDbFiles = []
coreServerFiles = [ "util/message_server_port.cpp" , "util/message_server_asio.cpp" ]
serverOnlyFiles = Split( "db/query.cpp db/introspect.cpp db/btree.cpp db/clientcursor.cpp db/javajs.cpp db/tests.cpp db/repl.cpp db/btreecursor.cpp db/cloner.cpp db/namespace.cpp db/matcher.cpp db/dbcommands.cpp db/dbeval.cpp db/dbwebserver.cpp db/dbinfo.cpp db/dbhelpers.cpp db/instance.cpp db/pdfile.cpp db/cursor.cpp db/security_commands.cpp db/security.cpp util/miniwebserver.cpp db/storage.cpp db/reccache.cpp db/queryoptimizer.cpp" )
@ -386,6 +387,9 @@ def doConfigure( myenv , needJava=True , needPcre=True , shell=False ):
else:
Exit(1)
if conf.CheckCXXHeader( "boost/asio.hpp" ):
myenv.Append( CPPDEFINES=[ "USE_ASIO" ] )
for b in boostLibs:
l = "boost_" + b
myCheckLib( [ l + "-mt" , l ] , release or not shell)
@ -512,9 +516,7 @@ env.Program( "mongoimportjson" , allToolFiles + [ "tools/importJSON.cpp" ] )
env.Program( "mongofiles" , allToolFiles + [ "tools/files.cpp" ] )
# mongos
mongos = env.Program( "mongos" , commonFiles + coreDbFiles + Glob( "s/*.cpp" ) )
#blah = env.Program( "blah" , commonFiles + coreDbFiles + [ "util/message_server_asio.cpp"] )
mongos = env.Program( "mongos" , commonFiles + coreDbFiles + coreServerFiles + Glob( "s/*.cpp" ) )
# c++ library
clientLibName = str( env.Library( "mongoclient" , allClientFiles )[0] )

View File

@ -0,0 +1,10 @@
s = new ShardingTest( "passthrough1" , 2 )
db = s.getDB( "test" );
db.foo.insert( { num : 1 , name : "eliot" } );
db.foo.insert( { num : 2 , name : "sara" } );
db.foo.insert( { num : -1 , name : "joe" } );
assert.eq( 3 , db.foo.find().length() );
s.stop();

View File

@ -20,6 +20,7 @@
#include "../util/message.h"
#include "../util/unittest.h"
#include "../client/connpool.h"
#include "../util/message_server.h"
#include "server.h"
#include "request.h"
@ -47,23 +48,12 @@ namespace mongo {
out() << " in our hostname with \"-grid\".\n";
out() << endl;
}
MessagingPort *grab = 0;
void _dbGridConnThread() {
MessagingPort& dbMsgPort = *grab;
grab = 0;
Message m;
while ( 1 ) {
m.reset();
if ( !dbMsgPort.recv(m) ) {
log() << "end connection " << dbMsgPort.farEnd.toString() << endl;
dbMsgPort.shutdown();
break;
}
Request r( m , &dbMsgPort );
class ShardedMessageHandler : public MessageHandler {
public:
virtual ~ShardedMessageHandler(){}
virtual void process( Message& m , AbstractMessagingPort* p ){
Request r( m , p );
try {
r.process();
}
@ -71,40 +61,19 @@ namespace mongo {
log() << "UserException: " << e.what() << endl;
if ( r.expectResponse() ){
BSONObj err = BSON( "$err" << e.what() );
replyToQuery( QueryResult::ResultFlag_ErrSet, &dbMsgPort , m , err );
replyToQuery( QueryResult::ResultFlag_ErrSet, p , m , err );
}
}
}
}
void dbGridConnThread() {
MessagingPort *p = grab;
try {
_dbGridConnThread();
} catch ( ... ){
problem() << "uncaught exception in dbgridconnthread, closing connection" << endl;
delete p;
}
}
class DbGridListener : public Listener {
public:
DbGridListener(int p) : Listener(p) { }
virtual void accepted(MessagingPort *mp) {
assert( grab == 0 );
grab = mp;
boost::thread thr(dbGridConnThread);
while ( grab )
sleepmillis(1);
}
};
void start() {
log() << "waiting for connections on port " << port << "..." << endl;
DbGridListener l(port);
l.listen();
//DbGridListener l(port);
//l.listen();
ShardedMessageHandler handler;
MessageServer * server = createServer( port , &handler );
server->run();
}
} // namespace mongo

View File

@ -1,5 +1,7 @@
// message_server_asio.cpp
#ifdef USE_ASIO
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
@ -30,17 +32,10 @@ namespace mongo {
void start(){
cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl;
async_read( _socket ,
buffer( &_inHeader , sizeof( _inHeader ) ) ,
bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) );
_startHeaderRead();
}
void handleReadHeader( const boost::system::error_code& error ){
cout << "got header\n"
<< " len: " << _inHeader.len << "\n"
<< " id : " << _inHeader.id << "\n"
<< " op : " << _inHeader._operation << "\n";
if ( ! _inHeader.valid() ){
cerr << " got invalid header from: " << _socket.remote_endpoint() << " closing connected" << endl;
return;
@ -52,7 +47,7 @@ namespace mongo {
memcpy( data , &_inHeader , sizeof( _inHeader ) );
assert( data->len == _inHeader.len );
uassert( "_cur not empty!" , ! _cur.data );
uassert( "_cur not empty! pipelining requests not supported" , ! _cur.data );
_cur.setData( data , true );
async_read( _socket ,
@ -61,18 +56,29 @@ namespace mongo {
}
void handleReadBody( const boost::system::error_code& error ){
_replyCalled = false;
_handler->process( _cur , this );
if ( ! _replyCalled ){
_cur.reset();
_startHeaderRead();
}
}
void handleWriteDone( const boost::system::error_code& error ){
_cur.reset();
_replyCalled = false;
_startHeaderRead();
}
virtual void reply( Message& received, Message& response ){
reply( received , response , received.data->id );
}
virtual void reply( Message& query , Message& toSend, MSGID responseTo ){
_replyCalled = true;
toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
uassert( "pipelining requests doesn't work yet" , query.data->id == _cur.data->id );
@ -83,10 +89,19 @@ namespace mongo {
private:
void _startHeaderRead(){
async_read( _socket ,
buffer( &_inHeader , sizeof( _inHeader ) ) ,
bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) );
}
MessageHandler * _handler;
tcp::socket _socket;
MsgData _inHeader;
Message _cur;
bool _replyCalled;
};
@ -133,27 +148,11 @@ namespace mongo {
tcp::endpoint _endpoint;
tcp::acceptor _acceptor;
};
// --temp hacks--
void dbexit( int rc , const char * why ){
cerr << "dbserver.cpp::dbexit" << endl;
::exit(rc);
}
const char * curNs = "";
string getDbContext(){
return "getDbContext bad";
}
MessageServer * createServer( int port , MessageHandler * handler ){
return new AsyncMessageServer( port , handler );
}
}
using namespace mongo;
int main(){
mongo::AsyncMessageServer s(9999,0);
s.run();
return 0;
}
#endif

View File

@ -0,0 +1,57 @@
// message_server_port.cpp
#ifndef USE_ASIO
#include "message.h"
#include "message_server.h"
namespace mongo {
class PortMessageServer : public MessageServer , public Listener {
public:
PortMessageServer( int port , MessageHandler * handler ) :
MessageServer( port , handler ) ,
Listener( port ){
}
void threadRun( MessagingPort * p ){
assert( p );
Message m;
try {
while ( 1 ){
m.reset();
if ( ! p->recv(m) ) {
log() << "end connection " << p->farEnd.toString() << endl;
p->shutdown();
break;
}
_handler->process( m , p );
}
}
catch ( ... ){
problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl;
delete p;
}
}
virtual void accepted(MessagingPort * p) {
boost::thread thr( bind( &PortMessageServer::threadRun , this , p ) );
}
void run(){
listen();
}
};
MessageServer * createServer( int port , MessageHandler * handler ){
return new PortMessageServer( port , handler );
}
}
#endif