0
0
mirror of https://github.com/mongodb/mongo.git synced 2024-12-01 09:32:32 +01:00

SERVER-39022 Make the sharding fixed task executor use unlimited thread pool size

This commit is contained in:
Kaloian Manassiev 2019-01-15 17:51:49 -05:00
parent 692be59335
commit 75f8c26ace
6 changed files with 72 additions and 75 deletions

View File

@ -1,7 +1,6 @@
/**
* Test that a new primary that gets elected will properly perform shard initialization.
*/
(function() {
"use strict";
@ -9,6 +8,7 @@
var replTest = new ReplSetTest({nodes: 3});
replTest.startSet({shardsvr: ''});
var nodes = replTest.nodeList();
replTest.initiate({
_id: replTest.name,
@ -41,7 +41,7 @@
shardIdentityQuery, shardIdentityUpdate, {upsert: true, writeConcern: {w: 'majority'}}));
replTest.stopMaster();
replTest.waitForMaster();
replTest.waitForMaster(30000);
primaryConn = replTest.getPrimary();
@ -55,5 +55,4 @@
replTest.stopSet();
st.stop();
})();

View File

@ -42,9 +42,6 @@
#include "mongo/util/future.h"
namespace mongo {
class BSONObjBuilder;
namespace executor {
MONGO_FAIL_POINT_DECLARE(networkInterfaceDiscardCommandsBeforeAcquireConn);

View File

@ -67,15 +67,26 @@ NetworkInterfaceMock::~NetworkInterfaceMock() {
void NetworkInterfaceMock::logQueues() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_logQueues_inlock();
const std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
{"unscheduled", &_unscheduled},
{"scheduled", &_scheduled},
{"processing", &_processing},
{"blackholes", &_blackHoled}};
for (auto&& queue : queues) {
if (queue.second->empty()) {
continue;
}
log() << "**** queue: " << queue.first << " ****";
for (auto&& item : *queue.second) {
log() << "\t\t " << item.getDiagnosticString();
}
}
}
std::string NetworkInterfaceMock::getDiagnosticString() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
return _getDiagnosticString_inlock();
}
std::string NetworkInterfaceMock::_getDiagnosticString_inlock() const {
return str::stream() << "NetworkInterfaceMock -- waitingToRunMask:" << _waitingToRunMask
<< ", now:" << _now_inlock().toString() << ", hasStarted:" << _hasStarted
<< ", inShutdown: " << _inShutdown.load()
@ -85,25 +96,6 @@ std::string NetworkInterfaceMock::_getDiagnosticString_inlock() const {
<< ", unscheduled: " << _unscheduled.size();
}
void NetworkInterfaceMock::_logQueues_inlock() const {
std::vector<std::pair<std::string, const NetworkOperationList*>> queues{
{"unscheduled", &_unscheduled},
{"scheduled", &_scheduled},
{"processing", &_processing},
{"blackholes", &_blackHoled}};
for (auto&& queue : queues) {
if (queue.second->empty()) {
continue;
}
log() << "**** queue: " << queue.first << " ****";
for (auto&& item : *queue.second) {
log() << "\t\t " << item.getDiagnosticString();
}
}
}
void NetworkInterfaceMock::appendConnectionStats(ConnectionPoolStats* stats) const {}
Date_t NetworkInterfaceMock::now() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _now_inlock();

View File

@ -52,7 +52,6 @@ class BSONObj;
namespace executor {
using ResponseStatus = TaskExecutor::ResponseStatus;
class NetworkConnectionHook;
/**
@ -84,16 +83,11 @@ public:
NetworkInterfaceMock();
virtual ~NetworkInterfaceMock();
virtual void appendConnectionStats(ConnectionPoolStats* stats) const;
virtual std::string getDiagnosticString();
Counters getCounters() const override {
return Counters();
}
/**
* Logs the contents of the queues for diagnostics.
*/
virtual void logQueues();
void logQueues();
////////////////////////////////////////////////////////////////////////////////
//
@ -101,20 +95,26 @@ public:
//
////////////////////////////////////////////////////////////////////////////////
virtual void startup();
virtual void shutdown();
virtual bool inShutdown() const;
virtual void waitForWork();
virtual void waitForWorkUntil(Date_t when);
virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
virtual void signalWorkAvailable();
virtual Date_t now();
virtual std::string getHostName();
virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton = nullptr);
void appendConnectionStats(ConnectionPoolStats* stats) const override {}
std::string getDiagnosticString() override;
Counters getCounters() const override {
return Counters();
}
void startup() override;
void shutdown() override;
bool inShutdown() const override;
void waitForWork() override;
void waitForWorkUntil(Date_t when) override;
void signalWorkAvailable() override;
Date_t now() override;
std::string getHostName() override;
Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
RemoteCommandCompletionFn&& onFinish,
const transport::BatonHandle& baton = nullptr) override;
/**
* If the network operation is in the _unscheduled or _processing queues, moves the operation
@ -122,21 +122,20 @@ public:
* the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
* called after the task has already completed, but its callback has not yet been run.
*/
virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const transport::BatonHandle& baton = nullptr);
void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
const transport::BatonHandle& baton = nullptr) override;
/**
* Not implemented.
*/
virtual Status setAlarm(Date_t when,
unique_function<void()> action,
const transport::BatonHandle& baton = nullptr);
Status setAlarm(Date_t when,
unique_function<void()> action,
const transport::BatonHandle& baton = nullptr) override;
virtual bool onNetworkThread();
bool onNetworkThread() override;
void dropConnections(const HostAndPort&) override {}
////////////////////////////////////////////////////////////////////////////////
//
// Methods for simulating network operations and the passage of time.
@ -151,6 +150,10 @@ public:
*/
class InNetworkGuard;
void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
/**
* Causes the currently running (non-executor) thread to assume the mantle of the network
* simulation thread.
@ -199,7 +202,7 @@ public:
*/
void scheduleResponse(NetworkOperationIterator noi,
Date_t when,
const ResponseStatus& response);
const TaskExecutor::ResponseStatus& response);
/**
* Schedules a successful "response" to "noi" at virtual time "when".
@ -221,14 +224,13 @@ public:
* "when" defaults to now().
*/
RemoteCommandRequest scheduleErrorResponse(const Status& response);
RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response);
RemoteCommandRequest scheduleErrorResponse(const TaskExecutor::ResponseStatus response);
RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
const Status& response);
RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
Date_t when,
const Status& response);
/**
* Swallows "noi", causing the network interface to not respond to it until
* shutdown() is called.
@ -277,7 +279,7 @@ public:
*/
void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle,
const std::vector<NetworkOperationList*> queuesToCheck,
const ResponseStatus& response);
const TaskExecutor::ResponseStatus& response);
private:
/**
@ -307,15 +309,6 @@ private:
*/
void _startup_inlock();
/**
* Returns information about the state of this mock for diagnostic purposes.
*/
std::string _getDiagnosticString_inlock() const;
/**
* Logs the contents of the queues for diagnostics.
*/
void _logQueues_inlock() const;
/**
* Returns the current virtualized time.
*/
@ -446,7 +439,7 @@ public:
/**
* Sets the response and thet virtual time at which it will be delivered.
*/
void setResponse(Date_t responseDate, const ResponseStatus& response);
void setResponse(Date_t responseDate, const TaskExecutor::ResponseStatus& response);
/**
* Predicate that returns true if cbHandle equals the executor's handle for this network
@ -506,7 +499,7 @@ private:
Date_t _responseDate;
TaskExecutor::CallbackHandle _cbHandle;
RemoteCommandRequest _request;
ResponseStatus _response;
TaskExecutor::ResponseStatus _response;
RemoteCommandCompletionFn _onFinish;
};

View File

@ -195,7 +195,7 @@ protected:
return guard->hasReadyRequests();
}
void scheduleErrorResponse(executor::ResponseStatus rs) {
void scheduleErrorResponse(executor::TaskExecutor::ResponseStatus rs) {
invariant(!rs.isOK());
rs.elapsedMillis = Milliseconds(0);
executor::NetworkInterfaceMock* net = network();

View File

@ -67,6 +67,7 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_task_executor.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@ -120,6 +121,21 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
std::unique_ptr<executor::TaskExecutor> makeShardingFixedTaskExecutor(
std::unique_ptr<NetworkInterface> net) {
auto executor =
stdx::make_unique<ThreadPoolTaskExecutor>(stdx::make_unique<ThreadPool>([] {
ThreadPool::Options opts;
opts.poolName = "Sharding-Fixed";
opts.maxThreads =
ThreadPool::Options::kUnlimited;
return opts;
}()),
std::move(net));
return stdx::make_unique<executor::ShardingTaskExecutor>(std::move(executor));
}
std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
std::unique_ptr<NetworkInterface> fixedNet,
rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder,
@ -140,7 +156,7 @@ std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
}
// Add executor used to perform non-performance critical work.
auto fixedExec = makeShardingTaskExecutor(std::move(fixedNet));
auto fixedExec = makeShardingFixedTaskExecutor(std::move(fixedNet));
auto executorPool = stdx::make_unique<TaskExecutorPool>();
executorPool->addExecutors(std::move(executors), std::move(fixedExec));