mirror of https://github.com/mongodb/mongo.git
SERVER-42342 Replica set monitor shutdown should fail all outstanding RSM promises
commit 71768cd39c
parent 4ae9ca6543
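Summary of the change (as shown in the hunks below): ReplicaSetMonitor gains a drop() method and its destructor now just delegates to it; ReplicaSetMonitorManager::shutdown() snapshots the monitor map, marks each monitor as removed and drops it, and only then tears down the task executor; removeAllMonitors() delegates to shutdown(); and the per-request global shutdown check in _getHostsOrRefresh() is deleted in favor of the per-monitor isRemovedFromManager flag. The SetState::drop() internals are not part of this diff, so the following stand-alone C++ sketch (hypothetical Monitor/getHosts names, not MongoDB code) only illustrates the idea in the commit title: outstanding promises are failed at shutdown rather than left hanging.

// Stand-alone illustration (not MongoDB code): failing outstanding promises on shutdown.
#include <exception>
#include <future>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <vector>

class Monitor {
public:
    // Callers get a future that would normally be fulfilled by a background refresh.
    std::future<int> getHosts() {
        std::lock_guard<std::mutex> lk(_mutex);
        _outstanding.emplace_back();
        return _outstanding.back().get_future();
    }

    // drop() fails every outstanding promise so no caller is left waiting forever.
    void drop() {
        std::lock_guard<std::mutex> lk(_mutex);
        for (auto& p : _outstanding) {
            p.set_exception(std::make_exception_ptr(
                std::runtime_error("ReplicaSetMonitorRemoved: monitor is shutting down")));
        }
        _outstanding.clear();
    }

private:
    std::mutex _mutex;
    std::vector<std::promise<int>> _outstanding;
};

int main() {
    Monitor monitor;
    auto pending = monitor.getHosts();
    monitor.drop();  // shutdown path: fail the promise instead of leaving it hanging
    try {
        pending.get();
    } catch (const std::exception& ex) {
        std::cout << "outstanding request failed: " << ex.what() << "\n";
    }
}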
@@ -207,13 +207,17 @@ void ReplicaSetMonitor::init() {
     }
 }

-ReplicaSetMonitor::~ReplicaSetMonitor() {
+void ReplicaSetMonitor::drop() {
     {
         stdx::lock_guard lk(_state->mutex);
         _state->drop();
     }
 }

+ReplicaSetMonitor::~ReplicaSetMonitor() {
+    drop();
+}
+
 void ReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) {
     // Reschedule the refresh

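Factoring the old destructor body into drop() lets an owner end a monitor's outstanding work explicitly (the manager's shutdown() below calls markAsRemoved() and drop() on each monitor) without waiting for the last shared_ptr to go away; the destructor then simply delegates. A minimal stand-alone sketch of that shape with a hypothetical Widget type and a flag-guarded, idempotent teardown (the real guard lives in SetState, which this diff does not show):

#include <iostream>
#include <mutex>

class Widget {
public:
    // Explicit teardown: safe to call more than once and safe to call before destruction.
    void drop() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_dropped) {
            return;  // already torn down, nothing to do
        }
        _dropped = true;
        std::cout << "failing outstanding work\n";  // placeholder for real cleanup
    }

    // The destructor only delegates, so owners can tear down early via drop().
    ~Widget() {
        drop();
    }

private:
    std::mutex _mutex;
    bool _dropped = false;
};

int main() {
    Widget w;
    w.drop();  // explicit shutdown path
}   // the destructor runs drop() again; the guard makes it a no-op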
@@ -296,11 +300,6 @@ SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh(
 Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh(
     const ReadPreferenceSetting& criteria, Milliseconds maxWait) {

-    // If we're in shutdown, don't bother
-    if (globalRSMonitorManager.isShutdown()) {
-        return Status(ErrorCodes::ShutdownInProgress, "Server is shutting down"_sd);
-    }
-
     if (_state->isRemovedFromManager.load()) {
         return Status(ErrorCodes::ReplicaSetMonitorRemoved,
                       str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed");
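Because the manager's shutdown() now marks every monitor as removed and drops it (see the hunks further down), the per-request check of the global manager's isShutdown() becomes redundant and is deleted; requests observe the per-monitor isRemovedFromManager flag instead. A stand-alone sketch of that per-object flag check, with a hypothetical Result type standing in for the Future/Status the real code returns:

#include <atomic>
#include <iostream>
#include <string>

// Result stand-in: either a value or an error message.
struct Result {
    bool ok;
    std::string error;
};

class Monitor {
public:
    Result getHosts() {
        // Per-monitor flag set by the manager at shutdown; no global
        // "is the whole process shutting down?" check is needed anymore.
        if (_isRemovedFromManager.load()) {
            return {false, "ReplicaSetMonitor is removed"};
        }
        return {true, ""};
    }

    void markAsRemoved() {
        _isRemovedFromManager.store(true);
    }

private:
    std::atomic<bool> _isRemovedFromManager{false};
};

int main() {
    Monitor m;
    std::cout << m.getHosts().ok << "\n";  // 1: monitor is live
    m.markAsRemoved();                     // what the manager's shutdown() now does
    std::cout << m.getHosts().ok << "\n";  // 0: the request is failed instead of hanging
}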
@@ -73,6 +73,11 @@ public:
      */
     void init();

+    /**
+     * Ends any ongoing refreshes.
+     */
+    void drop();
+
     /**
     * Returns a host matching the given read preference or an error, if no host matches.
     *
@@ -79,22 +79,19 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData se
     }
 }

-void ReplicaSetMonitorManager::_setupTaskExecutorInLock(const std::string& name) {
-    auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
-
-    // do not restart taskExecutor if is in shutdown
-    if (!_taskExecutor && !_isShutdown) {
-        // construct task executor
-        auto net = executor::makeNetworkInterface(
-            "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
-        auto netPtr = net.get();
-        _taskExecutor = std::make_unique<ThreadPoolTaskExecutor>(
-            std::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
-        LOG(1) << "Starting up task executor for monitoring replica sets in response to request to "
-                  "monitor set: "
-               << redact(name);
-        _taskExecutor->startup();
-    }
+void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
+    if (_isShutdown || _taskExecutor) {
+        // do not restart taskExecutor if is in shutdown
+        return;
+    }
+
+    // construct task executor
+    auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+    auto net = executor::makeNetworkInterface(
+        "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
+    auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
+    _taskExecutor = std::make_unique<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+    _taskExecutor->startup();
 }

 namespace {
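The rewritten _setupTaskExecutorInLock() drops the unused name parameter (and its log line) and turns the condition into an early return: nothing is constructed if the manager is already shut down or the executor already exists. A stand-alone sketch of that lazy, shutdown-aware construction with hypothetical Manager/Executor types (the real code builds a ThreadPoolTaskExecutor over a NetworkInterfaceThreadPool, as shown above):

#include <iostream>
#include <memory>
#include <mutex>

struct Executor {
    void startup() { std::cout << "executor started\n"; }
};

class Manager {
public:
    void shutdown() {
        std::lock_guard<std::mutex> lk(_mutex);
        _isShutdown = true;
    }

    // The real helper runs with the manager mutex already held; this sketch takes it itself.
    void ensureExecutor() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_isShutdown || _executor) {
            return;  // never (re)build after shutdown, never build twice
        }
        _executor = std::make_unique<Executor>();
        _executor->startup();
    }

private:
    std::mutex _mutex;
    std::unique_ptr<Executor> _executor;
    bool _isShutdown = false;
};

int main() {
    Manager m;
    m.ensureExecutor();  // builds and starts the executor
    m.ensureExecutor();  // no-op: executor already exists
    m.shutdown();
    m.ensureExecutor();  // no-op: manager is shut down
}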
@@ -105,30 +102,18 @@ void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode

 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
     const ConnectionString& connStr) {
     invariant(connStr.type() == ConnectionString::SET);
-
-    stdx::lock_guard<stdx::mutex> lk(_mutex);
-    _setupTaskExecutorInLock(connStr.toString());
-    auto setName = connStr.getSetName();
-    auto monitor = _monitors[setName].lock();
-    if (monitor) {
-        uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), transport::kGlobalSSLMode);
-        return monitor;
-    }
-
-    log() << "Starting new replica set monitor for " << connStr.toString();
-
-    auto newMonitor = std::make_shared<ReplicaSetMonitor>(MongoURI(connStr));
-    _monitors[setName] = newMonitor;
-    newMonitor->init();
-    return newMonitor;
+    return getOrCreateMonitor(MongoURI(connStr));
 }

 shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) {
     invariant(uri.type() == ConnectionString::SET);

     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    _setupTaskExecutorInLock(uri.toString());
+    uassert(ErrorCodes::ShutdownInProgress,
+            str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown",
+            !_isShutdown);
+
+    _setupTaskExecutorInLock();
     const auto& setName = uri.getSetName();
     auto monitor = _monitors[setName].lock();
     if (monitor) {
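getOrCreateMonitor(const ConnectionString&) becomes a thin wrapper that converts to MongoURI and delegates, and the MongoURI overload now refuses to create monitors once shutdown has begun (the uassert with ShutdownInProgress) before lazily setting up the executor. A stand-alone sketch of that normalize-then-delegate shape with hypothetical types; a thrown exception stands in for uassert:

#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>

struct Monitor {
    explicit Monitor(std::string name) : setName(std::move(name)) {}
    std::string setName;
};

class Manager {
public:
    // Convenience overload: normalize the argument and delegate to the canonical one.
    std::shared_ptr<Monitor> getOrCreate(const char* setName) {
        return getOrCreate(std::string(setName));
    }

    std::shared_ptr<Monitor> getOrCreate(const std::string& setName) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_isShutdown) {
            // Mirrors the ShutdownInProgress guard: no new monitors after shutdown.
            throw std::runtime_error("shutting down, refusing to create monitor for " + setName);
        }
        auto& weak = _monitors[setName];
        if (auto existing = weak.lock()) {
            return existing;  // reuse the live monitor for this set
        }
        auto created = std::make_shared<Monitor>(setName);
        weak = created;
        return created;
    }

    void shutdown() {
        std::lock_guard<std::mutex> lk(_mutex);
        _isShutdown = true;
    }

private:
    std::mutex _mutex;
    std::map<std::string, std::weak_ptr<Monitor>> _monitors;
    bool _isShutdown = false;
};

int main() {
    Manager m;
    auto a = m.getOrCreate("rs0");
    auto b = m.getOrCreate("rs0");  // same instance while 'a' is alive
    m.shutdown();
    try {
        m.getOrCreate("rs1");
    } catch (const std::exception&) {
        // expected: creation is refused after shutdown
    }
}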
@@ -169,34 +154,36 @@ void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
 }

 void ReplicaSetMonitorManager::shutdown() {
+    decltype(_monitors) monitors;
     {
         stdx::lock_guard<stdx::mutex> lk(_mutex);
-        if (!_taskExecutor || _isShutdown) {
+        if (std::exchange(_isShutdown, true)) {
             return;
         }
-        _isShutdown = true;
+
+        monitors = std::exchange(_monitors, {});
     }

-    LOG(1) << "Shutting down task executor used for monitoring replica sets";
-    _taskExecutor->shutdown();
-    _taskExecutor->join();
+    log() << "Dropping all ongoing scans against replica sets";
+    for (auto & [ name, monitor ] : monitors) {
+        auto anchor = monitor.lock();
+        if (!anchor) {
+            continue;
+        }
+
+        anchor->markAsRemoved();
+        anchor->drop();
+    }
+
+    if (auto taskExecutor = std::exchange(_taskExecutor, {})) {
+        LOG(1) << "Shutting down task executor used for monitoring replica sets";
+        taskExecutor->shutdown();
+        taskExecutor->join();
+    }
 }

 void ReplicaSetMonitorManager::removeAllMonitors() {
-    {
-        stdx::lock_guard<stdx::mutex> lk(_mutex);
-        _monitors = ReplicaSetMonitorsMap();
-        if (!_taskExecutor || _isShutdown) {
-            return;
-        }
-        _isShutdown = true;
-    }
-
-    LOG(1) << "Shutting down task executor used for monitoring replica sets";
-    _taskExecutor->shutdown();
-    _taskExecutor->join();
-    _taskExecutor.reset();
+    shutdown();

     {
         stdx::lock_guard<stdx::mutex> lk(_mutex);
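The new shutdown() leans on std::exchange twice: on _isShutdown so that only the first caller does the work, and on _monitors so the map is snapshotted and cleared under the lock while the per-monitor drop work runs outside it; _taskExecutor is exchanged the same way so the executor is only shut down if it was ever created. A small stand-alone sketch of that run-once, snapshot-under-lock pattern with a hypothetical Registry:

#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <utility>

class Registry {
public:
    void add(const std::string& name) {
        std::lock_guard<std::mutex> lk(_mutex);
        _entries[name] = true;
    }

    void shutdown() {
        std::map<std::string, bool> entries;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            if (std::exchange(_isShutdown, true)) {
                return;  // someone already ran shutdown; repeat calls are no-ops
            }
            entries = std::exchange(_entries, {});  // snapshot and clear under the lock
        }

        // Slow per-entry work happens outside the lock, on the snapshot.
        for (auto& [name, alive] : entries) {
            std::cout << "dropping " << name << (alive ? "" : " (dead)") << "\n";
        }
    }

private:
    std::mutex _mutex;
    std::map<std::string, bool> _entries;
    bool _isShutdown = false;
};

int main() {
    Registry r;
    r.add("rs0");
    r.add("rs1");
    r.shutdown();  // drops rs0 and rs1
    r.shutdown();  // no-op thanks to std::exchange on the flag
}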
@@ -115,7 +115,7 @@ private:
     // Needs to be after `_taskExecutor`, so that it will be destroyed before the `_taskExecutor`.
     ReplicaSetMonitorsMap _monitors;

-    void _setupTaskExecutorInLock(const std::string& name);
+    void _setupTaskExecutorInLock();

     // set to true when shutdown has been called.
     bool _isShutdown{false};
@@ -1553,6 +1553,7 @@ env.Library(
     ],
     LIBDEPS_PRIVATE=[
         'op_observer',
+        '$BUILD_DIR/mongo/client/clientdriver_network', # TODO Remove this in SERVER-42437
     ],
 )

@@ -33,6 +33,7 @@

 #include <memory>

+#include "mongo/client/replica_set_monitor.h"
 #include "mongo/db/client.h"
 #include "mongo/db/op_observer_registry.h"
 #include "mongo/util/assert_util.h"
@@ -47,6 +48,9 @@ ScopedGlobalServiceContextForTest::ScopedGlobalServiceContextForTest() {
 }

 ScopedGlobalServiceContextForTest::~ScopedGlobalServiceContextForTest() {
+    // TODO Remove in SERVER-42437
+    ReplicaSetMonitor::shutdown();
+
     setGlobalServiceContext({});
 }

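The fixture change makes every test that owns a ScopedGlobalServiceContextForTest shut down the replica set monitor machinery before the global service context is torn down (flagged as temporary by the SERVER-42437 TODO). A tiny stand-alone sketch of that destructor-ordering idea with hypothetical globals:

#include <iostream>

// Stand-ins for process-wide facilities used by tests.
void shutdownGlobalMonitors() { std::cout << "monitors shut down\n"; }
void destroyGlobalServiceContext() { std::cout << "service context destroyed\n"; }

// Each test owns one of these; the destructor enforces the teardown order.
struct ScopedServiceContextForTest {
    ~ScopedServiceContextForTest() {
        shutdownGlobalMonitors();       // first: stop background users of the context
        destroyGlobalServiceContext();  // then: tear down the context itself
    }
};

int main() {
    ScopedServiceContextForTest scoped;
    std::cout << "test body runs here\n";
}   // teardown happens in the right order when 'scoped' goes out of scope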