/** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ /* message todo: authenticate; encrypt? */ #include "stdafx.h" #include "message.h" #include #include "../util/goodies.h" #include namespace mongo { // if you want trace output: #define mmm(x) #ifdef MSG_NOSIGNAL const int portSendFlags = MSG_NOSIGNAL; #else const int portSendFlags = 0; #endif /* listener ------------------------------------------------------------------- */ void Listener::listen() { SockAddr me(port); int sock = socket(AF_INET, SOCK_STREAM, 0); if ( sock == INVALID_SOCKET ) { log() << "ERROR: listen(): invalid socket? " << errno << endl; return; } prebindOptions( sock ); if ( ::bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) { log() << "listen(): bind() failed errno:" << errno << endl; if ( errno == 98 ) log() << "98 == addr already in use" << endl; closesocket(sock); return; } if ( ::listen(sock, 128) != 0 ) { log() << "listen(): listen() failed " << errno << endl; closesocket(sock); return; } SockAddr from; while ( 1 ) { int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); if ( s < 0 ) { log() << "Listener: accept() returns " << s << " errno:" << errno << endl; continue; } disableNagle(s); log() << "connection accepted from " << from.toString() << endl; accepted( new MessagingPort(s, from) ); } } /* messagingport -------------------------------------------------------------- */ class PiggyBackData { public: PiggyBackData( MessagingPort * port ) { _port = port; _buf = new char[1300]; _cur = _buf; } ~PiggyBackData() { flush(); delete( _cur ); } void append( Message& m ) { assert( m.data->len <= 1300 ); if ( len() + m.data->len > 1300 ) flush(); memcpy( _cur , m.data , m.data->len ); _cur += m.data->len; } int flush() { if ( _buf == _cur ) return 0; int x = ::send( _port->sock , _buf , len() , 0 ); _cur = _buf; return x; } int len() { return _cur - _buf; } private: MessagingPort* _port; char * _buf; char * _cur; }; MSGID NextMsgId; struct MsgStart { MsgStart() { NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis(); assert(MsgDataHeaderSize == 16); } } msgstart; // we "new" this so it guaranteed to still be around when other automatic global vars // are being destructed during termination. set& ports = *(new set()); void closeAllSockets() { for ( set::iterator i = ports.begin(); i != ports.end(); i++ ) (*i)->shutdown(); } MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far) { ports.insert(this); } MessagingPort::MessagingPort() { ports.insert(this); sock = -1; piggyBackData = 0; } void MessagingPort::shutdown() { if ( sock >= 0 ) { closesocket(sock); sock = -1; } } MessagingPort::~MessagingPort() { if ( piggyBackData ) delete( piggyBackData ); shutdown(); ports.erase(this); } } // namespace mongo #include "../util/background.h" namespace mongo { class ConnectBG : public BackgroundJob { public: int sock; int res; SockAddr farEnd; void run() { res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); } }; bool MessagingPort::connect(SockAddr& _far) { farEnd = _far; sock = socket(AF_INET, SOCK_STREAM, 0); if ( sock == INVALID_SOCKET ) { log() << "ERROR: connect(): invalid socket? " << errno << endl; return false; } #if 0 long fl = fcntl(sock, F_GETFL, 0); assert( fl >= 0 ); fl |= O_NONBLOCK; fcntl(sock, F_SETFL, fl); int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); if ( res ) { if ( errno == EINPROGRESS ) //log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl; closesocket(sock); sock = -1; return false; } #endif ConnectBG bg; bg.sock = sock; bg.farEnd = farEnd; bg.go(); // int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize); if ( bg.wait(5000) ) { if ( bg.res ) { closesocket(sock); sock = -1; return false; } } else { // time out the connect closesocket(sock); sock = -1; bg.wait(); // so bg stays in scope until bg thread terminates return false; } disableNagle(sock); #ifdef SO_NOSIGPIPE // osx const int one = 1; setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); #endif return true; } bool MessagingPort::recv(Message& m) { again: mmm( out() << "* recv() sock:" << this->sock << endl; ) int len = -1; char *lenbuf = (char *) &len; int lft = 4; while ( 1 ) { int x = ::recv(sock, lenbuf, lft, 0); if ( x == 0 ) { DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; m.reset(); return false; } if ( x < 0 ) { log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString()<=len); MsgData *md = (MsgData *) malloc(z); md->len = len; if ( len <= 0 ) { out() << "got a length of " << len << ", something is wrong" << endl; return false; } char *p = (char *) &md->id; int left = len -4; while ( 1 ) { int x = ::recv(sock, p, left, 0); if ( x == 0 ) { DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; m.reset(); return false; } if ( x < 0 ) { log() << "MessagingPort recv() error " << errno << ' ' << farEnd.toString() << endl; m.reset(); return false; } left -= x; p += x; if ( left <= 0 ) break; } m.setData(md, true); return true; } void MessagingPort::reply(Message& received, Message& response) { say(/*received.from, */response, received.data->id); } void MessagingPort::reply(Message& received, Message& response, MSGID responseTo) { say(/*received.from, */response, responseTo); } bool MessagingPort::call(Message& toSend, Message& response) { mmm( out() << "*call()" << endl; ) MSGID old = toSend.data->id; say(/*to,*/ toSend); while ( 1 ) { bool ok = recv(response); if ( !ok ) return false; //out() << "got response: " << response.data->responseTo << endl; if ( response.data->responseTo == toSend.data->id ) break; out() << "********************" << endl; out() << "ERROR: MessagingPort::call() wrong id got:" << (unsigned)response.data->responseTo << " expect:" << (unsigned)toSend.data->id << endl; out() << " old:" << (unsigned)old << endl; out() << " response msgid:" << (unsigned)response.data->id << endl; out() << " response len: " << (unsigned)response.data->len << endl; assert(false); response.reset(); } mmm( out() << "*call() end" << endl; ) return true; } void MessagingPort::say(Message& toSend, int responseTo) { mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) MSGID msgid = NextMsgId; ++NextMsgId; toSend.data->id = msgid; toSend.data->responseTo = responseTo; int x = -100; if ( piggyBackData && piggyBackData->len() ) { mmm( out() << "* have piggy back" << endl; ) if ( ( piggyBackData->len() + toSend.data->len ) > 1300 ) { // won't fit in a packet - so just send it off piggyBackData->flush(); } else { piggyBackData->append( toSend ); x = piggyBackData->flush(); } } if ( x == -100 ) x = ::send(sock, (char*)toSend.data, toSend.data->len , portSendFlags ); if ( x <= 0 ) { log() << "MessagingPort say send() error " << errno << ' ' << farEnd.toString() << endl; throw SocketException(); } } void MessagingPort::piggyBack( Message& toSend , int responseTo ) { if ( toSend.data->len > 1300 ) { // not worth saving because its almost an entire packet say( toSend ); return; } // we're going to be storing this, so need to set it up MSGID msgid = NextMsgId; ++NextMsgId; toSend.data->id = msgid; toSend.data->responseTo = responseTo; if ( ! piggyBackData ) piggyBackData = new PiggyBackData( this ); piggyBackData->append( toSend ); } MSGID nextMessageId(){ MSGID msgid = NextMsgId; ++NextMsgId; return msgid; } } // namespace mongo