SERVER-101412 Merge in replicated size and count POC with stubs behind fully disabled feature flag (#46818)

Co-authored-by: Henrik Edin <henrik.edin@mongodb.com>
Co-authored-by: Matt Kneiser <matt.kneiser@mongodb.com>
GitOrigin-RevId: 4d466e5652b605addc9ce79399e4d48e79afa4f8
This commit is contained in:
Damian Wasilewicz
2026-01-24 02:05:13 -05:00
committed by MongoDB Bot
parent 856dd81403
commit 2b1357b976
18 changed files with 848 additions and 2 deletions

3
.github/CODEOWNERS vendored
View File

@@ -2848,6 +2848,9 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
# The following patterns are parsed from ./src/mongo/db/repl/split_horizon/OWNERS.yml
/src/mongo/db/repl/split_horizon/**/* @10gen/server-split-horizon @svc-auto-approve-bot
# The following patterns are parsed from ./src/mongo/db/replicated_size_and_count_metadata_manager/OWNERS.yml
/src/mongo/db/replicated_size_and_count_metadata_manager/**/* @10gen/server-collection-write-path @svc-auto-approve-bot
# The following patterns are parsed from ./src/mongo/db/router_role/OWNERS.yml
/src/mongo/db/router_role/**/* @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot

View File

@@ -25,3 +25,5 @@
- featureFlagPrimaryDrivenIndexBuilds
# TODO SERVER-104494 SERVER-104258: Turn this back on once the issues are fixed.
- featureFlagReshardingCloneNoRefresh
# TODO SERVER-117454: Enable the feature flag in the all-feature-flags variant.
- featureFlagReplicatedSizeAndCount

View File

@@ -767,6 +767,7 @@ collection_write_path: # authoritative: @10gen/server-collection-write-path
files:
- src/mongo/db/collection_crud
- src/mongo/db/commands/collection_to_capped*
- src/mongo/db/replicated_size_and_count_metadata_manager
bsoncolumn: # authoritative: @10gen/server-bsoncolumn
meta:

View File

@@ -23,6 +23,7 @@ mongo_cc_library(
"//src/mongo/db/query/query_stats",
"//src/mongo/db/repl:local_oplog_info",
"//src/mongo/db/repl:repl_coordinator_interface",
"//src/mongo/db/replicated_size_and_count_metadata_manager",
"//src/mongo/db/shard_role/shard_catalog:document_validation",
"//src/mongo/db/storage:index_entry_comparison",
"//src/mongo/db/storage:record_store_base",

View File

@@ -39,11 +39,15 @@
#include "mongo/db/collection_crud/capped_collection_maintenance.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/feature_flag.h"
#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/op_observer/op_observer_util.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/db/repl/local_oplog_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/uncommitted_changes.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role/lock_manager/d_concurrency.h"
#include "mongo/db/shard_role/lock_manager/lock_manager_defs.h"
@@ -396,6 +400,16 @@ Status insertDocumentsImpl(OperationContext* opCtx,
recordIds,
/*fromMigrate=*/makeFromMigrateForInserts(opCtx, nss, begin, end, fromMigrate),
/*defaultFromMigrate=*/fromMigrate);
if (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
UncommittedMetaChange::write(opCtx).record(
collection->uuid(),
records.size(),
std::accumulate(records.begin(), records.end(), 0LL, [](auto acc, const Record& r) {
return acc + r.data.size();
}));
}
}
collection_internal::cappedDeleteUntilBelowConfiguredMaximum(
@@ -757,6 +771,13 @@ void updateDocument(OperationContext* opCtx,
args->updatedDoc = newDoc;
opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);
if (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
UncommittedMetaChange::write(opCtx).record(
collection->uuid(), 0, newDoc.objsize() - oldDoc.value().objsize());
}
}
StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
@@ -849,6 +870,12 @@ StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
}
opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);
if (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
UncommittedMetaChange::write(opCtx).record(
collection->uuid(), 0, newDoc.objsize() - oldDoc.value().objsize());
}
return newDoc;
}
@@ -937,6 +964,12 @@ void deleteDocument(OperationContext* opCtx,
opCtx->getServiceContext()->getOpObserver()->onDelete(
opCtx, collection, stmtId, doc.value(), documentKey, deleteArgs);
if (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
UncommittedMetaChange::write(opCtx).record(collection->uuid(), -1, -doc.value().objsize());
}
if (opDebug) {
opDebug->getAdditiveMetrics().incrementKeysDeleted(keysDeleted);
// 'opDebug' may be deleted at rollback time in case of multi-document transaction.
@@ -987,6 +1020,11 @@ repl::OpTime truncateRange(OperationContext* opCtx,
opCtx->getServiceContext()->getOpObserver()->onTruncateRange(
opCtx, collection, minRecordId, maxRecordId, bytesDeleted, docsDeleted, opTime);
if (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
UncommittedMetaChange::write(opCtx).record(collection->uuid(), -docsDeleted, -bytesDeleted);
}
return opTime;
}
} // namespace collection_internal

View File

@@ -135,6 +135,7 @@
#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/replicated_size_and_count_metadata_manager.h"
#include "mongo/db/replication_state_transition_lock_guard.h"
#include "mongo/db/request_execution_context.h"
#include "mongo/db/router_role/routing_cache/catalog_cache.h"
@@ -1870,6 +1871,13 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
true /* memLeakAllowed */);
}
// Shut down the thread managing fast size and count information.
if (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot())) {
ReplicatedSizeAndCountMetadataManager::get(serviceContext).shutdown();
}
// Depending on the underlying implementation, there may be some state that needs to be shut
// down after the replication subsystem and the storage engine.
auto& serviceLifecycle =

View File

@@ -83,6 +83,12 @@ public:
// Name for the profile collection
static constexpr StringData kSystemDotProfileCollectionName = "system.profile"_sd;
// Name for fastcount - replicated collection size and count - collections.
static constexpr StringData kSystemReplicatedSizeAndCountMetadataStore =
"fast_count_metadata_store"_sd;
static constexpr StringData kSystemReplicatedSizeAndCountMetadataStoreTimestamps =
"fast_count_metadata_store_timestamps"_sd;
// Names of privilege document collections
static constexpr StringData kSystemUsers = "system.users"_sd;
static constexpr StringData kSystemRoles = "system.roles"_sd;

View File

@@ -0,0 +1,32 @@
load("//bazel:mongo_src_rules.bzl", "idl_generator", "mongo_cc_library", "mongo_cc_unit_test")
package(default_visibility = ["//visibility:public"])
exports_files(
glob([
"*.h",
"*.cpp",
]),
)
mongo_cc_library(
name = "replicated_size_and_count_metadata_manager",
srcs = [
"replicated_size_and_count_metadata_manager.cpp",
"uncommitted_changes.cpp",
],
deps = [
"//src/mongo/db:record_id_helpers",
"//src/mongo/db/update:update_common",
],
)
mongo_cc_library(
name = "replicated_size_and_count_metadata_manager_init",
srcs = [
"replicated_size_and_count_metadata_manager_init.cpp",
],
deps = [
"//src/mongo/db/shard_role/shard_catalog:catalog_helpers",
],
)

View File

@@ -0,0 +1,5 @@
version: 1.0.0
filters:
- "*":
approvers:
- 10gen/server-collection-write-path

View File

@@ -0,0 +1,291 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/replicated_size_and_count_metadata_manager/replicated_size_and_count_metadata_manager.h"
#include "mongo/db/collection_crud/collection_write_path.h"
#include "mongo/db/update/document_diff_calculator.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
namespace mongo {
namespace {
const ServiceContext::Decoration<ReplicatedSizeAndCountMetadataManager> getMetadataManager =
ServiceContext::declareDecoration<ReplicatedSizeAndCountMetadataManager>();
}
ReplicatedSizeAndCountMetadataManager& ReplicatedSizeAndCountMetadataManager::get(
ServiceContext* svcCtx) {
return getMetadataManager(svcCtx);
}
void ReplicatedSizeAndCountMetadataManager::shutdown() {
LOGV2(11648800, "Shutting down ReplicatedSizeAndCountMetadataManager");
_inShutdown.storeRelaxed(true);
}
// TODO SERVER-117575: Uncomment implementation once the circular dependency is resolved.
// void ReplicatedSizeAndCountMetadataManager::startup(OperationContext* opCtx) {
// TODO SERVER-117650: Read existing collection to populate in-memory metadata. This is currently
// executed before oplog application, so if there is an create entry for this collection to apply we
// will currently miss it.
// {
// CollectionOrViewAcquisition acquisition = acquireCollectionOrView(
// opCtx,
// CollectionOrViewAcquisitionRequest::fromOpCtx(
// opCtx,
// NamespaceString::makeGlobalConfigCollection(NamespaceString::kSystemReplicatedSizeAndCountMetadataStore),
// AcquisitionPrerequisites::OperationType::kWrite),
// LockMode::MODE_IX);
// if (acquisition.collectionExists()) {
// LOGV2(11648801,
// "ReplicatedSizeAndCountMetadataManager::startup metadata collection
// exists, initializing sizes and " "counts");
// stdx::lock_guard lock(_mutex);
// auto cursor = acquisition.getCollectionPtr()->getCursor(opCtx);
// while (auto record = cursor->next()) {
// Record& rec = *record;
// UUID uuid = _UUIDForKey(rec.id);
// BSONObj data = rec.data.releaseToBson();
// auto& meta = _metadata[uuid];
// meta.sizeCount.count =
// data.getField(ReplicatedSizeAndCountMetadataManager::kCountKey).Long();
// meta.sizeCount.size =
// data.getField(ReplicatedSizeAndCountMetadataManager::kCountKey).Long();
// }
// LOGV2(11648802, "ReplicatedSizeAndCountMetadataManager::startup initialization
// complete");
// } else {
// LOGV2(11648803,
// "ReplicatedSizeAndCountMetadataManager::startup metadata collection does not exist.
// No initialization " "needed");
// }
// }
// _backgroundThread =
// stdx::thread(&ReplicatedSizeAndCountMetadataManager::_startBackgroundThread, this,
// opCtx->getServiceContext());
// }
void ReplicatedSizeAndCountMetadataManager::_startBackgroundThread(ServiceContext* svcCtx) {
// TODO SERVER-117575: Find a way to link this in a way that satisfies symbol checker and also
// does not cause binaries outside of the server to link StoreSASLOptions without
// CoreOptions_Store. Also, specify ShardServer as the ClusterRole.
// AuthorizationSession::get(cc())->grantInternalAuthorization();
ThreadClient tc(_threadName, svcCtx->getService());
auto uniqueOpCtx = tc->makeOperationContext();
auto opCtx = uniqueOpCtx.get();
try {
_runBackgroundThreadOnTimer(opCtx);
} catch (const DBException& ex) {
LOGV2_WARNING(11648806,
"Failure in thread",
"threadName"_attr = _threadName,
"error"_attr = ex.toStatus());
}
// TODO SERVER-117651 : Shutdown behavior goes here.
LOGV2(11648804, "ReplicatedSizeAndCountMetadataManager exited");
}
void ReplicatedSizeAndCountMetadataManager::_runBackgroundThreadOnTimer(OperationContext* opCtx) {
while (!_inShutdown.loadRelaxed()) {
// TODO SERVER-117515: should not just be on a timer. We want to signal this from checkpoint
// thread somehow.
stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
_runIteration(opCtx);
}
}
void ReplicatedSizeAndCountMetadataManager::_runIteration(OperationContext* opCtx) {
// TODO SERVER-117652: Make copy so we don't hold locks too long. Should use immutable data
// structures.
absl::flat_hash_map<UUID, StoredSizeCount> metadata;
{
stdx::lock_guard lock(_mutex);
metadata = _metadata;
}
try {
for (auto&& [metadataKey, metadataVal] : metadata) {
if (metadataVal.dirty) {
_writeMetadata(opCtx, metadataKey, metadataVal.sizeCount, _keyForUUID(metadataKey));
stdx::lock_guard lock(_mutex);
_metadata[metadataKey].dirty = false;
}
}
} catch (const DBException& ex) {
LOGV2_WARNING(7397500,
"Failed to persist collection size/count metadata",
"error"_attr = ex.toStatus());
}
}
// TODO SERVER-117575: Uncomment implementation once the circular dependency is resolved.
// void ReplicatedSizeAndCountMetadataManager::_writeMetadata(OperationContext* opCtx,
// const UUID& uuid,
// const CollectionSizeCount& sizeCount,
// const RecordId recordId) {
// CollectionOrViewAcquisition acquisition = _acquireMetadataCollection(opCtx);
// const CollectionPtr& coll = acquisition.getCollectionPtr();
// invariant(coll, str::stream() <<
// "Expected to acquire metadata store as a collection, not a view. isView: "
// << std::boolalpha << coll.isView());
// TODO SERVER-117512: We're performing one write per collection here. But we should be
// able to bundle many of these writes in a single applyOps using the WUOW
// grouping interface. Might be a problem with updates.
// WriteUnitOfWork wuow(opCtx);
// Snapshotted<BSONObj> doc;
// bool exists = coll->findDoc(opCtx, recordId, &doc);
// if (exists) {
// _updateMetadata(opCtx, coll, doc, uuid, sizeCount);
// } else {
// _insertMetadata(opCtx, coll, uuid, sizeCount);
// }
// wuow.commit();
// }
void ReplicatedSizeAndCountMetadataManager::_updateMetadata(OperationContext* opCtx,
const CollectionPtr& coll,
const Snapshotted<BSONObj>& doc,
const UUID& uuid,
const CollectionSizeCount& sizeCount) {
// TODO SERVER-117575: Manually performing update without query system. This would be
// nice to avoid extra dependencies but might be too tricky to get
// right.
CollectionUpdateArgs args(doc.value());
// TODO SERVER-117654: When we also store timestamp we should be able to recover/combine data
// from old doc to keep this accurate.
BSONObj newDoc = _getDocForWrite(uuid, sizeCount);
auto diff = doc_diff::computeOplogDiff(doc.value(), newDoc, 0);
if (diff) {
args.update = update_oplog_entry::makeDeltaOplogEntry(*diff);
args.criteria = BSON("_id" << uuid);
collection_internal::updateDocument(
opCtx, coll, _keyForUUID(uuid), doc, newDoc, &args.update, nullptr, nullptr, &args);
} else {
// TODO SERVER-117508: Increment t2 stat.
LOGV2(11648805, "ReplicatedSizeAndCountMetadataManager empty update", "uuid"_attr = uuid);
}
}
// TODO SERVER-117575: Uncomment implementation once the circular dependency is resolved.
// void ReplicatedSizeAndCountMetadataManager::_insertMetadata(OperationContext* opCtx,
// const CollectionPtr& coll,
// const UUID& uuid,
// const CollectionSizeCount& sizeCount) {
// [[maybe_unused]] auto ret = collection_internal::insertDocument(
// opCtx,
// coll,
// InsertStatement(_getDocForWrite(uuid, sizeCount)),
// // entry.first, entry.second.sizeCount.count, entry.second.sizeCount.size),
// nullptr);
// }
BSONObj ReplicatedSizeAndCountMetadataManager::_getDocForWrite(
const UUID& uuid, const CollectionSizeCount& sizeCount) {
return BSON("_id" << uuid << ReplicatedSizeAndCountMetadataManager::kCountKey << sizeCount.count
<< ReplicatedSizeAndCountMetadataManager::kSizeKey << sizeCount.size);
}
// TODO SERVER-117575: Uncomment implementation once the circular dependency is resolved.
// CollectionOrViewAcquisition
// ReplicatedSizeAndCountMetadataManager::_acquireMetadataCollection(OperationContext* opCtx)
// {
// {
// CollectionOrViewAcquisition acquisition = acquireCollectionOrView(
// opCtx,
// CollectionOrViewAcquisitionRequest::fromOpCtx(
// opCtx,
// NamespaceString::makeGlobalConfigCollection(NamespaceString::kSystemReplicatedSizeAndCountMetadataStore),
// AcquisitionPrerequisites::OperationType::kWrite),
// LockMode::MODE_IX);
// if (acquisition.getCollectionPtr()) {
// return acquisition;
// }
// }
// uasserted(11718600, "Expected metadata collection to exist");
// }
void ReplicatedSizeAndCountMetadataManager::commit(
const boost::container::flat_map<UUID, CollectionSizeCount>& changes,
boost::optional<Timestamp> commitTime) {
stdx::lock_guard lock(_mutex);
for (const auto& [uuid, metadata] : changes) {
// TODO SERVER-117656: Investigate why we sometimes get zero changes here.
if (metadata.count == 0 && metadata.size == 0) {
LOGV2_WARNING(11648808, "ReplicatedSizeAndCountMetadataManager, Count & Size == 0");
continue;
}
auto& stored = _metadata[uuid];
stored.sizeCount.count += metadata.count;
stored.sizeCount.size += metadata.size;
stored.dirty = true;
}
}
CollectionSizeCount ReplicatedSizeAndCountMetadataManager::find(const UUID& uuid) const {
stdx::lock_guard lock(_mutex);
auto it = _metadata.find(uuid);
if (it != _metadata.end()) {
return it->second.sizeCount;
}
return {};
}
// TODO SERVER-117575: Uncomment implementation once the circular dependency is resolved.
// RecordId ReplicatedSizeAndCountMetadataManager::_keyForUUID(const UUID& uuid) {
// auto key = record_id_helpers::keyForDoc(
// BSON("_id" << uuid), clustered_util::makeDefaultClusteredIdIndex().getIndexSpec(),
// nullptr);
// return key.getValue();
// }
UUID ReplicatedSizeAndCountMetadataManager::_UUIDForKey(RecordId key) {
return UUID::parse(record_id_helpers::toBSONAs(key, "").firstElement()).getValue();
}
} // namespace mongo

View File

@@ -0,0 +1,142 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/base/string_data.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/record_id.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/size_and_count.h"
#include "mongo/db/shard_role/shard_catalog/collection.h"
#include "mongo/db/storage/snapshot.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/uuid.h"
#include <boost/container/flat_map.hpp>
#include <boost/optional/optional.hpp>
namespace mongo {
class MONGO_MOD_PUBLIC ReplicatedSizeAndCountMetadataManager {
public:
static ReplicatedSizeAndCountMetadataManager& get(ServiceContext* svcCtx);
inline static StringData kCountKey = "c"_sd;
inline static StringData kSizeKey = "s"_sd;
/**
* Signals fastcount thread to start.
*/
void startup(OperationContext* opCtx) {
uasserted(ErrorCodes::NotImplemented,
"ReplicatedSizeAndCountMetadataManager::startup not yet implemented");
}
/**
* Signals fastcount thread to stop.
*/
void shutdown();
void commit(const boost::container::flat_map<UUID, CollectionSizeCount>& changes,
boost::optional<Timestamp> commitTime);
CollectionSizeCount find(const UUID& uuid) const;
private:
void _startBackgroundThread(ServiceContext* svcCtx);
void _runBackgroundThreadOnTimer(OperationContext* opCtx);
void _runIteration(OperationContext* opCtx);
void _createMetadataCollection(OperationContext* opCtx);
void _writeMetadata(OperationContext* opCtx,
const UUID& uuid,
const CollectionSizeCount& sizeCount,
RecordId recordId) {
uasserted(ErrorCodes::NotImplemented,
"ReplicatedSizeAndCountMetadataManager::_writeMetadata not yet implemented");
}
void _updateMetadata(OperationContext* opCtx,
const CollectionPtr& coll,
const Snapshotted<BSONObj>& doc,
const UUID& uuid,
const CollectionSizeCount& sizeCount);
void _insertMetadata(OperationContext* opCtx,
const CollectionPtr& coll,
const UUID& uuid,
const CollectionSizeCount& sizeCount) {
uasserted(ErrorCodes::NotImplemented,
"ReplicatedSizeAndCountMetadataManager::_insertMetadata not yet implemented");
}
/**
* Formats and returns the document to write to the metadata collection.
*/
BSONObj _getDocForWrite(const UUID& uuid, const CollectionSizeCount& sizeCount);
// Acquire or create if missing, the kSystemReplicatedSizeAndCountMetadataStore collection.
// TODO SERVER-117575: Change return type to CollectionOrViewAcquisition
// boost::optional<CollectionOrViewAcquisition> _acquireMetadataCollection(
boost::optional<int> _acquireMetadataCollection(OperationContext* opCtx) {
uasserted(ErrorCodes::NotImplemented,
"ReplicatedSizeAndCountMetadataManager::_acquireMetadataCollection not yet "
"implemented");
return boost::none;
}
// void _fetch(OperationContext* opCtx, UUID uuid);
RecordId _keyForUUID(const UUID& uuid) {
uasserted(ErrorCodes::NotImplemented,
"ReplicatedSizeAndCountMetadataManager::_keyForUUID not yet implemented");
return RecordId();
}
UUID _UUIDForKey(RecordId key);
AtomicWord<bool> _inShutdown = false;
StringData _threadName = "sizeCount"_sd;
mutable stdx::mutex _mutex;
struct StoredSizeCount {
CollectionSizeCount sizeCount;
bool dirty{false}; // indicate if write is needed
// boost::optional<Timestamp> lastUpdated;
};
absl::flat_hash_map<UUID, StoredSizeCount> _metadata;
stdx::thread _backgroundThread;
};
} // namespace mongo

View File

@@ -0,0 +1,73 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/replicated_size_and_count_metadata_manager/replicated_size_and_count_metadata_manager_init.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/shard_role/shard_catalog/clustered_collection_util.h"
#include "mongo/db/shard_role/shard_catalog/create_collection.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
namespace mongo {
Status createFastcountCollection(OperationContext* opCtx) {
try {
LOGV2(11718601,
"Creating internal fastcount collection.",
"ns"_attr = NamespaceString::makeGlobalConfigCollection(
NamespaceString::kSystemReplicatedSizeAndCountMetadataStore)
.toStringForErrorMsg());
WriteUnitOfWork wuow(opCtx);
Status createCollectionStatus = createCollection(
opCtx,
NamespaceString::makeGlobalConfigCollection(
NamespaceString::kSystemReplicatedSizeAndCountMetadataStore),
CollectionOptions{.clusteredIndex = clustered_util::makeDefaultClusteredIdIndex()},
BSONObj{});
uassert(createCollectionStatus.code(),
str::stream() << "Failed to create the metadata store collection: "
<< NamespaceString::makeGlobalConfigCollection(
NamespaceString::kSystemReplicatedSizeAndCountMetadataStore)
.toStringForErrorMsg()
<< causedBy(createCollectionStatus.reason()),
createCollectionStatus.isOK());
wuow.commit();
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
} // namespace mongo

View File

@@ -0,0 +1,36 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/operation_context.h"
namespace mongo {
MONGO_MOD_PUBLIC Status createFastcountCollection(OperationContext* opCtx);
}

View File

@@ -0,0 +1,43 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/util/modules.h"
#include <cstdint>
namespace mongo {
struct MONGO_MOD_PUBLIC CollectionSizeCount {
int64_t count{0};
int64_t size{0};
};
} // namespace mongo

View File

@@ -0,0 +1,90 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/replicated_size_and_count_metadata_manager/uncommitted_changes.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/replicated_size_and_count_metadata_manager.h"
#include "mongo/db/shard_role/transaction_resources.h"
#include "mongo/db/update/document_diff_calculator.h"
#include "mongo/db/update/update_oplog_entry_serialization.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
namespace mongo {
namespace {
const OperationContext::Decoration<std::shared_ptr<UncommittedMetaChange>>
getUncommittedMetaChange =
OperationContext::declareDecoration<std::shared_ptr<UncommittedMetaChange>>();
const ServiceContext::Decoration<ReplicatedSizeAndCountMetadataManager> getMetadataManager =
ServiceContext::declareDecoration<ReplicatedSizeAndCountMetadataManager>();
} // namespace
const UncommittedMetaChange& UncommittedMetaChange::read(OperationContext* opCtx) {
std::shared_ptr<UncommittedMetaChange>& ptr = getUncommittedMetaChange(opCtx);
if (ptr) {
return *ptr;
}
static UncommittedMetaChange empty;
return empty;
}
UncommittedMetaChange& UncommittedMetaChange::write(OperationContext* opCtx) {
std::shared_ptr<UncommittedMetaChange>& ptr = getUncommittedMetaChange(opCtx);
if (ptr) {
return *ptr;
}
auto metaChange = std::make_shared<UncommittedMetaChange>();
shard_role_details::getRecoveryUnit(opCtx)->onCommit(
[metaChange](OperationContext* opCtx, boost::optional<Timestamp> commitTime) {
getMetadataManager(opCtx->getServiceContext())
.commit(metaChange->_trackedChanges, commitTime);
});
ptr = std::move(metaChange);
return *ptr;
}
CollectionSizeCount UncommittedMetaChange::find(UUID uuid) const {
auto it = _trackedChanges.find(uuid);
if (it != _trackedChanges.end()) {
return it->second;
}
return {};
}
void UncommittedMetaChange::record(UUID uuid, int64_t numDelta, int64_t sizeDelta) {
auto& collChanges = _trackedChanges[uuid];
collChanges.count += numDelta;
collChanges.size += sizeDelta;
}
} // namespace mongo

View File

@@ -0,0 +1,55 @@
/**
* Copyright (C) 2026-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/operation_context.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/size_and_count.h"
#include "mongo/util/uuid.h"
#include <boost/container/flat_map.hpp>
namespace mongo {
class MONGO_MOD_PUBLIC UncommittedMetaChange {
public:
static const UncommittedMetaChange& read(OperationContext* opCtx);
static UncommittedMetaChange& write(OperationContext* opCtx);
CollectionSizeCount find(UUID uuid) const;
void record(UUID uuid, int64_t numDelta, int64_t sizeDelta);
private:
boost::container::flat_map<UUID, CollectionSizeCount> _trackedChanges;
};
} // namespace mongo

View File

@@ -231,6 +231,11 @@ feature_flags:
default: false
fcv_gated: false
incremental_rollout_phase: in_development
featureFlagReplicatedSizeAndCount:
description: "Track collection size and count in replicated internal collections"
cpp_varname: gFeatureFlagReplicatedSizeAndCount
default: false
fcv_gated: true
featureFlagOtelMetrics:
description: "Feature flag to enable exporting OpenTelemetry metrics"
cpp_varname: gFeatureFlagOtelMetrics

View File

@@ -77,6 +77,9 @@
#include "mongo/db/query/util/make_data_structure.h"
#include "mongo/db/repl/local_oplog_info.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/replicated_size_and_count_metadata_manager.h"
#include "mongo/db/replicated_size_and_count_metadata_manager/uncommitted_changes.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/shard_role/lock_manager/lock_manager_defs.h"
@@ -1094,11 +1097,23 @@ long long CollectionImpl::getCappedMaxSize() const {
}
long long CollectionImpl::numRecords(OperationContext* opCtx) const {
return _shared->_recordStore->numRecords();
return (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()))
? ReplicatedSizeAndCountMetadataManager::get(opCtx->getServiceContext())
.find(uuid())
.count +
UncommittedMetaChange::read(opCtx).find(uuid()).count
: _shared->_recordStore->numRecords();
}
long long CollectionImpl::dataSize(OperationContext* opCtx) const {
return _shared->_recordStore->dataSize();
return (gFeatureFlagReplicatedSizeAndCount.isEnabledUseLastLTSFCVWhenUninitialized(
VersionContext::getDecoration(opCtx),
serverGlobalParams.featureCompatibility.acquireFCVSnapshot()))
? ReplicatedSizeAndCountMetadataManager::get(opCtx->getServiceContext()).find(uuid()).size +
UncommittedMetaChange::read(opCtx).find(uuid()).size
: _shared->_recordStore->dataSize();
}
int64_t CollectionImpl::sizeOnDisk(OperationContext* opCtx,