mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-19764 Store the last vote when voting for myself
This commit is contained in:
parent
b62450de74
commit
d2155f39ef
@ -3023,5 +3023,17 @@ void ReplicationCoordinatorImpl::_dropAllSnapshots_inlock() {
|
||||
_externalState->dropAllSnapshots();
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::waitForElectionFinish_forTest() {
|
||||
if (_electionFinishedEvent.isValid()) {
|
||||
_replExecutor.waitForEvent(_electionFinishedEvent);
|
||||
}
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::waitForElectionDryRunFinish_forTest() {
|
||||
if (_electionDryRunFinishedEvent.isValid()) {
|
||||
_replExecutor.waitForEvent(_electionDryRunFinishedEvent);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace repl
|
||||
} // namespace mongo
|
||||
|
@ -305,6 +305,19 @@ public:
|
||||
|
||||
bool updateTerm_forTest(long long term);
|
||||
|
||||
/**
|
||||
* If called after _startElectSelfV1(), blocks until all asynchronous
|
||||
* activities associated with election complete.
|
||||
*/
|
||||
void waitForElectionFinish_forTest();
|
||||
|
||||
/**
|
||||
* If called after _startElectSelfV1(), blocks until all asynchronous
|
||||
* activities associated with election dry run complete, including writing
|
||||
* last vote and scheduling the real election.
|
||||
*/
|
||||
void waitForElectionDryRunFinish_forTest();
|
||||
|
||||
private:
|
||||
struct SnapshotInfo {
|
||||
OpTime opTime;
|
||||
@ -330,6 +343,9 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
class LoseElectionGuardV1;
|
||||
class LoseElectionDryRunGuardV1;
|
||||
|
||||
ReplicationCoordinatorImpl(const ReplSettings& settings,
|
||||
ReplicationCoordinatorExternalState* externalState,
|
||||
TopologyCoordinator* topCoord,
|
||||
@ -762,6 +778,8 @@ private:
|
||||
* For V1 (raft) style elections the election path is:
|
||||
* _startElectSelfV1()
|
||||
* _onDryRunComplete()
|
||||
* _writeLastVoteForMyElection()
|
||||
* _startVoteRequester()
|
||||
* _onVoteRequestComplete()
|
||||
* _onElectionWinnerDeclarerComplete()
|
||||
*/
|
||||
@ -788,6 +806,18 @@ private:
|
||||
*/
|
||||
void _onDryRunComplete(long long originalTerm);
|
||||
|
||||
/**
|
||||
* Writes the last vote in persistent storage after completing dry run successfully.
|
||||
* This job will be scheduled to run in DB worker threads.
|
||||
*/
|
||||
void _writeLastVoteForMyElection(LastVote lastVote,
|
||||
const ReplicationExecutor::CallbackArgs& cbData);
|
||||
|
||||
/**
|
||||
* Starts VoteRequester to run the real election when last vote write has completed.
|
||||
*/
|
||||
void _startVoteRequester(long long newTerm);
|
||||
|
||||
/**
|
||||
* Callback called when the VoteRequester has completed; checks the results and
|
||||
* decides whether to change state to primary and alert other nodes of our primary-ness.
|
||||
@ -1080,6 +1110,10 @@ private:
|
||||
// Unspecified value when _freshnessChecker is NULL.
|
||||
ReplicationExecutor::EventHandle _electionFinishedEvent; // (X)
|
||||
|
||||
// Event that the election code will signal when the in-progress election dry run completes,
|
||||
// which includes writing the last vote and scheduling the real election.
|
||||
ReplicationExecutor::EventHandle _electionDryRunFinishedEvent; // (X)
|
||||
|
||||
// Whether we slept last time we attempted an election but possibly tied with other nodes.
|
||||
bool _sleptLastElection; // (X)
|
||||
|
||||
|
@ -43,32 +43,24 @@
|
||||
namespace mongo {
|
||||
namespace repl {
|
||||
|
||||
namespace {
|
||||
class LoseElectionGuardV1 {
|
||||
class ReplicationCoordinatorImpl::LoseElectionGuardV1 {
|
||||
MONGO_DISALLOW_COPYING(LoseElectionGuardV1);
|
||||
|
||||
public:
|
||||
LoseElectionGuardV1(TopologyCoordinator* topCoord,
|
||||
ReplicationExecutor* executor,
|
||||
std::unique_ptr<VoteRequester>* voteRequester,
|
||||
std::unique_ptr<ElectionWinnerDeclarer>* electionWinnerDeclarer,
|
||||
ReplicationExecutor::EventHandle* electionFinishedEvent)
|
||||
: _topCoord(topCoord),
|
||||
_executor(executor),
|
||||
_voteRequester(voteRequester),
|
||||
_electionWinnerDeclarer(electionWinnerDeclarer),
|
||||
_electionFinishedEvent(electionFinishedEvent),
|
||||
_dismissed(false) {}
|
||||
LoseElectionGuardV1(ReplicationCoordinatorImpl* replCoord) : _replCoord(replCoord) {}
|
||||
|
||||
~LoseElectionGuardV1() {
|
||||
virtual ~LoseElectionGuardV1() {
|
||||
if (_dismissed) {
|
||||
return;
|
||||
}
|
||||
_topCoord->processLoseElection();
|
||||
_electionWinnerDeclarer->reset(nullptr);
|
||||
_voteRequester->reset(nullptr);
|
||||
if (_electionFinishedEvent->isValid()) {
|
||||
_executor->signalEvent(*_electionFinishedEvent);
|
||||
_replCoord->_topCoord->processLoseElection();
|
||||
_replCoord->_electionWinnerDeclarer.reset(nullptr);
|
||||
_replCoord->_voteRequester.reset(nullptr);
|
||||
if (_isDryRun && _replCoord->_electionDryRunFinishedEvent.isValid()) {
|
||||
_replCoord->_replExecutor.signalEvent(_replCoord->_electionDryRunFinishedEvent);
|
||||
}
|
||||
if (_replCoord->_electionFinishedEvent.isValid()) {
|
||||
_replCoord->_replExecutor.signalEvent(_replCoord->_electionFinishedEvent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,16 +68,22 @@ public:
|
||||
_dismissed = true;
|
||||
}
|
||||
|
||||
private:
|
||||
TopologyCoordinator* const _topCoord;
|
||||
ReplicationExecutor* const _executor;
|
||||
std::unique_ptr<VoteRequester>* const _voteRequester;
|
||||
std::unique_ptr<ElectionWinnerDeclarer>* const _electionWinnerDeclarer;
|
||||
const ReplicationExecutor::EventHandle* _electionFinishedEvent;
|
||||
bool _dismissed;
|
||||
protected:
|
||||
ReplicationCoordinatorImpl* const _replCoord;
|
||||
bool _isDryRun = false;
|
||||
bool _dismissed = false;
|
||||
};
|
||||
|
||||
class ReplicationCoordinatorImpl::LoseElectionDryRunGuardV1 : public LoseElectionGuardV1 {
|
||||
MONGO_DISALLOW_COPYING(LoseElectionDryRunGuardV1);
|
||||
|
||||
public:
|
||||
LoseElectionDryRunGuardV1(ReplicationCoordinatorImpl* replCoord)
|
||||
: LoseElectionGuardV1(replCoord) {
|
||||
_isDryRun = true;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
void ReplicationCoordinatorImpl::_startElectSelfV1() {
|
||||
invariant(!_electionWinnerDeclarer);
|
||||
@ -115,11 +113,15 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
|
||||
}
|
||||
fassert(28642, finishEvh.getStatus());
|
||||
_electionFinishedEvent = finishEvh.getValue();
|
||||
LoseElectionGuardV1 lossGuard(_topCoord.get(),
|
||||
&_replExecutor,
|
||||
&_voteRequester,
|
||||
&_electionWinnerDeclarer,
|
||||
&_electionFinishedEvent);
|
||||
|
||||
const StatusWith<ReplicationExecutor::EventHandle> dryRunFinishEvh = _replExecutor.makeEvent();
|
||||
if (dryRunFinishEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
|
||||
return;
|
||||
}
|
||||
fassert(28767, dryRunFinishEvh.getStatus());
|
||||
_electionDryRunFinishedEvent = dryRunFinishEvh.getValue();
|
||||
|
||||
LoseElectionDryRunGuardV1 lossGuard(this);
|
||||
|
||||
|
||||
invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
|
||||
@ -158,11 +160,7 @@ void ReplicationCoordinatorImpl::_startElectSelfV1() {
|
||||
void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
|
||||
invariant(_voteRequester);
|
||||
invariant(!_electionWinnerDeclarer);
|
||||
LoseElectionGuardV1 lossGuard(_topCoord.get(),
|
||||
&_replExecutor,
|
||||
&_voteRequester,
|
||||
&_electionWinnerDeclarer,
|
||||
&_electionFinishedEvent);
|
||||
LoseElectionDryRunGuardV1 lossGuard(this);
|
||||
|
||||
if (_topCoord->getTerm() != originalTerm) {
|
||||
log() << "not running for primary, we have been superceded already";
|
||||
@ -186,10 +184,59 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
|
||||
_updateTerm_incallback(originalTerm + 1, nullptr);
|
||||
// Secure our vote for ourself first
|
||||
_topCoord->voteForMyselfV1();
|
||||
// TODO(siyuan): SERVER-19764 store the vote in persistent storage.
|
||||
|
||||
// Store the vote in persistent storage.
|
||||
LastVote lastVote;
|
||||
lastVote.setTerm(originalTerm + 1);
|
||||
lastVote.setCandidateId(getMyId());
|
||||
|
||||
auto cbStatus = _replExecutor.scheduleDBWork(
|
||||
[this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) {
|
||||
_writeLastVoteForMyElection(lastVote, cbData);
|
||||
});
|
||||
if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
|
||||
return;
|
||||
}
|
||||
fassert(28769, cbStatus.getStatus());
|
||||
lossGuard.dismiss();
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
|
||||
LastVote lastVote, const ReplicationExecutor::CallbackArgs& cbData) {
|
||||
invariant(_voteRequester);
|
||||
invariant(!_electionWinnerDeclarer);
|
||||
LoseElectionDryRunGuardV1 lossGuard(this);
|
||||
|
||||
if (cbData.status == ErrorCodes::CallbackCanceled) {
|
||||
return;
|
||||
}
|
||||
invariant(cbData.txn);
|
||||
|
||||
Status status = _externalState->storeLocalLastVoteDocument(cbData.txn, lastVote);
|
||||
if (!status.isOK()) {
|
||||
error() << "failed to store LastVote document when voting for myself: " << status;
|
||||
return;
|
||||
}
|
||||
|
||||
auto cbStatus = _replExecutor.scheduleWork(
|
||||
[this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) {
|
||||
_startVoteRequester(lastVote.getTerm());
|
||||
});
|
||||
if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
|
||||
return;
|
||||
}
|
||||
fassert(28768, cbStatus.getStatus());
|
||||
|
||||
_replExecutor.signalEvent(_electionDryRunFinishedEvent);
|
||||
lossGuard.dismiss();
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
|
||||
invariant(_voteRequester);
|
||||
invariant(!_electionWinnerDeclarer);
|
||||
LoseElectionGuardV1 lossGuard(this);
|
||||
|
||||
_voteRequester.reset(new VoteRequester);
|
||||
|
||||
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
|
||||
&_replExecutor,
|
||||
_rsConfig,
|
||||
@ -197,22 +244,19 @@ void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
|
||||
_topCoord->getTerm(),
|
||||
false,
|
||||
getMyLastOptime(),
|
||||
stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, originalTerm + 1));
|
||||
stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
|
||||
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
|
||||
return;
|
||||
}
|
||||
fassert(28643, nextPhaseEvh.getStatus());
|
||||
|
||||
lossGuard.dismiss();
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm) {
|
||||
invariant(_voteRequester);
|
||||
invariant(!_electionWinnerDeclarer);
|
||||
LoseElectionGuardV1 lossGuard(_topCoord.get(),
|
||||
&_replExecutor,
|
||||
&_voteRequester,
|
||||
&_electionWinnerDeclarer,
|
||||
&_electionFinishedEvent);
|
||||
LoseElectionGuardV1 lossGuard(this);
|
||||
|
||||
if (_topCoord->getTerm() != originalTerm) {
|
||||
log() << "not becoming primary, we have been superceded already";
|
||||
@ -244,11 +288,7 @@ void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm)
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::_onElectionWinnerDeclarerComplete() {
|
||||
LoseElectionGuardV1 lossGuard(_topCoord.get(),
|
||||
&_replExecutor,
|
||||
&_voteRequester,
|
||||
&_electionWinnerDeclarer,
|
||||
&_electionFinishedEvent);
|
||||
LoseElectionGuardV1 lossGuard(this);
|
||||
|
||||
invariant(_voteRequester);
|
||||
invariant(_electionWinnerDeclarer);
|
||||
|
@ -110,6 +110,7 @@ void ReplCoordElectV1Test::simulateSuccessfulDryRun() {
|
||||
net->runReadyNetworkOperations();
|
||||
}
|
||||
net->exitNetwork();
|
||||
getReplCoord()->waitForElectionDryRunFinish_forTest();
|
||||
}
|
||||
|
||||
TEST_F(ReplCoordElectV1Test, ElectTooSoon) {
|
||||
@ -160,6 +161,10 @@ TEST_F(ReplCoordElectV1Test, ElectTwoNodesWithOneZeroVoter) {
|
||||
net->runReadyNetworkOperations();
|
||||
net->exitNetwork();
|
||||
|
||||
// _startElectSelfV1 is called synchronously in the processing of HB response, so election
|
||||
// finished event has been set.
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
|
||||
ASSERT(getReplCoord()->getMemberState().primary())
|
||||
<< getReplCoord()->getMemberState().toString();
|
||||
ASSERT(getReplCoord()->isWaitingForApplierToDrain());
|
||||
@ -220,6 +225,14 @@ TEST_F(ReplCoordElectV1Test, ElectManyNodesSuccess) {
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
startCapturingLogMessages();
|
||||
simulateSuccessfulV1Election();
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
|
||||
// Check last vote
|
||||
auto lastVote = getExternalState()->loadLocalLastVoteDocument(nullptr);
|
||||
ASSERT(lastVote.isOK());
|
||||
ASSERT_EQ(1, lastVote.getValue().getCandidateId());
|
||||
ASSERT_EQ(1, lastVote.getValue().getTerm());
|
||||
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(1, countLogLinesContaining("election succeeded"));
|
||||
}
|
||||
@ -310,6 +323,7 @@ TEST_F(ReplCoordElectV1Test, ElectStaleTermInDryRun) {
|
||||
net->runReadyNetworkOperations();
|
||||
}
|
||||
net->exitNetwork();
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(
|
||||
1, countLogLinesContaining("not running for primary, we have been superceded already"));
|
||||
@ -509,6 +523,8 @@ TEST_F(ReplCoordElectV1Test, ElectNotEnoughVotes) {
|
||||
net->runReadyNetworkOperations();
|
||||
}
|
||||
net->exitNetwork();
|
||||
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(1,
|
||||
countLogLinesContaining("not becoming primary, we received insufficient votes"));
|
||||
@ -556,6 +572,8 @@ TEST_F(ReplCoordElectV1Test, ElectStaleTerm) {
|
||||
net->runReadyNetworkOperations();
|
||||
}
|
||||
net->exitNetwork();
|
||||
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(1,
|
||||
countLogLinesContaining("not becoming primary, we have been superceded already"));
|
||||
@ -585,6 +603,7 @@ TEST_F(ReplCoordElectV1Test, ElectTermChangeDuringDryRun) {
|
||||
// update to a future term before dry run completes
|
||||
getReplCoord()->updateTerm(1000);
|
||||
simulateSuccessfulDryRun();
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(
|
||||
1, countLogLinesContaining("not running for primary, we have been superceded already"));
|
||||
@ -634,6 +653,7 @@ TEST_F(ReplCoordElectV1Test, ElectTermChangeDuringActualElection) {
|
||||
net->runReadyNetworkOperations();
|
||||
}
|
||||
net->exitNetwork();
|
||||
getReplCoord()->waitForElectionFinish_forTest();
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(1,
|
||||
countLogLinesContaining("not becoming primary, we have been superceded already"));
|
||||
|
Loading…
Reference in New Issue
Block a user