2010-07-12 19:07:32 -04:00
// @file distlock.h
2010-07-12 17:53:02 -04:00
/* Copyright 2009 10gen Inc.
*
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2010-07-12 19:07:32 -04:00
# include "pch.h"
2010-07-12 17:53:02 -04:00
# include "dbclient.h"
# include "distlock.h"
namespace mongo {
2010-07-13 00:56:08 -04:00
2011-03-18 02:54:24 -04:00
// Base verbosity level for DistributedLock log output; call sites in this file log at offsets from it
// (e.g. logLvl - 1 for important events, logLvl + 1 / + 2 for chatty diagnostics).
LabeledLevel DistributedLock : : logLvl ( 1 ) ;
2011-03-04 16:54:37 -05:00
2010-07-13 00:56:08 -04:00
// Per-thread cache of the lock id built by getDistLockId(); getDistLockId() fills it in the first
// time it finds the value empty for the calling thread.
ThreadLocalValue < string > distLockIds ( " " ) ;
2010-08-19 12:54:28 -04:00
/* ==================
* Module initialization
*/
boost : : once_flag _init = BOOST_ONCE_INIT ;
static string * _cachedProcessString = NULL ;
static void initModule ( ) {
// cache process string
stringstream ss ;
2011-03-11 17:29:22 -05:00
ss < < getHostName ( ) < < " : " < < cmdLine . port < < " : " < < time ( 0 ) < < " : " < < rand ( ) ;
2010-08-19 12:54:28 -04:00
_cachedProcessString = new string ( ss . str ( ) ) ;
}
/* =================== */
2011-01-04 00:40:41 -05:00
/**
 * Returns the identity string shared by every distributed lock of this process.
 * The string is computed on first use (see initModule) and never changes afterwards.
 */
string getDistLockProcess() {
    boost::call_once( initModule, _init );
    assert( _cachedProcessString );
    const string& cached = *_cachedProcessString;
    return cached;
}
2010-07-13 00:56:08 -04:00
2011-01-04 00:40:41 -05:00
/**
 * Returns the lock id of the calling thread: "<process string>:<thread name>:<random>".
 * The id is generated the first time a thread asks for it and cached in distLockIds,
 * so subsequent calls from the same thread return the same value.
 */
string getDistLockId() {
    string id = distLockIds.get();
    if ( ! id.empty() )
        return id;

    stringstream builder;
    builder << getDistLockProcess() << ":" << getThreadName() << ":" << rand();
    id = builder.str();
    distLockIds.set( id );
    return id;
}
2011-01-04 00:40:41 -05:00
2010-08-12 09:50:29 -04:00
2011-02-15 18:08:59 -05:00
class DistributedLockPinger {
public :
2011-03-04 16:54:37 -05:00
2011-02-15 18:08:59 -05:00
DistributedLockPinger ( )
: _mutex ( " DistributedLockPinger " ) {
}
void _distLockPingThread ( ConnectionString addr , string process , unsigned long long sleepTime ) {
2011-03-04 16:54:37 -05:00
2011-02-15 18:08:59 -05:00
setThreadName ( " LockPinger " ) ;
2011-03-04 16:54:37 -05:00
string pingId = pingThreadId ( addr , process ) ;
log ( DistributedLock : : logLvl - 1 ) < < " creating distributed lock ping thread for " < < addr
< < " and process " < < process
< < " (sleeping for " < < sleepTime < < " ms) " < < endl ;
2010-08-11 16:46:37 -04:00
2011-02-15 18:08:59 -05:00
static int loops = 0 ;
2011-03-01 16:56:58 -05:00
while ( ! inShutdown ( ) & & ! shouldKill ( addr , process ) ) {
2010-08-11 16:46:37 -04:00
2011-03-04 16:54:37 -05:00
log ( DistributedLock : : logLvl + 2 ) < < " distributed lock pinger ' " < < pingId < < " ' about to ping. " < < endl ;
2011-02-15 18:08:59 -05:00
Date_t pingTime ;
2010-10-01 15:00:19 -04:00
2011-02-15 18:08:59 -05:00
try {
ScopedDbConnection conn ( addr ) ;
pingTime = jsTime ( ) ;
// refresh the entry corresponding to this process in the lockpings collection
conn - > update ( DistributedLock : : lockPingNS ,
BSON ( " _id " < < process ) ,
BSON ( " $set " < < BSON ( " ping " < < pingTime ) ) ,
true ) ;
2011-03-04 16:54:37 -05:00
2011-02-15 18:08:59 -05:00
string err = conn - > getLastError ( ) ;
if ( ! err . empty ( ) ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " pinging failed for distributed lock pinger ' " < < pingId < < " '. "
< < m_error_message ( err ) < < endl ;
2011-02-15 18:08:59 -05:00
conn . done ( ) ;
2011-03-04 16:54:37 -05:00
// Sleep for normal ping time
sleepmillis ( sleepTime ) ;
2011-02-15 18:08:59 -05:00
continue ;
}
// remove really old entries from the lockpings collection if they're not holding a lock
// (this may happen if an instance of a process was taken down and no new instance came up to
// replace it for a quite a while)
// if the lock is taken, the take-over mechanism should handle the situation
auto_ptr < DBClientCursor > c = conn - > query ( DistributedLock : : locksNS , BSONObj ( ) ) ;
vector < string > pids ;
while ( c - > more ( ) ) {
BSONObj lock = c - > next ( ) ;
if ( ! lock [ " process " ] . eoo ( ) ) {
pids . push_back ( lock [ " process " ] . valuestrsafe ( ) ) ;
}
}
Date_t fourDays = pingTime - ( 4 * 86400 * 1000 ) ; // 4 days
conn - > remove ( DistributedLock : : lockPingNS , BSON ( " _id " < < BSON ( " $nin " < < pids ) < < " ping " < < LT < < fourDays ) ) ;
err = conn - > getLastError ( ) ;
if ( ! err . empty ( ) ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " ping cleanup for distributed lock pinger ' " < < pingId < < " failed. "
< < m_error_message ( err ) < < endl ;
2011-02-15 18:08:59 -05:00
conn . done ( ) ;
2011-03-04 16:54:37 -05:00
// Sleep for normal ping time
sleepmillis ( sleepTime ) ;
2011-02-15 18:08:59 -05:00
continue ;
}
// create index so remove is fast even with a lot of servers
if ( loops + + = = 0 ) {
conn - > ensureIndex ( DistributedLock : : lockPingNS , BSON ( " ping " < < 1 ) ) ;
2010-11-11 14:35:18 -05:00
}
2010-10-01 16:51:09 -04:00
conn . done ( ) ;
2011-03-04 16:54:37 -05:00
log ( DistributedLock : : logLvl - ( loops % 10 = = 0 ? 1 : 0 ) ) < < " cluster " < < addr < < " pinged successfully at " < < pingTime
< < " by distributed lock pinger ' " < < pingId
< < " ', sleeping for " < < sleepTime < < " ms " < < endl ;
2010-10-01 15:00:19 -04:00
}
2011-02-15 18:08:59 -05:00
catch ( std : : exception & e ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " distributed lock pinger ' " < < pingId < < " ' detected an exception while pinging. "
< < m_error_message ( e . what ( ) ) < < endl ;
2010-07-13 01:21:08 -04:00
}
2011-01-04 00:40:41 -05:00
2011-02-15 18:08:59 -05:00
sleepmillis ( sleepTime ) ;
2010-07-13 00:56:08 -04:00
}
2010-08-11 16:46:37 -04:00
2011-03-04 16:54:37 -05:00
warning ( ) < < " removing distributed lock ping thread ' " < < pingId < < " ' " < < endl ;
2011-02-15 18:08:59 -05:00
2011-03-01 16:56:58 -05:00
if ( shouldKill ( addr , process ) )
2011-02-15 18:08:59 -05:00
finishKill ( addr , process ) ;
2011-01-04 00:40:41 -05:00
2011-02-14 16:21:12 -05:00
}
2011-02-15 18:08:59 -05:00
void distLockPingThread ( ConnectionString addr , long long clockSkew , string processId , unsigned long long sleepTime ) {
try {
jsTimeVirtualThreadSkew ( clockSkew ) ;
_distLockPingThread ( addr , processId , sleepTime ) ;
}
catch ( std : : exception & e ) {
2011-03-04 16:54:37 -05:00
error ( ) < < " unexpected error while running distributed lock pinger for " < < addr < < " , process " < < processId < < m_error_message ( e . what ( ) ) < < endl ;
2011-02-15 18:08:59 -05:00
}
catch ( . . . ) {
2011-03-04 16:54:37 -05:00
error ( ) < < " unknown error while running distributed lock pinger for " < < addr < < " , process " < < processId < < endl ;
2011-02-15 18:08:59 -05:00
}
2011-02-14 16:21:12 -05:00
}
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
string pingThreadId ( const ConnectionString & conn , const string & processId ) {
2011-03-01 16:56:58 -05:00
return conn . toString ( ) + " / " + processId ;
}
2011-02-15 18:08:59 -05:00
2011-03-01 16:56:58 -05:00
string got ( DistributedLock & lock , unsigned long long sleepTime ) {
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// Make sure we don't start multiple threads for a process id
2011-02-15 18:08:59 -05:00
scoped_lock lk ( _mutex ) ;
const ConnectionString & conn = lock . getRemoteConnection ( ) ;
const string & processId = lock . getProcessId ( ) ;
2011-03-01 16:56:58 -05:00
string s = pingThreadId ( conn , processId ) ;
2011-02-15 18:08:59 -05:00
// Ignore if we already have a pinging thread for this process.
2011-03-01 16:56:58 -05:00
if ( _seen . count ( s ) > 0 ) return " " ;
2011-02-15 18:08:59 -05:00
// Check our clock skew
try {
if ( lock . isRemoteTimeSkewed ( ) ) {
2011-03-04 16:54:37 -05:00
m_throw_exception ( 13650 , LockException , " clock skew of the cluster " < < conn . toString ( ) < < " is too far out of bounds to allow distributed locking. " ) ;
2011-02-15 18:08:59 -05:00
}
}
catch ( LockException & e ) {
2011-03-04 16:54:37 -05:00
m_chain_exception ( 13651 , e , LockException , " error checking clock skew of cluster " < < conn . toString ( ) ) ;
2011-02-15 18:08:59 -05:00
}
boost : : thread t ( boost : : bind ( & DistributedLockPinger : : distLockPingThread , this , conn , getJSTimeVirtualThreadSkew ( ) , processId , sleepTime ) ) ;
_seen . insert ( s ) ;
2011-03-01 16:56:58 -05:00
return s ;
2011-02-14 16:21:12 -05:00
}
2011-03-04 16:54:37 -05:00
void kill ( ConnectionString & conn , string & processId ) {
2011-02-15 18:08:59 -05:00
// Make sure we're in a consistent state before other threads can see us
scoped_lock lk ( _mutex ) ;
2011-02-14 16:21:12 -05:00
2011-03-01 16:56:58 -05:00
string pingId = pingThreadId ( conn , processId ) ;
assert ( _seen . count ( pingId ) > 0 ) ;
_kill . insert ( pingId ) ;
2011-02-15 18:08:59 -05:00
}
2011-03-04 16:54:37 -05:00
bool shouldKill ( ConnectionString & conn , string & processId ) {
2011-03-01 16:56:58 -05:00
return _kill . count ( pingThreadId ( conn , processId ) ) > 0 ;
2010-07-13 00:56:08 -04:00
}
2011-01-04 00:40:41 -05:00
2011-03-04 16:54:37 -05:00
void finishKill ( ConnectionString & conn , string & processId ) {
2011-02-15 18:08:59 -05:00
// Make sure we're in a consistent state before other threads can see us
2010-07-13 00:56:08 -04:00
scoped_lock lk ( _mutex ) ;
2011-02-15 18:08:59 -05:00
2011-03-01 16:56:58 -05:00
string pingId = pingThreadId ( conn , processId ) ;
_kill . erase ( pingId ) ;
_seen . erase ( pingId ) ;
2011-02-15 18:08:59 -05:00
2010-07-13 00:56:08 -04:00
}
2011-01-04 00:40:41 -05:00
2011-03-01 16:56:58 -05:00
set < string > _kill ;
2010-07-13 00:56:08 -04:00
set < string > _seen ;
mongo : : mutex _mutex ;
2011-01-04 00:40:41 -05:00
2010-07-13 00:56:08 -04:00
} distLockPinger ;
2011-01-04 00:40:41 -05:00
2011-02-15 18:08:59 -05:00
// Collection namespaces used by distributed locks: per-process ping documents, and the lock documents.
const string DistributedLock::lockPingNS( "config.lockpings" );
const string DistributedLock::locksNS( "config.locks" );
/**
 * Creates a distributed lock named 'name' whose documents live on the cluster 'conn'.
 *
 * @param lockTimeout  non-legacy locks: takeover timeout in ms (0 selects LOCK_TIMEOUT).
 *                     legacy locks: takeover time in minutes (0 selects 15).
 * @param asProcess    if true, this lock identifies itself with a per-thread id (getDistLockId())
 *                     instead of the process-wide id (getDistLockProcess()).
 * @param legacy       compare ping times against the local clock with minute granularity.
 *
 * The ping interval and the maximum tolerated clock and network skews are all derived from the
 * takeover time divided by LOCK_SKEW_FACTOR (in ms).
 */
DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess, bool legacy )
    : _conn(conn) , _name(name) , _lockTimeout(lockTimeout), _takeoverMinutes(0), _lastPingCheck(string(""), (mongo::Date_t) 0, (mongo::Date_t) 0) {

    _id = BSON( "_id" << name );
    _ns = locksNS;
    _maxClockSkew = 0;
    _maxNetSkew = 0;
    _lockPing = 0;

    if ( ! legacy ) {
        if ( lockTimeout == 0 )
            _lockTimeout = LOCK_TIMEOUT;
        _lockPing = _maxClockSkew = _maxNetSkew = _lockTimeout / LOCK_SKEW_FACTOR;
    }
    else {
        // Legacy locks express their timeout in minutes and compare against local time.
        _takeoverMinutes = (unsigned) _lockTimeout;
        if ( _takeoverMinutes == 0 )
            _takeoverMinutes = 15;
        _lockTimeout = 0;
        _lockPing = _maxClockSkew = _maxNetSkew = ( _takeoverMinutes * 60 * 1000 ) / LOCK_SKEW_FACTOR;
    }

    // A lock emulating its own process gets a per-thread id.
    _processId = asProcess ? getDistLockId() : getDistLockProcess();

    log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn
                      << " ( lock timeout : " << _lockTimeout << ", legacy timeout : " << _takeoverMinutes
                      << ", ping interval : " << _lockPing << ", process : " << asProcess
                      << ", legacy : " << legacy << " )" << endl;
}
2011-03-04 16:54:37 -05:00
/** Returns the current time of this lock's cluster, estimated to within _maxNetSkew ms. */
Date_t DistributedLock::getRemoteTime() {
    return remoteTime( _conn , _maxNetSkew );
}
2011-03-04 16:54:37 -05:00
bool DistributedLock : : isRemoteTimeSkewed ( ) {
return ! DistributedLock : : checkSkew ( _conn , NUM_LOCK_SKEW_CHECKS , _maxClockSkew , _maxNetSkew ) ;
2011-02-15 18:08:59 -05:00
}
2011-03-04 16:54:37 -05:00
/** The cluster on which this lock's documents are stored. */
const ConnectionString& DistributedLock::getRemoteConnection() {
    const ConnectionString& cluster = _conn;
    return cluster;
}
2011-03-04 16:54:37 -05:00
/** The process id recorded in lock documents taken by this lock. */
const string& DistributedLock::getProcessId() {
    const string& pid = _processId;
    return pid;
}
/**
 * Asks the first server listed in 'cluster' for its current time (serverStatus "localTime").
 *
 * The reported time is shifted back by half of the measured round trip, assuming the reply
 * was produced halfway through it. Round trips longer than 2 * maxNetSkew ms are rejected,
 * so the estimate is off by at most maxNetSkew ms.
 *
 * Throws TimeNotFoundException 13647 if serverStatus fails, 13648 if the reply took too long.
 */
Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) {

    ConnectionString server( *cluster.getServers().begin() );
    ScopedDbConnection conn( server );

    BSONObj status;
    long long roundTrip;

    try {
        Date_t sentAt = jsTime();
        bool ok = conn->runCommand( string("admin"), BSON( "serverStatus" << 1 ), status );
        roundTrip = jsTime() - sentAt;

        if( ! ok )
            m_throw_exception( 13647, TimeNotFoundException, "could not get status from server " << server.toString() << " in cluster " << cluster.toString() << " to check time" );

        // A round trip of at most twice the max network skew bounds the error of the midpoint estimate.
        const long long maxRoundTrip = (long long) ( maxNetSkew * 2 );
        if( roundTrip > maxRoundTrip )
            m_throw_exception( 13648, TimeNotFoundException, "server " << server.toString() << " in cluster " << cluster.toString() << " did not respond within max network delay of " << maxNetSkew << "ms" );
    }
    catch(...) {
        conn.done();
        throw;
    }

    conn.done();

    Date_t reported = status["localTime"].Date();
    return reported - ( roundTrip / 2 );
}
2011-02-15 18:08:59 -05:00
bool DistributedLock : : checkSkew ( const ConnectionString & cluster , unsigned skewChecks , unsigned long long maxClockSkew , unsigned long long maxNetSkew ) {
vector < HostAndPort > servers = cluster . getServers ( ) ;
if ( servers . size ( ) < 1 ) return true ;
vector < long long > avgSkews ;
for ( unsigned i = 0 ; i < skewChecks ; i + + ) {
// Find the average skew for each server
unsigned s = 0 ;
for ( vector < HostAndPort > : : iterator si = servers . begin ( ) ; si ! = servers . end ( ) ; + + si , s + + ) {
if ( i = = 0 ) avgSkews . push_back ( 0 ) ;
// Could check if this is self, but shouldn't matter since local network connection should be fast.
ConnectionString server ( * si ) ;
vector < long long > skew ;
BSONObj result ;
Date_t remote = remoteTime ( server , maxNetSkew ) ;
Date_t local = jsTime ( ) ;
// Remote time can be delayed by at most MAX_NET_SKEW
// Skew is how much time we'd have to add to local to get to remote
avgSkews [ s ] + = ( long long ) ( remote - local ) ;
2011-03-04 16:54:37 -05:00
log ( logLvl + 1 ) < < " skew from remote server " < < server < < " found: " < < ( long long ) ( remote - local ) < < endl ;
2011-02-15 18:08:59 -05:00
}
}
// Analyze skews
long long serverMaxSkew = 0 ;
long long serverMinSkew = 0 ;
for ( unsigned s = 0 ; s < avgSkews . size ( ) ; s + + ) {
long long avgSkew = ( avgSkews [ s ] / = skewChecks ) ;
// Keep track of max and min skews
if ( s = = 0 ) {
serverMaxSkew = avgSkew ;
serverMinSkew = avgSkew ;
}
else {
if ( avgSkew > serverMaxSkew )
serverMaxSkew = avgSkew ;
if ( avgSkew < serverMinSkew )
serverMinSkew = avgSkew ;
}
}
long long totalSkew = serverMaxSkew - serverMinSkew ;
// Make sure our max skew is not more than our pre-set limit
2011-03-04 16:54:37 -05:00
if ( totalSkew > ( long long ) maxClockSkew ) {
log ( logLvl + 1 ) < < " total clock skew of " < < totalSkew < < " ms for servers " < < cluster < < " is out of " < < maxClockSkew < < " ms bounds. " < < endl ;
return false ;
2011-02-15 18:08:59 -05:00
}
2011-03-04 16:54:37 -05:00
log ( logLvl + 1 ) < < " total clock skew of " < < totalSkew < < " ms for servers " < < cluster < < " is in " < < maxClockSkew < < " ms bounds. " < < endl ;
2011-02-15 18:08:59 -05:00
return true ;
}
2011-03-01 16:56:58 -05:00
// For use in testing, ping thread should run indefinitely in practice.
2011-03-04 16:54:37 -05:00
bool DistributedLock : : killPinger ( DistributedLock & lock ) {
if ( lock . _threadId = = " " ) return false ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
distLockPinger . kill ( lock . _conn , lock . _processId ) ;
return true ;
2011-02-15 18:08:59 -05:00
}
2011-01-04 00:40:41 -05:00
2011-03-22 22:27:16 -04:00
// Semantics of this method are basically that if the lock cannot be acquired, returns false, can be retried.
// If the lock should not be tried again (some unexpected error) a LockException is thrown
2011-01-04 00:40:41 -05:00
bool DistributedLock : : lock_try ( string why , BSONObj * other ) {
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// TODO: Start pinging only when we actually get the lock?
// If we don't have a thread pinger, make sure we shouldn't have one
if ( _threadId = = " " )
_threadId = distLockPinger . got ( * this , _lockPing ) ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// This should always be true, if not, we are using the lock incorrectly.
assert ( _name ! = " " ) ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// write to dummy if 'other' is null
2011-01-04 00:40:41 -05:00
BSONObj dummyOther ;
2010-10-26 15:25:26 -04:00
if ( other = = NULL )
other = & dummyOther ;
2010-07-12 17:53:02 -04:00
ScopedDbConnection conn ( _conn ) ;
2011-01-04 00:40:41 -05:00
2010-07-12 17:53:02 -04:00
BSONObjBuilder queryBuilder ;
queryBuilder . appendElements ( _id ) ;
2011-01-04 00:40:41 -05:00
queryBuilder . append ( " state " , 0 ) ;
2010-07-12 17:53:02 -04:00
2011-01-04 00:40:41 -05:00
{
// make sure its there so we can use simple update logic below
2011-02-17 19:59:33 -05:00
BSONObj o = conn - > findOne ( _ns , _id ) . getOwned ( ) ;
2011-03-04 16:54:37 -05:00
// Case 1: No locks
2011-01-04 00:40:41 -05:00
if ( o . isEmpty ( ) ) {
2010-07-12 17:53:02 -04:00
try {
2011-03-04 16:54:37 -05:00
log ( logLvl ) < < " inserting initial doc in " < < _ns < < " for lock " < < _name < < endl ;
2010-07-12 17:53:02 -04:00
conn - > insert ( _ns , BSON ( " _id " < < _name < < " state " < < 0 < < " who " < < " " ) ) ;
}
2011-01-04 00:40:41 -05:00
catch ( UserException & e ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " could not insert initial doc for distributed lock " < < _name < < m_caused_by ( e ) < < endl ;
2010-07-12 17:53:02 -04:00
}
}
2011-03-04 16:54:37 -05:00
// Case 2: A set lock that we might be able to force
2011-01-04 00:40:41 -05:00
else if ( o [ " state " ] . numberInt ( ) > 0 ) {
2011-02-15 18:08:59 -05:00
string lockName = o [ " _id " ] . String ( ) + string ( " / " ) + o [ " process " ] . String ( ) ;
2010-07-13 01:21:08 -04:00
BSONObj lastPing = conn - > findOne ( lockPingNS , o [ " process " ] . wrap ( " _id " ) ) ;
2011-01-04 00:40:41 -05:00
if ( lastPing . isEmpty ( ) ) {
2011-02-15 18:08:59 -05:00
if ( _lockTimeout > 0 ) {
2011-03-04 16:54:37 -05:00
log ( logLvl ) < < " empty ping found for process in lock ' " < < lockName < < " ' " < < endl ;
2011-02-15 18:08:59 -05:00
// TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot.
lastPing = BSON ( " _id " < < o [ " process " ] . String ( ) < < " ping " < < ( Date_t ) 0 ) ;
}
else {
2011-03-04 16:54:37 -05:00
// LEGACY
2011-02-15 18:08:59 -05:00
// if a lock is taken but there's no ping for it, we're in an inconsistent situation
// if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed
// but we'd require analysis of the situation before a manual intervention
warning ( ) < < " config.locks: " < < _name < < " lock is taken by old process? "
< < " remove the following lock if the process is not active anymore: " < < o < < endl ;
* other = o ;
other - > getOwned ( ) ;
conn . done ( ) ;
return false ;
}
2010-07-13 01:21:08 -04:00
}
2011-02-15 18:08:59 -05:00
unsigned long long elapsed = 0 ;
unsigned long long takeover = ( _lockTimeout > 0 ? _lockTimeout : _takeoverMinutes ) ;
if ( _lockTimeout > 0 ) {
2011-03-04 16:54:37 -05:00
log ( logLvl ) < < " checking last ping for lock ' " < < lockName < < " ' " < < " against process " < < _lastPingCheck . get < 0 > ( ) < < " and ping " < < _lastPingCheck . get < 1 > ( ) < < endl ;
2011-02-15 18:08:59 -05:00
try {
Date_t remote = remoteTime ( _conn ) ;
// Timeout the elapsed time using comparisons of remote clock
if ( _lastPingCheck . get < 0 > ( ) ! = lastPing [ " _id " ] . String ( ) | | _lastPingCheck . get < 1 > ( ) ! = lastPing [ " ping " ] . Date ( ) ) {
// If the ping has changed since we last checked, mark the current date and time.
_lastPingCheck = make_tuple ( lastPing [ " _id " ] . String ( ) , lastPing [ " ping " ] . Date ( ) , remote ) ;
}
else {
// GOTCHA! Due to network issues, it is possible that the current time
// is less than the remote time. We *have* to check this here, otherwise
// we overflow and our lock breaks.
if ( _lastPingCheck . get < 2 > ( ) > = remote )
2011-03-04 16:54:37 -05:00
elapsed = 0 ;
2011-02-15 18:08:59 -05:00
else
2011-03-04 16:54:37 -05:00
elapsed = remote - _lastPingCheck . get < 2 > ( ) ;
2011-02-15 18:08:59 -05:00
}
}
catch ( LockException & e ) {
// Remote server cannot be found / is not responsive
warning ( ) < < " Could not get remote time from " < < _conn < < m_caused_by ( e ) ;
}
2011-02-17 19:59:33 -05:00
}
2011-02-15 18:08:59 -05:00
else {
2011-03-04 16:54:37 -05:00
// LEGACY
2011-02-15 18:08:59 -05:00
// GOTCHA! If jsTime() (current time) is less than the remote time,
// we should definitely not break the lock. However, if we don't check
// this here, we get an invalid unsigned elapsed, which is ginormous and
// causes the lock to be forced.
if ( lastPing [ " ping " ] . Date ( ) > jsTime ( ) )
elapsed = 0 ;
else
elapsed = jsTime ( ) - lastPing [ " ping " ] . Date ( ) ; // in ms
elapsed = elapsed / ( 1000 * 60 ) ; // convert to minutes
2011-02-16 17:23:16 -05:00
}
2011-02-15 18:08:59 -05:00
if ( elapsed < = takeover ) {
2011-03-04 16:54:37 -05:00
log ( logLvl ) < < " could not force lock ' " < < lockName < < " ' because elapsed time " < < elapsed < < " <= " < < " takeover time " < < takeover < < endl ;
2010-10-26 15:25:26 -04:00
* other = o ;
2010-07-13 18:21:37 -04:00
conn . done ( ) ;
2010-07-13 01:21:08 -04:00
return false ;
2010-07-13 18:21:37 -04:00
}
2011-01-04 00:40:41 -05:00
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " forcing lock ' " < < lockName < < " ' because elapsed time " < < elapsed < < " > " < < " takeover time " < < takeover < < endl ;
2011-02-15 18:08:59 -05:00
try {
// Check the clock skew again. If we check this before we get a lock
// and after the lock times out, we can be pretty sure the time is
// increasing at the same rate on all servers and therefore our
// timeout is accurate
2011-03-04 16:54:37 -05:00
uassert_msg ( 13652 , " remote time in cluster " < < _conn . toString ( ) < < " is now skewed, cannot force lock. " , ! isRemoteTimeSkewed ( ) ) ;
2011-02-15 18:08:59 -05:00
// Make sure we break the lock with the correct "ts" (OID) value, otherwise
// we can overwrite a new lock inserted in the meantime.
2011-03-22 22:27:16 -04:00
conn - > update ( _ns , BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " state " < < o [ " state " ] . numberInt ( ) < < " ts " < < o [ " ts " ] ) ,
BSON ( " $set " < < BSON ( " state " < < 0 ) ) ) ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
2011-02-15 18:08:59 -05:00
// TODO: Clean up all the extra code to exit this method, probably with a refactor
2011-03-04 16:54:37 -05:00
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
( errMsg . empty ( ) ? log ( logLvl - 1 ) : warning ( ) ) < < " Could not force lock ' " < < lockName < < " ' "
< < ( ! errMsg . empty ( ) ? m_error_message ( errMsg ) : string ( " (another force won) " ) ) < < endl ;
2011-02-15 18:08:59 -05:00
* other = o ;
other - > getOwned ( ) ;
conn . done ( ) ;
return false ;
}
}
2011-03-23 16:34:40 -04:00
catch ( UpdateNotTheSame & ) {
2011-03-22 22:27:16 -04:00
// Ok to continue since we know we forced at least one lock document, and all lock docs
// are required for a lock to be held.
warning ( ) < < " lock forcing " < < lockName < < " inconsistent " < < endl ;
2011-03-04 16:54:37 -05:00
}
2011-03-22 22:27:16 -04:00
catch ( std : : exception & e ) {
2010-11-30 12:00:27 -05:00
conn . done ( ) ;
2011-03-22 22:27:16 -04:00
m_throw_exception ( 70000 , LockException ,
" exception forcing distributed lock " < < lockName < < m_error_message ( e . what ( ) ) ) ;
2010-11-30 12:00:27 -05:00
}
2011-02-15 18:08:59 -05:00
// Lock forced, reset our timer
if ( _lockTimeout > 0 )
_lastPingCheck = make_tuple ( string ( " " ) , 0 , 0 ) ;
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " lock ' " < < lockName < < " ' successfully forced " < < endl ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// We don't need the ts value in the query, since we will only ever replace locks with state=0.
2010-07-12 17:53:02 -04:00
}
2011-03-04 16:54:37 -05:00
// Case 3: We have an expired lock
2011-01-04 00:40:41 -05:00
else if ( o [ " ts " ] . type ( ) ) {
2010-07-12 17:53:02 -04:00
queryBuilder . append ( o [ " ts " ] ) ;
}
}
2011-01-04 00:40:41 -05:00
2010-07-12 17:53:02 -04:00
bool gotLock = false ;
2011-03-04 16:54:37 -05:00
BSONObj currLock ;
2010-08-11 16:46:37 -04:00
2011-02-15 18:08:59 -05:00
BSONObj lockDetails = BSON ( " state " < < 1 < < " who " < < getDistLockId ( ) < < " process " < < _processId < <
" when " < < jsTime ( ) < < " why " < < why < < " ts " < < OID : : gen ( ) ) ;
2010-08-11 16:46:37 -04:00
BSONObj whatIWant = BSON ( " $set " < < lockDetails ) ;
2011-02-15 18:08:59 -05:00
BSONObj query = queryBuilder . obj ( ) ;
2011-03-04 16:54:37 -05:00
string lockName = _name + string ( " / " ) + _processId ;
2010-07-12 17:53:02 -04:00
try {
2010-08-11 16:46:37 -04:00
2011-03-22 22:27:16 -04:00
// Main codepath to acquire lock
2011-03-04 16:54:37 -05:00
log ( logLvl ) < < " about to acquire distributed lock ' " < < lockName < < " : \n "
< < lockDetails . jsonString ( Strict , true ) < < " \n "
< < query . jsonString ( Strict , true ) < < endl ;
2011-02-15 18:08:59 -05:00
conn - > update ( _ns , query , whatIWant ) ;
2011-01-04 00:40:41 -05:00
2011-03-04 16:54:37 -05:00
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
2011-01-04 00:40:41 -05:00
2011-03-04 16:54:37 -05:00
currLock = conn - > findOne ( _ns , _id ) ;
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
( errMsg . empty ( ) ? log ( logLvl - 1 ) : warning ( ) ) < < " could not acquire lock ' " < < lockName < < " ' "
< < ( ! errMsg . empty ( ) ? m_error_message ( errMsg ) : string ( " (another update won) " ) ) < < endl ;
* other = currLock ;
2010-10-26 15:25:26 -04:00
other - > getOwned ( ) ;
2010-07-12 17:53:02 -04:00
gotLock = false ;
}
else {
gotLock = true ;
}
}
2011-01-04 00:40:41 -05:00
catch ( UpdateNotTheSame & up ) {
2011-03-22 22:27:16 -04:00
2010-07-12 17:53:02 -04:00
// this means our update got through on some, but not others
2011-03-20 22:58:15 -07:00
warning ( ) < < " distributed lock ' " < < lockName < < " did not propagate properly. " < < m_caused_by ( up ) < < endl ;
2010-07-12 17:53:02 -04:00
2011-03-22 22:27:16 -04:00
// Overall protection derives from:
// All unlocking updates use the ts value when setting state to 0
// This ensures that during locking, we can override all smaller ts locks with
// our own safe ts value and not be unlocked afterward.
for ( unsigned i = 0 ; i < up . size ( ) ; i + + ) {
2010-07-12 17:53:02 -04:00
2011-03-04 16:54:37 -05:00
ScopedDbConnection indDB ( up [ i ] . first ) ;
2011-03-22 22:27:16 -04:00
BSONObj indUpdate ;
try {
indUpdate = indDB - > findOne ( _ns , _id ) ;
// If we override this lock in any way, grab and protect it.
// We assume/ensure that if a process does not have all lock documents, it is no longer
// holding the lock.
if ( indUpdate [ " ts " ] < lockDetails [ " ts " ] | | indUpdate [ " state " ] . numberInt ( ) = = 0 ) {
BSONObj grabQuery = BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " ts " < < indUpdate [ " ts " ] . OID ( ) ) ;
// Change ts so we won't be forced, state so we won't be relocked
BSONObj grabChanges = BSON ( " ts " < < lockDetails [ " ts " ] . OID ( ) < < " state " < < 1 ) ;
2011-03-04 16:54:37 -05:00
2011-03-22 22:27:16 -04:00
// Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
// process grabbed the lock (which will change the ts), but the lock will be set until forcing
indDB - > update ( _ns , grabQuery , BSON ( " $set " < < grabChanges ) ) ;
indUpdate = indDB - > findOne ( _ns , _id ) ;
// Our lock should now be set until forcing.
assert ( indUpdate [ " state " ] . numberInt ( ) = = 1 ) ;
}
// else our lock is the same, in which case we're safe, or it's a bigger lock,
// in which case we won't need to protect anything since we won't have the lock.
}
catch ( std : : exception & e ) {
conn . done ( ) ;
m_throw_exception ( 70000 , LockException ,
" distributed lock " < < lockName < < " had errors communicating with individual server " < < up [ 1 ] . first
< < m_error_message ( e . what ( ) ) ) ;
}
assert ( ! indUpdate . isEmpty ( ) ) ;
// Find max TS value
2011-03-04 16:54:37 -05:00
if ( currLock . isEmpty ( ) | | currLock [ " ts " ] < indUpdate [ " ts " ] ) {
currLock = indUpdate . getOwned ( ) ;
2010-07-12 17:53:02 -04:00
}
2011-03-04 16:54:37 -05:00
indDB . done ( ) ;
2010-07-12 17:53:02 -04:00
}
2011-03-23 12:46:36 -04:00
// Locks on all servers are now set and safe until forcing
2011-03-22 22:27:16 -04:00
// No longer need to update currLock when exiting below here
if ( currLock [ " ts " ] = = lockDetails [ " ts " ] ) {
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " lock update won, completing lock propagation for ' " < < lockName < < " ' " < < endl ;
2011-03-22 22:27:16 -04:00
// This is now safe, since we know that no new locks can be placed on top of ours on each ind server.
try {
conn - > update ( _ns , _id , whatIWant ) ;
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
warning ( ) < < " could not finalize winning lock " < < lockName
< < ( ! errMsg . empty ( ) ? m_error_message ( errMsg ) : " (did not update lock) " ) < < endl ;
gotLock = false ;
}
else {
// SUCCESS!
gotLock = true ;
}
}
catch ( std : : exception & e ) {
conn . done ( ) ;
m_throw_exception ( 70000 , LockException ,
" exception finalizing winning lock " < < m_error_message ( e . what ( ) ) ) ;
}
2010-07-12 17:53:02 -04:00
}
else {
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " lock update lost, lock ' " < < lockName < < " ' not propagated. " < < endl ;
2010-07-12 17:53:02 -04:00
gotLock = false ;
}
}
2011-03-22 22:27:16 -04:00
catch ( std : : exception & e ) {
conn . done ( ) ;
m_throw_exception ( 70000 , LockException ,
" exception creating distributed lock " < < lockName < < m_error_message ( e . what ( ) ) ) ;
}
2011-01-04 00:40:41 -05:00
2011-03-04 16:54:37 -05:00
if ( gotLock )
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' acquired, now : " < < currLock < < endl ;
else
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' was not acquired. " < < endl ;
2011-02-15 18:08:59 -05:00
2010-07-12 17:53:02 -04:00
conn . done ( ) ;
2011-01-04 00:40:41 -05:00
2010-08-12 11:21:34 -04:00
return gotLock ;
2010-07-12 17:53:02 -04:00
}
2011-01-04 00:40:41 -05:00
void DistributedLock : : unlock ( ) {
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
assert ( _name ! = " " ) ;
string lockName = _name + string ( " / " ) + _processId ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
const int maxAttempts = 3 ;
2010-08-05 11:52:14 -04:00
int attempted = 0 ;
2011-03-22 22:27:16 -04:00
BSONObj oldLock ;
2010-08-05 11:52:14 -04:00
while ( + + attempted < = maxAttempts ) {
2011-03-22 22:27:16 -04:00
ScopedDbConnection conn ( _conn ) ;
2010-08-05 11:52:14 -04:00
try {
2011-03-22 22:27:16 -04:00
if ( oldLock . isEmpty ( ) )
oldLock = conn - > findOne ( _ns , _id ) ;
2010-08-05 11:52:14 -04:00
2011-03-22 22:27:16 -04:00
assert ( oldLock [ " state " ] . numberInt ( ) = = 1 ) ;
assert ( ! oldLock [ " ts " ] . eoo ( ) ) ;
// Use ts when updating lock, so that new locks can be sure they won't get trampled.
conn - > update ( _ns ,
BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " ts " < < oldLock [ " ts " ] . OID ( ) ) ,
BSON ( " $set " < < BSON ( " state " < < 0 ) ) ) ;
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' unlocked. " < < endl ;
2011-03-04 16:54:37 -05:00
conn . done ( ) ;
return ;
2011-03-22 22:27:16 -04:00
}
2011-03-28 16:10:30 -04:00
catch ( UpdateNotTheSame & ) {
2011-03-22 22:27:16 -04:00
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' unlocked (messily). " < < endl ;
conn . done ( ) ;
break ;
2011-01-04 00:40:41 -05:00
}
catch ( std : : exception & e ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " distributed lock ' " < < lockName < < " ' failed unlock attempt. "
< < m_error_message ( e . what ( ) ) < < endl ;
2010-07-12 17:53:02 -04:00
2011-03-22 22:27:16 -04:00
conn . done ( ) ;
// TODO: If our lock timeout is small, sleeping this long may be unsafe.
if ( attempted ! = maxAttempts ) sleepsecs ( 1 < < attempted ) ;
2010-08-05 11:52:14 -04:00
}
}
2011-03-04 16:54:37 -05:00
warning ( ) < < " distributed lock ' " < < lockName < < " ' couldn't consummate unlock request. "
2011-03-22 22:27:16 -04:00
< < " lock may be taken over after "
2011-03-04 16:54:37 -05:00
< < ( _lockTimeout > 0 ? _lockTimeout / ( 60 * 1000 ) : _takeoverMinutes )
< < " minutes timeout. " < < endl ;
2010-08-05 11:52:14 -04:00
}
2010-07-12 17:53:02 -04:00
2011-02-15 18:08:59 -05:00
2010-07-12 17:53:02 -04:00
}