2010-07-12 19:07:32 -04:00
// @file distlock.h
2010-07-12 17:53:02 -04:00
/* Copyright 2009 10gen Inc.
*
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2010-07-12 19:07:32 -04:00
# include "pch.h"
2010-07-12 17:53:02 -04:00
# include "dbclient.h"
# include "distlock.h"
namespace mongo {
2010-07-13 00:56:08 -04:00
2011-03-18 02:54:24 -04:00
// Base verbosity for distributed-lock logging. Call sites in this file log at
// logLvl plus or minus a small offset depending on how noisy the message is.
LabeledLevel DistributedLock : : logLvl ( 1 ) ;
2011-03-04 16:54:37 -05:00
2010-07-13 00:56:08 -04:00
// Holds the lock id string that getDistLockId() builds and stores on first use;
// the constructor argument is the value seen before anything has been stored.
ThreadLocalValue < string > distLockIds ( " " ) ;
2010-08-19 12:54:28 -04:00
/* ==================
* Module initialization
*
* The process string is built once by initModule ( ) , which getDistLockProcess ( )
* runs through boost call_once , and is then served from the cached pointer below .
*/
// once-flag handed to call_once so that initModule() runs a single time
boost : : once_flag _init = BOOST_ONCE_INIT ;
// assigned by initModule(); NULL until then and not released in the code shown here
static string * _cachedProcessString = NULL ;
static void initModule ( ) {
// cache process string
stringstream ss ;
2011-03-11 17:29:22 -05:00
ss < < getHostName ( ) < < " : " < < cmdLine . port < < " : " < < time ( 0 ) < < " : " < < rand ( ) ;
2010-08-19 12:54:28 -04:00
_cachedProcessString = new string ( ss . str ( ) ) ;
}
/* =================== */
2011-01-04 00:40:41 -05:00
// Returns a copy of the cached process string, running initModule() through
// boost::call_once first so the string exists before it is read.
string getDistLockProcess() {
    boost::call_once(initModule, _init);

    const string* cached = _cachedProcessString;
    assert(cached);
    return *cached;
}
2010-07-13 00:56:08 -04:00
2011-01-04 00:40:41 -05:00
// Returns the lock id stored in distLockIds. When the stored value is empty, an
// id is built from the process string, the current thread name and a random
// number, stored back into distLockIds, and returned.
string getDistLockId() {
    string lockId = distLockIds.get();
    if (!lockId.empty())
        return lockId;

    stringstream idBuilder;
    idBuilder << getDistLockProcess() << " : " << getThreadName() << " : " << rand();

    lockId = idBuilder.str();
    distLockIds.set(lockId);
    return lockId;
}
2011-01-04 00:40:41 -05:00
2010-08-12 09:50:29 -04:00
2011-02-15 18:08:59 -05:00
class DistributedLockPinger {
public :
2011-03-04 16:54:37 -05:00
2011-02-15 18:08:59 -05:00
DistributedLockPinger ( )
: _mutex ( " DistributedLockPinger " ) {
}
void _distLockPingThread ( ConnectionString addr , string process , unsigned long long sleepTime ) {
2011-03-04 16:54:37 -05:00
2011-02-15 18:08:59 -05:00
setThreadName ( " LockPinger " ) ;
2011-03-04 16:54:37 -05:00
string pingId = pingThreadId ( addr , process ) ;
log ( DistributedLock : : logLvl - 1 ) < < " creating distributed lock ping thread for " < < addr
< < " and process " < < process
< < " (sleeping for " < < sleepTime < < " ms) " < < endl ;
2010-08-11 16:46:37 -04:00
2011-02-15 18:08:59 -05:00
static int loops = 0 ;
2011-03-01 16:56:58 -05:00
while ( ! inShutdown ( ) & & ! shouldKill ( addr , process ) ) {
2010-08-11 16:46:37 -04:00
2011-03-04 16:54:37 -05:00
log ( DistributedLock : : logLvl + 2 ) < < " distributed lock pinger ' " < < pingId < < " ' about to ping. " < < endl ;
2011-02-15 18:08:59 -05:00
Date_t pingTime ;
2010-10-01 15:00:19 -04:00
2011-02-15 18:08:59 -05:00
try {
ScopedDbConnection conn ( addr ) ;
pingTime = jsTime ( ) ;
// refresh the entry corresponding to this process in the lockpings collection
conn - > update ( DistributedLock : : lockPingNS ,
BSON ( " _id " < < process ) ,
BSON ( " $set " < < BSON ( " ping " < < pingTime ) ) ,
true ) ;
2011-03-04 16:54:37 -05:00
2011-02-15 18:08:59 -05:00
string err = conn - > getLastError ( ) ;
if ( ! err . empty ( ) ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " pinging failed for distributed lock pinger ' " < < pingId < < " '. "
2011-03-31 11:32:47 -04:00
< < causedBy ( err ) < < endl ;
2011-02-15 18:08:59 -05:00
conn . done ( ) ;
2011-03-04 16:54:37 -05:00
// Sleep for normal ping time
sleepmillis ( sleepTime ) ;
2011-02-15 18:08:59 -05:00
continue ;
}
// remove really old entries from the lockpings collection if they're not holding a lock
// (this may happen if an instance of a process was taken down and no new instance came up to
// replace it for a quite a while)
// if the lock is taken, the take-over mechanism should handle the situation
auto_ptr < DBClientCursor > c = conn - > query ( DistributedLock : : locksNS , BSONObj ( ) ) ;
2011-06-03 17:58:26 -04:00
set < string > pids ;
2011-02-15 18:08:59 -05:00
while ( c - > more ( ) ) {
BSONObj lock = c - > next ( ) ;
if ( ! lock [ " process " ] . eoo ( ) ) {
2011-06-03 17:58:26 -04:00
pids . insert ( lock [ " process " ] . valuestrsafe ( ) ) ;
2011-02-15 18:08:59 -05:00
}
}
Date_t fourDays = pingTime - ( 4 * 86400 * 1000 ) ; // 4 days
conn - > remove ( DistributedLock : : lockPingNS , BSON ( " _id " < < BSON ( " $nin " < < pids ) < < " ping " < < LT < < fourDays ) ) ;
err = conn - > getLastError ( ) ;
if ( ! err . empty ( ) ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " ping cleanup for distributed lock pinger ' " < < pingId < < " failed. "
2011-03-31 11:32:47 -04:00
< < causedBy ( err ) < < endl ;
2011-02-15 18:08:59 -05:00
conn . done ( ) ;
2011-03-04 16:54:37 -05:00
// Sleep for normal ping time
sleepmillis ( sleepTime ) ;
2011-02-15 18:08:59 -05:00
continue ;
}
// create index so remove is fast even with a lot of servers
if ( loops + + = = 0 ) {
conn - > ensureIndex ( DistributedLock : : lockPingNS , BSON ( " ping " < < 1 ) ) ;
2010-11-11 14:35:18 -05:00
}
2011-03-04 16:54:37 -05:00
log ( DistributedLock : : logLvl - ( loops % 10 = = 0 ? 1 : 0 ) ) < < " cluster " < < addr < < " pinged successfully at " < < pingTime
< < " by distributed lock pinger ' " < < pingId
< < " ', sleeping for " < < sleepTime < < " ms " < < endl ;
2011-05-03 11:21:49 -04:00
// Remove old locks, if possible
// Make sure no one else is adding to this list at the same time
scoped_lock lk ( _mutex ) ;
int numOldLocks = _oldLockOIDs . size ( ) ;
if ( numOldLocks > 0 )
log ( DistributedLock : : logLvl - 1 ) < < " trying to delete " < < _oldLockOIDs . size ( ) < < " old lock entries for process " < < process < < endl ;
bool removed = false ;
for ( list < OID > : : iterator i = _oldLockOIDs . begin ( ) ; i ! = _oldLockOIDs . end ( ) ;
i = ( removed ? _oldLockOIDs . erase ( i ) : + + i ) ) {
removed = false ;
try {
// Got OID from lock with id, so we don't need to specify id again
conn - > update ( DistributedLock : : locksNS ,
BSON ( " ts " < < * i ) ,
BSON ( " $set " < < BSON ( " state " < < 0 ) ) ) ;
// Either the update went through or it didn't, either way we're done trying to
// unlock
log ( DistributedLock : : logLvl - 1 ) < < " handled late remove of old distributed lock with ts " < < * i < < endl ;
removed = true ;
}
catch ( UpdateNotTheSame & ) {
log ( DistributedLock : : logLvl - 1 ) < < " partially removed old distributed lock with ts " < < * i < < endl ;
removed = true ;
}
catch ( std : : exception & e ) {
warning ( ) < < " could not remove old distributed lock with ts " < < * i
< < causedBy ( e ) < < endl ;
}
}
if ( numOldLocks > 0 & & _oldLockOIDs . size ( ) > 0 ) {
log ( DistributedLock : : logLvl - 1 ) < < " not all old lock entries could be removed for process " < < process < < endl ;
}
conn . done ( ) ;
2010-10-01 15:00:19 -04:00
}
2011-02-15 18:08:59 -05:00
catch ( std : : exception & e ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " distributed lock pinger ' " < < pingId < < " ' detected an exception while pinging. "
2011-03-31 11:32:47 -04:00
< < causedBy ( e ) < < endl ;
2010-07-13 01:21:08 -04:00
}
2011-01-04 00:40:41 -05:00
2011-02-15 18:08:59 -05:00
sleepmillis ( sleepTime ) ;
2010-07-13 00:56:08 -04:00
}
2010-08-11 16:46:37 -04:00
2011-03-04 16:54:37 -05:00
warning ( ) < < " removing distributed lock ping thread ' " < < pingId < < " ' " < < endl ;
2011-02-15 18:08:59 -05:00
2011-03-01 16:56:58 -05:00
if ( shouldKill ( addr , process ) )
2011-02-15 18:08:59 -05:00
finishKill ( addr , process ) ;
2011-01-04 00:40:41 -05:00
2011-02-14 16:21:12 -05:00
}
2011-02-15 18:08:59 -05:00
void distLockPingThread ( ConnectionString addr , long long clockSkew , string processId , unsigned long long sleepTime ) {
try {
jsTimeVirtualThreadSkew ( clockSkew ) ;
_distLockPingThread ( addr , processId , sleepTime ) ;
}
catch ( std : : exception & e ) {
2011-03-31 11:32:47 -04:00
error ( ) < < " unexpected error while running distributed lock pinger for " < < addr < < " , process " < < processId < < causedBy ( e ) < < endl ;
2011-02-15 18:08:59 -05:00
}
catch ( . . . ) {
2011-03-04 16:54:37 -05:00
error ( ) < < " unknown error while running distributed lock pinger for " < < addr < < " , process " < < processId < < endl ;
2011-02-15 18:08:59 -05:00
}
2011-02-14 16:21:12 -05:00
}
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
string pingThreadId ( const ConnectionString & conn , const string & processId ) {
2011-03-01 16:56:58 -05:00
return conn . toString ( ) + " / " + processId ;
}
2011-02-15 18:08:59 -05:00
2011-03-01 16:56:58 -05:00
string got ( DistributedLock & lock , unsigned long long sleepTime ) {
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// Make sure we don't start multiple threads for a process id
2011-02-15 18:08:59 -05:00
scoped_lock lk ( _mutex ) ;
const ConnectionString & conn = lock . getRemoteConnection ( ) ;
const string & processId = lock . getProcessId ( ) ;
2011-03-01 16:56:58 -05:00
string s = pingThreadId ( conn , processId ) ;
2011-02-15 18:08:59 -05:00
// Ignore if we already have a pinging thread for this process.
2011-03-01 16:56:58 -05:00
if ( _seen . count ( s ) > 0 ) return " " ;
2011-02-15 18:08:59 -05:00
// Check our clock skew
try {
if ( lock . isRemoteTimeSkewed ( ) ) {
2011-03-31 11:32:47 -04:00
throw LockException ( str : : stream ( ) < < " clock skew of the cluster " < < conn . toString ( ) < < " is too far out of bounds to allow distributed locking. " , 13650 ) ;
2011-02-15 18:08:59 -05:00
}
}
catch ( LockException & e ) {
2011-03-31 11:32:47 -04:00
throw LockException ( str : : stream ( ) < < " error checking clock skew of cluster " < < conn . toString ( ) < < causedBy ( e ) , 13651 ) ;
2011-02-15 18:08:59 -05:00
}
boost : : thread t ( boost : : bind ( & DistributedLockPinger : : distLockPingThread , this , conn , getJSTimeVirtualThreadSkew ( ) , processId , sleepTime ) ) ;
_seen . insert ( s ) ;
2011-03-01 16:56:58 -05:00
return s ;
2011-02-14 16:21:12 -05:00
}
2011-05-09 15:30:17 -04:00
void addUnlockOID ( const OID & oid ) {
2011-05-03 11:21:49 -04:00
// Modifying the lock from some other thread
scoped_lock lk ( _mutex ) ;
_oldLockOIDs . push_back ( oid ) ;
}
2011-05-09 15:30:17 -04:00
bool willUnlockOID ( const OID & oid ) {
2011-05-03 11:21:49 -04:00
scoped_lock lk ( _mutex ) ;
return find ( _oldLockOIDs . begin ( ) , _oldLockOIDs . end ( ) , oid ) ! = _oldLockOIDs . end ( ) ;
}
2011-05-09 15:30:17 -04:00
void kill ( const ConnectionString & conn , const string & processId ) {
2011-02-15 18:08:59 -05:00
// Make sure we're in a consistent state before other threads can see us
scoped_lock lk ( _mutex ) ;
2011-02-14 16:21:12 -05:00
2011-03-01 16:56:58 -05:00
string pingId = pingThreadId ( conn , processId ) ;
assert ( _seen . count ( pingId ) > 0 ) ;
_kill . insert ( pingId ) ;
2011-02-15 18:08:59 -05:00
}
2011-05-09 15:30:17 -04:00
bool shouldKill ( const ConnectionString & conn , const string & processId ) {
2011-03-01 16:56:58 -05:00
return _kill . count ( pingThreadId ( conn , processId ) ) > 0 ;
2010-07-13 00:56:08 -04:00
}
2011-01-04 00:40:41 -05:00
2011-05-09 15:30:17 -04:00
void finishKill ( const ConnectionString & conn , const string & processId ) {
2011-02-15 18:08:59 -05:00
// Make sure we're in a consistent state before other threads can see us
2010-07-13 00:56:08 -04:00
scoped_lock lk ( _mutex ) ;
2011-02-15 18:08:59 -05:00
2011-03-01 16:56:58 -05:00
string pingId = pingThreadId ( conn , processId ) ;
_kill . erase ( pingId ) ;
_seen . erase ( pingId ) ;
2011-02-15 18:08:59 -05:00
2010-07-13 00:56:08 -04:00
}
2011-01-04 00:40:41 -05:00
2011-03-01 16:56:58 -05:00
set < string > _kill ;
2010-07-13 00:56:08 -04:00
set < string > _seen ;
mongo : : mutex _mutex ;
2011-05-03 11:21:49 -04:00
list < OID > _oldLockOIDs ;
2011-01-04 00:40:41 -05:00
2010-07-13 00:56:08 -04:00
} distLockPinger ;
2011-01-04 00:40:41 -05:00
2011-02-15 18:08:59 -05:00
// Collection in which the pinger upserts one document per lock process (keyed by process id).
const string DistributedLock : : lockPingNS = " config.lockpings " ;
// Collection holding the lock documents themselves (queried and updated by lock_try and the pinger).
const string DistributedLock : : locksNS = " config.locks " ;
/**
* Create a new distributed lock , potentially with a custom sleep and takeover time . If a custom sleep time is
* specified ( time between pings )
*/
2011-05-09 15:30:17 -04:00
DistributedLock : : DistributedLock ( const ConnectionString & conn , const string & name , unsigned long long lockTimeout , bool asProcess )
: _conn ( conn ) , _name ( name ) , _id ( BSON ( " _id " < < name ) ) , _processId ( asProcess ? getDistLockId ( ) : getDistLockProcess ( ) ) ,
_lockTimeout ( lockTimeout = = 0 ? LOCK_TIMEOUT : lockTimeout ) , _maxClockSkew ( _lockTimeout / LOCK_SKEW_FACTOR ) , _maxNetSkew ( _maxClockSkew ) , _lockPing ( _maxClockSkew ) ,
2011-05-11 17:34:08 -04:00
_mutex ( " DistributedLock " )
2011-05-09 15:30:17 -04:00
{
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " created new distributed lock for " < < name < < " on " < < conn
2011-05-09 15:30:17 -04:00
< < " ( lock timeout : " < < _lockTimeout
2011-05-12 15:26:39 -04:00
< < " , ping interval : " < < _lockPing < < " , process : " < < asProcess < < " ) " < < endl ;
2011-02-15 18:08:59 -05:00
}
2011-03-04 16:54:37 -05:00
// Remote time of this lock's cluster, using this lock's maximum network skew.
Date_t DistributedLock::getRemoteTime() {
    return remoteTime(_conn, _maxNetSkew);
}
2011-03-04 16:54:37 -05:00
bool DistributedLock : : isRemoteTimeSkewed ( ) {
return ! DistributedLock : : checkSkew ( _conn , NUM_LOCK_SKEW_CHECKS , _maxClockSkew , _maxNetSkew ) ;
2011-02-15 18:08:59 -05:00
}
2011-03-04 16:54:37 -05:00
// Connection string this lock was created with (returned by reference).
const ConnectionString& DistributedLock::getRemoteConnection() {
    return this->_conn;
}
2011-03-04 16:54:37 -05:00
// Process id chosen for this lock at construction (returned by reference).
const string& DistributedLock::getProcessId() {
    return this->_processId;
}
/**
 * Asks the first server listed in `cluster` for its status and returns that
 * server's reported local time, corrected by half of the measured round trip.
 *
 * @throws TimeNotFoundException 13647 when the status command fails, 13648 when
 *         the round trip took longer than twice maxNetSkew.
 *
 * The connection is handed back with done() on both the normal and the throwing path.
 */
Date_t DistributedLock::remoteTime(const ConnectionString& cluster, unsigned long long maxNetSkew) {

    ConnectionString server(*cluster.getServers().begin());
    ScopedDbConnection conn(server);

    BSONObj status;
    long long roundTrip;

    try {
        // time the command so the reply can be placed in the middle of the round trip
        Date_t sentAt = jsTime();
        const bool ok = conn->runCommand(string(" admin "), BSON(" serverStatus " << 1), status);
        roundTrip = jsTime() - sentAt;

        if (!ok) {
            throw TimeNotFoundException(str::stream() << " could not get status from server "
                                        << server.toString() << " in cluster " << cluster.toString()
                                        << " to check time ", 13647);
        }

        // Assuming the server answered halfway through the round trip, the returned
        // value can be off by at most half of it; reject round trips above 2x the
        // allowed network skew.
        if (roundTrip > (long long)(maxNetSkew * 2)) {
            throw TimeNotFoundException(str::stream() << " server " << server.toString()
                                        << " in cluster " << cluster.toString()
                                        << " did not respond within max network delay of "
                                        << maxNetSkew << " ms ", 13648);
        }
    }
    catch (...) {
        conn.done();
        throw;
    }

    conn.done();

    return status[" localTime "].Date() - (roundTrip / 2);
}
2011-02-15 18:08:59 -05:00
bool DistributedLock : : checkSkew ( const ConnectionString & cluster , unsigned skewChecks , unsigned long long maxClockSkew , unsigned long long maxNetSkew ) {
vector < HostAndPort > servers = cluster . getServers ( ) ;
if ( servers . size ( ) < 1 ) return true ;
vector < long long > avgSkews ;
for ( unsigned i = 0 ; i < skewChecks ; i + + ) {
// Find the average skew for each server
unsigned s = 0 ;
for ( vector < HostAndPort > : : iterator si = servers . begin ( ) ; si ! = servers . end ( ) ; + + si , s + + ) {
if ( i = = 0 ) avgSkews . push_back ( 0 ) ;
// Could check if this is self, but shouldn't matter since local network connection should be fast.
ConnectionString server ( * si ) ;
vector < long long > skew ;
BSONObj result ;
Date_t remote = remoteTime ( server , maxNetSkew ) ;
Date_t local = jsTime ( ) ;
// Remote time can be delayed by at most MAX_NET_SKEW
// Skew is how much time we'd have to add to local to get to remote
avgSkews [ s ] + = ( long long ) ( remote - local ) ;
2011-03-04 16:54:37 -05:00
log ( logLvl + 1 ) < < " skew from remote server " < < server < < " found: " < < ( long long ) ( remote - local ) < < endl ;
2011-02-15 18:08:59 -05:00
}
}
// Analyze skews
long long serverMaxSkew = 0 ;
long long serverMinSkew = 0 ;
for ( unsigned s = 0 ; s < avgSkews . size ( ) ; s + + ) {
long long avgSkew = ( avgSkews [ s ] / = skewChecks ) ;
// Keep track of max and min skews
if ( s = = 0 ) {
serverMaxSkew = avgSkew ;
serverMinSkew = avgSkew ;
}
else {
if ( avgSkew > serverMaxSkew )
serverMaxSkew = avgSkew ;
if ( avgSkew < serverMinSkew )
serverMinSkew = avgSkew ;
}
}
long long totalSkew = serverMaxSkew - serverMinSkew ;
// Make sure our max skew is not more than our pre-set limit
2011-03-04 16:54:37 -05:00
if ( totalSkew > ( long long ) maxClockSkew ) {
log ( logLvl + 1 ) < < " total clock skew of " < < totalSkew < < " ms for servers " < < cluster < < " is out of " < < maxClockSkew < < " ms bounds. " < < endl ;
return false ;
2011-02-15 18:08:59 -05:00
}
2011-03-04 16:54:37 -05:00
log ( logLvl + 1 ) < < " total clock skew of " < < totalSkew < < " ms for servers " < < cluster < < " is in " < < maxClockSkew < < " ms bounds. " < < endl ;
2011-02-15 18:08:59 -05:00
return true ;
}
2011-03-01 16:56:58 -05:00
// For use in testing, ping thread should run indefinitely in practice.
2011-03-04 16:54:37 -05:00
bool DistributedLock : : killPinger ( DistributedLock & lock ) {
if ( lock . _threadId = = " " ) return false ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
distLockPinger . kill ( lock . _conn , lock . _processId ) ;
return true ;
2011-02-15 18:08:59 -05:00
}
2011-01-04 00:40:41 -05:00
2011-03-22 22:27:16 -04:00
// Semantics of this method are basically that if the lock cannot be acquired, returns false, can be retried.
2011-05-03 11:21:49 -04:00
// If the lock should not be tried again (some unexpected error) a LockException is thrown.
// If we are only trying to re-enter a currently held lock, reenter should be true.
// Note: reenter doesn't actually make this lock re-entrant in the normal sense, since it can still only
// be unlocked once, instead it is used to verify that the lock is already held.
2011-05-11 12:00:40 -04:00
bool DistributedLock : : lock_try ( const string & why , bool reenter , BSONObj * other ) {
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// TODO: Start pinging only when we actually get the lock?
// If we don't have a thread pinger, make sure we shouldn't have one
2011-05-11 17:34:08 -04:00
if ( _threadId = = " " ) {
scoped_lock lk ( _mutex ) ;
2011-03-04 16:54:37 -05:00
_threadId = distLockPinger . got ( * this , _lockPing ) ;
2011-05-11 17:34:08 -04:00
}
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// This should always be true, if not, we are using the lock incorrectly.
assert ( _name ! = " " ) ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// write to dummy if 'other' is null
2011-01-04 00:40:41 -05:00
BSONObj dummyOther ;
2010-10-26 15:25:26 -04:00
if ( other = = NULL )
other = & dummyOther ;
2010-07-12 17:53:02 -04:00
ScopedDbConnection conn ( _conn ) ;
2011-01-04 00:40:41 -05:00
2010-07-12 17:53:02 -04:00
BSONObjBuilder queryBuilder ;
queryBuilder . appendElements ( _id ) ;
2011-01-04 00:40:41 -05:00
queryBuilder . append ( " state " , 0 ) ;
2010-07-12 17:53:02 -04:00
2011-01-04 00:40:41 -05:00
{
// make sure its there so we can use simple update logic below
2011-05-09 15:30:17 -04:00
BSONObj o = conn - > findOne ( locksNS , _id ) . getOwned ( ) ;
2011-03-04 16:54:37 -05:00
// Case 1: No locks
2011-01-04 00:40:41 -05:00
if ( o . isEmpty ( ) ) {
2010-07-12 17:53:02 -04:00
try {
2011-05-09 15:30:17 -04:00
log ( logLvl ) < < " inserting initial doc in " < < locksNS < < " for lock " < < _name < < endl ;
conn - > insert ( locksNS , BSON ( " _id " < < _name < < " state " < < 0 < < " who " < < " " ) ) ;
2010-07-12 17:53:02 -04:00
}
2011-01-04 00:40:41 -05:00
catch ( UserException & e ) {
2011-03-31 11:32:47 -04:00
warning ( ) < < " could not insert initial doc for distributed lock " < < _name < < causedBy ( e ) < < endl ;
2010-07-12 17:53:02 -04:00
}
}
2011-03-04 16:54:37 -05:00
// Case 2: A set lock that we might be able to force
2011-01-04 00:40:41 -05:00
else if ( o [ " state " ] . numberInt ( ) > 0 ) {
2011-02-15 18:08:59 -05:00
string lockName = o [ " _id " ] . String ( ) + string ( " / " ) + o [ " process " ] . String ( ) ;
2011-05-03 11:21:49 -04:00
bool canReenter = reenter & & o [ " process " ] . String ( ) = = _processId & & ! distLockPinger . willUnlockOID ( o [ " ts " ] . OID ( ) ) & & o [ " state " ] . numberInt ( ) = = 2 ;
if ( reenter & & ! canReenter ) {
2011-05-11 14:30:56 -04:00
log ( logLvl - 1 ) < < " not re-entering distributed lock " < < lockName ;
if ( o [ " process " ] . String ( ) ! = _processId ) log ( logLvl - 1 ) < < " , different process " < < _processId < < endl ;
else if ( o [ " state " ] . numberInt ( ) = = 2 ) log ( logLvl - 1 ) < < " , state not finalized " < < endl ;
else log ( logLvl - 1 ) < < " , ts " < < o [ " ts " ] . OID ( ) < < " scheduled for late unlock " < < endl ;
// reset since we've been bounced by a previous lock not being where we thought it was,
// and should go through full forcing process if required.
// (in theory we should never see a ping here if used correctly)
* other = o ; other - > getOwned ( ) ; conn . done ( ) ; resetLastPing ( ) ;
2011-05-03 11:21:49 -04:00
return false ;
}
2010-07-13 01:21:08 -04:00
BSONObj lastPing = conn - > findOne ( lockPingNS , o [ " process " ] . wrap ( " _id " ) ) ;
2011-01-04 00:40:41 -05:00
if ( lastPing . isEmpty ( ) ) {
2011-05-09 15:30:17 -04:00
log ( logLvl ) < < " empty ping found for process in lock ' " < < lockName < < " ' " < < endl ;
// TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot.
lastPing = BSON ( " _id " < < o [ " process " ] . String ( ) < < " ping " < < ( Date_t ) 0 ) ;
2010-07-13 01:21:08 -04:00
}
2011-02-15 18:08:59 -05:00
unsigned long long elapsed = 0 ;
2011-05-09 15:30:17 -04:00
unsigned long long takeover = _lockTimeout ;
2011-02-15 18:08:59 -05:00
2011-05-09 15:30:17 -04:00
log ( logLvl ) < < " checking last ping for lock ' " < < lockName < < " ' " < < " against process " < < _lastPingCheck . get < 0 > ( ) < < " and ping " < < _lastPingCheck . get < 1 > ( ) < < endl ;
2011-02-15 18:08:59 -05:00
2011-05-09 15:30:17 -04:00
try {
2011-02-15 18:08:59 -05:00
2011-05-09 15:30:17 -04:00
Date_t remote = remoteTime ( _conn ) ;
2011-05-03 11:21:49 -04:00
2011-05-09 15:30:17 -04:00
// Timeout the elapsed time using comparisons of remote clock
// For non-finalized locks, timeout 15 minutes since last seen (ts)
// For finalized locks, timeout 15 minutes since last ping
bool recPingChange = o [ " state " ] . numberInt ( ) = = 2 & & ( _lastPingCheck . get < 0 > ( ) ! = lastPing [ " _id " ] . String ( ) | | _lastPingCheck . get < 1 > ( ) ! = lastPing [ " ping " ] . Date ( ) ) ;
2011-06-01 13:27:47 -07:00
bool recTSChange = _lastPingCheck . get < 3 > ( ) ! = o [ " ts " ] . OID ( ) ;
2011-02-15 18:08:59 -05:00
2011-05-09 15:30:17 -04:00
if ( recPingChange | | recTSChange ) {
// If the ping has changed since we last checked, mark the current date and time
2011-05-11 17:34:08 -04:00
scoped_lock lk ( _mutex ) ;
_lastPingCheck = boost : : tuple < string , Date_t , Date_t , OID > ( lastPing [ " _id " ] . String ( ) . c_str ( ) , lastPing [ " ping " ] . Date ( ) , remote , o [ " ts " ] . OID ( ) ) ;
2011-02-15 18:08:59 -05:00
}
2011-05-09 15:30:17 -04:00
else {
2011-02-15 18:08:59 -05:00
2011-05-09 15:30:17 -04:00
// GOTCHA! Due to network issues, it is possible that the current time
// is less than the remote time. We *have* to check this here, otherwise
// we overflow and our lock breaks.
if ( _lastPingCheck . get < 2 > ( ) > = remote )
elapsed = 0 ;
else
elapsed = remote - _lastPingCheck . get < 2 > ( ) ;
2011-02-15 18:08:59 -05:00
}
2011-02-17 19:59:33 -05:00
}
2011-05-09 15:30:17 -04:00
catch ( LockException & e ) {
// Remote server cannot be found / is not responsive
warning ( ) < < " Could not get remote time from " < < _conn < < causedBy ( e ) ;
2011-05-11 14:30:56 -04:00
// If our config server is having issues, forget all the pings until we can see it again
resetLastPing ( ) ;
2011-05-09 15:30:17 -04:00
2011-02-16 17:23:16 -05:00
}
2011-02-15 18:08:59 -05:00
2011-05-03 11:21:49 -04:00
if ( elapsed < = takeover & & ! canReenter ) {
log ( logLvl ) < < " could not force lock ' " < < lockName < < " ' because elapsed time " < < elapsed < < " <= takeover time " < < takeover < < endl ;
* other = o ; other - > getOwned ( ) ; conn . done ( ) ;
return false ;
}
else if ( elapsed > takeover & & canReenter ) {
2011-05-11 14:30:56 -04:00
log ( logLvl - 1 ) < < " not re-entering distributed lock " < < lockName < < " ' because elapsed time " < < elapsed < < " > takeover time " < < takeover < < endl ;
2011-05-03 11:21:49 -04:00
* other = o ; other - > getOwned ( ) ; conn . done ( ) ;
2010-07-13 01:21:08 -04:00
return false ;
2010-07-13 18:21:37 -04:00
}
2011-01-04 00:40:41 -05:00
2011-05-03 11:21:49 -04:00
log ( logLvl - 1 ) < < ( canReenter ? " re-entering " : " forcing " ) < < " lock ' " < < lockName < < " ' because "
< < ( canReenter ? " re-entering is allowed, " : " " )
< < " elapsed time " < < elapsed < < " > takeover time " < < takeover < < endl ;
if ( elapsed > takeover ) {
2011-02-15 18:08:59 -05:00
2011-05-11 14:30:56 -04:00
// Lock may forced, reset our timer if succeeds or fails
// Ensures that another timeout must happen if something borks up here, and resets our pristine
// ping state if acquired.
resetLastPing ( ) ;
2011-05-03 11:21:49 -04:00
try {
// Check the clock skew again. If we check this before we get a lock
// and after the lock times out, we can be pretty sure the time is
// increasing at the same rate on all servers and therefore our
// timeout is accurate
uassert ( 14023 , str : : stream ( ) < < " remote time in cluster " < < _conn . toString ( ) < < " is now skewed, cannot force lock. " , ! isRemoteTimeSkewed ( ) ) ;
// Make sure we break the lock with the correct "ts" (OID) value, otherwise
// we can overwrite a new lock inserted in the meantime.
2011-05-09 15:30:17 -04:00
conn - > update ( locksNS , BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " state " < < o [ " state " ] . numberInt ( ) < < " ts " < < o [ " ts " ] ) ,
2011-05-03 11:21:49 -04:00
BSON ( " $set " < < BSON ( " state " < < 0 ) ) ) ;
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
// TODO: Clean up all the extra code to exit this method, probably with a refactor
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
( errMsg . empty ( ) ? log ( logLvl - 1 ) : warning ( ) ) < < " Could not force lock ' " < < lockName < < " ' "
< < ( ! errMsg . empty ( ) ? causedBy ( errMsg ) : string ( " (another force won) " ) ) < < endl ;
* other = o ; other - > getOwned ( ) ; conn . done ( ) ;
return false ;
}
}
catch ( UpdateNotTheSame & ) {
// Ok to continue since we know we forced at least one lock document, and all lock docs
// are required for a lock to be held.
warning ( ) < < " lock forcing " < < lockName < < " inconsistent " < < endl ;
}
catch ( std : : exception & e ) {
2011-02-15 18:08:59 -05:00
conn . done ( ) ;
2011-05-03 11:21:49 -04:00
throw LockException ( str : : stream ( ) < < " exception forcing distributed lock "
< < lockName < < causedBy ( e ) , 13660 ) ;
2011-02-15 18:08:59 -05:00
}
}
2011-05-03 11:21:49 -04:00
else {
assert ( canReenter ) ;
2011-05-11 14:30:56 -04:00
// Lock may be re-entered, reset our timer if succeeds or fails
// Not strictly necessary, but helpful for small timeouts where thread scheduling is significant.
// This ensures that two attempts are still required for a force if not acquired, and resets our
// state if we are acquired.
resetLastPing ( ) ;
2011-05-03 11:21:49 -04:00
// Test that the lock is held by trying to update the finalized state of the lock to the same state
// if it does not update or does not update on all servers, we can't re-enter.
try {
// Test the lock with the correct "ts" (OID) value
2011-05-09 15:30:17 -04:00
conn - > update ( locksNS , BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " state " < < 2 < < " ts " < < o [ " ts " ] ) ,
2011-05-03 11:21:49 -04:00
BSON ( " $set " < < BSON ( " state " < < 2 ) ) ) ;
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
// TODO: Clean up all the extra code to exit this method, probably with a refactor
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
( errMsg . empty ( ) ? log ( logLvl - 1 ) : warning ( ) ) < < " Could not re-enter lock ' " < < lockName < < " ' "
2011-08-06 09:36:34 -04:00
< < ( ! errMsg . empty ( ) ? causedBy ( errMsg ) : string ( " (not sure lock is held) " ) )
< < " gle: " < < err
< < endl ;
2011-05-03 11:21:49 -04:00
* other = o ; other - > getOwned ( ) ; conn . done ( ) ;
return false ;
}
}
catch ( UpdateNotTheSame & ) {
// NOT ok to continue since our lock isn't held by all servers, so isn't valid.
warning ( ) < < " inconsistent state re-entering lock, lock " < < lockName < < " not held " < < endl ;
* other = o ; other - > getOwned ( ) ; conn . done ( ) ;
return false ;
}
catch ( std : : exception & e ) {
conn . done ( ) ;
throw LockException ( str : : stream ( ) < < " exception re-entering distributed lock "
< < lockName < < causedBy ( e ) , 13660 ) ;
}
log ( logLvl - 1 ) < < " re-entered distributed lock ' " < < lockName < < " ' " < < endl ;
* other = o ; other - > getOwned ( ) ; conn . done ( ) ;
return true ;
2010-11-30 12:00:27 -05:00
}
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " lock ' " < < lockName < < " ' successfully forced " < < endl ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
// We don't need the ts value in the query, since we will only ever replace locks with state=0.
2010-07-12 17:53:02 -04:00
}
2011-03-04 16:54:37 -05:00
// Case 3: We have an expired lock
2011-01-04 00:40:41 -05:00
else if ( o [ " ts " ] . type ( ) ) {
2010-07-12 17:53:02 -04:00
queryBuilder . append ( o [ " ts " ] ) ;
}
}
2011-01-04 00:40:41 -05:00
2011-05-11 14:30:56 -04:00
// Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open
// and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock.
resetLastPing ( ) ;
2010-07-12 17:53:02 -04:00
bool gotLock = false ;
2011-03-04 16:54:37 -05:00
BSONObj currLock ;
2010-08-11 16:46:37 -04:00
2011-02-15 18:08:59 -05:00
BSONObj lockDetails = BSON ( " state " < < 1 < < " who " < < getDistLockId ( ) < < " process " < < _processId < <
" when " < < jsTime ( ) < < " why " < < why < < " ts " < < OID : : gen ( ) ) ;
2010-08-11 16:46:37 -04:00
BSONObj whatIWant = BSON ( " $set " < < lockDetails ) ;
2011-02-15 18:08:59 -05:00
BSONObj query = queryBuilder . obj ( ) ;
2011-03-04 16:54:37 -05:00
string lockName = _name + string ( " / " ) + _processId ;
2010-07-12 17:53:02 -04:00
try {
2010-08-11 16:46:37 -04:00
2011-03-22 22:27:16 -04:00
// Main codepath to acquire lock
2011-03-04 16:54:37 -05:00
log ( logLvl ) < < " about to acquire distributed lock ' " < < lockName < < " : \n "
< < lockDetails . jsonString ( Strict , true ) < < " \n "
< < query . jsonString ( Strict , true ) < < endl ;
2011-05-09 15:30:17 -04:00
conn - > update ( locksNS , query , whatIWant ) ;
2011-01-04 00:40:41 -05:00
2011-03-04 16:54:37 -05:00
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
2011-01-04 00:40:41 -05:00
2011-05-09 15:30:17 -04:00
currLock = conn - > findOne ( locksNS , _id ) ;
2011-03-04 16:54:37 -05:00
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
( errMsg . empty ( ) ? log ( logLvl - 1 ) : warning ( ) ) < < " could not acquire lock ' " < < lockName < < " ' "
2011-03-31 11:32:47 -04:00
< < ( ! errMsg . empty ( ) ? causedBy ( errMsg ) : string ( " (another update won) " ) ) < < endl ;
2011-03-04 16:54:37 -05:00
* other = currLock ;
2010-10-26 15:25:26 -04:00
other - > getOwned ( ) ;
2010-07-12 17:53:02 -04:00
gotLock = false ;
}
else {
gotLock = true ;
}
}
2011-01-04 00:40:41 -05:00
catch ( UpdateNotTheSame & up ) {
2011-03-22 22:27:16 -04:00
2010-07-12 17:53:02 -04:00
// this means our update got through on some, but not others
2011-03-31 11:32:47 -04:00
warning ( ) < < " distributed lock ' " < < lockName < < " did not propagate properly. " < < causedBy ( up ) < < endl ;
2010-07-12 17:53:02 -04:00
2011-03-22 22:27:16 -04:00
// Overall protection derives from:
// All unlocking updates use the ts value when setting state to 0
// This ensures that during locking, we can override all smaller ts locks with
// our own safe ts value and not be unlocked afterward.
for ( unsigned i = 0 ; i < up . size ( ) ; i + + ) {
2010-07-12 17:53:02 -04:00
2011-03-04 16:54:37 -05:00
ScopedDbConnection indDB ( up [ i ] . first ) ;
2011-03-22 22:27:16 -04:00
BSONObj indUpdate ;
try {
2011-05-09 15:30:17 -04:00
indUpdate = indDB - > findOne ( locksNS , _id ) ;
2011-03-22 22:27:16 -04:00
// If we override this lock in any way, grab and protect it.
// We assume/ensure that if a process does not have all lock documents, it is no longer
// holding the lock.
2011-05-03 11:21:49 -04:00
// Note - finalized locks may compete too, but we know they've won already if competing
// in this round. Cleanup of crashes during finalizing may take a few tries.
2011-03-22 22:27:16 -04:00
if ( indUpdate [ " ts " ] < lockDetails [ " ts " ] | | indUpdate [ " state " ] . numberInt ( ) = = 0 ) {
BSONObj grabQuery = BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " ts " < < indUpdate [ " ts " ] . OID ( ) ) ;
// Change ts so we won't be forced, state so we won't be relocked
BSONObj grabChanges = BSON ( " ts " < < lockDetails [ " ts " ] . OID ( ) < < " state " < < 1 ) ;
2011-03-04 16:54:37 -05:00
2011-03-22 22:27:16 -04:00
// Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
// process grabbed the lock (which will change the ts), but the lock will be set until forcing
2011-05-09 15:30:17 -04:00
indDB - > update ( locksNS , grabQuery , BSON ( " $set " < < grabChanges ) ) ;
2011-03-22 22:27:16 -04:00
2011-05-09 15:30:17 -04:00
indUpdate = indDB - > findOne ( locksNS , _id ) ;
2011-03-22 22:27:16 -04:00
// Our lock should now be set until forcing.
assert ( indUpdate [ " state " ] . numberInt ( ) = = 1 ) ;
}
// else our lock is the same, in which case we're safe, or it's a bigger lock,
// in which case we won't need to protect anything since we won't have the lock.
}
catch ( std : : exception & e ) {
conn . done ( ) ;
2011-03-31 11:32:47 -04:00
throw LockException ( str : : stream ( ) < < " distributed lock " < < lockName
< < " had errors communicating with individual server "
< < up [ 1 ] . first < < causedBy ( e ) , 13661 ) ;
2011-03-22 22:27:16 -04:00
}
assert ( ! indUpdate . isEmpty ( ) ) ;
// Find max TS value
2011-03-04 16:54:37 -05:00
if ( currLock . isEmpty ( ) | | currLock [ " ts " ] < indUpdate [ " ts " ] ) {
currLock = indUpdate . getOwned ( ) ;
2010-07-12 17:53:02 -04:00
}
2011-03-04 16:54:37 -05:00
indDB . done ( ) ;
2010-07-12 17:53:02 -04:00
}
2011-03-23 12:46:36 -04:00
// Locks on all servers are now set and safe until forcing
2011-03-22 22:27:16 -04:00
if ( currLock [ " ts " ] = = lockDetails [ " ts " ] ) {
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " lock update won, completing lock propagation for ' " < < lockName < < " ' " < < endl ;
2011-05-03 11:21:49 -04:00
gotLock = true ;
2010-07-12 17:53:02 -04:00
}
else {
2011-03-04 16:54:37 -05:00
log ( logLvl - 1 ) < < " lock update lost, lock ' " < < lockName < < " ' not propagated. " < < endl ;
2011-05-03 11:21:49 -04:00
// Register the lock for deletion, to speed up failover
// Not strictly necessary, but helpful
distLockPinger . addUnlockOID ( lockDetails [ " ts " ] . OID ( ) ) ;
2010-07-12 17:53:02 -04:00
gotLock = false ;
}
}
2011-03-22 22:27:16 -04:00
catch ( std : : exception & e ) {
conn . done ( ) ;
2011-03-31 11:32:47 -04:00
throw LockException ( str : : stream ( ) < < " exception creating distributed lock "
< < lockName < < causedBy ( e ) , 13663 ) ;
2011-03-22 22:27:16 -04:00
}
2011-01-04 00:40:41 -05:00
2011-05-03 11:21:49 -04:00
// Complete lock propagation
if ( gotLock ) {
// This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
// least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set.
// The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that
// indicates the lock is active, but this means the process creating/destroying them must explicitly poll
// when something goes wrong.
try {
BSONObjBuilder finalLockDetails ;
BSONObjIterator bi ( lockDetails ) ;
while ( bi . more ( ) ) {
BSONElement el = bi . next ( ) ;
if ( ( string ) ( el . fieldName ( ) ) = = " state " )
finalLockDetails . append ( " state " , 2 ) ;
else finalLockDetails . append ( el ) ;
}
2011-05-09 15:30:17 -04:00
conn - > update ( locksNS , _id , BSON ( " $set " < < finalLockDetails . obj ( ) ) ) ;
2011-05-03 11:21:49 -04:00
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
2011-05-09 15:30:17 -04:00
currLock = conn - > findOne ( locksNS , _id ) ;
2011-05-03 11:21:49 -04:00
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
warning ( ) < < " could not finalize winning lock " < < lockName
< < ( ! errMsg . empty ( ) ? causedBy ( errMsg ) : " (did not update lock) " ) < < endl ;
gotLock = false ;
}
else {
// SUCCESS!
gotLock = true ;
}
}
catch ( std : : exception & e ) {
conn . done ( ) ;
// Register the bad final lock for deletion, in case it exists
distLockPinger . addUnlockOID ( lockDetails [ " ts " ] . OID ( ) ) ;
throw LockException ( str : : stream ( ) < < " exception finalizing winning lock "
< < causedBy ( e ) , 13662 ) ;
}
}
* other = currLock ;
other - > getOwned ( ) ;
// Log our lock results
2011-03-04 16:54:37 -05:00
if ( gotLock )
2011-06-16 17:09:34 -04:00
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' acquired, ts : " < < currLock [ " ts " ] . OID ( ) < < endl ;
2011-03-04 16:54:37 -05:00
else
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' was not acquired. " < < endl ;
2011-02-15 18:08:59 -05:00
2010-07-12 17:53:02 -04:00
conn . done ( ) ;
2011-01-04 00:40:41 -05:00
2010-08-12 11:21:34 -04:00
return gotLock ;
2010-07-12 17:53:02 -04:00
}
2011-05-03 11:21:49 -04:00
// Unlock now takes an optional pointer to the lock, so you can be specific about which
// particular lock you want to unlock. This is required when the config server is down,
// and so cannot tell you what lock ts you should try later.
void DistributedLock : : unlock ( BSONObj * oldLockPtr ) {
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
assert ( _name ! = " " ) ;
string lockName = _name + string ( " / " ) + _processId ;
2011-02-15 18:08:59 -05:00
2011-03-04 16:54:37 -05:00
const int maxAttempts = 3 ;
2010-08-05 11:52:14 -04:00
int attempted = 0 ;
2011-05-03 11:21:49 -04:00
2011-03-22 22:27:16 -04:00
BSONObj oldLock ;
2011-05-03 11:21:49 -04:00
if ( oldLockPtr ) oldLock = * oldLockPtr ;
2011-03-22 22:27:16 -04:00
2010-08-05 11:52:14 -04:00
while ( + + attempted < = maxAttempts ) {
2011-03-22 22:27:16 -04:00
ScopedDbConnection conn ( _conn ) ;
2010-08-05 11:52:14 -04:00
try {
2011-03-22 22:27:16 -04:00
if ( oldLock . isEmpty ( ) )
2011-05-09 15:30:17 -04:00
oldLock = conn - > findOne ( locksNS , _id ) ;
2010-08-05 11:52:14 -04:00
2011-05-03 11:21:49 -04:00
if ( oldLock [ " state " ] . eoo ( ) | | oldLock [ " state " ] . numberInt ( ) ! = 2 | | oldLock [ " ts " ] . eoo ( ) ) {
warning ( ) < < " cannot unlock invalid distributed lock " < < oldLock < < endl ;
conn . done ( ) ;
break ;
}
2011-03-22 22:27:16 -04:00
// Use ts when updating lock, so that new locks can be sure they won't get trampled.
2011-05-09 15:30:17 -04:00
conn - > update ( locksNS ,
2011-03-22 22:27:16 -04:00
BSON ( " _id " < < _id [ " _id " ] . String ( ) < < " ts " < < oldLock [ " ts " ] . OID ( ) ) ,
BSON ( " $set " < < BSON ( " state " < < 0 ) ) ) ;
2011-05-12 17:03:53 -04:00
// Check that the lock was actually unlocked... if not, try again
BSONObj err = conn - > getLastErrorDetailed ( ) ;
string errMsg = DBClientWithCommands : : getLastErrorString ( err ) ;
if ( ! errMsg . empty ( ) | | ! err [ " n " ] . type ( ) | | err [ " n " ] . numberInt ( ) < 1 ) {
warning ( ) < < " distributed lock unlock update failed, retrying "
< < ( errMsg . empty ( ) ? causedBy ( " ( update not registered ) " ) : causedBy ( errMsg ) ) < < endl ;
conn . done ( ) ;
continue ;
}
2011-03-22 22:27:16 -04:00
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' unlocked. " < < endl ;
2011-03-04 16:54:37 -05:00
conn . done ( ) ;
return ;
2011-03-22 22:27:16 -04:00
}
2011-03-28 16:10:30 -04:00
catch ( UpdateNotTheSame & ) {
2011-03-22 22:27:16 -04:00
log ( logLvl - 1 ) < < " distributed lock ' " < < lockName < < " ' unlocked (messily). " < < endl ;
conn . done ( ) ;
break ;
2011-01-04 00:40:41 -05:00
}
catch ( std : : exception & e ) {
2011-03-04 16:54:37 -05:00
warning ( ) < < " distributed lock ' " < < lockName < < " ' failed unlock attempt. "
2011-03-31 11:32:47 -04:00
< < causedBy ( e ) < < endl ;
2010-07-12 17:53:02 -04:00
2011-03-22 22:27:16 -04:00
conn . done ( ) ;
// TODO: If our lock timeout is small, sleeping this long may be unsafe.
if ( attempted ! = maxAttempts ) sleepsecs ( 1 < < attempted ) ;
2010-08-05 11:52:14 -04:00
}
}
2011-05-03 11:21:49 -04:00
if ( attempted > maxAttempts & & ! oldLock . isEmpty ( ) & & ! oldLock [ " ts " ] . eoo ( ) ) {
log ( logLvl - 1 ) < < " could not unlock distributed lock with ts " < < oldLock [ " ts " ] . OID ( )
< < " , will attempt again later " < < endl ;
// We couldn't unlock the lock at all, so try again later in the pinging thread...
distLockPinger . addUnlockOID ( oldLock [ " ts " ] . OID ( ) ) ;
}
else if ( attempted > maxAttempts ) {
warning ( ) < < " could not unlock untracked distributed lock, a manual force may be required " < < endl ;
}
2011-03-04 16:54:37 -05:00
warning ( ) < < " distributed lock ' " < < lockName < < " ' couldn't consummate unlock request. "
2011-05-09 15:30:17 -04:00
< < " lock may be taken over after " < < ( _lockTimeout / ( 60 * 1000 ) )
2011-03-04 16:54:37 -05:00
< < " minutes timeout. " < < endl ;
2010-08-05 11:52:14 -04:00
}
2010-07-12 17:53:02 -04:00
2011-02-15 18:08:59 -05:00
2010-07-12 17:53:02 -04:00
}