mirror of
https://github.com/mongodb/mongo.git
synced 2024-12-01 01:21:03 +01:00
SERVER-15192 Make dbhash and storedFuncMod logOp listeners rollback-safe
This commit is contained in:
parent
628c4fb2d1
commit
ffb3d64e1a
@ -188,7 +188,9 @@ namespace mongo {
|
||||
|
||||
num++;
|
||||
|
||||
logOpForDbHash(ns.c_str());
|
||||
WriteUnitOfWork wuow(txn);
|
||||
logOpForDbHash(txn, ns.c_str());
|
||||
wuow.commit();
|
||||
}
|
||||
|
||||
result.append( "applied" , num );
|
||||
|
@ -56,8 +56,8 @@ namespace mongo {
|
||||
DBHashCmd dbhashCmd;
|
||||
|
||||
|
||||
void logOpForDbHash(const char* ns) {
|
||||
dbhashCmd.wipeCacheForCollection( ns );
|
||||
void logOpForDbHash(OperationContext* txn, const char* ns) {
|
||||
dbhashCmd.wipeCacheForCollection(txn, ns);
|
||||
}
|
||||
|
||||
// ----
|
||||
@ -216,11 +216,30 @@ namespace mongo {
|
||||
return 1;
|
||||
}
|
||||
|
||||
void DBHashCmd::wipeCacheForCollection( StringData ns ) {
|
||||
class DBHashCmd::DBHashLogOpHandler : public RecoveryUnit::Change {
|
||||
public:
|
||||
DBHashLogOpHandler(DBHashCmd* dCmd,
|
||||
StringData ns):
|
||||
_dCmd(dCmd),
|
||||
_ns(ns.toString()) {
|
||||
|
||||
}
|
||||
void commit() {
|
||||
scoped_lock lk( _dCmd->_cachedHashedMutex );
|
||||
_dCmd->_cachedHashed.erase(_ns);
|
||||
}
|
||||
void rollback() { }
|
||||
|
||||
private:
|
||||
DBHashCmd *_dCmd;
|
||||
const std::string _ns;
|
||||
};
|
||||
|
||||
void DBHashCmd::wipeCacheForCollection(OperationContext* txn,
|
||||
StringData ns) {
|
||||
if ( !isCachable( ns ) )
|
||||
return;
|
||||
scoped_lock lk( _cachedHashedMutex );
|
||||
_cachedHashed.erase( ns.toString() );
|
||||
txn->recoveryUnit()->registerChange(new DBHashLogOpHandler(this, ns));
|
||||
}
|
||||
|
||||
bool DBHashCmd::isCachable( StringData ns ) const {
|
||||
|
@ -34,7 +34,7 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
void logOpForDbHash( const char* ns );
|
||||
void logOpForDbHash( OperationContext* txn, const char* ns );
|
||||
|
||||
class DBHashCmd : public Command {
|
||||
public:
|
||||
@ -48,10 +48,15 @@ namespace mongo {
|
||||
|
||||
virtual bool run(OperationContext* txn, const std::string& dbname , BSONObj& cmdObj, int, std::string& errmsg, BSONObjBuilder& result, bool);
|
||||
|
||||
void wipeCacheForCollection( StringData ns );
|
||||
void wipeCacheForCollection(OperationContext* txn, StringData ns);
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* RecoveryUnit::Change subclass used to commit work for dbhash logOp listener
|
||||
*/
|
||||
class DBHashLogOpHandler;
|
||||
|
||||
bool isCachable( StringData ns ) const;
|
||||
|
||||
std::string hashCollection( OperationContext* opCtx, Database* db, const std::string& fullCollectionName, bool* fromCache );
|
||||
|
@ -384,6 +384,11 @@ namespace {
|
||||
//
|
||||
getGlobalAuthorizationManager()->logOp(txn, opstr, ns, obj, patt, b);
|
||||
logOpForSharding(txn, opstr, ns, obj, patt, fromMigrate);
|
||||
logOpForDbHash(txn, ns);
|
||||
|
||||
if ( strstr( ns, ".system.js" ) ) {
|
||||
Scope::storedFuncMod(txn);
|
||||
}
|
||||
|
||||
try {
|
||||
// TODO SERVER-15192 remove this once all listeners are rollback-safe.
|
||||
@ -395,11 +400,6 @@ namespace {
|
||||
}
|
||||
};
|
||||
txn->recoveryUnit()->registerChange(new RollbackPreventer());
|
||||
logOpForDbHash(ns);
|
||||
|
||||
if ( strstr( ns, ".system.js" ) ) {
|
||||
Scope::storedFuncMod(); // this is terrible
|
||||
}
|
||||
}
|
||||
catch (const DBException& ex) {
|
||||
severe() << "Fatal DBException in logOp(): " << ex.toString();
|
||||
|
@ -354,7 +354,7 @@ namespace mongo {
|
||||
|
||||
txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this,
|
||||
txn,
|
||||
ide,
|
||||
ide.wrap(),
|
||||
obj,
|
||||
op,
|
||||
notInActiveChunk));
|
||||
@ -705,13 +705,13 @@ namespace mongo {
|
||||
public:
|
||||
LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus,
|
||||
OperationContext* txn,
|
||||
const BSONElement& ide,
|
||||
const BSONObj& idObj,
|
||||
const BSONObj& obj,
|
||||
const char op,
|
||||
const bool notInActiveChunk):
|
||||
_migrateFromStatus(migrateFromStatus),
|
||||
_txn(txn),
|
||||
_ide(ide),
|
||||
_idObj(idObj.getOwned()),
|
||||
_obj(obj.getOwned()),
|
||||
_op(op),
|
||||
_notInActiveChunk(notInActiveChunk) {
|
||||
@ -736,8 +736,8 @@ namespace mongo {
|
||||
|
||||
scoped_lock sl(_migrateFromStatus->_mutex);
|
||||
// can't filter deletes :(
|
||||
_migrateFromStatus->_deleted.push_back( _ide.wrap() );
|
||||
_migrateFromStatus->_memoryUsed += _ide.size() + 5;
|
||||
_migrateFromStatus->_deleted.push_back(_idObj);
|
||||
_migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -750,9 +750,10 @@ namespace mongo {
|
||||
if (!Helpers::findById(_txn,
|
||||
ctx.db(),
|
||||
_migrateFromStatus->_ns.c_str(),
|
||||
_ide.wrap(),
|
||||
_idObj,
|
||||
it)) {
|
||||
warning() << "logOpForSharding couldn't find: " << _ide
|
||||
warning() << "logOpForSharding couldn't find: "
|
||||
<< _idObj.firstElement()
|
||||
<< " even though should have" << migrateLog;
|
||||
return;
|
||||
}
|
||||
@ -768,8 +769,8 @@ namespace mongo {
|
||||
}
|
||||
|
||||
scoped_lock sl(_migrateFromStatus->_mutex);
|
||||
_migrateFromStatus->_reload.push_back(_ide.wrap());
|
||||
_migrateFromStatus->_memoryUsed += _ide.size() + 5;
|
||||
_migrateFromStatus->_reload.push_back(_idObj);
|
||||
_migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
|
||||
}
|
||||
|
||||
virtual void rollback() { }
|
||||
@ -777,7 +778,7 @@ namespace mongo {
|
||||
private:
|
||||
MigrateFromStatus* _migrateFromStatus;
|
||||
OperationContext* _txn;
|
||||
const BSONElement _ide;
|
||||
const BSONObj _idObj;
|
||||
const BSONObj _obj;
|
||||
const char _op;
|
||||
const bool _notInActiveChunk;
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include "mongo/client/dbclientcursor.h"
|
||||
#include "mongo/client/dbclientinterface.h"
|
||||
#include "mongo/db/global_environment_experiment.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/platform/unordered_set.h"
|
||||
#include "mongo/util/file.h"
|
||||
#include "mongo/util/log.h"
|
||||
@ -56,7 +57,7 @@ namespace mongo {
|
||||
using std::set;
|
||||
using std::string;
|
||||
|
||||
long long Scope::_lastVersion = 1;
|
||||
AtomicInt64 Scope::_lastVersion(1);
|
||||
|
||||
namespace {
|
||||
// 2 GB is the largest support Javascript file size.
|
||||
@ -187,8 +188,16 @@ namespace {
|
||||
return exec(code, filename, printResult, reportError, timeoutMs);
|
||||
}
|
||||
|
||||
void Scope::storedFuncMod() {
|
||||
_lastVersion++;
|
||||
class Scope::StoredFuncModLogOpHandler : public RecoveryUnit::Change {
|
||||
public:
|
||||
void commit() {
|
||||
_lastVersion.fetchAndAdd(1);
|
||||
}
|
||||
void rollback() { }
|
||||
};
|
||||
|
||||
void Scope::storedFuncMod(OperationContext* txn) {
|
||||
txn->recoveryUnit()->registerChange(new StoredFuncModLogOpHandler());
|
||||
}
|
||||
|
||||
void Scope::validateObjectIdString(const string& str) {
|
||||
@ -204,10 +213,11 @@ namespace {
|
||||
uassert(10208, "need to have locallyConnected already", _localDBName.size());
|
||||
}
|
||||
|
||||
if (_loadedVersion == _lastVersion)
|
||||
int64_t lastVersion = _lastVersion.load();
|
||||
if (_loadedVersion == lastVersion)
|
||||
return;
|
||||
|
||||
_loadedVersion = _lastVersion;
|
||||
_loadedVersion = lastVersion;
|
||||
string coll = _localDBName + ".system.js";
|
||||
|
||||
scoped_ptr<DBClientBase> directDBClient(createDirectClient(txn));
|
||||
|
@ -31,6 +31,7 @@
|
||||
|
||||
#include "mongo/db/global_environment_experiment.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/platform/atomic_word.h"
|
||||
|
||||
namespace mongo {
|
||||
typedef unsigned long long ScriptingFunction;
|
||||
@ -148,7 +149,7 @@ namespace mongo {
|
||||
* if any changes are made to .system.js, call this
|
||||
* right now its just global - slightly inefficient, but a lot simpler
|
||||
*/
|
||||
static void storedFuncMod();
|
||||
static void storedFuncMod(OperationContext *txn);
|
||||
|
||||
static void validateObjectIdString(const std::string& str);
|
||||
|
||||
@ -177,14 +178,21 @@ namespace mongo {
|
||||
|
||||
protected:
|
||||
friend class PooledScope;
|
||||
|
||||
/**
|
||||
* RecoveryUnit::Change subclass used to commit work for
|
||||
* Scope::storedFuncMod logOp listener.
|
||||
*/
|
||||
class StoredFuncModLogOpHandler;
|
||||
|
||||
virtual FunctionCacheMap& getFunctionCache() { return _cachedFunctions; }
|
||||
virtual ScriptingFunction _createFunction(const char* code,
|
||||
ScriptingFunction functionNumber = 0) = 0;
|
||||
|
||||
std::string _localDBName;
|
||||
long long _loadedVersion;
|
||||
int64_t _loadedVersion;
|
||||
std::set<std::string> _storedNames;
|
||||
static long long _lastVersion;
|
||||
static AtomicInt64 _lastVersion;
|
||||
FunctionCacheMap _cachedFunctions;
|
||||
int _numTimesUsed;
|
||||
bool _lastRetIsNativeCode; // v8 only: set to true if eval'd script returns a native func
|
||||
|
Loading…
Reference in New Issue
Block a user