0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-11-30 17:10:48 +01:00
mongodb/grid/protorecv.cpp
2007-12-02 11:33:59 -05:00

387 lines
9.8 KiB
C++

// protorecv.cpp
#include "stdafx.h"
#include "protocol.h"
#include "boost/thread.hpp"
#include "../util/goodies.h"
#include "../util/sock.h"
#include "protoimpl.h"
#include "../db/introspect.h"
boost::mutex biglock;
boost::mutex coutmutex;
boost::mutex threadStarterMutex;
boost::condition threadActivate; // creating a new thread, grabbing threadUseThisOne
ProtocolConnection *threadUseThisOne = 0;
void receiverThread();
map<SockAddr,ProtocolConnection*> firstPCForThisAddress;
/* todo: eventually support multiple listeners on diff ports. not
bothering yet.
*/
ProtocolConnection *any = 0;
EndPointToPC pcMap;
class GeneralInspector : public SingleResultObjCursor {
Cursor* clone() { return new GeneralInspector(*this); }
void fill() {
b.append("version", "1.0.0.1");
b.append("versionDesc", "none");
b.append("nConnections", pcMap.size());
}
public:
GeneralInspector() { reg("intr.general"); }
} _geninspectorproto;
#include <signal.h>
/* our version of netstat */
void sighandler(int x) {
cout << "ProtocolConnections:" << endl;
lock lk(biglock);
EndPointToPC::iterator it = pcMap.begin();
while( it != pcMap.end() ) {
cout << " conn " << it->second->toString() << endl;
it++;
}
cout << "any: ";
if( any ) cout << any->toString();
cout << "\ndone" << endl;
}
struct SetSignal {
SetSignal() {
#if !defined(_WIN32)
signal(SIGUSR2, sighandler);
#endif
}
} setSignal;
string ProtocolConnection::toString() {
stringstream out;
out << myEnd.toString() << " <> " <<
farEnd.toString() << " rcvd.size:" << cr.received.size() <<
" pndngMsgs.size:" << cr.pendingMessages.size() <<
" pndngSnd.size:" << cs.pendingSend.size();
return out.str();
}
ProtocolConnection::~ProtocolConnection() {
cout << ".warning: ~ProtocolConnection() not implemented (leaks mem etc)" << endl;
if( any == this )
any = 0;
}
void ProtocolConnection::shutdown() {
ptrace( cout << ".shutdown()" << endl; )
if( acceptAnyChannel() || first )
return;
ptrace( cout << ". to:" << to.toString() << endl; )
__sendRESET(this);
}
inline ProtocolConnection::ProtocolConnection(ProtocolConnection& par, EndPoint& _to) :
udpConnection(par.udpConnection), myEnd(par.myEnd), cs(*this), cr(*this)
{
parent = &par;
farEnd = _to;
first = true;
// todo: LOCK
assert(pcMap.count(farEnd) == 0);
// pcMap[myEnd] = this;
}
inline bool ProtocolConnection::acceptAnyChannel() const {
return myEnd.channel == MessagingPort::ANYCHANNEL;
}
inline void ProtocolConnection::init() {
first = true;
lock lk(biglock);
lock tslk(threadStarterMutex);
if( acceptAnyChannel() ) {
assert( any == 0 );
any = this;
}
else {
pcMap[farEnd] = this;
}
if( firstPCForThisAddress.count(myEnd.sa) == 0 ) {
firstPCForThisAddress[myEnd.sa] = this;
// need a receiver thread. one per port # we listen on. shared by channels.
boost::thread receiver(receiverThread);
threadUseThisOne = this;
threadActivate.notify_one();
return;
}
}
ProtocolConnection::ProtocolConnection(UDPConnection& c, EndPoint& e, SockAddr *_farEnd) :
udpConnection(c), myEnd(e), cs(*this), cr(*this)
{
parent = this;
if( _farEnd ) {
farEnd.channel = myEnd.channel;
farEnd.sa = *_farEnd;
}
init();
}
/* find message for fragment */
MR* CR::getPendingMsg(F *fr) {
MR *m;
map<int,MR*>::iterator i = pendingMessages.find(fr->__msgid());
if( i == pendingMessages.end() ) {
if( pendingMessages.size() > 20 ) {
cout << ".warning: pendingMessages.size()>20, ignoring msg until we dequeue" << endl;
return 0;
}
m = new MR(&pc, fr->__msgid(), pc.farEnd);
pendingMessages[fr->__msgid()] = m;
}
else
m = i->second;
return m;
/*
MR*& m = pendingMessages[fr->__msgid()];
if( m == 0 )
m = new MR(&pc, fr->__msgid(), fromAddr);
return m;
*/
}
void MR::removeFromReceivingList() {
pc.cr.pendingMessages.erase(msgid);
}
MR::MR(ProtocolConnection *_pc, MSGID _msgid, EndPoint& _from) :
pc(*_pc), msgid(_msgid), from(_from), nReceived(0)
{
messageLenExpected = nExpected = -1;
f.push_back(0);
}
void MR::reportMissings(bool reportAll) {
vector<short> missing;
unsigned t = curTimeMillis();
for( unsigned i = 0; i < f.size(); i++ ) {
if( f[i] == 0 ) {
int diff = tdiff(reportTimes[i],t);
assert( diff >= 0 || reportTimes[i] == 0 );
if( diff > 100 || reportTimes[i]==0 ||
(reportAll && diff > 1) ) {
reportTimes[i] = t;
assert( i < 25000 );
missing.push_back(i);
}
}
}
if( !missing.empty() )
__sendMISSING(&pc, from, msgid, missing);
}
inline bool MR::complete() {
if( nReceived == nExpected ) {
assert( nExpected == (int) f.size() );
assert( f[0] != 0 );
return true;
}
return false;
/*
if( nExpected > (int) f.size() || nExpected == -1 )
return false;
for( unsigned i = 0; i < f.size(); i++ )
if( f[i] == 0 )
return false;
return true;
*/
}
/* true=msg complete */
bool MR::got(F *frag, EndPoint& fromAddr) {
MSGID msgid = frag->__msgid();
int i = frag->__num();
if( i == 544 && !frag->__isREQUESTACK() ) {
cout << "************ GOT LAST FRAGMENT #544" << endl;
}
if( nExpected < 0 && i == 0 ) {
messageLenExpected = frag->__firstFragMsgLen();
if( messageLenExpected == frag->__len()-FragHeader )
nExpected = 1;
else {
int mss = frag->__len()-FragHeader;
assert( messageLenExpected > mss );
nExpected = (messageLenExpected + mss - 1) / mss;
ptrace( cout << ".got first frag, expect:" << nExpected << "packets, expectedLen:" << messageLenExpected << endl; )
}
}
if( i >= (int) f.size() )
f.resize(i+1, 0);
if( frag->__isREQUESTACK() ) {
ptrace( cout << "...got(): processing a REQUESTACK" << endl; )
/* we're simply seeing if we got the data, and then acking. */
delete frag;
frag = 0;
reportTimes.resize(f.size(), 0);
if( complete() ) {
__sendACK(&pc, msgid);
return true;
}
reportMissings(true);
return false;
}
else if( f[i] != 0 ) {
ptrace( cout << "\t.dup packet i:" << i << ' ' << from.toString() << endl; )
delete frag;
return false;
}
if( frag ) {
f[i] = frag;
nReceived++;
}
reportTimes.resize(f.size(), 0);
if( !complete() )
return false;
__sendACK(&pc, msgid);
return true;
/*
if( f[0] == 0 || f[i] == 0 ) {
// reportMissings(frag == 0);
return false;
}
if( i+1 < nExpected ) {
//cout << "TEMP COMMENT" << endl;
// if( i > 0 && f[i-1] == 0 )
// reportMissings(frag == 0);
return false;
}
// last fragment received
if( !complete() ) {
// reportMissings(frag == 0);
return false;
}
__sendACK(&pc, fromAddr, msgid);
return frag != 0;
*/
}
MR* CR::recv() {
MR *x;
{
lock lk(biglock);
while( received.empty() )
receivedSome.wait(lk);
x = received.back();
received.pop_back();
}
return x;
}
// this is how we tell protosend.cpp we got these
void gotACK(F*, ProtocolConnection *);
void gotMISSING(F*, ProtocolConnection *);
void receiverThread() {
ProtocolConnection *startingConn; // this thread manages many; this is just initiator or the parent for acceptany
UDPConnection *uc;
{
lock lk(threadStarterMutex);
while( 1 ) {
if( threadUseThisOne != 0 ) {
uc = &threadUseThisOne->udpConnection;
startingConn = threadUseThisOne;
threadUseThisOne = 0;
break;
}
threadActivate.wait(lk);
}
}
cout << "\n.Activating a new receiverThread\n " << startingConn->toString() << '\n' << endl;
EndPoint fromAddr;
while( 1 ) {
F *f = __recv(*uc, fromAddr.sa);
lock l(biglock);
fromAddr.channel = f->__channel();
ptrace( cout << "..__recv() from:" << fromAddr.toString() << " frag:" << f->__num() << endl; )
assert( fromAddr.channel >= 0 );
EndPointToPC::iterator it = pcMap.find(fromAddr);
ProtocolConnection *mypc;
if( it == pcMap.end() ) {
if( !startingConn->acceptAnyChannel() ) {
cout << ".WARNING: got packet from an unknown endpoint:" << fromAddr.toString() << endl;
cout << ". this may be ok if you just restarted" << endl;
delete f;
continue;
}
cout << ".New connection accepted from " << fromAddr.toString() << endl;
mypc = new ProtocolConnection(*startingConn, fromAddr);
pcMap[fromAddr] = mypc;
}
else
mypc = it->second;
assert( fromAddr.channel == mypc->farEnd.channel );
MsgTracker& track = mypc->cr.oldMsgTracker;
if( f->op != F::NORMAL ) {
if( f->__isACK() ) {
gotACK(f, mypc);
delete f;
continue;
}
if( f->__isMISSING() ) {
gotMISSING(f, mypc);
delete f;
continue;
}
if( f->op == F::RESET ) {
ptrace( cout << ".got RESET" << endl; )
track.reset();
mypc->cs.resetIt();
delete f;
continue;
}
}
if( track.recentlyReceived.count(f->__msgid()) ) {
// already done with it. ignore, other than acking.
if( f->__isREQUESTACK() )
__sendACK(mypc, f->__msgid());
else {
ptrace( cout << ".ignoring packet about msg already received msg:" << f->__msgid() << " op:" << f->op << endl; )
}
delete f;
continue;
}
if( f->__msgid() <= track.lastFullyReceived && !track.recentlyReceivedList.empty() ) {
// reconnect on an old channel?
ptrace( cout << ".warning: strange msgid:" << f->__msgid() << " received, last:" << track->lastFullyReceived << " conn:" << fromAddr.toString() << endl; )
}
MR *m = mypc->cr.getPendingMsg(f); /* todo: optimize for single fragment case? */
if( m == 0 ) {
ptrace( cout << "..getPendingMsg() returns 0" << endl; )
delete f;
continue;
}
if( m->got(f, fromAddr) ) {
track.got(m->msgid);
m->removeFromReceivingList();
{
mypc->parent->cr.received.push_back(m);
mypc->parent->cr.receivedSome.notify_one();
}
}
}
}