From fe1d205aa8c48ecfaa52d1c975ea60068d535e2f Mon Sep 17 00:00:00 2001 From: dwight Date: Mon, 19 Jul 2010 22:32:43 -0400 Subject: [PATCH] rs --- client/dbclientcursor.cpp | 2 +- db/oplog.cpp | 6 ++-- db/oplog.h | 2 ++ db/oplogreader.h | 10 ++++++ db/repl.cpp | 3 +- db/repl/rs.cpp | 2 +- db/repl/rs.h | 3 +- db/repl/rs_sync.cpp | 65 +++++++++++++++++++++++++++++++++++++-- 8 files changed, 84 insertions(+), 9 deletions(-) diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp index c50106ebb36..273ffbc570f 100644 --- a/client/dbclientcursor.cpp +++ b/client/dbclientcursor.cpp @@ -149,7 +149,7 @@ namespace mongo { } BSONObj DBClientCursor::next() { - assert( more() ); + DEV assert( more() ); if ( !_putBack.empty() ) { BSONObj ret = _putBack.top(); _putBack.pop(); diff --git a/db/oplog.cpp b/db/oplog.cpp index 75fb3af372d..02b85212e3d 100644 --- a/db/oplog.cpp +++ b/db/oplog.cpp @@ -76,7 +76,7 @@ namespace mongo { log() << "replSet error possible failover clock skew issue? " << theReplSet->lastOpTimeWritten.toString() << ' ' << endl; } theReplSet->lastOpTimeWritten = ts; - theReplSet->h = h; + theReplSet->lastH = h; ctx.getClient()->setLastOp( ts.asDate() ); } } @@ -97,7 +97,7 @@ namespace mongo { long long hNew; if( theReplSet ) { massert(13312, "replSet error : logOp() but not primary?", theReplSet->isPrimary()); - hNew = (theReplSet->h * 131 + ts.asLL()) * 17 + theReplSet->selfId(); + hNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId(); } else { // must be initiation @@ -145,7 +145,7 @@ namespace mongo { if( !(theReplSet->lastOpTimeWrittenlastOpTimeWritten << ' ' << ts << endl; theReplSet->lastOpTimeWritten = ts; - theReplSet->h = hNew; + theReplSet->lastH = hNew; ctx.getClient()->setLastOp( ts.asDate() ); } } diff --git a/db/oplog.h b/db/oplog.h index 60a7d480be5..b596d4f23b7 100644 --- a/db/oplog.h +++ b/db/oplog.h @@ -35,6 +35,8 @@ namespace mongo { void createOplog(); + void _logOpObjRS(const BSONObj& op); + /** Write operation to the log (local.oplog.$main) @param opstr diff --git a/db/oplogreader.h b/db/oplogreader.h index 8e34a202d55..7b04c867574 100644 --- a/db/oplogreader.h +++ b/db/oplogreader.h @@ -52,6 +52,14 @@ namespace mongo { ); } + void tailingQueryGTE(const char *ns, OpTime t) { + BSONObjBuilder q; + q.appendDate("$gte", t.asDate()); + BSONObjBuilder query; + query.append("ts", q.done()); + tailingQuery(ns, query.done()); + } + bool more() { assert( cursor.get() ); return cursor->more(); @@ -67,6 +75,8 @@ namespace mongo { cursor->peek(v,n); } + BSONObj nextSafe() { return cursor->nextSafe(); } + BSONObj next() { return cursor->next(); } diff --git a/db/repl.cpp b/db/repl.cpp index 940f17a0285..dc1c79892ad 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -932,7 +932,8 @@ namespace mongo { bool empty = ctx.db()->isEmpty(); bool incompleteClone = incompleteCloneDbs.count( clientName ) != 0; - log( 6 ) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl; + if( logLevel >= 6 ) + log(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl; // always apply admin command command // this is a bit hacky -- the semantics of replication/commands aren't well specified diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index d89f1fdfe6e..99201c35742 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -153,7 +153,7 @@ namespace mongo { { memset(_hbmsg, 0, sizeof(_hbmsg)); *_hbmsg = '.'; // temp...just to see - h = 0; + lastH = 0; _myState = STARTUP; _currentPrimary = 0; diff --git a/db/repl/rs.h b/db/repl/rs.h index dfa8eb282da..21dff9ff048 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -165,7 +165,7 @@ namespace mongo { //bool initiated() const { return curOpTime.initiated(); } OpTime lastOpTimeWritten; - long long h; + long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork" private: unsigned _selfId; // stored redundantly we hit this a lot @@ -254,6 +254,7 @@ namespace mongo { void syncDoInitialSync(); void _syncThread(); void syncTail(); + void syncApply(const BSONObj &o); public: void syncThread(); }; diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 31181cd0c53..42aa3f60dbe 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -27,10 +27,71 @@ namespace mongo { theReplSet->syncThread(); } + void ReplSetImpl::syncApply(const BSONObj &o) { + //const char *op = o.getStringField("op"); + + char db[MaxDatabaseLen]; + const char *ns = o.getStringField("ns"); + nsToDatabase(ns, db); + + if ( *ns == '.' || *ns == 0 ) { + log() << "replSet skipping bad op in oplog: " << o.toString() << endl; + return; + } + + Client::Context ctx(ns); + ctx.getClient()->curop()->reset(); + + /* todo : if this asserts, do we want to ignore or not? */ + applyOperation_inlock(o); + } + void ReplSetImpl::syncTail() { - // todo : locking... + // todo : locking vis a vis the mgr... + + const Member *primary = currentPrimary(); + if( primary == 0 ) return; + string hn = primary->h().toString(); OplogReader r; -// r.connect( + if( !r.connect(primary->h().toString()) ) { + log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; + return; + } + + r.tailingQueryGTE(rsoplog, lastOpTimeWritten); + assert( r.haveCursor() ); + assert( r.awaitCapable() ); + + { + BSONObj o = r.nextSafe(); + OpTime ts = o["ts"]._opTime(); + long long h = o["h"].numberLong(); + if( ts != lastOpTimeWritten || h != lastH ) { + log() << "replSet rollback not yet implemented!" << rsLog; + log() << "replSet " << lastOpTimeWritten.toStringPretty() << ' ' << ts.toStringPretty() << rsLog; + log() << "replSet " << lastH << ' ' << h << rsLog; + sleepsecs(60); + return; + } + } + + // TODO : switch state to secondary here when appropriate... + + while( 1 ) { + while( r.more() ) { + BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ + { + writelock lk(""); + syncApply(o); + _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ + } + } + if( !r.haveCursor() ) + break; + if( currentPrimary() != primary ) + break; + // looping back is ok because this is a tailable cursor + } } void ReplSetImpl::_syncThread() {