Compare commits
52 Commits
v4.4.30-ch
...
r2.0.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a5cf0e213 | ||
|
|
061c8ca173 | ||
|
|
9bf70fe751 | ||
|
|
4f42dce0db | ||
|
|
858d2ad0cd | ||
|
|
b2910bcc5e | ||
|
|
9e642f566f | ||
|
|
9e8ae84247 | ||
|
|
3c02b91d0e | ||
|
|
dda9a77986 | ||
|
|
a7a36ecbf0 | ||
|
|
417b9110ce | ||
|
|
0f605b5ab3 | ||
|
|
35a196a13e | ||
|
|
56b7cf65d1 | ||
|
|
41ff2157a3 | ||
|
|
a7796c9c09 | ||
|
|
8814b71b26 | ||
|
|
17caecbd77 | ||
|
|
7f66400562 | ||
|
|
f394d2c29a | ||
|
|
05916b4e71 | ||
|
|
6728c042dd | ||
|
|
afa5bc3d0e | ||
|
|
4eff1007c9 | ||
|
|
671479a616 | ||
|
|
59aa78ea64 | ||
|
|
414d14dbd8 | ||
|
|
3461007a7a | ||
|
|
4063312d60 | ||
|
|
729ad802c8 | ||
|
|
0b2cd79560 | ||
|
|
fec06cfe91 | ||
|
|
47360db9e8 | ||
|
|
6aee211334 | ||
|
|
33342ebb76 | ||
|
|
f5f0053a1f | ||
|
|
44da5dff2a | ||
|
|
2c34a826b6 | ||
|
|
a05db65257 | ||
|
|
b6bf498b3c | ||
|
|
99b2d74eb9 | ||
|
|
7ce5d643a0 | ||
|
|
1eb2d4556d | ||
|
|
fad753ee00 | ||
|
|
2250ff90c5 | ||
|
|
692f4ec8df | ||
|
|
ade41dcba8 | ||
|
|
1c064ab915 | ||
|
|
4449aae6e4 | ||
|
|
f2bf86ff3f | ||
|
|
695c67dff0 |
@@ -247,38 +247,27 @@ namespace mongo {
|
||||
}
|
||||
|
||||
HostAndPort ReplicaSetMonitor::getSlave() {
|
||||
LOG(2) << "dbclient_rs getSlave " << getServerAddress() << endl;
|
||||
|
||||
LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl;
|
||||
scoped_lock lk( _lock );
|
||||
|
||||
// Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take
|
||||
// any "ok" node
|
||||
// TODO: Could this query hidden nodes?
|
||||
const int MAX = 3;
|
||||
for ( int xxx=0; xxx<MAX; xxx++ ) {
|
||||
|
||||
{
|
||||
scoped_lock lk( _lock );
|
||||
|
||||
unsigned i = 0;
|
||||
for ( ; i<_nodes.size(); i++ ) {
|
||||
_nextSlave = ( _nextSlave + 1 ) % _nodes.size();
|
||||
if ( _nextSlave == _master ){
|
||||
LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl;
|
||||
continue;
|
||||
}
|
||||
if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) )
|
||||
return _nodes[ _nextSlave ].addr;
|
||||
|
||||
LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl;
|
||||
}
|
||||
|
||||
for ( unsigned ii = 0; ii < _nodes.size(); ii++ ) {
|
||||
_nextSlave = ( _nextSlave + 1 ) % _nodes.size();
|
||||
if ( _nextSlave != _master ) {
|
||||
if ( _nodes[ _nextSlave ].okForSecondaryQueries() )
|
||||
return _nodes[ _nextSlave ].addr;
|
||||
LOG(2) << "dbclient_rs getSlave not selecting " << _nodes[_nextSlave] << ", not currently okForSecondaryQueries" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
check(false);
|
||||
if( _master >= 0 ) {
|
||||
assert( static_cast<unsigned>(_master) < _nodes.size() );
|
||||
LOG(2) << "dbclient_rs getSlave no member in secondary state found, returning primary " << _nodes[ _master ] << endl;
|
||||
return _nodes[_master].addr;
|
||||
}
|
||||
|
||||
LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl;
|
||||
|
||||
LOG(2) << "dbclient_rs getSlave no suitable member found, returning first node " << _nodes[ 0 ] << endl;
|
||||
assert( _nodes.size() > 0 );
|
||||
return _nodes[0].addr;
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
namespace mongo {
|
||||
|
||||
LabeledLevel DistributedLock::logLvl( 1 );
|
||||
DistributedLock::LastPings DistributedLock::lastPings;
|
||||
|
||||
ThreadLocalValue<string> distLockIds("");
|
||||
|
||||
@@ -84,7 +85,7 @@ namespace mongo {
|
||||
Date_t pingTime;
|
||||
|
||||
try {
|
||||
ScopedDbConnection conn( addr );
|
||||
ScopedDbConnection conn( addr, 30.0 );
|
||||
|
||||
pingTime = jsTime();
|
||||
|
||||
@@ -224,7 +225,7 @@ namespace mongo {
|
||||
string s = pingThreadId( conn, processId );
|
||||
|
||||
// Ignore if we already have a pinging thread for this process.
|
||||
if ( _seen.count( s ) > 0 ) return "";
|
||||
if ( _seen.count( s ) > 0 ) return s;
|
||||
|
||||
// Check our clock skew
|
||||
try {
|
||||
@@ -303,6 +304,18 @@ namespace mongo {
|
||||
log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn
|
||||
<< " ( lock timeout : " << _lockTimeout
|
||||
<< ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl;
|
||||
|
||||
|
||||
}
|
||||
|
||||
DistributedLock::PingData DistributedLock::LastPings::getLastPing( const ConnectionString& conn, const string& lockName ){
|
||||
scoped_lock lock( _mutex );
|
||||
return _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ];
|
||||
}
|
||||
|
||||
void DistributedLock::LastPings::setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd ){
|
||||
scoped_lock lock( _mutex );
|
||||
_lastPings[ std::pair< string, string >( conn.toString(), lockName ) ] = pd;
|
||||
}
|
||||
|
||||
Date_t DistributedLock::getRemoteTime() {
|
||||
@@ -512,6 +525,7 @@ namespace mongo {
|
||||
|
||||
unsigned long long elapsed = 0;
|
||||
unsigned long long takeover = _lockTimeout;
|
||||
PingData _lastPingCheck = getLastPing();
|
||||
|
||||
log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl;
|
||||
|
||||
@@ -527,8 +541,7 @@ namespace mongo {
|
||||
|
||||
if( recPingChange || recTSChange ) {
|
||||
// If the ping has changed since we last checked, mark the current date and time
|
||||
scoped_lock lk( _mutex );
|
||||
_lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() );
|
||||
setLastPing( PingData( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ) );
|
||||
}
|
||||
else {
|
||||
|
||||
@@ -540,7 +553,6 @@ namespace mongo {
|
||||
else
|
||||
elapsed = remote - _lastPingCheck.get<2>();
|
||||
}
|
||||
|
||||
}
|
||||
catch( LockException& e ) {
|
||||
|
||||
|
||||
@@ -71,6 +71,22 @@ namespace mongo {
|
||||
|
||||
static LabeledLevel logLvl;
|
||||
|
||||
typedef boost::tuple<string, Date_t, Date_t, OID> PingData;
|
||||
|
||||
class LastPings {
|
||||
public:
|
||||
LastPings() : _mutex( "DistributedLock::LastPings" ) {}
|
||||
~LastPings(){}
|
||||
|
||||
PingData getLastPing( const ConnectionString& conn, const string& lockName );
|
||||
void setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd );
|
||||
|
||||
mongo::mutex _mutex;
|
||||
map< std::pair<string, string>, PingData > _lastPings;
|
||||
};
|
||||
|
||||
static LastPings lastPings;
|
||||
|
||||
/**
|
||||
* The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
|
||||
* Construction does trigger a lock "pinging" mechanism, though.
|
||||
@@ -145,16 +161,12 @@ namespace mongo {
|
||||
|
||||
private:
|
||||
|
||||
void resetLastPing(){
|
||||
scoped_lock lk( _mutex );
|
||||
_lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>();
|
||||
}
|
||||
void resetLastPing(){ lastPings.setLastPing( _conn, _name, PingData() ); }
|
||||
void setLastPing( const PingData& pd ){ lastPings.setLastPing( _conn, _name, pd ); }
|
||||
PingData getLastPing(){ return lastPings.getLastPing( _conn, _name ); }
|
||||
|
||||
mongo::mutex _mutex;
|
||||
|
||||
// Data from last check of process with ping time
|
||||
boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck;
|
||||
// May or may not exist, depending on startup
|
||||
mongo::mutex _mutex;
|
||||
string _threadId;
|
||||
|
||||
};
|
||||
|
||||
@@ -195,6 +195,7 @@ namespace mongo {
|
||||
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange));
|
||||
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait));
|
||||
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep));
|
||||
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomNewLock(gen, boost::uniform_int<>(0, 3));
|
||||
|
||||
|
||||
int skew = 0;
|
||||
@@ -262,7 +263,7 @@ namespace mongo {
|
||||
}
|
||||
else {
|
||||
log() << "**** Not unlocking for thread " << threadId << endl;
|
||||
DistributedLock::killPinger( *myLock );
|
||||
assert( DistributedLock::killPinger( *myLock ) );
|
||||
// We're simulating a crashed process...
|
||||
break;
|
||||
}
|
||||
@@ -274,6 +275,12 @@ namespace mongo {
|
||||
break;
|
||||
}
|
||||
|
||||
// Create a new lock 1/3 of the time
|
||||
if( randomNewLock() > 1 ){
|
||||
lock.reset(new DistributedLock( hostConn, lockName, takeoverMS, true ));
|
||||
myLock = lock.get();
|
||||
}
|
||||
|
||||
sleepmillis(randomSleep());
|
||||
}
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ namespace mongo {
|
||||
assert( cursor );
|
||||
|
||||
if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) {
|
||||
throw StaleConfigException( _ns , "ClusteredCursor::query" );
|
||||
throw StaleConfigException( _ns , "ClusteredCursor::_checkCursor" );
|
||||
}
|
||||
|
||||
if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) {
|
||||
@@ -90,7 +90,7 @@ namespace mongo {
|
||||
|
||||
if ( conn.setVersion() ) {
|
||||
conn.done();
|
||||
throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
|
||||
throw StaleConfigException( _ns , "ClusteredCursor::query" , true );
|
||||
}
|
||||
|
||||
LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server
|
||||
@@ -490,7 +490,7 @@ namespace mongo {
|
||||
|
||||
if ( conns[i]->setVersion() ) {
|
||||
conns[i]->done();
|
||||
staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc );
|
||||
staleConfigExs.push_back( (string)"stale config detected for " + StaleConfigException( _ns , "ParallelCursor::_init" , true ).what() + errLoc );
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -592,7 +592,7 @@ namespace mongo {
|
||||
// when we throw our exception
|
||||
allConfigStale = true;
|
||||
|
||||
staleConfigExs.push_back( e.what() + errLoc );
|
||||
staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc );
|
||||
_cursors[i].reset( NULL );
|
||||
conns[i]->done();
|
||||
continue;
|
||||
|
||||
@@ -83,6 +83,12 @@ namespace mongo {
|
||||
BSONElement e = i.next();
|
||||
if ( e.eoo() )
|
||||
break;
|
||||
|
||||
// for now, skip the "v" field so that v:0 indexes will be upgraded to v:1
|
||||
if ( string("v") == e.fieldName() ) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if ( string("ns") == e.fieldName() ) {
|
||||
uassert( 10024 , "bad ns field for index during dbcopy", e.type() == String);
|
||||
const char *p = strchr(e.valuestr(), '.');
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
#include "../../util/net/listen.h"
|
||||
#include "../commands.h"
|
||||
#include "../../client/dbclient.h"
|
||||
#include "../security.h"
|
||||
|
||||
#ifndef _WIN32
|
||||
# ifndef __sunos__
|
||||
@@ -211,6 +212,11 @@ namespace mongo {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!noauth && cmdLine.keyFile &&
|
||||
!conn.auth("local", internalSecurity.user, internalSecurity.pwd, errmsg, false)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BSONObj out;
|
||||
bool ok = conn.simpleCommand( "admin" , &out , "_isSelf" );
|
||||
|
||||
|
||||
@@ -708,6 +708,12 @@ int main(int argc, char* argv[]) {
|
||||
else {
|
||||
dbpath = "/data/db/";
|
||||
}
|
||||
#ifdef _WIN32
|
||||
if (dbpath.size() > 1 && dbpath[dbpath.size()-1] == '/') {
|
||||
// size() check is for the unlikely possibility of --dbpath "/"
|
||||
dbpath = dbpath.erase(dbpath.size()-1);
|
||||
}
|
||||
#endif
|
||||
|
||||
if ( params.count("directoryperdb")) {
|
||||
directoryperdb = true;
|
||||
|
||||
@@ -949,7 +949,7 @@ namespace mongo {
|
||||
}
|
||||
|
||||
list<BSONObj> all;
|
||||
auto_ptr<DBClientCursor> i = db.getIndexes( toDeleteNs );
|
||||
auto_ptr<DBClientCursor> i = db.query( dbname + ".system.indexes" , BSON( "ns" << toDeleteNs ) , 0 , 0 , 0 , QueryOption_SlaveOk );
|
||||
BSONObjBuilder b;
|
||||
while ( i->more() ) {
|
||||
BSONObj o = i->next().removeField("v").getOwned();
|
||||
@@ -1104,6 +1104,10 @@ namespace mongo {
|
||||
BSONObj sort = BSON( "files_id" << 1 << "n" << 1 );
|
||||
|
||||
shared_ptr<Cursor> cursor = bestGuessCursor(ns.c_str(), query, sort);
|
||||
if ( ! cursor ) {
|
||||
errmsg = "need an index on { files_id : 1 , n : 1 }";
|
||||
return false;
|
||||
}
|
||||
auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str()));
|
||||
|
||||
int n = 0;
|
||||
|
||||
@@ -353,20 +353,19 @@ namespace mongo {
|
||||
}
|
||||
currentOp.ensureStarted();
|
||||
currentOp.done();
|
||||
int ms = currentOp.totalTimeMillis();
|
||||
debug.executionTime = currentOp.totalTimeMillis();
|
||||
|
||||
//DEV log = true;
|
||||
if ( log || ms > logThreshold ) {
|
||||
if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && ms < 4300 && !log ) {
|
||||
if ( log || debug.executionTime > logThreshold ) {
|
||||
if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && debug.executionTime < 4300 && !log ) {
|
||||
/* it's normal for getMore on the oplog to be slow because of use of awaitdata flag. */
|
||||
}
|
||||
else {
|
||||
debug.executionTime = ms;
|
||||
mongo::tlog() << debug << endl;
|
||||
}
|
||||
}
|
||||
|
||||
if ( currentOp.shouldDBProfile( ms ) ) {
|
||||
if ( currentOp.shouldDBProfile( debug.executionTime ) ) {
|
||||
// performance profiling is on
|
||||
if ( dbMutex.getState() < 0 ) {
|
||||
mongo::log(1) << "note: not profiling because recursive read lock" << endl;
|
||||
|
||||
46
db/oplog.cpp
46
db/oplog.cpp
@@ -625,9 +625,13 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
void applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
|
||||
/** @param fromRepl false if from ApplyOpsCmd
|
||||
@return true if was and update should have happened and the document DNE. see replset initial sync code.
|
||||
*/
|
||||
bool applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
|
||||
assertInWriteLock();
|
||||
LOG(6) << "applying op: " << op << endl;
|
||||
bool failedUpdate = false;
|
||||
|
||||
OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;
|
||||
|
||||
@@ -680,9 +684,45 @@ namespace mongo {
|
||||
}
|
||||
else if ( *opType == 'u' ) {
|
||||
opCounters->gotUpdate();
|
||||
// dm do we create this for a capped collection?
|
||||
// - if not, updates would be slow
|
||||
// - but if were by id would be slow on primary too so maybe ok
|
||||
// - if on primary was by another key and there are other indexes, this could be very bad w/out an index
|
||||
// - if do create, odd to have on secondary but not primary. also can cause secondary to block for
|
||||
// quite a while on creation.
|
||||
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
|
||||
OpDebug debug;
|
||||
updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ fields[3].booleanSafe(), /*multi*/ false, /*logop*/ false , debug );
|
||||
BSONObj updateCriteria = op.getObjectField("o2");
|
||||
bool upsert = fields[3].booleanSafe();
|
||||
UpdateResult ur = updateObjects(ns, o, updateCriteria, upsert, /*multi*/ false, /*logop*/ false , debug );
|
||||
if( ur.num == 0 ) {
|
||||
if( ur.mod ) {
|
||||
if( updateCriteria.nFields() == 1 ) {
|
||||
// was a simple { _id : ... } update criteria
|
||||
failedUpdate = true;
|
||||
// todo: probably should assert in these failedUpdate cases if not in initialSync
|
||||
}
|
||||
// need to check to see if it isn't present so we can set failedUpdate correctly.
|
||||
// note that adds some overhead for this extra check in some cases, such as an updateCriteria
|
||||
// of the form
|
||||
// { _id:..., { x : {$size:...} }
|
||||
// thus this is not ideal.
|
||||
else if( Helpers::findById(nsdetails(ns), updateCriteria).isNull() ) {
|
||||
failedUpdate = true;
|
||||
}
|
||||
else {
|
||||
// it's present; zero objects were updated because of additional specifiers in the query for idempotence
|
||||
}
|
||||
}
|
||||
else {
|
||||
// this could happen benignly on an oplog duplicate replay of an upsert
|
||||
// (because we are idempotent),
|
||||
// if an regular non-mod update fails the item is (presumably) missing.
|
||||
if( !upsert ) {
|
||||
failedUpdate = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else if ( *opType == 'd' ) {
|
||||
opCounters->gotDelete();
|
||||
@@ -703,7 +743,7 @@ namespace mongo {
|
||||
else {
|
||||
throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) );
|
||||
}
|
||||
|
||||
return failedUpdate;
|
||||
}
|
||||
|
||||
class ApplyOpsCmd : public Command {
|
||||
|
||||
@@ -130,5 +130,5 @@ namespace mongo {
|
||||
* used for applying from an oplog
|
||||
* @param fromRepl really from replication or for testing/internal/command/etc...
|
||||
*/
|
||||
void applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
|
||||
bool applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
|
||||
}
|
||||
|
||||
@@ -1354,7 +1354,8 @@ namespace mongo {
|
||||
logOp( "i", ns, no );
|
||||
return UpdateResult( 0 , 0 , 1 , no );
|
||||
}
|
||||
return UpdateResult( 0 , 0 , 0 );
|
||||
|
||||
return UpdateResult( 0 , isOperatorUpdate , 0 );
|
||||
}
|
||||
|
||||
UpdateResult updateObjects(const char *ns, const BSONObj& updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug& debug ) {
|
||||
|
||||
@@ -328,7 +328,7 @@ namespace mongo {
|
||||
bool matchesElement( const BSONElement &e, int i, bool direction ) const;
|
||||
bool matchesKey( const BSONObj &key ) const;
|
||||
vector<FieldRange> _ranges;
|
||||
const IndexSpec &_indexSpec;
|
||||
IndexSpec _indexSpec;
|
||||
int _direction;
|
||||
vector<BSONObj> _queries; // make sure mem owned
|
||||
friend class FieldRangeVectorIterator;
|
||||
|
||||
@@ -112,7 +112,8 @@ namespace mongo {
|
||||
class Rolling {
|
||||
|
||||
public:
|
||||
Rolling() {
|
||||
Rolling()
|
||||
: _lock( "ps::Rolling" ){
|
||||
_curSlice = 0;
|
||||
_lastRotate = Listener::getElapsedTimeMillis();
|
||||
}
|
||||
@@ -126,8 +127,8 @@ namespace mongo {
|
||||
bool access( size_t region , short offset , bool doHalf ) {
|
||||
int regionHash = hash(region);
|
||||
|
||||
scoped_spinlock lk( _lock );
|
||||
|
||||
SimpleMutex::scoped_lock lk( _lock );
|
||||
|
||||
static int rarely_count = 0;
|
||||
if ( rarely_count++ % 2048 == 0 ) {
|
||||
long long now = Listener::getElapsedTimeMillis();
|
||||
@@ -174,7 +175,7 @@ namespace mongo {
|
||||
long long _lastRotate;
|
||||
Slice _slices[NumSlices];
|
||||
|
||||
SpinLock _lock;
|
||||
SimpleMutex _lock;
|
||||
} rolling;
|
||||
|
||||
}
|
||||
|
||||
@@ -491,7 +491,7 @@ namespace mongo {
|
||||
void _syncThread();
|
||||
bool tryToGoLiveAsASecondary(OpTime&); // readlocks
|
||||
void syncTail();
|
||||
void syncApply(const BSONObj &o);
|
||||
bool syncApply(const BSONObj &o);
|
||||
unsigned _syncRollback(OplogReader& r);
|
||||
void syncRollback(OplogReader& r);
|
||||
void syncFixUp(HowToFixUp& h, OplogReader& r);
|
||||
|
||||
@@ -80,6 +80,22 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
bool operator==(const MemberCfg& r) const {
|
||||
if (!tags.empty() || !r.tags.empty()) {
|
||||
if (tags.size() != r.tags.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// if they are the same size and not equal, at least one
|
||||
// element in A must be different in B
|
||||
for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) {
|
||||
map<string,string>::const_iterator rit = r.tags.find((*lit).first);
|
||||
|
||||
if (rit == r.tags.end() || (*lit).second != (*rit).second) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
|
||||
arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden &&
|
||||
buildIndexes == buildIndexes;
|
||||
|
||||
@@ -32,17 +32,19 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
/* apply the log op that is in param o */
|
||||
void ReplSetImpl::syncApply(const BSONObj &o) {
|
||||
/* apply the log op that is in param o
|
||||
@return bool failedUpdate
|
||||
*/
|
||||
bool ReplSetImpl::syncApply(const BSONObj &o) {
|
||||
const char *ns = o.getStringField("ns");
|
||||
if ( *ns == '.' || *ns == 0 ) {
|
||||
blank(o);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
Client::Context ctx(ns);
|
||||
ctx.getClient()->curop()->reset();
|
||||
applyOperation_inlock(o);
|
||||
return applyOperation_inlock(o);
|
||||
}
|
||||
|
||||
/* initial oplog application, during initial sync, after cloning.
|
||||
@@ -57,6 +59,7 @@ namespace mongo {
|
||||
|
||||
const string hn = source->h().toString();
|
||||
OplogReader r;
|
||||
OplogReader missingObjReader;
|
||||
try {
|
||||
if( !r.connect(hn) ) {
|
||||
log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
|
||||
@@ -133,9 +136,48 @@ namespace mongo {
|
||||
throw DBException("primary changed",0);
|
||||
}
|
||||
|
||||
if( ts >= applyGTE ) {
|
||||
// optimes before we started copying need not be applied.
|
||||
syncApply(o);
|
||||
if( ts >= applyGTE ) { // optimes before we started copying need not be applied.
|
||||
bool failedUpdate = syncApply(o);
|
||||
if( failedUpdate ) {
|
||||
// we don't have the object yet, which is possible on initial sync. get it.
|
||||
log() << "replSet info adding missing object" << endl; // rare enough we can log
|
||||
if( !missingObjReader.connect(hn) ) { // ok to call more than once
|
||||
log() << "replSet initial sync fails, couldn't connect to " << hn << endl;
|
||||
return false;
|
||||
}
|
||||
const char *ns = o.getStringField("ns");
|
||||
BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj(); // might be more than just _id in the update criteria
|
||||
BSONObj missingObj;
|
||||
try {
|
||||
missingObj = missingObjReader.findOne(
|
||||
ns,
|
||||
query );
|
||||
} catch(...) {
|
||||
log() << "replSet assertion fetching missing object" << endl;
|
||||
throw;
|
||||
}
|
||||
if( missingObj.isEmpty() ) {
|
||||
log() << "replSet missing object not found on source. presumably deleted later in oplog" << endl;
|
||||
log() << "replSet o2: " << o.getObjectField("o2").toString() << endl;
|
||||
log() << "replSet o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl;
|
||||
}
|
||||
else {
|
||||
Client::Context ctx(ns);
|
||||
try {
|
||||
DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize());
|
||||
assert( !d.isNull() );
|
||||
} catch(...) {
|
||||
log() << "replSet assertion during insert of missing object" << endl;
|
||||
throw;
|
||||
}
|
||||
// now reapply the update from above
|
||||
bool failed = syncApply(o);
|
||||
if( failed ) {
|
||||
log() << "replSet update still fails after adding missing object " << ns << endl;
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
|
||||
}
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "../db/oplog.h"
|
||||
#include "../db/queryoptimizer.h"
|
||||
|
||||
#include "../db/repl/rs.h"
|
||||
|
||||
namespace mongo {
|
||||
void createOplog();
|
||||
}
|
||||
@@ -1101,7 +1103,26 @@ namespace ReplTests {
|
||||
ASSERT_EXCEPTION( fsc.recoverFromYield(), MsgAssertionException );
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/** Check ReplSetConfig::MemberCfg equality */
|
||||
class ReplSetMemberCfgEquality : public Base {
|
||||
public:
|
||||
void run() {
|
||||
ReplSetConfig::MemberCfg m1, m2;
|
||||
assert(m1 == m2);
|
||||
m1.tags["x"] = "foo";
|
||||
assert(m1 != m2);
|
||||
m2.tags["y"] = "bar";
|
||||
assert(m1 != m2);
|
||||
m1.tags["y"] = "bar";
|
||||
assert(m1 != m2);
|
||||
m2.tags["x"] = "foo";
|
||||
assert(m1 == m2);
|
||||
m1.tags.clear();
|
||||
assert(m1 != m2);
|
||||
}
|
||||
};
|
||||
|
||||
class All : public Suite {
|
||||
public:
|
||||
All() : Suite( "repl" ) {
|
||||
@@ -1158,6 +1179,7 @@ namespace ReplTests {
|
||||
add< DatabaseIgnorerUpdate >();
|
||||
add< FindingStartCursorStale >();
|
||||
add< FindingStartCursorYield >();
|
||||
add< ReplSetMemberCfgEquality >();
|
||||
}
|
||||
} myall;
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include <boost/thread/thread.hpp>
|
||||
#include "dbtests.h"
|
||||
#include "../util/concurrency/spin_lock.h"
|
||||
#include "../util/timer.h"
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -73,8 +74,10 @@ namespace {
|
||||
int counter = 0;
|
||||
|
||||
const int threads = 64;
|
||||
const int incs = 10000;
|
||||
const int incs = 50000;
|
||||
LockTester* testers[threads];
|
||||
|
||||
Timer timer;
|
||||
|
||||
for ( int i = 0; i < threads; i++ ) {
|
||||
testers[i] = new LockTester( &spin, &counter );
|
||||
@@ -87,7 +90,10 @@ namespace {
|
||||
ASSERT_EQUALS( testers[i]->requests(), incs );
|
||||
delete testers[i];
|
||||
}
|
||||
|
||||
|
||||
int ms = timer.millis();
|
||||
log() << "spinlock ConcurrentIncs time: " << ms << endl;
|
||||
|
||||
ASSERT_EQUALS( counter, threads*incs );
|
||||
#if defined(__linux__)
|
||||
ASSERT( SpinLock::isfast() );
|
||||
|
||||
13
debian/changelog
vendored
13
debian/changelog
vendored
@@ -1,3 +1,16 @@
|
||||
mongodb (2.0.1) unstable; urgency=low
|
||||
|
||||
* see http://www.mongodb.org/display/DOCS/2.0+Release+Notes
|
||||
* see http://jira.mongodb.org/browse/SERVER/fixforversion/10890
|
||||
|
||||
-- Richard Kreuter <richard@10gen.com> Fri, 21 Oct 2011 16:56:28 -0500
|
||||
|
||||
mongodb (2.0.0) unstable; urgency=low
|
||||
|
||||
* see http://www.mongodb.org/display/DOCS/2.0+Release+Notes
|
||||
|
||||
-- Richard Kreuter <richard@10gen.com> Mon, 12 Sep 2011 16:56:28 -0500
|
||||
|
||||
mongodb (1.9.2) unstable; urgency=low
|
||||
|
||||
* see http://jira.mongodb.org/browse/SERVER/fixforversion/10261
|
||||
|
||||
@@ -188,4 +188,46 @@ freely, subject to the following restrictions:
|
||||
L. Peter Deutsch
|
||||
ghost@aladdin.com
|
||||
|
||||
5) License notice for Snappy - http://code.google.com/p/snappy/
|
||||
---------------------------------
|
||||
Copyright 2005 and onwards Google Inc.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
* Neither the name of Google Inc. nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
A light-weight compression algorithm. It is designed for speed of
|
||||
compression and decompression, rather than for the utmost in space
|
||||
savings.
|
||||
|
||||
For getting better compression ratios when you are compressing data
|
||||
with long repeated sequences or compressing data that is similar to
|
||||
other data, while still compressing fast, you might look at first
|
||||
using BMDiff and then compressing the output of BMDiff with
|
||||
Snappy.
|
||||
|
||||
|
||||
|
||||
End
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#---------------------------------------------------------------------------
|
||||
DOXYFILE_ENCODING = UTF-8
|
||||
PROJECT_NAME = MongoDB
|
||||
PROJECT_NUMBER = 2.0.0-rc3-pre-
|
||||
PROJECT_NUMBER = 2.0.1
|
||||
OUTPUT_DIRECTORY = docs/doxygen
|
||||
CREATE_SUBDIRS = NO
|
||||
OUTPUT_LANGUAGE = English
|
||||
|
||||
11
jstests/filemd5.js
Normal file
11
jstests/filemd5.js
Normal file
@@ -0,0 +1,11 @@
|
||||
|
||||
db.fs.chunks.drop();
|
||||
db.fs.chunks.insert({files_id:1,n:0,data:new BinData(0,"test")})
|
||||
|
||||
x = db.runCommand({"filemd5":1,"root":"fs"});
|
||||
assert( ! x.ok , tojson(x) )
|
||||
|
||||
db.fs.chunks.ensureIndex({files_id:1,n:1})
|
||||
x = db.runCommand({"filemd5":1,"root":"fs"});
|
||||
assert( x.ok , tojson(x) )
|
||||
|
||||
@@ -64,6 +64,6 @@ result = nodes[0].getDB("admin").runCommand({replSetInitiate : {_id : "testSet2"
|
||||
{_id : 0, tags : ["member0"]}
|
||||
]}});
|
||||
|
||||
assert(result.errmsg.match(/bad or missing host field/));
|
||||
assert(result.errmsg.match(/bad or missing host field/) , "error message doesn't match, got result:" + tojson(result) );
|
||||
|
||||
replTest2.stopSet();
|
||||
|
||||
38
jstests/sharding/reset_shard_version.js
Normal file
38
jstests/sharding/reset_shard_version.js
Normal file
@@ -0,0 +1,38 @@
|
||||
// Tests whether a reset sharding version triggers errors
|
||||
|
||||
jsTestLog( "Starting sharded cluster..." )
|
||||
|
||||
var st = new ShardingTest( { shards : 1, mongos : 2 } )
|
||||
|
||||
var mongosA = st.s0
|
||||
var mongosB = st.s1
|
||||
|
||||
var collA = mongosA.getCollection( jsTestName() + ".coll" )
|
||||
collA.drop()
|
||||
var collB = mongosB.getCollection( "" + collA )
|
||||
|
||||
st.shardColl( collA, { _id : 1 }, false )
|
||||
|
||||
jsTestLog( "Inserting data..." )
|
||||
|
||||
// Insert some data
|
||||
for ( var i = 0; i < 100; i++ ) {
|
||||
collA.insert( { _id : i } )
|
||||
}
|
||||
|
||||
jsTestLog( "Setting connection versions on both mongoses..." )
|
||||
|
||||
assert.eq( collA.find().itcount(), 100 )
|
||||
assert.eq( collB.find().itcount(), 100 )
|
||||
|
||||
jsTestLog( "Resetting connection version on shard..." )
|
||||
|
||||
var admin = st.shard0.getDB( "admin" )
|
||||
printjson( admin.runCommand( {
|
||||
setShardVersion : "" + collA, version : new Timestamp( 0, 0 ), configdb : st._configDB, serverID : new ObjectId(),
|
||||
authoritative : true } ) )
|
||||
|
||||
jsTestLog( "Querying with version reset..." )
|
||||
|
||||
// This will cause a version check
|
||||
printjson( collA.findOne() )
|
||||
@@ -51,7 +51,7 @@ assert.soon(
|
||||
return res.length > 1 && Math.abs( res[0].nChunks - res[1].nChunks ) <= 3;
|
||||
|
||||
} ,
|
||||
"never migrated" , 180000 , 1000 );
|
||||
"never migrated" , 9 * 60 * 1000 , 1000 );
|
||||
|
||||
stopMongod( 30000 );
|
||||
stopMongod( 29999 );
|
||||
|
||||
32
jstests/tool/dumpsecondary.js
Normal file
32
jstests/tool/dumpsecondary.js
Normal file
@@ -0,0 +1,32 @@
|
||||
var replTest = new ReplSetTest( {name: 'testSet', nodes: 2} );
|
||||
|
||||
var nodes = replTest.startSet();
|
||||
replTest.initiate();
|
||||
|
||||
var master = replTest.getMaster();
|
||||
db = master.getDB("foo")
|
||||
db.foo.save({a: 1000});
|
||||
replTest.awaitReplication();
|
||||
replTest.awaitSecondaryNodes();
|
||||
|
||||
assert.eq( 1 , db.foo.count() , "setup" );
|
||||
|
||||
var slaves = replTest.liveNodes.slaves;
|
||||
assert( slaves.length == 1, "Expected 1 slave but length was " + slaves.length );
|
||||
slave = slaves[0];
|
||||
|
||||
runMongoProgram.apply(null, ['mongodump', '-h', slave.host, '--out', '/data/db/jstests_tool_dumpsecondary_external/'])
|
||||
|
||||
db.foo.drop()
|
||||
|
||||
assert.eq( 0 , db.foo.count() , "after drop" );
|
||||
|
||||
runMongoProgram.apply(null, ['mongorestore', '-h', master.host, '/data/db/jstests_tool_dumpsecondary_external/'])
|
||||
|
||||
assert.soon( "db.foo.findOne()" , "no data after sleep" );
|
||||
assert.eq( 1 , db.foo.count() , "after restore" );
|
||||
assert.eq( 1000 , db.foo.findOne().a , "after restore 2" );
|
||||
|
||||
resetDbpath('/data/db/jstests_tool_dumpsecondary_external')
|
||||
|
||||
replTest.stopSet(15)
|
||||
27
jstests/tool/exportimport3.js
Normal file
27
jstests/tool/exportimport3.js
Normal file
@@ -0,0 +1,27 @@
|
||||
// exportimport3.js
|
||||
|
||||
t = new ToolTest( "exportimport3" );
|
||||
|
||||
c = t.startDB( "foo" );
|
||||
assert.eq( 0 , c.count() , "setup1" );
|
||||
c.save({a:1})
|
||||
c.save({a:2})
|
||||
c.save({a:3})
|
||||
c.save({a:4})
|
||||
c.save({a:5})
|
||||
|
||||
assert.eq( 5 , c.count() , "setup2" );
|
||||
|
||||
|
||||
t.runTool( "export" , "--jsonArray" , "--out" , t.extFile , "-d" , t.baseName , "-c" , "foo" );
|
||||
|
||||
c.drop();
|
||||
assert.eq( 0 , c.count() , "after drop" , "-d" , t.baseName , "-c" , "foo" );;
|
||||
|
||||
t.runTool( "import" , "--jsonArray" , "--file" , t.extFile , "-d" , t.baseName , "-c" , "foo" );
|
||||
|
||||
assert.soon( "c.findOne()" , "no data after sleep" );
|
||||
assert.eq( 5 , c.count() , "after restore 2" );
|
||||
|
||||
|
||||
t.stop();
|
||||
@@ -1,5 +1,5 @@
|
||||
Name: mongo
|
||||
Version: 2.0.0-rc2
|
||||
Version: 2.0.1
|
||||
Release: mongodb_1%{?dist}
|
||||
Summary: mongo client shell and tools
|
||||
License: AGPL 3.0
|
||||
|
||||
@@ -92,7 +92,7 @@ namespace mongo {
|
||||
}
|
||||
|
||||
if ( maxOpsQueued ) {
|
||||
log() << "biggest shard has unprocessed writebacks, waiting for completion of migrate" << endl;
|
||||
log() << "biggest shard " << max.first << " has unprocessed writebacks, waiting for completion of migrate" << endl;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
29
s/config.cpp
29
s/config.cpp
@@ -185,9 +185,9 @@ namespace mongo {
|
||||
return true;
|
||||
}
|
||||
|
||||
ChunkManagerPtr DBConfig::getChunkManagerIfExists( const string& ns, bool shouldReload ){
|
||||
ChunkManagerPtr DBConfig::getChunkManagerIfExists( const string& ns, bool shouldReload, bool forceReload ){
|
||||
try{
|
||||
return getChunkManager( ns, shouldReload );
|
||||
return getChunkManager( ns, shouldReload, forceReload );
|
||||
}
|
||||
catch( AssertionException& e ){
|
||||
warning() << "chunk manager not found for " << ns << causedBy( e ) << endl;
|
||||
@@ -195,7 +195,7 @@ namespace mongo {
|
||||
}
|
||||
}
|
||||
|
||||
ChunkManagerPtr DBConfig::getChunkManager( const string& ns , bool shouldReload ) {
|
||||
ChunkManagerPtr DBConfig::getChunkManager( const string& ns , bool shouldReload, bool forceReload ) {
|
||||
BSONObj key;
|
||||
bool unique;
|
||||
ShardChunkVersion oldVersion;
|
||||
@@ -205,7 +205,7 @@ namespace mongo {
|
||||
|
||||
CollectionInfo& ci = _collections[ns];
|
||||
|
||||
bool earlyReload = ! ci.isSharded() && shouldReload;
|
||||
bool earlyReload = ! ci.isSharded() && ( shouldReload || forceReload );
|
||||
if ( earlyReload ) {
|
||||
// this is to catch cases where there this is a new sharded collection
|
||||
_reload();
|
||||
@@ -214,7 +214,7 @@ namespace mongo {
|
||||
massert( 10181 , (string)"not sharded:" + ns , ci.isSharded() );
|
||||
assert( ! ci.key().isEmpty() );
|
||||
|
||||
if ( ! shouldReload || earlyReload )
|
||||
if ( ! ( shouldReload || forceReload ) || earlyReload )
|
||||
return ci.getCM();
|
||||
|
||||
key = ci.key().copy();
|
||||
@@ -225,7 +225,7 @@ namespace mongo {
|
||||
|
||||
assert( ! key.isEmpty() );
|
||||
|
||||
if ( oldVersion > 0 ) {
|
||||
if ( oldVersion > 0 && ! forceReload ) {
|
||||
ScopedDbConnection conn( configServer.modelServer() , 30.0 );
|
||||
BSONObj newest = conn->findOne( ShardNS::chunk ,
|
||||
Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) );
|
||||
@@ -240,7 +240,11 @@ namespace mongo {
|
||||
return ci.getCM();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
else if( oldVersion == 0 ){
|
||||
warning() << "version 0 found when " << ( forceReload ? "reloading" : "checking" ) << " chunk manager"
|
||||
<< ", collection '" << ns << "' initially detected as sharded" << endl;
|
||||
}
|
||||
|
||||
// we are not locked now, and want to load a new ChunkManager
|
||||
@@ -257,8 +261,15 @@ namespace mongo {
|
||||
CollectionInfo& ci = _collections[ns];
|
||||
massert( 14822 , (string)"state changed in the middle: " + ns , ci.isSharded() );
|
||||
|
||||
if ( temp->getVersion() > ci.getCM()->getVersion() ) {
|
||||
// we only want to reset if we're newer
|
||||
bool forced = false;
|
||||
if ( temp->getVersion() > ci.getCM()->getVersion() ||
|
||||
(forced = (temp->getVersion() == ci.getCM()->getVersion() && forceReload ) ) ) {
|
||||
|
||||
if( forced ){
|
||||
warning() << "chunk manager reload forced for collection '" << ns << "', config version is " << temp->getVersion() << endl;
|
||||
}
|
||||
|
||||
// we only want to reset if we're newer or equal and forced
|
||||
// otherwise we go into a bad cycle
|
||||
ci.resetCM( temp.release() );
|
||||
}
|
||||
|
||||
@@ -142,8 +142,8 @@ namespace mongo {
|
||||
*/
|
||||
bool isSharded( const string& ns );
|
||||
|
||||
ChunkManagerPtr getChunkManager( const string& ns , bool reload = false );
|
||||
ChunkManagerPtr getChunkManagerIfExists( const string& ns , bool reload = false );
|
||||
ChunkManagerPtr getChunkManager( const string& ns , bool reload = false, bool forceReload = false );
|
||||
ChunkManagerPtr getChunkManagerIfExists( const string& ns , bool reload = false, bool forceReload = false );
|
||||
|
||||
/**
|
||||
* @return the correct for shard for the ns
|
||||
|
||||
@@ -107,7 +107,10 @@ namespace mongo {
|
||||
b.append( "connectionId" , cc().getConnectionId() );
|
||||
b.append( "instanceIdent" , prettyHostName() );
|
||||
b.appendTimestamp( "version" , shardingState.getVersion( ns ) );
|
||||
b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) );
|
||||
|
||||
ShardedConnectionInfo* info = ShardedConnectionInfo::get( false );
|
||||
b.appendTimestamp( "yourVersion" , info ? info->getVersion(ns) : (ConfigVersion)0 );
|
||||
|
||||
b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
|
||||
LOG(2) << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl;
|
||||
writeBackManager.queueWriteBack( clientID.str() , b.obj() );
|
||||
|
||||
@@ -430,8 +430,11 @@ namespace mongo {
|
||||
return checkConfigOrInit( configdb , authoritative , errmsg , result , true );
|
||||
}
|
||||
|
||||
bool checkMongosID( ShardedConnectionInfo* info, const BSONElement& id, string errmsg ) {
|
||||
bool checkMongosID( ShardedConnectionInfo* info, const BSONElement& id, string& errmsg ) {
|
||||
if ( id.type() != jstOID ) {
|
||||
if ( ! info->hasID() ) {
|
||||
warning() << "bad serverID set in setShardVersion and none in info: " << id << endl;
|
||||
}
|
||||
// TODO: fix this
|
||||
//errmsg = "need serverID to be an OID";
|
||||
//return 0;
|
||||
@@ -465,6 +468,10 @@ namespace mongo {
|
||||
lastError.disableForCommand();
|
||||
ShardedConnectionInfo* info = ShardedConnectionInfo::get( true );
|
||||
|
||||
// make sure we have the mongos id for writebacks
|
||||
if ( ! checkMongosID( info , cmdObj["serverID"] , errmsg ) )
|
||||
return false;
|
||||
|
||||
bool authoritative = cmdObj.getBoolField( "authoritative" );
|
||||
|
||||
// check config server is ok or enable sharding
|
||||
@@ -477,10 +484,6 @@ namespace mongo {
|
||||
shardingState.gotShardHost( cmdObj["shardHost"].String() );
|
||||
}
|
||||
|
||||
// make sure we have the mongos id for writebacks
|
||||
if ( ! checkMongosID( info , cmdObj["serverID"] , errmsg ) )
|
||||
return false;
|
||||
|
||||
// step 2
|
||||
|
||||
string ns = cmdObj["setShardVersion"].valuestrsafe();
|
||||
|
||||
@@ -58,7 +58,7 @@ namespace mongo {
|
||||
reset();
|
||||
}
|
||||
|
||||
void Request::reset( bool reload ) {
|
||||
void Request::reset( bool reload, bool forceReload ) {
|
||||
if ( _m.operation() == dbKillCursors ) {
|
||||
return;
|
||||
}
|
||||
@@ -70,7 +70,7 @@ namespace mongo {
|
||||
_config = grid.getDBConfig( nsStr );
|
||||
if ( reload ) {
|
||||
if ( _config->isSharded( nsStr ) )
|
||||
_config->getChunkManager( nsStr , true );
|
||||
_config->getChunkManager( nsStr , true, forceReload );
|
||||
else
|
||||
_config->reload();
|
||||
}
|
||||
@@ -137,7 +137,7 @@ namespace mongo {
|
||||
ShardConnection::checkMyConnectionVersions( getns() );
|
||||
if (!staleConfig.justConnection() )
|
||||
sleepsecs( attempt );
|
||||
reset( ! staleConfig.justConnection() );
|
||||
reset( ! staleConfig.justConnection(), attempt >= 2 );
|
||||
_d.markReset();
|
||||
process( attempt + 1 );
|
||||
return;
|
||||
|
||||
@@ -91,7 +91,7 @@ namespace mongo {
|
||||
|
||||
void init();
|
||||
|
||||
void reset( bool reload=false );
|
||||
void reset( bool reload=false, bool forceReload = false );
|
||||
|
||||
private:
|
||||
Message& _m;
|
||||
|
||||
@@ -140,6 +140,13 @@ namespace mongo {
|
||||
version = manager->getVersion( Shard::make( conn->getServerAddress() ) );
|
||||
}
|
||||
|
||||
if( version == 0 ){
|
||||
LOG(2) << "resetting shard version of " << ns << " on " << conn->getServerAddress() << ", " <<
|
||||
( ! isSharded ? "no longer sharded" :
|
||||
( ! manager ? "no chunk manager found" :
|
||||
"version is zero" ) ) << endl;
|
||||
}
|
||||
|
||||
LOG(2) << " have to set shard version for conn: " << conn << " ns:" << ns
|
||||
<< " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber
|
||||
<< " version: " << version << " manager: " << manager.get()
|
||||
|
||||
@@ -398,13 +398,14 @@ string finishCode( string code ) {
|
||||
while ( ! isBalanced( code ) ) {
|
||||
inMultiLine = 1;
|
||||
code += "\n";
|
||||
// cancel multiline if two blank lines are entered
|
||||
if ( code.find("\n\n\n") != string::npos )
|
||||
return ";";
|
||||
char * line = shellReadline("... " , 1 );
|
||||
if ( gotInterrupted )
|
||||
return "";
|
||||
if ( ! line )
|
||||
return "";
|
||||
if ( code.find("\n\n") != string::npos ) // cancel multiline if two blank lines are entered
|
||||
return ";";
|
||||
|
||||
while (startsWith(line, "... "))
|
||||
line += 4;
|
||||
|
||||
@@ -1005,8 +1005,8 @@ const StringData _jscode_raw_utils =
|
||||
"return {}\n"
|
||||
"}\n"
|
||||
"\n"
|
||||
"testLog = function(x){\n"
|
||||
"print( jsTestFile() + \" - \" + x )\n"
|
||||
"jsTestLog = function(msg){\n"
|
||||
"print( \"\\n\\n----\\n\" + msg + \"\\n----\\n\\n\" )\n"
|
||||
"}\n"
|
||||
"\n"
|
||||
"shellPrintHelper = function (x) {\n"
|
||||
|
||||
@@ -235,7 +235,8 @@ ShardingTest = function( testName , numShards , verboseLevel , numMongos , other
|
||||
rs.awaitReplication();
|
||||
var xxx = new Mongo( rs.getURL() );
|
||||
xxx.name = rs.getURL();
|
||||
this._connections.push( xxx );
|
||||
this._connections.push( xxx )
|
||||
this["shard" + i] = xxx
|
||||
}
|
||||
|
||||
this._configServers = []
|
||||
@@ -260,6 +261,7 @@ ShardingTest = function( testName , numShards , verboseLevel , numMongos , other
|
||||
var conn = startMongodTest( 30000 + i , testName + i, 0, options );
|
||||
this._alldbpaths.push( testName +i )
|
||||
this._connections.push( conn );
|
||||
this["shard" + i] = conn
|
||||
}
|
||||
|
||||
if ( otherParams.sync ){
|
||||
@@ -760,8 +762,8 @@ ShardingTest.prototype.isSharded = function( collName ){
|
||||
|
||||
ShardingTest.prototype.shardGo = function( collName , key , split , move , dbName ){
|
||||
|
||||
split = split || key;
|
||||
move = move || split;
|
||||
split = ( split != false ? ( split || key ) : split )
|
||||
move = ( split != false && move != false ? ( move || split ) : false )
|
||||
|
||||
if( collName.getDB )
|
||||
dbName = "" + collName.getDB()
|
||||
@@ -782,12 +784,16 @@ ShardingTest.prototype.shardGo = function( collName , key , split , move , dbNam
|
||||
assert( false )
|
||||
}
|
||||
|
||||
if( split == false ) return
|
||||
|
||||
result = this.s.adminCommand( { split : c , middle : split } );
|
||||
if( ! result.ok ){
|
||||
printjson( result )
|
||||
assert( false )
|
||||
}
|
||||
|
||||
if( move == false ) return
|
||||
|
||||
var result = null
|
||||
for( var i = 0; i < 5; i++ ){
|
||||
result = this.s.adminCommand( { movechunk : c , find : move , to : this.getOther( this.getServer( dbName ) ).name } );
|
||||
|
||||
@@ -1000,8 +1000,8 @@ jsTestOptions = function(){
|
||||
return {}
|
||||
}
|
||||
|
||||
testLog = function(x){
|
||||
print( jsTestFile() + " - " + x )
|
||||
jsTestLog = function(msg){
|
||||
print( "\n\n----\n" + msg + "\n----\n\n" )
|
||||
}
|
||||
|
||||
shellPrintHelper = function (x) {
|
||||
|
||||
@@ -396,7 +396,7 @@ public:
|
||||
break;
|
||||
}
|
||||
len += bytesProcessed;
|
||||
line += len;
|
||||
line += bytesProcessed;
|
||||
}
|
||||
else {
|
||||
if (!parseRow(in, o, len)) {
|
||||
|
||||
@@ -423,6 +423,8 @@ namespace mongo {
|
||||
int rowCount = getParam( "rowcount" , 0 );
|
||||
int rowNum = 0;
|
||||
|
||||
auth();
|
||||
|
||||
BSONObj prev = stats();
|
||||
if ( prev.isEmpty() )
|
||||
return -1;
|
||||
|
||||
@@ -402,7 +402,7 @@ namespace mongo {
|
||||
// findOne throws an AssertionException if it's not authenticated.
|
||||
if (_coll.size() > 0) {
|
||||
// BSONTools don't have a collection
|
||||
conn().findOne(getNS(), Query("{}"));
|
||||
conn().findOne(getNS(), Query("{}"), 0, QueryOption_SlaveOk);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -203,7 +203,7 @@ namespace mongo {
|
||||
DEV mutexDebugger.entering(_name);
|
||||
}
|
||||
void unlock() {
|
||||
mutexDebugger.leaving(_name);
|
||||
DEV mutexDebugger.leaving(_name);
|
||||
check( pthread_rwlock_unlock( &_lock ) );
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,36 @@ namespace mongo {
|
||||
#if defined(_WIN32)
|
||||
EnterCriticalSection(&_cs);
|
||||
#elif defined(__USE_XOPEN2K)
|
||||
pthread_spin_lock( &_lock );
|
||||
|
||||
/**
|
||||
* this is designed to perform close to the default spin lock
|
||||
* the reason for the mild insanity is to prevent horrible performance
|
||||
* when contention spikes
|
||||
* it allows spinlocks to be used in many more places
|
||||
* which is good because even with this change they are about 8x faster on linux
|
||||
*/
|
||||
|
||||
if ( pthread_spin_trylock( &_lock ) == 0 )
|
||||
return;
|
||||
|
||||
for ( int i=0; i<1000; i++ )
|
||||
if ( pthread_spin_trylock( &_lock ) == 0 )
|
||||
return;
|
||||
|
||||
for ( int i=0; i<1000; i++ ) {
|
||||
if ( pthread_spin_trylock( &_lock ) == 0 )
|
||||
return;
|
||||
pthread_yield();
|
||||
}
|
||||
|
||||
struct timespec t;
|
||||
t.tv_sec = 0;
|
||||
t.tv_nsec = 5000000;
|
||||
|
||||
while ( pthread_spin_trylock( &_lock ) != 0 ) {
|
||||
nanosleep(&t, NULL);
|
||||
}
|
||||
|
||||
#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
|
||||
// fast path
|
||||
if (!_locked && !__sync_lock_test_and_set(&_locked, true)) {
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "mutex.h"
|
||||
#include "spin_lock.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
@@ -36,7 +37,7 @@ namespace mongo {
|
||||
builds at runtime.
|
||||
*/
|
||||
template <typename T, mutex& BY>
|
||||
class Guarded {
|
||||
class Guarded : boost::noncopyable {
|
||||
T _val;
|
||||
public:
|
||||
T& ref(const scoped_lock& lk) {
|
||||
@@ -47,29 +48,85 @@ namespace mongo {
|
||||
|
||||
class DiagStr {
|
||||
string _s;
|
||||
static mutex m;
|
||||
mutable SpinLock m;
|
||||
public:
|
||||
DiagStr(const DiagStr& r) : _s(r.get()) { }
|
||||
DiagStr() { }
|
||||
bool empty() const {
|
||||
mutex::scoped_lock lk(m);
|
||||
scoped_spinlock lk(m);
|
||||
return _s.empty();
|
||||
}
|
||||
string get() const {
|
||||
mutex::scoped_lock lk(m);
|
||||
scoped_spinlock lk(m);
|
||||
return _s;
|
||||
}
|
||||
|
||||
void set(const char *s) {
|
||||
mutex::scoped_lock lk(m);
|
||||
scoped_spinlock lk(m);
|
||||
_s = s;
|
||||
}
|
||||
void set(const string& s) {
|
||||
mutex::scoped_lock lk(m);
|
||||
scoped_spinlock lk(m);
|
||||
_s = s;
|
||||
}
|
||||
operator string() const { return get(); }
|
||||
void operator=(const string& s) { set(s); }
|
||||
void operator=(const DiagStr& rhs) {
|
||||
scoped_spinlock lk(m);
|
||||
_s = rhs.get();
|
||||
}
|
||||
};
|
||||
|
||||
#if 0 // not including in 2.0
|
||||
|
||||
/** Thread safe map.
|
||||
Be careful not to use this too much or it could make things slow;
|
||||
if not a hot code path no problem.
|
||||
|
||||
Examples:
|
||||
|
||||
mapsf<int,int> mp;
|
||||
|
||||
int x = mp.get();
|
||||
|
||||
map<int,int> two;
|
||||
mp.swap(two);
|
||||
|
||||
{
|
||||
mapsf<int,int>::ref r(mp);
|
||||
r[9] = 1;
|
||||
map<int,int>::iterator i = r.r.begin();
|
||||
}
|
||||
|
||||
*/
|
||||
template< class K, class V >
|
||||
struct mapsf : boost::noncopyable {
|
||||
SimpleMutex m;
|
||||
map<K,V> val;
|
||||
friend struct ref;
|
||||
public:
|
||||
mapsf() : m("mapsf") { }
|
||||
void swap(map<K,V>& rhs) {
|
||||
SimpleMutex::scoped_lock lk(m);
|
||||
val.swap(rhs);
|
||||
}
|
||||
// safe as we pass by value:
|
||||
V get(K k) {
|
||||
SimpleMutex::scoped_lock lk(m);
|
||||
map<K,V>::iterator i = val.find(k);
|
||||
if( i == val.end() )
|
||||
return K();
|
||||
return i->second;
|
||||
}
|
||||
// think about deadlocks when using ref. the other methods
|
||||
// above will always be safe as they are "leaf" operations.
|
||||
struct ref {
|
||||
SimpleMutex::scoped_lock lk;
|
||||
public:
|
||||
map<K,V> const &r;
|
||||
ref(mapsf<K,V> &m) : lk(m.m), r(m.val) { }
|
||||
V& operator[](const K& k) { return r[k]; }
|
||||
};
|
||||
};
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -22,8 +22,6 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
mutex DiagStr::m("diags");
|
||||
|
||||
// intentional leak. otherwise destructor orders can be problematic at termination.
|
||||
MutexDebugger &mutexDebugger = *(new MutexDebugger());
|
||||
|
||||
|
||||
@@ -166,8 +166,13 @@ namespace mongo {
|
||||
ss << responseMsg;
|
||||
string response = ss.str();
|
||||
|
||||
sock.send( response.c_str(), response.size() , "http response" );
|
||||
sock.close();
|
||||
try {
|
||||
sock.send( response.c_str(), response.size() , "http response" );
|
||||
sock.close();
|
||||
}
|
||||
catch ( SocketException& e ) {
|
||||
log(1) << "couldn't send data to http client: " << e << endl;
|
||||
}
|
||||
}
|
||||
|
||||
string MiniWebServer::getHeader( const char * req , string wanted ) {
|
||||
|
||||
@@ -135,7 +135,7 @@ namespace mongo {
|
||||
assert( strlen(v[i]) > 20 );
|
||||
int r = repeats(v, i);
|
||||
if( r < 0 ) {
|
||||
s << color( linkify( clean(v,i).c_str() ) );
|
||||
s << color( linkify( clean(v,i).c_str() ) ) << '\n';
|
||||
}
|
||||
else {
|
||||
stringstream x;
|
||||
|
||||
@@ -38,7 +38,7 @@ namespace mongo {
|
||||
* 1.2.3-rc4-pre-
|
||||
* If you really need to do something else you'll need to fix _versionArray()
|
||||
*/
|
||||
const char versionString[] = "2.0.0-rc3-pre-";
|
||||
const char versionString[] = "2.0.1";
|
||||
|
||||
// See unit test for example outputs
|
||||
static BSONArray _versionArray(const char* version){
|
||||
@@ -168,7 +168,7 @@ namespace mongo {
|
||||
f.open("/proc/self/numa_maps", /*read_only*/true);
|
||||
if ( f.is_open() && ! f.bad() ) {
|
||||
char line[100]; //we only need the first line
|
||||
f.read(0, line, sizeof(line));
|
||||
assert( read(f.fd, line, sizeof(line)) <= 100 );
|
||||
|
||||
// just in case...
|
||||
line[98] = ' ';
|
||||
|
||||
Reference in New Issue
Block a user