Compare commits
5 Commits
ajdavis-pa
...
r5.3.0-alp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
be443f6fc1 | ||
|
|
1b130329e1 | ||
|
|
cebfa751cb | ||
|
|
7a2d86c376 | ||
|
|
59e19dc9bf |
@@ -24,7 +24,7 @@
|
||||
# delete this exception statement from your version. If you delete this
|
||||
# exception statement from all source files in the program, then also delete
|
||||
# it in the license file.
|
||||
"""Check that mongod's and mongos's Versioned API commands are defined in IDL.
|
||||
"""Check that mongod's and mongos's Stable API commands are defined in IDL.
|
||||
|
||||
Call listCommands on mongod and mongos to assert they have the same set of commands in the given API
|
||||
version, and assert all these commands are defined in IDL.
|
||||
@@ -4,6 +4,9 @@ selector:
|
||||
roots:
|
||||
- jstests/ssl/*.js
|
||||
- src/mongo/db/modules/*/jstests/fips/*.js
|
||||
exclude_files:
|
||||
# TODO SERVER-63155 Re-enable this test.
|
||||
- jstests/ssl/ssl_cert_selector_apple.js
|
||||
|
||||
# ssl tests start their own mongod's.
|
||||
executor:
|
||||
|
||||
2
debian/mongo.1
vendored
2
debian/mongo.1
vendored
@@ -295,7 +295,7 @@ currently the only supported value.
|
||||
.RS
|
||||
.PP
|
||||
Specifies that the server will respond with \fBAPIStrictError\f1 if your application uses a command or behavior
|
||||
outside of the \fBVersioned API\f1\&.
|
||||
outside of the \fBStable API\f1\&.
|
||||
.PP
|
||||
If you specify \fB\-\-apiStrict\f1\f1, you must also specify
|
||||
\fB\-\-apiVersion\f1\f1\&.
|
||||
|
||||
@@ -7,7 +7,7 @@ set -o errexit
|
||||
set -o verbose
|
||||
activate_venv
|
||||
|
||||
$python buildscripts/idl/check_versioned_api_commands_have_idl_definitions.py -v --include src --include src/mongo/db/modules/enterprise/src --installDir dist-test/bin 1
|
||||
$python buildscripts/idl/check_stable_api_commands_have_idl_definitions.py -v --include src --include src/mongo/db/modules/enterprise/src --installDir dist-test/bin 1
|
||||
$python buildscripts/idl/checkout_idl_files_from_past_releases.py -v idls
|
||||
find idls -maxdepth 1 -mindepth 1 -type d | while read dir; do
|
||||
echo "Performing idl check compatibility with release: $dir:"
|
||||
|
||||
@@ -138,6 +138,18 @@ const verifyNoBoundsAndFindsN = function(coll, expected, predicate, queryCollati
|
||||
assert.eq(expected, coll.find(predicate).count(), "Didn't find the expected records");
|
||||
};
|
||||
|
||||
const verifyNoTightBoundsAndFindsN = function(coll, expected, predicate, queryCollation) {
|
||||
const res = queryCollation === undefined
|
||||
? assert.commandWorked(coll.find(predicate).explain())
|
||||
: assert.commandWorked(coll.find(predicate).collation(queryCollation).explain());
|
||||
const min = res.queryPlanner.winningPlan.minRecord;
|
||||
const max = res.queryPlanner.winningPlan.maxRecord;
|
||||
assert.neq(null, min, "No min bound");
|
||||
assert.neq(null, max, "No max bound");
|
||||
assert.neq(min, max, "COLLSCAN bounds are equal");
|
||||
assert.eq(expected, coll.find(predicate).count(), "Didn't find the expected records");
|
||||
};
|
||||
|
||||
const testBounds = function(coll, expected, defaultCollation) {
|
||||
// Test non string types.
|
||||
verifyHasBoundsAndFindsN(coll, 1, {_id: 5});
|
||||
@@ -160,10 +172,10 @@ const testBounds = function(coll, expected, defaultCollation) {
|
||||
verifyNoBoundsAndFindsN(coll, expected, {data: ["a", "B"]});
|
||||
|
||||
// Test non compatible query collations don't generate bounds
|
||||
verifyNoBoundsAndFindsN(coll, expected, {_id: "A"}, incompatibleCollation);
|
||||
verifyNoBoundsAndFindsN(coll, expected, {_id: {str: "A"}}, incompatibleCollation);
|
||||
verifyNoBoundsAndFindsN(coll, expected, {_id: {strs: ["A", "b"]}}, incompatibleCollation);
|
||||
verifyNoBoundsAndFindsN(coll, expected, {_id: {strs: ["a", "B"]}}, incompatibleCollation);
|
||||
verifyNoTightBoundsAndFindsN(coll, expected, {_id: "A"}, incompatibleCollation);
|
||||
verifyNoTightBoundsAndFindsN(coll, expected, {_id: {str: "A"}}, incompatibleCollation);
|
||||
verifyNoTightBoundsAndFindsN(coll, expected, {_id: {strs: ["A", "b"]}}, incompatibleCollation);
|
||||
verifyNoTightBoundsAndFindsN(coll, expected, {_id: {strs: ["a", "B"]}}, incompatibleCollation);
|
||||
|
||||
// Test compatible query collations generate bounds
|
||||
verifyHasBoundsAndFindsN(coll, expected, {_id: "A"}, defaultCollation);
|
||||
|
||||
@@ -50,8 +50,9 @@ const testClusteredCollectionBoundedScan = function(coll, clusterKey) {
|
||||
|
||||
assert(getPlanStage(expl, "CLUSTERED_IXSCAN"));
|
||||
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("maxRecord"));
|
||||
assert(!getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("minRecord"));
|
||||
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("minRecord"));
|
||||
assert.eq(10, getPlanStage(expl, "CLUSTERED_IXSCAN").maxRecord);
|
||||
assert.eq(NaN, getPlanStage(expl, "CLUSTERED_IXSCAN").minRecord);
|
||||
|
||||
assert.eq(expectedNReturned, expl.executionStats.executionStages.nReturned);
|
||||
assert.eq(expectedDocsExamined, expl.executionStats.executionStages.docsExamined);
|
||||
@@ -65,8 +66,9 @@ const testClusteredCollectionBoundedScan = function(coll, clusterKey) {
|
||||
}));
|
||||
|
||||
assert(getPlanStage(expl, "CLUSTERED_IXSCAN"));
|
||||
assert(!getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("maxRecord"));
|
||||
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("maxRecord"));
|
||||
assert(getPlanStage(expl, "CLUSTERED_IXSCAN").hasOwnProperty("minRecord"));
|
||||
assert.eq(Infinity, getPlanStage(expl, "CLUSTERED_IXSCAN").maxRecord);
|
||||
assert.eq(89, getPlanStage(expl, "CLUSTERED_IXSCAN").minRecord);
|
||||
|
||||
assert.eq(expectedNReturned, expl.executionStats.executionStages.nReturned);
|
||||
|
||||
@@ -138,8 +138,12 @@ function testClusteredCollectionHint(coll, clusterKey, clusterKeyName) {
|
||||
filter: {[clusterKeyFieldName]: {$lt: arbitraryDocId}},
|
||||
hint: clusterKey,
|
||||
},
|
||||
expectedWinningPlanStats:
|
||||
{stage: "CLUSTERED_IXSCAN", direction: "forward", maxRecord: arbitraryDocId}
|
||||
expectedWinningPlanStats: {
|
||||
stage: "CLUSTERED_IXSCAN",
|
||||
direction: "forward",
|
||||
minRecord: NaN,
|
||||
maxRecord: arbitraryDocId
|
||||
}
|
||||
});
|
||||
validateClusteredCollectionHint(coll, {
|
||||
expectedNReturned: batchSize - arbitraryDocId,
|
||||
@@ -148,8 +152,12 @@ function testClusteredCollectionHint(coll, clusterKey, clusterKeyName) {
|
||||
filter: {[clusterKeyFieldName]: {$gte: arbitraryDocId}},
|
||||
hint: clusterKey,
|
||||
},
|
||||
expectedWinningPlanStats:
|
||||
{stage: "CLUSTERED_IXSCAN", direction: "forward", minRecord: arbitraryDocId}
|
||||
expectedWinningPlanStats: {
|
||||
stage: "CLUSTERED_IXSCAN",
|
||||
direction: "forward",
|
||||
minRecord: arbitraryDocId,
|
||||
maxRecord: Infinity
|
||||
}
|
||||
});
|
||||
|
||||
// Find with $natural hints.
|
||||
@@ -282,15 +290,15 @@ function validateClusteredCollectionHint(coll,
|
||||
assert.neq(null, stageOfInterest);
|
||||
|
||||
for (const [key, value] of Object.entries(expectedWinningPlanStats)) {
|
||||
assert(stageOfInterest[key], tojson(explain));
|
||||
assert(stageOfInterest[key] !== undefined, tojson(explain));
|
||||
assert.eq(stageOfInterest[key], value, tojson(explain));
|
||||
}
|
||||
|
||||
// Explicitly check that the plan is not bounded by default.
|
||||
if (!expectedWinningPlanStats.hasOwnProperty("minRecord")) {
|
||||
assert(!actualWinningPlan["minRecord"], tojson(explain));
|
||||
assert(!actualWinningPlan.hasOwnProperty("minRecord"), tojson(explain));
|
||||
}
|
||||
if (!expectedWinningPlanStats.hasOwnProperty("maxRecord")) {
|
||||
assert(!actualWinningPlan["maxRecord"], tojson(explain));
|
||||
assert(!actualWinningPlan.hasOwnProperty("maxRecord"), tojson(explain));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Test the shell's --apiVersion and other options related to the MongoDB Versioned API, and
|
||||
* Test the shell's --apiVersion and other options related to the MongoDB Stable API, and
|
||||
* test passing API parameters to the Mongo() constructor.
|
||||
*
|
||||
* @tags: [
|
||||
|
||||
@@ -0,0 +1,73 @@
|
||||
/**
|
||||
* Tests the collectionUUID parameter of the refineCollectionShardKey command.
|
||||
*
|
||||
* @tags: [
|
||||
* featureFlagCommandsAcceptCollectionUUID,
|
||||
* ]
|
||||
*/
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
const st = new ShardingTest({shards: 1});
|
||||
const mongos = st.s0;
|
||||
const db = mongos.getDB(jsTestName());
|
||||
const coll = db['coll'];
|
||||
assert.commandWorked(mongos.adminCommand({enableSharding: db.getName()}));
|
||||
|
||||
const oldKeyDoc = {
|
||||
_id: 1,
|
||||
a: 1
|
||||
};
|
||||
const newKeyDoc = {
|
||||
_id: 1,
|
||||
a: 1,
|
||||
b: 1,
|
||||
c: 1
|
||||
};
|
||||
|
||||
const resetColl = function(shardedColl) {
|
||||
shardedColl.drop();
|
||||
assert.commandWorked(shardedColl.insert({_id: 0, a: 1, b: 2, c: 3}));
|
||||
assert.commandWorked(mongos.getCollection(shardedColl.getFullName()).createIndex(newKeyDoc));
|
||||
assert.commandWorked(
|
||||
mongos.adminCommand({shardCollection: shardedColl.getFullName(), key: oldKeyDoc}));
|
||||
};
|
||||
|
||||
const uuid = function() {
|
||||
return assert.commandWorked(db.runCommand({listCollections: 1}))
|
||||
.cursor.firstBatch.find(c => c.name === coll.getName())
|
||||
.info.uuid;
|
||||
};
|
||||
|
||||
resetColl(coll);
|
||||
|
||||
// The command succeeds when provided with the correct collection UUID.
|
||||
assert.commandWorked(mongos.adminCommand(
|
||||
{refineCollectionShardKey: coll.getFullName(), key: newKeyDoc, collectionUUID: uuid()}));
|
||||
|
||||
// The command fails when provided with a UUID with no corresponding collection.
|
||||
resetColl(coll);
|
||||
const nonexistentUUID = UUID();
|
||||
let res = assert.commandFailedWithCode(mongos.adminCommand({
|
||||
refineCollectionShardKey: coll.getFullName(),
|
||||
key: newKeyDoc,
|
||||
collectionUUID: nonexistentUUID,
|
||||
}),
|
||||
ErrorCodes.CollectionUUIDMismatch);
|
||||
assert.eq(res.collectionUUID, nonexistentUUID);
|
||||
assert.eq(res.actualNamespace, "");
|
||||
|
||||
// The command fails when provided with a different collection's UUID.
|
||||
const coll2 = db['coll_2'];
|
||||
resetColl(coll2);
|
||||
res = assert.commandFailedWithCode(mongos.adminCommand({
|
||||
refineCollectionShardKey: coll2.getFullName(),
|
||||
key: newKeyDoc,
|
||||
collectionUUID: uuid(),
|
||||
}),
|
||||
ErrorCodes.CollectionUUIDMismatch);
|
||||
assert.eq(res.collectionUUID, uuid());
|
||||
assert.eq(res.actualNamespace, coll.getFullName());
|
||||
|
||||
st.stop();
|
||||
})();
|
||||
@@ -124,7 +124,7 @@ function runFindAndModifyCmdSuccess(st,
|
||||
if (upsert) {
|
||||
assert.eq(null, res);
|
||||
} else {
|
||||
assert(resultsEq([res], oldDoc));
|
||||
assert(resultsEq([res], oldDoc), tojson([res, oldDoc]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,8 +201,9 @@ changeShardKeyOptions.forEach(function(updateConfig) {
|
||||
if (!isFindAndModify) {
|
||||
assertCannotUpdateWithMultiTrue(
|
||||
st, kDbName, ns, session, sessionDB, runInTxn, {"x": 300}, {"$set": {"x": 30}});
|
||||
|
||||
changeShardKeyWhenFailpointsSet(session, sessionDB, runInTxn, isFindAndModify);
|
||||
}
|
||||
changeShardKeyWhenFailpointsSet(session, sessionDB, runInTxn, isFindAndModify);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ structs:
|
||||
# is like MongoClient(uri, api={version: "1", strict: true, deprecationErrors: true}).
|
||||
|
||||
ClientAPIVersionParameters:
|
||||
description: "Parser for Versioned API parameters passed to 'new Mongo()' in the mongo shell"
|
||||
description: "Parser for Stable API parameters passed to 'new Mongo()' in the mongo shell"
|
||||
strict: true
|
||||
fields:
|
||||
version:
|
||||
|
||||
@@ -839,7 +839,6 @@ env.Library(
|
||||
'repl/repl_coordinator_interface',
|
||||
'service_context',
|
||||
'shared_request_handling',
|
||||
'write_ops',
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# MongoDB Versioned API
|
||||
# MongoDB Stable API
|
||||
|
||||
The MongoDB API is the user-visible behavior of all commands, including their parameters and reply
|
||||
fields. An "API version" is a subset of the API for which we make an especially strong promise: For
|
||||
@@ -84,7 +84,7 @@ This also applies to any `unstable` fields. This is to prevent unexpected errors
|
||||
a field from `unstable` to `stable`. By intentionally opting in, we assume the implementer
|
||||
understands the implications and has valid reasons to use `any`.
|
||||
|
||||
## Versioned API implementation
|
||||
## Stable API implementation
|
||||
|
||||
All `Command` subclasses implement `apiVersions()`, which returns the set of API versions the
|
||||
command is part of. By default, a command is not included in any API version, meaning it has no
|
||||
@@ -46,6 +46,7 @@
|
||||
#include "mongo/db/ops/update.h"
|
||||
#include "mongo/db/ops/update_request.h"
|
||||
#include "mongo/db/query/get_executor.h"
|
||||
#include "mongo/db/query/index_bounds_builder.h"
|
||||
#include "mongo/db/query/internal_plans.h"
|
||||
#include "mongo/db/query/query_planner.h"
|
||||
#include "mongo/db/record_id_helpers.h"
|
||||
@@ -174,7 +175,7 @@ bool Helpers::findById(OperationContext* opCtx,
|
||||
if (collection->isClustered()) {
|
||||
Snapshotted<BSONObj> doc;
|
||||
if (collection->findDoc(opCtx,
|
||||
RecordId(record_id_helpers::keyForElem(
|
||||
record_id_helpers::keyForObj(IndexBoundsBuilder::objFromElement(
|
||||
query["_id"], collection->getDefaultCollator())),
|
||||
&doc)) {
|
||||
result = std::move(doc.value());
|
||||
@@ -200,8 +201,8 @@ RecordId Helpers::findById(OperationContext* opCtx,
|
||||
if (!desc && clustered_util::isClusteredOnId(collection->getClusteredInfo())) {
|
||||
// There is no explicit IndexDescriptor for _id on a collection clustered by _id. However,
|
||||
// the RecordId can be constructed directly from the input.
|
||||
return RecordId(
|
||||
record_id_helpers::keyForElem(idquery["_id"], collection->getDefaultCollator()));
|
||||
return record_id_helpers::keyForObj(
|
||||
IndexBoundsBuilder::objFromElement(idquery["_id"], collection->getDefaultCollator()));
|
||||
}
|
||||
|
||||
uassert(13430, "no _id index", desc);
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
#include "mongo/db/exec/scoped_timer.h"
|
||||
#include "mongo/db/exec/working_set.h"
|
||||
#include "mongo/db/exec/working_set_common.h"
|
||||
#include "mongo/db/record_id_helpers.h"
|
||||
#include "mongo/db/repl/optime.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/fail_point.h"
|
||||
@@ -202,13 +203,13 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
|
||||
if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::FORWARD &&
|
||||
_params.minRecord) {
|
||||
// Seek to the approximate start location.
|
||||
record = _cursor->seekNear(*_params.minRecord);
|
||||
record = _cursor->seekNear(_params.minRecord->recordId());
|
||||
}
|
||||
|
||||
if (_lastSeenId.isNull() && _params.direction == CollectionScanParams::BACKWARD &&
|
||||
_params.maxRecord) {
|
||||
// Seek to the approximate start location (at the end).
|
||||
record = _cursor->seekNear(*_params.maxRecord);
|
||||
record = _cursor->seekNear(_params.maxRecord->recordId());
|
||||
}
|
||||
|
||||
if (!record) {
|
||||
@@ -304,7 +305,7 @@ bool pastEndOfRange(const CollectionScanParams& params, const WorkingSetMember&
|
||||
return false;
|
||||
}
|
||||
|
||||
auto endRecord = *params.maxRecord;
|
||||
auto endRecord = params.maxRecord->recordId();
|
||||
return member.recordId > endRecord ||
|
||||
(member.recordId == endRecord && !shouldIncludeEndRecord(params));
|
||||
} else {
|
||||
@@ -312,7 +313,7 @@ bool pastEndOfRange(const CollectionScanParams& params, const WorkingSetMember&
|
||||
if (!params.minRecord) {
|
||||
return false;
|
||||
}
|
||||
auto endRecord = *params.minRecord;
|
||||
auto endRecord = params.minRecord->recordId();
|
||||
|
||||
return member.recordId < endRecord ||
|
||||
(member.recordId == endRecord && !shouldIncludeEndRecord(params));
|
||||
@@ -326,7 +327,7 @@ bool beforeStartOfRange(const CollectionScanParams& params, const WorkingSetMemb
|
||||
return false;
|
||||
}
|
||||
|
||||
auto startRecord = *params.minRecord;
|
||||
auto startRecord = params.minRecord->recordId();
|
||||
return member.recordId < startRecord ||
|
||||
(member.recordId == startRecord && !shouldIncludeStartRecord(params));
|
||||
} else {
|
||||
@@ -334,7 +335,7 @@ bool beforeStartOfRange(const CollectionScanParams& params, const WorkingSetMemb
|
||||
if (!params.maxRecord) {
|
||||
return false;
|
||||
}
|
||||
auto startRecord = *params.maxRecord;
|
||||
auto startRecord = params.maxRecord->recordId();
|
||||
return member.recordId > startRecord ||
|
||||
(member.recordId == startRecord && !shouldIncludeStartRecord(params));
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/query/record_id_bound.h"
|
||||
#include "mongo/db/record_id.h"
|
||||
|
||||
namespace mongo {
|
||||
@@ -54,7 +55,7 @@ struct CollectionScanParams {
|
||||
// be used for scans on clustered collections and forward oplog scans. If exclusive
|
||||
// bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
|
||||
// cannot be used in conjunction with 'resumeAfterRecordId'
|
||||
boost::optional<RecordId> minRecord;
|
||||
boost::optional<RecordIdBound> minRecord;
|
||||
|
||||
// If present, this parameter sets the start point of a reverse scan or the end point of a
|
||||
// forward scan. A forward scan will stop and return EOF on the first document with a RecordId
|
||||
@@ -63,7 +64,7 @@ struct CollectionScanParams {
|
||||
// only be used for scans on clustered collections and forward oplog scans. If exclusive
|
||||
// bounds are required, a MatchExpression must be passed to the CollectionScan stage. This field
|
||||
// cannot be used in conjunction with 'resumeAfterRecordId'.
|
||||
boost::optional<RecordId> maxRecord;
|
||||
boost::optional<RecordIdBound> maxRecord;
|
||||
|
||||
// If true, the collection scan will return a token that can be used to resume the scan.
|
||||
bool requestResumeToken = false;
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#include "mongo/db/index/multikey_paths.h"
|
||||
#include "mongo/db/jsobj.h"
|
||||
#include "mongo/db/query/plan_summary_stats.h"
|
||||
#include "mongo/db/query/record_id_bound.h"
|
||||
#include "mongo/db/query/stage_types.h"
|
||||
#include "mongo/db/record_id.h"
|
||||
#include "mongo/util/container_size_helper.h"
|
||||
@@ -297,10 +298,10 @@ struct CollectionScanStats : public SpecificStats {
|
||||
bool tailable{false};
|
||||
|
||||
// The start location of a forward scan and end location for a reverse scan.
|
||||
boost::optional<RecordId> minRecord;
|
||||
boost::optional<RecordIdBound> minRecord;
|
||||
|
||||
// The end location of a reverse scan and start location for a forward scan.
|
||||
boost::optional<RecordId> maxRecord;
|
||||
boost::optional<RecordIdBound> maxRecord;
|
||||
};
|
||||
|
||||
struct CountStats : public SpecificStats {
|
||||
|
||||
@@ -49,7 +49,7 @@ namespace mongo {
|
||||
namespace {
|
||||
RecordId toRecordId(ChangeStreamPreImageId id) {
|
||||
return record_id_helpers::keyForElem(
|
||||
BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement(), nullptr);
|
||||
BSON(ChangeStreamPreImage::kIdFieldName << id.toBSON()).firstElement());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -153,10 +153,10 @@ public:
|
||||
while (true) {
|
||||
// Fetch the first pre-image from the next collection, that has pre-images enabled.
|
||||
auto planExecutor = _previousCollectionUUID
|
||||
? createCollectionScan(
|
||||
? createCollectionScan(RecordIdBound(
|
||||
toRecordId(ChangeStreamPreImageId(*_previousCollectionUUID,
|
||||
Timestamp::max(),
|
||||
std::numeric_limits<int64_t>::max())))
|
||||
std::numeric_limits<int64_t>::max()))))
|
||||
: createCollectionScan(boost::none);
|
||||
auto preImageAttributes = getNextPreImageAttributes(planExecutor);
|
||||
|
||||
@@ -185,10 +185,10 @@ public:
|
||||
// '_earliestOplogEntryTimestamp', as the pre-images with smaller or equal
|
||||
// timestamp are guaranteed to be expired.
|
||||
Timestamp lastExpiredPreimageTs(_earliestOplogEntryTimestamp.asULL() - 1);
|
||||
auto planExecutor = createCollectionScan(
|
||||
auto planExecutor = createCollectionScan(RecordIdBound(
|
||||
toRecordId(ChangeStreamPreImageId(currentCollectionUUID,
|
||||
lastExpiredPreimageTs,
|
||||
std::numeric_limits<int64_t>::max())));
|
||||
std::numeric_limits<int64_t>::max()))));
|
||||
|
||||
// Iterate over all the expired pre-images in the collection in order to find
|
||||
// the max RecordId.
|
||||
@@ -233,9 +233,10 @@ public:
|
||||
preImageAttributes.operationTime <= expirationTime;
|
||||
}
|
||||
|
||||
|
||||
// Set up the new collection scan to start from the 'minKey'.
|
||||
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> createCollectionScan(
|
||||
boost::optional<RecordId> minKey) const {
|
||||
boost::optional<RecordIdBound> minKey) const {
|
||||
return InternalPlanner::collectionScan(_opCtx,
|
||||
_preImagesCollPtr,
|
||||
PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY,
|
||||
@@ -315,6 +316,7 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
|
||||
// TODO SERVER-58693: pass expiration duration parameter to the iterator.
|
||||
ChangeStreamExpiredPreImageIterator expiredPreImages(
|
||||
opCtx.get(), &preImagesColl, currentEarliestOplogEntryTs);
|
||||
|
||||
for (const auto& collectionRange : expiredPreImages) {
|
||||
writeConflictRetry(opCtx.get(),
|
||||
"ChangeStreamExpiredPreImagesRemover",
|
||||
@@ -329,8 +331,8 @@ void deleteExpiredChangeStreamPreImages(Client* client) {
|
||||
std::move(params),
|
||||
PlanYieldPolicy::YieldPolicy::YIELD_AUTO,
|
||||
InternalPlanner::Direction::FORWARD,
|
||||
collectionRange.first,
|
||||
collectionRange.second);
|
||||
RecordIdBound(collectionRange.first),
|
||||
RecordIdBound(collectionRange.second));
|
||||
numberOfRemovals += exec->executeDelete();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -170,7 +170,7 @@ public:
|
||||
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
|
||||
_params.assertTsHasNotFallenOffOplog = Timestamp(0);
|
||||
_params.shouldTrackLatestOplogTimestamp = true;
|
||||
_params.minRecord = RecordId(0);
|
||||
_params.minRecord = RecordIdBound(RecordId(0));
|
||||
_params.tailable = true;
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ public:
|
||||
invariant(!_collScan);
|
||||
_filterExpr = BSON("ns" << kTestNs << "ts" << BSON("$gte" << resumeToken.clusterTime));
|
||||
_filter = MatchExpressionParser::parseAndNormalize(_filterExpr, pExpCtx);
|
||||
_params.minRecord = RecordId(resumeToken.clusterTime.asLL());
|
||||
_params.minRecord = RecordIdBound(RecordId(resumeToken.clusterTime.asLL()));
|
||||
_params.assertTsHasNotFallenOffOplog = resumeToken.clusterTime;
|
||||
}
|
||||
|
||||
|
||||
@@ -79,12 +79,12 @@ CollectionScanParams convertIndexScanParamsToCollScanParams(
|
||||
clustered_util::matchesClusterKey(keyPattern, collection->getClusteredInfo()));
|
||||
invariant(collection->getDefaultCollator() == nullptr);
|
||||
|
||||
boost::optional<RecordId> startRecord, endRecord;
|
||||
boost::optional<RecordIdBound> startRecord, endRecord;
|
||||
if (!startKey.isEmpty()) {
|
||||
startRecord = RecordId(record_id_helpers::keyForElem(startKey.firstElement(), nullptr));
|
||||
startRecord = RecordIdBound(record_id_helpers::keyForElem(startKey.firstElement()));
|
||||
}
|
||||
if (!endKey.isEmpty()) {
|
||||
endRecord = RecordId(record_id_helpers::keyForElem(endKey.firstElement(), nullptr));
|
||||
endRecord = RecordIdBound(record_id_helpers::keyForElem(endKey.firstElement()));
|
||||
}
|
||||
|
||||
// For a forward scan, the startKey is the minRecord. For a backward scan, it is the maxRecord.
|
||||
@@ -93,7 +93,7 @@ CollectionScanParams convertIndexScanParamsToCollScanParams(
|
||||
|
||||
if (minRecord && maxRecord) {
|
||||
// Regardless of direction, the minRecord should always be less than the maxRecord
|
||||
dassert(minRecord < maxRecord,
|
||||
dassert(minRecord->recordId() < maxRecord->recordId(),
|
||||
str::stream() << "Expected the minRecord " << minRecord
|
||||
<< " to be less than the maxRecord " << maxRecord
|
||||
<< " on a bounded collection scan. Original startKey and endKey for "
|
||||
@@ -105,6 +105,7 @@ CollectionScanParams convertIndexScanParamsToCollScanParams(
|
||||
CollectionScanParams params;
|
||||
params.minRecord = minRecord;
|
||||
params.maxRecord = maxRecord;
|
||||
|
||||
if (InternalPlanner::FORWARD == direction) {
|
||||
params.direction = CollectionScanParams::FORWARD;
|
||||
} else {
|
||||
@@ -120,8 +121,8 @@ CollectionScanParams createCollectionScanParams(
|
||||
const CollectionPtr* coll,
|
||||
InternalPlanner::Direction direction,
|
||||
boost::optional<RecordId> resumeAfterRecordId,
|
||||
boost::optional<RecordId> minRecord,
|
||||
boost::optional<RecordId> maxRecord) {
|
||||
boost::optional<RecordIdBound> minRecord,
|
||||
boost::optional<RecordIdBound> maxRecord) {
|
||||
const auto& collection = *coll;
|
||||
invariant(collection);
|
||||
|
||||
@@ -146,8 +147,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::collection
|
||||
PlanYieldPolicy::YieldPolicy yieldPolicy,
|
||||
const Direction direction,
|
||||
boost::optional<RecordId> resumeAfterRecordId,
|
||||
boost::optional<RecordId> minRecord,
|
||||
boost::optional<RecordId> maxRecord) {
|
||||
boost::optional<RecordIdBound> minRecord,
|
||||
boost::optional<RecordIdBound> maxRecord) {
|
||||
const auto& collection = *coll;
|
||||
invariant(collection);
|
||||
|
||||
@@ -205,8 +206,8 @@ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> InternalPlanner::deleteWith
|
||||
std::unique_ptr<DeleteStageParams> params,
|
||||
PlanYieldPolicy::YieldPolicy yieldPolicy,
|
||||
Direction direction,
|
||||
boost::optional<RecordId> minRecord,
|
||||
boost::optional<RecordId> maxRecord) {
|
||||
boost::optional<RecordIdBound> minRecord,
|
||||
boost::optional<RecordIdBound> maxRecord) {
|
||||
const auto& collection = *coll;
|
||||
invariant(collection);
|
||||
auto ws = std::make_unique<WorkingSet>();
|
||||
|
||||
@@ -78,8 +78,8 @@ public:
|
||||
PlanYieldPolicy::YieldPolicy yieldPolicy,
|
||||
Direction direction = FORWARD,
|
||||
boost::optional<RecordId> resumeAfterRecordId = boost::none,
|
||||
boost::optional<RecordId> minRecord = boost::none,
|
||||
boost::optional<RecordId> maxRecord = boost::none);
|
||||
boost::optional<RecordIdBound> minRecord = boost::none,
|
||||
boost::optional<RecordIdBound> maxRecord = boost::none);
|
||||
|
||||
static std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> collectionScan(
|
||||
OperationContext* opCtx,
|
||||
@@ -96,8 +96,8 @@ public:
|
||||
std::unique_ptr<DeleteStageParams> deleteStageParams,
|
||||
PlanYieldPolicy::YieldPolicy yieldPolicy,
|
||||
Direction direction = FORWARD,
|
||||
boost::optional<RecordId> minRecord = boost::none,
|
||||
boost::optional<RecordId> maxRecord = boost::none);
|
||||
boost::optional<RecordIdBound> minRecord = boost::none,
|
||||
boost::optional<RecordIdBound> maxRecord = boost::none);
|
||||
|
||||
/**
|
||||
* Returns an index scan. Caller owns returned pointer.
|
||||
|
||||
@@ -257,10 +257,10 @@ void statsToBSON(const PlanStageStats& stats,
|
||||
CollectionScanStats* spec = static_cast<CollectionScanStats*>(stats.specific.get());
|
||||
bob->append("direction", spec->direction > 0 ? "forward" : "backward");
|
||||
if (spec->minRecord) {
|
||||
record_id_helpers::appendToBSONAs(*spec->minRecord, bob, "minRecord");
|
||||
spec->minRecord->appendToBSONAs(bob, "minRecord");
|
||||
}
|
||||
if (spec->maxRecord) {
|
||||
record_id_helpers::appendToBSONAs(*spec->maxRecord, bob, "maxRecord");
|
||||
spec->maxRecord->appendToBSONAs(bob, "maxRecord");
|
||||
}
|
||||
if (verbosity >= ExplainOptions::Verbosity::kExecStats) {
|
||||
bob->appendNumber("docsExamined", static_cast<long long>(spec->docsTested));
|
||||
|
||||
@@ -73,10 +73,10 @@ void statsToBSON(const QuerySolutionNode* node,
|
||||
auto csn = static_cast<const CollectionScanNode*>(node);
|
||||
bob->append("direction", csn->direction > 0 ? "forward" : "backward");
|
||||
if (csn->minRecord) {
|
||||
record_id_helpers::appendToBSONAs(*csn->minRecord, bob, "minRecord");
|
||||
csn->minRecord->appendToBSONAs(bob, "minRecord");
|
||||
}
|
||||
if (csn->maxRecord) {
|
||||
record_id_helpers::appendToBSONAs(*csn->maxRecord, bob, "maxRecord");
|
||||
csn->maxRecord->appendToBSONAs(bob, "maxRecord");
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -228,6 +228,20 @@ bool affectedByCollator(const BSONElement& element) {
|
||||
}
|
||||
}
|
||||
|
||||
void setMinRecord(CollectionScanNode* collScan, const BSONObj& min) {
|
||||
const auto newMinRecord = record_id_helpers::keyForObj(min);
|
||||
if (!collScan->minRecord || newMinRecord > collScan->minRecord->recordId()) {
|
||||
collScan->minRecord = RecordIdBound(newMinRecord, min);
|
||||
}
|
||||
}
|
||||
|
||||
void setMaxRecord(CollectionScanNode* collScan, const BSONObj& max) {
|
||||
const auto newMaxRecord = record_id_helpers::keyForObj(max);
|
||||
if (!collScan->maxRecord || newMaxRecord < collScan->maxRecord->recordId()) {
|
||||
collScan->maxRecord = RecordIdBound(newMaxRecord, max);
|
||||
}
|
||||
}
|
||||
|
||||
// Returns whether element is not affected by collators or query and collection collators are
|
||||
// compatible.
|
||||
bool compatibleCollator(const QueryPlannerParams& params,
|
||||
@@ -267,30 +281,12 @@ void handleRIDRangeMinMax(const CanonicalQuery& query,
|
||||
// Assumes clustered collection scans are only supported with the forward direction.
|
||||
collScan->boundInclusion =
|
||||
CollectionScanParams::ScanBoundInclusion::kIncludeStartRecordOnly;
|
||||
newMaxRecord = record_id_helpers::keyForElem(maxObj.firstElement(), collator);
|
||||
setMaxRecord(collScan, IndexBoundsBuilder::objFromElement(maxObj.firstElement(), collator));
|
||||
}
|
||||
|
||||
if (!minObj.isEmpty() && compatibleCollator(params, collator, minObj.firstElement())) {
|
||||
// The min() is inclusive as are bounded collection scans by default.
|
||||
newMinRecord = record_id_helpers::keyForElem(minObj.firstElement(), collator);
|
||||
}
|
||||
|
||||
if (!collScan->minRecord) {
|
||||
collScan->minRecord = newMinRecord;
|
||||
} else if (newMinRecord) {
|
||||
if (*newMinRecord > *collScan->minRecord) {
|
||||
// The newMinRecord is more restrictive than the existing minRecord.
|
||||
collScan->minRecord = newMinRecord;
|
||||
}
|
||||
}
|
||||
|
||||
if (!collScan->maxRecord) {
|
||||
collScan->maxRecord = newMaxRecord;
|
||||
} else if (newMaxRecord) {
|
||||
if (*newMaxRecord < *collScan->maxRecord) {
|
||||
// The newMaxRecord is more restrictive than the existing maxRecord.
|
||||
collScan->maxRecord = newMaxRecord;
|
||||
}
|
||||
setMinRecord(collScan, IndexBoundsBuilder::objFromElement(minObj.firstElement(), collator));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,24 +325,31 @@ void handleRIDRangeScan(const MatchExpression* conjunct,
|
||||
}
|
||||
|
||||
const auto& element = match->getData();
|
||||
|
||||
// Set coarse min/max bounds based on type in case we can't set tight bounds.
|
||||
BSONObjBuilder minb;
|
||||
minb.appendMinForType("", element.type());
|
||||
setMinRecord(collScan, minb.obj());
|
||||
|
||||
BSONObjBuilder maxb;
|
||||
maxb.appendMaxForType("", element.type());
|
||||
setMaxRecord(collScan, maxb.obj());
|
||||
|
||||
bool compatible = compatibleCollator(params, collator, element);
|
||||
if (!compatible) {
|
||||
return; // Collator affects probe and it's not compatible with collection's collator.
|
||||
}
|
||||
|
||||
auto& maxRecord = collScan->maxRecord;
|
||||
auto& minRecord = collScan->minRecord;
|
||||
const auto collated = IndexBoundsBuilder::objFromElement(element, collator);
|
||||
if (dynamic_cast<const EqualityMatchExpression*>(match)) {
|
||||
minRecord = record_id_helpers::keyForElem(element, collator);
|
||||
maxRecord = minRecord;
|
||||
} else if (!maxRecord &&
|
||||
(dynamic_cast<const LTMatchExpression*>(match) ||
|
||||
dynamic_cast<const LTEMatchExpression*>(match))) {
|
||||
maxRecord = record_id_helpers::keyForElem(element, collator);
|
||||
} else if (!minRecord &&
|
||||
(dynamic_cast<const GTMatchExpression*>(match) ||
|
||||
dynamic_cast<const GTEMatchExpression*>(match))) {
|
||||
minRecord = record_id_helpers::keyForElem(element, collator);
|
||||
setMinRecord(collScan, collated);
|
||||
setMaxRecord(collScan, collated);
|
||||
} else if (dynamic_cast<const LTMatchExpression*>(match) ||
|
||||
dynamic_cast<const LTEMatchExpression*>(match)) {
|
||||
setMaxRecord(collScan, collated);
|
||||
} else if (dynamic_cast<const GTMatchExpression*>(match) ||
|
||||
dynamic_cast<const GTEMatchExpression*>(match)) {
|
||||
setMinRecord(collScan, collated);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +401,7 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
|
||||
if (minTs) {
|
||||
StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*minTs);
|
||||
if (goal.isOK()) {
|
||||
csn->minRecord = goal.getValue();
|
||||
csn->minRecord = RecordIdBound(goal.getValue());
|
||||
}
|
||||
|
||||
if (assertMinTsHasNotFallenOffOplog) {
|
||||
@@ -408,7 +411,7 @@ std::unique_ptr<QuerySolutionNode> QueryPlannerAccess::makeCollectionScan(
|
||||
if (maxTs) {
|
||||
StatusWith<RecordId> goal = record_id_helpers::keyForOptime(*maxTs);
|
||||
if (goal.isOK()) {
|
||||
csn->maxRecord = goal.getValue();
|
||||
csn->maxRecord = RecordIdBound(goal.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include "mongo/db/query/classic_plan_cache.h"
|
||||
#include "mongo/db/query/index_bounds.h"
|
||||
#include "mongo/db/query/plan_enumerator_explain_info.h"
|
||||
#include "mongo/db/query/record_id_bound.h"
|
||||
#include "mongo/db/query/stage_types.h"
|
||||
#include "mongo/util/id_generator.h"
|
||||
|
||||
@@ -444,11 +445,11 @@ struct CollectionScanNode : public QuerySolutionNodeWithSortSet {
|
||||
|
||||
// If present, this parameter sets the start point of a forward scan or the end point of a
|
||||
// reverse scan.
|
||||
boost::optional<RecordId> minRecord;
|
||||
boost::optional<RecordIdBound> minRecord;
|
||||
|
||||
// If present, this parameter sets the start point of a reverse scan or the end point of a
|
||||
// forward scan.
|
||||
boost::optional<RecordId> maxRecord;
|
||||
boost::optional<RecordIdBound> maxRecord;
|
||||
|
||||
// If true, the collection scan will return a token that can be used to resume the scan.
|
||||
bool requestResumeToken = false;
|
||||
|
||||
97
src/mongo/db/query/record_id_bound.h
Normal file
97
src/mongo/db/query/record_id_bound.h
Normal file
@@ -0,0 +1,97 @@
|
||||
/**
|
||||
* Copyright (C) 2022-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
#include <fmt/format.h>
|
||||
#include <ostream>
|
||||
|
||||
#include "mongo/bson/bsonobjbuilder.h"
|
||||
#include "mongo/bson/util/builder.h"
|
||||
#include "mongo/db/record_id.h"
|
||||
#include "mongo/db/record_id_helpers.h"
|
||||
#include "mongo/db/storage/key_string.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
/**
|
||||
* A RecordId bound for a collection scan, with an optional BSON representation for pretty printing.
|
||||
*/
|
||||
class RecordIdBound {
|
||||
public:
|
||||
RecordIdBound() = default;
|
||||
|
||||
explicit RecordIdBound(RecordId&& recordId, boost::optional<BSONObj> bson = boost::none)
|
||||
: _recordId(recordId), _bson(bson) {}
|
||||
|
||||
explicit RecordIdBound(const RecordId& recordId, boost::optional<BSONObj> bson = boost::none)
|
||||
: _recordId(recordId), _bson(bson) {}
|
||||
|
||||
RecordId recordId() const {
|
||||
return _recordId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends a BSON respresentation of the bound to a BSONObjBuilder. If one is not explicitily
|
||||
* provided it reconstructs it from the RecordId.
|
||||
*/
|
||||
void appendToBSONAs(BSONObjBuilder* builder, StringData fieldName) const {
|
||||
if (_bson) {
|
||||
builder->appendAs(_bson->firstElement(), fieldName);
|
||||
} else {
|
||||
record_id_helpers::appendToBSONAs(_recordId, builder, fieldName);
|
||||
}
|
||||
}
|
||||
|
||||
std::string toString() const {
|
||||
return _recordId.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares the underlying RecordIds.
|
||||
*/
|
||||
int compare(const RecordIdBound& rhs) const {
|
||||
return _recordId.compare(rhs._recordId);
|
||||
}
|
||||
|
||||
private:
|
||||
RecordId _recordId;
|
||||
boost::optional<BSONObj> _bson;
|
||||
};
|
||||
|
||||
inline StringBuilder& operator<<(StringBuilder& stream, const RecordIdBound& id) {
|
||||
return stream << "RecordIdBound(" << id.toString() << ')';
|
||||
}
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& stream, const RecordIdBound& id) {
|
||||
return stream << "RecordIdBound(" << id.toString() << ')';
|
||||
}
|
||||
|
||||
} // namespace mongo
|
||||
@@ -45,6 +45,7 @@
|
||||
#include "mongo/db/query/sbe_stage_builder.h"
|
||||
#include "mongo/db/query/sbe_stage_builder_filter.h"
|
||||
#include "mongo/db/query/util/make_data_structure.h"
|
||||
#include "mongo/db/record_id_helpers.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/str.h"
|
||||
|
||||
@@ -272,7 +273,7 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
|
||||
return {state.slotId(), makeConstant(tag, val)};
|
||||
} else if (csn->minRecord) {
|
||||
auto cursor = collection->getRecordStore()->getCursor(state.opCtx);
|
||||
auto startRec = cursor->seekNear(*csn->minRecord);
|
||||
auto startRec = cursor->seekNear(csn->minRecord->recordId());
|
||||
if (startRec) {
|
||||
LOGV2_DEBUG(205841, 3, "Using direct oplog seek");
|
||||
auto [tag, val] = sbe::value::makeCopyRecordId(startRec->id);
|
||||
@@ -437,7 +438,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> generateOptimizedOplo
|
||||
std::move(stage),
|
||||
makeBinaryOp(sbe::EPrimBinary::lessEq,
|
||||
makeVariable(*tsSlot),
|
||||
makeConstant(sbe::value::TypeTags::Timestamp, csn->maxRecord->getLong())),
|
||||
makeConstant(sbe::value::TypeTags::Timestamp,
|
||||
csn->maxRecord->recordId().getLong())),
|
||||
csn->nodeId());
|
||||
}
|
||||
|
||||
|
||||
@@ -85,8 +85,8 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a RecordId that holds a small binary string. The raw value for RecordStore storage
|
||||
* may be retrieved using getStr().
|
||||
* Construct a RecordId that holds a binary string. The raw value for RecordStore storage may be
|
||||
* retrieved using getStr().
|
||||
*/
|
||||
explicit RecordId(const char* str, int32_t size) {
|
||||
invariant(size > 0, "key size must be greater than 0");
|
||||
@@ -209,8 +209,8 @@ public:
|
||||
}
|
||||
|
||||
/**
|
||||
* Compares two RecordIds. Requires that both RecordIds are of the same format, unless one or
|
||||
* both are null. Null always compares less than every other RecordId format.
|
||||
* Compares two RecordIds. Requires that both RecordIds are of the same "type" (long or string).
|
||||
* Null is always comparable and is less than every other RecordId format.
|
||||
*/
|
||||
int compare(const RecordId& rhs) const {
|
||||
switch (_format) {
|
||||
|
||||
@@ -51,20 +51,21 @@ StatusWith<RecordId> keyForOptime(const Timestamp& opTime) {
|
||||
// don't sort differently when put in a RecordId. It also avoids issues with Null/Invalid
|
||||
// RecordIds
|
||||
if (opTime.getSecs() > uint32_t(std::numeric_limits<int32_t>::max()))
|
||||
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts secs too high");
|
||||
return {ErrorCodes::BadValue, "ts secs too high"};
|
||||
|
||||
if (opTime.getInc() > uint32_t(std::numeric_limits<int32_t>::max()))
|
||||
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts inc too high");
|
||||
return {ErrorCodes::BadValue, "ts inc too high"};
|
||||
|
||||
const RecordId out = RecordId(opTime.getSecs(), opTime.getInc());
|
||||
const auto out = RecordId(opTime.getSecs(), opTime.getInc());
|
||||
if (out <= RecordId::minLong())
|
||||
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts too low");
|
||||
return {ErrorCodes::BadValue, "ts too low"};
|
||||
if (out >= RecordId::maxLong())
|
||||
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts too high");
|
||||
return {ErrorCodes::BadValue, "ts too high"};
|
||||
|
||||
return StatusWith<RecordId>(out);
|
||||
return out;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* data and len must be the arguments from RecordStore::insert() on an oplog collection.
|
||||
*/
|
||||
@@ -77,9 +78,9 @@ StatusWith<RecordId> extractKeyOptime(const char* data, int len) {
|
||||
const BSONObj obj(data);
|
||||
const BSONElement elem = obj["ts"];
|
||||
if (elem.eoo())
|
||||
return StatusWith<RecordId>(ErrorCodes::BadValue, "no ts field");
|
||||
return {ErrorCodes::BadValue, "no ts field"};
|
||||
if (elem.type() != bsonTimestamp)
|
||||
return StatusWith<RecordId>(ErrorCodes::BadValue, "ts must be a Timestamp");
|
||||
return {ErrorCodes::BadValue, "ts must be a Timestamp"};
|
||||
|
||||
return keyForOptime(elem.timestamp());
|
||||
}
|
||||
@@ -96,25 +97,28 @@ StatusWith<RecordId> keyForDoc(const BSONObj& doc,
|
||||
str::stream() << "Document " << redact(doc) << " is missing the '"
|
||||
<< clusterKeyField << "' field"};
|
||||
}
|
||||
if (collator) {
|
||||
BSONObjBuilder out;
|
||||
CollationIndexKey::collationAwareIndexKeyAppend(keyElement, collator, &out);
|
||||
return keyForElem(out.done().firstElement());
|
||||
}
|
||||
|
||||
return keyForElem(keyElement, collator);
|
||||
return keyForElem(keyElement);
|
||||
}
|
||||
|
||||
RecordId keyForElem(const BSONElement& elem, const CollatorInterface* collator) {
|
||||
RecordId keyForElem(const BSONElement& elem) {
|
||||
// Intentionally discard the TypeBits since the type information will be stored in the cluster
|
||||
// key of the original document. The consequence of this behavior is that cluster key values
|
||||
// that compare similarly, but are of different types may not be used concurrently.
|
||||
KeyString::Builder keyBuilder(KeyString::Version::kLatestVersion);
|
||||
if (collator) {
|
||||
BSONObjBuilder out;
|
||||
CollationIndexKey::collationAwareIndexKeyAppend(elem, collator, &out);
|
||||
keyBuilder.appendBSONElement(out.done().firstElement());
|
||||
} else {
|
||||
keyBuilder.appendBSONElement(elem);
|
||||
}
|
||||
keyBuilder.appendBSONElement(elem);
|
||||
return RecordId(keyBuilder.getBuffer(), keyBuilder.getSize());
|
||||
}
|
||||
|
||||
RecordId keyForObj(const BSONObj& obj) {
|
||||
return keyForElem(obj.firstElement());
|
||||
}
|
||||
|
||||
RecordId keyForOID(OID oid) {
|
||||
KeyString::Builder keyBuilder(KeyString::Version::kLatestVersion);
|
||||
keyBuilder.appendOID(oid);
|
||||
|
||||
@@ -39,6 +39,7 @@
|
||||
namespace mongo {
|
||||
class Timestamp;
|
||||
class RecordId;
|
||||
|
||||
namespace record_id_helpers {
|
||||
|
||||
/**
|
||||
@@ -53,7 +54,8 @@ StatusWith<RecordId> keyForOptime(const Timestamp& opTime);
|
||||
StatusWith<RecordId> keyForDoc(const BSONObj& doc,
|
||||
const ClusteredIndexSpec& indexSpec,
|
||||
const CollatorInterface* collator);
|
||||
RecordId keyForElem(const BSONElement& elem, const CollatorInterface* collator);
|
||||
RecordId keyForElem(const BSONElement& elem);
|
||||
RecordId keyForObj(const BSONObj& obj);
|
||||
RecordId keyForOID(OID oid);
|
||||
RecordId keyForDate(Date_t date);
|
||||
|
||||
|
||||
@@ -22,6 +22,9 @@ env.Library(
|
||||
source=[
|
||||
'tenant_migration_decoration.cpp',
|
||||
],
|
||||
LIBDEPS_PRIVATE=[
|
||||
'$BUILD_DIR/mongo/base',
|
||||
]
|
||||
)
|
||||
|
||||
env.Library(
|
||||
|
||||
@@ -247,14 +247,14 @@ DbCheckHasher::DbCheckHasher(OperationContext* opCtx,
|
||||
InternalPlanner::IXSCAN_FETCH);
|
||||
} else {
|
||||
CollectionScanParams params;
|
||||
params.minRecord = uassertStatusOK(
|
||||
params.minRecord = RecordIdBound(uassertStatusOK(
|
||||
record_id_helpers::keyForDoc(start.obj(),
|
||||
collection->getClusteredInfo()->getIndexSpec(),
|
||||
collection->getDefaultCollator()));
|
||||
params.maxRecord = uassertStatusOK(
|
||||
collection->getDefaultCollator())));
|
||||
params.maxRecord = RecordIdBound(uassertStatusOK(
|
||||
record_id_helpers::keyForDoc(end.obj(),
|
||||
collection->getClusteredInfo()->getIndexSpec(),
|
||||
collection->getDefaultCollator()));
|
||||
collection->getDefaultCollator())));
|
||||
params.boundInclusion = CollectionScanParams::ScanBoundInclusion::kIncludeEndRecordOnly;
|
||||
_exec = InternalPlanner::collectionScan(
|
||||
opCtx, &collection, params, PlanYieldPolicy::YieldPolicy::NO_YIELD);
|
||||
|
||||
@@ -734,16 +734,12 @@ StatusWith<std::vector<BSONObj>> _findOrDeleteDocuments(
|
||||
"bounded collection scans only support forward scans");
|
||||
}
|
||||
|
||||
auto collator = collection->getDefaultCollator();
|
||||
boost::optional<RecordId> minRecord, maxRecord;
|
||||
boost::optional<RecordIdBound> minRecord, maxRecord;
|
||||
if (!startKey.isEmpty()) {
|
||||
minRecord =
|
||||
RecordId(record_id_helpers::keyForElem(startKey.firstElement(), collator));
|
||||
minRecord = RecordIdBound(record_id_helpers::keyForObj(startKey));
|
||||
}
|
||||
|
||||
if (!endKey.isEmpty()) {
|
||||
maxRecord =
|
||||
RecordId(record_id_helpers::keyForElem(endKey.firstElement(), collator));
|
||||
maxRecord = RecordIdBound(record_id_helpers::keyForObj(endKey));
|
||||
}
|
||||
|
||||
planExecutor = isFind
|
||||
|
||||
@@ -31,7 +31,9 @@
|
||||
|
||||
#include "mongo/db/s/refine_collection_shard_key_coordinator.h"
|
||||
|
||||
#include "mongo/db/catalog/collection_uuid_mismatch.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/db_raii.h"
|
||||
#include "mongo/db/s/dist_lock_manager.h"
|
||||
#include "mongo/db/s/sharding_ddl_util.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
@@ -134,6 +136,11 @@ ExecutorFuture<void> RefineCollectionShardKeyCoordinator::_runImpl(
|
||||
sharding_ddl_util::stopMigrations(opCtx, nss(), boost::none);
|
||||
}
|
||||
|
||||
{
|
||||
AutoGetCollection coll{opCtx, nss(), MODE_IS};
|
||||
checkCollectionUUIDMismatch(opCtx, *coll, _doc.getCollectionUUID());
|
||||
}
|
||||
|
||||
const auto cmdResponse = uassertStatusOK(configShard->runCommand(
|
||||
opCtx,
|
||||
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
|
||||
|
||||
@@ -48,6 +48,7 @@
|
||||
#include "mongo/rpc/factory.h"
|
||||
#include "mongo/rpc/get_status_from_command_result.h"
|
||||
#include "mongo/rpc/reply_interface.h"
|
||||
#include "mongo/s/is_mongos.h"
|
||||
#include "mongo/stdx/future.h"
|
||||
#include "mongo/transport/service_entry_point.h"
|
||||
|
||||
@@ -297,7 +298,7 @@ SemiFuture<BSONObj> Transaction::_commitOrAbort(StringData dbName, StringData cm
|
||||
|
||||
BSONObjBuilder cmdBuilder;
|
||||
cmdBuilder.append(cmdName, 1);
|
||||
cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern.toBSON());
|
||||
cmdBuilder.append(WriteConcernOptions::kWriteConcernField, _writeConcern);
|
||||
auto cmdObj = cmdBuilder.obj();
|
||||
|
||||
return _txnClient->runCommand(dbName, cmdObj).semi();
|
||||
@@ -363,7 +364,7 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) {
|
||||
if (_state == TransactionState::kInit) {
|
||||
_state = TransactionState::kStarted;
|
||||
_sessionInfo.setStartTransaction(boost::none);
|
||||
cmdBuilder->append(_readConcern.toBSON().firstElement());
|
||||
cmdBuilder->append(repl::ReadConcernArgs::kReadConcernFieldName, _readConcern);
|
||||
}
|
||||
|
||||
_latestResponseHasTransientTransactionErrorLabel = false;
|
||||
@@ -379,35 +380,22 @@ void Transaction::processResponse(const BSONObj& reply) {
|
||||
}
|
||||
}
|
||||
|
||||
void Transaction::_setSessionInfo(LogicalSessionId lsid,
|
||||
TxnNumber txnNumber,
|
||||
boost::optional<TxnRetryCounter> txnRetryCounter) {
|
||||
void Transaction::_setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber) {
|
||||
_sessionInfo.setSessionId(lsid);
|
||||
_sessionInfo.setTxnNumber(txnNumber);
|
||||
_sessionInfo.setTxnRetryCounter(txnRetryCounter ? *txnRetryCounter : 0);
|
||||
}
|
||||
|
||||
void Transaction::primeForTransactionRetry() {
|
||||
_latestResponseHasTransientTransactionErrorLabel = false;
|
||||
switch (_execContext) {
|
||||
case ExecutionContext::kOwnSession:
|
||||
case ExecutionContext::kClientSession:
|
||||
case ExecutionContext::kClientRetryableWrite:
|
||||
// Advance txnNumber.
|
||||
_sessionInfo.setTxnNumber(*_sessionInfo.getTxnNumber() + 1);
|
||||
_sessionInfo.setStartTransaction(true);
|
||||
_state = TransactionState::kInit;
|
||||
return;
|
||||
case ExecutionContext::kClientSession:
|
||||
// Advance txnRetryCounter.
|
||||
_sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1);
|
||||
_sessionInfo.setStartTransaction(true);
|
||||
_state = TransactionState::kInit;
|
||||
return;
|
||||
case ExecutionContext::kClientRetryableWrite:
|
||||
// Advance txnRetryCounter.
|
||||
_sessionInfo.setTxnRetryCounter(*_sessionInfo.getTxnRetryCounter() + 1);
|
||||
_sessionInfo.setStartTransaction(true);
|
||||
_state = TransactionState::kInit;
|
||||
return;
|
||||
case ExecutionContext::kClientTransaction:
|
||||
// The outermost client handles retries.
|
||||
MONGO_UNREACHABLE;
|
||||
@@ -431,27 +419,27 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
|
||||
auto clientSession = opCtx->getLogicalSessionId();
|
||||
auto clientTxnNumber = opCtx->getTxnNumber();
|
||||
auto clientInMultiDocumentTransaction = opCtx->inMultiDocumentTransaction();
|
||||
auto clientTxnRetryCounter = opCtx->getTxnRetryCounter();
|
||||
|
||||
if (!clientSession) {
|
||||
// TODO SERVER-61783: Integrate session pool.
|
||||
_setSessionInfo(makeLogicalSessionId(opCtx), 0, 0);
|
||||
_setSessionInfo(makeLogicalSessionId(opCtx), 0 /* txnNumber */);
|
||||
_execContext = ExecutionContext::kOwnSession;
|
||||
} else if (!clientTxnNumber) {
|
||||
_setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0, 0);
|
||||
_setSessionInfo(makeLogicalSessionIdWithTxnUUID(*clientSession), 0 /* txnNumber */);
|
||||
_execContext = ExecutionContext::kClientSession;
|
||||
|
||||
// TODO SERVER-59186: Handle client session case.
|
||||
MONGO_UNREACHABLE;
|
||||
} else if (!clientInMultiDocumentTransaction) {
|
||||
_setSessionInfo(
|
||||
makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber), 0, 0);
|
||||
_setSessionInfo(makeLogicalSessionIdWithTxnNumberAndUUID(*clientSession, *clientTxnNumber),
|
||||
0 /* txnNumber */);
|
||||
_execContext = ExecutionContext::kClientRetryableWrite;
|
||||
|
||||
// TODO SERVER-59186: Handle client retryable write case.
|
||||
MONGO_UNREACHABLE;
|
||||
// TODO SERVER-59186: Handle client retryable write case on mongod. This is different from
|
||||
// mongos because only mongod checks out a transaction session for retryable writes.
|
||||
invariant(isMongos(), "This case is not yet supported on a mongod");
|
||||
} else {
|
||||
_setSessionInfo(*clientSession, *clientTxnNumber, clientTxnRetryCounter);
|
||||
_setSessionInfo(*clientSession, *clientTxnNumber);
|
||||
_execContext = ExecutionContext::kClientTransaction;
|
||||
|
||||
// TODO SERVER-59186: Handle client transaction case.
|
||||
@@ -460,9 +448,12 @@ void Transaction::_primeTransaction(OperationContext* opCtx) {
|
||||
_sessionInfo.setStartTransaction(true);
|
||||
_sessionInfo.setAutocommit(false);
|
||||
|
||||
// Extract non-session options.
|
||||
_readConcern = repl::ReadConcernArgs::get(opCtx);
|
||||
_writeConcern = opCtx->getWriteConcern();
|
||||
// Extract non-session options. Strip provenance so it can be correctly inferred for the
|
||||
// generated commands as if it came from an external client.
|
||||
_readConcern = repl::ReadConcernArgs::get(opCtx).toBSONInner().removeField(
|
||||
ReadWriteConcernProvenanceBase::kSourceFieldName);
|
||||
_writeConcern = opCtx->getWriteConcern().toBSON().removeField(
|
||||
ReadWriteConcernProvenanceBase::kSourceFieldName);
|
||||
|
||||
LOGV2_DEBUG(5875901,
|
||||
0, // TODO SERVER-61781: Raise verbosity.
|
||||
|
||||
@@ -282,8 +282,8 @@ public:
|
||||
|
||||
/**
|
||||
* Handles the given transaction result based on where the transaction is in its lifecycle and
|
||||
* its execution context, e.g. by updating its txnNumber or txnRetryCounter, and returns the
|
||||
* next step for the transaction runner.
|
||||
* its execution context, e.g. by updating its txnNumber, and returns the next step for the
|
||||
* transaction runner.
|
||||
*/
|
||||
ErrorHandlingStep handleError(const StatusWith<CommitResult>& swResult) const;
|
||||
|
||||
@@ -318,9 +318,7 @@ private:
|
||||
return std::make_unique<TxnMetadataHooks>(*this);
|
||||
}
|
||||
|
||||
void _setSessionInfo(LogicalSessionId lsid,
|
||||
TxnNumber txnNumber,
|
||||
boost::optional<TxnRetryCounter> txnRetryCounter);
|
||||
void _setSessionInfo(LogicalSessionId lsid, TxnNumber txnNumber);
|
||||
|
||||
SemiFuture<BSONObj> _commitOrAbort(StringData dbName, StringData cmdName);
|
||||
|
||||
@@ -337,8 +335,8 @@ private:
|
||||
bool _latestResponseHasTransientTransactionErrorLabel{false};
|
||||
|
||||
OperationSessionInfo _sessionInfo;
|
||||
repl::ReadConcernArgs _readConcern;
|
||||
WriteConcernOptions _writeConcern;
|
||||
BSONObj _writeConcern;
|
||||
BSONObj _readConcern;
|
||||
ExecutionContext _execContext;
|
||||
TransactionState _state{TransactionState::kInit};
|
||||
};
|
||||
|
||||
@@ -121,7 +121,6 @@ namespace {
|
||||
|
||||
void assertTxnMetadata(BSONObj obj,
|
||||
TxnNumber txnNumber,
|
||||
boost::optional<TxnRetryCounter> txnRetryCounter,
|
||||
boost::optional<bool> startTransaction,
|
||||
boost::optional<BSONObj> readConcern = boost::none,
|
||||
boost::optional<BSONObj> writeConcern = boost::none) {
|
||||
@@ -129,12 +128,6 @@ void assertTxnMetadata(BSONObj obj,
|
||||
ASSERT_EQ(obj["autocommit"].Bool(), false);
|
||||
ASSERT_EQ(obj["txnNumber"].Long(), txnNumber);
|
||||
|
||||
if (txnRetryCounter) {
|
||||
ASSERT_EQ(obj["txnRetryCounter"].Int(), *txnRetryCounter);
|
||||
} else {
|
||||
ASSERT(obj["txnRetryCounter"].eoo());
|
||||
}
|
||||
|
||||
if (startTransaction) {
|
||||
ASSERT_EQ(obj["startTransaction"].Bool(), *startTransaction);
|
||||
} else {
|
||||
@@ -190,13 +183,10 @@ protected:
|
||||
opCtx(), InlineQueuedCountingExecutor::make(), std::move(mockClient));
|
||||
}
|
||||
|
||||
void expectSentAbort(TxnNumber txnNumber,
|
||||
TxnRetryCounter txnRetryCounter,
|
||||
BSONObj writeConcern) {
|
||||
void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
txnNumber,
|
||||
txnRetryCounter,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
writeConcern);
|
||||
@@ -220,10 +210,8 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
|
||||
<< "documents" << BSON_ARRAY(BSON("x" << 1))))
|
||||
.get();
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
assertTxnMetadata(
|
||||
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
|
||||
|
||||
mockClient()->setNextCommandResponse(kOKInsertResponse);
|
||||
insertRes = txnClient
|
||||
@@ -235,7 +223,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */);
|
||||
|
||||
// The commit response.
|
||||
@@ -248,7 +235,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesTxnMetadata) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
@@ -284,7 +270,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
|
||||
mockClient()->setNextCommandResponse(kOKInsertResponse);
|
||||
@@ -297,7 +282,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */);
|
||||
|
||||
// Throw a transient error to verify the retries behavior.
|
||||
@@ -313,7 +297,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnCommit) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
writeConcern.toBSON());
|
||||
@@ -352,7 +335,7 @@ TEST_F(TxnAPITest, OwnSession_AttachesWriteConcernOnAbort) {
|
||||
});
|
||||
ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
|
||||
|
||||
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, writeConcern.toBSON());
|
||||
expectSentAbort(0 /* txnNumber */, writeConcern.toBSON());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -382,7 +365,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */,
|
||||
readConcern.toBSONInner());
|
||||
|
||||
@@ -397,7 +379,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */);
|
||||
|
||||
// Throw a transient error to verify the retry will still use the read concern.
|
||||
@@ -413,7 +394,6 @@ TEST_F(TxnAPITest, OwnSession_AttachesReadConcernOnStartTransaction) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
@@ -441,7 +421,7 @@ TEST_F(TxnAPITest, OwnSession_AbortsOnError) {
|
||||
});
|
||||
ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
|
||||
|
||||
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
|
||||
expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
|
||||
}
|
||||
|
||||
TEST_F(TxnAPITest, OwnSession_SkipsCommitIfNoCommandsWereRun) {
|
||||
@@ -466,9 +446,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
|
||||
attempt += 1;
|
||||
if (attempt > 0) {
|
||||
// Verify an abort was sent in between retries.
|
||||
expectSentAbort(attempt - 1 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
WriteConcernOptions().toBSON());
|
||||
expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
|
||||
}
|
||||
|
||||
mockClient()->setNextCommandResponse(attempt == 0 ? kNoSuchTransactionResponse
|
||||
@@ -492,7 +470,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
|
||||
return SemiFuture<void>::makeReady();
|
||||
@@ -503,7 +480,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientError) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
@@ -517,9 +493,7 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
|
||||
attempt += 1;
|
||||
if (attempt > 0) {
|
||||
// Verify an abort was sent in between retries.
|
||||
expectSentAbort(attempt - 1 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
WriteConcernOptions().toBSON());
|
||||
expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
|
||||
}
|
||||
|
||||
mockClient()->setNextCommandResponse(kOKInsertResponse);
|
||||
@@ -534,7 +508,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
|
||||
// The commit or implicit abort response.
|
||||
@@ -549,7 +522,6 @@ TEST_F(TxnAPITest, OwnSession_RetriesOnTransientClientError) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
@@ -569,10 +541,8 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
|
||||
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
|
||||
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
assertTxnMetadata(
|
||||
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
|
||||
|
||||
// The commit response.
|
||||
mockClient()->setNextCommandResponse(
|
||||
@@ -587,7 +557,7 @@ TEST_F(TxnAPITest, OwnSession_CommitError) {
|
||||
ASSERT(swResult.getValue().wcError.toStatus().isOK());
|
||||
ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::InternalError);
|
||||
|
||||
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
|
||||
expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
|
||||
}
|
||||
|
||||
TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
|
||||
@@ -597,9 +567,7 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
|
||||
attempt += 1;
|
||||
if (attempt > 0) {
|
||||
// Verify an abort was sent in between retries.
|
||||
expectSentAbort(attempt - 1 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
WriteConcernOptions().toBSON());
|
||||
expectSentAbort(attempt - 1 /* txnNumber */, WriteConcernOptions().toBSON());
|
||||
}
|
||||
|
||||
mockClient()->setNextCommandResponse(kOKInsertResponse);
|
||||
@@ -614,7 +582,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
|
||||
// Set commit and best effort abort response, if necessary.
|
||||
@@ -632,7 +599,6 @@ TEST_F(TxnAPITest, OwnSession_TransientCommitError) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
attempt /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
@@ -652,10 +618,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) {
|
||||
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
|
||||
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
assertTxnMetadata(
|
||||
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
|
||||
|
||||
// The commit response.
|
||||
mockClient()->setNextCommandResponse(
|
||||
@@ -669,7 +633,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitError) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
@@ -687,10 +650,8 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) {
|
||||
<< "documents" << BSON_ARRAY(BSON("x" << 1))))
|
||||
.get();
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
assertTxnMetadata(
|
||||
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
|
||||
|
||||
// The commit response.
|
||||
mockClient()->setNextCommandResponse(kResWithWriteConcernError);
|
||||
@@ -703,7 +664,7 @@ TEST_F(TxnAPITest, OwnSession_NonRetryableCommitWCError) {
|
||||
ASSERT_EQ(swResult.getValue().wcError.toStatus(), ErrorCodes::WriteConcernFailed);
|
||||
ASSERT_EQ(swResult.getValue().getEffectiveStatus(), ErrorCodes::WriteConcernFailed);
|
||||
|
||||
expectSentAbort(0 /* txnNumber */, 0 /* txnRetryCounter */, WriteConcernOptions().toBSON());
|
||||
expectSentAbort(0 /* txnNumber */, WriteConcernOptions().toBSON());
|
||||
}
|
||||
|
||||
TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
|
||||
@@ -719,10 +680,8 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
|
||||
ASSERT_OK(getStatusFromWriteCommandReply(insertRes));
|
||||
|
||||
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
|
||||
assertTxnMetadata(mockClient()->getLastSentRequest(),
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
true /* startTransaction */);
|
||||
assertTxnMetadata(
|
||||
mockClient()->getLastSentRequest(), 0 /* txnNumber */, true /* startTransaction */);
|
||||
|
||||
// The commit responses.
|
||||
mockClient()->setNextCommandResponse(kResWithRetryableWriteConcernError);
|
||||
@@ -735,7 +694,6 @@ TEST_F(TxnAPITest, OwnSession_RetryableCommitWCError) {
|
||||
auto lastRequest = mockClient()->getLastSentRequest();
|
||||
assertTxnMetadata(lastRequest,
|
||||
0 /* txnNumber */,
|
||||
0 /* txnRetryCounter */,
|
||||
boost::none /* startTransaction */,
|
||||
boost::none /* readConcern */,
|
||||
WriteConcernOptions().toBSON() /* writeConcern */);
|
||||
|
||||
@@ -446,24 +446,24 @@ private:
|
||||
* delete entries of type 'ObjectId'. All other collections must only delete entries of type
|
||||
* 'Date'.
|
||||
*/
|
||||
RecordId makeCollScanEndBound(const CollectionPtr& collection, Date_t expirationDate) {
|
||||
RecordIdBound makeCollScanEndBound(const CollectionPtr& collection, Date_t expirationDate) {
|
||||
if (collection->getTimeseriesOptions()) {
|
||||
auto endOID = OID();
|
||||
endOID.init(expirationDate, true /* max */);
|
||||
return record_id_helpers::keyForOID(endOID);
|
||||
return RecordIdBound(record_id_helpers::keyForOID(endOID));
|
||||
}
|
||||
|
||||
return record_id_helpers::keyForDate(expirationDate);
|
||||
return RecordIdBound(record_id_helpers::keyForDate(expirationDate));
|
||||
}
|
||||
|
||||
RecordId makeCollScanStartBound(const CollectionPtr& collection, const Date_t startDate) {
|
||||
RecordIdBound makeCollScanStartBound(const CollectionPtr& collection, const Date_t startDate) {
|
||||
if (collection->getTimeseriesOptions()) {
|
||||
auto startOID = OID();
|
||||
startOID.init(startDate, false /* max */);
|
||||
return record_id_helpers::keyForOID(startOID);
|
||||
return RecordIdBound(record_id_helpers::keyForOID(startOID));
|
||||
}
|
||||
|
||||
return record_id_helpers::keyForDate(startDate);
|
||||
return RecordIdBound(record_id_helpers::keyForDate(startDate));
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -209,10 +209,10 @@ public:
|
||||
_client.insert(ns.ns(), docs, ordered);
|
||||
}
|
||||
|
||||
// Returns the recordId generated by doc, assuming there's no collation and doc takes the shape
|
||||
// of {<cluster key> : <value>};
|
||||
RecordId getRecordIdForClusteredDoc(const BSONObj& doc) {
|
||||
return RecordId(record_id_helpers::keyForElem(doc.firstElement(), nullptr));
|
||||
// Returns the recordId generated by doc, assuming doc takes the shape of {<cluster key> :
|
||||
// <value>};
|
||||
RecordIdBound getRecordIdForClusteredDoc(const BSONObj& doc) {
|
||||
return RecordIdBound(record_id_helpers::keyForElem(doc.firstElement()));
|
||||
}
|
||||
|
||||
// Performs a bounded collection scan from 'minRecord' to 'maxRecord' in the specified
|
||||
@@ -231,8 +231,8 @@ public:
|
||||
CollectionScanParams params;
|
||||
params.tailable = false;
|
||||
params.direction = direction;
|
||||
params.minRecord = minRecord;
|
||||
params.maxRecord = maxRecord;
|
||||
params.minRecord = RecordIdBound(minRecord);
|
||||
params.maxRecord = RecordIdBound(maxRecord);
|
||||
|
||||
WorkingSet ws;
|
||||
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
|
||||
@@ -261,8 +261,8 @@ public:
|
||||
void runClusteredCollScanAndAssertContents(
|
||||
const NamespaceString& ns,
|
||||
CollectionScanParams::Direction direction,
|
||||
boost::optional<RecordId> minRecord,
|
||||
boost::optional<RecordId> maxRecord,
|
||||
boost::optional<RecordIdBound> minRecord,
|
||||
boost::optional<RecordIdBound> maxRecord,
|
||||
CollectionScanParams::ScanBoundInclusion boundInclusion,
|
||||
const vector<BSONObj>& expectedResults,
|
||||
const MatchExpression* filter = nullptr) {
|
||||
@@ -614,8 +614,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredMinMax) {
|
||||
CollectionScanParams params;
|
||||
params.direction = CollectionScanParams::FORWARD;
|
||||
params.tailable = false;
|
||||
params.minRecord = recordIds[0];
|
||||
params.maxRecord = recordIds[recordIds.size() - 1];
|
||||
params.minRecord = RecordIdBound(recordIds[0]);
|
||||
params.maxRecord = RecordIdBound(recordIds[recordIds.size() - 1]);
|
||||
|
||||
WorkingSet ws;
|
||||
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
|
||||
@@ -768,8 +768,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredMinMaxDateExclusi
|
||||
params.tailable = false;
|
||||
params.direction = direction;
|
||||
|
||||
params.minRecord = record_id_helpers::keyForDate(minDate);
|
||||
params.maxRecord = record_id_helpers::keyForDate(maxDate);
|
||||
params.minRecord = RecordIdBound(record_id_helpers::keyForDate(minDate));
|
||||
params.maxRecord = RecordIdBound(record_id_helpers::keyForDate(maxDate));
|
||||
|
||||
// Exclude all but the record with _id 'middleDate' from the scan.
|
||||
StatusWithMatchExpression swMatch = MatchExpressionParser::parse(
|
||||
@@ -821,8 +821,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredReverse) {
|
||||
params.tailable = false;
|
||||
// The last entry in recordIds is the lowest record in the collection and the first entry is the
|
||||
// highest.
|
||||
params.minRecord = recordIds[recordIds.size() - 1];
|
||||
params.maxRecord = recordIds[0];
|
||||
params.minRecord = RecordIdBound(recordIds[recordIds.size() - 1]);
|
||||
params.maxRecord = RecordIdBound(recordIds[0]);
|
||||
|
||||
WorkingSet ws;
|
||||
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
|
||||
@@ -865,8 +865,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredMinMaxFullObjectI
|
||||
params.tailable = false;
|
||||
|
||||
// Expect to see all records.
|
||||
params.minRecord = record_id_helpers::keyForOID(OID());
|
||||
params.maxRecord = record_id_helpers::keyForOID(OID::max());
|
||||
params.minRecord = RecordIdBound(record_id_helpers::keyForOID(OID()));
|
||||
params.maxRecord = RecordIdBound(record_id_helpers::keyForOID(OID::max()));
|
||||
|
||||
WorkingSet ws;
|
||||
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
|
||||
@@ -913,8 +913,8 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredInnerRange) {
|
||||
ASSERT_LT(startOffset, recordIds.size());
|
||||
ASSERT_LT(endOffset, recordIds.size());
|
||||
|
||||
params.minRecord = recordIds[startOffset];
|
||||
params.maxRecord = recordIds[endOffset];
|
||||
params.minRecord = RecordIdBound(recordIds[startOffset]);
|
||||
params.maxRecord = RecordIdBound(recordIds[endOffset]);
|
||||
|
||||
WorkingSet ws;
|
||||
auto scan = std::make_unique<CollectionScan>(_expCtx.get(), coll, params, &ws, nullptr);
|
||||
@@ -962,15 +962,20 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredInnerRangeExclusi
|
||||
ASSERT_LT(startOffset, recordIds.size());
|
||||
ASSERT_LT(endOffset, recordIds.size());
|
||||
|
||||
params.minRecord = recordIds[startOffset];
|
||||
params.maxRecord = recordIds[endOffset];
|
||||
params.minRecord = RecordIdBound(recordIds[startOffset]);
|
||||
params.maxRecord = RecordIdBound(recordIds[endOffset]);
|
||||
|
||||
// Provide RecordId bounds with exclusive filters.
|
||||
StatusWithMatchExpression swMatch = MatchExpressionParser::parse(
|
||||
fromjson(fmt::sprintf(
|
||||
"{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
|
||||
record_id_helpers::toBSONAs(*params.minRecord, "").firstElement().OID().toString(),
|
||||
record_id_helpers::toBSONAs(*params.maxRecord, "").firstElement().OID().toString())),
|
||||
fromjson(fmt::sprintf("{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
|
||||
record_id_helpers::toBSONAs(params.minRecord->recordId(), "")
|
||||
.firstElement()
|
||||
.OID()
|
||||
.toString(),
|
||||
record_id_helpers::toBSONAs(params.maxRecord->recordId(), "")
|
||||
.firstElement()
|
||||
.OID()
|
||||
.toString())),
|
||||
_expCtx.get());
|
||||
ASSERT_OK(swMatch.getStatus());
|
||||
auto filter = std::move(swMatch.getValue());
|
||||
@@ -1024,15 +1029,20 @@ TEST_F(QueryStageCollectionScanTest, QueryTestCollscanClusteredInnerRangeExclusi
|
||||
|
||||
// The last entry in recordIds is the lowest record in the collection and the first entry is the
|
||||
// highest.
|
||||
params.minRecord = recordIds[endOffset];
|
||||
params.maxRecord = recordIds[startOffset];
|
||||
params.minRecord = RecordIdBound(recordIds[endOffset]);
|
||||
params.maxRecord = RecordIdBound(recordIds[startOffset]);
|
||||
|
||||
// Provide RecordId bounds with exclusive filters.
|
||||
StatusWithMatchExpression swMatch = MatchExpressionParser::parse(
|
||||
fromjson(fmt::sprintf(
|
||||
"{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
|
||||
record_id_helpers::toBSONAs(*params.minRecord, "").firstElement().OID().toString(),
|
||||
record_id_helpers::toBSONAs(*params.maxRecord, "").firstElement().OID().toString())),
|
||||
fromjson(fmt::sprintf("{_id: {$gt: ObjectId('%s'), $lt: ObjectId('%s')}}",
|
||||
record_id_helpers::toBSONAs(params.minRecord->recordId(), "")
|
||||
.firstElement()
|
||||
.OID()
|
||||
.toString(),
|
||||
record_id_helpers::toBSONAs(params.maxRecord->recordId(), "")
|
||||
.firstElement()
|
||||
.OID()
|
||||
.toString())),
|
||||
_expCtx.get());
|
||||
ASSERT_OK(swMatch.getStatus());
|
||||
auto filter = std::move(swMatch.getValue());
|
||||
|
||||
@@ -1104,7 +1104,7 @@ commands:
|
||||
anyTypeField: IDLAnyTypeOwned
|
||||
|
||||
AccessCheckNone:
|
||||
description: A versioned API command with access_check and none
|
||||
description: A Stable API command with access_check and none
|
||||
command_name: AccessCheckNoneCommandName
|
||||
namespace: ignored
|
||||
strict: true
|
||||
@@ -1114,7 +1114,7 @@ commands:
|
||||
reply_type: OkReply
|
||||
|
||||
AccessCheckSimpleAccessCheck:
|
||||
description: A versioned API command with access_check and simple check
|
||||
description: A Stable API command with access_check and simple check
|
||||
command_name: AccessCheckSimpleAccessCheckCommandName
|
||||
namespace: ignored
|
||||
strict: true
|
||||
@@ -1125,7 +1125,7 @@ commands:
|
||||
reply_type: OkReply
|
||||
|
||||
AccessCheckSimplePrivilege:
|
||||
description: A versioned API command with access_check and privilege
|
||||
description: A Stable API command with access_check and privilege
|
||||
command_name: AccessCheckSimplePrivilegeCommandName
|
||||
namespace: ignored
|
||||
strict: true
|
||||
@@ -1139,7 +1139,7 @@ commands:
|
||||
|
||||
|
||||
AccessCheckComplexPrivilege:
|
||||
description: A versioned API command with access_check complex
|
||||
description: A Stable API command with access_check complex
|
||||
command_name: AccessCheckComplexPrivilegeCommandName
|
||||
namespace: ignored
|
||||
strict: true
|
||||
@@ -1159,10 +1159,10 @@ commands:
|
||||
- check: is_authorized_to_parse_namespace_element
|
||||
reply_type: OkReply
|
||||
|
||||
# Test that we correctly generate C++ base classes for versioned API commands with different
|
||||
# Test that we correctly generate C++ base classes for Stable API commands with different
|
||||
# key names, command names, and C++ names.
|
||||
APIVersion1CommandIDLName:
|
||||
description: A versioned API command
|
||||
description: A Stable API command
|
||||
command_name: APIVersion1CommandRuntimeName
|
||||
namespace: ignored
|
||||
strict: true
|
||||
@@ -1172,7 +1172,7 @@ commands:
|
||||
reply_type: OkReply
|
||||
|
||||
APIVersion1CommandIDLName2:
|
||||
description: A versioned API command
|
||||
description: A Stable API command
|
||||
command_name: APIVersion1CommandRuntimeName2
|
||||
cpp_name: APIVersion1CommandCPPName2
|
||||
namespace: ignored
|
||||
@@ -1184,7 +1184,7 @@ commands:
|
||||
|
||||
# Test whether the C++ code for a command with alias name is currently generated.
|
||||
APIVersion1CommandWithAlias:
|
||||
description: A versioned API command with alias
|
||||
description: A Stable API command with alias
|
||||
command_name: NewCommandName
|
||||
command_alias: OldCommandName
|
||||
namespace: ignored
|
||||
|
||||
@@ -62,11 +62,9 @@
|
||||
|
||||
namespace mongo {
|
||||
|
||||
void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
|
||||
const BSONElement& wcErrorElem,
|
||||
BSONObjBuilder& responseBuilder) {
|
||||
WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem);
|
||||
|
||||
void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId,
|
||||
WriteConcernErrorDetail wcError,
|
||||
BSONObjBuilder& responseBuilder) {
|
||||
auto status = wcError.toStatus();
|
||||
wcError.setStatus(
|
||||
status.withReason(str::stream() << status.reason() << " at " << shardId.toString()));
|
||||
@@ -74,6 +72,13 @@ void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
|
||||
responseBuilder.append("writeConcernError", wcError.toBSON());
|
||||
}
|
||||
|
||||
void appendWriteConcernErrorToCmdResponse(const ShardId& shardId,
|
||||
const BSONElement& wcErrorElem,
|
||||
BSONObjBuilder& responseBuilder) {
|
||||
WriteConcernErrorDetail wcError = getWriteConcernErrorDetail(wcErrorElem);
|
||||
appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, responseBuilder);
|
||||
}
|
||||
|
||||
boost::intrusive_ptr<ExpressionContext> makeExpressionContextWithDefaultsForTargeter(
|
||||
OperationContext* opCtx,
|
||||
const NamespaceString& nss,
|
||||
|
||||
@@ -52,6 +52,13 @@ struct RawResponsesResult {
|
||||
boost::optional<Status> firstStaleConfigError;
|
||||
};
|
||||
|
||||
/**
|
||||
* This function appends the provided WriteConcernErrorDetail to the sharded response.
|
||||
*/
|
||||
void appendWriteConcernErrorDetailToCmdResponse(const ShardId& shardId,
|
||||
WriteConcernErrorDetail wcError,
|
||||
BSONObjBuilder& responseBuilder);
|
||||
|
||||
/**
|
||||
* This function appends the provided writeConcernError BSONElement to the sharded response.
|
||||
*/
|
||||
|
||||
@@ -119,6 +119,7 @@ env.Library(
|
||||
'$BUILD_DIR/mongo/db/ftdc/ftdc_server',
|
||||
'$BUILD_DIR/mongo/db/index_commands_idl',
|
||||
'$BUILD_DIR/mongo/db/initialize_api_parameters',
|
||||
'$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
|
||||
'$BUILD_DIR/mongo/db/query/command_request_response',
|
||||
'$BUILD_DIR/mongo/db/query/cursor_response_idl',
|
||||
'$BUILD_DIR/mongo/db/query/map_reduce_output_format',
|
||||
@@ -131,6 +132,7 @@ env.Library(
|
||||
'$BUILD_DIR/mongo/db/stats/counters',
|
||||
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
|
||||
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
|
||||
'$BUILD_DIR/mongo/db/transaction_api',
|
||||
'$BUILD_DIR/mongo/db/views/views',
|
||||
'$BUILD_DIR/mongo/executor/async_multicaster',
|
||||
'$BUILD_DIR/mongo/executor/async_request_executor',
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "mongo/base/status_with.h"
|
||||
@@ -38,9 +40,13 @@
|
||||
#include "mongo/db/catalog/document_validation.h"
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/commands/update_metrics.h"
|
||||
#include "mongo/db/internal_transactions_feature_flag_gen.h"
|
||||
#include "mongo/db/ops/write_ops_gen.h"
|
||||
#include "mongo/db/query/collation/collator_factory_interface.h"
|
||||
#include "mongo/db/storage/duplicate_key_error_info.h"
|
||||
#include "mongo/db/transaction_api.h"
|
||||
#include "mongo/executor/task_executor_pool.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/rpc/get_status_from_command_result.h"
|
||||
#include "mongo/s/balancer_configuration.h"
|
||||
#include "mongo/s/catalog_cache.h"
|
||||
@@ -122,6 +128,41 @@ BSONObj getShardKey(OperationContext* opCtx,
|
||||
return shardKey;
|
||||
}
|
||||
|
||||
void handleWouldChangeOwningShardErrorRetryableWrite(
|
||||
OperationContext* opCtx,
|
||||
const ShardId& shardId,
|
||||
const NamespaceString& nss,
|
||||
const write_ops::FindAndModifyCommandRequest& request,
|
||||
BSONObjBuilder* result) {
|
||||
auto txn = std::make_shared<txn_api::TransactionWithRetries>(
|
||||
opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
|
||||
|
||||
auto swResult = txn->runSyncNoThrow(
|
||||
opCtx,
|
||||
[cmdObj = request.toBSON({}), nss, result](const txn_api::TransactionClient& txnClient,
|
||||
ExecutorPtr txnExec) {
|
||||
auto res = txnClient.runCommand(nss.db(), cmdObj).get();
|
||||
uassertStatusOK(getStatusFromCommandResult(res));
|
||||
|
||||
result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(res));
|
||||
|
||||
return SemiFuture<void>::makeReady();
|
||||
});
|
||||
|
||||
auto cmdStatus = swResult.getStatus();
|
||||
if (cmdStatus != ErrorCodes::DuplicateKey ||
|
||||
(cmdStatus == ErrorCodes::DuplicateKey &&
|
||||
!cmdStatus.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
|
||||
cmdStatus.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
|
||||
};
|
||||
uassertStatusOK(cmdStatus);
|
||||
|
||||
const auto& wcError = swResult.getValue().wcError;
|
||||
if (!wcError.toStatus().isOK()) {
|
||||
appendWriteConcernErrorDetailToCmdResponse(shardId, wcError, *result);
|
||||
}
|
||||
}
|
||||
|
||||
void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
|
||||
const NamespaceString nss,
|
||||
Status responseStatus,
|
||||
@@ -402,47 +443,23 @@ private:
|
||||
|
||||
if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) {
|
||||
if (isRetryableWrite) {
|
||||
RouterOperationContextSession routerSession(opCtx);
|
||||
try {
|
||||
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
|
||||
readConcernArgs =
|
||||
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
|
||||
|
||||
// Re-run the findAndModify command that will change the shard key value in a
|
||||
// transaction. We call _runCommand recursively, and this second time through
|
||||
// since it will be run as a transaction it will take the other code path to
|
||||
// updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried
|
||||
// operation does not include WC inside the transaction by stripping it from the
|
||||
// cmdObj. The transaction commit will still use the WC, because it uses the WC
|
||||
// from the opCtx (which has been set previously in Strategy).
|
||||
documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
|
||||
_runCommand(opCtx,
|
||||
shardId,
|
||||
shardVersion,
|
||||
dbVersion,
|
||||
nss,
|
||||
stripWriteConcern(cmdObj),
|
||||
result);
|
||||
uassertStatusOK(getStatusFromCommandResult(result->asTempObj()));
|
||||
auto commitResponse =
|
||||
documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
|
||||
|
||||
uassertStatusOK(getStatusFromCommandResult(commitResponse));
|
||||
if (auto wcErrorElem = commitResponse["writeConcernError"]) {
|
||||
appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
|
||||
}
|
||||
} catch (DBException& e) {
|
||||
if (e.code() != ErrorCodes::DuplicateKey ||
|
||||
(e.code() == ErrorCodes::DuplicateKey &&
|
||||
!e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
|
||||
e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
|
||||
};
|
||||
|
||||
auto txnRouterForAbort = TransactionRouter::get(opCtx);
|
||||
if (txnRouterForAbort)
|
||||
txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus());
|
||||
|
||||
throw;
|
||||
if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
|
||||
serverGlobalParams.featureCompatibility)) {
|
||||
auto parsedRequest = write_ops::FindAndModifyCommandRequest::parse(
|
||||
IDLParserErrorContext("ClusterFindAndModify"), cmdObj);
|
||||
// Strip write concern because this command will be sent as part of a
|
||||
// transaction and the write concern has already been loaded onto the opCtx and
|
||||
// will be picked up by the transaction API.
|
||||
//
|
||||
// Strip runtime constants because they will be added again when this command is
|
||||
// recursively sent through the service entry point.
|
||||
parsedRequest.setWriteConcern(boost::none);
|
||||
parsedRequest.setLegacyRuntimeConstants(boost::none);
|
||||
handleWouldChangeOwningShardErrorRetryableWrite(
|
||||
opCtx, shardId, nss, parsedRequest, result);
|
||||
} else {
|
||||
_handleWouldChangeOwningShardErrorRetryableWriteLegacy(
|
||||
opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result);
|
||||
}
|
||||
} else {
|
||||
updateShardKeyValueOnWouldChangeOwningShardError(
|
||||
@@ -462,6 +479,53 @@ private:
|
||||
CommandHelpers::filterCommandReplyForPassthrough(response.data));
|
||||
}
|
||||
|
||||
// TODO SERVER-62375: Remove after 6.0 is released.
|
||||
static void _handleWouldChangeOwningShardErrorRetryableWriteLegacy(
|
||||
OperationContext* opCtx,
|
||||
const ShardId& shardId,
|
||||
const boost::optional<ChunkVersion>& shardVersion,
|
||||
const boost::optional<DatabaseVersion>& dbVersion,
|
||||
const NamespaceString& nss,
|
||||
const BSONObj& cmdObj,
|
||||
BSONObjBuilder* result) {
|
||||
RouterOperationContextSession routerSession(opCtx);
|
||||
try {
|
||||
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
|
||||
readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
|
||||
|
||||
// Re-run the findAndModify command that will change the shard key value in a
|
||||
// transaction. We call _runCommand recursively, and this second time through
|
||||
// since it will be run as a transaction it will take the other code path to
|
||||
// updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried
|
||||
// operation does not include WC inside the transaction by stripping it from the
|
||||
// cmdObj. The transaction commit will still use the WC, because it uses the WC
|
||||
// from the opCtx (which has been set previously in Strategy).
|
||||
documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
|
||||
_runCommand(
|
||||
opCtx, shardId, shardVersion, dbVersion, nss, stripWriteConcern(cmdObj), result);
|
||||
uassertStatusOK(getStatusFromCommandResult(result->asTempObj()));
|
||||
auto commitResponse =
|
||||
documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
|
||||
|
||||
uassertStatusOK(getStatusFromCommandResult(commitResponse));
|
||||
if (auto wcErrorElem = commitResponse["writeConcernError"]) {
|
||||
appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
|
||||
}
|
||||
} catch (DBException& e) {
|
||||
if (e.code() != ErrorCodes::DuplicateKey ||
|
||||
(e.code() == ErrorCodes::DuplicateKey &&
|
||||
!e.extraInfo<DuplicateKeyErrorInfo>()->getKeyPattern().hasField("_id"))) {
|
||||
e.addContext(documentShardKeyUpdateUtil::kNonDuplicateKeyErrorContext);
|
||||
};
|
||||
|
||||
auto txnRouterForAbort = TransactionRouter::get(opCtx);
|
||||
if (txnRouterForAbort)
|
||||
txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus());
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
// Update related command execution metrics.
|
||||
UpdateMetrics _updateMetrics;
|
||||
} findAndModifyCmd;
|
||||
|
||||
@@ -68,6 +68,7 @@ public:
|
||||
// Send it to the primary shard
|
||||
RefineCollectionShardKeyRequest requestParamObj;
|
||||
requestParamObj.setNewShardKey(request().getKey());
|
||||
requestParamObj.setCollectionUUID(request().getCollectionUUID());
|
||||
ShardsvrRefineCollectionShardKey refineCollectionShardKeyCommand(nss);
|
||||
refineCollectionShardKeyCommand.setRefineCollectionShardKeyRequest(requestParamObj);
|
||||
|
||||
|
||||
@@ -47,6 +47,10 @@ commands:
|
||||
type: object
|
||||
description: "The index specification document to use as the new shard key."
|
||||
optional: false
|
||||
collectionUUID:
|
||||
type: uuid
|
||||
description: "The expected UUID of the collection."
|
||||
optional: true
|
||||
|
||||
_configsvrRefineCollectionShardKey:
|
||||
command_name: _configsvrRefineCollectionShardKey
|
||||
|
||||
@@ -161,6 +161,10 @@ structs:
|
||||
type: KeyPattern
|
||||
description: "The index specification document to use as the new shard key."
|
||||
optional: false
|
||||
collectionUUID:
|
||||
type: uuid
|
||||
description: "The expected UUID of the collection."
|
||||
optional: true
|
||||
|
||||
ReshardCollectionRequest:
|
||||
description: "Parameters for the reshard collection command"
|
||||
|
||||
@@ -83,11 +83,11 @@ configs:
|
||||
arg_vartype: String
|
||||
cpp_varname: shellGlobalParams.apiVersion
|
||||
"apiStrict":
|
||||
description: "disable all features not included in the MongoDB Versioned API"
|
||||
description: "disable all features not included in the MongoDB Stable API"
|
||||
arg_vartype: Switch
|
||||
cpp_varname: shellGlobalParams.apiStrict
|
||||
"apiDeprecationErrors":
|
||||
description: "disable all features deprecated in the MongoDB Versioned API"
|
||||
description: "disable all features deprecated in the MongoDB Stable API"
|
||||
arg_vartype: Switch
|
||||
cpp_varname: shellGlobalParams.apiDeprecationErrors
|
||||
"objcheck":
|
||||
|
||||
@@ -577,7 +577,7 @@ ConnectionRegistry::ConnectionRegistry() = default;
|
||||
void ConnectionRegistry::registerConnection(DBClientBase& client, StringData uri) {
|
||||
BSONObj info;
|
||||
BSONObj command;
|
||||
// If apiStrict is set override it, whatsmyuri is not in the Versioned API.
|
||||
// If apiStrict is set override it, whatsmyuri is not in the Stable API.
|
||||
if (client.getApiParameters().getStrict()) {
|
||||
command = BSON("whatsmyuri" << 1 << "apiStrict" << false);
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user