mirror of
https://github.com/mongodb/mongo.git
synced 2024-11-21 12:39:08 +01:00
SERVER-96215: Max queue depth parameter added to ticket holder (#29200)
GitOrigin-RevId: 98edf0a4eed1c45789ad0a20331cb4b6fd5c412f
This commit is contained in:
parent
e446a7f1d0
commit
2bf401fcd9
@ -43,12 +43,14 @@ namespace mongo {
|
||||
TicketHolder::TicketHolder(ServiceContext* serviceContext,
|
||||
int numTickets,
|
||||
bool trackPeakUsed,
|
||||
ResizePolicy resizePolicy)
|
||||
ResizePolicy resizePolicy,
|
||||
int32_t maxQueueDepth)
|
||||
: _trackPeakUsed(trackPeakUsed),
|
||||
_resizePolicy(resizePolicy),
|
||||
_serviceContext(serviceContext),
|
||||
_tickets(numTickets),
|
||||
_outof(numTickets) {}
|
||||
_outof(numTickets),
|
||||
_maxQueueDepth(maxQueueDepth) {}
|
||||
|
||||
bool TicketHolder::resize(OperationContext* opCtx, int32_t newSize, Date_t deadline) {
|
||||
stdx::lock_guard<stdx::mutex> lk(_resizeMutex);
|
||||
@ -178,13 +180,22 @@ boost::optional<Ticket> TicketHolder::_performWaitForTicketUntil(OperationContex
|
||||
while (true) {
|
||||
if (boost::optional<Ticket> maybeTicket = _tryAcquireNormalPriorityTicket(admCtx);
|
||||
maybeTicket) {
|
||||
return std::move(*maybeTicket);
|
||||
return maybeTicket;
|
||||
}
|
||||
|
||||
Date_t deadline = nextDeadline();
|
||||
_waiterCount.fetchAndAdd(1);
|
||||
_tickets.waitUntil(0, deadline);
|
||||
_waiterCount.fetchAndSubtract(1);
|
||||
{
|
||||
const auto previousWaiterCount = _waiterCount.fetchAndAdd(1);
|
||||
|
||||
// Since uassert throws, we use raii to substract the waiter count
|
||||
ON_BLOCK_EXIT([&] { _waiterCount.fetchAndSubtract(1); });
|
||||
|
||||
uassert(ErrorCodes::AdmissionQueueOverflow,
|
||||
"MongoDB is overloaded and cannot accept new operations. Try again later.",
|
||||
previousWaiterCount < _maxQueueDepth);
|
||||
|
||||
_tickets.waitUntil(0, deadline);
|
||||
}
|
||||
|
||||
if (interruptible) {
|
||||
opCtx->checkForInterrupt();
|
||||
@ -327,4 +338,8 @@ void TicketHolder::setNumFinishedProcessing_forTest(int32_t numFinishedProcessin
|
||||
void TicketHolder::setPeakUsed_forTest(int32_t used) {
|
||||
_peakUsed.store(used);
|
||||
}
|
||||
|
||||
int32_t TicketHolder::waiting_forTest() const {
|
||||
return _waiterCount.load();
|
||||
}
|
||||
} // namespace mongo
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include <boost/move/utility_core.hpp>
|
||||
#include <boost/optional/optional.hpp>
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
@ -73,7 +74,8 @@ public:
|
||||
TicketHolder(ServiceContext* serviceContext,
|
||||
int numTickets,
|
||||
bool trackPeakUsed,
|
||||
ResizePolicy resizePolicy = ResizePolicy::kGradual);
|
||||
ResizePolicy resizePolicy = ResizePolicy::kGradual,
|
||||
int32_t maxQueueDepth = std::numeric_limits<int32_t>::max());
|
||||
|
||||
/**
|
||||
* Adjusts the total number of tickets allocated for the ticket pool to 'newSize'.
|
||||
@ -145,6 +147,11 @@ public:
|
||||
*/
|
||||
int32_t getAndResetPeakUsed();
|
||||
|
||||
/**
|
||||
* Exposes the amount of waiting threads for testing purpose.
|
||||
*/
|
||||
int32_t waiting_forTest() const;
|
||||
|
||||
/**
|
||||
* Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket
|
||||
* pool.
|
||||
@ -235,6 +242,7 @@ private:
|
||||
Atomic<int32_t> _waiterCount{0};
|
||||
AtomicWord<int32_t> _outof;
|
||||
AtomicWord<int32_t> _peakUsed;
|
||||
int32_t _maxQueueDepth;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -26,6 +26,8 @@
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
#include "mongo/stdx/thread.h"
|
||||
#include <concepts>
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/db/service_context_test_fixture.h"
|
||||
@ -37,10 +39,14 @@
|
||||
#include "mongo/util/packaged_task.h"
|
||||
#include "mongo/util/tick_source_mock.h"
|
||||
|
||||
#include "mongo/unittest/assert.h"
|
||||
#include "mongo/util/assert_util.h"
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
|
||||
|
||||
namespace {
|
||||
using namespace mongo;
|
||||
using namespace std::literals;
|
||||
|
||||
// Timeout to use to ensure that waiters get queued and/or receive tickets.
|
||||
// We use this timeout so we can bail-out early and fail with a better diagnostic when we appear to
|
||||
@ -59,6 +65,8 @@ public:
|
||||
std::make_unique<ScopedGlobalServiceContextForTest>(ServiceContext::make(
|
||||
nullptr, nullptr, std::make_unique<TickSourceMock<Microseconds>>()))) {}
|
||||
|
||||
static inline const Milliseconds kSleepTime{1};
|
||||
|
||||
void setUp() override {
|
||||
ServiceContextTest::setUp();
|
||||
_client = getServiceContext()->getService()->makeClient("test");
|
||||
@ -103,6 +111,15 @@ public:
|
||||
TicketHolder::ResizePolicy::kImmediate);
|
||||
}
|
||||
|
||||
template <std::invocable Predicate>
|
||||
void waitUntilCanceled(OperationContext& opCtx, Predicate predicate) {
|
||||
while (opCtx.checkForInterruptNoAssert() == Status::OK()) {
|
||||
if (predicate())
|
||||
return;
|
||||
sleepFor(kSleepTime);
|
||||
}
|
||||
ASSERT(false);
|
||||
}
|
||||
|
||||
protected:
|
||||
class Stats;
|
||||
@ -588,4 +605,101 @@ TEST_F(TicketHolderTest, ReleaseToPoolWakesWaiters) {
|
||||
_opCtx->runWithDeadline(
|
||||
getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] { allDone.get(_opCtx.get()); });
|
||||
}
|
||||
|
||||
TEST_F(TicketHolderImmediateResizeTest, WaitQueueMax0) {
|
||||
constexpr int initialNumTickets = 4;
|
||||
constexpr int maxNumberOfWaiters = 0;
|
||||
auto holder = std::make_unique<TicketHolder>(getServiceContext(),
|
||||
initialNumTickets,
|
||||
false /* trackPeakUsed */,
|
||||
TicketHolder::ResizePolicy::kImmediate,
|
||||
maxNumberOfWaiters);
|
||||
|
||||
// acquire 4 tickets
|
||||
std::array<MockAdmissionContext, 6> admCtxs;
|
||||
std::array<boost::optional<Ticket>, 4> tickets;
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
tickets[i] = holder->waitForTicket(_opCtx.get(), &admCtxs[i + 1]);
|
||||
}
|
||||
|
||||
// ensure 4 are now in-use and 0 are left available
|
||||
ASSERT_EQ(holder->used(), 4);
|
||||
ASSERT_EQ(holder->available(), 0);
|
||||
ASSERT_EQ(holder->outof(), 4);
|
||||
|
||||
// Since no thread can be waiting for a ticket, it will cause waiting
|
||||
ASSERT_THROWS(holder->waitForTicket(_opCtx.get(), &admCtxs[0]),
|
||||
ExceptionFor<ErrorCodes::AdmissionQueueOverflow>);
|
||||
|
||||
// Releasing the tickets, making all tickets available in the process
|
||||
tickets = {};
|
||||
|
||||
// ensure 0 are now in-use and 4 are left available
|
||||
ASSERT_EQ(holder->used(), 0);
|
||||
ASSERT_EQ(holder->available(), 4);
|
||||
ASSERT_EQ(holder->outof(), 4);
|
||||
}
|
||||
|
||||
TEST_F(TicketHolderImmediateResizeTest, WaitQueueMax1) {
|
||||
constexpr int initialNumTickets = 4;
|
||||
constexpr int maxNumberOfWaiters = 1;
|
||||
auto holder = std::make_unique<TicketHolder>(getServiceContext(),
|
||||
initialNumTickets,
|
||||
false /* trackPeakUsed */,
|
||||
TicketHolder::ResizePolicy::kImmediate,
|
||||
maxNumberOfWaiters);
|
||||
|
||||
// acquire 4 tickets
|
||||
std::array<MockAdmissionContext, 5> admCtxs;
|
||||
std::array<boost::optional<Ticket>, 4> tickets;
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
tickets[i] = holder->waitForTicket(_opCtx.get(), &admCtxs[i + 1]);
|
||||
}
|
||||
|
||||
// ensure 4 are now in-use and 0 are left available
|
||||
ASSERT_EQ(holder->used(), 4);
|
||||
ASSERT_EQ(holder->available(), 0);
|
||||
ASSERT_EQ(holder->outof(), 4);
|
||||
|
||||
// We aquire a ticket in another thread.
|
||||
// Since no ticket available, it will cause a blocking wait on that thread.
|
||||
MockAdmission releaseWaiterAdmission{getServiceContext(), AdmissionContext::Priority::kNormal};
|
||||
Future<Ticket> ticketFuture = spawn([&]() {
|
||||
return holder->waitForTicket(releaseWaiterAdmission.opCtx.get(),
|
||||
&releaseWaiterAdmission.admCtx);
|
||||
});
|
||||
|
||||
// We wait until ticketFuture is actually waiting for the ticket or until timeout exceeded
|
||||
_opCtx->runWithDeadline(getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
|
||||
waitUntilCanceled(*_opCtx, [&] { return holder->waiting_forTest() == 1; });
|
||||
});
|
||||
|
||||
// Since the maximum amount of ticket is one, and one is already waiting, it will throw
|
||||
ASSERT_THROWS(holder->waitForTicketUntil(_opCtx.get(), &admCtxs[0], getNextDeadline()),
|
||||
ExceptionFor<ErrorCodes::AdmissionQueueOverflow>);
|
||||
|
||||
// Releasing the tickets, resuming the waiter
|
||||
tickets = {};
|
||||
|
||||
// The fifth ticket is getting aquired after waiting
|
||||
boost::optional<Ticket> ticket;
|
||||
_opCtx->runWithDeadline(getNextDeadline(), ErrorCodes::ExceededTimeLimit, [&] {
|
||||
// We can gst a ticket when one is available
|
||||
ticket = std::move(ticketFuture).get(_opCtx.get());
|
||||
});
|
||||
|
||||
// ensure 3 are now available and 1 are in-use
|
||||
// The one in use was the waiting ticket
|
||||
ASSERT_EQ(holder->used(), 1);
|
||||
ASSERT_EQ(holder->available(), 3);
|
||||
ASSERT_EQ(holder->outof(), 4);
|
||||
|
||||
// Releasing the fifth ticket
|
||||
ticket.reset();
|
||||
|
||||
// ensure 4 are now available and 0 are in-use
|
||||
ASSERT_EQ(holder->used(), 0);
|
||||
ASSERT_EQ(holder->available(), 4);
|
||||
ASSERT_EQ(holder->outof(), 4);
|
||||
}
|
||||
} // namespace
|
||||
|
Loading…
Reference in New Issue
Block a user