Compare commits

...

10 Commits

Author SHA1 Message Date
Luis Osta
4f6c8f380b formatting 2020-07-17 15:26:06 +00:00
Luis Osta
d8d2e1cbb1 changed name 2020-07-17 15:26:06 +00:00
Luis Osta
6aa14e0c59 fixed compilation issue 2020-07-17 15:26:06 +00:00
Luis Osta
c38756fe5b formatting 2020-07-17 15:26:06 +00:00
Luis Osta
ab93d1193a work in progress of cleaned code 2020-07-17 15:26:06 +00:00
Luis Osta
8ebfdfe306 added asserts to test 2020-07-17 15:26:06 +00:00
Luis Osta
a701b85037 implemented basic persistance and fixed class command issue 2020-07-17 15:26:06 +00:00
Luis Osta
39ca11ba83 updated doc with new types 2020-07-17 15:26:06 +00:00
Luis Osta
1ceca57bca added basic function for mtab blocking 2020-07-17 15:26:06 +00:00
Luis Osta
e05dccf26b WIP startBlockWrites 2020-07-17 15:26:06 +00:00
7 changed files with 290 additions and 86 deletions

View File

@@ -0,0 +1,52 @@
/**
* Tests that after donorStartCommand is run, that reads and writes should be blocked for the
* migrating tenant.
* @tags: [requires_fcv_46]
*/
(function() {
"use strict";
const runDonorStartMigrationCommand =
(primaryConnection, migrationId, recipientConnectionString, dbPrefix, readPreference) => {
return primaryConnection.adminCommand({
donorStartMigration: 1,
migrationId,
recipientConnectionString,
databasePrefix: dbPrefix,
readPreference
});
};
// the value of the access enum in MigratingTenantAccessBlocker for the 'kBlockWritesAndReads'
// access state.
const kBlockReadsAndWrites = 2;
const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
const donorPrimary = rst.getPrimary();
const kMigrationId = new UUID();
const kRecipientConnectionString = new ReplSetTest({nodes: 1}).getURL();
const kReadPreference = {
mode: "primary"
};
const kDBPrefixes = 'databaseABC';
jsTest.log('Running donorStartMigration command.');
assert.commandWorked(runDonorStartMigrationCommand(
donorPrimary, kMigrationId, kRecipientConnectionString, kDBPrefixes, kReadPreference));
jsTest.log('Running the serverStatus command.');
const migratingTenantServerStatus =
donorPrimary.adminCommand({serverStatus: 1}).migratingTenantAccessBlocker;
// Due to the way that the state machine works, if both reads and writes are blocked, we know that
// at some point only writes were blocked Thus both kBlockWrites and kBlockWritesAndReads are states
// that are used.
assert.eq(migratingTenantServerStatus.access, kBlockReadsAndWrites);
assert(migratingTenantServerStatus.blockTimestamp);
rst.stopSet();
})();

View File

@@ -26,62 +26,55 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication;
#include "mongo/db/commands.h"
#include "mongo/db/commands/migrate_tenant_cmds.h"
#include "mongo/db/commands/migrate_tenant_cmds_gen.h"
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
#include "mongo/db/repl/migrating_tenant_donor_util.h"
#include "mongo/logv2/log.h"
namespace mongo {
namespace {
template <typename RequestT>
class MigrationDonorCmdBase : public TypedCommand<MigrationDonorCmdBase<RequestT>> {
class DonorStartMigrationCmd : public TenantMigrationDonorCmdBase<DonorStartMigrationCmd> {
public:
using Request = RequestT;
using TC = TypedCommand<MigrationDonorCmdBase<RequestT>>;
using Request = DonorStartMigration;
using ParentInvocation = TenantMigrationDonorCmdBase<DonorStartMigrationCmd>::Invocation;
class Invocation : public ParentInvocation {
using ParentInvocation::ParentInvocation;
class Invocation : public TC::InvocationBase {
public:
using TC::InvocationBase::InvocationBase;
using TC::InvocationBase::request;
void typedRun(OperationContext* opCtx) {
const auto requestBody = request();
auto donorStateDocument = getDonorStateDocumentFromRequest(requestBody);
void typedRun(OperationContext* opCtx) {}
private:
bool supportsWriteConcern() const override {
return false;
}
NamespaceString ns() const override {
return NamespaceString(request().getDbName(), "");
migrating_tenant_donor_util::persistDonorStateDocument(opCtx, donorStateDocument);
migrating_tenant_donor_util::dataSync(opCtx, donorStateDocument);
}
void doCheckAuthorization(OperationContext* opCtx) const override {}
};
TenantMigrationDonorDocument getDonorStateDocumentFromRequest(
const RequestType& requestBody) {
mongo::UUID migrationId = requestBody.getMigrationId();
bool adminOnly() const override {
return true;
}
std::string recipientURI = requestBody.getRecipientConnectionString().toString();
std::string dbPrefix = requestBody.getDatabasePrefix().toString();
std::string help() const override {
return "Multi-tenant migration command on the donor.";
}
BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return BasicCommand::AllowedOnSecondary::kNever;
}
};
auto donorStartState = TenantMigrationDonorStateEnum::kDataSync;
bool garbageCollect = false;
const TenantMigrationDonorDocument donorStateDocument(
migrationId, recipientURI, dbPrefix, donorStartState, garbageCollect);
class DonorStartMigrationCmd : public MigrationDonorCmdBase<DonorStartMigration> {
public:
using ParentInvocation = MigrationDonorCmdBase<DonorStartMigration>::Invocation;
class Invocation final : public ParentInvocation {
public:
void typedRun(OperationContext* opCtx) {}
return donorStateDocument;
}
private:
void doCheckAuthorization(OperationContext* opCtx) const override {}
void doCheckAuthorization(OperationContext* opCtx) const {}
};
std::string help() const override {
std::string help() const {
return "Start migrating databases whose names match the specified prefix to the specified "
"replica set.";
}
@@ -89,12 +82,19 @@ public:
} donorStartMigrationCmd;
class DonorWaitForMigrationToCommitCmd
: public MigrationDonorCmdBase<DonorWaitForMigrationToCommit> {
: public TenantMigrationDonorCmdBase<DonorWaitForMigrationToCommitCmd> {
public:
using ParentInvocation = MigrationDonorCmdBase<DonorWaitForMigrationToCommit>::Invocation;
class Invocation final : public ParentInvocation {
using Request = DonorWaitForMigrationToCommit;
using ParentInvocation =
TenantMigrationDonorCmdBase<DonorWaitForMigrationToCommitCmd>::Invocation;
class Invocation : public ParentInvocation {
using ParentInvocation::ParentInvocation;
public:
void typedRun(OperationContext* opCtx) {}
private:
void doCheckAuthorization(OperationContext* opCtx) const {}
};
std::string help() const override {
@@ -103,12 +103,18 @@ public:
} donorWaitForMigrationToCommit;
class DonorForgetMigrationCmd : public MigrationDonorCmdBase<DonorForgetMigration> {
class DonorForgetMigrationCmd : public TenantMigrationDonorCmdBase<DonorForgetMigrationCmd> {
public:
using ParentInvocation = MigrationDonorCmdBase<DonorWaitForMigrationToCommit>::Invocation;
class Invocation final : public ParentInvocation {
using Request = DonorForgetMigration;
using ParentInvocation = TenantMigrationDonorCmdBase<DonorForgetMigrationCmd>::Invocation;
class Invocation : public ParentInvocation {
using ParentInvocation::ParentInvocation;
public:
void typedRun(OperationContext* opCtx) {}
private:
void doCheckAuthorization(OperationContext* opCtx) const {}
};
std::string help() const override {

View File

@@ -0,0 +1,60 @@
/**
* Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/commands.h"
namespace mongo {
template <typename DerivedT>
class TenantMigrationDonorCmdBase : public TypedCommand<DerivedT> {
public:
using TC = TypedCommand<DerivedT>;
class Invocation : public TC::InvocationBase {
using TC::InvocationBase::InvocationBase;
private:
bool supportsWriteConcern() const override {
return false;
}
NamespaceString ns() const {
return NamespaceString(TC::InvocationBase::request().getDbName(), "");
}
};
bool adminOnly() const override {
return true;
}
BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return BasicCommand::AllowedOnSecondary::kNever;
}
};
} // namespace mongo

View File

@@ -1227,6 +1227,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/rw_concern_d',
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'local_oplog_info',

View File

@@ -52,12 +52,9 @@ structs:
strict: true
fields:
_id:
type: objectid
description: "A unique identifier for the document."
cpp_name: id
migrationId:
type: uuid
description: "Unique identifier for the tenant migration."
cpp_name: id
recipientConnectionString:
type: string
description: "The URI string that the donor will utilize to create a connection with the recipient."
@@ -82,12 +79,9 @@ structs:
strict: true
fields:
_id:
type: objectid
description: "A unique identifier for the document."
cpp_name: id
migrationId:
type: uuid
description: "Unique identifier for the tenant migration."
cpp_name: id
donorConnectionString:
type: string
description: "The URI string that the donor will utilize to create a connection with the recipient."

View File

@@ -27,7 +27,10 @@
* it in the license file.
*/
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication;
#include "mongo/platform/basic.h"
#include "mongo/util/str.h"
#include "mongo/db/repl/migrating_tenant_donor_util.h"
@@ -36,10 +39,13 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
#include "mongo/db/repl/migrating_tenant_access_blocker_by_prefix.h"
#include "mongo/db/s/persistent_task_store.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace migrating_tenant_donor_util {
@@ -55,8 +61,10 @@ const char kNetName[] = "TenantMigrationWorkerNetwork";
* state.
*/
void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorDoc) {
LOGV2(4917300, "Reached the beginning of the onTransitionToBlocking");
invariant(donorDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
invariant(donorDoc.getBlockTimestamp());
LOGV2(4917300, "Passed invariants of the onTransitionToBlocking");
auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(opCtx->getServiceContext());
auto mtab = mtabByPrefix.getMigratingTenantBlocker(donorDoc.getDatabasePrefix());
@@ -83,15 +91,98 @@ void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocumen
mtab->startBlockingReadsAfter(donorDoc.getBlockTimestamp().get());
}
/**
* Creates a MigratingTenantAccess blocker, and makes it start blocking writes. Then adds it to
* the MigratingTenantAccessBlockerByPrefix container using the donor document's databasePrefix as
* the key.
*/
void startTenantMigrationBlockOnPrimary(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDocument) {
invariant(donorStateDocument.getState() == TenantMigrationDonorStateEnum::kDataSync);
auto serviceContext = opCtx->getServiceContext();
executor::TaskExecutor* mtabExecutor = getTenantMigrationExecutor(serviceContext).get();
auto mtab = std::make_shared<MigratingTenantAccessBlocker>(serviceContext, mtabExecutor);
mtab->startBlockingWrites();
auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(serviceContext);
mtabByPrefix.add(donorStateDocument.getDatabasePrefix(), mtab);
}
/**
* Returns an updated donor state document that will be the same as the original except with the
* state set to blocking and the blockTimestamp set to the reserved Optime.
*/
TenantMigrationDonorDocument createUpdatedDonorStateDocument(
const TenantMigrationDonorDocument& originalDonorStateDocument, const OplogSlot& oplogSlot) {
TenantMigrationDonorDocument updatedDoc = originalDonorStateDocument;
updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
return updatedDoc;
}
/**
* Returns the arguments object for the update command with the _id as the criteria and the reserved
* opTime as the oplogSlot.
*/
CollectionUpdateArgs createUpdateArgumentsForDonorStateDocument(
const TenantMigrationDonorDocument& originalDonorStateDocument,
const OplogSlot& oplogSlot,
const BSONObj& serializedUpdatedDonorStateDocument) {
CollectionUpdateArgs args;
args.criteria = BSON("_id" << originalDonorStateDocument.getId());
args.oplogSlot = oplogSlot;
args.update = serializedUpdatedDonorStateDocument;
return args;
}
/**
* After reserving the opTime for the write and creating the new updated document with the necessary
* update arguments. It will send the update command to the tenant migration donors collection.
*/
void updateDonorStateDocument(OperationContext* opCtx,
Collection* collection,
const TenantMigrationDonorDocument& originalDonorStateDocument) {
const auto originalRecordId = Helpers::findOne(
opCtx, collection, originalDonorStateDocument.toBSON(), false /* requireIndex */);
invariant(!originalRecordId.isNull());
// Reserve an opTime for the write and use it as the blockTimestamp for the migration.
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
BSONObj serializedUpdatedDonorDoc =
createUpdatedDonorStateDocument(originalDonorStateDocument, oplogSlot).toBSON();
CollectionUpdateArgs args = createUpdateArgumentsForDonorStateDocument(
originalDonorStateDocument, oplogSlot, serializedUpdatedDonorDoc);
const auto originalSnapshot = Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(),
originalDonorStateDocument.toBSON());
collection->updateDocument(opCtx,
originalRecordId,
originalSnapshot,
serializedUpdatedDonorDoc,
false,
nullptr /* OpDebug* */,
&args);
}
} // namespace
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDoc) {
/**
* TODO - Implement recipientSyncData command
*/
void dataSync(OperationContext* opCtx,
const TenantMigrationDonorDocument& originalDonorStateDocument) {
invariant(originalDonorStateDocument.getState() == TenantMigrationDonorStateEnum::kDataSync);
// Send recipientSyncData.
// Call startBlockingWrites.
startTenantMigrationBlockOnPrimary(opCtx, originalDonorStateDocument);
// Update the on-disk state of the migration to "blocking" state.
invariant(originalDoc.getState() == TenantMigrationDonorStateEnum::kDataSync);
uassertStatusOK(writeConflictRetry(
opCtx,
@@ -109,40 +200,14 @@ void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& origi
}
WriteUnitOfWork wuow(opCtx);
const auto originalRecordId =
Helpers::findOne(opCtx, collection, originalDoc.toBSON(), false /* requireIndex */);
invariant(!originalRecordId.isNull());
// Reserve an opTime for the write and use it as the blockTimestamp for the migration.
auto oplogSlot = repl::LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
TenantMigrationDonorDocument updatedDoc;
updatedDoc.setId(originalDoc.getId());
updatedDoc.setDatabasePrefix(originalDoc.getDatabasePrefix());
updatedDoc.setState(TenantMigrationDonorStateEnum::kBlocking);
updatedDoc.setBlockTimestamp(oplogSlot.getTimestamp());
CollectionUpdateArgs args;
args.update = updatedDoc.toBSON();
args.criteria = BSON("_id" << originalDoc.getId());
args.oplogSlot = oplogSlot;
collection->updateDocument(
opCtx,
originalRecordId,
Snapshotted<BSONObj>(opCtx->recoveryUnit()->getSnapshotId(), originalDoc.toBSON()),
updatedDoc.toBSON(),
false,
nullptr /* OpDebug* */,
&args);
updateDonorStateDocument(opCtx, collection, originalDonorStateDocument);
wuow.commit();
return Status::OK();
}));
}
std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext) {
ThreadPool::Options tpOptions;
tpOptions.threadNamePrefix = kThreadNamePrefix;
@@ -157,14 +222,16 @@ std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContex
executor::makeNetworkInterface(kNetName, nullptr, nullptr));
}
void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc) {
auto donorDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"), doc);
void onTenantMigrationDonorStateTransition(OperationContext* opCtx,
const BSONObj& serializedDonorStateDocument) {
auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"),
serializedDonorStateDocument);
switch (donorDoc.getState()) {
switch (donorStateDoc.getState()) {
case TenantMigrationDonorStateEnum::kDataSync:
break;
case TenantMigrationDonorStateEnum::kBlocking:
onTransitionToBlocking(opCtx, donorDoc);
onTransitionToBlocking(opCtx, donorStateDoc);
break;
case TenantMigrationDonorStateEnum::kCommitted:
break;
@@ -175,6 +242,27 @@ void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONOb
}
}
/**
* The function will persist(insert) the provided donorStateDocument into the config data on the
* collection for the tenantMigration donors In order to maintain a stable state for the tenant
* migration in case of node failure or restart.
*/
void persistDonorStateDocument(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDocument) {
PersistentTaskStore<TenantMigrationDonorDocument> store(
NamespaceString::kMigrationDonorsNamespace);
try {
store.add(opCtx, donorStateDocument);
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>&) {
uasserted(
4917300,
str::stream()
<< "While attempting to persist the donor state machine for tenant migration"
<< ", found another document with the same migration id. Attempted migration: "
<< donorStateDocument.toBSON());
}
}
} // namespace migrating_tenant_donor_util
} // namespace mongo

View File

@@ -41,7 +41,8 @@ namespace migrating_tenant_donor_util {
* Sends recipientSyncData to the recipient until success and starts blocking writes and causal
* reads.
*/
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& donorDoc);
void dataSync(OperationContext* opCtx,
const TenantMigrationDonorDocument& originalDonorStateDocument);
/**
* Creates a task executor to be used for tenant migration.
@@ -54,6 +55,8 @@ std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContex
*/
void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc);
void persistDonorStateDocument(OperationContext* opCtx,
const TenantMigrationDonorDocument& donorStateDocument);
} // namespace migrating_tenant_donor_util
} // namespace mongo