SERVER-684 refactor findingStart code

This commit is contained in:
Aaron
2010-03-09 13:54:19 -08:00
parent 48f68d1f2e
commit 57d2a1d901
4 changed files with 157 additions and 133 deletions

View File

@@ -449,12 +449,9 @@ namespace mongo {
return res->count();
}
int _findingStartInitialTimeout = 5; // configurable for testing
// Implements database 'query' requests using the query optimizer's QueryOp interface
class UserQueryOp : public QueryOp {
public:
enum FindingStartMode { Initial, FindExtent, InExtent };
UserQueryOp( const ParsedQuery& pq ) :
//int ntoskip, int ntoreturn, const BSONObj &order, bool wantMore,
@@ -466,36 +463,18 @@ namespace mongo {
_n(0),
_inMemSort(false),
_saveClientCursor(false),
_findingStart( pq.hasOption( QueryOption_OplogReplay) ) ,
_findingStartCursor(0),
_findingStartTimer(0),
_findingStartMode()
_oplogReplay( pq.hasOption( QueryOption_OplogReplay) )
{}
void setupMatcher() {
_matcher.reset(new CoveredIndexMatcher( qp().query() , qp().indexKey()));
}
virtual void init() {
_buf.skip( sizeof( QueryResult ) );
if ( _findingStart ) {
// Use a ClientCursor here so we can release db mutex while scanning
// oplog (can take quite a while with large oplogs).
auto_ptr<Cursor> c = qp().newReverseCursor();
_findingStartCursor = new ClientCursor(c, qp().ns(), false);
_findingStartTimer.reset();
_findingStartMode = Initial;
BSONElement tsElt = qp().query()[ "ts" ];
massert( 13044, "no ts field in query", !tsElt.eoo() );
BSONObjBuilder b;
b.append( tsElt );
BSONObj tsQuery = b.obj();
_matcher.reset(new CoveredIndexMatcher(tsQuery, qp().indexKey()));
if ( _oplogReplay ) {
_findingStartCursor.reset( new FindingStartCursor( qp() ) );
} else {
_c = qp().newCursor( DiskLoc() , _pq.getNumToReturn() + _pq.getSkip() );
setupMatcher();
}
_matcher.reset(new CoveredIndexMatcher( qp().query() , qp().indexKey()));
if ( qp().scanAndOrderRequired() ) {
_inMemSort = true;
@@ -503,107 +482,15 @@ namespace mongo {
}
}
DiskLoc startLoc( const DiskLoc &rec ) {
Extent *e = rec.rec()->myExtent( rec );
if ( e->myLoc != qp().nsd()->capExtent )
return e->firstRecord;
// Likely we are on the fresh side of capExtent, so return first fresh record.
// If we are on the stale side of capExtent, then the collection is small and it
// doesn't matter if we start the extent scan with capFirstNewRecord.
return qp().nsd()->capFirstNewRecord;
}
DiskLoc prevLoc( const DiskLoc &rec ) {
Extent *e = rec.rec()->myExtent( rec );
if ( e->xprev.isNull() )
e = qp().nsd()->lastExtent.ext();
else
e = e->xprev.ext();
if ( e->myLoc != qp().nsd()->capExtent )
return e->firstRecord;
return DiskLoc(); // reached beginning of collection
}
void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) {
auto_ptr<Cursor> c = qp().newCursor( startLoc );
_findingStartCursor = new ClientCursor(c, qp().ns(), false);
}
void maybeRelease() {
RARELY {
CursorId id = _findingStartCursor->cursorid;
_findingStartCursor->updateLocation();
{
dbtemprelease t;
}
_findingStartCursor = ClientCursor::find( id, false );
}
}
virtual void next() {
if ( _findingStart ) {
if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) {
_findingStart = false;
_c = qp().newCursor(); // on error, start from beginning
setupMatcher();
return;
if ( _findingStartCursor.get() ) {
if ( _findingStartCursor->done() ) {
_c = _findingStartCursor->cRelease();
_findingStartCursor.reset( 0 );
} else {
_findingStartCursor->next();
}
switch( _findingStartMode ) {
case Initial: {
if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
_findingStart = false; // found first record out of query range, so scan normally
_c = qp().newCursor( _findingStartCursor->c->currLoc() );
setupMatcher();
return;
}
_findingStartCursor->c->advance();
RARELY {
if ( _findingStartTimer.seconds() >= _findingStartInitialTimeout ) {
createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) );
_findingStartMode = FindExtent;
return;
}
}
maybeRelease();
return;
}
case FindExtent: {
if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
_findingStartMode = InExtent;
return;
}
DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() );
if ( prev.isNull() ) { // hit beginning, so start scanning from here
createClientCursor();
_findingStartMode = InExtent;
return;
}
// There might be a more efficient implementation than creating new cursor & client cursor each time,
// not worrying about that for now
createClientCursor( prev );
maybeRelease();
return;
}
case InExtent: {
if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
_findingStart = false; // found first record in query range, so scan normally
_c = qp().newCursor( _findingStartCursor->c->currLoc() );
setupMatcher();
return;
}
_findingStartCursor->c->advance();
maybeRelease();
return;
}
default: {
massert( 12600, "invalid _findingStartMode", false );
}
}
}
if ( _findingStartCursor ) {
ClientCursor::erase( _findingStartCursor->cursorid );
_findingStartCursor = 0;
return;
}
if ( !_c->ok() ) {
@@ -722,11 +609,8 @@ namespace mongo {
auto_ptr< CoveredIndexMatcher > _matcher;
bool _saveClientCursor;
bool _findingStart;
ClientCursor * _findingStartCursor;
Timer _findingStartTimer;
FindingStartMode _findingStartMode;
bool _oplogReplay;
auto_ptr< FindingStartCursor > _findingStartCursor;
};
/* run a query -- includes checking for and running a Command */