SERVER-119065 Add integration tests to join existing unshardCollection and rewriteCollection commands (#49013)
GitOrigin-RevId: b7280934c0968ff453f20ea42932f3d6265e2871
This commit is contained in:
committed by
MongoDB Bot
parent
18b1bfa868
commit
b738baf21a
@@ -2,11 +2,16 @@ import {
|
||||
areViewlessTimeseriesEnabled,
|
||||
getTimeseriesBucketsColl,
|
||||
} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
|
||||
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {getDBNameAndCollNameFromFullNamespace} from "jstests/libs/namespace_utils.js";
|
||||
import {Thread} from "jstests/libs/parallelTester.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
import {extractUUIDFromObject, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
|
||||
import {
|
||||
extractUUIDFromObject,
|
||||
getUUIDFromConfigCollections,
|
||||
getUUIDFromListCollections,
|
||||
} from "jstests/libs/uuid_util.js";
|
||||
import {awaitRSClientHosts} from "jstests/replsets/rslib.js";
|
||||
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
|
||||
import {isSlowBuild} from "jstests/sharding/libs/sharding_util.js";
|
||||
@@ -387,6 +392,22 @@ export var ReshardingTest = class {
|
||||
assert.neq(undefined, this._sourceCollectionUUID, "createShardedCollection must be called first");
|
||||
return this._sourceCollectionUUID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the mongos connection for this test fixture.
|
||||
*/
|
||||
getMongos() {
|
||||
return this._st.s;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the source namespace for the resharding operation.
|
||||
*/
|
||||
getSourceNamespace() {
|
||||
assert.neq(undefined, this._ns, "createShardedCollection must be called first");
|
||||
return this._ns;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reshards an existing collection using the specified new shard key and new chunk ranges.
|
||||
*
|
||||
@@ -541,167 +562,107 @@ export var ReshardingTest = class {
|
||||
|
||||
/**
|
||||
* Moves an existing unsharded collection to toShard.
|
||||
*
|
||||
* @param toShard - shardId of the shard to move to.
|
||||
*
|
||||
* @param duringReshardingFn - a function which optionally accepts the temporary resharding
|
||||
* namespace string. It is only guaranteed to be called after mongos has started running the
|
||||
* reshardCollection command. Callers should use DiscoverTopology.findConnectedNodes() to
|
||||
* introspect the state of the donor or recipient shards if they need more specific
|
||||
* synchronization.
|
||||
*
|
||||
* @param expectedErrorCode - the expected response code for the reshardCollection command.
|
||||
*
|
||||
* @param postCheckConsistencyFn - a function for evaluating additional correctness
|
||||
* assertions. This function is called in the critical section, after the `reshardCollection`
|
||||
* command has shuffled data, but before the coordinator persists a decision.
|
||||
*
|
||||
* @param postDecisionPersistedFn - a function for evaluating addition assertions after
|
||||
* the decision has been persisted, but before the resharding operation finishes and returns
|
||||
* to the client.
|
||||
*
|
||||
* @param afterReshardingFn - a function that will be called after the resharding operation
|
||||
* finishes but before checking the the state post resharding. By the time afterReshardingFn
|
||||
* is called the temporary resharding collection will either have been dropped or renamed.
|
||||
* @param operationArgs.toShard - shardId of the shard to move to.
|
||||
* See _withOperationInBackground for duringReshardingFn and options documentation.
|
||||
*/
|
||||
withMoveCollectionInBackground(
|
||||
{toShard},
|
||||
duringReshardingFn = (tempNs) => {},
|
||||
{
|
||||
expectedErrorCode = ErrorCodes.OK,
|
||||
postCheckConsistencyFn = (tempNs) => {},
|
||||
postDecisionPersistedFn = () => {},
|
||||
afterReshardingFn = () => {},
|
||||
} = {},
|
||||
) {
|
||||
this._opType = "moveCollection";
|
||||
this._startReshardingInBackgroundAndAllowCommandFailure(
|
||||
{newShardKeyPattern: {_id: 1}, toShard: toShard},
|
||||
expectedErrorCode,
|
||||
);
|
||||
|
||||
assert.soon(() => {
|
||||
const op = this._findMoveCollectionCommandOp();
|
||||
return op !== undefined || this._commandDoneSignal.getCount() === 0;
|
||||
}, "failed to find moveCollection in $currentOp output");
|
||||
|
||||
this._callFunctionSafely(() => duringReshardingFn(this._tempNs));
|
||||
this._checkConsistencyAndPostState(
|
||||
expectedErrorCode,
|
||||
() => postCheckConsistencyFn(this._tempNs),
|
||||
() => postDecisionPersistedFn(),
|
||||
() => afterReshardingFn(),
|
||||
);
|
||||
withMoveCollectionInBackground(operationArgs, duringReshardingFn, options = {}) {
|
||||
this._withOperationInBackground("moveCollection", operationArgs, duringReshardingFn, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unshards an existing sharded collection to toShard.
|
||||
*
|
||||
* @param toShard (Optional) - shardId of the shard to unshard to.
|
||||
*
|
||||
* @param duringReshardingFn - a function which optionally accepts the temporary resharding
|
||||
* namespace string. It is only guaranteed to be called after mongos has started running the
|
||||
* reshardCollection command. Callers should use DiscoverTopology.findConnectedNodes() to
|
||||
* introspect the state of the donor or recipient shards if they need more specific
|
||||
* synchronization.
|
||||
*
|
||||
* @param expectedErrorCode - the expected response code for the reshardCollection command.
|
||||
*
|
||||
* @param postCheckConsistencyFn - a function for evaluating additional correctness
|
||||
* assertions. This function is called in the critical section, after the `reshardCollection`
|
||||
* command has shuffled data, but before the coordinator persists a decision.
|
||||
*
|
||||
* @param postDecisionPersistedFn - a function for evaluating addition assertions after
|
||||
* the decision has been persisted, but before the resharding operation finishes and returns
|
||||
* to the client.
|
||||
*
|
||||
* @param afterReshardingFn - a function that will be called after the resharding operation
|
||||
* finishes but before checking the the state post resharding. By the time afterReshardingFn
|
||||
* is called the temporary resharding collection will either have been dropped or renamed.
|
||||
* @param operationArgs.toShard (Optional) - shardId of the shard to unshard to.
|
||||
* See _withOperationInBackground for duringReshardingFn and options documentation.
|
||||
*/
|
||||
withUnshardCollectionInBackground(
|
||||
{toShard},
|
||||
duringReshardingFn = (tempNs) => {},
|
||||
{
|
||||
expectedErrorCode = ErrorCodes.OK,
|
||||
postCheckConsistencyFn = (tempNs) => {},
|
||||
postDecisionPersistedFn = () => {},
|
||||
afterReshardingFn = () => {},
|
||||
} = {},
|
||||
) {
|
||||
this._opType = "unshardCollection";
|
||||
this._startReshardingInBackgroundAndAllowCommandFailure(
|
||||
{newShardKeyPattern: {_id: 1}, toShard: toShard},
|
||||
expectedErrorCode,
|
||||
);
|
||||
withUnshardCollectionInBackground(operationArgs, duringReshardingFn, options = {}) {
|
||||
this._withOperationInBackground("unshardCollection", operationArgs, duringReshardingFn, options);
|
||||
}
|
||||
|
||||
assert.soon(() => {
|
||||
const op = this._findUnshardCollectionCommandOp();
|
||||
return op !== undefined || this._commandDoneSignal.getCount() === 0;
|
||||
}, "failed to find unshardCollection in $currentOp output");
|
||||
|
||||
this._callFunctionSafely(() => duringReshardingFn(this._tempNs));
|
||||
this._checkConsistencyAndPostState(
|
||||
expectedErrorCode,
|
||||
() => postCheckConsistencyFn(this._tempNs),
|
||||
() => postDecisionPersistedFn(),
|
||||
() => afterReshardingFn(),
|
||||
);
|
||||
/**
|
||||
* Rewrites an existing sharded collection on its existing shard key.
|
||||
* @param operationArgs.newChunks - an array of {min, max, shard} objects defining chunk
|
||||
* distribution.
|
||||
* See _withOperationInBackground for duringReshardingFn and options documentation.
|
||||
*/
|
||||
withRewriteCollectionInBackground(operationArgs, duringReshardingFn, options = {}) {
|
||||
this._withOperationInBackground("rewriteCollection", operationArgs, duringReshardingFn, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reshards an existing collection using the specified new shard key and new chunk ranges.
|
||||
*
|
||||
* @param newChunks - an array of
|
||||
* {min: <shardKeyValue0>, max: <shardKeyValue1>, shard: <shardName>} objects. The chunks must
|
||||
* form a partition of the {shardKey: MinKey} --> {shardKey: MaxKey} space.
|
||||
*
|
||||
* @param duringReshardingFn - a function which optionally accepts the temporary resharding
|
||||
* namespace string. It is only guaranteed to be called after mongos has started running the
|
||||
* reshardCollection command. Callers should use DiscoverTopology.findConnectedNodes() to
|
||||
* introspect the state of the donor or recipient shards if they need more specific
|
||||
* synchronization.
|
||||
*
|
||||
* @param expectedErrorCode - the expected response code for the reshardCollection command.
|
||||
*
|
||||
* @param postCheckConsistencyFn - a function for evaluating additional correctness
|
||||
* assertions. This function is called in the critical section, after the `reshardCollection`
|
||||
* command has shuffled data, but before the coordinator persists a decision.
|
||||
*
|
||||
* @param postDecisionPersistedFn - a function for evaluating addition assertions after
|
||||
* the decision has been persisted, but before the resharding operation finishes and returns
|
||||
* to the client.
|
||||
*
|
||||
* @param afterReshardingFn - a function that will be called after the resharding operation
|
||||
* finishes but before checking the the state post resharding. By the time afterReshardingFn
|
||||
* is called the temporary resharding collection will either have been dropped or renamed.
|
||||
* @param operationArgs.newShardKeyPattern - the new shard key pattern to use.
|
||||
* @param operationArgs.newChunks - an array of {min, max, shard} objects defining chunk
|
||||
* distribution.
|
||||
* @param operationArgs.forceRedistribution - if true, forces data redistribution even if
|
||||
* shard key is same.
|
||||
* @param operationArgs.reshardingUUID - optional UUID for the resharding operation.
|
||||
* @param operationArgs.performVerification - if true, performs verification after resharding.
|
||||
* See _withOperationInBackground for duringReshardingFn and options documentation.
|
||||
*/
|
||||
withReshardingInBackground(
|
||||
{newShardKeyPattern, newChunks, forceRedistribution, reshardingUUID, performVerification},
|
||||
duringReshardingFn = (tempNs) => {},
|
||||
{
|
||||
withReshardingInBackground(operationArgs, duringReshardingFn, options = {}) {
|
||||
this._withOperationInBackground("reshardCollection", operationArgs, duringReshardingFn, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a resharding-family operation in the background and invokes callback functions at
|
||||
* various stages of the operation.
|
||||
*
|
||||
* @private
|
||||
* @param opType - the operation type: "reshardCollection", "moveCollection",
|
||||
* "unshardCollection", or "rewriteCollection"
|
||||
* @param operationArgs - operation-specific arguments object. The following properties are
|
||||
* supported depending on the operation type:
|
||||
* - toShard: shardId of the target shard (moveCollection, unshardCollection)
|
||||
* - newShardKeyPattern: the new shard key pattern (reshardCollection; auto-set for others)
|
||||
* - newChunks: array of {min, max, shard} objects defining chunk distribution
|
||||
* (reshardCollection, rewriteCollection)
|
||||
* - forceRedistribution: force data redistribution even if shard key is same (reshardCollection)
|
||||
* - reshardingUUID: optional UUID for the resharding operation (reshardCollection)
|
||||
* - performVerification: perform verification after resharding (reshardCollection)
|
||||
* @param duringReshardingFn - a function which optionally accepts the temporary resharding
|
||||
* namespace string. It is only guaranteed to be called after
|
||||
* mongos has started running the command. Callers should use
|
||||
* DiscoverTopology.findConnectedNodes() to introspect the state of
|
||||
* the donor or recipient shards if they need more specific
|
||||
* synchronization. Defaults to a no-op.
|
||||
* @param options - optional configuration object with the following properties:
|
||||
* - expectedErrorCode: the expected response code for the command. Defaults to ErrorCodes.OK.
|
||||
* - postCheckConsistencyFn: a function for evaluating additional correctness assertions.
|
||||
* Called in the critical section after data is shuffled but before
|
||||
* the coordinator persists a decision. Receives tempNs. Defaults to
|
||||
* a no-op.
|
||||
* - postDecisionPersistedFn: a function for evaluating additional assertions after the
|
||||
* decision has been persisted but before the operation returns.
|
||||
* Defaults to a no-op.
|
||||
* - afterReshardingFn: a function called after the operation finishes but before checking
|
||||
* post-resharding state. The temporary collection will have been dropped
|
||||
* or renamed by this point. Defaults to a no-op.
|
||||
*/
|
||||
_withOperationInBackground(opType, operationArgs, duringReshardingFn, options) {
|
||||
const {
|
||||
expectedErrorCode = ErrorCodes.OK,
|
||||
postCheckConsistencyFn = (tempNs) => {},
|
||||
postDecisionPersistedFn = () => {},
|
||||
afterReshardingFn = () => {},
|
||||
} = {},
|
||||
) {
|
||||
this._opType = "reshardCollection";
|
||||
this._startReshardingInBackgroundAndAllowCommandFailure(
|
||||
{
|
||||
newShardKeyPattern,
|
||||
newChunks,
|
||||
forceRedistribution,
|
||||
reshardingUUID,
|
||||
performVerification,
|
||||
},
|
||||
expectedErrorCode,
|
||||
);
|
||||
} = options;
|
||||
duringReshardingFn = duringReshardingFn ?? ((tempNs) => {});
|
||||
|
||||
// Apply default newShardKeyPattern based on operation type.
|
||||
if (opType === "moveCollection" || opType === "unshardCollection") {
|
||||
operationArgs = {newShardKeyPattern: {_id: 1}, ...operationArgs};
|
||||
} else if (opType === "rewriteCollection") {
|
||||
operationArgs = {newShardKeyPattern: this._currentShardKey, ...operationArgs};
|
||||
}
|
||||
|
||||
this._opType = opType;
|
||||
this._startReshardingInBackgroundAndAllowCommandFailure(operationArgs, expectedErrorCode);
|
||||
|
||||
// For reshardCollection, provenance is undefined; for others, it matches the opType
|
||||
const provenance = opType === "reshardCollection" ? undefined : opType;
|
||||
assert.soon(() => {
|
||||
const op = this._findReshardingCommandOp();
|
||||
const op = this._findReshardingCommandWithProvenance(provenance);
|
||||
return op !== undefined || this._commandDoneSignal.getCount() === 0;
|
||||
}, "failed to find reshardCollection in $currentOp output");
|
||||
}, `failed to find ${opType} in $currentOp output`);
|
||||
|
||||
this._callFunctionSafely(() => duringReshardingFn(this._tempNs));
|
||||
this._checkConsistencyAndPostState(
|
||||
@@ -713,40 +674,24 @@ export var ReshardingTest = class {
|
||||
}
|
||||
|
||||
/** @private */
|
||||
_findMoveCollectionCommandOp() {
|
||||
const filter = {
|
||||
type: "op",
|
||||
"originatingCommand.reshardCollection": this._underlyingSourceNs,
|
||||
"provenance": "moveCollection",
|
||||
};
|
||||
_findReshardingCommandWithProvenance(provenance) {
|
||||
let localOps;
|
||||
let filter;
|
||||
if (provenance === undefined) {
|
||||
localOps = true;
|
||||
filter = {"command.reshardCollection": this._ns};
|
||||
} else {
|
||||
localOps = false;
|
||||
filter = {
|
||||
type: "op",
|
||||
"originatingCommand.reshardCollection": this._underlyingSourceNs,
|
||||
provenance,
|
||||
};
|
||||
}
|
||||
|
||||
return this._st.s
|
||||
.getDB("admin")
|
||||
.aggregate([{$currentOp: {allUsers: true, localOps: false}}, {$match: filter}])
|
||||
.toArray()[0];
|
||||
}
|
||||
|
||||
/** @private */
|
||||
_findUnshardCollectionCommandOp() {
|
||||
const filter = {
|
||||
type: "op",
|
||||
"originatingCommand.reshardCollection": this._underlyingSourceNs,
|
||||
"provenance": "unshardCollection",
|
||||
};
|
||||
|
||||
return this._st.s
|
||||
.getDB("admin")
|
||||
.aggregate([{$currentOp: {allUsers: true, localOps: false}}, {$match: filter}])
|
||||
.toArray()[0];
|
||||
}
|
||||
|
||||
/** @private */
|
||||
_findReshardingCommandOp() {
|
||||
return this._st.admin
|
||||
.aggregate([
|
||||
{$currentOp: {allUsers: true, localOps: true}},
|
||||
{$match: {"command.reshardCollection": this._ns}},
|
||||
])
|
||||
.aggregate([{$currentOp: {allUsers: true, localOps}}, {$match: filter}])
|
||||
.toArray()[0];
|
||||
}
|
||||
|
||||
@@ -788,7 +733,7 @@ export var ReshardingTest = class {
|
||||
}
|
||||
|
||||
try {
|
||||
const op = this._findReshardingCommandOp();
|
||||
const op = this._findReshardingCommandWithProvenance();
|
||||
if (op !== undefined) {
|
||||
assert.commandWorked(this._st.admin.killOp(op.opid));
|
||||
}
|
||||
@@ -821,7 +766,7 @@ export var ReshardingTest = class {
|
||||
}
|
||||
|
||||
interruptReshardingThread() {
|
||||
const op = this._findReshardingCommandOp();
|
||||
const op = this._findReshardingCommandWithProvenance();
|
||||
assert.neq(undefined, op, "failed to find reshardCollection in $currentOp output");
|
||||
assert.commandWorked(this._st.admin.killOp(op.opid));
|
||||
}
|
||||
|
||||
87
jstests/sharding/libs/resharding_test_joins_operation.js
Normal file
87
jstests/sharding/libs/resharding_test_joins_operation.js
Normal file
@@ -0,0 +1,87 @@
|
||||
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {getUUIDFromConfigCollections, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
|
||||
|
||||
/**
|
||||
* Tests that if a resharding-family command is issued while there is an ongoing operation
|
||||
* for the same collection, the command joins with the ongoing instance.
|
||||
*
|
||||
* This is a generic helper that consolidates the common test pattern across:
|
||||
* - reshardCollection
|
||||
* - moveCollection
|
||||
* - unshardCollection
|
||||
* - rewriteCollection
|
||||
*
|
||||
* @param reshardingTest - a ReshardingTest instance that has been set up with a collection
|
||||
* @param opType - the operation type: "reshardCollection", "moveCollection",
|
||||
* "unshardCollection", or "rewriteCollection"
|
||||
* @param operationArgs - arguments for the background operation (e.g., newShardKeyPattern,
|
||||
* newChunks, toShard)
|
||||
* @param makeJoiningThreadFn - a function that returns a Thread object to run the joining
|
||||
* command. Receives (mongosHost, ns, extraArgs) as parameters.
|
||||
* @param joiningThreadExtraArgs - extra arguments to pass to makeJoiningThreadFn (e.g., toShard)
|
||||
*/
|
||||
export function runJoinsExistingOperationTest(
|
||||
reshardingTest,
|
||||
{opType, operationArgs, makeJoiningThreadFn, joiningThreadExtraArgs = {}},
|
||||
) {
|
||||
const mongos = reshardingTest.getMongos();
|
||||
const ns = reshardingTest.getSourceNamespace();
|
||||
const topology = DiscoverTopology.findConnectedNodes(mongos);
|
||||
const donorShardNames = reshardingTest.donorShardNames;
|
||||
|
||||
const getTempUUID = (tempNs) => {
|
||||
const tempCollection = mongos.getCollection(tempNs);
|
||||
return getUUIDFromConfigCollections(mongos, tempCollection.getFullName());
|
||||
};
|
||||
|
||||
// All operations use the same failpoints on the donor shard.
|
||||
const donorShardConn = new Mongo(topology.shards[donorShardNames[0]].nodes[0]);
|
||||
|
||||
const pauseFailpoint = configureFailPoint(donorShardConn, "reshardingPauseRecipientDuringCloning");
|
||||
const shorterLockTimeout = configureFailPoint(donorShardConn, "overrideDDLLockTimeout", {"timeoutMillisecs": 500});
|
||||
|
||||
const joiningThread = makeJoiningThreadFn(mongos.host, ns, joiningThreadExtraArgs);
|
||||
|
||||
// Fulfilled once the first command creates the temporary collection.
|
||||
let expectedUUIDAfterReshardingCompletes = undefined;
|
||||
|
||||
// Select the appropriate with*InBackground method.
|
||||
const withOperationInBackgroundFn = {
|
||||
"reshardCollection": (args, fn) => reshardingTest.withReshardingInBackground(args, fn),
|
||||
"moveCollection": (args, fn) => reshardingTest.withMoveCollectionInBackground(args, fn),
|
||||
"unshardCollection": (args, fn) => reshardingTest.withUnshardCollectionInBackground(args, fn),
|
||||
"rewriteCollection": (args, fn) => reshardingTest.withRewriteCollectionInBackground(args, fn),
|
||||
}[opType];
|
||||
|
||||
assert(withOperationInBackgroundFn, `Unknown operation type: ${opType}`);
|
||||
|
||||
withOperationInBackgroundFn(operationArgs, (tempNs) => {
|
||||
pauseFailpoint.wait();
|
||||
|
||||
// The UUID of the temporary resharding collection should become the UUID of the
|
||||
// original collection once resharding has completed.
|
||||
expectedUUIDAfterReshardingCompletes = getTempUUID(tempNs);
|
||||
|
||||
const joinedFP = configureFailPoint(donorShardConn, "shardsvrReshardCollectionJoinedExistingOperation");
|
||||
|
||||
joiningThread.start();
|
||||
|
||||
// Hitting the joined failpoint is confirmation that the command (same collection
|
||||
// as the ongoing operation) gets joined with the ongoing operation.
|
||||
joinedFP.wait();
|
||||
|
||||
joinedFP.off();
|
||||
pauseFailpoint.off();
|
||||
});
|
||||
|
||||
joiningThread.join();
|
||||
shorterLockTimeout.off();
|
||||
|
||||
// Confirm the UUID for the namespace matches the temporary collection's UUID before
|
||||
// the second command was issued.
|
||||
assert.neq(expectedUUIDAfterReshardingCompletes, undefined);
|
||||
const sourceCollection = mongos.getCollection(ns);
|
||||
const finalSourceCollectionUUID = getUUIDFromListCollections(sourceCollection.getDB(), sourceCollection.getName());
|
||||
assert.eq(expectedUUIDAfterReshardingCompletes, finalSourceCollectionUUID);
|
||||
}
|
||||
@@ -4,33 +4,27 @@
|
||||
* ongoing moveCollection instance.
|
||||
*
|
||||
* @tags: [
|
||||
* does_not_support_stepdowns,
|
||||
* uses_atclustertime,
|
||||
* ]
|
||||
*/
|
||||
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {Thread} from "jstests/libs/parallelTester.js";
|
||||
import {getUUIDFromConfigCollections, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
|
||||
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
||||
import {runJoinsExistingOperationTest} from "jstests/sharding/libs/resharding_test_joins_operation.js";
|
||||
|
||||
// Generates a new thread to run moveCollection.
|
||||
const makeMoveCollectionThread = (mongoSConnectionString, ns, toShard) => {
|
||||
const makeMoveCollectionThread = (mongosHost, ns, extraArgs) => {
|
||||
return new Thread(
|
||||
(mongoSConnectionString, ns, toShard) => {
|
||||
const mongoS = new Mongo(mongoSConnectionString);
|
||||
(mongosHost, ns, toShard) => {
|
||||
const mongoS = new Mongo(mongosHost);
|
||||
assert.commandWorked(mongoS.adminCommand({moveCollection: ns, toShard: toShard}));
|
||||
},
|
||||
mongoSConnectionString,
|
||||
mongosHost,
|
||||
ns,
|
||||
toShard,
|
||||
extraArgs.toShard,
|
||||
);
|
||||
};
|
||||
|
||||
const getTempUUID = (tempNs) => {
|
||||
const tempCollection = mongos.getCollection(tempNs);
|
||||
return getUUIDFromConfigCollections(mongos, tempCollection.getFullName());
|
||||
};
|
||||
|
||||
const reshardingTest = new ReshardingTest();
|
||||
reshardingTest.setup();
|
||||
const donorShardNames = reshardingTest.donorShardNames;
|
||||
@@ -40,49 +34,11 @@ const sourceCollection = reshardingTest.createUnshardedCollection({
|
||||
primaryShardName: donorShardNames[0],
|
||||
});
|
||||
|
||||
const mongos = sourceCollection.getMongo();
|
||||
const topology = DiscoverTopology.findConnectedNodes(mongos);
|
||||
const donorShard = new Mongo(topology.shards[donorShardNames[0]].nodes[0]);
|
||||
|
||||
const pauseDuringCloning = configureFailPoint(donorShard, "reshardingPauseRecipientDuringCloning");
|
||||
const shorterLockTimeout = configureFailPoint(donorShard, "overrideDDLLockTimeout", {"timeoutMillisecs": 500});
|
||||
|
||||
const moveCollectionThread = makeMoveCollectionThread(
|
||||
mongos.host,
|
||||
sourceCollection.getFullName(),
|
||||
recipientShardNames[0],
|
||||
);
|
||||
|
||||
// Fulfilled once the first reshardCollection command creates the temporary collection.
|
||||
let expectedUUIDAfterReshardingCompletes = undefined;
|
||||
|
||||
reshardingTest.withMoveCollectionInBackground({toShard: recipientShardNames[0]}, (tempNs) => {
|
||||
pauseDuringCloning.wait();
|
||||
|
||||
// The UUID of the temporary resharding collection should become the UUID of the original
|
||||
// collection once resharding has completed.
|
||||
expectedUUIDAfterReshardingCompletes = getTempUUID(tempNs);
|
||||
|
||||
const moveCollectionJoinedFP = configureFailPoint(donorShard, "shardsvrReshardCollectionJoinedExistingOperation");
|
||||
|
||||
moveCollectionThread.start();
|
||||
|
||||
// Hitting the reshardCollectionJoinedFP is additional confirmation that
|
||||
// _configsvrReshardCollection command (identical resharding key and collection as the
|
||||
// ongoing operation) gets joined with the ongoing resharding operation.
|
||||
moveCollectionJoinedFP.wait();
|
||||
|
||||
moveCollectionJoinedFP.off();
|
||||
pauseDuringCloning.off();
|
||||
runJoinsExistingOperationTest(reshardingTest, {
|
||||
opType: "moveCollection",
|
||||
operationArgs: {toShard: recipientShardNames[0]},
|
||||
makeJoiningThreadFn: makeMoveCollectionThread,
|
||||
joiningThreadExtraArgs: {toShard: recipientShardNames[0]},
|
||||
});
|
||||
|
||||
moveCollectionThread.join();
|
||||
shorterLockTimeout.off();
|
||||
|
||||
// Confirm the UUID for the namespace that was resharded is the same as the temporary collection's
|
||||
// UUID before the second reshardCollection command was issued.
|
||||
assert.neq(expectedUUIDAfterReshardingCompletes, undefined);
|
||||
const finalSourceCollectionUUID = getUUIDFromListCollections(sourceCollection.getDB(), sourceCollection.getName());
|
||||
assert.eq(expectedUUIDAfterReshardingCompletes, finalSourceCollectionUUID);
|
||||
|
||||
reshardingTest.teardown();
|
||||
|
||||
@@ -1,47 +1,39 @@
|
||||
/**
|
||||
* Tests that if a _configsvrReshardCollection command is issued while there is an ongoing
|
||||
* resharding operation for the same collection with the same resharding key, the command joins with
|
||||
* the ongoing resharding instance.
|
||||
*
|
||||
* Use _configsvrReshardCollection instead of reshardCollection to exercise the behavior of the
|
||||
* config server in the absence of the DDL lock taken by _shardsvrReshardCollection on the
|
||||
* primary shard for the database.
|
||||
* Tests that if a reshardCollection command is issued while there is an ongoing resharding
|
||||
* operation for the same collection with the same resharding key, the command joins with the
|
||||
* ongoing resharding instance.
|
||||
*
|
||||
* @tags: [
|
||||
* does_not_support_stepdowns,
|
||||
* uses_atclustertime,
|
||||
* ]
|
||||
*/
|
||||
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {Thread} from "jstests/libs/parallelTester.js";
|
||||
import {getUUIDFromConfigCollections, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
|
||||
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
||||
import {runJoinsExistingOperationTest} from "jstests/sharding/libs/resharding_test_joins_operation.js";
|
||||
|
||||
// Generates a new thread to run _configsvrReshardCollection.
|
||||
const makeConfigsvrReshardCollectionThread = (configsvrConnString, ns) => {
|
||||
// Generates a new thread to run reshardCollection.
|
||||
// The joining command must pass the same arguments as the background operation to be recognized
|
||||
// as a duplicate that should join rather than conflict. This includes _presetReshardedChunks.
|
||||
const makeReshardCollectionThread = (mongosHost, ns, extraArgs) => {
|
||||
return new Thread(
|
||||
(configsvrConnString, ns) => {
|
||||
const configsvr = new Mongo(configsvrConnString);
|
||||
(mongosHost, ns, newShardKey, presetReshardedChunks) => {
|
||||
const mongoS = new Mongo(mongosHost);
|
||||
assert.commandWorked(
|
||||
configsvr.adminCommand({
|
||||
_configsvrReshardCollection: ns,
|
||||
key: {newKey: 1},
|
||||
numInitialChunks: 1,
|
||||
writeConcern: {w: "majority"},
|
||||
provenance: "reshardCollection",
|
||||
mongoS.adminCommand({
|
||||
reshardCollection: ns,
|
||||
key: newShardKey,
|
||||
_presetReshardedChunks: presetReshardedChunks,
|
||||
}),
|
||||
);
|
||||
},
|
||||
configsvrConnString,
|
||||
mongosHost,
|
||||
ns,
|
||||
extraArgs.newShardKey,
|
||||
extraArgs.presetReshardedChunks,
|
||||
);
|
||||
};
|
||||
|
||||
const getTempUUID = (tempNs) => {
|
||||
const tempCollection = mongos.getCollection(tempNs);
|
||||
return getUUIDFromConfigCollections(mongos, tempCollection.getFullName());
|
||||
};
|
||||
|
||||
const reshardingTest = new ReshardingTest({numDonors: 1});
|
||||
reshardingTest.setup();
|
||||
const donorShardNames = reshardingTest.donorShardNames;
|
||||
@@ -52,52 +44,22 @@ const sourceCollection = reshardingTest.createShardedCollection({
|
||||
chunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}],
|
||||
});
|
||||
|
||||
const mongos = sourceCollection.getMongo();
|
||||
const topology = DiscoverTopology.findConnectedNodes(mongos);
|
||||
const configsvr = new Mongo(topology.configsvr.nodes[0]);
|
||||
// The newChunks format uses 'shard' but the command expects 'recipientShardId'.
|
||||
const newChunks = [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}];
|
||||
const presetReshardedChunks = newChunks.map((chunk) => ({
|
||||
min: chunk.min,
|
||||
max: chunk.max,
|
||||
recipientShardId: chunk.shard,
|
||||
}));
|
||||
|
||||
const pauseBeforeCloningFP = configureFailPoint(configsvr, "reshardingPauseCoordinatorBeforeCloning");
|
||||
|
||||
const configsvrReshardCollectionThread = makeConfigsvrReshardCollectionThread(
|
||||
topology.configsvr.nodes[0],
|
||||
sourceCollection.getFullName(),
|
||||
);
|
||||
|
||||
// Fulfilled once the first reshardCollection command creates the temporary collection.
|
||||
let expectedUUIDAfterReshardingCompletes = undefined;
|
||||
|
||||
reshardingTest.withReshardingInBackground(
|
||||
{
|
||||
runJoinsExistingOperationTest(reshardingTest, {
|
||||
opType: "reshardCollection",
|
||||
operationArgs: {
|
||||
newShardKeyPattern: {newKey: 1},
|
||||
newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
|
||||
newChunks: newChunks,
|
||||
},
|
||||
(tempNs) => {
|
||||
pauseBeforeCloningFP.wait();
|
||||
|
||||
// The UUID of the temporary resharding collection should become the UUID of the original
|
||||
// collection once resharding has completed.
|
||||
expectedUUIDAfterReshardingCompletes = getTempUUID(tempNs);
|
||||
|
||||
const reshardCollectionJoinedFP = configureFailPoint(configsvr, "reshardCollectionJoinedExistingOperation");
|
||||
|
||||
configsvrReshardCollectionThread.start();
|
||||
|
||||
// Hitting the reshardCollectionJoinedFP is additional confirmation that
|
||||
// _configsvrReshardCollection command (identical resharding key and collection as the
|
||||
// ongoing operation) gets joined with the ongoing resharding operation.
|
||||
reshardCollectionJoinedFP.wait();
|
||||
|
||||
reshardCollectionJoinedFP.off();
|
||||
pauseBeforeCloningFP.off();
|
||||
},
|
||||
);
|
||||
|
||||
configsvrReshardCollectionThread.join();
|
||||
|
||||
// Confirm the UUID for the namespace that was resharded is the same as the temporary collection's
|
||||
// UUID before the second reshardCollection command was issued.
|
||||
assert.neq(expectedUUIDAfterReshardingCompletes, undefined);
|
||||
const finalSourceCollectionUUID = getUUIDFromListCollections(sourceCollection.getDB(), sourceCollection.getName());
|
||||
assert.eq(expectedUUIDAfterReshardingCompletes, finalSourceCollectionUUID);
|
||||
makeJoiningThreadFn: makeReshardCollectionThread,
|
||||
joiningThreadExtraArgs: {newShardKey: {newKey: 1}, presetReshardedChunks: presetReshardedChunks},
|
||||
});
|
||||
|
||||
reshardingTest.teardown();
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Tests that if a _configsvrReshardCollection command is issued while there is an ongoing
|
||||
* resharding operation for the same collection with the same resharding key, the command joins with
|
||||
* the ongoing resharding instance.
|
||||
*
|
||||
* Use _configsvrReshardCollection instead of reshardCollection to exercise the behavior of the
|
||||
* config server in the absence of the DDL lock taken by _shardsvrReshardCollection on the
|
||||
* primary shard for the database.
|
||||
*
|
||||
* @tags: [
|
||||
* does_not_support_stepdowns,
|
||||
* uses_atclustertime,
|
||||
* ]
|
||||
*/
|
||||
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
||||
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
||||
import {Thread} from "jstests/libs/parallelTester.js";
|
||||
import {getUUIDFromConfigCollections, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
|
||||
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
||||
|
||||
// Generates a new thread to run _configsvrReshardCollection.
|
||||
const makeConfigsvrReshardCollectionThread = (configsvrConnString, ns) => {
|
||||
return new Thread(
|
||||
(configsvrConnString, ns) => {
|
||||
const configsvr = new Mongo(configsvrConnString);
|
||||
assert.commandWorked(
|
||||
configsvr.adminCommand({
|
||||
_configsvrReshardCollection: ns,
|
||||
key: {newKey: 1},
|
||||
numInitialChunks: 1,
|
||||
writeConcern: {w: "majority"},
|
||||
provenance: "reshardCollection",
|
||||
}),
|
||||
);
|
||||
},
|
||||
configsvrConnString,
|
||||
ns,
|
||||
);
|
||||
};
|
||||
|
||||
const getTempUUID = (tempNs) => {
|
||||
const tempCollection = mongos.getCollection(tempNs);
|
||||
return getUUIDFromConfigCollections(mongos, tempCollection.getFullName());
|
||||
};
|
||||
|
||||
const reshardingTest = new ReshardingTest({numDonors: 1});
|
||||
reshardingTest.setup();
|
||||
const donorShardNames = reshardingTest.donorShardNames;
|
||||
const recipientShardNames = reshardingTest.recipientShardNames;
|
||||
const sourceCollection = reshardingTest.createShardedCollection({
|
||||
ns: "reshardingDb.coll",
|
||||
shardKeyPattern: {oldKey: 1},
|
||||
chunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}],
|
||||
});
|
||||
|
||||
const mongos = sourceCollection.getMongo();
|
||||
const topology = DiscoverTopology.findConnectedNodes(mongos);
|
||||
const configsvr = new Mongo(topology.configsvr.nodes[0]);
|
||||
|
||||
const pauseBeforeCloningFP = configureFailPoint(configsvr, "reshardingPauseCoordinatorBeforeCloning");
|
||||
|
||||
const configsvrReshardCollectionThread = makeConfigsvrReshardCollectionThread(
|
||||
topology.configsvr.nodes[0],
|
||||
sourceCollection.getFullName(),
|
||||
);
|
||||
|
||||
// Fulfilled once the first reshardCollection command creates the temporary collection.
|
||||
let expectedUUIDAfterReshardingCompletes = undefined;
|
||||
|
||||
reshardingTest.withReshardingInBackground(
|
||||
{
|
||||
newShardKeyPattern: {newKey: 1},
|
||||
newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
|
||||
},
|
||||
(tempNs) => {
|
||||
pauseBeforeCloningFP.wait();
|
||||
|
||||
// The UUID of the temporary resharding collection should become the UUID of the original
|
||||
// collection once resharding has completed.
|
||||
expectedUUIDAfterReshardingCompletes = getTempUUID(tempNs);
|
||||
|
||||
const reshardCollectionJoinedFP = configureFailPoint(configsvr, "reshardCollectionJoinedExistingOperation");
|
||||
|
||||
configsvrReshardCollectionThread.start();
|
||||
|
||||
// Hitting the reshardCollectionJoinedFP is additional confirmation that
|
||||
// _configsvrReshardCollection command (identical resharding key and collection as the
|
||||
// ongoing operation) gets joined with the ongoing resharding operation.
|
||||
reshardCollectionJoinedFP.wait();
|
||||
|
||||
reshardCollectionJoinedFP.off();
|
||||
pauseBeforeCloningFP.off();
|
||||
},
|
||||
);
|
||||
|
||||
configsvrReshardCollectionThread.join();
|
||||
|
||||
// Confirm the UUID for the namespace that was resharded is the same as the temporary collection's
|
||||
// UUID before the second reshardCollection command was issued.
|
||||
assert.neq(expectedUUIDAfterReshardingCompletes, undefined);
|
||||
const finalSourceCollectionUUID = getUUIDFromListCollections(sourceCollection.getDB(), sourceCollection.getName());
|
||||
assert.eq(expectedUUIDAfterReshardingCompletes, finalSourceCollectionUUID);
|
||||
|
||||
reshardingTest.teardown();
|
||||
@@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Tests that if a rewriteCollection command is issued while there is an ongoing rewriteCollection
|
||||
* operation for the same collection, the command joins with the ongoing rewriteCollection instance.
|
||||
*
|
||||
* @tags: [
|
||||
* does_not_support_stepdowns,
|
||||
* requires_fcv_83,
|
||||
* uses_atclustertime,
|
||||
* ]
|
||||
*/
|
||||
import {Thread} from "jstests/libs/parallelTester.js";
|
||||
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
||||
import {runJoinsExistingOperationTest} from "jstests/sharding/libs/resharding_test_joins_operation.js";
|
||||
|
||||
// Generates a new thread to run rewriteCollection.
|
||||
const makeRewriteCollectionThread = (mongosHost, ns) => {
|
||||
return new Thread(
|
||||
(mongosHost, ns) => {
|
||||
const mongoS = new Mongo(mongosHost);
|
||||
assert.commandWorked(mongoS.adminCommand({rewriteCollection: ns}));
|
||||
},
|
||||
mongosHost,
|
||||
ns,
|
||||
);
|
||||
};
|
||||
|
||||
// Use default ReshardingTest (2 shards: 1 donor + 1 recipient) to ensure enough cardinality
|
||||
// for the resharding coordinator to create chunks without hitting sampling issues.
|
||||
const reshardingTest = new ReshardingTest();
|
||||
reshardingTest.setup();
|
||||
const donorShardNames = reshardingTest.donorShardNames;
|
||||
|
||||
// Create a sharded collection - rewriteCollection requires a sharded collection as input.
|
||||
// Use {oldKey: 1} as the shard key to match the pattern of other resharding tests.
|
||||
const sourceCollection = reshardingTest.createShardedCollection({
|
||||
ns: "reshardingDb.coll",
|
||||
shardKeyPattern: {oldKey: 1},
|
||||
chunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}],
|
||||
});
|
||||
|
||||
// Insert documents with sufficient cardinality in the shard key field.
|
||||
// The rewriteCollection command uses resharding which by default tries to create ~90 chunks.
|
||||
// Without enough unique shard key values, it fails with a cardinality error.
|
||||
const bulkOp = sourceCollection.initializeUnorderedBulkOp();
|
||||
for (let i = -500; i < 500; ++i) {
|
||||
bulkOp.insert({oldKey: i});
|
||||
}
|
||||
assert.commandWorked(bulkOp.execute());
|
||||
|
||||
runJoinsExistingOperationTest(reshardingTest, {
|
||||
opType: "rewriteCollection",
|
||||
operationArgs: {
|
||||
newChunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}],
|
||||
},
|
||||
makeJoiningThreadFn: makeRewriteCollectionThread,
|
||||
});
|
||||
|
||||
reshardingTest.teardown();
|
||||
@@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Tests that if an unshardCollection command is issued while there is an ongoing unshardCollection
|
||||
* operation for the same collection with the same destination shard, the command joins with the
|
||||
* ongoing unshardCollection instance.
|
||||
*
|
||||
* @tags: [
|
||||
* does_not_support_stepdowns,
|
||||
* featureFlagUnshardCollection,
|
||||
* requires_fcv_80,
|
||||
* uses_atclustertime,
|
||||
* ]
|
||||
*/
|
||||
import {Thread} from "jstests/libs/parallelTester.js";
|
||||
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
||||
import {runJoinsExistingOperationTest} from "jstests/sharding/libs/resharding_test_joins_operation.js";
|
||||
|
||||
// Generates a new thread to run unshardCollection.
|
||||
const makeUnshardCollectionThread = (mongosHost, ns, extraArgs) => {
|
||||
return new Thread(
|
||||
(mongosHost, ns, toShard) => {
|
||||
const mongoS = new Mongo(mongosHost);
|
||||
assert.commandWorked(mongoS.adminCommand({unshardCollection: ns, toShard: toShard}));
|
||||
},
|
||||
mongosHost,
|
||||
ns,
|
||||
extraArgs.toShard,
|
||||
);
|
||||
};
|
||||
|
||||
const reshardingTest = new ReshardingTest();
|
||||
reshardingTest.setup();
|
||||
const donorShardNames = reshardingTest.donorShardNames;
|
||||
const recipientShardNames = reshardingTest.recipientShardNames;
|
||||
|
||||
// Create a sharded collection - unshardCollection requires a sharded collection as input
|
||||
const sourceCollection = reshardingTest.createShardedCollection({
|
||||
ns: "reshardingDb.coll",
|
||||
shardKeyPattern: {oldKey: 1},
|
||||
chunks: [{min: {oldKey: MinKey}, max: {oldKey: MaxKey}, shard: donorShardNames[0]}],
|
||||
});
|
||||
|
||||
runJoinsExistingOperationTest(reshardingTest, {
|
||||
opType: "unshardCollection",
|
||||
operationArgs: {toShard: recipientShardNames[0]},
|
||||
makeJoiningThreadFn: makeUnshardCollectionThread,
|
||||
joiningThreadExtraArgs: {toShard: recipientShardNames[0]},
|
||||
});
|
||||
|
||||
reshardingTest.teardown();
|
||||
Reference in New Issue
Block a user