Files
mongo/jstests/sharding/add_and_remove_shard_serializes_with_ddl.js
wolfee 655e1aeca5 SERVER-117113 Prevent promotion to sharded with stale sharding metadata (#46781)
GitOrigin-RevId: a83879d191882df6422aad0048be58ebb0d9c91b
2026-01-26 09:57:36 +00:00

256 lines
9.2 KiB
JavaScript

/**
* Tests that addShard and removeShard serialize with DDL operations.
*
* @tags: [
* does_not_support_stepdowns,
* requires_fcv_80,
* ]
*/
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {funWithArgs} from "jstests/libs/parallel_shell_helpers.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {waitForCommand} from "jstests/libs/wait_for_command.js";
import {removeShard as removeShardSync} from "jstests/sharding/libs/remove_shard_util.js";
let st = new ShardingTest({shards: 2, enableBalancer: true});
const dbName = "test";
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
let db = st.s.getDB(dbName);
let coll1 = db["coll1"];
// Test removeShard serializes with DDL.
{
jsTest.log("Testing that removeShard serializes with DDL operations");
coll1.insert({});
// Start a DDL operation and block it.
let fpBlockDropCollectionParticipant = configureFailPoint(st.rs0.getPrimary(), "failCommand", {
failCommands: ["_shardsvrDropCollectionParticipant"],
errorCode: ErrorCodes.HostUnreachable,
namespace: coll1.getFullName(),
failInternalCommands: true,
});
let awaitDropCollection = startParallelShell(() => {
assert(db.getSiblingDB("test")["coll1"].drop());
}, st.s.port);
fpBlockDropCollectionParticipant.wait();
// Start remove shard. It should block behind the running DDL.
let shardToRemove = st.shard1.shardName;
let awaitRemoveShard = startParallelShell(
funWithArgs(async function (shardToRemove) {
const {removeShard} = await import("jstests/sharding/libs/remove_shard_util.js");
removeShard(db, shardToRemove);
}, shardToRemove),
st.s.port,
);
// Wait for removeShard to reach the point where it waits for DDLs to drain.
waitForCommand(
"_shardsvrJoinDDLCoordinators",
(op) => op["command"] && op["command"]["_shardsvrJoinDDLCoordinators"] == 1,
st.rs0.getPrimary().getDB("admin"),
);
// Check new DDL operations can still start and complete.
assert.commandWorked(db.coll2.insert({}));
assert(db.coll2.drop());
// Check that the shard to be removed still exists and is draining.
assert.eq(true, st.s.getDB("config")["shards"].findOne({_id: shardToRemove}).draining);
// Unblock the blocked DDL operation
fpBlockDropCollectionParticipant.off();
awaitDropCollection();
// Remove shard should now complete.
awaitRemoveShard();
assert.eq(undefined, st.s.getDB("config")["shards"].findOne({_id: shardToRemove}));
}
// Test addShard serializes with DDL.
{
jsTest.log("Testing that addShard serializes with DDL operations");
coll1.insert({});
// Start a DDL operation and block it.
let fpBlockDropCollectionParticipant = configureFailPoint(st.rs0.getPrimary(), "failCommand", {
failCommands: ["_shardsvrDropCollectionParticipant"],
errorCode: ErrorCodes.HostUnreachable,
namespace: coll1.getFullName(),
failInternalCommands: true,
});
let awaitDropCollection = startParallelShell(() => {
assert(db.getSiblingDB("test")["coll1"].drop());
}, st.s.port);
fpBlockDropCollectionParticipant.wait();
// Start add shard. It should block behind the running DDL.
let shardToAdd = st.shard1.shardName;
// Clean the shard to make sure it's okay to add it again
st.restartShardClean(st.shard1);
let shardToAddUrl = st.rs1.getURL();
let awaitAddShard = startParallelShell(
funWithArgs(
async function (shardToAdd, shardToAddUrl) {
db.adminCommand({addShard: shardToAddUrl, name: shardToAdd});
},
shardToAdd,
shardToAddUrl,
),
st.s.port,
);
// Wait for removeShard to reach the point where it waits for DDLs to drain.
waitForCommand(
"_shardsvrJoinDDLCoordinators",
(op) => op["command"] && op["command"]["_shardsvrJoinDDLCoordinators"] == 1,
st.rs0.getPrimary().getDB("admin"),
);
// Check new DDL operations can still start and complete.
assert.commandWorked(db.coll2.insert({}));
assert(db.coll2.drop());
// Check that the shard still has not been added
assert.eq(0, st.s.getDB("config")["shards"].count({_id: shardToAdd}));
// Unblock the blocked DDL operation
fpBlockDropCollectionParticipant.off();
awaitDropCollection();
// Add shard should now complete.
awaitAddShard();
assert.eq(1, st.s.getDB("config")["shards"].count({_id: shardToAdd}));
}
// Test that if addShard/removeShard fails after having blocked new DDL coordinators, it will
// unblock them.
{
jsTest.log("Testing that the addOrRemoveShardInProgress is reset after a crash during add/removeShard");
function getAddOrRemoveShardInProgressParamValue() {
return assert.commandWorked(
st.configRS.getPrimary().adminCommand({getClusterParameter: "addOrRemoveShardInProgress"}),
).clusterParameters[0].inProgress;
}
function test(testCase) {
let fp = configureFailPoint(st.configRS.getPrimary(), "hangRemoveShardAfterDrainingDDL");
// Start remove shard.
let shardToRemove = st.shard1.shardName;
let shardURL = st.rs1.getURL();
let awaitRemoveShard = startParallelShell(
funWithArgs(async function (shardToRemove) {
const {removeShard} = await import("jstests/sharding/libs/remove_shard_util.js");
try {
removeShard(db, shardToRemove);
} catch (e) {}
}, shardToRemove),
st.s.port,
);
fp.wait();
assert.eq(true, getAddOrRemoveShardInProgressParamValue());
if (
!FeatureFlagUtil.isPresentAndEnabled(
st.configRS.getPrimary().getDB("admin"),
"UseTopologyChangeCoordinators",
)
) {
assert.eq(
1,
st.configRS
.getPrimary()
.getDB("admin")
["system.version"].count({_id: "addOrRemoveShardInProgressRecovery"}),
);
}
if (testCase === "killOp") {
let configPrimary = st.configRS.getPrimary();
let removeShardOpId = configPrimary
.getDB("admin")
.aggregate([
{$currentOp: {allUsers: true}},
{
$match: {
$or: [
{"command._configsvrRemoveShard": shardToRemove},
{"command._configsvrCommitShardRemoval": shardToRemove},
],
},
},
])
.toArray()[0].opid;
assert.commandWorked(configPrimary.getDB("admin").killOp(removeShardOpId));
} else if (testCase === "stopServer") {
// Restart the configsvr.
st.stopAllConfigServers({} /* opts */, true /* forRestart */);
st.restartAllConfigServers();
} else if (testCase === "stepUp") {
st.configRS.stepUp(st.configRS.getSecondary());
}
// Turn off the failpoint for the coordinator case where the command is still stuck
st.configRS.nodes.forEach((conn) => {
sh.assertRetryableCommandWorkedOrFailedWithCodes(() => {
return conn.adminCommand({configureFailPoint: "hangRemoveShardAfterDrainingDDL", mode: "off"});
}, "Timed out disabling fail point " + "hangRemoveShardAfterDrainingDDL");
});
awaitRemoveShard();
// Soon the cluster parameter will be reset and new DDL operations will be able to start.
assert.soon(() => {
let recoveryDocRemoved = true;
if (
!FeatureFlagUtil.isPresentAndEnabled(
st.configRS.getPrimary().getDB("admin"),
"UseTopologyChangeCoordinators",
)
) {
recoveryDocRemoved =
st.configRS
.getPrimary()
.getDB("admin")
["system.version"].count({_id: "addOrRemoveShardInProgressRecovery"}) === 0;
}
return getAddOrRemoveShardInProgressParamValue() === false && recoveryDocRemoved;
});
assert.commandWorked(coll1.insert({}));
assert(coll1.drop());
// Re-add shard to cluster
fp.off();
removeShardSync(st, shardToRemove);
st.restartShardClean(st.shard1);
assert.commandWorked(st.s.adminCommand({addShard: shardURL}));
}
test("killOp");
if (FeatureFlagUtil.isPresentAndEnabled(st.shard0.getDB("admin"), "UseTopologyChangeCoordinators")) {
// Test step up rather than stop because stopping the primary will hang due to the
// coordinator waiting for the failpoint. Only run this test if we have some secondary to
// step up (auto bootstrap runs with a single CSRS node).
if (st.configRS.nodes.length > 1) {
test("stepUp");
}
} else {
test("stopServer");
}
}
st.stop();