0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

endian check capability

repl work in progress
cleanup
This commit is contained in:
Dwight 2008-09-04 10:33:56 -04:00
parent 8cac7e4dc4
commit 5deec92469
11 changed files with 94 additions and 42 deletions

View File

@ -264,7 +264,7 @@ void receivedQuery(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort, *
qr->_data[2] = 0;
qr->_data[3] = 0;
qr->len = sizeof(QueryResult);
qr->operation = opReply;
qr->setOperation(opReply);
qr->cursorId = 0;
qr->startingFrom = 0;
qr->nReturned = 0;
@ -429,9 +429,9 @@ void jniCallback(Message& m, Message& out)
Timer t;
bool log = false;
curOp = m.data->operation;
curOp = m.data->operation();
if( m.data->operation == dbQuery ) {
if( m.data->operation() == dbQuery ) {
// on a query, the Message must have m.freeIt true so that the buffer data can be
// retained by cursors. As freeIt is false, we make a copy here.
assert( m.data->len > 0 && m.data->len < 32000000 );
@ -441,26 +441,26 @@ void jniCallback(Message& m, Message& out)
receivedQuery(dbr, copy, ss, false);
jmp.reply(m, *dbr.response, dbr.responseTo);
}
else if( m.data->operation == dbInsert ) {
else if( m.data->operation() == dbInsert ) {
ss << "insert ";
receivedInsert(m, ss);
}
else if( m.data->operation == dbUpdate ) {
else if( m.data->operation() == dbUpdate ) {
ss << "update ";
receivedUpdate(m, ss);
}
else if( m.data->operation == dbDelete ) {
else if( m.data->operation() == dbDelete ) {
ss << "remove ";
receivedDelete(m);
}
else if( m.data->operation == dbGetMore ) {
else if( m.data->operation() == dbGetMore ) {
DEV log = true;
ss << "getmore ";
DbResponse dbr;
receivedGetMore(dbr, m, ss);
jmp.reply(m, *dbr.response, dbr.responseTo);
}
else if( m.data->operation == dbKillCursors ) {
else if( m.data->operation() == dbKillCursors ) {
try {
log = true;
ss << "killcursors ";
@ -472,7 +472,7 @@ void jniCallback(Message& m, Message& out)
}
}
else {
cout << " jnicall: operation isn't supported: " << m.data->operation << endl;
cout << " jnicall: operation isn't supported: " << m.data->operation() << endl;
assert(false);
}
@ -540,7 +540,7 @@ void connThread()
int ms;
bool log = false;
curOp = m.data->operation;
curOp = m.data->operation();
#if 0
/* use this if you only want to process operations for a particular namespace.
@ -555,7 +555,7 @@ void connThread()
}
#endif
if( m.data->operation == dbMsg ) {
if( m.data->operation() == dbMsg ) {
ss << "msg ";
char *p = m.data->_data;
int len = strlen(p);
@ -582,10 +582,10 @@ void connThread()
}
}
}
else if( m.data->operation == dbQuery ) {
else if( m.data->operation() == dbQuery ) {
receivedQuery(dbresponse, m, ss, true);
}
else if( m.data->operation == dbInsert ) {
else if( m.data->operation() == dbInsert ) {
OPWRITE;
try {
ss << "insert ";
@ -596,7 +596,7 @@ void connThread()
ss << " exception ";
}
}
else if( m.data->operation == dbUpdate ) {
else if( m.data->operation() == dbUpdate ) {
OPWRITE;
try {
ss << "update ";
@ -607,7 +607,7 @@ void connThread()
ss << " exception ";
}
}
else if( m.data->operation == dbDelete ) {
else if( m.data->operation() == dbDelete ) {
OPWRITE;
try {
ss << "remove ";
@ -618,13 +618,13 @@ void connThread()
ss << " exception ";
}
}
else if( m.data->operation == dbGetMore ) {
else if( m.data->operation() == dbGetMore ) {
OPREAD;
DEV log = true;
ss << "getmore ";
receivedGetMore(dbresponse, m, ss);
}
else if( m.data->operation == dbKillCursors ) {
else if( m.data->operation() == dbKillCursors ) {
OPREAD;
try {
log = true;
@ -637,7 +637,7 @@ void connThread()
}
}
else {
cout << " operation isn't supported: " << m.data->operation << endl;
cout << " operation isn't supported: " << m.data->operation() << endl;
assert(false);
}

View File

@ -45,7 +45,7 @@ bool DBClientConnection::connect(const char *serverAddress, string& errmsg) {
int idx = ip.find( ":" );
if ( idx != string::npos ){
cout << "port string:" << ip.substr( idx ) << endl;
//cout << "port string:" << ip.substr( idx ) << endl;
port = atoi( ip.substr( idx + 1 ).c_str() );
ip = ip.substr( 0 , idx );
ip = hostbyname_nonreentrant(ip.c_str());

View File

@ -1098,7 +1098,7 @@ QueryResult* runQuery(Message& message, const char *ns, int ntoskip, int _ntoret
qr->len = b.len();
ss << " reslen:" << b.len();
// qr->channel = 0;
qr->operation = opReply;
qr->setOperation(opReply);
qr->cursorId = cursorid;
qr->startingFrom = 0;
qr->nReturned = n;
@ -1123,7 +1123,7 @@ QueryResult* emptyMoreResult(long long cursorid) {
qr->cursorId = 0; // 0 indicates no more data to retrieve.
qr->startingFrom = 0;
qr->len = b.len();
qr->operation = opReply;
qr->setOperation(opReply);
qr->nReturned = 0;
b.decouple();
return qr;
@ -1201,7 +1201,7 @@ done:
QueryResult *qr = (QueryResult *) b.buf();
qr->len = b.len();
qr->operation = opReply;
qr->setOperation(opReply);
qr->resultFlags() = resultFlags;
qr->cursorId = cursorid;
qr->startingFrom = start;

View File

@ -36,6 +36,10 @@ bool _runCommands(const char *ns, JSObj& jsobj, stringstream& ss, BufBuilder &b,
bool cloneFrom(const char *masterHost, string& errmsg);
void ensureHaveIdIndex(const char *ns);
#include "replset.h"
ReplSet *replSetPair = 0;
OpTime last(0, 0);
OpTime OpTime::now() {
@ -68,6 +72,11 @@ int test2() {
/* --------------------------------------------------------------*/
ReplSource::ReplSource() {
nClonedThisPass = 0;
sourceName = "main";
}
ReplSource::ReplSource(JSObj o) : nClonedThisPass(0) {
only = o.getStringField("only");
hostName = o.getStringField("host");
@ -154,15 +163,25 @@ void ReplSource::loadAll(vector<ReplSource*>& v) {
vector<ReplSource *> old = v;
v.erase(v.begin(), v.end());
bool gotPairWith = false;
setClient("local.sources");
auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
while( c->ok() ) {
ReplSource tmp(c->current());
ReplSource tmp(c->current());
if( replSetPair && tmp.hostName == replSetPair->remote && tmp.sourceName == "main" )
gotPairWith = true;
addSourceToList(v, tmp, old);
c->advance();
}
client = 0;
if( !gotPairWith && replSetPair ) {
/* add the --pairwith server */
ReplSource *s = new ReplSource();
s->hostName = replSetPair->remote;
v.push_back(s);
}
for( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); i++ )
delete *i;
}
@ -558,8 +577,9 @@ void logOurDbsPresence() {
a real one
*/
string dbname = string(f.c_str(), f.size() - 2);
if( dbname != "local." )
if( dbname != "local." ) {
logOp("db", dbname.c_str(), emptyObj);
}
}
i++;
}
@ -578,18 +598,18 @@ void replMasterThread() {
}
}
#include "replset.h"
ReplSet *replSet = 0;
void startReplication() {
if( slave ) {
log() << "slave=true" << endl;
if( slave || replSetPair ) {
if( slave )
log() << "slave=true" << endl;
slave = true;
boost::thread repl_thread(replSlaveThread);
}
if( master || replSet ) {
log() << "master=true" << endl;
if( master || replSetPair ) {
if( master )
log() << "master=true" << endl;
master = true;
{
dblock lk;
/* create an oplog collection, if it doesn't yet exist. */
@ -609,6 +629,5 @@ void startReplication() {
/* called from main at server startup */
void pairWith(const char *remoteEnd) {
replSet = new ReplSet(remoteEnd);
// uassert("not done yet!", false);
replSetPair = new ReplSet(remoteEnd);
}

View File

@ -92,6 +92,8 @@ class ReplSource {
auto_ptr<DBClientConnection> conn;
auto_ptr<DBClientCursor> cursor;
ReplSource();
public:
string hostName; // ip addr or hostname
string sourceName; // a logical source name.

View File

@ -32,12 +32,14 @@ class ReplSet {
public:
int remotePort;
string remoteHost;
string remote; // host:port if port specified.
ReplSet(const char *remoteEnd);
};
ReplSet::ReplSet(const char *remoteEnd) {
remote = remoteEnd;
remotePort = DBPort;
remoteHost = remoteEnd;
const char *p = strchr(remoteEnd, ':');
@ -45,5 +47,7 @@ ReplSet::ReplSet(const char *remoteEnd) {
remoteHost = string(remoteEnd, p-remoteEnd);
remotePort = atoi(p+1);
uassert("bad port #", remotePort > 0 && remotePort < 0x10000 );
if( remotePort == DBPort )
remote = remoteHost; // don't include ":27017" as it is default; in case ran in diff ways over time to normalizke the hostname format in sources collection
}
}

View File

@ -121,6 +121,7 @@ bool MessagingPort::connect(SockAddr& _far)
}
bool MessagingPort::recv(Message& m) {
again:
mmm( cout << "* recv() sock:" << this->sock << endl; )
int len = -1;
@ -146,9 +147,17 @@ bool MessagingPort::recv(Message& m) {
assert( lft > 0 );
}
// assert( x == 4 );
if( len < 0 || len > 16000000 ) {
if( len == 0xffffffff ) {
// Endian check from the client, after connecting, to see what mode server is running in.
unsigned foo = 0x10203040;
int x = ::send(sock, (char *) &foo, 4, 0);
if( x <= 0 ) {
cout << "MessagingPort endian send() error " << errno << ' ' << farEnd.toString() << endl;
return false;
}
goto again;
}
cout << "bad recv() len: " << len << '\n';
return false;
}

View File

@ -72,12 +72,10 @@ public:
enum Operations {
opReply = 1, /* reply. responseTo is set. */
dbMsg = 1000, /* generic msg command followed by a string */
dbUpdate = 2001, /* update object */
dbInsert = 2002,
// dbGetByOID = 2003,
//dbGetByOID = 2003,
dbQuery = 2004,
dbGetMore = 2005,
dbDelete = 2006,
@ -88,7 +86,9 @@ struct MsgData {
int len; /* len of the msg, including this field */
MSGID id; /* request/reply id's match... */
int responseTo; /* id of the message we are responding to */
int operation;
int _operation;
int operation() const { return _operation; }
void setOperation(int o) { _operation = o; }
char _data[4];
int& dataAsInt() { return *((int *) _data); }
@ -138,8 +138,8 @@ public:
int dataLen = len + sizeof(MsgData) - 4;
MsgData *d = (MsgData *) malloc(dataLen);
memcpy(d->_data, msgdata, len);
d->len = dataLen;
d->operation = operation;
d->len = fixEndian(dataLen);
d->setOperation(operation);
freeIt= true;
data = d;
}

View File

@ -187,3 +187,4 @@ using namespace boost::filesystem;
#include "util/goodies.h"
#include "util/log.h"

View File

@ -199,3 +199,18 @@ inline bool endsWith(const char *p, const char *suffix) {
return strcmp(p + a - b, suffix) == 0;
}
#include "boost/detail/endian.hpp"
inline unsigned long swapEndian(unsigned long x) {
return
((x & 0xff) << 24) |
((x & 0xff00) << 8) |
((x & 0xff0000) >> 8) |
((x & 0xff000000) >> 24);
}
#if defined(BOOST_LITTLE_ENDIAN)
inline unsigned long fixEndian(unsigned long x) { return x; }
#else
inline unsigned long fixEndian(unsigned long x) { return swapEndian(x); }
#endif

View File

@ -68,5 +68,7 @@ struct UtilTest {
assert( endsWith("abcde", "de") );
assert( !endsWith("abcde", "dasdfasdfashkfde") );
assert( swapEndian(0x01020304) == 0x04030201 );
}
} utilTest;