mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-57024 Fix SpillableCache writes to disk across getMore requests
This commit is contained in:
parent
532a6a8ae7
commit
b43ec74870
@ -42,6 +42,35 @@ seedWithTickerData(coll, 10);
|
||||
// Run $sum test with memory limits that cause spilling to disk.
|
||||
testAccumAgainstGroup(coll, "$sum", 0);
|
||||
|
||||
function checkProfilerForDiskWrite(dbToCheck) {
|
||||
if (!FixtureHelpers.isMongos) {
|
||||
const profileObj = getLatestProfilerEntry(dbToCheck, {
|
||||
$or: [
|
||||
{originatingCommand: {pipeline: {$setWindowFields: {$exists: true}}}},
|
||||
{command: {pipeline: {$setWindowFields: {$exists: true}}}}
|
||||
]
|
||||
});
|
||||
assert(profileObj.usedDisk, tojson(profileObj));
|
||||
}
|
||||
}
|
||||
|
||||
FixtureHelpers.runCommandOnEachPrimary({db: db, cmdObj: {profile: 2}});
|
||||
// Test that a query that spills to disk succeeds across getMore requests.
|
||||
const wfResults =
|
||||
coll.aggregate(
|
||||
[
|
||||
{
|
||||
$setWindowFields: {
|
||||
sortBy: {_id: 1},
|
||||
output: {res: {$sum: "$price", window: {documents: ["unbounded", 5]}}}
|
||||
},
|
||||
},
|
||||
],
|
||||
{allowDiskUse: true, cursor: {batchSize: 1}})
|
||||
.toArray();
|
||||
assert.eq(wfResults.length, 20);
|
||||
checkProfilerForDiskWrite(db);
|
||||
|
||||
// Test a small, in memory, partition and a larger partition that requires spilling to disk.
|
||||
coll.drop();
|
||||
// Create small partition.
|
||||
@ -53,19 +82,6 @@ for (let i = 0; i < largePartitionSize; i++) {
|
||||
assert.commandWorked(coll.insert({_id: i + smallPartitionSize, val: i, partition: 2}));
|
||||
}
|
||||
|
||||
function checkProfilerForDiskWrite(dbToCheck) {
|
||||
if (!FixtureHelpers.isMongos) {
|
||||
let profileObj = getLatestProfilerEntry(dbToCheck, {
|
||||
$or: [
|
||||
{originatingCommand: {pipeline: {$setWindowFields: {$exists: true}}}},
|
||||
{command: {pipeline: {$setWindowFields: {$exists: true}}}}
|
||||
]
|
||||
});
|
||||
assert(profileObj.usedDisk, tojson(profileObj));
|
||||
}
|
||||
}
|
||||
|
||||
FixtureHelpers.runCommandOnEachPrimary({db: db, cmdObj: {profile: 2}});
|
||||
// Run an aggregation that will keep all documents in the cache for all documents.
|
||||
let results =
|
||||
coll.aggregate(
|
||||
|
@ -143,6 +143,17 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
||||
CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator()));
|
||||
}
|
||||
|
||||
// In an operation across GetMore requests we need to check that ignore conflicts is set for each
|
||||
// write to the RecordStore.
|
||||
void setIgnoreConflictsWriteBehavior(const boost::intrusive_ptr<ExpressionContext>& expCtx) {
|
||||
if (expCtx->opCtx->recoveryUnit()->getPrepareConflictBehavior() !=
|
||||
PrepareConflictBehavior::kIgnoreConflictsAllowWrites) {
|
||||
expCtx->opCtx->recoveryUnit()->abandonSnapshot();
|
||||
expCtx->opCtx->recoveryUnit()->setPrepareConflictBehavior(
|
||||
PrepareConflictBehavior::kIgnoreConflictsAllowWrites);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::unique_ptr<TransactionHistoryIteratorBase>
|
||||
@ -666,6 +677,7 @@ void CommonMongodProcessInterface::writeRecordsToRecordStore(
|
||||
std::vector<Record>* records,
|
||||
const std::vector<Timestamp>& ts) const {
|
||||
tassert(5643012, "Attempted to write to record store with nullptr", records);
|
||||
setIgnoreConflictsWriteBehavior(expCtx);
|
||||
writeConflictRetry(expCtx->opCtx, "MPI::writeRecordsToRecordStore", expCtx->ns.ns(), [&] {
|
||||
AutoGetCollection autoColl(expCtx->opCtx, expCtx->ns, MODE_IX);
|
||||
WriteUnitOfWork wuow(expCtx->opCtx);
|
||||
@ -696,6 +708,7 @@ Document CommonMongodProcessInterface::readRecordFromRecordStore(
|
||||
|
||||
void CommonMongodProcessInterface::deleteRecordFromRecordStore(
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx, RecordStore* rs, RecordId rID) const {
|
||||
setIgnoreConflictsWriteBehavior(expCtx);
|
||||
writeConflictRetry(expCtx->opCtx, "MPI::deleteFromRecordStore", expCtx->ns.ns(), [&] {
|
||||
AutoGetCollection autoColl(expCtx->opCtx, expCtx->ns, MODE_IX);
|
||||
WriteUnitOfWork wuow(expCtx->opCtx);
|
||||
@ -706,6 +719,7 @@ void CommonMongodProcessInterface::deleteRecordFromRecordStore(
|
||||
|
||||
void CommonMongodProcessInterface::truncateRecordStore(
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx, RecordStore* rs) const {
|
||||
setIgnoreConflictsWriteBehavior(expCtx);
|
||||
writeConflictRetry(expCtx->opCtx, "MPI::truncateRecordStore", expCtx->ns.ns(), [&] {
|
||||
AutoGetCollection autoColl(expCtx->opCtx, expCtx->ns, MODE_IX);
|
||||
WriteUnitOfWork wuow(expCtx->opCtx);
|
||||
@ -718,6 +732,7 @@ void CommonMongodProcessInterface::truncateRecordStore(
|
||||
void CommonMongodProcessInterface::deleteTemporaryRecordStore(
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
||||
std::unique_ptr<TemporaryRecordStore> rs) const {
|
||||
setIgnoreConflictsWriteBehavior(expCtx);
|
||||
AutoGetCollection autoColl(expCtx->opCtx, expCtx->ns, MODE_IX);
|
||||
rs->finalizeTemporaryTable(expCtx->opCtx, TemporaryRecordStore::FinalizationAction::kDelete);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user