diff --git a/jstests/replsets/last_op_visible.js b/jstests/replsets/last_op_visible.js new file mode 100644 index 00000000000..75fdd952ba9 --- /dev/null +++ b/jstests/replsets/last_op_visible.js @@ -0,0 +1,60 @@ +// Test the populating of the lastOpVisible field of the ReplSetMetadata. +// First we do a writeConcern-free write and ensure that a local read will return the same +// lastOpVisible, and that majority read with afterOpTime of lastOpVisible will return it as well. +// We then confirm that a writeConcern majority write will be seen as the lastOpVisible by a +// majority read. + +(function() { +"use strict"; + +var name = 'lastOpVisible'; +var replTest = new ReplSetTest({name: name, nodes: 3}); + +replTest.startSet(); +replTest.initiate(); + +var primary = replTest.getPrimary(); + +if (!primary.getDB(name).serverStatus().storageEngine.supportsCommittedReads) { + print("Skipping read_majority.js since storageEngine doesn't support it."); + return; +} + +// Do an insert without writeConcern. +var res = primary.getDB(name).runCommandWithMetadata("insert", + {insert: name, documents: [{x:1}]}, + {"$replData": 1}); +assert.commandWorked(res.commandReply); +var last_op_visible = res.metadata["$replData"].lastOpVisible; + +// A find should return the same lastOpVisible. +res = primary.getDB(name).runCommandWithMetadata("find", + {find: name, readConcern: {level: "local"}}, + {"$replData": 1}); +assert.commandWorked(res.commandReply); +assert.eq(last_op_visible, res.metadata["$replData"].lastOpVisible); + +// A majority readConcern with afterOpTime: lastOpVisible should also return the same lastOpVisible. +res = primary.getDB(name).runCommandWithMetadata( + "find", + {find: name, readConcern: {level: "majority", afterOpTime: last_op_visible}}, + {"$replData": 1}); +assert.commandWorked(res.commandReply); +assert.eq(last_op_visible, res.metadata["$replData"].lastOpVisible); + +// Do an insert with writeConcern majority. 
+res = primary.getDB(name).runCommandWithMetadata( + "insert", + {insert: name, documents: [{x:1}], writeConcern: {w: "majority"}}, + {"$replData": 1}); +assert.commandWorked(res.commandReply); +last_op_visible = res.metadata["$replData"].lastOpVisible; + +// A majority readConcern should return the same lastOpVisible. +res = primary.getDB(name).runCommandWithMetadata("find", + {find: name, readConcern: {level: "majority"}}, + {"$replData": 1}); +assert.commandWorked(res.commandReply); +assert.eq(last_op_visible, res.metadata["$replData"].lastOpVisible); + +}()); diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 88c8b31d77b..9fcfb6032e1 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1286,9 +1286,9 @@ bool Command::run(OperationContext* txn, repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator(); + repl::ReadConcernArgs readConcern; { // parse and validate ReadConcernArgs - repl::ReadConcernArgs readConcern; auto readConcernParseStatus = readConcern.initialize(request.getCommandArgs()); if (!readConcernParseStatus.isOK()) { replyBuilder->setMetadata(rpc::makeEmptyMetadata()) @@ -1343,18 +1343,19 @@ bool Command::run(OperationContext* txn, BSONObjBuilder metadataBob; - // For commands from mongos, append some info to help getLastError(w) work. 
- // TODO: refactor out of here as part of SERVER-18326 bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - - if (isReplSet && ShardingState::get(txn)->enabled()) { - rpc::ShardingMetadata( - repl::ReplClientInfo::forClient(txn->getClient()).getLastOp().getTimestamp(), - replCoord->getElectionId()).writeToMetadata(&metadataBob); - } - if (isReplSet) { - replCoord->prepareReplResponseMetadata(request, &metadataBob); + repl::OpTime lastOpTimeFromClient = + repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + replCoord->prepareReplResponseMetadata( + request, lastOpTimeFromClient, readConcern, &metadataBob); + + // For commands from mongos, append some info to help getLastError(w) work. + // TODO: refactor out of here as part of SERVER-18326 + if (ShardingState::get(txn)->enabled()) { + rpc::ShardingMetadata(lastOpTimeFromClient.getTimestamp(), replCoord->getElectionId()) + .writeToMetadata(&metadataBob); + } } auto cmdResponse = replyBuilderBob.done(); diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 49957d07dad..ef0bc1972f8 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -617,6 +617,8 @@ public: * Prepares a metadata object describing the current term, primary, and lastOp information. */ virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, BSONObjBuilder* builder) = 0; /** @@ -678,6 +680,11 @@ public: */ virtual void dropAllSnapshots() = 0; + /** + * Gets the latest OpTime of the currentCommittedSnapshot. 
+ */ + virtual OpTime getCurrentCommittedSnapshotOpTime() = 0; + protected: ReplicationCoordinator(); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 0a3f7e75dc4..0ed558f7edb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -248,7 +248,7 @@ ReplicaSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() { return _rsConfig; } -OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshot_forTest() { +OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() { stdx::lock_guard lk(_mutex); if (_currentCommittedSnapshot) { return _currentCommittedSnapshot->opTime; @@ -2749,6 +2749,8 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish( } void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, BSONObjBuilder* builder) { if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) { rpc::ReplSetMetadata metadata; @@ -2757,6 +2759,8 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish, this, stdx::placeholders::_1, + lastOpTimeFromClient, + readConcern, &metadata)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { @@ -2771,8 +2775,15 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI } void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish( - const ReplicationExecutor::CallbackArgs& cbData, rpc::ReplSetMetadata* metadata) { - _topCoord->prepareReplResponseMetadata(metadata, getLastCommittedOpTime()); + const ReplicationExecutor::CallbackArgs& cbData, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, + rpc::ReplSetMetadata* metadata) { + OpTime lastReadableOpTime 
= readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern + ? getCurrentCommittedSnapshotOpTime() + : getMyLastOptime(); + OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime); + _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, getLastCommittedOpTime()); } bool ReplicationCoordinatorImpl::isV1ElectionProtocol() { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index c282c22eaab..85001ad2256 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -259,6 +259,8 @@ public: long long* responseTerm) override; void prepareReplResponseMetadata(const rpc::RequestInterface&, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, BSONObjBuilder* builder) override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, @@ -282,6 +284,8 @@ public: virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override; + virtual OpTime getCurrentCommittedSnapshotOpTime() override; + // ================== Test support API =================== /** @@ -295,11 +299,6 @@ public: */ ReplicaSetConfig getReplicaSetConfig_forTest(); - /** - * Gets the latest OpTime of the currentCommittedSnapshot. - */ - OpTime getCurrentCommittedSnapshot_forTest(); - /** * Simple wrapper around _setLastOptime_inlock to make it easier to test. */ @@ -547,6 +546,8 @@ private: * Bottom half of prepareReplResponseMetadata. 
 */ void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, rpc::ReplSetMetadata* metadata); /** * Scheduled to cause the ReplicationCoordinator to reconsider any state that might diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index c1e2185c235..fde39e60fd0 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -2586,8 +2586,7 @@ TEST_F(ReplCoordTest, MetadataUpdatesLastCommittedOpTime) { << 2 << "primaryIndex" << 2 << "term" << 1))); getReplCoord()->processReplSetMetadata(metadata.getValue()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); - ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), - getReplCoord()->getCurrentCommittedSnapshot_forTest()); + ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime()); // lower OpTime, should not change StatusWith metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON( @@ -2678,21 +2677,21 @@ TEST_F(ReplCoordTest, SnapshotCommitting) { // ensure current snapshot follows price is right rules (closest but not greater than) getReplCoord()->setMyLastOptime(time3); - ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshot_forTest()); + ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); getReplCoord()->setMyLastOptime(time4); - ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshot_forTest()); + ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // ensure current snapshot will not advance beyond existing snapshots getReplCoord()->setMyLastOptime(time6); - ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshot_forTest()); + ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // ensure current snapshot 
updates on new snapshot if we are that far getReplCoord()->onSnapshotCreate(time6, SnapshotName(4)); - ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshot_forTest()); + ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshotOpTime()); // ensure dropping all snapshots should reset the current committed snapshot getReplCoord()->dropAllSnapshots(); - ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshot_forTest()); + ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime()); } TEST_F(ReplCoordTest, MoveOpTimeForward) { diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index d7aca36960d..4d35c3e641d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -337,6 +337,8 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner( } void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, BSONObjBuilder* builder) {} Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, @@ -368,5 +370,9 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho void ReplicationCoordinatorMock::dropAllSnapshots() {} +OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() { + return OpTime(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 7547937b9f4..b0d82de2f1d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -193,6 +193,8 @@ public: long long* responseTerm); void prepareReplResponseMetadata(const rpc::RequestInterface& request, + const OpTime& lastOpTimeFromClient, + const ReadConcernArgs& readConcern, 
BSONObjBuilder* builder) override; virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, @@ -214,6 +216,8 @@ public: virtual void dropAllSnapshots() override; + virtual OpTime getCurrentCommittedSnapshotOpTime() override; + private: AtomicUInt64 _snapshotNameGenerator; const ReplSettings _settings; diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 3ea0694493d..9e1876e3869 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -386,6 +386,7 @@ public: * Prepares a BSONObj describing the current term, primary, and lastOp information. */ virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const = 0; /** diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index e9b1b0987d4..7af3f46ec9b 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2163,11 +2163,12 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS } void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, const OpTime& lastCommittedOpTime) const { // TODO(dannenberg): change the third arg to be the lastOpTimeVisible. 
*metadata = rpc::ReplSetMetadata(_term, lastCommittedOpTime, - lastCommittedOpTime, + lastVisibleOpTime, _rsConfig.getConfigVersion(), _currentPrimaryIndex); } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 526d1f4320f..ef3b8f4adf7 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -219,6 +219,7 @@ public: virtual bool stepDownIfPending(); virtual Date_t getStepDownTime() const; virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata, + const OpTime& lastVisibleOpTime, const OpTime& lastCommitttedOpTime) const; Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm);