diff --git a/bson/util/builder.h b/bson/util/builder.h index c71a3d9017a..37288d0d2ca 100644 --- a/bson/util/builder.h +++ b/bson/util/builder.h @@ -67,10 +67,8 @@ namespace mongo { char* buf() { return data; } const char* buf() const { return data; } - /* assume ownership of the buffer - you must then free it */ - void decouple() { - data = 0; - } + /* assume ownership of the buffer - you must then free() it */ + void decouple() { data = 0; } template void append(T j) { *((T*)grow(sizeof(T))) = j; diff --git a/db/clientcursor.h b/db/clientcursor.h index 1bce1a3b463..886d801e1ad 100644 --- a/db/clientcursor.h +++ b/db/clientcursor.h @@ -104,9 +104,9 @@ namespace mongo { /*const*/ CursorId cursorid; string ns; shared_ptr c; - int pos; // # objects into the cursor so far + int pos; // # objects into the cursor so far BSONObj query; - int _queryOptions; + int _queryOptions; // see enum QueryOptions dbclient.h OpTime _slaveReadTill; ClientCursor(int queryOptions, shared_ptr& _c, const char *_ns) : diff --git a/db/db.cpp b/db/db.cpp index 7f7bc0c6ec1..5ff89ff9be0 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -183,7 +183,7 @@ namespace mongo { dbMsgPort->shutdown(); break; } - +sendmore: if ( inShutdown() ) { log() << "got request after shutdown()" << endl; break; @@ -209,16 +209,36 @@ namespace mongo { if ( dbresponse.response ) { dbMsgPort->reply(m, *dbresponse.response, dbresponse.responseTo); if( dbresponse.exhaust ) { - while( 1 ) { - log() << "exhausting" << endl; + MsgData *header = dbresponse.response->header(); + QueryResult *qr = (QueryResult *) header; + long long cursorid = qr->cursorId; + if( cursorid ) { + string ns = dbresponse.exhaust; // before reset() free's it... + m.reset(); + BufBuilder b(512); + b.append((int) 0 /*size set later in appendData()*/); + b.append(header->id); + b.append(header->responseTo); + b.append((int) dbGetMore); + b.append((int) 0); + assert( dbresponse.exhaust && *dbresponse.exhaust != 0 ); + b.append(ns); + b.append((int) 0); // ntoreturn + b.append(cursorid); + m.appendData(b.buf(), b.len()); + b.decouple(); + DEV log() << "exhaust=true sending more" << endl; + sleepmillis(1); + goto sendmore; } } } } } - catch ( AssertionException& ) { - problem() << "AssertionException in connThread, closing client connection" << endl; + catch ( AssertionException& e ) { + log() << "AssertionException in connThread, closing client connection" << endl; + log() << ' ' << e.what() << endl; dbMsgPort->shutdown(); } catch ( SocketException& ) { diff --git a/db/db.vcxproj b/db/db.vcxproj index a191c0122d9..96d99a37c73 100644 --- a/db/db.vcxproj +++ b/db/db.vcxproj @@ -557,6 +557,9 @@ + + + diff --git a/db/db.vcxproj.filters b/db/db.vcxproj.filters index b1ec878e879..a655a276a88 100755 --- a/db/db.vcxproj.filters +++ b/db/db.vcxproj.filters @@ -778,6 +778,9 @@ db + + + diff --git a/db/dbmessage.h b/db/dbmessage.h index 9e00aa2dbe8..8be52d26e96 100644 --- a/db/dbmessage.h +++ b/db/dbmessage.h @@ -75,20 +75,24 @@ namespace mongo { /* For the database/server protocol, these objects and functions encapsulate the various messages transmitted over the connection. - */ + See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol + */ class DbMessage { public: - DbMessage(const Message& _m) : m(_m) { + DbMessage(const Message& _m) : m(_m) + { // for received messages, Message has only one buffer theEnd = _m.singleData()->_data + _m.header()->dataLen(); - int *r = (int *) _m.singleData()->_data; - reserved = *r; - r++; - data = (const char *) r; + char *r = _m.singleData()->_data; + reserved = (int *) r; + data = r + 4; nextjsobj = data; } + /** the 32 bit field before the ns */ + int& reservedField() { return *reserved; } + const char * getns() const { return data; } @@ -109,13 +113,12 @@ namespace mongo { return getInt( 1 ); } - void resetPull(){ - nextjsobj = data; - } - int pullInt() { + void resetPull(){ nextjsobj = data; } + int pullInt() const { return pullInt(); } + int& pullInt() { if ( nextjsobj == data ) nextjsobj += strlen(data) + 1; // skip namespace - int i = *((int *)nextjsobj); + int& i = *((int *)nextjsobj); nextjsobj += 4; return i; } @@ -164,9 +167,7 @@ namespace mongo { return js; } - const Message& msg() const { - return m; - } + const Message& msg() const { return m; } void markSet(){ mark = nextjsobj; @@ -178,7 +179,7 @@ namespace mongo { private: const Message& m; - int reserved; + int* reserved; const char *data; const char *nextjsobj; const char *theEnd; diff --git a/db/instance.cpp b/db/instance.cpp index 9cc9298b84b..126c21efb81 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -490,24 +490,28 @@ namespace mongo { int ntoreturn = d.pullInt(); long long cursorid = d.pullInt64(); - ss << ns << " cid:" << cursorid << " ntoreturn:" << ntoreturn;; + ss << ns << " cid:" << cursorid; + if( ntoreturn ) + ss << " ntoreturn:" << ntoreturn; - int pass = 0; - + int pass = 0; + bool exhaust = false; QueryResult* msgdata; while( 1 ) { try { mongolock lk(false); Client::Context ctx(ns); - msgdata = processGetMore(ns, ntoreturn, cursorid, curop, pass ); + msgdata = processGetMore(ns, ntoreturn, cursorid, curop, pass, exhaust); } catch ( GetMoreWaitException& ) { + exhaust = false; massert(13073, "shutting down", !inShutdown() ); pass++; sleepmillis(2); continue; } catch ( AssertionException& e ) { + exhaust = false; ss << " exception " << e.toString(); msgdata = emptyMoreResult(cursorid); ok = false; @@ -521,7 +525,8 @@ namespace mongo { ss << " nreturned:" << msgdata->nReturned; dbresponse.response = resp; dbresponse.responseTo = m.header()->id; - + if( exhaust ) { ss << " exhaust "; + dbresponse.exhaust = ns;} return ok; } diff --git a/db/instance.h b/db/instance.h index 9356dd3d630..351726c7f0a 100644 --- a/db/instance.h +++ b/db/instance.h @@ -93,16 +93,13 @@ namespace mongo { struct DbResponse { Message *response; MSGID responseTo; - bool exhaust; - DbResponse(Message *r, MSGID rt) : response(r), responseTo(rt), exhaust(false) { - } + const char *exhaust; /* points to ns if exhaust mode. 0=normal mode*/ + DbResponse(Message *r, MSGID rt) : response(r), responseTo(rt), exhaust(0) { } DbResponse() { response = 0; - exhaust = false; - } - ~DbResponse() { - delete response; + exhaust = 0; } + ~DbResponse() { delete response; } }; bool assembleResponse( Message &m, DbResponse &dbresponse, const SockAddr &client = unknownAddress ); diff --git a/db/query.cpp b/db/query.cpp index a4c5835cfca..1463ada4566 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -256,8 +256,8 @@ namespace mongo { return qr; } - QueryResult* processGetMore(const char *ns, int ntoreturn, long long cursorid , CurOp& curop, int pass ) { - + QueryResult* processGetMore(const char *ns, int ntoreturn, long long cursorid , CurOp& curop, int pass, bool& exhaust ) { + exhaust = false; ClientCursor::Pointer p(cursorid); ClientCursor *cc = p._c; @@ -323,7 +323,6 @@ namespace mongo { if ( c->matcher() && !c->matcher()->matches(c->currKey(), c->currLoc() ) ) { } else { - //out() << "matches " << c->currLoc().toString() << '\n'; if( c->getsetdup(c->currLoc()) ) { //out() << " but it's a dup \n"; } @@ -349,6 +348,7 @@ namespace mongo { cc->updateLocation(); cc->mayUpgradeStorage(); cc->storeOpForSlave( last ); + exhaust = cc->_queryOptions & QueryOption_Exhaust; } } @@ -769,7 +769,7 @@ namespace mongo { }; /* run a query -- includes checking for and running a Command */ - bool runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result) { + const char *runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result) { StringBuilder& ss = curop.debug().str; shared_ptr pq_shared( new ParsedQuery(q) ); ParsedQuery& pq( *pq_shared ); @@ -936,7 +936,7 @@ namespace mongo { if( logLevel >= 5 ) log() << " used cursor: " << cursor.get() << endl; long long cursorid = 0; - bool exhaust = false; + const char * exhaust = 0; if ( dqo.saveClientCursor() || mps->mayRunMore() ) { ClientCursor *cc; bool moreClauses = mps->mayRunMore(); @@ -957,7 +957,10 @@ namespace mongo { cc->updateLocation(); if ( !cc->c->ok() && cc->c->tailable() ) DEV tlog() << "query has no more but tailable, cursorid: " << cursorid << endl; - exhaust = queryOptions & QueryOption_Exhaust; + if( queryOptions & QueryOption_Exhaust ) { + exhaust = ns; + ss << " exhaust "; + } } QueryResult *qr = (QueryResult *) result.header(); diff --git a/db/query.h b/db/query.h index 663e68811ef..13a254bb381 100644 --- a/db/query.h +++ b/db/query.h @@ -76,7 +76,7 @@ namespace mongo { // for an existing query (ie a ClientCursor), send back additional information. struct GetMoreWaitException { }; - QueryResult* processGetMore(const char *ns, int ntoreturn, long long cursorid , CurOp& op, int pass ); + QueryResult* processGetMore(const char *ns, int ntoreturn, long long cursorid , CurOp& op, int pass, bool& exhaust); struct UpdateResult { bool existing; @@ -114,7 +114,7 @@ namespace mongo { long long runCount(const char *ns, const BSONObj& cmd, string& err); - bool runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result); + const char * runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result); /* This is for languages whose "objects" are not well ordered (JSON is well ordered). [ { a : ... } , { b : ... } ] -> { a : ..., b : ... } diff --git a/util/message.h b/util/message.h index bd6bc3b533a..6d54f8f9e98 100644 --- a/util/message.h +++ b/util/message.h @@ -113,9 +113,6 @@ namespace mongo { friend class PiggyBackData; }; - //#pragma pack() -#pragma pack(1) - enum Operations { opReply = 1, /* reply. responseTo is set. */ dbMsg = 1000, /* generic msg command followed by a string */ @@ -148,6 +145,27 @@ namespace mongo { } } +#pragma pack(1) +/* see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol +*/ +struct MSGHEADER { + int messageLength; // total message size, including this + int requestID; // identifier for this message + int responseTo; // requestID from the original request + // (used in reponses from db) + int opCode; +}; +struct OP_GETMORE : public MSGHEADER { + MSGHEADER header; // standard message header + int ZERO_or_flags; // 0 - reserved for future use + //cstring fullCollectionName; // "dbname.collectionname" + //int32 numberToReturn; // number of documents to return + //int64 cursorID; // cursorID from the OP_REPLY +}; +#pragma pack() + +#pragma pack(1) + /* todo merge this with MSGHEADER (or inherit from it). */ struct MsgData { int len; /* len of the msg, including this field */ MSGID id; /* request/reply id's match... */ @@ -186,7 +204,6 @@ namespace mongo { inline int MsgData::dataLen() { return len - MsgDataHeaderSize; } - #pragma pack() class Message {