From 9539c8f8be32a057901d785561c7a6ab62413723 Mon Sep 17 00:00:00 2001 From: Xuerui Fa Date: Mon, 10 Feb 2020 15:17:04 -0500 Subject: [PATCH] SERVER-45995: Test that nodes in initial sync maintain and gossip the commit point create mode 100644 jstests/replsets/initial_sync_nodes_maintain_and_gossip_commit_point.js --- ..._nodes_maintain_and_gossip_commit_point.js | 139 ++++++++++++++++++ src/mongo/db/repl/initial_syncer.cpp | 8 + 2 files changed, 147 insertions(+) create mode 100644 jstests/replsets/initial_sync_nodes_maintain_and_gossip_commit_point.js diff --git a/jstests/replsets/initial_sync_nodes_maintain_and_gossip_commit_point.js b/jstests/replsets/initial_sync_nodes_maintain_and_gossip_commit_point.js new file mode 100644 index 00000000000..4f04286d0ca --- /dev/null +++ b/jstests/replsets/initial_sync_nodes_maintain_and_gossip_commit_point.js @@ -0,0 +1,139 @@ +/* + * Tests that nodes in initial sync update their commit point and gossip their commit point to + * other nodes. This is done starting with a 3 node replica set with one non-voting secondary. We + * disconnect the non-voting secondary from the other nodes and then add a new node to the replica + * set. Thus, the non-voting secondary can only communicate with the initial syncing node. We then + * hang the initial syncing node at various stages, perform multiple majority writes to advance the + * commit point, and verify that the commit point on the initial syncing node is updated. Finally, + * we ensure that the disconnected secondary is able to update its commit point from the initial + * syncing node via heartbeats. 
+ * @tags: [requires_fcv_44] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const name = jsTestName(); +const rst = new ReplSetTest({ + name, + nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0, votes: 0}}], + useBridge: true +}); +rst.startSet(); +rst.initiateWithHighElectionTimeout(); + +const primary = rst.getPrimary(); +const primaryDb = primary.getDB("test"); +const secondary = rst.getSecondaries()[0]; +const nonVotingSecondary = rst.getSecondaries()[1]; + +// Insert initial data to ensure that the repl set is initialized correctly. +assert.commandWorked(primaryDb.test.insert({a: 1})); +rst.awaitReplication(); + +/* + * Fetches the 'lastCommittedOpTime' field of the given node. + */ +function getLastCommittedOpTime(conn) { + const replSetStatus = assert.commandWorked(conn.adminCommand({replSetGetStatus: 1})); + return replSetStatus.optimes.lastCommittedOpTime; +} + +const firstCommitPoint = getLastCommittedOpTime(primary); + +// Disconnect the non-voting secondary from the other nodes so that it won't update its commit point +// from the other nodes' heartbeats. +nonVotingSecondary.disconnect(secondary); +nonVotingSecondary.disconnect(primary); + +jsTest.log("Adding a new node to the replica set"); +const initialSyncNode = rst.add({ + setParameter: { + // Make sure our initial sync node does not sync from the node with votes 0. 
+ 'failpoint.forceSyncSourceCandidate': + tojson({mode: 'alwaysOn', data: {"hostAndPort": primary.host}}), + } +}); + +const hangAfterGettingBeginFetchingTimestamp = + configureFailPoint(initialSyncNode, "initialSyncHangAfterGettingBeginFetchingTimestamp"); +const hangBeforeCompletingOplogFetching = + configureFailPoint(initialSyncNode, "initialSyncHangBeforeCompletingOplogFetching"); +const hangBeforeFinish = configureFailPoint(initialSyncNode, "initialSyncHangBeforeFinish"); + +jsTestLog("Waiting for initial sync node to reach initial sync state"); +rst.reInitiate(); +rst.waitForState(initialSyncNode, ReplSetTest.State.STARTUP_2); + +// Hang the initial sync node after it sets 'beginFetchingTimestamp' to ensure that the node will +// not set 'stopTimestamp' until after we perform the next write. +hangAfterGettingBeginFetchingTimestamp.wait(); + +// Do a write to be applied by the initial sync node. This is necessary because we cannot update the +// commit point if the term of `lastAppliedOpTime` is not equal to the current term. +assert.commandWorked(primaryDb.test.insert({b: 2})); +// Wait for knowledge of the last commit point to advance to the last write on the primary and +// secondary. +rst.awaitLastOpCommitted(undefined, [primary, secondary]); + +const secondCommitPointPrimary = getLastCommittedOpTime(primary); +const secondCommitPointSecondary = getLastCommittedOpTime(secondary); + +// Verify that the commit point has advanced on the primary and secondary. +assert.eq(1, rs.compareOpTimes(secondCommitPointPrimary, firstCommitPoint)); +assert.eq(1, rs.compareOpTimes(secondCommitPointSecondary, firstCommitPoint)); + +// Verify that the commit point has *NOT* advanced on the non-voting secondary. 
+let commitPointNonVotingSecondary = getLastCommittedOpTime(nonVotingSecondary); +assert.eq(rs.compareOpTimes(commitPointNonVotingSecondary, secondCommitPointPrimary), -1); + +// Allow the node to proceed to the oplog applying phase of initial sync and ensure that the oplog +// fetcher thread is still running. +hangAfterGettingBeginFetchingTimestamp.off(); +hangBeforeCompletingOplogFetching.wait(); + +// The initial sync node will be able to update its commit point after fetching this write, since it +// set its `lastAppliedOpTime` from the previous write. +assert.commandWorked(primaryDb.test.insert({c: 3})); +// Wait for knowledge of the last commit point to advance to the last write on the primary and +// secondary. +rst.awaitLastOpCommitted(undefined, [primary, secondary]); + +const thirdCommitPointPrimary = getLastCommittedOpTime(primary); +const thirdCommitPointSecondary = getLastCommittedOpTime(secondary); + +// Verify that the commit point has advanced on the primary and secondary. +assert.eq(1, rs.compareOpTimes(thirdCommitPointPrimary, secondCommitPointPrimary)); +assert.eq(1, rs.compareOpTimes(thirdCommitPointSecondary, secondCommitPointSecondary)); + +// Verify that the commit point has *NOT* advanced on the non-voting secondary. +commitPointNonVotingSecondary = getLastCommittedOpTime(nonVotingSecondary); +assert.eq(rs.compareOpTimes(commitPointNonVotingSecondary, thirdCommitPointPrimary), -1); + +// Allow the initial sync node to complete oplog fetching but hang it before it completes initial +// sync. +hangBeforeCompletingOplogFetching.off(); +hangBeforeFinish.wait(); + +// Verify that the initial sync node receives the commit point from the primary via oplog fetching. +// We only assert that it is greater than or equal to the second commit point because it is possible +// for the commit point to not yet be advanced by the primary when we fetch the oplog entry. 
+assert.gte(rs.compareOpTimes(getLastCommittedOpTime(initialSyncNode), secondCommitPointPrimary), 0);
+
+// Verify that the non-voting secondary has received the updated commit point via heartbeats from
+// the initial sync node.
+assert.soon(
+    () => rs.compareOpTimes(getLastCommittedOpTime(nonVotingSecondary),
+                            getLastCommittedOpTime(initialSyncNode)) >= 0,
+    "The nonVotingSecondary was unable to update its commit point from the initial sync node");
+
+// Since the primary sends a shut down command to all secondaries in `rst.stopSet()`, we reconnect
+// the disconnected secondary to the primary to allow it to be shut down.
+nonVotingSecondary.reconnect(primary);
+
+hangBeforeFinish.off();
+// Use the ReplSetTest method form: the bare global `waitForState` helper is defined in
+// jstests/replsets/rslib.js, which this test does not load, so an unqualified call would
+// throw a ReferenceError here. `rst.waitForState` matches the earlier usage above.
+rst.waitForState(initialSyncNode, ReplSetTest.State.SECONDARY);
+
+rst.stopSet();
+})();
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 575dfaee65d..6c57a4efc79 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -114,6 +114,9 @@ MONGO_FAIL_POINT_DEFINE(failInitialSyncBeforeApplyingBatch);
 // Failpoint which fasserts if applying a batch fails.
 MONGO_FAIL_POINT_DEFINE(initialSyncFassertIfApplyingBatchFails);
 
+// Failpoint which causes the initial sync function to hang before stopping the oplog fetcher.
+MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCompletingOplogFetching);
+
 // Failpoints for synchronization, shared with cloners.
 extern FailPoint initialSyncFuzzerSynchronizationPoint1;
 extern FailPoint initialSyncFuzzerSynchronizationPoint2;
@@ -1545,6 +1548,11 @@ void InitialSyncer::_rollbackCheckerCheckForRollbackCallback(
         return;
     }
 
+    if (MONGO_unlikely(initialSyncHangBeforeCompletingOplogFetching.shouldFail())) {
+        LOGV2(4599500, "initialSyncHangBeforeCompletingOplogFetching fail point enabled.");
+        initialSyncHangBeforeCompletingOplogFetching.pauseWhileSet();
+    }
+
     // Success!
     onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, _lastApplied);
 }