mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-53373: Pin resharding durable history across restart.
This commit is contained in:
parent
485d115ac7
commit
674d57fc70
@ -414,6 +414,7 @@ let viewsCommandTests = {
|
||||
movePrimary: {skip: "Tested in sharding/movePrimary1.js"},
|
||||
multicast: {skip: isUnrelated},
|
||||
netstat: {skip: isAnInternalCommand},
|
||||
pinHistoryReplicated: {skip: isAnInternalCommand},
|
||||
ping: {command: {ping: 1}},
|
||||
planCacheClear: {command: {planCacheClear: "view"}, expectFailure: true},
|
||||
planCacheClearFilters: {command: {planCacheClearFilters: "view"}, expectFailure: true},
|
||||
|
@ -239,6 +239,7 @@ const allCommands = {
|
||||
},
|
||||
mergeChunks: {skip: isPrimaryOnly},
|
||||
moveChunk: {skip: isPrimaryOnly},
|
||||
pinHistoryReplicated: {skip: isAnInternalCommand},
|
||||
ping: {skip: isNotAUserDataRead},
|
||||
planCacheClear: {skip: isNotAUserDataRead},
|
||||
planCacheClearFilters: {skip: isNotAUserDataRead},
|
||||
|
91
jstests/replsets/pin_history_after_restart.js
Normal file
91
jstests/replsets/pin_history_after_restart.js
Normal file
@ -0,0 +1,91 @@
|
||||
/**
|
||||
* This test uses the test only `pinHistoryReplicated` command to exercise the DurableHistoryPins
|
||||
* across restart.
|
||||
*
|
||||
* The `pinHistoryReplicated` command will pin the oldest timestamp at the requested time (with an
|
||||
* optional rounding up to oldest). If the pin is successfully, the pinned time is written to a
|
||||
* document inside `mdb_testing.pinned_timestamp`.
|
||||
*
|
||||
* When the `TestingDurableHistoryPin` is registered at startup, it will repin the oldest timestamp
|
||||
* at the minimum of all documents written to `mdb_testing.pinned_timestamp`.
|
||||
*
|
||||
* This test does the following:
|
||||
*
|
||||
* 1) Pin the timestamp with an artificially small value to take advantage of rounding.
|
||||
* 2) See that the timestamp is pinned via serverStatus.
|
||||
* 3) Restart the node, see the pin persist.
|
||||
* 4) Make a new pin at "original pin" + 1
|
||||
* 5) Remove the pin at "original pin"
|
||||
* 6) Restart the node, see the incremented pin value from serverStatus.
|
||||
*
|
||||
* @tags: [requires_fcv_49, requires_majority_read_concern, requires_persistence]
|
||||
*/
|
||||
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
function incTs(ts) {
|
||||
return Timestamp(ts.t, ts.i + 1);
|
||||
}
|
||||
|
||||
let replTest = new ReplSetTest({
|
||||
name: "use_history_after_restart",
|
||||
nodes: 1,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
// Set the history window to zero to more aggressively advance the oldest timestamp.
|
||||
minSnapshotHistoryWindowInSeconds: 0,
|
||||
logComponentVerbosity: tojson({storage: {recovery: 2}}),
|
||||
}
|
||||
}
|
||||
});
|
||||
let nodes = replTest.startSet();
|
||||
replTest.initiate();
|
||||
let primary = replTest.getPrimary();
|
||||
|
||||
// Pin with an arbitrarily small timestamp. Let the rounding tell us where the pin ended up. The
|
||||
// write to the `mdb_testing.pinned_timestamp` collection is not logged/replayed during replication
|
||||
// recovery. Repinning across startup happens before replication recovery. Do a majority write for
|
||||
// predictability of the test.
|
||||
let result = assert.commandWorked(primary.adminCommand(
|
||||
{"pinHistoryReplicated": Timestamp(100, 1), round: true, writeConcern: {w: "majority"}}));
|
||||
let origPinTs = result["pinTs"];
|
||||
jsTestLog({"First pin result": result});
|
||||
|
||||
// Do some additional writes that would traditionally advance the oldest timestamp.
|
||||
for (var idx = 0; idx < 10; ++idx) {
|
||||
assert.commandWorked(primary.getDB("test")["coll"].insert({}));
|
||||
}
|
||||
assert.commandWorked(primary.getDB("test")["coll"].insert({}, {writeConcern: {w: "majority"}}));
|
||||
|
||||
// Observe that the pinned timestamp matches the command response.
|
||||
let serverStatus = assert.commandWorked(primary.adminCommand("serverStatus"));
|
||||
let pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
|
||||
assert.eq(origPinTs, pinnedTs);
|
||||
|
||||
// Restarting the primary should preserve the pin.
|
||||
replTest.restart(primary);
|
||||
primary = replTest.getPrimary();
|
||||
serverStatus = assert.commandWorked(primary.adminCommand("serverStatus"));
|
||||
pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
|
||||
assert.eq(origPinTs, pinnedTs);
|
||||
|
||||
// Create a new pin at "ts + 1". This should succeed, but have no effect.
|
||||
result = assert.commandWorked(
|
||||
primary.adminCommand({"pinHistoryReplicated": incTs(result["pinTs"]), round: false}));
|
||||
jsTestLog({"Second pin result": result});
|
||||
let newPinTs = result["pinTs"];
|
||||
assert.eq(newPinTs, incTs(origPinTs));
|
||||
|
||||
// Remove the old pin at "ts".
|
||||
assert.commandWorked(primary.getDB("mdb_testing")["pinned_timestamp"].remove(
|
||||
{"pinTs": origPinTs}, {writeConcern: {w: "majority"}}));
|
||||
|
||||
// Restarting the node should observe a pin at "ts + 1".
|
||||
replTest.restart(primary);
|
||||
primary = replTest.getPrimary();
|
||||
serverStatus = assert.commandWorked(primary.adminCommand("serverStatus"));
|
||||
pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
|
||||
assert.eq(newPinTs, pinnedTs);
|
||||
replTest.stopSet();
|
||||
})();
|
61
jstests/replsets/unpin_history_after_rollback.js
Normal file
61
jstests/replsets/unpin_history_after_rollback.js
Normal file
@ -0,0 +1,61 @@
|
||||
/**
|
||||
* This test uses the test only `pinHistoryReplicated` command to exercise DurableHistoryPins
|
||||
* across rollback.
|
||||
*
|
||||
* The `pinHistoryReplicated` command will pin the oldest timestamp at the requested time (with an
|
||||
* optional rounding up to oldest). If the pin is successfully, the pinned time is written to a
|
||||
* document inside `mdb_testing.pinned_timestamp`.
|
||||
*
|
||||
* For the purposes of this test, the write is timestamped as its replicated in the oplog. If the
|
||||
* write gets rolled back, this test ensures any pinning effect it had is removed.
|
||||
*
|
||||
* @tags: [requires_fcv_49, requires_majority_read_concern, requires_persistence]
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
load("jstests/replsets/libs/rollback_test.js");
|
||||
|
||||
let rst = new ReplSetTest({
|
||||
name: "history_rollback_test",
|
||||
nodes: 3,
|
||||
useBridge: true,
|
||||
nodeOptions: {setParameter: {logComponentVerbosity: tojson({storage: {recovery: 2}})}}
|
||||
});
|
||||
rst.startSet();
|
||||
const config = rst.getReplSetConfig();
|
||||
config.members[2].priority = 0;
|
||||
config.settings = {
|
||||
chainingAllowed: false
|
||||
};
|
||||
rst.initiateWithHighElectionTimeout(config);
|
||||
|
||||
let rollbackTest = new RollbackTest("history_rollback_test", rst);
|
||||
let rollbackNode = rollbackTest.getPrimary();
|
||||
rollbackTest.transitionToRollbackOperations();
|
||||
|
||||
let serverStatus = rollbackNode.adminCommand("serverStatus");
|
||||
// When there is no pin, the `min pinned timestamp` value is `Timestamp::max()`. I don't believe
|
||||
// there is a JS constant for `Timestamp::max()`, so we capture it now for later.
|
||||
let maxTimestampValue =
|
||||
serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
|
||||
|
||||
// Perform a write that pins history. This write will be rolled back.
|
||||
let result = assert.commandWorked(
|
||||
rollbackNode.adminCommand({"pinHistoryReplicated": Timestamp(100, 1), round: true}));
|
||||
let origPinTs = result["pinTs"];
|
||||
|
||||
serverStatus = rollbackNode.adminCommand("serverStatus");
|
||||
let pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
|
||||
assert.eq(origPinTs, pinnedTs);
|
||||
|
||||
rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
|
||||
rollbackTest.transitionToSyncSourceOperationsDuringRollback();
|
||||
rollbackTest.transitionToSteadyStateOperations();
|
||||
|
||||
serverStatus = rollbackNode.adminCommand("serverStatus");
|
||||
pinnedTs = serverStatus["wiredTiger"]["snapshot-window-settings"]["min pinned timestamp"];
|
||||
assert.eq(maxTimestampValue, pinnedTs);
|
||||
|
||||
rst.stopSet();
|
||||
})();
|
@ -495,6 +495,7 @@ let testCases = {
|
||||
movePrimary: {skip: "does not accept read or write concern"},
|
||||
multicast: {skip: "does not accept read or write concern"},
|
||||
netstat: {skip: "internal command"},
|
||||
pinHistoryReplicated: {skip: "internal command"},
|
||||
ping: {skip: "does not accept read or write concern"},
|
||||
planCacheClear: {skip: "does not accept read or write concern"},
|
||||
planCacheClearFilters: {skip: "does not accept read or write concern"},
|
||||
|
@ -2131,6 +2131,7 @@ env.Library(
|
||||
# please add that library as a private libdep of
|
||||
# mongod_initializers.
|
||||
'$BUILD_DIR/mongo/client/clientdriver_minimal',
|
||||
'$BUILD_DIR/mongo/db/commands/shell_protocol',
|
||||
'$BUILD_DIR/mongo/db/repl/tenant_migration_donor_service',
|
||||
'$BUILD_DIR/mongo/s/grid',
|
||||
'$BUILD_DIR/mongo/s/sessions_collection_sharded',
|
||||
|
@ -365,7 +365,6 @@ env.Library(
|
||||
],
|
||||
)
|
||||
|
||||
# Commands required by the shell to connect
|
||||
env.Library(
|
||||
target="shell_protocol",
|
||||
source=[
|
||||
@ -376,6 +375,8 @@ env.Library(
|
||||
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
|
||||
'$BUILD_DIR/mongo/db/catalog/collection',
|
||||
'$BUILD_DIR/mongo/db/commands',
|
||||
'$BUILD_DIR/mongo/db/db_raii',
|
||||
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
|
||||
'test_commands_enabled',
|
||||
],
|
||||
)
|
||||
|
@ -33,6 +33,8 @@
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/db/commands/test_commands.h"
|
||||
|
||||
#include "mongo/base/init.h"
|
||||
#include "mongo/db/catalog/capped_utils.h"
|
||||
#include "mongo/db/catalog/collection.h"
|
||||
@ -41,13 +43,20 @@
|
||||
#include "mongo/db/commands/test_commands_enabled.h"
|
||||
#include "mongo/db/db_raii.h"
|
||||
#include "mongo/db/index_builds_coordinator.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/op_observer.h"
|
||||
#include "mongo/db/ops/insert.h"
|
||||
#include "mongo/db/query/internal_plans.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
namespace {
|
||||
const NamespaceString kDurableHistoryTestNss("mdb_testing.pinned_timestamp");
|
||||
const std::string kTestingDurableHistoryPinName = "_testing";
|
||||
} // namespace
|
||||
|
||||
using repl::UnreplicatedWritesBlock;
|
||||
using std::endl;
|
||||
using std::string;
|
||||
@ -213,4 +222,105 @@ public:
|
||||
};
|
||||
|
||||
MONGO_REGISTER_TEST_COMMAND(EmptyCapped);
|
||||
|
||||
class DurableHistoryReplicatedTestCmd : public BasicCommand {
|
||||
public:
|
||||
DurableHistoryReplicatedTestCmd() : BasicCommand("pinHistoryReplicated") {}
|
||||
|
||||
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return AllowedOnSecondary::kNever;
|
||||
}
|
||||
|
||||
bool supportsWriteConcern(const BSONObj& cmd) const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool adminOnly() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool requiresAuth() const override {
|
||||
return false;
|
||||
}
|
||||
|
||||
// No auth needed because it only works when enabled via command line.
|
||||
void addRequiredPrivileges(const std::string& dbname,
|
||||
const BSONObj& cmdObj,
|
||||
std::vector<Privilege>* out) const override {}
|
||||
|
||||
std::string help() const override {
|
||||
return "pins the oldest timestamp";
|
||||
}
|
||||
|
||||
bool run(OperationContext* opCtx,
|
||||
const std::string& dbname,
|
||||
const BSONObj& cmdObj,
|
||||
BSONObjBuilder& result) override {
|
||||
const Timestamp requestedPinTs = cmdObj.firstElement().timestamp();
|
||||
const bool round = cmdObj["round"].booleanSafe();
|
||||
|
||||
AutoGetOrCreateDb db(opCtx, kDurableHistoryTestNss.db(), MODE_IX);
|
||||
Lock::CollectionLock collLock(opCtx, kDurableHistoryTestNss, MODE_IX);
|
||||
if (!CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(
|
||||
opCtx,
|
||||
kDurableHistoryTestNss)) { // someone else may have beat us to it.
|
||||
uassertStatusOK(userAllowedCreateNS(kDurableHistoryTestNss));
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
CollectionOptions defaultCollectionOptions;
|
||||
uassertStatusOK(
|
||||
db.getDb()->userCreateNS(opCtx, kDurableHistoryTestNss, defaultCollectionOptions));
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
AutoGetCollection autoColl(opCtx, kDurableHistoryTestNss, MODE_IX);
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
|
||||
// Note, this write will replicate to secondaries, but a secondary will not in-turn pin the
|
||||
// oldest timestamp. The write otherwise must be timestamped in a storage engine table with
|
||||
// logging disabled. This is to test that rolling back the written document also results in
|
||||
// the pin being lifted.
|
||||
Timestamp pinTs =
|
||||
uassertStatusOK(opCtx->getServiceContext()->getStorageEngine()->pinOldestTimestamp(
|
||||
opCtx, kTestingDurableHistoryPinName, requestedPinTs, round));
|
||||
|
||||
uassertStatusOK(autoColl->insertDocument(
|
||||
opCtx,
|
||||
InsertStatement(fixDocumentForInsert(opCtx, BSON("pinTs" << pinTs)).getValue()),
|
||||
nullptr));
|
||||
wuow.commit();
|
||||
|
||||
result.append("requestedPinTs", requestedPinTs);
|
||||
result.append("pinTs", pinTs);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
MONGO_REGISTER_TEST_COMMAND(DurableHistoryReplicatedTestCmd);
|
||||
|
||||
std::string TestingDurableHistoryPin::getName() {
|
||||
return kTestingDurableHistoryPinName;
|
||||
}
|
||||
|
||||
boost::optional<Timestamp> TestingDurableHistoryPin::calculatePin(OperationContext* opCtx) {
|
||||
AutoGetCollectionForRead autoColl(opCtx, kDurableHistoryTestNss);
|
||||
if (!autoColl) {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
Timestamp ret = Timestamp::max();
|
||||
auto cursor = autoColl->getCursor(opCtx);
|
||||
for (auto doc = cursor->next(); doc; doc = cursor->next()) {
|
||||
const BSONObj obj = doc.get().data.toBson();
|
||||
const Timestamp ts = obj["pinTs"].timestamp();
|
||||
ret = std::min(ret, ts);
|
||||
}
|
||||
|
||||
if (ret == Timestamp::min()) {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
} // namespace mongo
|
||||
|
51
src/mongo/db/commands/test_commands.h
Normal file
51
src/mongo/db/commands/test_commands.h
Normal file
@ -0,0 +1,51 @@
|
||||
/**
|
||||
* Copyright (C) 2021-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/storage/durable_history_pin.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* This hook pairs with the `pinHistoryReplicated` test-only command. The test command will pin the
|
||||
* oldest timestamp and perform a write into `mdb_testing.pinned_timestamp`. This hook knows how to
|
||||
* read that collection and re-pin any history requests after a restart or across rollback.
|
||||
*/
|
||||
class TestingDurableHistoryPin : public DurableHistoryPin {
|
||||
public:
|
||||
std::string getName() override;
|
||||
|
||||
boost::optional<Timestamp> calculatePin(OperationContext* opCtx) override;
|
||||
};
|
||||
} // namespace mongo
|
@ -67,6 +67,8 @@
|
||||
#include "mongo/db/commands/feature_compatibility_version.h"
|
||||
#include "mongo/db/commands/feature_compatibility_version_gen.h"
|
||||
#include "mongo/db/commands/shutdown.h"
|
||||
#include "mongo/db/commands/test_commands.h"
|
||||
#include "mongo/db/commands/test_commands_enabled.h"
|
||||
#include "mongo/db/concurrency/d_concurrency.h"
|
||||
#include "mongo/db/concurrency/flow_control_ticketholder.h"
|
||||
#include "mongo/db/concurrency/lock_state.h"
|
||||
@ -153,6 +155,7 @@
|
||||
#include "mongo/db/stats/counters.h"
|
||||
#include "mongo/db/storage/backup_cursor_hooks.h"
|
||||
#include "mongo/db/storage/control/storage_control.h"
|
||||
#include "mongo/db/storage/durable_history_pin.h"
|
||||
#include "mongo/db/storage/encryption_hooks.h"
|
||||
#include "mongo/db/storage/flow_control.h"
|
||||
#include "mongo/db/storage/flow_control_parameters_gen.h"
|
||||
@ -492,6 +495,11 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
|
||||
LOGV2(20536, "Flow Control is enabled on this deployment");
|
||||
}
|
||||
|
||||
{
|
||||
Lock::GlobalWrite globalLk(startupOpCtx.get());
|
||||
DurableHistoryRegistry::get(serviceContext)->reconcilePins(startupOpCtx.get());
|
||||
}
|
||||
|
||||
// Notify the storage engine that startup is completed before repair exits below, as repair sets
|
||||
// the upgrade flag to true.
|
||||
serviceContext->getStorageEngine()->notifyStartupComplete();
|
||||
@ -998,8 +1006,11 @@ void setUpReplication(ServiceContext* serviceContext) {
|
||||
void setUpObservers(ServiceContext* serviceContext) {
|
||||
auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
|
||||
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
|
||||
DurableHistoryRegistry::get(serviceContext)
|
||||
->registerPin(std::make_unique<ReshardingHistoryHook>());
|
||||
opObserverRegistry->addObserver(std::make_unique<OpObserverShardingImpl>());
|
||||
opObserverRegistry->addObserver(std::make_unique<ShardServerOpObserver>());
|
||||
opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
|
||||
} else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
|
||||
opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
|
||||
opObserverRegistry->addObserver(std::make_unique<ConfigServerOpObserver>());
|
||||
@ -1381,6 +1392,16 @@ int mongod_main(int argc, char* argv[]) {
|
||||
}
|
||||
}();
|
||||
|
||||
{
|
||||
// Create the durable history registry prior to calling the `setUp*` methods. They may
|
||||
// depend on it existing at this point.
|
||||
DurableHistoryRegistry::set(service, std::make_unique<DurableHistoryRegistry>());
|
||||
DurableHistoryRegistry* registry = DurableHistoryRegistry::get(service);
|
||||
if (getTestCommandsEnabled()) {
|
||||
registry->registerPin(std::make_unique<TestingDurableHistoryPin>());
|
||||
}
|
||||
}
|
||||
|
||||
setUpCollectionShardingState(service);
|
||||
setUpCatalog(service);
|
||||
setUpReplication(service);
|
||||
|
@ -111,8 +111,6 @@ Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) {
|
||||
uassertStatusOK(waitForWriteConcern(
|
||||
opCtx.get(), generatedOpTime, WriteConcerns::kMajorityWriteConcern, &result));
|
||||
|
||||
// TODO notify storage engine to pin the minFetchTimestamp
|
||||
|
||||
return generatedOpTime.getTimestamp();
|
||||
}
|
||||
|
||||
|
@ -33,6 +33,9 @@
|
||||
|
||||
#include "mongo/db/s/resharding/resharding_op_observer.h"
|
||||
|
||||
#include "mongo/db/catalog/collection.h"
|
||||
#include "mongo/db/db_raii.h"
|
||||
#include "mongo/db/s/resharding/donor_document_gen.h"
|
||||
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
@ -50,13 +53,126 @@ std::shared_ptr<ReshardingCoordinatorObserver> getReshardingCoordinatorObserver(
|
||||
return (*instance)->getObserver();
|
||||
}
|
||||
|
||||
boost::optional<Timestamp> parseNewMinFetchTimestampValue(const BSONObj& obj) {
|
||||
auto doc = ReshardingDonorDocument::parse(IDLParserErrorContext("Resharding"), obj);
|
||||
if (doc.getState() == DonorStateEnum::kDonatingInitialData) {
|
||||
return doc.getMinFetchTimestamp().get();
|
||||
} else {
|
||||
return boost::none;
|
||||
}
|
||||
}
|
||||
|
||||
boost::optional<Timestamp> _calculatePin(OperationContext* opCtx) {
|
||||
// We recalculate the pin by looking at all documents inside the resharding donor
|
||||
// collection. The caller may or may not be in a transaction. If the caller is in a transaction,
|
||||
// we intentionally read any uncommitted writes it has made.
|
||||
//
|
||||
// If there are concurrent transactions updating different keys in the donor collection, there
|
||||
// can be write skew resulting in the wrong pin, including leaking a resource. We enforce the
|
||||
// collection is held in exclusive mode to prevent this.
|
||||
|
||||
// TODO: Uncomment
|
||||
// invariant(opCtx->lockState()->isCollectionLockedForMode(
|
||||
// NamespaceString::kDonorReshardingOperationsNamespace, LockMode::MODE_X));
|
||||
|
||||
// If the RecoveryUnit already had an open snapshot, keep the snapshot open. Otherwise abandon
|
||||
// the snapshot when exitting the function.
|
||||
auto scopeGuard = makeGuard([&] { opCtx->recoveryUnit()->abandonSnapshot(); });
|
||||
if (opCtx->recoveryUnit()->isActive()) {
|
||||
scopeGuard.dismiss();
|
||||
}
|
||||
|
||||
AutoGetCollectionForRead autoColl(opCtx, NamespaceString::kDonorReshardingOperationsNamespace);
|
||||
if (!autoColl) {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
Timestamp ret = Timestamp::max();
|
||||
auto cursor = autoColl->getCursor(opCtx);
|
||||
for (auto doc = cursor->next(); doc; doc = cursor->next()) {
|
||||
if (auto fetchTs = parseNewMinFetchTimestampValue(doc.get().data.toBson()); fetchTs) {
|
||||
ret = std::min(ret, fetchTs.get());
|
||||
}
|
||||
}
|
||||
|
||||
if (ret == Timestamp::max()) {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void _doPin(OperationContext* opCtx) {
|
||||
auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
|
||||
boost::optional<Timestamp> pin = _calculatePin(opCtx);
|
||||
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
|
||||
if (!pin) {
|
||||
storageEngine->unpinOldestTimestamp(ReshardingHistoryHook::kName.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> res = storageEngine->pinOldestTimestamp(
|
||||
opCtx, ReshardingHistoryHook::kName.toString(), pin.get(), false);
|
||||
if (!res.isOK()) {
|
||||
if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeReplSet) {
|
||||
// The pin has failed, but we're in standalone mode. Ignore the error.
|
||||
return;
|
||||
}
|
||||
|
||||
const auto state = replCoord->getMemberState();
|
||||
if (state.primary()) {
|
||||
// If we're a primary, the pin can fail and the error should bubble up and fail
|
||||
// resharding.
|
||||
uassertStatusOK(res);
|
||||
} else if (state.secondary()) {
|
||||
// The pin timestamp can be earlier than the oplog entry being processed. Thus
|
||||
// the oldest timestamp can race ahead of the pin request. It's not ideal this
|
||||
// node cannot participate in donating documents for the cloning phase, but this
|
||||
// is the most robust path forward. Ignore this case.
|
||||
LOGV2_WARNING(5384104,
|
||||
"This node is unable to pin history for resharding",
|
||||
"requestedTs"_attr = pin.get());
|
||||
} else {
|
||||
// For recovery cases we also ignore the error. The expected scenario is the pin
|
||||
// request is no longer needed, but the write to delete the pin was rolled
|
||||
// back. The write to delete the pin won't be issued until the collection
|
||||
// cloning phase of resharding is majority committed. Thus there should be no
|
||||
// consequence to observing this error. Ignore this case.
|
||||
LOGV2(5384103,
|
||||
"The requested pin was unavailable, but should also be unnecessary",
|
||||
"requestedTs"_attr = pin.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
boost::optional<Timestamp> ReshardingHistoryHook::calculatePin(OperationContext* opCtx) {
|
||||
return _calculatePin(opCtx);
|
||||
}
|
||||
|
||||
ReshardingOpObserver::ReshardingOpObserver() = default;
|
||||
|
||||
ReshardingOpObserver::~ReshardingOpObserver() = default;
|
||||
|
||||
void ReshardingOpObserver::onInserts(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
OptionalCollectionUUID uuid,
|
||||
std::vector<InsertStatement>::const_iterator begin,
|
||||
std::vector<InsertStatement>::const_iterator end,
|
||||
bool fromMigrate) {
|
||||
// If a document is inserted into the resharding donor collection with a `minFetchTimestamp`, we
|
||||
// assume the document was inserted as part of initial sync and do nothing to pin history.
|
||||
return;
|
||||
}
|
||||
|
||||
void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
|
||||
if (args.nss == NamespaceString::kDonorReshardingOperationsNamespace) {
|
||||
// Primaries and secondaries should execute pinning logic when observing changes to the
|
||||
// donor resharding document.
|
||||
_doPin(opCtx);
|
||||
}
|
||||
|
||||
// This is a no-op if either replication is not enabled or this node is a secondary
|
||||
if (!repl::ReplicationCoordinator::get(opCtx)->isReplEnabled() ||
|
||||
!opCtx->writesAreReplicated()) {
|
||||
@ -77,4 +193,16 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn
|
||||
}
|
||||
}
|
||||
|
||||
void ReshardingOpObserver::onDelete(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
OptionalCollectionUUID uuid,
|
||||
StmtId stmtId,
|
||||
bool fromMigrate,
|
||||
const boost::optional<BSONObj>& deletedDoc) {
|
||||
if (nss == NamespaceString::kDonorReshardingOperationsNamespace) {
|
||||
_doPin(opCtx);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -31,8 +31,23 @@
|
||||
|
||||
#include "mongo/db/op_observer.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
#include "mongo/db/storage/durable_history_pin.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class ReshardingHistoryHook : public DurableHistoryPin {
|
||||
public:
|
||||
static constexpr StringData kName = "resharding"_sd;
|
||||
|
||||
std::string getName() override {
|
||||
return kName.toString();
|
||||
}
|
||||
|
||||
boost::optional<Timestamp> calculatePin(OperationContext* opCtx) override;
|
||||
};
|
||||
|
||||
/**
|
||||
* OpObserver for observing writes to internal resharding collections. This includes collections
|
||||
* such as config.reshardingOperations, config.localReshardingOperations.donor, and
|
||||
@ -82,7 +97,7 @@ public:
|
||||
OptionalCollectionUUID uuid,
|
||||
std::vector<InsertStatement>::const_iterator begin,
|
||||
std::vector<InsertStatement>::const_iterator end,
|
||||
bool fromMigrate) override {}
|
||||
bool fromMigrate) override;
|
||||
|
||||
void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override;
|
||||
|
||||
@ -95,7 +110,7 @@ public:
|
||||
OptionalCollectionUUID uuid,
|
||||
StmtId stmtId,
|
||||
bool fromMigrate,
|
||||
const boost::optional<BSONObj>& deletedDoc) override {}
|
||||
const boost::optional<BSONObj>& deletedDoc) override;
|
||||
|
||||
void onInternalOpMessage(OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
|
@ -118,6 +118,7 @@ env.Library(
|
||||
target='storage_control',
|
||||
source=[
|
||||
'control/storage_control.cpp',
|
||||
'durable_history_pin.cpp',
|
||||
],
|
||||
LIBDEPS=[
|
||||
'checkpointer',
|
||||
@ -125,6 +126,7 @@ env.Library(
|
||||
],
|
||||
LIBDEPS_PRIVATE=[
|
||||
'$BUILD_DIR/mongo/base',
|
||||
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
|
||||
'$BUILD_DIR/mongo/db/service_context',
|
||||
'storage_options',
|
||||
],
|
||||
@ -586,6 +588,7 @@ env.Library(
|
||||
'$BUILD_DIR/mongo/db/resumable_index_builds_idl',
|
||||
'$BUILD_DIR/mongo/db/storage/storage_repair_observer',
|
||||
'$BUILD_DIR/mongo/db/vector_clock',
|
||||
'storage_control',
|
||||
'storage_util',
|
||||
'two_phase_index_build_knobs_idl',
|
||||
],
|
||||
|
104
src/mongo/db/storage/durable_history_pin.cpp
Normal file
104
src/mongo/db/storage/durable_history_pin.cpp
Normal file
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Copyright (C) 2021-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
|
||||
|
||||
#define LOGV2_FOR_RECOVERY(ID, DLEVEL, MESSAGE, ...) \
|
||||
LOGV2_DEBUG_OPTIONS(ID, DLEVEL, {logv2::LogComponent::kStorageRecovery}, MESSAGE, ##__VA_ARGS__)
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/db/storage/durable_history_pin.h"
|
||||
|
||||
#include "mongo/bson/bsonmisc.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/db_raii.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
namespace {
|
||||
const auto getDurableHistoryRegistry =
|
||||
ServiceContext::declareDecoration<std::unique_ptr<DurableHistoryRegistry>>();
|
||||
|
||||
} // namespace
|
||||
|
||||
DurableHistoryRegistry* DurableHistoryRegistry::get(ServiceContext* service) {
|
||||
return getDurableHistoryRegistry(service).get();
|
||||
}
|
||||
|
||||
DurableHistoryRegistry* DurableHistoryRegistry::get(ServiceContext& service) {
|
||||
return getDurableHistoryRegistry(service).get();
|
||||
}
|
||||
|
||||
DurableHistoryRegistry* DurableHistoryRegistry::get(OperationContext* opCtx) {
|
||||
return get(opCtx->getClient()->getServiceContext());
|
||||
}
|
||||
|
||||
void DurableHistoryRegistry::set(ServiceContext* service,
|
||||
std::unique_ptr<DurableHistoryRegistry> registry) {
|
||||
auto& decoratedRegistry = getDurableHistoryRegistry(service);
|
||||
decoratedRegistry = std::move(registry);
|
||||
}
|
||||
|
||||
void DurableHistoryRegistry::registerPin(std::unique_ptr<DurableHistoryPin> pin) {
|
||||
_pins.push_back(std::move(pin));
|
||||
}
|
||||
|
||||
void DurableHistoryRegistry::reconcilePins(OperationContext* opCtx) {
|
||||
StorageEngine* engine = opCtx->getServiceContext()->getStorageEngine();
|
||||
if (!engine->supportsRecoveryTimestamp()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& pin : _pins) {
|
||||
boost::optional<Timestamp> pinTs = pin->calculatePin(opCtx);
|
||||
LOGV2_FOR_RECOVERY(5384102,
|
||||
2,
|
||||
"Reconciling timestamp pin.",
|
||||
"name"_attr = pin->getName(),
|
||||
"ts"_attr = pinTs);
|
||||
if (pinTs) {
|
||||
auto swTimestamp =
|
||||
engine->pinOldestTimestamp(opCtx, pin->getName(), pinTs.get(), false);
|
||||
if (!swTimestamp.isOK()) {
|
||||
LOGV2_WARNING(5384105,
|
||||
"Unable to repin oldest timestamp",
|
||||
"service"_attr = pin->getName(),
|
||||
"request"_attr = pinTs.get(),
|
||||
"error"_attr = swTimestamp.getStatus());
|
||||
}
|
||||
} else {
|
||||
engine->unpinOldestTimestamp(pin->getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace mongo
|
82
src/mongo/db/storage/durable_history_pin.h
Normal file
82
src/mongo/db/storage/durable_history_pin.h
Normal file
@ -0,0 +1,82 @@
|
||||
/**
|
||||
* Copyright (C) 2021-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/optional/optional.hpp>
|
||||
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class DurableHistoryPin {
|
||||
public:
|
||||
virtual ~DurableHistoryPin() {}
|
||||
|
||||
virtual std::string getName() = 0;
|
||||
|
||||
virtual boost::optional<Timestamp> calculatePin(OperationContext* opCtx) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Services that want to preserve storage engine history across restarts or replication rollback
|
||||
* should create a class that implements `DurableHistoryPin` and register an instance of that class
|
||||
* at startup prior to the first call of `reconcilePins`.
|
||||
*/
|
||||
class DurableHistoryRegistry {
|
||||
public:
|
||||
static DurableHistoryRegistry* get(ServiceContext* service);
|
||||
static DurableHistoryRegistry* get(ServiceContext& service);
|
||||
static DurableHistoryRegistry* get(OperationContext* ctx);
|
||||
|
||||
static void set(ServiceContext* service, std::unique_ptr<DurableHistoryRegistry> registry);
|
||||
|
||||
void registerPin(std::unique_ptr<DurableHistoryPin> pin);
|
||||
|
||||
/**
|
||||
* Iterates through each registered pin and takes one of two actions:
|
||||
*
|
||||
* 1) If the pin returns an engaged boost::optional<Timestamp>, forward that pinned timestamp to
|
||||
* the storage engine using the pins name.
|
||||
* 2) If the pin returns boost::none, unpin any resources held by the storage engine on behalf
|
||||
* of the pins name.
|
||||
*
|
||||
* If a requested pin fails, a log will be issued, but the process will otherwise continue.
|
||||
*/
|
||||
void reconcilePins(OperationContext* opCtx);
|
||||
|
||||
private:
|
||||
std::vector<std::unique_ptr<DurableHistoryPin>> _pins;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
@ -386,7 +386,8 @@ public:
|
||||
MONGO_UNREACHABLE;
|
||||
}
|
||||
|
||||
virtual StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
|
||||
virtual StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) {
|
||||
MONGO_UNREACHABLE;
|
||||
|
@ -675,8 +675,12 @@ public:
|
||||
* | requested >= oldest | false/true | <OK, requested timestamp> |
|
||||
* | requested < oldest | false | <SnapshotTooOld> |
|
||||
* | requested < oldest | true | <OK, oldest timestamp> |
|
||||
*
|
||||
* If the input OperationContext is in a WriteUnitOfWork, an `onRollback` handler will be
|
||||
* registered to return the pin for the `requestingServiceName` to the previous value.
|
||||
*/
|
||||
virtual StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
|
||||
virtual StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) = 0;
|
||||
|
||||
|
@ -43,6 +43,7 @@
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/server_options.h"
|
||||
#include "mongo/db/storage/durable_catalog_feature_tracker.h"
|
||||
#include "mongo/db/storage/durable_history_pin.h"
|
||||
#include "mongo/db/storage/kv/kv_engine.h"
|
||||
#include "mongo/db/storage/kv/temporary_kv_record_store.h"
|
||||
#include "mongo/db/storage/storage_repair_observer.h"
|
||||
@ -1002,6 +1003,7 @@ StatusWith<Timestamp> StorageEngineImpl::recoverToStableTimestamp(OperationConte
|
||||
}
|
||||
|
||||
catalog::openCatalog(opCtx, state, swTimestamp.getValue());
|
||||
DurableHistoryRegistry::get(opCtx)->reconcilePins(opCtx);
|
||||
|
||||
LOGV2(22259,
|
||||
"recoverToStableTimestamp successful",
|
||||
@ -1253,8 +1255,12 @@ int64_t StorageEngineImpl::sizeOnDiskForDb(OperationContext* opCtx, StringData d
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> StorageEngineImpl::pinOldestTimestamp(
|
||||
const std::string& requestingServiceName, Timestamp requestedTimestamp, bool roundUpIfTooOld) {
|
||||
return _engine->pinOldestTimestamp(requestingServiceName, requestedTimestamp, roundUpIfTooOld);
|
||||
OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) {
|
||||
return _engine->pinOldestTimestamp(
|
||||
opCtx, requestingServiceName, requestedTimestamp, roundUpIfTooOld);
|
||||
}
|
||||
|
||||
void StorageEngineImpl::unpinOldestTimestamp(const std::string& requestingServiceName) {
|
||||
|
@ -363,7 +363,8 @@ public:
|
||||
return _options.directoryPerDB;
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
|
||||
StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) override;
|
||||
|
||||
|
@ -196,7 +196,8 @@ public:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
|
||||
StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) final {
|
||||
return Status::OK();
|
||||
|
@ -532,8 +532,13 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
|
||||
// 1) The feature stores the desired pin timestamp in some local collection.
|
||||
// 2) This temporary pinning lasts long enough for the catalog to be loaded and
|
||||
// accessed.
|
||||
uassertStatusOK(pinOldestTimestamp(
|
||||
kPinOldestTimestampAtStartupName, Timestamp(_oldestTimestamp.load()), false));
|
||||
{
|
||||
stdx::lock_guard<Latch> lk(_oldestTimestampPinRequestsMutex);
|
||||
uassertStatusOK(_pinOldestTimestamp(lk,
|
||||
kPinOldestTimestampAtStartupName,
|
||||
Timestamp(_oldestTimestamp.load()),
|
||||
false));
|
||||
}
|
||||
|
||||
setStableTimestamp(_recoveryTimestamp, false);
|
||||
|
||||
@ -2342,7 +2347,10 @@ Timestamp WiredTigerKVEngine::getPinnedOplog() const {
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> WiredTigerKVEngine::pinOldestTimestamp(
|
||||
const std::string& requestingServiceName, Timestamp requestedTimestamp, bool roundUpIfTooOld) {
|
||||
OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) {
|
||||
stdx::lock_guard<Latch> lock(_oldestTimestampPinRequestsMutex);
|
||||
Timestamp oldest = getOldestTimestamp();
|
||||
LOGV2(5380104,
|
||||
@ -2352,6 +2360,40 @@ StatusWith<Timestamp> WiredTigerKVEngine::pinOldestTimestamp(
|
||||
"roundUpIfTooOld"_attr = roundUpIfTooOld,
|
||||
"currOldestTs"_attr = oldest);
|
||||
|
||||
Timestamp previousTimestamp = _oldestTimestampPinRequests[requestingServiceName];
|
||||
auto swPinnedTimestamp =
|
||||
_pinOldestTimestamp(lock, requestingServiceName, requestedTimestamp, roundUpIfTooOld);
|
||||
if (!swPinnedTimestamp.isOK()) {
|
||||
return swPinnedTimestamp;
|
||||
}
|
||||
|
||||
if (opCtx->lockState()->inAWriteUnitOfWork()) {
|
||||
// If we've moved the pin and are in a `WriteUnitOfWork`, assume the caller has a write that
|
||||
// should be atomic with this pin request. If the `WriteUnitOfWork` is rolled back, either
|
||||
// unpin the oldest timestamp or repin the previous value.
|
||||
opCtx->recoveryUnit()->onRollback(
|
||||
[this, svcName = requestingServiceName, previousTimestamp]() {
|
||||
if (previousTimestamp.isNull()) {
|
||||
unpinOldestTimestamp(svcName);
|
||||
} else {
|
||||
stdx::lock_guard<Latch> lock(_oldestTimestampPinRequestsMutex);
|
||||
// When a write is updating the value from an earlier pin to a later one, use
|
||||
// rounding to make a best effort to repin the earlier value.
|
||||
invariant(_pinOldestTimestamp(lock, svcName, previousTimestamp, true).isOK());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return swPinnedTimestamp;
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> WiredTigerKVEngine::_pinOldestTimestamp(
|
||||
WithLock,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) {
|
||||
|
||||
Timestamp oldest = getOldestTimestamp();
|
||||
if (requestedTimestamp < oldest) {
|
||||
if (roundUpIfTooOld) {
|
||||
requestedTimestamp = oldest;
|
||||
@ -2363,7 +2405,6 @@ StatusWith<Timestamp> WiredTigerKVEngine::pinOldestTimestamp(
|
||||
}
|
||||
|
||||
_oldestTimestampPinRequests[requestingServiceName] = requestedTimestamp;
|
||||
|
||||
return {requestedTimestamp};
|
||||
}
|
||||
|
||||
@ -2371,9 +2412,10 @@ void WiredTigerKVEngine::unpinOldestTimestamp(const std::string& requestingServi
|
||||
stdx::lock_guard<Latch> lock(_oldestTimestampPinRequestsMutex);
|
||||
auto it = _oldestTimestampPinRequests.find(requestingServiceName);
|
||||
if (it == _oldestTimestampPinRequests.end()) {
|
||||
LOGV2_WARNING(5380105,
|
||||
"The requested service had nothing to unpin",
|
||||
"service"_attr = requestingServiceName);
|
||||
LOGV2_DEBUG(2,
|
||||
5380105,
|
||||
"The requested service had nothing to unpin",
|
||||
"service"_attr = requestingServiceName);
|
||||
return;
|
||||
}
|
||||
LOGV2(5380103,
|
||||
|
@ -46,6 +46,7 @@
|
||||
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
|
||||
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/util/concurrency/with_lock.h"
|
||||
#include "mongo/util/elapsed_tracker.h"
|
||||
|
||||
namespace mongo {
|
||||
@ -345,10 +346,18 @@ public:
|
||||
return _clockSource;
|
||||
}
|
||||
|
||||
StatusWith<Timestamp> pinOldestTimestamp(const std::string& requestingServiceName,
|
||||
StatusWith<Timestamp> pinOldestTimestamp(OperationContext* opCtx,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld) override;
|
||||
|
||||
private:
|
||||
StatusWith<Timestamp> _pinOldestTimestamp(WithLock,
|
||||
const std::string& requestingServiceName,
|
||||
Timestamp requestedTimestamp,
|
||||
bool roundUpIfTooOld);
|
||||
|
||||
public:
|
||||
void unpinOldestTimestamp(const std::string& requestingServiceName) override;
|
||||
|
||||
std::map<std::string, Timestamp> getPinnedTimestampRequests();
|
||||
|
@ -392,6 +392,7 @@ TEST_F(WiredTigerKVEngineTest, IdentDrop) {
|
||||
}
|
||||
|
||||
TEST_F(WiredTigerKVEngineTest, TestBasicPinOldestTimestamp) {
|
||||
auto opCtxRaii = makeOperationContext();
|
||||
const Timestamp initTs = Timestamp(1, 0);
|
||||
|
||||
// Initialize the oldest timestamp.
|
||||
@ -406,8 +407,8 @@ TEST_F(WiredTigerKVEngineTest, TestBasicPinOldestTimestamp) {
|
||||
// as. This error case is not exercised in this test.
|
||||
const bool roundUpIfTooOld = false;
|
||||
// Pin the oldest timestamp to "3".
|
||||
auto pinnedTs =
|
||||
unittest::assertGet(_engine->pinOldestTimestamp("A", initTs + 3, roundUpIfTooOld));
|
||||
auto pinnedTs = unittest::assertGet(
|
||||
_engine->pinOldestTimestamp(opCtxRaii.get(), "A", initTs + 3, roundUpIfTooOld));
|
||||
// Assert that the pinning method returns the same timestamp as was requested.
|
||||
ASSERT_EQ(initTs + 3, pinnedTs);
|
||||
// Assert that pinning the oldest timestamp does not advance it.
|
||||
@ -432,6 +433,7 @@ TEST_F(WiredTigerKVEngineTest, TestBasicPinOldestTimestamp) {
|
||||
* of all active requests will be obeyed.
|
||||
*/
|
||||
TEST_F(WiredTigerKVEngineTest, TestMultiPinOldestTimestamp) {
|
||||
auto opCtxRaii = makeOperationContext();
|
||||
const Timestamp initTs = Timestamp(1, 0);
|
||||
|
||||
_engine->setOldestTimestamp(initTs, false);
|
||||
@ -441,13 +443,14 @@ TEST_F(WiredTigerKVEngineTest, TestMultiPinOldestTimestamp) {
|
||||
// as. This error case is not exercised in this test.
|
||||
const bool roundUpIfTooOld = false;
|
||||
// Have "A" pin the timestamp to "1".
|
||||
auto pinnedTs =
|
||||
unittest::assertGet(_engine->pinOldestTimestamp("A", initTs + 1, roundUpIfTooOld));
|
||||
auto pinnedTs = unittest::assertGet(
|
||||
_engine->pinOldestTimestamp(opCtxRaii.get(), "A", initTs + 1, roundUpIfTooOld));
|
||||
ASSERT_EQ(initTs + 1, pinnedTs);
|
||||
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
|
||||
|
||||
// Have "B" pin the timestamp to "2".
|
||||
pinnedTs = unittest::assertGet(_engine->pinOldestTimestamp("B", initTs + 2, roundUpIfTooOld));
|
||||
pinnedTs = unittest::assertGet(
|
||||
_engine->pinOldestTimestamp(opCtxRaii.get(), "B", initTs + 2, roundUpIfTooOld));
|
||||
ASSERT_EQ(initTs + 2, pinnedTs);
|
||||
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
|
||||
|
||||
@ -471,6 +474,7 @@ TEST_F(WiredTigerKVEngineTest, TestMultiPinOldestTimestamp) {
|
||||
* relative to the current oldest timestamp.
|
||||
*/
|
||||
TEST_F(WiredTigerKVEngineTest, TestPinOldestTimestampErrors) {
|
||||
auto opCtxRaii = makeOperationContext();
|
||||
const Timestamp initTs = Timestamp(10, 0);
|
||||
|
||||
_engine->setOldestTimestamp(initTs, false);
|
||||
@ -482,13 +486,13 @@ TEST_F(WiredTigerKVEngineTest, TestPinOldestTimestampErrors) {
|
||||
|
||||
// When rounding on error, the pin will succeed, but the return value will be the current oldest
|
||||
// timestamp instead of the requested value.
|
||||
auto pinnedTs =
|
||||
unittest::assertGet(_engine->pinOldestTimestamp("A", initTs - 1, roundUpIfTooOld));
|
||||
auto pinnedTs = unittest::assertGet(
|
||||
_engine->pinOldestTimestamp(opCtxRaii.get(), "A", initTs - 1, roundUpIfTooOld));
|
||||
ASSERT_EQ(initTs, pinnedTs);
|
||||
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
|
||||
|
||||
// Using "fail on error" will result in a not-OK return value.
|
||||
ASSERT_NOT_OK(_engine->pinOldestTimestamp("B", initTs - 1, failOnError));
|
||||
ASSERT_NOT_OK(_engine->pinOldestTimestamp(opCtxRaii.get(), "B", initTs - 1, failOnError));
|
||||
ASSERT_EQ(initTs, _engine->getOldestTimestamp());
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user