SERVER-120339 Implement asynchronous recovery mechanism for authoritative CSS metadata (#48765)

Co-authored-by: Pol Piñol Castuera <pol.pinol@mongodb.com>
GitOrigin-RevId: e5c80f9071897fb9724c5ebd6eac41c08ec10aa6
This commit is contained in:
Jordi Olivares Provencio
2026-03-09 16:39:05 +01:00
committed by MongoDB Bot
parent 7c9c595be5
commit ee54c7ce97
8 changed files with 735 additions and 12 deletions

View File

@@ -168,6 +168,17 @@ public:
const OID& epoch,
const Timestamp& timestamp);
/**
* A helper method for using this class with PersistentTaskStore that returns an otherwise
* invalid ChunkType object without the proper chunk version set. It is the caller's
* responsibility to ensure the ChunkVersion is fixed afterwards.
*
* TODO SERVER-121075: See if this can be removed.
*/
MONGO_MOD_PRIVATE static ChunkType parse(const BSONObj& source, const IDLParserContext& ctxt) {
return uassertStatusOK(parseFromConfigBSON(source, OID(), Timestamp()));
}
/**
* Constructs a new ChunkType object from BSON with the following format:
* {_id: <>, max: <>, shard: <>, history: <>, lastmod: <>, onCurrentShardSince: <>}

View File

@@ -444,6 +444,7 @@ mongo_cc_library(
"//src/mongo/db/s/resharding:resharding_txn_cloner.cpp",
"//src/mongo/db/s/resharding:resharding_txn_cloner_progress_gen",
"//src/mongo/db/s/resharding:resharding_util.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_cache_recoverer.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_critical_section_document_gen",
"//src/mongo/db/shard_role/shard_catalog:collection_sharding_runtime.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_sharding_state_factory_shard.cpp",
@@ -885,6 +886,7 @@ mongo_cc_unit_test(
"//src/mongo/db/global_catalog/metadata_consistency_validation:metadata_consistency_util_test.cpp",
"//src/mongo/db/router_role/routing_cache:namespace_metadata_change_notifications_test.cpp",
"//src/mongo/db/router_role/routing_cache:shard_server_catalog_cache_loader_test.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_cache_recoverer_test.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_metadata_filtering_test.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_metadata_test.cpp",
"//src/mongo/db/shard_role/shard_catalog:collection_sharding_runtime_test.cpp",

View File

@@ -0,0 +1,257 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h"
#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/global_catalog/type_collection.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/topology/sharding_state.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
namespace {
CollectionMetadata recoverCollectionFromDisk(OperationContext* opCtx,
repl::OpTime timestampToReadAt,
const NamespaceString& nss) {
// Setup the snapshot timestamp on the opCtx.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs{LogicalTime{timestampToReadAt.getTimestamp()},
repl::ReadConcernLevel::kSnapshotReadConcern};
// TODO SERVER-121199: This disk recovery should be using the same aggregation that we use on
// the CSRS to refresh the chunks. Right now it's fully restoring everything from disk. The sort
// that happens afterwards should also go away as a result.
// TODO SERVER-119940: Review whether we need to parse only a subset of the information present
// on the collection. Right now we're parsing it fully as if we're the CSRS even if never
// accessing certain fields.
struct CollectionParsed {
static CollectionParsed parse(const BSONObj& obj, const IDLParserContext&) {
return {CollectionType{obj}};
}
CollectionType content;
};
PersistentTaskStore<CollectionParsed> collStore{
NamespaceString::kConfigShardCatalogCollectionsNamespace};
boost::optional<CollectionType> coll;
collStore.forEach(opCtx,
BSON(CollectionType::kNssFieldName << nss.toStringForResourceId()),
[&](const CollectionParsed& parsedColl) {
coll = parsedColl.content;
return false;
});
// TODO SERVER-121201: Handle the case where there is no entry present for the collection
// meaning the collection is not owned by the shard at the current time.
tassert(12033905, "Expected to have a collection entry present but there is none", coll);
// TODO SERVER-119940: Review whether we need to parse only a subset of the information present
// on the chunk. Right now we're parsing it fully as if we're the CSRS even if never accessing
// certain fields.
std::vector<ChunkType> chunks;
PersistentTaskStore<ChunkType> chunkStore{NamespaceString::kConfigShardCatalogChunksNamespace};
chunkStore.forEach(
opCtx, BSON(ChunkType::collectionUUID() << coll->getUuid()), [&](const ChunkType& chunk) {
auto& inserted = chunks.emplace_back(std::move(chunk));
const auto& version = inserted.getVersion();
auto fixedVersion = ChunkVersion{{coll->getEpoch(), coll->getTimestamp()},
{version.majorVersion(), version.minorVersion()}};
inserted.setVersion(std::move(fixedVersion));
return true;
});
// TODO SERVER-121201: Handle the case where there is a collection entry but no chunks present
// for it.
tassert(12033904,
"Expected to have at least one chunk entry for the collection but there are none",
!chunks.empty());
// At this point we can just sort it based on the lastmod version since we know the
// timestamp/uuid is fixed across all chunks for that collection. This is necessary as
// per the requirements of the routing table and could potentially go away if we used a
// different data structure in order to support the delta oplog entries.
std::sort(chunks.begin(), chunks.end(), [](const ChunkType& left, const ChunkType& right) {
return left.getVersion().toLong() < right.getVersion().toLong();
});
auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
if (auto collation = coll->getDefaultCollation(); !collation.isEmpty()) {
// The collation should have been validated upon collection creation
return uassertStatusOK(
CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation));
}
return nullptr;
}();
auto rt = OptionalRoutingTableHistory{std::make_shared<RoutingTableHistory>(
RoutingTableHistory::makeNew(coll->getNss(),
coll->getUuid(),
coll->getKeyPattern(),
coll->getUnsplittable(),
std::move(defaultCollator),
coll->getUnique(),
coll->getEpoch(),
coll->getTimestamp(),
coll->getTimeseriesFields(),
coll->getReshardingFields(),
coll->getAllowMigrations(),
chunks))};
auto cm = CurrentChunkManager{std::move(rt)};
return CollectionMetadata{std::move(cm), ShardingState::get(opCtx)->shardId()};
}
} // namespace
Status CollectionCacheRecoverer::waitForInitialPass(OperationContext* opCtx) {
stdx::unique_lock lk(_mutex);
tassert(12033900,
"Attempting to recover without a valid collection metadata setup or without setting up "
"async recovery",
_collMetadata.valid());
return _collMetadata.getNoThrow(opCtx).getStatus();
}
void CollectionCacheRecoverer::start(OperationContext* opCtx, ExecutorPtr executor) {
std::lock_guard lk(_mutex);
if (_collMetadata.valid()) {
// We got created with an already existing CollectionMetadata, there's nothing to read from
// disk.
return;
}
// We first have to wait until the lastWritten batch of oplog entries has been
// applied and can be used for snapshot reads. This is because not doing so exposes
// the recovery process to a race condition with oplog application.
//
// Consider the following scenario:
// * A secondary has fetched an oplog batch containing oplog 'c' entries for the CSS
// * The secondary applies the batch
// * Concurrently we also initiate recovery from disk
// Now the last two operations can race in such a way that recovery would use a timestamp
// from before the oplog 'c' entry was committed to the oplog and that entry becomes lost
// since it was applied before the recovery process was installed. As a result, the recovery
// would end up installing the wrong CSS state since that entry would not be present in the
// queue even if it logically came after the timestamp used by the recovery process.
//
// To prevent this scenario we write down the lastWritten timestamp after installing
// the recovery process. This will ensure that all oplog entries that come after
// will see the recovery process in place and enqueue themselves.
_timestampToReadAt = repl::ReplicationCoordinator::get(opCtx)->getMyLastWrittenOpTime();
_collMetadata =
repl::ReplicationCoordinator::get(opCtx)
->registerWaiterForMajorityReadOpTime(opCtx, _timestampToReadAt)
.thenRunOn(executor)
.then([nss = _nss,
svcCtx = opCtx->getService(),
token = _cancellationSource.token(),
timestampToReadAt = _timestampToReadAt,
executor] {
ThreadClient client{"CSR-Recovery", getGlobalServiceContext()->getService()};
auto opCtx =
CancelableOperationContext{client->makeOperationContext(), token, executor};
LOGV2_INFO(12033903,
"Wait for stable timestamp finished, proceeding to read disk contents",
"collection"_attr = nss.toStringForErrorMsg());
auto metadata = recoverCollectionFromDisk(opCtx.get(), timestampToReadAt, nss);
LOGV2_INFO(12033902,
"Disk contents have been read",
"collection"_attr = nss.toStringForErrorMsg());
return metadata;
})
.onCompletion(([nss = _nss](const auto& status) {
if (!status.isOK()) {
LOGV2_WARNING(12033901,
"Encountered failure during disk recovery",
"error"_attr = status.getStatus(),
"collection"_attr = nss.toStringForErrorMsg());
}
return status;
}))
.semi();
}
void CollectionCacheRecoverer::onOplogEntry(
OperationContext* opCtx,
Timestamp entryTs,
const InvalidateCollectionShardingStateOplogEntry& entry) {
stdx::lock_guard lk(_mutex);
if (_timestampToReadAt.getTimestamp() < entryTs) {
return;
}
_entriesToApply.emplace(entryTs, entry);
}
void CollectionCacheRecoverer::onOplogEntry(OperationContext* opCtx,
Timestamp entryTs,
const CollectionShardingStateDeltaOplogEntry& entry) {
stdx::lock_guard lk(_mutex);
if (_timestampToReadAt.getTimestamp() < entryTs) {
return;
}
_entriesToApply.emplace(entryTs, entry);
}
namespace {
boost::optional<CollectionMetadata> applyOplogEntry(
OperationContext* opCtx,
const CollectionShardingStateDeltaOplogEntry& entry,
CollectionMetadata collMetadata) {
// TODO SERVER-121200: Actually implement the delta oplog entry application on
// CollectionMetadata
return collMetadata;
}
boost::optional<CollectionMetadata> applyOplogEntry(
OperationContext* opCtx,
const InvalidateCollectionShardingStateOplogEntry& entry,
CollectionMetadata collMetadata) {
return boost::none;
}
} // namespace
boost::optional<CollectionMetadata> CollectionCacheRecoverer::drainAndApply(
OperationContext* opCtx) {
stdx::lock_guard lk(_mutex);
auto collMetadata = _collMetadata.get();
while (!_entriesToApply.empty()) {
ON_BLOCK_EXIT([&] { _entriesToApply.pop(); });
const auto& [ts, entry] = _entriesToApply.front();
auto newCollMetadata = std::visit(
[&](const auto& entry) { return applyOplogEntry(opCtx, entry, collMetadata); }, entry);
if (!newCollMetadata) {
// Draining failed, signal the caller that it must perform another round of recovery. We
// advance the timestamp such that it gets the new valid snapshot.
_timestampToReadAt = repl::OpTime{ts, repl::OpTime::kUninitializedTerm};
return boost::none;
}
collMetadata = std::move(*newCollMetadata);
}
return collMetadata;
}
} // namespace mongo

View File

@@ -0,0 +1,125 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/shard_role/shard_catalog/collection_metadata.h"
#include "mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata_gen.h"
#include <queue>
namespace mongo {
/**
* A class containing all necessary tools to:
* - Apply oplog entries to the existing CollectionMetadata
* - Recover the sharding metadata from the authoritative source of information on disk + all oplog
* entries that were supposed to be applied before disk recovery finished.
*
* The contract that users of the Recoverer should expect are as follows:
* - The recovery should ony be started once there's a guarantee the durable changes are stable on
* disk. That is, recovery can start within a critical section so long as no more durable changes
* will be done before it is released.
* - The chosen timestamp may or may not be within a critical section.
* - The returned metadata is not majority committed and instead should be treated with local read
* concern.
* - The returned metadata may be from a time when the critical section was active.
*
* This class is expected to be used with two modes of operation:
* - Applying oplog entries to a known set of collection metadata
* - Fully recovering collection metadata from disk and catching up with any potentially concurrent
* oplog entries.
*
* In both cases the class must be used as follows:
*
* while (true) {
* recoverer.start();
* recoverer.wait();
* if (auto collMetadata = recoverer.drain()) {
* CSR.install(collMetadata);
* break;
* }
* }
*
* We expect the first case to just be a single loop and for both start and wait to be immediate
* no-ops.
*/
class MONGO_MOD_PRIVATE CollectionCacheRecoverer {
public:
CollectionCacheRecoverer(const NamespaceString& nss, CollectionMetadata existingMetadata)
: _collMetadata(SemiFuture<CollectionMetadata>::makeReady(std::move(existingMetadata))),
_nss(nss) {};
CollectionCacheRecoverer(const NamespaceString& nss) : _nss(nss) {};
void start(OperationContext* opCtx, ExecutorPtr executor);
/**
* Waits until the CollectionMetadata has been recovered from disk. Note that in order to get it
* we must first call `drainAndApply` in order to drain the potentially concurrent oplog
* entries.
*/
Status waitForInitialPass(OperationContext* opCtx);
/**
* Drain and publish the latest collection metadata state to the caller. This method can return
* boost::none if we encountered an invalidate oplog entry during the drain that forces recovery
* to happen again.
*
* The returned CollectionMetadata should be valid and return whether the collection is
* currently untracked (no sharding metadata exists) or tracked (it's sharded and therefore
* present on the global catalog).
*/
boost::optional<CollectionMetadata> drainAndApply(OperationContext* opCtx);
/**
* Apply the oplog entry to the CollectionMetadata. If disk recovery is taking place it will
* instead enqueue the entry for recovery and only materialize the results once
* CacheSynchronizer::drain is called.
*/
void onOplogEntry(OperationContext* opCtx,
Timestamp entryTs,
const InvalidateCollectionShardingStateOplogEntry& entry);
void onOplogEntry(OperationContext* opCtx,
Timestamp entryTs,
const CollectionShardingStateDeltaOplogEntry& entry);
private:
// The following convention is used to denote what protects what:
// (M) denotes protection via the _mutex
stdx::mutex _mutex;
CancellationSource _cancellationSource;
SemiFuture<CollectionMetadata> _collMetadata;
using QueuedItem = std::variant<InvalidateCollectionShardingStateOplogEntry,
CollectionShardingStateDeltaOplogEntry>;
std::queue<std::pair<Timestamp, QueuedItem>> _entriesToApply; // (M)
const NamespaceString _nss;
repl::OpTime _timestampToReadAt; // (M)
};
} // namespace mongo

View File

@@ -0,0 +1,286 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/global_catalog/type_collection.h"
#include "mongo/db/sharding_environment/shard_server_test_fixture.h"
#include "mongo/unittest/unittest.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
namespace {
const NamespaceString kTestNss =
NamespaceString::createNamespaceString_forTest("TestDB", "TestColl");
const std::string kShardKey = "_id";
const BSONObj kShardKeyPattern = BSON(kShardKey << 1);
std::pair<CollectionType, std::vector<ChunkType>> makeShardedMetadataForDisk(
OperationContext* opCtx, int nChunks, ShardId shardId) {
const UUID uuid = UUID::gen();
const OID epoch = OID::gen();
const Timestamp timestamp(Date_t::now());
CollectionType collType{kTestNss, epoch, timestamp, Date_t::now(), uuid, kShardKeyPattern};
std::vector<ChunkType> chunks;
auto chunkVersion = ChunkVersion({epoch, timestamp}, {1, 0});
for (int i = 0; i < nChunks; i++) {
auto min = i == 0 ? BSON(kShardKey << MINKEY) : BSON(kShardKey << (i * 100));
auto max =
i == (nChunks - 1) ? BSON(kShardKey << MAXKEY) : BSON(kShardKey << ((i + 1) * 100));
auto range = ChunkRange(min, max);
auto& chunkInserted = chunks.emplace_back(uuid, std::move(range), chunkVersion, shardId);
chunkInserted.setName(OID::gen());
chunkVersion.incMajor();
}
return {std::move(collType), std::move(chunks)};
}
class RecovererFixture : public ShardServerTestFixture {
protected:
void setUp() override {
ShardServerTestFixture::setUp();
_executor = std::make_shared<ThreadPool>([] {
ThreadPool::Options options;
options.poolName = "TestCSSRecoveryPool";
options.minThreads = 1;
options.maxThreads = 1;
return options;
}());
_executor->startup();
}
void tearDown() override {
ShardServerTestFixture::tearDown();
_executor->shutdown();
}
ExecutorPtr getExecutor() {
return _executor;
}
private:
std::shared_ptr<ThreadPool> _executor;
};
} // namespace
TEST_F(RecovererFixture, CacheRecovererCanRecoverFromDisk) {
OperationContext* opCtx = operationContext();
int numChunks = 20;
const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0"));
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
{
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
}
for (const auto& chunk : chunks) {
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON());
}
CollectionCacheRecoverer recoverer{kTestNss};
recoverer.start(operationContext(), getExecutor());
ASSERT_OK(recoverer.waitForInitialPass(operationContext()));
auto collMetadata = recoverer.drainAndApply(operationContext());
ASSERT_TRUE(collMetadata);
const auto shardVersionExpected = chunks.back().getVersion();
ASSERT_EQ(collMetadata->getCollPlacementVersion(), shardVersionExpected);
}
TEST_F(RecovererFixture, CacheRecovererAppliesOplogChanges) {
OperationContext* opCtx = operationContext();
int numChunks = 20;
const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0"));
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
{
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
}
for (const auto& chunk : chunks) {
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON());
}
auto collMetadata = [&] {
CollectionCacheRecoverer recoverer{kTestNss};
recoverer.start(operationContext(), getExecutor());
ASSERT_OK(recoverer.waitForInitialPass(operationContext()));
return recoverer.drainAndApply(operationContext());
}();
ASSERT_TRUE(collMetadata);
CollectionCacheRecoverer recoverer{kTestNss, std::move(*collMetadata)};
recoverer.onOplogEntry(
operationContext(), Timestamp(Date_t::now()), CollectionShardingStateDeltaOplogEntry{});
collMetadata = recoverer.drainAndApply(operationContext());
ASSERT_TRUE(collMetadata);
}
TEST_F(RecovererFixture, CacheRecovererCanRecoverFromDiskWithConcurrentOplogEntries) {
OperationContext* opCtx = operationContext();
int numChunks = 20;
const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0"));
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
{
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
}
for (const auto& chunk : chunks) {
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON());
}
CollectionCacheRecoverer recoverer{kTestNss};
auto collMetadata = [&]() {
recoverer.start(operationContext(), getExecutor());
ASSERT_OK(recoverer.waitForInitialPass(operationContext()));
// We now add an oplog entry that invalidates the previous recovery.
recoverer.onOplogEntry(operationContext(),
Timestamp(Date_t::now()),
InvalidateCollectionShardingStateOplogEntry{kTestNss, UUID::gen()});
auto collMetadata = recoverer.drainAndApply(operationContext());
// This should've encountered an invalidate entry which triggers a new round of wait +
// drain.
ASSERT_FALSE(collMetadata);
recoverer.start(operationContext(), getExecutor());
ASSERT_OK(recoverer.waitForInitialPass(operationContext()));
collMetadata = recoverer.drainAndApply(operationContext());
// Recovery should've happened by now and returned the final state.
ASSERT_TRUE(collMetadata);
return collMetadata;
}();
const auto shardVersionExpected = chunks.back().getVersion();
ASSERT_EQ(collMetadata->getCollPlacementVersion(), shardVersionExpected);
}
TEST_F(RecovererFixture, CacheRecovererBubblesUpCachePressureErrors) {
OperationContext* opCtx = operationContext();
int numChunks = 20;
const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0"));
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
{
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
}
for (const auto& chunk : chunks) {
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON());
}
FailPointEnableBlock intermittentFailure{"WTWriteConflictExceptionForReads"};
CollectionCacheRecoverer recoverer{kTestNss};
recoverer.start(operationContext(), getExecutor());
auto status = recoverer.waitForInitialPass(operationContext());
ASSERT_NOT_OK(status);
ASSERT_EQ(status.code(), ErrorCodes::WriteConflict);
}
TEST_F(RecovererFixture, CacheRecovererBubblesUpDiskReadingFailure) {
OperationContext* opCtx = operationContext();
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace);
createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace);
{
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace,
BSON("_id" << kTestNss.toStringForErrorMsg() << "made_up" << true));
}
{
CollectionCacheRecoverer recoverer{kTestNss};
// The CollectionType is parsed via an IDL parser. So it should throw an IDL failure.
recoverer.start(operationContext(), getExecutor());
auto status = recoverer.waitForInitialPass(operationContext());
ASSERT_NOT_OK(status);
ASSERT_EQ(status.code(), ErrorCodes::IDLFailedToParse);
}
int numChunks = 20;
const auto [collType, _] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0"));
{
DBDirectClient client(opCtx);
client.remove(NamespaceString::kConfigShardCatalogCollectionsNamespace,
BSON("_id" << kTestNss.toStringForErrorMsg()));
client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON());
}
DBDirectClient client(opCtx);
client.insert(NamespaceString::kConfigShardCatalogChunksNamespace,
BSON("uuid" << collType.getUuid() << "lastmod" << "Invalid value"));
{
CollectionCacheRecoverer recoverer{kTestNss};
// The ChunkType uses a custom parser that returns a different family of errors compared to
// the CollectionType. Let's make sure that's the case.
recoverer.start(operationContext(), getExecutor());
auto status = recoverer.waitForInitialPass(operationContext());
ASSERT_NOT_OK(status);
ASSERT_EQ(status.code(), ErrorCodes::NoSuchKey);
}
}
} // namespace mongo

View File

@@ -30,14 +30,11 @@
#include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h"
#include "mongo/base/error_codes.h"
#include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/oid.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/client.h"
#include "mongo/db/feature_flag.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/plan_cache/plan_cache.h"
#include "mongo/db/query/plan_cache/sbe_plan_cache.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_level.h"
@@ -57,7 +54,6 @@
#include "mongo/db/versioning_protocol/stale_exception.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/duration.h"
@@ -379,8 +375,8 @@ boost::optional<CriticalSectionSignal> CollectionShardingRuntime::getCriticalSec
return {};
}
void CollectionShardingRuntime::setFilteringMetadata_nonAuthoritative(
OperationContext* opCtx, CollectionMetadata newMetadata) {
void CollectionShardingRuntime::_setFilteringMetadata(OperationContext* opCtx,
CollectionMetadata newMetadata) {
tassert(7032302,
str::stream() << "Namespace " << _nss.toStringForErrorMsg()
<< " must never have a routing table.",
@@ -426,21 +422,28 @@ void CollectionShardingRuntime::setFilteringMetadata_nonAuthoritative(
auto result = waitingVersion <=> newChunkVersion;
return result == std::partial_ordering::less;
});
// We reset the state on whether we are authoritative or not and delegate it to the parent
// caller on whether the CSS is now authoritative.
}
void CollectionShardingRuntime::setFilteringMetadata_nonAuthoritative(
OperationContext* opCtx, CollectionMetadata newMetadata) {
_setFilteringMetadata(opCtx, std::move(newMetadata));
_authoritativeState = AuthoritativeState::kNonAuthoritative;
}
void CollectionShardingRuntime::setFilteringMetadata_authoritative(OperationContext* opCtx,
CollectionMetadata newMetadata) {
_setFilteringMetadata(opCtx, std::move(newMetadata));
_authoritativeState = AuthoritativeState::kAuthoritative;
}
void CollectionShardingRuntime::_clearFilteringMetadata(OperationContext* opCtx,
bool collIsDropped) {
_authoritativeState = AuthoritativeState::kNonAuthoritative;
if (_placementVersionInRecoverOrRefresh) {
_placementVersionInRecoverOrRefresh->cancellationSource.cancel();
}
if (_nss.isNamespaceAlwaysUntracked()) {
// The namespace is always marked as untraked thus there is no need to clear anything.
// The namespace is always marked as untracked thus there is no need to clear anything.
return;
}
@@ -461,11 +464,24 @@ void CollectionShardingRuntime::_clearFilteringMetadata(OperationContext* opCtx,
void CollectionShardingRuntime::clearFilteringMetadata_nonAuthoritative(OperationContext* opCtx) {
_clearFilteringMetadata(opCtx, /* collIsDropped */ false);
_authoritativeState = AuthoritativeState::kNonAuthoritative;
}
void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection_nonAuthoritative(
OperationContext* opCtx) {
_clearFilteringMetadata(opCtx, /* collIsDropped */ true);
_authoritativeState = AuthoritativeState::kNonAuthoritative;
}
void CollectionShardingRuntime::clearFilteringMetadata_authoritative(OperationContext* opCtx) {
_clearFilteringMetadata(opCtx, /* collIsDropped */ false);
_authoritativeState = AuthoritativeState::kAuthoritative;
}
void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection_authoritative(
OperationContext* opCtx) {
_clearFilteringMetadata(opCtx, /* collIsDropped */ true);
_authoritativeState = AuthoritativeState::kAuthoritative;
}
Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx,

View File

@@ -41,10 +41,10 @@
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role/shard_catalog/collection_metadata.h"
#include "mongo/db/shard_role/shard_catalog/collection_sharding_state.h"
#include "mongo/db/shard_role/shard_catalog/critical_section_signal.h"
#include "mongo/db/shard_role/shard_catalog/metadata_manager.h"
#include "mongo/db/shard_role/shard_catalog/scoped_collection_metadata.h"
#include "mongo/db/versioning_protocol/shard_version.h"
#include "mongo/db/versioning_protocol/stale_exception.h"
#include "mongo/util/cancellation.h"
#include "mongo/util/concurrency/waiter_list.h"
#include "mongo/util/decorable.h"
@@ -189,6 +189,8 @@ public:
*/
void setFilteringMetadata_nonAuthoritative(OperationContext* opCtx,
CollectionMetadata newMetadata);
void setFilteringMetadata_authoritative(OperationContext* opCtx,
CollectionMetadata newMetadata);
/**
* Marks the collection's filtering metadata as UNKNOWN, meaning that all attempts to check for
@@ -200,11 +202,13 @@ public:
* setFilteringMetadata which requires exclusive).
*/
void clearFilteringMetadata_nonAuthoritative(OperationContext* opCtx);
void clearFilteringMetadata_authoritative(OperationContext* opCtx);
/**
* Calls to clearFilteringMetadata + clears the _metadataManager object.
*/
void clearFilteringMetadataForDroppedCollection_nonAuthoritative(OperationContext* opCtx);
void clearFilteringMetadataForDroppedCollection_authoritative(OperationContext* opCtx);
/**
* Methods to control the collection's critical section. Methods listed below must be called
@@ -356,6 +360,11 @@ private:
*/
void _clearFilteringMetadata(OperationContext* opCtx, bool collIsDropped);
/**
* Auxiliary function used to implement the various setFilteringMetadata flavours.
*/
void _setFilteringMetadata(OperationContext* opCtx, CollectionMetadata collMetatada);
/**
* This function cleans up some state associated with the current sharded metadata before it's
* replaced by the new metadata.

View File

@@ -59,3 +59,20 @@ structs:
dbName:
type: database_name
description: "Database name"
# TODO SERVER-121202: These two types are placeholders for the two oplog entries.
# They will quite possibly need to be adjusted once the time comes to actually use them.
InvalidateCollectionShardingStateOplogEntry:
description: "Oplog entry format for a metadata operation that triggers a CSS recovery from the durable state"
strict: false
fields:
collectionName:
type: namespacestring
description: "Collection for which we are triggering the recovery"
uuid:
type: uuid
description: "The UUID of the collection we are triggering the recovery for"
CollectionShardingStateDeltaOplogEntry:
description: "Oplog entry format for delta changes to apply to the overall CSS"
strict: false