2008-07-27 18:36:47 -04:00
// repl.cpp
2008-09-05 18:04:29 -04:00
/* TODO
PAIRING
 - on a SyncException, don't allow going back to master state?
*/
2008-07-27 18:36:47 -04:00
/**
* Copyright (C) 2008 10gen Inc.
2008-12-28 20:28:49 -05:00
*
2008-07-27 18:36:47 -04:00
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
2008-12-28 20:28:49 -05:00
*
2008-07-27 18:36:47 -04:00
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
2008-12-28 20:28:49 -05:00
*
2008-07-27 18:36:47 -04:00
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2008-12-15 17:23:54 -05:00
/* Collections we use:
   local.sources         - the sources we pull from as a "slave", and the last update applied from each
   local.oplog.$main     - our op log as "master"
2010-08-26 11:03:31 -04:00
   local.dbinfo.<dbname> - apparently no longer used (unconfirmed)
2011-03-29 16:56:21 -07:00
   local.pair.startup    - [deprecated] may contain a special value indicating, for a pair, that we have the master copy.
2008-12-15 17:23:54 -05:00
                           Used when replacing the other half of the pair after it has permanently failed.
2011-03-29 16:56:21 -07:00
   local.pair.sync       - [deprecated] { initialsynccomplete: 1 }
2008-12-15 17:23:54 -05:00
*/
2010-04-27 15:27:52 -04:00
# include "pch.h"
2008-07-27 18:36:47 -04:00
# include "jsobj.h"
# include "../util/goodies.h"
# include "repl.h"
2011-06-26 17:13:54 -04:00
# include "../util/net/message.h"
2010-03-30 14:32:15 -04:00
# include "../util/background.h"
2008-10-19 17:46:53 -05:00
# include "../client/dbclient.h"
2010-02-11 14:35:55 -05:00
# include "../client/connpool.h"
2008-07-27 18:36:47 -04:00
# include "pdfile.h"
2011-06-22 15:51:08 -04:00
# include "ops/query.h"
2008-08-02 14:58:15 -04:00
# include "db.h"
2008-09-11 09:15:34 -04:00
# include "commands.h"
2009-01-20 11:05:53 -05:00
# include "security.h"
2009-08-25 14:35:22 -04:00
# include "cmdline.h"
2010-04-13 11:45:01 -04:00
# include "repl_block.h"
2010-05-29 15:45:47 -04:00
# include "repl/rs.h"
2011-03-29 16:56:21 -07:00
# include "replutil.h"
2011-05-10 19:13:55 -04:00
# include "repl/connections.h"
2011-06-20 16:23:54 -04:00
# include "ops/update.h"
2008-07-27 18:36:47 -04:00
2009-01-14 17:09:51 -05:00
namespace mongo {
2011-01-04 00:40:41 -05:00
2010-03-05 13:50:46 -05:00
// our config from command line etc.
2010-02-08 21:04:09 -05:00
ReplSettings replSettings ;
2009-01-15 10:17:11 -05:00
/* if 1 sync() is running */
2010-03-27 12:23:39 -04:00
volatile int syncing = 0 ;
2011-01-04 00:40:41 -05:00
static volatile int relinquishSyncingSome = 0 ;
2009-01-15 10:17:11 -05:00
/* "dead" means something really bad happened like replication falling completely out of sync.
when non - null , we are dead and the string is informational
*/
2009-02-04 13:22:02 -05:00
const char * replAllDead = 0 ;
2008-12-01 14:55:36 -05:00
2009-02-02 11:15:24 -05:00
time_t lastForcedResync = 0 ;
2011-01-04 00:40:41 -05:00
2009-01-14 17:09:51 -05:00
} // namespace mongo
namespace mongo {
2009-01-15 10:17:11 -05:00
/* output by the web console */
const char * replInfo = " " ;
struct ReplInfo {
ReplInfo ( const char * msg ) {
replInfo = msg ;
}
~ ReplInfo ( ) {
replInfo = " ? " ;
}
} ;
2008-09-11 15:13:47 -04:00
2009-08-10 16:57:59 -04:00
/* operator requested resynchronization of replication (on the slave). { resync : 1 } */
2009-01-29 11:46:45 -05:00
class CmdResync : public Command {
public :
2010-04-23 15:50:49 -04:00
virtual bool slaveOk ( ) const {
2009-01-29 11:46:45 -05:00
return true ;
}
2010-04-23 16:41:56 -04:00
virtual bool adminOnly ( ) const {
2009-01-29 11:46:45 -05:00
return true ;
}
2010-08-16 09:00:58 -04:00
virtual bool logTheOp ( ) { return false ; }
2010-04-23 15:50:49 -04:00
virtual LockType locktype ( ) const { return WRITE ; }
2010-04-23 16:41:56 -04:00
void help ( stringstream & h ) const { h < < " resync (from scratch) an out of date replica slave. \n http://www.mongodb.org/display/DOCS/Master+Slave " ; }
2009-01-29 11:46:45 -05:00
CmdResync ( ) : Command ( " resync " ) { }
2011-07-18 15:23:37 -04:00
virtual bool run ( const string & , BSONObj & cmdObj , int , string & errmsg , BSONObjBuilder & result , bool fromRepl ) {
2011-01-04 00:40:41 -05:00
if ( cmdLine . usingReplSets ( ) ) {
2010-09-29 16:47:19 -04:00
errmsg = " resync command not currently supported with replica sets. See RS102 info in the mongodb documentations " ;
result . append ( " info " , " http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member " ) ;
return false ;
}
2009-04-22 10:52:32 -04:00
if ( cmdObj . getBoolField ( " force " ) ) {
if ( ! waitForSyncToFinish ( errmsg ) )
return false ;
replAllDead = " resync forced " ;
2011-01-04 00:40:41 -05:00
}
2009-02-04 13:22:02 -05:00
if ( ! replAllDead ) {
2009-01-29 11:46:45 -05:00
errmsg = " not dead, no need to resync " ;
return false ;
}
2009-04-22 10:52:32 -04:00
if ( ! waitForSyncToFinish ( errmsg ) )
return false ;
2011-01-04 00:40:41 -05:00
2009-04-22 10:52:32 -04:00
ReplSource : : forceResyncDead ( " client " ) ;
result . append ( " info " , " triggered resync for all sources " ) ;
2011-01-04 00:40:41 -05:00
return true ;
}
2009-04-22 10:52:32 -04:00
bool waitForSyncToFinish ( string & errmsg ) const {
2009-03-31 11:44:35 -04:00
// Wait for slave thread to finish syncing, so sources will be be
// reloaded with new saved state on next pass.
Timer t ;
while ( 1 ) {
2010-03-27 12:23:39 -04:00
if ( syncing = = 0 | | t . millis ( ) > 30000 )
2009-03-31 11:44:35 -04:00
break ;
{
dbtemprelease t ;
2011-01-04 00:40:41 -05:00
relinquishSyncingSome = 1 ;
2010-03-27 12:37:38 -04:00
sleepmillis ( 1 ) ;
2009-03-31 11:44:35 -04:00
}
}
if ( syncing ) {
errmsg = " timeout waiting for sync() to finish " ;
return false ;
}
2009-04-22 10:52:32 -04:00
return true ;
}
2009-01-29 11:46:45 -05:00
} cmdResync ;
2011-01-04 00:40:41 -05:00
bool anyReplEnabled ( ) {
2011-03-29 16:56:21 -07:00
return replSettings . slave | | replSettings . master | | theReplSet ;
2010-02-08 17:17:18 -05:00
}
2010-09-22 11:54:46 -04:00
bool replAuthenticate ( DBClientBase * conn ) ;
2011-01-04 00:40:41 -05:00
void appendReplicationInfo ( BSONObjBuilder & result , bool authed , int level ) {
2010-10-15 11:04:44 -04:00
if ( replSet ) {
2011-01-04 00:40:41 -05:00
if ( theReplSet = = 0 ) {
2010-10-15 11:04:44 -04:00
result . append ( " ismaster " , false ) ;
result . append ( " secondary " , false ) ;
2011-05-06 22:08:09 -04:00
result . append ( " info " , ReplSet : : startupStatusMsg . get ( ) ) ;
2010-10-24 10:36:38 -04:00
result . append ( " isreplicaset " , true ) ;
2010-10-15 11:04:44 -04:00
return ;
}
2011-01-04 00:40:41 -05:00
2010-10-15 11:04:44 -04:00
theReplSet - > fillIsMaster ( result ) ;
return ;
}
2011-01-04 00:40:41 -05:00
2010-02-08 17:17:18 -05:00
if ( replAllDead ) {
2010-04-21 17:40:24 -04:00
result . append ( " ismaster " , 0 ) ;
2010-03-08 13:40:24 -05:00
string s = string ( " dead: " ) + replAllDead ;
result . append ( " info " , s ) ;
2010-02-08 17:17:18 -05:00
}
else {
2010-08-04 16:02:49 -04:00
result . appendBool ( " ismaster " , _isMaster ( ) ) ;
2010-02-08 17:17:18 -05:00
}
2010-08-04 16:02:49 -04:00
2011-01-04 00:40:41 -05:00
if ( level & & replSet ) {
2010-08-04 16:02:49 -04:00
result . append ( " info " , " is replica set " ) ;
}
2011-01-04 00:40:41 -05:00
else if ( level ) {
2010-02-10 14:18:57 -05:00
BSONObjBuilder sources ( result . subarrayStart ( " sources " ) ) ;
2011-01-04 00:40:41 -05:00
2010-02-10 14:18:57 -05:00
readlock lk ( " local.sources " ) ;
2011-01-13 14:47:06 -05:00
Client : : Context ctx ( " local.sources " , dbpath , 0 , authed ) ;
2010-05-07 17:25:57 -04:00
shared_ptr < Cursor > c = findTableScan ( " local.sources " , BSONObj ( ) ) ;
2010-02-10 14:18:57 -05:00
int n = 0 ;
2011-01-04 00:40:41 -05:00
while ( c - > ok ( ) ) {
2010-02-11 14:35:55 -05:00
BSONObj s = c - > current ( ) ;
2011-01-04 00:40:41 -05:00
2010-02-11 14:35:55 -05:00
BSONObjBuilder bb ;
bb . append ( s [ " host " ] ) ;
string sourcename = s [ " source " ] . valuestr ( ) ;
if ( sourcename ! = " main " )
bb . append ( s [ " source " ] ) ;
2011-01-04 00:40:41 -05:00
2010-02-11 14:35:55 -05:00
{
BSONElement e = s [ " syncedTo " ] ;
BSONObjBuilder t ( bb . subobjStart ( " syncedTo " ) ) ;
t . appendDate ( " time " , e . timestampTime ( ) ) ;
t . append ( " inc " , e . timestampInc ( ) ) ;
t . done ( ) ;
}
2011-01-04 00:40:41 -05:00
if ( level > 1 ) {
2010-02-11 17:17:24 -05:00
dbtemprelease unlock ;
2010-10-29 13:53:10 -04:00
// note: there is no so-style timeout on this connection; perhaps we should have one.
2010-02-11 14:35:55 -05:00
ScopedDbConnection conn ( s [ " host " ] . valuestr ( ) ) ;
2010-08-16 13:59:08 -07:00
DBClientConnection * cliConn = dynamic_cast < DBClientConnection * > ( & conn . conn ( ) ) ;
if ( cliConn & & replAuthenticate ( cliConn ) ) {
BSONObj first = conn - > findOne ( ( string ) " local.oplog.$ " + sourcename , Query ( ) . sort ( BSON ( " $natural " < < 1 ) ) ) ;
BSONObj last = conn - > findOne ( ( string ) " local.oplog.$ " + sourcename , Query ( ) . sort ( BSON ( " $natural " < < - 1 ) ) ) ;
bb . appendDate ( " masterFirst " , first [ " ts " ] . timestampTime ( ) ) ;
bb . appendDate ( " masterLast " , last [ " ts " ] . timestampTime ( ) ) ;
double lag = ( double ) ( last [ " ts " ] . timestampTime ( ) - s [ " syncedTo " ] . timestampTime ( ) ) ;
bb . append ( " lagSeconds " , lag / 1000 ) ;
}
2010-02-11 14:35:55 -05:00
conn . done ( ) ;
}
sources . append ( BSONObjBuilder : : numStr ( n + + ) , bb . obj ( ) ) ;
2010-02-10 14:18:57 -05:00
c - > advance ( ) ;
}
2011-01-04 00:40:41 -05:00
2010-02-10 14:18:57 -05:00
sources . done ( ) ;
}
2010-02-08 17:17:18 -05:00
}
2010-04-26 20:58:41 -04:00
class CmdIsMaster : public Command {
2009-01-15 10:17:11 -05:00
public :
2009-01-24 16:05:12 -05:00
virtual bool requiresAuth ( ) { return false ; }
2010-04-23 15:50:49 -04:00
virtual bool slaveOk ( ) const {
2009-01-15 10:17:11 -05:00
return true ;
}
2010-04-23 15:50:49 -04:00
virtual void help ( stringstream & help ) const {
2010-04-26 20:58:41 -04:00
help < < " Check if this server is primary for a replica pair/set; also if it is --master or --slave in simple master/slave setups. \n " ;
2010-04-23 15:50:49 -04:00
help < < " { isMaster : 1 } " ;
}
virtual LockType locktype ( ) const { return NONE ; }
2010-04-26 21:16:57 -04:00
CmdIsMaster ( ) : Command ( " isMaster " , true , " ismaster " ) { }
2011-07-18 15:23:37 -04:00
virtual bool run ( const string & , BSONObj & cmdObj , int , string & errmsg , BSONObjBuilder & result , bool /*fromRepl*/ ) {
2011-01-04 00:40:41 -05:00
/* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not
authenticated .
we allow unauthenticated ismaster but we aren ' t as verbose informationally if
one is not authenticated for admin db to be safe .
*/
2010-10-15 11:04:44 -04:00
bool authed = cc ( ) . getAuthenticationInfo ( ) - > isAuthorizedReads ( " admin " ) ;
appendReplicationInfo ( result , authed ) ;
2010-04-21 17:40:24 -04:00
2010-10-11 11:26:41 -04:00
result . appendNumber ( " maxBsonObjectSize " , BSONObjMaxUserSize ) ;
2009-01-15 10:17:11 -05:00
return true ;
2008-11-20 18:03:41 -05:00
}
2009-01-15 10:17:11 -05:00
} cmdismaster ;
ReplSource : : ReplSource ( ) {
nClonedThisPass = 0 ;
2008-12-28 20:28:49 -05:00
}
2008-09-11 09:15:34 -04:00
2009-01-15 10:17:11 -05:00
ReplSource : : ReplSource ( BSONObj o ) : nClonedThisPass ( 0 ) {
only = o . getStringField ( " only " ) ;
hostName = o . getStringField ( " host " ) ;
_sourceName = o . getStringField ( " source " ) ;
2009-12-28 16:43:43 -05:00
uassert ( 10118 , " 'host' field not set in sources collection object " , ! hostName . empty ( ) ) ;
uassert ( 10119 , " only source='main' allowed for now with replication " , sourceName ( ) = = " main " ) ;
2009-01-15 10:17:11 -05:00
BSONElement e = o . getField ( " syncedTo " ) ;
if ( ! e . eoo ( ) ) {
2009-12-28 16:43:43 -05:00
uassert ( 10120 , " bad sources 'syncedTo' field value " , e . type ( ) = = Date | | e . type ( ) = = Timestamp ) ;
2009-01-15 10:17:11 -05:00
OpTime tmp ( e . date ( ) ) ;
syncedTo = tmp ;
}
2008-07-28 13:51:39 -04:00
2009-03-31 14:05:20 -04:00
BSONObj dbsObj = o . getObjectField ( " dbsNextPass " ) ;
if ( ! dbsObj . isEmpty ( ) ) {
BSONObjIterator i ( dbsObj ) ;
while ( 1 ) {
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
addDbNextPass . insert ( e . fieldName ( ) ) ;
}
2011-01-04 00:40:41 -05:00
}
2009-04-16 11:36:06 -04:00
dbsObj = o . getObjectField ( " incompleteCloneDbs " ) ;
if ( ! dbsObj . isEmpty ( ) ) {
BSONObjIterator i ( dbsObj ) ;
while ( 1 ) {
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
incompleteCloneDbs . insert ( e . fieldName ( ) ) ;
}
2011-01-04 00:40:41 -05:00
}
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
/* Turn our C++ Source object into a BSONObj */
BSONObj ReplSource : : jsobj ( ) {
BSONObjBuilder b ;
b . append ( " host " , hostName ) ;
b . append ( " source " , sourceName ( ) ) ;
if ( ! only . empty ( ) )
b . append ( " only " , only ) ;
if ( ! syncedTo . isNull ( ) )
2009-10-09 13:10:04 -04:00
b . appendTimestamp ( " syncedTo " , syncedTo . asDate ( ) ) ;
2009-01-15 10:17:11 -05:00
2009-03-31 14:05:20 -04:00
BSONObjBuilder dbsNextPassBuilder ;
2009-04-22 13:53:35 -04:00
int n = 0 ;
2009-03-31 14:05:20 -04:00
for ( set < string > : : iterator i = addDbNextPass . begin ( ) ; i ! = addDbNextPass . end ( ) ; i + + ) {
n + + ;
2010-07-20 12:39:35 -04:00
dbsNextPassBuilder . appendBool ( * i , 1 ) ;
2009-03-31 14:05:20 -04:00
}
if ( n )
b . append ( " dbsNextPass " , dbsNextPassBuilder . done ( ) ) ;
2009-01-15 10:17:11 -05:00
2009-04-16 11:36:06 -04:00
BSONObjBuilder incompleteCloneDbsBuilder ;
n = 0 ;
for ( set < string > : : iterator i = incompleteCloneDbs . begin ( ) ; i ! = incompleteCloneDbs . end ( ) ; i + + ) {
n + + ;
2010-07-20 12:39:35 -04:00
incompleteCloneDbsBuilder . appendBool ( * i , 1 ) ;
2009-04-16 11:36:06 -04:00
}
if ( n )
b . append ( " incompleteCloneDbs " , incompleteCloneDbsBuilder . done ( ) ) ;
2009-02-09 13:04:32 -05:00
return b . obj ( ) ;
2008-12-28 20:28:49 -05:00
}
2008-08-02 14:58:15 -04:00
2009-01-15 10:17:11 -05:00
void ReplSource : : save ( ) {
BSONObjBuilder b ;
assert ( ! hostName . empty ( ) ) ;
b . append ( " host " , hostName ) ;
// todo: finish allowing multiple source configs.
// this line doesn't work right when source is null, if that is allowed as it is now:
//b.append("source", _sourceName);
BSONObj pattern = b . done ( ) ;
2008-08-02 14:58:15 -04:00
2009-01-15 10:17:11 -05:00
BSONObj o = jsobj ( ) ;
2010-07-17 16:07:38 -04:00
log ( 1 ) < < " Saving repl source: " < < o < < endl ;
2008-11-10 11:20:30 -05:00
2010-01-29 17:22:34 -05:00
{
OpDebug debug ;
Client : : Context ctx ( " local.sources " ) ;
UpdateResult res = updateObjects ( " local.sources " , o , pattern , true /*upsert for pair feature*/ , false , false , debug ) ;
assert ( ! res . mod ) ;
assert ( res . num = = 1 ) ;
}
2008-12-28 20:28:49 -05:00
}
2011-01-31 15:58:25 -08:00
static void addSourceToList ( ReplSource : : SourceVector & v , ReplSource & s , ReplSource : : SourceVector & old ) {
2009-04-22 11:57:45 -04:00
if ( ! s . syncedTo . isNull ( ) ) { // Don't reuse old ReplSource if there was a forced resync.
2009-04-01 16:00:56 -04:00
for ( ReplSource : : SourceVector : : iterator i = old . begin ( ) ; i ! = old . end ( ) ; ) {
2009-03-31 11:44:35 -04:00
if ( s = = * * i ) {
v . push_back ( * i ) ;
old . erase ( i ) ;
return ;
}
i + + ;
}
}
2009-04-16 11:36:06 -04:00
2009-04-01 16:00:56 -04:00
v . push_back ( shared_ptr < ReplSource > ( new ReplSource ( s ) ) ) ;
2009-01-15 10:17:11 -05:00
}
/* we reuse our existing objects so that we can keep our existing connection
and cursor in effect .
*/
2009-04-01 16:00:56 -04:00
void ReplSource : : loadAll ( SourceVector & v ) {
2010-01-29 17:22:34 -05:00
Client : : Context ctx ( " local.sources " ) ;
2009-04-01 16:00:56 -04:00
SourceVector old = v ;
v . clear ( ) ;
2009-01-15 10:17:11 -05:00
2009-08-25 14:35:22 -04:00
if ( ! cmdLine . source . empty ( ) ) {
2009-01-15 10:17:11 -05:00
// --source <host> specified.
// check that no items are in sources other than that
// add if missing
2010-05-07 17:25:57 -04:00
shared_ptr < Cursor > c = findTableScan ( " local.sources " , BSONObj ( ) ) ;
2009-01-15 10:17:11 -05:00
int n = 0 ;
while ( c - > ok ( ) ) {
n + + ;
ReplSource tmp ( c - > current ( ) ) ;
2009-08-25 14:35:22 -04:00
if ( tmp . hostName ! = cmdLine . source ) {
2010-02-12 16:05:05 -05:00
log ( ) < < " repl: --source " < < cmdLine . source < < " != " < < tmp . hostName < < " from local.sources collection " < < endl ;
2010-10-22 18:10:52 -04:00
log ( ) < < " repl: for instructions on changing this slave's source, see: " < < endl ;
log ( ) < < " http://dochub.mongodb.org/core/masterslave " < < endl ;
2010-02-12 16:05:05 -05:00
log ( ) < < " repl: terminating mongod after 30 seconds " < < endl ;
2009-02-12 15:03:38 -05:00
sleepsecs ( 30 ) ;
2009-08-05 16:00:27 -04:00
dbexit ( EXIT_REPLICATION_ERROR ) ;
2009-02-12 15:03:38 -05:00
}
2009-08-25 14:35:22 -04:00
if ( tmp . only ! = cmdLine . only ) {
2009-12-28 16:43:43 -05:00
log ( ) < < " --only " < < cmdLine . only < < " != " < < tmp . only < < " from local.sources collection " < < endl ;
2009-01-15 10:17:11 -05:00
log ( ) < < " terminating after 30 seconds " < < endl ;
sleepsecs ( 30 ) ;
2009-08-05 16:00:27 -04:00
dbexit ( EXIT_REPLICATION_ERROR ) ;
2009-01-15 10:17:11 -05:00
}
c - > advance ( ) ;
}
2009-12-28 16:43:43 -05:00
uassert ( 10002 , " local.sources collection corrupt? " , n < 2 ) ;
2009-01-15 10:17:11 -05:00
if ( n = = 0 ) {
// source missing. add.
ReplSource s ;
2009-08-25 14:35:22 -04:00
s . hostName = cmdLine . source ;
s . only = cmdLine . only ;
2009-01-15 10:17:11 -05:00
s . save ( ) ;
}
}
2009-03-24 11:46:55 -04:00
else {
try {
2009-12-28 16:43:43 -05:00
massert ( 10384 , " --only requires use of --source " , cmdLine . only . empty ( ) ) ;
2011-01-04 00:40:41 -05:00
}
catch ( . . . ) {
2009-08-07 15:37:50 -04:00
dbexit ( EXIT_BADOPTIONS ) ;
2009-03-24 11:46:55 -04:00
}
2009-02-12 15:03:38 -05:00
}
2011-01-04 00:40:41 -05:00
2010-05-07 17:25:57 -04:00
shared_ptr < Cursor > c = findTableScan ( " local.sources " , BSONObj ( ) ) ;
2008-12-28 20:28:49 -05:00
while ( c - > ok ( ) ) {
2008-11-10 17:45:39 -05:00
ReplSource tmp ( c - > current ( ) ) ;
2011-03-29 16:56:21 -07:00
if ( tmp . syncedTo . isNull ( ) ) {
2010-02-16 11:30:50 -08:00
DBDirectClient c ;
if ( c . exists ( " local.oplog.$main " ) ) {
2010-03-29 16:35:06 -07:00
BSONObj op = c . findOne ( " local.oplog.$main " , QUERY ( " op " < < NE < < " n " ) . sort ( BSON ( " $natural " < < - 1 ) ) ) ;
2010-02-16 11:30:50 -08:00
if ( ! op . isEmpty ( ) ) {
tmp . syncedTo = op [ " ts " ] . date ( ) ;
}
}
2008-11-10 17:45:39 -05:00
}
2011-01-31 15:58:25 -08:00
addSourceToList ( v , tmp , old ) ;
2008-11-10 17:45:39 -05:00
c - > advance ( ) ;
}
}
2009-01-15 10:17:11 -05:00
// Command object with the single field getoptime, parsed once from JSON text by fromjson().
// NOTE(review): not referenced in the visible part of this file; presumably sent to a
// master elsewhere to read its current optime — confirm.
BSONObj opTimeQuery = fromjson ( " { \" getoptime \" :1} " ) ;
2009-02-02 11:15:24 -05:00
bool ReplSource : : throttledForceResyncDead ( const char * requester ) {
if ( time ( 0 ) - lastForcedResync > 600 ) {
forceResyncDead ( requester ) ;
lastForcedResync = time ( 0 ) ;
return true ;
}
return false ;
}
2011-01-04 00:40:41 -05:00
2009-02-02 11:15:24 -05:00
void ReplSource : : forceResyncDead ( const char * requester ) {
2009-02-04 13:22:02 -05:00
if ( ! replAllDead )
2009-02-02 11:15:24 -05:00
return ;
2009-04-01 16:00:56 -04:00
SourceVector sources ;
2009-02-02 11:15:24 -05:00
ReplSource : : loadAll ( sources ) ;
2009-04-01 16:00:56 -04:00
for ( SourceVector : : iterator i = sources . begin ( ) ; i ! = sources . end ( ) ; + + i ) {
2011-05-03 11:29:30 -04:00
log ( ) < < requester < < " forcing resync from " < < ( * i ) - > hostName < < endl ;
2009-02-02 11:15:24 -05:00
( * i ) - > forceResync ( requester ) ;
}
2011-01-04 00:40:41 -05:00
replAllDead = 0 ;
2009-02-02 11:15:24 -05:00
}
2011-01-04 00:40:41 -05:00
2009-02-02 11:15:24 -05:00
void ReplSource : : forceResync ( const char * requester ) {
2009-03-18 13:45:32 -04:00
BSONObj info ;
{
dbtemprelease t ;
2011-05-03 11:29:30 -04:00
if ( ! oplogReader . connect ( hostName ) ) {
msgassertedNoTrace ( 14051 , " unable to connect to resync " ) ;
}
2010-07-19 09:29:49 -04:00
/* todo use getDatabaseNames() method here */
2010-07-05 12:29:42 -04:00
bool ok = oplogReader . conn ( ) - > runCommand ( " admin " , BSON ( " listDatabases " < < 1 ) , info ) ;
2009-12-28 16:43:43 -05:00
massert ( 10385 , " Unable to get database list " , ok ) ;
2009-03-18 13:45:32 -04:00
}
BSONObjIterator i ( info . getField ( " databases " ) . embeddedObject ( ) ) ;
2009-06-09 11:43:04 -04:00
while ( i . moreWithEOO ( ) ) {
2009-03-18 13:45:32 -04:00
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
string name = e . embeddedObject ( ) . getField ( " name " ) . valuestr ( ) ;
2009-05-01 12:18:17 -04:00
if ( ! e . embeddedObject ( ) . getBoolField ( " empty " ) ) {
if ( name ! = " local " ) {
if ( only . empty ( ) | | only = = name ) {
resyncDrop ( name . c_str ( ) , requester ) ;
}
2009-03-18 13:45:32 -04:00
}
}
2011-01-04 00:40:41 -05:00
}
2009-01-29 11:46:45 -05:00
syncedTo = OpTime ( ) ;
2009-04-22 19:10:23 -04:00
addDbNextPass . clear ( ) ;
2009-03-31 10:23:05 -04:00
save ( ) ;
2009-01-29 11:46:45 -05:00
}
2009-03-18 13:45:32 -04:00
string ReplSource : : resyncDrop ( const char * db , const char * requester ) {
log ( ) < < " resync: dropping database " < < db < < endl ;
2010-05-03 17:04:55 -04:00
Client : : Context ctx ( db ) ;
dropDatabase ( db ) ;
return db ;
2009-03-18 13:45:32 -04:00
}
2011-01-04 00:40:41 -05:00
2009-08-10 16:57:59 -04:00
/* grab initial copy of a database from the master */
2011-04-12 21:53:55 -07:00
void ReplSource : : resync ( string db ) {
2009-03-18 13:45:32 -04:00
string dummyNs = resyncDrop ( db . c_str ( ) , " internal " ) ;
2010-01-29 17:22:34 -05:00
Client : : Context ctx ( dummyNs ) ;
2009-01-15 10:17:11 -05:00
{
2010-02-25 13:52:58 -05:00
log ( ) < < " resync: cloning database " < < db < < " to get an initial copy " < < endl ;
2009-01-15 10:17:11 -05:00
ReplInfo r ( " resync: cloning a database " ) ;
string errmsg ;
2011-04-12 21:53:55 -07:00
int errCode = 0 ;
2011-09-12 12:21:22 -04:00
bool ok = cloneFrom ( hostName . c_str ( ) , errmsg , cc ( ) . database ( ) - > name , false , /*slaveOk*/ true , /*replauth*/ true , /*snapshot*/ false , /*mayYield*/ true , /*mayBeInterrupted*/ false , & errCode ) ;
2009-01-15 10:17:11 -05:00
if ( ! ok ) {
2011-04-12 21:53:55 -07:00
if ( errCode = = DatabaseDifferCaseCode ) {
resyncDrop ( db . c_str ( ) , " internal " ) ;
log ( ) < < " resync: database " < < db < < " not valid on the master due to a name conflict, dropping. " < < endl ;
return ;
}
else {
problem ( ) < < " resync of " < < db < < " from " < < hostName < < " failed " < < errmsg < < endl ;
throw SyncException ( ) ;
}
2009-01-15 10:17:11 -05:00
}
}
2008-08-02 14:58:15 -04:00
2010-02-25 13:52:58 -05:00
log ( ) < < " resync: done with initial clone for db: " < < db < < endl ;
2008-08-02 14:58:15 -04:00
2011-04-12 21:53:55 -07:00
return ;
}
DatabaseIgnorer ___databaseIgnorer ;
void DatabaseIgnorer : : doIgnoreUntilAfter ( const string & db , const OpTime & futureOplogTime ) {
if ( futureOplogTime > _ignores [ db ] ) {
_ignores [ db ] = futureOplogTime ;
}
}
bool DatabaseIgnorer : : ignoreAt ( const string & db , const OpTime & currentOplogTime ) {
if ( _ignores [ db ] . isNull ( ) ) {
return false ;
}
if ( _ignores [ db ] > = currentOplogTime ) {
return true ;
} else {
// The ignore state has expired, so clear it.
_ignores . erase ( db ) ;
return false ;
}
}
// Decide whether an oplog operation on database 'db' (namespace 'ns') may be applied,
// taking care of local databases whose names differ from 'db' only by letter case.
// @param op  oplog entry; a Date/Timestamp "ts" is checked against ___databaseIgnorer
// @param ns  namespace of the operation
// @param db  database name taken from ns
// @return true if the operation should be applied; false if it should be skipped, either
//         because 'db' is currently ignored or because the master does not hold 'db'
//         under exactly this name (an ignore window up to the master's newest oplog
//         time is then recorded and pending clone work for 'db' is forgotten).
// Side effects: temporarily releases the db lock (dbtemprelease) to query the master,
// and drops the local databases reported as case-insensitive duplicates of 'db'.
bool ReplSource : : handleDuplicateDbName ( const BSONObj & op , const char * ns , const char * db ) {
if ( dbHolder . isLoaded ( ns , dbpath ) ) {
// Database is already present.
return true ;
}
2011-08-23 13:05:44 -07:00
BSONElement ts = op . getField ( " ts " ) ;
if ( ( ts . type ( ) = = Date | | ts . type ( ) = = Timestamp ) & & ___databaseIgnorer . ignoreAt ( db , ts . date ( ) ) ) {
2011-08-23 13:03:56 -07:00
// Database is ignored due to a previous indication that it is
// missing from master after optime "ts".
return false ;
2011-04-12 21:53:55 -07:00
}
// NOTE(review): duplicateUncasedName presumably returns a local database name equal to
// 'db' ignoring case but not exactly (empty when there is none) — confirm.
if ( Database : : duplicateUncasedName ( db , dbpath ) . empty ( ) ) {
// No duplicate database names are present.
return true ;
}
OpTime lastTime ;
bool dbOk = false ;
// With the lock released, ask the master for its newest oplog time and database list.
{
dbtemprelease release ;
// We always log an operation after executing it (never before), so
// a database list will always be valid as of an oplog entry generated
// before it was retrieved.
BSONObj last = oplogReader . findOne ( this - > ns ( ) . c_str ( ) , Query ( ) . sort ( BSON ( " $natural " < < - 1 ) ) ) ;
if ( ! last . isEmpty ( ) ) {
BSONElement ts = last . getField ( " ts " ) ;
massert ( 14032 , " Invalid 'ts' in remote log " , ts . type ( ) = = Date | | ts . type ( ) = = Timestamp ) ;
lastTime = OpTime ( ts . date ( ) ) ;
}
BSONObj info ;
bool ok = oplogReader . conn ( ) - > runCommand ( " admin " , BSON ( " listDatabases " < < 1 ) , info ) ;
2011-04-12 22:25:05 -07:00
massert ( 14033 , " Unable to get database list " , ok ) ;
2011-04-12 21:53:55 -07:00
// dbOk ends true only if the master lists 'db' with exactly this case and lists no
// other name that equals 'db' ignoring case.
BSONObjIterator i ( info . getField ( " databases " ) . embeddedObject ( ) ) ;
while ( i . more ( ) ) {
BSONElement e = i . next ( ) ;
const char * name = e . embeddedObject ( ) . getField ( " name " ) . valuestr ( ) ;
if ( strcasecmp ( name , db ) ! = 0 )
continue ;
if ( strcmp ( name , db ) = = 0 ) {
// The db exists on master, still need to check that no conflicts exist there.
dbOk = true ;
continue ;
}
// The master has a db name that conflicts with the requested name.
dbOk = false ;
break ;
}
}
// Not safely present on the master: ignore 'db' up to the master's newest oplog time
// and drop any pending clone bookkeeping for it.
if ( ! dbOk ) {
___databaseIgnorer . doIgnoreUntilAfter ( db , lastTime ) ;
incompleteCloneDbs . erase ( db ) ;
addDbNextPass . erase ( db ) ;
return false ;
}
// Check for duplicates again, since we released the lock above.
set < string > duplicates ;
Database : : duplicateUncasedName ( db , dbpath , & duplicates ) ;
// The database is present on the master and no conflicting databases
// are present on the master. Drop any local conflicts.
for ( set < string > : : const_iterator i = duplicates . begin ( ) ; i ! = duplicates . end ( ) ; + + i ) {
___databaseIgnorer . doIgnoreUntilAfter ( * i , lastTime ) ;
incompleteCloneDbs . erase ( * i ) ;
addDbNextPass . erase ( * i ) ;
Client : : Context ctx ( * i ) ;
dropDatabase ( * i ) ;
}
2011-04-12 22:25:05 -07:00
massert ( 14034 , " Duplicate database names present after attempting to delete duplicates " ,
2011-04-12 21:53:55 -07:00
Database : : duplicateUncasedName ( db , dbpath ) . empty ( ) ) ;
2009-01-15 10:17:11 -05:00
return true ;
2008-12-28 20:28:49 -05:00
}
2009-01-23 10:17:29 -05:00
void ReplSource : : applyOperation ( const BSONObj & op ) {
try {
2010-06-29 14:49:06 -04:00
applyOperation_inlock ( op ) ;
2009-01-23 10:17:29 -05:00
}
2009-02-06 16:56:14 -05:00
catch ( UserException & e ) {
2010-07-17 16:07:38 -04:00
log ( ) < < " sync: caught user assertion " < < e < < " while applying op: " < < op < < endl ; ;
2009-05-05 12:52:53 -04:00
}
catch ( DBException & e ) {
2011-01-04 00:40:41 -05:00
log ( ) < < " sync: caught db exception " < < e < < " while applying op: " < < op < < endl ; ;
2009-05-05 12:52:53 -04:00
}
2010-06-29 14:49:06 -04:00
2009-01-23 10:17:29 -05:00
}
2010-06-30 15:56:59 -04:00
2009-01-15 10:17:11 -05:00
/* Apply one operation pulled from the master's oplog. Entries of local.oplog.$main look like
     { ts: ..., op: <optype>, ns: ..., o: <obj>, o2: <extraobj>, b: <boolflag> }
   (see logOp() comments).
   - no-op entries (op "n") are skipped;
   - an ns starting with '.' is logged and skipped; an empty ns sets replAllDead and
     throws SyncException;
   - when 'only' is set, ops on other databases are skipped;
   - if replAllDead is set once the write lock is held, SyncException is thrown;
   - ops rejected by handleDuplicateDbName() are skipped;
   - admin commands (op "c" on admin) are always applied directly;
   - if the target database was just created, is empty, or has an incomplete clone, it is
     cloned from the master (at most one clone per pass; further ones are queued in
     addDbNextPass) instead of applying the op;
   - otherwise the op is applied with applyOperation().
   @param alreadyLocked caller already put us in write lock if true
*/
2011-03-29 16:56:21 -07:00
void ReplSource : : sync_pullOpLog_applyOperation ( BSONObj & op , bool alreadyLocked ) {
2010-08-12 16:13:07 -04:00
if ( logLevel > = 6 ) // op.tostring is expensive so doing this check explicitly
log ( 6 ) < < " processing op: " < < op < < endl ;
2010-08-14 15:32:38 -04:00
2010-08-16 15:32:14 -04:00
if ( op . getStringField ( " op " ) [ 0 ] = = ' n ' )
return ;
2010-09-07 12:18:44 -04:00
// clientName receives the database part of ns.
char clientName [ MaxDatabaseNameLen ] ;
2009-01-15 10:17:11 -05:00
const char * ns = op . getStringField ( " ns " ) ;
2009-12-31 16:31:07 -05:00
nsToDatabase ( ns , clientName ) ;
2009-01-15 10:17:11 -05:00
if ( * ns = = ' . ' ) {
problem ( ) < < " skipping bad op in oplog: " < < op . toString ( ) < < endl ;
return ;
}
else if ( * ns = = 0 ) {
2010-08-16 15:32:14 -04:00
/*if( op.getStringField("op")[0] != 'n' )*/ {
2010-08-14 15:32:38 -04:00
problem ( ) < < " halting replication, bad op in oplog: \n " < < op . toString ( ) < < endl ;
replAllDead = " bad object in oplog " ;
throw SyncException ( ) ;
}
2010-08-16 15:32:14 -04:00
//ns = "local.system.x";
//nsToDatabase(ns, clientName);
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
if ( ! only . empty ( ) & & only ! = clientName )
return ;
2008-12-24 11:11:10 -05:00
2010-11-29 10:21:18 -05:00
// Optional pretouch before taking the write lock (presumably to warm the data the ops
// will touch). With cmdLine.pretouch > 1, the next cmdLine.pretouch ops are peeked and
// pretouched in chunks of m on a pool of min(8, pretouch) threads (at least 1), and
// countdown then skips pretouching for that many subsequent ops.
if ( cmdLine . pretouch & & ! alreadyLocked /*doesn't make sense if in write lock already*/ ) {
2010-07-06 17:49:20 -04:00
if ( cmdLine . pretouch > 1 ) {
/* note: this is bad - should be put in ReplSource. but this is first test... */
static int countdown ;
2010-11-12 09:20:09 -05:00
assert ( countdown > = 0 ) ;
2010-07-06 17:49:20 -04:00
if ( countdown > 0 ) {
countdown - - ; // was pretouched on a prev pass
2011-01-04 00:40:41 -05:00
}
else {
2010-07-12 12:37:27 -04:00
const int m = 4 ;
2010-07-06 17:49:20 -04:00
if ( tp . get ( ) = = 0 ) {
2010-07-12 12:37:27 -04:00
int nthr = min ( 8 , cmdLine . pretouch ) ;
2010-07-06 17:49:20 -04:00
nthr = max ( nthr , 1 ) ;
tp . reset ( new ThreadPool ( nthr ) ) ;
}
vector < BSONObj > v ;
oplogReader . peek ( v , cmdLine . pretouch ) ;
unsigned a = 0 ;
while ( 1 ) {
if ( a > = v . size ( ) ) break ;
unsigned b = a + m - 1 ; // v[a..b]
if ( b > = v . size ( ) ) b = v . size ( ) - 1 ;
tp - > schedule ( pretouchN , v , a , b ) ;
DEV cout < < " pretouch task: " < < a < < " .. " < < b < < endl ;
a + = m ;
}
// we do one too...
pretouchOperation ( op ) ;
tp - > join ( ) ;
countdown = v . size ( ) ;
}
}
else {
pretouchOperation ( op ) ;
}
2010-06-30 15:56:59 -04:00
}
2010-11-29 10:57:16 -05:00
// Take the write lock unless the caller already holds it.
scoped_ptr < writelock > lk ( alreadyLocked ? 0 : new writelock ( ) ) ;
2009-04-17 13:21:50 -04:00
2009-04-22 13:53:35 -04:00
if ( replAllDead ) {
// hmmm why is this check here and not at top of this function? does it get set between top and here?
2009-12-22 10:00:47 -05:00
log ( ) < < " replAllDead, throwing SyncException: " < < replAllDead < < endl ;
2009-04-22 13:53:35 -04:00
throw SyncException ( ) ;
}
2011-01-04 00:40:41 -05:00
2011-04-12 21:53:55 -07:00
// Skip the op when its database is ignored or conflicts by case with the master's.
if ( ! handleDuplicateDbName ( op , ns , clientName ) ) {
return ;
}
2010-01-29 17:22:34 -05:00
Client : : Context ctx ( ns ) ;
2010-03-26 21:38:39 -04:00
ctx . getClient ( ) - > curop ( ) - > reset ( ) ;
2008-08-25 16:46:39 -04:00
2010-01-29 17:22:34 -05:00
bool empty = ctx . db ( ) - > isEmpty ( ) ;
2009-04-16 11:36:06 -04:00
bool incompleteClone = incompleteCloneDbs . count ( clientName ) ! = 0 ;
2009-04-22 13:53:35 -04:00
2010-07-19 22:32:43 -04:00
if ( logLevel > = 6 )
log ( 6 ) < < " ns: " < < ns < < " , justCreated: " < < ctx . justCreated ( ) < < " , empty: " < < empty < < " , incompleteClone: " < < incompleteClone < < endl ;
2011-01-04 00:40:41 -05:00
2010-01-29 17:22:34 -05:00
// always apply admin command command
// this is a bit hacky -- the semantics of replication/commands aren't well specified
if ( strcmp ( clientName , " admin " ) = = 0 & & * op . getStringField ( " op " ) = = ' c ' ) {
applyOperation ( op ) ;
return ;
}
2011-01-04 00:40:41 -05:00
2010-01-29 17:22:34 -05:00
// New, empty, or partially cloned database: clone it from the master instead of
// replaying this op.
if ( ctx . justCreated ( ) | | empty | | incompleteClone ) {
2009-04-22 19:10:23 -04:00
// we must add to incomplete list now that setClient has been called
incompleteCloneDbs . insert ( clientName ) ;
2009-04-22 13:53:35 -04:00
if ( nClonedThisPass ) {
/* We only clone one database per pass, even if a lot need to be done. This helps us
   avoid overflowing the master's transaction log by doing too much work before going
   back to read more transactions. (Imagine a scenario of slave startup where we try to
   clone 100 databases in one pass.)
*/
addDbNextPass . insert ( clientName ) ;
2011-01-04 00:40:41 -05:00
}
else {
2009-04-29 17:28:14 -04:00
if ( incompleteClone ) {
log ( ) < < " An earlier initial clone of ' " < < clientName < < " ' did not complete, now resyncing. " < < endl ;
}
2009-04-22 13:53:35 -04:00
save ( ) ;
2010-01-29 17:22:34 -05:00
// NOTE(review): this inner ctx shadows the outer ctx declared above.
Client : : Context ctx ( ns ) ;
2009-04-22 13:53:35 -04:00
nClonedThisPass + + ;
2010-01-29 17:22:34 -05:00
resync ( ctx . db ( ) - > name ) ;
2009-04-22 13:53:35 -04:00
addDbNextPass . erase ( clientName ) ;
incompleteCloneDbs . erase ( clientName ) ;
}
2009-04-16 11:36:06 -04:00
save ( ) ;
2011-01-04 00:40:41 -05:00
}
else {
2011-03-29 16:56:21 -07:00
applyOperation ( op ) ;
2009-03-31 14:05:20 -04:00
addDbNextPass . erase ( clientName ) ;
2009-03-17 17:20:08 -04:00
}
2009-01-15 10:17:11 -05:00
}
2009-04-22 17:44:23 -04:00
void ReplSource : : syncToTailOfRemoteLog ( ) {
string _ns = ns ( ) ;
2010-02-25 23:06:37 -08:00
BSONObjBuilder b ;
if ( ! only . empty ( ) ) {
b . appendRegex ( " ns " , string ( " ^ " ) + only ) ;
2011-01-04 00:40:41 -05:00
}
2010-07-05 12:29:42 -04:00
BSONObj last = oplogReader . findOne ( _ns . c_str ( ) , Query ( b . done ( ) ) . sort ( BSON ( " $natural " < < - 1 ) ) ) ;
2009-04-22 17:44:23 -04:00
if ( ! last . isEmpty ( ) ) {
2010-01-28 13:41:51 -05:00
BSONElement ts = last . getField ( " ts " ) ;
2010-05-14 13:33:16 -04:00
massert ( 10386 , " non Date ts found: " + last . toString ( ) , ts . type ( ) = = Date | | ts . type ( ) = = Timestamp ) ;
2009-04-22 17:44:23 -04:00
syncedTo = OpTime ( ts . date ( ) ) ;
2011-01-04 00:40:41 -05:00
}
2009-04-22 17:44:23 -04:00
}
2011-01-04 00:40:41 -05:00
2010-11-29 10:45:47 -05:00
// Maximum number of oplog ops applied under one write-lock acquisition by
// sync_pullOpLog() below (1 = one op per lock). Declared here, defined elsewhere.
extern unsigned replApplyBatchSize ;
2010-11-29 10:21:18 -05:00
2010-01-04 11:15:09 -05:00
/* slave: pull some data from the master's oplog and apply it locally.

   note: not yet in db mutex at this point; this function takes dblock / writelock
   itself around the local work it does.

   @param nApplied  out: on the normal exit of the apply loop, set to the number of
                    ops applied since the last periodic checkpoint of this pass.
                    Not written on the early-return paths.
   @return -1 error (the oplog connection has already been reset)
            0 ok, don't sleep
            1 ok, sleep
   May throw SyncException (replication halted: stale data, missing or non-increasing
   ts) or fail a massert / uassert / assert on bad remote data.
*/
2010-03-26 14:37:31 -04:00
int ReplSource : : sync_pullOpLog ( int & nApplied ) {
int okResultCode = 1 ;
2009-01-15 10:17:11 -05:00
// Remote oplog namespace for this source ("local.oplog.$" + source name).
string ns = string ( " local.oplog.$ " ) + sourceName ( ) ;
2009-01-23 18:24:15 -05:00
log ( 2 ) < < " repl: sync_pullOpLog " < < ns < < " syncedTo: " < < syncedTo . toStringLong ( ) < < ' \n ' ;
2009-01-15 10:17:11 -05:00
// Stays true while an existing cursor is reused; cleared below when a fresh query is issued.
bool tailing = true ;
2010-07-20 13:37:09 -04:00
// NOTE(review): presumably validates/drops a dead tailing cursor before haveCursor() below — confirm.
oplogReader . tailCheck ( ) ;
2009-01-15 10:17:11 -05:00
2009-03-30 16:28:52 -04:00
// A null syncedTo means nothing has been synced from this source yet.
bool initial = syncedTo . isNull ( ) ;
2011-01-04 00:40:41 -05:00
2010-07-05 12:29:42 -04:00
// (Re)issue the oplog query when there is no open cursor, or on the very first pass.
if ( ! oplogReader . haveCursor ( ) | | initial ) {
2009-04-22 19:10:23 -04:00
if ( initial ) {
2009-03-31 11:44:35 -04:00
// Important to grab last oplog timestamp before listing databases.
2009-04-22 17:44:23 -04:00
syncToTailOfRemoteLog ( ) ;
2009-04-16 11:36:06 -04:00
BSONObj info ;
2010-07-05 12:29:42 -04:00
bool ok = oplogReader . conn ( ) - > runCommand ( " admin " , BSON ( " listDatabases " < < 1 ) , info ) ;
2009-12-28 16:43:43 -05:00
massert ( 10389 , " Unable to get database list " , ok ) ;
2009-04-16 11:36:06 -04:00
// Queue every non-empty database except "local" (restricted to 'only' when set)
// in addDbNextPass so it gets an initial clone.
BSONObjIterator i ( info . getField ( " databases " ) . embeddedObject ( ) ) ;
2009-06-09 11:43:04 -04:00
while ( i . moreWithEOO ( ) ) {
2009-04-16 11:36:06 -04:00
BSONElement e = i . next ( ) ;
if ( e . eoo ( ) )
break ;
string name = e . embeddedObject ( ) . getField ( " name " ) . valuestr ( ) ;
2009-05-01 12:18:17 -04:00
if ( ! e . embeddedObject ( ) . getBoolField ( " empty " ) ) {
if ( name ! = " local " ) {
if ( only . empty ( ) | | only = = name ) {
log ( 2 ) < < " adding to 'addDbNextPass': " < < name < < endl ;
addDbNextPass . insert ( name ) ;
}
2009-03-17 13:18:54 -04:00
}
}
}
2009-04-22 16:25:53 -04:00
dblock lk ;
save ( ) ;
2009-03-17 13:18:54 -04:00
}
2011-01-04 00:40:41 -05:00
2009-01-15 10:17:11 -05:00
// Fresh query: { ts: { $gte: syncedTo } }, plus an ns prefix regex when 'only' is set.
BSONObjBuilder q ;
q . appendDate ( " $gte " , syncedTo . asDate ( ) ) ;
BSONObjBuilder query ;
query . append ( " ts " , q . done ( ) ) ;
2009-02-12 15:03:38 -05:00
if ( ! only . empty ( ) ) {
2011-01-04 00:40:41 -05:00
// note we may here skip a LOT of data table scanning, a lot of work for the master.
2010-07-05 12:29:42 -04:00
query . appendRegex ( " ns " , string ( " ^ " ) + only ) ; // maybe append "\\." here?
2009-02-12 15:03:38 -05:00
}
2009-01-15 10:17:11 -05:00
BSONObj queryObj = query . done ( ) ;
2010-07-05 12:29:42 -04:00
// e.g. queryObj = { ts: { $gte: syncedTo } }
oplogReader . tailingQuery ( ns . c_str ( ) , queryObj ) ;
2009-01-15 10:17:11 -05:00
// New cursor: its first op must equal syncedTo (checked below).
tailing = false ;
2008-12-28 20:28:49 -05:00
}
else {
2009-01-23 18:24:15 -05:00
log ( 2 ) < < " repl: tailing=true \n " ;
2008-12-28 20:28:49 -05:00
}
2008-08-19 14:39:44 -04:00
2010-07-05 12:29:42 -04:00
if ( ! oplogReader . haveCursor ( ) ) {
problem ( ) < < " repl: dbclient::query returns null (conn closed?) " < < endl ;
oplogReader . resetConnection ( ) ;
2010-03-26 14:37:31 -04:00
return - 1 ;
2009-01-15 10:17:11 -05:00
}
2008-09-05 10:40:00 -04:00
2009-01-15 10:17:11 -05:00
// show any deferred database creates from a previous pass
// (only the first pending database, fed through the normal apply path as a
//  synthetic { ns: "<db>.", op: "db" } operation)
{
set < string > : : iterator i = addDbNextPass . begin ( ) ;
if ( i ! = addDbNextPass . end ( ) ) {
BSONObjBuilder b ;
b . append ( " ns " , * i + ' . ' ) ;
b . append ( " op " , " db " ) ;
BSONObj op = b . done ( ) ;
2011-03-29 16:56:21 -07:00
sync_pullOpLog_applyOperation ( op , false ) ;
2009-01-15 10:17:11 -05:00
}
2008-12-28 20:28:49 -05:00
}
2010-07-05 12:29:42 -04:00
// Nothing returned by the cursor: persist state and tell the caller whether to sleep.
if ( ! oplogReader . more ( ) ) {
2009-01-15 10:17:11 -05:00
if ( tailing ) {
2009-01-23 18:24:15 -05:00
log ( 2 ) < < " repl: tailing & no new activity \n " ;
2010-07-05 12:29:42 -04:00
if ( oplogReader . awaitCapable ( ) )
2010-03-26 14:37:31 -04:00
okResultCode = 0 ; // don't sleep
2011-01-04 00:40:41 -05:00
}
else {
2009-09-22 10:10:02 -04:00
log ( ) < < " repl: " < < ns < < " oplog is empty \n " ;
2009-04-23 14:44:05 -04:00
}
{
dblock lk ;
2011-01-04 00:40:41 -05:00
save ( ) ;
2009-04-23 14:44:05 -04:00
}
2010-03-26 14:37:31 -04:00
return okResultCode ;
2009-01-15 10:17:11 -05:00
}
2011-01-04 00:40:41 -05:00
2010-03-01 13:55:31 -08:00
// Validate the first op from the cursor against syncedTo before entering the apply loop.
OpTime nextOpTime ;
{
2010-07-05 12:29:42 -04:00
BSONObj op = oplogReader . next ( ) ;
2010-03-01 13:55:31 -08:00
BSONElement ts = op . getField ( " ts " ) ;
if ( ts . type ( ) ! = Date & & ts . type ( ) ! = Timestamp ) {
string err = op . getStringField ( " $err " ) ;
if ( ! err . empty ( ) ) {
2010-07-12 17:39:01 -04:00
// 13051 is "tailable cursor requested on non capped collection"
if ( op . getIntField ( " code " ) = = 13051 ) {
problem ( ) < < " trying to slave off of a non-master " < < ' \n ' ;
massert ( 13344 , " trying to slave off of a non-master " , false ) ;
}
else {
problem ( ) < < " repl: $err reading remote oplog: " + err < < ' \n ' ;
massert ( 10390 , " got $err reading remote oplog " , false ) ;
}
2010-03-01 13:55:31 -08:00
}
else {
problem ( ) < < " repl: bad object read from remote oplog: " < < op . toString ( ) < < ' \n ' ;
massert ( 10391 , " repl: bad object read from remote oplog " , false ) ;
}
2009-04-22 10:52:32 -04:00
}
2011-01-04 00:40:41 -05:00
2010-03-01 13:55:31 -08:00
nextOpTime = OpTime ( ts . date ( ) ) ;
log ( 2 ) < < " repl: first op time received: " < < nextOpTime . toString ( ) < < ' \n ' ;
2010-08-30 13:43:35 -07:00
if ( initial ) {
log ( 1 ) < < " repl: initial run \n " ;
}
// Existing (tailed) cursor: only ops strictly newer than syncedTo are expected; the op is
// pushed back so the apply loop below handles it.
if ( tailing ) {
2011-01-04 00:40:41 -05:00
if ( ! ( syncedTo < nextOpTime ) ) {
2010-08-30 13:43:35 -07:00
log ( ) < < " repl ASSERTION failed : syncedTo < nextOpTime " < < endl ;
log ( ) < < " repl syncTo: " < < syncedTo . toStringLong ( ) < < endl ;
log ( ) < < " repl nextOpTime: " < < nextOpTime . toStringLong ( ) < < endl ;
assert ( false ) ;
2010-08-30 00:16:39 -04:00
}
2010-07-05 12:29:42 -04:00
oplogReader . putBack ( op ) ; // op will be processed in the loop below
2010-03-01 13:55:31 -08:00
nextOpTime = OpTime ( ) ; // will reread the op below
}
// Fresh $gte query: the first op must be exactly syncedTo, otherwise our sync point
// is no longer in the master's oplog.
else if ( nextOpTime ! = syncedTo ) { // didn't get what we queried for - error
Nullstream & l = log ( ) ;
l < < " repl: nextOpTime " < < nextOpTime . toStringLong ( ) < < ' ' ;
if ( nextOpTime < syncedTo )
l < < " <?? " ;
else
l < < " > " ;
l < < " syncedTo " < < syncedTo . toStringLong ( ) < < ' \n ' ;
// NOTE(review): if getSecs() returns an unsigned type, this difference wraps when
// nextOpTime < syncedTo and prints a huge value — confirm.
log ( ) < < " repl: time diff: " < < ( nextOpTime . getSecs ( ) - syncedTo . getSecs ( ) ) < < " sec \n " ;
log ( ) < < " repl: tailing: " < < tailing < < ' \n ' ;
log ( ) < < " repl: data too stale, halting replication " < < endl ;
replInfo = replAllDead = " data too stale halted replication " ;
2009-03-30 16:28:52 -04:00
// NOTE(review): when nextOpTime < syncedTo this assert fails before the SyncException
// below is reached; the outcome depends on this project's assert behavior.
assert ( syncedTo < nextOpTime ) ;
2010-03-01 13:55:31 -08:00
throw SyncException ( ) ;
}
else {
2010-08-30 13:43:35 -07:00
/* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */
2010-03-01 13:55:31 -08:00
}
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
// apply operations
// Each outer iteration: maybe finish the pass, maybe checkpoint, then apply a batch of up to
// replApplyBatchSize ops under one writelock (no outer writelock when the batch size is 1).
2008-08-02 14:58:15 -04:00
{
2010-03-01 13:55:31 -08:00
int n = 0 ;
2011-01-04 00:40:41 -05:00
time_t saveLast = time ( 0 ) ;
2009-01-15 10:17:11 -05:00
while ( 1 ) {
2010-08-16 15:32:14 -04:00
bool moreInitialSyncsPending = ! addDbNextPass . empty ( ) & & n ; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place.
if ( moreInitialSyncsPending | | ! oplogReader . more ( ) ) {
2009-04-27 15:30:17 -04:00
dblock lk ;
2011-03-29 16:56:21 -07:00
// NOTE aaron 2011-03-29 This block may be unnecessary, but I'm leaving it in place to avoid changing timing behavior.
2009-04-27 15:30:17 -04:00
{
dbtemprelease t ;
2010-08-16 15:32:14 -04:00
// Re-check for more ops with the db lock temporarily released; if some arrived,
// keep applying instead of ending the pass.
if ( ! moreInitialSyncsPending & & oplogReader . more ( ) ) {
2011-03-29 16:56:21 -07:00
continue ;
2009-04-27 15:30:17 -04:00
}
2011-03-29 16:56:21 -07:00
// otherwise, break out of loop so we can set to completed or clone more dbs
2009-04-27 15:30:17 -04:00
}
2011-03-29 16:56:21 -07:00
2010-07-05 12:29:42 -04:00
if ( oplogReader . awaitCapable ( ) & & tailing )
2010-03-26 14:37:31 -04:00
okResultCode = 0 ; // don't sleep
2009-01-15 10:17:11 -05:00
syncedTo = nextOpTime ;
save ( ) ; // note how far we are synced up to now
2009-09-22 10:10:02 -04:00
log ( ) < < " repl: applied " < < n < < " operations " < < endl ;
nApplied = n ;
2010-03-08 15:03:00 -05:00
log ( ) < < " repl: end sync_pullOpLog syncedTo: " < < syncedTo . toStringLong ( ) < < endl ;
2009-01-15 10:17:11 -05:00
break ;
}
2010-08-16 15:32:14 -04:00
// NOTE(review): empty branch; it has no effect.
else {
}
2009-01-21 18:48:37 -05:00
2011-01-04 00:40:41 -05:00
// Periodic checkpoint after >100000 ops or >60s (OCCASIONALLY presumably samples only
// some iterations — confirm). n is reset here, so nApplied above only counts ops since
// the last checkpoint.
OCCASIONALLY if ( n > 0 & & ( n > 100000 | | time ( 0 ) - saveLast > 60 ) ) {
// periodically note our progress, in case we are doing a lot of work and crash
dblock lk ;
2009-05-21 14:30:41 -04:00
syncedTo = nextOpTime ;
// can't update local log ts since there are pending operations from our peer
2011-01-04 00:40:41 -05:00
save ( ) ;
2009-10-13 16:01:02 -04:00
log ( ) < < " repl: checkpoint applied " < < n < < " operations " < < endl ;
log ( ) < < " repl: syncedTo: " < < syncedTo . toStringLong ( ) < < endl ;
2011-01-04 00:40:41 -05:00
saveLast = time ( 0 ) ;
n = 0 ;
}
2009-01-21 18:48:37 -05:00
2010-07-05 12:29:42 -04:00
BSONObj op = oplogReader . next ( ) ;
2010-11-29 10:21:18 -05:00
2010-11-29 10:57:16 -05:00
// Batch of up to b ops; the second argument to sync_pullOpLog_applyOperation below tells
// it whether the writelock is already held.
unsigned b = replApplyBatchSize ;
bool justOne = b = = 1 ;
scoped_ptr < writelock > lk ( justOne ? 0 : new writelock ( ) ) ;
while ( 1 ) {
2010-11-29 10:21:18 -05:00
2011-01-04 00:40:41 -05:00
// Every op must carry a Date/Timestamp ts that strictly increases; otherwise halt.
BSONElement ts = op . getField ( " ts " ) ;
if ( ! ( ts . type ( ) = = Date | | ts . type ( ) = = Timestamp ) ) {
2010-11-29 16:26:18 -05:00
log ( ) < < " sync error: problem querying remote oplog record " < < endl ;
log ( ) < < " op: " < < op . toString ( ) < < endl ;
2010-11-29 10:21:18 -05:00
log ( ) < < " halting replication " < < endl ;
replInfo = replAllDead = " sync error: no ts found querying remote oplog record " ;
throw SyncException ( ) ;
}
OpTime last = nextOpTime ;
nextOpTime = OpTime ( ts . date ( ) ) ;
if ( ! ( last < nextOpTime ) ) {
log ( ) < < " sync error: last applied optime at slave >= nextOpTime from master " < < endl ;
2010-11-29 16:26:18 -05:00
log ( ) < < " last: " < < last . toStringLong ( ) < < endl ;
log ( ) < < " nextOpTime: " < < nextOpTime . toStringLong ( ) < < endl ;
2010-11-29 10:21:18 -05:00
log ( ) < < " halting replication " < < endl ;
replInfo = replAllDead = " sync error last >= nextOpTime " ;
uassert ( 10123 , " replication error last applied optime at slave >= nextOpTime from master " , false ) ;
}
// slavedelay: the op is too recent to apply yet. Push it back, record when to resume
// (_sleepAdviceTime), checkpoint what was applied, and return early. Batching is
// required to be off here (batch size 1).
if ( replSettings . slavedelay & & ( unsigned ( time ( 0 ) ) < nextOpTime . getSecs ( ) + replSettings . slavedelay ) ) {
assert ( justOne ) ;
oplogReader . putBack ( op ) ;
_sleepAdviceTime = nextOpTime . getSecs ( ) + replSettings . slavedelay + 1 ;
dblock lk ;
if ( n > 0 ) {
syncedTo = last ;
save ( ) ;
}
log ( ) < < " repl: applied " < < n < < " operations " < < endl ;
log ( ) < < " repl: syncedTo: " < < syncedTo . toStringLong ( ) < < endl ;
log ( ) < < " waiting until: " < < _sleepAdviceTime < < " to continue " < < endl ;
2010-11-29 16:26:18 -05:00
return okResultCode ;
2010-03-01 13:55:31 -08:00
}
2010-11-28 23:41:10 -05:00
2011-03-29 16:56:21 -07:00
sync_pullOpLog_applyOperation ( op , ! justOne ) ;
2010-11-29 10:21:18 -05:00
n + + ;
if ( - - b = = 0 )
break ;
// if we get here, we are doing multiple applications in a single write lock acquisition
2010-11-29 10:57:16 -05:00
if ( ! oplogReader . moreInCurrentBatch ( ) ) {
// break if no more in batch so we release lock while reading from the master
break ;
}
op = oplogReader . next ( ) ;
2011-03-02 00:46:22 -05:00
2011-03-02 01:28:17 -05:00
// NOTE(review): presumably lets the durability layer commit between batched ops if due — confirm.
getDur ( ) . commitIfNeeded ( ) ;
2010-11-29 10:21:18 -05:00
}
2008-12-28 20:28:49 -05:00
}
}
2008-12-15 17:23:54 -05:00
2010-03-26 14:37:31 -04:00
return okResultCode ;
2008-08-02 14:58:15 -04:00
}
2011-01-04 00:40:41 -05:00
/** Selects the preferred replication account in local.system.users. */
BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");

/** Authenticates 'conn' against its "local" database for replication.

    Credentials come from internalSecurity when it has a password. Otherwise they come from
    local.system.users: the "repl" user if present, else the first user found there.

    @return true when running with noauth, or when authentication succeeds. Returns false
            (after logging why) when this client is not admin, when no usable user exists,
            or when the remote auth call fails.
    massert 10392 / 10393 fire if the chosen user document has no "user" / "pwd". */
bool replAuthenticate(DBClientBase *conn) {
    if ( noauth )
        return true;

    if ( !cc().isAdmin() ) {
        log() << "replauthenticate: requires admin permissions, failing\n";
        return false;
    }

    string u;
    string p;
    if ( internalSecurity.pwd.length() > 0 ) {
        u = internalSecurity.user;
        p = internalSecurity.pwd;
    }
    else {
        BSONObj user;
        {
            dblock lk;
            Client::Context ctxt("local.");
            // Prefer the "repl" user; only when it does not exist, try the first user in local.
            // (With '||' here the logic was inverted: it gave up when "repl" was missing, and
            //  when "repl" was found it overwrote it with the first user.)
            if ( !Helpers::findOne("local.system.users", userReplQuery, user) &&
                 !Helpers::getSingleton("local.system.users", user) ) {
                log() << "replauthenticate: no user in local.system.users to use for authentication\n";
                return false;
            }
        }
        u = user.getStringField("user");
        p = user.getStringField("pwd");
        massert( 10392 , "bad user object? [1]", !u.empty() );
        massert( 10393 , "bad user object? [2]", !p.empty() );
    }

    string err;
    if ( !conn->auth("local", u.c_str(), p.c_str(), err, false) ) {
        log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
        return false;
    }
    return true;
}
2009-01-23 18:24:15 -05:00
2010-04-02 11:29:45 -04:00
/** Identifies this server to the sync source through the "handshake" admin command.

    The identity is the local.me singleton. When it is missing, has no "host", or names a
    different host than getHostName(), the collection is emptied and repopulated with a
    fresh _id and the current host. When theReplSet is set, the member id is included.
    @return always true: the command's result is only logged. */
bool replHandshake(DBClientConnection *conn) {
    const string myname = getHostName();

    BSONObj me;
    {
        dblock l;
        // local.me is an identifier for a server for getLastError w:2+
        const bool haveCurrentIdentity = Helpers::getSingleton( "local.me", me )
                                         && me.hasField( "host" )
                                         && me["host"].String() == myname;
        if ( !haveCurrentIdentity ) {
            Helpers::emptyCollection( "local.me" );   // drop any stale identity

            BSONObjBuilder fresh;
            fresh.appendOID( "_id", 0, true );
            fresh.append( "host", myname );
            me = fresh.obj();
            Helpers::putSingleton( "local.me", me );
        }
    }

    BSONObjBuilder cmd;
    cmd.appendAs( me["_id"], "handshake" );
    if ( theReplSet )
        cmd.append( "member", theReplSet->selfId() );

    BSONObj res;
    const bool ok = conn->runCommand( "admin", cmd.obj(), res );
    // the outcome is deliberately ignored for now, for older versions; it is only logged
    // (at a log level that depends on 'ok')
    log( ok ) << "replHandshake res not: " << ok << " res: " << res << endl;
    return true;
}
2011-05-10 19:13:55 -04:00
bool OplogReader : : commonConnect ( const string & hostName ) {
2010-07-05 12:29:42 -04:00
if ( conn ( ) = = 0 ) {
2011-05-10 19:13:55 -04:00
_conn = shared_ptr < DBClientConnection > ( new DBClientConnection ( false , 0 , 0 /* tcp timeout */ ) ) ;
2009-03-18 13:45:32 -04:00
string errmsg ;
ReplInfo r ( " trying to connect to sync source " ) ;
2011-01-04 00:40:41 -05:00
if ( ! _conn - > connect ( hostName . c_str ( ) , errmsg ) | |
2011-06-09 15:05:34 -04:00
( ! noauth & & ! replAuthenticate ( _conn . get ( ) ) ) ) {
2009-03-18 13:45:32 -04:00
resetConnection ( ) ;
2010-02-12 15:56:17 -05:00
log ( ) < < " repl: " < < errmsg < < endl ;
2009-03-18 13:45:32 -04:00
return false ;
}
}
return true ;
}
2011-05-10 19:13:55 -04:00
bool OplogReader : : connect ( string hostName ) {
2011-06-09 15:05:34 -04:00
if ( conn ( ) ! = 0 ) {
return true ;
}
2011-05-10 19:13:55 -04:00
if ( commonConnect ( hostName ) ) {
return replHandshake ( _conn . get ( ) ) ;
}
return false ;
}
2011-06-09 15:05:34 -04:00
bool OplogReader : : connect ( const BSONObj & rid , const int from , const string & to ) {
2011-05-12 12:03:53 -04:00
if ( conn ( ) ! = 0 ) {
return true ;
}
2011-05-10 19:13:55 -04:00
if ( commonConnect ( to ) ) {
2011-05-12 12:03:53 -04:00
log ( ) < < " handshake between " < < from < < " and " < < to < < endl ;
2011-06-09 15:05:34 -04:00
return passthroughHandshake ( rid , from ) ;
2011-05-10 19:13:55 -04:00
}
return false ;
}
2011-06-09 15:05:34 -04:00
bool OplogReader : : passthroughHandshake ( const BSONObj & rid , const int f ) {
2011-05-10 19:13:55 -04:00
BSONObjBuilder cmd ;
2011-06-09 15:05:34 -04:00
cmd . appendAs ( rid [ " _id " ] , " handshake " ) ;
cmd . append ( " member " , f ) ;
2011-05-10 19:13:55 -04:00
BSONObj res ;
2011-05-14 11:43:14 -04:00
return conn ( ) - > runCommand ( " admin " , cmd . obj ( ) , res ) ;
2011-05-10 19:13:55 -04:00
}
2011-01-04 00:40:41 -05:00
2009-01-15 10:17:11 -05:00
/* note: not yet in mutex at this point.
2010-03-26 14:37:31 -04:00
returns > = 0 if ok . return - 1 if you want to reconnect .
return value of zero indicates no sleep necessary before next call
2009-01-15 10:17:11 -05:00
*/
2010-03-26 14:37:31 -04:00
int ReplSource : : sync ( int & nApplied ) {
2010-03-01 13:55:31 -08:00
_sleepAdviceTime = 0 ;
2009-01-15 10:17:11 -05:00
ReplInfo r ( " sync " ) ;
2010-03-08 13:56:55 -05:00
if ( ! cmdLine . quiet ) {
Nullstream & l = log ( ) ;
l < < " repl: from " ;
if ( sourceName ( ) ! = " main " ) {
l < < " source: " < < sourceName ( ) < < ' ' ;
}
l < < " host: " < < hostName < < endl ;
}
2009-01-15 10:17:11 -05:00
nClonedThisPass = 0 ;
2009-02-02 16:18:44 -05:00
// FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName.
2009-08-25 10:24:44 -04:00
if ( ( string ( " localhost " ) = = hostName | | string ( " 127.0.0.1 " ) = = hostName ) & & cmdLine . port = = CmdLine : : DefaultDBPort ) {
2009-09-22 10:10:02 -04:00
log ( ) < < " repl: can't sync from self (localhost). sources configuration may be wrong. " < < endl ;
2009-01-15 10:17:11 -05:00
sleepsecs ( 5 ) ;
2010-03-26 14:37:31 -04:00
return - 1 ;
2008-12-28 20:28:49 -05:00
}
2008-07-28 13:51:39 -04:00
2010-07-05 12:29:42 -04:00
if ( ! oplogReader . connect ( hostName ) ) {
2011-01-04 00:40:41 -05:00
log ( 4 ) < < " repl: can't connect to sync source " < < endl ;
2010-03-26 14:37:31 -04:00
return - 1 ;
2009-01-15 10:17:11 -05:00
}
2011-01-04 00:40:41 -05:00
2009-01-15 10:17:11 -05:00
/*
2011-01-04 00:40:41 -05:00
// get current mtime at the server.
BSONObj o = conn - > findOne ( " admin.$cmd " , opTimeQuery ) ;
BSONElement e = o . getField ( " optime " ) ;
if ( e . eoo ( ) ) {
log ( ) < < " repl: failed to get cur optime from master " < < endl ;
log ( ) < < " " < < o . toString ( ) < < endl ;
return false ;
}
uassert ( 10124 , e . type ( ) = = Date ) ;
OpTime serverCurTime ;
serverCurTime . asDate ( ) = e . date ( ) ;
2009-01-15 10:17:11 -05:00
*/
2009-09-22 10:10:02 -04:00
return sync_pullOpLog ( nApplied ) ;
2009-01-15 10:17:11 -05:00
}
2008-08-02 14:58:15 -04:00
2009-01-15 10:17:11 -05:00
/* --------------------------------------------------------------*/
2008-09-11 15:13:47 -04:00
2009-01-15 10:17:11 -05:00
/*
   TODO:
   _ source has an autoptr to the cursor
   _ reuse that cursor when we can
*/
2008-08-02 14:58:15 -04:00
2011-01-04 00:40:41 -05:00
/* returns: # of seconds to sleep before next pass
2009-09-22 10:10:02 -04:00
0 = no sleep recommended
1 = special sentinel indicating adaptive sleep recommended
*/
int _replMain ( ReplSource : : SourceVector & sources , int & nApplied ) {
2008-12-15 17:23:54 -05:00
{
2009-01-15 10:17:11 -05:00
ReplInfo r ( " replMain load sources " ) ;
2008-12-15 17:23:54 -05:00
dblock lk ;
2009-01-15 10:17:11 -05:00
ReplSource : : loadAll ( sources ) ;
2010-03-29 14:14:50 -07:00
replSettings . fastsync = false ; // only need this param for initial reset
2008-12-15 17:23:54 -05:00
}
2009-01-15 10:17:11 -05:00
if ( sources . empty ( ) ) {
/* replication is not configured yet (for --slave) in local.sources. Poll for config it
every 20 seconds .
*/
2010-11-17 14:58:35 -05:00
log ( ) < < " no source given, add a master to local.sources to start replication " < < endl ;
2009-01-15 10:17:11 -05:00
return 20 ;
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
2009-09-22 10:10:02 -04:00
int sleepAdvice = 1 ;
2009-04-01 16:00:56 -04:00
for ( ReplSource : : SourceVector : : iterator i = sources . begin ( ) ; i ! = sources . end ( ) ; i + + ) {
ReplSource * s = i - > get ( ) ;
2010-03-26 14:37:31 -04:00
int res = - 1 ;
2009-01-15 10:17:11 -05:00
try {
2010-03-26 14:37:31 -04:00
res = s - > sync ( nApplied ) ;
2009-01-15 10:17:11 -05:00
bool moreToSync = s - > haveMoreDbsToSync ( ) ;
2011-01-04 00:40:41 -05:00
if ( res < 0 ) {
2009-09-22 10:10:02 -04:00
sleepAdvice = 3 ;
}
else if ( moreToSync ) {
sleepAdvice = 0 ;
}
2010-03-01 13:55:31 -08:00
else if ( s - > sleepAdvice ( ) ) {
sleepAdvice = s - > sleepAdvice ( ) ;
}
2011-01-04 00:40:41 -05:00
else
2010-03-26 14:37:31 -04:00
sleepAdvice = res ;
2009-01-15 10:17:11 -05:00
}
2009-03-12 11:01:52 -04:00
catch ( const SyncException & ) {
2009-09-22 10:10:02 -04:00
log ( ) < < " caught SyncException " < < endl ;
2009-01-15 10:17:11 -05:00
return 10 ;
}
catch ( AssertionException & e ) {
if ( e . severe ( ) ) {
2009-09-22 10:10:02 -04:00
log ( ) < < " replMain AssertionException " < < e . what ( ) < < endl ;
2009-01-15 10:17:11 -05:00
return 60 ;
}
else {
2009-09-22 10:10:02 -04:00
log ( ) < < " repl: AssertionException " < < e . what ( ) < < ' \n ' ;
2009-01-15 10:17:11 -05:00
}
replInfo = " replMain caught AssertionException " ;
}
2009-03-12 11:01:52 -04:00
catch ( const DBException & e ) {
2009-09-22 10:10:02 -04:00
log ( ) < < " repl: DBException " < < e . what ( ) < < endl ;
2009-03-12 11:01:52 -04:00
replInfo = " replMain caught DBException " ;
}
catch ( const std : : exception & e ) {
2009-09-22 10:10:02 -04:00
log ( ) < < " repl: std::exception " < < e . what ( ) < < endl ;
2011-01-04 00:40:41 -05:00
replInfo = " replMain caught std::exception " ;
2009-03-12 11:01:52 -04:00
}
2011-01-04 00:40:41 -05:00
catch ( . . . ) {
2009-03-12 11:01:52 -04:00
log ( ) < < " unexpected exception during replication. replication will halt " < < endl ;
replAllDead = " caught unexpected exception during replication " ;
2009-02-11 09:58:01 -05:00
}
2010-03-26 14:37:31 -04:00
if ( res < 0 )
2010-07-05 12:29:42 -04:00
s - > oplogReader . resetConnection ( ) ;
2008-12-15 17:23:54 -05:00
}
2009-09-22 10:10:02 -04:00
return sleepAdvice ;
2008-12-15 17:23:54 -05:00
}
2009-01-15 10:17:11 -05:00
void replMain ( ) {
2009-04-01 16:00:56 -04:00
ReplSource : : SourceVector sources ;
2009-01-15 10:17:11 -05:00
while ( 1 ) {
int s = 0 ;
{
dblock lk ;
2009-02-04 13:22:02 -05:00
if ( replAllDead ) {
2011-05-03 11:29:30 -04:00
// throttledForceResyncDead can throw
if ( ! replSettings . autoresync | | ! ReplSource : : throttledForceResyncDead ( " auto " ) ) {
log ( ) < < " all sources dead: " < < replAllDead < < " , sleeping for 5 seconds " < < endl ;
2009-02-02 11:15:24 -05:00
break ;
2011-05-03 11:29:30 -04:00
}
2009-02-02 11:15:24 -05:00
}
2010-01-24 10:26:24 -05:00
assert ( syncing = = 0 ) ; // i.e., there is only one sync thread running. we will want to change/fix this.
2009-01-15 10:17:11 -05:00
syncing + + ;
}
try {
2009-09-22 10:10:02 -04:00
int nApplied = 0 ;
s = _replMain ( sources , nApplied ) ;
2011-01-04 00:40:41 -05:00
if ( s = = 1 ) {
2009-09-22 10:10:02 -04:00
if ( nApplied = = 0 ) s = 2 ;
2011-01-04 00:40:41 -05:00
else if ( nApplied > 100 ) {
2009-09-22 10:10:02 -04:00
// sleep very little - just enought that we aren't truly hammering master
sleepmillis ( 75 ) ;
s = 0 ;
}
}
2011-01-04 00:40:41 -05:00
}
catch ( . . . ) {
2009-09-22 10:10:02 -04:00
out ( ) < < " caught exception in _replMain " < < endl ;
s = 4 ;
2009-01-15 10:17:11 -05:00
}
{
dblock lk ;
assert ( syncing = = 1 ) ;
syncing - - ;
}
2010-03-27 12:37:38 -04:00
2011-01-04 00:40:41 -05:00
if ( relinquishSyncingSome ) {
relinquishSyncingSome = 0 ;
s = 1 ; // sleep before going back in to syncing=1
}
2010-03-27 12:37:38 -04:00
2009-01-15 10:17:11 -05:00
if ( s ) {
stringstream ss ;
2011-05-03 11:29:30 -04:00
ss < < " repl: sleep " < < s < < " sec before next pass " ;
2009-01-15 10:17:11 -05:00
string msg = ss . str ( ) ;
2010-02-05 10:53:22 -05:00
if ( ! cmdLine . quiet )
log ( ) < < msg < < endl ;
2009-01-15 10:17:11 -05:00
ReplInfo r ( msg . c_str ( ) ) ;
sleepsecs ( s ) ;
}
}
}
2008-08-02 14:58:15 -04:00
2010-03-08 15:03:00 -05:00
static void replMasterThread ( ) {
sleepsecs ( 4 ) ;
Client : : initThread ( " replmaster " ) ;
2010-06-04 23:07:41 -04:00
int toSleep = 10 ;
2010-03-08 15:03:00 -05:00
while ( 1 ) {
2010-06-04 23:07:41 -04:00
sleepsecs ( toSleep ) ;
2011-01-04 00:40:41 -05:00
/* write a keep-alive like entry to the log. this will make things like
2010-03-08 15:03:00 -05:00
printReplicationStatus ( ) and printSlaveReplicationStatus ( ) stay up - to - date
even when things are idle .
*/
{
2010-06-04 23:07:41 -04:00
writelocktry lk ( " " , 1 ) ;
2011-01-04 00:40:41 -05:00
if ( lk . got ( ) ) {
2010-06-04 23:07:41 -04:00
toSleep = 10 ;
2011-01-04 00:40:41 -05:00
2011-05-10 14:41:01 -04:00
replLocalAuth ( ) ;
2011-01-04 00:40:41 -05:00
try {
2010-06-04 23:07:41 -04:00
logKeepalive ( ) ;
}
2011-01-04 00:40:41 -05:00
catch ( . . . ) {
2010-06-04 23:07:41 -04:00
log ( ) < < " caught exception in replMasterThread() " < < endl ;
}
2010-03-08 15:03:00 -05:00
}
2010-06-04 23:07:41 -04:00
else {
2010-06-08 11:37:30 -04:00
log ( 5 ) < < " couldn't logKeepalive " < < endl ;
2010-06-04 23:07:41 -04:00
toSleep = 1 ;
2010-03-08 15:03:00 -05:00
}
}
}
}
2009-01-15 10:17:11 -05:00
void replSlaveThread ( ) {
sleepsecs ( 1 ) ;
2010-02-10 14:18:57 -05:00
Client : : initThread ( " replslave " ) ;
2010-08-23 13:55:34 -04:00
cc ( ) . iAmSyncThread ( ) ;
2011-01-04 00:40:41 -05:00
2009-01-15 10:17:11 -05:00
{
dblock lk ;
2011-05-10 14:41:01 -04:00
replLocalAuth ( ) ;
2008-12-15 17:23:54 -05:00
}
2009-01-15 10:17:11 -05:00
while ( 1 ) {
try {
replMain ( ) ;
sleepsecs ( 5 ) ;
}
catch ( AssertionException & ) {
ReplInfo r ( " Assertion in replSlaveThread() : sleeping 5 minutes before retry " ) ;
problem ( ) < < " Assertion in replSlaveThread(): sleeping 5 minutes before retry " < < endl ;
sleepsecs ( 300 ) ;
2008-12-28 20:28:49 -05:00
}
2011-05-03 11:29:30 -04:00
catch ( DBException & e ) {
problem ( ) < < " exception in replSlaveThread(): " < < e . what ( )
< < " , sleeping 5 minutes before retry " < < endl ;
sleepsecs ( 300 ) ;
}
catch ( . . . ) {
problem ( ) < < " error in replSlaveThread(): sleeping 5 minutes before retry " < < endl ;
sleepsecs ( 300 ) ;
}
2008-12-28 20:28:49 -05:00
}
2008-09-03 16:43:00 -04:00
}
2008-12-02 14:24:45 -05:00
2009-01-15 10:17:11 -05:00
void tempThread ( ) {
while ( 1 ) {
2009-12-03 13:12:51 -05:00
out ( ) < < dbMutex . info ( ) . isLocked ( ) < < endl ;
2009-01-15 10:17:11 -05:00
sleepmillis ( 100 ) ;
}
2008-12-28 20:28:49 -05:00
}
2010-05-30 15:38:37 -04:00
void newRepl ( ) ;
void oldRepl ( ) ;
2011-07-25 13:20:25 -04:00
void startReplSets ( ReplSetCmdline * ) ;
2009-01-15 10:17:11 -05:00
void startReplication ( ) {
2010-04-21 16:43:51 -04:00
/* if we are going to be a replica set, we aren't doing other forms of replication. */
2010-08-02 15:21:26 -04:00
if ( ! cmdLine . _replSet . empty ( ) ) {
2011-03-29 16:56:21 -07:00
if ( replSettings . slave | | replSettings . master ) {
2010-05-30 15:38:37 -04:00
log ( ) < < " *** " < < endl ;
2010-05-30 15:04:20 -04:00
log ( ) < < " ERROR: can't use --slave or --master replication options with --replSet " < < endl ;
2010-05-30 15:38:37 -04:00
log ( ) < < " *** " < < endl ;
2010-05-30 15:04:20 -04:00
}
2010-05-30 15:38:37 -04:00
newRepl ( ) ;
2011-07-25 13:20:25 -04:00
replSet = true ;
ReplSetCmdline * replSetCmdline = new ReplSetCmdline ( cmdLine . _replSet ) ;
boost : : thread t ( boost : : bind ( & startReplSets , replSetCmdline ) ) ;
2010-04-13 13:22:42 -04:00
return ;
2010-05-30 15:04:20 -04:00
}
2010-04-13 13:22:42 -04:00
2010-05-30 15:38:37 -04:00
oldRepl ( ) ;
2009-01-15 10:17:11 -05:00
/* this was just to see if anything locks for longer than it should -- we need to be careful
not to be locked when trying to connect ( ) or query ( ) the other side .
*/
//boost::thread tempt(tempThread);
2011-03-29 16:56:21 -07:00
if ( ! replSettings . slave & & ! replSettings . master )
2009-01-15 10:17:11 -05:00
return ;
2008-12-28 20:28:49 -05:00
{
dblock lk ;
2011-05-10 14:41:01 -04:00
replLocalAuth ( ) ;
2009-01-15 10:17:11 -05:00
}
2011-03-29 16:56:21 -07:00
if ( replSettings . slave ) {
assert ( replSettings . slave = = SimpleSlave ) ;
log ( 1 ) < < " slave=true " < < endl ;
2009-01-15 10:17:11 -05:00
boost : : thread repl_thread ( replSlaveThread ) ;
}
2011-03-29 16:56:21 -07:00
if ( replSettings . master ) {
log ( 1 ) < < " master=true " < < endl ;
2010-02-08 21:04:09 -05:00
replSettings . master = true ;
2009-01-23 10:17:29 -05:00
createOplog ( ) ;
2010-03-08 15:03:00 -05:00
boost : : thread t ( replMasterThread ) ;
2009-01-15 10:17:11 -05:00
}
2011-01-04 00:40:41 -05:00
2010-03-29 14:14:50 -07:00
while ( replSettings . fastsync ) // don't allow writes until we've set up from log
sleepmillis ( 50 ) ;
2008-12-28 20:28:49 -05:00
}
2008-09-03 16:43:00 -04:00
2010-07-12 12:37:27 -04:00
/** Manual exercise of pretouchN(). It splits three copies of one insert op into chunks of
    'm' ops and pretouches each chunk on a ThreadPool, mirroring the --pretouch batching loop
    used when applying ops. Waits for all tasks before returning. */
void testPretouch() {
    int nthr = min( 8, 8 );
    nthr = max( nthr, 1 );
    const int m = 8 / nthr;                 // ops per task
    ThreadPool tp( nthr );

    BSONObj x = BSON( "ns" << "test.foo" << "o" << BSON( "_id" << 1 ) << "op" << "i" );
    vector<BSONObj> v( 3, x );

    for ( unsigned a = 0; a < v.size(); a += m ) {
        unsigned b = a + m - 1;             // this task covers v[a..b]
        if ( b >= v.size() )
            b = v.size() - 1;
        tp.schedule( pretouchN, v, a, b );
        DEV cout << "pretouch task: " << a << ".." << b << endl;
    }
    tp.join();
}
2011-01-04 00:40:41 -05:00
2010-12-01 11:12:28 -05:00
class ReplApplyBatchSizeValidator : public ParameterValidator {
public :
2011-01-04 00:40:41 -05:00
ReplApplyBatchSizeValidator ( ) : ParameterValidator ( " replApplyBatchSize " ) { }
2010-12-01 11:12:28 -05:00
2011-01-04 00:40:41 -05:00
virtual bool isValid ( BSONElement e , string & errmsg ) {
2010-12-01 11:12:28 -05:00
int b = e . numberInt ( ) ;
2011-01-04 00:40:41 -05:00
if ( b < 1 | | b > 1024 ) {
2010-12-01 11:12:28 -05:00
errmsg = " replApplyBatchSize has to be >= 1 and < 1024 " ;
return false ;
}
2011-01-04 00:40:41 -05:00
if ( replSettings . slavedelay ! = 0 & & b > 1 ) {
2010-12-01 11:12:28 -05:00
errmsg = " can't use a batch size > 1 with slavedelay " ;
return false ;
}
2011-01-04 00:40:41 -05:00
if ( ! replSettings . slave ) {
2010-12-01 11:12:28 -05:00
errmsg = " can't set replApplyBatchSize on a non-slave machine " ;
return false ;
}
return true ;
}
} replApplyBatchSizeValidator ;
2009-01-14 17:09:51 -05:00
} // namespace mongo