mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-47672: Add minSnapshotHistoryWindowInSeconds and remove snapshot window adjustment logic
This commit is contained in:
parent
049b10ad31
commit
db37687407
@ -1,85 +0,0 @@
|
||||
// Test server validation of snapshot history window related server parameters' settings on startup
|
||||
// and via setParameter command.
|
||||
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
load("jstests/noPassthrough/libs/server_parameter_helpers.js");
|
||||
|
||||
// Valid parameter values are in the range [0, infinity).
|
||||
testNumericServerParameter("maxTargetSnapshotHistoryWindowInSeconds",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
5 /*defaultValue*/,
|
||||
30 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
-1 /*lowerOutOfBounds*/,
|
||||
false /*hasUpperBound*/,
|
||||
"unused" /*upperOutOfBounds*/);
|
||||
|
||||
// Valid parameter values are in the range [0, 100].
|
||||
testNumericServerParameter("cachePressureThreshold",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
95 /*defaultValue*/,
|
||||
70 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
-1 /*lowerOutOfBounds*/,
|
||||
true /*hasUpperBound*/,
|
||||
101 /*upperOutOfBounds*/);
|
||||
|
||||
// Valid parameter values are in the range (0, 1).
|
||||
testNumericServerParameter("snapshotWindowMultiplicativeDecrease",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
0.75 /*defaultValue*/,
|
||||
0.50 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
-1 /*lowerOutOfBounds*/,
|
||||
true /*hasUpperBound*/,
|
||||
1.1 /*upperOutOfBounds*/);
|
||||
|
||||
// Valid parameter values are in the range [1, infinity).
|
||||
testNumericServerParameter("snapshotWindowAdditiveIncreaseSeconds",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
2 /*defaultValue*/,
|
||||
10 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
0 /*lowerOutOfBounds*/,
|
||||
false /*hasUpperBound*/,
|
||||
"unused" /*upperOutOfBounds*/);
|
||||
|
||||
// Valid parameter values are in the range [1, infinity).
|
||||
testNumericServerParameter("checkCachePressurePeriodSeconds",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
5 /*defaultValue*/,
|
||||
8 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
0 /*lowerOutOfBounds*/,
|
||||
false /*hasUpperBound*/,
|
||||
"unused" /*upperOutOfBounds*/);
|
||||
|
||||
// Valid parameter values are in the range [1, infinity).
|
||||
testNumericServerParameter("minMillisBetweenSnapshotWindowInc",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
500 /*defaultValue*/,
|
||||
2 * 1000 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
0 /*lowerOutOfBounds*/,
|
||||
false /*hasUpperBound*/,
|
||||
"unused" /*upperOutOfBounds*/);
|
||||
|
||||
// Valid parameter values are in the range [1, infinity).
|
||||
testNumericServerParameter("minMillisBetweenSnapshotWindowDec",
|
||||
true /*isStartupParameter*/,
|
||||
true /*isRuntimeParameter*/,
|
||||
500 /*defaultValue*/,
|
||||
2 * 1000 /*nonDefaultValidValue*/,
|
||||
true /*hasLowerBound*/,
|
||||
0 /*lowerOutOfBounds*/,
|
||||
false /*hasUpperBound*/,
|
||||
"unused" /*upperOutOfBounds*/);
|
||||
})();
|
64
jstests/noPassthrough/snapshot_history_window.js
Normal file
64
jstests/noPassthrough/snapshot_history_window.js
Normal file
@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Test setting minSnapshotHistoryWindowInSeconds at runtime and that server keeps history for up to
|
||||
* minSnapshotHistoryWindowInSeconds.
|
||||
*
|
||||
* @tags: [requires_majority_read_concern, requires_replication]
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
const replSet = new ReplSetTest({nodes: 1});
|
||||
|
||||
replSet.startSet();
|
||||
replSet.initiate();
|
||||
|
||||
const collName = "coll";
|
||||
const primary = replSet.getPrimary();
|
||||
const primaryDB = primary.getDB('test');
|
||||
|
||||
const historyWindowSecs = 10;
|
||||
assert.commandWorked(primaryDB.adminCommand(
|
||||
{setParameter: 1, minSnapshotHistoryWindowInSeconds: historyWindowSecs}));
|
||||
|
||||
const startTime = Date.now();
|
||||
const insertTimestamp =
|
||||
assert.commandWorked(primaryDB.runCommand({insert: collName, documents: [{_id: 0}]}))
|
||||
.operationTime;
|
||||
let nextId = 1;
|
||||
|
||||
// Test snapshot window with 1s margin.
|
||||
const testMarginMS = 1000;
|
||||
|
||||
// Test that reading from a snapshot at insertTimestamp is valid for up to historyWindowSecs minus
|
||||
// the testMarginMS (as a buffer) to avoid races between the client's snapshot read and the update
|
||||
// of the oldest timestamp in the server.
|
||||
const testWindowMS = historyWindowSecs * 1000 - testMarginMS;
|
||||
while (Date.now() - startTime < testWindowMS) {
|
||||
// Test that reading from a snapshot at insertTimestamp is still valid.
|
||||
assert.commandWorked(primaryDB.runCommand(
|
||||
{find: collName, readConcern: {level: "snapshot", atClusterTime: insertTimestamp}}));
|
||||
|
||||
// Perform writes to advance stable timestamp and oldest timestamp. We use majority writeConcern
|
||||
// so that we can make sure the stable timestamp and the oldest timestamp are updated after each
|
||||
// insert.
|
||||
assert.commandWorked(primaryDB.runCommand(
|
||||
{insert: collName, documents: [{_id: nextId}], writeConcern: {w: "majority"}}));
|
||||
nextId++;
|
||||
|
||||
sleep(50);
|
||||
}
|
||||
|
||||
// Sleep enough to make sure the insertTimestamp falls off the snapshot window.
|
||||
sleep(testMarginMS * 2);
|
||||
// Perform another majority write to advance the stable timestamp and the oldest timestamp again.
|
||||
assert.commandWorked(primaryDB.runCommand(
|
||||
{insert: collName, documents: [{_id: nextId}], writeConcern: {w: "majority"}}));
|
||||
|
||||
// Test that reading from a snapshot at insertTimestamp returns SnapshotTooOld.
|
||||
assert.commandFailedWithCode(
|
||||
primaryDB.runCommand(
|
||||
{find: collName, readConcern: {level: "snapshot", atClusterTime: insertTimestamp}}),
|
||||
ErrorCodes.SnapshotTooOld);
|
||||
|
||||
replSet.stopSet();
|
||||
})();
|
@ -11,9 +11,9 @@
|
||||
|
||||
load("jstests/libs/global_snapshot_reads_util.js");
|
||||
|
||||
// TODO(SERVER-47672): Use minSnapshotHistoryWindowInSeconds instead.
|
||||
const options = {
|
||||
setParameter: {maxTargetSnapshotHistoryWindowInSeconds: 600}
|
||||
// Set a large snapshot window of 10 minutes for the test.
|
||||
setParameter: {minSnapshotHistoryWindowInSeconds: 600}
|
||||
};
|
||||
const replSet = new ReplSetTest({nodes: 3, nodeOptions: options});
|
||||
replSet.startSet();
|
||||
@ -30,7 +30,7 @@ snapshotReadsTest({
|
||||
}
|
||||
});
|
||||
|
||||
// Ensure "atClusterTime" is omitted from a regular (non-snapshot) reads.
|
||||
// Ensure "atClusterTime" is omitted from a regular (non-snapshot) read.
|
||||
primaryDB["collection"].insertOne({});
|
||||
const cursor = assert.commandWorked(primaryDB.runCommand({find: "test"})).cursor;
|
||||
assert(!cursor.hasOwnProperty("atClusterTime"));
|
||||
|
@ -14,9 +14,9 @@
|
||||
load("jstests/libs/global_snapshot_reads_util.js");
|
||||
load("jstests/sharding/libs/sharded_transactions_helpers.js");
|
||||
|
||||
// TODO(SERVER-47672): Use minSnapshotHistoryWindowInSeconds instead.
|
||||
const configOptions = {
|
||||
setParameter: {maxTargetSnapshotHistoryWindowInSeconds: 600}
|
||||
// Set a large snapshot window of 10 minutes for the test.
|
||||
setParameter: {minSnapshotHistoryWindowInSeconds: 600}
|
||||
};
|
||||
|
||||
const dbName = "test";
|
||||
|
@ -4,7 +4,7 @@
|
||||
*
|
||||
* With majority reads disabled, we are not guaranteed to be able to service reads at the majority
|
||||
* commit point. We can only provide reads within a window behind the primary's 'lastApplied'. The
|
||||
* size of that window is controlled by 'maxTargetSnapshotHistoryWindowInSeconds', which is 5
|
||||
* size of that window is controlled by 'minSnapshotHistoryWindowInSeconds', which is 5
|
||||
* seconds by default. If the commit point lag is greater than that amount, reading at that time
|
||||
* fails with a SnapshotTooOld error. Therefore, in order for the transaction to succeed, mongos
|
||||
* needs to pick a read timestamp that is not derived from the commit point, but rather from the
|
||||
@ -49,10 +49,10 @@ assert.commandWorked(mongosColl.insert({_id: 1, x: 10}, {writeConcern: {w: "majo
|
||||
// We want the secondary to lag for an amount generously greater than the history window.
|
||||
const secondary = rst.getSecondary();
|
||||
const maxWindowResult = assert.commandWorked(secondary.getDB("admin").runCommand(
|
||||
{"getParameter": 1, "maxTargetSnapshotHistoryWindowInSeconds": 1}));
|
||||
{"getParameter": 1, "minSnapshotHistoryWindowInSeconds": 1}));
|
||||
stopServerReplication(secondary);
|
||||
|
||||
const maxWindowInMS = maxWindowResult.maxTargetSnapshotHistoryWindowInSeconds * 1000;
|
||||
const maxWindowInMS = maxWindowResult.minSnapshotHistoryWindowInSeconds * 1000;
|
||||
const lagTimeMS = maxWindowInMS * 2;
|
||||
const startTime = Date.now();
|
||||
let nextId = 1000;
|
||||
|
@ -401,7 +401,6 @@ mongod = env.Program(
|
||||
'db/mongod_options',
|
||||
'db/ops/write_ops_parsers',
|
||||
'db/periodic_runner_job_abort_expired_transactions',
|
||||
'db/periodic_runner_job_decrease_snapshot_cache_pressure',
|
||||
'db/pipeline/aggregation',
|
||||
'db/pipeline/process_interface/mongod_process_interface_factory',
|
||||
'db/query_exec',
|
||||
|
@ -815,7 +815,7 @@ env.Library(
|
||||
'introspect',
|
||||
'lasterror',
|
||||
'query_exec',
|
||||
'snapshot_window_util',
|
||||
'snapshot_window_options',
|
||||
'transaction',
|
||||
'$BUILD_DIR/mongo/db/audit',
|
||||
'$BUILD_DIR/mongo/db/auth/auth',
|
||||
@ -1463,19 +1463,6 @@ env.Library(
|
||||
],
|
||||
)
|
||||
|
||||
env.Library(
|
||||
target='periodic_runner_job_decrease_snapshot_cache_pressure',
|
||||
source=[
|
||||
'periodic_runner_job_decrease_snapshot_cache_pressure.cpp',
|
||||
],
|
||||
LIBDEPS_PRIVATE=[
|
||||
'snapshot_window_options',
|
||||
'snapshot_window_util',
|
||||
'$BUILD_DIR/mongo/db/service_context',
|
||||
'$BUILD_DIR/mongo/util/periodic_runner',
|
||||
],
|
||||
)
|
||||
|
||||
env.Library(
|
||||
target='snapshot_window_options',
|
||||
source=[
|
||||
@ -1487,18 +1474,6 @@ env.Library(
|
||||
],
|
||||
)
|
||||
|
||||
env.Library(
|
||||
target='snapshot_window_util',
|
||||
source=[
|
||||
'snapshot_window_util.cpp',
|
||||
],
|
||||
LIBDEPS_PRIVATE=[
|
||||
'snapshot_window_options',
|
||||
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
|
||||
'$BUILD_DIR/mongo/db/service_context',
|
||||
],
|
||||
)
|
||||
|
||||
env.Library(
|
||||
target='signed_logical_time',
|
||||
source=[
|
||||
@ -1852,7 +1827,6 @@ envWithAsio.CppUnitTest(
|
||||
'server_options_test.cpp',
|
||||
'session_catalog_mongod_test.cpp',
|
||||
'session_catalog_test.cpp',
|
||||
'snapshot_window_util_test.cpp',
|
||||
'startup_warnings_mongod_test.cpp',
|
||||
'thread_client_test.cpp',
|
||||
'time_proof_service_test.cpp',
|
||||
@ -1928,7 +1902,6 @@ envWithAsio.CppUnitTest(
|
||||
'sessions_collection_mock',
|
||||
'signed_logical_time',
|
||||
'snapshot_window_options',
|
||||
'snapshot_window_util',
|
||||
'startup_warnings_mongod',
|
||||
'stats/fill_locker_info',
|
||||
'stats/transaction_stats',
|
||||
|
@ -104,7 +104,6 @@
|
||||
#include "mongo/db/op_observer_registry.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/periodic_runner_job_abort_expired_transactions.h"
|
||||
#include "mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h"
|
||||
#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
|
||||
#include "mongo/db/query/internal_plans.h"
|
||||
#include "mongo/db/read_write_concern_defaults_cache_lookup_mongod.h"
|
||||
@ -711,11 +710,6 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
|
||||
if (storageEngine->supportsReadConcernSnapshot()) {
|
||||
try {
|
||||
PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->start();
|
||||
// The inMemory engine is not yet used for replica or sharded transactions in production
|
||||
// so it does not currently maintain snapshot history. It is live in testing, however.
|
||||
if (!storageEngine->isEphemeral() || getTestCommandsEnabled()) {
|
||||
PeriodicThreadToDecreaseSnapshotHistoryCachePressure::get(serviceContext)->start();
|
||||
}
|
||||
} catch (ExceptionFor<ErrorCodes::PeriodicJobIsStopped>&) {
|
||||
LOGV2_WARNING(4747501, "Not starting periodic jobs as shutdown is in progress");
|
||||
// Shutdown has already started before initialization is complete. Wait for the
|
||||
@ -1143,7 +1137,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
|
||||
if (auto storageEngine = serviceContext->getStorageEngine()) {
|
||||
if (storageEngine->supportsReadConcernSnapshot()) {
|
||||
PeriodicThreadToAbortExpiredTransactions::get(serviceContext)->stop();
|
||||
PeriodicThreadToDecreaseSnapshotHistoryCachePressure::get(serviceContext)->stop();
|
||||
}
|
||||
|
||||
ServiceContext::UniqueOperationContext uniqueOpCtx;
|
||||
|
@ -1,110 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2018-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h"
|
||||
|
||||
#include "mongo/db/client.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/snapshot_window_options.h"
|
||||
#include "mongo/db/snapshot_window_util.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/periodic_runner.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
auto PeriodicThreadToDecreaseSnapshotHistoryCachePressure::get(ServiceContext* serviceContext)
|
||||
-> PeriodicThreadToDecreaseSnapshotHistoryCachePressure& {
|
||||
auto& jobContainer = _serviceDecoration(serviceContext);
|
||||
jobContainer._init(serviceContext);
|
||||
return jobContainer;
|
||||
}
|
||||
|
||||
auto PeriodicThreadToDecreaseSnapshotHistoryCachePressure::operator-> () const noexcept
|
||||
-> PeriodicJobAnchor* {
|
||||
stdx::lock_guard lk(_mutex);
|
||||
return _anchor.get();
|
||||
}
|
||||
|
||||
auto PeriodicThreadToDecreaseSnapshotHistoryCachePressure::operator*() const noexcept
|
||||
-> PeriodicJobAnchor& {
|
||||
stdx::lock_guard lk(_mutex);
|
||||
return *_anchor;
|
||||
}
|
||||
|
||||
void PeriodicThreadToDecreaseSnapshotHistoryCachePressure::_init(ServiceContext* serviceContext) {
|
||||
stdx::lock_guard lk(_mutex);
|
||||
if (_anchor) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto periodicRunner = serviceContext->getPeriodicRunner();
|
||||
invariant(periodicRunner);
|
||||
|
||||
PeriodicRunner::PeriodicJob job(
|
||||
"decreaseSnapshotHistoryCachePressure",
|
||||
[](Client* client) {
|
||||
try {
|
||||
// The opCtx destructor handles unsetting itself from the Client.
|
||||
// (The PeriodicRunnerASIO's Client must be reset before returning.)
|
||||
auto opCtx = client->makeOperationContext();
|
||||
|
||||
SnapshotWindowUtil::decreaseTargetSnapshotWindowSize(opCtx.get());
|
||||
} catch (const ExceptionForCat<ErrorCategory::CancelationError>& ex) {
|
||||
LOGV2_DEBUG(4684102, 4, "Periodic task cancelled", "reason"_attr = ex.toStatus());
|
||||
} catch (const DBException& ex) {
|
||||
LOGV2_WARNING(
|
||||
20894,
|
||||
"Periodic task to check for and decrease cache pressure caused by maintaining "
|
||||
"too much snapshot history failed! Caused by: {ex_toStatus}",
|
||||
"ex_toStatus"_attr = ex.toStatus());
|
||||
}
|
||||
},
|
||||
Seconds(snapshotWindowParams.checkCachePressurePeriodSeconds.load()));
|
||||
|
||||
_anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
|
||||
|
||||
SnapshotWindowParams::observeCheckCachePressurePeriodSeconds.addObserver([anchor = _anchor](
|
||||
const auto& secs) {
|
||||
try {
|
||||
anchor->setPeriod(Seconds(secs));
|
||||
} catch (const DBException& ex) {
|
||||
LOGV2(20893,
|
||||
"Failed to update the period of the thread which decreases data history cache "
|
||||
"target size if there is cache pressure.{ex_toStatus}",
|
||||
"ex_toStatus"_attr = ex.toStatus());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace mongo
|
@ -1,69 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2018-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/util/hierarchical_acquisition.h"
|
||||
#include "mongo/util/periodic_runner.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* Periodically checks for storage engine cache pressure to determine whether the maintained
|
||||
* snapshot history window target setting should be decreased. Maintaining too much snapshot and
|
||||
* write history can slow down the system. Runs once every checkCachePressurePeriodSeconds.
|
||||
*
|
||||
* This function should only ever be called once, during mongod server startup (db.cpp).
|
||||
* The PeriodicRunner will handle shutting down the job on shutdown, no extra handling necessary.
|
||||
*/
|
||||
class PeriodicThreadToDecreaseSnapshotHistoryCachePressure {
|
||||
public:
|
||||
static PeriodicThreadToDecreaseSnapshotHistoryCachePressure& get(
|
||||
ServiceContext* serviceContext);
|
||||
|
||||
PeriodicJobAnchor* operator->() const noexcept;
|
||||
PeriodicJobAnchor& operator*() const noexcept;
|
||||
|
||||
private:
|
||||
void _init(ServiceContext* serviceContext);
|
||||
|
||||
inline static const auto _serviceDecoration =
|
||||
ServiceContext::declareDecoration<PeriodicThreadToDecreaseSnapshotHistoryCachePressure>();
|
||||
|
||||
mutable Mutex _mutex =
|
||||
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1),
|
||||
"PeriodicThreadToDecreaseSnapshotHistoryCachePressure::_mutex");
|
||||
std::shared_ptr<PeriodicJobAnchor> _anchor;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
@ -76,7 +76,7 @@
|
||||
#include "mongo/db/s/transaction_coordinator_factory.h"
|
||||
#include "mongo/db/service_entry_point_common.h"
|
||||
#include "mongo/db/session_catalog_mongod.h"
|
||||
#include "mongo/db/snapshot_window_util.h"
|
||||
#include "mongo/db/snapshot_window_options.h"
|
||||
#include "mongo/db/stats/counters.h"
|
||||
#include "mongo/db/stats/server_read_concern_metrics.h"
|
||||
#include "mongo/db/stats/top.h"
|
||||
@ -1235,8 +1235,7 @@ void execCommandDatabase(OperationContext* opCtx,
|
||||
// snapshot at their specified atClusterTime. Therefore, we'll try to increase the
|
||||
// snapshot history window that the storage engine maintains in order to increase
|
||||
// the likelihood of successful future PIT atClusterTime requests.
|
||||
SnapshotWindowUtil::incrementSnapshotTooOldErrorCount();
|
||||
SnapshotWindowUtil::increaseTargetSnapshotWindowSize(opCtx);
|
||||
snapshotWindowParams.snapshotTooOldErrorCount.addAndFetch(1);
|
||||
} else {
|
||||
behaviors.handleException(e, opCtx);
|
||||
}
|
||||
|
@ -31,24 +31,8 @@
|
||||
|
||||
#include "mongo/db/snapshot_window_options.h"
|
||||
|
||||
#include "mongo/platform/compiler.h"
|
||||
#include "mongo/util/options_parser/startup_option_init.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
SnapshotWindowParams snapshotWindowParams;
|
||||
|
||||
/**
|
||||
* After startup parameters have been initialized, set targetSnapshotHistoryWindowInSeconds to the
|
||||
* value of maxTargetSnapshotHistoryWindowInSeconds, in case the max has been altered. The cache
|
||||
* pressure is zero to begin with, so the user should not have to wait for the target to slowly
|
||||
* adjust to max.
|
||||
*/
|
||||
MONGO_STARTUP_OPTIONS_POST(SetTargetSnapshotWindowSize)
|
||||
(InitializerContext* context) {
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.store(
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.load());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
|
@ -37,79 +37,25 @@ namespace mongo {
|
||||
|
||||
/**
|
||||
* These are parameters that affect how much snapshot history the storage engine will keep to
|
||||
* support point-in-time transactions (read or write). This is referred to as the snapshot window.
|
||||
* The window is between the stable timestamp and the oldest timestamp.
|
||||
* support snapshot reads. This is referred to as the snapshot window. The window is between the
|
||||
* stable timestamp and the oldest timestamp.
|
||||
*/
|
||||
struct SnapshotWindowParams {
|
||||
|
||||
// maxTargetSnapshotHistoryWindowInSeconds (startup & runtime server parameter, range 0+).
|
||||
// minSnapshotHistoryWindowInSeconds (startup & runtime server parameter, range 0+).
|
||||
//
|
||||
// Dictates the maximum lag in seconds oldest_timestamp should be behind stable_timestamp.
|
||||
// targetSnapshotHistoryWindowInSeconds below is the actual active lag setting target.
|
||||
// Dictates the lag in seconds oldest_timestamp should be set behind stable_timestamp.
|
||||
//
|
||||
// Note that the window size can become greater than this if an ongoing operation is holding an
|
||||
// older snapshot open.
|
||||
AtomicWord<int> maxTargetSnapshotHistoryWindowInSeconds{5};
|
||||
|
||||
// targetSnapshotHistoryWindowInSeconds (not a server parameter, range 0+).
|
||||
//
|
||||
// Dictates the target lag in seconds oldest_timestamp should be set behind stable_timestamp.
|
||||
// Should only be set in the range [0, maxTargetSnapshotHistoryWindowInSeconds].
|
||||
//
|
||||
// Note that this is the history window we attempt to maintain, but our current system state may
|
||||
// not always reflect it: the window can only change as more writes come in, so it can take time
|
||||
// for the actual window size to catch up with a change. This value guides actions whenever the
|
||||
// system goes to update the oldest_timestamp value (usually when the stable_timestamp is
|
||||
// updated).
|
||||
AtomicWord<int> targetSnapshotHistoryWindowInSeconds{
|
||||
maxTargetSnapshotHistoryWindowInSeconds.load()};
|
||||
AtomicWord<int> minSnapshotHistoryWindowInSeconds{5};
|
||||
|
||||
// cachePressureThreshold (startup & runtime server parameter, range [0, 100]).
|
||||
//
|
||||
// Compares against a storage engine cache pressure indicator that ranges from 0 to 100.
|
||||
// Currently, the only indicator is the WT lookaside score.
|
||||
//
|
||||
// This setting helps preempt storage cache pressure immobilizing the system. Attempts to
|
||||
// increase targetSnapshotHistoryWindowInSeconds will be ignored when the cache pressure reaches
|
||||
// this threshold. Additionally, a periodic task will decrease
|
||||
// targetSnapshotHistoryWindowInSecond when cache pressure exceeds the threshold.
|
||||
AtomicWord<int> cachePressureThreshold{95};
|
||||
|
||||
// snapshotWindowMultiplicativeDecrease (startup & runtime server parameter, range (0,1)).
|
||||
//
|
||||
// Controls by what multiplier the target snapshot history window setting is decreased when
|
||||
// cache pressure becomes too high, per the cachePressureThreshold setting.
|
||||
AtomicDouble snapshotWindowMultiplicativeDecrease{0.75};
|
||||
|
||||
// snapshotWindowAdditiveIncreaseSeconds (startup & runtime server parameter, range 1+).
|
||||
//
|
||||
// Controls by how much the target snapshot history window setting is increased when cache
|
||||
// pressure is OK, per cachePressureThreshold, and we need to service older snapshots for global
|
||||
// point-in-time reads.
|
||||
AtomicWord<int> snapshotWindowAdditiveIncreaseSeconds{2};
|
||||
|
||||
// minMillisBetweenSnapshotWindowInc (startup & runtime server parameter, range 0+).
|
||||
// minMillisBetweenSnapshotWindowDec (startup & runtime server parameter, range 0+).
|
||||
//
|
||||
// Controls how often attempting to increase/decrease the target snapshot window will have an
|
||||
// effect. Multiple callers within minMillisBetweenSnapshotWindowInc will have the same effect
|
||||
// as one. This protects the system because it takes time for the target snapshot window to
|
||||
// affect the actual storage engine snapshot window. The stable timestamp must move forward for
|
||||
// the window between it and oldest timestamp to grow or shrink.
|
||||
AtomicWord<int> minMillisBetweenSnapshotWindowInc{500};
|
||||
AtomicWord<int> minMillisBetweenSnapshotWindowDec{500};
|
||||
|
||||
// checkCachePressurePeriodSeconds (startup & runtime server parameter, range 1+)
|
||||
//
|
||||
// Controls the period of the task that checks for cache pressure and decreases
|
||||
// targetSnapshotHistoryWindowInSeconds if the pressure is above cachePressureThreshold. The
|
||||
// target window size setting must not be decreased too fast because time must be allowed for
|
||||
// the storage engine to attempt to act on the new setting.
|
||||
AtomicWord<int> checkCachePressurePeriodSeconds{5};
|
||||
|
||||
static inline MutableObserverRegistry<decltype(checkCachePressurePeriodSeconds)::WordType>
|
||||
observeCheckCachePressurePeriodSeconds;
|
||||
|
||||
AtomicWord<std::int64_t> snapshotTooOldErrorCount{0};
|
||||
};
|
||||
|
||||
|
@ -35,10 +35,10 @@ imports:
|
||||
- "mongo/idl/basic_types.idl"
|
||||
|
||||
server_parameters:
|
||||
maxTargetSnapshotHistoryWindowInSeconds:
|
||||
description: "Maximum target snapshot history window, in seconds"
|
||||
minSnapshotHistoryWindowInSeconds:
|
||||
description: "Minimum snapshot history to keep, in seconds"
|
||||
set_at: [ startup, runtime ]
|
||||
cpp_varname: "snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds"
|
||||
cpp_varname: "snapshotWindowParams.minSnapshotHistoryWindowInSeconds"
|
||||
validator: { gte: 0 }
|
||||
|
||||
cachePressureThreshold:
|
||||
@ -48,36 +48,3 @@ server_parameters:
|
||||
validator:
|
||||
gte: 0
|
||||
lte: 100
|
||||
|
||||
snapshotWindowMultiplicativeDecrease:
|
||||
description: "Snapshot window multiplicative decrease"
|
||||
set_at: [ startup, runtime ]
|
||||
cpp_varname: "snapshotWindowParams.snapshotWindowMultiplicativeDecrease"
|
||||
validator:
|
||||
gt: 0.0
|
||||
lt: 1.0
|
||||
|
||||
snapshotWindowAdditiveIncreaseSeconds:
|
||||
description: "Snapshot window multiplicative increase, in seconds"
|
||||
set_at: [ startup, runtime ]
|
||||
cpp_varname: "snapshotWindowParams.snapshotWindowAdditiveIncreaseSeconds"
|
||||
validator: { gte: 1 }
|
||||
|
||||
minMillisBetweenSnapshotWindowInc:
|
||||
description: "Minimum duration between snapshot window increment, in milliseconds"
|
||||
set_at: [ startup, runtime ]
|
||||
cpp_varname: "snapshotWindowParams.minMillisBetweenSnapshotWindowInc"
|
||||
validator: { gte: 1 }
|
||||
|
||||
minMillisBetweenSnapshotWindowDec:
|
||||
description: "Minimum duration between snapshot window decrement, in milliseconds"
|
||||
set_at: [ startup, runtime ]
|
||||
cpp_varname: "snapshotWindowParams.minMillisBetweenSnapshotWindowDec"
|
||||
validator: { gte: 1 }
|
||||
|
||||
checkCachePressurePeriodSeconds:
|
||||
description: "Check cache pressure period, in seconds"
|
||||
set_at: [ startup, runtime ]
|
||||
cpp_varname: "snapshotWindowParams.checkCachePressurePeriodSeconds"
|
||||
validator: { gte: 1 }
|
||||
on_update: std::ref(SnapshotWindowParams::observeCheckCachePressurePeriodSeconds)
|
||||
|
@ -1,173 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2018-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/db/snapshot_window_util.h"
|
||||
|
||||
#include "mongo/db/commands/test_commands_enabled.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/snapshot_window_options.h"
|
||||
#include "mongo/db/storage/storage_engine.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/platform/mutex.h"
|
||||
#include "mongo/util/concurrency/with_lock.h"
|
||||
#include "mongo/util/fail_point.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
// This failpoint is used for performance testing.
|
||||
MONGO_FAIL_POINT_DEFINE(preventDynamicSnapshotHistoryWindowTargetAdjustments);
|
||||
|
||||
namespace SnapshotWindowUtil {
|
||||
|
||||
// Adds concurrency control to increaseTargetSnapshotWindowSize() and
|
||||
// decreaseTargetSnapshotWindowSize(). They should not run concurrently with themselves or one
|
||||
// another, since they act on and modify the same storage parameters. Further guards the static
|
||||
// variables "_snapshotWindowLastDecreasedAt" and "_snapshotWindowLastIncreasedAt" used in
|
||||
// increaseTargetSnapshotWindowSize() and decreaseSnapshowWindow().
|
||||
Mutex snapshotWindowMutex;
|
||||
|
||||
namespace {
|
||||
|
||||
void _decreaseTargetSnapshotWindowSize(WithLock lock, OperationContext* opCtx) {
|
||||
// Tracks the last time that the snapshot window was decreased so that it does not go down so
|
||||
// fast that the system does not have time to react and reduce snapshot availability.
|
||||
static Date_t _snapshotWindowLastDecreasedAt{Date_t::min()};
|
||||
|
||||
if (_snapshotWindowLastDecreasedAt >
|
||||
(Date_t::now() -
|
||||
Milliseconds(snapshotWindowParams.minMillisBetweenSnapshotWindowDec.load()))) {
|
||||
// We have already decreased the window size in the last minMillisBetweenSnapshotWindowDec
|
||||
// milliseconds.
|
||||
return;
|
||||
}
|
||||
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.store(
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load() *
|
||||
snapshotWindowParams.snapshotWindowMultiplicativeDecrease.load());
|
||||
|
||||
// Try to set the oldest_timestamp immediately without waiting for a write to adjust the
|
||||
// window. May or may not work depending on the state of the system.
|
||||
StorageEngine* engine = opCtx->getServiceContext()->getStorageEngine();
|
||||
invariant(engine);
|
||||
engine->setOldestTimestampFromStable();
|
||||
|
||||
_snapshotWindowLastDecreasedAt = Date_t::now();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void increaseTargetSnapshotWindowSize(OperationContext* opCtx) {
|
||||
if (MONGO_unlikely(preventDynamicSnapshotHistoryWindowTargetAdjustments.shouldFail())) {
|
||||
return;
|
||||
}
|
||||
|
||||
stdx::unique_lock<Latch> lock(snapshotWindowMutex);
|
||||
|
||||
// Tracks the last time that the snapshot window was increased so that it does not go up so fast
|
||||
// that the storage engine does not have time to improve snapshot availability.
|
||||
static Date_t _snapshotWindowLastIncreasedAt{Date_t::min()};
|
||||
|
||||
if (_snapshotWindowLastIncreasedAt >
|
||||
(Date_t::now() -
|
||||
Milliseconds(snapshotWindowParams.minMillisBetweenSnapshotWindowInc.load()))) {
|
||||
// We have already increased the window size in the last minMillisBetweenSnapshotWindowInc
|
||||
// milliseconds.
|
||||
return;
|
||||
}
|
||||
|
||||
// If the cache pressure is already too high, we will not put more pressure on it by increasing
|
||||
// the window size.
|
||||
StorageEngine* engine = opCtx->getServiceContext()->getStorageEngine();
|
||||
if (engine && engine->isCacheUnderPressure(opCtx)) {
|
||||
invariant(!engine->isEphemeral() || getTestCommandsEnabled());
|
||||
LOGV2_WARNING(
|
||||
23788,
|
||||
"Attempted to increase the time window of available snapshots for "
|
||||
"point-in-time operations (readConcern level 'snapshot' or transactions), but "
|
||||
"the storage engine cache pressure, per the cachePressureThreshold setting of "
|
||||
"'{snapshotWindowParams_cachePressureThreshold_load}', is too high to allow it to "
|
||||
"increase. If this happens frequently, consider "
|
||||
"either increasing the cache pressure threshold or increasing the memory "
|
||||
"available to the storage engine cache, in order to improve the success rate "
|
||||
"or speed of point-in-time requests.",
|
||||
"snapshotWindowParams_cachePressureThreshold_load"_attr =
|
||||
snapshotWindowParams.cachePressureThreshold.load());
|
||||
_decreaseTargetSnapshotWindowSize(lock, opCtx);
|
||||
return;
|
||||
}
|
||||
|
||||
if (snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load() ==
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.load()) {
|
||||
LOGV2_WARNING(
|
||||
23789,
|
||||
"Attempted to increase the time window of available snapshots for "
|
||||
"point-in-time operations (readConcern level 'snapshot' or transactions), but "
|
||||
"maxTargetSnapshotHistoryWindowInSeconds has already been reached. If this "
|
||||
"happens frequently, consider increasing the "
|
||||
"maxTargetSnapshotHistoryWindowInSeconds setting value, which is currently "
|
||||
"set to '{snapshotWindowParams_maxTargetSnapshotHistoryWindowInSeconds_load}'.",
|
||||
"snapshotWindowParams_maxTargetSnapshotHistoryWindowInSeconds_load"_attr =
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.load());
|
||||
return;
|
||||
}
|
||||
|
||||
int increasedSnapshotWindow = snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load() +
|
||||
snapshotWindowParams.snapshotWindowAdditiveIncreaseSeconds.load();
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.store(
|
||||
std::min(increasedSnapshotWindow,
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.load()));
|
||||
|
||||
_snapshotWindowLastIncreasedAt = Date_t::now();
|
||||
}
|
||||
|
||||
void decreaseTargetSnapshotWindowSize(OperationContext* opCtx) {
|
||||
if (MONGO_unlikely(preventDynamicSnapshotHistoryWindowTargetAdjustments.shouldFail())) {
|
||||
return;
|
||||
}
|
||||
|
||||
stdx::unique_lock<Latch> lock(snapshotWindowMutex);
|
||||
|
||||
StorageEngine* engine = opCtx->getServiceContext()->getStorageEngine();
|
||||
if (engine && engine->isCacheUnderPressure(opCtx)) {
|
||||
invariant(!engine->isEphemeral() || getTestCommandsEnabled());
|
||||
_decreaseTargetSnapshotWindowSize(lock, opCtx);
|
||||
}
|
||||
}
|
||||
|
||||
void incrementSnapshotTooOldErrorCount() {
|
||||
snapshotWindowParams.snapshotTooOldErrorCount.addAndFetch(1);
|
||||
}
|
||||
|
||||
} // namespace SnapshotWindowUtil
|
||||
} // namespace mongo
|
@ -1,86 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2018-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace mongo {
|
||||
|
||||
class OperationContext;
|
||||
|
||||
/**
|
||||
* Function helpers to manipulate parameters affecting the snapshot time window size that the
|
||||
* storage engine maintains of available snapshots for point-in-time reads.
|
||||
*/
|
||||
namespace SnapshotWindowUtil {
|
||||
|
||||
/**
|
||||
* Attempts to increase the setting that controls the window of time between stable_timestamp and
|
||||
* oldest_timestamp, in order to provide a greater range of available snapshots for point-in-time
|
||||
* operations. The window will not be increased, however, if the cache pressure is currently too
|
||||
* high. This function will be called when server requests return SnapshotTooOld (or similar)
|
||||
* errors. Note that this will not immediately affect the oldest_timestamp. Rather, it affects
|
||||
* actions taken next time oldest_timestamp is updated, usually when the stable timestamp is
|
||||
* advanced.
|
||||
*
|
||||
* Implements an additive increase algorithm.
|
||||
*
|
||||
* Calling many times all at once has the same effect as calling once. The last update time is
|
||||
* tracked and attempts to increase the window are limited to once in
|
||||
* minMillisBetweenSnapshotWindowInc. This is to protect against a sudden wave of function calls due
|
||||
* to simultaneous SnapshotTooOld errors. Some time must be allowed for the increased target
|
||||
* snapshot window size setting to have an effect. The target size can also never exceed
|
||||
* maxTargetSnapshotHistoryWindowInSeconds.
|
||||
*/
|
||||
void increaseTargetSnapshotWindowSize(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Attempts to decrease (if not already zero) the setting that affects the size of the window of
|
||||
* time between stable_timestamp and oldest_timestamp in order to reduce storage engine cache
|
||||
* pressure. The window target will not be decreased, however, if the cache is not currently under
|
||||
* pressure. Pressure can occur when too much history is being maintained for point-in-time
|
||||
* snapshots. Note that this will not necessarily immediately affect the actual window size; rather,
|
||||
* it affects actions taken whenever oldest_timestamp is updated, usually when the stable timestamp
|
||||
* is advanced.
|
||||
*
|
||||
* This will make one attempt to immediately adjust the window size if possible.
|
||||
*
|
||||
* Implements a multiplicative decrease algorithm.
|
||||
*/
|
||||
void decreaseTargetSnapshotWindowSize(OperationContext* opCtx);
|
||||
|
||||
/**
|
||||
* Increments a counter of SnapshotTooOld errors encountered in the command layer. Used for
|
||||
* statistics purposes. Continuing SnapshotTooOld errors can indicate cache pressure.
|
||||
*
|
||||
* Concurrency safe, the internal counter is atomic.
|
||||
*/
|
||||
void incrementSnapshotTooOldErrorCount();
|
||||
|
||||
} // namespace SnapshotWindowUtil
|
||||
} // namespace mongo
|
@ -1,174 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2018-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/db/snapshot_window_util.h"
|
||||
|
||||
#include "mongo/db/client.h"
|
||||
#include "mongo/db/commands/test_commands_enabled.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/service_context_devnull_test_fixture.h"
|
||||
#include "mongo/db/snapshot_window_options.h"
|
||||
#include "mongo/db/storage/storage_options.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
using namespace SnapshotWindowUtil;
|
||||
|
||||
/**
|
||||
* Tests the functions in snapshot_window_util.h using a devnull storage engine.
|
||||
*/
|
||||
class SnapshotWindowTest : public ServiceContextDevnullTestFixture {
|
||||
public:
|
||||
void setUp() override {
|
||||
ServiceContextDevnullTestFixture::setUp();
|
||||
_opCtx = cc().makeOperationContext();
|
||||
setTestCommandsEnabled(true);
|
||||
}
|
||||
|
||||
void tearDown() override {
|
||||
_opCtx.reset();
|
||||
ServiceContextDevnullTestFixture::tearDown();
|
||||
}
|
||||
|
||||
ServiceContext::UniqueOperationContext _opCtx;
|
||||
};
|
||||
|
||||
TEST_F(SnapshotWindowTest, DecreaseAndIncreaseSnapshotWindow) {
|
||||
auto engine = getServiceContext()->getStorageEngine();
|
||||
invariant(engine);
|
||||
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.store(100);
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.store(100);
|
||||
|
||||
// Lower the time enforced between function calls to speed up testing.
|
||||
// Dec must match Inc b/c increaseTargetWindowSize can call into decreaseTargetWindowSize.
|
||||
snapshotWindowParams.minMillisBetweenSnapshotWindowInc.store(100);
|
||||
snapshotWindowParams.minMillisBetweenSnapshotWindowDec.store(100);
|
||||
|
||||
// Stabilize for the test so we know that we are testing things as expected.
|
||||
snapshotWindowParams.snapshotWindowAdditiveIncreaseSeconds.store(2);
|
||||
snapshotWindowParams.snapshotWindowMultiplicativeDecrease.store(0.75);
|
||||
|
||||
auto maxTargetSnapshotWindowSeconds =
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.load();
|
||||
auto snapshotWindowSeconds = snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load();
|
||||
ASSERT_EQ(maxTargetSnapshotWindowSeconds, snapshotWindowSeconds);
|
||||
|
||||
auto windowMultiplicativeDecrease =
|
||||
snapshotWindowParams.snapshotWindowMultiplicativeDecrease.load();
|
||||
auto windowAdditiveIncrease = snapshotWindowParams.snapshotWindowAdditiveIncreaseSeconds.load();
|
||||
|
||||
auto cachePressureThreshold = snapshotWindowParams.cachePressureThreshold.load();
|
||||
auto minTimeBetweenInc = snapshotWindowParams.minMillisBetweenSnapshotWindowInc.load();
|
||||
|
||||
/**
|
||||
* Test that decreasing the size succeeds when cache pressure is ABOVE the threshold
|
||||
*/
|
||||
|
||||
engine->setCachePressureForTest(cachePressureThreshold + 5);
|
||||
|
||||
decreaseTargetSnapshotWindowSize(_opCtx.get());
|
||||
auto snapshotWindowSecondsOne =
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load();
|
||||
|
||||
ASSERT_GT(snapshotWindowSeconds, snapshotWindowSecondsOne);
|
||||
ASSERT_EQ(snapshotWindowSecondsOne,
|
||||
static_cast<int>(snapshotWindowSeconds * windowMultiplicativeDecrease));
|
||||
|
||||
/**
|
||||
* Test that increasing the size SUCCEEDS when the cache pressure is BELOW the threshold.
|
||||
*/
|
||||
|
||||
engine->setCachePressureForTest(cachePressureThreshold - 5);
|
||||
|
||||
increaseTargetSnapshotWindowSize(_opCtx.get());
|
||||
auto snapshotWindowSecondsTwo =
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load();
|
||||
|
||||
ASSERT_EQ(snapshotWindowSecondsTwo, snapshotWindowSecondsOne + windowAdditiveIncrease);
|
||||
|
||||
/**
|
||||
* Test that increasing the size FAILS when the cache pressure is ABOVE the threshold, and
|
||||
* instead this causes the size to be decreased.
|
||||
*/
|
||||
|
||||
engine->setCachePressureForTest(cachePressureThreshold + 5);
|
||||
|
||||
// Sleep for a time because increaseTargetSnapshotWindowSize() enforces a wait time between
|
||||
// updates.
|
||||
sleepmillis(2 * minTimeBetweenInc);
|
||||
|
||||
increaseTargetSnapshotWindowSize(_opCtx.get());
|
||||
auto snapshotWindowSecondsThree =
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load();
|
||||
|
||||
ASSERT_EQ(snapshotWindowSecondsThree,
|
||||
static_cast<int>(snapshotWindowSecondsTwo * windowMultiplicativeDecrease));
|
||||
|
||||
engine->setCachePressureForTest(cachePressureThreshold - 5);
|
||||
|
||||
/**
|
||||
* Test that the size cannot be increased above the maximum size.
|
||||
*/
|
||||
|
||||
// Integers round down, so add 1 to make sure it reaches the max.
|
||||
int numIncreasesToReachMax =
|
||||
(maxTargetSnapshotWindowSeconds - snapshotWindowSecondsThree) / windowAdditiveIncrease + 1;
|
||||
for (int i = 0; i < numIncreasesToReachMax; ++i) {
|
||||
sleepmillis(2 * minTimeBetweenInc);
|
||||
increaseTargetSnapshotWindowSize(_opCtx.get());
|
||||
}
|
||||
|
||||
// Should be at max.
|
||||
auto snapshotWindowSecondsFour =
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load();
|
||||
ASSERT_EQ(snapshotWindowSecondsFour, maxTargetSnapshotWindowSeconds);
|
||||
|
||||
// An attempt to increase beyond max should have no effect.
|
||||
sleepmillis(2 * minTimeBetweenInc);
|
||||
increaseTargetSnapshotWindowSize(_opCtx.get());
|
||||
auto snapshotWindowSecondsFive =
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load();
|
||||
ASSERT_EQ(snapshotWindowSecondsFive, maxTargetSnapshotWindowSeconds);
|
||||
}
|
||||
|
||||
TEST_F(SnapshotWindowTest, IncrementSnapshotTooOldErrorCount) {
|
||||
auto beforeCount = snapshotWindowParams.snapshotTooOldErrorCount.load();
|
||||
incrementSnapshotTooOldErrorCount();
|
||||
incrementSnapshotTooOldErrorCount();
|
||||
auto afterCount = snapshotWindowParams.snapshotTooOldErrorCount.load();
|
||||
ASSERT_EQ(beforeCount + 2, afterCount);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
@ -479,7 +479,7 @@ public:
|
||||
* must maintain snapshot history through.
|
||||
*
|
||||
* oldest_timestamp will be set to stable_timestamp adjusted by
|
||||
* 'targetSnapshotHistoryWindowInSeconds' to create a window of available snapshots on the
|
||||
* 'minSnapshotHistoryWindowInSeconds' to create a window of available snapshots on the
|
||||
* storage engine from oldest to stable. Furthermore, oldest_timestamp will never be set ahead
|
||||
* of the oplog read timestamp, ensuring the oplog reader's 'read_timestamp' can always be
|
||||
* serviced.
|
||||
|
@ -764,7 +764,7 @@ WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
|
||||
// We do not maintain any snapshot history for the ephemeral storage engine in production
|
||||
// because replication and sharded transactions do not currently run on the inMemory engine.
|
||||
// It is live in testing, however.
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.store(0);
|
||||
snapshotWindowParams.minSnapshotHistoryWindowInSeconds.store(0);
|
||||
}
|
||||
|
||||
_sizeStorerUri = _uri("sizeStorer");
|
||||
@ -1861,7 +1861,7 @@ void WiredTigerKVEngine::setOldestTimestampFromStable() {
|
||||
}
|
||||
|
||||
// Calculate what the oldest_timestamp should be from the stable_timestamp. The oldest
|
||||
// timestamp should lag behind stable by 'targetSnapshotHistoryWindowInSeconds' to create a
|
||||
// timestamp should lag behind stable by 'minSnapshotHistoryWindowInSeconds' to create a
|
||||
// window of available snapshots. If the lag window is not yet large enough, we will not
|
||||
// update/forward the oldest_timestamp yet and instead return early.
|
||||
Timestamp newOldestTimestamp = _calculateHistoryLagFromStableTimestamp(stableTimestamp);
|
||||
@ -1904,23 +1904,23 @@ void WiredTigerKVEngine::setOldestTimestamp(Timestamp newOldestTimestamp, bool f
|
||||
|
||||
Timestamp WiredTigerKVEngine::_calculateHistoryLagFromStableTimestamp(Timestamp stableTimestamp) {
|
||||
// The oldest_timestamp should lag behind the stable_timestamp by
|
||||
// 'targetSnapshotHistoryWindowInSeconds' seconds.
|
||||
// 'minSnapshotHistoryWindowInSeconds' seconds.
|
||||
|
||||
if (_ephemeral && !getTestCommandsEnabled()) {
|
||||
// No history should be maintained for the inMemory engine because it is not used yet.
|
||||
invariant(snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load() == 0);
|
||||
invariant(snapshotWindowParams.minSnapshotHistoryWindowInSeconds.load() == 0);
|
||||
}
|
||||
|
||||
if (stableTimestamp.getSecs() <
|
||||
static_cast<unsigned>(snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load())) {
|
||||
static_cast<unsigned>(snapshotWindowParams.minSnapshotHistoryWindowInSeconds.load())) {
|
||||
// The history window is larger than the timestamp history thus far. We must wait for
|
||||
// the history to reach the window size before moving oldest_timestamp forward.
|
||||
// the history to reach the window size before moving oldest_timestamp forward. This should
|
||||
// only happen in unit tests.
|
||||
return Timestamp();
|
||||
}
|
||||
|
||||
Timestamp calculatedOldestTimestamp(
|
||||
stableTimestamp.getSecs() -
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load(),
|
||||
stableTimestamp.getSecs() - snapshotWindowParams.minSnapshotHistoryWindowInSeconds.load(),
|
||||
stableTimestamp.getInc());
|
||||
|
||||
if (calculatedOldestTimestamp.asULL() <= _oldestTimestamp.load()) {
|
||||
|
@ -376,7 +376,7 @@ private:
|
||||
std::string _uri(StringData ident) const;
|
||||
|
||||
/**
|
||||
* Uses the 'stableTimestamp', the 'targetSnapshotHistoryWindowInSeconds' setting and the
|
||||
* Uses the 'stableTimestamp', the 'minSnapshotHistoryWindowInSeconds' setting and the
|
||||
* current _oldestTimestamp to calculate what the new oldest_timestamp should be, in order to
|
||||
* maintain a window of available snapshots on the storage engine from oldest to stable
|
||||
* timestamp.
|
||||
|
@ -748,11 +748,9 @@ void WiredTigerUtil::appendSnapshotWindowSettings(WiredTigerKVEngine* engine,
|
||||
snapshotWindowParams.cachePressureThreshold.load());
|
||||
settings.append("current cache pressure percentage", score);
|
||||
settings.append("total number of SnapshotTooOld errors", totalNumberOfSnapshotTooOldErrors);
|
||||
settings.append("max target available snapshots window size in seconds",
|
||||
snapshotWindowParams.maxTargetSnapshotHistoryWindowInSeconds.load());
|
||||
settings.append("target available snapshots window size in seconds",
|
||||
snapshotWindowParams.targetSnapshotHistoryWindowInSeconds.load());
|
||||
settings.append("current available snapshots window size in seconds",
|
||||
settings.append("minimum target snapshot window size in seconds",
|
||||
snapshotWindowParams.minSnapshotHistoryWindowInSeconds.load());
|
||||
settings.append("current available snapshot window size in seconds",
|
||||
static_cast<int>(currentAvailableSnapshotWindow));
|
||||
settings.append("latest majority snapshot timestamp available",
|
||||
stableTimestamp.toStringPretty());
|
||||
|
Loading…
Reference in New Issue
Block a user