Compare commits
10 Commits
v4.7
...
server-491
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f6c8f380b | ||
|
|
d8d2e1cbb1 | ||
|
|
6aa14e0c59 | ||
|
|
c38756fe5b | ||
|
|
ab93d1193a | ||
|
|
8ebfdfe306 | ||
|
|
a701b85037 | ||
|
|
39ca11ba83 | ||
|
|
1ceca57bca | ||
|
|
e05dccf26b |
52
jstests/replsets/tenant_migration_mtab_blocks_writes.js
Normal file
52
jstests/replsets/tenant_migration_mtab_blocks_writes.js
Normal file
@@ -0,0 +1,52 @@
|
||||
/**
|
||||
* Tests that after donorStartCommand is run, that reads and writes should be blocked for the
|
||||
* migrating tenant.
|
||||
* @tags: [requires_fcv_46]
|
||||
*/
|
||||
|
||||
(function() {
|
||||
"use strict";
|
||||
const runDonorStartMigrationCommand =
|
||||
(primaryConnection, migrationId, recipientConnectionString, dbPrefix, readPreference) => {
|
||||
return primaryConnection.adminCommand({
|
||||
donorStartMigration: 1,
|
||||
migrationId,
|
||||
recipientConnectionString,
|
||||
databasePrefix: dbPrefix,
|
||||
readPreference
|
||||
});
|
||||
};
|
||||
|
||||
// the value of the access enum in MigratingTenantAccessBlocker for the 'kBlockWritesAndReads'
|
||||
// access state.
|
||||
const kBlockReadsAndWrites = 2;
|
||||
const rst = new ReplSetTest({nodes: 1});
|
||||
rst.startSet();
|
||||
rst.initiate();
|
||||
|
||||
const donorPrimary = rst.getPrimary();
|
||||
|
||||
const kMigrationId = new UUID();
|
||||
const kRecipientConnectionString = new ReplSetTest({nodes: 1}).getURL();
|
||||
|
||||
const kReadPreference = {
|
||||
mode: "primary"
|
||||
};
|
||||
const kDBPrefixes = 'databaseABC';
|
||||
|
||||
jsTest.log('Running donorStartMigration command.');
|
||||
assert.commandWorked(runDonorStartMigrationCommand(
|
||||
donorPrimary, kMigrationId, kRecipientConnectionString, kDBPrefixes, kReadPreference));
|
||||
|
||||
jsTest.log('Running the serverStatus command.');
|
||||
const migratingTenantServerStatus =
|
||||
donorPrimary.adminCommand({serverStatus: 1}).migratingTenantAccessBlocker;
|
||||
|
||||
// Due to the way that the state machine works, if both reads and writes are blocked, we know that
|
||||
// at some point only writes were blocked Thus both kBlockWrites and kBlockWritesAndReads are states
|
||||
// that are used.
|
||||
assert.eq(migratingTenantServerStatus.access, kBlockReadsAndWrites);
|
||||
assert(migratingTenantServerStatus.blockTimestamp);
|
||||
|
||||
rst.stopSet();
|
||||
})();
|
||||
@@ -26,62 +26,55 @@
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication;
|
||||
|
||||
|
||||
#include "mongo/db/commands.h"
|
||||
#include "mongo/db/commands/migrate_tenant_cmds.h"
|
||||
#include "mongo/db/commands/migrate_tenant_cmds_gen.h"
|
||||
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
|
||||
#include "mongo/db/repl/migrating_tenant_donor_util.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace {
|
||||
|
||||
template <typename RequestT>
|
||||
class MigrationDonorCmdBase : public TypedCommand<MigrationDonorCmdBase<RequestT>> {
|
||||
|
||||
class DonorStartMigrationCmd : public TenantMigrationDonorCmdBase<DonorStartMigrationCmd> {
|
||||
public:
|
||||
using Request = RequestT;
|
||||
using TC = TypedCommand<MigrationDonorCmdBase<RequestT>>;
|
||||
using Request = DonorStartMigration;
|
||||
using ParentInvocation = TenantMigrationDonorCmdBase<DonorStartMigrationCmd>::Invocation;
|
||||
class Invocation : public ParentInvocation {
|
||||
using ParentInvocation::ParentInvocation;
|
||||
|
||||
class Invocation : public TC::InvocationBase {
|
||||
public:
|
||||
using TC::InvocationBase::InvocationBase;
|
||||
using TC::InvocationBase::request;
|
||||
void typedRun(OperationContext* opCtx) {
|
||||
const auto requestBody = request();
|
||||
auto donorStateDocument = getDonorStateDocumentFromRequest(requestBody);
|
||||
|
||||
void typedRun(OperationContext* opCtx) {}
|
||||
|
||||
private:
|
||||
bool supportsWriteConcern() const override {
|
||||
return false;
|
||||
}
|
||||
NamespaceString ns() const override {
|
||||
return NamespaceString(request().getDbName(), "");
|
||||
migrating_tenant_donor_util::persistDonorStateDocument(opCtx, donorStateDocument);
|
||||
migrating_tenant_donor_util::dataSync(opCtx, donorStateDocument);
|
||||
}
|
||||
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {}
|
||||
};
|
||||
TenantMigrationDonorDocument getDonorStateDocumentFromRequest(
|
||||
const RequestType& requestBody) {
|
||||
mongo::UUID migrationId = requestBody.getMigrationId();
|
||||
|
||||
bool adminOnly() const override {
|
||||
return true;
|
||||
}
|
||||
std::string recipientURI = requestBody.getRecipientConnectionString().toString();
|
||||
std::string dbPrefix = requestBody.getDatabasePrefix().toString();
|
||||
|
||||
std::string help() const override {
|
||||
return "Multi-tenant migration command on the donor.";
|
||||
}
|
||||
BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return BasicCommand::AllowedOnSecondary::kNever;
|
||||
}
|
||||
};
|
||||
auto donorStartState = TenantMigrationDonorStateEnum::kDataSync;
|
||||
bool garbageCollect = false;
|
||||
const TenantMigrationDonorDocument donorStateDocument(
|
||||
migrationId, recipientURI, dbPrefix, donorStartState, garbageCollect);
|
||||
|
||||
class DonorStartMigrationCmd : public MigrationDonorCmdBase<DonorStartMigration> {
|
||||
public:
|
||||
using ParentInvocation = MigrationDonorCmdBase<DonorStartMigration>::Invocation;
|
||||
class Invocation final : public ParentInvocation {
|
||||
public:
|
||||
void typedRun(OperationContext* opCtx) {}
|
||||
return donorStateDocument;
|
||||
}
|
||||
|
||||
private:
|
||||
void doCheckAuthorization(OperationContext* opCtx) const override {}
|
||||
void doCheckAuthorization(OperationContext* opCtx) const {}
|
||||
};
|
||||
|
||||
std::string help() const override {
|
||||
std::string help() const {
|
||||
return "Start migrating databases whose names match the specified prefix to the specified "
|
||||
"replica set.";
|
||||
}
|
||||
@@ -89,12 +82,19 @@ public:
|
||||
} donorStartMigrationCmd;
|
||||
|
||||
class DonorWaitForMigrationToCommitCmd
|
||||
: public MigrationDonorCmdBase<DonorWaitForMigrationToCommit> {
|
||||
: public TenantMigrationDonorCmdBase<DonorWaitForMigrationToCommitCmd> {
|
||||
public:
|
||||
using ParentInvocation = MigrationDonorCmdBase<DonorWaitForMigrationToCommit>::Invocation;
|
||||
class Invocation final : public ParentInvocation {
|
||||
using Request = DonorWaitForMigrationToCommit;
|
||||
using ParentInvocation =
|
||||
TenantMigrationDonorCmdBase<DonorWaitForMigrationToCommitCmd>::Invocation;
|
||||
class Invocation : public ParentInvocation {
|
||||
using ParentInvocation::ParentInvocation;
|
||||
|
||||
public:
|
||||
void typedRun(OperationContext* opCtx) {}
|
||||
|
||||
private:
|
||||
void doCheckAuthorization(OperationContext* opCtx) const {}
|
||||
};
|
||||
|
||||
std::string help() const override {
|
||||
@@ -103,12 +103,18 @@ public:
|
||||
|
||||
} donorWaitForMigrationToCommit;
|
||||
|
||||
class DonorForgetMigrationCmd : public MigrationDonorCmdBase<DonorForgetMigration> {
|
||||
class DonorForgetMigrationCmd : public TenantMigrationDonorCmdBase<DonorForgetMigrationCmd> {
|
||||
public:
|
||||
using ParentInvocation = MigrationDonorCmdBase<DonorWaitForMigrationToCommit>::Invocation;
|
||||
class Invocation final : public ParentInvocation {
|
||||
using Request = DonorForgetMigration;
|
||||
using ParentInvocation = TenantMigrationDonorCmdBase<DonorForgetMigrationCmd>::Invocation;
|
||||
class Invocation : public ParentInvocation {
|
||||
using ParentInvocation::ParentInvocation;
|
||||
|
||||
public:
|
||||
void typedRun(OperationContext* opCtx) {}
|
||||
|
||||
private:
|
||||
void doCheckAuthorization(OperationContext* opCtx) const {}
|
||||
};
|
||||
|
||||
std::string help() const override {
|
||||
|
||||
60
src/mongo/db/commands/migrate_tenant_cmds.h
Normal file
60
src/mongo/db/commands/migrate_tenant_cmds.h
Normal file
@@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Copyright (C) 2020-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
|
||||
#include "mongo/db/commands.h"
|
||||
|
||||
namespace mongo {
|
||||
|
||||
template <typename DerivedT>
|
||||
class TenantMigrationDonorCmdBase : public TypedCommand<DerivedT> {
|
||||
public:
|
||||
using TC = TypedCommand<DerivedT>;
|
||||
|
||||
class Invocation : public TC::InvocationBase {
|
||||
using TC::InvocationBase::InvocationBase;
|
||||
|
||||
private:
|
||||
bool supportsWriteConcern() const override {
|
||||
return false;
|
||||
}
|
||||
NamespaceString ns() const {
|
||||
return NamespaceString(TC::InvocationBase::request().getDbName(), "");
|
||||
}
|
||||
};
|
||||
|
||||
bool adminOnly() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
|
||||
return BasicCommand::AllowedOnSecondary::kNever;
|
||||
}
|
||||
};
|
||||
} // namespace mongo
|
||||
@@ -1227,6 +1227,7 @@ env.Library(
|
||||
'$BUILD_DIR/mongo/db/commands/server_status',
|
||||
'$BUILD_DIR/mongo/db/dbhelpers',
|
||||
'$BUILD_DIR/mongo/db/service_context',
|
||||
'$BUILD_DIR/mongo/db/rw_concern_d',
|
||||
'$BUILD_DIR/mongo/executor/network_interface_factory',
|
||||
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
|
||||
'local_oplog_info',
|
||||
|
||||
@@ -52,12 +52,9 @@ structs:
|
||||
strict: true
|
||||
fields:
|
||||
_id:
|
||||
type: objectid
|
||||
description: "A unique identifier for the document."
|
||||
cpp_name: id
|
||||
migrationId:
|
||||
type: uuid
|
||||
description: "Unique identifier for the tenant migration."
|
||||
cpp_name: id
|
||||
recipientConnectionString:
|
||||
type: string
|
||||
description: "The URI string that the donor will utilize to create a connection with the recipient."
|
||||
@@ -82,12 +79,9 @@ structs:
|
||||
strict: true
|
||||
fields:
|
||||
_id:
|
||||
type: objectid
|
||||
description: "A unique identifier for the document."
|
||||
cpp_name: id
|
||||
migrationId:
|
||||
type: uuid
|
||||
description: "Unique identifier for the tenant migration."
|
||||
cpp_name: id
|
||||
donorConnectionString:
|
||||
type: string
|
||||
description: "The URI string that the donor will utilize to create a connection with the recipient."
|
||||
|
||||
@@ -27,7 +27,10 @@
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication;
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
#include "mongo/util/str.h"
|
||||
|
||||
#include "mongo/db/repl/migrating_tenant_donor_util.h"
|
||||
|
||||
@@ -36,10 +39,13 @@
|
||||
#include "mongo/db/dbhelpers.h"
|
||||
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
|
||||
#include "mongo/db/repl/migrating_tenant_access_blocker_by_prefix.h"
|
||||
#include "mongo/db/s/persistent_task_store.h"
|
||||
#include "mongo/executor/network_interface_factory.h"
|
||||
#include "mongo/executor/thread_pool_task_executor.h"
|
||||
#include "mongo/logv2/log.h"
|
||||
#include "mongo/util/concurrency/thread_pool.h"
|
||||
|
||||
|
||||
namespace mongo {
|
||||
|
||||
namespace migrating_tenant_donor_util {
|
||||
@@ -55,8 +61,10 @@ const char kNetName[] = "TenantMigrationWorkerNetwork";
|
||||
* state.
|
||||
*/
|
||||
void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorDoc) {
|
||||
LOGV2(4917300, "Reached the beginning of the onTransitionToBlocking");
|
||||
invariant(donorDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
|
||||
invariant(donorDoc.getBlockTimestamp());
|
||||
LOGV2(4917300, "Passed invariants of the onTransitionToBlocking");
|
||||
|
||||
auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(opCtx->getServiceContext());
|
||||
auto mtab = mtabByPrefix.getMigratingTenantBlocker(donorDoc.getDatabasePrefix());
|
||||
@@ -83,15 +91,98 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
|
||||
mtab->startBlockingReadsAfter(donorDoc.getBlockTimestamp().get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a MigratingTenantAccess blocker, and makes it start blocking writes. Then adds it to
|
||||
* the MigratingTenantAccessBlockerByPrefix container using the donor document's databasePrefix as
|
||||
* the key.
|
||||
*/
|
||||
void startTenantMigrationBlockOnPrimary(OperationContext* opCtx,
|
||||
const TenantMigrationDonorDocument& donorStateDocument) {
|
||||
invariant(donorStateDocument.getState() == TenantMigrationDonorStateEnum::kDataSync);
|
||||
auto serviceContext = opCtx->getServiceContext();
|
||||
|
||||
executor::TaskExecutor* mtabExecutor = getTenantMigrationExecutor(serviceContext).get();
|
||||
auto mtab = std::make_shared<MigratingTenantAccessBlocker>(serviceContext, mtabExecutor);
|
||||
|
||||
mtab->startBlockingWrites();
|
||||
|
||||
auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(serviceContext);
|
||||
mtabByPrefix.add(donorStateDocument.getDatabasePrefix(), mtab);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an updated donor state document that will be the same as the original except with the
|
||||
* state set to blocking and the blockTimestamp set to the reserved Optime.
|
||||
*/
|
||||
TenantMigrationDonorDocument createUpdatedDonorStateDocument(
|
||||
const TenantMigrationDonorDocument& originalDonorStateDocument, const OplogSlot& oplogSlot) {
|
||||
TenantMigrationDonorDocument updatedDoc = originalDonorStateDocument;
|
||||
updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
|
||||
updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
|
||||
return updatedDoc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the arguments object for the update command with the _id as the criteria and the reserved
|
||||
* opTime as the oplogSlot.
|
||||
*/
|
||||
CollectionUpdateArgs createUpdateArgumentsForDonorStateDocument(
|
||||
const TenantMigrationDonorDocument& originalDonorStateDocument,
|
||||
const OplogSlot& oplogSlot,
|
||||
const BSONObj& serializedUpdatedDonorStateDocument) {
|
||||
CollectionUpdateArgs args;
|
||||
args.criteria = BSON("_id" << originalDonorStateDocument.getId());
|
||||
args.oplogSlot = oplogSlot;
|
||||
args.update = serializedUpdatedDonorStateDocument;
|
||||
|
||||
return args;
|
||||
}
|
||||
|
||||
/**
|
||||
* After reserving the opTime for the write and creating the new updated document with the necessary
|
||||
* update arguments. It will send the update command to the tenant migration donors collection.
|
||||
*/
|
||||
void updateDonorStateDocument(OperationContext* opCtx,
|
||||
Collection* collection,
|
||||
const TenantMigrationDonorDocument& originalDonorStateDocument) {
|
||||
|
||||
const auto originalRecordId = Helpers::findOne(
|
||||
opCtx, collection, originalDonorStateDocument.toBSON(), false /* requireIndex */);
|
||||
invariant(!originalRecordId.isNull());
|
||||
|
||||
// Reserve an opTime for the write and use it as the blockTimestamp for the migration.
|
||||
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
|
||||
|
||||
BSONObj serializedUpdatedDonorDoc =
|
||||
createUpdatedDonorStateDocument(originalDonorStateDocument, oplogSlot).toBSON();
|
||||
|
||||
CollectionUpdateArgs args = createUpdateArgumentsForDonorStateDocument(
|
||||
originalDonorStateDocument, oplogSlot, serializedUpdatedDonorDoc);
|
||||
|
||||
const auto originalSnapshot = Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(),
|
||||
originalDonorStateDocument.toBSON());
|
||||
collection->updateDocument(opCtx,
|
||||
originalRecordId,
|
||||
originalSnapshot,
|
||||
serializedUpdatedDonorDoc,
|
||||
false,
|
||||
nullptr /* OpDebug* */,
|
||||
&args);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDoc) {
|
||||
/**
|
||||
* TODO - Implement recipientSyncData command
|
||||
*/
|
||||
void dataSync(OperationContext* opCtx,
|
||||
const TenantMigrationDonorDocument& originalDonorStateDocument) {
|
||||
invariant(originalDonorStateDocument.getState() == TenantMigrationDonorStateEnum::kDataSync);
|
||||
|
||||
// Send recipientSyncData.
|
||||
|
||||
// Call startBlockingWrites.
|
||||
startTenantMigrationBlockOnPrimary(opCtx, originalDonorStateDocument);
|
||||
|
||||
// Update the on-disk state of the migration to "blocking" state.
|
||||
invariant(originalDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
|
||||
|
||||
uassertStatusOK(writeConflictRetry(
|
||||
opCtx,
|
||||
@@ -109,40 +200,14 @@ void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& origi
|
||||
}
|
||||
|
||||
WriteUnitOfWork wuow(opCtx);
|
||||
|
||||
const auto originalRecordId =
|
||||
Helpers::findOne(opCtx, collection, originalDoc.toBSON(), false /* requireIndex */);
|
||||
invariant(!originalRecordId.isNull());
|
||||
|
||||
// Reserve an opTime for the write and use it as the blockTimestamp for the migration.
|
||||
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
|
||||
|
||||
TenantMigrationDonorDocument updatedDoc;
|
||||
updatedDoc.setId(originalDoc.getId());
|
||||
updatedDoc.setDatabasePrefix(originalDoc.getDatabasePrefix());
|
||||
updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
|
||||
updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
|
||||
|
||||
CollectionUpdateArgs args;
|
||||
args.update = updatedDoc.toBSON();
|
||||
args.criteria = BSON("_id" << originalDoc.getId());
|
||||
args.oplogSlot = oplogSlot;
|
||||
|
||||
collection->updateDocument(
|
||||
opCtx,
|
||||
originalRecordId,
|
||||
Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalDoc.toBSON()),
|
||||
updatedDoc.toBSON(),
|
||||
false,
|
||||
nullptr /* OpDebug* */,
|
||||
&args);
|
||||
|
||||
updateDonorStateDocument(opCtx, collection, originalDonorStateDocument);
|
||||
wuow.commit();
|
||||
|
||||
return Status::OK();
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext) {
|
||||
ThreadPool::Options tpOptions;
|
||||
tpOptions.threadNamePrefix = kThreadNamePrefix;
|
||||
@@ -157,14 +222,16 @@ std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContex
|
||||
executor::makeNetworkInterface(kNetName, nullptr, nullptr));
|
||||
}
|
||||
|
||||
void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc) {
|
||||
auto donorDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"), doc);
|
||||
void onTenantMigrationDonorStateTransition(OperationContext* opCtx,
|
||||
const BSONObj& serializedDonorStateDocument) {
|
||||
auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"),
|
||||
serializedDonorStateDocument);
|
||||
|
||||
switch (donorDoc.getState()) {
|
||||
switch (donorStateDoc.getState()) {
|
||||
case TenantMigrationDonorStateEnum::kDataSync:
|
||||
break;
|
||||
case TenantMigrationDonorStateEnum::kBlocking:
|
||||
onTransitionToBlocking(opCtx, donorDoc);
|
||||
onTransitionToBlocking(opCtx, donorStateDoc);
|
||||
break;
|
||||
case TenantMigrationDonorStateEnum::kCommitted:
|
||||
break;
|
||||
@@ -175,6 +242,27 @@ void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONOb
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The function will persist(insert) the provided donorStateDocument into the config data on the
|
||||
* collection for the tenantMigration donors In order to maintain a stable state for the tenant
|
||||
* migration in case of node failure or restart.
|
||||
*/
|
||||
void persistDonorStateDocument(OperationContext* opCtx,
|
||||
const TenantMigrationDonorDocument& donorStateDocument) {
|
||||
PersistentTaskStore<TenantMigrationDonorDocument> store(
|
||||
NamespaceString::kMigrationDonorsNamespace);
|
||||
try {
|
||||
store.add(opCtx, donorStateDocument);
|
||||
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
|
||||
uasserted(
|
||||
4917300,
|
||||
str::stream()
|
||||
<< "While attempting to persist the donor state machine for tenant migration"
|
||||
<< ", found another document with the same migration id. Attempted migration: "
|
||||
<< donorStateDocument.toBSON());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace migrating_tenant_donor_util
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
@@ -41,7 +41,8 @@ namespace migrating_tenant_donor_util {
|
||||
* Sends recipientSyncData to the recipient until success and starts blocking writes and causal
|
||||
* reads.
|
||||
*/
|
||||
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& donorDoc);
|
||||
void dataSync(OperationContext* opCtx,
|
||||
const TenantMigrationDonorDocument& originalDonorStateDocument);
|
||||
|
||||
/**
|
||||
* Creates a task executor to be used for tenant migration.
|
||||
@@ -54,6 +55,8 @@ std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContex
|
||||
*/
|
||||
void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc);
|
||||
|
||||
void persistDonorStateDocument(OperationContext* opCtx,
|
||||
const TenantMigrationDonorDocument& donorStateDocument);
|
||||
} // namespace migrating_tenant_donor_util
|
||||
|
||||
} // namespace mongo
|
||||
|
||||
Reference in New Issue
Block a user