2008-06-06 09:43:15 -04:00
/* message
todo : authenticate ; encrypt ?
*/
2009-10-27 15:58:27 -04:00
/* Copyright 2009 10gen Inc.
*
* Licensed under the Apache License , Version 2.0 ( the " License " ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an " AS IS " BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*/
2008-06-06 09:43:15 -04:00
# include "stdafx.h"
# include "message.h"
# include <time.h>
# include "../util/goodies.h"
2009-11-24 15:42:52 -05:00
# include "../util/background.h"
2008-10-24 17:51:28 -04:00
# include <fcntl.h>
2009-04-06 16:21:56 -04:00
# include <errno.h>
2009-12-20 07:17:37 -05:00
# include "../db/cmdline.h"
2008-06-06 09:43:15 -04:00
2009-01-14 17:09:51 -05:00
namespace mongo {
2009-03-13 12:07:00 -04:00
// Global flag, off by default. It is only defined here; nothing in this part
// of the file reads it. NOTE(review): presumably toggles object validation
// elsewhere - confirm against its users.
bool objcheck = false ;
2008-06-06 09:43:15 -04:00
// if you want trace output:
// mmm(x) expands to nothing as defined here, so every mmm( ... ) statement in
// this file compiles away. Redefine it to expand to x to enable the tracing.
# define mmm(x)
2009-01-28 11:56:29 -05:00
// Flags passed to ::send() by the messaging code below.
// Where the platform defines MSG_NOSIGNAL it is used (per POSIX, this keeps
// send() on a closed connection from raising SIGPIPE); otherwise no flags.
# ifdef MSG_NOSIGNAL
const int portSendFlags = MSG_NOSIGNAL ;
# else
const int portSendFlags = 0 ;
# endif
2009-01-15 10:17:11 -05:00
/* listener ------------------------------------------------------------------- */
2009-04-01 12:26:31 -04:00
bool Listener : : init ( ) {
2009-04-29 14:14:51 -04:00
SockAddr me ;
if ( ip . empty ( ) )
me = SockAddr ( port ) ;
else
me = SockAddr ( ip . c_str ( ) , port ) ;
2009-04-01 12:26:31 -04:00
sock = : : socket ( AF_INET , SOCK_STREAM , 0 ) ;
2009-01-15 10:17:11 -05:00
if ( sock = = INVALID_SOCKET ) {
log ( ) < < " ERROR: listen(): invalid socket? " < < errno < < endl ;
2009-04-01 12:26:31 -04:00
return false ;
2009-01-15 10:17:11 -05:00
}
prebindOptions ( sock ) ;
if ( : : bind ( sock , ( sockaddr * ) & me . sa , me . addressSize ) ! = 0 ) {
log ( ) < < " listen(): bind() failed errno: " < < errno < < endl ;
if ( errno = = 98 )
log ( ) < < " 98 == addr already in use " < < endl ;
closesocket ( sock ) ;
2009-04-01 12:26:31 -04:00
return false ;
2009-01-15 10:17:11 -05:00
}
2008-12-28 20:28:49 -05:00
2009-01-15 10:17:11 -05:00
if ( : : listen ( sock , 128 ) ! = 0 ) {
log ( ) < < " listen(): listen() failed " < < errno < < endl ;
closesocket ( sock ) ;
2009-04-01 12:26:31 -04:00
return false ;
2009-01-15 10:17:11 -05:00
}
2009-04-01 12:26:31 -04:00
return true ;
}
2008-12-28 20:28:49 -05:00
2009-04-01 12:26:31 -04:00
void Listener : : listen ( ) {
2009-08-04 14:00:59 -04:00
static long connNumber = 0 ;
2009-01-15 10:17:11 -05:00
SockAddr from ;
while ( 1 ) {
int s = accept ( sock , ( sockaddr * ) & from . sa , & from . addressSize ) ;
if ( s < 0 ) {
2009-04-17 10:37:42 -04:00
if ( errno = = ECONNABORTED | | errno = = EBADF ) {
2009-04-01 12:26:31 -04:00
log ( ) < < " Listener on port " < < port < < " aborted " < < endl ;
return ;
}
2009-04-06 16:21:56 -04:00
log ( ) < < " Listener: accept() returns " < < s < < " errno: " < < errno < < " , strerror: " < < strerror ( errno ) < < endl ;
2009-01-15 10:17:11 -05:00
continue ;
}
disableNagle ( s ) ;
2009-12-20 07:17:37 -05:00
if ( ! cmdLine . quiet ) log ( ) < < " connection accepted from " < < from . toString ( ) < < " # " < < + + connNumber < < endl ;
2009-01-15 10:17:11 -05:00
accepted ( new MessagingPort ( s , from ) ) ;
2008-12-28 20:28:49 -05:00
}
}
2008-06-06 09:43:15 -04:00
2009-01-15 10:17:11 -05:00
/* messagingport -------------------------------------------------------------- */
2008-06-06 09:43:15 -04:00
2009-01-15 10:17:11 -05:00
class PiggyBackData {
public :
PiggyBackData ( MessagingPort * port ) {
_port = port ;
_buf = new char [ 1300 ] ;
_cur = _buf ;
}
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
~ PiggyBackData ( ) {
flush ( ) ;
delete ( _cur ) ;
}
2009-01-13 14:32:36 -05:00
2009-01-15 10:17:11 -05:00
void append ( Message & m ) {
assert ( m . data - > len < = 1300 ) ;
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
if ( len ( ) + m . data - > len > 1300 )
flush ( ) ;
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
memcpy ( _cur , m . data , m . data - > len ) ;
_cur + = m . data - > len ;
}
2009-01-13 14:32:36 -05:00
2009-01-15 10:17:11 -05:00
int flush ( ) {
if ( _buf = = _cur )
return 0 ;
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
int x = : : send ( _port - > sock , _buf , len ( ) , 0 ) ;
_cur = _buf ;
return x ;
}
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
int len ( ) {
return _cur - _buf ;
}
2009-01-13 14:32:36 -05:00
2009-01-15 10:17:11 -05:00
private :
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
MessagingPort * _port ;
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
char * _buf ;
char * _cur ;
} ;
2009-01-13 14:32:36 -05:00
2009-06-01 14:20:49 -04:00
class Ports {
set < MessagingPort * > & ports ;
boost : : mutex & m ;
public :
// we "new" this so it is still be around when other automatic global vars
// are being destructed during termination.
Ports ( ) : ports ( * ( new set < MessagingPort * > ( ) ) ) ,
m ( * ( new boost : : mutex ( ) ) ) { }
void closeAll ( ) { \
boostlock bl ( m ) ;
for ( set < MessagingPort * > : : iterator i = ports . begin ( ) ; i ! = ports . end ( ) ; i + + )
( * i ) - > shutdown ( ) ;
}
void insert ( MessagingPort * p ) {
boostlock bl ( m ) ;
ports . insert ( p ) ;
}
void erase ( MessagingPort * p ) {
boostlock bl ( m ) ;
ports . erase ( p ) ;
}
} ports ;
2009-01-15 10:17:11 -05:00
void closeAllSockets ( ) {
2009-06-01 14:20:49 -04:00
ports . closeAll ( ) ;
2009-01-15 10:17:11 -05:00
}
/**
 * Wraps an already-connected socket.
 * @param _sock socket descriptor to use
 * @param _far  address of the remote end; copied into farEnd
 */
MessagingPort::MessagingPort(int _sock, SockAddr &_far)
    : sock(_sock),
      piggyBackData(0),
      farEnd(_far)
{
    // make this port reachable from closeAllSockets()
    ports.insert(this);
}
/**
 * Creates an unconnected port (sock == -1, no piggy-back buffer).
 * Call connect() to establish a connection.
 */
MessagingPort::MessagingPort() : sock(-1), piggyBackData(0) {
    // Register only after the members are initialised: once in the registry
    // another thread may call shutdown() on this port via closeAllSockets(),
    // and shutdown() reads sock.
    ports.insert(this);
}
2008-06-06 09:43:15 -04:00
2009-01-15 10:17:11 -05:00
void MessagingPort : : shutdown ( ) {
if ( sock > = 0 ) {
closesocket ( sock ) ;
sock = - 1 ;
}
}
/**
 * Destroys the piggy-back buffer (its destructor flushes pending data while
 * the socket is still open), closes the socket and removes this port from
 * the global registry.
 */
MessagingPort::~MessagingPort() {
    delete piggyBackData;  // deleting a null pointer is a no-op
    shutdown();
    ports.erase(this);
}
2008-06-06 09:43:15 -04:00
2009-01-15 10:17:11 -05:00
class ConnectBG : public BackgroundJob {
public :
int sock ;
int res ;
SockAddr farEnd ;
void run ( ) {
res = : : connect ( sock , ( sockaddr * ) & farEnd . sa , farEnd . addressSize ) ;
}
} ;
2008-11-03 20:42:59 -05:00
2009-01-15 10:17:11 -05:00
bool MessagingPort : : connect ( SockAddr & _far )
{
farEnd = _far ;
2008-06-06 09:43:15 -04:00
2009-01-15 10:17:11 -05:00
sock = socket ( AF_INET , SOCK_STREAM , 0 ) ;
if ( sock = = INVALID_SOCKET ) {
log ( ) < < " ERROR: connect(): invalid socket? " < < errno < < endl ;
return false ;
}
2008-10-24 17:51:28 -04:00
#if 0
2009-01-15 10:17:11 -05:00
long fl = fcntl ( sock , F_GETFL , 0 ) ;
assert ( fl > = 0 ) ;
fl | = O_NONBLOCK ;
fcntl ( sock , F_SETFL , fl ) ;
int res = : : connect ( sock , ( sockaddr * ) & farEnd . sa , farEnd . addressSize ) ;
if ( res ) {
if ( errno = = EINPROGRESS )
//log() << "connect(): failed errno:" << errno << ' ' << farEnd.getPort() << endl;
closesocket ( sock ) ;
sock = - 1 ;
return false ;
}
2008-10-24 17:51:28 -04:00
# endif
2009-01-15 10:17:11 -05:00
ConnectBG bg ;
bg . sock = sock ;
bg . farEnd = farEnd ;
bg . go ( ) ;
2008-11-03 20:42:59 -05:00
2009-01-15 10:17:11 -05:00
// int res = ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize);
if ( bg . wait ( 5000 ) ) {
if ( bg . res ) {
closesocket ( sock ) ;
sock = - 1 ;
return false ;
}
}
else {
// time out the connect
2008-12-28 20:28:49 -05:00
closesocket ( sock ) ;
sock = - 1 ;
2009-01-15 10:17:11 -05:00
bg . wait ( ) ; // so bg stays in scope until bg thread terminates
2008-11-03 20:42:59 -05:00
return false ;
}
2009-01-15 10:17:11 -05:00
disableNagle ( sock ) ;
2009-01-28 11:56:29 -05:00
# ifdef SO_NOSIGPIPE
// osx
const int one = 1 ;
setsockopt ( sock , SOL_SOCKET , SO_NOSIGPIPE , & one , sizeof ( int ) ) ;
# endif
2009-01-15 10:17:11 -05:00
return true ;
}
2008-06-06 09:43:15 -04:00
2009-01-15 10:17:11 -05:00
bool MessagingPort : : recv ( Message & m ) {
2008-09-04 10:33:56 -04:00
again :
2009-01-15 11:26:38 -05:00
mmm ( out ( ) < < " * recv() sock: " < < this - > sock < < endl ; )
2009-01-15 10:17:11 -05:00
int len = - 1 ;
char * lenbuf = ( char * ) & len ;
int lft = 4 ;
while ( 1 ) {
int x = : : recv ( sock , lenbuf , lft , 0 ) ;
if ( x = = 0 ) {
2009-01-15 11:26:38 -05:00
DEV out ( ) < < " MessagingPort recv() conn closed? " < < farEnd . toString ( ) < < endl ;
2009-01-15 10:17:11 -05:00
m . reset ( ) ;
return false ;
}
if ( x < 0 ) {
2009-05-04 11:17:26 -04:00
log ( ) < < " MessagingPort recv() error \" " < < strerror ( errno ) < < " \" ( " < < errno < < " ) " < < farEnd . toString ( ) < < endl ;
2009-01-15 10:17:11 -05:00
m . reset ( ) ;
return false ;
}
lft - = x ;
if ( lft = = 0 )
break ;
lenbuf + = x ;
log ( ) < < " MessagingPort recv() got " < < x < < " bytes wanted 4, lft= " < < lft < < endl ;
assert ( lft > 0 ) ;
}
if ( len < 0 | | len > 16000000 ) {
if ( len = = - 1 ) {
// Endian check from the database, after connecting, to see what mode server is running in.
unsigned foo = 0x10203040 ;
2009-01-28 11:56:29 -05:00
int x = : : send ( sock , ( char * ) & foo , 4 , portSendFlags ) ;
2009-01-15 10:17:11 -05:00
if ( x < = 0 ) {
log ( ) < < " MessagingPort endian send() error " < < errno < < ' ' < < farEnd . toString ( ) < < endl ;
return false ;
}
goto again ;
}
2010-01-02 21:41:35 -05:00
if ( len = = 542393671 ) {
// an http GET
log ( ) < < " looks like you're trying to access db over http on native driver port. please add 1000 for webserver " < < endl ;
static const char * wrongPort = " HTTP 1.0 404 Not Found \n Connection: Close \n \n You are trying to access MongoDB on the native driver port. For http access, add 1000 to the port \n \n " ;
: : send ( sock , wrongPort , strlen ( wrongPort ) , 0 ) ;
return false ;
}
2009-01-15 10:17:11 -05:00
log ( ) < < " bad recv() len: " < < len < < ' \n ' ;
2008-12-28 20:28:49 -05:00
return false ;
}
2009-01-15 10:17:11 -05:00
int z = ( len + 1023 ) & 0xfffffc00 ;
assert ( z > = len ) ;
MsgData * md = ( MsgData * ) malloc ( z ) ;
md - > len = len ;
if ( len < = 0 ) {
2009-01-15 11:26:38 -05:00
out ( ) < < " got a length of " < < len < < " , something is wrong " < < endl ;
2008-12-28 20:28:49 -05:00
return false ;
}
2009-01-15 10:17:11 -05:00
char * p = ( char * ) & md - > id ;
int left = len - 4 ;
while ( 1 ) {
int x = : : recv ( sock , p , left , 0 ) ;
if ( x = = 0 ) {
2009-01-15 11:26:38 -05:00
DEV out ( ) < < " MessagingPort::recv(): conn closed? " < < farEnd . toString ( ) < < endl ;
2009-01-15 10:17:11 -05:00
m . reset ( ) ;
return false ;
}
if ( x < 0 ) {
log ( ) < < " MessagingPort recv() error " < < errno < < ' ' < < farEnd . toString ( ) < < endl ;
m . reset ( ) ;
2008-12-28 20:28:49 -05:00
return false ;
}
2009-01-15 10:17:11 -05:00
left - = x ;
p + = x ;
if ( left < = 0 )
break ;
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
m . setData ( md , true ) ;
return true ;
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
void MessagingPort : : reply ( Message & received , Message & response ) {
say ( /*received.from, */ response , received . data - > id ) ;
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
void MessagingPort : : reply ( Message & received , Message & response , MSGID responseTo ) {
say ( /*received.from, */ response , responseTo ) ;
2008-12-28 20:28:49 -05:00
}
2009-01-15 10:17:11 -05:00
bool MessagingPort : : call ( Message & toSend , Message & response ) {
2009-01-15 11:26:38 -05:00
mmm ( out ( ) < < " *call() " < < endl ; )
2009-01-15 10:17:11 -05:00
MSGID old = toSend . data - > id ;
say ( /*to,*/ toSend ) ;
while ( 1 ) {
bool ok = recv ( response ) ;
if ( ! ok )
return false ;
2009-01-15 11:26:38 -05:00
//out() << "got response: " << response.data->responseTo << endl;
2009-01-15 10:17:11 -05:00
if ( response . data - > responseTo = = toSend . data - > id )
break ;
2009-01-15 11:26:38 -05:00
out ( ) < < " ******************** " < < endl ;
2009-02-19 12:03:15 -05:00
out ( ) < < " ERROR: MessagingPort::call() wrong id got: " < < ( unsigned ) response . data - > responseTo < < " expect: " < < ( unsigned ) toSend . data - > id < < endl ;
2009-12-01 12:25:48 -05:00
out ( ) < < " toSend op: " < < toSend . data - > operation ( ) < < " old id: " < < ( unsigned ) old < < endl ;
2009-02-19 12:03:15 -05:00
out ( ) < < " response msgid: " < < ( unsigned ) response . data - > id < < endl ;
out ( ) < < " response len: " < < ( unsigned ) response . data - > len < < endl ;
2009-12-01 12:25:48 -05:00
out ( ) < < " response op: " < < response . data - > operation ( ) < < endl ;
2009-11-24 13:55:46 -05:00
out ( ) < < " farEnd: " < < farEnd < < endl ;
2009-01-15 10:17:11 -05:00
assert ( false ) ;
response . reset ( ) ;
2009-01-13 14:32:36 -05:00
}
2009-01-15 11:26:38 -05:00
mmm ( out ( ) < < " *call() end " < < endl ; )
2009-01-15 10:17:11 -05:00
return true ;
2009-01-13 14:32:36 -05:00
}
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
void MessagingPort : : say ( Message & toSend , int responseTo ) {
2009-01-15 11:26:38 -05:00
mmm ( out ( ) < < " * say() sock: " < < this - > sock < < " thr: " < < GetCurrentThreadId ( ) < < endl ; )
2009-09-11 16:14:14 -04:00
toSend . data - > id = nextMessageId ( ) ;
2009-01-15 10:17:11 -05:00
toSend . data - > responseTo = responseTo ;
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
int x = - 100 ;
if ( piggyBackData & & piggyBackData - > len ( ) ) {
2009-01-28 11:56:29 -05:00
mmm ( out ( ) < < " * have piggy back " < < endl ; )
2009-01-15 10:17:11 -05:00
if ( ( piggyBackData - > len ( ) + toSend . data - > len ) > 1300 ) {
// won't fit in a packet - so just send it off
piggyBackData - > flush ( ) ;
}
else {
piggyBackData - > append ( toSend ) ;
x = piggyBackData - > flush ( ) ;
}
}
2009-01-13 14:32:36 -05:00
2009-01-15 10:17:11 -05:00
if ( x = = - 100 )
2009-01-28 11:56:29 -05:00
x = : : send ( sock , ( char * ) toSend . data , toSend . data - > len , portSendFlags ) ;
2009-02-03 16:48:12 -05:00
2009-01-15 10:17:11 -05:00
if ( x < = 0 ) {
log ( ) < < " MessagingPort say send() error " < < errno < < ' ' < < farEnd . toString ( ) < < endl ;
2009-02-03 16:48:12 -05:00
throw SocketException ( ) ;
2009-01-15 10:17:11 -05:00
}
2009-01-14 17:17:24 -05:00
2009-01-13 14:32:36 -05:00
}
2009-01-15 10:17:11 -05:00
void MessagingPort : : piggyBack ( Message & toSend , int responseTo ) {
if ( toSend . data - > len > 1300 ) {
// not worth saving because its almost an entire packet
say ( toSend ) ;
return ;
}
2009-01-14 17:17:24 -05:00
2009-01-15 10:17:11 -05:00
// we're going to be storing this, so need to set it up
2009-09-11 16:14:14 -04:00
toSend . data - > id = nextMessageId ( ) ;
2009-01-15 10:17:11 -05:00
toSend . data - > responseTo = responseTo ;
2009-01-13 14:32:36 -05:00
2009-01-15 10:17:11 -05:00
if ( ! piggyBackData )
piggyBackData = new PiggyBackData ( this ) ;
piggyBackData - > append ( toSend ) ;
}
2009-01-14 17:09:51 -05:00
2009-09-11 16:14:14 -04:00
unsigned MessagingPort : : remotePort ( ) {
return farEnd . getPort ( ) ;
}
MSGID NextMsgId ;
bool usingClientIds = 0 ;
2009-10-13 12:55:23 -04:00
ThreadLocalValue < int > clientId ;
2009-09-11 16:14:14 -04:00
/**
 * Static initializer: seeds NextMsgId from the current time and checks that
 * the message header has the expected size of 16 bytes.
 */
struct MsgStart {
    MsgStart() {
        const unsigned seconds = (unsigned) time(0);
        NextMsgId = (seconds << 16) ^ curTimeMillis();
        assert(MsgDataHeaderSize == 16);
    }
} msgstart;
2009-03-02 09:57:22 -05:00
/**
 * Returns the next message id from the global counter. Once client ids are
 * in use, only the low 16 bits of the counter are kept and OR-ed with the
 * calling thread's client id.
 */
MSGID nextMessageId() {
    MSGID next = NextMsgId.atomicIncrement();
    if (!usingClientIds)
        return next;

    next = next & 0xFFFF;
    next = next | clientId.get();
    return next;
}
2009-03-17 17:24:38 -04:00
/** @return true for the operations that are answered with a reply: dbQuery and dbGetMore */
bool doesOpGetAResponse(int op) {
    switch (op) {
    case dbQuery:
    case dbGetMore:
        return true;
    default:
        return false;
    }
}
2009-09-11 16:14:14 -04:00
void setClientId ( int id ) {
usingClientIds = true ;
id = id & 0xFFFF0000 ;
massert ( " invalid id " , id ) ;
2009-10-13 12:55:23 -04:00
clientId . set ( id ) ;
2009-09-11 16:14:14 -04:00
}
2009-09-14 11:33:35 -04:00
/** @return the client id stored for the calling thread by setClientId() */
int getClientId()
{
    const int id = clientId.get();
    return id;
}
2009-01-14 17:09:51 -05:00
} // namespace mongo