SERVER-120992 Refactor resharding coordinator command dispatch logic (#49270)

GitOrigin-RevId: a08c9776d6bf961958a3135bbfbfed70e06edd35
This commit is contained in:
Kruti Shah
2026-03-09 19:16:48 -04:00
committed by MongoDB Bot
parent 2a1b4c6e13
commit 4e64245a02
5 changed files with 249 additions and 150 deletions

View File

@@ -414,6 +414,7 @@ mongo_cc_library(
"//src/mongo/db/s/resharding:resharding_change_streams_monitor.cpp",
"//src/mongo/db/s/resharding:resharding_clone_fetcher.cpp",
"//src/mongo/db/s/resharding:resharding_collection_cloner.cpp",
"//src/mongo/db/s/resharding:resharding_coordinator_command_util.cpp",
"//src/mongo/db/s/resharding:resharding_coordinator_commit_monitor.cpp",
"//src/mongo/db/s/resharding:resharding_coordinator_dao.cpp",
"//src/mongo/db/s/resharding:resharding_coordinator_observer.cpp",

View File

@@ -468,26 +468,6 @@ private:
void _removeOrQuiesceCoordinatorDocAndRemoveReshardingFields(
OperationContext* opCtx, boost::optional<Status> abortReason = boost::none);
/**
* Sends the command to the specified participants asynchronously.
*/
template <typename CommandType>
void _sendCommandToAllParticipants(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<async_rpc::AsyncRPCOptions<CommandType>> opts);
template <typename CommandType>
void _sendCommandToAllDonors(const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<async_rpc::AsyncRPCOptions<CommandType>> opts);
template <typename CommandType>
void _sendCommandToAllRecipients(const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<async_rpc::AsyncRPCOptions<CommandType>> opts);
void _sendRecipientCloneCmdToShards(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
ShardsvrReshardRecipientClone cmd,
std::set<ShardId> recipientShardIds);
/**
* Sends '_flushRoutingTableCacheUpdatesWithWriteConcern' to ensure donor state machine creation
* by the time the refresh completes.
@@ -538,7 +518,7 @@ private:
* Sends '_shardsvrCommitReshardCollection' to all participant shards.
*/
void _tellAllParticipantsToCommit(
const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
* Sends '_shardsvrAbortReshardCollection' to all participant shards.

View File

@@ -35,6 +35,7 @@
#include "mongo/db/s/balancer/balance_stats.h"
#include "mongo/db/s/balancer/balancer_policy.h"
#include "mongo/db/s/resharding/resharding_coordinator.h"
#include "mongo/db/s/resharding/resharding_coordinator_command_util.h"
#include "mongo/db/s/resharding/resharding_coordinator_commit_monitor.h"
#include "mongo/db/s/resharding/resharding_coordinator_dao.h"
#include "mongo/db/s/resharding/resharding_coordinator_observer.h"
@@ -48,8 +49,6 @@
#include "mongo/db/topology/vector_clock/vector_clock.h"
#include "mongo/db/topology/vector_clock/vector_clock_mutable.h"
#include "mongo/otel/traces/telemetry_context_serialization.h"
#include "mongo/s/request_types/abort_reshard_collection_gen.h"
#include "mongo/s/request_types/commit_reshard_collection_gen.h"
#include "mongo/s/request_types/reshard_collection_gen.h"
#include "mongo/util/modules.h"
#include "mongo/util/testing_proctor.h"
@@ -574,10 +573,7 @@ ExecutorFuture<void> ReshardingCoordinator::_commitAndFinishReshardOperation(
AfterWriteOnCatalogLegacy);
}
})
.then([this, executor] {
_tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(),
executor);
})
.then([this, executor] { _tellAllParticipantsToCommit(executor); })
.then([this] {
_updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss());
})
@@ -1884,61 +1880,6 @@ void ReshardingCoordinator::_removeOrQuiesceCoordinatorDocAndRemoveReshardingFie
#endif // RESHARDING_COORDINATOR_PART_3
#ifdef RESHARDING_COORDINATOR_PART_4
template <typename CommandType>
void ReshardingCoordinator::_sendCommandToAllParticipants(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<async_rpc::AsyncRPCOptions<CommandType>> opts) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto donorShardIds =
resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
auto recipientShardIds =
resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()};
participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
resharding::sendCommandToShards(
opCtx.get(), opts, {participantShardIds.begin(), participantShardIds.end()});
}
template <typename CommandType>
void ReshardingCoordinator::_sendCommandToAllDonors(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<async_rpc::AsyncRPCOptions<CommandType>> opts) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto donorShardIds =
resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
resharding::sendCommandToShards(
opCtx.get(), opts, {donorShardIds.begin(), donorShardIds.end()});
}
template <typename CommandType>
void ReshardingCoordinator::_sendCommandToAllRecipients(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<async_rpc::AsyncRPCOptions<CommandType>> opts) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto recipientShardIds =
resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
resharding::sendCommandToShards(
opCtx.get(), opts, {recipientShardIds.begin(), recipientShardIds.end()});
}
void ReshardingCoordinator::_sendRecipientCloneCmdToShards(
OperationContext* opCtx,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
ShardsvrReshardRecipientClone cmd,
std::set<ShardId> recipientShardIds) {
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrReshardRecipientClone>>(
**executor, _ctHolder->getStepdownToken(), cmd);
generic_argument_util::setMajorityWriteConcern(opts->cmd, &resharding::kMajorityWriteConcern);
opts->cmd.setDbName(DatabaseName::kAdmin);
resharding::sendCommandToShards(
opCtx, opts, {recipientShardIds.begin(), recipientShardIds.end()});
}
void ReshardingCoordinator::_establishAllDonorsAsParticipants(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
invariant(_coordinatorDoc.getState() == CoordinatorStateEnum::kPreparingToDonate);
@@ -1972,25 +1913,8 @@ void ReshardingCoordinator::_tellAllRecipientsToClone(
reshardingPauseBeforeTellingRecipientsToClone.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
auto [shardsOwningChunks, shardsNotOwningChunks] =
resharding::computeRecipientChunkOwnership(opCtx.get(), _coordinatorDoc);
auto recipientFields = resharding::constructRecipientFields(_coordinatorDoc);
ShardsvrReshardRecipientClone cmd(_coordinatorDoc.getReshardingUUID());
cmd.setCloneTimestamp(recipientFields.getCloneTimestamp().get());
cmd.setDonorShards(recipientFields.getDonorShards());
cmd.setApproxCopySize(recipientFields.getReshardingApproxCopySizeStruct());
_sendRecipientCloneCmdToShards(opCtx.get(), executor, cmd, shardsOwningChunks);
if (shardsNotOwningChunks.size() > 0) {
ReshardingApproxCopySize approxCopySize;
approxCopySize.setApproxBytesToCopy(0);
approxCopySize.setApproxDocumentsToCopy(0);
cmd.setApproxCopySize(approxCopySize);
_sendRecipientCloneCmdToShards(opCtx.get(), executor, cmd, shardsNotOwningChunks);
}
resharding::tellAllRecipientsToClone(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
}
void ReshardingCoordinator::_tellAllRecipientsToRefresh(
@@ -2032,67 +1956,34 @@ void ReshardingCoordinator::_tellAllDonorsToStartChangeStreamsMonitor(
if (!_metadata.getPerformVerification()) {
return;
}
invariant(_coordinatorDoc.getCloneTimestamp());
ShardsvrReshardingDonorStartChangeStreamsMonitor cmd(_coordinatorDoc.getSourceNss(),
_coordinatorDoc.getReshardingUUID(),
*_coordinatorDoc.getCloneTimestamp());
auto opts = std::make_shared<
async_rpc::AsyncRPCOptions<ShardsvrReshardingDonorStartChangeStreamsMonitor>>(
**executor, _ctHolder->getStepdownToken(), cmd);
opts->cmd.setDbName(DatabaseName::kAdmin);
_sendCommandToAllDonors(executor, opts);
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::tellAllDonorsToStartChangeStreamsMonitor(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
}
void ReshardingCoordinator::_tellAllRecipientsCriticalSectionStarted(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
ShardsvrReshardRecipientCriticalSectionStarted cmd(_coordinatorDoc.getReshardingUUID());
generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern);
auto opts = std::make_shared<
async_rpc::AsyncRPCOptions<ShardsvrReshardRecipientCriticalSectionStarted>>(
**executor, _ctHolder->getAbortToken(), cmd);
opts->cmd.setDbName(DatabaseName::kAdmin);
_sendCommandToAllRecipients(executor, opts);
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::tellAllRecipientsCriticalSectionStarted(
opCtx.get(), _coordinatorDoc, _ctHolder->getAbortToken(), executor);
}
namespace {
std::shared_ptr<async_rpc::AsyncRPCOptions<ShardsvrCommitReshardCollection>>
createShardsvrCommitReshardCollectionOptions(const NamespaceString& nss,
const UUID& reshardingUUID,
const std::shared_ptr<executor::TaskExecutor>& exec,
CancellationToken token) {
ShardsvrCommitReshardCollection cmd(nss);
cmd.setDbName(DatabaseName::kAdmin);
cmd.setReshardingUUID(reshardingUUID);
generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern);
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrCommitReshardCollection>>(
exec, token, cmd);
return opts;
}
} // namespace
void ReshardingCoordinator::_tellAllParticipantsToCommit(
const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
{
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseBeforeTellingParticipantsToCommit.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
}
auto opts = createShardsvrCommitReshardCollectionOptions(
nss, _coordinatorDoc.getReshardingUUID(), **executor, _ctHolder->getStepdownToken());
opts->cmd.setDbName(DatabaseName::kAdmin);
_sendCommandToAllParticipants(executor, opts);
resharding::tellAllParticipantsToCommit(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor);
}
void ReshardingCoordinator::_tellAllParticipantsToAbort(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor, bool isUserAborted) {
ShardsvrAbortReshardCollection abortCmd(_coordinatorDoc.getReshardingUUID(), isUserAborted);
abortCmd.setDbName(DatabaseName::kAdmin);
generic_argument_util::setMajorityWriteConcern(abortCmd, &resharding::kMajorityWriteConcern);
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<ShardsvrAbortReshardCollection>>(
**executor, _ctHolder->getStepdownToken(), abortCmd);
_sendCommandToAllParticipants(executor, opts);
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::tellAllParticipantsToAbort(
opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor, isUserAborted);
}
void ReshardingCoordinator::_updateChunkImbalanceMetrics(const NamespaceString& nss) {

View File

@@ -0,0 +1,155 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/s/resharding/resharding_coordinator_command_util.h"
#include "mongo/db/generic_argument_util.h"
#include "mongo/s/request_types/abort_reshard_collection_gen.h"
#include "mongo/s/request_types/commit_reshard_collection_gen.h"
namespace mongo {
namespace {
std::vector<ShardId> getAllParticipantShardIds(const ReshardingCoordinatorDocument& doc) {
auto donorShardIds = resharding::extractShardIdsFromParticipantEntries(doc.getDonorShards());
auto recipientShardIds =
resharding::extractShardIdsFromParticipantEntries(doc.getRecipientShards());
std::set<ShardId> shardIds{donorShardIds.begin(), donorShardIds.end()};
shardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
return {shardIds.begin(), shardIds.end()};
}
template <typename Cmd>
void sendReshardingCommand(OperationContext* opCtx,
Cmd cmd,
CancellationToken token,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const std::vector<ShardId>& shardIds,
bool setWriteConcern = true) {
cmd.setDbName(DatabaseName::kAdmin);
if (setWriteConcern) {
generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern);
}
auto opts = std::make_shared<async_rpc::AsyncRPCOptions<Cmd>>(**executor, token, cmd);
resharding::sendCommandToShards(opCtx, opts, shardIds);
}
} // namespace
namespace resharding {
void tellAllParticipantsToCommit(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
ShardsvrCommitReshardCollection cmd(doc.getSourceNss());
cmd.setReshardingUUID(doc.getReshardingUUID());
sendReshardingCommand(opCtx, cmd, stepdownToken, executor, getAllParticipantShardIds(doc));
}
void tellAllParticipantsToAbort(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
bool isUserAborted) {
ShardsvrAbortReshardCollection abortCmd(doc.getReshardingUUID(), isUserAborted);
sendReshardingCommand(opCtx, abortCmd, stepdownToken, executor, getAllParticipantShardIds(doc));
}
void tellAllDonorsToStartChangeStreamsMonitor(
OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
invariant(doc.getCloneTimestamp());
ShardsvrReshardingDonorStartChangeStreamsMonitor cmd(
doc.getSourceNss(), doc.getReshardingUUID(), *doc.getCloneTimestamp());
// The donors ensure the change streams monitor start time is majority committed in their
// state document before returning, so no write concern is needed.
sendReshardingCommand(opCtx,
cmd,
stepdownToken,
executor,
resharding::extractShardIdsFromParticipantEntries(doc.getDonorShards()),
false /* setWriteConcern */);
}
void tellAllRecipientsToClone(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
auto [shardsOwningChunks, shardsNotOwningChunks] =
resharding::computeRecipientChunkOwnership(opCtx, doc);
auto recipientFields = resharding::constructRecipientFields(doc);
ShardsvrReshardRecipientClone cmd(doc.getReshardingUUID());
cmd.setCloneTimestamp(recipientFields.getCloneTimestamp().get());
cmd.setDonorShards(recipientFields.getDonorShards());
cmd.setApproxCopySize(recipientFields.getReshardingApproxCopySizeStruct());
sendReshardingCommand(opCtx,
cmd,
stepdownToken,
executor,
{shardsOwningChunks.begin(), shardsOwningChunks.end()});
if (!shardsNotOwningChunks.empty()) {
ReshardingApproxCopySize approxCopySize;
approxCopySize.setApproxBytesToCopy(0);
approxCopySize.setApproxDocumentsToCopy(0);
cmd.setApproxCopySize(approxCopySize);
sendReshardingCommand(opCtx,
cmd,
stepdownToken,
executor,
{shardsNotOwningChunks.begin(), shardsNotOwningChunks.end()});
}
}
void tellAllRecipientsCriticalSectionStarted(
OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken abortToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
ShardsvrReshardRecipientCriticalSectionStarted cmd(doc.getReshardingUUID());
sendReshardingCommand(
opCtx,
cmd,
abortToken,
executor,
resharding::extractShardIdsFromParticipantEntries(doc.getRecipientShards()));
}
} // namespace resharding
} // namespace mongo

View File

@@ -0,0 +1,72 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/operation_context.h"
#include "mongo/db/s/resharding/resharding_coordinator.h"
#include "mongo/db/sharding_environment/shard_id.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/cancellation.h"
#include <memory>
#include <vector>
namespace mongo {
namespace resharding {
void tellAllParticipantsToCommit(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllParticipantsToAbort(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
bool isUserAborted);
void tellAllDonorsToStartChangeStreamsMonitor(
OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllRecipientsToClone(OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken stepdownToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void tellAllRecipientsCriticalSectionStarted(
OperationContext* opCtx,
const ReshardingCoordinatorDocument& doc,
CancellationToken abortToken,
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
} // namespace resharding
} // namespace mongo