mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 01:21:03 +01:00
long msgs work
This commit is contained in:
parent
5c82a67f35
commit
8f932515bf
12
db/db.cpp
12
db/db.cpp
@ -265,7 +265,8 @@ void msg(const char *m) {
|
||||
p.init(29999);
|
||||
|
||||
// SockAddr db("127.0.0.1", MessagingPort::DBPort);
|
||||
SockAddr db("10.0.21.60", MessagingPort::DBPort);
|
||||
// SockAddr db("10.0.21.60", MessagingPort::DBPort);
|
||||
SockAddr db("172.16.0.179", MessagingPort::DBPort);
|
||||
|
||||
Message send;
|
||||
Message response;
|
||||
@ -288,6 +289,7 @@ int main(int argc, char* argv[], char *envp[] )
|
||||
{
|
||||
quicktest();
|
||||
|
||||
|
||||
if( argc >= 2 ) {
|
||||
if( strcmp(argv[1], "quicktest") == 0 )
|
||||
return 0;
|
||||
@ -299,6 +301,13 @@ int main(int argc, char* argv[], char *envp[] )
|
||||
run();
|
||||
return 0;
|
||||
}
|
||||
if( strcmp(argv[1], "longmsg") == 0 ) {
|
||||
char buf[4096];
|
||||
memset(buf, 'a', 4095);
|
||||
buf[4095] = 0;
|
||||
msg(buf);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
cout << "usage:\n";
|
||||
@ -306,6 +315,7 @@ int main(int argc, char* argv[], char *envp[] )
|
||||
cout << " msg [msg] send a request to the db server" << endl;
|
||||
cout << " msg end shut down" << endl;
|
||||
cout << " run run db" << endl;
|
||||
cout << " longmsg send a long test message to the db server" << endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@ struct Fragment {
|
||||
short fragmentNo;
|
||||
char data[1];
|
||||
int fragmentDataLen() { return fragmentLen - 8; }
|
||||
char* fragmentData() { return data; }
|
||||
|
||||
bool ok(int nRead) {
|
||||
if( nRead < MinFragmentLen || fragmentLen > nRead || fragmentLen < MinFragmentLen ) {
|
||||
@ -77,6 +78,7 @@ bool MessagingPort::recv(Message& m) {
|
||||
|
||||
/* we'll need to read more */
|
||||
char *msgData = (char *) malloc(totalLen);
|
||||
m.setData((MsgData*) msgData, true);
|
||||
char *p = msgData;
|
||||
memcpy(p, somd, ff->fragmentDataLen());
|
||||
int sofar = ff->fragmentDataLen();
|
||||
@ -94,8 +96,12 @@ bool MessagingPort::recv(Message& m) {
|
||||
Fragment *f = (Fragment *) b;
|
||||
if( !f->ok(n) )
|
||||
return false;
|
||||
if( f->msgId != msgid || f->fragmentNo != expectedFragmentNo ) {
|
||||
cout << "bad fragment" << endl;
|
||||
if( f->msgId != msgid ) {
|
||||
cout << "bad fragment, wrong msg id, expected:" << msgid << " got:" << f->msgId << endl;
|
||||
return false;
|
||||
}
|
||||
if( f->fragmentNo != expectedFragmentNo ) {
|
||||
cout << "bad fragment, wrong fragmentNo, expected:" << expectedFragmentNo << " got:" << f->fragmentNo << endl;
|
||||
return false;
|
||||
}
|
||||
if( from != m.from ) {
|
||||
@ -104,7 +110,7 @@ bool MessagingPort::recv(Message& m) {
|
||||
return false;
|
||||
}
|
||||
|
||||
memcpy(p, f->startOfMsgData(), f->fragmentDataLen());
|
||||
memcpy(p, f->fragmentData(), f->fragmentDataLen());
|
||||
p += f->fragmentDataLen();
|
||||
wanted -= f->fragmentDataLen();
|
||||
expectedFragmentNo++;
|
||||
@ -149,5 +155,6 @@ void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
|
||||
p += l;
|
||||
left -= l;
|
||||
conn.sendto(buf, l+8, to);
|
||||
f->fragmentNo++;
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ inline int MsgData::dataLen() { return len - MsgDataHeaderSize; }
|
||||
|
||||
class Message {
|
||||
public:
|
||||
Message() { data = 0; }
|
||||
Message() { data = 0; freeIt = false; }
|
||||
~Message() { reset(); }
|
||||
|
||||
SockAddr from;
|
||||
@ -71,7 +71,7 @@ public:
|
||||
void reset() {
|
||||
if( freeIt && data )
|
||||
free(data);
|
||||
data = 0;
|
||||
data = 0; freeIt = false;
|
||||
}
|
||||
|
||||
void setData(MsgData *d, bool _freeIt) {
|
||||
|
Loading…
Reference in New Issue
Block a user