0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

SERVER-35365 MapReduce should write "inc" collections to "local" db.

This commit is contained in:
Justin Seyster 2018-06-07 15:23:06 -04:00
parent 6e82b67a67
commit 30ac39448a
2 changed files with 102 additions and 17 deletions

View File

@ -0,0 +1,83 @@
// All collections created during a map-reduce should be replicated to secondaries unless they are
// in the "local" or "admin" databases. Any collection outside of "local" that does not get
// replicated is a potential problem for workloads with transactions (see SERVER-35365 and
// SERVER-35282).
//
// We verify this requirement by running a map-reduce, examining the logs to find the names of
// all collections created, and checking the oplog for entries logging the creation of each of
// those collections.
(function() {
    "use strict";

    load("jstests/libs/check_log.js");

    const name = "mr_nonrepl_coll_in_local_db";
    const replSet = new ReplSetTest({name: name, nodes: 2});
    replSet.startSet();
    replSet.initiate();

    const dbName = name;
    const collName = "test";

    const primary = replSet.getPrimary();
    const primaryDB = primary.getDB(dbName);
    const coll = primaryDB[collName];

    // Insert 1000 documents in the "test" collection, each holding an array of random values.
    // NOTE: the array-like passed to Array.from must use the "length" property; the original
    // code passed {lengthToInsert: 10000}, which Array.from ignores, producing an empty array
    // for every document and leaving the map function with nothing to emit.
    const bulk = coll.initializeUnorderedBulkOp();
    for (let i = 0; i < 1000; i++) {
        const array = Array.from({length: 10000}, _ => Math.floor(Math.random() * 100));
        bulk.insert({arr: array});
    }
    assert.writeOK(bulk.execute());

    // Run a simple map-reduce. The map function emits once per array element; forEach returns
    // undefined, so there is nothing useful to return from it.
    const result = coll.mapReduce(
        function map() {
            this.arr.forEach(element => emit(element, 1));
        },
        function reduce(key, values) {
            return Array.sum(values);
        },
        {query: {arr: {$exists: true}}, out: "mr_result"});
    assert.commandWorked(result);

    // Examine the logs to find a list of created collections.
    const logLines = checkLog.getGlobalLog(primaryDB);
    let createdCollections = [];
    logLines.forEach(function(line) {
        let matchResult = line.match(/createCollection: (.+) with/);
        if (matchResult) {
            createdCollections.push(matchResult[1]);
        }
    });

    createdCollections.forEach(function(createdCollectionName) {
        if (createdCollectionName.startsWith("admin.")) {
            // Although the "admin.system.version" collection is replicated, no "c" entry gets
            // created for it in the oplog, so this test would see it as unreplicated. In
            // general, this test is not concerned with the "admin" database, so we don't
            // examine any "admin" collections.
            return;
        }

        // Search for an oplog entry for the creation of this collection.
        const oplogEntries = primaryDB.getSiblingDB("local")["oplog.rs"]
                                 .find({op: "c", "o.idIndex.ns": createdCollectionName})
                                 .toArray();

        if (createdCollectionName.startsWith("local.")) {
            // We do not want to see any replication of "local" collections.
            assert.eq(oplogEntries.length,
                      0,
                      "Found unexpected oplog entry for creation of " + createdCollectionName +
                          ": " + tojson(oplogEntries));
        } else {
            assert.eq(oplogEntries.length,
                      1,
                      "Found no oplog entry or too many entries for creation of " +
                          createdCollectionName + ": " + tojson(oplogEntries));
        }
    });

    replSet.stopSet();
}());

View File

@ -304,12 +304,20 @@ Config::Config(const string& _dbname, const BSONObj& cmdObj) {
shardedFirstPass = true;
}
if (outputOptions.outType != INMEMORY) { // setup temp collection name
tempNamespace = NamespaceString(
outputOptions.outDB.empty() ? dbname : outputOptions.outDB,
str::stream() << "tmp.mr." << cmdObj.firstElement().valueStringData() << "_"
<< JOB_NUMBER.fetchAndAdd(1));
incLong = NamespaceString(str::stream() << tempNamespace.ns() << "_inc");
if (outputOptions.outType != INMEMORY) {
// Create names for the temp collection and the incremental collection. The incremental
// collection goes in the "local" database, so that it doesn't get replicated.
const std::string& outDBName = outputOptions.outDB.empty() ? dbname : outputOptions.outDB;
const std::string tmpCollDesc = str::stream()
<< "tmp.mr." << cmdObj.firstElement().valueStringData() << "_"
<< JOB_NUMBER.fetchAndAdd(1);
tempNamespace = NamespaceString(outDBName, tmpCollDesc);
// The name of the incremental collection includes the name of the database that we put
// temporary collection in, to make it easier to see which incremental database is paired
// with which temporary database when debugging.
incLong =
NamespaceString("local", str::stream() << tmpCollDesc << "_" << outDBName << "_inc");
}
{
@ -387,10 +395,6 @@ void State::dropTempCollections() {
ShardConnection::forgetNS(_config.tempNamespace.ns());
}
if (_useIncremental && !_config.incLong.isEmpty()) {
// We don't want to log the deletion of incLong as it isn't replicated. While
// harmless, this would lead to a scary looking warning on the secondaries.
repl::UnreplicatedWritesBlock uwb(_opCtx);
writeConflictRetry(_opCtx, "M/R dropTempCollections", _config.incLong.ns(), [this] {
Lock::DBLock lk(_opCtx, _config.incLong.db(), MODE_X);
if (Database* db =
@ -414,10 +418,8 @@ void State::prepTempCollection() {
dropTempCollections();
if (_useIncremental) {
// Create the inc collection and make sure we have index on "0" key.
// Intentionally not replicating the inc collection to secondaries.
repl::UnreplicatedWritesBlock uwb(_opCtx);
// Create the inc collection and make sure we have index on "0" key. The inc collection is
// in the "local" database, so it does not get replicated to secondaries.
writeConflictRetry(_opCtx, "M/R prepTempCollection", _config.incLong.ns(), [this] {
OldClientWriteContext incCtx(_opCtx, _config.incLong.ns());
WriteUnitOfWork wuow(_opCtx);
@ -783,7 +785,8 @@ void State::insert(const NamespaceString& nss, const BSONObj& o) {
}
/**
* Insert doc into the inc collection. This should not be replicated.
* Insert doc into the inc collection. The inc collection is in the "local" database, so this insert
* will not be replicated.
*/
void State::_insertToInc(BSONObj& o) {
verify(_onDisk);
@ -792,7 +795,6 @@ void State::_insertToInc(BSONObj& o) {
OldClientWriteContext ctx(_opCtx, _config.incLong.ns());
WriteUnitOfWork wuow(_opCtx);
Collection* coll = getCollectionOrUassert(_opCtx, ctx.db(), _config.incLong);
repl::UnreplicatedWritesBlock uwb(_opCtx);
// The documents inserted into the incremental collection are of the form
// {"0": <key>, "1": <value>}, so we cannot call fixDocumentForInsert(o) here because the
@ -1880,5 +1882,5 @@ public:
} mapReduceFinishCommand;
} // namespace
} // namespace mr
} // namespace mongo