0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 01:21:03 +01:00
mongodb/db/dbclient.cpp

329 lines
9.3 KiB
C++
Raw Normal View History

// dbclient.cpp - connect to a Mongo database as a client, from C++
2008-07-28 00:36:47 +02:00
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "stdafx.h"
#include "pdfile.h"
2008-07-28 00:36:47 +02:00
#include "dbclient.h"
#include "../util/builder.h"
#include "jsobj.h"
#include "query.h"
2008-10-19 18:17:25 +02:00
#include "json.h"
/* --- dbclientcommands --- */
/* Command document that cmdIsMaster() sends to "admin.$cmd". */
JSObj ismastercmdobj = fromjson("{ismaster:1}");

/* Runs the ismaster command through findOne().
   isMaster - out parameter; set to true only when the reply's "ismaster"
              field compares equal to 1, false otherwise.
   Returns the reply object as received from findOne().
*/
JSObj DBClientCommands::cmdIsMaster(bool& isMaster) {
    JSObj reply = findOne("admin.$cmd", ismastercmdobj);
    const bool saysMaster = reply.getIntField("ismaster") == 1;
    isMaster = saysMaster;
    return reply;
}
/* --- dbclientconnection --- */
2008-07-28 00:36:47 +02:00
2008-09-11 19:04:30 +02:00
/* Fetches a single object from namespace `ns` matching `query`.
   The request is issued through query() with nToReturn = 1 and nToSkip = 0;
   fieldsToReturn and queryOptions are passed through unchanged.
   Returns an empty JSObj when the cursor reports no results, otherwise a
   copy of the first result.
   massert fires when query() hands back a null cursor.
*/
JSObj DBClientConnection::findOne(const char *ns, JSObj query, JSObj *fieldsToReturn, int queryOptions) {
    // `this->` is needed: the parameter named `query` hides the member function.
    auto_ptr<DBClientCursor> cursor = this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
    massert( "DBClientConnection::findOne: transport error", cursor.get() );

    if( cursor->more() ) {
        // copy() so the returned object does not refer to the cursor's buffer
        // once the cursor is destroyed (presumably -- see JSObj::copy).
        return cursor->next().copy();
    }
    return JSObj();
}
bool DBClientConnection::connect(const char *_serverAddress, string& errmsg) {
serverAddress = _serverAddress;
int port = DBPort;
string ip = hostbyname(_serverAddress);
2008-07-28 00:36:47 +02:00
if( ip.empty() )
ip = serverAddress;
int idx = ip.find( ":" );
if ( idx != string::npos ){
//cout << "port string:" << ip.substr( idx ) << endl;
port = atoi( ip.substr( idx + 1 ).c_str() );
ip = ip.substr( 0 , idx );
2008-10-14 20:02:43 +02:00
ip = hostbyname(ip.c_str());
}
if( ip.empty() )
ip = serverAddress;
// we keep around SockAddr for connection life -- maybe MessagingPort
// requires that?
server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
p = auto_ptr<MessagingPort>(new MessagingPort());
if( !p->connect(*server) ) {
errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip;
failed = true;
2008-07-28 00:36:47 +02:00
return false;
}
return true;
}
void DBClientConnection::checkConnection() {
if( !failed )
return;
if( lastReconnectTry && time(0)-lastReconnectTry < 2 )
return;
if( !autoReconnect )
return;
lastReconnectTry = time(0);
log() << "trying reconnect to " << serverAddress << endl;
string errmsg;
string tmp = serverAddress;
failed = false;
if( !connect(tmp.c_str(), errmsg) )
log() << "reconnect " << serverAddress << " failed " << errmsg << endl;
else
log() << "reconnect " << serverAddress << " ok" << endl;
}
/* Sends a dbQuery message and wraps the reply in a cursor.
   ns             - namespace to query
   query          - query object
   nToReturn      - number of results requested; also stored on the cursor
   nToSkip        - number of results to skip
   fieldsToReturn - optional field selector; appended to the request only
                    when non-null
   queryOptions   - option bits; asserted to lie within Option_ALLMASK
   Returns a null auto_ptr (and marks the connection failed) when the call
   to the server fails.
*/
auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query, int nToReturn, int nToSkip, JSObj *fieldsToReturn, int queryOptions) {
    checkConnection();

    // see query.h for the protocol we are using here.
    BufBuilder request;
    assert( (queryOptions&Option_ALLMASK) == queryOptions );
    request.append(queryOptions);
    request.append(ns);
    request.append(nToSkip);
    request.append(nToReturn);
    query.appendSelfToBufBuilder(request);
    if( fieldsToReturn )
        fieldsToReturn->appendSelfToBufBuilder(request);

    Message toSend;
    toSend.setData(dbQuery, request.buf(), request.len());

    auto_ptr<Message> response(new Message());
    const bool sent = p->call(toSend, *response);
    if( !sent ) {
        failed = true;
        return auto_ptr<DBClientCursor>(0);
    }

    auto_ptr<DBClientCursor> cursor(new DBClientCursor(this, *p.get(), response, queryOptions));
    // remembered for later requestMore() calls
    cursor->ns = ns;
    cursor->nToReturn = nToReturn;
    return cursor;
}
/* -- DBClientCursor ---------------------------------------------- */
void DBClientCursor::requestMore() {
assert( cursorId && pos == nReturned );
BufBuilder b;
b.append(opts);
b.append(ns.c_str());
b.append(nToReturn);
b.append(cursorId);
Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
auto_ptr<Message> response(new Message());
if( !p.call(toSend, *response) ) {
conn->failed = true;
massert("dbclient error communicating with server", false);
}
m = response;
dataReceived();
}
void DBClientCursor::dataReceived() {
QueryResult *qr = (QueryResult *) m->data;
if( qr->resultFlags() & ResultFlag_CursorNotFound ) {
// cursor id no longer valid at the server.
assert( qr->cursorId == 0 );
cursorId = 0; // 0 indicates no longer valid (dead)
}
if( cursorId == 0 ) {
// only set initially: we don't want to kill it on end of data
// if it's a tailable cursor
cursorId = qr->cursorId;
}
nReturned = qr->nReturned;
pos = 0;
data = qr->data();
/* check for errors. the only one we really care about at
this stage is "not master" */
if( conn->clientPaired && nReturned ) {
JSObj o(data);
Element e = o.firstElement();
if( strcmp(e.fieldName(), "$err") == 0 &&
e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
conn->clientPaired->isntMaster();
}
}
/* this assert would fire the way we currently work:
assert( nReturned || cursorId == 0 );
*/
}
/* Reports whether another object can be read with next().
   When the current batch is used up and the cursor id is non-zero, a
   further batch is requested from the server first (see requestMore()).
*/
bool DBClientCursor::more() {
    if( pos >= nReturned ) {
        // current batch exhausted
        if( cursorId == 0 )
            return false;
        requestMore();
    }
    return pos < nReturned;
}
/* Returns the next object of the result set and advances past it.
   Precondition (asserted): more() is true.
   The returned JSObj is built directly over the cursor's `data` pointer.
*/
JSObj DBClientCursor::next() {
    // more() may fetch the next batch from the server, so it must run
    // unconditionally. Calling it inside assert() would drop the call
    // wherever assert compiles to nothing.
    const bool available = more();
    assert( available );
    (void) available; // silences "unused" when assert expands to nothing

    pos++;
    JSObj o(data);
    data += o.objsize(); // step to the start of the following object
    return o;
}
/* ------------------------------------------------------ */
// "./db testclient" to invoke
extern JSObj emptyObj;
/* Manual test driver ("./db testclient").
   Connects a DBClientPaired to two hard-coded addresses, runs one findOne
   when the connect succeeds, then repeatedly issues a tailable query on
   foo.bar and prints every object received. It starts the query over after
   a null cursor or an assertion from more(), and stops once the cursor
   reports itself dead.
*/
void testClient() {
    cout << "testClient()" << endl;
    // To exercise a single connection instead, construct a
    // DBClientConnection and use its connect(address, errmsg) overload.
    DBClientPaired c;
    if( !c.connect("10.211.55.2", "1.2.3.4") ) {
        cout << "testClient: connect() failed" << endl;
    }
    else {
        // temp:
        cout << "test query returns: " << c.findOne("foo.bar", fromjson("{}")).toString() << endl;
    }

again:
    cout << "query foo.bar..." << endl;
    auto_ptr<DBClientCursor> cursor =
        c.query("foo.bar", emptyObj, 0, 0, 0, Option_CursorTailable);
    DBClientCursor *cc = cursor.get();
    if( cc == 0 ) {
        cout << "query() returned 0, sleeping 10 secs" << endl;
        sleepsecs(10);
        goto again;
    }

    while( 1 ) {
        bool m;
        try {
            m = cc->more();
        } catch(AssertionException&) {
            cout << "more() asserted, sleeping 10 sec" << endl;
            // actually back off as the message says, instead of
            // re-querying immediately
            sleepsecs(10);
            goto again;
        }

        cout << "more: " << m << " dead:" << cc->isDead() << endl;

        if( !m ) {
            if( cc->isDead() )
                cout << "cursor dead, stopping" << endl;
            else {
                // tailable cursor with nothing new yet: wait and poll again
                cout << "Sleeping 10 seconds" << endl;
                sleepsecs(10);
                continue;
            }
            break;
        }

        cout << cc->next().toString() << endl;
    }
}
2008-10-19 18:17:25 +02:00
/* --- class dbclientpaired --- */
/* Builds the pair with both connections constructed with `true`
   (presumably the auto-reconnect flag -- confirm against the
   DBClientConnection constructor) and no master selected yet.
*/
DBClientPaired::DBClientPaired()
    : left(true),
      right(true)
{
    // not yet determined; _checkMaster() uses this value to pick which
    // side it probes first
    master = NotSetL;
}
/* find which server, the left or right, is currently master mode */
void DBClientPaired::_checkMaster() {
for( int retry = 0; retry < 2; retry++ ) {
int x = master;
for( int pass = 0; pass < 2; pass++ ) {
DBClientConnection& c = x == 0 ? left : right;
try {
bool im;
JSObj o = c.cmdIsMaster(im);
if( retry )
log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n';
if( im ) {
master = (State) (x + 2);
return;
}
}
catch(AssertionException&) {
if( retry )
log() << "checkmaster: caught exception " << c.toString() << '\n';
}
x = x^1;
}
sleepsecs(1);
}
2008-10-19 18:17:25 +02:00
uassert("checkmaster: no master found", false);
}
2008-10-19 18:17:25 +02:00
/* Returns the connection to the current master.
   A previously selected master is reused as long as its connection is not
   marked failed; otherwise _checkMaster() is run to find one (it raises via
   uassert when none is found).
*/
inline DBClientConnection& DBClientPaired::checkMaster() {
    const bool masterSelected = ( master > NotSetR );
    if( masterSelected ) {
        // a master is selected.  let's just make sure connection didn't die
        DBClientConnection& current = ( master == Left ) ? left : right;
        if( !current.isFailed() )
            return current;

        // after a failure, on the next checkMaster, start with the other
        // server -- presumably it took over.  (not critical which we check first,
        // just will make the failover slightly faster if we guess right)
        if( master == Left )
            master = NotSetR;
        else
            master = NotSetL;
    }

    _checkMaster();
    assert( master > NotSetR );

    if( master == Left )
        return left;
    return right;
}
bool DBClientPaired::connect(const char *serverHostname1, const char *serverHostname2) {
string errmsg;
bool l = left.connect(serverHostname1, errmsg);
bool r = right.connect(serverHostname2, errmsg);
master = l ? NotSetL : NotSetR;
if( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow
return false;
try { checkMaster(); }
catch(UserAssertionException&) {
return false;
}
return true;
2008-10-19 18:17:25 +02:00
}
/* Forwards a query to whichever connection checkMaster() returns.
   The arguments are passed through, in order, to DBClientConnection::query.
*/
auto_ptr<DBClientCursor> DBClientPaired::query(const char *a, JSObj b, int c, int d,
                                               JSObj *e, int f)
{
    DBClientConnection& masterConn = checkMaster();
    return masterConn.query(a, b, c, d, e, f);
}
/* Forwards a findOne to whichever connection checkMaster() returns.
   The arguments are passed through, in order, to DBClientConnection::findOne.
*/
JSObj DBClientPaired::findOne(const char *a, JSObj b, JSObj *c, int d) {
    DBClientConnection& masterConn = checkMaster();
    return masterConn.findOne(a, b, c, d);
}