mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
Remap cursor id on forward
This commit is contained in:
parent
b5a5e6f756
commit
dc197ff71a
@ -85,10 +85,13 @@ namespace mongo {
|
||||
nextjsobj += 4;
|
||||
return i;
|
||||
}
|
||||
long long pullInt64() {
|
||||
long long pullInt64() const {
|
||||
return pullInt64();
|
||||
}
|
||||
long long &pullInt64() {
|
||||
if ( nextjsobj == data )
|
||||
nextjsobj += strlen(data) + 1; // skip namespace
|
||||
long long i = *((long long *)nextjsobj);
|
||||
long long &i = *((long long *)nextjsobj);
|
||||
nextjsobj += 8;
|
||||
return i;
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ using mongo::DbMessage;
|
||||
using mongo::BSONObj;
|
||||
using mongo::BufBuilder;
|
||||
using mongo::DBClientConnection;
|
||||
using mongo::QueryResult;
|
||||
|
||||
#define SNAP_LEN 65535
|
||||
|
||||
@ -92,6 +93,7 @@ struct sniff_tcp {
|
||||
u_short th_urp; /* urgent pointer */
|
||||
};
|
||||
|
||||
#pragma pack( 1 )
|
||||
struct Connection {
|
||||
struct in_addr srcAddr;
|
||||
u_short srcPort;
|
||||
@ -100,13 +102,24 @@ struct Connection {
|
||||
bool operator<( const Connection &other ) const {
|
||||
return memcmp( this, &other, sizeof( Connection ) ) < 0;
|
||||
}
|
||||
Connection reverse() const {
|
||||
Connection c;
|
||||
c.srcAddr = dstAddr;
|
||||
c.srcPort = dstPort;
|
||||
c.dstAddr = srcAddr;
|
||||
c.dstPort = srcPort;
|
||||
return c;
|
||||
}
|
||||
};
|
||||
#pragma pack()
|
||||
|
||||
map< Connection, bool > seen;
|
||||
map< Connection, int > bytesRemainingInMessage;
|
||||
map< Connection, boost::shared_ptr< BufBuilder > > messageBuilder;
|
||||
map< Connection, unsigned > expectedSeq;
|
||||
map< Connection, DBClientConnection* > forwarder;
|
||||
map< Connection, long long > lastCursor;
|
||||
map< Connection, map< long long, long long > > mapCursor;
|
||||
|
||||
void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *packet){
|
||||
|
||||
@ -206,7 +219,7 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa
|
||||
switch( m.data->operation() ){
|
||||
case mongo::opReply:{
|
||||
mongo::QueryResult* r = (mongo::QueryResult*)m.data;
|
||||
cout << "\treply" << " n:" << r->nReturned << endl;
|
||||
cout << "\treply" << " n:" << r->nReturned << " cursorId: " << r->cursorId << endl;
|
||||
if ( r->nReturned ){
|
||||
mongo::BSONObj o( r->data() , 0 );
|
||||
cout << "\t" << o << endl;
|
||||
@ -233,7 +246,7 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa
|
||||
}
|
||||
case mongo::dbGetMore:{
|
||||
int nToReturn = d.pullInt();
|
||||
int cursorId = d.pullInt64();
|
||||
long long cursorId = d.pullInt64();
|
||||
cout << "\tgetMore nToReturn: " << nToReturn << " cursorId: " << cursorId << endl;
|
||||
break;
|
||||
}
|
||||
@ -254,19 +267,51 @@ void got_packet(u_char *args, const struct pcap_pkthdr *header, const u_char *pa
|
||||
cerr << "*** CANNOT HANDLE TYPE: " << m.data->operation() << endl;
|
||||
}
|
||||
|
||||
if ( m.data->operation() != mongo::opReply && !forwardAddress.empty() ) {
|
||||
DBClientConnection *conn = forwarder[ c ];
|
||||
if ( !conn ) {
|
||||
// These won't get freed on error, oh well...
|
||||
conn = new DBClientConnection( true );
|
||||
conn->connect( forwardAddress );
|
||||
forwarder[ c ] = conn;
|
||||
}
|
||||
if ( m.data->operation() == mongo::dbQuery || m.data->operation() == mongo::dbGetMore ) {
|
||||
Message response;
|
||||
conn->port().call( m, response );
|
||||
if ( !forwardAddress.empty() ) {
|
||||
if ( m.data->operation() != mongo::opReply ) {
|
||||
DBClientConnection *conn = forwarder[ c ];
|
||||
if ( !conn ) {
|
||||
// These won't get freed on error, oh well hopefully we'll just
|
||||
// abort in that case anyway.
|
||||
conn = new DBClientConnection( true );
|
||||
conn->connect( forwardAddress );
|
||||
forwarder[ c ] = conn;
|
||||
}
|
||||
if ( m.data->operation() == mongo::dbQuery || m.data->operation() == mongo::dbGetMore ) {
|
||||
if ( m.data->operation() == mongo::dbGetMore ) {
|
||||
DbMessage d( m );
|
||||
d.pullInt();
|
||||
long long &cId = d.pullInt64();
|
||||
cId = mapCursor[ c ][ cId ];
|
||||
}
|
||||
Message response;
|
||||
conn->port().call( m, response );
|
||||
QueryResult *qr = (QueryResult *) response.data;
|
||||
if ( !( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) ) {
|
||||
if ( qr->cursorId != 0 ) {
|
||||
lastCursor[ c ] = qr->cursorId;
|
||||
return;
|
||||
}
|
||||
}
|
||||
lastCursor[ c ] = 0;
|
||||
} else {
|
||||
conn->port().say( m );
|
||||
}
|
||||
} else {
|
||||
conn->port().say( m );
|
||||
Connection r = c.reverse();
|
||||
long long myCursor = lastCursor[ r ];
|
||||
QueryResult *qr = (QueryResult *) m.data;
|
||||
long long yourCursor = qr->cursorId;
|
||||
if ( ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) )
|
||||
yourCursor = 0;
|
||||
if ( myCursor && !yourCursor )
|
||||
cerr << "Expected valid cursor in sniffed response, found none" << endl;
|
||||
if ( !myCursor && yourCursor )
|
||||
cerr << "Sniffed valid cursor when none expected" << endl;
|
||||
if ( myCursor && yourCursor ) {
|
||||
mapCursor[ r ][ qr->cursorId ] = lastCursor[ r ];
|
||||
lastCursor[ r ] = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user