diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index bd522ebf4bc..4cebed3eb61 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -441,7 +441,6 @@ public: ~BulkBuilder() { _cursor->close(_cursor); - WiredTigerRecoveryUnit::get(_txn)->getSessionCache()->releaseSession(_session); } protected: @@ -468,7 +467,7 @@ protected: const Ordering _ordering; OperationContext* const _txn; - WiredTigerSession* const _session; + UniqueWiredTigerSession const _session; WT_CURSOR* const _cursor; }; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index d37bc706093..d7590c3a6fe 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1479,11 +1479,10 @@ Status WiredTigerRecordStore::compact(OperationContext* txn, CompactStats* stats) { WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(txn)->getSessionCache(); if (!cache->isEphemeral()) { - WiredTigerSession* session = cache->getSession(); + UniqueWiredTigerSession session = cache->getSession(); WT_SESSION* s = session->getSession(); int ret = s->compact(s, getURI().c_str(), "timeout=0"); invariantWTOK(ret); - cache->releaseSession(session); } return Status::OK(); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 7a41d0d7341..64ef576378a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -47,7 +47,6 @@ namespace mongo { WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc) : _sessionCache(sc), - _session(NULL), _inUnitOfWork(false), _active(false), _myTransactionCount(1), @@ -56,10 +55,6 @@ WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc) WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() { invariant(!_inUnitOfWork); _abort(); - if (_session) { - _sessionCache->releaseSession(_session); - _session = NULL; - } } void WiredTigerRecoveryUnit::reportState(BSONObjBuilder* b) const { @@ -174,12 +169,12 @@ WiredTigerSession* WiredTigerRecoveryUnit::getSession(OperationContext* opCtx) { if (!_active) { _txnOpen(opCtx); } - return _session; + return _session.get(); } WiredTigerSession* WiredTigerRecoveryUnit::getSessionNoTxn(OperationContext* opCtx) { _ensureSession(); - return _session; + return _session.get(); } void WiredTigerRecoveryUnit::abandonSnapshot() { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index b70b4bebfea..63e49c06c86 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -39,14 +39,13 @@ #include "mongo/db/record_id.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/storage/snapshot_name.h" +#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/timer.h" namespace mongo { class BSONObjBuilder; -class WiredTigerSession; -class WiredTigerSessionCache; class WiredTigerRecoveryUnit final : public RecoveryUnit { public: @@ -128,7 +127,7 @@ private: void _txnOpen(OperationContext* opCtx); WiredTigerSessionCache* _sessionCache; // not owned - WiredTigerSession* _session; // owned, but from pool + UniqueWiredTigerSession _session; bool _areWriteUnitOfWorksBanned = false; bool _inUnitOfWork; bool _active; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index 6f27a7bb6d9..4acc3e34e05 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -49,6 +49,16 @@ WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, int epoch) invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); } +WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn, WiredTigerSessionCache* cache, int epoch) + : _epoch(epoch), + _cache(cache), + _session(NULL), + _cursorGen(0), + _cursorsCached(0), + _cursorsOut(0) { + invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session)); +} + WiredTigerSession::~WiredTigerSession() { if (_session) { invariantWTOK(_session->close(_session, NULL)); @@ -164,8 +174,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { // When forcing a checkpoint with journaling enabled, don't synchronize with other // waiters, as a log flush is much cheaper than a full checkpoint. if (forceCheckpoint && _engine->isDurable()) { - WiredTigerSession* session = getSession(); - ON_BLOCK_EXIT([this, session] { releaseSession(session); }); + UniqueWiredTigerSession session = getSession(); WT_SESSION* s = session->getSession(); { stdx::unique_lock lk(_journalListenerMutex); @@ -189,8 +198,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { _lastSyncTime.store(current + 1); // Nobody has synched yet, so we have to sync ourselves. - WiredTigerSession* session = getSession(); - ON_BLOCK_EXIT([this, session] { releaseSession(session); }); + auto session = getSession(); WT_SESSION* s = session->getSession(); // This gets the token (OpTime) from the last write, before flushing (either the journal, or a @@ -228,7 +236,7 @@ bool WiredTigerSessionCache::isEphemeral() { return _engine && _engine->isEphemeral(); } -WiredTigerSession* WiredTigerSessionCache::getSession() { +UniqueWiredTigerSession WiredTigerSessionCache::getSession() { // We should never be able to get here after _shuttingDown is set, because no new // operations should be allowed to start. invariant(!(_shuttingDown.loadRelaxed() & kShuttingDownMask)); @@ -240,12 +248,12 @@ WiredTigerSession* WiredTigerSessionCache::getSession() { // discarding older ones WiredTigerSession* cachedSession = _sessions.back(); _sessions.pop_back(); - return cachedSession; + return UniqueWiredTigerSession(cachedSession); } } // Outside of the cache partition lock, but on release will be put back on the cache - return new WiredTigerSession(_conn, _epoch.load()); + return UniqueWiredTigerSession(new WiredTigerSession(_conn, this, _epoch.load())); } void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { @@ -291,8 +299,14 @@ void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) { _engine->dropAllQueued(); } + void WiredTigerSessionCache::setJournalListener(JournalListener* jl) { stdx::unique_lock lk(_journalListenerMutex); _journalListener = jl; } + +void WiredTigerSessionCache::WiredTigerSessionDeleter::operator()( + WiredTigerSession* session) const { + session->_cache->releaseSession(session); } +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h index d5354101ac7..9ec468d9ffc 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.h @@ -1,7 +1,7 @@ // wiredtiger_session_cache.h /** - * Copyright (C) 2014 MongoDB Inc. + * Copyright (C) 2016 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, @@ -46,6 +46,7 @@ namespace mongo { class WiredTigerKVEngine; +class WiredTigerSessionCache; class WiredTigerCachedCursor { public: @@ -68,14 +69,23 @@ public: * Creates a new WT session on the specified connection. * * @param conn WT connection - * @param cachePartition If the session comes from the session cache, this indicates to - * which partition it should be returned. Value of -1 means it doesn't come from - * cache and that it should not be cached, but closed directly. * @param epoch In which session cache cleanup epoch was this session instantiated. Value * of -1 means that this value is not necessary since the session will not be * cached. */ WiredTigerSession(WT_CONNECTION* conn, int epoch = -1); + + /** + * Creates a new WT session on the specified connection. + * + * @param conn WT connection + * @param cache The WiredTigerSessionCache that owns this session. + * @param epoch In which session cache cleanup epoch was this session instantiated. Value + * of -1 means that this value is not necessary since the session will not be + * cached. + */ + WiredTigerSession(WT_CONNECTION* conn, WiredTigerSessionCache* cache, int epoch = -1); + ~WiredTigerSession(); WT_SESSION* getSession() const { @@ -111,8 +121,9 @@ private: } const uint64_t _epoch; - WT_SESSION* _session; // owned - CursorCache _cursors; // owned + WiredTigerSessionCache* _cache; // not owned + WT_SESSION* _session; // owned + CursorCache _cursors; // owned uint64_t _cursorGen; int _cursorsCached, _cursorsOut; }; @@ -128,17 +139,19 @@ public: ~WiredTigerSessionCache(); /** - * Returns a previously released session for reuse, or creates a new session. + * This deleter automatically releases WiredTigerSession objects when no longer needed. + */ + class WiredTigerSessionDeleter { + public: + void operator()(WiredTigerSession* session) const; + }; + + /** + * Returns a smart pointer to a previously released session for reuse, or creates a new session. * This method must only be called while holding the global lock to avoid races with * shuttingDown, but otherwise is thread safe. */ - WiredTigerSession* getSession(); - - /** - * Returns a session to the cache for later reuse. If closeAll was called between getting this - * session and releasing it, the session is directly released. This method is thread safe. - */ - void releaseSession(WiredTigerSession* session); + std::unique_ptr getSession(); /** * Free all cached sessions and ensures that previously acquired sessions will be freed on @@ -200,5 +213,18 @@ private: JournalListener* _journalListener = &NoOpJournalListener::instance; // Protects _journalListener. stdx::mutex _journalListenerMutex; + + /** + * Returns a session to the cache for later reuse. If closeAll was called between getting this + * session and releasing it, the session is directly released. This method is thread safe. + */ + void releaseSession(WiredTigerSession* session); }; + +/** + * A unique handle type for WiredTigerSession pointers obtained from a WiredTigerSessionCache. + */ +typedef std::unique_ptr + UniqueWiredTigerSession; } // namespace