diff --git a/db/query.cpp b/db/query.cpp index 9451aa8431a..9f1267e4de8 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -449,12 +449,9 @@ namespace mongo { return res->count(); } - int _findingStartInitialTimeout = 5; // configurable for testing - // Implements database 'query' requests using the query optimizer's QueryOp interface class UserQueryOp : public QueryOp { public: - enum FindingStartMode { Initial, FindExtent, InExtent }; UserQueryOp( const ParsedQuery& pq ) : //int ntoskip, int ntoreturn, const BSONObj &order, bool wantMore, @@ -466,36 +463,18 @@ namespace mongo { _n(0), _inMemSort(false), _saveClientCursor(false), - _findingStart( pq.hasOption( QueryOption_OplogReplay) ) , - _findingStartCursor(0), - _findingStartTimer(0), - _findingStartMode() + _oplogReplay( pq.hasOption( QueryOption_OplogReplay) ) {} - void setupMatcher() { - _matcher.reset(new CoveredIndexMatcher( qp().query() , qp().indexKey())); - } - virtual void init() { _buf.skip( sizeof( QueryResult ) ); - if ( _findingStart ) { - // Use a ClientCursor here so we can release db mutex while scanning - // oplog (can take quite a while with large oplogs). - auto_ptr c = qp().newReverseCursor(); - _findingStartCursor = new ClientCursor(c, qp().ns(), false); - _findingStartTimer.reset(); - _findingStartMode = Initial; - BSONElement tsElt = qp().query()[ "ts" ]; - massert( 13044, "no ts field in query", !tsElt.eoo() ); - BSONObjBuilder b; - b.append( tsElt ); - BSONObj tsQuery = b.obj(); - _matcher.reset(new CoveredIndexMatcher(tsQuery, qp().indexKey())); + if ( _oplogReplay ) { + _findingStartCursor.reset( new FindingStartCursor( qp() ) ); } else { _c = qp().newCursor( DiskLoc() , _pq.getNumToReturn() + _pq.getSkip() ); - setupMatcher(); } + _matcher.reset(new CoveredIndexMatcher( qp().query() , qp().indexKey())); if ( qp().scanAndOrderRequired() ) { _inMemSort = true; @@ -503,107 +482,15 @@ namespace mongo { } } - DiskLoc startLoc( const DiskLoc &rec ) { - Extent *e = rec.rec()->myExtent( rec ); - if ( e->myLoc != qp().nsd()->capExtent ) - return e->firstRecord; - // Likely we are on the fresh side of capExtent, so return first fresh record. - // If we are on the stale side of capExtent, then the collection is small and it - // doesn't matter if we start the extent scan with capFirstNewRecord. - return qp().nsd()->capFirstNewRecord; - } - - DiskLoc prevLoc( const DiskLoc &rec ) { - Extent *e = rec.rec()->myExtent( rec ); - if ( e->xprev.isNull() ) - e = qp().nsd()->lastExtent.ext(); - else - e = e->xprev.ext(); - if ( e->myLoc != qp().nsd()->capExtent ) - return e->firstRecord; - return DiskLoc(); // reached beginning of collection - } - - void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) { - auto_ptr c = qp().newCursor( startLoc ); - _findingStartCursor = new ClientCursor(c, qp().ns(), false); - } - - void maybeRelease() { - RARELY { - CursorId id = _findingStartCursor->cursorid; - _findingStartCursor->updateLocation(); - { - dbtemprelease t; - } - _findingStartCursor = ClientCursor::find( id, false ); - } - } - virtual void next() { - if ( _findingStart ) { - if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) { - _findingStart = false; - _c = qp().newCursor(); // on error, start from beginning - setupMatcher(); - return; + if ( _findingStartCursor.get() ) { + if ( _findingStartCursor->done() ) { + _c = _findingStartCursor->cRelease(); + _findingStartCursor.reset( 0 ); + } else { + _findingStartCursor->next(); } - switch( _findingStartMode ) { - case Initial: { - if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { - _findingStart = false; // found first record out of query range, so scan normally - _c = qp().newCursor( _findingStartCursor->c->currLoc() ); - setupMatcher(); - return; - } - _findingStartCursor->c->advance(); - RARELY { - if ( _findingStartTimer.seconds() >= _findingStartInitialTimeout ) { - createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) ); - _findingStartMode = FindExtent; - return; - } - } - maybeRelease(); - return; - } - case FindExtent: { - if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { - _findingStartMode = InExtent; - return; - } - DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() ); - if ( prev.isNull() ) { // hit beginning, so start scanning from here - createClientCursor(); - _findingStartMode = InExtent; - return; - } - // There might be a more efficient implementation than creating new cursor & client cursor each time, - // not worrying about that for now - createClientCursor( prev ); - maybeRelease(); - return; - } - case InExtent: { - if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { - _findingStart = false; // found first record in query range, so scan normally - _c = qp().newCursor( _findingStartCursor->c->currLoc() ); - setupMatcher(); - return; - } - _findingStartCursor->c->advance(); - maybeRelease(); - return; - } - default: { - massert( 12600, "invalid _findingStartMode", false ); - } - } - } - - if ( _findingStartCursor ) { - ClientCursor::erase( _findingStartCursor->cursorid ); - _findingStartCursor = 0; + return; } if ( !_c->ok() ) { @@ -722,11 +609,8 @@ namespace mongo { auto_ptr< CoveredIndexMatcher > _matcher; bool _saveClientCursor; - - bool _findingStart; - ClientCursor * _findingStartCursor; - Timer _findingStartTimer; - FindingStartMode _findingStartMode; + bool _oplogReplay; + auto_ptr< FindingStartCursor > _findingStartCursor; }; /* run a query -- includes checking for and running a Command */ diff --git a/db/repl.cpp b/db/repl.cpp index 33e0f0eee28..5a4eecb089a 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -69,6 +69,8 @@ namespace mongo { IdTracker &idTracker = *( new IdTracker() ); + int __findingStartInitialTimeout = 5; // configurable for testing + } // namespace mongo #include "replset.h" diff --git a/db/repl.h b/db/repl.h index 6a643c606f4..c5e0f639e0a 100644 --- a/db/repl.h +++ b/db/repl.h @@ -32,6 +32,7 @@ #include "db.h" #include "dbhelpers.h" #include "query.h" +#include "queryoptimizer.h" #include "../client/dbclient.h" @@ -339,5 +340,142 @@ namespace mongo { void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level = 0 ); void replCheckCloseDatabase( Database * db ); + + extern int __findingStartInitialTimeout; // configurable for testing + + class FindingStartCursor { + public: + FindingStartCursor( const QueryPlan & qp ) : + _qp( qp ), + _findingStart( true ), + _findingStartMode(), + _findingStartTimer( 0 ), + _findingStartCursor( 0 ) + { init(); } + bool done() const { return !_findingStart; } + auto_ptr< Cursor > cRelease() { return _c; } + void next() { + if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) { + _findingStart = false; + _c = _qp.newCursor(); // on error, start from beginning + destroyClientCursor(); + return; + } + switch( _findingStartMode ) { + case Initial: { + if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { + _findingStart = false; // found first record out of query range, so scan normally + _c = _qp.newCursor( _findingStartCursor->c->currLoc() ); + destroyClientCursor(); + return; + } + _findingStartCursor->c->advance(); + RARELY { + if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) { + createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) ); + _findingStartMode = FindExtent; + return; + } + } + maybeRelease(); + return; + } + case FindExtent: { + if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { + _findingStartMode = InExtent; + return; + } + DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() ); + if ( prev.isNull() ) { // hit beginning, so start scanning from here + createClientCursor(); + _findingStartMode = InExtent; + return; + } + // There might be a more efficient implementation than creating new cursor & client cursor each time, + // not worrying about that for now + createClientCursor( prev ); + maybeRelease(); + return; + } + case InExtent: { + if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { + _findingStart = false; // found first record in query range, so scan normally + _c = _qp.newCursor( _findingStartCursor->c->currLoc() ); + destroyClientCursor(); + return; + } + _findingStartCursor->c->advance(); + maybeRelease(); + return; + } + default: { + massert( 12600, "invalid _findingStartMode", false ); + } + } + } + private: + enum FindingStartMode { Initial, FindExtent, InExtent }; + const QueryPlan &_qp; + bool _findingStart; + FindingStartMode _findingStartMode; + auto_ptr< CoveredIndexMatcher > _matcher; + Timer _findingStartTimer; + ClientCursor * _findingStartCursor; + auto_ptr< Cursor > _c; + DiskLoc startLoc( const DiskLoc &rec ) { + Extent *e = rec.rec()->myExtent( rec ); + if ( e->myLoc != _qp.nsd()->capExtent ) + return e->firstRecord; + // Likely we are on the fresh side of capExtent, so return first fresh record. + // If we are on the stale side of capExtent, then the collection is small and it + // doesn't matter if we start the extent scan with capFirstNewRecord. + return _qp.nsd()->capFirstNewRecord; + } + + DiskLoc prevLoc( const DiskLoc &rec ) { + Extent *e = rec.rec()->myExtent( rec ); + if ( e->xprev.isNull() ) + e = _qp.nsd()->lastExtent.ext(); + else + e = e->xprev.ext(); + if ( e->myLoc != _qp.nsd()->capExtent ) + return e->firstRecord; + return DiskLoc(); // reached beginning of collection + } + void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) { + auto_ptr c = _qp.newCursor( startLoc ); + _findingStartCursor = new ClientCursor(c, _qp.ns(), false); + } + void destroyClientCursor() { + if ( _findingStartCursor ) { + ClientCursor::erase( _findingStartCursor->cursorid ); + _findingStartCursor = 0; + } + } + void maybeRelease() { + RARELY { + CursorId id = _findingStartCursor->cursorid; + _findingStartCursor->updateLocation(); + { + dbtemprelease t; + } + _findingStartCursor = ClientCursor::find( id, false ); + } + } + void init() { + // Use a ClientCursor here so we can release db mutex while scanning + // oplog (can take quite a while with large oplogs). + auto_ptr c = _qp.newReverseCursor(); + _findingStartCursor = new ClientCursor(c, _qp.ns(), false); + _findingStartTimer.reset(); + _findingStartMode = Initial; + BSONElement tsElt = _qp.query()[ "ts" ]; + massert( 13044, "no ts field in query", !tsElt.eoo() ); + BSONObjBuilder b; + b.append( tsElt ); + BSONObj tsQuery = b.obj(); + _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey())); + } + }; } // namespace mongo diff --git a/dbtests/querytests.cpp b/dbtests/querytests.cpp index 2bbd1f3c719..1c509745909 100644 --- a/dbtests/querytests.cpp +++ b/dbtests/querytests.cpp @@ -28,7 +28,7 @@ #include "dbtests.h" namespace mongo { - extern int _findingStartInitialTimeout; + extern int __findingStartInitialTimeout; } namespace QueryTests { @@ -931,11 +931,11 @@ namespace QueryTests { class FindingStart : public CollectionBase { public: - FindingStart() : CollectionBase( "findingstart" ), _old( _findingStartInitialTimeout ) { - _findingStartInitialTimeout = 0; + FindingStart() : CollectionBase( "findingstart" ), _old( __findingStartInitialTimeout ) { + __findingStartInitialTimeout = 0; } ~FindingStart() { - _findingStartInitialTimeout = _old; + __findingStartInitialTimeout = _old; } void run() {