SERVER-106393 Replace AtomicProxy usage (#39221)

Co-authored-by: eduardo <eduardo.lopez@mongodb.com>
GitOrigin-RevId: 39aa6d8198351b7400e18ce8fdac0df70e5a7728
This commit is contained in:
Eduardo Lopez
2025-07-29 14:43:31 -04:00
committed by MongoDB Bot
parent bd01835fcd
commit 5bc2052b2c
22 changed files with 37 additions and 230 deletions

View File

@@ -575,9 +575,9 @@ properties:
description: >
Must contain the value startup, runtime, [startup, runtime], or cluster. If runtime
is specified along with cpp_varname, then decltype(cpp_varname) must refer to a
thread-safe storage type, specifically: AtomicWord<T>, AtomicDouble,
std::atomic<T>, or boost::synchronized<T>. Parameters declared as cluster can only
be set at runtime and exhibit numerous differences.
thread-safe storage type, specifically: AtomicWord<T>, AtomicWord<double>, std::atomic<T>,
or boost::synchronized<T>. Parameters declared as cluster can only be set at
runtime and exhibit numerous differences.
oneOf:
- type: string
enum:

View File

@@ -97,7 +97,7 @@ must be unique across the server instance. More information on the specific fiel
- `set_at` (required): Must contain the value `startup`, `runtime`, [`startup`, `runtime`], or
`cluster`. If `runtime` is specified along with `cpp_varname`, then `decltype(cpp_varname)` must
refer to a thread-safe storage type, specifically: `AtomicWord<T>`, `AtomicDouble`, `std::atomic<T>`,
refer to a thread-safe storage type, specifically: `AtomicWord<T>`, `std::atomic<T>`,
or `boost::synchronized<T>`. Parameters declared as `cluster` can only be set at runtime and exhibit
numerous differences. See [Cluster Server Parameters](cluster-server-parameters) below.

View File

@@ -759,7 +759,6 @@ filegroup(
"//src/mongo/idl:generic_argument_gen",
"//src/mongo/idl:idl_parser.h",
"//src/mongo/idl:error_status_idl.h",
"//src/mongo/platform:atomic_proxy.h",
"//src/mongo/platform:visibility.h",
"//src/mongo/rpc:get_status_from_command_result.h",
"//src/mongo/rpc:get_status_from_command_result_write_util.h",

View File

@@ -100,7 +100,7 @@ Status CachedPlanStage::pickBestPlan(const QueryPlannerParams& plannerParams,
// If we work this many times during the trial period, then we will replan the
// query from scratch.
size_t maxWorksBeforeReplan =
static_cast<size_t>(internalQueryCacheEvictionRatio * _decisionWorks);
static_cast<size_t>(internalQueryCacheEvictionRatio.load() * _decisionWorks);
// The trial period ends without replanning if the cached plan produces this many results.
size_t numResults = trial_period::getTrialPeriodNumToReturn(*_canonicalQuery);

View File

@@ -48,7 +48,6 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/exceptions.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/duration.h"

View File

@@ -219,7 +219,7 @@ std::unique_ptr<PlannerInterface> attemptToUsePlan(
std::move(sbePlanAndData));
}
const size_t maxReadsBeforeReplan = internalQueryCacheEvictionRatio * *decisionReads;
const size_t maxReadsBeforeReplan = internalQueryCacheEvictionRatio.load() * *decisionReads;
auto candidate = collectExecutionStatsForCachedPlan(plannerData,
std::move(solution),
indexExistenceChecker,

View File

@@ -45,7 +45,6 @@
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/temporary_record_store.h"
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/unordered_map.h"

View File

@@ -57,7 +57,7 @@ server_parameters:
- runtime
- startup
cpp_varname: maxIndexBuildMemoryUsageMegabytes
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 200
validator:
callback: validateMaxIndexBuildMemoryUsageMegabytesSetting

View File

@@ -62,7 +62,6 @@
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/s/sharding_feature_flags_gen.h"
#include "mongo/util/assert_util.h"
@@ -517,7 +516,7 @@ Status storeMongodOptions(const moe::Environment& params) {
}
if (params.count("storage.syncPeriodSecs")) {
storageGlobalParams.syncdelay = params["storage.syncPeriodSecs"].as<double>();
storageGlobalParams.syncdelay.store(params["storage.syncPeriodSecs"].as<double>());
Status conflictStatus =
checkConflictWithSetParameter("storage.syncPeriodSecs", "syncdelay");
if (!conflictStatus.isOK()) {

View File

@@ -52,7 +52,6 @@
#include "mongo/db/query/plan_cache/plan_cache_log_utils.h"
#include "mongo/db/query/plan_ranking_decision.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/container_size_helper.h"
@@ -550,7 +549,8 @@ public:
hasOldEntry ? &**oldEntryWithStatus.getValue() : nullptr,
newReadsOrWorks,
*cachedPlan.get(),
worksGrowthCoefficient.get_value_or(internalQueryCacheWorksGrowthCoefficient),
worksGrowthCoefficient.get_value_or(
internalQueryCacheWorksGrowthCoefficient.load()),
callbacks);
// Avoid recomputing the hashes if we've got an old entry to grab them from.

View File

@@ -35,7 +35,6 @@ global:
- "mongo/db/query/plan_cache/sbe_plan_cache_on_parameter_change.h"
- "mongo/db/query/query_knob_expressions.h"
- "mongo/db/query/query_stats/query_stats_on_parameter_change.h"
- "mongo/platform/atomic_proxy.h"
- "mongo/platform/atomic_word.h"
enums:
@@ -138,7 +137,7 @@ server_parameters:
is taken as this fraction of the collection size. Applies only to the classic execution engine.
set_at: [startup, runtime]
cpp_varname: "internalQueryPlanEvaluationCollFraction"
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 0.3
validator:
gte: 0.0
@@ -151,7 +150,7 @@ server_parameters:
the number of times we work() all candidate plans in total. Applies only to the classic execution engine.
set_at: [startup, runtime]
cpp_varname: "internalQueryPlanTotalEvaluationCollFraction"
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 2.0
validator:
gte: 0.0
@@ -304,7 +303,7 @@ server_parameters:
and replanning?
set_at: [startup, runtime]
cpp_varname: "internalQueryCacheEvictionRatio"
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 10.0
validator:
gte: 0.0
@@ -317,7 +316,7 @@ server_parameters:
exponentially. The value of this server parameter is the base.
set_at: [startup, runtime]
cpp_varname: "internalQueryCacheWorksGrowthCoefficient"
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 2.0
validator:
gt: 1.0
@@ -912,7 +911,7 @@ server_parameters:
internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill].
set_at: [startup, runtime]
cpp_varname: "internalQuerySBEAggMemoryUseCheckMargin"
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 0.7
validator:
gt: 0.0
@@ -1270,7 +1269,7 @@ server_parameters:
internalQueryStatsRateLimit to 0.
set_at: [startup, runtime]
cpp_varname: "internalQueryStatsSampleRate"
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 0
validator:
gte: 0.0
@@ -1360,7 +1359,7 @@ server_parameters:
description: "The minimal density required for the hybrid container to switch from hash table to Roaring Bitmaps."
set_at: [startup, runtime]
cpp_varname: internalRoaringBitmapsMinimalDensity
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 0.00001
redact: false
@@ -1450,7 +1449,7 @@ server_parameters:
`internalQueryMaxSizeFactorToSimplify`, the simplified one will be rejected.
set_at: [startup, runtime]
cpp_varname: internalQueryMaxSizeFactorToSimplify
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 1.0
redact: false
@@ -1630,7 +1629,7 @@ server_parameters:
than 10% lead to very wide confidence interval and poor estimate accuracy.
set_at: [startup, runtime]
cpp_varname: samplingMarginOfError
cpp_vartype: AtomicDouble
cpp_vartype: AtomicWord<double>
default: 5.0
validator:
gte: 1.0

View File

@@ -47,7 +47,6 @@
#include "mongo/db/server_parameter.h"
#include "mongo/db/tenant_id.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
@@ -152,42 +151,6 @@ private:
U _defaultValue;
};
// Covers AtomicDouble
template <typename U, typename P>
struct storage_wrapper<AtomicProxy<U, P>> {
static constexpr bool isTenantAware = false;
using type = U;
storage_wrapper(AtomicProxy<U, P>& storage)
: _storage(storage), _defaultValue(storage.load()) {}
void store(const U& value, const boost::optional<TenantId>& id) {
invariant(!id.is_initialized());
_storage.store(value);
}
U load(const boost::optional<TenantId>& id) const {
invariant(!id.is_initialized());
return _storage.load();
}
void reset(const boost::optional<TenantId>& id) {
invariant(!id.is_initialized());
_storage.store(_defaultValue);
}
// Not thread-safe, will only be called once at most per ServerParameter in its initialization
// block.
void setDefault(const U& value) {
_defaultValue = value;
}
private:
AtomicProxy<U, P>& _storage;
// Copy of original value to be read from during resets.
U _defaultValue;
};
template <typename U>
struct storage_wrapper<synchronized_value<U>> {

View File

@@ -98,16 +98,18 @@ void Checkpointer::run() {
LOGV2_DEBUG(7702900,
1,
"Checkpoint thread sleeping",
"duration"_attr = static_cast<std::int64_t>(storageGlobalParams.syncdelay));
_sleepCV.wait_for(
lock,
stdx::chrono::seconds(static_cast<std::int64_t>(storageGlobalParams.syncdelay)),
[&] { return _shuttingDown || _triggerCheckpoint; });
"duration"_attr =
static_cast<std::int64_t>(storageGlobalParams.syncdelay.load()));
_sleepCV.wait_for(lock,
stdx::chrono::seconds(
static_cast<std::int64_t>(storageGlobalParams.syncdelay.load())),
[&] { return _shuttingDown || _triggerCheckpoint; });
// If the syncdelay is set to 0, that means we should skip checkpointing. However,
// syncdelay is adjustable by a runtime server parameter, so we need to wake up to check
// periodically. The wakeup to check period is arbitrary.
while (storageGlobalParams.syncdelay == 0 && !_shuttingDown && !_triggerCheckpoint) {
while (storageGlobalParams.syncdelay.load() == 0 && !_shuttingDown &&
!_triggerCheckpoint) {
_sleepCV.wait_for(lock, stdx::chrono::seconds(static_cast<std::int64_t>(3)), [&] {
return _shuttingDown || _triggerCheckpoint;
});

View File

@@ -60,7 +60,7 @@ void StorageGlobalParams::reset() {
noTableScan.store(false);
directoryperdb = false;
syncdelay = 60.0;
syncdelay.store(60.0);
queryableBackupMode = false;
groupCollections = false;
oplogMinRetentionHours.store(0.0);

View File

@@ -29,7 +29,6 @@
#pragma once
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include <string>
@@ -116,7 +115,7 @@ struct StorageGlobalParams {
// This parameter is both a server parameter and a configuration parameter, and to resolve
// conflicts between the two the default must be set here.
static constexpr double kMaxSyncdelaySecs = 60 * 60; // 1hr
AtomicDouble syncdelay{60.0}; // seconds between checkpoints
AtomicWord<double> syncdelay{60.0}; // seconds between checkpoints
// --queryableBackupMode
// Prevents user-originating operations from performing writes to the server. Internally

View File

@@ -441,18 +441,18 @@ std::string generateWTOpenConfigString(const WiredTigerKVEngineBase::WiredTigerC
// checkpoints more often.
const double fourMinutesInSeconds = 240.0;
int ckptsPerFourMinutes;
if (storageGlobalParams.syncdelay <= 0.0) {
if (storageGlobalParams.syncdelay.load() <= 0.0) {
ckptsPerFourMinutes = 1;
} else {
ckptsPerFourMinutes =
static_cast<int>(fourMinutesInSeconds / storageGlobalParams.syncdelay);
static_cast<int>(fourMinutesInSeconds / storageGlobalParams.syncdelay.load());
}
if (ckptsPerFourMinutes < 1) {
LOGV2_WARNING(8423377,
"Unexpected value for checkpoint retention",
"syncdelay"_attr =
static_cast<std::int64_t>(storageGlobalParams.syncdelay),
static_cast<std::int64_t>(storageGlobalParams.syncdelay.load()),
"ckptsPerFourMinutes"_attr = ckptsPerFourMinutes);
ckptsPerFourMinutes = 1;
}

View File

@@ -1397,7 +1397,7 @@ Status WiredTigerUtil::canRunAutoCompact(bool isEphemeral) {
return Status(ErrorCodes::IllegalOperation,
"autoCompact() cannot be executed for in-memory configurations");
}
if (storageGlobalParams.syncdelay == 0) {
if (storageGlobalParams.syncdelay.load() == 0) {
return Status(ErrorCodes::IllegalOperation,
"autoCompact() can only be executed when checkpoints are enabled");
}

View File

@@ -69,7 +69,6 @@
#include "mongo/db/storage/snapshot.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/dbtests/dbtests.h" // IWYU pragma: keep
#include "mongo/platform/atomic_proxy.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -201,7 +200,7 @@ public:
auto plannerParams = makePlannerParams(collection, *cq);
const size_t decisionWorks = 10;
const size_t mockWorks =
1U + static_cast<size_t>(internalQueryCacheEvictionRatio * decisionWorks);
1U + static_cast<size_t>(internalQueryCacheEvictionRatio.load() * decisionWorks);
auto mockChild = std::make_unique<MockStage>(_expCtx.get(), &_ws);
for (size_t i = 0; i < mockWorks; i++) {
mockChild->enqueueStateCode(PlanStage::NEED_TIME);
@@ -306,7 +305,7 @@ TEST_F(QueryStageCachedPlan, QueryStageCachedPlanHitMaxWorks) {
auto plannerParams = makePlannerParams(collection, *cq);
const size_t decisionWorks = 10;
const size_t mockWorks =
1U + static_cast<size_t>(internalQueryCacheEvictionRatio * decisionWorks);
1U + static_cast<size_t>(internalQueryCacheEvictionRatio.load() * decisionWorks);
auto mockChild = std::make_unique<MockStage>(_expCtx.get(), &_ws);
for (size_t i = 0; i < mockWorks; i++) {
mockChild->enqueueStateCode(PlanStage::NEED_TIME);

View File

@@ -173,8 +173,8 @@ TEST(ServerParameterWithStorage, StorageTest) {
doStorageTestByAtomic<AtomicWord<bool>>("AtomicWord<bool>", boolVals, stringVals);
doStorageTestByAtomic<AtomicWord<int>>("AtomicWord<int>", numberVals, stringVals);
doStorageTestByAtomic<AtomicDouble>("AtomicDoubleI", numberVals, stringVals);
doStorageTestByAtomic<AtomicDouble>("AtomicDoubleD", doubleVals, stringVals);
doStorageTestByAtomic<AtomicWord<double>>("AtomicWord<double>I", numberVals, stringVals);
doStorageTestByAtomic<AtomicWord<double>>("AtomicWord<double>D", doubleVals, stringVals);
}
TEST(ServerParameterWithStorage, BoundsTest) {

View File

@@ -98,7 +98,6 @@ mongo_cc_unit_test(
mongo_cc_unit_test(
name = "platform_test",
srcs = [
"atomic_proxy_test.cpp",
"atomic_test.cpp",
"bits_test.cpp",
"decimal128_bson_test.cpp",

View File

@@ -1,88 +0,0 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/base/static_assert.h"
#include "mongo/config.h" // IWYU pragma: keep
#include <atomic>
#include <cstdint>
#include <cstring>
#include <type_traits>
namespace mongo {
/**
* Provides a simple version of an atomic version of T
* that uses std::atomic<BaseWordT> as a backing type;
*/
template <typename T, typename BaseWordT>
class AtomicProxy {
MONGO_STATIC_ASSERT_MSG(sizeof(T) == sizeof(BaseWordT),
"T and BaseWordT must have the same size");
MONGO_STATIC_ASSERT_MSG(std::is_integral_v<BaseWordT>, "BaseWordT must be an integral type");
MONGO_STATIC_ASSERT_MSG(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
public:
using value_type = T;
using base_type = BaseWordT;
explicit AtomicProxy(T value = 0) {
store(value);
}
T operator=(T value) {
store(value);
return value;
}
operator T() const {
return load();
}
T load(std::memory_order order = std::memory_order_seq_cst) const {
const BaseWordT tempInteger = _value.load(order);
T value;
std::memcpy(&value, &tempInteger, sizeof(T));
return value;
}
void store(const T value, std::memory_order order = std::memory_order_seq_cst) {
BaseWordT tempInteger;
std::memcpy(&tempInteger, &value, sizeof(T));
_value.store(tempInteger, order);
}
private:
std::atomic<BaseWordT> _value; // NOLINT
};
using AtomicDouble = AtomicProxy<double, std::uint64_t>;
} // namespace mongo

View File

@@ -1,62 +0,0 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/atomic_proxy.h"
#include "mongo/base/string_data.h"
#include "mongo/unittest/unittest.h"
#include <memory>
#include <fmt/format.h>
namespace mongo {
namespace {
template <typename AtomicProxyType>
void testAtomicProxyBasicOperations() {
typedef typename AtomicProxyType::value_type WordType;
AtomicProxyType w;
ASSERT_EQUALS(WordType(0), w.load());
w.store(1);
ASSERT_EQUALS(WordType(1), w.load());
}
TEST(AtomicProxyTests, BasicOperationsDouble) {
testAtomicProxyBasicOperations<AtomicDouble>();
AtomicDouble d(3.14159);
ASSERT_EQUALS(3.14159, d.load());
}
} // namespace
} // namespace mongo