From 485d3f088b4bcbafc0e2eeeb83eaee7b1f6f03ff Mon Sep 17 00:00:00 2001 From: erinzhu001 Date: Mon, 9 Mar 2026 10:34:29 -0400 Subject: [PATCH] SERVER-120499 Respond with queryStatsMetrics from primary shard when forwarded from router for legacy write path (#49050) GitOrigin-RevId: a13601758afd8f4b6097b491b4c86eb8f8ffd561 --- .../update_cmd_include_query_stats_metrics.js | 7 -- src/mongo/db/op_debug.h | 13 +++ src/mongo/s/write_ops/batch_write_op.cpp | 8 ++ src/mongo/s/write_ops/batch_write_op_test.cpp | 108 ++++++++++++++++++ .../coordinate_multi_update_util.cpp | 17 ++- .../write_batch_response_processor.cpp | 8 +- 6 files changed, 144 insertions(+), 17 deletions(-) diff --git a/jstests/noPassthrough/query/queryStats/update_cmd_include_query_stats_metrics.js b/jstests/noPassthrough/query/queryStats/update_cmd_include_query_stats_metrics.js index f0ffaec822e..180d7f297d4 100644 --- a/jstests/noPassthrough/query/queryStats/update_cmd_include_query_stats_metrics.js +++ b/jstests/noPassthrough/query/queryStats/update_cmd_include_query_stats_metrics.js @@ -11,7 +11,6 @@ import { getQueryStatsUpdateCmd, resetQueryStatsStore, } from "jstests/libs/query/query_stats_utils.js"; -import {isUweEnabled} from "jstests/libs/query/uwe_utils.js"; import {ShardingTest} from "jstests/libs/shardingtest.js"; const collName = jsTestName(); @@ -150,12 +149,6 @@ describe("Sharded", () => { before(() => { const st = new ShardingTest({shards: 2}); testDB = st.s.getDB("test"); - // TODO SERVER-120499 Remove skipping test due to legacy batch write exec. - if (!isUweEnabled(st.s)) { - st.stop(); - jsTest.log.info("Skipping test: featureFlagUnifiedWriteExecutor is not enabled"); - quit(); - } st.shardColl(testDB[collName], {_id: 1}, {_id: 1}); fixture = st; resetCollection(testDB[collName]); diff --git a/src/mongo/db/op_debug.h b/src/mongo/db/op_debug.h index 290c417c720..2c7d4f5635e 100644 --- a/src/mongo/db/op_debug.h +++ b/src/mongo/db/op_debug.h @@ -553,6 +553,19 @@ public: } } + // Gathers and returns a vector of all queryStatsMetrics for registered batch write operations. + MONGO_MOD_PRIVATE std::vector + gatherQueryStatsMetricsForBatchWrites() const { + std::vector queryStatsMetrics; + if (_queryStatsInfoForBatchWrites) { + for (const auto& [opIndex, info] : *_queryStatsInfoForBatchWrites) { + queryStatsMetrics.emplace_back(write_ops::QueryStatsMetrics{ + static_cast(opIndex), getCursorMetrics(opIndex)}); + } + } + return queryStatsMetrics; + } + // The query framework that this operation used. Will be unknown for non query operations. PlanExecutor::QueryFramework queryFramework{PlanExecutor::QueryFramework::kUnknown}; diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index c3b67a6c7c4..0a91b490c2a 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -971,6 +971,14 @@ void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) { if (!_retriedStmtIds.empty()) { batchResp->setRetriedStmtIds({_retriedStmtIds.begin(), _retriedStmtIds.end()}); } + + // Append query stats metrics if the command is forwarded from router (the current node is a + // primary shard) such that router can receive and aggregate the metrics there. + // Otherwise, ignore if the current node is a router. + if (_opCtx->isCommandForwardedFromRouter()) { + auto& opDebug = CurOp::get(_opCtx)->debug(); + batchResp->setQueryStatsMetrics(opDebug.gatherQueryStatsMetricsForBatchWrites()); + } } int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const { diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index a7b0642471b..d3c61c3475b 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -36,6 +36,7 @@ #include "mongo/bson/oid.h" #include "mongo/bson/timestamp.h" #include "mongo/bson/util/builder.h" +#include "mongo/db/curop.h" #include "mongo/db/global_catalog/shard_key_pattern.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/write_ops/write_ops_gen.h" @@ -3119,5 +3120,112 @@ TEST_F(BatchWriteLargeTopLevelFieldTest, WriteWithStmtIds) { auto actual = builder.obj().objsize(); ASSERT_GTE(estimate, actual); } + +TEST_F(BatchWriteOpTest, BuildClientResponseIncludesQueryStatsMetricsWhenForwardedFromRouter) { + NamespaceString nss = NamespaceString::createNamespaceString_forTest("foo.bar"); + ShardEndpoint endpoint( + ShardId("shard"), ShardVersionFactory::make(ChunkVersion::IGNORED()), boost::none); + + auto targeter = initTargeterFullRange(nss, endpoint); + + BatchedCommandRequest request([&] { + write_ops::UpdateCommandRequest updateOp(nss); + updateOp.setUpdates({buildUpdate(BSON("x" << 1), BSON("$set" << BSON("y" << 1)), false), + buildUpdate(BSON("x" << 2), BSON("$set" << BSON("y" << 2)), false)}); + return updateOp; + }()); + + _opCtx->setCommandForwardedFromRouter(); + + auto& opDebug = CurOp::get(_opCtx)->debug(); + opDebug.setQueryStatsInfoAtOpIndex(0, OpDebug::QueryStatsInfo{}); + opDebug.setQueryStatsInfoAtOpIndex(1, OpDebug::QueryStatsInfo{}); + + opDebug.getAdditiveMetrics(0).keysExamined = 10; + opDebug.getAdditiveMetrics(0).docsExamined = 5; + opDebug.getAdditiveMetrics(1).keysExamined = 20; + opDebug.getAdditiveMetrics(1).docsExamined = 15; + + BatchWriteOp batchOp(_opCtx, request); + + std::map> targeted; + ASSERT_OK(batchOp.targetBatch(targeter, false, &targeted)); + + BatchedCommandResponse response; + buildResponse(2, &response); + response.setNModified(2); + + batchOp.noteBatchResponse(*targeted.begin()->second, response, nullptr); + ASSERT(batchOp.isFinished()); + + BatchedCommandResponse clientResponse; + batchOp.buildClientResponse(&clientResponse); + ASSERT(clientResponse.getOk()); + ASSERT_EQUALS(clientResponse.getN(), 2); + + ASSERT_TRUE(clientResponse.areQueryStatsMetricsSet()); + const auto& metrics = clientResponse.getQueryStatsMetrics(); + ASSERT_EQUALS(metrics.size(), 2u); + + bool found0 = false, found1 = false; + for (const auto& m : metrics) { + if (m.getOriginalOpIndex() == 0) { + ASSERT_EQ(m.getMetrics().getKeysExamined(), 10); + ASSERT_EQ(m.getMetrics().getDocsExamined(), 5); + found0 = true; + } else if (m.getOriginalOpIndex() == 1) { + ASSERT_EQ(m.getMetrics().getKeysExamined(), 20); + ASSERT_EQ(m.getMetrics().getDocsExamined(), 15); + found1 = true; + } + } + ASSERT(found0); + ASSERT(found1); +} + +TEST_F(BatchWriteOpTest, + BuildClientResponseDoesNotIncludeQueryStatsMetricsWhenNotForwardedFromRouter) { + NamespaceString nss = NamespaceString::createNamespaceString_forTest("foo.bar"); + ShardEndpoint endpoint( + ShardId("shard"), ShardVersionFactory::make(ChunkVersion::IGNORED()), boost::none); + + auto targeter = initTargeterFullRange(nss, endpoint); + + BatchedCommandRequest request([&] { + write_ops::UpdateCommandRequest updateOp(nss); + updateOp.setUpdates({buildUpdate(BSON("x" << 1), BSON("$set" << BSON("y" << 1)), false), + buildUpdate(BSON("x" << 2), BSON("$set" << BSON("y" << 2)), false)}); + return updateOp; + }()); + + auto& opDebug = CurOp::get(_opCtx)->debug(); + opDebug.setQueryStatsInfoAtOpIndex(0, OpDebug::QueryStatsInfo{}); + opDebug.setQueryStatsInfoAtOpIndex(1, OpDebug::QueryStatsInfo{}); + + opDebug.getAdditiveMetrics(0).keysExamined = 10; + opDebug.getAdditiveMetrics(0).docsExamined = 5; + opDebug.getAdditiveMetrics(1).keysExamined = 20; + opDebug.getAdditiveMetrics(1).docsExamined = 15; + + BatchWriteOp batchOp(_opCtx, request); + + std::map> targeted; + ASSERT_OK(batchOp.targetBatch(targeter, false, &targeted)); + + BatchedCommandResponse response; + buildResponse(2, &response); + response.setNModified(2); + + batchOp.noteBatchResponse(*targeted.begin()->second, response, nullptr); + ASSERT(batchOp.isFinished()); + + BatchedCommandResponse clientResponse; + batchOp.buildClientResponse(&clientResponse); + ASSERT(clientResponse.getOk()); + ASSERT_EQUALS(clientResponse.getN(), 2); + + ASSERT_FALSE(clientResponse.areQueryStatsMetricsSet()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/write_ops/coordinate_multi_update_util.cpp b/src/mongo/s/write_ops/coordinate_multi_update_util.cpp index d290579fb6d..d0a5f8c597f 100644 --- a/src/mongo/s/write_ops/coordinate_multi_update_util.cpp +++ b/src/mongo/s/write_ops/coordinate_multi_update_util.cpp @@ -33,6 +33,7 @@ #include "mongo/db/router_role/cluster_commands_helpers.h" #include "mongo/db/sharding_environment/grid.h" #include "mongo/s/request_types/coordinate_multi_update_gen.h" +#include "mongo/s/write_ops/unified_write_executor/write_batch_query_stats_registrar.h" namespace mongo { namespace coordinate_multi_update_util { @@ -92,13 +93,21 @@ BulkWriteCRUDOp getWriteOpFromBulk(const BulkWriteCommandRequest& bulkOp, return BulkWriteCRUDOp{bulkOp.getOps()[getWriteOpIndex(childBatches)]}; } -BSONObj makeCommandForOp(BatchWriteOp& batchOp, +BSONObj makeCommandForOp(OperationContext* opCtx, + BatchWriteOp& batchOp, const TargetedBatchMap& childBatches, const BatchedCommandRequest& clientRequest) { auto op = getWriteOpFromBatch(batchOp, childBatches); BSONObjBuilder bob; clientRequest.getNS().serializeCollectionName(&bob, getCommandNameForOp(op)); - bob.append(getOpsFieldNameForOp(op), BSON_ARRAY(getOpAsBson(op))); + if (op.getOpType() == BatchedCommandRequest::BatchType_Update) { + auto updateOpEntry = write_op_helpers::getOrMakeUpdateOpEntry(op.getUpdateOp()); + unified_write_executor::WriteBatchQueryStatsRegistrar{} + .setIncludeQueryStatsMetricsIfRequested(opCtx, op.getIndex(), updateOpEntry); + bob.append(getOpsFieldNameForOp(op), BSON_ARRAY(updateOpEntry.toBSON())); + } else { + bob.append(getOpsFieldNameForOp(op), BSON_ARRAY(getOpAsBson(op))); + } bob.appendElementsUnique(getRequestBson(clientRequest)); return bob.obj(); } @@ -180,7 +189,9 @@ BatchedCommandResponse executeCoordinateMultiUpdate(OperationContext* opCtx, const BatchedCommandRequest& clientRequest) { try { return parseBatchedResponse(executeCoordinateMultiUpdate( - opCtx, clientRequest.getNS(), makeCommandForOp(batchOp, childBatches, clientRequest))); + opCtx, + clientRequest.getNS(), + makeCommandForOp(opCtx, batchOp, childBatches, clientRequest))); } catch (const DBException& e) { BatchedCommandResponse result; result.setStatus(e.toStatus()); diff --git a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp index f40abfe3c5d..d6770615440 100644 --- a/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp +++ b/src/mongo/s/write_ops/unified_write_executor/write_batch_response_processor.cpp @@ -1447,14 +1447,8 @@ BatchedCommandResponse WriteBatchResponseProcessor::generateClientResponseForBat // primary shard) such that router can receive and aggregate the metrics there. // Otherwise, ignore if the current node is a router. if (opCtx->isCommandForwardedFromRouter()) { - std::vector queryStatsMetrics; auto& opDebug = CurOp::get(opCtx)->debug(); - opDebug.forEachQueryStatsInfoForBatchWrites( - [&queryStatsMetrics, &opDebug](size_t opIndex, const OpDebug::QueryStatsInfo&) { - queryStatsMetrics.emplace_back(write_ops::QueryStatsMetrics{ - static_cast(opIndex), opDebug.getCursorMetrics(opIndex)}); - }); - resp.setQueryStatsMetrics(std::move(queryStatsMetrics)); + resp.setQueryStatsMetrics(opDebug.gatherQueryStatsMetricsForBatchWrites()); } return resp; }