diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index d43adc418db..61c0db1f52b 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -43,12 +43,14 @@ namespace mongo { TicketHolder::TicketHolder(ServiceContext* serviceContext, int numTickets, bool trackPeakUsed, - ResizePolicy resizePolicy) + ResizePolicy resizePolicy, + int32_t maxQueueDepth) : _trackPeakUsed(trackPeakUsed), _resizePolicy(resizePolicy), _serviceContext(serviceContext), _tickets(numTickets), - _outof(numTickets) {} + _outof(numTickets), + _maxQueueDepth(maxQueueDepth) {} bool TicketHolder::resize(OperationContext* opCtx, int32_t newSize, Date_t deadline) { stdx::lock_guard lk(_resizeMutex); @@ -178,13 +180,22 @@ boost::optional TicketHolder::_performWaitForTicketUntil(OperationContex while (true) { if (boost::optional maybeTicket = _tryAcquireNormalPriorityTicket(admCtx); maybeTicket) { - return std::move(*maybeTicket); + return maybeTicket; } Date_t deadline = nextDeadline(); - _waiterCount.fetchAndAdd(1); - _tickets.waitUntil(0, deadline); - _waiterCount.fetchAndSubtract(1); + { + const auto previousWaiterCount = _waiterCount.fetchAndAdd(1); + + // Since uassert throws, we use raii to substract the waiter count + ON_BLOCK_EXIT([&] { _waiterCount.fetchAndSubtract(1); }); + + uassert(ErrorCodes::AdmissionQueueOverflow, + "MongoDB is overloaded and cannot accept new operations. Try again later.", + previousWaiterCount < _maxQueueDepth); + + _tickets.waitUntil(0, deadline); + } if (interruptible) { opCtx->checkForInterrupt(); @@ -327,4 +338,8 @@ void TicketHolder::setNumFinishedProcessing_forTest(int32_t numFinishedProcessin void TicketHolder::setPeakUsed_forTest(int32_t used) { _peakUsed.store(used); } + +int32_t TicketHolder::waiting_forTest() const { + return _waiterCount.load(); +} } // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 98ac560ee0b..c821364602e 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -31,6 +31,7 @@ #include #include #include +#include #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/service_context.h" @@ -73,7 +74,8 @@ public: TicketHolder(ServiceContext* serviceContext, int numTickets, bool trackPeakUsed, - ResizePolicy resizePolicy = ResizePolicy::kGradual); + ResizePolicy resizePolicy = ResizePolicy::kGradual, + int32_t maxQueueDepth = std::numeric_limits::max()); /** * Adjusts the total number of tickets allocated for the ticket pool to 'newSize'. @@ -145,6 +147,11 @@ public: */ int32_t getAndResetPeakUsed(); + /** + * Exposes the amount of waiting threads for testing purpose. + */ + int32_t waiting_forTest() const; + /** * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket * pool. @@ -235,6 +242,7 @@ private: Atomic _waiterCount{0}; AtomicWord _outof; AtomicWord _peakUsed; + int32_t _maxQueueDepth; }; /** diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index c8c8b7737ea..e6d2ee8b1f4 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -26,6 +26,8 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#include "mongo/stdx/thread.h" +#include #include #include "mongo/db/service_context_test_fixture.h" @@ -37,10 +39,14 @@ #include "mongo/util/packaged_task.h" #include "mongo/util/tick_source_mock.h" +#include "mongo/unittest/assert.h" +#include "mongo/util/assert_util.h" + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest namespace { using namespace mongo; +using namespace std::literals; // Timeout to use to ensure that waiters get queued and/or receive tickets. // We use this timeout so we can bail-out early and fail with a better diagnostic when we appear to @@ -59,6 +65,8 @@ public: std::make_unique(ServiceContext::make( nullptr, nullptr, std::make_unique>()))) {} + static inline const Milliseconds kSleepTime{1}; + void setUp() override { ServiceContextTest::setUp(); _client = getServiceContext()->getService()->makeClient("test"); @@ -103,6 +111,15 @@ public: TicketHolder::ResizePolicy::kImmediate); } + template + void waitUntilCanceled(OperationContext& opCtx, Predicate predicate) { + while (opCtx.checkForInterruptNoAssert() == Status::OK()) { + if (predicate()) + return; + sleepFor(kSleepTime); + } + ASSERT(false); + } protected: class Stats; @@ -588,4 +605,101 @@ TEST_F(TicketHolderTest, ReleaseToPoolWakesWaiters) { _opCtx->runWithDeadline( getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] { allDone.get(_opCtx.get()); }); } + +TEST_F(TicketHolderImmediateResizeTest, WaitQueueMax0) { + constexpr int initialNumTickets = 4; + constexpr int maxNumberOfWaiters = 0; + auto holder = std::make_unique(getServiceContext(), + initialNumTickets, + false /* trackPeakUsed */, + TicketHolder::ResizePolicy::kImmediate, + maxNumberOfWaiters); + + // acquire 4 tickets + std::array admCtxs; + std::array, 4> tickets; + for (int i = 0; i < 4; ++i) { + tickets[i] = holder->waitForTicket(_opCtx.get(), &admCtxs[i + 1]); + } + + // ensure 4 are now in-use and 0 are left available + ASSERT_EQ(holder->used(), 4); + ASSERT_EQ(holder->available(), 0); + ASSERT_EQ(holder->outof(), 4); + + // Since no thread can be waiting for a ticket, it will cause waiting + ASSERT_THROWS(holder->waitForTicket(_opCtx.get(), &admCtxs[0]), + ExceptionFor); + + // Releasing the tickets, making all tickets available in the process + tickets = {}; + + // ensure 0 are now in-use and 4 are left available + ASSERT_EQ(holder->used(), 0); + ASSERT_EQ(holder->available(), 4); + ASSERT_EQ(holder->outof(), 4); +} + +TEST_F(TicketHolderImmediateResizeTest, WaitQueueMax1) { + constexpr int initialNumTickets = 4; + constexpr int maxNumberOfWaiters = 1; + auto holder = std::make_unique(getServiceContext(), + initialNumTickets, + false /* trackPeakUsed */, + TicketHolder::ResizePolicy::kImmediate, + maxNumberOfWaiters); + + // acquire 4 tickets + std::array admCtxs; + std::array, 4> tickets; + for (int i = 0; i < 4; ++i) { + tickets[i] = holder->waitForTicket(_opCtx.get(), &admCtxs[i + 1]); + } + + // ensure 4 are now in-use and 0 are left available + ASSERT_EQ(holder->used(), 4); + ASSERT_EQ(holder->available(), 0); + ASSERT_EQ(holder->outof(), 4); + + // We aquire a ticket in another thread. + // Since no ticket available, it will cause a blocking wait on that thread. + MockAdmission releaseWaiterAdmission{getServiceContext(), AdmissionContext::Priority::kNormal}; + Future ticketFuture = spawn([&]() { + return holder->waitForTicket(releaseWaiterAdmission.opCtx.get(), + &releaseWaiterAdmission.admCtx); + }); + + // We wait until ticketFuture is actually waiting for the ticket or until timeout exceeded + _opCtx->runWithDeadline(getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] { + waitUntilCanceled(*_opCtx, [&] { return holder->waiting_forTest() == 1; }); + }); + + // Since the maximum amount of ticket is one, and one is already waiting, it will throw + ASSERT_THROWS(holder->waitForTicketUntil(_opCtx.get(), &admCtxs[0], getNextDeadline()), + ExceptionFor); + + // Releasing the tickets, resuming the waiter + tickets = {}; + + // The fifth ticket is getting aquired after waiting + boost::optional ticket; + _opCtx->runWithDeadline(getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] { + // We can gst a ticket when one is available + ticket = std::move(ticketFuture).get(_opCtx.get()); + }); + + // ensure 3 are now available and 1 are in-use + // The one in use was the waiting ticket + ASSERT_EQ(holder->used(), 1); + ASSERT_EQ(holder->available(), 3); + ASSERT_EQ(holder->outof(), 4); + + // Releasing the fifth ticket + ticket.reset(); + + // ensure 4 are now available and 0 are in-use + ASSERT_EQ(holder->used(), 0); + ASSERT_EQ(holder->available(), 4); + ASSERT_EQ(holder->outof(), 4); +} } // namespace