0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

SERVER-47949 Don't fetch or install a newer config via heartbeat while in drain mode

This commit is contained in:
William Schultz 2020-05-08 19:13:58 -04:00 committed by Evergreen Agent
parent 276ac45d7c
commit 2546fe1c22
5 changed files with 239 additions and 12 deletions

View File

@ -56,7 +56,7 @@ public:
static const char kTermFieldName[];
// The term of an OpTime generated by old protocol version.
static const long long kUninitializedTerm = -1;
static constexpr long long kUninitializedTerm = -1;
// The initial term after the first time upgrading from protocol version 0.
//

View File

@ -3337,6 +3337,14 @@ Status ReplicationCoordinatorImpl::doReplSetReconfig(OperationContext* opCtx,
}
auto topCoordTerm = _topCoord->getTerm();
if (!force) {
// For safety of reconfig, since we must commit a config in our own term before executing a
// reconfig, so we should never have a config in an older term. If the current config was
// installed via a force reconfig, we aren't concerned about this safety guarantee.
invariant(_rsConfig.getConfigTerm() == OpTime::kUninitializedTerm ||
_rsConfig.getConfigTerm() == topCoordTerm);
}
auto configWriteConcern = _getConfigReplicationWriteConcern();
// Construct a fake OpTime that can be accepted but isn't used.
OpTime fakeOpTime(Timestamp(1, 1), topCoordTerm);
@ -5157,18 +5165,27 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs
}
} else if (result.isOK() &&
response->getConfigVersionAndTerm() < args.getConfigVersionAndTerm()) {
logv2::DynamicAttributes attr;
attr.add("configTerm", args.getConfigTerm());
attr.add("configVersion", args.getConfigVersion());
attr.add("senderHost", senderHost);
// If we are currently in drain mode, we won't allow installing newer configs, so we don't
// schedule a heartbeat to fetch one. We do allow force reconfigs to proceed even if we are
// in drain mode.
if (_memberState.primary() && !_readWriteAbility->canAcceptNonLocalWrites(lk) &&
args.getConfigTerm() != OpTime::kUninitializedTerm) {
LOGV2(4794901,
"Not scheduling a heartbeat to fetch a newer config since we are in PRIMARY "
"state but cannot accept writes yet.",
attr);
}
// Schedule a heartbeat to the sender to fetch the new config.
// Only send this if the sender's config is newer.
// We cannot cancel the enqueued heartbeat, but either this one or the enqueued heartbeat
// will trigger reconfig, which cancels and reschedules all heartbeats.
if (args.hasSender()) {
LOGV2(21401,
"Scheduling heartbeat to fetch a newer config with term {configTerm} and "
"version {configVersion} from member: {senderHost}",
"Scheduling heartbeat to fetch a newer config",
"configTerm"_attr = args.getConfigTerm(),
"configVersion"_attr = args.getConfigVersion(),
"senderHost"_attr = senderHost);
else if (args.hasSender()) {
LOGV2(21401, "Scheduling heartbeat to fetch a newer config", attr);
int senderIndex = _rsConfig.findMemberIndexByHostAndPort(senderHost);
_scheduleHeartbeatToTarget_inlock(senderHost, senderIndex, now);
}

View File

@ -1162,7 +1162,7 @@ private:
/**
* Schedules a replica set config change.
*/
void _scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig);
void _scheduleHeartbeatReconfig(WithLock lk, const ReplSetConfig& newConfig);
/**
* Method to write a configuration transmitted via heartbeat message to stable storage.

View File

@ -356,7 +356,7 @@ stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAct
break;
case HeartbeatResponseAction::Reconfig:
invariant(responseStatus.isOK());
_scheduleHeartbeatReconfig_inlock(responseStatus.getValue().getConfig());
_scheduleHeartbeatReconfig(lock, responseStatus.getValue().getConfig());
break;
case HeartbeatResponseAction::StepDownSelf:
invariant(action.getPrimaryConfigIndex() == _selfIndex);
@ -530,7 +530,8 @@ bool ReplicationCoordinatorImpl::_shouldStepDownOnReconfig(WithLock,
!(myIndex.isOK() && newConfig.getMemberAt(myIndex.getValue()).isElectable());
}
void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSetConfig& newConfig) {
void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
const ReplSetConfig& newConfig) {
if (_inShutdown) {
return;
}
@ -566,6 +567,17 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig_inlock(const ReplSet
"Aborting reconfiguration request",
"_rsConfigState"_attr = int(_rsConfigState));
}
// Allow force reconfigs to proceed even if we are not a writable primary yet.
if (_memberState.primary() && !_readWriteAbility->canAcceptNonLocalWrites(lk) &&
newConfig.getConfigTerm() != OpTime::kUninitializedTerm) {
LOGV2_FOR_HEARTBEATS(
4794900,
1,
"Not scheduling a heartbeat reconfig since we are in PRIMARY state but "
"cannot accept writes yet.");
return;
}
_setConfigState_inlock(kConfigHBReconfiguring);
invariant(!_rsConfig.isInitialized() ||
_rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm());

View File

@ -2037,6 +2037,204 @@ TEST_F(ReplCoordReconfigTest, ForceReconfigShouldThrowIfArbiterNodesHaveNewlyAdd
getReplCoord()->processReplSetReconfig(opCtx.get(), args, &result));
}
TEST_F(ReplCoordTest, StepUpReconfigConcurrentWithHeartbeatReconfig) {
auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kReplication,
logv2::LogSeverity::Debug(2)};
assertStartSuccess(BSON("_id"
<< "mySet"
<< "version" << 2 << "term" << 0 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "node1:12345")
<< BSON("_id" << 2 << "host"
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
ASSERT_EQUALS(getReplCoord()->getTerm(), 0);
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
// Win election but don't exit drain mode.
auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
const auto opCtx = makeOperationContext();
simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get());
// Receive a heartbeat that should NOT schedule a new heartbeat to fetch a newer config.
ReplSetHeartbeatArgsV1 hbArgs;
auto rsConfig = getReplCoord()->getConfig();
hbArgs.setConfigVersion(3); // simulate a newer config version.
hbArgs.setConfigTerm(rsConfig.getConfigTerm());
hbArgs.setSetName(rsConfig.getReplSetName());
hbArgs.setSenderHost(HostAndPort("node2", 12345));
hbArgs.setSenderId(2);
hbArgs.setTerm(0);
ASSERT(hbArgs.isInitialized());
ReplSetHeartbeatResponse response;
ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
// No requests should have been scheduled.
getNet()->enterNetwork();
ASSERT_FALSE(getNet()->hasReadyRequests());
getNet()->exitNetwork();
// Receive a heartbeat that schedules a new heartbeat to fetch a newer config. We simulate a
// newer config version and an uninitialized term, so that a heartbeat will be scheduled to
// fetch a new config. When we mock the heartbeat response below, we will respond with a
// non-force config, which is to test the case where the sending node installed a non force
// config after we scheduled a heartbeat to it to fetch a force config. For safety, the
// important aspect is that we don't accept/install configs during drain mode, even if we try to
// fetch them.
hbArgs.setConfigVersion(3);
hbArgs.setConfigTerm(OpTime::kUninitializedTerm);
hbArgs.setSetName(rsConfig.getReplSetName());
hbArgs.setSenderHost(HostAndPort("node2", 12345));
hbArgs.setSenderId(2);
hbArgs.setTerm(0);
ASSERT(hbArgs.isInitialized());
ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
// Schedule a response with a newer config.
auto newerConfigVersion = 3;
auto newerConfig = BSON("_id"
<< "mySet"
<< "version" << newerConfigVersion << "term" << 0 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "node1:12345")
<< BSON("_id" << 2 << "host"
<< "node2:12345")));
auto net = getNet();
net->enterNetwork();
auto noi = net->getNextReadyRequest();
auto& request = noi->getRequest();
ReplSetHeartbeatArgsV1 args;
ASSERT_OK(args.initialize(request.cmdObj));
startCapturingLogMessages();
OpTime lastApplied(Timestamp(100, 1), 0);
ReplSetHeartbeatResponse hbResp;
ASSERT_OK(rsConfig.initialize(newerConfig));
hbResp.setConfig(rsConfig);
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
hbResp.setConfigTerm(rsConfig.getConfigTerm());
hbResp.setAppliedOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
hbResp.setDurableOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
net->runReadyNetworkOperations();
net->exitNetwork();
stopCapturingLogMessages();
// Make sure the heartbeat reconfig has not been scheduled.
ASSERT_EQUALS(1, countTextFormatLogLinesContaining("Not scheduling a heartbeat reconfig"));
// Let drain mode complete.
signalDrainComplete(opCtx.get());
// We should have moved to a new term in the election, and our config should have the same term.
ASSERT_EQUALS(getReplCoord()->getTerm(), 1);
ASSERT_EQUALS(getReplCoord()->getConfig().getConfigTerm(), 1);
}
TEST_F(ReplCoordTest, StepUpReconfigConcurrentWithForceHeartbeatReconfig) {
auto severityGuard = unittest::MinimumLoggedSeverityGuard{logv2::LogComponent::kReplication,
logv2::LogSeverity::Debug(2)};
assertStartSuccess(BSON("_id"
<< "mySet"
<< "version" << 2 << "term" << 0 << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "node1:12345")
<< BSON("_id" << 2 << "host"
<< "node2:12345"))),
HostAndPort("node1", 12345));
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
ASSERT_EQUALS(getReplCoord()->getTerm(), 0);
replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0), Date_t() + Seconds(100));
// Win election but don't exit drain mode.
auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
const auto opCtx = makeOperationContext();
simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get());
// Receive a heartbeat that schedules a new heartbeat to fetch a newer config.
ReplSetHeartbeatArgsV1 hbArgs;
auto rsConfig = getReplCoord()->getConfig();
hbArgs.setConfigVersion(3); // simulate a newer config version.
hbArgs.setConfigTerm(OpTime::kUninitializedTerm); // force config.
hbArgs.setSetName(rsConfig.getReplSetName());
hbArgs.setSenderHost(HostAndPort("node2", 12345));
hbArgs.setSenderId(2);
hbArgs.setTerm(0);
ASSERT(hbArgs.isInitialized());
ReplSetHeartbeatResponse response;
ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
// Schedule a response with a newer config.
auto newerConfigVersion = 3;
auto newerConfig =
BSON("_id"
<< "mySet"
<< "version" << newerConfigVersion << "term" << OpTime::kUninitializedTerm << "members"
<< BSON_ARRAY(BSON("_id" << 1 << "host"
<< "node1:12345")
<< BSON("_id" << 2 << "host"
<< "node2:12345")));
auto net = getNet();
net->enterNetwork();
auto noi = net->getNextReadyRequest();
auto& request = noi->getRequest();
ReplSetHeartbeatArgsV1 args;
ASSERT_OK(args.initialize(request.cmdObj));
OpTime lastApplied(Timestamp(100, 1), 0);
ReplSetHeartbeatResponse hbResp;
ASSERT_OK(rsConfig.initialize(newerConfig));
hbResp.setConfig(rsConfig);
hbResp.setSetName(rsConfig.getReplSetName());
hbResp.setState(MemberState::RS_SECONDARY);
hbResp.setConfigVersion(rsConfig.getConfigVersion());
hbResp.setConfigTerm(rsConfig.getConfigTerm());
hbResp.setAppliedOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
hbResp.setDurableOpTimeAndWallTime({lastApplied, Date_t() + Seconds(lastApplied.getSecs())});
net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON()));
net->exitNetwork();
{
// Prevent the heartbeat reconfig from completing.
FailPointEnableBlock fpb("blockHeartbeatReconfigFinish");
// Let the heartbeat reconfig begin.
net->enterNetwork();
net->runReadyNetworkOperations();
net->exitNetwork();
// For force reconfigs, we do allow them to proceed even if we are in drain mode, so make
// sure it is in progress, stuck at the failpoint before completion.
fpb->waitForTimesEntered(1);
// At this point the heartbeat reconfig should be in progress but blocked from completion by
// the failpoint. We now let drain mode complete. The step up reconfig should be interrupted
// by the in progress heartbeat reconfig.
signalDrainComplete(opCtx.get());
}
// The failpoint should be released now, allowing the heartbeat reconfig to complete. We run the
// clock forward so the re-scheduled heartbeat reconfig will complete.
net->enterNetwork();
net->runUntil(net->now() + Milliseconds(100));
net->exitNetwork();
// We should have moved to a new term in the election, but our config should have the term from
// the force config.
ASSERT_EQUALS(getReplCoord()->getTerm(), 1);
ASSERT_EQUALS(getReplCoord()->getConfig().getConfigTerm(), OpTime::kUninitializedTerm);
}
} // anonymous namespace
} // namespace repl
} // namespace mongo