diff --git a/src/mongo/db/global_catalog/type_chunk.h b/src/mongo/db/global_catalog/type_chunk.h index 7c2a0c3932b..974cfb60afd 100644 --- a/src/mongo/db/global_catalog/type_chunk.h +++ b/src/mongo/db/global_catalog/type_chunk.h @@ -168,6 +168,17 @@ public: const OID& epoch, const Timestamp& timestamp); + /** + * A helper method for using this class with PersistentTaskStore that returns an otherwise + * invalid ChunkType object without the proper chunk version set. It is the caller's + * responsibility to ensure the ChunkVersion is fixed afterwards. + * + * TODO SERVER-121075: See if this can be removed. + */ + MONGO_MOD_PRIVATE static ChunkType parse(const BSONObj& source, const IDLParserContext& ctxt) { + return uassertStatusOK(parseFromConfigBSON(source, OID(), Timestamp())); + } + /** * Constructs a new ChunkType object from BSON with the following format: * {_id: <>, max: <>, shard: <>, history: <>, lastmod: <>, onCurrentShardSince: <>} diff --git a/src/mongo/db/s/BUILD.bazel b/src/mongo/db/s/BUILD.bazel index cc9a4f4d311..727e4a43bad 100644 --- a/src/mongo/db/s/BUILD.bazel +++ b/src/mongo/db/s/BUILD.bazel @@ -444,6 +444,7 @@ mongo_cc_library( "//src/mongo/db/s/resharding:resharding_txn_cloner.cpp", "//src/mongo/db/s/resharding:resharding_txn_cloner_progress_gen", "//src/mongo/db/s/resharding:resharding_util.cpp", + "//src/mongo/db/shard_role/shard_catalog:collection_cache_recoverer.cpp", "//src/mongo/db/shard_role/shard_catalog:collection_critical_section_document_gen", "//src/mongo/db/shard_role/shard_catalog:collection_sharding_runtime.cpp", "//src/mongo/db/shard_role/shard_catalog:collection_sharding_state_factory_shard.cpp", @@ -885,6 +886,7 @@ mongo_cc_unit_test( "//src/mongo/db/global_catalog/metadata_consistency_validation:metadata_consistency_util_test.cpp", "//src/mongo/db/router_role/routing_cache:namespace_metadata_change_notifications_test.cpp", "//src/mongo/db/router_role/routing_cache:shard_server_catalog_cache_loader_test.cpp", + "//src/mongo/db/shard_role/shard_catalog:collection_cache_recoverer_test.cpp", "//src/mongo/db/shard_role/shard_catalog:collection_metadata_filtering_test.cpp", "//src/mongo/db/shard_role/shard_catalog:collection_metadata_test.cpp", "//src/mongo/db/shard_role/shard_catalog:collection_sharding_runtime_test.cpp", diff --git a/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer.cpp b/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer.cpp new file mode 100644 index 00000000000..62d71636199 --- /dev/null +++ b/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer.cpp @@ -0,0 +1,257 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h" + +#include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/global_catalog/type_collection.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/topology/sharding_state.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +namespace mongo { +namespace { +CollectionMetadata recoverCollectionFromDisk(OperationContext* opCtx, + repl::OpTime timestampToReadAt, + const NamespaceString& nss) { + // Setup the snapshot timestamp on the opCtx. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs{LogicalTime{timestampToReadAt.getTimestamp()}, + repl::ReadConcernLevel::kSnapshotReadConcern}; + + // TODO SERVER-121199: This disk recovery should be using the same aggregation that we use on + // the CSRS to refresh the chunks. Right now it's fully restoring everything from disk. The sort + // that happens afterwards should also go away as a result. + + // TODO SERVER-119940: Review whether we need to parse only a subset of the information present + // on the collection. Right now we're parsing it fully as if we're the CSRS even if never + // accessing certain fields. + struct CollectionParsed { + static CollectionParsed parse(const BSONObj& obj, const IDLParserContext&) { + return {CollectionType{obj}}; + } + CollectionType content; + }; + PersistentTaskStore collStore{ + NamespaceString::kConfigShardCatalogCollectionsNamespace}; + boost::optional coll; + collStore.forEach(opCtx, + BSON(CollectionType::kNssFieldName << nss.toStringForResourceId()), + [&](const CollectionParsed& parsedColl) { + coll = parsedColl.content; + return false; + }); + + // TODO SERVER-121201: Handle the case where there is no entry present for the collection + // meaning the collection is not owned by the shard at the current time. + tassert(12033905, "Expected to have a collection entry present but there is none", coll); + + // TODO SERVER-119940: Review whether we need to parse only a subset of the information present + // on the chunk. Right now we're parsing it fully as if we're the CSRS even if never accessing + // certain fields. + std::vector chunks; + PersistentTaskStore chunkStore{NamespaceString::kConfigShardCatalogChunksNamespace}; + chunkStore.forEach( + opCtx, BSON(ChunkType::collectionUUID() << coll->getUuid()), [&](const ChunkType& chunk) { + auto& inserted = chunks.emplace_back(std::move(chunk)); + const auto& version = inserted.getVersion(); + auto fixedVersion = ChunkVersion{{coll->getEpoch(), coll->getTimestamp()}, + {version.majorVersion(), version.minorVersion()}}; + inserted.setVersion(std::move(fixedVersion)); + return true; + }); + + // TODO SERVER-121201: Handle the case where there is a collection entry but no chunks present + // for it. + tassert(12033904, + "Expected to have at least one chunk entry for the collection but there are none", + !chunks.empty()); + + // At this point we can just sort it based on the lastmod version since we know the + // timestamp/uuid is fixed across all chunks for that collection. This is necessary as + // per the requirements of the routing table and could potentially go away if we used a + // different data structure in order to support the delta oplog entries. + std::sort(chunks.begin(), chunks.end(), [](const ChunkType& left, const ChunkType& right) { + return left.getVersion().toLong() < right.getVersion().toLong(); + }); + auto defaultCollator = [&]() -> std::unique_ptr { + if (auto collation = coll->getDefaultCollation(); !collation.isEmpty()) { + // The collation should have been validated upon collection creation + return uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); + } + return nullptr; + }(); + auto rt = OptionalRoutingTableHistory{std::make_shared( + RoutingTableHistory::makeNew(coll->getNss(), + coll->getUuid(), + coll->getKeyPattern(), + coll->getUnsplittable(), + std::move(defaultCollator), + coll->getUnique(), + coll->getEpoch(), + coll->getTimestamp(), + coll->getTimeseriesFields(), + coll->getReshardingFields(), + coll->getAllowMigrations(), + chunks))}; + auto cm = CurrentChunkManager{std::move(rt)}; + return CollectionMetadata{std::move(cm), ShardingState::get(opCtx)->shardId()}; +} +} // namespace + +Status CollectionCacheRecoverer::waitForInitialPass(OperationContext* opCtx) { + stdx::unique_lock lk(_mutex); + tassert(12033900, + "Attempting to recover without a valid collection metadata setup or without setting up " + "async recovery", + _collMetadata.valid()); + return _collMetadata.getNoThrow(opCtx).getStatus(); +} + +void CollectionCacheRecoverer::start(OperationContext* opCtx, ExecutorPtr executor) { + std::lock_guard lk(_mutex); + + if (_collMetadata.valid()) { + // We got created with an already existing CollectionMetadata, there's nothing to read from + // disk. + return; + } + // We first have to wait until the lastWritten batch of oplog entries has been + // applied and can be used for snapshot reads. This is because not doing so exposes + // the recovery process to a race condition with oplog application. + // + // Consider the following scenario: + // * A secondary has fetched an oplog batch containing oplog 'c' entries for the CSS + // * The secondary applies the batch + // * Concurrently we also initiate recovery from disk + // Now the last two operations can race in such a way that recovery would use a timestamp + // from before the oplog 'c' entry was committed to the oplog and that entry becomes lost + // since it was applied before the recovery process was installed. As a result, the recovery + // would end up installing the wrong CSS state since that entry would not be present in the + // queue even if it logically came after the timestamp used by the recovery process. + // + // To prevent this scenario we write down the lastWritten timestamp after installing + // the recovery process. This will ensure that all oplog entries that come after + // will see the recovery process in place and enqueue themselves. + _timestampToReadAt = repl::ReplicationCoordinator::get(opCtx)->getMyLastWrittenOpTime(); + _collMetadata = + repl::ReplicationCoordinator::get(opCtx) + ->registerWaiterForMajorityReadOpTime(opCtx, _timestampToReadAt) + .thenRunOn(executor) + .then([nss = _nss, + svcCtx = opCtx->getService(), + token = _cancellationSource.token(), + timestampToReadAt = _timestampToReadAt, + executor] { + ThreadClient client{"CSR-Recovery", getGlobalServiceContext()->getService()}; + auto opCtx = + CancelableOperationContext{client->makeOperationContext(), token, executor}; + LOGV2_INFO(12033903, + "Wait for stable timestamp finished, proceeding to read disk contents", + "collection"_attr = nss.toStringForErrorMsg()); + auto metadata = recoverCollectionFromDisk(opCtx.get(), timestampToReadAt, nss); + LOGV2_INFO(12033902, + "Disk contents have been read", + "collection"_attr = nss.toStringForErrorMsg()); + return metadata; + }) + .onCompletion(([nss = _nss](const auto& status) { + if (!status.isOK()) { + LOGV2_WARNING(12033901, + "Encountered failure during disk recovery", + "error"_attr = status.getStatus(), + "collection"_attr = nss.toStringForErrorMsg()); + } + return status; + })) + .semi(); +} + +void CollectionCacheRecoverer::onOplogEntry( + OperationContext* opCtx, + Timestamp entryTs, + const InvalidateCollectionShardingStateOplogEntry& entry) { + stdx::lock_guard lk(_mutex); + if (_timestampToReadAt.getTimestamp() < entryTs) { + return; + } + _entriesToApply.emplace(entryTs, entry); +} + +void CollectionCacheRecoverer::onOplogEntry(OperationContext* opCtx, + Timestamp entryTs, + const CollectionShardingStateDeltaOplogEntry& entry) { + stdx::lock_guard lk(_mutex); + if (_timestampToReadAt.getTimestamp() < entryTs) { + return; + } + _entriesToApply.emplace(entryTs, entry); +} + +namespace { +boost::optional applyOplogEntry( + OperationContext* opCtx, + const CollectionShardingStateDeltaOplogEntry& entry, + CollectionMetadata collMetadata) { + // TODO SERVER-121200: Actually implement the delta oplog entry application on + // CollectionMetadata + return collMetadata; +} +boost::optional applyOplogEntry( + OperationContext* opCtx, + const InvalidateCollectionShardingStateOplogEntry& entry, + CollectionMetadata collMetadata) { + return boost::none; +} +} // namespace + +boost::optional CollectionCacheRecoverer::drainAndApply( + OperationContext* opCtx) { + stdx::lock_guard lk(_mutex); + auto collMetadata = _collMetadata.get(); + while (!_entriesToApply.empty()) { + ON_BLOCK_EXIT([&] { _entriesToApply.pop(); }); + const auto& [ts, entry] = _entriesToApply.front(); + auto newCollMetadata = std::visit( + [&](const auto& entry) { return applyOplogEntry(opCtx, entry, collMetadata); }, entry); + if (!newCollMetadata) { + // Draining failed, signal the caller that it must perform another round of recovery. We + // advance the timestamp such that it gets the new valid snapshot. + _timestampToReadAt = repl::OpTime{ts, repl::OpTime::kUninitializedTerm}; + return boost::none; + } + collMetadata = std::move(*newCollMetadata); + } + return collMetadata; +} + +} // namespace mongo diff --git a/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h b/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h new file mode 100644 index 00000000000..d76eab9da5d --- /dev/null +++ b/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/shard_role/shard_catalog/collection_metadata.h" +#include "mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata_gen.h" + +#include + +namespace mongo { +/** + * A class containing all necessary tools to: + * - Apply oplog entries to the existing CollectionMetadata + * - Recover the sharding metadata from the authoritative source of information on disk + all oplog + * entries that were supposed to be applied before disk recovery finished. + * + * The contract that users of the Recoverer should expect are as follows: + * - The recovery should ony be started once there's a guarantee the durable changes are stable on + * disk. That is, recovery can start within a critical section so long as no more durable changes + * will be done before it is released. + * - The chosen timestamp may or may not be within a critical section. + * - The returned metadata is not majority committed and instead should be treated with local read + * concern. + * - The returned metadata may be from a time when the critical section was active. + * + * This class is expected to be used with two modes of operation: + * - Applying oplog entries to a known set of collection metadata + * - Fully recovering collection metadata from disk and catching up with any potentially concurrent + * oplog entries. + * + * In both cases the class must be used as follows: + * + * while (true) { + * recoverer.start(); + * recoverer.wait(); + * if (auto collMetadata = recoverer.drain()) { + * CSR.install(collMetadata); + * break; + * } + * } + * + * We expect the first case to just be a single loop and for both start and wait to be immediate + * no-ops. + */ +class MONGO_MOD_PRIVATE CollectionCacheRecoverer { +public: + CollectionCacheRecoverer(const NamespaceString& nss, CollectionMetadata existingMetadata) + : _collMetadata(SemiFuture::makeReady(std::move(existingMetadata))), + _nss(nss) {}; + CollectionCacheRecoverer(const NamespaceString& nss) : _nss(nss) {}; + + void start(OperationContext* opCtx, ExecutorPtr executor); + + /** + * Waits until the CollectionMetadata has been recovered from disk. Note that in order to get it + * we must first call `drainAndApply` in order to drain the potentially concurrent oplog + * entries. + */ + Status waitForInitialPass(OperationContext* opCtx); + + /** + * Drain and publish the latest collection metadata state to the caller. This method can return + * boost::none if we encountered an invalidate oplog entry during the drain that forces recovery + * to happen again. + * + * The returned CollectionMetadata should be valid and return whether the collection is + * currently untracked (no sharding metadata exists) or tracked (it's sharded and therefore + * present on the global catalog). + */ + boost::optional drainAndApply(OperationContext* opCtx); + + /** + * Apply the oplog entry to the CollectionMetadata. If disk recovery is taking place it will + * instead enqueue the entry for recovery and only materialize the results once + * CacheSynchronizer::drain is called. + */ + void onOplogEntry(OperationContext* opCtx, + Timestamp entryTs, + const InvalidateCollectionShardingStateOplogEntry& entry); + void onOplogEntry(OperationContext* opCtx, + Timestamp entryTs, + const CollectionShardingStateDeltaOplogEntry& entry); + +private: + // The following convention is used to denote what protects what: + // (M) denotes protection via the _mutex + stdx::mutex _mutex; + CancellationSource _cancellationSource; + SemiFuture _collMetadata; + + using QueuedItem = std::variant; + std::queue> _entriesToApply; // (M) + + const NamespaceString _nss; + repl::OpTime _timestampToReadAt; // (M) +}; +} // namespace mongo diff --git a/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer_test.cpp b/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer_test.cpp new file mode 100644 index 00000000000..a0b90258b76 --- /dev/null +++ b/src/mongo/db/shard_role/shard_catalog/collection_cache_recoverer_test.cpp @@ -0,0 +1,286 @@ +/** + * Copyright (C) 2026-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/shard_role/shard_catalog/collection_cache_recoverer.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/global_catalog/type_collection.h" +#include "mongo/db/sharding_environment/shard_server_test_fixture.h" +#include "mongo/unittest/unittest.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +namespace mongo { +namespace { + +const NamespaceString kTestNss = + NamespaceString::createNamespaceString_forTest("TestDB", "TestColl"); +const std::string kShardKey = "_id"; +const BSONObj kShardKeyPattern = BSON(kShardKey << 1); + +std::pair> makeShardedMetadataForDisk( + OperationContext* opCtx, int nChunks, ShardId shardId) { + const UUID uuid = UUID::gen(); + const OID epoch = OID::gen(); + const Timestamp timestamp(Date_t::now()); + + CollectionType collType{kTestNss, epoch, timestamp, Date_t::now(), uuid, kShardKeyPattern}; + + std::vector chunks; + auto chunkVersion = ChunkVersion({epoch, timestamp}, {1, 0}); + for (int i = 0; i < nChunks; i++) { + auto min = i == 0 ? BSON(kShardKey << MINKEY) : BSON(kShardKey << (i * 100)); + auto max = + i == (nChunks - 1) ? BSON(kShardKey << MAXKEY) : BSON(kShardKey << ((i + 1) * 100)); + auto range = ChunkRange(min, max); + auto& chunkInserted = chunks.emplace_back(uuid, std::move(range), chunkVersion, shardId); + chunkInserted.setName(OID::gen()); + chunkVersion.incMajor(); + } + + return {std::move(collType), std::move(chunks)}; +} + +class RecovererFixture : public ShardServerTestFixture { +protected: + void setUp() override { + ShardServerTestFixture::setUp(); + _executor = std::make_shared([] { + ThreadPool::Options options; + options.poolName = "TestCSSRecoveryPool"; + options.minThreads = 1; + options.maxThreads = 1; + return options; + }()); + _executor->startup(); + } + + void tearDown() override { + ShardServerTestFixture::tearDown(); + _executor->shutdown(); + } + + ExecutorPtr getExecutor() { + return _executor; + } + +private: + std::shared_ptr _executor; +}; + +} // namespace + +TEST_F(RecovererFixture, CacheRecovererCanRecoverFromDisk) { + OperationContext* opCtx = operationContext(); + int numChunks = 20; + const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0")); + + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace); + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace); + + { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON()); + } + + for (const auto& chunk : chunks) { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON()); + } + + CollectionCacheRecoverer recoverer{kTestNss}; + + recoverer.start(operationContext(), getExecutor()); + ASSERT_OK(recoverer.waitForInitialPass(operationContext())); + auto collMetadata = recoverer.drainAndApply(operationContext()); + + ASSERT_TRUE(collMetadata); + + const auto shardVersionExpected = chunks.back().getVersion(); + + ASSERT_EQ(collMetadata->getCollPlacementVersion(), shardVersionExpected); +} + +TEST_F(RecovererFixture, CacheRecovererAppliesOplogChanges) { + OperationContext* opCtx = operationContext(); + int numChunks = 20; + const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0")); + + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace); + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace); + + { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON()); + } + + for (const auto& chunk : chunks) { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON()); + } + + auto collMetadata = [&] { + CollectionCacheRecoverer recoverer{kTestNss}; + + recoverer.start(operationContext(), getExecutor()); + ASSERT_OK(recoverer.waitForInitialPass(operationContext())); + return recoverer.drainAndApply(operationContext()); + }(); + + ASSERT_TRUE(collMetadata); + + CollectionCacheRecoverer recoverer{kTestNss, std::move(*collMetadata)}; + recoverer.onOplogEntry( + operationContext(), Timestamp(Date_t::now()), CollectionShardingStateDeltaOplogEntry{}); + collMetadata = recoverer.drainAndApply(operationContext()); + + ASSERT_TRUE(collMetadata); +} + +TEST_F(RecovererFixture, CacheRecovererCanRecoverFromDiskWithConcurrentOplogEntries) { + OperationContext* opCtx = operationContext(); + int numChunks = 20; + const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0")); + + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace); + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace); + + { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON()); + } + + for (const auto& chunk : chunks) { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON()); + } + + CollectionCacheRecoverer recoverer{kTestNss}; + + auto collMetadata = [&]() { + recoverer.start(operationContext(), getExecutor()); + ASSERT_OK(recoverer.waitForInitialPass(operationContext())); + // We now add an oplog entry that invalidates the previous recovery. + recoverer.onOplogEntry(operationContext(), + Timestamp(Date_t::now()), + InvalidateCollectionShardingStateOplogEntry{kTestNss, UUID::gen()}); + auto collMetadata = recoverer.drainAndApply(operationContext()); + // This should've encountered an invalidate entry which triggers a new round of wait + + // drain. + ASSERT_FALSE(collMetadata); + recoverer.start(operationContext(), getExecutor()); + ASSERT_OK(recoverer.waitForInitialPass(operationContext())); + collMetadata = recoverer.drainAndApply(operationContext()); + // Recovery should've happened by now and returned the final state. + ASSERT_TRUE(collMetadata); + return collMetadata; + }(); + + const auto shardVersionExpected = chunks.back().getVersion(); + + ASSERT_EQ(collMetadata->getCollPlacementVersion(), shardVersionExpected); +} + +TEST_F(RecovererFixture, CacheRecovererBubblesUpCachePressureErrors) { + OperationContext* opCtx = operationContext(); + int numChunks = 20; + const auto [collType, chunks] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0")); + + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace); + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace); + + { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON()); + } + + for (const auto& chunk : chunks) { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, chunk.toConfigBSON()); + } + + FailPointEnableBlock intermittentFailure{"WTWriteConflictExceptionForReads"}; + + CollectionCacheRecoverer recoverer{kTestNss}; + + recoverer.start(operationContext(), getExecutor()); + auto status = recoverer.waitForInitialPass(operationContext()); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::WriteConflict); +} + +TEST_F(RecovererFixture, CacheRecovererBubblesUpDiskReadingFailure) { + OperationContext* opCtx = operationContext(); + + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogCollectionsNamespace); + createTestCollection(opCtx, NamespaceString::kConfigShardCatalogChunksNamespace); + + { + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, + BSON("_id" << kTestNss.toStringForErrorMsg() << "made_up" << true)); + } + + { + CollectionCacheRecoverer recoverer{kTestNss}; + + // The CollectionType is parsed via an IDL parser. So it should throw an IDL failure. + recoverer.start(operationContext(), getExecutor()); + auto status = recoverer.waitForInitialPass(operationContext()); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::IDLFailedToParse); + } + + int numChunks = 20; + const auto [collType, _] = makeShardedMetadataForDisk(opCtx, numChunks, ShardId("0")); + + { + DBDirectClient client(opCtx); + client.remove(NamespaceString::kConfigShardCatalogCollectionsNamespace, + BSON("_id" << kTestNss.toStringForErrorMsg())); + client.insert(NamespaceString::kConfigShardCatalogCollectionsNamespace, collType.toBSON()); + } + + DBDirectClient client(opCtx); + client.insert(NamespaceString::kConfigShardCatalogChunksNamespace, + BSON("uuid" << collType.getUuid() << "lastmod" << "Invalid value")); + + { + CollectionCacheRecoverer recoverer{kTestNss}; + + // The ChunkType uses a custom parser that returns a different family of errors compared to + // the CollectionType. Let's make sure that's the case. + recoverer.start(operationContext(), getExecutor()); + auto status = recoverer.waitForInitialPass(operationContext()); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::NoSuchKey); + } +} + +} // namespace mongo diff --git a/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.cpp b/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.cpp index 8531413902d..1087ca41cd7 100644 --- a/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.cpp +++ b/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.cpp @@ -30,14 +30,11 @@ #include "mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h" #include "mongo/base/error_codes.h" -#include "mongo/base/status_with.h" #include "mongo/base/string_data.h" -#include "mongo/bson/oid.h" #include "mongo/bson/timestamp.h" #include "mongo/db/client.h" #include "mongo/db/feature_flag.h" #include "mongo/db/operation_context.h" -#include "mongo/db/query/plan_cache/plan_cache.h" #include "mongo/db/query/plan_cache/sbe_plan_cache.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_level.h" @@ -57,7 +54,6 @@ #include "mongo/db/versioning_protocol/stale_exception.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" -#include "mongo/platform/atomic_word.h" #include "mongo/util/assert_util.h" #include "mongo/util/clock_source.h" #include "mongo/util/duration.h" @@ -379,8 +375,8 @@ boost::optional CollectionShardingRuntime::getCriticalSec return {}; } -void CollectionShardingRuntime::setFilteringMetadata_nonAuthoritative( - OperationContext* opCtx, CollectionMetadata newMetadata) { +void CollectionShardingRuntime::_setFilteringMetadata(OperationContext* opCtx, + CollectionMetadata newMetadata) { tassert(7032302, str::stream() << "Namespace " << _nss.toStringForErrorMsg() << " must never have a routing table.", @@ -426,21 +422,28 @@ void CollectionShardingRuntime::setFilteringMetadata_nonAuthoritative( auto result = waitingVersion <=> newChunkVersion; return result == std::partial_ordering::less; }); - // We reset the state on whether we are authoritative or not and delegate it to the parent - // caller on whether the CSS is now authoritative. +} + +void CollectionShardingRuntime::setFilteringMetadata_nonAuthoritative( + OperationContext* opCtx, CollectionMetadata newMetadata) { + _setFilteringMetadata(opCtx, std::move(newMetadata)); _authoritativeState = AuthoritativeState::kNonAuthoritative; } +void CollectionShardingRuntime::setFilteringMetadata_authoritative(OperationContext* opCtx, + CollectionMetadata newMetadata) { + _setFilteringMetadata(opCtx, std::move(newMetadata)); + _authoritativeState = AuthoritativeState::kAuthoritative; +} + void CollectionShardingRuntime::_clearFilteringMetadata(OperationContext* opCtx, bool collIsDropped) { - _authoritativeState = AuthoritativeState::kNonAuthoritative; - if (_placementVersionInRecoverOrRefresh) { _placementVersionInRecoverOrRefresh->cancellationSource.cancel(); } if (_nss.isNamespaceAlwaysUntracked()) { - // The namespace is always marked as untraked thus there is no need to clear anything. + // The namespace is always marked as untracked thus there is no need to clear anything. return; } @@ -461,11 +464,24 @@ void CollectionShardingRuntime::_clearFilteringMetadata(OperationContext* opCtx, void CollectionShardingRuntime::clearFilteringMetadata_nonAuthoritative(OperationContext* opCtx) { _clearFilteringMetadata(opCtx, /* collIsDropped */ false); + _authoritativeState = AuthoritativeState::kNonAuthoritative; } void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection_nonAuthoritative( OperationContext* opCtx) { _clearFilteringMetadata(opCtx, /* collIsDropped */ true); + _authoritativeState = AuthoritativeState::kNonAuthoritative; +} + +void CollectionShardingRuntime::clearFilteringMetadata_authoritative(OperationContext* opCtx) { + _clearFilteringMetadata(opCtx, /* collIsDropped */ false); + _authoritativeState = AuthoritativeState::kAuthoritative; +} + +void CollectionShardingRuntime::clearFilteringMetadataForDroppedCollection_authoritative( + OperationContext* opCtx) { + _clearFilteringMetadata(opCtx, /* collIsDropped */ true); + _authoritativeState = AuthoritativeState::kAuthoritative; } Status CollectionShardingRuntime::waitForClean(OperationContext* opCtx, diff --git a/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h b/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h index 389abc2d9f2..8c30019bdfd 100644 --- a/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h +++ b/src/mongo/db/shard_role/shard_catalog/collection_sharding_runtime.h @@ -41,10 +41,10 @@ #include "mongo/db/service_context.h" #include "mongo/db/shard_role/shard_catalog/collection_metadata.h" #include "mongo/db/shard_role/shard_catalog/collection_sharding_state.h" +#include "mongo/db/shard_role/shard_catalog/critical_section_signal.h" #include "mongo/db/shard_role/shard_catalog/metadata_manager.h" #include "mongo/db/shard_role/shard_catalog/scoped_collection_metadata.h" #include "mongo/db/versioning_protocol/shard_version.h" -#include "mongo/db/versioning_protocol/stale_exception.h" #include "mongo/util/cancellation.h" #include "mongo/util/concurrency/waiter_list.h" #include "mongo/util/decorable.h" @@ -189,6 +189,8 @@ public: */ void setFilteringMetadata_nonAuthoritative(OperationContext* opCtx, CollectionMetadata newMetadata); + void setFilteringMetadata_authoritative(OperationContext* opCtx, + CollectionMetadata newMetadata); /** * Marks the collection's filtering metadata as UNKNOWN, meaning that all attempts to check for @@ -200,11 +202,13 @@ public: * setFilteringMetadata which requires exclusive). */ void clearFilteringMetadata_nonAuthoritative(OperationContext* opCtx); + void clearFilteringMetadata_authoritative(OperationContext* opCtx); /** * Calls to clearFilteringMetadata + clears the _metadataManager object. */ void clearFilteringMetadataForDroppedCollection_nonAuthoritative(OperationContext* opCtx); + void clearFilteringMetadataForDroppedCollection_authoritative(OperationContext* opCtx); /** * Methods to control the collection's critical section. Methods listed below must be called @@ -356,6 +360,11 @@ private: */ void _clearFilteringMetadata(OperationContext* opCtx, bool collIsDropped); + /** + * Auxiliary function used to implement the various setFilteringMetadata flavours. + */ + void _setFilteringMetadata(OperationContext* opCtx, CollectionMetadata collMetatada); + /** * This function cleans up some state associated with the current sharded metadata before it's * replaced by the new metadata. diff --git a/src/mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata.idl b/src/mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata.idl index 716d39d2179..8de1e306db6 100644 --- a/src/mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata.idl +++ b/src/mongo/db/shard_role/shard_catalog/type_oplog_catalog_metadata.idl @@ -59,3 +59,20 @@ structs: dbName: type: database_name description: "Database name" + + # TODO SERVER-121202: These two types are placeholders for the two oplog entries. + # They will quite possibly need to be adjusted once the time comes to actually use them. + InvalidateCollectionShardingStateOplogEntry: + description: "Oplog entry format for a metadata operation that triggers a CSS recovery from the durable state" + strict: false + fields: + collectionName: + type: namespacestring + description: "Collection for which we are triggering the recovery" + uuid: + type: uuid + description: "The UUID of the collection we are triggering the recovery for" + + CollectionShardingStateDeltaOplogEntry: + description: "Oplog entry format for delta changes to apply to the overall CSS" + strict: false