0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

Use socketless interface for cloning using single db instance

This commit is contained in:
Aaron 2008-12-18 12:02:12 -05:00
parent f0a215a44c
commit 6ae140a2df
9 changed files with 288 additions and 207 deletions

View File

@ -99,9 +99,21 @@ void DBClientConnection::checkConnection() {
log() << "reconnect " << serverAddress << " ok" << endl;
}
auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, BSONObj query, int nToReturn, int nToSkip, BSONObj *fieldsToReturn, int queryOptions) {
auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, BSONObj query, int nToReturn,
int nToSkip, BSONObj *fieldsToReturn, int queryOptions) {
checkConnection();
auto_ptr<DBClientCursor> c( new DBClientCursor( new CursorConnector( this ),
ns, query, nToReturn, nToSkip,
fieldsToReturn, queryOptions ) );
if( c->init() )
return c;
return auto_ptr< DBClientCursor >( 0 );
}
/* -- DBClientCursor ---------------------------------------------- */
void assembleRequest( const char *ns, BSONObj query, int nToReturn, int nToSkip, BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
// see query.h for the protocol we are using here.
BufBuilder b;
int opts = queryOptions;
@ -113,38 +125,55 @@ auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, BSONObj query
query.appendSelfToBufBuilder(b);
if( fieldsToReturn )
fieldsToReturn->appendSelfToBufBuilder(b);
Message toSend;
toSend.setData(dbQuery, b.buf(), b.len());
auto_ptr<Message> response(new Message());
if( !p->call(toSend, *response) ) {
failed = true;
return auto_ptr<DBClientCursor>(0);
}
auto_ptr<DBClientCursor> c(new DBClientCursor(this, *p.get(), response, opts));
c->ns = ns;
c->nToReturn = nToReturn;
return c;
}
/* -- DBClientCursor ---------------------------------------------- */
bool DBClientConnection::CursorConnector::send( Message &toSend, Message &response, bool assertOk ) {
if( !conn->port().call(toSend, response) ) {
conn->failed = true;
if( assertOk )
massert("dbclient error communicating with server", false);
return false;
}
return true;
}
void DBClientConnection::CursorConnector::checkResponse( const char *data, int nReturned ) {
/* check for errors. the only one we really care about at
this stage is "not master" */
if( conn->clientPaired && nReturned ) {
BSONObj o(data);
BSONElement e = o.firstElement();
if( strcmp(e.fieldName(), "$err") == 0 &&
e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
conn->clientPaired->isntMaster();
}
}
}
bool DBClientCursor::init() {
Message toSend;
assembleRequest( ns, query, nToReturn, nToSkip, fieldsToReturn, opts, toSend );
if( !connector->send( toSend, *m, false ) )
return false;
dataReceived();
return true;
}
void DBClientCursor::requestMore() {
assert( cursorId && pos == nReturned );
BufBuilder b;
b.append(opts);
b.append(ns.c_str());
b.append(ns);
b.append(nToReturn);
b.append(cursorId);
Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
auto_ptr<Message> response(new Message());
if( !p.call(toSend, *response) ) {
conn->failed = true;
massert("dbclient error communicating with server", false);
}
connector->send( toSend, *response );
m = response;
dataReceived();
@ -166,17 +195,7 @@ void DBClientCursor::dataReceived() {
pos = 0;
data = qr->data();
/* check for errors. the only one we really care about at
this stage is "not master" */
if( conn->clientPaired && nReturned ) {
BSONObj o(data);
BSONElement e = o.firstElement();
if( strcmp(e.fieldName(), "$err") == 0 &&
e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
conn->clientPaired->isntMaster();
}
}
connector->checkResponse( data, nReturned );
/* this assert would fire the way we currently work:
assert( nReturned || cursorId == 0 );
*/

View File

@ -52,30 +52,8 @@ struct QueryResult : public MsgData {
};
#pragma pack(pop)
class DBClientConnection;
class DBClientCursor : boost::noncopyable {
friend class DBClientConnection;
DBClientConnection *conn;
MessagingPort& p;
long long cursorId;
int nReturned;
int pos;
const char *data;
auto_ptr<Message> m;
int opts;
string ns;
int nToReturn;
void dataReceived();
void requestMore();
DBClientCursor(DBClientConnection *_conn, MessagingPort& _p, auto_ptr<Message> _m, int _opts) :
conn(_conn), p(_p), m(_m), opts(_opts) {
cursorId = 0;
dataReceived();
}
public:
bool more(); // if true, safe to call next()
/* returns next object in the result cursor.
@ -100,6 +78,44 @@ public:
bool isDead() const { return cursorId == 0; }
bool tailable() const { return (opts & Option_CursorTailable) != 0; }
bool init();
class Connector {
public:
virtual bool send( Message &toSend, Message &response, bool assertOk=true ) = 0;
virtual void checkResponse( const char *data, int nReturned ) {}
};
DBClientCursor( Connector *_connector, const char * _ns, BSONObj _query, int _nToReturn,
int _nToSkip, BSONObj *_fieldsToReturn, int queryOptions ) :
connector(_connector),
ns(_ns),
query(_query),
nToReturn(_nToReturn),
nToSkip(_nToSkip),
fieldsToReturn(_fieldsToReturn),
opts(queryOptions),
m(new Message()) {
cursorId = 0;
}
private:
auto_ptr< Connector > connector;
const char * ns;
BSONObj query;
int nToReturn;
int nToSkip;
BSONObj *fieldsToReturn;
int opts;
auto_ptr<Message> m;
long long cursorId;
int nReturned;
int pos;
const char *data;
void dataReceived();
void requestMore();
};
class DBClientInterface : boost::noncopyable {
@ -128,7 +144,6 @@ public:
class DBClientPaired;
class DBClientConnection : public DBClientWithCommands {
friend class DBClientCursor;
DBClientPaired *clientPaired;
auto_ptr<MessagingPort> p;
auto_ptr<SockAddr> server;
@ -174,18 +189,27 @@ public:
*/
/*throws AssertionException*/
auto_ptr<DBClientCursor> query(const char *ns, BSONObj query, int nToReturn = 0, int nToSkip = 0,
BSONObj *fieldsToReturn = 0, int queryOptions = 0);
BSONObj *fieldsToReturn = 0, int queryOptions = 0);
/*throws AssertionException*/
virtual
BSONObj findOne(const char *ns, BSONObj query, BSONObj *fieldsToReturn = 0, int queryOptions = 0);
private:
class CursorConnector : public DBClientCursor::Connector {
DBClientConnection *conn;
virtual bool send( Message &toSend, Message &response, bool assertOk = true );
virtual void checkResponse( const char *data, int nReturned );
public:
CursorConnector( DBClientConnection *_conn ) :
conn( _conn ) {
}
};
};
/* Use this class to connect to a replica pair of servers. The class will manage
checking for which is master, and do failover automatically.
*/
class DBClientPaired : public DBClientWithCommands {
friend class DBClientCursor;
DBClientConnection left,right;
enum State {
NotSetL=0,
@ -196,11 +220,6 @@ class DBClientPaired : public DBClientWithCommands {
void _checkMaster();
DBClientConnection& checkMaster();
/* notification that we got a "not master" error.
*/
void isntMaster() {
master = ( ( master == Left ) ? NotSetR : NotSetL );
}
public:
DBClientPaired();
@ -221,6 +240,12 @@ public:
BSONObj findOne(const char *ns, BSONObj query, BSONObj *fieldsToReturn = 0, int queryOptions = 0);
string toString();
/* notification that we got a "not master" error.
*/
void isntMaster() {
master = ( ( master == Left ) ? NotSetR : NotSetL );
}
};

View File

@ -24,13 +24,16 @@
#include "query.h"
#include "commands.h"
#include "db.h"
#include "instance.h"
#include "repl.h"
extern int port;
class Cloner: boost::noncopyable {
DBClientConnection conn;
void copy(const char *from_ns, const char *to_ns, bool isindex, bool logForRepl, bool slaveOk);
void copy(const char *from_ns, const char *to_ns, bool isindex, bool logForRepl,
bool masterSameProcess, bool slaveOk);
auto_ptr<DBClientCursor> createCursor(bool masterSameProcess, const char *ns, bool slaveOk);
public:
Cloner() { }
@ -76,11 +79,11 @@ BSONObj fixindex(BSONObj o) {
/* copy the specified collection
isindex - if true, this is system.indexes collection.
*/
void Cloner::copy(const char *from_collection, const char *to_collection, bool isindex, bool logForRepl, bool slaveOk) {
void Cloner::copy(const char *from_collection, const char *to_collection, bool isindex, bool logForRepl, bool masterSameProcess, bool slaveOk) {
auto_ptr<DBClientCursor> c;
{
dbtemprelease r;
c = auto_ptr<DBClientCursor>( conn.query(from_collection, emptyObj, 0, 0, 0, slaveOk ? Option_SlaveOk : 0) );\
c = createCursor( masterSameProcess, from_collection, slaveOk );
}
assert( c.get() );
while( 1 ) {
@ -109,12 +112,35 @@ void Cloner::copy(const char *from_collection, const char *to_collection, bool i
}
}
bool Cloner::go(const char *masterHost, string& errmsg, const string& fromdb, bool logForRepl, bool slaveOK) {
class DirectConnector : public DBClientCursor::Connector {
virtual bool send( Message &toSend, Message &response, bool assertOk=true ) {
DbResponse dbResponse;
assembleResponse( toSend, dbResponse );
assert( dbResponse.response );
response = *dbResponse.response;
return true;
}
};
auto_ptr< DBClientCursor > Cloner::createCursor( bool masterSameProcess, const char *ns, bool slaveOk ) {
auto_ptr< DBClientCursor > c;
if ( !masterSameProcess ) {
c = auto_ptr<DBClientCursor>( conn.query(ns, emptyObj, 0, 0, 0, slaveOk ? Option_SlaveOk : 0) );
} else {
c = auto_ptr<DBClientCursor>( new DBClientCursor( new DirectConnector(), ns,
emptyObj, 0, 0, 0, slaveOk ? Option_SlaveOk : 0 ) );
}
c->init();
return c;
}
bool Cloner::go(const char *masterHost, string& errmsg, const string& fromdb, bool logForRepl, bool slaveOk) {
string todb = database->name;
stringstream a,b;
a << "localhost:" << port;
b << "127.0.0.1:" << port;
if( a.str() == masterHost || b.str() == masterHost ) {
bool masterSameProcess = ( a.str() == masterHost || b.str() == masterHost );
if( masterSameProcess ) {
if( fromdb == todb && database->path == dbpath ) {
// guard against an "infinite" loop
/* if you are replicating, the local.sources config may be wrong if you get this */
@ -129,9 +155,10 @@ bool Cloner::go(const char *masterHost, string& errmsg, const string& fromdb, bo
auto_ptr<DBClientCursor> c;
{
dbtemprelease r;
if( !conn.connect(masterHost, errmsg) )
return false;
c = auto_ptr<DBClientCursor>( conn.query(ns.c_str(), emptyObj, 0, 0, 0, slaveOK ? Option_SlaveOk : 0) );
if ( !masterSameProcess )
if( !conn.connect(masterHost, errmsg) )
return false;
c = createCursor( masterSameProcess, ns.c_str(), slaveOk );
}
if( c.get() == 0 ) {
errmsg = "query failed " + ns;
@ -175,13 +202,13 @@ bool Cloner::go(const char *masterHost, string& errmsg, const string& fromdb, bo
string err;
userCreateNS(to_name.c_str(), options, err, logForRepl);
}
copy(from_name, to_name.c_str(), false, logForRepl, slaveOK);
copy(from_name, to_name.c_str(), false, logForRepl, masterSameProcess, slaveOk);
}
// now build the indexes
string system_indexes_from = fromdb + ".system.indexes";
string system_indexes_to = todb + ".system.indexes";
copy(system_indexes_from.c_str(), system_indexes_to.c_str(), true, logForRepl, slaveOK);
copy(system_indexes_from.c_str(), system_indexes_to.c_str(), true, logForRepl, masterSameProcess, slaveOk);
return true;
}

140
db/db.cpp
View File

@ -149,7 +149,6 @@ void connThread()
Message m;
while( 1 ) {
m.reset();
stringstream ss;
if( !dbMsgPort.recv(m) ) {
log() << "end connection " << dbMsgPort.farEnd.toString() << endl;
@ -157,137 +156,20 @@ void connThread()
break;
}
char buf[64];
time_t_to_String(time(0), buf);
buf[20] = 0; // don't want the year
ss << buf;
// ss << curTimeMillis() % 10000 << ' ';
DbResponse dbresponse;
{
dblock lk;
Timer t;
database = 0;
curOp = 0;
int ms;
bool log = false;
curOp = m.data->operation();
#if 0
/* use this if you only want to process operations for a particular namespace.
maybe add to cmd line parms or something fancier.
*/
DbMessage ddd(m);
if( strncmp(ddd.getns(), "clusterstock", 12) != 0 ) {
static int q;
if( ++q < 20 )
cout << "TEMP skip " << ddd.getns() << endl;
goto skip;
}
#endif
if( m.data->operation() == dbMsg ) {
ss << "msg ";
char *p = m.data->_data;
int len = strlen(p);
if( len > 400 )
cout << curTimeMillis() % 10000 <<
" long msg received, len:" << len <<
" ends with: " << p + len - 10 << endl;
bool end = strcmp("end", p) == 0;
Message *resp = new Message();
resp->setData(opReply, "i am fine");
dbresponse.response = resp;
dbresponse.responseTo = m.data->id;
//dbMsgPort.reply(m, resp);
if( end ) {
cout << curTimeMillis() % 10000 << " end msg " << dbMsgPort.farEnd.toString() << endl;
if( dbMsgPort.farEnd.isLocalHost() ) {
dbMsgPort.shutdown();
sleepmillis(50);
problem() << "exiting end msg" << endl;
exit(EXIT_SUCCESS);
}
else {
cout << " (not from localhost, ignoring end msg)" << endl;
}
}
if( !assembleResponse( m, dbresponse ) ) {
cout << curTimeMillis() % 10000 << " end msg " << dbMsgPort.farEnd.toString() << endl;
if( dbMsgPort.farEnd.isLocalHost() ) {
dbMsgPort.shutdown();
sleepmillis(50);
problem() << "exiting end msg" << endl;
exit(EXIT_SUCCESS);
}
else if( m.data->operation() == dbQuery ) {
receivedQuery(dbresponse, m, ss, true);
else {
cout << " (not from localhost, ignoring end msg)" << endl;
}
else if( m.data->operation() == dbInsert ) {
OPWRITE;
try {
ss << "insert ";
receivedInsert(m, ss);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion insert, continuing\n";
ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbUpdate ) {
OPWRITE;
try {
ss << "update ";
receivedUpdate(m, ss);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion update, continuing" << endl;
ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbDelete ) {
OPWRITE;
try {
ss << "remove ";
receivedDelete(m);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion receivedDelete, continuing" << endl;
ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbGetMore ) {
OPREAD;
DEV log = true;
ss << "getmore ";
receivedGetMore(dbresponse, m, ss);
}
else if( m.data->operation() == dbKillCursors ) {
OPREAD;
try {
log = true;
ss << "killcursors ";
receivedKillCursors(m);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion in kill cursors, continuing" << endl;
ss << " exception " + e.toString();
}
}
else {
cout << " operation isn't supported: " << m.data->operation() << endl;
assert(false);
}
ms = t.millis();
log = log || (ctr++ % 512 == 0 && !quiet);
DEV log = true;
if( log || ms > 100 ) {
ss << ' ' << t.millis() << "ms";
cout << ss.str().c_str() << endl;
}
if( database && database->profile >= 1 ) {
if( database->profile >= 2 || ms >= 100 ) {
// profile it
profile(ss.str().c_str()+20/*skip ts*/, ms);
}
}
} /* end lock */
}
if( dbresponse.response )
dbMsgPort.reply(m, *dbresponse.response, dbresponse.responseTo);
}

View File

@ -24,7 +24,7 @@
class DbMessage {
public:
DbMessage(Message& _m) : m(_m) {
DbMessage(const Message& _m) : m(_m) {
theEnd = _m.data->_data + _m.data->dataLen();
int *r = (int *) _m.data->_data;
reserved = *r;
@ -81,10 +81,10 @@ public:
return js;
}
Message& msg() { return m; }
const Message& msg() { return m; }
private:
Message& m;
const Message& m;
int reserved;
const char *data;
const char *nextjsobj;

View File

@ -47,6 +47,134 @@ bool useCursors = true;
void closeAllSockets();
void flushOpLog() { _oplog.flush(); }
int ctr = 0;
bool quiet = false;
// Returns false when request includes 'end'
bool assembleResponse( Message &m, DbResponse &dbresponse ) {
dblock lk;
stringstream ss;
char buf[64];
time_t_to_String(time(0), buf);
buf[20] = 0; // don't want the year
ss << buf;
// ss << curTimeMillis() % 10000 << ' ';
Timer t;
database = 0;
curOp = 0;
int ms;
bool log = false;
curOp = m.data->operation();
#if 0
/* use this if you only want to process operations for a particular namespace.
maybe add to cmd line parms or something fancier.
*/
DbMessage ddd(m);
if( strncmp(ddd.getns(), "clusterstock", 12) != 0 ) {
static int q;
if( ++q < 20 )
cout << "TEMP skip " << ddd.getns() << endl;
goto skip;
}
#endif
if( m.data->operation() == dbMsg ) {
ss << "msg ";
char *p = m.data->_data;
int len = strlen(p);
if( len > 400 )
cout << curTimeMillis() % 10000 <<
" long msg received, len:" << len <<
" ends with: " << p + len - 10 << endl;
bool end = strcmp("end", p) == 0;
Message *resp = new Message();
resp->setData(opReply, "i am fine");
dbresponse.response = resp;
dbresponse.responseTo = m.data->id;
//dbMsgPort.reply(m, resp);
if( end )
return false;
}
else if( m.data->operation() == dbQuery ) {
receivedQuery(dbresponse, m, ss, true);
}
else if( m.data->operation() == dbInsert ) {
OPWRITE;
try {
ss << "insert ";
receivedInsert(m, ss);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion insert, continuing\n";
ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbUpdate ) {
OPWRITE;
try {
ss << "update ";
receivedUpdate(m, ss);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion update, continuing" << endl;
ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbDelete ) {
OPWRITE;
try {
ss << "remove ";
receivedDelete(m);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion receivedDelete, continuing" << endl;
ss << " exception " + e.toString();
}
}
else if( m.data->operation() == dbGetMore ) {
OPREAD;
DEV log = true;
ss << "getmore ";
receivedGetMore(dbresponse, m, ss);
}
else if( m.data->operation() == dbKillCursors ) {
OPREAD;
try {
log = true;
ss << "killcursors ";
receivedKillCursors(m);
}
catch( AssertionException& e ) {
problem() << " Caught Assertion in kill cursors, continuing" << endl;
ss << " exception " + e.toString();
}
}
else {
cout << " operation isn't supported: " << m.data->operation() << endl;
assert(false);
}
ms = t.millis();
log = log || (ctr++ % 512 == 0 && !quiet);
DEV log = true;
if( log || ms > 100 ) {
ss << ' ' << t.millis() << "ms";
cout << ss.str().c_str() << endl;
}
if( database && database->profile >= 1 ) {
if( database->profile >= 2 || ms >= 100 ) {
// profile it
profile(ss.str().c_str()+20/*skip ts*/, ms);
}
}
return true;
}
void killCursors(int n, long long *ids);
void receivedKillCursors(Message& m) {
int *x = (int *) m.data->_data;
@ -191,7 +319,6 @@ void receivedGetMore(DbResponse& dbresponse, /*AbstractMessagingPort& dbMsgPort,
}
bool objcheck = false;
bool quiet = false;
void receivedInsert(Message& m, stringstream& ss) {
DbMessage d(m);
@ -212,7 +339,6 @@ void receivedInsert(Message& m, stringstream& ss) {
}
}
int ctr = 0;
extern int callDepth;
class JniMessagingPort : public AbstractMessagingPort {

View File

@ -73,6 +73,8 @@ struct DbResponse {
~DbResponse() { delete response; }
};
bool assembleResponse( Message &m, DbResponse &dbresponse );
void receivedKillCursors(Message& m);
void receivedUpdate(Message& m, stringstream& ss);
void receivedDelete(Message& m);

View File

@ -249,7 +249,7 @@ void MessagingPort::reply(Message& received, Message& response, MSGID responseTo
say(/*received.from, */response, responseTo);
}
bool MessagingPort::call(Message& toSend, Message& response) {
bool MessagingPort::call(const Message& toSend, Message& response) {
mmm( cout << "*call()" << endl; )
MSGID old = toSend.data->id;
say(/*to,*/ toSend);
@ -272,7 +272,7 @@ bool MessagingPort::call(Message& toSend, Message& response) {
return true;
}
void MessagingPort::say(Message& toSend, int responseTo) {
void MessagingPort::say(const Message& toSend, int responseTo) {
mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
MSGID msgid = NextMsgId;
++NextMsgId;

View File

@ -58,8 +58,8 @@ public:
bool recv(Message& m);
void reply(Message& received, Message& response, MSGID responseTo);
void reply(Message& received, Message& response);
bool call(Message& toSend, Message& response);
void say(Message& toSend, int responseTo = -1);
bool call(const Message& toSend, Message& response);
void say(const Message& toSend, int responseTo = -1);
private:
int sock;