mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-45975: Added --oplogMinRetentionHours cli option
create mode 100644 jstests/noPassthrough/oplog_retention_hours.js
This commit is contained in:
parent
910101068e
commit
1df4904eb4
99
jstests/noPassthrough/oplog_retention_hours.js
Normal file
99
jstests/noPassthrough/oplog_retention_hours.js
Normal file
@ -0,0 +1,99 @@
|
||||
/**
 * Verifies that a server started with --oplogMinRetentionHours enforces a minimum time-based
 * retention window (in hours) on top of the implicit size cap given by --oplogSize.
 *
 * A stone (the oldest bucket of oplog entries) may only be removed once BOTH conditions hold:
 * the oplog's size has exceeded --oplogSize, AND the timestamp of the newest entry inside the
 * oldest stone has aged out of the retention window.
 *
 * The test floods the oplog until it passes --oplogSize, then asserts that truncation back below
 * the cap did not happen before the minimum retention time had elapsed since the first batch of
 * inserts.
 *
 * @tags: [requires_journaling]
 */
(function() {
"use strict";

load("jstests/core/txns/libs/prepare_helpers.js");

/**
 * Repeatedly inserts ~10KB documents into a single-node replica set until pred(...args)
 * returns true.
 *
 * - replSet: a single-node replica set
 * - pred:    boolean-returning function; insertion stops once it returns true
 * - args:    arguments forwarded to pred on each evaluation
 */
function InsertUntilPred(replSet, pred, ...args) {
    const primary = replSet.getPrimary();
    const oplog = primary.getDB("local").oplog.rs;
    const coll = primary.getDB("insertUntilPred").growOplogPastMaxSize;
    const numNodes = replSet.nodeList().length;
    const tenKB = new Array(10 * 1024).join("a");

    print(`Oplog on ${primary} dataSize = ${oplog.dataSize()}`);
    assert.soon(
        () => {
            if (pred(...args)) {
                jsTestLog("Predicate returned true, so we're done");
                return true;
            }

            jsTestLog("Inserting a doc...");
            // Predicate not yet satisfied, so add one more document and re-check.
            assert.commandWorked(coll.insert({tenKB: tenKB}, {writeConcern: {w: numNodes}}));
            return false;
        },
        `timeout occurred while waiting for predicate function to return true`,
        ReplSetTest.kDefaultTimeoutMS,
        1000);
}

// Returns true once the oplog collection has been truncated back under its growth limit.
function didReplSetTruncate(replSet) {
    const oplogCol = replSet.getPrimary().getDB("local").oplog.rs;
    // The oplog milestone system allows the oplog to grow to 110% its max size.
    return oplogCol.dataSize() < 1.1 * PrepareHelpers.oplogSizeBytes;
}

function doTest() {
    const rst = new ReplSetTest({
        oplogSize: PrepareHelpers.oplogSizeMB,
        // Oplog can be truncated each "sync" cycle. Increase its frequency to once per second.
        nodeOptions: {syncdelay: 1, setParameter: {logComponentVerbosity: tojson({storage: 1})}},
        nodes: 1
    });
    const oplogMinRetentionHours = 0.002777;  // roughly 10 seconds
    const minRetention = {oplogMinRetentionHours};
    rst.startSet(Object.assign(minRetention, PrepareHelpers.replSetStartSetOptions));
    rst.initiate();
    const primary = rst.getPrimary();
    const oplogEntries = primary.getDB("local").getCollection("oplog.rs");

    // Ensure that the oplog is not initially at capacity.
    assert.lt(oplogEntries.dataSize(), PrepareHelpers.oplogSizeBytes);

    primary.startSession();

    jsTestLog("Insert documents until oplog exceeds oplogSize");
    const startTime = new Date();
    PrepareHelpers.growOplogPastMaxSize(rst);
    // Keep inserting docs until didReplSetTruncate reports that truncation happened.
    InsertUntilPred(rst, didReplSetTruncate, rst);
    const endTime = new Date();

    // Truncation must not have completed before the retention window elapsed.
    const kNumMSInHour = 1000 * 60 * 60;
    const truncationElapsedTime = (endTime - startTime) / kNumMSInHour;
    assert.lte(oplogMinRetentionHours, truncationElapsedTime);

    rst.stopSet();
}

doTest();
})();
|
@ -528,6 +528,16 @@ Status storeMongodOptions(const moe::Environment& params) {
|
||||
invariant(replSettings.getOplogSizeBytes() > 0);
|
||||
}
|
||||
|
||||
if (params.count("storage.oplogMinRetentionHours")) {
|
||||
storageGlobalParams.oplogMinRetentionHours.store(
|
||||
params["storage.oplogMinRetentionHours"].as<double>());
|
||||
if (storageGlobalParams.oplogMinRetentionHours.load() < 0) {
|
||||
return Status(ErrorCodes::BadValue,
|
||||
"bad --oplogMinRetentionHours, argument must be greater or equal to 0");
|
||||
}
|
||||
invariant(storageGlobalParams.oplogMinRetentionHours.load() >= 0);
|
||||
}
|
||||
|
||||
if (params.count("cacheSize")) {
|
||||
long x = params["cacheSize"].as<long>();
|
||||
if (x <= 0) {
|
||||
|
@ -122,3 +122,11 @@ configs:
|
||||
deprecated_short_name: nodur
|
||||
arg_vartype: Switch
|
||||
source: [ cli, ini ]
|
||||
|
||||
'storage.oplogMinRetentionHours':
|
||||
description: 'Minimum number of hours to preserve in the oplog. Default is 0 (turned off). Fractions are allowed (e.g. 1.5 hours)'
|
||||
short_name: oplogMinRetentionHours
|
||||
arg_vartype: Double
|
||||
default: 0.0
|
||||
validator:
|
||||
gte: 0.0
|
||||
|
@ -55,6 +55,7 @@ void StorageGlobalParams::reset() {
|
||||
syncdelay = 60.0;
|
||||
readOnly = false;
|
||||
groupCollections = false;
|
||||
oplogMinRetentionHours.store(0.0);
|
||||
}
|
||||
|
||||
StorageGlobalParams storageGlobalParams;
|
||||
|
@ -110,6 +110,12 @@ struct StorageGlobalParams {
|
||||
// workloads that rely heavily on creating many collections within a database.
|
||||
bool groupCollections;
|
||||
|
||||
// --oplogMinRetentionHours
|
||||
// Controls what size the oplog should be in addition to oplogSize. If set, the oplog will only
|
||||
// be truncated if it is over the capped size, and if the bucket of oldest oplog entries fall
|
||||
// outside of the retention window which is set by this option.
|
||||
AtomicWord<double> oplogMinRetentionHours;
|
||||
|
||||
// Controls whether we allow the OplogStones mechanism to delete oplog history on WT.
|
||||
bool allowOplogTruncation = true;
|
||||
};
|
||||
|
@ -49,6 +49,7 @@
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/repl/repl_settings.h"
|
||||
#include "mongo/db/repl/replication_coordinator.h"
|
||||
#include "mongo/db/server_recovery.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/storage/oplog_hack.h"
|
||||
@ -85,6 +86,8 @@ static const int kMaximumRecordStoreVersion = 1;
|
||||
MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion >= kMinimumRecordStoreVersion);
|
||||
MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion <= kMaximumRecordStoreVersion);
|
||||
|
||||
const double kNumSecsInHour = 3600.0;
|
||||
|
||||
void checkOplogFormatVersion(OperationContext* opCtx, const std::string& uri) {
|
||||
StatusWith<BSONObj> appMetadata = WiredTigerUtil::getApplicationMetadata(opCtx, uri);
|
||||
fassert(39999, appMetadata);
|
||||
@ -226,6 +229,33 @@ void WiredTigerRecordStore::OplogStones::awaitHasExcessStonesOrDead() {
|
||||
}
|
||||
}
|
||||
|
||||
bool WiredTigerRecordStore::OplogStones::hasExcessStones_inlock() const {
|
||||
int64_t totalBytes = 0;
|
||||
for (auto&& stone : _stones) {
|
||||
totalBytes += stone.bytes;
|
||||
}
|
||||
|
||||
// check that oplog stones is at capacity
|
||||
if (totalBytes <= _rs->cappedMaxSize()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
double minRetentionHours = storageGlobalParams.oplogMinRetentionHours.load();
|
||||
|
||||
// if we are not checking for time, then yes, there is a stone to be reaped
|
||||
// because oplog is at capacity
|
||||
if (minRetentionHours == 0.0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto rc = repl::ReplicationCoordinator::get(getGlobalServiceContext());
|
||||
double lastAppliedTs = rc->getMyLastAppliedOpTime().getTimestamp().getSecs();
|
||||
double lastStoneTs = Timestamp(_stones.front().lastRecord.repr()).getSecs();
|
||||
|
||||
double currRetentionHours = (lastAppliedTs - lastStoneTs) / kNumSecsInHour;
|
||||
return currRetentionHours >= minRetentionHours;
|
||||
}
|
||||
|
||||
boost::optional<WiredTigerRecordStore::OplogStones::Stone>
|
||||
WiredTigerRecordStore::OplogStones::peekOldestStoneIfNeeded() const {
|
||||
stdx::lock_guard<Latch> lk(_mutex);
|
||||
|
@ -57,15 +57,7 @@ public:
|
||||
|
||||
void kill();
|
||||
|
||||
bool hasExcessStones_inlock() const {
|
||||
int64_t total_bytes = 0;
|
||||
for (std::deque<OplogStones::Stone>::const_iterator it = _stones.begin();
|
||||
it != _stones.end();
|
||||
++it) {
|
||||
total_bytes += it->bytes;
|
||||
}
|
||||
return total_bytes > _rs->cappedMaxSize();
|
||||
}
|
||||
bool hasExcessStones_inlock() const;
|
||||
|
||||
void awaitHasExcessStonesOrDead();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user