From 9557a35b779a5f6c1a2453514ec9cc67119b288b Mon Sep 17 00:00:00 2001 From: Andrew Chen Date: Wed, 4 Mar 2020 16:50:20 -0500 Subject: [PATCH] SERVER-45565: Modified Oplog Batcher behavior to process large transaction oplog entries in separate batches create mode 100644 jstests/noPassthrough/large_txn_correctness.js --- .../noPassthrough/large_txn_correctness.js | 55 +++++++++ src/mongo/db/repl/oplog_applier_test.cpp | 108 ++++++++++++++++++ src/mongo/db/repl/oplog_batcher.cpp | 17 ++- src/mongo/db/repl/oplog_entry.cpp | 46 ++++++-- src/mongo/db/repl/oplog_entry.h | 16 ++- 5 files changed, 226 insertions(+), 16 deletions(-) create mode 100644 jstests/noPassthrough/large_txn_correctness.js diff --git a/jstests/noPassthrough/large_txn_correctness.js b/jstests/noPassthrough/large_txn_correctness.js new file mode 100644 index 00000000000..5f240dd3cca --- /dev/null +++ b/jstests/noPassthrough/large_txn_correctness.js @@ -0,0 +1,55 @@ +/** + * This test serves to ensure that the oplog batcher behavior correctly processes large transactions + * so that it does not cause any correctness problems. + * + * @tags: [requires_journaling] + */ +(function() { +"use strict"; +load("jstests/core/txns/libs/prepare_helpers.js"); + +// Declare constants. +const DB_NAME = "db_large_txn_correctness"; +const COLL_NAME = "db_large_txn_correctness"; + +// Make a large document of size 'numMB' so that it can easily fill up an oplog entry. +const makeLargeDoc = numMB => new Array(numMB * 1024 * 1024).join('a'); + +// Spin up a replica set. +const replSet = new ReplSetTest({nodes: 1}); +replSet.startSet(); +replSet.initiate(); +const primary = replSet.getPrimary(); + +const session = primary.startSession(); + +// Creating a collection so the first test can just test if regular CRUD operations work. 
+session.getDatabase(DB_NAME).createCollection(COLL_NAME);
+
+let commitRes;
+
+try {
+    // Perform a large transaction (>16MB) with only CRUD operations to ensure that nothing
+    // fundamental is broken.
+    session.startTransaction();
+    session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+    session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+    commitRes = session.commitTransaction_forTesting();
+    assert.eq(1, commitRes.ok);
+
+    // Ensure that the collection has been dropped so that collection creation can be tested
+    // in a txn.
+    session.getDatabase(DB_NAME)[COLL_NAME].drop();
+
+    // Create a large transaction (>16MB) with a command and ensure that it works.
+    session.startTransaction();
+    session.getDatabase(DB_NAME).createCollection(COLL_NAME);
+    session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+    session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+    commitRes = session.commitTransaction_forTesting();
+    assert.eq(1, commitRes.ok);
+} finally {
+    session.endSession();
+    replSet.stopSet();
+}
+})();
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index efd8a75a421..4e8e915db35 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -185,6 +185,73 @@ OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepar
                       boost::none);  // post-image optime
 }
 
+/**
+ * Creates oplog entries that are meant to be all parts of a mocked large transaction. This function
+ * does the following:
+ *
+ * 1. If we intend to make the first oplog entry of the transaction, we add a Null prevOptime to
+ * denote that there is no entry that comes before this one. This entry will just be an applyOps.
+ * 2. If we intend to make the last oplog entry of the transaction, then we make a commit oplog
+ * entry.
+ * 3.
Otherwise, we create applyOps oplog entries that denote all of the intermediate oplog entries.
+ */
+OplogEntry makeLargeTransactionOplogEntries(
+    int t, bool prepared, bool isFirst, bool isLast, int curr, int count) {
+    auto nss = NamespaceString(NamespaceString::kAdminDb).getCommandNS();
+    OpTime prevWriteOpTime = isFirst ? OpTime() : OpTime(Timestamp(t - 1, 1), 1);
+    BSONObj oField;
+    if (isLast) {
+        // Makes a commit oplog entry if this is the last oplog entry we wish to create.
+        if (prepared) {
+            CommitTransactionOplogObject cmdObj;
+            cmdObj.setCount(count);
+            oField = cmdObj.toBSON();
+        } else {
+            oField = BSON("applyOps" << BSONArray() << "count" << count);
+        }
+    } else {
+        BSONObjBuilder oFieldBuilder;
+        oFieldBuilder.append("applyOps", BSONArray());
+        if (prepared && curr == count - 1) {
+            oFieldBuilder.append("prepare", true);
+        }
+        oFieldBuilder.append("partialTxn", true);
+        oField = oFieldBuilder.obj();
+    }
+    return OplogEntry(OpTime(Timestamp(t, 1), 1),  // optime
+                      boost::none,                 // hash
+                      OpTypeEnum::kCommand,        // op type
+                      nss,                         // namespace
+                      boost::none,                 // uuid
+                      boost::none,                 // fromMigrate
+                      OplogEntry::kOplogVersion,   // version
+                      oField,                      // o
+                      boost::none,                 // o2
+                      {},                          // sessionInfo
+                      boost::none,                 // upsert
+                      Date_t() + Seconds(t),       // wall clock time
+                      boost::none,                 // statement id
+                      prevWriteOpTime,  // optime of previous write within same transaction
+                      boost::none,      // pre-image optime
+                      boost::none);     // post-image optime
+}
+
+/**
+ * Generates a mock large-transaction which has more than one oplog entry.
+ */
+std::vector<OplogEntry> makeMultiEntryTransactionOplogEntries(int t,
+                                                              StringData dbName,
+                                                              bool prepared,
+                                                              int count) {
+    ASSERT_GTE(count, 2);
+    std::vector<OplogEntry> vec;
+    for (int i = 0; i < count; i++) {
+        vec.push_back(makeLargeTransactionOplogEntries(
+            t + i, prepared, i == 0, i == count - 1, i + 1, count));
+    }
+    return vec;
+}
+
 /**
  * Returns string representation of std::vector<OplogEntry>.
 */
@@ -401,6 +468,47 @@ TEST_F(OplogApplierTest,
     ASSERT_EQUALS(srcOps[1], batch[0]);
 }
 
+TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) {
+    std::vector<OplogEntry> srcOps;
+    srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
+
+    // Makes entries with ts from range [2, 5).
+    std::vector<OplogEntry> multiEntryTransaction =
+        makeMultiEntryTransactionOplogEntries(2, dbName, /* prepared */ false, /* num entries*/ 3);
+    for (auto entry : multiEntryTransaction) {
+        srcOps.push_back(entry);
+    }
+
+    // Push one extra operation to ensure that the last oplog entry of a large transaction
+    // is processed by itself.
+    srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")));
+
+    _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+
+    // Set large enough batch limit to ensure that batcher is not batching because of limit, but
+    // rather because it encountered the final oplog entry of a large transaction.
+    _limits.ops = 10U;
+
+    // First batch: [insert, applyOps, applyOps]
+    auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+    ASSERT_EQUALS(3U, batch.size()) << toString(batch);
+    ASSERT_EQUALS(srcOps[0], batch[0]);
+    ASSERT_EQUALS(srcOps[1], batch[1]);
+    ASSERT_EQUALS(srcOps[2], batch[2]);
+
+    // Second batch: [applyOps]. The last oplog entry of a large transaction must be processed by
+    // itself.
+    batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+    ASSERT_EQUALS(1U, batch.size()) << toString(batch);
+    ASSERT_EQUALS(srcOps[3], batch[0]);
+
+    // Third batch: [insert]. This confirms that the last oplog entry of a large txn will be
+    // batched individually.
+ batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(1U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[4], batch[0]); +} + } // namespace } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp index 21d154f2e39..83809ce0b81 100644 --- a/src/mongo/db/repl/oplog_batcher.cpp +++ b/src/mongo/db/repl/oplog_batcher.cpp @@ -109,23 +109,32 @@ bool isUnpreparedCommit(const OplogEntry& entry) { * * Commands, in most cases, must be processed one at a time. The exceptions to this rule are * unprepared applyOps and unprepared commitTransaction for transactions that only contain CRUD - * operations. These two cases expand to CRUD operations, which can be safely batched with other - * CRUD operations. All other command oplog entries, including unprepared applyOps/commitTransaction - * for transactions that contain commands, must be processed in their own batch. + * operations and commands found within large transactions (>16MB). The prior two cases expand to + * CRUD operations, which can be safely batched with other CRUD operations. All other command oplog + * entries, including unprepared applyOps/commitTransaction for transactions that contain commands, + * must be processed in their own batch. * Note that 'unprepared applyOps' could mean a partial transaction oplog entry, an implicit commit * applyOps oplog entry, or an atomic applyOps oplog entry outside of a transaction. * + * Command operations inside large transactions do not need to be processed individually as long as + * the final oplog entry in the transaction is processed individually, since the operations are not + * actually run until the commit operation is reached. + * * Oplog entries on 'system.views' should also be processed one at a time. 
View catalog immediately
 * reflects changes for each oplog entry so we can see inconsistent view catalog if multiple oplog
 * entries on 'system.views' are being applied out of the original order.
 *
 * Process updates to 'admin.system.version' individually as well so the secondary's FCV when
 * processing each operation matches the primary's when committing that operation.
+ *
+ * The ends of large transactions (> 16MB) should also be processed immediately on their own in
+ * order to avoid scenarios where parts of the transaction are batched with other operations not
+ * in the transaction.
  */
 bool mustProcessIndividually(const OplogEntry& entry) {
     if (entry.isCommand()) {
         if (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare() ||
-            entry.isTransactionWithCommand()) {
+            entry.isSingleOplogEntryTransactionWithCommand() || entry.isEndOfLargeTransaction()) {
             return true;
         } else {
             // This branch covers unprepared CRUD applyOps and unprepared CRUD commits.
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 0412f6b2362..980630a3579 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -353,16 +353,44 @@ bool OplogEntry::shouldPrepare() const {
            getObject()[ApplyOpsCommandInfoBase::kPrepareFieldName].booleanSafe();
 }
 
-bool OplogEntry::isTransactionWithCommand() const {
+bool OplogEntry::isSingleOplogEntryTransaction() const {
+    if (getCommandType() != CommandType::kApplyOps || !getTxnNumber() || !getSessionId() ||
+        getObject()[ApplyOpsCommandInfoBase::kPartialTxnFieldName].booleanSafe()) {
+        return false;
+    }
+    auto prevOptimeOpt = getPrevWriteOpTimeInTransaction();
+    if (!prevOptimeOpt) {
+        // If there is no prevWriteOpTime, then this oplog entry is not a part of a transaction.
+        return false;
+    }
+    return prevOptimeOpt->isNull();
+}
+
+bool OplogEntry::isEndOfLargeTransaction() const {
+    if (getCommandType() != CommandType::kApplyOps) {
+        // If the oplog entry is not an applyOps, then it cannot be a terminal oplog entry of a
+        // large transaction.
+        return false;
+    }
+    auto prevOptimeOpt = getPrevWriteOpTimeInTransaction();
+    if (!prevOptimeOpt) {
+        // If there is no prevWriteOpTime, then this oplog entry is not part of a transaction and
+        // therefore cannot be a terminal oplog entry of a large transaction.
+        return false;
+    }
+    // There should be a previous oplog entry in a multiple oplog entry transaction if this is
+    // supposed to be the last one. The first oplog entry in a large transaction will have a null
+    // ts.
+    return !prevOptimeOpt->isNull() && !isPartialTransaction();
+}
+
+bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const {
+    if (!isSingleOplogEntryTransaction()) {
+        return false;
+    }
+    // Since we know that this oplog entry at this point is part of a transaction, we can safely
+    // assume that it has an applyOps field.
     auto applyOps = getObject().getField("applyOps");
-    if (applyOps.eoo()) {
-        return false;
-    }
-    if (!getTxnNumber() || !getSessionId()) {
-        // Only transactions can produce applyOps oplog entries with transaction numbers and
-        // session IDs.
-        return false;
-    }
     // Iterating through the entire applyOps array is not optimal for performance. A potential
     // optimization, if necessary, could be to ensure the primary always constructs applyOps oplog
     // entries with commands at the beginning.
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index d17c0a4c512..9f4d6c5ba0c 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -316,6 +316,11 @@ public:
         return getObject()[ApplyOpsCommandInfoBase::kPartialTxnFieldName].booleanSafe();
     }
 
+    /**
+     * Returns whether the oplog entry is the last applyOps in a multiple-entry transaction.
+     */
+    bool isEndOfLargeTransaction() const;
+
     /**
      * Returns if this is a prepared 'commitTransaction' oplog entry.
      */
@@ -334,10 +339,15 @@ public:
     }
 
     /**
-     * Returns whether the oplog entry represents an applyOps with a commnd inside. This will occur
-     * if a multi-document transaction performs a command.
+     * Returns whether the oplog entry represents a single oplog entry transaction.
      */
-    bool isTransactionWithCommand() const;
+    bool isSingleOplogEntryTransaction() const;
+
+    /**
+     * Returns whether the oplog entry represents an applyOps with a command inside. This is only
+     * for transactions with only one oplog entry.
+     */
+    bool isSingleOplogEntryTransactionWithCommand() const;
 
     /**
      * Returns if the oplog entry is for a CRUD operation.