0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

checkpoint for async message server

This commit is contained in:
Eliot Horowitz 2009-03-02 09:13:20 -05:00
parent c1207fddd5
commit 13b828f592
3 changed files with 162 additions and 0 deletions

View File

@ -514,6 +514,8 @@ env.Program( "mongofiles" , allToolFiles + [ "tools/files.cpp" ] )
# mongos
mongos = env.Program( "mongos" , commonFiles + coreDbFiles + Glob( "s/*.cpp" ) )
blah = env.Program( "blah" , commonFiles + coreDbFiles + [ "util/message_server_asio.cpp"] )
# c++ library
clientLibName = str( env.Library( "mongoclient" , allClientFiles )[0] )
env.Library( "mongotestfiles" , commonFiles + coreDbFiles + serverOnlyFiles )

26
util/message_server.h Normal file
View File

@ -0,0 +1,26 @@
// message_server.h
/*
abstract database server
async io core, worker thread system
*/
#pragma once
#include "stdafx.h"
namespace mongo {
class MessageServer {
public:
MessageServer( int port ) : _port( port ){}
virtual ~MessageServer(){}
virtual void run() = 0;
protected:
int _port;
};
MessageServer * createServer( int port );
}

View File

@ -0,0 +1,134 @@
// message_server_asio.cpp
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <iostream>
#include <vector>
#include "message.h"
#include "message_server.h"
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace std;
namespace mongo {
class MessageServerSession : public enable_shared_from_this<MessageServerSession> {
public:
MessageServerSession( io_service& ioservice ) : _socket( ioservice ){
}
tcp::socket& socket(){
return _socket;
}
void start(){
cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl;
async_read( _socket ,
buffer( &_inHeader , sizeof( _inHeader ) ) ,
bind( &MessageServerSession::handleReadHeader , shared_from_this() , placeholders::error ) );
}
void handleReadHeader( const boost::system::error_code& error ){
cout << "got header\n"
<< " len: " << _inHeader.len << "\n"
<< " id : " << _inHeader.id << "\n"
<< " op : " << _inHeader._operation << "\n";
char * raw = (char*)malloc( _inHeader.len );
MsgData * data = (MsgData*)raw;
memcpy( data , &_inHeader , sizeof( _inHeader ) );
assert( data->len == _inHeader.len );
_cur.setData( data , true );
async_read( _socket ,
buffer( raw + sizeof( _inHeader ) , _inHeader.len - sizeof( _inHeader ) ) ,
bind( &MessageServerSession::handleReadBody , shared_from_this() , placeholders::error ) );
}
void handleReadBody( const boost::system::error_code& error ){
cout << "got whole message!" << endl;
}
private:
tcp::socket _socket;
MsgData _inHeader;
Message _cur;
};
class AsyncMessageServer : public MessageServer {
public:
AsyncMessageServer( int port ) : MessageServer( port ) ,
_endpoint( tcp::v4() , port ) ,
_acceptor( _ioservice , _endpoint ){
_accept();
}
virtual ~AsyncMessageServer(){
}
void run(){
cout << "AsyncMessageServer starting to listen on: " << _port << endl;
_ioservice.run();
cout << "AsyncMessageServer done listening on: " << _port << endl;
}
void handleAccept( shared_ptr<MessageServerSession> session ,
const boost::system::error_code& error ){
if ( error ){
cerr << "handleAccept error!" << endl;
return;
}
session->start();
_accept();
}
private:
void _accept(){
shared_ptr<MessageServerSession> session( new MessageServerSession( _ioservice ) );
_acceptor.async_accept( session->socket() ,
bind( &AsyncMessageServer::handleAccept,
this,
session,
boost::asio::placeholders::error )
);
}
io_service _ioservice;
tcp::endpoint _endpoint;
tcp::acceptor _acceptor;
};
// --temp hacks--
void dbexit( int rc , const char * why ){
cerr << "dbserver.cpp::dbexit" << endl;
::exit(rc);
}
const char * curNs = "";
string getDbContext(){
return "getDbContext bad";
}
}
using namespace mongo;
int main(){
mongo::AsyncMessageServer s(9999);
s.run();
return 0;
}