
Merge branch 'master' of github.com:mongodb/mongo

dwight 2011-05-10 12:40:19 -04:00
commit 9b52904dc6
5 changed files with 79 additions and 223 deletions

View File

@@ -243,18 +243,18 @@ namespace mongo {
return s;
}
-void addUnlockOID( OID oid ) {
+void addUnlockOID( const OID& oid ) {
// Modifying the lock from some other thread
scoped_lock lk( _mutex );
_oldLockOIDs.push_back( oid );
}
-bool willUnlockOID( OID oid ) {
+bool willUnlockOID( const OID& oid ) {
scoped_lock lk( _mutex );
return find( _oldLockOIDs.begin(), _oldLockOIDs.end(), oid ) != _oldLockOIDs.end();
}
-void kill( ConnectionString& conn, string& processId ) {
+void kill( const ConnectionString& conn, const string& processId ) {
// Make sure we're in a consistent state before other threads can see us
scoped_lock lk( _mutex );
@@ -265,11 +265,11 @@
}
-bool shouldKill( ConnectionString& conn, string& processId ) {
+bool shouldKill( const ConnectionString& conn, const string& processId ) {
return _kill.count( pingThreadId( conn, processId ) ) > 0;
}
-void finishKill( ConnectionString& conn, string& processId ) {
+void finishKill( const ConnectionString& conn, const string& processId ) {
// Make sure we're in a consistent state before other threads can see us
scoped_lock lk( _mutex );
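The hunks above change pass-by-value parameters (OID, ConnectionString, string) to const references: one copy fewer per call, and the callee still cannot mutate the caller's object. A minimal standalone illustration, not MongoDB code; std::string stands in for the copied types:

    #include <string>
    #include <vector>

    // By value: the argument is copied at every call site.
    void addByValue( std::string oid, std::vector<std::string>& out ) {
        out.push_back( oid );
    }

    // By const reference: no copy on the way in, and no mutation allowed.
    void addByConstRef( const std::string& oid, std::vector<std::string>& out ) {
        out.push_back( oid );
    }

    int main() {
        std::vector<std::string> oids;
        std::string oid( "4dc9" );    // stand-in for a mongo::OID value
        addByValue( oid, oids );
        addByConstRef( oid, oids );
        return 0;
    }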
@@ -295,39 +295,14 @@ namespace mongo {
* Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is
* specified (time between pings), it is used in place of the default.
*/
-DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess, bool legacy)
-: _conn(conn) , _name(name) , _lockTimeout(lockTimeout), _takeoverMinutes(0), _lastPingCheck(string(""), (mongo::Date_t) 0, (mongo::Date_t) 0) {
-_id = BSON( "_id" << name );
-_ns = locksNS;
-_maxClockSkew = 0;
-_maxNetSkew = 0;
-_lockPing = 0;
-// If this is a legacy lock, set our takeover minutes for local time comparisons
-if(legacy) {
-_takeoverMinutes = (unsigned) _lockTimeout;
-if(_takeoverMinutes == 0) _takeoverMinutes = 15;
-_lockTimeout = 0;
-_lockPing = _maxClockSkew = _maxNetSkew = (_takeoverMinutes * 60 * 1000) / LOCK_SKEW_FACTOR;
-}
-else {
-if(lockTimeout == 0) _lockTimeout = lockTimeout = LOCK_TIMEOUT;
-//if(lockPing == 0) lockPing = LOCK_PING;
-_lockPing = _maxClockSkew = _maxNetSkew = _lockTimeout / LOCK_SKEW_FACTOR;
-}
-// If we're emulating a new process for this lock, generate a processId
-if(asProcess) _processId = getDistLockId();
-else _processId = getDistLockProcess();
+DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess )
+: _conn(conn) , _name(name) , _id( BSON( "_id" << name ) ), _processId( asProcess ? getDistLockId() : getDistLockProcess() ),
+_lockTimeout( lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout ), _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ), _lockPing( _maxClockSkew ),
+_lastPingCheck( string(""), (mongo::Date_t) 0, (mongo::Date_t) 0, OID() )
+{
log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn
<< " ( lock timeout : " << _lockTimeout << ", legacy timeout : " << _takeoverMinutes
<< ", ping interval : " << _lockPing << ", process : " << asProcess
<< ", legacy : " << legacy << " )" << endl;
<< " ( lock timeout : " << _lockTimeout
<< ", ping interval : " << _lockPing << ", process : " << asProcess << endl;
}
Date_t DistributedLock::getRemoteTime() {
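The rewritten constructor derives every timing field from the single lockTimeout value in its initializer list. A standalone sketch of that derivation; the constant values here are assumptions for illustration only (LOCK_TIMEOUT and LOCK_SKEW_FACTOR are defined by the real distlock headers):

    #include <iostream>

    // Assumed values for illustration; the real constants live in distlock.h.
    const unsigned long long LOCK_TIMEOUT = 15 * 60 * 1000; // 15 min in ms (assumed)
    const unsigned long long LOCK_SKEW_FACTOR = 30;         // assumed divisor

    int main() {
        unsigned long long lockTimeout = 0;                  // caller passed no override
        if ( lockTimeout == 0 ) lockTimeout = LOCK_TIMEOUT;

        // Ping interval and allowed clock/network skew all scale with the
        // timeout, mirroring the initializer list above.
        unsigned long long lockPing     = lockTimeout / LOCK_SKEW_FACTOR;
        unsigned long long maxClockSkew = lockPing;
        unsigned long long maxNetSkew   = lockPing;

        std::cout << "timeout " << lockTimeout << " ms, ping " << lockPing
                  << " ms, skew " << maxClockSkew << "/" << maxNetSkew
                  << " ms" << std::endl;
        return 0;
    }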
@@ -494,13 +469,13 @@
{
// make sure it's there so we can use simple update logic below
-BSONObj o = conn->findOne( _ns , _id ).getOwned();
+BSONObj o = conn->findOne( locksNS , _id ).getOwned();
// Case 1: No locks
if ( o.isEmpty() ) {
try {
log( logLvl ) << "inserting initial doc in " << _ns << " for lock " << _name << endl;
conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
log( logLvl ) << "inserting initial doc in " << locksNS << " for lock " << _name << endl;
conn->insert( locksNS , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
}
catch ( UserException& e ) {
warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl;
@@ -525,77 +500,47 @@ namespace mongo {
BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) );
if ( lastPing.isEmpty() ) {
-if(_lockTimeout > 0) {
-log( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl;
-// TODO: Using 0 as a "no time found" value will fail if dates roll over, but then, so will a lot.
-lastPing = BSON( "_id" << o["process"].String() << "ping" << (Date_t) 0 );
-}
-else {
-// LEGACY
-// if a lock is taken but there's no ping for it, we're in an inconsistent situation
-// if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed
-// but we'd require analysis of the situation before a manual intervention
-warning() << "config.locks: " << _name << " lock is taken by old process? "
-<< "remove the following lock if the process is not active anymore: " << o << endl;
-*other = o; other->getOwned(); conn.done();
-return false;
-}
+log( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl;
+// TODO: Using 0 as a "no time found" value will fail if dates roll over, but then, so will a lot.
+lastPing = BSON( "_id" << o["process"].String() << "ping" << (Date_t) 0 );
}
unsigned long long elapsed = 0;
-unsigned long long takeover = ( _lockTimeout > 0 ? _lockTimeout : _takeoverMinutes);
-if( _lockTimeout > 0 ) {
+unsigned long long takeover = _lockTimeout;
-log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl;
+log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl;
-try {
+try {
-Date_t remote = remoteTime( _conn );
+Date_t remote = remoteTime( _conn );
-// Timeout the elapsed time using comparisons of remote clock
-// For non-finalized locks, timeout 15 minutes since last seen (ts)
-// For finalized locks, timeout 15 minutes since last ping
-bool recPingChange = o["state"].numberInt() == 2 && ( _lastPingCheck.get<0>() != lastPing["_id"].String() || _lastPingCheck.get<1>() != lastPing["ping"].Date() );
-bool recTSChange = o["state"].numberInt() == 1 && _lastPingCheck.get<3>() != o["ts"].OID();
-if( recPingChange || recTSChange ) {
-// If the ping has changed since we last checked, mark the current date and time
-_lastPingCheck = make_tuple(lastPing["_id"].String(), lastPing["ping"].Date(), remote, o["ts"].OID() );
-}
-else {
-// GOTCHA! Due to network issues, it is possible that the current time
-// is less than the remote time. We *have* to check this here, otherwise
-// we overflow and our lock breaks.
-if(_lastPingCheck.get<2>() >= remote)
-elapsed = 0;
-else
-elapsed = remote - _lastPingCheck.get<2>();
-}
+// Timeout the elapsed time using comparisons of remote clock
+// For non-finalized locks, timeout 15 minutes since last seen (ts)
+// For finalized locks, timeout 15 minutes since last ping
+bool recPingChange = o["state"].numberInt() == 2 && ( _lastPingCheck.get<0>() != lastPing["_id"].String() || _lastPingCheck.get<1>() != lastPing["ping"].Date() );
+bool recTSChange = o["state"].numberInt() == 1 && _lastPingCheck.get<3>() != o["ts"].OID();
+if( recPingChange || recTSChange ) {
+// If the ping has changed since we last checked, mark the current date and time
+_lastPingCheck = make_tuple(lastPing["_id"].String(), lastPing["ping"].Date(), remote, o["ts"].OID() );
+}
-catch( LockException& e ) {
-// Remote server cannot be found / is not responsive
-warning() << "Could not get remote time from " << _conn << causedBy( e );
+else {
+// GOTCHA! Due to network issues, it is possible that the current time
+// is less than the remote time. We *have* to check this here, otherwise
+// we overflow and our lock breaks.
+if(_lastPingCheck.get<2>() >= remote)
+elapsed = 0;
+else
+elapsed = remote - _lastPingCheck.get<2>();
+}
}
-else {
-// LEGACY
+catch( LockException& e ) {
-// GOTCHA! If jsTime() (current time) is less than the remote time,
-// we should definitely not break the lock. However, if we don't check
-// this here, we get an invalid unsigned elapsed, which is ginormous and
-// causes the lock to be forced.
-if(lastPing["ping"].Date() > jsTime())
-elapsed = 0;
-else
-elapsed = jsTime() - lastPing["ping"].Date(); // in ms
+// Remote server cannot be found / is not responsive
+warning() << "Could not get remote time from " << _conn << causedBy( e );
-elapsed = elapsed / ( 1000 * 60 ); // convert to minutes
}
if ( elapsed <= takeover && ! canReenter ) {
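Both GOTCHA comments above guard the same trap: elapsed is unsigned, so if the reference time is ahead of the newer reading, naive subtraction wraps around to an enormous value that would immediately trip the takeover threshold and force the lock. A minimal demonstration:

    #include <iostream>

    int main() {
        unsigned long long lastCheck = 1000000; // ms, our snapshot of remote time
        unsigned long long remote    =  999000; // ms, remote clock read slightly earlier

        // Naive subtraction wraps around because the type is unsigned:
        unsigned long long naive = remote - lastCheck;
        std::cout << "naive elapsed: " << naive << std::endl;     // ~1.8e19, "ginormous"

        // The guarded form used above: clamp to zero instead of wrapping.
        unsigned long long elapsed = ( lastCheck >= remote ) ? 0 : remote - lastCheck;
        std::cout << "guarded elapsed: " << elapsed << std::endl; // 0
        return 0;
    }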
@@ -625,7 +570,7 @@ namespace mongo {
// Make sure we break the lock with the correct "ts" (OID) value, otherwise
// we can overwrite a new lock inserted in the meantime.
-conn->update( _ns , BSON( "_id" << _id["_id"].String() << "state" << o["state"].numberInt() << "ts" << o["ts"] ),
+conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << o["state"].numberInt() << "ts" << o["ts"] ),
BSON( "$set" << BSON( "state" << 0 ) ) );
BSONObj err = conn->getLastErrorDetailed();
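Filtering the update on _id, state, and ts makes breaking the lock a compare-and-set: if another process re-acquired the lock in the meantime (changing ts), the update matches zero documents, and getLastError's "n" field reveals that nothing was touched. A toy sketch of the same check:

    #include <string>

    struct LockDoc { std::string id; int state; std::string ts; };

    // Mimics an update filtered on { _id, state, ts }; returns the matched
    // count, analogous to the "n" field of getLastErrorDetailed().
    int breakLockIfUnchanged( LockDoc& doc, int expectedState, const std::string& expectedTs ) {
        if ( doc.state != expectedState || doc.ts != expectedTs )
            return 0;  // lock was re-acquired under us; leave it alone
        doc.state = 0; // state 0 == unlocked
        return 1;
    }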
@@ -652,8 +597,7 @@ namespace mongo {
}
// Lock forced, reset our timer
-if(_lockTimeout > 0)
-_lastPingCheck = make_tuple(string(""), 0, 0, OID());
+_lastPingCheck = make_tuple(string(""), 0, 0, OID());
}
else {
@@ -664,7 +608,7 @@ namespace mongo {
try {
// Test the lock with the correct "ts" (OID) value
-conn->update( _ns , BSON( "_id" << _id["_id"].String() << "state" << 2 << "ts" << o["ts"] ),
+conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << 2 << "ts" << o["ts"] ),
BSON( "$set" << BSON( "state" << 2 ) ) );
BSONObj err = conn->getLastErrorDetailed();
@@ -694,8 +638,7 @@ namespace mongo {
// Lock re-entered, reset our timer
// Not strictly necessary, but helpful for small timeouts where thread scheduling is significant.
// This ensures that a force can only happen after one unsuccessful force or reacquire attempt.
-if(_lockTimeout > 0)
-_lastPingCheck = make_tuple(string(""), 0, 0, OID());
+_lastPingCheck = make_tuple(string(""), 0, 0, OID());
log( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl;
*other = o; other->getOwned(); conn.done();
@@ -732,12 +675,12 @@ namespace mongo {
<< lockDetails.jsonString(Strict, true) << "\n"
<< query.jsonString(Strict, true) << endl;
-conn->update( _ns , query , whatIWant );
+conn->update( locksNS , query , whatIWant );
BSONObj err = conn->getLastErrorDetailed();
string errMsg = DBClientWithCommands::getLastErrorString(err);
-currLock = conn->findOne( _ns , _id );
+currLock = conn->findOne( locksNS , _id );
if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "could not acquire lock '" << lockName << "' "
@@ -767,7 +710,7 @@ namespace mongo {
try {
-indUpdate = indDB->findOne( _ns , _id );
+indUpdate = indDB->findOne( locksNS , _id );
// If we override this lock in any way, grab and protect it.
// We assume/ensure that if a process does not have all lock documents, it is no longer
@@ -783,9 +726,9 @@ namespace mongo {
// Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
// process grabbed the lock (which will change the ts), but the lock will be set until forcing
-indDB->update( _ns, grabQuery, BSON( "$set" << grabChanges ) );
+indDB->update( locksNS, grabQuery, BSON( "$set" << grabChanges ) );
-indUpdate = indDB->findOne( _ns, _id );
+indUpdate = indDB->findOne( locksNS, _id );
// Our lock should now be set until forcing.
assert( indUpdate["state"].numberInt() == 1 );
@@ -854,12 +797,12 @@ namespace mongo {
else finalLockDetails.append( el );
}
-conn->update( _ns , _id , BSON( "$set" << finalLockDetails.obj() ) );
+conn->update( locksNS , _id , BSON( "$set" << finalLockDetails.obj() ) );
BSONObj err = conn->getLastErrorDetailed();
string errMsg = DBClientWithCommands::getLastErrorString(err);
-currLock = conn->findOne( _ns , _id );
+currLock = conn->findOne( locksNS , _id );
if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
warning() << "could not finalize winning lock " << lockName
@@ -920,7 +863,7 @@ namespace mongo {
try {
if( oldLock.isEmpty() )
-oldLock = conn->findOne( _ns, _id );
+oldLock = conn->findOne( locksNS, _id );
if( oldLock["state"].eoo() || oldLock["state"].numberInt() != 2 || oldLock["ts"].eoo() ) {
warning() << "cannot unlock invalid distributed lock " << oldLock << endl;
@@ -929,7 +872,7 @@ namespace mongo {
}
// Use ts when updating lock, so that new locks can be sure they won't get trampled.
-conn->update( _ns ,
+conn->update( locksNS ,
BSON( "_id" << _id["_id"].String() << "ts" << oldLock["ts"].OID() ),
BSON( "$set" << BSON( "state" << 0 ) ) );
@@ -965,8 +908,7 @@ namespace mongo {
}
warning() << "distributed lock '" << lockName << "' couldn't consummate unlock request. "
<< "lock may be taken over after "
<< (_lockTimeout > 0 ? _lockTimeout / (60 * 1000) : _takeoverMinutes)
<< "lock may be taken over after " << ( _lockTimeout / (60 * 1000) )
<< " minutes timeout." << endl;
}

View File

@@ -82,7 +82,7 @@ namespace mongo {
* @param legacy use legacy logic
*
*/
-DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout = 0, bool asProcess = false, bool legacy = false);
+DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout = 0, bool asProcess = false );
~DistributedLock(){};
/**
@@ -132,26 +132,20 @@ namespace mongo {
*/
static const string locksNS;
private:
-ConnectionString _conn;
-string _name;
-// TODO: This shouldn't be a field, just constant?
-string _ns;
-BSONObj _id;
+const ConnectionString _conn;
+const string _name;
+const BSONObj _id;
+const string _processId;
// Timeout for lock, usually LOCK_TIMEOUT
-unsigned long long _lockTimeout;
-// Deprecated
-unsigned _takeoverMinutes;
-unsigned long long _maxClockSkew;
-unsigned long long _maxNetSkew;
-unsigned long long _lockPing;
+const unsigned long long _lockTimeout;
+const unsigned long long _maxClockSkew;
+const unsigned long long _maxNetSkew;
+const unsigned long long _lockPing;
private:
// Data from last check of process with ping time
boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck;
-// Process id, in case we need to customize this
-string _processId;
// May or may not exist, depending on startup
string _threadId;
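These members are now const, which is what forces the constructor in distlock.cpp to move all assignment into its member-initializer list: const members cannot be assigned in a constructor body, only initialized. A minimal illustration:

    #include <string>

    class Lock {
        const std::string _name;
        const unsigned long long _lockTimeout;
    public:
        Lock( const std::string& name, unsigned long long timeout )
            : _name( name ),                                  // must initialize here...
              _lockTimeout( timeout == 0 ? 900000 : timeout ) // ...defaulting inline (900000 ms = 15 min)
        {
            // _lockTimeout = 900000;  // error: assignment of read-only member
        }
    };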

View File

@@ -90,7 +90,7 @@ namespace mongo {
BSONObjBuilder& result, bool) {
Timer t;
DistributedLock lk(ConnectionString(cmdObj["host"].String(),
ConnectionString::SYNC), "testdistlockwithsync", 0, 0, true);
ConnectionString::SYNC), "testdistlockwithsync", 0, 0);
current = &lk;
count = 0;
gotit = 0;
@@ -183,9 +183,6 @@ namespace mongo {
int threadSleep = (int) number_field(cmdObj, "threadSleep", 30);
if(threadSleep <= 0) threadSleep = 1;
-// (Legacy) how long until the lock is forced in mins, measured locally
-int takeoverMins = (int) number_field(cmdObj, "takeoverMins", 0);
// How long until the lock is forced in ms, only compared locally
unsigned long long takeoverMS = (unsigned long long) number_field(cmdObj, "takeoverMS", 0);
@@ -201,7 +198,6 @@ namespace mongo {
int skew = 0;
-bool legacy = (takeoverMins > 0);
if (!lock.get()) {
// Pick a skew, but the first two threads skew the whole range
@@ -216,11 +212,11 @@ namespace mongo {
log() << "Initializing lock with skew of " << skew << " for thread " << threadId << endl;
-lock.reset(new DistributedLock(hostConn, lockName, legacy ? (unsigned long long)takeoverMins : takeoverMS, true, legacy));
+lock.reset(new DistributedLock(hostConn, lockName, takeoverMS, true ));
log() << "Skewed time " << jsTime() << " for thread " << threadId << endl
<< " max wait (with lock: " << threadWait << ", after lock: " << threadSleep << ")" << endl
<< " takeover in " << (legacy ? (unsigned long long)takeoverMins : takeoverMS) << (legacy ? " (mins local)" : "(ms remote)") << endl;
<< " takeover in " << takeoverMS << "(ms remote)" << endl;
}
@@ -235,22 +231,16 @@ namespace mongo {
log() << "**** Locked for thread " << threadId << " with ts " << lockObj["ts"] << endl;
-// Legacy locks are not always guaranteed re-entry since they may not have a valid ping yet,
-// but don't use this feature anyway
-if( ! legacy ) {
-if( count % 2 == 1 && ! myLock->lock_try( "Testing lock re-entry.", true ) ) {
-errors = true;
-log() << "**** !Could not re-enter lock already held" << endl;
-break;
-}
-if( count % 3 == 1 && myLock->lock_try( "Testing lock non-re-entry.", false ) ) {
-errors = true;
-log() << "**** !Invalid lock re-entry" << endl;
-break;
-}
+if( count % 2 == 1 && ! myLock->lock_try( "Testing lock re-entry.", true ) ) {
+errors = true;
+log() << "**** !Could not re-enter lock already held" << endl;
+break;
+}
+if( count % 3 == 1 && myLock->lock_try( "Testing lock non-re-entry.", false ) ) {
+errors = true;
+log() << "**** !Invalid lock re-entry" << endl;
+break;
+}
count++;
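The test alternates two checks of lock_try's second argument: on a lock the thread already holds, a reentrant attempt (true) must succeed and a non-reentrant attempt (false) must fail. In toy form, with an invented ToyLock standing in for DistributedLock:

    #include <cassert>
    #include <string>

    struct ToyLock {
        bool held;
        // Models the re-entry flag of lock_try as exercised above.
        bool lock_try( const std::string& /*why*/, bool reenter ) {
            if ( held ) return reenter;  // already held: only re-entry succeeds
            held = true;
            return true;
        }
    };

    int main() {
        ToyLock lk = { false };
        assert( lk.lock_try( "first acquire", false ) );
        assert( lk.lock_try( "re-entry", true ) );        // succeeds while held
        assert( ! lk.lock_try( "non-re-entry", false ) ); // fails while held
        return 0;
    }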
@@ -289,7 +279,7 @@ namespace mongo {
result << "errors" << errors
<< "skew" << skew
<< "takeover" << (long long) (legacy ? takeoverMS : (unsigned long long)takeoverMins)
<< "takeover" << (long long) takeoverMS
<< "localTimeout" << (takeoverMS > 0);
}

View File

@@ -1,70 +0,0 @@
-// Test the (legacy) distributed lock with significant time skewing.
-// Negative test does not always pass, so turned off, but useful to have around.
-test = new SyncCCTest( "sync5-", { logpath : "/dev/null" } )
-var commandConn = startMongodTest( 30000 + 4, "syncCommander", false, {})//{ logpath : "/dev/null" } )
-// { logpath : "/data/db/syncCommander/mongod.log" } );
-for ( var i = 0; i < 10; i++ ) {
-command = { _testDistLockWithSkew : 1 }
-command["lockName"] = "TimeSkewTest_lock_" + i;
-command["host"] = test.url
-command["seed"] = i
-command["numThreads"] = ( i % 50 ) + 1
-command["wait"] = 1000
-// Use the legacy logic with 15 mins per lock.
-command["takeoverMins"] = 15
-// Amount of time to hold the lock
-command["threadWait"] = ( ( i + 1 ) * 10 ) % 200
-// Amount of total clock skew possible between locking threads (processes)
-// Must be much less than the takeover minutes
-// TODO: Put divisor back to 2, if possible
-command["skewRange"] = ( command["takeoverMins"] / 5 ) * 60 * 1000
-result = commandConn.getDB( "admin" ).runCommand( command )
-printjson( result )
-printjson( command )
-assert( result.ok, "Skewed threads did not increment correctly." );
-}
-if ( false ) {
-// Test that the distributed lock fails with more skew.
-var badTakeover = false;
-for ( var i = 0; i < 10 && !badTakeover; i++ ) {
-command = { _testDistLockWithSkew : 1 }
-command["host"] = test.url
-command["seed"] = i
-command["numThreads"] = ( i % 50 ) + 1
-command["wait"] = 1000
-// Use the legacy logic with 15 mins per lock.
-command["takeoverMins"] = 15
-// Amount of time to hold the lock
-command["threadWait"] = ( ( i + 1 ) * 10 ) % 200
-// Amount of total clock skew possible between locking threads
-// (processes)
-// Must be much less than the takeover minutes
-command["skewRange"] = ( command["takeoverMins"] * 2 ) * 60 * 1000
-command["localTimeout"] = true
-result = commandConn.getDB( "admin" ).runCommand( command )
-printjson( result )
-printjson( command )
-badTakeover = !result.ok;
-}
-assert( badTakeover, "Skewed threads did not interfere with one another." );
-}
-test.stop();

View File

@@ -26,7 +26,7 @@
killcursors
*/
#include "../pch.h"
#include <pcap.h>
#ifdef _WIN32