Compare commits

...

10 Commits

10 changed files with 257 additions and 25 deletions

View File

@@ -3,7 +3,7 @@
#---------------------------------------------------------------------------
DOXYFILE_ENCODING = UTF-8
PROJECT_NAME = MongoDB
PROJECT_NUMBER = 2.2.2-rc0
PROJECT_NUMBER = 2.2.2
OUTPUT_DIRECTORY = docs/doxygen
CREATE_SUBDIRS = NO
OUTPUT_LANGUAGE = English

View File

@@ -5,7 +5,7 @@
var NODE_COUNT = 3;
var st = new ShardingTest({ shards: { rs0: { nodes: NODE_COUNT, oplogSize: 10 }},
separateConfig: true });
separateConfig: true, config : 3 });
var replTest = st.rs0;
var mongos = st.s;

View File

@@ -1,7 +1,7 @@
Name: mongo-10gen
Conflicts: mongo, mongo-10gen-unstable
Obsoletes: mongo-stable
Version: 2.2.1
Version: 2.2.2
Release: mongodb_1%{?dist}
Summary: mongodb client shell and tools
License: AGPL 3.0

View File

@@ -212,8 +212,8 @@ namespace mongo {
int balancedLastTime ) {
// 1) check for shards that policy require to us to move off of
// draining, maxSize
// 1) check for shards that policy requires us to move off of:
// draining only
// 2) check tag policy violations
// 3) then we make sure chunks are balanced for each tag
@@ -226,7 +226,7 @@ namespace mongo {
string shard = *z;
const ShardInfo& info = distribution.shardInfo( shard );
if ( ! info.isSizeMaxed() && ! info.isDraining() )
if ( ! info.isDraining() )
continue;
if ( distribution.numberOfChunksInShard( shard ) == 0 )

View File

@@ -80,6 +80,10 @@ namespace mongo {
*/
bool hasOpsQueued() const { return _hasOpsQueued; }
long long getMaxSize() const { return _maxSize; }
long long getCurrSize() const { return _currSize; }
string toString() const;
private:

View File

@@ -291,5 +291,221 @@ namespace mongo {
}
/**
 * Three shards are set up, one of which (shard0) already holds more data than its
 * maxSize allows.
 *
 * Although the overloaded shard has the fewest chunks, the balancer must not pick it
 * as the destination of a migration.
 */
TEST( BalancerPolicyTests, MaxSizeRespect ) {
    // Only shard0 has a size limit: maxSize = 1 while currSize = 3, i.e. it is overloaded.
    // A maxSize of 0 on the other shards means "unset".
    // ShardInfo(maxSize, currSize, draining, opsQueued)
    ShardInfoMap shardInfos;
    shardInfos["shard0"] = ShardInfo( 1, 3, false, false );
    shardInfos["shard1"] = ShardInfo( 0, 4, false, false );
    shardInfos["shard2"] = ShardInfo( 0, 6, false, false );

    // Chunk counts per shard, added in shard order.
    ShardToChunksMap chunkMap;
    addShard( chunkMap, 3 , false );
    addShard( chunkMap, 4 , false );
    addShard( chunkMap, 6 , true );

    DistributionStatus status( shardInfos, chunkMap );

    // NOTE(review): the returned MigrateInfo is never released in this test -- confirm
    // who owns it.
    MigrateInfo* migration = BalancerPolicy::balance( "ns", status, 0 );

    // A move must be suggested, and it has to go from the fullest shard to shard1
    // rather than to the overloaded shard0.
    ASSERT( migration );
    ASSERT_EQUALS( "shard2" , migration->from );
    ASSERT_EQUALS( "shard1" , migration->to );
}
/**
 * Exceeding maxSize must *not* be treated like draining: a shard should not be emptied
 * merely because it is over its size limit.
 */
TEST( BalancerPolicyTests, MaxSizeNoDrain ) {
    // shard0 is the overloaded one: maxSize = 1 with currSize = 4.
    // A maxSize of 0 on the other shards means "unset".
    // ShardInfo(maxSize, currSize, draining, opsQueued)
    ShardInfoMap shardInfos;
    shardInfos["shard0"] = ShardInfo( 1, 4, false, false );
    shardInfos["shard1"] = ShardInfo( 0, 4, false, false );
    shardInfos["shard2"] = ShardInfo( 0, 4, false, false );

    // Every shard carries the same number of chunks.
    ShardToChunksMap chunkMap;
    addShard( chunkMap, 4 , false );
    addShard( chunkMap, 4 , false );
    addShard( chunkMap, 4 , true );

    DistributionStatus status( shardInfos, chunkMap );
    MigrateInfo* migration = BalancerPolicy::balance( "ns", status, 0 );

    // No migration is expected at all.
    ASSERT( !migration );
}
// Small seeded random number source used by the tests below.
// Note: Only in 2.2, 2.4 has utility class
class PseudoRandom {
public:
    PseudoRandom(unsigned int seed) : _seed( seed ) {}

    /**
     * Returns the next pseudo-random value.
     *
     * When 'max' is positive the raw value is reduced modulo 'max'; otherwise the raw
     * generator output is returned unchanged.
     */
    int nextInt32( int max = -1 ) {
#if defined(_WIN32)
        const int raw = ::rand(); // seed not used in this case
#else
        const int raw = rand_r( &_seed );
#endif
        if ( max > 0 ) {
            return raw % max;
        }
        return raw;
    }

private:
    // Generator state; updated in place by rand_r on non-Windows builds.
    unsigned int _seed;
};
/**
 * Randomized simulation of repeated balancing rounds.
 *
 * Several shards are set up: the first two are draining and the next two have a data
 * size limit. Each shard starts with a random number of chunks.
 *
 * Once the shards are set up, migrations are applied virtually up to numChunks times, or
 * until the policy has no more migrations to suggest. Each chunk is assumed to have a
 * size of 1 unit, so currSize of the donor and recipient is adjusted by one per move.
 *
 * Finally the test checks that the draining shards are drained, that the data-limited
 * shards are not overloaded, and that all shards (including a data-limited shard, if the
 * baseline is not over its limit) are balanced to within 1 unit of some baseline.
 */
TEST( BalancerPolicyTests, Simulation ) {
    // Hardcoded seed so that the test is deterministic.
    const unsigned int seed = 1337;
    PseudoRandom rng( seed );

    // Run the whole scenario 10 times
    for ( int round = 0; round < 10; ++round ) {

        //
        // Set up the shards: draining, with maxSize, and normal
        //

        const int numShards = 7;
        int numChunks = 0;

        ShardToChunksMap chunks;
        ShardInfoMap shards;

        // Upper bound on the final size of every draining or size-limited shard
        map<string,int> expected;

        for ( int i = 0; i < numShards; ++i ) {
            const int numShardChunks = rng.nextInt32( 100 );
            const string shardName = str::stream() << "shard" << i;

            const bool draining = i < 2;
            const bool maxed = i >= 2 && i < 4;

            if ( draining ) {
                expected[shardName] = 0;
            }
            else if ( maxed ) {
                expected[shardName] = numShardChunks + 1;
            }

            addShard( chunks, numShardChunks, false );
            numChunks += numShardChunks;

            // A limited shard gets room for exactly one more chunk; 0 means no limit.
            shards[shardName] = ShardInfo( maxed ? numShardChunks + 1 : 0,
                                           numShardChunks,
                                           draining,
                                           false );
        }

        for ( ShardInfoMap::iterator s = shards.begin(); s != shards.end(); ++s ) {
            log() << s->first << " : " << s->second.toString() << endl;
        }

        //
        // Apply migrations, adjusting the data size as chunks move
        //

        for ( int i = 0; i < numChunks; ++i ) {
            DistributionStatus d( shards, chunks );

            // NOTE(review): the returned MigrateInfo is never released in this loop --
            // confirm who owns it.
            MigrateInfo* m = BalancerPolicy::balance( "ns", d, i != 0 );
            if ( !m ) {
                log() << "Finished with test moves." << endl;
                break;
            }

            moveChunk( chunks, m );

            // Donor shrinks by one unit...
            {
                ShardInfo& donor = shards[m->from];
                donor = ShardInfo( donor.getMaxSize(),
                                   donor.getCurrSize() - 1,
                                   donor.isDraining(),
                                   donor.hasOpsQueued() );
            }

            // ...and the recipient grows by one unit.
            {
                ShardInfo& recipient = shards[m->to];
                recipient = ShardInfo( recipient.getMaxSize(),
                                       recipient.getCurrSize() + 1,
                                       recipient.isDraining(),
                                       recipient.hasOpsQueued() );
            }
        }

        //
        // Verify that the balance is correct and the data size is low.
        //

        // The baseline is the size of the last shard, which is neither draining nor
        // limited
        ShardInfoMap::iterator lastShard = shards.end();
        --lastShard;
        int balancedSize = lastShard->second.getCurrSize();

        for ( ShardInfoMap::iterator s = shards.begin(); s != shards.end(); ++s ) {
            log() << s->first << " : " << s->second.toString() << endl;
        }

        for ( ShardInfoMap::iterator s = shards.begin(); s != shards.end(); ++s ) {
            log() << s->first << " : " << s->second.toString() << endl;

            const long long currSize = s->second.getCurrSize();
            const bool nearBalanced = currSize >= balancedSize - 1 &&
                                      currSize <= balancedSize + 1;

            map<string,int>::iterator bound = expected.find( s->first );

            if ( bound == expected.end() ) {
                // Ordinary shard: only has to be close to the baseline.
                bool isInRange = nearBalanced;
                if ( !isInRange ) {
                    warning() << "non-limited and non-draining shard had "
                              << currSize << " chunks, expected near "
                              << balancedSize << endl;
                }
                ASSERT( isInRange );
                continue;
            }

            // Draining or limited shard: must stay within its bound, and if that bound
            // leaves room for the baseline it must be close to the baseline as well.
            int expectedSize = bound->second;
            bool isInRange = currSize <= expectedSize;
            if ( isInRange && expectedSize >= balancedSize ) {
                isInRange = nearBalanced;
            }
            if ( !isInRange ) {
                warning() << "limited or draining shard had "
                          << currSize << " chunks, expected less than "
                          << expectedSize << " and (if less than expected) near "
                          << balancedSize << endl;
            }
            ASSERT( isInRange );
        }
    }
}
}
}

View File

@@ -1033,15 +1033,16 @@ namespace mongo {
log(1) << "replicaSetChange: shard not found for set: " << monitor->getServerAddress() << endl;
return;
}
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getScopedDbConnection(
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getInternalScopedDbConnection(
configServer.getConnectionString().toString(), 30.0 ) );
conn->get()->update( ShardNS::shard,
BSON( "_id" << s.getName() ),
BSON( "$set" << BSON( "host" << monitor->getServerAddress() ) ) );
conn->done();
}
catch ( DBException & ) {
error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName() << " to: " << monitor->getServerAddress() << endl;
catch (DBException& e) {
error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName()
<< " to: " << monitor->getServerAddress() << causedBy(e) << endl;
}
}

View File

@@ -294,7 +294,10 @@ namespace mongo {
}
void done() {
Lock::DBRead lk( _ns );
log() << "MigrateFromStatus::done About to acquire global write lock to exit critical "
"section" << endl;
Lock::GlobalWrite lk;
log() << "MigrateFromStatus::done Global lock acquired" << endl;
{
scoped_spinlock lk( _trackerLocks );
@@ -913,6 +916,7 @@ namespace mongo {
configServer.logChange( "moveChunk.start" , ns , chunkInfo );
ShardChunkVersion maxVersion;
ShardChunkVersion startingVersion;
string myOldShard;
{
scoped_ptr<ScopedDbConnection> conn(
@@ -981,10 +985,10 @@ namespace mongo {
// it's possible this shard will be *at* zero version from a previous migrate and
// no refresh will be done
// TODO: Make this less fragile
ShardChunkVersion shardVersion = maxVersion;
shardingState.trySetVersion( ns , shardVersion /* will return updated */ );
startingVersion = maxVersion;
shardingState.trySetVersion( ns , startingVersion /* will return updated */ );
log() << "moveChunk request accepted at version " << shardVersion << migrateLog;
log() << "moveChunk request accepted at version " << startingVersion << migrateLog;
}
timing.done(2);
@@ -1099,8 +1103,7 @@ namespace mongo {
// 5.a
// we're under the collection lock here, so no other migrate can change maxVersion or ShardChunkManager state
migrateFromStatus.setInCriticalSection( true );
ShardChunkVersion currVersion = maxVersion;
ShardChunkVersion myVersion = currVersion;
ShardChunkVersion myVersion = maxVersion;
myVersion.incMajor();
{
@@ -1120,7 +1123,8 @@ namespace mongo {
{
BSONObj res;
scoped_ptr<ScopedDbConnection> connTo(
ScopedDbConnection::getScopedDbConnection( toShard.getConnString() ) );
ScopedDbConnection::getScopedDbConnection( toShard.getConnString(),
35.0 ) );
bool ok;
@@ -1132,21 +1136,24 @@ namespace mongo {
catch( DBException& e ){
errmsg = str::stream() << "moveChunk could not contact to: shard " << toShard.getConnString() << " to commit transfer" << causedBy( e );
warning() << errmsg << endl;
return false;
ok = false;
}
connTo->done();
if ( ! ok ) {
log() << "moveChunk migrate commit not accepted by TO-shard: " << res
<< " resetting shard version to: " << startingVersion << migrateLog;
{
Lock::DBWrite lk( ns );
Lock::GlobalWrite lk;
log() << "moveChunk global lock acquired to reset shard version from "
"failed migration" << endl;
// revert the chunk manager back to the state before "forgetting" about the chunk
shardingState.undoDonateChunk( ns , min , max , currVersion );
shardingState.undoDonateChunk( ns , min , max , startingVersion );
}
log() << "moveChunk migrate commit not accepted by TO-shard: " << res
<< " resetting shard version to: " << currVersion << migrateLog;
log() << "Shard version successfully reset to clean up failed migration"
<< endl;
errmsg = "_recvChunkCommit failed!";
result.append( "cause" , res );
@@ -1597,6 +1604,8 @@ namespace mongo {
// this will prevent us from going into critical section until we're ready
Timer t;
while ( t.minutes() < 600 ) {
log() << "Waiting for replication to catch up before entering critical section"
<< endl;
if ( flushPendingWrites( lastOpApplied ) )
break;
sleepsecs(1);
@@ -1786,7 +1795,8 @@ namespace mongo {
Timer t;
// we wait for the commit to succeed before giving up
while ( t.minutes() <= 5 ) {
while ( t.seconds() <= 30 ) {
log() << "Waiting for commit to finish" << endl;
sleepmillis(1);
if ( state == DONE )
return true;

View File

@@ -162,6 +162,7 @@ namespace mongo {
void ShardingState::undoDonateChunk( const string& ns , const BSONObj& min , const BSONObj& max , ShardChunkVersion version ) {
scoped_lock lk( _mutex );
log() << "ShardingState::undoDonateChunk acquired _mutex" << endl;
ChunkManagersMap::const_iterator it = _chunks.find( ns );
verify( it != _chunks.end() ) ;
@@ -634,7 +635,7 @@ namespace mongo {
if ( version < globalVersion && version.hasCompatibleEpoch( globalVersion ) ) {
while ( shardingState.inCriticalMigrateSection() ) {
dbtemprelease r;
sleepmillis(2);
sleepmillis(20);
OCCASIONALLY log() << "waiting till out of critical section" << endl;
}
errmsg = "shard global version for collection is higher than trying to set to '" + ns + "'";

View File

@@ -43,7 +43,7 @@ namespace mongo {
* 1.2.3-rc4-pre-
* If you really need to do something else you'll need to fix _versionArray()
*/
const char versionString[] = "2.2.2-rc0";
const char versionString[] = "2.2.2";
// See unit test for example outputs
static BSONArray _versionArray(const char* version){