mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
Generalize recloning docs on initial oplog application SERVER-3367
This commit is contained in:
parent
cf8368c20a
commit
c99abec538
34
db/oplog.cpp
34
db/oplog.cpp
@ -624,6 +624,40 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
bool shouldRetry(const BSONObj& o, const string& hn) {
|
||||
OplogReader missingObjReader;
|
||||
|
||||
// we don't have the object yet, which is possible on initial sync. get it.
|
||||
log() << "replication info adding missing object" << endl; // rare enough we can log
|
||||
uassert(15916, str::stream() << "Can no longer connect to initial sync source: " << hn, missingObjReader.connect(hn));
|
||||
|
||||
const char *ns = o.getStringField("ns");
|
||||
// might be more than just _id in the update criteria
|
||||
BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj();
|
||||
BSONObj missingObj;
|
||||
try {
|
||||
missingObj = missingObjReader.findOne(ns, query);
|
||||
} catch(DBException& e) {
|
||||
log() << "replication assertion fetching missing object: " << e.what() << endl;
|
||||
throw;
|
||||
}
|
||||
|
||||
if( missingObj.isEmpty() ) {
|
||||
log() << "replication missing object not found on source. presumably deleted later in oplog" << endl;
|
||||
log() << "replication o2: " << o.getObjectField("o2").toString() << endl;
|
||||
log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl;
|
||||
|
||||
return false;
|
||||
}
|
||||
else {
|
||||
Client::Context ctx(ns);
|
||||
DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize());
|
||||
uassert(15917, "Got bad disk location when attempting to insert", !d.isNull());
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/** @param fromRepl false if from ApplyOpsCmd
|
||||
@return true if the op was an update that should have happened but the target document does not exist.  see replset initial sync code.
|
||||
*/
|
||||
|
@ -129,6 +129,12 @@ namespace mongo {
|
||||
* take an op and apply locally
|
||||
* used for applying from an oplog
|
||||
* @param fromRepl really from replication or for testing/internal/command/etc...
|
||||
* @return true if the op was an update that could not be applied (i.e. true on failure)
|
||||
*/
|
||||
bool applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
|
||||
|
||||
/**
|
||||
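* Called after an update fails to apply: tries to copy the missing document from host hn and insert it locally. Returns true if applyOperation_inlock should then be called again.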
|
||||
*/
|
||||
bool shouldRetry(const BSONObj& op , const string& hn);
|
||||
}
|
||||
|
@ -613,8 +613,10 @@ namespace mongo {
|
||||
|
||||
void ReplSource::applyOperation(const BSONObj& op) {
|
||||
try {
|
||||
// TODO SERVER-3367
|
||||
applyOperation_inlock( op );
|
||||
bool failedUpdate = applyOperation_inlock( op );
|
||||
if (failedUpdate && shouldRetry(op, hostName)) {
|
||||
uassert(15914, "Failure retrying initial sync update", applyOperation_inlock(op));
|
||||
}
|
||||
}
|
||||
catch ( UserException& e ) {
|
||||
log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;;
|
||||
|
@ -122,11 +122,11 @@ namespace mongo {
|
||||
* @return true iff an op with the specified ns may be applied.
|
||||
*/
|
||||
bool handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db );
|
||||
|
||||
|
||||
public:
|
||||
OplogReader oplogReader;
|
||||
|
||||
static void applyOperation(const BSONObj& op);
|
||||
void applyOperation(const BSONObj& op);
|
||||
string hostName; // ip addr or hostname plus optionally, ":<port>"
|
||||
string _sourceName; // a logical source name.
|
||||
string sourceName() const { return _sourceName.empty() ? "main" : _sourceName; }
|
||||
|
@ -74,7 +74,6 @@ namespace mongo {
|
||||
const OpTime& applyGTE, const OpTime& minValid) {
|
||||
|
||||
const string hn = source->fullName();
|
||||
OplogReader missingObjReader;
|
||||
try {
|
||||
r.tailingQueryGTE( rsoplog, applyGTE );
|
||||
if ( !r.haveCursor() ) {
|
||||
@ -145,45 +144,8 @@ namespace mongo {
|
||||
|
||||
if( ts >= applyGTE ) { // optimes before we started copying need not be applied.
|
||||
bool failedUpdate = syncApply(o);
|
||||
if( failedUpdate ) {
|
||||
// we don't have the object yet, which is possible on initial sync. get it.
|
||||
log() << "replSet info adding missing object" << endl; // rare enough we can log
|
||||
if( !missingObjReader.connect(hn) ) { // ok to call more than once
|
||||
log() << "replSet initial sync fails, couldn't connect to " << hn << endl;
|
||||
return false;
|
||||
}
|
||||
const char *ns = o.getStringField("ns");
|
||||
BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj(); // might be more than just _id in the update criteria
|
||||
BSONObj missingObj;
|
||||
try {
|
||||
missingObj = missingObjReader.findOne(
|
||||
ns,
|
||||
query );
|
||||
} catch(...) {
|
||||
log() << "replSet assertion fetching missing object" << endl;
|
||||
throw;
|
||||
}
|
||||
if( missingObj.isEmpty() ) {
|
||||
log() << "replSet missing object not found on source. presumably deleted later in oplog" << endl;
|
||||
log() << "replSet o2: " << o.getObjectField("o2").toString() << endl;
|
||||
log() << "replSet o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl;
|
||||
}
|
||||
else {
|
||||
Client::Context ctx(ns);
|
||||
try {
|
||||
DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize());
|
||||
assert( !d.isNull() );
|
||||
} catch(...) {
|
||||
log() << "replSet assertion during insert of missing object" << endl;
|
||||
throw;
|
||||
}
|
||||
// now reapply the update from above
|
||||
bool failed = syncApply(o);
|
||||
if( failed ) {
|
||||
log() << "replSet update still fails after adding missing object " << ns << endl;
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
if( failedUpdate && shouldRetry(o, hn)) {
    // now reapply the update from above.  syncApply returns true when the
    // update failed, and uassert throws when its expression is false, so
    // the result must be negated: raise 15915 only if the update still
    // fails after the missing object was added.
    uassert(15915, "replSet update still fails after adding missing object", !syncApply(o));
}
|
||||
}
|
||||
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
|
||||
|
Loading…
Reference in New Issue
Block a user