mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-61985 SERVER-67193 Make reshardingPauseCoordinatorBeforeCompletion failpoint pause conditionally
This commit is contained in:
parent
6a8cc3ae11
commit
0d5fd57f9e
@ -1,6 +1,10 @@
|
||||
/**
|
||||
* Tests that setFeatureCompatibilityVersion command aborts an ongoing reshardCollection command
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
|
||||
load("jstests/libs/parallel_shell_helpers.js");
|
||||
load("jstests/sharding/libs/resharding_test_fixture.js");
|
||||
load('jstests/libs/discover_topology.js');
|
||||
load('jstests/libs/fail_point_util.js');
|
||||
@ -21,6 +25,8 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
],
|
||||
});
|
||||
|
||||
const sourceNamespace = inputCollection.getFullName();
|
||||
|
||||
let mongos = inputCollection.getMongo();
|
||||
|
||||
for (let x = 0; x < 1000; x++) {
|
||||
@ -37,7 +43,17 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
pauseBeforeCloseCxns = configureFailPoint(config, "pauseBeforeCloseCxns");
|
||||
}
|
||||
|
||||
// Polls (via assert.soon) the config.reshardingOperations collection through mongos until the
// coordinator document for sourceNamespace is either absent (findOne returns null) or has its
// state field set to "aborting".
function checkCoordinatorDoc() {
|
||||
assert.soon(() => {
|
||||
const coordinatorDoc =
|
||||
mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace});
|
||||
|
||||
return coordinatorDoc === null || coordinatorDoc.state === "aborting";
|
||||
});
|
||||
}
|
||||
|
||||
const recipientShardNames = reshardingTest.recipientShardNames;
|
||||
let awaitShell;
|
||||
reshardingTest.withReshardingInBackground(
|
||||
{
|
||||
newShardKeyPattern: {newKey: 1},
|
||||
@ -63,7 +79,7 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
|
||||
}`;
|
||||
|
||||
let awaitShell = startParallelShell(codeToRunInParallelShell, mongos.port);
|
||||
awaitShell = startParallelShell(codeToRunInParallelShell, mongos.port);
|
||||
|
||||
if (forcePooledConnectionsDropped) {
|
||||
pauseBeforeCloseCxns.wait();
|
||||
@ -88,8 +104,7 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
jsTestLog("Turn off pause before pauseBeforeMarkKeepOpen failpoint");
|
||||
pauseBeforeMarkKeepOpen.off();
|
||||
}
|
||||
|
||||
awaitShell();
|
||||
checkCoordinatorDoc();
|
||||
},
|
||||
{
|
||||
expectedErrorCode: [
|
||||
@ -98,6 +113,8 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
]
|
||||
});
|
||||
|
||||
awaitShell();
|
||||
|
||||
reshardingTest.withReshardingInBackground(
|
||||
{
|
||||
newShardKeyPattern: {newKey: 1},
|
||||
@ -107,7 +124,14 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
],
|
||||
},
|
||||
() => {
|
||||
assert.commandWorked(mongos.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
|
||||
assert.soon(() => {
|
||||
return mongos.getDB('config').reshardingOperations.findOne() != null;
|
||||
}, "timed out waiting for coordinator doc to be written", 30 * 1000);
|
||||
awaitShell = startParallelShell(funWithArgs(function(latestFCV) {
|
||||
assert.commandWorked(db.adminCommand(
|
||||
{setFeatureCompatibilityVersion: latestFCV}));
|
||||
}, latestFCV), mongos.port);
|
||||
checkCoordinatorDoc();
|
||||
},
|
||||
{
|
||||
expectedErrorCode: [
|
||||
@ -117,6 +141,7 @@ function runTest(forcePooledConnectionsDropped) {
|
||||
]
|
||||
});
|
||||
|
||||
awaitShell();
|
||||
reshardingTest.teardown();
|
||||
}
|
||||
|
@ -321,8 +321,10 @@ var ReshardingTest = class {
|
||||
configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeBlockingWrites");
|
||||
this._pauseCoordinatorBeforeDecisionPersistedFailpoint =
|
||||
configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted");
|
||||
this._pauseCoordinatorBeforeCompletionFailpoint = configureFailPoint(
|
||||
configPrimary, "reshardingPauseCoordinatorBeforeCompletion", {}, {times: 1});
|
||||
this._pauseCoordinatorBeforeCompletionFailpoint =
|
||||
configureFailPoint(configPrimary,
|
||||
"reshardingPauseCoordinatorBeforeCompletion",
|
||||
{"sourceNamespace": this._ns});
|
||||
|
||||
this._commandDoneSignal = new CountDownLatch(1);
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
"use strict";
|
||||
load("jstests/libs/discover_topology.js");
|
||||
load("jstests/sharding/libs/resharding_test_fixture.js");
|
||||
load('jstests/libs/parallel_shell_helpers.js');
|
||||
|
||||
const originalCollectionNs = "reshardingDb.coll";
|
||||
|
||||
@ -36,6 +37,7 @@ const configsvr = new Mongo(topology.configsvr.nodes[0]);
|
||||
const pauseAfterPreparingToDonateFP =
|
||||
configureFailPoint(configsvr, "reshardingPauseCoordinatorAfterPreparingToDonate");
|
||||
|
||||
let awaitAbort;
|
||||
reshardingTest.withReshardingInBackground(
|
||||
{
|
||||
|
||||
@ -47,13 +49,30 @@ reshardingTest.withReshardingInBackground(
|
||||
},
|
||||
() => {
|
||||
pauseAfterPreparingToDonateFP.wait();
|
||||
assert.commandWorked(mongos.adminCommand({abortReshardCollection: originalCollectionNs}));
|
||||
assert.neq(null, mongos.getCollection("config.reshardingOperations").findOne({
|
||||
ns: originalCollectionNs
|
||||
}));
|
||||
// Signaling abort will cause the
|
||||
// pauseAfterPreparingToDonateFP to throw, implicitly
|
||||
// allowing the coordinator to make progress without
|
||||
// explicitly turning off the failpoint.
|
||||
awaitAbort =
|
||||
startParallelShell(funWithArgs(function(sourceNamespace) {
|
||||
db.adminCommand({abortReshardCollection: sourceNamespace});
|
||||
}, originalCollectionNs), mongos.port);
|
||||
// Wait for the coordinator to mark the coordinator document in config.reshardingOperations as
|
||||
// "aborting", or to remove it as a result of the recipients and donors transitioning to done due to abort.
|
||||
assert.soon(() => {
|
||||
const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({
|
||||
ns: originalCollectionNs
|
||||
});
|
||||
return coordinatorDoc === null || coordinatorDoc.state === "aborting";
|
||||
});
|
||||
},
|
||||
{expectedErrorCode: ErrorCodes.ReshardCollectionAborted});
|
||||
|
||||
awaitAbort();
|
||||
pauseAfterPreparingToDonateFP.off();
|
||||
|
||||
reshardingTest.teardown();
|
||||
})();
|
||||
|
@ -109,19 +109,7 @@ reshardingTest.withReshardingInBackground(
|
||||
}
|
||||
},
|
||||
{
|
||||
// As a result of the elections intentionally triggered on the config server replica sets,
|
||||
// the primary shard of the database may retry the _configsvrReshardCollection command. It
|
||||
// is possible for the resharding operation from the first _configsvrReshardCollection
|
||||
// command to have entirely finished executing to the point of removing the coordinator
|
||||
// state document. A retry of the _configsvrReshardCollection command in this situation will
|
||||
// lead to a second resharding operation to run. The second resharding operation will have
|
||||
// the duplicate documents cloned by the ReshardingCollectionCloner rather than applied by
|
||||
// the ReshardingOplogApplier as intended. This results in the reshardCollection command
|
||||
// failing with a DuplicateKey error rather than the error code for the stash collections
|
||||
// being non-empty. The recipient must have been able to successfully update its state to
|
||||
// "applying" in the first resharding operation even when the ReshardingCoordinatorService
|
||||
// had yet to be rebuilt so we accept DuplicateKey as an error too.
|
||||
expectedErrorCode: [5356800, ErrorCodes.DuplicateKey],
|
||||
expectedErrorCode: 5356800,
|
||||
});
|
||||
|
||||
reshardingTest.teardown();
|
||||
|
@ -118,6 +118,7 @@ const waitUntilReshardingInitializedOnDonor = () => {
|
||||
* @param {Function} config.setup
|
||||
* @param {AfterReshardingCallback} afterReshardingFn
|
||||
*/
|
||||
|
||||
const withReshardingInBackground =
|
||||
(duringReshardingFn,
|
||||
{setup = () => {}, expectedErrorCode, afterReshardingFn = () => {}} = {}) => {
|
||||
@ -132,22 +133,34 @@ const withReshardingInBackground =
|
||||
},
|
||||
duringReshardingFn,
|
||||
{expectedErrorCode: expectedErrorCode, afterReshardingFn: afterReshardingFn});
|
||||
|
||||
assertCommandsSucceedAfterReshardingOpFinishes(mongos.getDB(databaseName));
|
||||
assert.commandWorked(sourceCollection.dropIndex(indexCreatedByTest));
|
||||
};
|
||||
|
||||
// Tests that the prohibited commands work if the resharding operation is aborted.
|
||||
let awaitAbort;
|
||||
withReshardingInBackground(() => {
|
||||
waitUntilReshardingInitializedOnDonor();
|
||||
assert.neq(null,
|
||||
mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace}));
|
||||
awaitAbort = startParallelShell(funWithArgs(function(sourceNamespace) {
|
||||
db.adminCommand({abortReshardCollection: sourceNamespace});
|
||||
}, sourceNamespace), mongos.port);
|
||||
// Wait for the coordinator to mark the coordinator document in config.reshardingOperations as
|
||||
// "aborting", or to remove it as a result of the recipients and donors transitioning to done due to abort.
|
||||
assert.soon(() => {
|
||||
const coordinatorDoc =
|
||||
mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace});
|
||||
|
||||
assert.commandWorked(mongos.adminCommand({abortReshardCollection: sourceNamespace}));
|
||||
return coordinatorDoc === null || coordinatorDoc.state === "aborting";
|
||||
});
|
||||
}, {
|
||||
expectedErrorCode: ErrorCodes.ReshardCollectionAborted,
|
||||
});
|
||||
awaitAbort();
|
||||
|
||||
// Tests that the prohibited commands succeed if the resharding operation succeeds. During the
|
||||
// operation it makes sures that the prohibited commands are rejected during the resharding
|
||||
// operation it makes sure that the prohibited commands are rejected during the resharding
|
||||
// operation.
|
||||
withReshardingInBackground(() => {
|
||||
waitUntilReshardingInitializedOnDonor();
|
||||
|
@ -1364,8 +1364,15 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
|
||||
})
|
||||
.onCompletion([this, executor](Status status) {
|
||||
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
|
||||
reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
|
||||
opCtx.get(), _ctHolder->getStepdownToken());
|
||||
reshardingPauseCoordinatorBeforeCompletion.executeIf(
|
||||
[&](const BSONObj&) {
|
||||
reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
|
||||
opCtx.get(), _ctHolder->getStepdownToken());
|
||||
},
|
||||
[&](const BSONObj& data) {
|
||||
auto ns = data.getStringField("sourceNamespace");
|
||||
return ns.empty() ? true : ns.toString() == _coordinatorDoc.getSourceNss().ns();
|
||||
});
|
||||
|
||||
{
|
||||
auto lg = stdx::lock_guard(_fulfillmentMutex);
|
||||
|
Loading…
Reference in New Issue
Block a user