Compare commits

..

2 Commits

27 changed files with 257 additions and 445 deletions

View File

@@ -1,52 +0,0 @@
/**
* Tests that after donorStartCommand is run, that reads and writes should be blocked for the
* migrating tenant.
* @tags: [requires_fcv_46]
*/
(function() {
"use strict";
const runDonorStartMigrationCommand =
(primaryConnection, migrationId, recipientConnectionString, dbPrefix, readPreference) => {
return primaryConnection.adminCommand({
donorStartMigration: 1,
migrationId,
recipientConnectionString,
databasePrefix: dbPrefix,
readPreference
});
};
// the value of the access enum in MigratingTenantAccessBlocker for the 'kBlockWritesAndReads'
// access state.
const kBlockReadsAndWrites = 2;
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
const donorPrimary = rst.getPrimary();
const kMigrationId = new UUID();
const kRecipientConnectionString = new ReplSetTest({nodes: 1}).getURL();
const kReadPreference = {
mode: "primary"
};
const kDBPrefixes = 'databaseABC';
jsTest.log('Running donorStartMigration command.');
assert.commandWorked(runDonorStartMigrationCommand(
donorPrimary, kMigrationId, kRecipientConnectionString, kDBPrefixes, kReadPreference));
jsTest.log('Running the serverStatus command.');
const migratingTenantServerStatus =
donorPrimary.adminCommand({serverStatus: 1}).migratingTenantAccessBlocker;
// Due to the way that the state machine works, if both reads and writes are blocked, we know that
// at some point only writes were blocked Thus both kBlockWrites and kBlockWritesAndReads are states
// that are used.
assert.eq(migratingTenantServerStatus.access, kBlockReadsAndWrites);
assert(migratingTenantServerStatus.blockTimestamp);
rst.stopSet();
})();

View File

@@ -26,55 +26,62 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication;
#include "mongo/db/commands/migrate_tenant_cmds.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/migrate_tenant_cmds_gen.h"
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
#include "mongo/db/repl/migrating_tenant_donor_util.h"
#include "mongo/logv2/log.h"
namespace mongo {
namespace {
class DonorStartMigrationCmd : public TenantMigrationDonorCmdBase<DonorStartMigrationCmd> {
template <typename RequestT>
class MigrationDonorCmdBase : public TypedCommand<MigrationDonorCmdBase<RequestT>> {
public:
using Request = DonorStartMigration;
using ParentInvocation = TenantMigrationDonorCmdBase<DonorStartMigrationCmd>::Invocation;
class Invocation : public ParentInvocation {
using ParentInvocation::ParentInvocation;
using Request = RequestT;
using TC = TypedCommand<MigrationDonorCmdBase<RequestT>>;
class Invocation : public TC::InvocationBase {
public:
void typedRun(OperationContext* opCtx) {
const auto requestBody = request();
auto donorStateDocument = getDonorStateDocumentFromRequest(requestBody);
using TC::InvocationBase::InvocationBase;
using TC::InvocationBase::request;
migrating_tenant_donor_util::persistDonorStateDocument(opCtx, donorStateDocument);
migrating_tenant_donor_util::dataSync(opCtx, donorStateDocument);
}
TenantMigrationDonorDocument getDonorStateDocumentFromRequest(
const RequestType& requestBody) {
mongo::UUID migrationId = requestBody.getMigrationId();
std::string recipientURI = requestBody.getRecipientConnectionString().toString();
std::string dbPrefix = requestBody.getDatabasePrefix().toString();
auto donorStartState = TenantMigrationDonorStateEnum::kDataSync;
bool garbageCollect = false;
const TenantMigrationDonorDocument donorStateDocument(
migrationId, recipientURI, dbPrefix, donorStartState, garbageCollect);
return donorStateDocument;
}
void typedRun(OperationContext* opCtx) {}
private:
void doCheckAuthorization(OperationContext* opCtx) const {}
bool supportsWriteConcern() const override {
return false;
}
NamespaceString ns() const override {
return NamespaceString(request().getDbName(), "");
}
void doCheckAuthorization(OperationContext* opCtx) const override {}
};
std::string help() const {
bool adminOnly() const override {
return true;
}
std::string help() const override {
return "Multi-tenant migration command on the donor.";
}
BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return BasicCommand::AllowedOnSecondary::kNever;
}
};
class DonorStartMigrationCmd : public MigrationDonorCmdBase<DonorStartMigration> {
public:
using ParentInvocation = MigrationDonorCmdBase<DonorStartMigration>::Invocation;
class Invocation final : public ParentInvocation {
public:
void typedRun(OperationContext* opCtx) {}
private:
void doCheckAuthorization(OperationContext* opCtx) const override {}
};
std::string help() const override {
return "Start migrating databases whose names match the specified prefix to the specified "
"replica set.";
}
@@ -82,19 +89,12 @@ public:
} donorStartMigrationCmd;
class DonorWaitForMigrationToCommitCmd
: public TenantMigrationDonorCmdBase<DonorWaitForMigrationToCommitCmd> {
: public MigrationDonorCmdBase<DonorWaitForMigrationToCommit> {
public:
using Request = DonorWaitForMigrationToCommit;
using ParentInvocation =
TenantMigrationDonorCmdBase<DonorWaitForMigrationToCommitCmd>::Invocation;
class Invocation : public ParentInvocation {
using ParentInvocation::ParentInvocation;
using ParentInvocation = MigrationDonorCmdBase<DonorWaitForMigrationToCommit>::Invocation;
class Invocation final : public ParentInvocation {
public:
void typedRun(OperationContext* opCtx) {}
private:
void doCheckAuthorization(OperationContext* opCtx) const {}
};
std::string help() const override {
@@ -103,18 +103,12 @@ public:
} donorWaitForMigrationToCommit;
class DonorForgetMigrationCmd : public TenantMigrationDonorCmdBase<DonorForgetMigrationCmd> {
class DonorForgetMigrationCmd : public MigrationDonorCmdBase<DonorForgetMigration> {
public:
using Request = DonorForgetMigration;
using ParentInvocation = TenantMigrationDonorCmdBase<DonorForgetMigrationCmd>::Invocation;
class Invocation : public ParentInvocation {
using ParentInvocation::ParentInvocation;
using ParentInvocation = MigrationDonorCmdBase<DonorWaitForMigrationToCommit>::Invocation;
class Invocation final : public ParentInvocation {
public:
void typedRun(OperationContext* opCtx) {}
private:
void doCheckAuthorization(OperationContext* opCtx) const {}
};
std::string help() const override {

View File

@@ -1,60 +0,0 @@
/**
* Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/commands.h"
namespace mongo {
template <typename DerivedT>
class TenantMigrationDonorCmdBase : public TypedCommand<DerivedT> {
public:
using TC = TypedCommand<DerivedT>;
class Invocation : public TC::InvocationBase {
using TC::InvocationBase::InvocationBase;
private:
bool supportsWriteConcern() const override {
return false;
}
NamespaceString ns() const {
return NamespaceString(TC::InvocationBase::request().getDbName(), "");
}
};
bool adminOnly() const override {
return true;
}
BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return BasicCommand::AllowedOnSecondary::kNever;
}
};
} // namespace mongo

View File

@@ -33,6 +33,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/transport/message_compressor_registry.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/util/net/hostname_canonicalization.h"
#include "mongo/util/net/socket_utils.h"
#include "mongo/util/net/ssl_manager.h"
@@ -77,12 +78,13 @@ public:
return true;
}
// TODO: need to track connections in server stats (see SERVER-49073)
BSONObj generateSection(OperationContext* opCtx,
const BSONElement& configElement) const override {
BSONObjBuilder b;
networkCounter.append(b);
appendMessageCompressionStats(&b);
auto executor = opCtx->getServiceContext()->getServiceExecutor();
auto executor = transport::ServiceExecutorSynchronous::get(opCtx->getServiceContext());
if (executor) {
BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
executor->appendStats(&section);

View File

@@ -734,16 +734,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
// operation context anymore
startupOpCtx.reset();
auto start = serviceContext->getServiceExecutor()->start();
if (!start.isOK()) {
LOGV2_ERROR(20570,
"Error starting service executor: {error}",
"Error starting service executor",
"error"_attr = start);
return EXIT_NET_ERROR;
}
start = serviceContext->getServiceEntryPoint()->start();
auto start = serviceContext->getServiceEntryPoint()->start();
if (!start.isOK()) {
LOGV2_ERROR(20571,
"Error starting service entry point: {error}",
@@ -1279,18 +1270,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
"Service entry point did not shutdown within the time limit");
}
}
// Shutdown and wait for the service executor to exit
if (auto svcExec = serviceContext->getServiceExecutor()) {
LOGV2_OPTIONS(4784924, {LogComponent::kExecutor}, "Shutting down the service executor");
Status status = svcExec->shutdown(Seconds(10));
if (!status.isOK()) {
LOGV2_OPTIONS(20564,
{LogComponent::kNetwork},
"Service executor did not shutdown within the time limit",
"error"_attr = status);
}
}
#endif
LOGV2(4784925, "Shutting down free monitoring");

View File

@@ -1227,7 +1227,6 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/rw_concern_d',
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'local_oplog_info',

View File

@@ -52,9 +52,12 @@ structs:
strict: true
fields:
_id:
type: objectid
description: "A unique identifier for the document."
cpp_name: id
migrationId:
type: uuid
description: "Unique identifier for the tenant migration."
cpp_name: id
recipientConnectionString:
type: string
description: "The URI string that the donor will utilize to create a connection with the recipient."
@@ -79,9 +82,12 @@ structs:
strict: true
fields:
_id:
type: objectid
description: "A unique identifier for the document."
cpp_name: id
migrationId:
type: uuid
description: "Unique identifier for the tenant migration."
cpp_name: id
donorConnectionString:
type: string
description: "The URI string that the donor will utilize to create a connection with the recipient."

View File

@@ -27,10 +27,7 @@
* it in the license file.
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication;
#include "mongo/platform/basic.h"
#include "mongo/util/str.h"
#include "mongo/db/repl/migrating_tenant_donor_util.h"
@@ -39,13 +36,10 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
#include "mongo/db/repl/migrating_tenant_access_blocker_by_prefix.h"
#include "mongo/db/s/persistent_task_store.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace migrating_tenant_donor_util {
@@ -61,10 +55,8 @@ const char kNetName[] = "TenantMigrationWorkerNetwork";
* state.
*/
void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorDoc) {
LOGV2(4917300, "Reached the beginning of the onTransitionToBlocking");
invariant(donorDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
invariant(donorDoc.getBlockTimestamp());
LOGV2(4917300, "Passed invariants of the onTransitionToBlocking");
auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(opCtx->getServiceContext());
auto mtab = mtabByPrefix.getMigratingTenantBlocker(donorDoc.getDatabasePrefix());
@@ -91,98 +83,15 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
mtab->startBlockingReadsAfter(donorDoc.getBlockTimestamp().get());
}
/**
* Creates a MigratingTenantAccess blocker, and makes it start blocking writes. Then adds it to
* the MigratingTenantAccessBlockerByPrefix container using the donor document's databasePrefix as
* the key.
*/
void startTenantMigrationBlockOnPrimary(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDocument) {
invariant(donorStateDocument.getState() == TenantMigrationDonorStateEnum::kDataSync);
auto serviceContext = opCtx->getServiceContext();
executor::TaskExecutor* mtabExecutor = getTenantMigrationExecutor(serviceContext).get();
auto mtab = std::make_shared<MigratingTenantAccessBlocker>(serviceContext, mtabExecutor);
mtab->startBlockingWrites();
auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(serviceContext);
mtabByPrefix.add(donorStateDocument.getDatabasePrefix(), mtab);
}
/**
* Returns an updated donor state document that will be the same as the original except with the
* state set to blocking and the blockTimestamp set to the reserved Optime.
*/
TenantMigrationDonorDocument createUpdatedDonorStateDocument(
const TenantMigrationDonorDocument& originalDonorStateDocument, const OplogSlot& oplogSlot) {
TenantMigrationDonorDocument updatedDoc = originalDonorStateDocument;
updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
return updatedDoc;
}
/**
* Returns the arguments object for the update command with the _id as the criteria and the reserved
* opTime as the oplogSlot.
*/
CollectionUpdateArgs createUpdateArgumentsForDonorStateDocument(
const TenantMigrationDonorDocument& originalDonorStateDocument,
const OplogSlot& oplogSlot,
const BSONObj& serializedUpdatedDonorStateDocument) {
CollectionUpdateArgs args;
args.criteria = BSON("_id" << originalDonorStateDocument.getId());
args.oplogSlot = oplogSlot;
args.update = serializedUpdatedDonorStateDocument;
return args;
}
/**
* After reserving the opTime for the write and creating the new updated document with the necessary
* update arguments. It will send the update command to the tenant migration donors collection.
*/
void updateDonorStateDocument(OperationContext* opCtx,
Collection* collection,
const TenantMigrationDonorDocument& originalDonorStateDocument) {
const auto originalRecordId = Helpers::findOne(
opCtx, collection, originalDonorStateDocument.toBSON(), false /* requireIndex */);
invariant(!originalRecordId.isNull());
// Reserve an opTime for the write and use it as the blockTimestamp for the migration.
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
BSONObj serializedUpdatedDonorDoc =
createUpdatedDonorStateDocument(originalDonorStateDocument, oplogSlot).toBSON();
CollectionUpdateArgs args = createUpdateArgumentsForDonorStateDocument(
originalDonorStateDocument, oplogSlot, serializedUpdatedDonorDoc);
const auto originalSnapshot = Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(),
originalDonorStateDocument.toBSON());
collection->updateDocument(opCtx,
originalRecordId,
originalSnapshot,
serializedUpdatedDonorDoc,
false,
nullptr /* OpDebug* */,
&args);
}
} // namespace
/**
* TODO - Implement recipientSyncData command
*/
void dataSync(OperationContext* opCtx,
const TenantMigrationDonorDocument& originalDonorStateDocument) {
invariant(originalDonorStateDocument.getState() == TenantMigrationDonorStateEnum::kDataSync);
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDoc) {
// Send recipientSyncData.
startTenantMigrationBlockOnPrimary(opCtx, originalDonorStateDocument);
// Call startBlockingWrites.
// Update the on-disk state of the migration to "blocking" state.
invariant(originalDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
uassertStatusOK(writeConflictRetry(
opCtx,
@@ -200,14 +109,40 @@ void dataSync(OperationContext* opCtx,
}
WriteUnitOfWork wuow(opCtx);
updateDonorStateDocument(opCtx, collection, originalDonorStateDocument);
const auto originalRecordId =
Helpers::findOne(opCtx, collection, originalDoc.toBSON(), false /* requireIndex */);
invariant(!originalRecordId.isNull());
// Reserve an opTime for the write and use it as the blockTimestamp for the migration.
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
TenantMigrationDonorDocument updatedDoc;
updatedDoc.setId(originalDoc.getId());
updatedDoc.setDatabasePrefix(originalDoc.getDatabasePrefix());
updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
CollectionUpdateArgs args;
args.update = updatedDoc.toBSON();
args.criteria = BSON("_id" << originalDoc.getId());
args.oplogSlot = oplogSlot;
collection->updateDocument(
opCtx,
originalRecordId,
Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalDoc.toBSON()),
updatedDoc.toBSON(),
false,
nullptr /* OpDebug* */,
&args);
wuow.commit();
return Status::OK();
}));
}
std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext) {
ThreadPool::Options tpOptions;
tpOptions.threadNamePrefix = kThreadNamePrefix;
@@ -222,16 +157,14 @@ std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContex
executor::makeNetworkInterface(kNetName, nullptr, nullptr));
}
void onTenantMigrationDonorStateTransition(OperationContext* opCtx,
const BSONObj& serializedDonorStateDocument) {
auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"),
serializedDonorStateDocument);
void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc) {
auto donorDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"), doc);
switch (donorStateDoc.getState()) {
switch (donorDoc.getState()) {
case TenantMigrationDonorStateEnum::kDataSync:
break;
case TenantMigrationDonorStateEnum::kBlocking:
onTransitionToBlocking(opCtx, donorStateDoc);
onTransitionToBlocking(opCtx, donorDoc);
break;
case TenantMigrationDonorStateEnum::kCommitted:
break;
@@ -242,27 +175,6 @@ void onTenantMigrationDonorStateTransition(OperationContext* opCtx,
}
}
/**
* The function will persist(insert) the provided donorStateDocument into the config data on the
* collection for the tenantMigration donors In order to maintain a stable state for the tenant
* migration in case of node failure or restart.
*/
void persistDonorStateDocument(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDocument) {
PersistentTaskStore<TenantMigrationDonorDocument> store(
NamespaceString::kMigrationDonorsNamespace);
try {
store.add(opCtx, donorStateDocument);
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
uasserted(
4917300,
str::stream()
<< "While attempting to persist the donor state machine for tenant migration"
<< ", found another document with the same migration id. Attempted migration: "
<< donorStateDocument.toBSON());
}
}
} // namespace migrating_tenant_donor_util
} // namespace mongo

View File

@@ -41,8 +41,7 @@ namespace migrating_tenant_donor_util {
* Sends recipientSyncData to the recipient until success and starts blocking writes and causal
* reads.
*/
void dataSync(OperationContext* opCtx,
const TenantMigrationDonorDocument& originalDonorStateDocument);
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& donorDoc);
/**
* Creates a task executor to be used for tenant migration.
@@ -55,8 +54,6 @@ std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContex
*/
void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc);
void persistDonorStateDocument(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDocument);
} // namespace migrating_tenant_donor_util
} // namespace mongo

View File

@@ -4821,13 +4821,13 @@ void ReplicationCoordinatorImpl::attemptToAdvanceStableTimestamp() {
_setStableTimestampForStorage(lk);
}
boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::_recalculateStableOpTime(
WithLock lk) {
auto commitPoint = _topCoord->getLastCommittedOpTimeAndWallTime();
OpTime ReplicationCoordinatorImpl::_recalculateStableOpTime(WithLock lk) {
auto commitPoint = _topCoord->getLastCommittedOpTime();
auto lastApplied = _topCoord->getMyLastAppliedOpTime();
if (_currentCommittedSnapshot) {
auto snapshotOpTime = _currentCommittedSnapshot->opTime;
invariant(snapshotOpTime.getTimestamp() <= commitPoint.opTime.getTimestamp());
invariant(snapshotOpTime <= commitPoint.opTime);
invariant(snapshotOpTime.getTimestamp() <= commitPoint.getTimestamp());
invariant(snapshotOpTime <= commitPoint);
}
//
@@ -4858,28 +4858,24 @@ boost::optional<OpTimeAndWallTime> ReplicationCoordinatorImpl::_recalculateStabl
// order. Because of this, the stable timestamp must not fall in the middle of a batch while it
// is being applied. To prevent this we ensure the no-overlap point does not surpass the
// lastApplied, which is only advanced at the end of secondary batch application.
OpTime noOverlap = std::min(_topCoord->getMyLastAppliedOpTime(), allDurableOpTime);
OpTime noOverlap = std::min(lastApplied, allDurableOpTime);
// The stable optime must always be less than or equal to the no overlap point. When majority
// reads are enabled, the stable optime must also not surpass the majority commit point. When
// majority reads are disabled, the stable optime is not required to be majority committed.
boost::optional<OpTimeAndWallTime> stableOpTime;
auto maximumStableOpTime = serverGlobalParams.enableMajorityReadConcern
? commitPoint
: _topCoord->getMyLastAppliedOpTimeAndWallTime();
OpTime stableOpTime;
auto maximumStableOpTime =
serverGlobalParams.enableMajorityReadConcern ? commitPoint : lastApplied;
// Make sure the stable optime does not surpass its maximum.
stableOpTime = OpTimeAndWallTime(std::min(noOverlap, maximumStableOpTime.opTime), Date_t());
stableOpTime = std::min(noOverlap, maximumStableOpTime);
if (stableOpTime) {
// Check that the selected stable optime does not exceed our maximum and that it does not
// surpass the no-overlap point.
invariant(stableOpTime.get().opTime.getTimestamp() <=
maximumStableOpTime.opTime.getTimestamp());
invariant(stableOpTime.get().opTime <= maximumStableOpTime.opTime);
invariant(stableOpTime.get().opTime.getTimestamp() <= noOverlap.getTimestamp());
invariant(stableOpTime.get().opTime <= noOverlap);
}
// Check that the selected stable optime does not exceed our maximum and that it does not
// surpass the no-overlap point.
invariant(stableOpTime.getTimestamp() <= maximumStableOpTime.getTimestamp());
invariant(stableOpTime <= maximumStableOpTime);
invariant(stableOpTime.getTimestamp() <= noOverlap.getTimestamp());
invariant(stableOpTime <= noOverlap);
return stableOpTime;
}
@@ -4906,7 +4902,7 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
}
// Get the current stable optime.
auto stableOpTime = _recalculateStableOpTime(lk);
OpTime stableOpTime = _recalculateStableOpTime(lk);
// Don't update the stable timestamp if it is earlier than the initial data timestamp.
// Timestamps before the initialDataTimestamp are not consistent and so are not safe to use for
@@ -4915,61 +4911,60 @@ void ReplicationCoordinatorImpl::_setStableTimestampForStorage(WithLock lk) {
// after leaving initial sync, since the initialDataTimestamp and oldest timestamp will be equal
// after initial sync oplog application has completed.
auto initialDataTimestamp = _service->getStorageEngine()->getInitialDataTimestamp();
if (stableOpTime && stableOpTime->opTime.getTimestamp() < initialDataTimestamp) {
if (stableOpTime.getTimestamp() < initialDataTimestamp) {
LOGV2_DEBUG(2139504,
2,
"Not updating stable timestamp since it is less than the initialDataTimestamp",
"stableTimestamp"_attr = stableOpTime->opTime.getTimestamp(),
"stableTimestamp"_attr = stableOpTime.getTimestamp(),
"initialDataTimestamp"_attr = initialDataTimestamp);
return;
}
if (stableOpTime && stableOpTime->opTime.getTimestamp().isNull()) {
if (stableOpTime.getTimestamp().isNull()) {
LOGV2_DEBUG(2139502, 2, "Not updating stable timestamp to a null timestamp");
return;
}
// If there is a valid stable optime, set it for the storage engine, and then remove any
// old, unneeded stable optime candidates.
if (stableOpTime) {
LOGV2_DEBUG(21396,
2,
"Setting replication's stable optime to {stableOpTime}",
"Setting replication's stable optime",
"stableOpTime"_attr = stableOpTime.value());
if (gTestingSnapshotBehaviorInIsolation) {
return;
}
if (!gTestingSnapshotBehaviorInIsolation) {
// Update committed snapshot and wake up any threads waiting on read concern or
// write concern.
if (serverGlobalParams.enableMajorityReadConcern) {
// When majority read concern is enabled, the committed snapshot is set to the new
// stable optime.
if (_updateCommittedSnapshot(lk, stableOpTime.value())) {
// Update the stable timestamp for the storage engine.
_storage->setStableTimestamp(getServiceContext(),
stableOpTime->opTime.getTimestamp());
}
} else {
const auto lastCommittedOpTime = _topCoord->getLastCommittedOpTimeAndWallTime();
if (!lastCommittedOpTime.opTime.isNull()) {
// When majority read concern is disabled, we set the stable timestamp to
// be less than or equal to the all committed timestamp. This makes sure that
// the committed snapshot is not past the all committed timestamp to guarantee
// we can always read our own majority committed writes. This problem is
// specific to the case where we have a single node replica set and the
// lastCommittedOpTime is set to be the lastApplied which can be ahead of the
// allCommitted.
auto newCommittedSnapshot = std::min(lastCommittedOpTime, *stableOpTime);
_updateCommittedSnapshot(lk, newCommittedSnapshot);
}
// Set the stable timestamp regardless of whether the majority commit point moved
// forward. If we are in rollback state, however, do not alter the stable timestamp,
// since it may be moved backwards explicitly by the rollback-via-refetch process.
if (!MONGO_unlikely(disableSnapshotting.shouldFail()) && !_memberState.rollback()) {
_storage->setStableTimestamp(getServiceContext(),
stableOpTime->opTime.getTimestamp());
}
}
// Set the stable timestamp and update the committed snapshot.
LOGV2_DEBUG(21396,
2,
"Setting replication's stable optime to {stableOpTime}",
"Setting replication's stable optime",
"stableOpTime"_attr = stableOpTime);
// Update committed snapshot and wake up any threads waiting on read concern or
// write concern.
if (serverGlobalParams.enableMajorityReadConcern) {
// When majority read concern is enabled, the committed snapshot is set to the new
// stable optime. The wall time of the committed snapshot is not used for anything so we can
// create a fake one.
if (_updateCommittedSnapshot(lk, OpTimeAndWallTime(stableOpTime, Date_t()))) {
// Update the stable timestamp for the storage engine.
_storage->setStableTimestamp(getServiceContext(), stableOpTime.getTimestamp());
}
} else {
const auto lastCommittedOpTime = _topCoord->getLastCommittedOpTime();
if (!lastCommittedOpTime.isNull()) {
// When majority read concern is disabled, we set the stable timestamp to be less than
// or equal to the all-durable timestamp. This makes sure that the committed snapshot is
// not past the all-durable timestamp to guarantee we can always read our own majority
// committed writes. This problem is specific to the case where we have a single node
// replica set and the lastCommittedOpTime is set to be the lastApplied which can be
// ahead of the all-durable.
OpTime newCommittedSnapshot = std::min(lastCommittedOpTime, stableOpTime);
// The wall clock time of the committed snapshot is not used for anything so we can
// create a fake one.
_updateCommittedSnapshot(lk, OpTimeAndWallTime(newCommittedSnapshot, Date_t()));
}
// Set the stable timestamp regardless of whether the majority commit point moved
// forward. If we are in rollback state, however, do not alter the stable timestamp,
// since it may be moved backwards explicitly by the rollback-via-refetch process.
if (!MONGO_unlikely(disableSnapshotting.shouldFail()) && !_memberState.rollback()) {
_storage->setStableTimestamp(getServiceContext(), stableOpTime.getTimestamp());
}
}
}

View File

@@ -1338,10 +1338,9 @@ private:
bool _updateCommittedSnapshot(WithLock lk, const OpTimeAndWallTime& newCommittedSnapshot);
/**
* A helper method that returns the current stable optime based on the current commit point and
* set of stable optime candidates.
* A helper method that returns the current stable optime based on the current commit point.
*/
boost::optional<OpTimeAndWallTime> _recalculateStableOpTime(WithLock lk);
OpTime _recalculateStableOpTime(WithLock lk);
/**
* Calculates and sets the value of the 'stable' replication optime for the storage engine.

View File

@@ -199,10 +199,6 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const {
return _serviceEntryPoint.get();
}
transport::ServiceExecutor* ServiceContext::getServiceExecutor() const {
return _serviceExecutor.get();
}
void ServiceContext::setStorageEngine(std::unique_ptr<StorageEngine> engine) {
invariant(engine);
invariant(!_storageEngine);
@@ -233,10 +229,6 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer
_transportLayer = std::move(tl);
}
void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) {
_serviceExecutor = std::move(exec);
}
void ServiceContext::ClientDeleter::operator()(Client* client) const {
ServiceContext* const service = client->getServiceContext();
{

View File

@@ -487,14 +487,6 @@ public:
*/
ServiceEntryPoint* getServiceEntryPoint() const;
/**
* Get the service executor for the service context.
*
* See ServiceStateMachine for how this is used. Some configurations may not have a service
* executor registered and this will return a nullptr.
*/
transport::ServiceExecutor* getServiceExecutor() const;
/**
* Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been
* added/started.
@@ -579,11 +571,6 @@ public:
*/
void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
/**
* Binds the service executor to the service context
*/
void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
/**
* Creates a delayed execution baton with basic functionality
*/

View File

@@ -371,16 +371,6 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
}
}
// Shutdown and wait for the service executor to exit
if (auto svcExec = serviceContext->getServiceExecutor()) {
Status status = svcExec->shutdown(Seconds(5));
if (!status.isOK()) {
LOGV2_OPTIONS(22845,
{LogComponent::kNetwork},
"Service executor did not shutdown within the time limit",
"error"_attr = status);
}
}
#endif
// Shutdown Full-Time Data Capture
@@ -788,15 +778,6 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
std::make_unique<SessionsCollectionSharded>(),
RouterSessionCatalog::reapSessionsOlderThan));
status = serviceContext->getServiceExecutor()->start();
if (!status.isOK()) {
LOGV2_ERROR(22859,
"Error starting service executor: {error}",
"Error starting service executor",
"error"_attr = redact(status));
return EXIT_NET_ERROR;
}
status = serviceContext->getServiceEntryPoint()->start();
if (!status.isOK()) {
LOGV2_ERROR(22860,

View File

@@ -478,10 +478,6 @@ int bridgeMain(int argc, char** argv) {
setGlobalServiceContext(ServiceContext::make());
auto serviceContext = getGlobalServiceContext();
serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext));
serviceContext->setServiceExecutor(
std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext));
fassert(50766, serviceContext->getServiceExecutor()->start());
transport::TransportLayerASIO::Options opts;
opts.ipList.emplace_back("0.0.0.0");

View File

@@ -96,6 +96,7 @@ tlEnv.Library(
],
LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/idl/server_parameter",
"$BUILD_DIR/mongo/db/server_options_core",
"$BUILD_DIR/mongo/util/concurrency/thread_pool",
"$BUILD_DIR/mongo/util/processinfo",
'$BUILD_DIR/third_party/shim_asio',
@@ -178,7 +179,7 @@ tlEnv.CppUnitTest(
'transport_layer_asio_test.cpp',
'service_executor_test.cpp',
'max_conns_override_test.cpp',
'service_state_machine_test.cpp',
# 'service_state_machine_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',

View File

@@ -122,12 +122,26 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
}
Status ServiceEntryPointImpl::start() {
if (_adminInternalPool)
return _adminInternalPool->start();
else
return Status::OK();
if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
!status.isOK()) {
return status;
}
if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx); exec) {
if (auto status = exec->start(); !status.isOK()) {
return status;
}
}
// TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
// if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
// return status;
// }
return Status::OK();
}
// TODO: explicitly start on the fixed executor
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
const auto& remoteAddr = session->remoteAddr();
@@ -140,7 +154,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
const bool quiet = serverGlobalParams.quiet.load();
size_t connectionCount;
auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
auto usingMaxConnOverride = false;
@@ -168,8 +182,9 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
"connectionCount"_attr = connectionCount);
}
return;
} else if (usingMaxConnOverride && _adminInternalPool) {
ssm->setServiceExecutor(_adminInternalPool.get());
} else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
usingMaxConnOverride && exec) {
ssm->setServiceExecutor(exec);
}
if (!quiet) {
@@ -256,6 +271,27 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
"shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
"workers"_attr = numOpenSessions());
}
lk.unlock();
// TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
// if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout -
// timeSpent);
// !status.isOK()) {
// LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
// }
if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
if (auto status = exec->shutdown(timeout - timeSpent); !status.isOK()) {
LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
}
}
if (auto status =
transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
!status.isOK()) {
LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
}
return result;
}

View File

@@ -36,7 +36,9 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/net/cidr.h"

View File

@@ -80,6 +80,18 @@ Status ServiceExecutorFixed::start() {
return Status::OK();
}
const auto getServiceExecutorFixed =
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
auto& ref = getServiceExecutorFixed(ctx);
if (!ref) {
ThreadPool::Options options{};
ref = std::make_unique<ServiceExecutorFixed>(options);
}
return ref.get();
}
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
auto waitForShutdown = [&]() mutable -> Status {
stdx::unique_lock<Latch> lk(_mutex);

View File

@@ -32,6 +32,7 @@
#include <memory>
#include "mongo/base/status.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -54,6 +55,8 @@ public:
explicit ServiceExecutorFixed(ThreadPool::Options options);
virtual ~ServiceExecutorFixed();
static ServiceExecutorFixed* get(ServiceContext* ctx);
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status schedule(Task task, ScheduleFlags flags) override;

View File

@@ -33,6 +33,7 @@
#include "mongo/transport/service_executor_reserved.h"
#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_entry_point_utils.h"
@@ -147,6 +148,20 @@ Status ServiceExecutorReserved::_startWorker() {
});
}
const auto getServiceExecutorReserved =
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
auto& ref = getServiceExecutorReserved(ctx);
if (!ref) {
if (serverGlobalParams.reservedAdminThreads) {
ref = std::make_unique<transport::ServiceExecutorReserved>(
ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
} else
return nullptr;
}
return ref.get();
}
Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
@@ -173,8 +188,8 @@ Status ServiceExecutorReserved::schedule(Task task, ScheduleFlags flags) {
if (!_localWorkQueue.empty()) {
// Execute task directly (recurse) if allowed by the caller as it produced better
// performance in testing. Try to limit the amount of recursion so we don't blow up the
// stack, even though this shouldn't happen with this executor that uses blocking network
// I/O.
// stack, even though this shouldn't happen with this executor that uses blocking
// network I/O.
if ((flags & ScheduleFlags::kMayRecurse) &&
(_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
++_localRecursionDepth;

View File

@@ -32,6 +32,7 @@
#include <deque>
#include "mongo/base/status.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -54,6 +55,8 @@ class ServiceExecutorReserved final : public ServiceExecutor {
public:
explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
static ServiceExecutorReserved* get(ServiceContext* ctx);
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status schedule(Task task, ScheduleFlags flags) override;

View File

@@ -78,6 +78,17 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
"passthrough executor couldn't shutdown all worker threads within time limit.");
}
const auto getServiceExecutorSynchronous =
ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
auto& ref = getServiceExecutorSynchronous(ctx);
if (!ref) {
ref = std::make_unique<ServiceExecutorSynchronous>();
}
return ref.get();
}
Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) {
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};

View File

@@ -32,6 +32,7 @@
#include <deque>
#include "mongo/base/status.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -47,7 +48,9 @@ namespace transport {
*/
class ServiceExecutorSynchronous final : public ServiceExecutor {
public:
explicit ServiceExecutorSynchronous(ServiceContext* ctx);
explicit ServiceExecutorSynchronous(ServiceContext* ctx = getGlobalServiceContext());
static ServiceExecutorSynchronous* get(ServiceContext* ctx);
Status start() override;
Status shutdown(Milliseconds timeout) override;

View File

@@ -45,6 +45,7 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
@@ -300,11 +301,11 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
_sep{svcContext->getServiceEntryPoint()},
_transportMode(transportMode),
_serviceContext(svcContext),
_serviceExecutor(_serviceContext->getServiceExecutor()),
_sessionHandle(session),
_threadName{str::stream() << "conn" << _session()->id()},
_dbClient{svcContext->makeClient(_threadName, std::move(session))},
_dbClientPtr{_dbClient.get()} {}
_dbClientPtr{_dbClient.get()},
_serviceExecutor(transport::ServiceExecutorSynchronous::get(_serviceContext)) {}
const transport::SessionHandle& ServiceStateMachine::_session() const {
return _sessionHandle;

View File

@@ -230,12 +230,12 @@ private:
transport::Mode _transportMode;
ServiceContext* const _serviceContext;
transport::ServiceExecutor* _serviceExecutor;
transport::SessionHandle _sessionHandle;
const std::string _threadName;
ServiceContext::UniqueClient _dbClient;
const Client* _dbClientPtr;
transport::ServiceExecutor* _serviceExecutor;
std::function<void()> _cleanupHook;
bool _inExhaust = false;

View File

@@ -137,8 +137,6 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
transport::TransportLayerASIO::Options opts(config);
opts.transportMode = transport::Mode::kSynchronous;
ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
std::vector<std::unique_ptr<TransportLayer>> retVector;
retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
return std::make_unique<TransportLayerManager>(std::move(retVector));