Compare commits
1 Commits
r7.2.0-rc0
...
m.maher/Te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d14c8c01e5 |
159
jstests/replsets/test.js
Normal file
159
jstests/replsets/test.js
Normal file
@@ -0,0 +1,159 @@
|
||||
/**
|
||||
* Test the waiting logic of $backupCursorExtend. Given a timestamp T, when
|
||||
* $backupCursorExtend returns, oplog with T should be majority committed and
|
||||
* persisent on the disk of that node.
|
||||
*
|
||||
* @tags: [
|
||||
* requires_journaling,
|
||||
* requires_persistence,
|
||||
* requires_sharding,
|
||||
* requires_wiredtiger,
|
||||
* ]
|
||||
*/
|
||||
(function() {
|
||||
"use strict";
|
||||
load("jstests/replsets/rslib.js"); // For reconfig, isConfigCommitted and
|
||||
// safeReconfigShouldFail.
|
||||
load("jstests/libs/backup_utils.js");
|
||||
load("jstests/libs/write_concern_util.js");
|
||||
|
||||
const DEBUG = false;
|
||||
const dbName = "test";
|
||||
const collName = "coll";
|
||||
const restorePath = MongoRunner.dataPath + "forRestore/";
|
||||
const numDocs = 2;
|
||||
|
||||
let addNodeConfig = function(rst, nodeId, conn, arbiter) {
|
||||
const config = rst.getReplSetConfigFromNode();
|
||||
if (arbiter) {
|
||||
config.members.push({_id: nodeId, host: conn.host, arbiterOnly: true});
|
||||
} else {
|
||||
config.members.push({_id: nodeId, host: conn.host});
|
||||
}
|
||||
|
||||
return config;
|
||||
};
|
||||
|
||||
let removeNodeConfig = function(rst, conn) {
|
||||
const config = rst.getReplSetConfigFromNode();
|
||||
for (var i = 0; i < config.members.length; i++) {
|
||||
if (config.members[i].host == conn.host) {
|
||||
config.members.splice(i, 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return config;
|
||||
};
|
||||
|
||||
function testReconfig(rst, config, shouldSucceed, errCode, errMsg) {
|
||||
if (shouldSucceed) {
|
||||
reconfig(rst, config);
|
||||
assert.soon(() => isConfigCommitted(rst.getPrimary()));
|
||||
rst.waitForConfigReplication(rst.getPrimary());
|
||||
rst.awaitReplication();
|
||||
// rst.await
|
||||
} else {
|
||||
safeReconfigShouldFail(rst, config, false /* force */, errCode, errMsg);
|
||||
|
||||
// A force reconfig should also fail.
|
||||
safeReconfigShouldFail(rst, config, true /* force */, errCode, errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
function insertDoc(db, collName, doc) {
|
||||
let res = assert.commandWorked(db.runCommand({insert: collName, documents: [doc]}));
|
||||
assert(res.hasOwnProperty("operationTime"), tojson(res));
|
||||
return res.operationTime;
|
||||
}
|
||||
|
||||
/*
|
||||
* Assert that lagged secondary will block when Timestamp T has not been majority committed yet.
|
||||
*/
|
||||
function assertLaggedSecondaryGetBlocked() {
|
||||
resetDbpath(restorePath);
|
||||
let rst = new ReplSetTest({name: "test", nodes: 1});
|
||||
rst.startSet();
|
||||
rst.initiateWithHighElectionTimeout();
|
||||
const primaryDB = rst.getPrimary().getDB(dbName);
|
||||
|
||||
print("Ahoo0 ==> Insert Docs to Primary");
|
||||
for (let i = 0; i < 1000; i++) {
|
||||
insertDoc(primaryDB, collName, {k: i});
|
||||
}
|
||||
|
||||
print("Ahoo0 ==> AddSecondary");
|
||||
testReconfig(rst,
|
||||
addNodeConfig(rst, 1 /* nodeId */, rst.add() /* conn */, false /* arbiter */),
|
||||
true /* shouldSucceed */);
|
||||
rst.stopSet();
|
||||
return;
|
||||
|
||||
let cursor = openBackupCursor(rst.getSecondary());
|
||||
// let firstBatch = cursor.next();
|
||||
let firstBatch = undefined;
|
||||
while(cursor.hasNext()) {
|
||||
let batch = cursor.next();
|
||||
print("Ahoo1 ==> ", tojson(batch));
|
||||
if (!firstBatch) {
|
||||
firstBatch = batch;
|
||||
}
|
||||
}
|
||||
|
||||
print("Ahoo2 --> first batch: ", tojson(firstBatch));
|
||||
let checkpointTimestamp = firstBatch.metadata["checkpointTimestamp"];
|
||||
const backupId = firstBatch.metadata.backupId;
|
||||
print("ahoo2 -> ", tojson(checkpointTimestamp), " "+ backupId);
|
||||
|
||||
jsTestLog("Start writes on primary");
|
||||
let clusterTime;
|
||||
for (let i = 0; i < numDocs - 1; i++) {
|
||||
clusterTime = insertDoc(primaryDB, collName, {a: i});
|
||||
}
|
||||
|
||||
print("Ahoo3 ==> clusterTime: ", tojson(clusterTime));
|
||||
let extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
|
||||
while(extendCursor.hasNext()) {
|
||||
let batch = extendCursor.next();
|
||||
print("Ahoo3 ==> ", tojson(batch));
|
||||
}
|
||||
|
||||
jsTestLog("Start writes on primary");
|
||||
for (let i = 0; i < numDocs - 1; i++) {
|
||||
clusterTime = insertDoc(primaryDB, collName, {b: i});
|
||||
}
|
||||
|
||||
print("Ahoo4 ==> clusterTime: ", tojson(clusterTime));
|
||||
extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
|
||||
while(extendCursor.hasNext()) {
|
||||
let batch = extendCursor.next();
|
||||
print("Ahoo4 ==> ", tojson(batch));
|
||||
}
|
||||
|
||||
jsTestLog("Start writes on primary");
|
||||
for (let i = 0; i < numDocs - 1; i++) {
|
||||
clusterTime = insertDoc(primaryDB, collName, {c: i});
|
||||
}
|
||||
|
||||
print("Ahoo5 ==> clusterTime: ", tojson(clusterTime));
|
||||
extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
|
||||
while(extendCursor.hasNext()) {
|
||||
let batch = extendCursor.next();
|
||||
print("Ahoo5 ==> ", tojson(batch));
|
||||
}
|
||||
|
||||
cursor.close();
|
||||
|
||||
cursor = openBackupCursor(rst.getSecondary());
|
||||
// let firstBatch = cursor.next();
|
||||
while(cursor.hasNext()) {
|
||||
let batch = cursor.next();
|
||||
print("Ahoo6 ==> ", tojson(batch));
|
||||
}
|
||||
|
||||
cursor.close();
|
||||
rst.stopSet();
|
||||
}
|
||||
|
||||
assertLaggedSecondaryGetBlocked();
|
||||
})();
|
||||
@@ -29,9 +29,10 @@
|
||||
|
||||
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include "initial_syncer.h"
|
||||
#include "mongo/platform/basic.h"
|
||||
#include "mongo/util/future_util.h"
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
@@ -77,6 +78,7 @@
|
||||
#include "mongo/util/time_support.h"
|
||||
#include "mongo/util/timer.h"
|
||||
#include "mongo/util/version/releases.h"
|
||||
#include "mongo/db/dbdirectclient.h"
|
||||
|
||||
namespace mongo {
|
||||
namespace repl {
|
||||
@@ -704,6 +706,434 @@ void InitialSyncer::_startInitialSyncAttemptCallback(
|
||||
}
|
||||
}
|
||||
|
||||
// void _cloneFiles(const std::set<std::string> &files) {
|
||||
// std::cout << "Ahoo ->Start cloning files:" << std::endl;
|
||||
// for (auto str : files) {
|
||||
// std::cout << str << std::endl;
|
||||
// }
|
||||
// }
|
||||
|
||||
void _cloneFiles(const StringSet& files) {
|
||||
std::cout << "Ahoo ->Start cloning files:" << std::endl;
|
||||
for (auto str : files) {
|
||||
std::cout << str << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
struct SyncingFilesState {
|
||||
SyncingFilesState() = default;
|
||||
void keepBackupCursorAlive(std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
const CancellationToken& parentToken,
|
||||
const HostAndPort& host) {
|
||||
backupCursorKeepAliveCancellation = CancellationSource(parentToken);
|
||||
std::cout << "Ahoo --> Inside keepBackupCursorAlive " << std::endl;
|
||||
return AsyncTry([this, host, clonerExec] {
|
||||
if (backupId) {
|
||||
executor::RemoteCommandRequest request(
|
||||
host,
|
||||
NamespaceString::kAdminDb.toString(),
|
||||
std::move(BSON("getMore" << cursorId << "collection"
|
||||
<< "$cmd.aggregate")),
|
||||
rpc::makeEmptyMetadata(),
|
||||
nullptr);
|
||||
// We're not expecting a response, set to fire and forget
|
||||
request.fireAndForgetMode =
|
||||
executor::RemoteCommandRequest::FireAndForgetMode::kOn;
|
||||
std::cout << "Ahoo --> Inside keepBackupCursorAlive new command"
|
||||
<< std::endl;
|
||||
return clonerExec
|
||||
->scheduleRemoteCommand(std::move(request),
|
||||
backupCursorKeepAliveCancellation.token())
|
||||
.getAsync([](auto&&) {});
|
||||
}
|
||||
})
|
||||
.until([](Status) { return false; })
|
||||
.withDelayBetweenIterations(Seconds(2))
|
||||
.on(clonerExec, backupCursorKeepAliveCancellation.token())
|
||||
.getAsync([](auto&&) {}); // Ignore the result Future;
|
||||
}
|
||||
|
||||
void KillBackupCursor(std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
const HostAndPort& host) {
|
||||
if (backupId) {
|
||||
std::cout << "Ahoo ->Inside InitialSync Cancelling backupCursorCancellation:"
|
||||
<< std::endl;
|
||||
backupCursorKeepAliveCancellation.cancel();
|
||||
// mongo::sleepsecs(20);
|
||||
auto cmdObj = BSON("killCursors"
|
||||
<< "$cmd.aggregate"
|
||||
<< "cursors" << BSON_ARRAY(cursorId));
|
||||
// std::cout << "Ahoo ->Inside InitialSync kill cursor :" << " " << cmdObj << std::endl;
|
||||
|
||||
executor::RemoteCommandRequest request(host,
|
||||
NamespaceString::kAdminDb.toString(),
|
||||
std::move(cmdObj),
|
||||
rpc::makeEmptyMetadata(),
|
||||
nullptr);
|
||||
// We're not expecting a response, set to fire and forget
|
||||
request.fireAndForgetMode = executor::RemoteCommandRequest::FireAndForgetMode::kOn;
|
||||
return clonerExec
|
||||
->scheduleRemoteCommand(std::move(request), CancellationToken::uncancelable())
|
||||
.getAsync([](auto&&) {});
|
||||
}
|
||||
}
|
||||
|
||||
// Extended cursor sends all log files created since the backupCursor's
|
||||
// checkpointTimestamp till the extendTo timestamp, so we need to get the
|
||||
// difference between the files returned by the consecutive backupCursorExtend to
|
||||
// clone only the new log files added since the previous backupCursorExtend.
|
||||
StringSet getNewFilesToClone(std::set<std::string>& backupCursorExtendFiles) {
|
||||
StringSet newFilesToClone;
|
||||
std::set_difference(backupCursorExtendFiles.begin(),
|
||||
backupCursorExtendFiles.end(),
|
||||
extendedCursorFiles.begin(),
|
||||
extendedCursorFiles.end(),
|
||||
std::inserter(newFilesToClone, newFilesToClone.begin()));
|
||||
extendedCursorFiles.insert(newFilesToClone.begin(), newFilesToClone.end());
|
||||
return newFilesToClone;
|
||||
}
|
||||
|
||||
CancellationSource backupCursorKeepAliveCancellation;
|
||||
boost::optional<mongo::UUID> backupId;
|
||||
CursorId cursorId;
|
||||
mongo::Timestamp lastSyncedOpTime;
|
||||
mongo::Timestamp lastAppliedOpTimeOnSyncSrc;
|
||||
std::set<std::string> extendedCursorFiles;
|
||||
int fileBasedInitialSyncCycles = 1;
|
||||
};
|
||||
|
||||
BSONElement _getBSONField(const BSONObj& obj,
|
||||
const std::string& fieldName,
|
||||
const std::string& objName) {
|
||||
uassert(ErrorCodes::NoSuchKey,
|
||||
str::stream() << "Missing " << fieldName << "field for " << objName << ".",
|
||||
obj.hasField(fieldName));
|
||||
return obj.getField(fieldName);
|
||||
}
|
||||
|
||||
ExecutorFuture<mongo::Timestamp> _getLastAppliedOpTimeFromRemoteNode(
|
||||
HostAndPort host,
|
||||
std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
const CancellationToken& token) {
|
||||
executor::RemoteCommandRequest request(std::move(host),
|
||||
NamespaceString::kAdminDb.toString(),
|
||||
std::move(BSON("replSetGetStatus" << 1)),
|
||||
rpc::makeEmptyMetadata(),
|
||||
nullptr);
|
||||
return clonerExec->scheduleRemoteCommand(std::move(request), token)
|
||||
.then([](const auto& response) {
|
||||
uassertStatusOK(response.status);
|
||||
auto& reply = response.data;
|
||||
uassertStatusOK(getStatusFromCommandResult(reply));
|
||||
// Parsing replSetGetStatus's reply to get lastAppliedOpTime.
|
||||
// ReplSetGetStatus's reply example:
|
||||
// {
|
||||
// ...
|
||||
// "optimes" : {
|
||||
// ...
|
||||
// "appliedOpTime" : {
|
||||
// "ts" : Timestamp(1583385878, 1),
|
||||
// "t" : NumberLong(3)
|
||||
// },
|
||||
// ...
|
||||
// }
|
||||
// ...
|
||||
// }
|
||||
auto lastAppliedOpTime = _getBSONField(
|
||||
_getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(),
|
||||
"appliedOpTime",
|
||||
"replSetGetStatus's reply.optimes")
|
||||
.Obj(),
|
||||
"ts",
|
||||
"replSetGetStatus's reply.optimes.appliedOpTime");
|
||||
return lastAppliedOpTime.timestamp();
|
||||
});
|
||||
}
|
||||
|
||||
ExecutorFuture<void> _openBackupCursor(const HostAndPort& host,
|
||||
std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
const CancellationToken& token,
|
||||
SyncingFilesState& _syncingFilesState,
|
||||
std::shared_ptr<StringSet> files) {
|
||||
|
||||
const auto cmdObj = [&] {
|
||||
AggregateCommandRequest aggRequest(
|
||||
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
|
||||
{BSON("$backupCursor" << BSONObj())});
|
||||
// We must set a writeConcern on internal commands.
|
||||
aggRequest.setWriteConcern(WriteConcernOptions());
|
||||
return aggRequest.toBSON(BSONObj());
|
||||
}();
|
||||
|
||||
std::cout << "Ahoo -> Inside Fetcher CMD -> " << cmdObj << std::endl;
|
||||
|
||||
auto fetcherCallback = [&_syncingFilesState, files, token](
|
||||
const Fetcher::QueryResponseStatus& dataStatus,
|
||||
Fetcher::NextAction* nextAction,
|
||||
BSONObjBuilder* getMoreBob) {
|
||||
std::cout << "Ahoo -> Inside Fetcher callback-> " << dataStatus.getStatus().reason()
|
||||
<< std::endl;
|
||||
// Throw out any accumulated results on error
|
||||
uassertStatusOK(dataStatus.getStatus());
|
||||
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
|
||||
|
||||
const auto& data = dataStatus.getValue();
|
||||
for (const BSONObj& doc : data.documents) {
|
||||
std::cout << "Ahoo -> Inside Fetcher -> " << doc << std::endl;
|
||||
if (!_syncingFilesState.backupId) {
|
||||
// First batch must contain the metadata.
|
||||
// Parsing the metadata to get backupId and checkpointTimestamp for the
|
||||
// the backupCursor.
|
||||
const auto& metaData =
|
||||
_getBSONField(doc, "metadata", "backupCursor's first batch").Obj();
|
||||
_syncingFilesState.backupId = UUID(uassertStatusOK(UUID::parse(
|
||||
_getBSONField(metaData, "backupId", "backupCursor's first batch.metadata"))));
|
||||
_syncingFilesState.lastSyncedOpTime =
|
||||
_getBSONField(
|
||||
metaData, "checkpointTimestamp", "backupCursor's first batch.metadata")
|
||||
.timestamp();
|
||||
_syncingFilesState.cursorId = data.cursorId;
|
||||
std::cout << "AHoo -> _syncingFilesState.cursorId " << _syncingFilesState.cursorId
|
||||
<< std::endl;
|
||||
} else {
|
||||
files->insert(_getBSONField(doc, "filename", "backupCursor's batches").str());
|
||||
}
|
||||
}
|
||||
|
||||
if (!getMoreBob) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!data.documents.size()) {
|
||||
*nextAction = Fetcher::NextAction::kExitAndKeepCursorAlive;
|
||||
return;
|
||||
}
|
||||
|
||||
getMoreBob->append("getMore", data.cursorId);
|
||||
getMoreBob->append("collection", data.nss.coll());
|
||||
};
|
||||
auto fetcher = std::make_shared<Fetcher>(
|
||||
clonerExec.get(),
|
||||
host,
|
||||
NamespaceString::kAdminDb.toString(),
|
||||
cmdObj,
|
||||
fetcherCallback,
|
||||
ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
|
||||
executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
|
||||
executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
|
||||
RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
|
||||
3U, executor::RemoteCommandRequest::kNoTimeout),
|
||||
transport::kGlobalSSLMode);
|
||||
uassertStatusOK(fetcher->schedule());
|
||||
// std::cout << fetcher->getDiagnosticString() << std::endl;
|
||||
// fetcher->join();
|
||||
// fetcher.join();
|
||||
return fetcher->onCompletion().thenRunOn(clonerExec).then([fetcher, files] {
|
||||
std::cout << "Ahoo -> Inside Fetcher Finished -> " << std::endl;
|
||||
for (const auto& file : *files) {
|
||||
std::cout << file << std::endl;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ExecutorFuture<void> _extendBackupCursor(const HostAndPort& host,
|
||||
std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
const CancellationToken& token,
|
||||
SyncingFilesState& _syncingFilesState,
|
||||
std::shared_ptr<StringSet> files,
|
||||
const Timestamp& extendTo) {
|
||||
|
||||
const auto cmdObj = [&] {
|
||||
AggregateCommandRequest aggRequest(
|
||||
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
|
||||
{BSON("$backupCursorExtend" << BSON("backupId" << _syncingFilesState.backupId.get()
|
||||
<< "timestamp" << extendTo))});
|
||||
// We must set a writeConcern on internal commands.
|
||||
aggRequest.setWriteConcern(WriteConcernOptions());
|
||||
// The command may not return immediately because it may wait for the node to have the full
|
||||
// oplog history up to the backup point in time.
|
||||
aggRequest.setMaxTimeMS(180 * 1000);
|
||||
return aggRequest.toBSON(BSONObj());
|
||||
}();
|
||||
|
||||
std::cout << "Ahoo -> Inside Fetcher Extend CMD -> " << cmdObj << std::endl;
|
||||
auto exfiles = std::make_shared<std::set<std::string>>();
|
||||
auto fetcherCallback =
|
||||
[&_syncingFilesState, exfiles, token](const Fetcher::QueryResponseStatus& dataStatus,
|
||||
Fetcher::NextAction* nextAction,
|
||||
BSONObjBuilder* getMoreBob) {
|
||||
std::cout << "Ahoo -> Inside Fetcher Extend callback-> "
|
||||
<< dataStatus.getStatus().reason() << std::endl;
|
||||
// Throw out any accumulated results on error
|
||||
uassertStatusOK(dataStatus.getStatus());
|
||||
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
|
||||
|
||||
const auto& data = dataStatus.getValue();
|
||||
for (const BSONObj& doc : data.documents) {
|
||||
std::cout << "Ahoo -> Inside Fetcher Extend -> " << doc << std::endl;
|
||||
exfiles->insert(_getBSONField(doc, "filename", "backupCursor's batches").str());
|
||||
}
|
||||
|
||||
if (!getMoreBob || !data.documents.size()) {
|
||||
return;
|
||||
}
|
||||
getMoreBob->append("getMore", data.cursorId);
|
||||
getMoreBob->append("collection", data.nss.coll());
|
||||
};
|
||||
|
||||
auto fetcher = std::make_shared<Fetcher>(
|
||||
clonerExec.get(),
|
||||
host,
|
||||
NamespaceString::kAdminDb.toString(),
|
||||
cmdObj,
|
||||
fetcherCallback,
|
||||
ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
|
||||
executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
|
||||
executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
|
||||
RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
|
||||
3U, executor::RemoteCommandRequest::kNoTimeout),
|
||||
transport::kGlobalSSLMode);
|
||||
uassertStatusOK(fetcher->schedule());
|
||||
// std::cout << fetcher->getDiagnosticString() << std::endl;
|
||||
// fetcher->join();
|
||||
// fetcher.join();
|
||||
return fetcher->onCompletion()
|
||||
.thenRunOn(clonerExec)
|
||||
.then([fetcher, files, exfiles, &_syncingFilesState] {
|
||||
std::cout << "Ahoo -> Inside Fetcher Extend Finished -> " << std::endl;
|
||||
*files = _syncingFilesState.getNewFilesToClone(*exfiles);
|
||||
for (const auto& file : *files) {
|
||||
std::cout << file << std::endl;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ExecutorFuture<void> _cloneFromCursor(const HostAndPort& host,
|
||||
std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
const CancellationToken& token,
|
||||
SyncingFilesState& _syncingFilesState) {
|
||||
std::cout << "Ahoo ->Inside InitialSync fileBasedInitialSyncCycles: "
|
||||
<< _syncingFilesState.fileBasedInitialSyncCycles << std::endl;
|
||||
if (_syncingFilesState.fileBasedInitialSyncCycles == 1) {
|
||||
std::cout << "Ahoo ->Inside InitialSync start backup cursor: " << std::endl;
|
||||
invariant(!_syncingFilesState.backupId);
|
||||
auto files = std::make_shared<StringSet>();
|
||||
return _openBackupCursor(host, clonerExec, token, _syncingFilesState, files)
|
||||
.then([files, &_syncingFilesState, clonerExec, &host, token] {
|
||||
_cloneFiles(*files);
|
||||
_syncingFilesState.keepBackupCursorAlive(clonerExec, token, host);
|
||||
});
|
||||
}
|
||||
|
||||
invariant(_syncingFilesState.backupId);
|
||||
auto files = std::make_shared<StringSet>();
|
||||
return _extendBackupCursor(host,
|
||||
clonerExec,
|
||||
token,
|
||||
_syncingFilesState,
|
||||
files,
|
||||
_syncingFilesState.lastAppliedOpTimeOnSyncSrc)
|
||||
.then([files, &_syncingFilesState] {
|
||||
_syncingFilesState.lastSyncedOpTime = _syncingFilesState.lastAppliedOpTimeOnSyncSrc;
|
||||
_cloneFiles(*files);
|
||||
});
|
||||
}
|
||||
|
||||
ExecutorFuture<void> _startSyncingFiles(std::shared_ptr<executor::TaskExecutor> clonerExec,
|
||||
std::unique_ptr<DBClientConnection>& client,
|
||||
SyncingFilesState& _syncingFilesState,
|
||||
const CancellationToken& token) {
|
||||
|
||||
return AsyncTry([&client, &_syncingFilesState, &clonerExec, &token]() mutable {
|
||||
return _cloneFromCursor(
|
||||
client->getServerHostAndPort(), clonerExec, token, _syncingFilesState)
|
||||
.then([&client, &clonerExec, &token, &_syncingFilesState]() {
|
||||
if (_syncingFilesState.fileBasedInitialSyncCycles == 1) {
|
||||
std::cout << "Ahoo --> Inside InitialSync- start inserting "
|
||||
<< std::endl;
|
||||
for (int i = 0; i < 2000; i++) {
|
||||
auto insertCmd = BSON("insert"
|
||||
<< "hoppa"
|
||||
<< "documents" << BSON_ARRAY(BSON("a" << i))
|
||||
<< "writeConcern" << BSON("w" << 1));
|
||||
client->runCommand(OpMsgRequest::fromDBAndBody("test", insertCmd));
|
||||
}
|
||||
mongo::sleepsecs(20);
|
||||
}
|
||||
|
||||
return _getLastAppliedOpTimeFromRemoteNode(
|
||||
client->getServerHostAndPort(), clonerExec, token);
|
||||
})
|
||||
.then([&_syncingFilesState](mongo::Timestamp result) {
|
||||
_syncingFilesState.lastAppliedOpTimeOnSyncSrc = result;
|
||||
});
|
||||
})
|
||||
.until([&client, &_syncingFilesState, &clonerExec, &token](Status status) mutable {
|
||||
std::cout << "Ahoo ->Inside InitialSync Until: " << std::endl;
|
||||
|
||||
if (!status.isOK()) {
|
||||
_syncingFilesState.KillBackupCursor(clonerExec, client->getServerHostAndPort());
|
||||
return true;
|
||||
}
|
||||
|
||||
if (static_cast<int>(_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() -
|
||||
_syncingFilesState.lastSyncedOpTime.getSecs()) >= 0) {
|
||||
if (++_syncingFilesState.fileBasedInitialSyncCycles <= 3) {
|
||||
std::cout << "Ahoo ->Inside InitialSync lastSyncedOpTime: "
|
||||
<< _syncingFilesState.lastSyncedOpTime.toString() << std::endl;
|
||||
std::cout << "Ahoo ->Inside InitialSync syncSourceLastApliedOpTime: "
|
||||
<< _syncingFilesState.lastAppliedOpTimeOnSyncSrc.toString()
|
||||
<< std::endl;
|
||||
std::cout << "Ahoo ->Inside InitialSync Diff: "
|
||||
<< (_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() -
|
||||
_syncingFilesState.lastSyncedOpTime.getSecs())
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
_syncingFilesState.KillBackupCursor(clonerExec, client->getServerHostAndPort());
|
||||
|
||||
std::cout << "Ahoo ->Inside InitialSync stop looping: " << std::endl;
|
||||
return true;
|
||||
})
|
||||
.on(clonerExec, token);
|
||||
}
|
||||
|
||||
mongo::Timestamp _getLastAppliedOpTimeLocal(
|
||||
DBClientBase* conn) {
|
||||
std::cout << "here 1" << std::endl;
|
||||
auto res = conn->runCommand(
|
||||
OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, BSON("replSetGetStatus" << 1)));
|
||||
auto reply = res->getCommandReply();
|
||||
|
||||
std::cout << "here 2" << reply << std::endl;
|
||||
uassertStatusOK(getStatusFromCommandResult(reply));
|
||||
|
||||
// Parsing replSetGetStatus's reply to get lastAppliedOpTime.
|
||||
// ReplSetGetStatus's reply example:
|
||||
// {
|
||||
// ...
|
||||
// "optimes" : {
|
||||
// ...
|
||||
// "appliedOpTime" : {
|
||||
// "ts" : Timestamp(1583385878, 1),
|
||||
// "t" : NumberLong(3)
|
||||
// },
|
||||
// ...
|
||||
// }
|
||||
// ...
|
||||
// }
|
||||
auto lastAppliedOpTime = _getBSONField(
|
||||
_getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(),
|
||||
"appliedOpTime",
|
||||
"replSetGetStatus's reply.optimes")
|
||||
.Obj(),
|
||||
"ts",
|
||||
"replSetGetStatus's reply.optimes.appliedOpTime");
|
||||
return lastAppliedOpTime.timestamp();
|
||||
}
|
||||
|
||||
void InitialSyncer::_chooseSyncSourceCallback(
|
||||
const executor::TaskExecutor::CallbackArgs& callbackArgs,
|
||||
std::uint32_t chooseSyncSourceAttempt,
|
||||
@@ -795,6 +1225,73 @@ void InitialSyncer::_chooseSyncSourceCallback(
|
||||
|
||||
_syncSource = syncSource.getValue();
|
||||
|
||||
{
|
||||
/// AHOO
|
||||
// auto client = std::make_unique<DBDirectClient>(true /* autoReconnect */);
|
||||
// _client;
|
||||
// uassertStatusOK(client->connect(_syncSource, "FileCopyBasedInitialSyncer", boost::none));
|
||||
// std::cout << "AHOO ---> Inside InitialSync == client : " << client->toString()
|
||||
// << std::endl;
|
||||
// SyncingFilesState _syncingFilesState;
|
||||
// auto primary = CancellationSource();
|
||||
// uassertStatusOK(_startSyncingFiles(_clonerExec, client, _syncingFilesState, primary.token())
|
||||
// .getNoThrow());
|
||||
// std::set<std::string> files;
|
||||
// uassertStatusOK(_openBackupCursor(_syncSource, _clonerExec, primary.token(),
|
||||
// _syncingFilesState, files).getNoThrow());
|
||||
AggregateCommandRequest aggRequest(
|
||||
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
|
||||
{BSON("$backupCursor" << BSONObj())});
|
||||
// We must set a writeConcern on internal commands.
|
||||
aggRequest.setWriteConcern(WriteConcernOptions());
|
||||
auto opctx = makeOpCtx();
|
||||
auto client = std::make_unique<DBDirectClient>(opctx.get());
|
||||
auto statusWith = DBClientCursor::fromAggregationRequest(
|
||||
client.get(), aggRequest, true /* secondaryOk */, false /* useExhaust */);
|
||||
|
||||
std::cout << "Ahoo --> WE ARE HERE --> " << statusWith.isOK() << std::endl;
|
||||
auto cursor = statusWith.getValue().get();
|
||||
bool firstBatch = false;
|
||||
auto backupId = UUID::gen();
|
||||
while (cursor->more()) {
|
||||
auto batch = cursor->next();
|
||||
if(!firstBatch) {
|
||||
firstBatch = true;
|
||||
backupId = UUID(uassertStatusOK(UUID::parse(batch["metadata"]["backupId"])));
|
||||
}
|
||||
std::cout << "Ahoo BackupCursor--> " << batch << std::endl;
|
||||
}
|
||||
|
||||
// std::cout << "Ahoo --> backupId " << backupId << std::endl;
|
||||
|
||||
// auto lastOpTime = _getLastAppliedOpTimeLocal(client.get());
|
||||
|
||||
// std::cout << "Ahoo --> lastOpTime " << lastOpTime.toString() << std::endl;
|
||||
|
||||
// AggregateCommandRequest aggRequestEx(
|
||||
// NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
|
||||
// {BSON("$backupCursorExtend" << BSON("backupId" << backupId << "timestamp" << lastOpTime))});
|
||||
// // We must set a writeConcern on internal commands.
|
||||
// aggRequestEx.setWriteConcern(WriteConcernOptions());
|
||||
// // The command may not return immediately because it may wait for the node to have the full
|
||||
// // oplog history up to the backup point in time.
|
||||
// aggRequestEx.setMaxTimeMS(180 * 1000);
|
||||
|
||||
// std::cout << "Ahoo --> open Extended Cursror " << std::endl;
|
||||
// statusWith = DBClientCursor::fromAggregationRequest(
|
||||
// client.get(), aggRequestEx, true /* secondaryOk */, false /* useExhaust */);
|
||||
|
||||
// std::cout << "Ahoo --> WE ARE HERE2 --> " << statusWith.isOK() << std::endl;
|
||||
// auto exCursor = statusWith.getValue().get();
|
||||
// while (exCursor->more()) {
|
||||
// auto batch = exCursor->next();
|
||||
// std::cout << "Ahoo ExtendedBackupCursor--> " << batch << std::endl;
|
||||
// }
|
||||
|
||||
std::cout << "Ahoo --> Kill backupCursor " << std::endl;
|
||||
cursor->kill();
|
||||
}
|
||||
|
||||
// Schedule rollback ID checker.
|
||||
_rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource);
|
||||
auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {
|
||||
|
||||
@@ -571,6 +571,10 @@ private:
|
||||
auto command = _execContext->getCommand();
|
||||
auto& request = _execContext->getRequest();
|
||||
|
||||
if (command &&
|
||||
(command->getName() == "aggregate" || command->getName() == "getMore" ||
|
||||
command->getName() == "killCursors"))
|
||||
std::cout << "Ahoo --> new Request: " << request.body << std::endl;
|
||||
const auto apiParamsFromClient = initializeAPIParameters(request.body, command);
|
||||
Client* client = opCtx->getClient();
|
||||
|
||||
@@ -1865,6 +1869,9 @@ DbResponse makeCommandResponse(std::shared_ptr<HandleRequest::ExecutionContext>
|
||||
}
|
||||
}
|
||||
|
||||
if (c &&
|
||||
(c->getName() == "aggregate" || c->getName() == "getMore" || c->getName() == "killCursors"))
|
||||
std::cout << "Ahoo ==> Reply: " << replyBuilder->getBodyBuilder().asTempObj() << std::endl;
|
||||
dbResponse.response = replyBuilder->done();
|
||||
CurOp::get(opCtx)->debug().responseLength = dbResponse.response.header().dataLen();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user