mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 09:32:32 +01:00
SERVER-58766 Add session pool for internal sessions
This commit is contained in:
parent
914b9146ec
commit
791308c096
@ -39,6 +39,9 @@ let transactionsCollOnPrimary = shard0Primary.getCollection(kConfigTxnsNs);
|
|||||||
let imageCollOnPrimary = shard0Primary.getCollection(kImageCollNs);
|
let imageCollOnPrimary = shard0Primary.getCollection(kImageCollNs);
|
||||||
let testDB = shard0Primary.getDB(kDbName);
|
let testDB = shard0Primary.getDB(kDbName);
|
||||||
|
|
||||||
|
assert.commandWorked(testDB.createCollection(kCollName));
|
||||||
|
assert.commandWorked(shard0Primary.adminCommand({refreshLogicalSessionCacheNow: 1}));
|
||||||
|
|
||||||
const sessionUUID = UUID();
|
const sessionUUID = UUID();
|
||||||
const parentLsid = {
|
const parentLsid = {
|
||||||
id: sessionUUID
|
id: sessionUUID
|
||||||
|
@ -22,6 +22,7 @@ const testDB = st.s.getDB(kDbName);
|
|||||||
const sessionUUID = UUID();
|
const sessionUUID = UUID();
|
||||||
|
|
||||||
assert.commandWorked(testDB.runCommand({create: kCollName}));
|
assert.commandWorked(testDB.runCommand({create: kCollName}));
|
||||||
|
assert.commandWorked(shard0Primary.adminCommand({refreshLogicalSessionCacheNow: 1}));
|
||||||
|
|
||||||
(() => {
|
(() => {
|
||||||
jsTest.log(
|
jsTest.log(
|
||||||
|
@ -1388,11 +1388,13 @@ env.Library(
|
|||||||
env.Library(
|
env.Library(
|
||||||
target='logical_session_id_helpers',
|
target='logical_session_id_helpers',
|
||||||
source=[
|
source=[
|
||||||
|
'internal_session_pool.cpp',
|
||||||
'logical_session_id_helpers.cpp',
|
'logical_session_id_helpers.cpp',
|
||||||
],
|
],
|
||||||
LIBDEPS=[
|
LIBDEPS=[
|
||||||
'$BUILD_DIR/mongo/db/auth/auth',
|
'$BUILD_DIR/mongo/db/auth/auth',
|
||||||
'$BUILD_DIR/mongo/db/auth/authprivilege',
|
'$BUILD_DIR/mongo/db/auth/authprivilege',
|
||||||
|
'logical_session_cache',
|
||||||
'logical_session_id',
|
'logical_session_id',
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@ -2398,6 +2400,7 @@ if wiredtiger:
|
|||||||
'hasher_test.cpp',
|
'hasher_test.cpp',
|
||||||
'index_build_entry_helpers_test.cpp',
|
'index_build_entry_helpers_test.cpp',
|
||||||
'index_builds_coordinator_mongod_test.cpp',
|
'index_builds_coordinator_mongod_test.cpp',
|
||||||
|
'internal_session_pool_test.cpp',
|
||||||
'keypattern_test.cpp',
|
'keypattern_test.cpp',
|
||||||
'keys_collection_document_test.cpp',
|
'keys_collection_document_test.cpp',
|
||||||
'logical_session_cache_test.cpp',
|
'logical_session_cache_test.cpp',
|
||||||
|
129
src/mongo/db/internal_session_pool.cpp
Normal file
129
src/mongo/db/internal_session_pool.cpp
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (C) 2021-present MongoDB, Inc.
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the Server Side Public License, version 1,
|
||||||
|
* as published by MongoDB, Inc.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* Server Side Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the Server Side Public License
|
||||||
|
* along with this program. If not, see
|
||||||
|
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||||
|
*
|
||||||
|
* As a special exception, the copyright holders give permission to link the
|
||||||
|
* code of portions of this program with the OpenSSL library under certain
|
||||||
|
* conditions as described in each individual source file and distribute
|
||||||
|
* linked combinations including the program with the OpenSSL library. You
|
||||||
|
* must comply with the Server Side Public License in all respects for
|
||||||
|
* all of the code used other than as permitted herein. If you modify file(s)
|
||||||
|
* with this exception, you may extend this exception to your version of the
|
||||||
|
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||||
|
* delete this exception statement from your version. If you delete this
|
||||||
|
* exception statement from all source files in the program, then also delete
|
||||||
|
* it in the license file.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kControl
|
||||||
|
|
||||||
|
#include "mongo/platform/basic.h"
|
||||||
|
|
||||||
|
#include "mongo/db/internal_session_pool.h"
|
||||||
|
#include "mongo/db/logical_session_cache.h"
|
||||||
|
#include "mongo/db/logical_session_id_helpers.h"
|
||||||
|
#include "mongo/logv2/log.h"
|
||||||
|
|
||||||
|
namespace mongo {
|
||||||
|
|
||||||
|
const auto serviceDecorator = ServiceContext::declareDecoration<InternalSessionPool>();
|
||||||
|
|
||||||
|
auto InternalSessionPool::get(ServiceContext* serviceContext) -> InternalSessionPool* {
|
||||||
|
return &serviceDecorator(serviceContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto InternalSessionPool::get(OperationContext* opCtx) -> InternalSessionPool* {
|
||||||
|
return get(opCtx->getServiceContext());
|
||||||
|
}
|
||||||
|
|
||||||
|
InternalSessionPool::Session InternalSessionPool::acquire(OperationContext* opCtx) {
|
||||||
|
const InternalSessionPool::Session session = [&] {
|
||||||
|
stdx::lock_guard<Latch> lock(_mutex);
|
||||||
|
|
||||||
|
if (!_nonChildSessions.empty()) {
|
||||||
|
auto session = std::move(_nonChildSessions.top());
|
||||||
|
_nonChildSessions.pop();
|
||||||
|
return session;
|
||||||
|
} else {
|
||||||
|
auto lsid = makeSystemLogicalSessionId();
|
||||||
|
auto session = InternalSessionPool::Session(lsid, TxnNumber(0));
|
||||||
|
|
||||||
|
auto lsc = LogicalSessionCache::get(opCtx->getServiceContext());
|
||||||
|
uassertStatusOK(lsc->vivify(opCtx, lsid));
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
}();
|
||||||
|
|
||||||
|
LOGV2_DEBUG(5876600,
|
||||||
|
2,
|
||||||
|
"Acquired internal session",
|
||||||
|
"lsid"_attr = session.getSessionId(),
|
||||||
|
"txnNumber"_attr = session.getTxnNumber());
|
||||||
|
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
|
InternalSessionPool::Session InternalSessionPool::acquire(OperationContext* opCtx,
|
||||||
|
const LogicalSessionId& parentLsid) {
|
||||||
|
const InternalSessionPool::Session session = [&] {
|
||||||
|
stdx::lock_guard<Latch> lock(_mutex);
|
||||||
|
|
||||||
|
auto it = _childSessions.find(parentLsid);
|
||||||
|
if (it != _childSessions.end()) {
|
||||||
|
auto session = std::move(it->second);
|
||||||
|
_childSessions.erase(it);
|
||||||
|
return session;
|
||||||
|
} else {
|
||||||
|
auto lsid = LogicalSessionId{parentLsid.getId(), parentLsid.getUid()};
|
||||||
|
|
||||||
|
lsid.getInternalSessionFields().setTxnUUID(UUID::gen());
|
||||||
|
|
||||||
|
auto session = InternalSessionPool::Session(lsid, TxnNumber(0));
|
||||||
|
|
||||||
|
auto lsc = LogicalSessionCache::get(opCtx->getServiceContext());
|
||||||
|
uassertStatusOK(lsc->vivify(opCtx, lsid));
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
}();
|
||||||
|
|
||||||
|
LOGV2_DEBUG(5876601,
|
||||||
|
2,
|
||||||
|
"Acquired internal session with parent session",
|
||||||
|
"lsid"_attr = session.getSessionId(),
|
||||||
|
"txnNumber"_attr = session.getTxnNumber(),
|
||||||
|
"parentLsid"_attr = parentLsid);
|
||||||
|
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InternalSessionPool::release(Session session) {
|
||||||
|
LOGV2_DEBUG(5876602,
|
||||||
|
2,
|
||||||
|
"Released internal session",
|
||||||
|
"lsid"_attr = session.getSessionId(),
|
||||||
|
"txnNumber"_attr = session.getTxnNumber());
|
||||||
|
|
||||||
|
session.setTxnNumber(session.getTxnNumber() + 1);
|
||||||
|
if (session.getSessionId().getTxnUUID()) {
|
||||||
|
auto lsid = session.getSessionId();
|
||||||
|
stdx::lock_guard<Latch> lock(_mutex);
|
||||||
|
_childSessions.insert({std::move(lsid), std::move(session)});
|
||||||
|
} else {
|
||||||
|
stdx::lock_guard<Latch> lock(_mutex);
|
||||||
|
_nonChildSessions.push(std::move(session));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace mongo
|
89
src/mongo/db/internal_session_pool.h
Normal file
89
src/mongo/db/internal_session_pool.h
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (C) 2021-present MongoDB, Inc.
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the Server Side Public License, version 1,
|
||||||
|
* as published by MongoDB, Inc.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* Server Side Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the Server Side Public License
|
||||||
|
* along with this program. If not, see
|
||||||
|
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||||
|
*
|
||||||
|
* As a special exception, the copyright holders give permission to link the
|
||||||
|
* code of portions of this program with the OpenSSL library under certain
|
||||||
|
* conditions as described in each individual source file and distribute
|
||||||
|
* linked combinations including the program with the OpenSSL library. You
|
||||||
|
* must comply with the Server Side Public License in all respects for
|
||||||
|
* all of the code used other than as permitted herein. If you modify file(s)
|
||||||
|
* with this exception, you may extend this exception to your version of the
|
||||||
|
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||||
|
* delete this exception statement from your version. If you delete this
|
||||||
|
* exception statement from all source files in the program, then also delete
|
||||||
|
* it in the license file.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "mongo/db/operation_context.h"
|
||||||
|
|
||||||
|
#include <stack>
|
||||||
|
|
||||||
|
namespace mongo {
|
||||||
|
|
||||||
|
class InternalSessionPool {
|
||||||
|
|
||||||
|
public:
|
||||||
|
class Session {
|
||||||
|
friend class InternalSessionPool;
|
||||||
|
|
||||||
|
public:
|
||||||
|
Session(LogicalSessionId lsid, TxnNumber txnNumber)
|
||||||
|
: _lsid(std::move(lsid)), _txnNumber(txnNumber) {}
|
||||||
|
|
||||||
|
const LogicalSessionId& getSessionId() const {
|
||||||
|
return _lsid;
|
||||||
|
}
|
||||||
|
const TxnNumber& getTxnNumber() const {
|
||||||
|
return _txnNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void setSessionId(LogicalSessionId lsid) {
|
||||||
|
_lsid = std::move(lsid);
|
||||||
|
}
|
||||||
|
void setTxnNumber(TxnNumber txnNumber) {
|
||||||
|
_txnNumber = txnNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
LogicalSessionId _lsid;
|
||||||
|
TxnNumber _txnNumber;
|
||||||
|
};
|
||||||
|
|
||||||
|
InternalSessionPool() = default;
|
||||||
|
|
||||||
|
static InternalSessionPool* get(ServiceContext* serviceContext);
|
||||||
|
static InternalSessionPool* get(OperationContext* opCtx);
|
||||||
|
|
||||||
|
Session acquire(OperationContext* opCtx);
|
||||||
|
Session acquire(OperationContext* opCtx, const LogicalSessionId& parentLsid);
|
||||||
|
|
||||||
|
void release(Session);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// Used for associating parent lsids with existing Sessions of the form <id, uid, txnUUID>
|
||||||
|
LogicalSessionIdMap<Session> _childSessions;
|
||||||
|
|
||||||
|
// Used for standalone Sessions
|
||||||
|
std::stack<Session> _nonChildSessions;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Protects the internal data structures
|
||||||
|
mutable Mutex _mutex = MONGO_MAKE_LATCH("InternalSessionPool::_mutex");
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace mongo
|
152
src/mongo/db/internal_session_pool_test.cpp
Normal file
152
src/mongo/db/internal_session_pool_test.cpp
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
/**
|
||||||
|
* Copyright (C) 2021-present MongoDB, Inc.
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the Server Side Public License, version 1,
|
||||||
|
* as published by MongoDB, Inc.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* Server Side Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the Server Side Public License
|
||||||
|
* along with this program. If not, see
|
||||||
|
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||||
|
*
|
||||||
|
* As a special exception, the copyright holders give permission to link the
|
||||||
|
* code of portions of this program with the OpenSSL library under certain
|
||||||
|
* conditions as described in each individual source file and distribute
|
||||||
|
* linked combinations including the program with the OpenSSL library. You
|
||||||
|
* must comply with the Server Side Public License in all respects for
|
||||||
|
* all of the code used other than as permitted herein. If you modify file(s)
|
||||||
|
* with this exception, you may extend this exception to your version of the
|
||||||
|
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||||
|
* delete this exception statement from your version. If you delete this
|
||||||
|
* exception statement from all source files in the program, then also delete
|
||||||
|
* it in the license file.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "mongo/platform/basic.h"
|
||||||
|
|
||||||
|
#include "mongo/db/internal_session_pool.h"
|
||||||
|
#include "mongo/db/logical_session_cache.h"
|
||||||
|
#include "mongo/db/logical_session_cache_impl.h"
|
||||||
|
#include "mongo/db/logical_session_id_helpers.h"
|
||||||
|
#include "mongo/db/service_context_test_fixture.h"
|
||||||
|
#include "mongo/db/service_liaison_mock.h"
|
||||||
|
#include "mongo/db/sessions_collection_mock.h"
|
||||||
|
#include "mongo/idl/server_parameter_test_util.h"
|
||||||
|
#include "mongo/unittest/unittest.h"
|
||||||
|
|
||||||
|
namespace mongo {
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
class TestInternalSessionPool : public InternalSessionPool {
|
||||||
|
public:
|
||||||
|
void reset() {
|
||||||
|
_childSessions = LogicalSessionIdMap<InternalSessionPool::Session>();
|
||||||
|
_nonChildSessions = std::stack<InternalSessionPool::Session>();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
class InternalSessionPoolTest : public ServiceContextTest {
|
||||||
|
public:
|
||||||
|
void setUp() override {
|
||||||
|
ServiceContextTest::setUp();
|
||||||
|
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
|
||||||
|
_pool.reset();
|
||||||
|
|
||||||
|
_opCtx = makeOperationContext();
|
||||||
|
|
||||||
|
auto localServiceLiaison =
|
||||||
|
std::make_unique<MockServiceLiaison>(std::make_shared<MockServiceLiaisonImpl>());
|
||||||
|
auto localSessionsCollection = std::make_unique<MockSessionsCollection>(
|
||||||
|
std::make_shared<MockSessionsCollectionImpl>());
|
||||||
|
|
||||||
|
auto localLogicalSessionCache = std::make_unique<LogicalSessionCacheImpl>(
|
||||||
|
std::move(localServiceLiaison),
|
||||||
|
std::move(localSessionsCollection),
|
||||||
|
[](OperationContext*, SessionsCollection&, Date_t) {
|
||||||
|
return 0; /* No op*/
|
||||||
|
});
|
||||||
|
|
||||||
|
LogicalSessionCache::set(getServiceContext(), std::move(localLogicalSessionCache));
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationContext* opCtx() const {
|
||||||
|
return _opCtx.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
TestInternalSessionPool _pool;
|
||||||
|
RAIIServerParameterControllerForTest _featureFlagInternalTransactions{
|
||||||
|
"featureFlagInternalTransactions", true};
|
||||||
|
|
||||||
|
private:
|
||||||
|
ServiceContext::UniqueOperationContext _opCtx;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(InternalSessionPoolTest, AcquireWithoutParentSessionFromEmptyPool) {
|
||||||
|
auto session = _pool.acquire(opCtx());
|
||||||
|
ASSERT_EQ(TxnNumber(0), session.getTxnNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InternalSessionPoolTest, AcquireWithParentSessionFromEmptyPool) {
|
||||||
|
auto parentLsid = makeLogicalSessionIdForTest();
|
||||||
|
auto session = _pool.acquire(opCtx(), parentLsid);
|
||||||
|
|
||||||
|
ASSERT_EQ(parentLsid, *getParentSessionId(session.getSessionId()));
|
||||||
|
ASSERT_EQ(TxnNumber(0), session.getTxnNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InternalSessionPoolTest, AcquireWithoutParentSessionFromPool) {
|
||||||
|
auto expectedLsid = makeLogicalSessionIdForTest();
|
||||||
|
auto sessionToRelease = InternalSessionPool::Session(expectedLsid, TxnNumber(0));
|
||||||
|
_pool.release(sessionToRelease);
|
||||||
|
|
||||||
|
auto session = _pool.acquire(opCtx());
|
||||||
|
|
||||||
|
ASSERT_EQ(expectedLsid, session.getSessionId());
|
||||||
|
|
||||||
|
// txnNumber should be 1 larger than the released session.
|
||||||
|
ASSERT_EQ(TxnNumber(1), session.getTxnNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InternalSessionPoolTest, AcquireWithParentSessionFromPoolWithoutParentEntry) {
|
||||||
|
LogicalSessionId parentLsid1 = makeLogicalSessionIdForTest();
|
||||||
|
LogicalSessionId parentLsid2 = makeLogicalSessionIdForTest();
|
||||||
|
|
||||||
|
auto parentSession1 = InternalSessionPool::Session(parentLsid1, TxnNumber(1));
|
||||||
|
_pool.release(parentSession1);
|
||||||
|
|
||||||
|
auto session = _pool.acquire(opCtx(), parentLsid2);
|
||||||
|
|
||||||
|
ASSERT_NOT_EQUALS(parentLsid1, session.getSessionId());
|
||||||
|
ASSERT_EQ(TxnNumber(0), session.getTxnNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InternalSessionPoolTest, AcquireWithParentSessionFromPoolWithParentEntry) {
|
||||||
|
LogicalSessionId parentLsid1 = makeLogicalSessionIdForTest();
|
||||||
|
LogicalSessionId parentLsid2 = makeLogicalSessionIdForTest();
|
||||||
|
|
||||||
|
// Set txnUUID for parentLsids.
|
||||||
|
parentLsid1.getInternalSessionFields().setTxnUUID(UUID::gen());
|
||||||
|
parentLsid2.getInternalSessionFields().setTxnUUID(UUID::gen());
|
||||||
|
|
||||||
|
auto parentSession1 = InternalSessionPool::Session(parentLsid1, TxnNumber(1));
|
||||||
|
_pool.release(parentSession1);
|
||||||
|
|
||||||
|
auto parentSession2 = InternalSessionPool::Session(parentLsid2, TxnNumber(2));
|
||||||
|
_pool.release(parentSession2);
|
||||||
|
|
||||||
|
auto childSession2 = _pool.acquire(opCtx(), parentLsid2);
|
||||||
|
|
||||||
|
ASSERT_NOT_EQUALS(parentLsid1, childSession2.getSessionId());
|
||||||
|
ASSERT_EQ(parentLsid2, childSession2.getSessionId());
|
||||||
|
|
||||||
|
// txnNumber should be 1 larger than the released parent session.
|
||||||
|
ASSERT_EQ(TxnNumber(3), childSession2.getTxnNumber());
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace mongo
|
@ -267,7 +267,8 @@ SemiFuture<void> ShardingDDLCoordinator::run(std::shared_ptr<executor::ScopedTas
|
|||||||
if (status.isOK() && session) {
|
if (status.isOK() && session) {
|
||||||
// Return lsid to the SessionCache. If status is not OK, let the lsid be
|
// Return lsid to the SessionCache. If status is not OK, let the lsid be
|
||||||
// discarded.
|
// discarded.
|
||||||
SessionCache::get(opCtx)->release(*session);
|
InternalSessionPool::get(opCtx)->release(
|
||||||
|
{session->getLsid(), session->getTxnNumber()});
|
||||||
}
|
}
|
||||||
} catch (DBException& ex) {
|
} catch (DBException& ex) {
|
||||||
static constexpr auto errMsg = "Failed to release sharding DDL coordinator";
|
static constexpr auto errMsg = "Failed to release sharding DDL coordinator";
|
||||||
@ -324,49 +325,4 @@ SemiFuture<void> ShardingDDLCoordinator_NORESILIENT::run(OperationContext* opCtx
|
|||||||
return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
|
return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto serviceDecorator =
|
|
||||||
ServiceContext::declareDecoration<ShardingDDLCoordinator::SessionCache>();
|
|
||||||
|
|
||||||
auto ShardingDDLCoordinator::SessionCache::get(ServiceContext* serviceContext) -> SessionCache* {
|
|
||||||
return &serviceDecorator(serviceContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto ShardingDDLCoordinator::SessionCache::get(OperationContext* opCtx) -> SessionCache* {
|
|
||||||
return get(opCtx->getServiceContext());
|
|
||||||
}
|
|
||||||
|
|
||||||
ShardingDDLSession ShardingDDLCoordinator::SessionCache::acquire() {
|
|
||||||
const ShardingDDLSession session = [&] {
|
|
||||||
stdx::unique_lock<Latch> lock(_cacheMutex);
|
|
||||||
|
|
||||||
if (!_cache.empty()) {
|
|
||||||
auto session = std::move(_cache.top());
|
|
||||||
_cache.pop();
|
|
||||||
return session;
|
|
||||||
} else {
|
|
||||||
return ShardingDDLSession(makeSystemLogicalSessionId(), TxnNumber(0));
|
|
||||||
}
|
|
||||||
}();
|
|
||||||
|
|
||||||
LOGV2_DEBUG(5565606,
|
|
||||||
2,
|
|
||||||
"Acquired new DDL logical session",
|
|
||||||
"lsid"_attr = session.getLsid(),
|
|
||||||
"txnNumber"_attr = session.getTxnNumber());
|
|
||||||
|
|
||||||
return session;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ShardingDDLCoordinator::SessionCache::release(ShardingDDLSession session) {
|
|
||||||
LOGV2_DEBUG(5565607,
|
|
||||||
2,
|
|
||||||
"Released DDL logical session",
|
|
||||||
"lsid"_attr = session.getLsid(),
|
|
||||||
"highestUsedTxnNumber"_attr = session.getTxnNumber());
|
|
||||||
|
|
||||||
session.setTxnNumber(session.getTxnNumber() + 1);
|
|
||||||
stdx::unique_lock<Latch> lock(_cacheMutex);
|
|
||||||
_cache.push(std::move(session));
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace mongo
|
} // namespace mongo
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "mongo/db/internal_session_pool.h"
|
||||||
#include "mongo/db/namespace_string.h"
|
#include "mongo/db/namespace_string.h"
|
||||||
#include "mongo/db/operation_context.h"
|
#include "mongo/db/operation_context.h"
|
||||||
#include "mongo/db/persistent_task_store.h"
|
#include "mongo/db/persistent_task_store.h"
|
||||||
@ -90,26 +91,6 @@ public:
|
|||||||
return metadata().getForwardableOpMetadata().get();
|
return metadata().getForwardableOpMetadata().get();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cached LSIDs shared between DDL coordinator instances
|
|
||||||
class SessionCache {
|
|
||||||
|
|
||||||
public:
|
|
||||||
SessionCache() = default;
|
|
||||||
|
|
||||||
static SessionCache* get(ServiceContext* serviceContext);
|
|
||||||
static SessionCache* get(OperationContext* opCtx);
|
|
||||||
|
|
||||||
ShardingDDLSession acquire();
|
|
||||||
|
|
||||||
void release(ShardingDDLSession);
|
|
||||||
|
|
||||||
private:
|
|
||||||
std::stack<ShardingDDLSession> _cache;
|
|
||||||
|
|
||||||
// Protects _cache.
|
|
||||||
mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("SessionCache::_cacheMutex");
|
|
||||||
};
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
|
virtual std::vector<StringData> _acquireAdditionalLocks(OperationContext* opCtx) {
|
||||||
return {};
|
return {};
|
||||||
@ -152,7 +133,9 @@ protected:
|
|||||||
optSession->setTxnNumber(++txnNumber);
|
optSession->setTxnNumber(++txnNumber);
|
||||||
newShardingDDLCoordinatorMetadata.setSession(optSession);
|
newShardingDDLCoordinatorMetadata.setSession(optSession);
|
||||||
} else {
|
} else {
|
||||||
newShardingDDLCoordinatorMetadata.setSession(SessionCache::get(opCtx)->acquire());
|
auto session = InternalSessionPool::get(opCtx)->acquire(opCtx);
|
||||||
|
newShardingDDLCoordinatorMetadata.setSession(
|
||||||
|
ShardingDDLSession(session.getSessionId(), session.getTxnNumber()));
|
||||||
}
|
}
|
||||||
|
|
||||||
StateDoc newDoc(doc);
|
StateDoc newDoc(doc);
|
||||||
|
Loading…
Reference in New Issue
Block a user