mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-31507 add option to specify oplog application mode in applyOps
This commit is contained in:
parent
6709d70440
commit
ced3d3341a
92
jstests/noPassthrough/apply_ops_mode.js
Normal file
92
jstests/noPassthrough/apply_ops_mode.js
Normal file
@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Tests that applyOps correctly respects the 'oplogApplicationMode' and 'alwaysUpsert' flags.
|
||||
* 'alwaysUpsert' defaults to true and 'oplogApplicationMode' defaults to 'ApplyOps'. We test
|
||||
* that these default values do not lead to command failure.
|
||||
*/
|
||||
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
var standalone = MongoRunner.runMongod();
|
||||
var db = standalone.getDB("test");
|
||||
|
||||
var coll = db.getCollection("apply_ops_mode1");
|
||||
coll.drop();
|
||||
assert.writeOK(coll.insert({_id: 1}));
|
||||
|
||||
// ------------ Testing normal updates ---------------
|
||||
|
||||
var id = ObjectId();
|
||||
var updateOp = {op: 'u', ns: coll.getFullName(), o: {_id: id, x: 1}, o2: {_id: id}};
|
||||
assert.commandFailed(db.adminCommand({applyOps: [updateOp], alwaysUpsert: false}));
|
||||
assert.eq(coll.count({x: 1}), 0);
|
||||
|
||||
// Test that 'InitialSync' does not override 'alwaysUpsert: false'.
|
||||
assert.commandFailed(db.adminCommand(
|
||||
{applyOps: [updateOp], alwaysUpsert: false, oplogApplicationMode: "InitialSync"}));
|
||||
assert.eq(coll.count({x: 1}), 0);
|
||||
|
||||
// Test parsing failure.
|
||||
assert.commandFailedWithCode(
|
||||
db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "BadMode"}),
|
||||
ErrorCodes.FailedToParse);
|
||||
assert.commandFailedWithCode(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: 5}),
|
||||
ErrorCodes.TypeMismatch);
|
||||
|
||||
// Test default succeeds.
|
||||
assert.commandWorked(db.adminCommand({applyOps: [updateOp]}));
|
||||
assert.eq(coll.count({x: 1}), 1);
|
||||
|
||||
// Use new collection to make logs cleaner.
|
||||
coll = db.getCollection("apply_ops_mode2");
|
||||
coll.drop();
|
||||
updateOp.ns = coll.getFullName();
|
||||
assert.writeOK(coll.insert({_id: 1}));
|
||||
|
||||
// Test default succeeds in 'InitialSync' mode.
|
||||
assert.commandWorked(
|
||||
db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "InitialSync"}));
|
||||
assert.eq(coll.count({x: 1}), 1);
|
||||
|
||||
// ------------ Testing fCV updates ---------------
|
||||
|
||||
var adminDB = db.getSiblingDB("admin");
|
||||
const systemVersionColl = adminDB.getCollection("system.version");
|
||||
|
||||
updateOp = {
|
||||
op: 'u',
|
||||
ns: systemVersionColl.getFullName(),
|
||||
o: {_id: "featureCompatibilityVersion", version: "3.4"},
|
||||
o2: {_id: "featureCompatibilityVersion"}
|
||||
};
|
||||
assert.commandFailed(
|
||||
db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "InitialSync"}));
|
||||
|
||||
assert.commandWorked(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "ApplyOps"}));
|
||||
|
||||
// Test default succeeds.
|
||||
updateOp.o.targetVersion = "3.6";
|
||||
assert.commandWorked(db.adminCommand({
|
||||
applyOps: [updateOp],
|
||||
}));
|
||||
|
||||
// ------------ Testing commands on the fCV collection ---------------
|
||||
|
||||
var collModOp = {
|
||||
op: 'c',
|
||||
ns: systemVersionColl.getDB() + ".$cmd",
|
||||
o: {collMod: systemVersionColl.getName(), validationLevel: "off"},
|
||||
};
|
||||
assert.commandFailed(
|
||||
db.adminCommand({applyOps: [collModOp], oplogApplicationMode: "InitialSync"}));
|
||||
|
||||
assert.commandWorked(
|
||||
db.adminCommand({applyOps: [collModOp], oplogApplicationMode: "ApplyOps"}));
|
||||
|
||||
// Test default succeeds.
|
||||
collModOp.o.usePowerOf2Sizes = true;
|
||||
assert.commandWorked(db.adminCommand({
|
||||
applyOps: [collModOp],
|
||||
}));
|
||||
|
||||
})();
|
@ -58,6 +58,7 @@ namespace mongo {
|
||||
namespace {
|
||||
|
||||
const auto kPreconditionFieldName = "preCondition"_sd;
|
||||
const auto kOplogApplicationModeFieldName = "oplogApplicationMode"_sd;
|
||||
|
||||
// If enabled, causes loop in _applyOps() to hang after applying current operation.
|
||||
MONGO_FP_DECLARE(applyOpsPauseBetweenOperations);
|
||||
@ -115,6 +116,30 @@ Status _applyOps(OperationContext* opCtx,
|
||||
applyOpCmd.hasField("alwaysUpsert") ? applyOpCmd["alwaysUpsert"].trueValue() : true;
|
||||
const bool haveWrappingWUOW = opCtx->lockState()->inAWriteUnitOfWork();
|
||||
|
||||
// TODO (SERVER-31384): This code is run when applying 'applyOps' oplog entries. Pass through
|
||||
// oplog application mode from applyCommand_inlock as default and consider proper behavior
|
||||
// if default differs from oplog entry.
|
||||
repl::OplogApplication::Mode oplogApplicationMode = repl::OplogApplication::Mode::kApplyOps;
|
||||
std::string oplogApplicationModeString;
|
||||
auto status = bsonExtractStringField(
|
||||
applyOpCmd, kOplogApplicationModeFieldName, &oplogApplicationModeString);
|
||||
if (status.isOK()) {
|
||||
auto modeSW = repl::OplogApplication::parseMode(oplogApplicationModeString);
|
||||
if (!modeSW.isOK()) {
|
||||
return Status(modeSW.getStatus().code(),
|
||||
str::stream() << "Could not parse " << kOplogApplicationModeFieldName
|
||||
<< ": "
|
||||
<< modeSW.getStatus().reason());
|
||||
}
|
||||
oplogApplicationMode = modeSW.getValue();
|
||||
} else if (status != ErrorCodes::NoSuchKey) {
|
||||
// NoSuchKey means the user did not supply a mode.
|
||||
return Status(status.code(),
|
||||
str::stream() << "Could not parse out " << kOplogApplicationModeFieldName
|
||||
<< ": "
|
||||
<< status.reason());
|
||||
}
|
||||
|
||||
while (i.more()) {
|
||||
BSONElement e = i.next();
|
||||
const BSONObj& opObj = e.Obj();
|
||||
@ -172,17 +197,19 @@ Status _applyOps(OperationContext* opCtx,
|
||||
OldClientContext ctx(opCtx, nss.ns());
|
||||
|
||||
status = repl::applyOperation_inlock(
|
||||
opCtx, ctx.db(), opObj, alwaysUpsert, repl::OplogApplication::Mode::kApplyOps);
|
||||
opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
|
||||
if (!status.isOK())
|
||||
return status;
|
||||
} else {
|
||||
try {
|
||||
status = writeConflictRetry(
|
||||
opCtx, "applyOps", nss.ns(), [opCtx, nss, opObj, opType, alwaysUpsert] {
|
||||
opCtx,
|
||||
"applyOps",
|
||||
nss.ns(),
|
||||
[opCtx, nss, opObj, opType, alwaysUpsert, oplogApplicationMode] {
|
||||
if (*opType == 'c') {
|
||||
invariant(opCtx->lockState()->isW());
|
||||
return repl::applyCommand_inlock(
|
||||
opCtx, opObj, repl::OplogApplication::Mode::kApplyOps);
|
||||
return repl::applyCommand_inlock(opCtx, opObj, oplogApplicationMode);
|
||||
}
|
||||
|
||||
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
|
||||
@ -204,11 +231,7 @@ Status _applyOps(OperationContext* opCtx,
|
||||
|
||||
if (!nss.isSystemDotIndexes()) {
|
||||
return repl::applyOperation_inlock(
|
||||
opCtx,
|
||||
ctx.db(),
|
||||
opObj,
|
||||
alwaysUpsert,
|
||||
repl::OplogApplication::Mode::kApplyOps);
|
||||
opCtx, ctx.db(), opObj, alwaysUpsert, oplogApplicationMode);
|
||||
}
|
||||
|
||||
auto fieldO = opObj["o"];
|
||||
|
Loading…
Reference in New Issue
Block a user