Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1b43b61a5 | ||
|
|
69655c6c6e | ||
|
|
f3700f4ec3 | ||
|
|
5a4b6a0acc | ||
|
|
fed35f0c08 | ||
|
|
cb2e7e34d5 | ||
|
|
c527cc73e2 | ||
|
|
9032d392d0 | ||
|
|
448ef26e3d | ||
|
|
c6039b222e |
@@ -3,7 +3,7 @@
|
||||
#---------------------------------------------------------------------------
|
||||
DOXYFILE_ENCODING = UTF-8
|
||||
PROJECT_NAME = MongoDB
|
||||
PROJECT_NUMBER = 2.2.2-rc0
|
||||
PROJECT_NUMBER = 2.2.2
|
||||
OUTPUT_DIRECTORY = docs/doxygen
|
||||
CREATE_SUBDIRS = NO
|
||||
OUTPUT_LANGUAGE = English
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
|
||||
var NODE_COUNT = 3;
|
||||
var st = new ShardingTest({ shards: { rs0: { nodes: NODE_COUNT, oplogSize: 10 }},
|
||||
separateConfig: true });
|
||||
separateConfig: true, config : 3 });
|
||||
var replTest = st.rs0;
|
||||
var mongos = st.s;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
Name: mongo-10gen
|
||||
Conflicts: mongo, mongo-10gen-unstable
|
||||
Obsoletes: mongo-stable
|
||||
Version: 2.2.1
|
||||
Version: 2.2.2
|
||||
Release: mongodb_1%{?dist}
|
||||
Summary: mongodb client shell and tools
|
||||
License: AGPL 3.0
|
||||
|
||||
@@ -212,8 +212,8 @@ namespace mongo {
|
||||
int balancedLastTime ) {
|
||||
|
||||
|
||||
// 1) check for shards that policy require to us to move off of
|
||||
// draining, maxSize
|
||||
// 1) check for shards that policy require to us to move off of:
|
||||
// draining only
|
||||
// 2) check tag policy violations
|
||||
// 3) then we make sure chunks are balanced for each tag
|
||||
|
||||
@@ -226,7 +226,7 @@ namespace mongo {
|
||||
string shard = *z;
|
||||
const ShardInfo& info = distribution.shardInfo( shard );
|
||||
|
||||
if ( ! info.isSizeMaxed() && ! info.isDraining() )
|
||||
if ( ! info.isDraining() )
|
||||
continue;
|
||||
|
||||
if ( distribution.numberOfChunksInShard( shard ) == 0 )
|
||||
|
||||
@@ -80,6 +80,10 @@ namespace mongo {
|
||||
*/
|
||||
bool hasOpsQueued() const { return _hasOpsQueued; }
|
||||
|
||||
long long getMaxSize() const { return _maxSize; }
|
||||
|
||||
long long getCurrSize() const { return _currSize; }
|
||||
|
||||
string toString() const;
|
||||
|
||||
private:
|
||||
|
||||
@@ -291,5 +291,221 @@ namespace mongo {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Idea for this test is to set up three shards, one of which is overloaded (too much data).
|
||||
*
|
||||
* Even though the overloaded shard has less chunks, we shouldn't move chunks to that shard.
|
||||
*/
|
||||
TEST( BalancerPolicyTests, MaxSizeRespect ) {
|
||||
|
||||
ShardToChunksMap chunks;
|
||||
addShard( chunks, 3 , false );
|
||||
addShard( chunks, 4 , false );
|
||||
addShard( chunks, 6 , true );
|
||||
|
||||
// Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 3.
|
||||
// Other shards have maxSize = 0 = unset.
|
||||
|
||||
ShardInfoMap shards;
|
||||
// ShardInfo(maxSize, currSize, draining, opsQueued)
|
||||
shards["shard0"] = ShardInfo( 1, 3, false, false );
|
||||
shards["shard1"] = ShardInfo( 0, 4, false, false );
|
||||
shards["shard2"] = ShardInfo( 0, 6, false, false );
|
||||
|
||||
DistributionStatus d( shards, chunks );
|
||||
MigrateInfo* m = BalancerPolicy::balance( "ns", d, 0 );
|
||||
ASSERT( m );
|
||||
ASSERT_EQUALS( "shard2" , m->from );
|
||||
ASSERT_EQUALS( "shard1" , m->to );
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Here we check that being over the maxSize is *not* equivalent to draining, we don't want
|
||||
* to empty shards for no other reason than they are over this limit.
|
||||
*/
|
||||
TEST( BalancerPolicyTests, MaxSizeNoDrain ) {
|
||||
|
||||
ShardToChunksMap chunks;
|
||||
// Shard0 will be overloaded
|
||||
addShard( chunks, 4 , false );
|
||||
addShard( chunks, 4 , false );
|
||||
addShard( chunks, 4 , true );
|
||||
|
||||
// Note that maxSize of shard0 is 1, and it is therefore overloaded with currSize = 4.
|
||||
// Other shards have maxSize = 0 = unset.
|
||||
|
||||
ShardInfoMap shards;
|
||||
// ShardInfo(maxSize, currSize, draining, opsQueued)
|
||||
shards["shard0"] = ShardInfo( 1, 4, false, false );
|
||||
shards["shard1"] = ShardInfo( 0, 4, false, false );
|
||||
shards["shard2"] = ShardInfo( 0, 4, false, false );
|
||||
|
||||
DistributionStatus d( shards, chunks );
|
||||
MigrateInfo* m = BalancerPolicy::balance( "ns", d, 0 );
|
||||
ASSERT( !m );
|
||||
}
|
||||
|
||||
// Note: Only in 2.2, 2.4 has utility class
|
||||
class PseudoRandom {
|
||||
public:
|
||||
|
||||
PseudoRandom(unsigned int seed) {
|
||||
_seed = seed;
|
||||
}
|
||||
|
||||
int nextInt32( int max = -1 ){
|
||||
|
||||
#if !defined(_WIN32)
|
||||
int r = rand_r( &_seed ) ;
|
||||
#else
|
||||
int r = ::rand(); // seed not used in this case
|
||||
#endif
|
||||
return max > 0 ? r % max : r;
|
||||
}
|
||||
|
||||
private:
|
||||
unsigned int _seed;
|
||||
};
|
||||
|
||||
/**
|
||||
* Idea behind this test is that we set up several shards, the first two of which are
|
||||
* draining and the second two of which have a data size limit. We also simulate a random
|
||||
* number of chunks on each shard.
|
||||
*
|
||||
* Once the shards are setup, we virtually migrate numChunks times, or until there are no
|
||||
* more migrations to run. Each chunk is assumed to have a size of 1 unit, and we increment
|
||||
* our currSize for each shard as the chunks move.
|
||||
*
|
||||
* Finally, we ensure that the drained shards are drained, the data-limited shards aren't
|
||||
* overloaded, and that all shards (including the data limited shard if the baseline isn't
|
||||
* over the limit are balanced to within 1 unit of some baseline.
|
||||
*
|
||||
*/
|
||||
TEST( BalancerPolicyTests, Simulation ) {
|
||||
|
||||
// Hardcode seed here, make test deterministic.
|
||||
int64_t seed = 1337;
|
||||
PseudoRandom rng(seed);
|
||||
|
||||
// Run test 10 times
|
||||
for (int test = 0; test < 10; test++) {
|
||||
|
||||
//
|
||||
// Setup our shards as draining, with maxSize, and normal
|
||||
//
|
||||
|
||||
int numShards = 7;
|
||||
int numChunks = 0;
|
||||
|
||||
ShardToChunksMap chunks;
|
||||
ShardInfoMap shards;
|
||||
|
||||
map<string,int> expected;
|
||||
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
|
||||
int numShardChunks = rng.nextInt32(100);
|
||||
bool draining = i < 2;
|
||||
bool maxed = i >= 2 && i < 4;
|
||||
|
||||
if (draining) expected[str::stream() << "shard" << i] = 0;
|
||||
if (maxed) expected[str::stream() << "shard" << i] = numShardChunks + 1;
|
||||
|
||||
addShard(chunks, numShardChunks, false);
|
||||
numChunks += numShardChunks;
|
||||
|
||||
shards[str::stream() << "shard" << i] =
|
||||
ShardInfo(maxed ? numShardChunks + 1 : 0,
|
||||
numShardChunks, draining, false);
|
||||
}
|
||||
|
||||
for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
|
||||
log() << it->first << " : " << it->second.toString() << endl;
|
||||
}
|
||||
|
||||
//
|
||||
// Perform migrations and increment data size as chunks move
|
||||
//
|
||||
|
||||
for (int i = 0; i < numChunks; i++) {
|
||||
|
||||
DistributionStatus d( shards, chunks );
|
||||
MigrateInfo* m = BalancerPolicy::balance( "ns", d, i != 0 );
|
||||
|
||||
if (!m) {
|
||||
log() << "Finished with test moves." << endl;
|
||||
break;
|
||||
}
|
||||
|
||||
moveChunk(chunks, m);
|
||||
|
||||
{
|
||||
ShardInfo& info = shards[m->from];
|
||||
shards[m->from] = ShardInfo(info.getMaxSize(),
|
||||
info.getCurrSize() - 1,
|
||||
info.isDraining(),
|
||||
info.hasOpsQueued());
|
||||
}
|
||||
|
||||
{
|
||||
ShardInfo& info = shards[m->to];
|
||||
shards[m->to] = ShardInfo(info.getMaxSize(),
|
||||
info.getCurrSize() + 1,
|
||||
info.isDraining(),
|
||||
info.hasOpsQueued());
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Make sure our balance is correct and our data size is low.
|
||||
//
|
||||
|
||||
// The balanced value is the count on the last shard, since it's not draining or
|
||||
// limited
|
||||
int balancedSize = (--shards.end())->second.getCurrSize();
|
||||
|
||||
for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
|
||||
log() << it->first << " : " << it->second.toString() << endl;
|
||||
}
|
||||
|
||||
for (ShardInfoMap::iterator it = shards.begin(); it != shards.end(); ++it) {
|
||||
|
||||
log() << it->first << " : " << it->second.toString() << endl;
|
||||
|
||||
map<string,int>::iterator expectedIt = expected.find(it->first);
|
||||
|
||||
if (expectedIt == expected.end()) {
|
||||
bool isInRange = it->second.getCurrSize() >= balancedSize - 1 &&
|
||||
it->second.getCurrSize() <= balancedSize + 1;
|
||||
|
||||
if (!isInRange) {
|
||||
warning() << "non-limited and non-draining shard had "
|
||||
<< it->second.getCurrSize() << " chunks, expected near "
|
||||
<< balancedSize << endl;
|
||||
}
|
||||
|
||||
ASSERT(isInRange);
|
||||
}
|
||||
else {
|
||||
int expectedSize = expectedIt->second;
|
||||
bool isInRange = it->second.getCurrSize() <= expectedSize;
|
||||
if (isInRange && expectedSize >= balancedSize) {
|
||||
isInRange = it->second.getCurrSize() >= balancedSize - 1 &&
|
||||
it->second.getCurrSize() <= balancedSize + 1;
|
||||
}
|
||||
|
||||
if (!isInRange) {
|
||||
warning() << "limited or draining shard had "
|
||||
<< it->second.getCurrSize() << " chunks, expected less than "
|
||||
<< expectedSize << " and (if less than expected) near "
|
||||
<< balancedSize << endl;
|
||||
}
|
||||
|
||||
ASSERT(isInRange);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1033,15 +1033,16 @@ namespace mongo {
|
||||
log(1) << "replicaSetChange: shard not found for set: " << monitor->getServerAddress() << endl;
|
||||
return;
|
||||
}
|
||||
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getScopedDbConnection(
|
||||
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getInternalScopedDbConnection(
|
||||
configServer.getConnectionString().toString(), 30.0 ) );
|
||||
conn->get()->update( ShardNS::shard,
|
||||
BSON( "_id" << s.getName() ),
|
||||
BSON( "$set" << BSON( "host" << monitor->getServerAddress() ) ) );
|
||||
conn->done();
|
||||
}
|
||||
catch ( DBException & ) {
|
||||
error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName() << " to: " << monitor->getServerAddress() << endl;
|
||||
catch (DBException& e) {
|
||||
error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName()
|
||||
<< " to: " << monitor->getServerAddress() << causedBy(e) << endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -294,7 +294,10 @@ namespace mongo {
|
||||
}
|
||||
|
||||
void done() {
|
||||
Lock::DBRead lk( _ns );
|
||||
log() << "MigrateFromStatus::done About to acquire global write lock to exit critical "
|
||||
"section" << endl;
|
||||
Lock::GlobalWrite lk;
|
||||
log() << "MigrateFromStatus::done Global lock acquired" << endl;
|
||||
|
||||
{
|
||||
scoped_spinlock lk( _trackerLocks );
|
||||
@@ -913,6 +916,7 @@ namespace mongo {
|
||||
configServer.logChange( "moveChunk.start" , ns , chunkInfo );
|
||||
|
||||
ShardChunkVersion maxVersion;
|
||||
ShardChunkVersion startingVersion;
|
||||
string myOldShard;
|
||||
{
|
||||
scoped_ptr<ScopedDbConnection> conn(
|
||||
@@ -981,10 +985,10 @@ namespace mongo {
|
||||
// it's possible this shard will be *at* zero version from a previous migrate and
|
||||
// no refresh will be done
|
||||
// TODO: Make this less fragile
|
||||
ShardChunkVersion shardVersion = maxVersion;
|
||||
shardingState.trySetVersion( ns , shardVersion /* will return updated */ );
|
||||
startingVersion = maxVersion;
|
||||
shardingState.trySetVersion( ns , startingVersion /* will return updated */ );
|
||||
|
||||
log() << "moveChunk request accepted at version " << shardVersion << migrateLog;
|
||||
log() << "moveChunk request accepted at version " << startingVersion << migrateLog;
|
||||
}
|
||||
|
||||
timing.done(2);
|
||||
@@ -1099,8 +1103,7 @@ namespace mongo {
|
||||
// 5.a
|
||||
// we're under the collection lock here, so no other migrate can change maxVersion or ShardChunkManager state
|
||||
migrateFromStatus.setInCriticalSection( true );
|
||||
ShardChunkVersion currVersion = maxVersion;
|
||||
ShardChunkVersion myVersion = currVersion;
|
||||
ShardChunkVersion myVersion = maxVersion;
|
||||
myVersion.incMajor();
|
||||
|
||||
{
|
||||
@@ -1120,7 +1123,8 @@ namespace mongo {
|
||||
{
|
||||
BSONObj res;
|
||||
scoped_ptr<ScopedDbConnection> connTo(
|
||||
ScopedDbConnection::getScopedDbConnection( toShard.getConnString() ) );
|
||||
ScopedDbConnection::getScopedDbConnection( toShard.getConnString(),
|
||||
35.0 ) );
|
||||
|
||||
bool ok;
|
||||
|
||||
@@ -1132,21 +1136,24 @@ namespace mongo {
|
||||
catch( DBException& e ){
|
||||
errmsg = str::stream() << "moveChunk could not contact to: shard " << toShard.getConnString() << " to commit transfer" << causedBy( e );
|
||||
warning() << errmsg << endl;
|
||||
return false;
|
||||
ok = false;
|
||||
}
|
||||
|
||||
connTo->done();
|
||||
|
||||
if ( ! ok ) {
|
||||
log() << "moveChunk migrate commit not accepted by TO-shard: " << res
|
||||
<< " resetting shard version to: " << startingVersion << migrateLog;
|
||||
{
|
||||
Lock::DBWrite lk( ns );
|
||||
Lock::GlobalWrite lk;
|
||||
log() << "moveChunk global lock acquired to reset shard version from "
|
||||
"failed migration" << endl;
|
||||
|
||||
// revert the chunk manager back to the state before "forgetting" about the chunk
|
||||
shardingState.undoDonateChunk( ns , min , max , currVersion );
|
||||
shardingState.undoDonateChunk( ns , min , max , startingVersion );
|
||||
}
|
||||
|
||||
log() << "moveChunk migrate commit not accepted by TO-shard: " << res
|
||||
<< " resetting shard version to: " << currVersion << migrateLog;
|
||||
log() << "Shard version successfully reset to clean up failed migration"
|
||||
<< endl;
|
||||
|
||||
errmsg = "_recvChunkCommit failed!";
|
||||
result.append( "cause" , res );
|
||||
@@ -1597,6 +1604,8 @@ namespace mongo {
|
||||
// this will prevent us from going into critical section until we're ready
|
||||
Timer t;
|
||||
while ( t.minutes() < 600 ) {
|
||||
log() << "Waiting for replication to catch up before entering critical section"
|
||||
<< endl;
|
||||
if ( flushPendingWrites( lastOpApplied ) )
|
||||
break;
|
||||
sleepsecs(1);
|
||||
@@ -1786,7 +1795,8 @@ namespace mongo {
|
||||
|
||||
Timer t;
|
||||
// we wait for the commit to succeed before giving up
|
||||
while ( t.minutes() <= 5 ) {
|
||||
while ( t.seconds() <= 30 ) {
|
||||
log() << "Waiting for commit to finish" << endl;
|
||||
sleepmillis(1);
|
||||
if ( state == DONE )
|
||||
return true;
|
||||
|
||||
@@ -162,6 +162,7 @@ namespace mongo {
|
||||
|
||||
void ShardingState::undoDonateChunk( const string& ns , const BSONObj& min , const BSONObj& max , ShardChunkVersion version ) {
|
||||
scoped_lock lk( _mutex );
|
||||
log() << "ShardingState::undoDonateChunk acquired _mutex" << endl;
|
||||
|
||||
ChunkManagersMap::const_iterator it = _chunks.find( ns );
|
||||
verify( it != _chunks.end() ) ;
|
||||
@@ -634,7 +635,7 @@ namespace mongo {
|
||||
if ( version < globalVersion && version.hasCompatibleEpoch( globalVersion ) ) {
|
||||
while ( shardingState.inCriticalMigrateSection() ) {
|
||||
dbtemprelease r;
|
||||
sleepmillis(2);
|
||||
sleepmillis(20);
|
||||
OCCASIONALLY log() << "waiting till out of critical section" << endl;
|
||||
}
|
||||
errmsg = "shard global version for collection is higher than trying to set to '" + ns + "'";
|
||||
|
||||
@@ -43,7 +43,7 @@ namespace mongo {
|
||||
* 1.2.3-rc4-pre-
|
||||
* If you really need to do something else you'll need to fix _versionArray()
|
||||
*/
|
||||
const char versionString[] = "2.2.2-rc0";
|
||||
const char versionString[] = "2.2.2";
|
||||
|
||||
// See unit test for example outputs
|
||||
static BSONArray _versionArray(const char* version){
|
||||
|
||||
Reference in New Issue
Block a user