mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-45565: Modified Oplog Batcher behavior to process large transaction oplog entries in separate batches
create mode 100644 jstests/noPassthrough/large_txn_correctness.js
This commit is contained in:
parent
82424b7423
commit
9557a35b77
55
jstests/noPassthrough/large_txn_correctness.js
Normal file
55
jstests/noPassthrough/large_txn_correctness.js
Normal file
@ -0,0 +1,55 @@
|
||||
/**
|
||||
* This test serves to ensure that the oplog batcher behavior correctly processes large transactions
|
||||
* so that it does not cause any correctness problems.
|
||||
*
|
||||
* @tags: [requires_journaling]
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
load("jstests/core/txns/libs/prepare_helpers.js");
|
||||
|
||||
// Declare constants.
|
||||
const DB_NAME = "db_large_txn_correctness";
|
||||
const COLL_NAME = "db_large_txn_correctness";
|
||||
|
||||
// Make a large document of size 'numMB' so that it can easily fill up an oplog entry.
|
||||
const makeLargeDoc = numMB => new Array(numMB * 1024 * 1024).join('a');
|
||||
|
||||
// Spin up a replica set.
|
||||
const replSet = new ReplSetTest({nodes: 1});
|
||||
replSet.startSet();
|
||||
replSet.initiate();
|
||||
const primary = replSet.getPrimary();
|
||||
|
||||
const session = primary.startSession();
|
||||
|
||||
// Creating a collection so the first test can just test if regular CRUD operations work.
|
||||
session.getDatabase(DB_NAME).createCollection(COLL_NAME);
|
||||
|
||||
let commitRes;
|
||||
|
||||
try {
|
||||
// Perform a large transaction (>16MB) with only CRUD operations to ensure that nothing
|
||||
// fundamental is broken.
|
||||
session.startTransaction();
|
||||
session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
|
||||
session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
|
||||
commitRes = session.commitTransaction_forTesting();
|
||||
assert.eq(1, commitRes.ok);
|
||||
|
||||
// Ensure that the collection has been dropped so that collection creation can be tested
|
||||
// in a txn.
|
||||
session.getDatabase(DB_NAME)[COLL_NAME].drop();
|
||||
|
||||
// Create a large transaction (>16MB) with a command and ensure that it works.
|
||||
session.startTransaction();
|
||||
session.getDatabase(DB_NAME).createCollection(COLL_NAME);
|
||||
session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
|
||||
session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
|
||||
commitRes = session.commitTransaction_forTesting();
|
||||
assert.eq(1, commitRes.ok);
|
||||
} finally {
|
||||
session.endSession();
|
||||
replSet.stopSet();
|
||||
}
|
||||
})();
|
@ -185,6 +185,73 @@ OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepar
|
||||
boost::none); // post-image optime
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates oplog entries that are meant to be all parts of a mocked large transaction. This function
|
||||
* does the following:
|
||||
*
|
||||
* 1. If we intend to make the first oplog entry of the transaction, we add a Null prevOptime to
|
||||
* denote that there is no entry that comes before this one. This entry will just be a applyOps.
|
||||
* 2. If we intend to make the last oplog entry of the transaction, then we make a commit oplog
|
||||
* entry.
|
||||
* 3. Otherwise, we create applyOps oplog entries that denote all of the intermediate oplog entries.
|
||||
*/
|
||||
OplogEntry makeLargeTransactionOplogEntries(
|
||||
int t, bool prepared, bool isFirst, bool isLast, int curr, int count) {
|
||||
auto nss = NamespaceString(NamespaceString::kAdminDb).getCommandNS();
|
||||
OpTime prevWriteOpTime = isFirst ? OpTime() : OpTime(Timestamp(t - 1, 1), 1);
|
||||
BSONObj oField;
|
||||
if (isLast) {
|
||||
// Makes a commit oplog entry if this is the last oplog entry we wish to create.
|
||||
if (prepared) {
|
||||
CommitTransactionOplogObject cmdObj;
|
||||
cmdObj.setCount(count);
|
||||
oField = cmdObj.toBSON();
|
||||
} else {
|
||||
oField = BSON("applyOps" << BSONArray() << "count" << count);
|
||||
}
|
||||
} else {
|
||||
BSONObjBuilder oFieldBuilder;
|
||||
oFieldBuilder.append("applyOps", BSONArray());
|
||||
if (prepared && curr == count - 1) {
|
||||
oFieldBuilder.append("prepare", true);
|
||||
}
|
||||
oFieldBuilder.append("partialTxn", true);
|
||||
oField = oFieldBuilder.obj();
|
||||
}
|
||||
return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime
|
||||
boost::none, // hash
|
||||
OpTypeEnum::kCommand, // op type
|
||||
nss, // namespace
|
||||
boost::none, // uuid
|
||||
boost::none, // fromMigrate
|
||||
OplogEntry::kOplogVersion, // version
|
||||
oField, // o
|
||||
boost::none, // o2
|
||||
{}, // sessionInfo
|
||||
boost::none, // upsert
|
||||
Date_t() + Seconds(t), // wall clock time
|
||||
boost::none, // statement id
|
||||
prevWriteOpTime, // optime of previous write within same transaction
|
||||
boost::none, // pre-image optime
|
||||
boost::none); // post-image optime
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a mock large-transaction which has more than one oplog entry.
|
||||
*/
|
||||
std::vector<OplogEntry> makeMultiEntryTransactionOplogEntries(int t,
|
||||
StringData dbName,
|
||||
bool prepared,
|
||||
int count) {
|
||||
ASSERT_GTE(count, 2);
|
||||
std::vector<OplogEntry> vec;
|
||||
for (int i = 0; i < count; i++) {
|
||||
vec.push_back(makeLargeTransactionOplogEntries(
|
||||
t + i, prepared, i == 0, i == count - 1, i + 1, count));
|
||||
}
|
||||
return vec;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns string representation of std::vector<OplogEntry>.
|
||||
*/
|
||||
@ -401,6 +468,47 @@ TEST_F(OplogApplierTest,
|
||||
ASSERT_EQUALS(srcOps[1], batch[0]);
|
||||
}
|
||||
|
||||
TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) {
|
||||
std::vector<OplogEntry> srcOps;
|
||||
srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
|
||||
|
||||
// Makes entries with ts from range [2, 5).
|
||||
std::vector<OplogEntry> multiEntryTransaction =
|
||||
makeMultiEntryTransactionOplogEntries(2, dbName, /* prepared */ false, /* num entries*/ 3);
|
||||
for (auto entry : multiEntryTransaction) {
|
||||
srcOps.push_back(entry);
|
||||
}
|
||||
|
||||
// Push one extra operation to ensure that the last oplog entry of a large transaction
|
||||
// is processed by itself.
|
||||
srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")));
|
||||
|
||||
_applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
|
||||
|
||||
// Set large enough batch limit to ensure that batcher is not batching because of limit, but
|
||||
// rather because it encountered the final oplog entry of a large transaction.
|
||||
_limits.ops = 10U;
|
||||
|
||||
// First batch: [insert, applyOps, applyOps]
|
||||
auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
|
||||
ASSERT_EQUALS(3U, batch.size()) << toString(batch);
|
||||
ASSERT_EQUALS(srcOps[0], batch[0]);
|
||||
ASSERT_EQUALS(srcOps[1], batch[1]);
|
||||
ASSERT_EQUALS(srcOps[2], batch[2]);
|
||||
|
||||
// Second batch: [applyOps]. The last oplog entry of a large transaction must be processed by
|
||||
// itself.
|
||||
batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
|
||||
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
|
||||
ASSERT_EQUALS(srcOps[3], batch[0]);
|
||||
|
||||
// Third batch: [insert]. The this confirms that the last oplog entry of a large txn will be
|
||||
// batched individually.
|
||||
batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
|
||||
ASSERT_EQUALS(1U, batch.size()) << toString(batch);
|
||||
ASSERT_EQUALS(srcOps[4], batch[0]);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace repl
|
||||
} // namespace mongo
|
||||
|
@ -109,23 +109,32 @@ bool isUnpreparedCommit(const OplogEntry& entry) {
|
||||
*
|
||||
* Commands, in most cases, must be processed one at a time. The exceptions to this rule are
|
||||
* unprepared applyOps and unprepared commitTransaction for transactions that only contain CRUD
|
||||
* operations. These two cases expand to CRUD operations, which can be safely batched with other
|
||||
* CRUD operations. All other command oplog entries, including unprepared applyOps/commitTransaction
|
||||
* for transactions that contain commands, must be processed in their own batch.
|
||||
* operations and commands found within large transactions (>16MB). The prior two cases expand to
|
||||
* CRUD operations, which can be safely batched with other CRUD operations. All other command oplog
|
||||
* entries, including unprepared applyOps/commitTransaction for transactions that contain commands,
|
||||
* must be processed in their own batch.
|
||||
* Note that 'unprepared applyOps' could mean a partial transaction oplog entry, an implicit commit
|
||||
* applyOps oplog entry, or an atomic applyOps oplog entry outside of a transaction.
|
||||
*
|
||||
* Command operations inside large transactions do not need to be processed individually as long as
|
||||
* the final oplog entry in the transaction is processed individually, since the operations are not
|
||||
* actually run until the commit operation is reached.
|
||||
*
|
||||
* Oplog entries on 'system.views' should also be processed one at a time. View catalog immediately
|
||||
* reflects changes for each oplog entry so we can see inconsistent view catalog if multiple oplog
|
||||
* entries on 'system.views' are being applied out of the original order.
|
||||
*
|
||||
* Process updates to 'admin.system.version' individually as well so the secondary's FCV when
|
||||
* processing each operation matches the primary's when committing that operation.
|
||||
*
|
||||
* The ends of large transactions (> 16MB) should also be processed immediately on its own in order
|
||||
* to avoid scenarios where parts of the transaction is batched with other operations not in the
|
||||
* transaction.
|
||||
*/
|
||||
bool mustProcessIndividually(const OplogEntry& entry) {
|
||||
if (entry.isCommand()) {
|
||||
if (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare() ||
|
||||
entry.isTransactionWithCommand()) {
|
||||
entry.isSingleOplogEntryTransactionWithCommand() || entry.isEndOfLargeTransaction()) {
|
||||
return true;
|
||||
} else {
|
||||
// This branch covers unprepared CRUD applyOps and unprepared CRUD commits.
|
||||
|
@ -353,16 +353,44 @@ bool OplogEntry::shouldPrepare() const {
|
||||
getObject()[ApplyOpsCommandInfoBase::kPrepareFieldName].booleanSafe();
|
||||
}
|
||||
|
||||
bool OplogEntry::isTransactionWithCommand() const {
|
||||
bool OplogEntry::isSingleOplogEntryTransaction() const {
|
||||
if (getCommandType() != CommandType::kApplyOps || !getTxnNumber() || !getSessionId() ||
|
||||
getObject()[ApplyOpsCommandInfoBase::kPartialTxnFieldName].booleanSafe()) {
|
||||
return false;
|
||||
}
|
||||
auto prevOptimeOpt = getPrevWriteOpTimeInTransaction();
|
||||
if (!prevOptimeOpt) {
|
||||
// If there is no prevWriteOptime, then this oplog entry is not a part of a transaction.
|
||||
return false;
|
||||
}
|
||||
return prevOptimeOpt->isNull();
|
||||
}
|
||||
|
||||
bool OplogEntry::isEndOfLargeTransaction() const {
|
||||
if (getCommandType() != CommandType::kApplyOps) {
|
||||
// If the oplog entry is neither commit nor abort, then it must be an applyOps. Otherwise,
|
||||
// it cannot be a termainal oplog entry of a large transaction.
|
||||
return false;
|
||||
}
|
||||
auto prevOptimeOpt = getPrevWriteOpTimeInTransaction();
|
||||
if (!prevOptimeOpt) {
|
||||
// If the oplog entry is neither commit nor abort, then it must be an applyOps. Otherwise,
|
||||
// it cannot be a terminal oplog entry of a large transaction.
|
||||
return false;
|
||||
}
|
||||
// There should be a previous oplog entry in a multiple oplog entry transaction if this is
|
||||
// supposed to be the last one. The first oplog entry in a large transaction will have a null
|
||||
// ts.
|
||||
return !prevOptimeOpt->isNull() && !isPartialTransaction();
|
||||
}
|
||||
|
||||
bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const {
|
||||
if (!isSingleOplogEntryTransaction()) {
|
||||
return false;
|
||||
}
|
||||
// Since we know that this oplog entry at this point is part of a transaction, we can safely
|
||||
// assume that it has an applyOps field.
|
||||
auto applyOps = getObject().getField("applyOps");
|
||||
if (applyOps.eoo()) {
|
||||
return false;
|
||||
}
|
||||
if (!getTxnNumber() || !getSessionId()) {
|
||||
// Only transactions can produce applyOps oplog entries with transaction numbers and
|
||||
// session IDs.
|
||||
return false;
|
||||
}
|
||||
// Iterating through the entire applyOps array is not optimal for performance. A potential
|
||||
// optimization, if necessary, could be to ensure the primary always constructs applyOps oplog
|
||||
// entries with commands at the beginning.
|
||||
|
@ -316,6 +316,11 @@ public:
|
||||
return getObject()[ApplyOpsCommandInfoBase::kPartialTxnFieldName].booleanSafe();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether if the oplog entry is the last applyOps in a multiple-entry transaction.
|
||||
*/
|
||||
bool isEndOfLargeTransaction() const;
|
||||
|
||||
/**
|
||||
* Returns if this is a prepared 'commitTransaction' oplog entry.
|
||||
*/
|
||||
@ -334,10 +339,15 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the oplog entry represents an applyOps with a commnd inside. This will occur
|
||||
* if a multi-document transaction performs a command.
|
||||
* Returns whether the oplog entry represents a single oplog entry transaction.
|
||||
*/
|
||||
bool isTransactionWithCommand() const;
|
||||
bool isSingleOplogEntryTransaction() const;
|
||||
|
||||
/**
|
||||
* Returns whether the oplog entry represents an applyOps with a command inside. This is only
|
||||
* for transactions with only one oplog entry.
|
||||
*/
|
||||
bool isSingleOplogEntryTransactionWithCommand() const;
|
||||
|
||||
/**
|
||||
* Returns if the oplog entry is for a CRUD operation.
|
||||
|
Loading…
Reference in New Issue
Block a user