Files
mongo/grid/message.cpp

185 lines
4.7 KiB
C++
Raw Normal View History

2007-10-19 19:35:48 -04:00
/* message
todo: authenticate; encrypt?
*/
#include "stdafx.h"
#include "message.h"
2007-11-05 15:12:45 -05:00
#include <time.h>
2007-11-13 16:44:01 -05:00
#include "../util/goodies.h"
2007-12-08 15:50:47 -05:00
2007-12-14 12:48:47 -05:00
// if you want trace output:
#define mmm(x)
2007-12-08 15:50:47 -05:00
/* listener ------------------------------------------------------------------- */
void Listener::listen() {
SockAddr me(port);
int sock = socket(AF_INET, SOCK_STREAM, 0);
if( sock == INVALID_SOCKET ) {
cout << "ERROR: listen(): invalid socket? " << errno << endl;
return;
}
2008-01-24 16:09:24 +00:00
prebindOptions( sock );
2007-12-08 15:50:47 -05:00
if( bind(sock, (sockaddr *) &me.sa, me.addressSize) != 0 ) {
2007-12-13 15:36:56 -05:00
cout << "listen(): bind() failed errno:" << errno << endl;
if( errno == 98 )
cout << "98 == addr already in use" << endl;
2007-12-08 15:50:47 -05:00
closesocket(sock);
return;
}
if( ::listen(sock, 128) != 0 ) {
cout << "listen(): listen() failed " << errno << endl;
closesocket(sock);
return;
}
SockAddr from;
while( 1 ) {
int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
if( s < 0 ) {
cout << "Listener: accept() returns " << s << " errno:" << errno << endl;
continue;
}
disableNagle(s);
cout << "Listener: connection accepted from " << from.toString() << endl;
accepted( new MessagingPort(s, from) );
}
}
/* messagingport -------------------------------------------------------------- */
2007-10-19 19:35:48 -04:00
2007-11-17 21:10:00 -05:00
MSGID NextMsgId;
2007-10-19 19:35:48 -04:00
struct MsgStart {
MsgStart() {
2007-11-17 21:10:00 -05:00
NextMsgId = (((unsigned) time(0)) << 16) ^ curTimeMillis();
assert(MsgDataHeaderSize == 16);
2007-10-19 19:35:48 -04:00
}
} msgstart;
2007-12-08 15:50:47 -05:00
MessagingPort::MessagingPort(int _sock, SockAddr& _far) : sock(_sock), farEnd(_far) { }
2007-11-17 21:10:00 -05:00
2007-12-08 15:50:47 -05:00
MessagingPort::MessagingPort() {
sock = -1;
}
void MessagingPort::shutdown() {
if( sock >= 0 ) {
closesocket(sock);
sock = -1;
2007-11-17 21:10:00 -05:00
}
2007-10-19 19:35:48 -04:00
}
MessagingPort::~MessagingPort() {
2007-12-08 15:50:47 -05:00
shutdown();
2007-10-19 19:35:48 -04:00
}
2007-12-08 15:50:47 -05:00
bool MessagingPort::connect(SockAddr& _far)
{
farEnd = _far;
2007-12-08 15:50:47 -05:00
sock = socket(AF_INET, SOCK_STREAM, 0);
if( sock == INVALID_SOCKET ) {
cout << "ERROR: connect(): invalid socket? " << errno << endl;
return false;
2007-10-19 19:35:48 -04:00
}
2007-12-08 15:50:47 -05:00
if( ::connect(sock, (sockaddr *) &farEnd.sa, farEnd.addressSize) ) {
cout << "ERROR: connect(): connect() failed" << errno << endl;
closesocket(sock); sock = -1;
return false;
}
disableNagle(sock);
return true;
2007-10-19 19:35:48 -04:00
}
bool MessagingPort::recv(Message& m) {
2007-12-14 12:48:47 -05:00
mmm( cout << "* recv() sock:" << this->sock << endl; )
2007-12-08 15:50:47 -05:00
int len;
2007-10-19 19:35:48 -04:00
2007-12-08 15:50:47 -05:00
int x = ::recv(sock, (char *) &len, 4, 0);
if( x == 0 ) {
cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
m.reset();
return false;
}
if( x < 0 ) {
cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString()<<endl;
m.reset();
return false;
2007-10-19 19:35:48 -04:00
}
2007-11-17 21:10:00 -05:00
2007-12-08 15:50:47 -05:00
assert( x == 4 );
2008-01-26 13:12:38 -05:00
2007-12-12 10:32:34 -05:00
int z = (len+1023)&0xfffffc00; assert(z>=len);
2007-12-08 15:50:47 -05:00
MsgData *md = (MsgData *) malloc(z);
md->len = len;
2008-01-26 13:12:38 -05:00
if ( len <= 0 ){
cout << "got a length of 0, something is wrong" << endl;
return false;
}
2007-12-08 15:50:47 -05:00
char *p = (char *) &md->id;
int left = len -4;
while( 1 ) {
x = ::recv(sock, p, left, 0);
if( x == 0 ) {
cout << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
m.reset();
return false;
}
if( x < 0 ) {
cout << "MessagingPort::recv(): recv() error " << errno << ' ' << farEnd.toString() << endl;
m.reset();
return false;
}
left -= x;
p += x;
if( left <= 0 )
break;
}
m.setData(md, true);
2007-10-19 19:35:48 -04:00
return true;
}
void MessagingPort::reply(Message& received, Message& response) {
2007-12-08 15:50:47 -05:00
say(received.from, response, received.data->id);
2007-10-19 19:35:48 -04:00
}
bool MessagingPort::call(SockAddr& to, Message& toSend, Message& response) {
2007-12-14 12:48:47 -05:00
mmm( cout << "*call()" << endl; )
MSGID old = toSend.data->id;
2007-12-08 15:50:47 -05:00
say(to, toSend);
2007-11-05 15:12:45 -05:00
while( 1 ) {
bool ok = recv(response);
if( !ok )
return false;
2007-11-13 16:44:01 -05:00
//cout << "got response: " << response.data->responseTo << endl;
2007-11-05 15:12:45 -05:00
if( response.data->responseTo == toSend.data->id )
break;
2007-12-14 12:48:47 -05:00
cout << "********************" << endl;
cout << "ERROR: MessagingPort::call() wrong id got:" << response.data->responseTo << " expect:" << toSend.data->id << endl;
cout << " old:" << old << endl;
cout << " response msgid:" << response.data->id << endl;
cout << " response len: " << response.data->len << endl;
assert(false);
2007-11-05 15:12:45 -05:00
response.reset();
}
2007-12-14 12:48:47 -05:00
mmm( cout << "*call() end" << endl; )
2007-10-19 19:35:48 -04:00
return true;
}
2007-12-08 15:50:47 -05:00
void MessagingPort::say(SockAddr& to, Message& toSend, int responseTo) {
2007-12-14 12:48:47 -05:00
mmm( cout << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
2007-11-17 21:10:00 -05:00
MSGID msgid = NextMsgId;
++NextMsgId;
toSend.data->id = msgid;
2007-10-19 19:35:48 -04:00
toSend.data->responseTo = responseTo;
2007-12-08 15:50:47 -05:00
int x = ::send(sock, (char *) toSend.data, toSend.data->len, 0);
if( x <= 0 ) {
cout << "MessagingPort::say: send() error " << errno << ' ' << farEnd.toString() << endl;
2007-10-19 19:35:48 -04:00
}
}