mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 01:21:03 +01:00
prep for op passthrough for w SERVER-992
This commit is contained in:
parent
a6684a66aa
commit
ae459f9ef5
@ -12,8 +12,8 @@ namespace mongo {
|
||||
still fairly awkward but a start.
|
||||
*/
|
||||
class OplogReader {
|
||||
auto_ptr<DBClientConnection> _conn;
|
||||
auto_ptr<DBClientCursor> cursor;
|
||||
shared_ptr<DBClientConnection> _conn;
|
||||
shared_ptr<DBClientCursor> cursor;
|
||||
public:
|
||||
|
||||
OplogReader() {
|
||||
@ -40,6 +40,9 @@ namespace mongo {
|
||||
/* ok to call if already connected */
|
||||
bool connect(string hostname);
|
||||
|
||||
bool connect(const string& from, const string& to);
|
||||
|
||||
|
||||
void tailCheck() {
|
||||
if( cursor.get() && cursor->isDead() ) {
|
||||
log() << "repl: old cursor isDead, will initiate a new one" << endl;
|
||||
@ -62,22 +65,28 @@ namespace mongo {
|
||||
query(ns, q2.done());
|
||||
}
|
||||
|
||||
void tailingQuery(const char *ns, const BSONObj& query) {
|
||||
void tailingQuery(const char *ns, const BSONObj& query, const BSONObj* fields=0) {
|
||||
assert( !haveCursor() );
|
||||
log(2) << "repl: " << ns << ".find(" << query.toString() << ')' << endl;
|
||||
cursor = _conn->query( ns, query, 0, 0, 0,
|
||||
cursor = _conn->query( ns, query, 0, 0, fields,
|
||||
QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay |
|
||||
/* TODO: slaveok maybe shouldn't use? */
|
||||
QueryOption_AwaitData
|
||||
);
|
||||
}
|
||||
|
||||
void tailingQueryGTE(const char *ns, OpTime t) {
|
||||
void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) {
|
||||
BSONObjBuilder q;
|
||||
q.appendDate("$gte", t.asDate());
|
||||
BSONObjBuilder query;
|
||||
query.append("ts", q.done());
|
||||
tailingQuery(ns, query.done());
|
||||
tailingQuery(ns, query.done(), fields);
|
||||
}
|
||||
|
||||
/* Do a tailing query, but only send the ts field back. */
|
||||
void ghostQueryGTE(const char *ns, OpTime t) {
|
||||
const BSONObj fields = BSON("ts" << 1 << "_id" << 0);
|
||||
return tailingQueryGTE(ns, t, &fields);
|
||||
}
|
||||
|
||||
bool more() {
|
||||
@ -108,6 +117,10 @@ namespace mongo {
|
||||
void putBack(BSONObj op) {
|
||||
cursor->putBack(op);
|
||||
}
|
||||
|
||||
private:
|
||||
bool commonConnect(const string& hostName);
|
||||
bool passthroughHandshake(const string& f);
|
||||
};
|
||||
|
||||
}
|
||||
|
39
db/repl.cpp
39
db/repl.cpp
@ -48,6 +48,7 @@
|
||||
#include "repl_block.h"
|
||||
#include "repl/rs.h"
|
||||
#include "replutil.h"
|
||||
#include "repl/connections.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
@ -1092,14 +1093,13 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool OplogReader::connect(string hostName) {
|
||||
bool OplogReader::commonConnect(const string& hostName) {
|
||||
if( conn() == 0 ) {
|
||||
_conn = auto_ptr<DBClientConnection>(new DBClientConnection( false, 0, 0 /* tcp timeout */));
|
||||
_conn = shared_ptr<DBClientConnection>(new DBClientConnection( false, 0, 0 /* tcp timeout */));
|
||||
string errmsg;
|
||||
ReplInfo r("trying to connect to sync source");
|
||||
if ( !_conn->connect(hostName.c_str(), errmsg) ||
|
||||
(!noauth && !replAuthenticate(_conn.get())) ||
|
||||
!replHandshake(_conn.get()) ) {
|
||||
(!noauth && !replAuthenticate(_conn.get())) ) {
|
||||
resetConnection();
|
||||
log() << "repl: " << errmsg << endl;
|
||||
return false;
|
||||
@ -1107,6 +1107,37 @@ namespace mongo {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool OplogReader::connect(string hostName) {
|
||||
if (commonConnect(hostName)) {
|
||||
return replHandshake(_conn.get());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool OplogReader::connect(const string& from, const string& to) {
|
||||
if (commonConnect(to)) {
|
||||
return passthroughHandshake(from);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool OplogReader::passthroughHandshake(const string& f) {
|
||||
ScopedConn from(f);
|
||||
|
||||
BSONObj me = from.findOne("local.me", BSONObj(), NULL, 0);
|
||||
if (me.isEmpty() || !me.hasField("_id")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BSONObjBuilder cmd;
|
||||
cmd.appendAs( me["_id"] , "handshake" );
|
||||
|
||||
BSONObj res;
|
||||
bool ok = conn()->runCommand( "admin" , cmd.obj() , res );
|
||||
log(ok) << "replHandshake res not: " << ok << " res: " << res << endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* note: not yet in mutex at this point.
|
||||
returns >= 0 if ok. return -1 if you want to reconnect.
|
||||
|
Loading…
Reference in New Issue
Block a user