Files
mongo/db/instance.cpp

972 lines
34 KiB
C++
Raw Normal View History

// instance.cpp : Global state variables and functions.
//
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2010-04-27 15:27:52 -04:00
#include "pch.h"
#include "db.h"
#include "query.h"
#include "introspect.h"
#include "repl.h"
#include "dbmessage.h"
#include "instance.h"
2009-01-05 15:30:07 -05:00
#include "lasterror.h"
2009-01-18 20:31:33 -05:00
#include "security.h"
2009-01-28 18:08:02 -05:00
#include "json.h"
2010-04-21 18:46:31 -04:00
#include "replpair.h"
2009-03-19 10:33:35 -04:00
#include "../s/d_logic.h"
#include "../util/file_allocator.h"
#include "../util/goodies.h"
2009-08-25 10:24:44 -04:00
#include "cmdline.h"
2009-04-01 14:10:21 -04:00
#if !defined(_WIN32)
#include <sys/file.h>
#endif
2010-02-01 10:38:00 -05:00
#include "stats/counters.h"
#include "background.h"
2010-11-17 22:31:38 -05:00
#include "dur_journal.h"
2010-12-21 21:21:16 -05:00
#include "dur_recover.h"
2009-01-14 17:09:51 -05:00
namespace mongo {
2010-05-12 15:26:00 -07:00
inline void opread(Message& m) { if( _diaglog.level & 2 ) _diaglog.readop((char *) m.singleData(), m.header()->len); }
inline void opwrite(Message& m) { if( _diaglog.level & 1 ) _diaglog.write((char *) m.singleData(), m.header()->len); }
2010-04-23 19:32:21 -04:00
void receivedKillCursors(Message& m);
2009-12-29 23:30:29 -05:00
void receivedUpdate(Message& m, CurOp& op);
void receivedDelete(Message& m, CurOp& op);
void receivedInsert(Message& m, CurOp& op);
bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop );
int nloggedsome = 0;
2008-12-28 20:28:49 -05:00
#define LOGSOME if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
2008-12-27 12:07:20 -05:00
string dbExecCommand;
char *appsrvPath = NULL;
2009-02-02 18:18:22 -05:00
DiagLog _diaglog;
bool useCursors = true;
2009-03-10 10:54:00 -04:00
bool useHints = true;
2011-01-04 00:40:41 -05:00
void flushDiagLog() {
if( _diaglog.f && _diaglog.f->is_open() ) {
log() << "flushing diag log" << endl;
_diaglog.flush();
2009-02-02 18:18:22 -05:00
}
}
2009-12-22 15:22:37 -05:00
KillCurrentOp killCurrentOp;
2011-01-04 00:40:41 -05:00
2009-04-01 13:48:02 -04:00
int lockFile = 0;
#ifdef WIN32
HANDLE lockFileHandle;
#endif
// see FSyncCommand:
2011-01-04 00:40:41 -05:00
unsigned lockedForWriting;
2010-05-26 00:46:49 -04:00
mongo::mutex lockedForWritingMutex("lockedForWriting");
bool unlockRequested = false;
2009-01-28 18:08:02 -05:00
void inProgCmd( Message &m, DbResponse &dbresponse ) {
BSONObjBuilder b;
2011-01-04 00:40:41 -05:00
if( ! cc().isAdmin() ) {
BSONObjBuilder b;
b.append("err", "unauthorized");
}
else {
2010-02-09 16:58:03 -05:00
DbMessage d(m);
QueryMessage q(d);
bool all = q.query["$all"].trueValue();
vector<BSONObj> vals;
{
2010-02-09 16:58:03 -05:00
Client& me = cc();
scoped_lock bl(Client::clientsMutex);
2011-01-04 00:40:41 -05:00
for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) {
Client *c = *i;
2010-06-02 10:01:42 -04:00
assert( c );
CurOp* co = c->curop();
if ( c == &me && !co ) {
continue;
}
2010-06-02 10:01:42 -04:00
assert( co );
if( all || co->active() )
vals.push_back( co->infoNoauth() );
}
}
b.append("inprog", vals);
unsigned x = lockedForWriting;
if( x ) {
b.append("fsyncLock", x);
b.append("info", "use db.$cmd.sys.unlock.findOne() to terminate the fsync write/snapshot lock");
}
}
2011-01-04 00:40:41 -05:00
replyToQuery(0, m, dbresponse, b.obj());
2009-01-28 18:08:02 -05:00
}
2011-01-04 00:40:41 -05:00
2009-01-28 18:08:02 -05:00
void killOp( Message &m, DbResponse &dbresponse ) {
BSONObj obj;
2011-01-04 00:40:41 -05:00
if( ! cc().isAdmin() ) {
2009-01-29 09:27:45 -05:00
obj = fromjson("{\"err\":\"unauthorized\"}");
}
2011-01-04 00:40:41 -05:00
/*else if( !dbMutexInfo.isLocked() )
2009-01-28 18:08:02 -05:00
obj = fromjson("{\"info\":\"no op in progress/not locked\"}");
2009-12-22 15:22:37 -05:00
*/
2009-01-28 18:08:02 -05:00
else {
2009-12-22 15:22:37 -05:00
DbMessage d(m);
QueryMessage q(d);
BSONElement e = q.query.getField("op");
2011-01-04 00:40:41 -05:00
if( !e.isNumber() ) {
2009-12-22 15:22:37 -05:00
obj = fromjson("{\"err\":\"no op number field specified?\"}");
}
2011-01-04 00:40:41 -05:00
else {
log() << "going to kill op: " << e << endl;
2009-12-22 15:22:37 -05:00
obj = fromjson("{\"info\":\"attempting to kill op\"}");
killCurrentOp.kill( (unsigned) e.number() );
}
2009-01-28 18:08:02 -05:00
}
replyToQuery(0, m, dbresponse, obj);
}
2009-12-21 13:19:20 -05:00
void unlockFsync(const char *ns, Message& m, DbResponse &dbresponse) {
BSONObj obj;
2011-01-04 00:40:41 -05:00
if( ! cc().isAdmin() || strncmp(ns, "admin.", 6) != 0 ) {
obj = fromjson("{\"err\":\"unauthorized\"}");
}
else {
2011-01-04 00:40:41 -05:00
if( lockedForWriting ) {
log() << "command: unlock requested" << endl;
obj = fromjson("{ok:1,\"info\":\"unlock requested\"}");
unlockRequested = true;
}
2011-01-04 00:40:41 -05:00
else {
obj = fromjson("{ok:0,\"errmsg\":\"not locked\"}");
}
}
replyToQuery(0, m, dbresponse, obj);
}
2011-01-04 00:40:41 -05:00
static bool receivedQuery(Client& c, DbResponse& dbresponse, Message& m ) {
bool ok = true;
2010-05-12 15:26:00 -07:00
MSGID responseTo = m.header()->id;
DbMessage d(m);
QueryMessage q(d);
2010-05-18 22:09:22 -07:00
auto_ptr< Message > resp( new Message() );
CurOp& op = *(c.curop());
2011-01-04 00:40:41 -05:00
try {
2010-06-05 22:37:59 -04:00
dbresponse.exhaust = runQuery(m, q, op, *resp);
2010-05-18 22:09:22 -07:00
assert( !resp->empty() );
}
catch ( AssertionException& e ) {
ok = false;
2009-12-29 23:30:29 -05:00
op.debug().str << " exception ";
2011-01-04 00:40:41 -05:00
LOGSOME {
2010-05-15 18:48:13 -04:00
log() << "assertion " << e.toString() << " ns:" << q.ns << " query:" <<
2011-01-04 00:40:41 -05:00
(q.query.valid() ? q.query.toString() : "query object is corrupt") << endl;
2010-05-15 18:48:13 -04:00
if( q.ntoskip || q.ntoreturn )
log() << " ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << endl;
}
BSONObjBuilder err;
2010-06-21 13:41:34 -04:00
e.getInfo().append( err );
BSONObj errObj = err.done();
BufBuilder b;
b.skip(sizeof(QueryResult));
b.appendBuf((void*) errObj.objdata(), errObj.objsize());
// todo: call replyToQuery() from here instead of this!!! see dbmessage.h
2010-05-18 22:09:22 -07:00
QueryResult * msgdata = (QueryResult *) b.buf();
b.decouple();
QueryResult *qr = msgdata;
2010-07-18 13:34:16 -04:00
qr->_resultFlags() = ResultFlag_ErrSet;
2010-07-22 14:20:29 -04:00
if ( e.getCode() == StaleConfigInContextCode )
qr->_resultFlags() |= ResultFlag_ShardConfigStale;
qr->len = b.len();
qr->setOperation(opReply);
qr->cursorId = 0;
qr->startingFrom = 0;
qr->nReturned = 1;
2010-05-18 22:09:22 -07:00
resp.reset( new Message() );
resp->setData( msgdata, true );
}
2010-05-18 22:09:22 -07:00
2011-01-04 00:40:41 -05:00
if ( op.shouldDBProfile( 0 ) ) {
2010-05-12 15:26:00 -07:00
op.debug().str << " bytes:" << resp->header()->dataLen();
}
2011-01-04 00:40:41 -05:00
2010-05-18 22:09:22 -07:00
dbresponse.response = resp.release();
dbresponse.responseTo = responseTo;
2011-01-04 00:40:41 -05:00
return ok;
}
2009-01-28 18:08:02 -05:00
// Returns false when request includes 'end'
void assembleResponse( Message &m, DbResponse &dbresponse, const SockAddr &client ) {
2009-01-28 18:08:02 -05:00
// before we lock...
2010-05-12 15:26:00 -07:00
int op = m.operation();
2010-02-11 12:52:26 -05:00
bool isCommand = false;
2010-05-12 15:26:00 -07:00
const char *ns = m.singleData()->_data + 4;
2009-11-28 13:57:30 -05:00
if ( op == dbQuery ) {
2009-12-03 11:50:09 -05:00
if( strstr(ns, ".$cmd") ) {
2010-02-11 12:52:26 -05:00
isCommand = true;
2010-04-23 19:32:21 -04:00
opwrite(m);
2011-01-04 00:40:41 -05:00
if( strstr(ns, ".$cmd.sys.") ) {
if( strstr(ns, "$cmd.sys.inprog") ) {
inProgCmd(m, dbresponse);
return;
}
2011-01-04 00:40:41 -05:00
if( strstr(ns, "$cmd.sys.killop") ) {
killOp(m, dbresponse);
return;
}
2011-01-04 00:40:41 -05:00
if( strstr(ns, "$cmd.sys.unlock") ) {
unlockFsync(ns, m, dbresponse);
return;
}
2009-01-28 18:08:02 -05:00
}
2010-02-16 00:56:22 -05:00
}
else {
2010-04-23 19:32:21 -04:00
opread(m);
2009-01-28 18:08:02 -05:00
}
}
2009-11-28 13:57:30 -05:00
else if( op == dbGetMore ) {
2010-04-23 19:32:21 -04:00
opread(m);
2009-11-28 13:57:30 -05:00
}
2010-02-16 00:56:22 -05:00
else {
2010-04-23 19:32:21 -04:00
opwrite(m);
2010-02-16 00:56:22 -05:00
}
2011-01-04 00:40:41 -05:00
2010-02-11 12:52:26 -05:00
globalOpCounters.gotOp( op , isCommand );
2011-01-04 00:40:41 -05:00
2009-12-21 13:19:20 -05:00
Client& c = cc();
2011-01-04 00:40:41 -05:00
2009-12-30 12:13:05 -05:00
auto_ptr<CurOp> nestedOp;
CurOp* currentOpP = c.curop();
2011-01-04 00:40:41 -05:00
if ( currentOpP->active() ) {
2010-02-04 15:56:02 -05:00
nestedOp.reset( new CurOp( &c , currentOpP ) );
2009-12-30 12:13:05 -05:00
currentOpP = nestedOp.get();
}
CurOp& currentOp = *currentOpP;
currentOp.reset(client,op);
2011-01-04 00:40:41 -05:00
2009-12-29 23:30:29 -05:00
OpDebug& debug = currentOp.debug();
StringBuilder& ss = debug.str;
2010-02-04 15:56:02 -05:00
ss << opToString( op ) << " ";
int logThreshold = cmdLine.slowMS;
bool log = logLevel >= 1;
2011-01-04 00:40:41 -05:00
2009-11-28 13:57:30 -05:00
if ( op == dbQuery ) {
if ( handlePossibleShardedMessage( m , &dbresponse ) )
return;
receivedQuery(c , dbresponse, m );
2009-01-28 18:08:02 -05:00
}
2009-11-28 13:57:30 -05:00
else if ( op == dbGetMore ) {
2009-12-29 23:30:29 -05:00
if ( ! receivedGetMore(dbresponse, m, currentOp) )
2009-12-02 15:14:37 -05:00
log = true;
2009-09-03 12:52:08 -04:00
}
2009-11-28 13:57:30 -05:00
else if ( op == dbMsg ) {
2010-02-16 01:05:04 -05:00
// deprecated - replaced by commands
2010-05-12 15:26:00 -07:00
char *p = m.singleData()->_data;
int len = strlen(p);
if ( len > 400 )
out() << curTimeMillis() % 10000 <<
2011-01-04 00:40:41 -05:00
" long msg received, len:" << len << endl;
2010-02-16 01:05:04 -05:00
Message *resp = new Message();
2010-02-16 01:05:04 -05:00
if ( strcmp( "end" , p ) == 0 )
resp->setData( opReply , "dbMsg end no longer supported" );
else
resp->setData( opReply , "i am fine - dbMsg deprecated");
dbresponse.response = resp;
2010-05-12 15:26:00 -07:00
dbresponse.responseTo = m.header()->id;
2008-12-28 20:28:49 -05:00
}
2009-01-18 20:31:33 -05:00
else {
2010-05-12 15:26:00 -07:00
const char *ns = m.singleData()->_data + 4;
2009-01-18 20:31:33 -05:00
char cl[256];
2009-12-31 16:31:07 -05:00
nsToDatabase(ns, cl);
2011-01-04 00:40:41 -05:00
if( ! c.getAuthenticationInfo()->isAuthorized(cl) ) {
2009-01-18 20:31:33 -05:00
uassert_nothrow("unauthorized");
}
else {
2009-01-18 20:31:33 -05:00
try {
if ( op == dbInsert ) {
receivedInsert(m, currentOp);
}
else if ( op == dbUpdate ) {
receivedUpdate(m, currentOp);
}
else if ( op == dbDelete ) {
receivedDelete(m, currentOp);
}
else if ( op == dbKillCursors ) {
2010-02-04 15:56:02 -05:00
currentOp.ensureStarted();
logThreshold = 10;
ss << "killcursors ";
receivedKillCursors(m);
}
else {
2010-05-23 15:07:32 -04:00
mongo::log() << " operation isn't supported: " << op << endl;
currentOp.done();
log = true;
}
2009-01-18 20:31:33 -05:00
}
catch ( UserException& ue ) {
tlog(3) << " Caught Assertion in " << opToString(op) << ", continuing " << ue.toString() << endl;
ss << " exception " << ue.toString();
}
2009-01-18 20:31:33 -05:00
catch ( AssertionException& e ) {
tlog(3) << " Caught Assertion in " << opToString(op) << ", continuing " << e.toString() << endl;
ss << " exception " << e.toString();
log = true;
2009-01-18 20:31:33 -05:00
}
}
2008-12-28 20:28:49 -05:00
}
currentOp.ensureStarted();
currentOp.done();
int ms = currentOp.totalTimeMillis();
2011-01-04 00:40:41 -05:00
//DEV log = true;
2009-10-04 21:49:36 -04:00
if ( log || ms > logThreshold ) {
if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && ms < 3000 && !log ) {
/* it's normal for getMore on the oplog to be slow because of use of awaitdata flag. */
2011-01-04 00:40:41 -05:00
}
else {
ss << ' ' << ms << "ms";
2010-05-19 12:11:17 -04:00
mongo::tlog() << ss.str() << endl;
}
}
2011-01-04 00:40:41 -05:00
if ( currentOp.shouldDBProfile( ms ) ) {
// performance profiling is on
2011-01-04 00:40:41 -05:00
if ( dbMutex.getState() < 0 ) {
2010-02-09 23:17:48 -05:00
mongo::log(1) << "note: not profiling because recursive read lock" << endl;
}
else {
2010-10-04 17:00:48 -04:00
writelock lk;
2011-01-04 00:40:41 -05:00
if ( dbHolder.isLoaded( nsToDatabase( currentOp.getNS() ) , dbpath ) ) {
Client::Context c( currentOp.getNS() );
profile(ss.str().c_str(), ms);
}
else {
2010-02-09 23:17:48 -05:00
mongo::log() << "note: not profiling because db went away - probably a close on: " << currentOp.getNS() << endl;
}
}
2008-12-28 20:28:49 -05:00
}
2009-12-31 16:22:28 -05:00
} /* assembleResponse() */
2008-12-28 20:28:49 -05:00
void receivedKillCursors(Message& m) {
2010-05-12 15:26:00 -07:00
int *x = (int *) m.singleData()->_data;
x++; // reserved
int n = *x++;
2010-09-30 10:07:17 -04:00
2011-01-04 00:40:41 -05:00
assert( m.dataSize() == 8 + ( 8 * n ) );
2010-09-30 10:07:17 -04:00
2010-02-12 11:32:44 -05:00
uassert( 13004 , "sent 0 cursors to kill" , n >= 1 );
if ( n > 2000 ) {
log( n < 30000 ? LL_WARNING : LL_ERROR ) << "receivedKillCursors, n=" << n << endl;
assert( n < 30000 );
2008-12-28 20:28:49 -05:00
}
2011-01-04 00:40:41 -05:00
2010-09-30 10:07:17 -04:00
int found = ClientCursor::erase(n, (long long *) x);
2011-01-04 00:40:41 -05:00
if ( logLevel > 0 || found != n ) {
2010-09-30 10:07:17 -04:00
log( found == n ) << "killcursors: found " << found << " of " << n << endl;
}
2008-12-28 20:28:49 -05:00
}
/* db - database name
path - db directory
*/
2010-08-26 10:42:07 -04:00
/*static*/ void Database::closeDatabase( const char *db, const string& path ) {
assertInWriteLock();
2011-01-04 00:40:41 -05:00
Client::Context * ctx = cc().getContext();
assert( ctx );
assert( ctx->inDB( db , path ) );
Database *database = ctx->db();
assert( database->name == db );
2011-01-04 00:40:41 -05:00
2010-08-25 22:43:45 -04:00
oplogCheckCloseDatabase( database ); // oplog caches some things, dirty its caches
2011-01-04 00:40:41 -05:00
if( BackgroundOperation::inProgForDb(db) ) {
log() << "warning: bg op in prog during close db? " << db << endl;
}
/* important: kill all open cursors on the database */
string prefix(db);
prefix += '.';
ClientCursor::invalidate(prefix.c_str());
2008-12-28 20:28:49 -05:00
NamespaceDetailsTransient::clearForPrefix( prefix.c_str() );
2009-03-03 15:32:51 -05:00
dbHolder.erase( db, path );
ctx->clear();
2010-08-26 10:42:07 -04:00
delete database; // closes files
2008-12-28 20:28:49 -05:00
}
2009-12-29 23:30:29 -05:00
void receivedUpdate(Message& m, CurOp& op) {
DbMessage d(m);
const char *ns = d.getns();
assert(*ns);
uassert( 10054 , "not master", isMasterNs( ns ) );
2009-12-29 23:30:29 -05:00
op.debug().str << ns << ' ';
int flags = d.pullInt();
BSONObj query = d.nextJsObj();
assert( d.moreJSObjs() );
2010-05-12 15:26:00 -07:00
assert( query.objsize() < m.header()->dataLen() );
BSONObj toupdate = d.nextJsObj();
uassert( 10055 , "update object too large", toupdate.objsize() <= BSONObjMaxUserSize);
2010-05-12 15:26:00 -07:00
assert( toupdate.objsize() < m.header()->dataLen() );
assert( query.objsize() + toupdate.objsize() < m.header()->dataLen() );
2010-01-03 16:37:38 -05:00
bool upsert = flags & UpdateOption_Upsert;
bool multi = flags & UpdateOption_Multi;
bool broadcast = flags & UpdateOption_Broadcast;
{
string s = query.toString();
2011-01-04 00:40:41 -05:00
/* todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down.
instead, let's just story the query BSON in the debug object, and it can toString()
2010-02-03 17:51:06 -05:00
lazily
*/
2009-12-29 23:30:29 -05:00
op.debug().str << " query: " << s;
op.setQuery(query);
2011-01-04 00:40:41 -05:00
}
2010-10-04 17:00:48 -04:00
writelock lk;
// if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
if ( ! broadcast && handlePossibleShardedMessage( m , 0 ) )
return;
Client::Context ctx( ns );
2009-12-29 23:30:29 -05:00
UpdateResult res = updateObjects(ns, toupdate, query, upsert, multi, true, op.debug() );
lastError.getSafe()->recordUpdate( res.existing , res.num , res.upserted ); // for getlasterror
2008-12-28 20:28:49 -05:00
}
2009-12-29 23:30:29 -05:00
void receivedDelete(Message& m, CurOp& op) {
DbMessage d(m);
2008-12-28 20:28:49 -05:00
const char *ns = d.getns();
assert(*ns);
uassert( 10056 , "not master", isMasterNs( ns ) );
op.debug().str << ns << ' ';
int flags = d.pullInt();
2010-07-22 22:38:35 -04:00
bool justOne = flags & RemoveOption_JustOne;
bool broadcast = flags & RemoveOption_Broadcast;
assert( d.moreJSObjs() );
BSONObj pattern = d.nextJsObj();
{
string s = pattern.toString();
2009-12-29 23:30:29 -05:00
op.debug().str << " query: " << s;
op.setQuery(pattern);
2011-01-04 00:40:41 -05:00
}
2010-02-16 00:56:22 -05:00
writelock lk(ns);
// if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
if ( ! broadcast & handlePossibleShardedMessage( m , 0 ) )
return;
2011-01-04 00:40:41 -05:00
2010-02-16 00:56:22 -05:00
Client::Context ctx(ns);
2011-01-04 00:40:41 -05:00
2010-02-03 12:03:33 -05:00
long long n = deleteObjects(ns, pattern, justOne, true);
lastError.getSafe()->recordDelete( n );
}
2011-01-04 00:40:41 -05:00
QueryResult* emptyMoreResult(long long);
2009-12-29 23:30:29 -05:00
bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop ) {
2010-02-14 10:44:06 -05:00
StringBuilder& ss = curop.debug().str;
2009-12-02 15:14:37 -05:00
bool ok = true;
2011-01-04 00:40:41 -05:00
DbMessage d(m);
2010-02-14 10:44:06 -05:00
const char *ns = d.getns();
int ntoreturn = d.pullInt();
long long cursorid = d.pullInt64();
2011-01-04 00:40:41 -05:00
2010-06-06 15:46:55 -04:00
ss << ns << " cid:" << cursorid;
2011-01-04 00:40:41 -05:00
if( ntoreturn )
2010-06-06 15:46:55 -04:00
ss << " ntoreturn:" << ntoreturn;
2010-02-14 10:44:06 -05:00
2011-01-04 00:40:41 -05:00
time_t start = 0;
int pass = 0;
2010-06-06 15:46:55 -04:00
bool exhaust = false;
QueryResult* msgdata;
2010-03-26 16:51:32 -04:00
while( 1 ) {
try {
2010-10-04 17:00:48 -04:00
readlock lk;
Client::Context ctx(ns);
2010-06-06 15:46:55 -04:00
msgdata = processGetMore(ns, ntoreturn, cursorid, curop, pass, exhaust);
}
2011-01-04 00:40:41 -05:00
catch ( GetMoreWaitException& ) {
2010-06-06 15:46:55 -04:00
exhaust = false;
2010-03-26 16:51:32 -04:00
massert(13073, "shutting down", !inShutdown() );
2011-01-04 00:40:41 -05:00
if( pass == 0 ) {
start = time(0);
}
else {
if( time(0) - start >= 4 ) {
// after about 4 seconds, return. this is a sanity check. pass stops at 1000 normally
// for DEV this helps and also if sleep is highly inaccurate on a platform. we want to
// return occasionally so slave can checkpoint.
pass = 10000;
}
}
pass++;
2011-01-04 00:40:41 -05:00
DEV
sleepmillis(20);
else
2010-07-30 16:21:28 -04:00
sleepmillis(2);
continue;
}
catch ( AssertionException& e ) {
2010-06-06 15:46:55 -04:00
exhaust = false;
ss << " exception " << e.toString();
msgdata = emptyMoreResult(cursorid);
ok = false;
}
break;
2010-03-26 16:51:32 -04:00
};
Message *resp = new Message();
resp->setData(msgdata, true);
2010-05-12 15:26:00 -07:00
ss << " bytes:" << resp->header()->dataLen();
ss << " nreturned:" << msgdata->nReturned;
dbresponse.response = resp;
2010-05-12 15:26:00 -07:00
dbresponse.responseTo = m.header()->id;
2011-01-04 00:40:41 -05:00
if( exhaust ) {
ss << " exhaust ";
2010-07-13 00:56:01 -04:00
dbresponse.exhaust = ns;
}
2009-12-02 15:14:37 -05:00
return ok;
2008-12-28 20:28:49 -05:00
}
2009-12-29 23:30:29 -05:00
void receivedInsert(Message& m, CurOp& op) {
DbMessage d(m);
2011-01-04 00:40:41 -05:00
const char *ns = d.getns();
assert(*ns);
uassert( 10058 , "not master", isMasterNs( ns ) );
2009-12-29 23:30:29 -05:00
op.debug().str << ns;
2010-02-16 00:56:22 -05:00
writelock lk(ns);
if ( handlePossibleShardedMessage( m , 0 ) )
return;
2011-01-04 00:40:41 -05:00
Client::Context ctx(ns);
2011-01-13 21:23:12 -05:00
int n = 0;
while ( d.moreJSObjs() ) {
BSONObj js = d.nextJsObj();
uassert( 10059 , "object to insert too large", js.objsize() <= BSONObjMaxUserSize);
2010-11-01 23:43:57 -04:00
2011-01-04 00:40:41 -05:00
{
// check no $ modifiers
2010-11-01 23:43:57 -04:00
BSONObjIterator i( js );
2011-01-04 00:40:41 -05:00
while ( i.more() ) {
2010-11-01 23:43:57 -04:00
BSONElement e = i.next();
2010-12-06 09:47:10 -05:00
uassert( 13511 , "object to insert can't have $ modifiers" , e.fieldName()[0] != '$' );
2010-11-01 23:43:57 -04:00
}
}
2010-04-28 08:25:56 -04:00
theDataFileMgr.insertWithObjMod(ns, js, false);
logOp("i", ns, js);
2011-01-13 21:23:12 -05:00
if( ++n % 4 == 0 ) {
// if we are inserting quite a few, we may need to commit along the way
getDur().commitIfNeeded();
}
}
2011-01-13 21:23:12 -05:00
globalOpCounters.incInsertInWriteLock(n);
2008-12-28 20:28:49 -05:00
}
/** Appends to `names` every database found under directory `usePath`.
    With directoryperdb, an entry <name> counts when <name>/<name>.ns exists;
    otherwise every file named <name>.ns with a non-empty <name> counts. */
void getDatabaseNames( vector< string > &names , const string& usePath ) {
    boost::filesystem::path root( usePath );
    boost::filesystem::directory_iterator end;
    for ( boost::filesystem::directory_iterator it( root ); it != end; ++it ) {
        if ( !directoryperdb ) {
            string fileName = boost::filesystem::path(*it).leaf();
            const string ext = ".ns";
            if ( fileName.length() > ext.length() &&
                 fileName.compare( fileName.length() - ext.length(), ext.length(), ext ) == 0 ) {
                names.push_back( fileName.substr( 0, fileName.length() - ext.length() ) );
            }
            continue;
        }
        boost::filesystem::path nsFile = *it;
        string dbName = nsFile.leaf();
        nsFile /= ( dbName + ".ns" );
        if ( MMF::exists( nsFile ) )
            names.push_back( dbName );
    }
}
2008-12-28 20:28:49 -05:00
2011-01-04 00:40:41 -05:00
/* returns true if there is data on this server. useful when starting replication.
local database does NOT count except for rsoplog collection.
2010-06-26 13:32:35 -04:00
*/
2011-01-04 00:40:41 -05:00
bool replHasDatabases() {
2010-06-26 13:32:35 -04:00
vector<string> names;
getDatabaseNames(names);
if( names.size() >= 2 ) return true;
2011-01-04 00:40:41 -05:00
if( names.size() == 1 ) {
if( names[0] != "local" )
return true;
// we have a local database. return true if oplog isn't empty
{
readlock lk(rsoplog);
BSONObj o;
2010-07-18 15:02:46 -04:00
if( Helpers::getFirst(rsoplog, o) )
return true;
}
}
2010-06-26 13:32:35 -04:00
return false;
}
/** In-process request/response: runs toSend through assembleResponse() and
    copies the reply into `response`, then calls getDur().commitIfNeeded().
    assertOk and actualServer are not used; always returns true. */
bool DBDirectClient::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
    if ( lastError._get() ) {
        lastError.startRequest( toSend, lastError._get() );
    }

    DbResponse reply;
    assembleResponse( toSend, reply );
    assert( reply.response );

    reply.response->concat(); // can get rid of this if we make response handling smarter
    response = *reply.response;

    getDur().commitIfNeeded();
    return true;
}
/** Fire-and-forget: runs toSend through assembleResponse(), discards any
    reply, then calls getDur().commitIfNeeded(). */
void DBDirectClient::say( Message &toSend ) {
    if ( lastError._get() ) {
        lastError.startRequest( toSend, lastError._get() );
    }

    DbResponse ignored;
    assembleResponse( toSend, ignored );

    getDur().commitIfNeeded();
}
/** Direct-client query: forwards every argument unchanged to DBClientBase::query(). */
auto_ptr<DBClientCursor> DBDirectClient::query(const string &ns, Query query, int nToReturn , int nToSkip ,
                                               const BSONObj *fieldsToReturn , int queryOptions ) {
    return DBClientBase::query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions );
}
2011-01-04 00:40:41 -05:00
void DBDirectClient::killCursor( long long id ) {
2010-05-28 14:23:37 -04:00
ClientCursor::erase( id );
}
/** Counts documents in ns matching query (honoring options/limit/skip) by
    calling runCount in-process under a read lock on ns.
    A runCount result of -1 yields 0; any other negative result uasserts 13637. */
unsigned long long DBDirectClient::count(const string &ns, const BSONObj& query, int options, int limit, int skip ) {
    readlock lk( ns );
    string errmsg;
    const long long result = runCount( ns.c_str() , _countCmd( ns , query , options , limit , skip ) , errmsg );
    if ( result == -1 ) {
        return 0;
    }
    uassert( 13637 , str::stream() << "count failed in DBDirectClient: " << errmsg , result >= 0 );
    return static_cast<unsigned long long>( result );
}
2011-01-04 00:40:41 -05:00
DBClientBase * createDirectClient() {
2009-05-01 14:35:42 -04:00
return new DBDirectClient();
}
2010-05-26 00:46:49 -04:00
mongo::mutex exitMutex("exit");
2009-12-14 09:50:49 -05:00
int numExitCalls = 0;
2011-01-04 00:40:41 -05:00
bool inShutdown() {
2009-12-14 11:06:06 -05:00
return numExitCalls > 0;
2009-12-14 09:50:49 -05:00
}
2011-01-04 00:40:41 -05:00
void tryToOutputFatal( const string& s ) {
2009-12-14 09:50:49 -05:00
try {
rawOut( s );
return;
}
2011-01-04 00:40:41 -05:00
catch ( ... ) {}
2009-12-14 09:50:49 -05:00
try {
cerr << s << endl;
return;
}
2011-01-04 00:40:41 -05:00
catch ( ... ) {}
2009-12-14 09:50:49 -05:00
// uh - oh, not sure there is anything else we can do...
}
2010-11-17 22:31:38 -05:00
/** also called by ntservice.cpp */
void shutdownServer() {
2011-01-04 00:40:41 -05:00
log() << "shutdown: going to close listening sockets..." << endl;
2010-11-17 22:31:38 -05:00
ListeningSockets::get()->closeAll();
2011-01-19 10:48:20 -05:00
log() << "shutdown: going to flush diaglog..." << endl;
2010-11-17 22:31:38 -05:00
flushDiagLog();
/* must do this before unmapping mem or you may get a seg fault */
log() << "shutdown: going to close sockets..." << endl;
boost::thread close_socket_thread( boost::bind(MessagingPort::closeAllSockets, 0) );
// wait until file preallocation finishes
// we would only hang here if the file_allocator code generates a
// synchronous signal, which we don't expect
log() << "shutdown: waiting for fs preallocator..." << endl;
2011-01-09 01:45:11 -05:00
FileAllocator::get()->waitUntilFinished();
2011-01-04 00:40:41 -05:00
2010-11-28 10:13:01 -05:00
if( cmdLine.dur ) {
2011-01-22 22:52:06 -05:00
log() << "shutdown: lock for final commit..." << endl;
2011-01-10 18:38:48 -05:00
{
int n = 10;
while( 1 ) {
// we may already be in a read lock from earlier in the call stack, so do read lock here
// to be consistent with that.
readlocktry w("", 20000);
2011-01-10 18:38:48 -05:00
if( w.got() ) {
2011-01-22 22:52:06 -05:00
log() << "shutdown: final commit..." << endl;
2011-01-10 18:38:48 -05:00
getDur().commitNow();
break;
}
if( --n <= 0 ) {
log() << "shutdown: couldn't acquire write lock, aborting" << endl;
abort();
}
log() << "shutdown: waiting for write lock..." << endl;
}
}
2010-11-17 22:31:38 -05:00
MemoryMappedFile::flushAll(true);
}
2010-12-21 21:21:16 -05:00
2011-01-10 18:38:48 -05:00
log() << "shutdown: closing all files..." << endl;
2010-11-17 22:31:38 -05:00
stringstream ss3;
MemoryMappedFile::closeAllFiles( ss3 );
rawOut( ss3.str() );
2010-11-28 10:13:01 -05:00
if( cmdLine.dur ) {
2011-03-01 21:20:34 -05:00
dur::journalCleanup(true);
2010-11-17 22:31:38 -05:00
}
#if !defined(__sunos__)
2011-01-04 00:40:41 -05:00
if ( lockFile ) {
2010-11-17 22:31:38 -05:00
log() << "shutdown: removing fs lock..." << endl;
/* This ought to be an unlink(), but Eliot says the last
time that was attempted, there was a race condition
with acquirePathLock(). */
#ifdef WIN32
2011-01-14 15:13:05 -05:00
if( _chsize( lockFile , 0 ) )
log() << "couldn't remove fs lock " << getLastError() << endl;
CloseHandle(lockFileHandle);
#else
2011-01-04 00:40:41 -05:00
if( ftruncate( lockFile , 0 ) )
2010-11-17 22:31:38 -05:00
log() << "couldn't remove fs lock " << errnoWithDescription() << endl;
flock( lockFile, LOCK_UN );
#endif
2010-11-17 22:31:38 -05:00
}
#endif
}
/* not using log() herein in case we are already locked */
2011-01-04 00:40:41 -05:00
void dbexit( ExitCode rc, const char *why, bool tryToGetLock ) {
auto_ptr<writelocktry> wlt;
2011-01-04 00:40:41 -05:00
if ( tryToGetLock ) {
wlt.reset( new writelocktry( "" , 2 * 60 * 1000 ) );
uassert( 13455 , "dbexit timed out getting lock" , wlt->got() );
}
2011-01-04 00:40:41 -05:00
Client * c = currentClient.get();
{
scoped_lock lk( exitMutex );
2009-12-14 09:50:49 -05:00
if ( numExitCalls++ > 0 ) {
2011-01-04 00:40:41 -05:00
if ( numExitCalls > 5 ) {
2009-12-14 09:50:49 -05:00
// this means something horrible has happened
::_exit( rc );
}
stringstream ss;
ss << "dbexit: " << why << "; exiting immediately";
2009-12-14 09:50:49 -05:00
tryToOutputFatal( ss.str() );
if ( c ) c->shutdown();
2011-01-04 00:40:41 -05:00
::exit( rc );
}
}
2011-01-04 00:40:41 -05:00
{
stringstream ss;
ss << "dbexit: " << why;
tryToOutputFatal( ss.str() );
}
2011-01-04 00:40:41 -05:00
2009-12-14 09:50:49 -05:00
try {
2010-11-17 22:31:38 -05:00
shutdownServer(); // gracefully shutdown instance
2009-12-14 09:50:49 -05:00
}
2011-01-04 00:40:41 -05:00
catch ( ... ) {
2009-12-14 09:50:49 -05:00
tryToOutputFatal( "shutdown failed with exception" );
}
2010-05-26 00:46:49 -04:00
2011-01-04 00:40:41 -05:00
try {
2010-05-26 00:46:49 -04:00
mutexDebugger.programEnding();
}
catch (...) { }
2011-01-04 00:40:41 -05:00
2010-08-31 05:31:46 -04:00
tryToOutputFatal( "dbexit: really exiting now" );
if ( c ) c->shutdown();
::exit(rc);
}
2011-01-04 00:40:41 -05:00
#if !defined(__sunos__)
/** Writes this process's pid followed by a newline to fd (the lock file).
    Asserts that every byte was written. The former assert( write(...) ) also
    passed when write() failed, because -1 is truthy. */
void writePid(int fd) {
    std::stringstream ss;
    ss << getpid() << '\n';
    const std::string s = ss.str();
    const char * data = s.c_str();
    const size_t len = std::strlen( data );
#ifdef WIN32
    const long long written = _write( fd, data, (unsigned) len );
#else
    const long long written = write( fd, data, len );
#endif
    assert( written == (long long) len );
}
void acquirePathLock() {
2011-01-04 00:40:41 -05:00
string name = ( boost::filesystem::path( dbpath ) / "mongod.lock" ).native_file_string();
bool oldFile = false;
2010-08-20 14:05:11 -04:00
if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ) {
oldFile = true;
}
#ifdef WIN32
2011-01-14 15:13:05 -05:00
lockFileHandle = CreateFileA( name.c_str(), GENERIC_READ | GENERIC_WRITE,
0 /* do not allow anyone else access */, NULL,
2011-01-14 15:13:05 -05:00
OPEN_ALWAYS /* success if fh can open */, 0, NULL );
if (lockFileHandle == INVALID_HANDLE_VALUE) {
DWORD code = GetLastError();
2011-01-14 15:13:05 -05:00
char *msg;
FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
NULL, code, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPSTR)&msg, 0, NULL);
uasserted( 13627 , msg );
}
lockFile = _open_osfhandle((intptr_t)lockFileHandle, 0);
#else
lockFile = open( name.c_str(), O_RDWR | O_CREAT , S_IRWXU | S_IRWXG | S_IRWXO );
2011-01-04 00:40:41 -05:00
if( lockFile <= 0 ) {
uasserted( 10309 , str::stream() << "Unable to create / open lock file for lockfilepath: " << name << ' ' << errnoWithDescription());
2010-08-20 14:05:11 -04:00
}
if (flock( lockFile, LOCK_EX | LOCK_NB ) != 0) {
close ( lockFile );
lockFile = 0;
uassert( 10310 , "Unable to acquire lock for lockfilepath: " + name, 0 );
}
#endif
2011-01-04 00:40:41 -05:00
if ( oldFile ) {
// we check this here because we want to see if we can get the lock
// if we can't, then its probably just another mongod running
string errmsg;
if (cmdLine.dur) {
2011-01-04 00:40:41 -05:00
if (!dur::haveJournalFiles()) {
vector<string> dbnames;
getDatabaseNames( dbnames );
if ( dbnames.size() == 0 ) {
// this means that mongod crashed
// between initial startup and when journaling was initialized
// it is safe to continue
}
else {
errmsg = str::stream()
<< "************** \n"
2011-02-10 14:18:31 -05:00
<< "old lock file: " << name << ". probably means unclean shutdown,\n"
<< "but there are no journal files to recover.\n"
2011-02-10 14:18:31 -05:00
<< "this is likely human error or filesystem corruption.\n"
2011-02-10 10:00:31 -05:00
<< "found " << dbnames.size() << " dbs.\n"
<< "see: http://dochub.mongodb.org/core/repair for more information\n"
<< "*************";
}
}
}
else {
errmsg = str::stream()
2011-01-04 00:40:41 -05:00
<< "************** \n"
<< "old lock file: " << name << ". probably means unclean shutdown\n"
<< "recommend removing file and running --repair\n"
<< "see: http://dochub.mongodb.org/core/repair for more information\n"
<< "*************";
}
if (!errmsg.empty()) {
cout << errmsg << endl;
#ifdef WIN32
CloseHandle( lockFileHandle );
#else
close ( lockFile );
#endif
lockFile = 0;
uassert( 12596 , "old lock file" , 0 );
}
}
// Not related to lock file, but this is where we handle unclean shutdown
if( !cmdLine.dur && dur::haveJournalFiles() ) {
cout << "**************" << endl;
cout << "Error: journal files are present in journal directory, yet starting without --dur enabled." << endl;
2011-01-26 17:25:08 -08:00
cout << "It is recommended that you start with journaling enabled so that recovery may occur." << endl;
cout << "Alternatively (not recommended), you can backup everything, then delete the journal files, and run --repair" << endl;
cout << "**************" << endl;
uasserted(13597, "can't start without --dur enabled when journal/ files are present");
}
#ifdef WIN32
2011-01-14 15:13:05 -05:00
uassert( 13625, "Unable to truncate lock file", _chsize(lockFile, 0) == 0);
writePid( lockFile );
_commit( lockFile );
#else
uassert( 13342, "Unable to truncate lock file", ftruncate(lockFile, 0) == 0);
writePid( lockFile );
2009-07-28 10:46:56 -04:00
fsync( lockFile );
#endif
}
#else
void acquirePathLock() {
2011-01-02 17:04:08 -05:00
// TODO - this is very bad that the code above not running here.
// Not related to lock file, but this is where we handle unclean shutdown
if( !cmdLine.dur && dur::haveJournalFiles() ) {
cout << "**************" << endl;
cout << "Error: journal files are present in journal directory, yet starting without --dur enabled." << endl;
2011-01-26 17:25:08 -08:00
cout << "It is recommended that you start with journaling enabled so that recovery may occur." << endl;
2011-01-02 17:04:08 -05:00
cout << "Alternatively (not recommended), you can backup everything, then delete the journal files, and run --repair" << endl;
cout << "**************" << endl;
2011-01-02 17:08:36 -05:00
uasserted(13618, "can't start without --dur enabled when journal/ files are present");
2011-01-02 17:04:08 -05:00
}
}
2011-01-04 00:40:41 -05:00
#endif
2009-01-14 17:09:51 -05:00
} // namespace mongo