2008-07-28 00:36:47 +02:00
// repl.cpp
2008-09-06 00:04:29 +02:00
/* TODO
   PAIRING
   _ on a SyncException, don't allow going back to master state?
*/
2008-07-28 00:36:47 +02:00
/**
 *    Copyright (C) 2008 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
2008-12-15 23:23:54 +01:00
/* Collections we use:

   local.sources         - indicates what sources we pull from as a "slave", and the last update of each
   local.oplog.$main     - our op log as "master"
   local.dbinfo.<dbname>
   local.pair.startup    - can contain a special value indicating for a pair that we have the master copy.
                           used when replacing other half of the pair which has permanently failed.
   local.pair.sync       - { initialsynccomplete: 1 }
*/
2008-07-28 00:36:47 +02:00
# include "stdafx.h"
# include "jsobj.h"
# include "../util/goodies.h"
# include "repl.h"
2009-02-03 19:30:28 +01:00
# include "../util/message.h"
2008-10-20 00:46:53 +02:00
# include "../client/dbclient.h"
2008-07-28 00:36:47 +02:00
# include "pdfile.h"
2008-07-28 19:51:39 +02:00
# include "query.h"
2008-08-02 20:58:15 +02:00
# include "db.h"
2008-09-11 15:15:34 +02:00
# include "commands.h"
2009-01-20 17:05:53 +01:00
# include "security.h"
2009-08-25 20:35:22 +02:00
# include "cmdline.h"
2008-07-28 00:36:47 +02:00
2009-01-14 23:09:51 +01:00
namespace mongo {

2009-05-06 17:32:48 +02:00
// Process-wide database lock, defined elsewhere.
extern boost : : recursive_mutex & dbMutex ;
2009-01-15 16:17:11 +01:00
// Defined elsewhere; creates an _id index on ns if one is missing.
void ensureHaveIdIndex ( const char * ns ) ;
/* if 1 sync() is running */
int syncing = 0 ;
/* if true replace our peer in a replication pair -- don't worry about if his
local . oplog . $ main is empty .
*/
bool replacePeer = false ;
2008-12-15 23:23:54 +01:00
2009-01-15 16:17:11 +01:00
/* "dead" means something really bad happened like replication falling completely out of sync.
when non - null , we are dead and the string is informational
*/
2009-02-04 19:22:02 +01:00
const char * replAllDead = 0 ;
2008-12-01 20:55:36 +01:00
2009-02-02 17:15:24 +01:00
// Set from the command line; defined elsewhere.
extern bool autoresync ;
// Wall-clock time of the last operator-forced resync; used to throttle
// throttledForceResyncDead() to at most one forced resync per 10 minutes.
time_t lastForcedResync = 0 ;
2009-05-11 16:46:54 +02:00
// Singleton; intentionally heap-allocated and never freed so it survives
// static destruction order.
IdTracker & idTracker = * ( new IdTracker ( ) ) ;
2009-04-23 20:44:05 +02:00
2009-01-14 23:09:51 +01:00
} // namespace mongo
2008-09-04 16:33:56 +02:00
# include "replset.h"
2009-01-14 23:09:51 +01:00
namespace mongo {
2009-01-15 16:17:11 +01:00
// Tracks whether this node's initial sync with its pair peer has finished.
// Intentionally heap-allocated once and never freed (process lifetime).
PairSync *pairSync = new PairSync();

// True once the initial pair sync has completed.
bool getInitialSyncCompleted() {
    return pairSync->initialSyncCompleted();
}
2008-12-29 04:01:18 +01:00
2009-01-15 16:17:11 +01:00
/* --- ReplPair -------------------------------- */
2008-09-11 21:13:47 +02:00
2009-01-15 16:17:11 +01:00
ReplPair *replPair = 0;

/* output by the web console */
const char *replInfo = " ";

/* Scoped status reporter: while an instance is alive, replInfo points at the
   supplied message; the destructor restores a placeholder on scope exit. */
struct ReplInfo {
    ReplInfo(const char *msg) {
        replInfo = msg;
    }
    ~ReplInfo() {
        replInfo = " ? ";
    }
};
2008-09-11 21:13:47 +02:00
2009-01-15 16:17:11 +01:00
void ReplPair : : setMaster ( int n , const char * _comment ) {
if ( n = = State_Master & & ! getInitialSyncCompleted ( ) )
return ;
info = _comment ;
2009-08-25 20:35:22 +02:00
if ( n ! = state & & ! cmdLine . quiet )
2009-01-15 16:17:11 +01:00
log ( ) < < " pair: setting master= " < < n < < " was " < < state < < ' \n ' ;
state = n ;
2008-09-11 21:13:47 +02:00
}
2009-01-15 16:17:11 +01:00
/* peer unreachable, try our arbiter */
void ReplPair : : arbitrate ( ) {
ReplInfo r ( " arbitrate " ) ;
2008-09-11 21:13:47 +02:00
2009-01-15 16:17:11 +01:00
if ( arbHost = = " - " ) {
2009-02-02 15:53:01 +01:00
// no arbiter. we are up, let's assume partner is down and network is not partitioned.
2009-01-15 16:17:11 +01:00
setMasterLocked ( State_Master , " remote unreachable " ) ;
return ;
}
2008-09-11 21:13:47 +02:00
2009-01-15 16:17:11 +01:00
auto_ptr < DBClientConnection > conn ( newClientConnection ( ) ) ;
string errmsg ;
if ( ! conn - > connect ( arbHost . c_str ( ) , errmsg ) ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: cantconn arbiter " < < errmsg < < endl ;
2009-01-15 16:17:11 +01:00
setMasterLocked ( State_CantArb , " can't connect to arb " ) ;
return ;
}
2008-09-11 21:13:47 +02:00
2009-02-02 15:53:01 +01:00
negotiate ( conn . get ( ) , " arbiter " ) ;
2008-12-29 02:28:49 +01:00
}
2009-01-15 16:17:11 +01:00
/* --------------------------------------------- */
class CmdReplacePeer : public Command {
public :
virtual bool slaveOk ( ) {
return true ;
2008-12-29 02:28:49 +01:00
}
2009-01-15 16:17:11 +01:00
virtual bool adminOnly ( ) {
return true ;
2008-12-15 23:23:54 +01:00
}
2009-01-15 16:17:11 +01:00
virtual bool logTheOp ( ) {
2008-12-15 23:23:54 +01:00
return false ;
}
2009-01-15 16:17:11 +01:00
CmdReplacePeer ( ) : Command ( " replacepeer " ) { }
virtual bool run ( const char * ns , BSONObj & cmdObj , string & errmsg , BSONObjBuilder & result , bool fromRepl ) {
if ( replPair = = 0 ) {
errmsg = " not paired " ;
return false ;
2008-12-15 23:23:54 +01:00
}
2009-01-15 16:17:11 +01:00
if ( ! getInitialSyncCompleted ( ) ) {
errmsg = " not caught up cannot replace peer " ;
2008-12-16 00:00:10 +01:00
return false ;
}
2009-01-15 16:17:11 +01:00
if ( syncing < 0 ) {
errmsg = " replacepeer already invoked " ;
return false ;
}
Timer t ;
while ( 1 ) {
if ( syncing = = 0 | | t . millis ( ) > 20000 )
break ;
{
dbtemprelease t ;
sleepmillis ( 10 ) ;
}
}
if ( syncing ) {
assert ( syncing > 0 ) ;
errmsg = " timeout waiting for sync() to finish " ;
return false ;
}
{
2009-04-01 22:00:56 +02:00
ReplSource : : SourceVector sources ;
2009-01-15 16:17:11 +01:00
ReplSource : : loadAll ( sources ) ;
if ( sources . size ( ) ! = 1 ) {
errmsg = " local.sources.count() != 1, cannot replace peer " ;
return false ;
}
}
{
2009-01-18 23:48:44 +01:00
Helpers : : emptyCollection ( " local.sources " ) ;
2009-01-15 16:17:11 +01:00
BSONObj o = fromjson ( " { \" replacepeer \" :1} " ) ;
2009-01-18 23:48:44 +01:00
Helpers : : putSingleton ( " local.pair.startup " , o ) ;
2009-01-15 16:17:11 +01:00
}
syncing = - 1 ;
2009-02-04 19:22:02 +01:00
replAllDead = " replacepeer invoked -- adjust local.sources hostname then restart this db process " ;
2009-01-15 16:17:11 +01:00
result . append ( " info " , " adjust local.sources hostname; db restart now required " ) ;
return true ;
2008-12-16 00:00:10 +01:00
}
2009-01-15 16:17:11 +01:00
} cmdReplacePeer ;
2008-12-15 23:23:54 +01:00
2009-04-23 18:16:18 +02:00
/* "forcedead" admin command: unconditionally mark replication as dead. */
class CmdForceDead : public Command {
public:
    virtual bool slaveOk() { return true; }
    virtual bool adminOnly() { return true; }
    virtual bool logTheOp() { return false; }
    CmdForceDead() : Command(" forcedead ") { }
    virtual bool run(const char *ns, BSONObj &cmdObj, string &errmsg, BSONObjBuilder &result, bool fromRepl) {
        replAllDead = " forced by command ";
        return true;
    }
} cmdForceDead;
2009-08-10 22:57:59 +02:00
/* operator requested resynchronization of replication (on the slave). { resync : 1 } */
2009-01-29 17:46:45 +01:00
class CmdResync : public Command {
public :
virtual bool slaveOk ( ) {
return true ;
}
virtual bool adminOnly ( ) {
return true ;
}
virtual bool logTheOp ( ) {
return false ;
}
CmdResync ( ) : Command ( " resync " ) { }
virtual bool run ( const char * ns , BSONObj & cmdObj , string & errmsg , BSONObjBuilder & result , bool fromRepl ) {
2009-04-22 16:52:32 +02:00
if ( cmdObj . getBoolField ( " force " ) ) {
if ( ! waitForSyncToFinish ( errmsg ) )
return false ;
replAllDead = " resync forced " ;
}
2009-02-04 19:22:02 +01:00
if ( ! replAllDead ) {
2009-01-29 17:46:45 +01:00
errmsg = " not dead, no need to resync " ;
return false ;
}
2009-04-22 16:52:32 +02:00
if ( ! waitForSyncToFinish ( errmsg ) )
return false ;
2009-03-31 17:44:35 +02:00
2009-04-22 16:52:32 +02:00
ReplSource : : forceResyncDead ( " client " ) ;
result . append ( " info " , " triggered resync for all sources " ) ;
return true ;
}
bool waitForSyncToFinish ( string & errmsg ) const {
2009-03-31 17:44:35 +02:00
// Wait for slave thread to finish syncing, so sources will be be
// reloaded with new saved state on next pass.
Timer t ;
while ( 1 ) {
if ( syncing = = 0 | | t . millis ( ) > 20000 )
break ;
{
dbtemprelease t ;
sleepmillis ( 10 ) ;
}
}
if ( syncing ) {
errmsg = " timeout waiting for sync() to finish " ;
return false ;
}
2009-04-22 16:52:32 +02:00
return true ;
}
2009-01-29 17:46:45 +01:00
} cmdResync ;
2009-01-15 16:17:11 +01:00
class CmdIsMaster : public Command {
public :
2009-01-24 22:05:12 +01:00
virtual bool requiresAuth ( ) { return false ; }
2009-01-15 16:17:11 +01:00
virtual bool slaveOk ( ) {
return true ;
}
CmdIsMaster ( ) : Command ( " ismaster " ) { }
virtual bool run ( const char * ns , BSONObj & cmdObj , string & errmsg , BSONObjBuilder & result , bool /*fromRepl*/ ) {
2009-01-24 22:05:12 +01:00
/* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not
authenticated .
we allow unauthenticated ismaster but we aren ' t as verbose informationally if
one is not authenticated for admin db to be safe .
*/
2009-10-12 21:12:16 +02:00
AuthenticationInfo * ai = currentClient . get ( ) - > ai ;
2009-10-09 20:59:44 +02:00
bool authed = ai - > isAuthorized ( " admin " ) ;
2009-01-24 22:05:12 +01:00
2009-02-04 19:22:02 +01:00
if ( replAllDead ) {
2009-01-15 16:17:11 +01:00
result . append ( " ismaster " , 0.0 ) ;
2009-01-24 22:05:12 +01:00
if ( authed ) {
if ( replPair )
result . append ( " remote " , replPair - > remote ) ;
2009-02-04 19:22:02 +01:00
result . append ( " info " , replAllDead ) ;
2009-01-24 22:05:12 +01:00
}
2009-01-15 16:17:11 +01:00
}
else if ( replPair ) {
result . append ( " ismaster " , replPair - > state ) ;
2009-01-24 22:05:12 +01:00
if ( authed ) {
result . append ( " remote " , replPair - > remote ) ;
if ( ! replPair - > info . empty ( ) )
result . append ( " info " , replPair - > info ) ;
}
}
2009-01-15 16:17:11 +01:00
else {
2009-07-20 17:23:12 +02:00
result . append ( " ismaster " , slave ? 0 : 1 ) ;
2009-01-24 22:05:12 +01:00
result . append ( " msg " , " not paired " ) ;
2009-01-15 16:17:11 +01:00
}
2009-01-29 23:26:07 +01:00
2009-01-15 16:17:11 +01:00
return true ;
2008-11-21 00:03:41 +01:00
}
2009-01-15 16:17:11 +01:00
} cmdismaster ;
2009-04-28 21:33:56 +02:00
/* "isinitialsynccomplete" command: reports whether the initial pair sync has
   finished on this node. */
class CmdIsInitialSyncComplete : public Command {
public:
    virtual bool requiresAuth() { return false; }
    virtual bool slaveOk() { return true; }
    CmdIsInitialSyncComplete() : Command(" isinitialsynccomplete ") { }
    virtual bool run(const char *ns, BSONObj &cmdObj, string &errmsg, BSONObjBuilder &result, bool /*fromRepl*/) {
        bool done = getInitialSyncCompleted();
        result.appendBool(" initialsynccomplete ", done);
        return true;
    }
} cmdisinitialsynccomplete;
2009-01-15 16:17:11 +01:00
/* negotiate who is master

   States ("i_was" values):
     -1 = not set (probably means we just booted)
      0 = was slave
      1 = was master

   remote,local -> new remote,local:
     !1,1  -> 0,1
     1,!1  -> 1,0
     -1,-1 -> dominant->1, nondom->0
     0,0   -> dominant->1, nondom->0
     1,1   -> dominant->1, nondom->0

   request: { negotiatemaster:1, i_was:<state>, your_name:<hostname> }
   returns: { ok:1, you_are:..., i_am:... }
*/
class CmdNegotiateMaster : public Command {
public :
CmdNegotiateMaster ( ) : Command ( " negotiatemaster " ) { }
virtual bool slaveOk ( ) {
return true ;
2008-09-11 15:15:34 +02:00
}
2009-01-15 16:17:11 +01:00
virtual bool adminOnly ( ) {
return true ;
2008-09-11 15:15:34 +02:00
}
2008-09-11 19:48:30 +02:00
2009-01-15 16:17:11 +01:00
virtual bool run ( const char * ns , BSONObj & cmdObj , string & errmsg , BSONObjBuilder & result , bool ) {
// If we are not configured as half of a pair, we must be the arbiter.
if ( replPair = = 0 ) {
2009-05-28 19:35:39 +02:00
// Guard against a misconfigured peer relaying an arbiter-forwarded
// request to a node that is not paired.
massert ( " Another mongod instance believes incorrectly that this node is its peer " , ! cmdObj . getBoolField ( " fromArbiter " ) ) ;
2009-02-02 15:53:01 +01:00
// assume that we are an arbiter and should forward the request
string host = cmdObj . getStringField ( " your_name " ) ;
int port = cmdObj . getIntField ( " your_port " ) ;
if ( port = = INT_MIN ) {
errmsg = " no port specified " ;
problem ( ) < < errmsg < < endl ;
return false ;
}
stringstream ss ;
ss < < host < < " : " < < port ;
string remote = ss . str ( ) ;
2009-05-27 19:55:30 +02:00
BSONObj ret ;
{
// Release the db lock while doing network I/O to the peer.
dbtemprelease t ;
auto_ptr < DBClientConnection > conn ( new DBClientConnection ( ) ) ;
if ( ! conn - > connect ( remote . c_str ( ) , errmsg ) ) {
// Peer unreachable from the arbiter too: tell the caller to be master.
result . append ( " you_are " , ReplPair : : State_Master ) ;
return true ;
}
2009-05-28 19:35:39 +02:00
// Forward the original command, tagging it so the peer knows it
// came via the arbiter.
BSONObjBuilder forwardCommand ;
forwardCommand . appendElements ( cmdObj ) ;
forwardCommand . appendBool ( " fromArbiter " , true ) ;
ret = conn - > findOne ( " admin.$cmd " , forwardCommand . done ( ) ) ;
2009-02-02 15:53:01 +01:00
}
// Relay every field of the peer's reply except "ok", which we derive.
BSONObjIterator i ( ret ) ;
2009-06-09 17:43:04 +02:00
while ( i . moreWithEOO ( ) ) {
2009-02-02 15:53:01 +01:00
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
if ( e . fieldName ( ) ! = string ( " ok " ) )
result . append ( e ) ;
}
return ( ret . getIntField ( " ok " ) = = 1 ) ;
2009-01-15 16:17:11 +01:00
}
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
int was = cmdObj . getIntField ( " i_was " ) ;
string myname = cmdObj . getStringField ( " your_name " ) ;
2009-04-03 20:24:05 +02:00
if ( myname . empty ( ) | | was < - 3 ) {
2009-01-15 16:17:11 +01:00
errmsg = " your_name/i_was not specified " ;
return false ;
}
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
int N = ReplPair : : State_Negotiating ;
int M = ReplPair : : State_Master ;
int S = ReplPair : : State_Slave ;
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
// Only the dominant node of the pair may decide who is master.
if ( ! replPair - > dominant ( myname ) ) {
result . append ( " you_are " , N ) ;
2009-04-10 23:03:45 +02:00
result . append ( " i_am " , replPair - > state ) ;
2009-01-15 16:17:11 +01:00
return true ;
}
2008-12-29 02:28:49 +01:00
2009-01-15 16:17:11 +01:00
// We are dominant: pick roles.  Defer mastership if our initial sync is
// incomplete, or if the peer was master and we were not.
int me , you ;
if ( ! getInitialSyncCompleted ( ) | | ( replPair - > state ! = M & & was = = M ) ) {
me = S ;
you = M ;
}
else {
me = M ;
you = S ;
}
replPair - > setMaster ( me , " CmdNegotiateMaster::run() " ) ;
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
result . append ( " you_are " , you ) ;
result . append ( " i_am " , me ) ;
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
return true ;
}
} cmdnegotiatemaster ;
2009-02-02 15:53:01 +01:00
2009-04-10 23:03:45 +02:00
int ReplPair : : negotiate ( DBClientConnection * conn , string method ) {
2009-01-15 16:17:11 +01:00
BSONObjBuilder b ;
b . append ( " negotiatemaster " , 1 ) ;
b . append ( " i_was " , state ) ;
b . append ( " your_name " , remoteHost ) ;
2009-02-02 15:53:01 +01:00
b . append ( " your_port " , remotePort ) ;
2009-01-15 16:17:11 +01:00
BSONObj cmd = b . done ( ) ;
BSONObj res = conn - > findOne ( " admin.$cmd " , cmd ) ;
if ( res . getIntField ( " ok " ) ! = 1 ) {
2009-02-02 15:53:01 +01:00
string message = method + " negotiate failed " ;
problem ( ) < < message < < " : " < < res . toString ( ) < < ' \n ' ;
setMasterLocked ( State_Confused , message . c_str ( ) ) ;
2009-04-10 23:03:45 +02:00
return State_Confused ;
2009-01-15 16:17:11 +01:00
}
int x = res . getIntField ( " you_are " ) ;
2009-04-10 23:03:45 +02:00
int remote = res . getIntField ( " i_am " ) ;
2009-01-15 16:17:11 +01:00
// State_Negotiating means the remote node is not dominant and cannot
// choose who is master.
if ( x ! = State_Slave & & x ! = State_Master & & x ! = State_Negotiating ) {
2009-02-02 15:53:01 +01:00
problem ( ) < < method < < " negotiate: bad you_are value " < < res . toString ( ) < < endl ;
2009-04-10 23:03:45 +02:00
} else if ( x ! = State_Negotiating ) {
2009-02-02 15:53:01 +01:00
string message = method + " negotiation " ;
setMasterLocked ( x , message . c_str ( ) ) ;
2009-01-15 16:17:11 +01:00
}
2009-04-10 23:03:45 +02:00
return remote ;
2009-01-15 16:17:11 +01:00
}
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
struct TestOpTime {
TestOpTime ( ) {
OpTime t ;
for ( int i = 0 ; i < 10 ; i + + ) {
OpTime s = OpTime : : now ( ) ;
assert ( s ! = t ) ;
t = s ;
}
OpTime q = t ;
assert ( q = = t ) ;
assert ( ! ( q ! = t ) ) ;
2008-12-29 02:28:49 +01:00
}
2009-01-15 16:17:11 +01:00
} testoptime ;
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
/* --------------------------------------------------------------*/
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
/* Default-constructed source: flags start cleared; host etc. are filled in
   later by the caller (see loadAll()). */
ReplSource::ReplSource() {
    nClonedThisPass = 0;
    replacing = false;
    paired = false;
}
2008-09-11 15:15:34 +02:00
2009-01-15 16:17:11 +01:00
/* Rebuild a source from its persisted local.sources document. */
ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) {
    replacing = false;
    paired = false;
    only = o.getStringField(" only ");
    hostName = o.getStringField(" host ");
    _sourceName = o.getStringField(" source ");
    uassert(" 'host' field not set in sources collection object ", !hostName.empty());
    uassert(" only source='main' allowed for now with replication ", sourceName() == " main ");

    BSONElement synced = o.getField(" syncedTo ");
    if (!synced.eoo()) {
        uassert(" bad sources 'syncedTo' field value ", synced.type() == Date || synced.type() == Timestamp);
        OpTime tmp(synced.date());
        syncedTo = tmp;
    }

    // Databases queued for (re)addition on the next sync pass.
    BSONObj nextPass = o.getObjectField(" dbsNextPass ");
    if (!nextPass.isEmpty()) {
        BSONObjIterator it(nextPass);
        BSONElement f;
        while (!(f = it.next()).eoo())
            addDbNextPass.insert(f.fieldName());
    }

    // Databases whose initial clone never completed.
    BSONObj incomplete = o.getObjectField(" incompleteCloneDbs ");
    if (!incomplete.isEmpty()) {
        BSONObjIterator it(incomplete);
        BSONElement f;
        while (!(f = it.next()).eoo())
            incompleteCloneDbs.insert(f.fieldName());
    }

    lastSavedLocalTs_ = OpTime(o.getField(" localLogTs ").date());
}
2009-01-15 16:17:11 +01:00
/* Turn our C++ Source object into a BSONObj (the persisted local.sources form). */
BSONObj ReplSource::jsobj() {
    BSONObjBuilder b;
    b.append(" host ", hostName);
    b.append(" source ", sourceName());
    if (!only.empty())
        b.append(" only ", only);
    if (!syncedTo.isNull())
        b.appendTimestamp(" syncedTo ", syncedTo.asDate());

    b.appendTimestamp(" localLogTs ", lastSavedLocalTs_.asDate());

    // Persist the pending-databases set as a subobject of boolean fields.
    if (!addDbNextPass.empty()) {
        BSONObjBuilder pending;
        for (set<string>::iterator it = addDbNextPass.begin(); it != addDbNextPass.end(); ++it)
            pending.appendBool(it->c_str(), 1);
        b.append(" dbsNextPass ", pending.done());
    }

    // Same encoding for databases whose clone is incomplete.
    if (!incompleteCloneDbs.empty()) {
        BSONObjBuilder incomplete;
        for (set<string>::iterator it = incompleteCloneDbs.begin(); it != incompleteCloneDbs.end(); ++it)
            incomplete.appendBool(it->c_str(), 1);
        b.append(" incompleteCloneDbs ", incomplete.done());
    }

    return b.obj();
}
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
void ReplSource : : save ( ) {
BSONObjBuilder b ;
assert ( ! hostName . empty ( ) ) ;
b . append ( " host " , hostName ) ;
// todo: finish allowing multiple source configs.
// this line doesn't work right when source is null, if that is allowed as it is now:
//b.append("source", _sourceName);
BSONObj pattern = b . done ( ) ;
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
BSONObj o = jsobj ( ) ;
2009-03-17 18:18:54 +01:00
log ( 1 ) < < " Saving repl source: " < < o < < endl ;
2008-11-10 17:20:30 +01:00
2009-01-15 16:17:11 +01:00
stringstream ss ;
setClient ( " local.sources " ) ;
2009-10-21 21:18:21 +02:00
UpdateResult res = updateObjects ( " local.sources " , o , pattern , true /*upsert for pair feature*/ , false , ss , false ) ;
assert ( ! res . mod ) ;
assert ( res . num = = 1 ) ;
2009-10-14 20:34:38 +02:00
cc ( ) . clearns ( ) ;
2009-01-15 16:17:11 +01:00
if ( replacing ) {
/* if we were in "replace" mode, we now have synced up with the replacement,
so turn that off .
*/
replacing = false ;
wassert ( replacePeer ) ;
replacePeer = false ;
2009-01-18 23:48:44 +01:00
Helpers : : emptyCollection ( " local.pair.startup " ) ;
2008-12-29 02:28:49 +01:00
}
}
2009-04-01 22:00:56 +02:00
static void addSourceToList ( ReplSource : : SourceVector & v , ReplSource & s , const BSONObj & spec , ReplSource : : SourceVector & old ) {
2009-04-22 17:57:45 +02:00
if ( ! s . syncedTo . isNull ( ) ) { // Don't reuse old ReplSource if there was a forced resync.
2009-04-01 22:00:56 +02:00
for ( ReplSource : : SourceVector : : iterator i = old . begin ( ) ; i ! = old . end ( ) ; ) {
2009-03-31 17:44:35 +02:00
if ( s = = * * i ) {
v . push_back ( * i ) ;
old . erase ( i ) ;
return ;
}
i + + ;
}
}
2009-04-16 17:36:06 +02:00
2009-04-01 22:00:56 +02:00
v . push_back ( shared_ptr < ReplSource > ( new ReplSource ( s ) ) ) ;
2009-01-15 16:17:11 +01:00
}
/* we reuse our existing objects so that we can keep our existing connection
and cursor in effect .
*/
2009-04-01 22:00:56 +02:00
void ReplSource : : loadAll ( SourceVector & v ) {
// Keep the previous sources around so addSourceToList() can reuse them.
SourceVector old = v ;
v . clear ( ) ;
2009-01-15 16:17:11 +01:00
bool gotPairWith = false ;
// Validate --source/--only against what is persisted in local.sources,
// persisting the command-line source if the collection is empty.
2009-08-25 20:35:22 +02:00
if ( ! cmdLine . source . empty ( ) ) {
2009-01-15 16:17:11 +01:00
setClient ( " local.sources " ) ;
// --source <host> specified.
// check that no items are in sources other than that
// add if missing
2009-03-19 21:23:04 +01:00
auto_ptr < Cursor > c = findTableScan ( " local.sources " , BSONObj ( ) ) ;
2009-01-15 16:17:11 +01:00
int n = 0 ;
while ( c - > ok ( ) ) {
n + + ;
ReplSource tmp ( c - > current ( ) ) ;
2009-08-25 20:35:22 +02:00
if ( tmp . hostName ! = cmdLine . source ) {
log ( ) < < " E10000 --source " < < cmdLine . source < < " != " < < tmp . hostName < < " from local.sources collection " < < endl ;
2009-02-12 21:03:38 +01:00
log ( ) < < " terminating after 30 seconds " < < endl ;
sleepsecs ( 30 ) ;
2009-08-05 22:00:27 +02:00
dbexit ( EXIT_REPLICATION_ERROR ) ;
2009-02-12 21:03:38 +01:00
}
2009-08-25 20:35:22 +02:00
if ( tmp . only ! = cmdLine . only ) {
log ( ) < < " E10001 --only " < < cmdLine . only < < " != " < < tmp . only < < " from local.sources collection " < < endl ;
2009-01-15 16:17:11 +01:00
log ( ) < < " terminating after 30 seconds " < < endl ;
sleepsecs ( 30 ) ;
2009-08-05 22:00:27 +02:00
dbexit ( EXIT_REPLICATION_ERROR ) ;
2009-01-15 16:17:11 +01:00
}
c - > advance ( ) ;
}
2009-02-12 21:03:38 +01:00
uassert ( " E10002 local.sources collection corrupt? " , n < 2 ) ;
2009-01-15 16:17:11 +01:00
if ( n = = 0 ) {
// source missing. add.
ReplSource s ;
2009-08-25 20:35:22 +02:00
s . hostName = cmdLine . source ;
s . only = cmdLine . only ;
2009-01-15 16:17:11 +01:00
s . save ( ) ;
}
}
2009-03-24 16:46:55 +01:00
else {
try {
// --only is meaningless without --source; refuse to start.
2009-08-25 20:35:22 +02:00
massert ( " --only requires use of --source " , cmdLine . only . empty ( ) ) ;
2009-03-24 16:46:55 +01:00
} catch ( . . . ) {
2009-08-07 21:37:50 +02:00
dbexit ( EXIT_BADOPTIONS ) ;
2009-03-24 16:46:55 +01:00
}
2009-02-12 21:03:38 +01:00
}
// Same validation for a replica pair configured with --pairwith.
2009-03-23 16:38:22 +01:00
if ( replPair ) {
const string & remote = replPair - > remote ;
setClient ( " local.sources " ) ;
// --pairwith host specified.
// check that no items are in sources other than that
// add if missing
auto_ptr < Cursor > c = findTableScan ( " local.sources " , BSONObj ( ) ) ;
int n = 0 ;
while ( c - > ok ( ) ) {
n + + ;
ReplSource tmp ( c - > current ( ) ) ;
if ( tmp . hostName ! = remote ) {
log ( ) < < " E10003 pairwith " < < remote < < " != " < < tmp . hostName < < " from local.sources collection " < < endl ;
log ( ) < < " terminating after 30 seconds " < < endl ;
sleepsecs ( 30 ) ;
2009-08-05 22:00:27 +02:00
dbexit ( EXIT_REPLICATION_ERROR ) ;
2009-03-23 16:38:22 +01:00
}
c - > advance ( ) ;
}
uassert ( " E10002 local.sources collection corrupt? " , n < 2 ) ;
if ( n = = 0 ) {
// source missing. add.
ReplSource s ;
s . hostName = remote ;
s . save ( ) ;
}
}
// Materialize a ReplSource for every document now in local.sources,
// reusing matching entries from 'old' via addSourceToList().
2008-11-10 23:45:39 +01:00
setClient ( " local.sources " ) ;
2009-03-19 21:23:04 +01:00
auto_ptr < Cursor > c = findTableScan ( " local.sources " , BSONObj ( ) ) ;
2008-12-29 02:28:49 +01:00
while ( c - > ok ( ) ) {
2008-11-10 23:45:39 +01:00
ReplSource tmp ( c - > current ( ) ) ;
2009-01-15 16:17:11 +01:00
if ( replPair & & tmp . hostName = = replPair - > remote & & tmp . sourceName ( ) = = " main " ) {
gotPairWith = true ;
tmp . paired = true ;
if ( replacePeer ) {
// peer was replaced -- start back at the beginning.
tmp . syncedTo = OpTime ( ) ;
tmp . replacing = true ;
}
2008-11-10 23:45:39 +01:00
}
2009-03-31 17:44:35 +02:00
addSourceToList ( v , tmp , c - > current ( ) , old ) ;
2008-11-10 23:45:39 +01:00
c - > advance ( ) ;
}
2009-10-14 20:34:38 +02:00
cc ( ) . clearns ( ) ;
2009-01-15 16:17:11 +01:00
// Paired but no source document yet for the peer: synthesize one.
if ( ! gotPairWith & & replPair ) {
/* add the --pairwith server */
2009-04-01 22:00:56 +02:00
shared_ptr < ReplSource > s ( new ReplSource ( ) ) ;
2009-01-15 16:17:11 +01:00
s - > paired = true ;
s - > hostName = replPair - > remote ;
s - > replacing = replacePeer ;
v . push_back ( s ) ;
2008-11-10 23:45:39 +01:00
}
}
2009-01-15 16:17:11 +01:00
// Canned command object sent to a master to ask for its current op time.
BSONObj opTimeQuery = fromjson ( " { \" getoptime \" :1} " ) ;
2009-02-02 17:15:24 +01:00
bool ReplSource : : throttledForceResyncDead ( const char * requester ) {
if ( time ( 0 ) - lastForcedResync > 600 ) {
forceResyncDead ( requester ) ;
lastForcedResync = time ( 0 ) ;
return true ;
}
return false ;
}
void ReplSource : : forceResyncDead ( const char * requester ) {
2009-02-04 19:22:02 +01:00
if ( ! replAllDead )
2009-02-02 17:15:24 +01:00
return ;
2009-04-01 22:00:56 +02:00
SourceVector sources ;
2009-02-02 17:15:24 +01:00
ReplSource : : loadAll ( sources ) ;
2009-04-01 22:00:56 +02:00
for ( SourceVector : : iterator i = sources . begin ( ) ; i ! = sources . end ( ) ; + + i ) {
2009-02-02 17:15:24 +01:00
( * i ) - > forceResync ( requester ) ;
}
2009-02-04 19:22:02 +01:00
replAllDead = 0 ;
2009-02-02 17:15:24 +01:00
}
void ReplSource : : forceResync ( const char * requester ) {
2009-03-18 18:45:32 +01:00
BSONObj info ;
{
dbtemprelease t ;
connect ( ) ;
bool ok = conn - > runCommand ( " admin " , BSON ( " listDatabases " < < 1 ) , info ) ;
massert ( " Unable to get database list " , ok ) ;
}
BSONObjIterator i ( info . getField ( " databases " ) . embeddedObject ( ) ) ;
2009-06-09 17:43:04 +02:00
while ( i . moreWithEOO ( ) ) {
2009-03-18 18:45:32 +01:00
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
string name = e . embeddedObject ( ) . getField ( " name " ) . valuestr ( ) ;
2009-05-01 18:18:17 +02:00
if ( ! e . embeddedObject ( ) . getBoolField ( " empty " ) ) {
if ( name ! = " local " ) {
if ( only . empty ( ) | | only = = name ) {
resyncDrop ( name . c_str ( ) , requester ) ;
}
2009-03-18 18:45:32 +01:00
}
}
}
2009-01-29 17:46:45 +01:00
syncedTo = OpTime ( ) ;
2009-04-23 01:10:23 +02:00
addDbNextPass . clear ( ) ;
2009-03-31 16:23:05 +02:00
save ( ) ;
2009-01-29 17:46:45 +01:00
}
2009-03-18 18:45:32 +01:00
/* Drop database 'db' in preparation for a resync; returns the dummy
   namespace used to select the database. */
string ReplSource::resyncDrop(const char *db, const char *requester) {
    log() << " resync: dropping database " << db << endl;
    string dummyns = string(db) + " . ";
    setClient(dummyns.c_str());
    assert(cc().database()->name == db);
    dropDatabase(dummyns.c_str());
    return dummyns;
}
2009-01-29 17:46:45 +01:00
2009-08-10 22:57:59 +02:00
/* grab initial copy of a database from the master */
2009-01-15 16:17:11 +01:00
bool ReplSource : : resync ( string db ) {
2009-03-18 18:45:32 +01:00
string dummyNs = resyncDrop ( db . c_str ( ) , " internal " ) ;
2009-10-13 22:01:02 +02:00
setClient ( dummyNs . c_str ( ) ) ;
2009-01-15 16:17:11 +01:00
{
log ( ) < < " resync: cloning database " < < db < < endl ;
ReplInfo r ( " resync: cloning a database " ) ;
string errmsg ;
2009-10-14 20:34:38 +02:00
bool ok = cloneFrom ( hostName . c_str ( ) , errmsg , cc ( ) . database ( ) - > name , false , /*slaveok*/ true , /*replauth*/ true , /*snapshot*/ false ) ;
2009-01-15 16:17:11 +01:00
if ( ! ok ) {
problem ( ) < < " resync of " < < db < < " from " < < hostName < < " failed " < < errmsg < < endl ;
throw SyncException ( ) ;
}
}
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
log ( ) < < " resync: done " < < db < < endl ;
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
return true ;
2008-12-29 02:28:49 +01:00
}
2009-01-23 16:17:29 +01:00
/* Apply a single oplog entry locally.  Entry shape (see logOp() comments):
   { ts:..., op:<optype>, ns:..., o:<obj>, o2:<extraobj>, b:<boolflag> }.
   Inserts are replayed as upserts so re-application is idempotent; user/db
   exceptions are logged and swallowed so one bad op does not stop the sync
   thread. */
void ReplSource : : applyOperation ( const BSONObj & op ) {
2009-07-30 01:10:34 +02:00
log ( 6 ) < < " applying op: " < < op < < endl ;
2009-01-23 16:17:29 +01:00
stringstream ss ;
BSONObj o = op . getObjectField ( " o " ) ;
const char * ns = op . getStringField ( " ns " ) ;
// operation type -- see logOp() comments for types
const char * opType = op . getStringField ( " op " ) ;
try {
// 'i' == insert
if ( * opType = = ' i ' ) {
const char * p = strchr ( ns , ' . ' ) ;
if ( p & & strcmp ( p , " .system.indexes " ) = = 0 ) {
// updates aren't allowed for indexes -- so we will do a regular insert. if index already
// exists, that is ok.
theDataFileMgr . insert ( ns , ( void * ) o . objdata ( ) , o . objsize ( ) ) ;
}
else {
// do upserts for inserts as we might get replayed more than once
2009-01-26 01:53:51 +01:00
BSONElement _id ;
if ( ! o . getObjectID ( _id ) ) {
/* No _id. This will be very slow. */
2009-02-11 17:28:49 +01:00
Timer t ;
2009-10-21 21:18:21 +02:00
// Upsert matching the whole document, since we have no _id to key on.
updateObjects ( ns , o , o , true , false , ss , false ) ;
2009-02-11 17:28:49 +01:00
if ( t . millis ( ) > = 2 ) {
2009-03-03 00:39:10 +01:00
RARELY OCCASIONALLY log ( ) < < " warning, repl doing slow updates (no _id field) for " < < ns < < endl ;
2009-02-11 17:28:49 +01:00
}
2009-01-23 16:17:29 +01:00
}
else {
// Upsert keyed on _id only.
BSONObjBuilder b ;
2009-01-26 01:53:51 +01:00
b . append ( _id ) ;
2009-10-16 16:20:48 +02:00
/* erh 10/16/2009 - this is probably not relevant any more since its auto-created, but not worth removing */
RARELY ensureHaveIdIndex ( ns ) ; // otherwise updates will be slow
2009-10-21 21:18:21 +02:00
updateObjects ( ns , o , b . done ( ) , true , false , ss , false ) ;
2009-01-23 16:17:29 +01:00
}
}
}
// 'u' == update; o2 holds the query pattern, b the upsert flag.
else if ( * opType = = ' u ' ) {
RARELY ensureHaveIdIndex ( ns ) ; // otherwise updates will be super slow
2009-10-21 21:18:21 +02:00
updateObjects ( ns , o , op . getObjectField ( " o2 " ) , op . getBoolField ( " b " ) , false , ss , false ) ;
2009-01-23 16:17:29 +01:00
}
// 'd' == delete; "db" is a database advertisement, not a delete.
else if ( * opType = = ' d ' ) {
if ( opType [ 1 ] = = 0 )
deleteObjects ( ns , o , op . getBoolField ( " b " ) ) ;
else
assert ( opType [ 1 ] = = ' b ' ) ; // "db" advertisement
}
2009-04-22 16:52:32 +02:00
else if ( * opType = = ' n ' ) {
// no op
}
2009-01-23 16:17:29 +01:00
// 'c' == database command.
else {
BufBuilder bb ;
BSONObjBuilder ob ;
assert ( * opType = = ' c ' ) ;
2009-02-04 23:24:15 +01:00
_runCommands ( ns , o , ss , bb , ob , true , 0 ) ;
2009-01-23 16:17:29 +01:00
}
}
2009-02-06 22:56:14 +01:00
catch ( UserException & e ) {
2009-05-05 18:52:53 +02:00
log ( ) < < " sync: caught user assertion " < < e < < " while applying op: " < < op < < endl ; ;
}
catch ( DBException & e ) {
log ( ) < < " sync: caught db exception " < < e < < " while applying op: " < < op < < endl ; ;
}
2009-01-23 16:17:29 +01:00
}
2009-01-15 16:17:11 +01:00
/* local.$oplog.main is of the form:
{ ts : . . . , op : < optype > , ns : . . . , o : < obj > , o2 : < extraobj > , b : < boolflag > }
. . .
see logOp ( ) comments .
*/
2009-04-23 20:44:05 +02:00
void ReplSource : : sync_pullOpLog_applyOperation ( BSONObj & op , OpTime * localLogTail ) {
2009-04-23 18:16:18 +02:00
log ( 6 ) < < " processing op: " < < op < < endl ;
2009-04-22 16:52:32 +02:00
// skip no-op
if ( op . getStringField ( " op " ) [ 0 ] = = ' n ' )
return ;
2009-01-15 16:17:11 +01:00
char clientName [ MaxClientLen ] ;
const char * ns = op . getStringField ( " ns " ) ;
nsToClient ( ns , clientName ) ;
if ( * ns = = ' . ' ) {
problem ( ) < < " skipping bad op in oplog: " < < op . toString ( ) < < endl ;
return ;
}
else if ( * ns = = 0 ) {
problem ( ) < < " halting replication, bad op in oplog: \n " < < op . toString ( ) < < endl ;
2009-02-04 19:22:02 +01:00
replAllDead = " bad object in oplog " ;
2008-12-29 02:28:49 +01:00
throw SyncException ( ) ;
}
2009-01-15 16:17:11 +01:00
if ( ! only . empty ( ) & & only ! = clientName )
return ;
2008-12-24 17:11:10 +01:00
2009-01-15 16:17:11 +01:00
dblock lk ;
2009-04-17 19:21:50 +02:00
if ( localLogTail & & replPair & & replPair - > state = = ReplPair : : State_Master ) {
2009-04-24 17:14:29 +02:00
updateSetsWithLocalOps ( * localLogTail , true ) ; // allow unlocking
updateSetsWithLocalOps ( * localLogTail , false ) ; // don't allow unlocking or conversion to db backed storage
2009-04-17 19:21:50 +02:00
}
2009-04-22 19:53:35 +02:00
if ( replAllDead ) {
// hmmm why is this check here and not at top of this function? does it get set between top and here?
log ( ) < < " replAllDead, throwing SyncException \n " ;
throw SyncException ( ) ;
}
2009-04-17 19:21:50 +02:00
2009-01-15 16:17:11 +01:00
bool justCreated ;
try {
2009-10-13 22:01:02 +02:00
justCreated = setClient ( ns ) ;
2009-01-15 16:17:11 +01:00
} catch ( AssertionException & ) {
problem ( ) < < " skipping bad(?) op in oplog, setClient() failed, ns: ' " < < ns < < " ' \n " ;
addDbNextPass . erase ( clientName ) ;
return ;
}
2008-08-25 22:46:39 +02:00
2009-04-23 19:55:07 +02:00
bool empty = clientIsEmpty ( ) ;
2009-04-16 17:36:06 +02:00
bool incompleteClone = incompleteCloneDbs . count ( clientName ) ! = 0 ;
2009-04-22 19:53:35 +02:00
2009-04-23 19:55:07 +02:00
log ( 6 ) < < " ns: " < < ns < < " , justCreated: " < < justCreated < < " , empty: " < < empty < < " , incompleteClone: " < < incompleteClone < < endl ;
2009-07-30 01:10:34 +02:00
// always apply admin command command
// this is a bit hacky -- the semantics of replication/commands aren't well specified
if ( strcmp ( clientName , " admin " ) = = 0 & & * op . getStringField ( " op " ) = = ' c ' ) {
applyOperation ( op ) ;
2009-10-14 20:34:38 +02:00
cc ( ) . clearns ( ) ;
2009-07-30 01:10:34 +02:00
return ;
}
2009-04-23 18:16:18 +02:00
2009-04-23 19:55:07 +02:00
if ( justCreated | | empty | | incompleteClone ) {
2009-04-23 01:10:23 +02:00
// we must add to incomplete list now that setClient has been called
incompleteCloneDbs . insert ( clientName ) ;
2009-04-22 19:53:35 +02:00
if ( nClonedThisPass ) {
/* we only clone one database per pass, even if a lot need done. This helps us
avoid overflowing the master ' s transaction log by doing too much work before going
back to read more transactions . ( Imagine a scenario of slave startup where we try to
clone 100 databases in one pass . )
*/
addDbNextPass . insert ( clientName ) ;
} else {
2009-04-29 23:28:14 +02:00
if ( incompleteClone ) {
log ( ) < < " An earlier initial clone of ' " < < clientName < < " ' did not complete, now resyncing. " < < endl ;
}
2009-04-22 19:53:35 +02:00
save ( ) ;
2009-10-13 22:01:02 +02:00
setClient ( ns ) ;
2009-04-22 19:53:35 +02:00
nClonedThisPass + + ;
2009-10-14 20:34:38 +02:00
resync ( cc ( ) . database ( ) - > name ) ;
2009-04-22 19:53:35 +02:00
addDbNextPass . erase ( clientName ) ;
incompleteCloneDbs . erase ( clientName ) ;
}
2009-04-16 17:36:06 +02:00
save ( ) ;
2009-03-17 22:20:08 +01:00
} else {
2009-04-10 00:50:29 +02:00
bool mod ;
2009-04-25 00:14:35 +02:00
if ( replPair & & replPair - > state = = ReplPair : : State_Master ) {
BSONObj id = idForOp ( op , mod ) ;
if ( ! idTracker . haveId ( ns , id ) ) {
applyOperation ( op ) ;
} else if ( idTracker . haveModId ( ns , id ) ) {
2009-05-13 22:34:35 +02:00
log ( 6 ) < < " skipping operation matching mod id object " < < op < < endl ;
2009-04-25 00:14:35 +02:00
BSONObj existing ;
if ( Helpers : : findOne ( ns , id , existing ) )
logOp ( " i " , ns , existing ) ;
2009-05-13 22:34:35 +02:00
} else {
log ( 6 ) < < " skipping operation matching changed id object " < < op < < endl ;
2009-04-25 00:14:35 +02:00
}
} else {
applyOperation ( op ) ;
2009-04-10 00:50:29 +02:00
}
2009-03-31 20:05:20 +02:00
addDbNextPass . erase ( clientName ) ;
2009-03-17 22:20:08 +01:00
}
2009-10-14 20:34:38 +02:00
cc ( ) . clearns ( ) ;
2009-01-15 16:17:11 +01:00
}
2009-04-10 00:50:29 +02:00
/* Extract the _id targeted by an oplog entry.
   op  - the oplog entry
   mod - out: set true for modifier-style updates ($set etc.)
   @return an object of the form { _id: ... }, or an empty BSONObj when the
           entry has no usable id (commands, "db" ads, _id-less updates). */
BSONObj ReplSource::idForOp( const BSONObj &op, bool &mod ) {
    mod = false;
    const char *type = op.getStringField( "op" );
    BSONObj obj = op.getObjectField( "o" );

    if ( type[ 0 ] == 'i' ) {
        // insert: pull the _id out of the inserted document
        BSONElement idElt;
        if ( !obj.getObjectID( idElt ) )
            return BSONObj();
        BSONObjBuilder bob;
        bob.append( idElt );
        return bob.obj();
    }
    if ( type[ 0 ] == 'u' ) {
        // update: the pattern (o2) must be a pure _id match
        BSONObj pattern = op.getObjectField( "o2" );
        if ( strcmp( pattern.firstElement().fieldName(), "_id" ) != 0 )
            return BSONObj();
        if ( obj.firstElement().fieldName()[ 0 ] == '$' )
            mod = true;  // $-operator update
        return pattern;
    }
    if ( type[ 0 ] == 'd' && type[ 1 ] == '\0' )
        return obj;  // plain delete; "db" op type is skipped

    return BSONObj();
}
2009-04-24 17:14:29 +02:00
void ReplSource : : updateSetsWithOp ( const BSONObj & op , bool mayUnlock ) {
if ( mayUnlock ) {
2009-04-27 16:31:32 +02:00
idTracker . mayUpgradeStorage ( ) ;
2009-04-24 17:14:29 +02:00
}
2009-04-17 19:21:50 +02:00
bool mod ;
BSONObj id = idForOp ( op , mod ) ;
if ( ! id . isEmpty ( ) ) {
const char * ns = op . getStringField ( " ns " ) ;
2009-05-19 23:30:11 +02:00
// Since our range of local ops may not be the same as our peer's
// range of unapplied ops, it is always necessary to rewrite objects
// to the oplog after a mod update.
if ( mod )
idTracker . haveModId ( ns , id , true ) ;
2009-04-23 21:00:40 +02:00
idTracker . haveId ( ns , id , true ) ;
2009-04-17 19:21:50 +02:00
}
}
2009-04-22 23:44:23 +02:00
void ReplSource : : syncToTailOfRemoteLog ( ) {
string _ns = ns ( ) ;
BSONObj last = conn - > findOne ( _ns . c_str ( ) , Query ( ) . sort ( BSON ( " $natural " < < - 1 ) ) ) ;
if ( ! last . isEmpty ( ) ) {
BSONElement ts = last . findElement ( " ts " ) ;
2009-10-09 19:10:04 +02:00
massert ( " non Date ts found " , ts . type ( ) = = Date | | ts . type ( ) = = Timestamp ) ;
2009-04-22 23:44:23 +02:00
syncedTo = OpTime ( ts . date ( ) ) ;
}
}
2009-04-27 21:30:17 +02:00
/* @return the timestamp of the newest entry in our own local oplog, or a
   null OpTime when the local oplog is empty. */
OpTime ReplSource::nextLastSavedLocalTs() const {
    setClient( "local.oplog.$main" );
    auto_ptr< Cursor > tail = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) );
    if ( !tail->ok() )
        return OpTime();
    return OpTime( tail->current().getField( "ts" ).date() );
}
void ReplSource : : setLastSavedLocalTs ( const OpTime & nextLocalTs ) {
lastSavedLocalTs_ = nextLocalTs ;
2009-04-23 20:44:05 +02:00
log ( 3 ) < < " updated lastSavedLocalTs_ to: " < < lastSavedLocalTs_ < < endl ;
2009-04-22 23:44:23 +02:00
}
void ReplSource : : resetSlave ( ) {
2009-04-27 21:32:27 +02:00
massert ( " request to kill slave replication falied " ,
conn - > simpleCommand ( " admin " , 0 , " forcedead " ) ) ;
2009-04-22 23:44:23 +02:00
syncToTailOfRemoteLog ( ) ;
{
dblock lk ;
2009-04-27 21:30:17 +02:00
setLastSavedLocalTs ( nextLastSavedLocalTs ( ) ) ;
2009-04-22 23:44:23 +02:00
save ( ) ;
cursor . reset ( ) ;
}
}
2009-04-24 17:14:29 +02:00
bool ReplSource : : updateSetsWithLocalOps ( OpTime & localLogTail , bool mayUnlock ) {
2009-04-22 23:44:23 +02:00
setClient ( " local.oplog.$main " ) ;
auto_ptr < Cursor > localLog = findTableScan ( " local.oplog.$main " , BSON ( " $natural " < < - 1 ) ) ;
2009-04-24 20:35:13 +02:00
OpTime newTail ;
2009-04-22 23:44:23 +02:00
for ( ; localLog - > ok ( ) ; localLog - > advance ( ) ) {
BSONObj op = localLog - > current ( ) ;
OpTime ts ( localLog - > current ( ) . getField ( " ts " ) . date ( ) ) ;
2009-04-24 20:35:13 +02:00
if ( newTail . isNull ( ) ) {
newTail = ts ;
2009-04-22 23:44:23 +02:00
}
2009-04-24 20:35:13 +02:00
if ( ! ( localLogTail < ts ) )
2009-04-22 23:44:23 +02:00
break ;
2009-04-24 17:14:29 +02:00
updateSetsWithOp ( op , mayUnlock ) ;
if ( mayUnlock ) {
2009-04-27 16:31:32 +02:00
RARELY {
2009-04-24 20:35:13 +02:00
dbtemprelease t ;
}
2009-04-22 23:44:23 +02:00
}
}
2009-05-01 19:14:37 +02:00
if ( ! localLogTail . isNull ( ) & & ! localLog - > ok ( ) ) {
2009-04-22 23:52:33 +02:00
// local log filled up
2009-04-23 21:00:40 +02:00
idTracker . reset ( ) ;
2009-04-23 22:00:57 +02:00
dbtemprelease t ;
2009-04-22 23:44:23 +02:00
resetSlave ( ) ;
2009-04-27 21:11:43 +02:00
massert ( " local master log filled, forcing slave resync " , false ) ;
2009-04-22 23:44:23 +02:00
}
2009-04-24 20:35:13 +02:00
if ( ! newTail . isNull ( ) )
localLogTail = newTail ;
2009-04-22 23:44:23 +02:00
return true ;
}
2009-01-15 16:17:11 +01:00
/* note: not yet in mutex at this point. */
2009-09-22 16:10:02 +02:00
bool ReplSource : : sync_pullOpLog ( int & nApplied ) {
2009-01-15 16:17:11 +01:00
string ns = string ( " local.oplog.$ " ) + sourceName ( ) ;
2009-01-24 00:24:15 +01:00
log ( 2 ) < < " repl: sync_pullOpLog " < < ns < < " syncedTo: " < < syncedTo . toStringLong ( ) < < ' \n ' ;
2009-01-15 16:17:11 +01:00
bool tailing = true ;
DBClientCursor * c = cursor . get ( ) ;
if ( c & & c - > isDead ( ) ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: old cursor isDead, initiating a new one \n " ;
2009-01-15 16:17:11 +01:00
c = 0 ;
}
2009-05-01 19:14:37 +02:00
if ( replPair & & replPair - > state = = ReplPair : : State_Master ) {
2009-04-23 22:00:57 +02:00
dblock lk ;
idTracker . reset ( ) ;
}
2009-04-24 20:35:13 +02:00
OpTime localLogTail = lastSavedLocalTs_ ;
2009-04-16 17:36:06 +02:00
2009-03-30 22:28:52 +02:00
bool initial = syncedTo . isNull ( ) ;
2009-04-10 23:03:45 +02:00
2009-04-23 01:10:23 +02:00
if ( c = = 0 | | initial ) {
if ( initial ) {
2009-03-31 17:44:35 +02:00
// Important to grab last oplog timestamp before listing databases.
2009-04-22 23:44:23 +02:00
syncToTailOfRemoteLog ( ) ;
2009-04-16 17:36:06 +02:00
BSONObj info ;
bool ok = conn - > runCommand ( " admin " , BSON ( " listDatabases " < < 1 ) , info ) ;
massert ( " Unable to get database list " , ok ) ;
BSONObjIterator i ( info . getField ( " databases " ) . embeddedObject ( ) ) ;
2009-06-09 17:43:04 +02:00
while ( i . moreWithEOO ( ) ) {
2009-04-16 17:36:06 +02:00
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
string name = e . embeddedObject ( ) . getField ( " name " ) . valuestr ( ) ;
2009-05-01 18:18:17 +02:00
if ( ! e . embeddedObject ( ) . getBoolField ( " empty " ) ) {
if ( name ! = " local " ) {
if ( only . empty ( ) | | only = = name ) {
log ( 2 ) < < " adding to 'addDbNextPass': " < < name < < endl ;
addDbNextPass . insert ( name ) ;
}
2009-03-17 18:18:54 +01:00
}
}
}
2009-04-22 22:25:53 +02:00
dblock lk ;
save ( ) ;
2009-03-17 18:18:54 +01:00
}
2009-04-16 17:36:06 +02:00
2009-01-15 16:17:11 +01:00
BSONObjBuilder q ;
q . appendDate ( " $gte " , syncedTo . asDate ( ) ) ;
BSONObjBuilder query ;
query . append ( " ts " , q . done ( ) ) ;
2009-02-12 21:03:38 +01:00
if ( ! only . empty ( ) ) {
// note we may here skip a LOT of data table scanning, a lot of work for the master.
query . appendRegex ( " ns " , string ( " ^ " ) + only ) ;
}
2009-01-15 16:17:11 +01:00
BSONObj queryObj = query . done ( ) ;
// queryObj = { ts: { $gte: syncedTo } }
2009-01-24 00:24:15 +01:00
log ( 2 ) < < " repl: " < < ns < < " .find( " < < queryObj . toString ( ) < < ' ) ' < < ' \n ' ;
2009-03-18 22:24:10 +01:00
cursor = conn - > query ( ns . c_str ( ) , queryObj , 0 , 0 , 0 , Option_CursorTailable | Option_SlaveOk | Option_OplogReplay ) ;
2009-01-15 16:17:11 +01:00
c = cursor . get ( ) ;
tailing = false ;
2008-12-29 02:28:49 +01:00
}
else {
2009-01-24 00:24:15 +01:00
log ( 2 ) < < " repl: tailing=true \n " ;
2008-12-29 02:28:49 +01:00
}
2008-08-19 20:39:44 +02:00
2009-01-15 16:17:11 +01:00
if ( c = = 0 ) {
2009-09-22 16:10:02 +02:00
problem ( ) < < " repl: dbclient::query returns null (conn closed?) " < < endl ;
2009-01-15 16:17:11 +01:00
resetConnection ( ) ;
return false ;
}
2008-09-05 16:40:00 +02:00
2009-01-15 16:17:11 +01:00
// show any deferred database creates from a previous pass
{
set < string > : : iterator i = addDbNextPass . begin ( ) ;
if ( i ! = addDbNextPass . end ( ) ) {
BSONObjBuilder b ;
b . append ( " ns " , * i + ' . ' ) ;
b . append ( " op " , " db " ) ;
BSONObj op = b . done ( ) ;
2009-04-23 20:44:05 +02:00
sync_pullOpLog_applyOperation ( op , 0 ) ;
2009-01-15 16:17:11 +01:00
}
2008-12-29 02:28:49 +01:00
}
2009-01-15 16:17:11 +01:00
if ( ! c - > more ( ) ) {
if ( tailing ) {
2009-01-24 00:24:15 +01:00
log ( 2 ) < < " repl: tailing & no new activity \n " ;
2009-04-23 20:44:05 +02:00
} else {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: " < < ns < < " oplog is empty \n " ;
2009-04-23 20:44:05 +02:00
}
{
dblock lk ;
2009-04-27 21:30:17 +02:00
OpTime nextLastSaved = nextLastSavedLocalTs ( ) ;
{
dbtemprelease t ;
if ( ! c - > more ( ) ) {
setLastSavedLocalTs ( nextLastSaved ) ;
}
}
2009-04-23 20:44:05 +02:00
save ( ) ;
}
2009-01-15 16:17:11 +01:00
return true ;
}
2008-08-02 20:58:15 +02:00
2009-04-22 16:52:32 +02:00
int n = 0 ;
BSONObj op = c - > next ( ) ;
BSONElement ts = op . findElement ( " ts " ) ;
2009-10-09 19:10:04 +02:00
if ( ts . type ( ) ! = Date & & ts . type ( ) ! = Timestamp ) {
2009-04-22 16:52:32 +02:00
string err = op . getStringField ( " $err " ) ;
if ( ! err . empty ( ) ) {
2009-09-22 16:10:02 +02:00
problem ( ) < < " repl: $err reading remote oplog: " + err < < ' \n ' ;
2009-04-22 16:52:32 +02:00
massert ( " got $err reading remote oplog " , false ) ;
}
else {
2009-09-22 16:10:02 +02:00
problem ( ) < < " repl: bad object read from remote oplog: " < < op . toString ( ) < < ' \n ' ;
massert ( " repl: bad object read from remote oplog " , false ) ;
2009-04-22 16:52:32 +02:00
}
}
2009-04-10 23:55:55 +02:00
if ( replPair & & replPair - > state = = ReplPair : : State_Master ) {
2009-04-22 16:52:32 +02:00
OpTime nextOpTime ( ts . date ( ) ) ;
2009-04-22 23:44:23 +02:00
if ( ! tailing & & ! initial & & nextOpTime ! = syncedTo ) {
log ( ) < < " remote slave log filled, forcing slave resync " < < endl ;
resetSlave ( ) ;
2009-04-22 16:52:32 +02:00
return true ;
}
2009-04-10 23:03:45 +02:00
dblock lk ;
2009-04-27 21:11:43 +02:00
updateSetsWithLocalOps ( localLogTail , true ) ;
2008-12-10 17:32:56 +01:00
}
2009-04-22 16:52:32 +02:00
2009-01-15 16:17:11 +01:00
OpTime nextOpTime ( ts . date ( ) ) ;
2009-01-24 00:24:15 +01:00
log ( 2 ) < < " repl: first op time received: " < < nextOpTime . toString ( ) < < ' \n ' ;
2009-03-30 22:28:52 +02:00
if ( tailing | | initial ) {
if ( initial )
2009-09-22 16:10:02 +02:00
log ( 1 ) < < " repl: initial run \n " ;
2009-03-30 22:28:52 +02:00
else
assert ( syncedTo < nextOpTime ) ;
2009-04-23 20:44:05 +02:00
sync_pullOpLog_applyOperation ( op , & localLogTail ) ;
2009-03-17 18:18:54 +01:00
n + + ;
2008-12-10 17:32:56 +01:00
}
2009-01-15 16:17:11 +01:00
else if ( nextOpTime ! = syncedTo ) {
2009-01-20 20:30:59 +01:00
Nullstream & l = log ( ) ;
2009-09-22 16:10:02 +02:00
l < < " repl: nextOpTime " < < nextOpTime . toStringLong ( ) < < ' ' ;
2009-01-15 16:17:11 +01:00
if ( nextOpTime < syncedTo )
l < < " <?? " ;
else
l < < " > " ;
l < < " syncedTo " < < syncedTo . toStringLong ( ) < < ' \n ' ;
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: time diff: " < < ( nextOpTime . getSecs ( ) - syncedTo . getSecs ( ) ) < < " sec \n " ;
log ( ) < < " repl: tailing: " < < tailing < < ' \n ' ;
log ( ) < < " repl: data too stale, halting replication " < < endl ;
2009-02-04 19:22:02 +01:00
replInfo = replAllDead = " data too stale halted replication " ;
2008-12-29 02:28:49 +01:00
assert ( syncedTo < nextOpTime ) ;
2009-01-15 16:17:11 +01:00
throw SyncException ( ) ;
2008-12-29 02:28:49 +01:00
}
else {
2009-03-30 22:28:52 +02:00
/* t == syncedTo, so the first op was applied previously. */
2008-12-29 02:28:49 +01:00
}
2009-01-15 16:17:11 +01:00
// apply operations
2008-08-02 20:58:15 +02:00
{
2009-01-22 00:48:37 +01:00
time_t saveLast = time ( 0 ) ;
2009-01-15 16:17:11 +01:00
while ( 1 ) {
2009-10-19 19:29:12 +02:00
/* from a.s.:
I think the idea here is that we can establish a sync point between the local op log and the remote log with the following steps :
1 ) identify most recent op in local log - - call it O
2 ) ask " does nextOpTime reflect the tail of the remote op log? " ( in other words , is more ( ) false ? ) - If yes , all subsequent ops after nextOpTime in the remote log must have occurred after O . If no , we can ' t establish a sync point .
Note that we can ' t do step ( 2 ) followed by step ( 1 ) because if we do so ops may be added to both machines between steps ( 2 ) and ( 1 ) and we can ' t establish a sync point . ( In particular , between ( 2 ) and ( 1 ) an op may be added to the remote log before a different op is added to the local log . In this case , the newest remote op will have occurred after nextOpTime but before O . )
Now , for performance reasons we don ' t want to have to identify the most recent op in the local log every time we call c - > more ( ) because in performance sensitive situations more ( ) will be true most of the time . So we do :
0 ) more ( ) ?
1 ) find most recent op in local log
2 ) more ( ) ?
*/
2009-01-15 16:17:11 +01:00
if ( ! c - > more ( ) ) {
2009-04-27 21:30:17 +02:00
dblock lk ;
2009-10-14 17:01:29 +02:00
OpTime nextLastSaved = nextLastSavedLocalTs ( ) ; // this may make c->more() become true
2009-04-27 21:30:17 +02:00
{
dbtemprelease t ;
2009-10-13 23:13:51 +02:00
if ( c - > more ( ) ) {
2009-04-27 21:30:17 +02:00
continue ;
2009-10-13 23:13:51 +02:00
} else {
2009-04-27 21:30:17 +02:00
setLastSavedLocalTs ( nextLastSaved ) ;
}
}
2009-01-15 16:17:11 +01:00
syncedTo = nextOpTime ;
save ( ) ; // note how far we are synced up to now
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: applied " < < n < < " operations " < < endl ;
nApplied = n ;
2009-05-28 21:37:45 +02:00
log ( ) < < " repl: end sync_pullOpLog syncedTo: " < < syncedTo . toStringLong ( ) < < endl ;
2009-01-15 16:17:11 +01:00
break ;
}
2009-01-22 00:48:37 +01:00
2009-10-13 22:01:02 +02:00
OCCASIONALLY if ( n > 100000 | | time ( 0 ) - saveLast > 60 ) {
2009-01-22 00:48:37 +01:00
// periodically note our progress, in case we are doing a lot of work and crash
dblock lk ;
2009-05-21 20:30:41 +02:00
syncedTo = nextOpTime ;
// can't update local log ts since there are pending operations from our peer
2009-01-22 00:48:37 +01:00
save ( ) ;
2009-10-13 22:01:02 +02:00
log ( ) < < " repl: checkpoint applied " < < n < < " operations " < < endl ;
log ( ) < < " repl: syncedTo: " < < syncedTo . toStringLong ( ) < < endl ;
2009-01-22 00:48:37 +01:00
saveLast = time ( 0 ) ;
2009-05-21 20:30:41 +02:00
n = 0 ;
2009-01-22 00:48:37 +01:00
}
2009-01-15 16:17:11 +01:00
BSONObj op = c - > next ( ) ;
ts = op . findElement ( " ts " ) ;
2009-10-09 19:10:04 +02:00
assert ( ts . type ( ) = = Date | | ts . type ( ) = = Timestamp ) ;
2009-01-15 16:17:11 +01:00
OpTime last = nextOpTime ;
OpTime tmp ( ts . date ( ) ) ;
nextOpTime = tmp ;
if ( ! ( last < nextOpTime ) ) {
problem ( ) < < " sync error: last " < < last . toString ( ) < < " >= nextOpTime " < < nextOpTime . toString ( ) < < endl ;
uassert ( " bad 'ts' value in sources " , false ) ;
}
2008-08-02 20:58:15 +02:00
2009-04-23 20:44:05 +02:00
sync_pullOpLog_applyOperation ( op , & localLogTail ) ;
2009-01-15 16:17:11 +01:00
n + + ;
2008-12-29 02:28:49 +01:00
}
}
2008-12-15 23:23:54 +01:00
2009-01-15 16:17:11 +01:00
return true ;
2008-08-02 20:58:15 +02:00
}
2009-01-24 00:24:15 +01:00
// Query used by replAuthenticate() to find the dedicated "repl" user in local.system.users.
BSONObj userReplQuery = fromjson ( " { \" user \" : \" repl \" } " ) ;
/* Authenticate an outbound replication connection against the sync source.
   Looks for the dedicated "repl" user in local.system.users, falling back to
   the first user stored there; authenticates against the master's "local" db.
   @return true on success (also when --noauth is set and no users exist). */
bool replAuthenticate( DBClientConnection *conn ) {
    AuthenticationInfo *ai = currentClient.get()->ai;
    if ( !ai->isAuthorized( "admin" ) ) {
        log() << "replauthenticate: requires admin permissions, failing\n";
        return false;
    }

    BSONObj user;
    {
        dblock lk;
        DBContext ctxt( "local." );
        if ( !Helpers::findOne( "local.system.users", userReplQuery, user ) ) {
            // no dedicated "repl" user; fall back to the first user in local
            if ( !Helpers::getSingleton( "local.system.users", user ) ) {
                if ( noauth )
                    return true; // presumably we are running a --noauth setup all around.
                log() << "replauthenticate: no user in local.system.users to use for authentication\n";
                return false;
            }
        }
    }

    string u = user.getStringField( "user" );
    string p = user.getStringField( "pwd" );
    massert( "bad user object? [1]", !u.empty() );
    massert( "bad user object? [2]", !p.empty() );

    string err;
    if ( !conn->auth( "local", u.c_str(), p.c_str(), err, false ) ) {
        log() << "replauthenticate: can't authenticate to master server, user: " << u << endl;
        return false;
    }
    return true;
}
2009-03-18 18:45:32 +01:00
bool ReplSource : : connect ( ) {
if ( conn . get ( ) = = 0 ) {
conn = auto_ptr < DBClientConnection > ( new DBClientConnection ( ) ) ;
string errmsg ;
ReplInfo r ( " trying to connect to sync source " ) ;
if ( ! conn - > connect ( hostName . c_str ( ) , errmsg ) | | ! replAuthenticate ( conn . get ( ) ) ) {
resetConnection ( ) ;
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: " < < errmsg < < endl ;
2009-03-18 18:45:32 +01:00
return false ;
}
}
return true ;
}
2009-01-15 16:17:11 +01:00
/* note: not yet in mutex at this point.
returns true if everything happy . return false if you want to reconnect .
*/
2009-09-22 16:10:02 +02:00
bool ReplSource : : sync ( int & nApplied ) {
2009-01-15 16:17:11 +01:00
ReplInfo r ( " sync " ) ;
2009-08-25 20:35:22 +02:00
if ( ! cmdLine . quiet )
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: " < < sourceName ( ) < < ' @ ' < < hostName < < endl ;
2009-01-15 16:17:11 +01:00
nClonedThisPass = 0 ;
2009-02-02 22:18:44 +01:00
// FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName.
2009-08-25 16:24:44 +02:00
if ( ( string ( " localhost " ) = = hostName | | string ( " 127.0.0.1 " ) = = hostName ) & & cmdLine . port = = CmdLine : : DefaultDBPort ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: can't sync from self (localhost). sources configuration may be wrong. " < < endl ;
2009-01-15 16:17:11 +01:00
sleepsecs ( 5 ) ;
2008-12-29 02:28:49 +01:00
return false ;
}
2008-07-28 19:51:39 +02:00
2009-03-18 18:45:32 +01:00
if ( ! connect ( ) ) {
if ( replPair & & paired ) {
assert ( startsWith ( hostName . c_str ( ) , replPair - > remoteHost . c_str ( ) ) ) ;
replPair - > arbitrate ( ) ;
2009-01-15 16:17:11 +01:00
}
2009-03-18 18:45:32 +01:00
{
2009-09-22 16:10:02 +02:00
ReplInfo r ( " can't connect to sync source " ) ;
2009-03-18 18:45:32 +01:00
}
return false ;
2009-01-15 16:17:11 +01:00
}
2009-03-18 18:45:32 +01:00
2009-04-10 23:03:45 +02:00
if ( paired ) {
int remote = replPair - > negotiate ( conn . get ( ) , " direct " ) ;
int nMasters = ( remote = = ReplPair : : State_Master ) + ( replPair - > state = = ReplPair : : State_Master ) ;
if ( getInitialSyncCompleted ( ) & & nMasters ! = 1 ) {
log ( ) < < ( nMasters = = 0 ? " no master " : " two masters " ) < < " , deferring oplog pull " < < endl ;
return true ;
}
}
2009-01-15 16:17:11 +01:00
/*
// get current mtime at the server.
BSONObj o = conn - > findOne ( " admin.$cmd " , opTimeQuery ) ;
BSONElement e = o . findElement ( " optime " ) ;
if ( e . eoo ( ) ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: failed to get cur optime from master " < < endl ;
2009-01-15 16:17:11 +01:00
log ( ) < < " " < < o . toString ( ) < < endl ;
return false ;
}
uassert ( e . type ( ) = = Date ) ;
OpTime serverCurTime ;
serverCurTime . asDate ( ) = e . date ( ) ;
*/
2009-09-22 16:10:02 +02:00
return sync_pullOpLog ( nApplied ) ;
2009-01-15 16:17:11 +01:00
}
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
/* -- Logging of operations -------------------------------------*/
2008-08-02 20:58:15 +02:00
2009-07-29 21:53:14 +02:00
// cached copies of these...so don't rename them
2009-01-15 16:17:11 +01:00
NamespaceDetails * localOplogMainDetails = 0 ; // lazily-filled cache used by _logOp() for the main oplog's NamespaceDetails
Database * localOplogClient = 0 ; // lazily-filled cache of the Database holding the main oplog (set alongside the above)
2009-03-19 21:23:04 +01:00
void logOp ( const char * opstr , const char * ns , const BSONObj & obj , BSONObj * patt , bool * b ) {
2009-04-10 00:50:29 +02:00
if ( master ) {
_logOp ( opstr , ns , " local.oplog.$main " , obj , patt , b , OpTime : : now ( ) ) ;
char cl [ 256 ] ;
nsToClient ( ns , cl ) ;
}
2009-03-04 21:57:35 +01:00
NamespaceDetailsTransient & t = NamespaceDetailsTransient : : get ( ns ) ;
2009-03-06 17:08:30 +01:00
if ( t . logValid ( ) ) {
try {
2009-04-10 00:50:29 +02:00
_logOp ( opstr , ns , t . logNS ( ) . c_str ( ) , obj , patt , b , OpTime : : now ( ) ) ;
2009-03-06 17:08:30 +01:00
} catch ( const DBException & ) {
t . invalidateLog ( ) ;
}
}
2009-03-04 21:57:35 +01:00
}
2009-01-15 16:17:11 +01:00
/* we write to local.oplog.$main:
{ ts : . . . , op : . . . , ns : . . . , o : . . . }
ts : an OpTime timestamp
op :
" i " insert
" u " update
" d " delete
" c " db cmd
" db " declares presence of a database ( ns is set to the db name + ' . ' )
2009-04-22 16:52:32 +02:00
" n " no op
2009-01-15 16:17:11 +01:00
bb :
if not null , specifies a boolean to pass along to the other side as b : param .
used for " justOne " or " upsert " flags on ' d ' , ' u '
first : true
when set , indicates this is the first thing we have logged for this database .
thus , the slave does not need to copy down all the data when it sees this .
2008-08-02 20:58:15 +02:00
*/
2009-04-10 00:50:29 +02:00
void _logOp ( const char * opstr , const char * ns , const char * logNS , const BSONObj & obj , BSONObj * o2 , bool * bb , const OpTime & ts ) {
2009-01-15 16:17:11 +01:00
if ( strncmp ( ns , " local. " , 6 ) = = 0 )
return ;
2008-08-02 20:58:15 +02:00
2009-10-14 22:29:32 +02:00
DEV assertInWriteLock ( ) ;
2009-10-14 20:34:38 +02:00
DBContext context ;
2009-01-15 16:17:11 +01:00
/* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
instead we do a single copy to the destination position in the memory mapped file .
*/
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
BSONObjBuilder b ;
2009-10-09 19:10:04 +02:00
b . appendTimestamp ( " ts " , ts . asDate ( ) ) ;
2009-01-15 16:17:11 +01:00
b . append ( " op " , opstr ) ;
b . append ( " ns " , ns ) ;
if ( bb )
b . appendBool ( " b " , * bb ) ;
if ( o2 )
b . append ( " o2 " , * o2 ) ;
BSONObj partial = b . done ( ) ;
int posz = partial . objsize ( ) ;
int len = posz + obj . objsize ( ) + 1 + 2 /*o:*/ ;
2009-03-06 01:58:07 +01:00
Record * r ;
if ( strncmp ( logNS , " local. " , 6 ) = = 0 ) { // For now, assume this is olog main
if ( localOplogMainDetails = = 0 ) {
2009-10-13 22:01:02 +02:00
setClient ( " local. " ) ;
2009-10-14 20:34:38 +02:00
localOplogClient = cc ( ) . database ( ) ;
2009-03-06 01:58:07 +01:00
localOplogMainDetails = nsdetails ( logNS ) ;
}
2009-10-14 20:34:38 +02:00
cc ( ) . setns ( " " , localOplogClient ) ; // database = localOplogClient;
2009-03-06 01:58:07 +01:00
r = theDataFileMgr . fast_oplog_insert ( localOplogMainDetails , logNS , len ) ;
} else {
setClient ( logNS ) ;
assert ( nsdetails ( logNS ) ) ;
r = theDataFileMgr . fast_oplog_insert ( nsdetails ( logNS ) , logNS , len ) ;
2009-01-15 16:17:11 +01:00
}
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
char * p = r - > data ;
memcpy ( p , partial . objdata ( ) , posz ) ;
* ( ( unsigned * ) p ) + = obj . objsize ( ) + 1 + 2 ;
p + = posz - 1 ;
* p + + = ( char ) Object ;
* p + + = ' o ' ;
* p + + = 0 ;
memcpy ( p , obj . objdata ( ) , obj . objsize ( ) ) ;
p + = obj . objsize ( ) ;
* p = EOO ;
2009-03-03 00:16:27 +01:00
2009-05-21 17:07:11 +02:00
if ( logLevel > = 6 ) {
BSONObj temp ( r ) ;
log ( 6 ) < < " logging op: " < < temp < < endl ;
}
2008-12-15 23:23:54 +01:00
}
2008-12-29 02:28:49 +01:00
2009-01-15 16:17:11 +01:00
/* --------------------------------------------------------------*/
2008-09-11 21:13:47 +02:00
2009-01-15 16:17:11 +01:00
/*
TODO :
_ source has autoptr to the cursor
_ reuse that cursor when we can
*/
2008-08-02 20:58:15 +02:00
2009-09-22 16:10:02 +02:00
/* returns: # of seconds to sleep before next pass
0 = no sleep recommended
1 = special sentinel indicating adaptive sleep recommended
*/
int _replMain ( ReplSource : : SourceVector & sources , int & nApplied ) {
2008-12-15 23:23:54 +01:00
{
2009-01-15 16:17:11 +01:00
ReplInfo r ( " replMain load sources " ) ;
2008-12-15 23:23:54 +01:00
dblock lk ;
2009-01-15 16:17:11 +01:00
ReplSource : : loadAll ( sources ) ;
2008-12-15 23:23:54 +01:00
}
2009-01-15 16:17:11 +01:00
if ( sources . empty ( ) ) {
/* replication is not configured yet (for --slave) in local.sources. Poll for config it
every 20 seconds .
*/
return 20 ;
2008-12-29 02:28:49 +01:00
}
2009-01-15 16:17:11 +01:00
2009-09-22 16:10:02 +02:00
int sleepAdvice = 1 ;
2009-04-01 22:00:56 +02:00
for ( ReplSource : : SourceVector : : iterator i = sources . begin ( ) ; i ! = sources . end ( ) ; i + + ) {
ReplSource * s = i - > get ( ) ;
2009-01-15 16:17:11 +01:00
bool ok = false ;
try {
2009-09-22 16:10:02 +02:00
ok = s - > sync ( nApplied ) ;
2009-01-15 16:17:11 +01:00
bool moreToSync = s - > haveMoreDbsToSync ( ) ;
2009-09-22 16:10:02 +02:00
if ( ! ok ) {
sleepAdvice = 3 ;
}
else if ( moreToSync ) {
sleepAdvice = 0 ;
}
2009-01-15 16:17:11 +01:00
if ( ok & & ! moreToSync /*&& !s->syncedTo.isNull()*/ ) {
pairSync - > setInitialSyncCompletedLocking ( ) ;
}
}
2009-03-12 16:01:52 +01:00
catch ( const SyncException & ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " caught SyncException " < < endl ;
2009-01-15 16:17:11 +01:00
return 10 ;
}
catch ( AssertionException & e ) {
if ( e . severe ( ) ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " replMain AssertionException " < < e . what ( ) < < endl ;
2009-01-15 16:17:11 +01:00
return 60 ;
}
else {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: AssertionException " < < e . what ( ) < < ' \n ' ;
2009-01-15 16:17:11 +01:00
}
replInfo = " replMain caught AssertionException " ;
}
2009-03-12 16:01:52 +01:00
catch ( const DBException & e ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: DBException " < < e . what ( ) < < endl ;
2009-03-12 16:01:52 +01:00
replInfo = " replMain caught DBException " ;
}
catch ( const std : : exception & e ) {
2009-09-22 16:10:02 +02:00
log ( ) < < " repl: std::exception " < < e . what ( ) < < endl ;
2009-03-12 16:01:52 +01:00
replInfo = " replMain caught std::exception " ;
}
2009-02-11 15:58:01 +01:00
catch ( . . . ) {
2009-03-12 16:01:52 +01:00
log ( ) < < " unexpected exception during replication. replication will halt " < < endl ;
replAllDead = " caught unexpected exception during replication " ;
2009-02-11 15:58:01 +01:00
}
2009-01-15 16:17:11 +01:00
if ( ! ok )
s - > resetConnection ( ) ;
2008-12-15 23:23:54 +01:00
}
2009-09-22 16:10:02 +02:00
return sleepAdvice ;
2008-12-15 23:23:54 +01:00
}
2009-01-15 16:17:11 +01:00
void replMain ( ) {
2009-04-01 22:00:56 +02:00
ReplSource : : SourceVector sources ;
2009-01-15 16:17:11 +01:00
while ( 1 ) {
int s = 0 ;
{
dblock lk ;
2009-02-04 19:22:02 +01:00
if ( replAllDead ) {
2009-02-02 17:15:24 +01:00
if ( ! autoresync | | ! ReplSource : : throttledForceResyncDead ( " auto " ) )
break ;
}
2009-01-15 16:17:11 +01:00
assert ( syncing = = 0 ) ;
syncing + + ;
}
try {
2009-09-22 16:10:02 +02:00
int nApplied = 0 ;
s = _replMain ( sources , nApplied ) ;
if ( s = = 1 ) {
if ( nApplied = = 0 ) s = 2 ;
else if ( nApplied > 100 ) {
// sleep very little - just enought that we aren't truly hammering master
sleepmillis ( 75 ) ;
s = 0 ;
}
}
2009-01-15 16:17:11 +01:00
} catch ( . . . ) {
2009-09-22 16:10:02 +02:00
out ( ) < < " caught exception in _replMain " < < endl ;
s = 4 ;
2009-01-15 16:17:11 +01:00
}
{
dblock lk ;
assert ( syncing = = 1 ) ;
syncing - - ;
}
if ( s ) {
stringstream ss ;
2009-09-22 16:10:02 +02:00
ss < < " repl: sleep " < < s < < " sec before next pass " ;
2009-01-15 16:17:11 +01:00
string msg = ss . str ( ) ;
2009-09-22 16:10:02 +02:00
log ( ) < < msg < < endl ;
2009-01-15 16:17:11 +01:00
ReplInfo r ( msg . c_str ( ) ) ;
sleepsecs ( s ) ;
}
}
}
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
int debug_stop_repl = 0 ;
2008-08-02 20:58:15 +02:00
2009-01-15 16:17:11 +01:00
void replSlaveThread ( ) {
sleepsecs ( 1 ) ;
2008-12-15 23:23:54 +01:00
2009-01-15 16:17:11 +01:00
{
dblock lk ;
2009-01-24 00:24:15 +01:00
2009-10-16 21:36:34 +02:00
Client : : initThread ( " replslave " ) ;
currentClient . get ( ) - > ai - > authorize ( " admin " ) ;
2009-01-24 00:24:15 +01:00
2009-01-15 16:17:11 +01:00
BSONObj obj ;
2009-01-18 23:48:44 +01:00
if ( Helpers : : getSingleton ( " local.pair.startup " , obj ) ) {
2009-01-15 16:17:11 +01:00
// should be: {replacepeer:1}
replacePeer = true ;
pairSync - > setInitialSyncCompleted ( ) ; // we are the half that has all the data
}
2008-12-15 23:23:54 +01:00
}
2009-01-15 16:17:11 +01:00
while ( 1 ) {
try {
replMain ( ) ;
if ( debug_stop_repl )
break ;
sleepsecs ( 5 ) ;
}
catch ( AssertionException & ) {
ReplInfo r ( " Assertion in replSlaveThread() : sleeping 5 minutes before retry " ) ;
problem ( ) < < " Assertion in replSlaveThread(): sleeping 5 minutes before retry " < < endl ;
sleepsecs ( 300 ) ;
2008-12-29 02:28:49 +01:00
}
}
2008-09-03 22:43:00 +02:00
}
2008-12-02 20:24:45 +01:00
2009-01-15 16:17:11 +01:00
void tempThread ( ) {
while ( 1 ) {
2009-01-15 17:26:38 +01:00
out ( ) < < dbMutexInfo . isLocked ( ) < < endl ;
2009-01-15 16:17:11 +01:00
sleepmillis ( 100 ) ;
}
2008-12-29 02:28:49 +01:00
}
2009-01-23 16:17:29 +01:00
/* Create the master op log collection, local.oplog.$main, if it does not
   already exist. Size comes from --oplogSize when given; otherwise a
   default is computed: 50MB on 32-bit builds, and on 64-bit builds the
   larger of ~990MB and 5% of free disk space. Holds the db lock for the
   duration. */
void createOplog() {
    dblock lk;

    const char *ns = "local.oplog.$main";
    setClient(ns);

    if ( nsdetails( ns ) )
        return;   // oplog collection already exists; nothing to do

    /* create an oplog collection, if it doesn't yet exist. */
    BSONObjBuilder b;
    double sz;
    if ( cmdLine.oplogSize != 0 )
        sz = (double) cmdLine.oplogSize;
    else {
        sz = 50.0 * 1000 * 1000;
        if ( sizeof(int *) >= 8 ) {
            // 64-bit build: much larger default, scaled up by free disk
            sz = 990.0 * 1000 * 1000;
            boost::intmax_t free = freeSpace(); //-1 if call not supported.
            // if freeSpace() returned -1, fivePct is negative and the
            // comparison below simply leaves sz at its default
            double fivePct = free * 0.05;
            if ( fivePct > sz )
                sz = fivePct;
        }
    }

    log() << "******\n";
    log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB (use --oplogSize to change)\n";
    log() << "******" << endl;

    b.append("size", sz);
    b.appendBool("capped", 1);
    // NOTE(review): _id auto-indexing is disabled for the oplog —
    // presumably because it is only ever scanned in insertion order; confirm
    b.appendBool("autoIndexId", false);

    string err;
    BSONObj o = b.done();
    userCreateNS(ns, o, err, false);
    // "n" op — presumably a no-op entry so the new oplog is never empty; confirm
    logOp( "n", "dummy", BSONObj() );
    cc().clearns();
}
2009-01-15 16:17:11 +01:00
void startReplication ( ) {
/* this was just to see if anything locks for longer than it should -- we need to be careful
not to be locked when trying to connect ( ) or query ( ) the other side .
*/
//boost::thread tempt(tempThread);
if ( ! slave & & ! master & & ! replPair )
return ;
2008-12-29 02:28:49 +01:00
{
dblock lk ;
2009-01-15 16:17:11 +01:00
pairSync - > init ( ) ;
}
if ( slave | | replPair ) {
2009-08-17 22:13:13 +02:00
if ( slave ) {
assert ( slave = = SimpleSlave ) ;
2009-01-23 21:15:07 +01:00
log ( 1 ) < < " slave=true " < < endl ;
2009-08-17 22:13:13 +02:00
}
else
slave = ReplPairSlave ;
2009-01-15 16:17:11 +01:00
boost : : thread repl_thread ( replSlaveThread ) ;
}
if ( master | | replPair ) {
2009-01-23 21:15:07 +01:00
if ( master )
log ( 1 ) < < " master=true " < < endl ;
2009-01-15 16:17:11 +01:00
master = true ;
2009-01-23 16:17:29 +01:00
createOplog ( ) ;
2009-01-15 16:17:11 +01:00
}
2008-12-29 02:28:49 +01:00
}
2008-09-03 22:43:00 +02:00
2009-01-15 16:17:11 +01:00
/* called from main at server startup */
// Enable replica-pair mode: remoteEnd is our peer's address, arb the
// arbiter's. The ReplPair object is assigned to the global replPair and
// never deleted — presumably it lives for the life of the process.
void pairWith(const char *remoteEnd, const char *arb) {
    replPair = new ReplPair(remoteEnd, arb);
}
2009-01-14 23:09:51 +01:00
2009-03-04 21:57:35 +01:00
class CmdLogCollection : public Command {
public :
virtual bool slaveOk ( ) {
return false ;
}
CmdLogCollection ( ) : Command ( " logCollection " ) { }
virtual void help ( stringstream & help ) const {
help < < " examples: { logCollection: <collection ns>, start: 1 }, "
2009-03-06 16:45:35 +01:00
< < " { logCollection: <collection ns>, validateComplete: 1 } " ;
2009-03-04 21:57:35 +01:00
}
virtual bool run ( const char * ns , BSONObj & cmdObj , string & errmsg , BSONObjBuilder & result , bool fromRepl ) {
2009-03-06 01:58:07 +01:00
string logCollection = cmdObj . getStringField ( " logCollection " ) ;
if ( logCollection . empty ( ) ) {
errmsg = " missing logCollection spec " ;
return false ;
}
2009-03-04 21:57:35 +01:00
bool start = ! cmdObj . getField ( " start " ) . eoo ( ) ;
2009-03-06 16:45:35 +01:00
bool validateComplete = ! cmdObj . getField ( " validateComplete " ) . eoo ( ) ;
if ( start ? validateComplete : ! validateComplete ) {
errmsg = " Must specify exactly one of start:1 or validateComplete:1 " ;
2009-03-04 21:57:35 +01:00
return false ;
}
2009-03-06 17:51:44 +01:00
int logSizeMb = cmdObj . getIntField ( " logSizeMb " ) ;
2009-03-06 01:58:07 +01:00
NamespaceDetailsTransient & t = NamespaceDetailsTransient : : get ( logCollection . c_str ( ) ) ;
2009-03-04 21:57:35 +01:00
if ( start ) {
if ( t . logNS ( ) . empty ( ) ) {
2009-03-06 17:51:44 +01:00
if ( logSizeMb = = INT_MIN ) {
t . startLog ( ) ;
} else {
t . startLog ( logSizeMb ) ;
}
2009-03-04 21:57:35 +01:00
} else {
2009-03-06 01:58:07 +01:00
errmsg = " Log already started for ns: " + logCollection ;
2009-03-04 21:57:35 +01:00
return false ;
}
} else {
if ( t . logNS ( ) . empty ( ) ) {
2009-03-06 16:45:35 +01:00
errmsg = " No log to validateComplete for ns: " + logCollection ;
2009-03-04 21:57:35 +01:00
return false ;
} else {
2009-03-06 16:45:35 +01:00
if ( ! t . validateCompleteLog ( ) ) {
errmsg = " Oplog failure, insufficient space allocated " ;
return false ;
}
2009-03-04 21:57:35 +01:00
}
}
return true ;
}
} cmdlogcollection ;
2009-01-14 23:09:51 +01:00
} // namespace mongo