mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 01:21:03 +01:00

944 lines
26 KiB

// repl.cpp
_ on a syncexception, don't allow going back to master state?
* Copyright (C) 2008 10gen Inc.
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* GNU Affero General Public License for more details.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "stdafx.h"
#include "jsobj.h"
#include "../util/goodies.h"
#include "repl.h"
#include "../grid/message.h"
#include "../client/dbclient.h"
#include "pdfile.h"
#include "query.h"
#include "json.h"
#include "db.h"
#include "commands.h"
// Mutex object defined in another translation unit (note the extern).
extern boost::mutex dbMutex;
// Prototypes only: the bodies of the following functions are not part of this file's visible text.
// findTableScan is called below with a namespace and emptyObj to iterate local.sources.
auto_ptr<Cursor> findTableScan(const char *ns, BSONObj& order, bool *isSorted=0);
// Called by startReplication() to create the local.oplog.$main collection.
bool userCreateNS(const char *ns, BSONObj& j, string& err);
// Called below for upserts/updates when saving sources and applying oplog entries.
int _updateObjects(const char *ns, BSONObj updateobj, BSONObj pattern, bool upsert, stringstream& ss, bool logOp=false);
// Called below to replay 'c' (command) oplog entries.
bool _runCommands(const char *ns, BSONObj& jsobj, stringstream& ss, BufBuilder &b, BSONObjBuilder& anObjBuilder);
void ensureHaveIdIndex(const char *ns);
/* "dead" means something really bad happened like replication falling completely out of sync.
when non-null, we are dead and the string is informational
// Null while healthy. Code below assigns a literal reason string (for example
// "data too stale halted replication"); the ismaster command and the oplog applier test it.
const char *allDead = 0;
#include "replset.h"
// Debug trace: streams z to cout after the prefix "debugrepl ". z is pasted unparenthesized,
// so callers may pass a whole chain of << operands.
#define debugrepl(z) cout << "debugrepl " << z << '\n'
//define debugrepl
/* --- ReplPair -------------------------------- */
// Null unless pairWith() has been called; non-null is treated as "paired mode" throughout this file.
ReplPair *replPair = 0;
/* output by the web console */
string replInfo = "no repl yet";
/* peer unreachable, try our arbiter

   Outcomes (all reported through setMaster()):
     - no arbiter configured (arbHost == "-")          -> State_Master
     - arbiter unreachable, or its reply is empty       -> State_CantArb
     - arbiter answered                                 -> State_Master

   Each failure path returns immediately so that a later step cannot overwrite
   the state that was just recorded.
*/
void ReplPair::arbitrate() {
    if( arbHost == "-" ) {
        // no arbiter. we are up, let's assume he is down and network is not partitioned.
        setMaster(State_Master, "remote unreachable");
        return;
    }

    auto_ptr<DBClientConnection> conn( new DBClientConnection() );
    string errmsg;
    if( !conn->connect(arbHost.c_str(), errmsg) ) {
        setMaster(State_CantArb, "can't connect to arb");
        return;
    }

    // is_master is an out-parameter of cmdIsMaster; only the reply object is inspected here.
    bool is_master;
    BSONObj res = conn->cmdIsMaster(is_master);
    /*findOne("admin.$cmd", ismasterobj);*/
    if( res.isEmpty() ) {
        setMaster(State_CantArb, "can't arb 2");
        return;
    }

    setMaster(State_Master, "remote down, arbiter reached");
}
/* --------------------------------------------- */
/* "ismaster" command: reports whether this node currently acts as master.

   Reply fields:
     allDead set  -> ismaster: 0.0, remote (only when paired), info: <allDead reason>
     paired       -> ismaster: <replPair->state>, remote, info (only when non-empty)
     otherwise    -> ismaster: 1, msg: "not paired"

   run() always returns true; errmsg is never written.
*/
class CmdIsMaster : public Command {
public:
    CmdIsMaster() : Command("ismaster") { }

    virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result) {
        if( allDead ) {
            result.append("ismaster", 0.0);
            if( replPair )
                result.append("remote", replPair->remote);
            result.append("info", allDead);
        }
        else if( replPair ) {
            result.append("ismaster", replPair->state);
            result.append("remote", replPair->remote);
            // Only report the informational string when there is one to report.
            if( !replPair->info.empty() )
                result.append("info", replPair->info);
        }
        else {
            result.append("ismaster", 1);
            result.append("msg", "not paired");
        }
        return true;
    }
} cmdismaster;
/* negotiate who is master
-1=not set (probably means we just booted)
0=was slave
1=was master
remote,local -> new remote,local
!1,1 -> 0,1
1,!1 -> 1,0
-1,-1 -> dominant->1, nondom->0
0,0 -> dominant->1, nondom->0
1,1 -> dominant->1, nondom->0
{ negotiatemaster:1, i_was:<state>, your_name:<hostname> }
{ ok:1, you_are:..., i_am:... }
// Handler for the "negotiatemaster" command: the caller reports its previous state
// ("i_was") and the name it knows us by ("your_name"); the reply carries "you_are" and "i_am".
class CmdNegotiateMaster : public Command {
CmdNegotiateMaster() : Command("negotiatemaster") { }
// Reports this command as admin-only.
virtual bool adminOnly() { return true; }
// Returns false with errmsg set when we are not paired (replPair == 0) or when
// your_name is empty / i_was is below -1. Otherwise stores `me` into replPair->state,
// appends you_are / i_am to result and returns true.
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result) {
if( replPair == 0 ) {
problem() << "got negotiatemaster cmd but we are not in paired mode." << endl;
errmsg = "not paired";
return false;
int was = cmdObj.getIntField("i_was");
string myname = cmdObj.getStringField("your_name");
if( myname.empty() || was < -1 ) {
errmsg = "your_name/i_was not specified";
return false;
int me, you;
// NOTE(review): none of the branches below assigns me or you in the visible text, so the
// values stored and returned at the end would be indeterminate. The assignments appear to
// be missing -- confirm against upstream before relying on this block.
if( was == replPair->state ) {
if( replPair->dominant(myname) ) {
else {
else if( was == 1 ) {
else {
replPair->state = me;
result.append("you_are", you);
result.append("i_am", me);
return true;
} cmdnegotiatemaster;
// Client side of master negotiation: sends our current state and the peer's host name
// to the peer's admin.$cmd namespace and validates the reply's "ok" and "you_are" fields.
// NOTE(review): the visible text never appends a "negotiatemaster" field to the command
// object, and nothing is done with x after validation -- statements appear to be missing; confirm.
void ReplPair::negotiate(DBClientConnection *conn) {
BSONObjBuilder b;
b.append("i_was", state);
b.append("your_name", remoteHost);
BSONObj cmd = b.done();
BSONObj res = conn->findOne("admin.$cmd", cmd);
if( res.getIntField("ok") != 1 ) {
problem() << "negotiate fails: " << res.toString() << '\n';
// The peer tells us which role to take; only 0 and 1 are accepted.
int x = res.getIntField("you_are");
if( x != 0 && x != 1 ) {
problem() << "negotiate: bad you_are value " << res.toString() << endl;
// Most recent value handed out by OpTime::now().
OpTime last(0, 0);
// Returns an OpTime for the current wall-clock second as reported by time(0).
// NOTE(review): when called again within the same second the visible text returns `last`
// unchanged, yet TestOpTime asserts that consecutive values differ. An increment of the
// within-second counter appears to be missing here -- confirm against upstream.
OpTime OpTime::now() {
unsigned t = (unsigned) time(0);
if( last.secs == t ) {
return last;
// A new second has started: begin again with 1 as the second constructor argument.
last = OpTime(t, 1);
return last;
// Self-test executed by the constructor of the global instance `testoptime`:
// checks that values from OpTime::now() differ from the previously seen value, and that
// a copy compares equal to its source under both operator== and operator!=.
struct TestOpTime {
TestOpTime() {
OpTime t;
for( int i = 0; i < 10; i++ ) {
OpTime s = OpTime::now();
assert( s != t );
t = s;
OpTime q = t;
assert( q == t );
assert( !(q != t) );
} testoptime;
// Placeholder test hook; always reports success (0).
int test2() {
    return 0;
}
/* --------------------------------------------------------------*/
// Default constructor: starts with no databases cloned in the current pass and not paired.
// Other members are left to their own default construction.
ReplSource::ReplSource() {
nClonedThisPass = 0;
paired = false;
/* Build a ReplSource from one document of the sources collection.

   Fields read from o:
     "only"     - copied into only
     "host"     - required; copied into hostName
     "source"   - copied into _sourceName; sourceName() must then equal "main"
     "syncedTo" - optional; must be a Date when present
     "dbs"      - optional object; every field name is recorded in dbs

   Raises a user assertion (uassert) when host is missing, the source name is not
   "main", or syncedTo has the wrong type.
*/
ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) {
    paired = false;
    only = o.getStringField("only");
    hostName = o.getStringField("host");
    _sourceName = o.getStringField("source");
    uassert( "'host' field not set in sources collection object", !hostName.empty() );
    uassert( "only source='main' allowed for now with replication", sourceName() == "main" );

    BSONElement e = o.getField("syncedTo");
    if( !e.eoo() ) {
        uassert( "bad sources 'syncedTo' field value", e.type() == Date );
        OpTime tmp( e.date() );
        syncedTo = tmp;
        //syncedTo.asDate() = e.date();
    }

    BSONObj dbsObj = o.getObjectField("dbs");
    if( !dbsObj.isEmpty() ) {
        BSONObjIterator i(dbsObj);
        while( 1 ) {
            BSONElement dbElt = i.next();
            // The end-of-object element terminates the walk and is not itself a database name.
            if( dbElt.eoo() )
                break;
            dbs.insert( dbElt.fieldName() );
        }
    }
}
/* Turn our C++ Source object into a BSONObj.

   Always emits "host" and "source". Emits "only" when it is non-empty, "syncedTo"
   (as a Date) when it is non-null, and "dbs" -- an object with one boolean true
   field per known database name -- when at least one database is recorded.
*/
BSONObj ReplSource::jsobj() {
    BSONObjBuilder b;
    b.append("host", hostName);
    b.append("source", sourceName());
    if( !only.empty() )
        b.append("only", only);
    if( !syncedTo.isNull() )
        b.appendDate("syncedTo", syncedTo.asDate());

    // Ask the set directly whether there is anything to write instead of keeping a
    // separate counter that has to be maintained alongside the loop.
    if( !dbs.empty() ) {
        BSONObjBuilder dbs_builder;
        for( set<string>::iterator i = dbs.begin(); i != dbs.end(); ++i )
            dbs_builder.appendBool(i->c_str(), 1);
        b.append("dbs", dbs_builder.done());
    }

    return b.doneAndDecouple();
}
// Writes this source's document (as produced by jsobj()) into local.sources with an
// upsert, matching existing documents on the "host" field only.
void ReplSource::save() {
BSONObjBuilder b;
assert( !hostName.empty() );
b.append("host", hostName);
// todo: finish allowing multiple source configs.
// this line doesn't work right when source is null, if that is allowed as it is now:
//b.append("source", _sourceName);
BSONObj pattern = b.done();
BSONObj o = jsobj();
stringstream ss;
int u = _updateObjects("local.sources", o, pattern, true/*upsert for pair feature*/, ss);
// Only results 1 and 4 are accepted; their meaning is defined by _updateObjects, whose body is not visible here.
assert( u == 1 || u == 4 );
// Reset the global `database` pointer to null.
database = 0;
// Deletes every ReplSource pointed to by v.
// The vector itself is not cleared in the visible text, so its elements dangle afterwards.
void ReplSource::cleanup(vector<ReplSource*>& v) {
for( vector<ReplSource*>::iterator i = v.begin(); i != v.end(); i++ )
delete *i;
// Host given on the command line via --source; empty when the option was not used (see loadAll).
string dashDashSource;
// Adds source s to v. `old` holds the previously loaded sources and is searched for an
// entry equal to s (operator== on ReplSource).
// NOTE(review): in the visible text the loop has no increment and the matching branch has no
// body, so only the "push a fresh copy" line remains. The comment that follows this function
// says existing objects are reused to keep their connection and cursor -- the reuse/erase
// statements appear to be missing; confirm against upstream.
static void addSourceToList(vector<ReplSource*>&v, ReplSource& s, vector<ReplSource*>&old) {
for( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); ) {
if( s == **i ) {
v.push_back( new ReplSource(s) );
/* we reuse our existing objects so that we can keep our existing connection
and cursor in effect.
// Rebuilds v from the local.sources collection. The previous contents of v are moved to
// `old`, offered to addSourceToList() for each document read, and whatever is left in
// `old` at the end is deleted.
void ReplSource::loadAll(vector<ReplSource*>& v) {
vector<ReplSource *> old = v;
v.erase(v.begin(), v.end());
bool gotPairWith = false;
if( !dashDashSource.empty() ) {
// --source <host> specified.
// check that no items are in sources other than that
// add if missing
auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
int n = 0;
// NOTE(review): nothing in the visible text advances the cursor or changes n inside this
// loop, and no exit follows the "terminating" log line -- statements appear to be missing; confirm.
while( c->ok() ) {
ReplSource tmp(c->current());
if( tmp.hostName != dashDashSource ) {
problem() << "--source " << dashDashSource << " != " << tmp.hostName << " from local.sources collection" << endl;
log() << "terminating after 30 seconds" << endl;
uassert( "local.sources collection corrupt?", n<2 );
if( n == 0 ) {
// source missing. add.
ReplSource s;
s.hostName = dashDashSource;
// Second scan: turn every document of local.sources into a ReplSource in v.
auto_ptr<Cursor> c = findTableScan("local.sources", emptyObj);
while( c->ok() ) {
ReplSource tmp(c->current());
// A "main" source whose host is our pair partner is flagged as paired.
if( replPair && tmp.hostName == replPair->remote && tmp.sourceName() == "main" ) {
gotPairWith = true;
tmp.paired = true;
addSourceToList(v, tmp, old);
database = 0;
if( !gotPairWith && replPair ) {
/* add the --pairwith server */
// NOTE(review): s is allocated and filled in, but the visible text never stores it in v -- confirm.
ReplSource *s = new ReplSource();
s->paired = true;
s->hostName = replPair->remote;
// Free every previously loaded source still held in `old`.
for( vector<ReplSource*>::iterator i = old.begin(); i != old.end(); i++ )
delete *i;
// Command object { getoptime: 1 }; sync() sends it to the remote admin.$cmd namespace.
BSONObj opTimeQuery = fromjson("{getoptime:1}");
// Re-clones database `db` from hostName via cloneFrom(). Requires the global `database`
// to already refer to db (asserted). Throws SyncException when the clone fails; otherwise
// returns true.
// NOTE(review): the log line announces a drop and dummyns is built, but no drop call or use
// of dummyns appears in the visible text, nor does anything add db to dbs -- confirm.
bool ReplSource::resync(string db) {
log() << "resync: dropping database " << db << endl;
string dummyns = db + ".";
assert( database->name == db );
log() << "resync: cloning database " << db << endl;
//Cloner c;
string errmsg;
bool ok = cloneFrom(hostName.c_str(), errmsg, database->name);
//bool ok = c.go(hostName.c_str(), errmsg);
if( !ok ) {
problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
throw SyncException();
log() << "resync: done " << db << endl;
/* add the db to our dbs array which we will write back to local.sources.
note we are not in a consistent state until the oplog gets applied,
which happens next when this returns.
return true;
/* local.$oplog.main is of the form:
{ ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
see logOp() comments.
// Applies one oplog entry to the local database.
// Reads from op: "ns" (target namespace), "op" (operation type), "o" (object),
// "o2" (update pattern), "b" (boolean flag) and "first".
// Throws SyncException when allDead is set. A UserAssertionException raised while
// applying the operation is caught and logged.
void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op) {
char clientName[MaxClientLen];
const char *ns = op.getStringField("ns");
nsToClient(ns, clientName);
// NOTE(review): no statement follows this filter in the visible text; presumably the
// operation is skipped when it is for a database other than `only` -- confirm.
if( !only.empty() && only != clientName )
// A database not yet listed in dbs has never been synced from this source.
bool newDb = dbs.count(clientName) == 0;
if( newDb && nClonedThisPass ) {
/* we only clone one database per pass, even if a lot need done. This helps us
avoid overflowing the master's transaction log by doing too much work before going
back to read more transactions. (Imagine a scenario of slave startup where we try to
clone 100 databases in one pass.)
dblock lk;
bool justCreated = setClientTempNs(ns);
if( allDead ) {
log() << "allDead, throwing SyncException\n";
throw SyncException();
// operation type -- see logOp() comments for types
const char *opType = op.getStringField("op");
if( justCreated || /* datafiles were missing. so we need everything, no matter what sources object says */
newDb ) /* if not in dbs, we've never synced this database before, so we need everything */
if( op.getBoolField("first") ) {
log() << "pull: got {first:true} op ns:" << ns << '\n';
/* this is the first thing in the oplog ever, so we don't need to resync(). */
if( newDb )
problem() << "warning: justCreated && !newDb in repl " << op.toString() << endl;
else if( paired && !justCreated ) {
if( strcmp(opType,"db") == 0 && strcmp(ns, "admin.") == 0 ) {
// "admin" is a special namespace we use for privileged commands -- ok if it exists first on
// either side
else {
/* the other half of our pair has some operations. yet we already had a db on our
disk even though the db in question is not listed in the source.
allDead = "pair: historical image missing for a db";
problem() << "pair: historical image missing for " << clientName << ", setting allDead=true" << endl;
log() << "op:" << op.toString() << endl;
log() << "TEMP: pair: assuming we have the historical image for: " <<
clientName << ". add extra checks here." << endl;
else {
stringstream ss;
BSONObj o = op.getObjectField("o");
try {
// Dispatch on the first character of the operation type.
if( *opType == 'i' ) {
const char *p = strchr(ns, '.');
if( p && strcmp(p, ".system.indexes") == 0 ) {
// updates aren't allowed for indexes -- so we will do a regular insert. if index already
// exists, that is ok.
theDataFileMgr.insert(ns, (void*) o.objdata(), o.objsize());
else {
// do upserts for inserts as we might get replayed more than once
OID *oid = o.getOID();
if( oid == 0 ) {
// No OID available: use the whole object as its own match pattern.
_updateObjects(ns, o, o, true, ss);
else {
// Match on _id only.
BSONObjBuilder b;
b.appendOID("_id", oid);
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
_updateObjects(ns, o, b.done(), true, ss);
else if( *opType == 'u' ) {
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
_updateObjects(ns, o, op.getObjectField("o2"), op.getBoolField("b"), ss);
else if( *opType == 'd' ) {
// "d" alone is a delete; anything longer must be "db".
if( opType[1] == 0 )
deleteObjects(ns, o, op.getBoolField("b"));
assert( opType[1] == 'b' ); // "db" advertisement
else {
// Anything else must be a command.
BufBuilder bb;
BSONObjBuilder ob;
assert( *opType == 'c' );
_runCommands(ns, o, ss, bb, ob);
catch( UserAssertionException& e ) {
log() << "sync: caught user assertion " << e.msg << '\n';
database = 0;
/* note: not yet in mutex at this point. */
// Reads operations from the remote namespace "local.oplog.$" + sourceName() and applies them.
// An existing live cursor is reused (tailing); otherwise a new tailable query for
// ts >= syncedTo is issued. When the first op read does not line up with syncedTo on a
// non-initial, non-tailing run, allDead is set and SyncException is thrown.
// After the last available op, syncedTo is advanced and persisted with save().
void ReplSource::sync_pullOpLog() {
string ns = string("local.oplog.$") + sourceName();
debugrepl( "sync_pullOpLog " << ns );
bool tailing = true;
DBClientCursor *c = cursor.get();
if( c && c->isDead() ) {
log() << "pull: old cursor isDead, initiating a new one\n";
c = 0;
if( c == 0 ) {
// queryObj = { ts: { $gte: syncedTo } }
BSONObjBuilder q;
q.appendDate("$gte", syncedTo.asDate());
BSONObjBuilder query;
query.append("ts", q.done());
BSONObj queryObj = query.done();
debugrepl( ns << ".find(" << queryObj.toString() << ')' );
cursor = conn->query( ns.c_str(), queryObj, 0, 0, 0, Option_CursorTailable );
c = cursor.get();
// A freshly issued query is not a continuation of an earlier pass.
tailing = false;
else {
debugrepl( "tailing=true" );
if( c == 0 ) {
problem() << "pull: dbclient::query returns null (conn closed?)" << endl;
// show any deferred database creates from a previous pass
// Builds a synthetic "db" operation for the first name in addDbNextPass.
// NOTE(review): the visible text builds `op` but does not apply it or remove the name -- confirm.
set<string>::iterator i = addDbNextPass.begin();
if( i != addDbNextPass.end() ) {
BSONObjBuilder b;
b.append("ns", *i + '.');
b.append("op", "db");
BSONObj op = b.done();
if( !c->more() ) {
if( tailing ) {
debugrepl( "tailing & no new activity" );
} else
log() << "pull: " << ns << " oplog is empty\n";
// n is reported below as the number of operations applied.
int n = 0;
BSONObj op = c->next();
BSONElement ts = op.findElement("ts");
if( ts.type() != Date ) {
problem() << "pull: bad object read from remote oplog: " << op.toString() << '\n';
OpTime nextOpTime( ts.date() );
debugrepl( "first op time received: " << nextOpTime.toString() );
// A null syncedTo means nothing has been synced from this source yet.
bool initial = syncedTo.isNull();
if( initial || tailing ) {
if( tailing ) {
assert( syncedTo < nextOpTime );
else {
log() << "pull: initial run\n";
else if( nextOpTime != syncedTo ) {
// Not initial, not tailing, and the first op's time differs from syncedTo:
// treated as stale data and replication is halted.
Logstream& l = log();
l << "pull: nextOpTime " << nextOpTime.toString() << ' ';
if( nextOpTime < syncedTo )
l << "<??";
l << ">";
l << " syncedTo " << syncedTo.toString() << '\n';
log() << "pull: time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) << "sec\n";
log() << "pull: tailing: " << tailing << '\n';
log() << "pull: data too stale, halting replication" << endl;
allDead = "data too stale halted replication";
assert( syncedTo < nextOpTime );
throw SyncException();
else {
/* t == syncedTo, so the first op was applied previously, no need to redo it. */
// apply operations
while( 1 ) {
if( !c->more() ) {
log() << "pull: applied " << n << " operations" << endl;
syncedTo = nextOpTime;
debugrepl( "end sync_pullOpLog syncedTo: " << syncedTo.toString() );
dblock lk;
save(); // note how far we are synced up to now
/* todo: get out of the mutex for the next()? */
BSONObj op = c->next();
ts = op.findElement("ts");
assert( ts.type() == Date );
// This local `last` (the previous op's time) hides the file-level `last` used by OpTime::now().
OpTime last = nextOpTime;
OpTime tmp( ts.date() );
nextOpTime = tmp;
// Timestamps must be strictly increasing from one op to the next.
if( !( last < nextOpTime ) ) {
problem() << "sync error: last " << last.toString() << " >= nextOpTime " << nextOpTime.toString() << endl;
uassert("bad 'ts' value in sources", false);
/* note: not yet in mutex at this point.
returns true if everything happy. return false if you want to reconnect.
// One sync pass against this source. Returns false when the source is this server
// itself, the connection cannot be established, or the remote optime cannot be read;
// returns true at the end of the visible text.
bool ReplSource::sync() {
log() << "pull: " << sourceName() << '@' << hostName << endl;
nClonedThisPass = 0;
// Refuse to replicate from ourselves: host "localhost" or "" together with our own port.
if( (string("localhost") == hostName || string("") == hostName) && port == DBPort ) {
log() << "pull: can't sync from self (localhost). sources configuration may be wrong." << endl;
return false;
// Connect lazily; an existing connection is kept.
if( conn.get() == 0 ) {
conn = auto_ptr<DBClientConnection>(new DBClientConnection());
string errmsg;
if( !conn->connect(hostName.c_str(), errmsg) ) {
log() << "pull: cantconn " << errmsg << endl;
if( replPair && paired ) {
assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) );
return false;
// NOTE(review): no statement follows this test in the visible text -- confirm against upstream.
if( paired )
// get current mtime at the server.
BSONObj o = conn->findOne("admin.$cmd", opTimeQuery);
BSONElement e = o.findElement("optime");
if( e.eoo() ) {
log() << "pull: failed to get cur optime from master" << endl;
log() << " " << o.toString() << endl;
return false;
// NOTE(review): every other uassert in this file passes a message string before the
// condition; this call has a single argument -- confirm it matches the macro's signature.
uassert( e.type() == Date );
// serverCurTime is filled in here but not read again in the visible text.
OpTime serverCurTime;
serverCurTime.asDate() = e.date();
return true;
/* -- Logging of operations -------------------------------------*/
// cached copies of these...
// Both are filled in by _logOp() the first time it runs (when localOplogMainDetails is still null).
NamespaceDetails *localOplogMainDetails = 0;
Database *localOplogClient = 0;
/* we write to local.oplog.$main:
{ ts : ..., op: ..., ns: ..., o: ... }
ts: an OpTime timestamp
"i" insert
"u" update
"d" delete
"c" db cmd
"db" declares presence of a database (ns is set to the db name + '.')
if not null, specifies a boolean to pass along to the other side as b: param.
used for "justOne" or "upsert" flags on 'd', 'u'
first: true
when set, indicates this is the first thing we have logged for this database.
thus, the slave does not need to copy down all the data when it sees this.
// Appends one entry to local.oplog.$main.
//   opstr - operation type string, written as "op"
//   ns    - namespace the operation applies to, written as "ns"
//   obj   - object written as the trailing "o" field
//   o2    - optional; written as "o2" when non-null
//   bb    - optional; written as boolean "b" when non-null
// The global `database` is saved on entry and restored before returning.
void _logOp(const char *opstr, const char *ns, BSONObj& obj, BSONObj *o2, bool *bb) {
// NOTE(review): no statement follows this test in the visible text; presumably operations on
// the "local." database are not logged -- confirm.
if( strncmp(ns, "local.", 6) == 0 )
Database *oldClient = database;
bool haveLogged = database && database->haveLogged();
/* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
instead we do a single copy to the destination position in the memory mapped file.
// `partial` holds every field except "o".
BSONObjBuilder b;
b.appendDate("ts", OpTime::now().asDate());
b.append("op", opstr);
b.append("ns", ns);
if( bb )
b.appendBool("b", *bb);
if( o2 )
b.append("o2", *o2);
if( !haveLogged ) {
b.appendBool("first", true);
// NOTE(review): no statement follows this test in the visible text -- confirm.
if( database ) // null on dropDatabase()'s logging.
BSONObj partial = b.done();
int posz = partial.objsize();
// Final record size: partial + obj + 1 byte (element type) + 2 bytes (the name "o" and its NUL).
int len = posz + obj.objsize() + 1 + 2 /*o:*/;
if( localOplogMainDetails == 0 ) {
localOplogClient = database;
localOplogMainDetails = nsdetails("local.oplog.$main");
database = localOplogClient;
Record *r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, "local.oplog.$main", len);
char *p = r->data;
memcpy(p, partial.objdata(), posz);
// Grow the unsigned value at the start of the copied object by the bytes about to be
// appended (presumably the object's size prefix, per BSON layout -- confirm).
*((unsigned *)p) += obj.objsize() + 1 + 2;
// Step back onto partial's last byte and overwrite from there with the "o" element.
p += posz - 1;
*p++ = (char) Object;
*p++ = 'o';
*p++ = 0;
memcpy(p, obj.objdata(), obj.objsize());
p += obj.objsize();
// Terminate the combined object.
*p = EOO;
//BSONObj temp(r);
//cout << "temp:" << temp.toString() << endl;
database = oldClient;
/* --------------------------------------------------------------*/
_ source has autoptr to the cursor
_ reuse that cursor when we can
// Slave main loop: repeatedly walks the list of sources and calls sync() on each,
// logging SyncException and AssertionException instead of letting them escape.
// NOTE(review): the log messages announce sleeps, but no sleep call, no statement that
// fills `sources`, and no action for the trailing `if( !ok )` appear in the visible text -- confirm.
void replMain() {
vector<ReplSource*> sources;
bool first = true;
while( 1 ) {
dblock lk;
if( !first && sources.empty() )
for( vector<ReplSource*>::iterator i = sources.begin(); i != sources.end(); i++ ) {
ReplSource *s = *i;
// Stays false when sync() throws.
bool ok = false;
try {
ok = s->sync();
catch( SyncException& ) {
log() << "caught SyncException, sleeping 10 secs" << endl;
catch( AssertionException& e ) {
if( e.severe() ) {
log() << "replMain caught AssertionException, sleeping 1 minutes" << endl;
else {
log() << e.toString() << '\n';
if( !ok )
// Debug switch tested at the top of each replSlaveThread() iteration.
int debug_stop_repl = 0;
// Entry point of the slave thread started by startReplication(). Loops forever and
// reports any AssertionException through problem().
// NOTE(review): the action taken when debug_stop_repl is set, the call that does the work,
// and the announced 5-minute sleep are not present in the visible text -- confirm.
void replSlaveThread() {
while( 1 ) {
try {
if( debug_stop_repl )
catch( AssertionException& ) {
problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
/* used to verify that slave knows what databases we have */
// Scans the dbpath directory for files ending in ".ns" and logs a "db" operation for
// each database found, except "local.".
// NOTE(review): nothing advances the directory iterator in the visible text, and k is
// never used -- confirm against upstream.
void logOurDbsPresence() {
path dbs(dbpath);
directory_iterator end;
directory_iterator i(dbs);
dblock lk;
int k = 0;
while( i != end ) {
path p = *i;
string f = p.leaf();
if( endsWith(f.c_str(), ".ns") ) {
/* note: we keep trailing "." so that when slave calls setClient(ns) everything is happy; e.g.,
valid namespaces must always have a dot, even though here it is just a placeholder not
a real one
// Dropping the last two characters turns "<db>.ns" into "<db>.".
string dbname = string(f.c_str(), f.size() - 2);
if( dbname != "local." ) {
logOp("db", dbname.c_str(), emptyObj);
database = 0;
/* we have to log the db presence periodically as that "advertisement" will roll out of the log
as it is of finite length. also as we only do one db cloning per pass, we could skip over a bunch of
advertisements and thus need to see them again later. so this mechanism can actually be very slow to
work, and should be improved.
// Entry point of the master thread started by startReplication(): loops forever,
// sleeping 10 minutes (60 * 10 seconds) per iteration.
// NOTE(review): the comment that precedes this function describes logging database presence
// periodically, but no call doing so appears in the visible loop -- confirm.
void replMasterThread() {
// if you are testing, you might finish test and shutdown in less than 10
// minutes yet not have done something in first 15 -- this is to exercise
// this code some.
while( 1 ) {
sleepsecs(60 * 10);
// Starts the replication threads according to the slave / master flags and pairing.
// A paired node (replPair non-null) takes both branches, so both flags end up true.
void startReplication() {
if( slave || replPair ) {
if( slave )
log() << "slave=true" << endl;
slave = true;
boost::thread repl_thread(replSlaveThread);
if( master || replPair ) {
if( master )
log() << "master=true" << endl;
master = true;
dblock lk;
/* create an oplog collection, if it doesn't yet exist. */
BSONObjBuilder b;
// "size" option: 50.0 * 1000 * 1000, raised to 990.0 * 1000 * 1000 when pointers are
// 8 bytes or wider (units presumably bytes -- confirm against userCreateNS).
double sz = 50.0 * 1000 * 1000;
if( sizeof(int *) >= 8 )
sz = 990.0 * 1000 * 1000;
b.append("size", sz);
b.appendBool("capped", 1);
string err;
BSONObj o = b.done();
// The return value and err are not inspected here.
userCreateNS("local.oplog.$main", o, err);
database = 0;
boost::thread mt(replMasterThread);
/* called from main at server startup */
// Enables paired mode by allocating the global ReplPair from the remote end and arbiter strings.
// The object is assigned to replPair; no code visible in this file deletes it.
void pairWith(const char *remoteEnd, const char *arb) {
replPair = new ReplPair(remoteEnd, arb);