From 4e64245a02517c16d514efe81bad8e7bd2e09d05 Mon Sep 17 00:00:00 2001 From: Kruti Shah <70412403+krutishah139@users.noreply.github.com> Date: Mon, 9 Mar 2026 19:16:48 -0400 Subject: [PATCH] SERVER-120992 Refactor resharding coordinator command dispatch logic (#49270) GitOrigin-RevId: a08c9776d6bf961958a3135bbfbfed70e06edd35 --- src/mongo/db/s/BUILD.bazel | 1 + .../db/s/resharding/resharding_coordinator.h | 22 +-- .../s/resharding/resharding_coordinator.inl | 149 +++-------------- .../resharding_coordinator_command_util.cpp | 155 ++++++++++++++++++ .../resharding_coordinator_command_util.h | 72 ++++++++ 5 files changed, 249 insertions(+), 150 deletions(-) create mode 100644 src/mongo/db/s/resharding/resharding_coordinator_command_util.cpp create mode 100644 src/mongo/db/s/resharding/resharding_coordinator_command_util.h diff --git a/src/mongo/db/s/BUILD.bazel b/src/mongo/db/s/BUILD.bazel index 727e4a43bad..bcdbd52b28e 100644 --- a/src/mongo/db/s/BUILD.bazel +++ b/src/mongo/db/s/BUILD.bazel @@ -414,6 +414,7 @@ mongo_cc_library( "//src/mongo/db/s/resharding:resharding_change_streams_monitor.cpp", "//src/mongo/db/s/resharding:resharding_clone_fetcher.cpp", "//src/mongo/db/s/resharding:resharding_collection_cloner.cpp", + "//src/mongo/db/s/resharding:resharding_coordinator_command_util.cpp", "//src/mongo/db/s/resharding:resharding_coordinator_commit_monitor.cpp", "//src/mongo/db/s/resharding:resharding_coordinator_dao.cpp", "//src/mongo/db/s/resharding:resharding_coordinator_observer.cpp", diff --git a/src/mongo/db/s/resharding/resharding_coordinator.h b/src/mongo/db/s/resharding/resharding_coordinator.h index 638dc58616b..064cb7442a8 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator.h +++ b/src/mongo/db/s/resharding/resharding_coordinator.h @@ -468,26 +468,6 @@ private: void _removeOrQuiesceCoordinatorDocAndRemoveReshardingFields( OperationContext* opCtx, boost::optional abortReason = boost::none); - /** - * Sends the command to the specified participants 
asynchronously. - */ - template - void _sendCommandToAllParticipants( - const std::shared_ptr& executor, - std::shared_ptr> opts); - template - void _sendCommandToAllDonors(const std::shared_ptr& executor, - std::shared_ptr> opts); - template - void _sendCommandToAllRecipients(const std::shared_ptr& executor, - std::shared_ptr> opts); - - void _sendRecipientCloneCmdToShards( - OperationContext* opCtx, - const std::shared_ptr& executor, - ShardsvrReshardRecipientClone cmd, - std::set recipientShardIds); - /** * Sends '_flushRoutingTableCacheUpdatesWithWriteConcern' to ensure donor state machine creation * by the time the refresh completes. @@ -538,7 +518,7 @@ private: * Sends '_shardsvrCommitReshardCollection' to all participant shards. */ void _tellAllParticipantsToCommit( - const NamespaceString& nss, const std::shared_ptr& executor); + const std::shared_ptr& executor); /** * Sends '_shardsvrAbortReshardCollection' to all participant shards. diff --git a/src/mongo/db/s/resharding/resharding_coordinator.inl b/src/mongo/db/s/resharding/resharding_coordinator.inl index 84eb8fdd045..20a09067a22 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator.inl +++ b/src/mongo/db/s/resharding/resharding_coordinator.inl @@ -35,6 +35,7 @@ #include "mongo/db/s/balancer/balance_stats.h" #include "mongo/db/s/balancer/balancer_policy.h" #include "mongo/db/s/resharding/resharding_coordinator.h" +#include "mongo/db/s/resharding/resharding_coordinator_command_util.h" #include "mongo/db/s/resharding/resharding_coordinator_commit_monitor.h" #include "mongo/db/s/resharding/resharding_coordinator_dao.h" #include "mongo/db/s/resharding/resharding_coordinator_observer.h" @@ -48,8 +49,6 @@ #include "mongo/db/topology/vector_clock/vector_clock.h" #include "mongo/db/topology/vector_clock/vector_clock_mutable.h" #include "mongo/otel/traces/telemetry_context_serialization.h" -#include "mongo/s/request_types/abort_reshard_collection_gen.h" -#include 
"mongo/s/request_types/commit_reshard_collection_gen.h" #include "mongo/s/request_types/reshard_collection_gen.h" #include "mongo/util/modules.h" #include "mongo/util/testing_proctor.h" @@ -574,10 +573,7 @@ ExecutorFuture ReshardingCoordinator::_commitAndFinishReshardOperation( AfterWriteOnCatalogLegacy); } }) - .then([this, executor] { - _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(), - executor); - }) + .then([this, executor] { _tellAllParticipantsToCommit(executor); }) .then([this] { _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss()); }) @@ -1884,61 +1880,6 @@ void ReshardingCoordinator::_removeOrQuiesceCoordinatorDocAndRemoveReshardingFie #endif // RESHARDING_COORDINATOR_PART_3 #ifdef RESHARDING_COORDINATOR_PART_4 -template -void ReshardingCoordinator::_sendCommandToAllParticipants( - const std::shared_ptr& executor, - std::shared_ptr> opts) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto donorShardIds = - resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); - auto recipientShardIds = - resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); - std::set participantShardIds{donorShardIds.begin(), donorShardIds.end()}; - participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); - - resharding::sendCommandToShards( - opCtx.get(), opts, {participantShardIds.begin(), participantShardIds.end()}); -} - -template -void ReshardingCoordinator::_sendCommandToAllDonors( - const std::shared_ptr& executor, - std::shared_ptr> opts) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto donorShardIds = - resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); - - resharding::sendCommandToShards( - opCtx.get(), opts, {donorShardIds.begin(), donorShardIds.end()}); -} - -template -void ReshardingCoordinator::_sendCommandToAllRecipients( - const std::shared_ptr& executor, - std::shared_ptr> 
opts) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto recipientShardIds = - resharding::extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); - - resharding::sendCommandToShards( - opCtx.get(), opts, {recipientShardIds.begin(), recipientShardIds.end()}); -} - -void ReshardingCoordinator::_sendRecipientCloneCmdToShards( - OperationContext* opCtx, - const std::shared_ptr& executor, - ShardsvrReshardRecipientClone cmd, - std::set recipientShardIds) { - auto opts = std::make_shared>( - **executor, _ctHolder->getStepdownToken(), cmd); - - generic_argument_util::setMajorityWriteConcern(opts->cmd, &resharding::kMajorityWriteConcern); - opts->cmd.setDbName(DatabaseName::kAdmin); - - resharding::sendCommandToShards( - opCtx, opts, {recipientShardIds.begin(), recipientShardIds.end()}); -} - void ReshardingCoordinator::_establishAllDonorsAsParticipants( const std::shared_ptr& executor) { invariant(_coordinatorDoc.getState() == CoordinatorStateEnum::kPreparingToDonate); @@ -1972,25 +1913,8 @@ void ReshardingCoordinator::_tellAllRecipientsToClone( reshardingPauseBeforeTellingRecipientsToClone.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); - auto [shardsOwningChunks, shardsNotOwningChunks] = - resharding::computeRecipientChunkOwnership(opCtx.get(), _coordinatorDoc); - - auto recipientFields = resharding::constructRecipientFields(_coordinatorDoc); - ShardsvrReshardRecipientClone cmd(_coordinatorDoc.getReshardingUUID()); - cmd.setCloneTimestamp(recipientFields.getCloneTimestamp().get()); - cmd.setDonorShards(recipientFields.getDonorShards()); - cmd.setApproxCopySize(recipientFields.getReshardingApproxCopySizeStruct()); - - _sendRecipientCloneCmdToShards(opCtx.get(), executor, cmd, shardsOwningChunks); - - if (shardsNotOwningChunks.size() > 0) { - ReshardingApproxCopySize approxCopySize; - approxCopySize.setApproxBytesToCopy(0); - approxCopySize.setApproxDocumentsToCopy(0); - 
cmd.setApproxCopySize(approxCopySize); - - _sendRecipientCloneCmdToShards(opCtx.get(), executor, cmd, shardsNotOwningChunks); - } + resharding::tellAllRecipientsToClone( + opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor); } void ReshardingCoordinator::_tellAllRecipientsToRefresh( @@ -2032,67 +1956,34 @@ void ReshardingCoordinator::_tellAllDonorsToStartChangeStreamsMonitor( if (!_metadata.getPerformVerification()) { return; } - invariant(_coordinatorDoc.getCloneTimestamp()); - ShardsvrReshardingDonorStartChangeStreamsMonitor cmd(_coordinatorDoc.getSourceNss(), - _coordinatorDoc.getReshardingUUID(), - *_coordinatorDoc.getCloneTimestamp()); - auto opts = std::make_shared< - async_rpc::AsyncRPCOptions>( - **executor, _ctHolder->getStepdownToken(), cmd); - opts->cmd.setDbName(DatabaseName::kAdmin); - _sendCommandToAllDonors(executor, opts); + + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + resharding::tellAllDonorsToStartChangeStreamsMonitor( + opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor); } void ReshardingCoordinator::_tellAllRecipientsCriticalSectionStarted( const std::shared_ptr& executor) { - ShardsvrReshardRecipientCriticalSectionStarted cmd(_coordinatorDoc.getReshardingUUID()); - generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern); - - auto opts = std::make_shared< - async_rpc::AsyncRPCOptions>( - **executor, _ctHolder->getAbortToken(), cmd); - opts->cmd.setDbName(DatabaseName::kAdmin); - _sendCommandToAllRecipients(executor, opts); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + resharding::tellAllRecipientsCriticalSectionStarted( + opCtx.get(), _coordinatorDoc, _ctHolder->getAbortToken(), executor); } -namespace { -std::shared_ptr> -createShardsvrCommitReshardCollectionOptions(const NamespaceString& nss, - const UUID& reshardingUUID, - const std::shared_ptr& exec, - CancellationToken token) { - ShardsvrCommitReshardCollection 
cmd(nss); - cmd.setDbName(DatabaseName::kAdmin); - cmd.setReshardingUUID(reshardingUUID); - generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern); - auto opts = std::make_shared>( - exec, token, cmd); - return opts; -} -} // namespace - void ReshardingCoordinator::_tellAllParticipantsToCommit( - const NamespaceString& nss, const std::shared_ptr& executor) { - { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - reshardingPauseBeforeTellingParticipantsToCommit.pauseWhileSetAndNotCanceled( - opCtx.get(), _ctHolder->getAbortToken()); - } + const std::shared_ptr& executor) { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + reshardingPauseBeforeTellingParticipantsToCommit.pauseWhileSetAndNotCanceled( + opCtx.get(), _ctHolder->getAbortToken()); - auto opts = createShardsvrCommitReshardCollectionOptions( - nss, _coordinatorDoc.getReshardingUUID(), **executor, _ctHolder->getStepdownToken()); - opts->cmd.setDbName(DatabaseName::kAdmin); - _sendCommandToAllParticipants(executor, opts); + resharding::tellAllParticipantsToCommit( + opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor); } void ReshardingCoordinator::_tellAllParticipantsToAbort( const std::shared_ptr& executor, bool isUserAborted) { - ShardsvrAbortReshardCollection abortCmd(_coordinatorDoc.getReshardingUUID(), isUserAborted); - abortCmd.setDbName(DatabaseName::kAdmin); - generic_argument_util::setMajorityWriteConcern(abortCmd, &resharding::kMajorityWriteConcern); - auto opts = std::make_shared>( - **executor, _ctHolder->getStepdownToken(), abortCmd); - _sendCommandToAllParticipants(executor, opts); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + resharding::tellAllParticipantsToAbort( + opCtx.get(), _coordinatorDoc, _ctHolder->getStepdownToken(), executor, isUserAborted); } void ReshardingCoordinator::_updateChunkImbalanceMetrics(const NamespaceString& nss) { diff --git 
a/src/mongo/db/s/resharding/resharding_coordinator_command_util.cpp b/src/mongo/db/s/resharding/resharding_coordinator_command_util.cpp new file mode 100644 index 00000000000..a19f58ed84e --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_coordinator_command_util.cpp @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/s/resharding/resharding_coordinator_command_util.h" + +#include "mongo/db/generic_argument_util.h" +#include "mongo/s/request_types/abort_reshard_collection_gen.h" +#include "mongo/s/request_types/commit_reshard_collection_gen.h" + +namespace mongo { +namespace { +std::vector getAllParticipantShardIds(const ReshardingCoordinatorDocument& doc) { + auto donorShardIds = resharding::extractShardIdsFromParticipantEntries(doc.getDonorShards()); + auto recipientShardIds = + resharding::extractShardIdsFromParticipantEntries(doc.getRecipientShards()); +/* Merge donor and recipient ids through a std::set so an id present in both lists is returned once. */ + std::set shardIds{donorShardIds.begin(), donorShardIds.end()}; + shardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); + return {shardIds.begin(), shardIds.end()}; +} +/** + * Sets the admin database on cmd, adds majority write concern unless setWriteConcern is false, + * bundles cmd with the dereferenced executor and the token into a shared options object, and + * passes that object to resharding::sendCommandToShards for shardIds. + */ +template +void sendReshardingCommand(OperationContext* opCtx, + Cmd cmd, + CancellationToken token, + const std::shared_ptr& executor, + const std::vector& shardIds, + bool setWriteConcern = true) { + cmd.setDbName(DatabaseName::kAdmin); + if (setWriteConcern) { + generic_argument_util::setMajorityWriteConcern(cmd, &resharding::kMajorityWriteConcern); + } + + auto opts = std::make_shared>(**executor, token, cmd); + resharding::sendCommandToShards(opCtx, opts, shardIds); +} +} // namespace + +namespace resharding { +/** + * Builds ShardsvrCommitReshardCollection from the source namespace and resharding UUID in doc and + * sends it to every donor and recipient shard in doc. + */ +void tellAllParticipantsToCommit(OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor) { + ShardsvrCommitReshardCollection cmd(doc.getSourceNss()); + cmd.setReshardingUUID(doc.getReshardingUUID()); + + sendReshardingCommand(opCtx, cmd, stepdownToken, executor, getAllParticipantShardIds(doc)); +} +/** + * Builds ShardsvrAbortReshardCollection from the resharding UUID in doc and isUserAborted and + * sends it to every donor and recipient shard in doc. + */ +void tellAllParticipantsToAbort(OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor, + bool isUserAborted) { + ShardsvrAbortReshardCollection abortCmd(doc.getReshardingUUID(), isUserAborted); + + sendReshardingCommand(opCtx, abortCmd, 
stepdownToken, executor, getAllParticipantShardIds(doc)); +} +/** + * Sends ShardsvrReshardingDonorStartChangeStreamsMonitor, built from the source namespace, + * resharding UUID and clone timestamp in doc, to the donor shards in doc. The clone timestamp + * must already be set (checked with invariant). + */ +void tellAllDonorsToStartChangeStreamsMonitor( + OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor) { + invariant(doc.getCloneTimestamp()); + ShardsvrReshardingDonorStartChangeStreamsMonitor cmd( + doc.getSourceNss(), doc.getReshardingUUID(), *doc.getCloneTimestamp()); + + // The donors ensure the change streams monitor start time is majority committed in their + // state document before returning, so no write concern is needed. + sendReshardingCommand(opCtx, + cmd, + stepdownToken, + executor, + resharding::extractShardIdsFromParticipantEntries(doc.getDonorShards()), + false /* setWriteConcern */); +} +/** + * Sends ShardsvrReshardRecipientClone to the recipient shards, split by the result of + * resharding::computeRecipientChunkOwnership into shards that own chunks and shards that do not. + */ +void tellAllRecipientsToClone(OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor) { + auto [shardsOwningChunks, shardsNotOwningChunks] = + resharding::computeRecipientChunkOwnership(opCtx, doc); + + auto recipientFields = resharding::constructRecipientFields(doc); + ShardsvrReshardRecipientClone cmd(doc.getReshardingUUID()); + cmd.setCloneTimestamp(recipientFields.getCloneTimestamp().get()); + cmd.setDonorShards(recipientFields.getDonorShards()); + cmd.setApproxCopySize(recipientFields.getReshardingApproxCopySizeStruct()); + + sendReshardingCommand(opCtx, + cmd, + stepdownToken, + executor, + {shardsOwningChunks.begin(), shardsOwningChunks.end()}); +/* Shards reported as not owning chunks get the same command with zeroed copy-size estimates. */ + if (!shardsNotOwningChunks.empty()) { + ReshardingApproxCopySize approxCopySize; + approxCopySize.setApproxBytesToCopy(0); + approxCopySize.setApproxDocumentsToCopy(0); + cmd.setApproxCopySize(approxCopySize); + + sendReshardingCommand(opCtx, + cmd, + stepdownToken, + executor, + {shardsNotOwningChunks.begin(), shardsNotOwningChunks.end()}); + } +} +/** + * Sends ShardsvrReshardRecipientCriticalSectionStarted for the resharding UUID in doc to the + * recipient shards in doc, passing abortToken (not a stepdown token) to sendReshardingCommand. + */ +void tellAllRecipientsCriticalSectionStarted( + OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + 
CancellationToken abortToken, + const std::shared_ptr& executor) { + ShardsvrReshardRecipientCriticalSectionStarted cmd(doc.getReshardingUUID()); + + sendReshardingCommand( + opCtx, + cmd, + abortToken, + executor, + resharding::extractShardIdsFromParticipantEntries(doc.getRecipientShards())); +} + +} // namespace resharding +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_command_util.h b/src/mongo/db/s/resharding/resharding_coordinator_command_util.h new file mode 100644 index 00000000000..ac72c6851dd --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_coordinator_command_util.h @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/s/resharding/resharding_coordinator.h" +#include "mongo/db/sharding_environment/shard_id.h" +#include "mongo/executor/scoped_task_executor.h" +#include "mongo/util/cancellation.h" + +#include +#include + +namespace mongo { +namespace resharding { +/** + * Sends _shardsvrCommitReshardCollection, carrying the source namespace and resharding UUID from + * doc, to every donor and recipient shard listed in doc. The command is addressed to the admin + * database and sent with majority write concern. + */ +void tellAllParticipantsToCommit(OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor); +/** + * Sends _shardsvrAbortReshardCollection, carrying the resharding UUID from doc and isUserAborted, + * to every donor and recipient shard listed in doc, with majority write concern. + */ +void tellAllParticipantsToAbort(OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor, + bool isUserAborted); +/** + * Sends the donor start-change-streams-monitor command (source namespace, resharding UUID and + * clone timestamp from doc) to the donor shards listed in doc. No write concern is attached. + * The clone timestamp in doc must be set; the implementation checks this with invariant. + */ +void tellAllDonorsToStartChangeStreamsMonitor( + OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor); +/** + * Sends the recipient clone command to the recipient shards. Shards that + * resharding::computeRecipientChunkOwnership reports as not owning chunks receive zero for the + * approximate bytes and documents to copy; the others receive the estimates from + * resharding::constructRecipientFields. Both sends use majority write concern. + */ +void tellAllRecipientsToClone(OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken stepdownToken, + const std::shared_ptr& executor); +/** + * Sends the recipient critical-section-started command for the resharding UUID in doc to the + * recipient shards listed in doc, with majority write concern. Unlike the other helpers in this + * header it takes an abort token instead of a stepdown token. + */ +void tellAllRecipientsCriticalSectionStarted( + OperationContext* opCtx, + const ReshardingCoordinatorDocument& doc, + CancellationToken abortToken, + const std::shared_ptr& executor); + +} // namespace resharding +} // namespace mongo