SERVER-19554 populate lastOpVisible in ReplSetMetadata
This commit is contained in:
parent f075019a1d
commit b77f580a7c

jstests/replsets/last_op_visible.js (new file, 60 lines)
@@ -0,0 +1,60 @@
+// Test populating the lastOpVisible field of ReplSetMetadata.
+// First we do a writeConcern-free write and ensure that a local read will return the same
+// lastOpVisible, and that a majority read with afterOpTime of lastOpVisible will return it as well.
+// We then confirm that a writeConcern majority write will be seen as the lastOpVisible by a
+// majority read.
+
+(function() {
+    "use strict";
+
+    var name = 'lastOpVisible';
+    var replTest = new ReplSetTest({name: name, nodes: 3});
+
+    replTest.startSet();
+    replTest.initiate();
+
+    var primary = replTest.getPrimary();
+
+    if (!primary.getDB(name).serverStatus().storageEngine.supportsCommittedReads) {
+        print("Skipping last_op_visible.js since storageEngine doesn't support committed reads.");
+        return;
+    }
+
+    // Do an insert without writeConcern.
+    var res = primary.getDB(name).runCommandWithMetadata("insert",
+                                                         {insert: name, documents: [{x: 1}]},
+                                                         {"$replData": 1});
+    assert.commandWorked(res.commandReply);
+    var last_op_visible = res.metadata["$replData"].lastOpVisible;
+
+    // A local find should return the same lastOpVisible.
+    res = primary.getDB(name).runCommandWithMetadata("find",
+                                                     {find: name, readConcern: {level: "local"}},
+                                                     {"$replData": 1});
+    assert.commandWorked(res.commandReply);
+    assert.eq(last_op_visible, res.metadata["$replData"].lastOpVisible);
+
+    // A majority read with afterOpTime: lastOpVisible should also return the same lastOpVisible.
+    res = primary.getDB(name).runCommandWithMetadata(
+        "find",
+        {find: name, readConcern: {level: "majority", afterOpTime: last_op_visible}},
+        {"$replData": 1});
+    assert.commandWorked(res.commandReply);
+    assert.eq(last_op_visible, res.metadata["$replData"].lastOpVisible);
+
+    // Do an insert with writeConcern majority.
+    res = primary.getDB(name).runCommandWithMetadata(
+        "insert",
+        {insert: name, documents: [{x: 1}], writeConcern: {w: "majority"}},
+        {"$replData": 1});
+    assert.commandWorked(res.commandReply);
+    last_op_visible = res.metadata["$replData"].lastOpVisible;
+
+    // A majority read should return the same lastOpVisible as the majority write.
+    res = primary.getDB(name).runCommandWithMetadata("find",
+                                                     {find: name, readConcern: {level: "majority"}},
+                                                     {"$replData": 1});
+    assert.commandWorked(res.commandReply);
+    assert.eq(last_op_visible, res.metadata["$replData"].lastOpVisible);
+
+}());

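Note on what the test exercises: runCommandWithMetadata sends {"$replData": 1} as request metadata and hands back both the command reply and the reply metadata, from which the test reads $replData.lastOpVisible. A minimal standalone model of that round trip, with hypothetical stand-in types in place of the real shell and server objects:

#include <cassert>
#include <cstdint>

// Stand-ins for the real server state and the $replData reply metadata.
struct OpTime {
    uint64_t ts = 0;
    bool operator==(const OpTime& other) const {
        return ts == other.ts;
    }
};

struct ReplData {
    OpTime lastOpVisible;  // what the test reads from res.metadata["$replData"]
};

struct FakePrimary {
    OpTime lastApplied;

    // An insert advances the node's last optime and reports it back.
    ReplData insert() {
        ++lastApplied.ts;
        return {lastApplied};
    }

    // A find at readConcern "local" reports the node's own last optime.
    ReplData findLocal() const {
        return {lastApplied};
    }
};

int main() {
    FakePrimary primary;
    ReplData afterWrite = primary.insert();
    // With no interleaved writes, a local read on the same node must report
    // the same lastOpVisible that the write reported.
    assert(afterWrite.lastOpVisible == primary.findLocal().lastOpVisible);
    return 0;
}
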
@@ -1286,9 +1286,9 @@ bool Command::run(OperationContext* txn,
     repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();

+    repl::ReadConcernArgs readConcern;
     {
         // parse and validate ReadConcernArgs
-        repl::ReadConcernArgs readConcern;
         auto readConcernParseStatus = readConcern.initialize(request.getCommandArgs());
         if (!readConcernParseStatus.isOK()) {
             replyBuilder->setMetadata(rpc::makeEmptyMetadata())
@@ -1343,18 +1343,19 @@ bool Command::run(OperationContext* txn,
     BSONObjBuilder metadataBob;

-    // For commands from mongos, append some info to help getLastError(w) work.
-    // TODO: refactor out of here as part of SERVER-18326
     bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
-
-    if (isReplSet && ShardingState::get(txn)->enabled()) {
-        rpc::ShardingMetadata(
-            repl::ReplClientInfo::forClient(txn->getClient()).getLastOp().getTimestamp(),
-            replCoord->getElectionId()).writeToMetadata(&metadataBob);
-    }
-
     if (isReplSet) {
-        replCoord->prepareReplResponseMetadata(request, &metadataBob);
+        repl::OpTime lastOpTimeFromClient =
+            repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+        replCoord->prepareReplResponseMetadata(
+            request, lastOpTimeFromClient, readConcern, &metadataBob);
+
+        // For commands from mongos, append some info to help getLastError(w) work.
+        // TODO: refactor out of here as part of SERVER-18326
+        if (ShardingState::get(txn)->enabled()) {
+            rpc::ShardingMetadata(lastOpTimeFromClient.getTimestamp(), replCoord->getElectionId())
+                .writeToMetadata(&metadataBob);
+        }
     }

     auto cmdResponse = replyBuilderBob.done();

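The first hunk hoists readConcern out of the validation block purely for lifetime: it must still be in scope when the response metadata is assembled after the command body runs, and the second hunk reads lastOpTimeFromClient once and reuses it for both the replication and sharding metadata. A condensed sketch of that control flow; every name below is a stand-in, not the real API:

#include <string>

// Stand-in types; the real code uses repl::ReadConcernArgs, repl::OpTime,
// and BSONObjBuilder.
struct ReadConcernArgs {
    std::string level = "local";
};
struct OpTime {
    unsigned long long ts = 0;
};
struct MetadataBuilder { /* accumulates $replData, sharding info, ... */ };

ReadConcernArgs parseReadConcern() {
    return {};
}
OpTime clientLastOp() {
    return {42};
}

// Fills term / lastOpCommitted / lastOpVisible, as the repl hunks below do.
void prepareReplResponseMetadata(const OpTime& lastOpTimeFromClient,
                                 const ReadConcernArgs& readConcern,
                                 MetadataBuilder* out) {
    (void)lastOpTimeFromClient;
    (void)readConcern;
    (void)out;
}

int main() {
    // Declared outside the block below so it is still in scope when the
    // response metadata is prepared after the command body executes.
    ReadConcernArgs readConcern;
    {
        // parse and validate
        readConcern = parseReadConcern();
    }
    // ... command body runs here ...
    MetadataBuilder metadataBob;
    // Fetch the client's last optime once; the real code reuses it for both
    // the replication metadata and the sharding (getLastError) metadata.
    OpTime lastOpTimeFromClient = clientLastOp();
    prepareReplResponseMetadata(lastOpTimeFromClient, readConcern, &metadataBob);
    return 0;
}
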
@@ -617,6 +617,8 @@ public:
      * Prepares a metadata object describing the current term, primary, and lastOp information.
      */
     virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request,
+                                             const OpTime& lastOpTimeFromClient,
+                                             const ReadConcernArgs& readConcern,
                                              BSONObjBuilder* builder) = 0;

     /**
@@ -678,6 +680,11 @@ public:
      */
     virtual void dropAllSnapshots() = 0;

+    /**
+     * Gets the latest OpTime of the currentCommittedSnapshot.
+     */
+    virtual OpTime getCurrentCommittedSnapshotOpTime() = 0;
+
 protected:
     ReplicationCoordinator();
 };

@@ -248,7 +248,7 @@ ReplicaSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
     return _rsConfig;
 }

-OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshot_forTest() {
+OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     if (_currentCommittedSnapshot) {
         return _currentCommittedSnapshot->opTime;
@@ -2749,6 +2749,8 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish(
 }

 void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
+                                                             const OpTime& lastOpTimeFromClient,
+                                                             const ReadConcernArgs& readConcern,
                                                              BSONObjBuilder* builder) {
     if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
         rpc::ReplSetMetadata metadata;
@@ -2757,6 +2759,8 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI
             stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
                        this,
                        stdx::placeholders::_1,
+                       lastOpTimeFromClient,
+                       readConcern,
                        &metadata));

         if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
@@ -2771,8 +2775,15 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI
 }

 void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish(
-    const ReplicationExecutor::CallbackArgs& cbData, rpc::ReplSetMetadata* metadata) {
-    _topCoord->prepareReplResponseMetadata(metadata, getLastCommittedOpTime());
+    const ReplicationExecutor::CallbackArgs& cbData,
+    const OpTime& lastOpTimeFromClient,
+    const ReadConcernArgs& readConcern,
+    rpc::ReplSetMetadata* metadata) {
+    OpTime lastReadableOpTime = readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern
+        ? getCurrentCommittedSnapshotOpTime()
+        : getMyLastOptime();
+    OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
+    _topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, getLastCommittedOpTime());
 }

 bool ReplicationCoordinatorImpl::isV1ElectionProtocol() {

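This _finish body is the core of SERVER-19554: a majority read reports the committed-snapshot optime, any other read reports the node's own last optime, and the client's last op acts as a floor so a client's own write never vanishes from the reported lastOpVisible. A standalone sketch of just that selection, using simplified stand-in types:

#include <algorithm>
#include <cassert>
#include <cstdint>

// Simplified stand-in for repl::OpTime (ordered by timestamp, then term).
struct OpTime {
    uint64_t ts = 0;
    int64_t term = -1;

    bool operator<(const OpTime& rhs) const {
        return ts < rhs.ts || (ts == rhs.ts && term < rhs.term);
    }
};

enum class ReadConcernLevel { kLocal, kMajority };

// Mirrors the _finish body above: majority reads see the committed snapshot,
// local reads see the node's own last optime, and the client's last op is a
// floor on the reported value.
OpTime lastOpVisible(ReadConcernLevel level,
                     const OpTime& lastOpTimeFromClient,
                     const OpTime& committedSnapshotOpTime,
                     const OpTime& myLastOpTime) {
    OpTime lastReadable = (level == ReadConcernLevel::kMajority)
        ? committedSnapshotOpTime
        : myLastOpTime;
    return std::max(lastOpTimeFromClient, lastReadable);
}

int main() {
    OpTime clientLast{10, 1}, committed{8, 1}, myLast{12, 1};
    // Majority read: the committed snapshot (8) trails the client's op (10),
    // so the client's own write still shows up in lastOpVisible.
    assert(lastOpVisible(ReadConcernLevel::kMajority, clientLast, committed, myLast).ts == 10);
    // Local read: the node's last applied optime (12) dominates.
    assert(lastOpVisible(ReadConcernLevel::kLocal, clientLast, committed, myLast).ts == 12);
    return 0;
}
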
@@ -259,6 +259,8 @@ public:
                                         long long* responseTerm) override;

     void prepareReplResponseMetadata(const rpc::RequestInterface&,
+                                     const OpTime& lastOpTimeFromClient,
+                                     const ReadConcernArgs& readConcern,
                                      BSONObjBuilder* builder) override;

     virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
@@ -282,6 +284,8 @@ public:

     virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override;

+    virtual OpTime getCurrentCommittedSnapshotOpTime() override;
+
     // ================== Test support API ===================

     /**
@@ -295,11 +299,6 @@ public:
      */
     ReplicaSetConfig getReplicaSetConfig_forTest();

-    /**
-     * Gets the latest OpTime of the currentCommittedSnapshot.
-     */
-    OpTime getCurrentCommittedSnapshot_forTest();
-
     /**
      * Simple wrapper around _setLastOptime_inlock to make it easier to test.
      */
@@ -547,6 +546,8 @@ private:
      * Bottom half of prepareReplResponseMetadata.
      */
     void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData,
+                                             const OpTime& lastOpTimeFromClient,
+                                             const ReadConcernArgs& readConcern,
                                              rpc::ReplSetMetadata* metadata);
     /**
      * Scheduled to cause the ReplicationCoordinator to reconsider any state that might

@@ -2586,8 +2586,7 @@ TEST_F(ReplCoordTest, MetadataUpdatesLastCommittedOpTime) {
                          << 2 << "primaryIndex" << 2 << "term" << 1)));
     getReplCoord()->processReplSetMetadata(metadata.getValue());
     ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
-    ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1),
-                  getReplCoord()->getCurrentCommittedSnapshot_forTest());
+    ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime());

     // lower OpTime, should not change
     StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
@@ -2678,21 +2677,21 @@ TEST_F(ReplCoordTest, SnapshotCommitting) {

     // ensure current snapshot follows "price is right" rules (closest but not greater than)
     getReplCoord()->setMyLastOptime(time3);
-    ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshot_forTest());
+    ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());
     getReplCoord()->setMyLastOptime(time4);
-    ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshot_forTest());
+    ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());

     // ensure current snapshot will not advance beyond existing snapshots
     getReplCoord()->setMyLastOptime(time6);
-    ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshot_forTest());
+    ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime());

     // ensure current snapshot updates on new snapshot if we are that far
     getReplCoord()->onSnapshotCreate(time6, SnapshotName(4));
-    ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshot_forTest());
+    ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshotOpTime());

     // ensure dropping all snapshots resets the current committed snapshot
     getReplCoord()->dropAllSnapshots();
-    ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshot_forTest());
+    ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime());
 }

 TEST_F(ReplCoordTest, MoveOpTimeForward) {

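The "price is right" comment in SnapshotCommitting names the advance rule: the current committed snapshot moves to the newest existing snapshot whose optime is closest to, but not greater than, the commit level. A standalone sketch of that rule, simplified to integer optimes (the real code tracks repl::OpTime snapshots under a mutex):

#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

using OpTime = uint64_t;  // simplified; the real code uses repl::OpTime

// Newest snapshot whose optime is closest to, but not greater than, the
// committed optime; nullopt when no snapshot qualifies (or all were dropped).
std::optional<OpTime> currentCommittedSnapshot(const std::vector<OpTime>& snapshots,
                                               OpTime committed) {
    std::optional<OpTime> best;
    for (OpTime s : snapshots) {
        if (s <= committed && (!best || s > *best))
            best = s;
    }
    return best;
}

int main() {
    std::vector<OpTime> snaps = {1, 2, 5};
    assert(*currentCommittedSnapshot(snaps, 3) == 2);  // closest but not greater
    assert(*currentCommittedSnapshot(snaps, 4) == 2);  // still pinned at 2
    assert(*currentCommittedSnapshot(snaps, 6) == 5);  // cannot pass existing snapshots
    snaps.push_back(6);                                // like onSnapshotCreate(time6, ...)
    assert(*currentCommittedSnapshot(snaps, 6) == 6);  // advances once one exists
    assert(!currentCommittedSnapshot({}, 6));          // dropAllSnapshots => reset
    return 0;
}
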
@@ -337,6 +337,8 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner(
 }

 void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request,
+                                                             const OpTime& lastOpTimeFromClient,
+                                                             const ReadConcernArgs& readConcern,
                                                              BSONObjBuilder* builder) {}

 Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
@@ -368,5 +370,9 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho

 void ReplicationCoordinatorMock::dropAllSnapshots() {}

+OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() {
+    return OpTime();
+}
+
 }  // namespace repl
 }  // namespace mongo

@@ -193,6 +193,8 @@ public:
                                  long long* responseTerm);

     void prepareReplResponseMetadata(const rpc::RequestInterface& request,
+                                     const OpTime& lastOpTimeFromClient,
+                                     const ReadConcernArgs& readConcern,
                                      BSONObjBuilder* builder) override;

     virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
@@ -214,6 +216,8 @@ public:

     virtual void dropAllSnapshots() override;

+    virtual OpTime getCurrentCommittedSnapshotOpTime() override;
+
 private:
     AtomicUInt64 _snapshotNameGenerator;
     const ReplSettings _settings;

@@ -386,6 +386,7 @@ public:
      * Prepares a BSONObj describing the current term, primary, and lastOp information.
      */
     virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
+                                             const OpTime& lastVisibleOpTime,
                                              const OpTime& lastCommittedOpTime) const = 0;

     /**

@@ -2163,11 +2163,12 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
 }

 void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
+                                                          const OpTime& lastVisibleOpTime,
                                                           const OpTime& lastCommittedOpTime) const {
-    // TODO(dannenberg): change the third arg to be the lastOpTimeVisible.
     *metadata = rpc::ReplSetMetadata(_term,
                                      lastCommittedOpTime,
-                                     lastCommittedOpTime,
+                                     lastVisibleOpTime,
                                      _rsConfig.getConfigVersion(),
                                      _currentPrimaryIndex);
 }

@@ -219,6 +219,7 @@ public:
     virtual bool stepDownIfPending();
     virtual Date_t getStepDownTime() const;
     virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
+                                             const OpTime& lastVisibleOpTime,
                                              const OpTime& lastCommittedOpTime) const;
     Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
                                                long long* responseTerm);