Mirror of https://github.com/mongodb/mongo.git, synced 2024-12-01 01:21:03 +01:00
SERVER-9853 retry ghostsync percolate if socket failure
Without this fix, depending on the exact timing of an upstream socket failure, a chained upstream oplog position update from a secondary could be lost. A write with a sufficiently strict write concern might then time out even though enough secondaries had applied it. The problem would be masked by a sufficient number of subsequent writes, which would trigger another upstream oplog position update. Note that this affects only chaining; non-chaining updates already retry when a socket failure occurs.
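For illustration, here is a minimal sketch of the failure mode, assuming a three-node replica set in which one secondary chains through the other; the hostname, database name, and timeout values are hypothetical and not part of this commit:

    // Hypothetical repro sketch (legacy mongo shell); assumes node 2 syncs
    // through node 1 rather than directly from the primary.
    var conn = new Mongo("primary.example.net:27017");  // hypothetical host
    var db = conn.getDB("test");
    db.foo.insert({a: 1});
    // Before this fix, if the chained secondary's upstream position update was
    // dropped by a socket failure, this wait could hit wtimeout even though
    // all three nodes had already applied the write:
    var gle = db.getLastErrorObj(3 /* w */, 30000 /* wtimeout, ms */);
    printjson(gle);  // under the bug, may report err: "timeout", wtimeout: true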
parent 1e65ddf0bf
commit 5e02976117
jstests/replsets/chaining.js (new file, +51 lines)
@@ -0,0 +1,51 @@
+// Test that chaining position propagation (percolate) still works properly
+// in the face of socket exceptions
+
+function assertGLEOK(status) {
+    assert(status.ok && status.err === null,
+           "Expected OK status object; found " + tojson(status));
+}
+
+var replTest = new ReplSetTest({name: 'testSet', nodes: 3});
+var nodes = replTest.startSet();
+var hostnames = replTest.nodeList();
+replTest.initiate(
+    {
+        "_id" : "testSet",
+        "members" : [
+            {"_id" : 0, "host" : hostnames[0], "priority" : 2},
+            {"_id" : 1, "host" : hostnames[1]},
+            {"_id" : 2, "host" : hostnames[2]}
+        ]
+    }
+);
+
+replTest.awaitReplication();
+
+replTest.bridge();
+replTest.partition(0, 2);
+
+// Now 0 and 2 can't see each other, so 2 should chain through 1 to reach 0.
+
+var master = replTest.getMaster();
+var cdb = master.getDB("chaining");
+var admin = nodes[1].getDB("admin");
+cdb.foo.insert({a:1});
+assertGLEOK(cdb.getLastErrorObj());
+replTest.awaitReplication();
+
+var result = admin.runCommand( { configureFailPoint: 'rsChaining1', mode: { times : 1 } } );
+assert.eq(1, result.ok, 'rsChaining1');
+cdb.foo.insert({a:1});
+assertGLEOK(cdb.getLastErrorObj());
+replTest.awaitReplication();
+result = admin.runCommand( { configureFailPoint: 'rsChaining2', mode: { times : 1 } } );
+assert.eq(1, result.ok, 'rsChaining2');
+cdb.foo.insert({a:1});
+assertGLEOK(cdb.getLastErrorObj());
+replTest.awaitReplication();
+result = admin.runCommand( { configureFailPoint: 'rsChaining3', mode: { times : 1 } } );
+assert.eq(1, result.ok, 'rsChaining3');
+cdb.foo.insert({a:1});
+assertGLEOK(cdb.getLastErrorObj());
+replTest.awaitReplication();
@@ -45,6 +45,11 @@ namespace mongo {
 
     const int ReplSetImpl::maxSyncSourceLagSecs = 30;
 
+    // For testing network failures in percolate() for chaining
+    MONGO_FP_DECLARE(rsChaining1);
+    MONGO_FP_DECLARE(rsChaining2);
+    MONGO_FP_DECLARE(rsChaining3);
+
 namespace replset {
 
     MONGO_FP_DECLARE(rsSyncApplyStop);
@@ -866,54 +871,81 @@ namespace replset {
         }
         verify(slave->slave);
 
-        const Member *target = replset::BackgroundSync::get()->getSyncTarget();
-        if (!target || rs->box.getState().primary()
-            // we are currently syncing from someone who's syncing from us
-            // the target might end up with a new Member, but s.slave never
-            // changes so we'll compare the names
-            || target == slave->slave || target->fullName() == slave->slave->fullName()) {
-            LOG(1) << "replica set ghost target no good" << endl;
-            return;
-        }
-
-        try {
-            // haveCursor() does not necessarily tell us if we have a non-dead cursor, so we check
-            // tailCheck() as well; see SERVER-8420
-            slave->reader.tailCheck();
-            if (!slave->reader.haveCursor()) {
-                if (!slave->reader.connect(id, slave->slave->id(), target->fullName())) {
-                    // error message logged in OplogReader::connect
-                    return;
-                }
-                slave->reader.ghostQueryGTE(rsoplog, last);
-                // if we lose the connection between connecting and querying, the cursor may not
-                // exist so we have to check again before using it.
-                if (!slave->reader.haveCursor()) {
-                    return;
-                }
-            }
-
-            LOG(1) << "replSet last: " << slave->last.toString() << " to " << last.toString() << rsLog;
-            if (slave->last > last) {
-                return;
-            }
-
-            while (slave->last <= last) {
-                if (!slave->reader.more()) {
-                    // we'll be back
-                    return;
-                }
-
-                BSONObj o = slave->reader.nextSafe();
-                slave->last = o["ts"]._opTime();
-            }
-            LOG(2) << "now last is " << slave->last.toString() << rsLog;
-        }
-        catch (const DBException& e) {
-            // we'll be back
-            LOG(2) << "replSet ghost sync error: " << e.what() << " for "
-                  << slave->slave->fullName() << rsLog;
-            slave->reader.resetConnection();
-        }
+        // Keep trying to update until we either succeed or we become primary.
+        // Note that this can block the ghostsync thread for quite a while if there
+        // are connection problems to the current sync source ("sync target")
+        while (true) {
+            const Member *target = replset::BackgroundSync::get()->getSyncTarget();
+            if (!target || rs->box.getState().primary()
+                // we are currently syncing from someone who's syncing from us
+                // the target might end up with a new Member, but s.slave never
+                // changes so we'll compare the names
+                || target == slave->slave || target->fullName() == slave->slave->fullName()) {
+                LOG(1) << "replica set ghost target no good" << endl;
+                return;
+            }
+
+            try {
+                if (MONGO_FAIL_POINT(rsChaining1)) {
+                    mongo::getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->
+                        setMode(FailPoint::nTimes, 1);
+                }
+
+                // haveCursor() does not necessarily tell us if we have a non-dead cursor,
+                // so we check tailCheck() as well; see SERVER-8420
+                slave->reader.tailCheck();
+                if (!slave->reader.haveCursor()) {
+                    if (!slave->reader.connect(id, slave->slave->id(), target->fullName())) {
+                        // error message logged in OplogReader::connect
+                        sleepsecs(1);
+                        continue;
+                    }
+
+                    if (MONGO_FAIL_POINT(rsChaining2)) {
+                        mongo::getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->
+                            setMode(FailPoint::nTimes, 1);
+                    }
+
+                    slave->reader.ghostQueryGTE(rsoplog, last);
+                    // if we lose the connection between connecting and querying, the cursor may not
+                    // exist so we have to check again before using it.
+                    if (!slave->reader.haveCursor()) {
+                        sleepsecs(1);
+                        continue;
+                    }
+                }
+
+                LOG(1) << "replSet last: " << slave->last.toString() << " to "
+                       << last.toString() << rsLog;
+                if (slave->last > last) {
+                    // Nothing to do; already up to date.
+                    return;
+                }
+
+                while (slave->last <= last) {
+                    if (MONGO_FAIL_POINT(rsChaining3)) {
+                        mongo::getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->
+                            setMode(FailPoint::nTimes, 1);
+                    }
+
+                    if (!slave->reader.more()) {
+                        // Hit the end of the oplog on the sync source; we're fully up to date now.
+                        return;
+                    }
+
+                    BSONObj o = slave->reader.nextSafe();
+                    slave->last = o["ts"]._opTime();
+                }
+                LOG(2) << "now last is " << slave->last.toString() << rsLog;
+                // We moved the cursor forward enough; we're done.
+                return;
+            }
+            catch (const DBException& e) {
+                // This captures SocketExceptions as well.
+                log() << "replSet ghost sync error: " << e.what() << " for "
+                      << slave->slave->fullName() << rsLog;
+                slave->reader.resetConnection();
+            }
+        }
     }