diff --git a/db/client.cpp b/db/client.cpp index 38b94476911..5fed03780cc 100644 --- a/db/client.cpp +++ b/db/client.cpp @@ -43,7 +43,7 @@ namespace mongo { /* each thread which does db operations has a Client object in TLS. call this when your thread starts. */ - Client& Client::initThread(const char *desc, MessagingPort *mp) { + Client& Client::initThread(const char *desc, AbstractMessagingPort *mp) { assert( currentClient.get() == 0 ); Client *c = new Client(desc, mp); currentClient.reset(c); @@ -51,7 +51,7 @@ namespace mongo { return *c; } - Client::Client(const char *desc, MessagingPort *p) : + Client::Client(const char *desc, AbstractMessagingPort *p) : _context(0), _shutdown(false), _desc(desc), diff --git a/db/client.h b/db/client.h index 92bdb2f4fb2..b9d5030495e 100644 --- a/db/client.h +++ b/db/client.h @@ -38,7 +38,7 @@ namespace mongo { class CurOp; class Command; class Client; - class MessagingPort; + class AbstractMessagingPort; extern boost::thread_specific_ptr currentClient; @@ -58,7 +58,7 @@ namespace mongo { /* each thread which does db operations has a Client object in TLS. call this when your thread starts. */ - static Client& initThread(const char *desc, MessagingPort *mp = 0); + static Client& initThread(const char *desc, AbstractMessagingPort *mp = 0); ~Client(); @@ -92,7 +92,7 @@ namespace mongo { void gotHandshake( const BSONObj& o ); BSONObj getRemoteID() const { return _remoteId; } BSONObj getHandshake() const { return _handshake; } - MessagingPort * port() const { return _mp; } + AbstractMessagingPort * port() const { return _mp; } ConnectionId getConnectionId() const { return _connectionId; } private: @@ -106,9 +106,9 @@ namespace mongo { ReplTime _lastOp; BSONObj _handshake; BSONObj _remoteId; - MessagingPort * const _mp; + AbstractMessagingPort * const _mp; - Client(const char *desc, MessagingPort *p = 0); + Client(const char *desc, AbstractMessagingPort *p = 0); friend class CurOp; diff --git a/db/curop.h b/db/curop.h index 398395cd83d..efd33bd51b3 100644 --- a/db/curop.h +++ b/db/curop.h @@ -167,7 +167,7 @@ namespace mongo { _query.reset(); } - void reset( const SockAddr & remote, int op ) { + void reset( const HostAndPort& remote, int op ) { reset(); _remote = remote; _op = op; @@ -288,7 +288,7 @@ namespace mongo { int _dbprofile; // 0=off, 1=slow, 2=all AtomicUInt _opNum; char _ns[Namespace::MaxNsLen+2]; - struct SockAddr _remote; + HostAndPort _remote; CachedBSONObj _query; OpDebug _debug; ThreadSafeString _message; diff --git a/db/db.cpp b/db/db.cpp index 78e24147f11..948a5b6340e 100644 --- a/db/db.cpp +++ b/db/db.cpp @@ -36,6 +36,7 @@ #include "stats/snapshots.h" #include "../util/concurrency/task.h" #include "../util/version.h" +#include "../util/message_server.h" #include "client.h" #include "restapi.h" #include "dbwebserver.h" @@ -196,6 +197,64 @@ namespace mongo { sleepmicros( Client::recommendedYieldMicros() ); } + class MyMessageHandler : public MessageHandler { + public: + virtual void connected( AbstractMessagingPort* p ) { + Client& c = Client::initThread("conn", p); + c.getAuthenticationInfo()->isLocalHost = p->remote().isLocalHost(); + } + + virtual void process( Message& m , AbstractMessagingPort* port , LastError * le) { + while ( true ) { + if ( inShutdown() ) { + log() << "got request after shutdown()" << endl; + break; + } + + lastError.startRequest( m , le ); + + DbResponse dbresponse; + assembleResponse( m, dbresponse, port->remote() ); + + if ( dbresponse.response ) { + port->reply(m, *dbresponse.response, dbresponse.responseTo); + if( dbresponse.exhaust ) { + MsgData *header = dbresponse.response->header(); + QueryResult *qr = (QueryResult *) header; + long long cursorid = qr->cursorId; + if( cursorid ) { + assert( dbresponse.exhaust && *dbresponse.exhaust != 0 ); + string ns = dbresponse.exhaust; // before reset() free's it... + m.reset(); + BufBuilder b(512); + b.appendNum((int) 0 /*size set later in appendData()*/); + b.appendNum(header->id); + b.appendNum(header->responseTo); + b.appendNum((int) dbGetMore); + b.appendNum((int) 0); + b.appendStr(ns); + b.appendNum((int) 0); // ntoreturn + b.appendNum(cursorid); + m.appendData(b.buf(), b.len()); + b.decouple(); + DEV log() << "exhaust=true sending more" << endl; + beNice(); + continue; // this goes back to top loop + } + } + } + break; + } + } + + virtual void disconnected( AbstractMessagingPort* p ) { + Client * c = currentClient.get(); + if( c ) c->shutdown(); + globalScriptEngine->threadDone(); + } + + }; + /* we create one thread for each connection from an app server database. app server will open a pool of threads. todo: one day, asio... diff --git a/db/instance.cpp b/db/instance.cpp index ad8c7b1bb4f..afc6f9ee853 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -221,7 +221,7 @@ namespace mongo { } // Returns false when request includes 'end' - void assembleResponse( Message &m, DbResponse &dbresponse, const SockAddr &client ) { + void assembleResponse( Message &m, DbResponse &dbresponse, const HostAndPort& remote ) { // before we lock... int op = m.operation(); @@ -268,7 +268,7 @@ namespace mongo { currentOpP = nestedOp.get(); } CurOp& currentOp = *currentOpP; - currentOp.reset(client,op); + currentOp.reset(remote,op); OpDebug& debug = currentOp.debug(); StringBuilder& ss = debug.str; @@ -652,7 +652,7 @@ namespace mongo { if ( lastError._get() ) lastError.startRequest( toSend, lastError._get() ); DbResponse dbResponse; - assembleResponse( toSend, dbResponse ); + assembleResponse( toSend, dbResponse , HostAndPort( "localhost" , -1 ) ); assert( dbResponse.response ); dbResponse.response->concat(); // can get rid of this if we make response handling smarter response = *dbResponse.response; @@ -664,7 +664,7 @@ namespace mongo { if ( lastError._get() ) lastError.startRequest( toSend, lastError._get() ); DbResponse dbResponse; - assembleResponse( toSend, dbResponse ); + assembleResponse( toSend, dbResponse , HostAndPort( "localhost" , -1 ) ); getDur().commitIfNeeded(); } diff --git a/db/instance.h b/db/instance.h index e3c339210f6..7814eb0999c 100644 --- a/db/instance.h +++ b/db/instance.h @@ -103,7 +103,7 @@ namespace mongo { ~DbResponse() { delete response; } }; - void assembleResponse( Message &m, DbResponse &dbresponse, const SockAddr &client = unknownAddress ); + void assembleResponse( Message &m, DbResponse &dbresponse, const HostAndPort &client ); void getDatabaseNames( vector< string > &names , const string& usePath = dbpath ); diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 7e38f1c8702..8331c6faef5 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -65,7 +65,7 @@ namespace mongo { /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ { - MessagingPort *mp = cc().port(); + AbstractMessagingPort *mp = cc().port(); if( mp ) mp->tag |= 1; } diff --git a/s/s_only.cpp b/s/s_only.cpp index 83bceace37c..221eb5a3356 100644 --- a/s/s_only.cpp +++ b/s/s_only.cpp @@ -31,7 +31,7 @@ namespace mongo { boost::thread_specific_ptr currentClient; - Client::Client(const char *desc , MessagingPort *p) : + Client::Client(const char *desc , AbstractMessagingPort *p) : _context(0), _shutdown(false), _desc(desc), @@ -42,7 +42,7 @@ namespace mongo { Client::~Client() {} bool Client::shutdown() { return true; } - Client& Client::initThread(const char *desc, MessagingPort *mp) { + Client& Client::initThread(const char *desc, AbstractMessagingPort *mp) { setThreadName(desc); assert( currentClient.get() == 0 ); Client *c = new Client(desc, mp); diff --git a/s/server.cpp b/s/server.cpp index 0b014bf9fd5..b19667f41e0 100644 --- a/s/server.cpp +++ b/s/server.cpp @@ -352,6 +352,12 @@ int main(int argc, char* argv[]) { } #undef exit + +void mongo::exitCleanly( ExitCode code ) { + // TODO: do we need to add anything? + mongo::dbexit( code ); +} + void mongo::dbexit( ExitCode rc, const char *why, bool tryToGetLock ) { dbexitCalled = true; log() << "dbexit: " << why diff --git a/util/hostandport.h b/util/hostandport.h index fd2729609cf..7dd09029b5e 100644 --- a/util/hostandport.h +++ b/util/hostandport.h @@ -70,8 +70,10 @@ namespace mongo { bool isLocalHost() const; - // @returns host:port - string toString() const; + /** + * @param includePort host:port if true, host otherwise + */ + string toString( bool includePort=true ) const; operator string() const { return toString(); } @@ -130,7 +132,10 @@ namespace mongo { return HostAndPort(h, cmdLine.port); } - inline string HostAndPort::toString() const { + inline string HostAndPort::toString( bool includePort ) const { + if ( ! includePort ) + return _host; + stringstream ss; ss << _host; if ( _port != -1 ) { diff --git a/util/message.cpp b/util/message.cpp index c1724916839..f5b2d5c407d 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -327,12 +327,12 @@ namespace mongo { ports.closeAll(mask); } - MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), _bytesIn(0), _bytesOut(0), farEnd(_far), _timeout(), tag(0) { + MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), _bytesIn(0), _bytesOut(0), farEnd(_far), _timeout() { _logLevel = 0; ports.insert(this); } - MessagingPort::MessagingPort( double timeout, int ll ) : _bytesIn(0), _bytesOut(0), tag(0) { + MessagingPort::MessagingPort( double timeout, int ll ) : _bytesIn(0), _bytesOut(0) { _logLevel = ll; ports.insert(this); sock = -1; diff --git a/util/message.h b/util/message.h index 4b80cabbe2d..4dd1aa66760 100644 --- a/util/message.h +++ b/util/message.h @@ -78,6 +78,7 @@ namespace mongo { class AbstractMessagingPort : boost::noncopyable { public: + AbstractMessagingPort() : tag(0) {} virtual ~AbstractMessagingPort() { } virtual void reply(Message& received, Message& response, MSGID responseTo) = 0; // like the reply below, but doesn't rely on received.data still being available virtual void reply(Message& received, Message& response) = 0; @@ -86,7 +87,13 @@ namespace mongo { virtual unsigned remotePort() const = 0; private: - int _clientId; + + public: + // TODO make this private with some helpers + + /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ + unsigned tag; + }; class MessagingPort : public AbstractMessagingPort { @@ -160,9 +167,6 @@ namespace mongo { static void closeAllSockets(unsigned tagMask = 0xffffffff); - /* ports can be tagged with various classes. see closeAllSockets(tag). defaults to 0. */ - unsigned tag; - friend class PiggyBackData; }; diff --git a/util/message_server.h b/util/message_server.h index defae0b59ed..e9e2c2c29fd 100644 --- a/util/message_server.h +++ b/util/message_server.h @@ -29,9 +29,21 @@ namespace mongo { class MessageHandler { public: virtual ~MessageHandler() {} - + + /** + * called once when a socket is connected + */ virtual void connected( AbstractMessagingPort* p ) = 0; + + /** + * called every time a message comes in + * handler is responsible for responding to client + */ virtual void process( Message& m , AbstractMessagingPort* p , LastError * err ) = 0; + + /** + * called once when a socket is disconnected + */ virtual void disconnected( AbstractMessagingPort* p ) = 0; }; diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp index cc69f898ec2..89325139cee 100644 --- a/util/message_server_port.cpp +++ b/util/message_server_port.cpp @@ -34,11 +34,11 @@ namespace mongo { void threadRun( MessagingPort * inPort) { TicketHolderReleaser connTicketReleaser( &connTicketHolder ); - - assert( inPort ); setThreadName( "conn" ); - + + assert( inPort ); + inPort->_logLevel = 1; scoped_ptr p( inPort ); string otherSide; @@ -52,7 +52,7 @@ namespace mongo { handler->connected( p.get() ); - while ( 1 ) { + while ( ! inShutdown() ) { m.reset(); p->clearCounters(); @@ -67,14 +67,25 @@ namespace mongo { networkCounter.hit( p->getBytesIn() , p->getBytesOut() ); } } - catch ( const SocketException& ) { - log() << "unclean socket shutdown from: " << otherSide << endl; + catch ( AssertionException& e ) { + log() << "AssertionException in connThread, closing client connection: " << e << endl; + p->shutdown(); } - catch ( const std::exception& e ) { - problem() << "uncaught exception (" << e.what() << ")(" << demangleName( typeid(e) ) <<") in PortMessageServer::threadRun, closing connection" << endl; + catch ( SocketException& e ) { + log() << "SocketException in connThread, closing client connection: " << e << endl; + p->shutdown(); + } + catch ( const ClockSkewException & ) { + log() << "ClockSkewException - shutting down" << endl; + exitCleanly( EXIT_CLOCK_SKEW ); + } + catch ( std::exception &e ) { + error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; + dbexit( EXIT_UNCAUGHT ); } catch ( ... ) { - problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl; + error() << "Uncaught exception, terminating" << endl; + dbexit( EXIT_UNCAUGHT ); } handler->disconnected( p.get() );