2010-07-18 19:41:20 +02:00
|
|
|
/** @file oplogreader.h */
|
|
|
|
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include "../client/dbclient.h"
|
|
|
|
#include "../client/constants.h"
|
2010-07-23 20:52:14 +02:00
|
|
|
#include "dbhelpers.h"
|
2010-07-18 19:41:20 +02:00
|
|
|
|
|
|
|
namespace mongo {
|
|
|
|
|
|
|
|
/* started abstracting out the querying of the primary/master's oplog
|
|
|
|
still fairly awkward but a start.
|
|
|
|
*/
|
|
|
|
class OplogReader {
|
|
|
|
auto_ptr<DBClientConnection> _conn;
|
|
|
|
auto_ptr<DBClientCursor> cursor;
|
|
|
|
public:
|
|
|
|
|
2010-07-30 22:21:28 +02:00
|
|
|
OplogReader() {
|
|
|
|
}
|
|
|
|
~OplogReader() {
|
|
|
|
}
|
|
|
|
|
2010-07-21 19:13:36 +02:00
|
|
|
void resetCursor() {
|
2010-07-18 19:41:20 +02:00
|
|
|
cursor.reset();
|
|
|
|
}
|
|
|
|
void resetConnection() {
|
|
|
|
cursor.reset();
|
|
|
|
_conn.reset();
|
|
|
|
}
|
|
|
|
DBClientConnection* conn() { return _conn.get(); }
|
2010-07-21 22:30:32 +02:00
|
|
|
BSONObj findOne(const char *ns, const Query& q) {
|
2010-11-09 23:28:32 +01:00
|
|
|
return conn()->findOne(ns, q, 0, QueryOption_SlaveOk);
|
2010-07-18 19:41:20 +02:00
|
|
|
}
|
|
|
|
|
2010-07-18 21:02:46 +02:00
|
|
|
BSONObj getLastOp(const char *ns) {
|
2010-07-23 20:52:14 +02:00
|
|
|
return findOne(ns, Query().sort(reverseNaturalObj));
|
2010-07-18 21:02:46 +02:00
|
|
|
}
|
|
|
|
|
2010-07-18 19:41:20 +02:00
|
|
|
/* ok to call if already connected */
|
|
|
|
bool connect(string hostname);
|
|
|
|
|
2010-07-20 19:37:09 +02:00
|
|
|
void tailCheck() {
|
2010-07-18 19:41:20 +02:00
|
|
|
if( cursor.get() && cursor->isDead() ) {
|
2010-07-20 19:37:09 +02:00
|
|
|
log() << "repl: old cursor isDead, will initiate a new one" << endl;
|
2010-07-21 19:13:36 +02:00
|
|
|
resetCursor();
|
2010-07-18 19:41:20 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
bool haveCursor() { return cursor.get() != 0; }
|
|
|
|
|
2010-08-02 20:03:33 +02:00
|
|
|
void query(const char *ns, const BSONObj& query) {
|
|
|
|
assert( !haveCursor() );
|
2010-08-02 19:19:18 +02:00
|
|
|
cursor = _conn->query(ns, query, 0, 0, 0, QueryOption_SlaveOk);
|
2010-08-02 20:03:33 +02:00
|
|
|
}
|
|
|
|
|
2010-07-20 04:40:22 +02:00
|
|
|
void tailingQuery(const char *ns, const BSONObj& query) {
|
2010-07-18 19:41:20 +02:00
|
|
|
assert( !haveCursor() );
|
|
|
|
log(2) << "repl: " << ns << ".find(" << query.toString() << ')' << endl;
|
|
|
|
cursor = _conn->query( ns, query, 0, 0, 0,
|
|
|
|
QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay |
|
2010-08-02 20:03:33 +02:00
|
|
|
/* TODO: slaveok maybe shouldn't use? */
|
2010-07-18 19:41:20 +02:00
|
|
|
QueryOption_AwaitData
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2010-07-20 04:32:43 +02:00
|
|
|
void tailingQueryGTE(const char *ns, OpTime t) {
|
|
|
|
BSONObjBuilder q;
|
|
|
|
q.appendDate("$gte", t.asDate());
|
|
|
|
BSONObjBuilder query;
|
|
|
|
query.append("ts", q.done());
|
|
|
|
tailingQuery(ns, query.done());
|
|
|
|
}
|
|
|
|
|
2010-07-18 19:41:20 +02:00
|
|
|
bool more() {
|
|
|
|
assert( cursor.get() );
|
|
|
|
return cursor->more();
|
|
|
|
}
|
2010-07-20 17:05:27 +02:00
|
|
|
bool moreInCurrentBatch() {
|
|
|
|
assert( cursor.get() );
|
|
|
|
return cursor->moreInCurrentBatch();
|
|
|
|
}
|
2010-07-18 19:41:20 +02:00
|
|
|
|
|
|
|
/* old mongod's can't do the await flag... */
|
|
|
|
bool awaitCapable() {
|
|
|
|
return cursor->hasResultFlag(ResultFlag_AwaitCapable);
|
|
|
|
}
|
|
|
|
|
|
|
|
void peek(vector<BSONObj>& v, int n) {
|
|
|
|
if( cursor.get() )
|
|
|
|
cursor->peek(v,n);
|
|
|
|
}
|
|
|
|
|
2010-07-20 04:32:43 +02:00
|
|
|
BSONObj nextSafe() { return cursor->nextSafe(); }
|
|
|
|
|
2010-07-18 19:41:20 +02:00
|
|
|
BSONObj next() {
|
|
|
|
return cursor->next();
|
|
|
|
}
|
|
|
|
|
|
|
|
void putBack(BSONObj op) {
|
|
|
|
cursor->putBack(op);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|