Compare commits

...

5 Commits

Author SHA1 Message Date
Gabriel Marks
be443f6fc1 SERVER-63280 Disable ssl_cert_selector_apple.js 2022-02-07 19:38:47 +00:00
A. Jesse Jiryu Davis
1b130329e1 SERVER-63108 Rename Versioned API to Stable API 2022-02-07 19:37:24 +00:00
Jack Mulrow
cebfa751cb SERVER-63364 Use transaction API to handle WCOS errors for findAndModify without retryability 2022-02-07 19:21:55 +00:00
Daniel Gómez Ferro
7a2d86c376 SERVER-61939 Tighter bounds for clustered collection scans 2022-02-07 18:50:45 +00:00
Faustoleyva54
59e19dc9bf SERVER-62453 Add collectionUUID parameter to refineCollectionShardKey command 2022-02-07 16:53:18 +00:00
50 changed files with 588 additions and 324 deletions

View File

@@ -24,7 +24,7 @@
# delete this exception statement from your version. If you delete this
# exception statement from all source files in the program, then also delete
# it in the license file.
"""Check that mongod's and mongos's Versioned API commands are defined in IDL.
"""Check that mongod's and mongos's Stable API commands are defined in IDL.
Call listCommands on mongod and mongos to assert they have the same set of commands in the given API
version, and assert all these commands are defined in IDL.

View File

@@ -4,6 +4,9 @@ selector:
roots:
- jstests/ssl/*.js
- src/mongo/db/modules/*/jstests/fips/*.js
exclude_files:
# TODO SERVER-63155 Re-enable this test.
- jstests/ssl/ssl_cert_selector_apple.js
# ssl tests start their own mongod's.
executor:

2
debian/mongo.1 vendored
View File

@@ -295,7 +295,7 @@ currently the only supported value.
.RS
.PP
Specifies that the server will respond with \fBAPIStrictError\f1 if your application uses a command or behavior
outside of the \fBVersioned API\f1\&.
outside of the \fBStable API\f1\&.
.PP
If you specify \fB\-\-apiStrict\f1\f1, you must also specify
\fB\-\-apiVersion\f1\f1\&.

View File

@@ -7,7 +7,7 @@ set -o errexit
set -o verbose
activate_venv
$python buildscripts/idl/check_versioned_api_commands_have_idl_definitions.py -v --include src --include src/mongo/db/modules/enterprise/src --installDir dist-test/bin 1
$python buildscripts/idl/check_stable_api_commands_have_idl_definitions.py -v --include src --include src/mongo/db/modules/enterprise/src --installDir dist-test/bin 1
$python buildscripts/idl/checkout_idl_files_from_past_releases.py -v idls
find idls -maxdepth 1 -mindepth 1 -type d | while read dir; do
echo "Performing idl check compatibility with release: $dir:"

View File

@@ -138,6 +138,18 @@ const verifyNoBoundsAndFindsN = function(coll, expected, predicate, queryCollati
assert.eq(expected, coll.find(predicate).count(), "Didn't find the expected records");
};
/**
 * Asserts that explaining a find on 'coll' with 'predicate' reports both a minRecord and a
 * maxRecord on the winning plan, that the two bounds differ (i.e. the scan is bounded but not
 * pinned to a single record), and that the find matches exactly 'expected' documents.
 *
 * 'queryCollation' is optional; when supplied it is applied to the explained query only. The
 * final count is always taken without it.
 */
const verifyNoTightBoundsAndFindsN = function(coll, expected, predicate, queryCollation) {
    // Build the cursor to explain, attaching the collation only when the caller passed one.
    let cursorToExplain = coll.find(predicate);
    if (queryCollation !== undefined) {
        cursorToExplain = cursorToExplain.collation(queryCollation);
    }
    const explainOutput = assert.commandWorked(cursorToExplain.explain());

    const winningPlan = explainOutput.queryPlanner.winningPlan;
    const lowerBound = winningPlan.minRecord;
    const upperBound = winningPlan.maxRecord;

    assert.neq(null, lowerBound, "No min bound");
    assert.neq(null, upperBound, "No max bound");
    // Equal bounds would mean a tight (point) scan, which this helper must not see.
    assert.neq(lowerBound, upperBound, "COLLSCAN bounds are equal");

    const matched = coll.find(predicate).count();
    assert.eq(expected, matched, "Didn't find the expected records");
};
const testBounds = function(coll, expected, defaultCollation) {
// Test non string types.
verifyHasBoundsAndFindsN(coll, 1, {_id: 5});
@@ -160,10 +172,10 @@ const testBounds = function(coll, expected, defaultCollation) {
verifyNoBoundsAndFindsN(coll, expected, {data: ["a", "B"]});
// Test non compatible query collations don't generate bounds
verifyNoBoundsAndFindsN(coll, expected, {_id: "A"}, incompatibleCollation);
verifyNoBoundsAndFindsN(coll, expected, {_id: {str: "A"}}, incompatibleCollation);
verifyNoBoundsAndFindsN(coll, expected, {_id: {strs: ["A", "b"]}}, incompatibleCollation);
verifyNoBoundsAndFindsN(coll, expected, {_id: {strs: ["a", "B"]}}, incompatibleCollation);
verifyNoTightBoundsAndFindsN(coll, expected, {_id: "A"}, incompatibleCollation);
verifyNoTightBoundsAndFindsN(coll, expected, {_id: {str: "A"}}, incompatibleCollation);
verifyNoTightBoundsAndFindsN(coll, expected, {_id: {strs: ["A", "b"]}}, incompatibleCollation);
verifyNoTightBoundsAndFindsN(coll, expected, {_id: {strs: ["a", "B"]}}, incompatibleCollation);
// Test compatible query collations generate bounds
verifyHasBoundsAndFindsN(coll, expected, {_id: "A"}, defaultCollation);

View File

@@ -50,8 +50,9 @@ const testClusteredCollectionBoundedScan = function(coll, clusterKey) {
assert(getPlanStage(expl, "CLUSTERED_IXSCAN"));
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("maxRecord"));
assert(!getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("minRecord"));
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("minRecord"));
assert.eq(10, getPlanStage(expl, "CLUSTERED_IXSCAN").maxRecord);
assert.eq(NaN, getPlanStage(expl, "CLUSTERED_IXSCAN").minRecord);
assert.eq(expectedNReturned, expl.executionStats.executionStages.nReturned);
assert.eq(expectedDocsExamined, expl.executionStats.executionStages.docsExamined);
@@ -65,8 +66,9 @@ const testClusteredCollectionBoundedScan = function(coll, clusterKey) {
}));
assert(getPlanStage(expl, "CLUSTERED_IXSCAN"));
assert(!getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("maxRecord"));
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("maxRecord"));
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("minRecord"));
assert.eq(Infinity, getPlanStage(expl, "CLUSTERED_IXSCAN").maxRecord);
assert.eq(89, getPlanStage(expl, "CLUSTERED_IXSCAN").minRecord);
assert.eq(expectedNReturned, expl.executionStats.executionStages.nReturned);

View File

@@ -138,8 +138,12 @@ function testClusteredCollectionHint(coll, clusterKey, clusterKeyName) {
filter: {[clusterKeyFieldName]: {$lt: arbitraryDocId}},
hint: clusterKey,
},
expectedWinningPlanStats:
{stage: "CLUSTERED_IXSCAN", direction: "forward", maxRecord: arbitraryDocId}
expectedWinningPlanStats: {
stage: "CLUSTERED_IXSCAN",
direction: "forward",
minRecord: NaN,
maxRecord: arbitraryDocId
}
});
validateClusteredCollectionHint(coll, {
expectedNReturned: batchSize - arbitraryDocId,
@@ -148,8 +152,12 @@ function testClusteredCollectionHint(coll, clusterKey, clusterKeyName) {
filter: {[clusterKeyFieldName]: {$gte: arbitraryDocId}},
hint: clusterKey,
},
expectedWinningPlanStats:
{stage: "CLUSTERED_IXSCAN", direction: "forward", minRecord: arbitraryDocId}
expectedWinningPlanStats: {
stage: "CLUSTERED_IXSCAN",
direction: "forward",
minRecord: arbitraryDocId,
maxRecord: Infinity
}
});
// Find with $natural hints.
@@ -282,15 +290,15 @@ function validateClusteredCollectionHint(coll,
assert.neq(null, stageOfInterest);
for (const [key, value] of Object.entries(expectedWinningPlanStats)) {
assert(stageOfInterest[key], tojson(explain));
assert(stageOfInterest[key] !== undefined, tojson(explain));
assert.eq(stageOfInterest[key], value, tojson(explain));
}
// Explicitly check that the plan is not bounded by default.
if (!expectedWinningPlanStats.hasOwnProperty("minRecord")) {
assert(!actualWinningPlan["minRecord"], tojson(explain));
assert(!actualWinningPlan.hasOwnProperty("minRecord"), tojson(explain));
}
if (!expectedWinningPlanStats.hasOwnProperty("maxRecord")) {
assert(!actualWinningPlan["maxRecord"], tojson(explain));
assert(!actualWinningPlan.hasOwnProperty("maxRecord"), tojson(explain));
}
}

View File

@@ -1,5 +1,5 @@
/**
* Test the shell's --apiVersion and other options related to the MongoDB Versioned API, and
* Test the shell's --apiVersion and other options related to the MongoDB Stable API, and
* test passing API parameters to the Mongo() constructor.
*
* @tags: [

View File

@@ -0,0 +1,73 @@
/**
* Tests the collectionUUID parameter of the refineCollectionShardKey command.
*
* @tags: [
* featureFlagCommandsAcceptCollectionUUID,
* ]
*/
(function() {
'use strict';

// Single-shard cluster; every command in this test is issued through mongos.
const st = new ShardingTest({shards: 1});
const mongos = st.s0;
const db = mongos.getDB(jsTestName());
const coll = db['coll'];

assert.commandWorked(mongos.adminCommand({enableSharding: db.getName()}));

// Shard key the collection starts with, and the refined key that extends it.
const oldKeyDoc = {_id: 1, a: 1};
const newKeyDoc = {_id: 1, a: 1, b: 1, c: 1};

/**
 * Drops 'shardedColl' and recreates it with one document, an index on the refined key, and
 * sharding on the original key, so that a refine attempt can be made against it.
 */
function resetColl(shardedColl) {
    shardedColl.drop();
    assert.commandWorked(shardedColl.insert({_id: 0, a: 1, b: 2, c: 3}));

    const viaMongos = mongos.getCollection(shardedColl.getFullName());
    assert.commandWorked(viaMongos.createIndex(newKeyDoc));

    const shardCmd = {shardCollection: shardedColl.getFullName(), key: oldKeyDoc};
    assert.commandWorked(mongos.adminCommand(shardCmd));
}

/**
 * Returns the current UUID of 'coll' as reported by listCollections.
 */
function uuid() {
    const listing = assert.commandWorked(db.runCommand({listCollections: 1}));
    const entry = listing.cursor.firstBatch.find(c => c.name === coll.getName());
    return entry.info.uuid;
}

resetColl(coll);

// The command succeeds when provided with the correct collection UUID.
const matchingCmd = {
    refineCollectionShardKey: coll.getFullName(),
    key: newKeyDoc,
    collectionUUID: uuid()
};
assert.commandWorked(mongos.adminCommand(matchingCmd));

// The command fails when provided with a UUID with no corresponding collection.
resetColl(coll);
const nonexistentUUID = UUID();
const unknownUuidCmd = {
    refineCollectionShardKey: coll.getFullName(),
    key: newKeyDoc,
    collectionUUID: nonexistentUUID,
};
let res = assert.commandFailedWithCode(mongos.adminCommand(unknownUuidCmd),
                                       ErrorCodes.CollectionUUIDMismatch);
assert.eq(res.collectionUUID, nonexistentUUID);
assert.eq(res.actualNamespace, "");

// The command fails when provided with a different collection's UUID.
const coll2 = db['coll_2'];
resetColl(coll2);
const otherCollUuidCmd = {
    refineCollectionShardKey: coll2.getFullName(),
    key: newKeyDoc,
    collectionUUID: uuid(),
};
res = assert.commandFailedWithCode(mongos.adminCommand(otherCollUuidCmd),
                                   ErrorCodes.CollectionUUIDMismatch);
assert.eq(res.collectionUUID, uuid());
assert.eq(res.actualNamespace, coll.getFullName());

st.stop();
})();

View File

@@ -124,7 +124,7 @@ function runFindAndModifyCmdSuccess(st,
if (upsert) {
assert.eq(null, res);
} else {
assert(resultsEq([res], oldDoc));
assert(resultsEq([res], oldDoc), tojson([res, oldDoc]));
}
}
}

View File

@@ -201,8 +201,9 @@ changeShardKeyOptions.forEach(function(updateConfig) {
if (!isFindAndModify) {
assertCannotUpdateWithMultiTrue(
st, kDbName, ns, session, sessionDB, runInTxn, {"x": 300}, {"$set": {"x": 30}});
changeShardKeyWhenFailpointsSet(session, sessionDB, runInTxn, isFindAndModify);
}
changeShardKeyWhenFailpointsSet(session, sessionDB, runInTxn, isFindAndModify);
}
});

View File

@@ -37,7 +37,7 @@ structs:
# is like MongoClient(uri, api={version: "1", strict: true, deprecationErrors: true}).
ClientAPIVersionParameters:
description: "Parser for Versioned API parameters passed to 'new Mongo()' in the mongo shell"
description: "Parser for Stable API parameters passed to 'new Mongo()' in the mongo shell"
strict: true
fields:
version:

View File

@@ -839,7 +839,6 @@ env.Library(
'repl/repl_coordinator_interface',
'service_context',
'shared_request_handling',
'write_ops',
],
)

View File

@@ -1,4 +1,4 @@
# MongoDB Versioned API
# MongoDB Stable API
The MongoDB API is the user-visible behavior of all commands, including their parameters and reply
fields. An "API version" is a subset of the API for which we make an especially strong promise: For
@@ -84,7 +84,7 @@ This also applies to any `unstable` fields. This is to prevent unexpected errors
a field from `unstable` to `stable`. By intentionally opting in, we assume the implementer
understands the implications and has valid reasons to use `any`.
## Versioned API implementation
## Stable API implementation
All `Command` subclasses implement `apiVersions()`, which returns the set of API versions the
command is part of. By default, a command is not included in any API version, meaning it has no

View File

@@ -46,6 +46,7 @@
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/index_bounds_builder.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/record_id_helpers.h"
@@ -174,7 +175,7 @@ bool Helpers::findById(OperationContext* opCtx,
if (collection->isClustered()) {
Snapshotted<BSONObj> doc;
if (collection->findDoc(opCtx,
RecordId(record_id_helpers::keyForElem(
record_id_helpers::keyForObj(IndexBoundsBuilder::objFromElement(
query["_id"], collection->getDefaultCollator())),
&doc)) {
result = std::move(doc.value());
@@ -200,8 +201,8 @@ RecordId Helpers::findById(OperationContext* opCtx,
if (!desc && clustered_util::isClusteredOnId(collection->getClusteredInfo())) {
// There is no explicit IndexDescriptor for _id on a collection clustered by _id. However,
// the RecordId can be constructed directly from the input.
return RecordId(
record_id_helpers::keyForElem(idquery["_id"], collection->getDefaultCollator()));
return record_id_helpers::keyForObj(
IndexBoundsBuilder::objFromElement(idquery["_id"], collection->getDefaultCollator()));
}
uassert(13430, "no _id index", desc);

View File

@@ -42,6 +42,7 @@
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/db/repl/optime.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -202,13 +203,13 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::FORWARD &&
_params.minRecord) {
// Seek to the approximate start location.
record = _cursor->seekNear(*_params.minRecord);
record = _cursor->seekNear(_params.minRecord->recordId());
}
if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::BACKWARD &&
_params.maxRecord) {
// Seek to the approximate start location (at the end).
record = _cursor->seekNear(*_params.maxRecord);
record = _cursor->seekNear(_params.maxRecord->recordId());
}
if (!record) {
@@ -304,7 +305,7 @@ bool pastEndOfRange(const CollectionScanParams& params, const WorkingSetMember&
return false;
}
auto endRecord = *params.maxRecord;
auto endRecord = params.maxRecord->recordId();
return member.recordId > endRecord ||
(member.recordId == endRecord && !shouldIncludeEndRecord(params));
} else {
@@ -312,7 +313,7 @@ bool pastEndOfRange(const CollectionScanParams& params, const WorkingSetMember&
if (!params.minRecord) {
return false;
}
auto endRecord = *params.minRecord;
auto endRecord = params.minRecord->recordId();
return member.recordId < endRecord ||
(member.recordId == endRecord && !shouldIncludeEndRecord(params));
@@ -326,7 +327,7 @@ bool beforeStartOfRange(const CollectionScanParams& params, const WorkingSetMemb
return false;
}
auto startRecord = *params.minRecord;
auto startRecord = params.minRecord->recordId();
return member.recordId < startRecord ||
(member.recordId == startRecord && !shouldIncludeStartRecord(params));
} else {
@@ -334,7 +335,7 @@ bool beforeStartOfRange(const CollectionScanParams& params, const WorkingSetMemb
if (!params.maxRecord) {
return false;
}
auto startRecord = *params.maxRecord;
auto startRecord = params.maxRecord->recordId();
return member.recordId > startRecord ||
(member.recordId == startRecord && !shouldIncludeStartRecord(params));
}

View File

@@ -30,6 +30,7 @@
#pragma once
#include "mongo/bson/timestamp.h"
#include "mongo/db/query/record_id_bound.h"
#include "mongo/db/record_id.h"
namespace mongo {
@@ -54,7 +55,7 @@ struct CollectionScanParams {
// be used for scans on clustered collections and forward oplog scans. If exclusive
// bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
// cannot be used in conjunction with 'resumeAfterRecordId'
boost::optional<RecordId> minRecord;
boost::optional<RecordIdBound> minRecord;
// If present, this parameter sets the start point of a reverse scan or the end point of a
// forward scan. A forward scan will stop and return EOF on the first document with a RecordId
@@ -63,7 +64,7 @@ struct CollectionScanParams {
// only be used for scans on clustered collections and forward oplog scans. If exclusive
// bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
// cannot be used in conjunction with 'resumeAfterRecordId'.
boost::optional<RecordId> maxRecord;
boost::optional<RecordIdBound> maxRecord;
// If true, the collection scan will return a token that can be used to resume the scan.
bool requestResumeToken = false;

View File

@@ -38,6 +38,7 @@
#include "mongo/db/index/multikey_paths.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/record_id_bound.h"
#include "mongo/db/query/stage_types.h"
#include "mongo/db/record_id.h"
#include "mongo/util/container_size_helper.h"
@@ -297,10 +298,10 @@ struct CollectionScanStats : public SpecificStats {
bool tailable{false};
// The start location of a forward scan and end location for a reverse scan.
boost::optional<RecordId> minRecord;
boost::optional<RecordIdBound> minRecord;
// The end location of a forward scan and start location for a reverse scan.
boost::optional<RecordId> maxRecord;
boost::optional<RecordIdBound> maxRecord;
};
struct CountStats : public SpecificStats {

View File

@@ -49,7 +49,7 @@ namespace mongo {
namespace {
RecordId toRecordId(ChangeStreamPreImageId id) {
return record_id_helpers::keyForElem(
BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement(), nullptr);
BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement());
}
/**
@@ -153,10 +153,10 @@ public:
while (true) {
// Fetch the first pre-image from the next collection, that has pre-images enabled.
auto planExecutor = _previousCollectionUUID
? createCollectionScan(
? createCollectionScan(RecordIdBound(
toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID,
Timestamp::max(),
std::numeric_limits<int64_t>::max())))
std::numeric_limits<int64_t>::max()))))
: createCollectionScan(boost::none);
auto preImageAttributes = getNextPreImageAttributes(planExecutor);
@@ -185,10 +185,10 @@ public:
// '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal
// timestamp are guaranteed to be expired.
Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1);
auto planExecutor = createCollectionScan(
auto planExecutor = createCollectionScan(RecordIdBound(
toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
lastExpiredPreimageTs,
std::numeric_limits<int64_t>::max())));
std::numeric_limits<int64_t>::max()))));
// Iterate over all the expired pre-images in the collection in order to find
// the max RecordId.
@@ -233,9 +233,10 @@ public:
preImageAttributes.operationTime <= expirationTime;
}
// Set up the new collection scan to start from the 'minKey'.
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan(
boost::optional<RecordId> minKey) const {
boost::optional<RecordIdBound> minKey) const {
return InternalPlanner::collectionScan(_opCtx,
_preImagesCollPtr,
PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
@@ -315,6 +316,7 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
// TODO SERVER-58693: pass expiration duration parameter to the iterator.
ChangeStreamExpiredPreImageIterator expiredPreImages(
opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs);
for (const auto& collectionRange : expiredPreImages) {
writeConflictRetry(opCtx.get(),
"ChangeStreamExpiredPreImagesRemover",
@@ -329,8 +331,8 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
std::move(params),
PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
InternalPlanner::Direction::FORWARD,
collectionRange.first,
collectionRange.second);
RecordIdBound(collectionRange.first),
RecordIdBound(collectionRange.second));
numberOfRemovals += exec->executeDelete();
});
}

View File

@@ -170,7 +170,7 @@ public:
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
_params.assertTsHasNotFallenOffOplog = Timestamp(0);
_params.shouldTrackLatestOplogTimestamp = true;
_params.minRecord = RecordId(0);
_params.minRecord = RecordIdBound(RecordId(0));
_params.tailable = true;
}
@@ -178,7 +178,7 @@ public:
invariant(!_collScan);
_filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
_params.minRecord = RecordId(resumeToken.clusterTime.asLL());
_params.minRecord = RecordIdBound(RecordId(resumeToken.clusterTime.asLL()));
_params.assertTsHasNotFallenOffOplog = resumeToken.clusterTime;
}

View File

@@ -79,12 +79,12 @@ CollectionScanParams convertIndexScanParamsToCollScanParams(
clustered_util::matchesClusterKey(keyPattern, collection->getClusteredInfo()));
invariant(collection->getDefaultCollator() == nullptr);
boost::optional<RecordId> startRecord, endRecord;
boost::optional<RecordIdBound> startRecord, endRecord;
if (!startKey.isEmpty()) {
startRecord = RecordId(record_id_helpers::keyForElem(startKey.firstElement(), nullptr));
startRecord = RecordIdBound(record_id_helpers::keyForElem(startKey.firstElement()));
}
if (!endKey.isEmpty()) {
endRecord = RecordId(record_id_helpers::keyForElem(endKey.firstElement(), nullptr));
endRecord = RecordIdBound(record_id_helpers::keyForElem(endKey.firstElement()));
}
// For a forward scan, the startKey is the minRecord. For a backward scan, it is the maxRecord.
@@ -93,7 +93,7 @@ CollectionScanParams convertIndexScanParamsToCollScanParams(
if (minRecord && maxRecord) {
// Regardless of direction, the minRecord should always be less than the maxRecord
dassert(minRecord < maxRecord,
dassert(minRecord->recordId() < maxRecord->recordId(),
str::stream() << "Expected the minRecord " << minRecord
<< " to be less than the maxRecord " << maxRecord
<< " on a bounded collection scan. Original startKey and endKey for "
@@ -105,6 +105,7 @@ CollectionScanParams convertIndexScanParamsToCollScanParams(
CollectionScanParams params;
params.minRecord = minRecord;
params.maxRecord = maxRecord;
if (InternalPlanner::FORWARD == direction) {
params.direction = CollectionScanParams::FORWARD;
} else {
@@ -120,8 +121,8 @@ CollectionScanParams createCollectionScanParams(
const CollectionPtr* coll,
InternalPlanner::Direction direction,
boost::optional<RecordId> resumeAfterRecordId,
boost::optional<RecordId> minRecord,
boost::optional<RecordId> maxRecord) {
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord) {
const auto& collection = *coll;
invariant(collection);
@@ -146,8 +147,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
PlanYieldPolicy::YieldPolicy yieldPolicy,
const Direction direction,
boost::optional<RecordId> resumeAfterRecordId,
boost::optional<RecordId> minRecord,
boost::optional<RecordId> maxRecord) {
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord) {
const auto& collection = *coll;
invariant(collection);
@@ -205,8 +206,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
std::unique_ptr<DeleteStageParams> params,
PlanYieldPolicy::YieldPolicy yieldPolicy,
Direction direction,
boost::optional<RecordId> minRecord,
boost::optional<RecordId> maxRecord) {
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord) {
const auto& collection = *coll;
invariant(collection);
auto ws = std::make_unique<WorkingSet>();

View File

@@ -78,8 +78,8 @@ public:
PlanYieldPolicy::YieldPolicy yieldPolicy,
Direction direction = FORWARD,
boost::optional<RecordId> resumeAfterRecordId = boost::none,
boost::optional<RecordId> minRecord = boost::none,
boost::optional<RecordId> maxRecord = boost::none);
boost::optional<RecordIdBound> minRecord = boost::none,
boost::optional<RecordIdBound> maxRecord = boost::none);
static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> collectionScan(
OperationContext* opCtx,
@@ -96,8 +96,8 @@ public:
std::unique_ptr<DeleteStageParams> deleteStageParams,
PlanYieldPolicy::YieldPolicy yieldPolicy,
Direction direction = FORWARD,
boost::optional<RecordId> minRecord = boost::none,
boost::optional<RecordId> maxRecord = boost::none);
boost::optional<RecordIdBound> minRecord = boost::none,
boost::optional<RecordIdBound> maxRecord = boost::none);
/**
* Returns an index scan. Caller owns returned pointer.

View File

@@ -257,10 +257,10 @@ void statsToBSON(const PlanStageStats& stats,
CollectionScanStats* spec = static_cast<CollectionScanStats*>(stats.specific.get());
bob->append("direction", spec->direction > 0 ? "forward" : "backward");
if (spec->minRecord) {
record_id_helpers::appendToBSONAs(*spec->minRecord, bob, "minRecord");
spec->minRecord->appendToBSONAs(bob, "minRecord");
}
if (spec->maxRecord) {
record_id_helpers::appendToBSONAs(*spec->maxRecord, bob, "maxRecord");
spec->maxRecord->appendToBSONAs(bob, "maxRecord");
}
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
bob->appendNumber("docsExamined", static_cast<long long>(spec->docsTested));

View File

@@ -73,10 +73,10 @@ void statsToBSON(const QuerySolutionNode* node,
auto csn = static_cast<const CollectionScanNode*>(node);
bob->append("direction", csn->direction > 0 ? "forward" : "backward");
if (csn->minRecord) {
record_id_helpers::appendToBSONAs(*csn->minRecord, bob, "minRecord");
csn->minRecord->appendToBSONAs(bob, "minRecord");
}
if (csn->maxRecord) {
record_id_helpers::appendToBSONAs(*csn->maxRecord, bob, "maxRecord");
csn->maxRecord->appendToBSONAs(bob, "maxRecord");
}
break;
}

View File

@@ -228,6 +228,20 @@ bool affectedByCollator(const BSONElement& element) {
}
}
// Raises the lower bound of 'collScan' to 'min' when that is more restrictive: the bound is
// replaced if none is set yet, or if the RecordId derived from 'min' is greater than the current
// minRecord's RecordId. The BSON form of 'min' is stored alongside the RecordId.
void setMinRecord(CollectionScanNode* collScan, const BSONObj& min) {
    const auto candidate = record_id_helpers::keyForObj(min);
    const auto& current = collScan->minRecord;
    if (current && !(candidate > current->recordId())) {
        // The existing lower bound is already at least as tight; keep it.
        return;
    }
    collScan->minRecord = RecordIdBound(candidate, min);
}
// Lowers the upper bound of 'collScan' to 'max' when that is more restrictive: the bound is
// replaced if none is set yet, or if the RecordId derived from 'max' is less than the current
// maxRecord's RecordId. The BSON form of 'max' is stored alongside the RecordId.
void setMaxRecord(CollectionScanNode* collScan, const BSONObj& max) {
    const auto candidate = record_id_helpers::keyForObj(max);
    const auto& current = collScan->maxRecord;
    if (current && !(candidate < current->recordId())) {
        // The existing upper bound is already at least as tight; keep it.
        return;
    }
    collScan->maxRecord = RecordIdBound(candidate, max);
}
// Returns whether element is not affected by collators or query and collection collators are
// compatible.
bool compatibleCollator(const QueryPlannerParams& params,
@@ -267,30 +281,12 @@ void handleRIDRangeMinMax(const CanonicalQuery& query,
// Assumes clustered collection scans are only supported with the forward direction.
collScan->boundInclusion =
CollectionScanParams::ScanBoundInclusion::kIncludeStartRecordOnly;
newMaxRecord = record_id_helpers::keyForElem(maxObj.firstElement(), collator);
setMaxRecord(collScan, IndexBoundsBuilder::objFromElement(maxObj.firstElement(), collator));
}
if (!minObj.isEmpty() && compatibleCollator(params, collator, minObj.firstElement())) {
// The min() is inclusive as are bounded collection scans by default.
newMinRecord = record_id_helpers::keyForElem(minObj.firstElement(), collator);
}
if (!collScan->minRecord) {
collScan->minRecord = newMinRecord;
} else if (newMinRecord) {
if (*newMinRecord > *collScan->minRecord) {
// The newMinRecord is more restrictive than the existing minRecord.
collScan->minRecord = newMinRecord;
}
}
if (!collScan->maxRecord) {
collScan->maxRecord = newMaxRecord;
} else if (newMaxRecord) {
if (*newMaxRecord < *collScan->maxRecord) {
// The newMaxRecord is more restrictive than the existing maxRecord.
collScan->maxRecord = newMaxRecord;
}
setMinRecord(collScan, IndexBoundsBuilder::objFromElement(minObj.firstElement(), collator));
}
}
@@ -329,24 +325,31 @@ void handleRIDRangeScan(const MatchExpression* conjunct,
}
const auto& element = match->getData();
// Set coarse min/max bounds based on type in case we can't set tight bounds.
BSONObjBuilder minb;
minb.appendMinForType("", element.type());
setMinRecord(collScan, minb.obj());
BSONObjBuilder maxb;
maxb.appendMaxForType("", element.type());
setMaxRecord(collScan, maxb.obj());
bool compatible = compatibleCollator(params, collator, element);
if (!compatible) {
return; // Collator affects probe and it's not compatible with collection's collator.
}
auto& maxRecord = collScan->maxRecord;
auto& minRecord = collScan->minRecord;
const auto collated = IndexBoundsBuilder::objFromElement(element, collator);
if (dynamic_cast<const EqualityMatchExpression*>(match)) {
minRecord = record_id_helpers::keyForElem(element, collator);
maxRecord = minRecord;
} else if (!maxRecord &&
(dynamic_cast<const LTMatchExpression*>(match) ||
dynamic_cast<const LTEMatchExpression*>(match))) {
maxRecord = record_id_helpers::keyForElem(element, collator);
} else if (!minRecord &&
(dynamic_cast<const GTMatchExpression*>(match) ||
dynamic_cast<const GTEMatchExpression*>(match))) {
minRecord = record_id_helpers::keyForElem(element, collator);
setMinRecord(collScan, collated);
setMaxRecord(collScan, collated);
} else if (dynamic_cast<const LTMatchExpression*>(match) ||
dynamic_cast<const LTEMatchExpression*>(match)) {
setMaxRecord(collScan, collated);
} else if (dynamic_cast<const GTMatchExpression*>(match) ||
dynamic_cast<const GTEMatchExpression*>(match)) {
setMinRecord(collScan, collated);
}
}
@@ -398,7 +401,7 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
if (minTs) {
StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*minTs);
if (goal.isOK()) {
csn->minRecord = goal.getValue();
csn->minRecord = RecordIdBound(goal.getValue());
}
if (assertMinTsHasNotFallenOffOplog) {
@@ -408,7 +411,7 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
if (maxTs) {
StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*maxTs);
if (goal.isOK()) {
csn->maxRecord = goal.getValue();
csn->maxRecord = RecordIdBound(goal.getValue());
}
}
}

View File

@@ -40,6 +40,7 @@
#include "mongo/db/query/classic_plan_cache.h"
#include "mongo/db/query/index_bounds.h"
#include "mongo/db/query/plan_enumerator_explain_info.h"
#include "mongo/db/query/record_id_bound.h"
#include "mongo/db/query/stage_types.h"
#include "mongo/util/id_generator.h"
@@ -444,11 +445,11 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
// If present, this parameter sets the start point of a forward scan or the end point of a
// reverse scan.
boost::optional<RecordId> minRecord;
boost::optional<RecordIdBound> minRecord;
// If present, this parameter sets the start point of a reverse scan or the end point of a
// forward scan.
boost::optional<RecordId> maxRecord;
boost::optional<RecordIdBound> maxRecord;
// If true, the collection scan will return a token that can be used to resume the scan.
bool requestResumeToken = false;

View File

@@ -0,0 +1,97 @@
/**
* Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <boost/optional.hpp>
#include <fmt/format.h>
#include <ostream>
#include <utility>
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/builder.h"
#include "mongo/db/record_id.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/db/storage/key_string.h"
namespace mongo {
/**
 * A RecordId bound for a collection scan, with an optional BSON representation for pretty printing.
 */
class RecordIdBound {
public:
    RecordIdBound() = default;

    /**
     * Builds a bound by moving from 'recordId'. 'bson', when supplied, is stored alongside it and
     * is what appendToBSONAs() emits instead of a value reconstructed from the RecordId.
     *
     * A named rvalue reference is itself an lvalue, so std::move is required here to actually
     * move rather than copy the RecordId.
     */
    explicit RecordIdBound(RecordId&& recordId, boost::optional<BSONObj> bson = boost::none)
        : _recordId(std::move(recordId)), _bson(std::move(bson)) {}

    /**
     * Builds a bound from a copy of 'recordId'. 'bson' is taken by value, so it is moved into the
     * member to avoid a second copy.
     */
    explicit RecordIdBound(const RecordId& recordId, boost::optional<BSONObj> bson = boost::none)
        : _recordId(recordId), _bson(std::move(bson)) {}

    /**
     * Returns a copy of the underlying RecordId.
     */
    RecordId recordId() const {
        return _recordId;
    }

    /**
     * Appends a BSON representation of the bound to a BSONObjBuilder under 'fieldName'. If a BSON
     * representation was explicitly provided at construction, its first element is appended;
     * otherwise the representation is reconstructed from the RecordId.
     */
    void appendToBSONAs(BSONObjBuilder* builder, StringData fieldName) const {
        if (_bson) {
            builder->appendAs(_bson->firstElement(), fieldName);
        } else {
            record_id_helpers::appendToBSONAs(_recordId, builder, fieldName);
        }
    }

    /**
     * Returns the string form of the underlying RecordId. The optional BSON representation is not
     * included.
     */
    std::string toString() const {
        return _recordId.toString();
    }

    /**
     * Compares the underlying RecordIds. The optional BSON representation does not take part in
     * the comparison.
     */
    int compare(const RecordIdBound& rhs) const {
        return _recordId.compare(rhs._recordId);
    }

private:
    RecordId _recordId;
    // Optional caller-supplied BSON form of the bound; only consulted by appendToBSONAs().
    boost::optional<BSONObj> _bson;
};
/**
 * Writes 'id' to a StringBuilder in the form "RecordIdBound(<RecordId string>)".
 */
inline StringBuilder& operator<<(StringBuilder& stream, const RecordIdBound& id) {
    stream << "RecordIdBound(";
    stream << id.toString();
    stream << ')';
    return stream;
}
/**
 * Writes 'id' to a std::ostream in the form "RecordIdBound(<RecordId string>)".
 */
inline std::ostream& operator<<(std::ostream& stream, const RecordIdBound& id) {
    stream << "RecordIdBound(";
    stream << id.toString();
    stream << ')';
    return stream;
}
} // namespace mongo

View File

@@ -45,6 +45,7 @@
#include "mongo/db/query/sbe_stage_builder.h"
#include "mongo/db/query/sbe_stage_builder_filter.h"
#include "mongo/db/query/util/make_data_structure.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/logv2/log.h"
#include "mongo/util/str.h"
@@ -272,7 +273,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
return {state.slotId(), makeConstant(tag, val)};
} else if (csn->minRecord) {
auto cursor = collection->getRecordStore()->getCursor(state.opCtx);
auto startRec = cursor->seekNear(*csn->minRecord);
auto startRec = cursor->seekNear(csn->minRecord->recordId());
if (startRec) {
LOGV2_DEBUG(205841, 3, "Using direct oplog seek");
auto [tag, val] = sbe::value::makeCopyRecordId(startRec->id);
@@ -437,7 +438,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
std::move(stage),
makeBinaryOp(sbe::EPrimBinary::lessEq,
makeVariable(*tsSlot),
makeConstant(sbe::value::TypeTags::Timestamp, csn->maxRecord->getLong())),
makeConstant(sbe::value::TypeTags::Timestamp,
csn->maxRecord->recordId().getLong())),
csn->nodeId());
}

View File

@@ -85,8 +85,8 @@ public:
}
/**
* Construct a RecordId that holds a small binary string. The raw value for RecordStore storage
* may be retrieved using getStr().
* Construct a RecordId that holds a binary string. The raw value for RecordStore storage may be
* retrieved using getStr().
*/
explicit RecordId(const char* str, int32_t size) {
invariant(size > 0, "key size must be greater than 0");
@@ -209,8 +209,8 @@ public:
}
/**
* Compares two RecordIds. Requires that both RecordIds are of the same format, unless one or
* both are null. Null always compares less than every other RecordId format.
* Compares two RecordIds. Requires that both RecordIds are of the same "type" (long or string).
* Null is always comparable and is less than every other RecordId format.
*/
int compare(const RecordId& rhs) const {
switch (_format) {

View File

@@ -51,20 +51,21 @@ StatusWith<RecordId> keyForOptime(const Timestamp& opTime) {
// don't sort differently when put in a RecordId. It also avoids issues with Null/Invalid
// RecordIds
if (opTime.getSecs() > uint32_t(std::numeric_limits<int32_t>::max()))
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts secs too high");
return {ErrorCodes::BadValue, "ts secs too high"};
if (opTime.getInc() > uint32_t(std::numeric_limits<int32_t>::max()))
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts inc too high");
return {ErrorCodes::BadValue, "ts inc too high"};
const RecordId out = RecordId(opTime.getSecs(), opTime.getInc());
const auto out = RecordId(opTime.getSecs(), opTime.getInc());
if (out <= RecordId::minLong())
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts too low");
return {ErrorCodes::BadValue, "ts too low"};
if (out >= RecordId::maxLong())
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts too high");
return {ErrorCodes::BadValue, "ts too high"};
return StatusWith<RecordId>(out);
return out;
}
/**
* data and len must be the arguments from RecordStore::insert() on an oplog collection.
*/
@@ -77,9 +78,9 @@ StatusWith<RecordId> extractKeyOptime(const char* data, int len) {
const BSONObj obj(data);
const BSONElement elem = obj["ts"];
if (elem.eoo())
return StatusWith<RecordId>(ErrorCodes::BadValue, "no ts field");
return {ErrorCodes::BadValue, "no ts field"};
if (elem.type() != bsonTimestamp)
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts must be a Timestamp");
return {ErrorCodes::BadValue, "ts must be a Timestamp"};
return keyForOptime(elem.timestamp());
}
@@ -96,25 +97,28 @@ StatusWith<RecordId> keyForDoc(const BSONObj& doc,
str::stream() << "Document " << redact(doc) << " is missing the '"
<< clusterKeyField << "' field"};
}
if (collator) {
BSONObjBuilder out;
CollationIndexKey::collationAwareIndexKeyAppend(keyElement, collator, &out);
return keyForElem(out.done().firstElement());
}
return keyForElem(keyElement, collator);
return keyForElem(keyElement);
}
RecordId keyForElem(const BSONElement& elem, const CollatorInterface* collator) {
RecordId keyForElem(const BSONElement& elem) {
// Intentionally discard the TypeBits since the type information will be stored in the cluster
// key of the original document. The consequence of this behavior is that cluster key values
// that compare similarly, but are of different types may not be used concurrently.
KeyString::Builder keyBuilder(KeyString::Version::kLatestVersion);
if (collator) {
BSONObjBuilder out;
CollationIndexKey::collationAwareIndexKeyAppend(elem, collator, &out);
keyBuilder.appendBSONElement(out.done().firstElement());
} else {
keyBuilder.appendBSONElement(elem);
}
keyBuilder.appendBSONElement(elem);
return RecordId(keyBuilder.getBuffer(), keyBuilder.getSize());
}
// Convenience wrapper: builds a RecordId from the first element of 'obj' by delegating to
// keyForElem(). Only the first element is consulted; any further fields in 'obj' are ignored.
RecordId keyForObj(const BSONObj& obj) {
return keyForElem(obj.firstElement());
}
RecordId keyForOID(OID oid) {
KeyString::Builder keyBuilder(KeyString::Version::kLatestVersion);
keyBuilder.appendOID(oid);

View File

@@ -39,6 +39,7 @@
namespace mongo {
class Timestamp;
class RecordId;
namespace record_id_helpers {
/**
@@ -53,7 +54,8 @@ StatusWith<RecordId> keyForOptime(const Timestamp& opTime);
StatusWith<RecordId> keyForDoc(const BSONObj& doc,
const ClusteredIndexSpec& indexSpec,
const CollatorInterface* collator);
RecordId keyForElem(const BSONElement& elem, const CollatorInterface* collator);
RecordId keyForElem(const BSONElement& elem);
RecordId keyForObj(const BSONObj& obj);
RecordId keyForOID(OID oid);
RecordId keyForDate(Date_t date);

View File

@@ -22,6 +22,9 @@ env.Library(
source=[
'tenant_migration_decoration.cpp',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
]
)
env.Library(

View File

@@ -247,14 +247,14 @@ DbCheckHasher::DbCheckHasher(OperationContext* opCtx,
InternalPlanner::IXSCAN_FETCH);
} else {
CollectionScanParams params;
params.minRecord = uassertStatusOK(
params.minRecord = RecordIdBound(uassertStatusOK(
record_id_helpers::keyForDoc(start.obj(),
collection->getClusteredInfo()->getIndexSpec(),
collection->getDefaultCollator()));
params.maxRecord = uassertStatusOK(
collection->getDefaultCollator())));
params.maxRecord = RecordIdBound(uassertStatusOK(
record_id_helpers::keyForDoc(end.obj(),
collection->getClusteredInfo()->getIndexSpec(),
collection->getDefaultCollator()));
collection->getDefaultCollator())));
params.boundInclusion = CollectionScanParams::ScanBoundInclusion::kIncludeEndRecordOnly;
_exec = InternalPlanner::collectionScan(
opCtx, &collection, params, PlanYieldPolicy::YieldPolicy::NO_YIELD);

View File

@@ -734,16 +734,12 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments(
"bounded collection scans only support forward scans");
}
auto collator = collection->getDefaultCollator();
boost::optional<RecordId> minRecord, maxRecord;
boost::optional<RecordIdBound> minRecord, maxRecord;
if (!startKey.isEmpty()) {
minRecord =
RecordId(record_id_helpers::keyForElem(startKey.firstElement(), collator));
minRecord = RecordIdBound(record_id_helpers::keyForObj(startKey));
}
if (!endKey.isEmpty()) {
maxRecord =
RecordId(record_id_helpers::keyForElem(endKey.firstElement(), collator));
maxRecord = RecordIdBound(record_id_helpers::keyForObj(endKey));
}
planExecutor = isFind

View File

@@ -31,7 +31,9 @@
#include "mongo/db/s/refine_collection_shard_key_coordinator.h"
#include "mongo/db/catalog/collection_uuid_mismatch.h"
#include "mongo/db/commands.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/s/dist_lock_manager.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/logv2/log.h"
@@ -134,6 +136,11 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
sharding_ddl_util::stopMigrations(opCtx, nss(), boost::none);
}
{
AutoGetCollection coll{opCtx, nss(), MODE_IS};
checkCollectionUUIDMismatch(opCtx, *coll, _doc.getCollectionUUID());
}
const auto cmdResponse = uassertStatusOK(configShard->runCommand(
opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),

View File

@@ -48,6 +48,7 @@
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/reply_interface.h"
#include "mongo/s/is_mongos.h"
#include "mongo/stdx/future.h"
#include "mongo/transport/service_entry_point.h"
@@ -297,7 +298,7 @@ SemiFuture<BSONObj> Transaction::_commitOrAbort(StringData dbName, StringData cm
BSONObjBuilder cmdBuilder;
cmdBuilder.append(cmdName, 1);
cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern.toBSON());
cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern);
auto cmdObj = cmdBuilder.obj();
return _txnClient->runCommand(dbName, cmdObj).semi();
@@ -363,7 +364,7 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) {
if (_state == TransactionState::kInit) {
_state = TransactionState::kStarted;
_sessionInfo.setStartTransaction(boost::none);
cmdBuilder->append(_readConcern.toBSON().firstElement());
cmdBuilder->append(repl::ReadConcernArgs::kReadConcernFieldName, _readConcern);
}
_latestResponseHasTransientTransactionErrorLabel = false;
@@ -379,35 +380,22 @@ void Transaction::processResponse(const BSONObj& reply) {
}
}
void Transaction::_setSessionInfo(LogicalSessionId lsid,
TxnNumber txnNumber,
boost::optional<TxnRetryCounter> txnRetryCounter) {
void Transaction::_setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber) {
_sessionInfo.setSessionId(lsid);
_sessionInfo.setTxnNumber(txnNumber);
_sessionInfo.setTxnRetryCounter(txnRetryCounter ? *txnRetryCounter : 0);
}
void Transaction::primeForTransactionRetry() {
_latestResponseHasTransientTransactionErrorLabel = false;
switch (_execContext) {
case ExecutionContext::kOwnSession:
case ExecutionContext::kClientSession:
case ExecutionContext::kClientRetryableWrite:
// Advance txnNumber.
_sessionInfo.setTxnNumber(*_sessionInfo.getTxnNumber() + 1);
_sessionInfo.setStartTransaction(true);
_state = TransactionState::kInit;
return;
case ExecutionContext::kClientSession:
// Advance txnRetryCounter.
_sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1);
_sessionInfo.setStartTransaction(true);
_state = TransactionState::kInit;
return;
case ExecutionContext::kClientRetryableWrite:
// Advance txnRetryCounter.
_sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1);
_sessionInfo.setStartTransaction(true);
_state = TransactionState::kInit;
return;
case ExecutionContext::kClientTransaction:
// The outermost client handles retries.
MONGO_UNREACHABLE;
@@ -431,27 +419,27 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
auto clientSession = opCtx->getLogicalSessionId();
auto clientTxnNumber = opCtx->getTxnNumber();
auto clientInMultiDocumentTransaction = opCtx->inMultiDocumentTransaction();
auto clientTxnRetryCounter = opCtx->getTxnRetryCounter();
if (!clientSession) {
// TODO SERVER-61783: Integrate session pool.
_setSessionInfo(makeLogicalSessionId(opCtx), 0, 0);
_setSessionInfo(makeLogicalSessionId(opCtx), 0 /* txnNumber */);
_execContext = ExecutionContext::kOwnSession;
} else if (!clientTxnNumber) {
_setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0, 0);
_setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0 /* txnNumber */);
_execContext = ExecutionContext::kClientSession;
// TODO SERVER-59186: Handle client session case.
MONGO_UNREACHABLE;
} else if (!clientInMultiDocumentTransaction) {
_setSessionInfo(
makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber), 0, 0);
_setSessionInfo(makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber),
0 /* txnNumber */);
_execContext = ExecutionContext::kClientRetryableWrite;
// TODO SERVER-59186: Handle client retryable write case.
MONGO_UNREACHABLE;
// TODO SERVER-59186: Handle client retryable write case on mongod. This is different from
// mongos because only mongod checks out a transaction session for retryable writes.
invariant(isMongos(), "This case is not yet supported on a mongod");
} else {
_setSessionInfo(*clientSession, *clientTxnNumber, clientTxnRetryCounter);
_setSessionInfo(*clientSession, *clientTxnNumber);
_execContext = ExecutionContext::kClientTransaction;
// TODO SERVER-59186: Handle client transaction case.
@@ -460,9 +448,12 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
_sessionInfo.setStartTransaction(true);
_sessionInfo.setAutocommit(false);
// Extract non-session options.
_readConcern = repl::ReadConcernArgs::get(opCtx);
_writeConcern = opCtx->getWriteConcern();
// Extract non-session options. Strip provenance so it can be correctly inferred for the
// generated commands as if it came from an external client.
_readConcern = repl::ReadConcernArgs::get(opCtx).toBSONInner().removeField(
ReadWriteConcernProvenanceBase::kSourceFieldName);
_writeConcern = opCtx->getWriteConcern().toBSON().removeField(
ReadWriteConcernProvenanceBase::kSourceFieldName);
LOGV2_DEBUG(5875901,
0, // TODO SERVER-61781: Raise verbosity.

View File

@@ -282,8 +282,8 @@ public:
/**
* Handles the given transaction result based on where the transaction is in its lifecycle and
* its execution context, e.g. by updating its txnNumber or txnRetryCounter, and returns the
* next step for the transaction runner.
* its execution context, e.g. by updating its txnNumber, and returns the next step for the
* transaction runner.
*/
ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult) const;
@@ -318,9 +318,7 @@ private:
return std::make_unique<TxnMetadataHooks>(*this);
}
void _setSessionInfo(LogicalSessionId lsid,
TxnNumber txnNumber,
boost::optional<TxnRetryCounter> txnRetryCounter);
void _setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber);
SemiFuture<BSONObj> _commitOrAbort(StringData dbName, StringData cmdName);
@@ -337,8 +335,8 @@ private:
bool _latestResponseHasTransientTransactionErrorLabel{false};
OperationSessionInfo _sessionInfo;
repl::ReadConcernArgs _readConcern;
WriteConcernOptions _writeConcern;
BSONObj _writeConcern;
BSONObj _readConcern;
ExecutionContext _execContext;
TransactionState _state{TransactionState::kInit};
};

View File

@@ -121,7 +121,6 @@ namespace {
void assertTxnMetadata(BSONObj obj,
TxnNumber txnNumber,
boost::optional<TxnRetryCounter> txnRetryCounter,
boost::optional<bool> startTransaction,
boost::optional<BSONObj> readConcern = boost::none,
boost::optional<BSONObj> writeConcern = boost::none) {
@@ -129,12 +128,6 @@ void assertTxnMetadata(BSONObj obj,
ASSERT_EQ(obj["autocommit"].Bool(), false);
ASSERT_EQ(obj["txnNumber"].Long(), txnNumber);
if (txnRetryCounter) {
ASSERT_EQ(obj["txnRetryCounter"].Int(), *txnRetryCounter);
} else {
ASSERT(obj["txnRetryCounter"].eoo());
}
if (startTransaction) {
ASSERT_EQ(obj["startTransaction"].Bool(), *startTransaction);
} else {
@@ -190,13 +183,10 @@ protected:
opCtx(), InlineQueuedCountingExecutor::make(), std::move(mockClient));
}
void expectSentAbort(TxnNumber txnNumber,
TxnRetryCounter txnRetryCounter,
BSONObj writeConcern) {
void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
txnNumber,
txnRetryCounter,
boost::none /* startTransaction */,
boost::none /* readConcern */,
writeConcern);
@@ -220,10 +210,8 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
<< "documents" << BSON_ARRAY(BSON("x" << 1))))
.get();
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
assertTxnMetadata(
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
mockClient()->setNextCommandResponse(kOKInsertResponse);
insertRes = txnClient
@@ -235,7 +223,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */);
// The commit response.
@@ -248,7 +235,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
0 /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -284,7 +270,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -297,7 +282,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */);
// Throw a transient error to verify the retries behavior.
@@ -313,7 +297,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
writeConcern.toBSON());
@@ -352,7 +335,7 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnAbort) {
});
ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, writeConcern.toBSON());
expectSentAbort(0 /* txnNumber */, writeConcern.toBSON());
}
}
@@ -382,7 +365,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */,
readConcern.toBSONInner());
@@ -397,7 +379,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */);
// Throw a transient error to verify the retry will still use the read concern.
@@ -413,7 +394,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -441,7 +421,7 @@ TEST_F(TxnAPITest, OwnSession_AbortsOnError) {
});
ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
}
TEST_F(TxnAPITest, OwnSession_SkipsCommitIfNoCommandsWereRun) {
@@ -466,9 +446,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
attempt += 1;
if (attempt > 0) {
// Verify an abort was sent in between retries.
expectSentAbort(attempt - 1 /* txnNumber */,
0 /* txnRetryCounter */,
WriteConcernOptions().toBSON());
expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
}
mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse
@@ -492,7 +470,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
return SemiFuture<void>::makeReady();
@@ -503,7 +480,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -517,9 +493,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
attempt += 1;
if (attempt > 0) {
// Verify an abort was sent in between retries.
expectSentAbort(attempt - 1 /* txnNumber */,
0 /* txnRetryCounter */,
WriteConcernOptions().toBSON());
expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
}
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -534,7 +508,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
// The commit or implicit abort response.
@@ -549,7 +522,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -569,10 +541,8 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
assertTxnMetadata(
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit response.
mockClient()->setNextCommandResponse(
@@ -587,7 +557,7 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
ASSERT(swResult.getValue().wcError.toStatus().isOK());
ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::InternalError);
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
}
TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
@@ -597,9 +567,7 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
attempt += 1;
if (attempt > 0) {
// Verify an abort was sent in between retries.
expectSentAbort(attempt - 1 /* txnNumber */,
0 /* txnRetryCounter */,
WriteConcernOptions().toBSON());
expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
}
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -614,7 +582,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
attempt /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
// Set commit and best effort abort response, if necessary.
@@ -632,7 +599,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
attempt /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -652,10 +618,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) {
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
assertTxnMetadata(
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit response.
mockClient()->setNextCommandResponse(
@@ -669,7 +633,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
0 /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -687,10 +650,8 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) {
<< "documents" << BSON_ARRAY(BSON("x" << 1))))
.get();
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
assertTxnMetadata(
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit response.
mockClient()->setNextCommandResponse(kResWithWriteConcernError);
@@ -703,7 +664,7 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) {
ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed);
ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::WriteConcernFailed);
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
}
TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
@@ -719,10 +680,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
0 /* txnNumber */,
0 /* txnRetryCounter */,
true /* startTransaction */);
assertTxnMetadata(
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
// The commit responses.
mockClient()->setNextCommandResponse(kResWithRetryableWriteConcernError);
@@ -735,7 +694,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
0 /* txnNumber */,
0 /* txnRetryCounter */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);

View File

@@ -446,24 +446,24 @@ private:
* delete entries of type 'ObjectId'. All other collections must only delete entries of type
* 'Date'.
*/
RecordId makeCollScanEndBound(const CollectionPtr& collection, Date_t expirationDate) {
RecordIdBound makeCollScanEndBound(const CollectionPtr& collection, Date_t expirationDate) {
if (collection->getTimeseriesOptions()) {
auto endOID = OID();
endOID.init(expirationDate, true /* max */);
return record_id_helpers::keyForOID(endOID);
return RecordIdBound(record_id_helpers::keyForOID(endOID));
}
return record_id_helpers::keyForDate(expirationDate);
return RecordIdBound(record_id_helpers::keyForDate(expirationDate));
}
RecordId makeCollScanStartBound(const CollectionPtr& collection, const Date_t startDate) {
RecordIdBound makeCollScanStartBound(const CollectionPtr& collection, const Date_t startDate) {
if (collection->getTimeseriesOptions()) {
auto startOID = OID();
startOID.init(startDate, false /* max */);
return record_id_helpers::keyForOID(startOID);
return RecordIdBound(record_id_helpers::keyForOID(startOID));
}
return record_id_helpers::keyForDate(startDate);
return RecordIdBound(record_id_helpers::keyForDate(startDate));
}
/*

View File

@@ -209,10 +209,10 @@ public:
_client.insert(ns.ns(), docs, ordered);
}
// Returns the recordId generated by doc, assuming there's no collation and doc takes the shape
// of {<cluster key> : <value>};
RecordId getRecordIdForClusteredDoc(const BSONObj& doc) {
return RecordId(record_id_helpers::keyForElem(doc.firstElement(), nullptr));
// Returns the recordId generated by doc, assuming doc takes the shape of {<cluster key> :
// <value>};
RecordIdBound getRecordIdForClusteredDoc(const BSONObj& doc) {
return RecordIdBound(record_id_helpers::keyForElem(doc.firstElement()));
}
// Performs a bounded collection scan from 'minRecord' to 'maxRecord' in the specified
@@ -231,8 +231,8 @@ public:
CollectionScanParams params;
params.tailable = false;
params.direction = direction;
params.minRecord = minRecord;
params.maxRecord = maxRecord;
params.minRecord = RecordIdBound(minRecord);
params.maxRecord = RecordIdBound(maxRecord);
WorkingSet ws;
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
@@ -261,8 +261,8 @@ public:
void runClusteredCollScanAndAssertContents(
const NamespaceString& ns,
CollectionScanParams::Direction direction,
boost::optional<RecordId> minRecord,
boost::optional<RecordId> maxRecord,
boost::optional<RecordIdBound> minRecord,
boost::optional<RecordIdBound> maxRecord,
CollectionScanParams::ScanBoundInclusion boundInclusion,
const vector<BSONObj>& expectedResults,
const MatchExpression* filter = nullptr) {
@@ -614,8 +614,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredMinMax) {
CollectionScanParams params;
params.direction = CollectionScanParams::FORWARD;
params.tailable = false;
params.minRecord = recordIds[0];
params.maxRecord = recordIds[recordIds.size() - 1];
params.minRecord = RecordIdBound(recordIds[0]);
params.maxRecord = RecordIdBound(recordIds[recordIds.size() - 1]);
WorkingSet ws;
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
@@ -768,8 +768,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredMinMaxDateExclusi
params.tailable = false;
params.direction = direction;
params.minRecord = record_id_helpers::keyForDate(minDate);
params.maxRecord = record_id_helpers::keyForDate(maxDate);
params.minRecord = RecordIdBound(record_id_helpers::keyForDate(minDate));
params.maxRecord = RecordIdBound(record_id_helpers::keyForDate(maxDate));
// Exclude all but the record with _id 'middleDate' from the scan.
StatusWithMatchExpression swMatch = MatchExpressionParser::parse(
@@ -821,8 +821,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredReverse) {
params.tailable = false;
// The last entry in recordIds is the lowest record in the collection and the first entry is the
// highest.
params.minRecord = recordIds[recordIds.size() - 1];
params.maxRecord = recordIds[0];
params.minRecord = RecordIdBound(recordIds[recordIds.size() - 1]);
params.maxRecord = RecordIdBound(recordIds[0]);
WorkingSet ws;
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
@@ -865,8 +865,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredMinMaxFullObjectI
params.tailable = false;
// Expect to see all records.
params.minRecord = record_id_helpers::keyForOID(OID());
params.maxRecord = record_id_helpers::keyForOID(OID::max());
params.minRecord = RecordIdBound(record_id_helpers::keyForOID(OID()));
params.maxRecord = RecordIdBound(record_id_helpers::keyForOID(OID::max()));
WorkingSet ws;
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
@@ -913,8 +913,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredInnerRange) {
ASSERT_LT(startOffset, recordIds.size());
ASSERT_LT(endOffset, recordIds.size());
params.minRecord = recordIds[startOffset];
params.maxRecord = recordIds[endOffset];
params.minRecord = RecordIdBound(recordIds[startOffset]);
params.maxRecord = RecordIdBound(recordIds[endOffset]);
WorkingSet ws;
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
@@ -962,15 +962,20 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredInnerRangeExclusi
ASSERT_LT(startOffset, recordIds.size());
ASSERT_LT(endOffset, recordIds.size());
params.minRecord = recordIds[startOffset];
params.maxRecord = recordIds[endOffset];
params.minRecord = RecordIdBound(recordIds[startOffset]);
params.maxRecord = RecordIdBound(recordIds[endOffset]);
// Provide RecordId bounds with exclusive filters.
StatusWithMatchExpression swMatch = MatchExpressionParser::parse(
fromjson(fmt::sprintf(
"{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
record_id_helpers::toBSONAs(*params.minRecord, "").firstElement().OID().toString(),
record_id_helpers::toBSONAs(*params.maxRecord, "").firstElement().OID().toString())),
fromjson(fmt::sprintf("{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
record_id_helpers::toBSONAs(params.minRecord->recordId(), "")
.firstElement()
.OID()
.toString(),
record_id_helpers::toBSONAs(params.maxRecord->recordId(), "")
.firstElement()
.OID()
.toString())),
_expCtx.get());
ASSERT_OK(swMatch.getStatus());
auto filter = std::move(swMatch.getValue());
@@ -1024,15 +1029,20 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredInnerRangeExclusi
// The last entry in recordIds is the lowest record in the collection and the first entry is the
// highest.
params.minRecord = recordIds[endOffset];
params.maxRecord = recordIds[startOffset];
params.minRecord = RecordIdBound(recordIds[endOffset]);
params.maxRecord = RecordIdBound(recordIds[startOffset]);
// Provide RecordId bounds with exclusive filters.
StatusWithMatchExpression swMatch = MatchExpressionParser::parse(
fromjson(fmt::sprintf(
"{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
record_id_helpers::toBSONAs(*params.minRecord, "").firstElement().OID().toString(),
record_id_helpers::toBSONAs(*params.maxRecord, "").firstElement().OID().toString())),
fromjson(fmt::sprintf("{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
record_id_helpers::toBSONAs(params.minRecord->recordId(), "")
.firstElement()
.OID()
.toString(),
record_id_helpers::toBSONAs(params.maxRecord->recordId(), "")
.firstElement()
.OID()
.toString())),
_expCtx.get());
ASSERT_OK(swMatch.getStatus());
auto filter = std::move(swMatch.getValue());

View File

@@ -1104,7 +1104,7 @@ commands:
anyTypeField: IDLAnyTypeOwned
AccessCheckNone:
description: A versioned API command with access_check and none
description: A Stable API command with access_check and none
command_name: AccessCheckNoneCommandName
namespace: ignored
strict: true
@@ -1114,7 +1114,7 @@ commands:
reply_type: OkReply
AccessCheckSimpleAccessCheck:
description: A versioned API command with access_check and simple check
description: A Stable API command with access_check and simple check
command_name: AccessCheckSimpleAccessCheckCommandName
namespace: ignored
strict: true
@@ -1125,7 +1125,7 @@ commands:
reply_type: OkReply
AccessCheckSimplePrivilege:
description: A versioned API command with access_check and privilege
description: A Stable API command with access_check and privilege
command_name: AccessCheckSimplePrivilegeCommandName
namespace: ignored
strict: true
@@ -1139,7 +1139,7 @@ commands:
AccessCheckComplexPrivilege:
description: A versioned API command with access_check complex
description: A Stable API command with access_check complex
command_name: AccessCheckComplexPrivilegeCommandName
namespace: ignored
strict: true
@@ -1159,10 +1159,10 @@ commands:
- check: is_authorized_to_parse_namespace_element
reply_type: OkReply
# Test that we correctly generate C++ base classes for versioned API commands with different
# Test that we correctly generate C++ base classes for Stable API commands with different
# key names, command names, and C++ names.
APIVersion1CommandIDLName:
description: A versioned API command
description: A Stable API command
command_name: APIVersion1CommandRuntimeName
namespace: ignored
strict: true
@@ -1172,7 +1172,7 @@ commands:
reply_type: OkReply
APIVersion1CommandIDLName2:
description: A versioned API command
description: A Stable API command
command_name: APIVersion1CommandRuntimeName2
cpp_name: APIVersion1CommandCPPName2
namespace: ignored
@@ -1184,7 +1184,7 @@ commands:
# Test whether the C++ code for a command with an alias name is correctly generated.
APIVersion1CommandWithAlias:
description: A versioned API command with alias
description: A Stable API command with alias
command_name: NewCommandName
command_alias: OldCommandName
namespace: ignored

View File

@@ -62,11 +62,9 @@
namespace mongo {
void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
const BSONElement& wcErrorElem,
BSONObjBuilder& responseBuilder) {
WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem);
void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId,
WriteConcernErrorDetail wcError,
BSONObjBuilder& responseBuilder) {
auto status = wcError.toStatus();
wcError.setStatus(
status.withReason(str::stream() << status.reason() << " at " << shardId.toString()));
@@ -74,6 +72,13 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
responseBuilder.append("writeConcernError", wcError.toBSON());
}
void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
const BSONElement& wcErrorElem,
BSONObjBuilder& responseBuilder) {
WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem);
appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, responseBuilder);
}
boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTargeter(
OperationContext* opCtx,
const NamespaceString& nss,

View File

@@ -52,6 +52,13 @@ struct RawResponsesResult {
boost::optional<Status> firstStaleConfigError;
};
/**
 * This function appends the provided WriteConcernErrorDetail to the sharded response.
 *
 * The error's reason is suffixed with " at <shardId>" and the detail is then appended to
 * 'responseBuilder' under the "writeConcernError" field. 'wcError' is taken by value, so the
 * caller's object is not modified by that rewrite.
 */
void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId,
WriteConcernErrorDetail wcError,
BSONObjBuilder& responseBuilder);
/**
* This function appends the provided writeConcernError BSONElement to the sharded response.
*/

View File

@@ -119,6 +119,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ftdc/ftdc_server',
'$BUILD_DIR/mongo/db/index_commands_idl',
'$BUILD_DIR/mongo/db/initialize_api_parameters',
'$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/query/cursor_response_idl',
'$BUILD_DIR/mongo/db/query/map_reduce_output_format',
@@ -131,6 +132,7 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
'$BUILD_DIR/mongo/db/transaction_api',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/executor/async_multicaster',
'$BUILD_DIR/mongo/executor/async_request_executor',

View File

@@ -27,6 +27,8 @@
* it in the license file.
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/base/status_with.h"
@@ -38,9 +40,13 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/update_metrics.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/ops/write_ops_gen.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/storage/duplicate_key_error_info.h"
#include "mongo/db/transaction_api.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog_cache.h"
@@ -122,6 +128,41 @@ BSONObj getShardKey(OperationContext* opCtx,
return shardKey;
}
// Handles a WouldChangeOwningShard error for a retryable-write findAndModify by re-running
// 'request' through txn_api::TransactionWithRetries on the grid's fixed executor.
//
// Inside the transaction callback the command is sent against nss.db(); a failed reply is raised
// with uassert, otherwise the passthrough-filtered reply is merged into 'result'. After the
// transaction finishes:
//   - every status that is not a DuplicateKey on a key pattern containing "_id" has
//     kNonDuplicateKeyErrorContext added to it, and the status is then uasserted;
//   - a non-OK write concern error carried by the transaction result is appended to 'result',
//     tagged with 'shardId'.
void handleWouldChangeOwningShardErrorRetryableWrite(
    OperationContext* opCtx,
    const ShardId& shardId,
    const NamespaceString& nss,
    const write_ops::FindAndModifyCommandRequest& request,
    BSONObjBuilder* result) {
    auto txn = std::make_shared<txn_api::TransactionWithRetries>(
        opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());

    auto swCommitResult = txn->runSyncNoThrow(
        opCtx,
        [cmdObj = request.toBSON({}), nss, result](const txn_api::TransactionClient& txnClient,
                                                   ExecutorPtr txnExec) {
            auto reply = txnClient.runCommand(nss.db(), cmdObj).get();
            uassertStatusOK(getStatusFromCommandResult(reply));

            result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(reply));
            return SemiFuture<void>::makeReady();
        });

    auto txnStatus = swCommitResult.getStatus();

    // Only a DuplicateKey error on "_id" is left without the extra context. The extra info is
    // inspected only after the code has been confirmed to be DuplicateKey (short-circuit).
    const bool isDuplicateIdKey = txnStatus == ErrorCodes::DuplicateKey &&
        txnStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id");
    if (!isDuplicateIdKey) {
        txnStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
    }
    uassertStatusOK(txnStatus);

    const auto& commitWcError = swCommitResult.getValue().wcError;
    if (commitWcError.toStatus().isOK()) {
        return;
    }
    appendWriteConcernErrorDetailToCmdResponse(shardId, commitWcError, *result);
}
void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
const NamespaceString nss,
Status responseStatus,
@@ -402,47 +443,23 @@ private:
if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) {
if (isRetryableWrite) {
RouterOperationContextSession routerSession(opCtx);
try {
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
readConcernArgs =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
// Re-run the findAndModify command that will change the shard key value in a
// transaction. We call _runCommand recursively, and this second time through
// since it will be run as a transaction it will take the other code path to
// updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried
// operation does not include WC inside the transaction by stripping it from the
// cmdObj. The transaction commit will still use the WC, because it uses the WC
// from the opCtx (which has been set previously in Strategy).
documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
_runCommand(opCtx,
shardId,
shardVersion,
dbVersion,
nss,
stripWriteConcern(cmdObj),
result);
uassertStatusOK(getStatusFromCommandResult(result->asTempObj()));
auto commitResponse =
documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
uassertStatusOK(getStatusFromCommandResult(commitResponse));
if (auto wcErrorElem = commitResponse["writeConcernError"]) {
appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
}
} catch (DBException& e) {
if (e.code() != ErrorCodes::DuplicateKey ||
(e.code() == ErrorCodes::DuplicateKey &&
!e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
};
auto txnRouterForAbort = TransactionRouter::get(opCtx);
if (txnRouterForAbort)
txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus());
throw;
if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
serverGlobalParams.featureCompatibility)) {
auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse(
IDLParserErrorContext("ClusterFindAndModify"), cmdObj);
// Strip write concern because this command will be sent as part of a
// transaction and the write concern has already been loaded onto the opCtx and
// will be picked up by the transaction API.
//
// Strip runtime constants because they will be added again when this command is
// recursively sent through the service entry point.
parsedRequest.setWriteConcern(boost::none);
parsedRequest.setLegacyRuntimeConstants(boost::none);
handleWouldChangeOwningShardErrorRetryableWrite(
opCtx, shardId, nss, parsedRequest, result);
} else {
_handleWouldChangeOwningShardErrorRetryableWriteLegacy(
opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result);
}
} else {
updateShardKeyValueOnWouldChangeOwningShardError(
@@ -462,6 +479,53 @@ private:
CommandHelpers::filterCommandReplyForPassthrough(response.data));
}
// TODO SERVER-62375: Remove after 6.0 is released.
//
// Legacy handling of a WouldChangeOwningShard error for a retryable-write findAndModify: the
// command is re-run through _runCommand() inside a transaction started and committed via
// documentShardKeyUpdateUtil. A write concern error found in the commit response is appended to
// 'result'. On any DBException the transaction router (if present) implicitly aborts the
// transaction and the exception is rethrown; every exception other than a DuplicateKey on a key
// pattern containing "_id" first gets kNonDuplicateKeyErrorContext added.
static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy(
    OperationContext* opCtx,
    const ShardId& shardId,
    const boost::optional<ChunkVersion>& shardVersion,
    const boost::optional<DatabaseVersion>& dbVersion,
    const NamespaceString& nss,
    const BSONObj& cmdObj,
    BSONObjBuilder* result) {
    RouterOperationContextSession routerSession(opCtx);

    try {
        repl::ReadConcernArgs::get(opCtx) =
            repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);

        // The findAndModify that changes the shard key value is run again, this time inside a
        // transaction. _runCommand is invoked recursively; because the second pass runs as a
        // transaction it goes down the other code path, into
        // updateShardKeyValueOnWouldChangeOwningShardError. The write concern is stripped from
        // the cmdObj so the retried operation does not carry WC inside the transaction. The
        // commit still applies the WC, since it reads it from the opCtx (set earlier in
        // Strategy).
        documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
        _runCommand(
            opCtx, shardId, shardVersion, dbVersion, nss, stripWriteConcern(cmdObj), result);
        uassertStatusOK(getStatusFromCommandResult(result->asTempObj()));

        auto commitReply = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
        uassertStatusOK(getStatusFromCommandResult(commitReply));

        auto wcErrorElem = commitReply["writeConcernError"];
        if (wcErrorElem) {
            appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
        }
    } catch (DBException& ex) {
        // The extra info is inspected only once the code is known to be DuplicateKey.
        const bool isDuplicateIdKey = ex.code() == ErrorCodes::DuplicateKey &&
            ex.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id");
        if (!isDuplicateIdKey) {
            ex.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
        }

        if (auto txnRouter = TransactionRouter::get(opCtx)) {
            txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus());
        }
        throw;
    }
}
// Update related command execution metrics.
UpdateMetrics _updateMetrics;
} findAndModifyCmd;

View File

@@ -68,6 +68,7 @@ public:
// Send it to the primary shard
RefineCollectionShardKeyRequest requestParamObj;
requestParamObj.setNewShardKey(request().getKey());
requestParamObj.setCollectionUUID(request().getCollectionUUID());
ShardsvrRefineCollectionShardKey refineCollectionShardKeyCommand(nss);
refineCollectionShardKeyCommand.setRefineCollectionShardKeyRequest(requestParamObj);

View File

@@ -47,6 +47,10 @@ commands:
type: object
description: "The index specification document to use as the new shard key."
optional: false
collectionUUID:
type: uuid
description: "The expected UUID of the collection."
optional: true
_configsvrRefineCollectionShardKey:
command_name: _configsvrRefineCollectionShardKey

View File

@@ -161,6 +161,10 @@ structs:
type: KeyPattern
description: "The index specification document to use as the new shard key."
optional: false
collectionUUID:
type: uuid
description: "The expected UUID of the collection."
optional: true
ReshardCollectionRequest:
description: "Parameters for the reshard collection command"

View File

@@ -83,11 +83,11 @@ configs:
arg_vartype: String
cpp_varname: shellGlobalParams.apiVersion
"apiStrict":
description: "disable all features not included in the MongoDB Versioned API"
description: "disable all features not included in the MongoDB Stable API"
arg_vartype: Switch
cpp_varname: shellGlobalParams.apiStrict
"apiDeprecationErrors":
description: "disable all features deprecated in the MongoDB Versioned API"
description: "disable all features deprecated in the MongoDB Stable API"
arg_vartype: Switch
cpp_varname: shellGlobalParams.apiDeprecationErrors
"objcheck":

View File

@@ -577,7 +577,7 @@ ConnectionRegistry::ConnectionRegistry() = default;
void ConnectionRegistry::registerConnection(DBClientBase& client, StringData uri) {
BSONObj info;
BSONObj command;
// If apiStrict is set override it, whatsmyuri is not in the Versioned API.
// If apiStrict is set override it, whatsmyuri is not in the Stable API.
if (client.getApiParameters().getStrict()) {
command = BSON("whatsmyuri" << 1 << "apiStrict" << false);
} else {