2010-07-18 12:53:40 -04:00
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
# include "pch.h"
2010-11-02 18:47:04 -04:00
# include "../repl.h"
2010-07-18 12:53:40 -04:00
# include "../client.h"
# include "../../client/dbclient.h"
# include "rs.h"
2010-07-18 13:34:16 -04:00
# include "../oplogreader.h"
2010-07-18 15:02:46 -04:00
# include "../../util/mongoutils/str.h"
2010-07-19 09:35:13 -04:00
# include "../dbhelpers.h"
2010-08-02 14:03:33 -04:00
# include "rs_optime.h"
2010-08-02 12:16:01 -04:00
# include "../oplog.h"
2010-07-18 12:53:40 -04:00
namespace mongo {
2010-07-18 15:02:46 -04:00
using namespace mongoutils ;
2010-08-02 14:03:33 -04:00
using namespace bson ;
2010-07-18 15:02:46 -04:00
2010-07-18 12:53:40 -04:00
// Forward declaration; the definition is not in this file.
// Called by _syncDoInitialSync() on the non-fastsync path, before the
// source's databases are cloned.
void dropAllDatabasesExceptLocal ( ) ;
2010-07-18 15:02:46 -04:00
// add try/catch with sleep
2011-06-02 02:07:21 -04:00
void isyncassert ( const string & msg , bool expr ) {
2011-01-04 00:40:41 -05:00
if ( ! expr ) {
2010-07-18 15:02:46 -04:00
string m = str : : stream ( ) < < " initial sync " < < msg ;
theReplSet - > sethbmsg ( m , 0 ) ;
2010-07-22 14:20:29 -04:00
uasserted ( 13404 , m ) ;
2010-07-18 15:02:46 -04:00
}
}
2011-01-04 00:40:41 -05:00
void ReplSetImpl : : syncDoInitialSync ( ) {
2010-10-21 16:10:27 -04:00
createOplog ( ) ;
2011-01-04 00:40:41 -05:00
2010-07-18 15:02:46 -04:00
while ( 1 ) {
try {
_syncDoInitialSync ( ) ;
break ;
}
2010-07-19 18:05:44 -04:00
catch ( DBException & e ) {
sethbmsg ( " initial sync exception " + e . toString ( ) , 0 ) ;
2010-07-18 15:02:46 -04:00
sleepsecs ( 30 ) ;
}
}
}
2010-07-19 09:29:49 -04:00
/* todo : progress metering to sethbmsg. */
static bool clone ( const char * master , string db ) {
string err ;
2010-11-09 17:28:32 -05:00
return cloneFrom ( master , err , db , false ,
2011-04-01 18:33:11 -07:00
/* slave_ok */ true , true , false , /*mayYield*/ true , /*mayBeInterrupted*/ false ) ;
2010-07-19 09:29:49 -04:00
}
// Forward declaration; the definition is not in this file.
// NOTE(review): nothing in the visible part of this file calls it -- confirm it is still needed here.
void _logOpObjRS ( const BSONObj & op ) ;
2010-08-02 14:03:33 -04:00
static void emptyOplog ( ) {
writelock lk ( rsoplog ) ;
Client : : Context ctx ( rsoplog ) ;
2011-01-04 00:40:41 -05:00
NamespaceDetails * d = nsdetails ( rsoplog ) ;
2010-08-02 12:16:01 -04:00
2011-01-04 00:40:41 -05:00
// temp
if ( d & & d - > stats . nrecords = = 0 )
return ; // already empty, ok.
2010-08-02 12:16:01 -04:00
2010-08-02 17:52:33 -04:00
log ( 1 ) < < " replSet empty oplog " < < rsLog ;
d - > emptyCappedCollection ( rsoplog ) ;
2010-08-02 14:03:33 -04:00
}
2010-11-09 17:28:32 -05:00
/**
 * Choose the member to sync from and record it in _currentSyncTarget.
 *
 * A member is eligible when all of the following hold:
 *   - its heartbeat info says it is up;
 *   - HeartbeatInfo::numPings is greater than twice the configured
 *     member count;
 *   - it is PRIMARY, or it is SECONDARY with a heartbeat opTime greater
 *     than lastOpTimeWritten (i.e. it has more data than this node).
 * Among eligible members the one with the lowest heartbeat ping wins;
 * on equal pings the one met first in _members is kept.
 *
 * _currentSyncTarget is updated while holding the lock on this object.
 *
 * @return the chosen member, or NULL (with _currentSyncTarget cleared)
 *         when no member is eligible.
 */
const Member* ReplSetImpl::getMemberToSyncTo() {
    Member *best = 0;

    for (Member *cand = _members.head(); cand; cand = cand->next()) {
        if (!cand->hbinfo().up())
            continue;

        if (!(HeartbeatInfo::numPings > config().members.size() * 2))
            continue;

        if (!(cand->state() == MemberState::RS_PRIMARY)) {
            // Not primary: only a secondary that is ahead of us qualifies.
            if (!(cand->state() == MemberState::RS_SECONDARY &&
                  cand->hbinfo().opTime > lastOpTimeWritten))
                continue;
        }

        // Keep the current best unless this candidate has a strictly lower ping.
        if (best && !(cand->hbinfo().ping < best->hbinfo().ping))
            continue;

        best = cand;
    }

    {
        lock lk(this);
        if (best) {
            _currentSyncTarget = best;
        }
        else {
            _currentSyncTarget = NULL;
            return NULL;
        }
    }

    sethbmsg(str::stream() << " syncing to: " << best->fullName(), 0);

    return best;
}
2011-01-04 00:40:41 -05:00
2010-11-09 17:28:32 -05:00
/**
 * Do the initial sync for this member.
 *
 * One attempt, in this order:
 *   1. return immediately if this node is already primary;
 *   2. pick a sync source with getMemberToSyncTo() and connect an
 *      OplogReader to it;
 *   3. read the source's last oplog entry; its "ts" is the starting point
 *      for oplog application;
 *   4. unless replSettings.fastsync is set, drop all local databases
 *      (except local) and clone every database except "local" from the source;
 *   5. read the source's last oplog entry again; this is minValid;
 *   6. call initialSyncOplogApplication() with the starting point and minValid;
 *   7. flush the local database files and store minValid in
 *      local.replset.minvalid.
 *
 * Progress and failures are reported through sethbmsg(). Most failures
 * sleep and then return normally; the isyncassert() checks instead go
 * through uasserted(). The function returns void, so the caller cannot
 * tell a completed sync from an abandoned attempt by the return alone.
 */
2011-01-04 00:40:41 -05:00
void ReplSetImpl : : _syncDoInitialSync ( ) {
2010-11-09 17:28:32 -05:00
sethbmsg ( " initial sync pending " , 0 ) ;
2011-01-04 00:40:41 -05:00
2011-04-20 16:13:21 -04:00
// if this is the first node, it may have already become primary
if ( box . getState ( ) . primary ( ) ) {
sethbmsg ( " I'm already primary, no need for initial sync " , 0 ) ;
return ;
}
2010-11-09 17:28:32 -05:00
// Choose the member to sync from; this also updates _currentSyncTarget.
const Member * source = getMemberToSyncTo ( ) ;
if ( ! source ) {
// No eligible member yet: report, wait 15 seconds, and give up this attempt.
sethbmsg ( " initial sync need a member to be primary or secondary to do our initial sync " , 0 ) ;
sleepsecs ( 15 ) ;
return ;
}
2011-01-04 00:40:41 -05:00
2010-11-09 17:28:32 -05:00
string sourceHostname = source - > h ( ) . toString ( ) ;
2010-07-18 13:34:16 -04:00
OplogReader r ;
2010-11-09 17:28:32 -05:00
if ( ! r . connect ( sourceHostname ) ) {
sethbmsg ( str : : stream ( ) < < " initial sync couldn't connect to " < < source - > h ( ) . toString ( ) , 0 ) ;
2010-07-18 15:02:46 -04:00
sleepsecs ( 15 ) ;
return ;
}
// Last oplog entry on the source before any cloning starts.
BSONObj lastOp = r . getLastOp ( rsoplog ) ;
2011-01-04 00:40:41 -05:00
if ( lastOp . isEmpty ( ) ) {
2010-07-19 18:05:44 -04:00
sethbmsg ( " initial sync couldn't read remote oplog " , 0 ) ;
2010-07-19 09:29:49 -04:00
sleepsecs ( 15 ) ;
return ;
}
2010-08-02 14:03:33 -04:00
// Passed below as the applyGTE argument of initialSyncOplogApplication().
OpTime startingTS = lastOp [ " ts " ] . _opTime ( ) ;
2011-01-04 00:40:41 -05:00
2010-11-02 18:47:04 -04:00
if ( replSettings . fastsync ) {
// fastsync: neither drop nor clone; go straight to the oplog steps.
log ( ) < < " fastsync: skipping database clone " < < rsLog ;
}
else {
sethbmsg ( " initial sync drop all databases " , 0 ) ;
dropAllDatabasesExceptLocal ( ) ;
sethbmsg ( " initial sync clone all databases " , 0 ) ;
list < string > dbs = r . conn ( ) - > getDatabaseNames ( ) ;
for ( list < string > : : iterator i = dbs . begin ( ) ; i ! = dbs . end ( ) ; i + + ) {
string db = * i ;
// The source's "local" database is never cloned.
if ( db ! = " local " ) {
sethbmsg ( str : : stream ( ) < < " initial sync cloning db: " < < db , 0 ) ;
bool ok ;
{
// Write lock and client context are scoped to the clone of this one database.
writelock lk ( db ) ;
Client : : Context ctx ( db ) ;
2010-11-09 17:28:32 -05:00
ok = clone ( sourceHostname . c_str ( ) , db ) ;
2010-11-02 18:47:04 -04:00
}
2011-01-04 00:40:41 -05:00
if ( ! ok ) {
2010-11-02 18:47:04 -04:00
// Clone failed: report, sleep 300 seconds, and abandon this attempt.
// The databases dropped above are not restored here.
sethbmsg ( str : : stream ( ) < < " initial sync error clone of " < < db < < " failed sleeping 5 minutes " , 0 ) ;
sleepsecs ( 300 ) ;
return ;
}
2010-07-19 09:29:49 -04:00
}
}
}
2010-07-19 09:35:13 -04:00
2010-07-19 18:05:44 -04:00
sethbmsg ( " initial sync query minValid " , 0 ) ;
2010-07-19 09:35:13 -04:00
2010-12-27 18:53:06 -05:00
// Fails via isyncassert() (uasserted 13404) if the source is no longer readable.
isyncassert ( " initial sync source must remain readable throughout our initial sync " , source - > state ( ) . readable ( ) ) ;
2010-11-07 15:02:48 -05:00
2011-01-04 00:40:41 -05:00
/* Our cloned copy will be strange until we apply the oplog events that
   occurred during the cloning process. We note that time point here. */
BSONObj minValid = r . getLastOp ( rsoplog ) ;
2010-11-07 15:02:48 -05:00
isyncassert ( " getLastOp is empty " , ! minValid . isEmpty ( ) ) ;
2010-08-02 14:03:33 -04:00
OpTime mvoptime = minValid [ " ts " ] . _opTime ( ) ;
assert ( ! mvoptime . isNull ( ) ) ;
2010-07-19 09:35:13 -04:00
2011-01-04 00:40:41 -05:00
/* Apply the relevant portion of the oplog: from startingTS, with mvoptime
   passed as minValid. */
{
2011-06-02 02:07:21 -04:00
isyncassert ( str : : stream ( ) < < " initial sync source must remain readable throughout our initial sync [2] state now: " < < source - > state ( ) . toString ( ) , source - > state ( ) . readable ( ) ) ;
2010-11-09 17:28:32 -05:00
if ( ! initialSyncOplogApplication ( source , /*applyGTE*/ startingTS , /*minValid*/ mvoptime ) ) { // note we assume here that this call does not throw
2010-08-02 18:07:55 -04:00
log ( ) < < " replSet initial sync failed during applyoplog " < < rsLog ;
2010-08-02 14:03:33 -04:00
emptyOplog ( ) ; // otherwise we'll be up!
2011-05-14 11:43:14 -04:00
// Reset our record of the last op written, matching the now-empty oplog.
lastOpTimeWritten = OpTime ( ) ;
lastH = 0 ;
2011-05-04 18:46:16 -04:00
2010-08-02 18:07:55 -04:00
log ( ) < < " replSet cleaning up [1] " < < rsLog ;
2010-08-02 14:03:33 -04:00
{
// Flush the local database files under a write lock.
writelock lk ( " local. " ) ;
Client : : Context cx ( " local. " ) ;
2011-01-04 00:40:41 -05:00
cx . db ( ) - > flushFiles ( true ) ;
2010-08-02 14:03:33 -04:00
}
2010-08-02 18:07:55 -04:00
log ( ) < < " replSet cleaning up [2] " < < rsLog ;
2010-10-30 14:41:49 -04:00
sleepsecs ( 5 ) ;
2010-08-02 14:03:33 -04:00
return ;
}
}
sethbmsg ( " initial sync finishing up " , 0 ) ;
2011-01-04 00:40:41 -05:00
2010-07-22 20:21:23 -04:00
assert ( ! box . getState ( ) . primary ( ) ) ; // wouldn't make sense if we were.
2010-07-19 09:29:49 -04:00
{
// Under one write lock on local: flush, store minValid, flush again.
writelock lk ( " local. " ) ;
2010-07-26 17:28:24 -04:00
Client : : Context cx ( " local. " ) ;
2011-01-04 00:40:41 -05:00
cx . db ( ) - > flushFiles ( true ) ;
2010-08-02 14:03:33 -04:00
// The log line is best-effort: any exception raised while building it is swallowed
// so that the minValid write below still happens.
try {
log ( ) < < " replSet set minValid= " < < minValid [ " ts " ] . _opTime ( ) . toString ( ) < < rsLog ;
}
catch ( . . . ) { }
2010-07-19 09:35:13 -04:00
Helpers : : putSingleton ( " local.replset.minvalid " , minValid ) ;
2010-07-26 17:28:24 -04:00
cx . db ( ) - > flushFiles ( true ) ;
2010-07-19 09:29:49 -04:00
}
2010-07-26 17:28:24 -04:00
2010-07-19 18:05:44 -04:00
sethbmsg ( " initial sync done " , 0 ) ;
2010-07-18 12:53:40 -04:00
}
}