diff --git a/db/oplogreader.h b/db/oplogreader.h index 17c5a809a33..204c1c46360 100644 --- a/db/oplogreader.h +++ b/db/oplogreader.h @@ -30,7 +30,7 @@ namespace mongo { } DBClientConnection* conn() { return _conn.get(); } BSONObj findOne(const char *ns, const Query& q) { - return conn()->findOne(ns, q); + return conn()->findOne(ns, q, 0, QueryOption_SlaveOk); } BSONObj getLastOp(const char *ns) { diff --git a/db/repl/rs.h b/db/repl/rs.h index ed15061f30b..815b6445db2 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -269,6 +269,7 @@ namespace mongo { void assumePrimary(); void loadLastOpTimeWritten(); void changeState(MemberState s); + const Member* getMemberToSyncTo(); void _changeArbiterState(); protected: // "heartbeat message" diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 9c29e17c0d1..92b6c6c9b97 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -63,8 +63,8 @@ namespace mongo { /* todo : progress metering to sethbmsg. */ static bool clone(const char *master, string db) { string err; - return cloneFrom(master, err, db, false, - /*slaveok later can be true*/ false, true, false); + return cloneFrom(master, err, db, false, + /* slave_ok */ true, true, false); } void _logOpObjRS(const BSONObj& op); @@ -103,23 +103,60 @@ namespace mongo { */ } - void ReplSetImpl::_syncDoInitialSync() { - sethbmsg("initial sync pending",0); + /** + * Choose a member to sync from. Prefers a secondary, if available. + * + * TODO: make a config setting like "cloneFromPrimary" or something to force + * machines to clone from the primary. We might want to integrate this with + * tags functionality, if we only want it to be able to clone from certain + * machines. 
+ */ + const Member* ReplSetImpl::getMemberToSyncTo() { + for( Member *m = head(); m; m = m->next() ) { + if (m->hbinfo().up() && m->state() == MemberState::RS_SECONDARY) { + sethbmsg( str::stream() << "syncing to secondary: " << m->fullName(), 0); + return const_cast<Member*>(m); + } + } + // can't find secondary, try primary StateBox::SP sp = box.get(); assert( !sp.state.primary() ); // wouldn't make sense if we were. - const Member *cp = sp.primary; - if( cp == 0 ) { - sethbmsg("initial sync need a member to be primary to do our initial sync", 0); + // we just checked in _syncDoInitialSync, but we could have lost a + // primary since then + if (!sp.primary) { + return 0; + } + sethbmsg( str::stream() << "syncing to primary: " << sp.primary->fullName(), 0); + return const_cast<Member*>(sp.primary); + } + + /** + * Do the initial sync for this member. There must be a primary available + * for the whole initial sync, even if we're syncing from a secondary. + */ + void ReplSetImpl::_syncDoInitialSync() { + sethbmsg("initial sync pending",0); + + const Member *cp = box.getPrimary(); + if (!cp) { + sethbmsg("initial sync needs a member to be primary to begin"); sleepsecs(15); return; } - - string masterHostname = cp->h().toString(); + + const Member *source = getMemberToSyncTo(); + if (!source) { + sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); + sleepsecs(15); + return; + } + + string sourceHostname = source->h().toString(); OplogReader r; - if( !r.connect(masterHostname) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << cp->h().toString() , 0); + if( !r.connect(sourceHostname) ) { + sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); sleepsecs(15); return; } @@ -150,7 +187,7 @@ namespace mongo { { writelock lk(db); Client::Context ctx(db); - ok = clone(masterHostname.c_str(), db); + ok = clone(sourceHostname.c_str(), db); } if( !ok ) { sethbmsg( str::stream() << "initial sync 
error clone of " << db << " failed sleeping 5 minutes" ,0); @@ -177,7 +214,7 @@ namespace mongo { { sethbmsg("initial sync initial oplog application"); isyncassert( "initial sync source must remain primary throughout our initial sync [2]", box.getPrimary() == cp ); - if( ! initialSyncOplogApplication(/*primary*/cp, /*applyGTE*/startingTS, /*minValid*/mvoptime) ) { // note we assume here that this call does not throw + if( ! initialSyncOplogApplication(source, /*applyGTE*/startingTS, /*minValid*/mvoptime) ) { // note we assume here that this call does not throw log() << "replSet initial sync failed during applyoplog" << rsLog; emptyOplog(); // otherwise we'll be up! lastOpTimeWritten = OpTime(); diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 5d18add4939..cb99a97080f 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -50,13 +50,13 @@ namespace mongo { this method returns an error and doesn't throw exceptions (i think). */ bool ReplSetImpl::initialSyncOplogApplication( - const Member *primary, + const Member *source, OpTime applyGTE, OpTime minValid) { - if( primary == 0 ) return false; + if( source == 0 ) return false; - const string hn = primary->h().toString(); + const string hn = source->h().toString(); OpTime ts; try { OplogReader r; @@ -115,15 +115,17 @@ namespace mongo { /* if we have become primary, we dont' want to apply things from elsewhere anymore. assumePrimary is in the db lock so we are safe as long as we check after we locked above. 
*/ - const Member *p1 = box.getPrimary(); - if( p1 != primary || replSetForceInitialSyncFailure ) { + if( (source->state() != MemberState::RS_PRIMARY && + source->state() != MemberState::RS_SECONDARY) || + replSetForceInitialSyncFailure ) { + int f = replSetForceInitialSyncFailure; if( f > 0 ) { replSetForceInitialSyncFailure = f-1; log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog; + throw DBException("forced error",0); } - log() << "replSet primary was:" << primary->fullName() << " now:" << - (p1 != 0 ? p1->fullName() : "none") << rsLog; + log() << "replSet we are now primary" << rsLog; throw DBException("primary changed",0); } @@ -367,16 +369,17 @@ namespace mongo { return; } - /* later, we can sync from up secondaries if we want. tbd. */ - if( sp.primary == 0 ) - return; - /* do we have anything at all? */ if( lastOpTimeWritten.isNull() ) { syncDoInitialSync(); return; // _syncThread will be recalled, starts from top again in case sync failed. } + /* later, we can sync from up secondaries if we want. tbd. */ + if( sp.primary == 0 ) { + return; + } + /* we have some data. continue tailing. */ syncTail(); } diff --git a/jstests/replsets/initial_sync1.js b/jstests/replsets/initial_sync1.js new file mode 100644 index 00000000000..39fde081f45 --- /dev/null +++ b/jstests/replsets/initial_sync1.js @@ -0,0 +1,122 @@ +/** + * Test killing the secondary during initial sync + * + * 1. Bring up set + * 2. Insert some data + * 4. Make sure synced + * 5. Freeze #2 + * 6. Bring up #3 + * 7. Kill #2 in the middle of syncing + * 8. Eventually it should become a secondary + * 9. Bring #2 back up + * 10. Insert some stuff + * 11. Everyone happy eventually + */ + +load("jstests/replsets/rslib.js"); +var basename = "jstests_initsync1"; + + +print("1. 
Bring up set"); +var replTest = new ReplSetTest( {name: basename, nodes: 2} ); +var conns = replTest.startSet(); +replTest.initiate(); + +var master = replTest.getMaster(); +var foo = master.getDB("foo"); +var admin = master.getDB("admin"); + +var slave1 = replTest.liveNodes.slaves[0]; +var admin_s1 = slave1.getDB("admin"); +var local_s1 = slave1.getDB("local"); + +print("2. Insert some data"); +for (var i=0; i<10000; i++) { + foo.bar.insert({date : new Date(), x : i, str : "all the talk on the market"}); +} +print("total in foo: "+foo.bar.count()); + + +print("4. Make sure synced"); +replTest.awaitReplication(); + + +print("5. Freeze #2"); +admin_s1.runCommand({replSetFreeze:999999}); + + +print("6. Bring up #3"); +var ports = allocatePorts( 3 ); +var basePath = "/data/db/" + basename; +var hostname = getHostName(); + +var sargs = new MongodRunner( ports[ 2 ], basePath, false, false, + ["--replSet", basename, "--oplogSize", 2], + {no_bind : true} ); +var slave2 = sargs.start(); +var local_s2 = slave2.getDB("local"); +var admin_s2 = slave2.getDB("admin"); + +var config = replTest.getReplSetConfig(); +config.version = 2; +config.members.push({_id:2, host:hostname+":"+ports[2]}); + +try { + admin.runCommand({replSetReconfig:config}); +} +catch(e) { + print(e); +} +reconnect(slave1); +reconnect(slave2); + +wait(function() { + var config2 = local_s1.system.replset.findOne(); + var config3 = local_s2.system.replset.findOne(); + + printjson(config2); + printjson(config3); + + return config2.version == config.version && + (config3 && config3.version == config.version); + }); + +wait(function() { + var status = admin_s2.runCommand({replSetGetStatus:1}); + printjson(status); + return status.members[2].state == 3 || + status.members[2].state == 2; + }); + + +print("7. Kill #2 in the middle of syncing"); +replTest.stop(1); + + +print("8. 
Eventually it should become a secondary"); +wait(function() { + var status = admin_s2.runCommand({replSetGetStatus:1}); + printjson(status); + return status.members[2].state == 2; + }); + + +print("9. Bring #2 back up"); +replTest.start(1, {}, true); +reconnect(slave1); +wait(function() { + var status = admin_s1.runCommand({replSetGetStatus:1}); + printjson(status); + return status.members[1].state == 2; + }); + + +print("10. Insert some stuff"); +master = replTest.getMaster(); +for (var i=0; i<10000; i++) { + foo.bar.insert({date : new Date(), x : i, str : "all the talk on the market"}); +} + + +print("11. Everyone happy eventually"); +replTest.awaitReplication(); diff --git a/jstests/replsets/initial_sync2.js b/jstests/replsets/initial_sync2.js new file mode 100644 index 00000000000..04d53acef74 --- /dev/null +++ b/jstests/replsets/initial_sync2.js @@ -0,0 +1,132 @@ +/** + * Test killing the primary during initial sync + * and don't allow the other secondary to become primary + * + * 1. Bring up set + * 2. Insert some data + * 4. Make sure synced + * 5. Freeze #2 + * 6. Bring up #3 + * 7. Kill #1 in the middle of syncing + * 8. Check that #1 doesn't make it into secondary state for a while + * 9. Bring #1 back up + * 10. Initial sync should succeed + * 11. Insert some stuff + * 12. Everyone happy eventually + */ + +load("jstests/replsets/rslib.js"); +var basename = "jstests_initsync2"; + + +print("1. Bring up set"); +var replTest = new ReplSetTest( {name: basename, nodes: 2} ); +var conns = replTest.startSet(); +replTest.initiate(); + +var master = replTest.getMaster(); +var foo = master.getDB("foo"); +var admin = master.getDB("admin"); + +var slave1 = replTest.liveNodes.slaves[0]; +var admin_s1 = slave1.getDB("admin"); +var local_s1 = slave1.getDB("local"); + +print("2. Insert some data"); +for (var i=0; i<10000; i++) { + foo.bar.insert({date : new Date(), x : i, str : "all the talk on the market"}); +} +print("total in foo: "+foo.bar.count()); + + +print("4. 
Make sure synced"); +replTest.awaitReplication(); + + +print("5. Freeze #2"); +admin_s1.runCommand({replSetFreeze:999999}); + + +print("6. Bring up #3"); +var ports = allocatePorts( 3 ); +var basePath = "/data/db/" + basename; +var hostname = getHostName(); + +var sargs = new MongodRunner( ports[ 2 ], basePath, false, false, + ["--replSet", basename, "--oplogSize", 2], + {no_bind : true} ); +var slave2 = sargs.start(); +var local_s2 = slave2.getDB("local"); +var admin_s2 = slave2.getDB("admin"); + +var config = replTest.getReplSetConfig(); +config.version = 2; +config.members.push({_id:2, host:hostname+":"+ports[2]}); + +try { + admin.runCommand({replSetReconfig:config}); +} +catch(e) { + print(e); +} +reconnect(slave1); +reconnect(slave2); + +wait(function() { + var config2 = local_s1.system.replset.findOne(); + var config3 = local_s2.system.replset.findOne(); + + printjson(config2); + printjson(config3); + + return config2.version == config.version && + (config3 && config3.version == config.version); + }); + +wait(function() { + var status = admin_s2.runCommand({replSetGetStatus:1}); + printjson(status); + return status.members[2].state == 3 || + status.members[2].state == 2; + }); + + +print("7. Kill #1 in the middle of syncing"); +replTest.stop(0); + + +print("8. Check that #1 doesn't make it into secondary state for a while"); +for (var i=0; i<100; i++) { + var status = admin_s2.runCommand({replSetGetStatus:1}); + assert(status.members[2].state != 2); + sleep(1000); +} + + +print("9. Bring #2 back up"); +replTest.start(0, {}, true); +reconnect(master); +wait(function() { + var status = admin.runCommand({replSetGetStatus:1}); + printjson(status); + return status.members[0].state == 1; + }); + + +print("10. Initial sync should succeed"); +wait(function() { + var status = admin_s2.runCommand({replSetGetStatus:1}); + printjson(status); + return status.members[2].state == 2; + }); + + +print("11. 
Insert some stuff"); +master = replTest.getMaster(); +for (var i=0; i<10000; i++) { + foo.bar.insert({date : new Date(), x : i, str : "all the talk on the market"}); +} + + +print("12. Everyone happy eventually"); +replTest.awaitReplication(); diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index a849abc7b7a..872d018e780 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -31,3 +31,14 @@ var reconnect = function(a) { } }); }; + + +var getLatestOp = function(server) { + server.getDB("admin").getMongo().setSlaveOk(); + var log = server.getDB("local")['oplog.rs']; + var cursor = log.find({}).sort({'$natural': -1}).limit(1); + if (cursor.hasNext()) { + return cursor.next(); + } + return null; +};