SERVER-120499 Respond with queryStatsMetrics from primary shard when forwarded from router for legacy write path (#49050)
GitOrigin-RevId: a13601758afd8f4b6097b491b4c86eb8f8ffd561
This commit is contained in:
@@ -11,7 +11,6 @@ import {
|
||||
getQueryStatsUpdateCmd,
|
||||
resetQueryStatsStore,
|
||||
} from "jstests/libs/query/query_stats_utils.js";
|
||||
import {isUweEnabled} from "jstests/libs/query/uwe_utils.js";
|
||||
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
||||
|
||||
const collName = jsTestName();
|
||||
@@ -150,12 +149,6 @@ describe("Sharded", () => {
|
||||
before(() => {
|
||||
const st = new ShardingTest({shards: 2});
|
||||
testDB = st.s.getDB("test");
|
||||
// TODO SERVER-120499 Remove skipping test due to legacy batch write exec.
|
||||
if (!isUweEnabled(st.s)) {
|
||||
st.stop();
|
||||
jsTest.log.info("Skipping test: featureFlagUnifiedWriteExecutor is not enabled");
|
||||
quit();
|
||||
}
|
||||
st.shardColl(testDB[collName], {_id: 1}, {_id: 1});
|
||||
fixture = st;
|
||||
resetCollection(testDB[collName]);
|
||||
|
||||
@@ -553,6 +553,19 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Gathers and returns a vector of all queryStatsMetrics for registered batch write operations.
|
||||
MONGO_MOD_PRIVATE std::vector<write_ops::QueryStatsMetrics>
|
||||
gatherQueryStatsMetricsForBatchWrites() const {
|
||||
std::vector<write_ops::QueryStatsMetrics> queryStatsMetrics;
|
||||
if (_queryStatsInfoForBatchWrites) {
|
||||
for (const auto& [opIndex, info] : *_queryStatsInfoForBatchWrites) {
|
||||
queryStatsMetrics.emplace_back(write_ops::QueryStatsMetrics{
|
||||
static_cast<int32_t>(opIndex), getCursorMetrics(opIndex)});
|
||||
}
|
||||
}
|
||||
return queryStatsMetrics;
|
||||
}
|
||||
|
||||
// The query framework that this operation used. Will be unknown for non query operations.
|
||||
PlanExecutor::QueryFramework queryFramework{PlanExecutor::QueryFramework::kUnknown};
|
||||
|
||||
|
||||
@@ -971,6 +971,14 @@ void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) {
|
||||
if (!_retriedStmtIds.empty()) {
|
||||
batchResp->setRetriedStmtIds({_retriedStmtIds.begin(), _retriedStmtIds.end()});
|
||||
}
|
||||
|
||||
// Append query stats metrics if the command is forwarded from router (the current node is a
|
||||
// primary shard) such that router can receive and aggregate the metrics there.
|
||||
// Otherwise, ignore if the current node is a router.
|
||||
if (_opCtx->isCommandForwardedFromRouter()) {
|
||||
auto& opDebug = CurOp::get(_opCtx)->debug();
|
||||
batchResp->setQueryStatsMetrics(opDebug.gatherQueryStatsMetricsForBatchWrites());
|
||||
}
|
||||
}
|
||||
|
||||
int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const {
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "mongo/bson/oid.h"
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/bson/util/builder.h"
|
||||
#include "mongo/db/curop.h"
|
||||
#include "mongo/db/global_catalog/shard_key_pattern.h"
|
||||
#include "mongo/db/namespace_string.h"
|
||||
#include "mongo/db/query/write_ops/write_ops_gen.h"
|
||||
@@ -3119,5 +3120,112 @@ TEST_F(BatchWriteLargeTopLevelFieldTest, WriteWithStmtIds) {
|
||||
auto actual = builder.obj().objsize();
|
||||
ASSERT_GTE(estimate, actual);
|
||||
}
|
||||
|
||||
TEST_F(BatchWriteOpTest, BuildClientResponseIncludesQueryStatsMetricsWhenForwardedFromRouter) {
|
||||
NamespaceString nss = NamespaceString::createNamespaceString_forTest("foo.bar");
|
||||
ShardEndpoint endpoint(
|
||||
ShardId("shard"), ShardVersionFactory::make(ChunkVersion::IGNORED()), boost::none);
|
||||
|
||||
auto targeter = initTargeterFullRange(nss, endpoint);
|
||||
|
||||
BatchedCommandRequest request([&] {
|
||||
write_ops::UpdateCommandRequest updateOp(nss);
|
||||
updateOp.setUpdates({buildUpdate(BSON("x" << 1), BSON("$set" << BSON("y" << 1)), false),
|
||||
buildUpdate(BSON("x" << 2), BSON("$set" << BSON("y" << 2)), false)});
|
||||
return updateOp;
|
||||
}());
|
||||
|
||||
_opCtx->setCommandForwardedFromRouter();
|
||||
|
||||
auto& opDebug = CurOp::get(_opCtx)->debug();
|
||||
opDebug.setQueryStatsInfoAtOpIndex(0, OpDebug::QueryStatsInfo{});
|
||||
opDebug.setQueryStatsInfoAtOpIndex(1, OpDebug::QueryStatsInfo{});
|
||||
|
||||
opDebug.getAdditiveMetrics(0).keysExamined = 10;
|
||||
opDebug.getAdditiveMetrics(0).docsExamined = 5;
|
||||
opDebug.getAdditiveMetrics(1).keysExamined = 20;
|
||||
opDebug.getAdditiveMetrics(1).docsExamined = 15;
|
||||
|
||||
BatchWriteOp batchOp(_opCtx, request);
|
||||
|
||||
std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
|
||||
ASSERT_OK(batchOp.targetBatch(targeter, false, &targeted));
|
||||
|
||||
BatchedCommandResponse response;
|
||||
buildResponse(2, &response);
|
||||
response.setNModified(2);
|
||||
|
||||
batchOp.noteBatchResponse(*targeted.begin()->second, response, nullptr);
|
||||
ASSERT(batchOp.isFinished());
|
||||
|
||||
BatchedCommandResponse clientResponse;
|
||||
batchOp.buildClientResponse(&clientResponse);
|
||||
ASSERT(clientResponse.getOk());
|
||||
ASSERT_EQUALS(clientResponse.getN(), 2);
|
||||
|
||||
ASSERT_TRUE(clientResponse.areQueryStatsMetricsSet());
|
||||
const auto& metrics = clientResponse.getQueryStatsMetrics();
|
||||
ASSERT_EQUALS(metrics.size(), 2u);
|
||||
|
||||
bool found0 = false, found1 = false;
|
||||
for (const auto& m : metrics) {
|
||||
if (m.getOriginalOpIndex() == 0) {
|
||||
ASSERT_EQ(m.getMetrics().getKeysExamined(), 10);
|
||||
ASSERT_EQ(m.getMetrics().getDocsExamined(), 5);
|
||||
found0 = true;
|
||||
} else if (m.getOriginalOpIndex() == 1) {
|
||||
ASSERT_EQ(m.getMetrics().getKeysExamined(), 20);
|
||||
ASSERT_EQ(m.getMetrics().getDocsExamined(), 15);
|
||||
found1 = true;
|
||||
}
|
||||
}
|
||||
ASSERT(found0);
|
||||
ASSERT(found1);
|
||||
}
|
||||
|
||||
TEST_F(BatchWriteOpTest,
|
||||
BuildClientResponseDoesNotIncludeQueryStatsMetricsWhenNotForwardedFromRouter) {
|
||||
NamespaceString nss = NamespaceString::createNamespaceString_forTest("foo.bar");
|
||||
ShardEndpoint endpoint(
|
||||
ShardId("shard"), ShardVersionFactory::make(ChunkVersion::IGNORED()), boost::none);
|
||||
|
||||
auto targeter = initTargeterFullRange(nss, endpoint);
|
||||
|
||||
BatchedCommandRequest request([&] {
|
||||
write_ops::UpdateCommandRequest updateOp(nss);
|
||||
updateOp.setUpdates({buildUpdate(BSON("x" << 1), BSON("$set" << BSON("y" << 1)), false),
|
||||
buildUpdate(BSON("x" << 2), BSON("$set" << BSON("y" << 2)), false)});
|
||||
return updateOp;
|
||||
}());
|
||||
|
||||
auto& opDebug = CurOp::get(_opCtx)->debug();
|
||||
opDebug.setQueryStatsInfoAtOpIndex(0, OpDebug::QueryStatsInfo{});
|
||||
opDebug.setQueryStatsInfoAtOpIndex(1, OpDebug::QueryStatsInfo{});
|
||||
|
||||
opDebug.getAdditiveMetrics(0).keysExamined = 10;
|
||||
opDebug.getAdditiveMetrics(0).docsExamined = 5;
|
||||
opDebug.getAdditiveMetrics(1).keysExamined = 20;
|
||||
opDebug.getAdditiveMetrics(1).docsExamined = 15;
|
||||
|
||||
BatchWriteOp batchOp(_opCtx, request);
|
||||
|
||||
std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
|
||||
ASSERT_OK(batchOp.targetBatch(targeter, false, &targeted));
|
||||
|
||||
BatchedCommandResponse response;
|
||||
buildResponse(2, &response);
|
||||
response.setNModified(2);
|
||||
|
||||
batchOp.noteBatchResponse(*targeted.begin()->second, response, nullptr);
|
||||
ASSERT(batchOp.isFinished());
|
||||
|
||||
BatchedCommandResponse clientResponse;
|
||||
batchOp.buildClientResponse(&clientResponse);
|
||||
ASSERT(clientResponse.getOk());
|
||||
ASSERT_EQUALS(clientResponse.getN(), 2);
|
||||
|
||||
ASSERT_FALSE(clientResponse.areQueryStatsMetricsSet());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mongo
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
#include "mongo/db/router_role/cluster_commands_helpers.h"
|
||||
#include "mongo/db/sharding_environment/grid.h"
|
||||
#include "mongo/s/request_types/coordinate_multi_update_gen.h"
|
||||
#include "mongo/s/write_ops/unified_write_executor/write_batch_query_stats_registrar.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace coordinate_multi_update_util {
|
||||
@@ -92,13 +93,21 @@ BulkWriteCRUDOp getWriteOpFromBulk(const BulkWriteCommandRequest& bulkOp,
|
||||
return BulkWriteCRUDOp{bulkOp.getOps()[getWriteOpIndex(childBatches)]};
|
||||
}
|
||||
|
||||
BSONObj makeCommandForOp(BatchWriteOp& batchOp,
|
||||
BSONObj makeCommandForOp(OperationContext* opCtx,
|
||||
BatchWriteOp& batchOp,
|
||||
const TargetedBatchMap& childBatches,
|
||||
const BatchedCommandRequest& clientRequest) {
|
||||
auto op = getWriteOpFromBatch(batchOp, childBatches);
|
||||
BSONObjBuilder bob;
|
||||
clientRequest.getNS().serializeCollectionName(&bob, getCommandNameForOp(op));
|
||||
bob.append(getOpsFieldNameForOp(op), BSON_ARRAY(getOpAsBson(op)));
|
||||
if (op.getOpType() == BatchedCommandRequest::BatchType_Update) {
|
||||
auto updateOpEntry = write_op_helpers::getOrMakeUpdateOpEntry(op.getUpdateOp());
|
||||
unified_write_executor::WriteBatchQueryStatsRegistrar{}
|
||||
.setIncludeQueryStatsMetricsIfRequested(opCtx, op.getIndex(), updateOpEntry);
|
||||
bob.append(getOpsFieldNameForOp(op), BSON_ARRAY(updateOpEntry.toBSON()));
|
||||
} else {
|
||||
bob.append(getOpsFieldNameForOp(op), BSON_ARRAY(getOpAsBson(op)));
|
||||
}
|
||||
bob.appendElementsUnique(getRequestBson(clientRequest));
|
||||
return bob.obj();
|
||||
}
|
||||
@@ -180,7 +189,9 @@ BatchedCommandResponse executeCoordinateMultiUpdate(OperationContext* opCtx,
|
||||
const BatchedCommandRequest& clientRequest) {
|
||||
try {
|
||||
return parseBatchedResponse(executeCoordinateMultiUpdate(
|
||||
opCtx, clientRequest.getNS(), makeCommandForOp(batchOp, childBatches, clientRequest)));
|
||||
opCtx,
|
||||
clientRequest.getNS(),
|
||||
makeCommandForOp(opCtx, batchOp, childBatches, clientRequest)));
|
||||
} catch (const DBException& e) {
|
||||
BatchedCommandResponse result;
|
||||
result.setStatus(e.toStatus());
|
||||
|
||||
@@ -1447,14 +1447,8 @@ BatchedCommandResponse WriteBatchResponseProcessor::generateClientResponseForBat
|
||||
// primary shard) such that router can receive and aggregate the metrics there.
|
||||
// Otherwise, ignore if the current node is a router.
|
||||
if (opCtx->isCommandForwardedFromRouter()) {
|
||||
std::vector<write_ops::QueryStatsMetrics> queryStatsMetrics;
|
||||
auto& opDebug = CurOp::get(opCtx)->debug();
|
||||
opDebug.forEachQueryStatsInfoForBatchWrites(
|
||||
[&queryStatsMetrics, &opDebug](size_t opIndex, const OpDebug::QueryStatsInfo&) {
|
||||
queryStatsMetrics.emplace_back(write_ops::QueryStatsMetrics{
|
||||
static_cast<int32_t>(opIndex), opDebug.getCursorMetrics(opIndex)});
|
||||
});
|
||||
resp.setQueryStatsMetrics(std::move(queryStatsMetrics));
|
||||
resp.setQueryStatsMetrics(opDebug.gatherQueryStatsMetricsForBatchWrites());
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user