From 13b828f59247baa73f9389f3975cd6203ed36bc1 Mon Sep 17 00:00:00 2001 From: Eliot Horowitz Date: Mon, 2 Mar 2009 09:13:20 -0500 Subject: [PATCH] checkpoint for async message server --- SConstruct | 2 + util/message_server.h | 26 +++++++ util/message_server_asio.cpp | 134 +++++++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+) create mode 100644 util/message_server.h create mode 100644 util/message_server_asio.cpp diff --git a/SConstruct b/SConstruct index 62bfed26cb4..f0ddcc97d40 100644 --- a/SConstruct +++ b/SConstruct @@ -514,6 +514,8 @@ env.Program( "mongofiles" , allToolFiles + [ "tools/files.cpp" ] ) # mongos mongos = env.Program( "mongos" , commonFiles + coreDbFiles + Glob( "s/*.cpp" ) ) +blah = env.Program( "blah" , commonFiles + coreDbFiles + [ "util/message_server_asio.cpp"] ) + # c++ library clientLibName = str( env.Library( "mongoclient" , allClientFiles )[0] ) env.Library( "mongotestfiles" , commonFiles + coreDbFiles + serverOnlyFiles ) diff --git a/util/message_server.h b/util/message_server.h new file mode 100644 index 00000000000..376e78834f3 --- /dev/null +++ b/util/message_server.h @@ -0,0 +1,26 @@ +// message_server.h + +/* + abstract database server + async io core, worker thread system + */ + +#pragma once + +#include "stdafx.h" + +namespace mongo { + + class MessageServer { + public: + MessageServer( int port ) : _port( port ){} + virtual ~MessageServer(){} + + virtual void run() = 0; + + protected: + int _port; + }; + + MessageServer * createServer( int port ); +} diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp new file mode 100644 index 00000000000..04b4af7f961 --- /dev/null +++ b/util/message_server_asio.cpp @@ -0,0 +1,134 @@ +// message_server_asio.cpp + +#include +#include +#include +#include + +#include +#include + +#include "message.h" +#include "message_server.h" + +using namespace boost; +using namespace boost::asio; +using namespace boost::asio::ip; +using namespace std; + +namespace mongo { + + class MessageServerSession : public enable_shared_from_this { + public: + MessageServerSession( io_service& ioservice ) : _socket( ioservice ){ + + } + + tcp::socket& socket(){ + return _socket; + } + + void start(){ + cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl; + async_read( _socket , + buffer( &_inHeader , sizeof( _inHeader ) ) , + bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) ); + } + + void handleReadHeader( const boost::system::error_code& error ){ + cout << "got header\n" + << " len: " << _inHeader.len << "\n" + << " id : " << _inHeader.id << "\n" + << " op : " << _inHeader._operation << "\n"; + + char * raw = (char*)malloc( _inHeader.len ); + + MsgData * data = (MsgData*)raw; + memcpy( data , &_inHeader , sizeof( _inHeader ) ); + assert( data->len == _inHeader.len ); + + _cur.setData( data , true ); + async_read( _socket , + buffer( raw + sizeof( _inHeader ) , _inHeader.len - sizeof( _inHeader ) ) , + bind( &MessageServerSession::handleReadBody , shared_from_this() , placeholders::error ) ); + } + + void handleReadBody( const boost::system::error_code& error ){ + cout << "got whole message!" << endl; + } + + private: + tcp::socket _socket; + MsgData _inHeader; + Message _cur; + }; + + + class AsyncMessageServer : public MessageServer { + public: + AsyncMessageServer( int port ) : MessageServer( port ) , + _endpoint( tcp::v4() , port ) , + _acceptor( _ioservice , _endpoint ){ + _accept(); + } + virtual ~AsyncMessageServer(){ + + } + + void run(){ + cout << "AsyncMessageServer starting to listen on: " << _port << endl; + _ioservice.run(); + cout << "AsyncMessageServer done listening on: " << _port << endl; + } + + void handleAccept( shared_ptr session , + const boost::system::error_code& error ){ + if ( error ){ + cerr << "handleAccept error!" << endl; + return; + } + session->start(); + _accept(); + } + + private: + + void _accept(){ + shared_ptr session( new MessageServerSession( _ioservice ) ); + _acceptor.async_accept( session->socket() , + bind( &AsyncMessageServer::handleAccept, + this, + session, + boost::asio::placeholders::error ) + ); + } + + io_service _ioservice; + tcp::endpoint _endpoint; + tcp::acceptor _acceptor; + + }; + + + // --temp hacks-- + void dbexit( int rc , const char * why ){ + cerr << "dbserver.cpp::dbexit" << endl; + ::exit(rc); + } + + const char * curNs = ""; + + string getDbContext(){ + return "getDbContext bad"; + } + +} + +using namespace mongo; + +int main(){ + mongo::AsyncMessageServer s(9999); + s.run(); + + return 0; +}