Files
mongo/client/dbclient.cpp

563 lines
16 KiB
C++
Raw Normal View History

// dbclient.cpp - connect to a Mongo database as a database, from C++
2008-07-27 18:36:47 -04:00
/**
* Copyright (C) 2008 10gen Inc.
2008-12-28 20:28:49 -05:00
*
2008-07-27 18:36:47 -04:00
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
2008-12-28 20:28:49 -05:00
*
2008-07-27 18:36:47 -04:00
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
2008-12-28 20:28:49 -05:00
*
2008-07-27 18:36:47 -04:00
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "stdafx.h"
2008-10-19 17:46:53 -05:00
#include "../db/pdfile.h"
2008-07-27 18:36:47 -04:00
#include "dbclient.h"
#include "../util/builder.h"
2008-10-19 17:46:53 -05:00
#include "../db/jsobj.h"
#include "../db/query.h"
#include "../db/json.h"
#include "../db/instance.h"
2008-10-19 11:17:25 -05:00
/* --- dbclientcommands --- */
2009-01-10 18:17:23 -05:00
/* A command reply is treated as successful only when its "ok" field equals 1. */
inline bool DBClientWithCommands::isOk(const BSONObj& o) {
    const bool succeeded = ( o.getIntField("ok") == 1 );
    return succeeded;
}
inline bool DBClientWithCommands::runCommand(const char *dbname, BSONObj cmd, BSONObj &info) {
string ns = string(dbname) + ".$cmd";
info = findOne(ns.c_str(), cmd);
return isOk(info);
}
/* note - we build a bson obj here -- for something that is super common like getlasterror you
should have that object prebuilt as that would be faster.
*/
bool DBClientWithCommands::simpleCommand(const char *dbname, BSONObj *info, const char *command) {
BSONObj o;
if( info == 0 )
info = &o;
BSONObjBuilder b;
b.appendInt(command, 1);
return runCommand(dbname, b.done(), *info);
}
2009-01-06 13:02:09 -05:00
/* prebuilt { ismaster: 1 } command object; passed to runCommand() by isMaster() below */
BSONObj ismastercmdobj = fromjson("{\"ismaster\":1}");
2008-10-19 11:17:25 -05:00
2009-01-10 18:17:23 -05:00
bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
BSONObj o; if( info == 0 ) info = &o;
bool ok = runCommand("admin", ismastercmdobj, *info);
isMaster = (info->getIntField("ismaster") == 1);
return ok;
}
bool DBClientWithCommands::createCollection(const char *ns, unsigned size, bool capped, int max, BSONObj *info) {
BSONObj o; if( info == 0 ) info = &o;
BSONObjBuilder b;
b.append("create", ns);
if( size ) b.append("size", size);
if( capped ) b.append("capped", true);
if( max ) b.append("max", max);
string db = nsToClient(ns);
return runCommand(db.c_str(), b.done(), *info);
}
bool DBClientWithCommands::copyDatabase(const char *fromdb, const char *todb, const char *fromhost, BSONObj *info) {
assert( *fromdb && *todb );
BSONObj o; if( info == 0 ) info = &o;
BSONObjBuilder b;
b.append("copydb", 1);
b.append("fromhost", fromhost);
b.append("fromdb", fromdb);
b.append("todb", todb);
return runCommand("admin", b.done(), *info);
}
bool DBClientWithCommands::setDbProfilingLevel(const char *dbname, ProfilingLevel level, BSONObj *info ) {
BSONObj o; if( info == 0 ) info = &o;
if( level ) {
// Create system.profile collection. If it already exists this does nothing.
// TODO: move this into the db instead of here so that all
// drivers don't have to do this.
string ns = string(dbname) + ".system.profile";
createCollection(ns.c_str(), 1024 * 1024, true, 0, info);
}
BSONObjBuilder b;
b.append("profile", (int) level);
return runCommand(dbname, b.done(), *info);
}
/* prebuilt { profile: -1 } command object; passed to runCommand() by getDbProfilingLevel() */
BSONObj getprofilingcmdobj = fromjson("{\"profile\":-1}");
2009-01-10 18:17:23 -05:00
bool DBClientWithCommands::getDbProfilingLevel(const char *dbname, ProfilingLevel& level, BSONObj *info) {
BSONObj o; if( info == 0 ) info = &o;
if( runCommand(dbname, getprofilingcmdobj, *info) ) {
level = (ProfilingLevel) info->getIntField("was");
return true;
}
return false;
}
bool DBClientWithCommands::eval(const char *dbname, const char *jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) {
BSONObjBuilder b;
b.appendCode("$eval", jscode);
if( args )
b.appendArray("args", *args);
bool ok = runCommand(dbname, b.done(), info);
if( ok )
retValue = info.getField("retval");
return ok;
}
bool DBClientWithCommands::eval(const char *dbname, const char *jscode) {
BSONObj info;
BSONElement retValue;
return eval(dbname, jscode, info, retValue);
}
/* TODO: unit tests should run this? */
void testDbEval() {
DBClientConnection c;
string err;
if( !c.connect("localhost", err) ) {
cout << "can't connect to server " << err << endl;
return;
}
BSONObj info;
BSONElement retValue;
BSONObjBuilder b;
b.append("0", 99);
BSONObj args = b.done();
bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args);
cout << "eval ok=" << ok << endl;
cout << "retvalue=" << retValue.toString() << endl;
cout << "info=" << info.toString() << endl;
cout << endl;
int x = 3;
assert( c.eval("dwight", "function() { return 3; }", x) );
cout << "***\n";
BSONObj foo = fromjson("{\"x\":7}");
2009-01-10 18:17:23 -05:00
cout << foo.toString() << endl;
int res=0;
ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res);
cout << ok << " retval:" << res << endl;
}
/* Entry point wrapper: runs testDbEval() and always reports 0. */
int test2() {
    testDbEval();

    return 0;
}
/* --- dbclientconnection --- */
2008-07-27 18:36:47 -04:00
/* Fetch a single object matching query from ns.
   Issues query() with nToReturn=1 and nToSkip=0. massert()s when query() hands
   back a null cursor. Returns an empty BSONObj when nothing matched, otherwise
   an owned copy of the first result.
*/
BSONObj DBClientBase::findOne(const char *ns, BSONObj query, BSONObj *fieldsToReturn, int queryOptions) {
    auto_ptr<DBClientCursor> cursor =
        this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);

    massert( "DBClientBase::findOne: transport error", cursor.get() );

    if ( cursor->more() )
        return cursor->next().copy();

    return BSONObj();
}
2008-12-28 20:28:49 -05:00
bool DBClientConnection::connect(const char *_serverAddress, string& errmsg) {
serverAddress = _serverAddress;
2008-12-28 20:28:49 -05:00
int port = DBPort;
string ip = hostbyname(_serverAddress);
if ( ip.empty() )
ip = serverAddress;
size_t idx = ip.find( ":" );
if ( idx != string::npos ) {
//cout << "port string:" << ip.substr( idx ) << endl;
port = atoi( ip.substr( idx + 1 ).c_str() );
ip = ip.substr( 0 , idx );
ip = hostbyname(ip.c_str());
}
if ( ip.empty() )
ip = serverAddress;
// we keep around SockAddr for connection life -- maybe MessagingPort
// requires that?
2008-12-28 20:28:49 -05:00
server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
p = auto_ptr<MessagingPort>(new MessagingPort());
2008-12-28 20:28:49 -05:00
if ( !p->connect(*server) ) {
stringstream ss;
ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port;
2008-12-19 11:19:35 -05:00
errmsg = ss.str();
failed = true;
2008-12-28 20:28:49 -05:00
return false;
}
return true;
}
2008-12-28 20:28:49 -05:00
void DBClientConnection::checkConnection() {
if ( !failed )
return;
2008-12-28 20:28:49 -05:00
if ( lastReconnectTry && time(0)-lastReconnectTry < 2 )
return;
2008-12-28 20:28:49 -05:00
if ( !autoReconnect )
return;
lastReconnectTry = time(0);
log() << "trying reconnect to " << serverAddress << endl;
string errmsg;
string tmp = serverAddress;
failed = false;
2008-12-28 20:28:49 -05:00
if ( !connect(tmp.c_str(), errmsg) )
log() << "reconnect " << serverAddress << " failed " << errmsg << endl;
else
log() << "reconnect " << serverAddress << " ok" << endl;
}
/* Create a cursor for the given query and send the initial request.
   Returns a null auto_ptr when the cursor's init() fails.
*/
auto_ptr<DBClientCursor> DBClientBase::query(const char *ns, BSONObj query, int nToReturn,
        int nToSkip, BSONObj *fieldsToReturn, int queryOptions) {
    auto_ptr<DBClientCursor> cursor(
        new DBClientCursor( this, ns, query, nToReturn, nToSkip,
                            fieldsToReturn, queryOptions ) );

    if ( !cursor->init() )
        return auto_ptr< DBClientCursor >( 0 );

    return cursor;
}
void DBClientBase::insert( const char * ns , BSONObj obj ){
2009-01-05 14:48:04 -05:00
Message toSend;
BufBuilder b;
int opts = 0;
b.append( opts );
b.append( ns );
obj.appendSelfToBufBuilder( b );
toSend.setData( dbInsert , b.buf() , b.len() );
say( toSend );
2009-01-05 14:48:04 -05:00
}
2009-01-13 15:19:50 -05:00
void DBClientBase::remove( const char * ns , BSONObj obj , bool justOne ){
2009-01-05 14:48:04 -05:00
Message toSend;
BufBuilder b;
int opts = 0;
b.append( opts );
b.append( ns );
int flags = 0;
if ( justOne || obj.hasField( "_id" ) )
flags &= 1;
b.append( flags );
obj.appendSelfToBufBuilder( b );
toSend.setData( dbDelete , b.buf() , b.len() );
2009-01-13 15:19:50 -05:00
say( toSend );
2009-01-05 14:48:04 -05:00
}
2009-01-13 16:08:07 -05:00
void DBClientBase::update( const char * ns , BSONObj query , BSONObj obj , bool upsert ){
BufBuilder b;
b.append( (int)0 ); // reserverd
b.append( ns );
b.append( (int)upsert );
query.appendSelfToBufBuilder( b );
obj.appendSelfToBufBuilder( b );
Message toSend;
toSend.setData( dbUpdate , b.buf() , b.len() );
say( toSend );
}
/* -- DBClientCursor ---------------------------------------------- */
2008-12-19 09:52:54 -05:00
void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
2008-12-28 20:28:49 -05:00
// see query.h for the protocol we are using here.
BufBuilder b;
int opts = queryOptions;
assert( (opts&Option_ALLMASK) == opts );
b.append(opts);
2008-12-28 20:28:49 -05:00
b.append(ns.c_str());
b.append(nToSkip);
b.append(nToReturn);
query.appendSelfToBufBuilder(b);
if ( fieldsToReturn )
fieldsToReturn->appendSelfToBufBuilder(b);
toSend.setData(dbQuery, b.buf(), b.len());
}
/* Hand toSend to the underlying port's say(). */
void DBClientConnection::say( Message &toSend )
{
    port().say( toSend );
}
/* Hand toSend to the underlying port's piggyBack(). */
void DBClientConnection::sayPiggyBack( Message &toSend )
{
    port().piggyBack( toSend );
}
/* Send toSend through the port's call(), which fills in response.
   When the port reports failure the connection is flagged as failed; then either
   massert() fires (assertOk true) or false is returned (assertOk false).
*/
bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) {
    if ( port().call(toSend, response) )
        return true;

    failed = true;
    if ( assertOk ) {
        massert("dbclient error communicating with server", false);
    }
    return false;
}
void DBClientConnection::checkResponse( const char *data, int nReturned ) {
2008-12-28 20:28:49 -05:00
/* check for errors. the only one we really care about at
this stage is "not master" */
if ( clientPaired && nReturned ) {
BSONObj o(data);
BSONElement e = o.firstElement();
2008-12-28 20:28:49 -05:00
if ( strcmp(e.fieldName(), "$err") == 0 &&
e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
clientPaired->isntMaster();
}
}
}
bool DBClientCursor::init() {
2008-12-28 20:28:49 -05:00
Message toSend;
assembleRequest( ns, query, nToReturn, nToSkip, fieldsToReturn, opts, toSend );
if ( !connector->call( toSend, *m, false ) )
2008-12-28 20:28:49 -05:00
return false;
dataReceived();
return true;
}
2008-12-28 20:28:49 -05:00
void DBClientCursor::requestMore() {
assert( cursorId && pos == nReturned );
2008-12-28 20:28:49 -05:00
BufBuilder b;
b.append(opts);
2008-12-28 20:28:49 -05:00
b.append(ns.c_str());
b.append(nToReturn);
b.append(cursorId);
2008-12-28 20:28:49 -05:00
Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
auto_ptr<Message> response(new Message());
connector->call( toSend, *response );
2008-12-28 20:28:49 -05:00
m = response;
dataReceived();
}
2008-12-28 20:28:49 -05:00
void DBClientCursor::dataReceived() {
QueryResult *qr = (QueryResult *) m->data;
2009-01-09 11:20:16 -05:00
if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) {
2008-12-28 20:28:49 -05:00
// cursor id no longer valid at the server.
assert( qr->cursorId == 0 );
cursorId = 0; // 0 indicates no longer valid (dead)
}
if ( cursorId == 0 ) {
// only set initially: we don't want to kill it on end of data
// if it's a tailable cursor
cursorId = qr->cursorId;
}
2008-12-28 20:28:49 -05:00
nReturned = qr->nReturned;
pos = 0;
data = qr->data();
2008-12-28 20:28:49 -05:00
connector->checkResponse( data, nReturned );
/* this assert would fire the way we currently work:
assert( nReturned || cursorId == 0 );
*/
}
2008-12-28 20:28:49 -05:00
bool DBClientCursor::more() {
if ( pos < nReturned )
return true;
2008-12-28 20:28:49 -05:00
if ( cursorId == 0 )
return false;
2008-12-28 20:28:49 -05:00
requestMore();
return pos < nReturned;
}
2008-10-21 16:13:48 -04:00
/* Return the next object of the batch and step past it.
   Asserts that more() holds. The returned object is constructed directly over
   the batch data pointer (findOne() calls copy() on it before returning it).
*/
BSONObj DBClientCursor::next() {
    assert( more() );

    pos++;
    BSONObj current(data);
    data += current.objsize();
    return current;
}
2009-01-08 17:08:43 -05:00
/* When the cursor id is still non-zero, queue a dbKillCursors message for it
   through the connector's sayPiggyBack(). */
DBClientCursor::~DBClientCursor(){
    if ( !cursorId )
        return;

    BufBuilder buf;
    buf.append( (int)0 ); // reserved
    buf.append( (int)1 ); // number
    buf.append( cursorId );

    Message killMsg;
    killMsg.setData( dbKillCursors , buf.buf() , buf.len() );

    connector->sayPiggyBack( killMsg );
}
/* ------------------------------------------------------ */
// "./db testclient" to invoke
2008-10-21 16:13:48 -04:00
extern BSONObj emptyObj;
void testClient() {
2008-12-28 20:28:49 -05:00
cout << "testClient()" << endl;
// DBClientConnection c(true);
DBClientPaired c;
2008-12-28 20:28:49 -05:00
string err;
if ( !c.connect("10.211.55.2", "1.2.3.4") ) {
// if( !c.connect("10.211.55.2", err) ) {
cout << "testClient: connect() failed" << endl;
}
2008-12-28 20:28:49 -05:00
else {
// temp:
cout << "test query returns: " << c.findOne("foo.bar", fromjson("{}")).toString() << endl;
}
again:
2008-12-28 20:28:49 -05:00
cout << "query foo.bar..." << endl;
auto_ptr<DBClientCursor> cursor =
c.query("foo.bar", emptyObj, 0, 0, 0, Option_CursorTailable);
DBClientCursor *cc = cursor.get();
if ( cc == 0 ) {
2008-10-19 11:17:25 -05:00
cout << "query() returned 0, sleeping 10 secs" << endl;
sleepsecs(10);
goto again;
}
2008-12-28 20:28:49 -05:00
while ( 1 ) {
bool m;
try {
m = cc->more();
2008-12-28 20:28:49 -05:00
} catch (AssertionException&) {
cout << "more() asserted, sleeping 10 sec" << endl;
goto again;
}
2008-12-28 20:28:49 -05:00
cout << "more: " << m << " dead:" << cc->isDead() << endl;
if ( !m ) {
if ( cc->isDead() )
cout << "cursor dead, stopping" << endl;
else {
cout << "Sleeping 10 seconds" << endl;
sleepsecs(10);
continue;
}
break;
}
cout << cc->next().toString() << endl;
}
}
2008-10-19 11:17:25 -05:00
/* --- class dbclientpaired --- */
2008-12-28 20:28:49 -05:00
/* Multi-line description: the master state followed by the long description of
   each of the two connections. */
string DBClientPaired::toString() {
    stringstream out;

    out << "state: " << master << '\n';
    out << "left:  " << left.toStringLong() << '\n';
    out << "right: " << right.toStringLong() << '\n';

    return out.str();
}
2008-12-28 20:28:49 -05:00
/* Both connections are constructed with `true` (presumably the auto-reconnect
   flag -- confirm against DBClientConnection's constructor). No master is known
   yet, so the state starts as NotSetL. */
DBClientPaired::DBClientPaired()
    : left(true),
      right(true)
{
    master = NotSetL;
}
/* find which server, the left or right, is currently master mode */
void DBClientPaired::_checkMaster() {
2008-12-28 20:28:49 -05:00
for ( int retry = 0; retry < 2; retry++ ) {
int x = master;
2008-12-28 20:28:49 -05:00
for ( int pass = 0; pass < 2; pass++ ) {
DBClientConnection& c = x == 0 ? left : right;
try {
bool im;
2009-01-10 18:17:23 -05:00
BSONObj o;
c.isMaster(im, &o);
2008-12-28 20:28:49 -05:00
if ( retry )
log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n';
2008-12-28 20:28:49 -05:00
if ( im ) {
master = (State) (x + 2);
return;
}
}
2008-12-28 20:28:49 -05:00
catch (AssertionException&) {
if ( retry )
log() << "checkmaster: caught exception " << c.toString() << '\n';
}
x = x^1;
}
sleepsecs(1);
}
2008-10-19 11:17:25 -05:00
uassert("checkmaster: no master found", false);
}
2008-10-19 11:17:25 -05:00
2008-12-28 20:28:49 -05:00
/* Return the connection to the current master.
   A previously selected master is reused as long as its connection has not
   failed; otherwise _checkMaster() is run to find one (it may throw).
*/
inline DBClientConnection& DBClientPaired::checkMaster() {
    if ( master > NotSetR ) {
        // a master is selected.  let's just make sure connection didn't die
        DBClientConnection& current = ( master == Left ) ? left : right;
        if ( !current.isFailed() )
            return current;

        // after a failure, on the next checkMaster, start with the other
        // server -- presumably it took over.  (not critical which we check first,
        // just will make the failover slightly faster if we guess right)
        master = ( master == Left ) ? NotSetR : NotSetL;
    }

    _checkMaster();
    assert( master > NotSetR );

    if ( master == Left )
        return left;
    return right;
}
2008-12-28 20:28:49 -05:00
bool DBClientPaired::connect(const char *serverHostname1, const char *serverHostname2) {
2008-10-19 11:17:25 -05:00
string errmsg;
bool l = left.connect(serverHostname1, errmsg);
bool r = right.connect(serverHostname2, errmsg);
master = l ? NotSetL : NotSetR;
2008-12-28 20:28:49 -05:00
if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow
return false;
2008-12-28 20:28:49 -05:00
try {
checkMaster();
}
catch (UserAssertionException&) {
return false;
}
return true;
2008-10-19 11:17:25 -05:00
}
2008-12-28 20:28:49 -05:00
/* Forward the query, arguments unchanged, to whichever connection checkMaster()
   returns. */
auto_ptr<DBClientCursor> DBClientPaired::query(const char *a, BSONObj b, int c, int d,
        BSONObj *e, int f)
{
    DBClientConnection& masterConn = checkMaster();
    return masterConn.query(a,b,c,d,e,f);
}
2008-10-21 16:13:48 -04:00
/* Forward findOne, arguments unchanged, to whichever connection checkMaster()
   returns. */
BSONObj DBClientPaired::findOne(const char *a, BSONObj b, BSONObj *c, int d) {
    DBClientConnection& masterConn = checkMaster();
    return masterConn.findOne(a,b,c,d);
}