2010-02-09 16:48:21 -05:00
|
|
|
/*
|
|
|
|
|
* Copyright (C) 2010 10gen Inc.
|
|
|
|
|
*
|
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
|
* it under the terms of the GNU Affero General Public License, version 3,
|
|
|
|
|
* as published by the Free Software Foundation.
|
|
|
|
|
*
|
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
|
* GNU Affero General Public License for more details.
|
|
|
|
|
*
|
|
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
*/
|
|
|
|
|
|
2009-02-20 10:46:42 -05:00
|
|
|
// strategy.cpp
|
|
|
|
|
|
2010-04-27 15:27:52 -04:00
|
|
|
#include "pch.h"
|
2009-02-20 10:46:42 -05:00
|
|
|
#include "request.h"
|
2009-04-17 17:11:32 -04:00
|
|
|
#include "../util/background.h"
|
2009-02-20 10:46:42 -05:00
|
|
|
#include "../client/connpool.h"
|
|
|
|
|
#include "../db/commands.h"
|
2010-07-28 14:24:55 -04:00
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
#include "server.h"
|
2010-07-28 14:24:55 -04:00
|
|
|
#include "grid.h"
|
2009-02-20 10:46:42 -05:00
|
|
|
|
|
|
|
|
namespace mongo {
|
|
|
|
|
|
2009-02-23 21:47:25 -05:00
|
|
|
// ----- Strategy ------
|
|
|
|
|
|
2010-07-22 23:05:02 -04:00
|
|
|
void Strategy::doWrite( int op , Request& r , const Shard& shard , bool checkVersion ){
|
2010-07-21 14:12:32 -04:00
|
|
|
ShardConnection conn( shard , r.getns() );
|
2010-07-22 23:05:02 -04:00
|
|
|
if ( ! checkVersion )
|
|
|
|
|
conn.donotCheckVersion();
|
2010-07-23 00:47:24 -04:00
|
|
|
else if ( conn.setVersion() ){
|
|
|
|
|
conn.done();
|
|
|
|
|
throw StaleConfigException( r.getns() , "doWRite" , true );
|
|
|
|
|
}
|
2010-07-21 14:12:32 -04:00
|
|
|
conn->say( r.m() );
|
|
|
|
|
conn.done();
|
2009-02-20 10:46:42 -05:00
|
|
|
}
|
2010-07-23 00:47:24 -04:00
|
|
|
|
2010-04-27 12:32:59 -04:00
|
|
|
/**
 * Runs the query in request `r` against `shard` and relays the response back
 * through r.reply().
 *
 * Flow:
 *  - the message is sent with DBClientBase::call(); a false result raises
 *    uassert 10200 before the response is looked at
 *  - if the response carries ResultFlag_ShardConfigStale, the connection is
 *    finished and a StaleConfigException is thrown
 *  - otherwise the response is forwarded together with the server address
 *
 * Any AssertionException raised inside the try block is turned into an error
 * reply ( ResultFlag_ErrSet ) built from the exception's info.
 *
 * @param r      request being served
 * @param shard  shard to query
 */
void Strategy::doQuery( Request& r , const Shard& shard ){
    try{
        ShardConnection dbcon( shard , r.getns() );
        DBClientBase &c = dbcon.conn();

        Message response;
        bool ok = c.call( r.m(), response);

        // Check the call result first: when call() fails nothing guarantees that
        // `response` holds a QueryResult, so it must not be dereferenced below.
        uassert( 10200 , "mongos: error calling db", ok);

        {
            QueryResult *qr = (QueryResult *) response.singleData();
            if ( qr->resultFlags() & ResultFlag_ShardConfigStale ){
                dbcon.done();
                throw StaleConfigException( r.getns() , "Strategy::doQuery" );
            }
        }

        r.reply( response , c.getServerAddress() );
        dbcon.done();
    }
    catch ( AssertionException& e ) {
        // report the assertion to the client as an error document
        BSONObjBuilder err;
        e.getInfo().append( err );
        BSONObj errObj = err.done();
        replyToQuery(ResultFlag_ErrSet, r.p() , r.m() , errObj);
    }
}
|
2009-02-20 13:46:57 -05:00
|
|
|
|
2010-04-27 12:32:59 -04:00
|
|
|
void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj ){
|
2010-05-20 13:36:29 -04:00
|
|
|
ShardConnection dbcon( shard , ns );
|
2010-07-02 00:51:41 -04:00
|
|
|
if ( dbcon.setVersion() ){
|
|
|
|
|
dbcon.done();
|
2010-07-01 17:44:13 -04:00
|
|
|
throw StaleConfigException( ns , "for insert" );
|
2010-07-02 00:51:41 -04:00
|
|
|
}
|
2009-02-20 10:46:42 -05:00
|
|
|
dbcon->insert( ns , obj );
|
2009-02-20 13:46:57 -05:00
|
|
|
dbcon.done();
|
2009-02-20 10:46:42 -05:00
|
|
|
}
|
2009-02-23 21:47:25 -05:00
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
class WriteBackListener : public BackgroundJob {
|
|
|
|
|
protected:
|
2010-05-18 12:17:43 -04:00
|
|
|
string name() { return "WriteBackListener"; }
|
2009-04-17 17:11:32 -04:00
|
|
|
WriteBackListener( const string& addr ) : _addr( addr ){
|
2010-07-16 15:23:34 -04:00
|
|
|
log() << "creating WriteBackListener for: " << addr << endl;
|
2009-04-17 17:11:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void run(){
|
2010-07-22 13:52:42 -04:00
|
|
|
OID lastID;
|
2010-07-26 22:11:25 -04:00
|
|
|
lastID.clear();
|
2009-04-17 17:11:32 -04:00
|
|
|
int secsToSleep = 0;
|
2010-06-29 14:23:55 -04:00
|
|
|
while ( Shard::isMember( _addr ) ){
|
2010-07-22 13:52:42 -04:00
|
|
|
|
|
|
|
|
if ( lastID.isSet() ){
|
|
|
|
|
scoped_lock lk( _seenWritebacksLock );
|
|
|
|
|
_seenWritebacks.insert( lastID );
|
|
|
|
|
lastID.clear();
|
|
|
|
|
}
|
|
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
try {
|
2010-04-27 12:32:59 -04:00
|
|
|
ScopedDbConnection conn( _addr );
|
2009-04-17 17:11:32 -04:00
|
|
|
|
|
|
|
|
BSONObj result;
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
BSONObjBuilder cmd;
|
2010-07-12 12:43:13 -04:00
|
|
|
cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data
|
2009-04-17 17:11:32 -04:00
|
|
|
if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
|
2010-07-17 16:07:38 -04:00
|
|
|
log() << "writebacklisten command failed! " << result << endl;
|
2009-09-14 14:32:24 -04:00
|
|
|
conn.done();
|
2009-04-17 17:11:32 -04:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2010-07-17 16:07:38 -04:00
|
|
|
log(1) << "writebacklisten result: " << result << endl;
|
2009-04-18 21:55:34 -04:00
|
|
|
|
|
|
|
|
BSONObj data = result.getObjectField( "data" );
|
|
|
|
|
if ( data.getBoolField( "writeBack" ) ){
|
|
|
|
|
string ns = data["ns"].valuestrsafe();
|
2010-07-22 13:52:42 -04:00
|
|
|
{
|
|
|
|
|
BSONElement e = data["id"];
|
|
|
|
|
if ( e.type() == jstOID )
|
|
|
|
|
lastID = e.OID();
|
|
|
|
|
}
|
2009-04-18 21:55:34 -04:00
|
|
|
int len;
|
|
|
|
|
|
|
|
|
|
Message m( (void*)data["msg"].binData( len ) , false );
|
2010-05-12 15:26:00 -07:00
|
|
|
massert( 10427 , "invalid writeback message" , m.header()->valid() );
|
2009-04-18 21:55:34 -04:00
|
|
|
|
2010-07-14 14:31:14 -04:00
|
|
|
DBConfigPtr db = grid.getDBConfig( ns );
|
|
|
|
|
ShardChunkVersion needVersion( data["version"] );
|
|
|
|
|
|
2010-07-23 16:06:24 -04:00
|
|
|
log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString()
|
|
|
|
|
<< " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3)
|
2010-07-14 14:31:14 -04:00
|
|
|
|
2010-07-23 17:14:49 -04:00
|
|
|
if ( logLevel ) log(1) << debugString( m ) << endl;
|
2010-07-23 16:06:24 -04:00
|
|
|
|
2010-07-14 14:31:14 -04:00
|
|
|
if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ){
|
|
|
|
|
// this means when the write went originally, the version was old
|
|
|
|
|
// if we're here, it means we've already updated the config, so don't need to do again
|
2010-07-22 15:07:12 -04:00
|
|
|
//db->getChunkManager( ns , true ); // SERVER-1349
|
2010-07-14 14:31:14 -04:00
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
db->getChunkManager( ns , true );
|
|
|
|
|
}
|
2009-04-18 21:55:34 -04:00
|
|
|
|
|
|
|
|
Request r( m , 0 );
|
2010-07-22 15:39:20 -04:00
|
|
|
r.init();
|
2009-04-18 21:55:34 -04:00
|
|
|
r.process();
|
|
|
|
|
}
|
|
|
|
|
else {
|
2010-07-17 16:07:38 -04:00
|
|
|
log() << "unknown writeBack result: " << result << endl;
|
2009-04-18 21:55:34 -04:00
|
|
|
}
|
2009-04-17 17:11:32 -04:00
|
|
|
|
|
|
|
|
conn.done();
|
|
|
|
|
secsToSleep = 0;
|
2010-07-02 10:27:35 -04:00
|
|
|
continue;
|
2009-04-17 17:11:32 -04:00
|
|
|
}
|
|
|
|
|
catch ( std::exception e ){
|
|
|
|
|
log() << "WriteBackListener exception : " << e.what() << endl;
|
2010-06-29 14:28:48 -04:00
|
|
|
|
|
|
|
|
// It's possible this shard was removed
|
|
|
|
|
Shard::reloadShardInfo();
|
2009-04-17 17:11:32 -04:00
|
|
|
}
|
|
|
|
|
catch ( ... ){
|
|
|
|
|
log() << "WriteBackListener uncaught exception!" << endl;
|
|
|
|
|
}
|
|
|
|
|
secsToSleep++;
|
2009-04-20 09:16:47 -04:00
|
|
|
sleepsecs(secsToSleep);
|
2009-04-17 17:11:32 -04:00
|
|
|
if ( secsToSleep > 10 )
|
|
|
|
|
secsToSleep = 0;
|
|
|
|
|
}
|
2010-06-29 14:23:55 -04:00
|
|
|
|
|
|
|
|
log() << "WriteBackListener exiting : address no longer in cluster " << _addr;
|
|
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
string _addr;
|
|
|
|
|
|
2010-05-11 12:06:09 -04:00
|
|
|
static map<string,WriteBackListener*> _cache;
|
2010-07-22 13:52:42 -04:00
|
|
|
static mongo::mutex _cacheLock;
|
|
|
|
|
|
|
|
|
|
static set<OID> _seenWritebacks;
|
|
|
|
|
static mongo::mutex _seenWritebacksLock;
|
2010-05-11 12:06:09 -04:00
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
public:
|
|
|
|
|
static void init( DBClientBase& conn ){
|
2010-07-22 13:52:42 -04:00
|
|
|
scoped_lock lk( _cacheLock );
|
2009-04-17 17:11:32 -04:00
|
|
|
WriteBackListener*& l = _cache[conn.getServerAddress()];
|
|
|
|
|
if ( l )
|
|
|
|
|
return;
|
|
|
|
|
l = new WriteBackListener( conn.getServerAddress() );
|
|
|
|
|
l->go();
|
|
|
|
|
}
|
2010-07-22 13:52:42 -04:00
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
|
2010-07-22 13:52:42 -04:00
|
|
|
static void waitFor( const OID& oid ){
|
|
|
|
|
Timer t;
|
|
|
|
|
for ( int i=0; i<5000; i++ ){
|
|
|
|
|
{
|
|
|
|
|
scoped_lock lk( _seenWritebacksLock );
|
|
|
|
|
if ( _seenWritebacks.count( oid ) )
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
sleepmillis( 10 );
|
|
|
|
|
}
|
|
|
|
|
stringstream ss;
|
|
|
|
|
ss << "didn't get writeback for: " << oid << " after: " << t.millis() << " ms";
|
|
|
|
|
uasserted( 13403 , ss.str() );
|
|
|
|
|
}
|
2009-04-17 17:11:32 -04:00
|
|
|
};
|
|
|
|
|
|
2010-07-22 13:52:42 -04:00
|
|
|
/**
 * Free-function entry point that delegates to WriteBackListener::waitFor().
 *
 * @param oid  id of the write-back to wait for
 */
void waitForWriteback( const OID& oid ){
    WriteBackListener::waitFor( oid );
}
|
|
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
map<string,WriteBackListener*> WriteBackListener::_cache;
|
2010-07-22 13:52:42 -04:00
|
|
|
mongo::mutex WriteBackListener::_cacheLock("WriteBackListener");
|
|
|
|
|
|
|
|
|
|
set<OID> WriteBackListener::_seenWritebacks;
|
|
|
|
|
mongo::mutex WriteBackListener::_seenWritebacksLock( "WriteBackListener::seen" );
|
2010-07-25 11:32:15 -04:00
|
|
|
|
|
|
|
|
struct ConnectionShardStatus {
|
|
|
|
|
|
|
|
|
|
typedef unsigned long long S;
|
|
|
|
|
|
|
|
|
|
ConnectionShardStatus()
|
|
|
|
|
: _mutex( "ConnectionShardStatus" ){
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
S getSequence( DBClientBase * conn , const string& ns ){
|
|
|
|
|
scoped_lock lk( _mutex );
|
|
|
|
|
return _map[conn][ns];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void setSequence( DBClientBase * conn , const string& ns , const S& s ){
|
|
|
|
|
scoped_lock lk( _mutex );
|
|
|
|
|
_map[conn][ns] = s;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void reset( DBClientBase * conn ){
|
|
|
|
|
scoped_lock lk( _mutex );
|
|
|
|
|
_map.erase( conn );
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
map<DBClientBase*, map<string,unsigned long long> > _map;
|
|
|
|
|
mongo::mutex _mutex;
|
|
|
|
|
} connectionShardStatus;
|
|
|
|
|
|
|
|
|
|
/**
 * Clears everything connectionShardStatus has recorded for `conn`.
 *
 * @param conn  connection whose per-namespace sequence numbers are dropped
 */
void resetShardVersion( DBClientBase * conn ){
    connectionShardStatus.reset( conn );
}
|
2010-07-01 17:44:13 -04:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @return true if had to do something
|
|
|
|
|
*/
|
2010-07-16 11:05:59 -04:00
|
|
|
bool checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative , int tryNumber ){
|
2009-03-25 17:35:38 -04:00
|
|
|
// TODO: cache, optimize, etc...
|
|
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
WriteBackListener::init( conn );
|
|
|
|
|
|
2010-05-28 14:18:51 -04:00
|
|
|
DBConfigPtr conf = grid.getDBConfig( ns );
|
2009-03-25 17:35:38 -04:00
|
|
|
if ( ! conf )
|
2010-07-01 17:44:13 -04:00
|
|
|
return false;
|
2009-03-25 17:35:38 -04:00
|
|
|
|
2009-11-24 17:28:57 -05:00
|
|
|
unsigned long long officialSequenceNumber = 0;
|
2010-07-16 11:05:59 -04:00
|
|
|
|
2010-05-26 12:40:46 -04:00
|
|
|
ChunkManagerPtr manager;
|
2010-06-03 15:45:46 -04:00
|
|
|
const bool isSharded = conf->isSharded( ns );
|
|
|
|
|
if ( isSharded ){
|
2010-04-28 10:08:28 -04:00
|
|
|
manager = conf->getChunkManager( ns , authoritative );
|
2009-11-24 17:28:57 -05:00
|
|
|
officialSequenceNumber = manager->getSequenceNumber();
|
|
|
|
|
}
|
2009-03-30 10:50:10 -04:00
|
|
|
|
2010-07-25 11:32:15 -04:00
|
|
|
unsigned long long sequenceNumber = connectionShardStatus.getSequence(&conn,ns);
|
|
|
|
|
if ( sequenceNumber == officialSequenceNumber ){
|
2010-07-01 17:44:13 -04:00
|
|
|
return false;
|
2010-07-25 11:32:15 -04:00
|
|
|
}
|
2010-06-03 15:45:46 -04:00
|
|
|
|
2010-07-25 11:32:15 -04:00
|
|
|
|
|
|
|
|
ShardChunkVersion version = 0;
|
2010-06-03 15:45:46 -04:00
|
|
|
if ( isSharded ){
|
|
|
|
|
version = manager->getVersion( Shard::make( conn.getServerAddress() ) );
|
|
|
|
|
}
|
2009-04-10 10:41:35 -04:00
|
|
|
|
2010-04-28 10:08:28 -04:00
|
|
|
log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns
|
|
|
|
|
<< " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber
|
2010-05-26 12:40:46 -04:00
|
|
|
<< " version: " << version << " manager: " << manager.get()
|
2010-04-28 10:08:28 -04:00
|
|
|
<< endl;
|
2010-04-28 22:13:38 -04:00
|
|
|
|
2009-03-27 16:55:26 -04:00
|
|
|
BSONObj result;
|
|
|
|
|
if ( setShardVersion( conn , ns , version , authoritative , result ) ){
|
2009-03-30 10:50:10 -04:00
|
|
|
// success!
|
2009-11-24 17:28:57 -05:00
|
|
|
log(1) << " setShardVersion success!" << endl;
|
2010-07-25 11:32:15 -04:00
|
|
|
connectionShardStatus.setSequence( &conn , ns , officialSequenceNumber );
|
2010-07-01 17:44:13 -04:00
|
|
|
return true;
|
2009-03-27 16:55:26 -04:00
|
|
|
}
|
2010-07-01 17:44:13 -04:00
|
|
|
|
2010-07-17 16:07:38 -04:00
|
|
|
log(1) << " setShardVersion failed!\n" << result << endl;
|
2009-03-27 16:55:26 -04:00
|
|
|
|
|
|
|
|
if ( result.getBoolField( "need_authoritative" ) )
|
2009-12-28 16:43:43 -05:00
|
|
|
massert( 10428 , "need_authoritative set but in authoritative mode already" , ! authoritative );
|
2009-03-27 16:55:26 -04:00
|
|
|
|
|
|
|
|
if ( ! authoritative ){
|
2010-07-16 11:05:59 -04:00
|
|
|
checkShardVersion( conn , ns , 1 , tryNumber + 1 );
|
2010-07-01 17:44:13 -04:00
|
|
|
return true;
|
2009-03-27 16:55:26 -04:00
|
|
|
}
|
|
|
|
|
|
2010-07-16 11:05:59 -04:00
|
|
|
if ( tryNumber < 4 ){
|
|
|
|
|
log(1) << "going to retry checkShardVersion" << endl;
|
|
|
|
|
sleepmillis( 10 );
|
|
|
|
|
checkShardVersion( conn , ns , 1 , tryNumber + 1 );
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
2010-07-17 16:07:38 -04:00
|
|
|
log() << " setShardVersion failed: " << result << endl;
|
2010-04-27 15:50:33 -04:00
|
|
|
massert( 10429 , (string)"setShardVersion failed! " + result.jsonString() , 0 );
|
2010-06-29 17:57:29 -04:00
|
|
|
return true;
|
2009-04-03 14:21:00 -04:00
|
|
|
}
|
2010-07-01 17:44:13 -04:00
|
|
|
|
2009-04-17 17:11:32 -04:00
|
|
|
|
2009-02-20 10:46:42 -05:00
|
|
|
}
|