Compare commits

...

1 Commits

Author SHA1 Message Date
Moustafa Maher
d14c8c01e5 tst 2021-10-06 05:17:40 +00:00
3 changed files with 665 additions and 2 deletions

159
jstests/replsets/test.js Normal file
View File

@@ -0,0 +1,159 @@
/**
* Test the waiting logic of $backupCursorExtend. Given a timestamp T, when
* $backupCursorExtend returns, oplog with T should be majority committed and
* persisent on the disk of that node.
*
* @tags: [
* requires_journaling,
* requires_persistence,
* requires_sharding,
* requires_wiredtiger,
* ]
*/
(function() {
"use strict";
load("jstests/replsets/rslib.js"); // For reconfig, isConfigCommitted and
// safeReconfigShouldFail.
load("jstests/libs/backup_utils.js");
load("jstests/libs/write_concern_util.js");
const DEBUG = false;
const dbName = "test";
const collName = "coll";
const restorePath = MongoRunner.dataPath + "forRestore/";
const numDocs = 2;
let addNodeConfig = function(rst, nodeId, conn, arbiter) {
const config = rst.getReplSetConfigFromNode();
if (arbiter) {
config.members.push({_id: nodeId, host: conn.host, arbiterOnly: true});
} else {
config.members.push({_id: nodeId, host: conn.host});
}
return config;
};
let removeNodeConfig = function(rst, conn) {
const config = rst.getReplSetConfigFromNode();
for (var i = 0; i < config.members.length; i++) {
if (config.members[i].host == conn.host) {
config.members.splice(i, 1);
break;
}
}
return config;
};
function testReconfig(rst, config, shouldSucceed, errCode, errMsg) {
if (shouldSucceed) {
reconfig(rst, config);
assert.soon(() => isConfigCommitted(rst.getPrimary()));
rst.waitForConfigReplication(rst.getPrimary());
rst.awaitReplication();
// rst.await
} else {
safeReconfigShouldFail(rst, config, false /* force */, errCode, errMsg);
// A force reconfig should also fail.
safeReconfigShouldFail(rst, config, true /* force */, errCode, errMsg);
}
}
function insertDoc(db, collName, doc) {
let res = assert.commandWorked(db.runCommand({insert: collName, documents: [doc]}));
assert(res.hasOwnProperty("operationTime"), tojson(res));
return res.operationTime;
}
/*
* Assert that lagged secondary will block when Timestamp T has not been majority committed yet.
*/
function assertLaggedSecondaryGetBlocked() {
resetDbpath(restorePath);
let rst = new ReplSetTest({name: "test", nodes: 1});
rst.startSet();
rst.initiateWithHighElectionTimeout();
const primaryDB = rst.getPrimary().getDB(dbName);
print("Ahoo0 ==> Insert Docs to Primary");
for (let i = 0; i < 1000; i++) {
insertDoc(primaryDB, collName, {k: i});
}
print("Ahoo0 ==> AddSecondary");
testReconfig(rst,
addNodeConfig(rst, 1 /* nodeId */, rst.add() /* conn */, false /* arbiter */),
true /* shouldSucceed */);
rst.stopSet();
return;
let cursor = openBackupCursor(rst.getSecondary());
// let firstBatch = cursor.next();
let firstBatch = undefined;
while(cursor.hasNext()) {
let batch = cursor.next();
print("Ahoo1 ==> ", tojson(batch));
if (!firstBatch) {
firstBatch = batch;
}
}
print("Ahoo2 --> first batch: ", tojson(firstBatch));
let checkpointTimestamp = firstBatch.metadata["checkpointTimestamp"];
const backupId = firstBatch.metadata.backupId;
print("ahoo2 -> ", tojson(checkpointTimestamp), " "+ backupId);
jsTestLog("Start writes on primary");
let clusterTime;
for (let i = 0; i < numDocs - 1; i++) {
clusterTime = insertDoc(primaryDB, collName, {a: i});
}
print("Ahoo3 ==> clusterTime: ", tojson(clusterTime));
let extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
while(extendCursor.hasNext()) {
let batch = extendCursor.next();
print("Ahoo3 ==> ", tojson(batch));
}
jsTestLog("Start writes on primary");
for (let i = 0; i < numDocs - 1; i++) {
clusterTime = insertDoc(primaryDB, collName, {b: i});
}
print("Ahoo4 ==> clusterTime: ", tojson(clusterTime));
extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
while(extendCursor.hasNext()) {
let batch = extendCursor.next();
print("Ahoo4 ==> ", tojson(batch));
}
jsTestLog("Start writes on primary");
for (let i = 0; i < numDocs - 1; i++) {
clusterTime = insertDoc(primaryDB, collName, {c: i});
}
print("Ahoo5 ==> clusterTime: ", tojson(clusterTime));
extendCursor = extendBackupCursor(rst.getSecondary(), backupId, clusterTime);
while(extendCursor.hasNext()) {
let batch = extendCursor.next();
print("Ahoo5 ==> ", tojson(batch));
}
cursor.close();
cursor = openBackupCursor(rst.getSecondary());
// let firstBatch = cursor.next();
while(cursor.hasNext()) {
let batch = cursor.next();
print("Ahoo6 ==> ", tojson(batch));
}
cursor.close();
rst.stopSet();
}
assertLaggedSecondaryGetBlocked();
})();

View File

@@ -29,9 +29,10 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync
#include "mongo/platform/basic.h"
#include "initial_syncer.h"
#include "mongo/platform/basic.h"
#include "mongo/util/future_util.h"
#include <boost/optional.hpp>
#include <algorithm>
#include <memory>
@@ -77,6 +78,7 @@
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
#include "mongo/util/version/releases.h"
#include "mongo/db/dbdirectclient.h"
namespace mongo {
namespace repl {
@@ -704,6 +706,434 @@ void InitialSyncer::_startInitialSyncAttemptCallback(
}
}
// void _cloneFiles(const std::set<std::string> &files) {
// std::cout << "Ahoo ->Start cloning files:" << std::endl;
// for (auto str : files) {
// std::cout << str << std::endl;
// }
// }
void _cloneFiles(const StringSet& files) {
std::cout << "Ahoo ->Start cloning files:" << std::endl;
for (auto str : files) {
std::cout << str << std::endl;
}
}
struct SyncingFilesState {
SyncingFilesState() = default;
void keepBackupCursorAlive(std::shared_ptr<executor::TaskExecutor> clonerExec,
const CancellationToken& parentToken,
const HostAndPort& host) {
backupCursorKeepAliveCancellation = CancellationSource(parentToken);
std::cout << "Ahoo --> Inside keepBackupCursorAlive " << std::endl;
return AsyncTry([this, host, clonerExec] {
if (backupId) {
executor::RemoteCommandRequest request(
host,
NamespaceString::kAdminDb.toString(),
std::move(BSON("getMore" << cursorId << "collection"
<< "$cmd.aggregate")),
rpc::makeEmptyMetadata(),
nullptr);
// We're not expecting a response, set to fire and forget
request.fireAndForgetMode =
executor::RemoteCommandRequest::FireAndForgetMode::kOn;
std::cout << "Ahoo --> Inside keepBackupCursorAlive new command"
<< std::endl;
return clonerExec
->scheduleRemoteCommand(std::move(request),
backupCursorKeepAliveCancellation.token())
.getAsync([](auto&&) {});
}
})
.until([](Status) { return false; })
.withDelayBetweenIterations(Seconds(2))
.on(clonerExec, backupCursorKeepAliveCancellation.token())
.getAsync([](auto&&) {}); // Ignore the result Future;
}
void KillBackupCursor(std::shared_ptr<executor::TaskExecutor> clonerExec,
const HostAndPort& host) {
if (backupId) {
std::cout << "Ahoo ->Inside InitialSync Cancelling backupCursorCancellation:"
<< std::endl;
backupCursorKeepAliveCancellation.cancel();
// mongo::sleepsecs(20);
auto cmdObj = BSON("killCursors"
<< "$cmd.aggregate"
<< "cursors" << BSON_ARRAY(cursorId));
// std::cout << "Ahoo ->Inside InitialSync kill cursor :" << " " << cmdObj << std::endl;
executor::RemoteCommandRequest request(host,
NamespaceString::kAdminDb.toString(),
std::move(cmdObj),
rpc::makeEmptyMetadata(),
nullptr);
// We're not expecting a response, set to fire and forget
request.fireAndForgetMode = executor::RemoteCommandRequest::FireAndForgetMode::kOn;
return clonerExec
->scheduleRemoteCommand(std::move(request), CancellationToken::uncancelable())
.getAsync([](auto&&) {});
}
}
// Extended cursor sends all log files created since the backupCursor's
// checkpointTimestamp till the extendTo timestamp, so we need to get the
// difference between the files returned by the consecutive backupCursorExtend to
// clone only the new log files added since the previous backupCursorExtend.
StringSet getNewFilesToClone(std::set<std::string>& backupCursorExtendFiles) {
StringSet newFilesToClone;
std::set_difference(backupCursorExtendFiles.begin(),
backupCursorExtendFiles.end(),
extendedCursorFiles.begin(),
extendedCursorFiles.end(),
std::inserter(newFilesToClone, newFilesToClone.begin()));
extendedCursorFiles.insert(newFilesToClone.begin(), newFilesToClone.end());
return newFilesToClone;
}
CancellationSource backupCursorKeepAliveCancellation;
boost::optional<mongo::UUID> backupId;
CursorId cursorId;
mongo::Timestamp lastSyncedOpTime;
mongo::Timestamp lastAppliedOpTimeOnSyncSrc;
std::set<std::string> extendedCursorFiles;
int fileBasedInitialSyncCycles = 1;
};
BSONElement _getBSONField(const BSONObj& obj,
const std::string& fieldName,
const std::string& objName) {
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Missing " << fieldName << "field for " << objName << ".",
obj.hasField(fieldName));
return obj.getField(fieldName);
}
ExecutorFuture<mongo::Timestamp> _getLastAppliedOpTimeFromRemoteNode(
HostAndPort host,
std::shared_ptr<executor::TaskExecutor> clonerExec,
const CancellationToken& token) {
executor::RemoteCommandRequest request(std::move(host),
NamespaceString::kAdminDb.toString(),
std::move(BSON("replSetGetStatus" << 1)),
rpc::makeEmptyMetadata(),
nullptr);
return clonerExec->scheduleRemoteCommand(std::move(request), token)
.then([](const auto& response) {
uassertStatusOK(response.status);
auto& reply = response.data;
uassertStatusOK(getStatusFromCommandResult(reply));
// Parsing replSetGetStatus's reply to get lastAppliedOpTime.
// ReplSetGetStatus's reply example:
// {
// ...
// "optimes" : {
// ...
// "appliedOpTime" : {
// "ts" : Timestamp(1583385878, 1),
// "t" : NumberLong(3)
// },
// ...
// }
// ...
// }
auto lastAppliedOpTime = _getBSONField(
_getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(),
"appliedOpTime",
"replSetGetStatus's reply.optimes")
.Obj(),
"ts",
"replSetGetStatus's reply.optimes.appliedOpTime");
return lastAppliedOpTime.timestamp();
});
}
ExecutorFuture<void> _openBackupCursor(const HostAndPort& host,
std::shared_ptr<executor::TaskExecutor> clonerExec,
const CancellationToken& token,
SyncingFilesState& _syncingFilesState,
std::shared_ptr<StringSet> files) {
const auto cmdObj = [&] {
AggregateCommandRequest aggRequest(
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
{BSON("$backupCursor" << BSONObj())});
// We must set a writeConcern on internal commands.
aggRequest.setWriteConcern(WriteConcernOptions());
return aggRequest.toBSON(BSONObj());
}();
std::cout << "Ahoo -> Inside Fetcher CMD -> " << cmdObj << std::endl;
auto fetcherCallback = [&_syncingFilesState, files, token](
const Fetcher::QueryResponseStatus& dataStatus,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
std::cout << "Ahoo -> Inside Fetcher callback-> " << dataStatus.getStatus().reason()
<< std::endl;
// Throw out any accumulated results on error
uassertStatusOK(dataStatus.getStatus());
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
const auto& data = dataStatus.getValue();
for (const BSONObj& doc : data.documents) {
std::cout << "Ahoo -> Inside Fetcher -> " << doc << std::endl;
if (!_syncingFilesState.backupId) {
// First batch must contain the metadata.
// Parsing the metadata to get backupId and checkpointTimestamp for the
// the backupCursor.
const auto& metaData =
_getBSONField(doc, "metadata", "backupCursor's first batch").Obj();
_syncingFilesState.backupId = UUID(uassertStatusOK(UUID::parse(
_getBSONField(metaData, "backupId", "backupCursor's first batch.metadata"))));
_syncingFilesState.lastSyncedOpTime =
_getBSONField(
metaData, "checkpointTimestamp", "backupCursor's first batch.metadata")
.timestamp();
_syncingFilesState.cursorId = data.cursorId;
std::cout << "AHoo -> _syncingFilesState.cursorId " << _syncingFilesState.cursorId
<< std::endl;
} else {
files->insert(_getBSONField(doc, "filename", "backupCursor's batches").str());
}
}
if (!getMoreBob) {
return;
}
if (!data.documents.size()) {
*nextAction = Fetcher::NextAction::kExitAndKeepCursorAlive;
return;
}
getMoreBob->append("getMore", data.cursorId);
getMoreBob->append("collection", data.nss.coll());
};
auto fetcher = std::make_shared<Fetcher>(
clonerExec.get(),
host,
NamespaceString::kAdminDb.toString(),
cmdObj,
fetcherCallback,
ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
3U, executor::RemoteCommandRequest::kNoTimeout),
transport::kGlobalSSLMode);
uassertStatusOK(fetcher->schedule());
// std::cout << fetcher->getDiagnosticString() << std::endl;
// fetcher->join();
// fetcher.join();
return fetcher->onCompletion().thenRunOn(clonerExec).then([fetcher, files] {
std::cout << "Ahoo -> Inside Fetcher Finished -> " << std::endl;
for (const auto& file : *files) {
std::cout << file << std::endl;
}
});
}
ExecutorFuture<void> _extendBackupCursor(const HostAndPort& host,
std::shared_ptr<executor::TaskExecutor> clonerExec,
const CancellationToken& token,
SyncingFilesState& _syncingFilesState,
std::shared_ptr<StringSet> files,
const Timestamp& extendTo) {
const auto cmdObj = [&] {
AggregateCommandRequest aggRequest(
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
{BSON("$backupCursorExtend" << BSON("backupId" << _syncingFilesState.backupId.get()
<< "timestamp" << extendTo))});
// We must set a writeConcern on internal commands.
aggRequest.setWriteConcern(WriteConcernOptions());
// The command may not return immediately because it may wait for the node to have the full
// oplog history up to the backup point in time.
aggRequest.setMaxTimeMS(180 * 1000);
return aggRequest.toBSON(BSONObj());
}();
std::cout << "Ahoo -> Inside Fetcher Extend CMD -> " << cmdObj << std::endl;
auto exfiles = std::make_shared<std::set<std::string>>();
auto fetcherCallback =
[&_syncingFilesState, exfiles, token](const Fetcher::QueryResponseStatus& dataStatus,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
std::cout << "Ahoo -> Inside Fetcher Extend callback-> "
<< dataStatus.getStatus().reason() << std::endl;
// Throw out any accumulated results on error
uassertStatusOK(dataStatus.getStatus());
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
const auto& data = dataStatus.getValue();
for (const BSONObj& doc : data.documents) {
std::cout << "Ahoo -> Inside Fetcher Extend -> " << doc << std::endl;
exfiles->insert(_getBSONField(doc, "filename", "backupCursor's batches").str());
}
if (!getMoreBob || !data.documents.size()) {
return;
}
getMoreBob->append("getMore", data.cursorId);
getMoreBob->append("collection", data.nss.coll());
};
auto fetcher = std::make_shared<Fetcher>(
clonerExec.get(),
host,
NamespaceString::kAdminDb.toString(),
cmdObj,
fetcherCallback,
ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
executor::RemoteCommandRequest::kNoTimeout, /* findNetworkTimeout */
executor::RemoteCommandRequest::kNoTimeout, /* getMoreNetworkTimeout */
RemoteCommandRetryScheduler::makeRetryPolicy<ErrorCategory::RetriableError>(
3U, executor::RemoteCommandRequest::kNoTimeout),
transport::kGlobalSSLMode);
uassertStatusOK(fetcher->schedule());
// std::cout << fetcher->getDiagnosticString() << std::endl;
// fetcher->join();
// fetcher.join();
return fetcher->onCompletion()
.thenRunOn(clonerExec)
.then([fetcher, files, exfiles, &_syncingFilesState] {
std::cout << "Ahoo -> Inside Fetcher Extend Finished -> " << std::endl;
*files = _syncingFilesState.getNewFilesToClone(*exfiles);
for (const auto& file : *files) {
std::cout << file << std::endl;
}
});
}
ExecutorFuture<void> _cloneFromCursor(const HostAndPort& host,
std::shared_ptr<executor::TaskExecutor> clonerExec,
const CancellationToken& token,
SyncingFilesState& _syncingFilesState) {
std::cout << "Ahoo ->Inside InitialSync fileBasedInitialSyncCycles: "
<< _syncingFilesState.fileBasedInitialSyncCycles << std::endl;
if (_syncingFilesState.fileBasedInitialSyncCycles == 1) {
std::cout << "Ahoo ->Inside InitialSync start backup cursor: " << std::endl;
invariant(!_syncingFilesState.backupId);
auto files = std::make_shared<StringSet>();
return _openBackupCursor(host, clonerExec, token, _syncingFilesState, files)
.then([files, &_syncingFilesState, clonerExec, &host, token] {
_cloneFiles(*files);
_syncingFilesState.keepBackupCursorAlive(clonerExec, token, host);
});
}
invariant(_syncingFilesState.backupId);
auto files = std::make_shared<StringSet>();
return _extendBackupCursor(host,
clonerExec,
token,
_syncingFilesState,
files,
_syncingFilesState.lastAppliedOpTimeOnSyncSrc)
.then([files, &_syncingFilesState] {
_syncingFilesState.lastSyncedOpTime = _syncingFilesState.lastAppliedOpTimeOnSyncSrc;
_cloneFiles(*files);
});
}
ExecutorFuture<void> _startSyncingFiles(std::shared_ptr<executor::TaskExecutor> clonerExec,
std::unique_ptr<DBClientConnection>& client,
SyncingFilesState& _syncingFilesState,
const CancellationToken& token) {
return AsyncTry([&client, &_syncingFilesState, &clonerExec, &token]() mutable {
return _cloneFromCursor(
client->getServerHostAndPort(), clonerExec, token, _syncingFilesState)
.then([&client, &clonerExec, &token, &_syncingFilesState]() {
if (_syncingFilesState.fileBasedInitialSyncCycles == 1) {
std::cout << "Ahoo --> Inside InitialSync- start inserting "
<< std::endl;
for (int i = 0; i < 2000; i++) {
auto insertCmd = BSON("insert"
<< "hoppa"
<< "documents" << BSON_ARRAY(BSON("a" << i))
<< "writeConcern" << BSON("w" << 1));
client->runCommand(OpMsgRequest::fromDBAndBody("test", insertCmd));
}
mongo::sleepsecs(20);
}
return _getLastAppliedOpTimeFromRemoteNode(
client->getServerHostAndPort(), clonerExec, token);
})
.then([&_syncingFilesState](mongo::Timestamp result) {
_syncingFilesState.lastAppliedOpTimeOnSyncSrc = result;
});
})
.until([&client, &_syncingFilesState, &clonerExec, &token](Status status) mutable {
std::cout << "Ahoo ->Inside InitialSync Until: " << std::endl;
if (!status.isOK()) {
_syncingFilesState.KillBackupCursor(clonerExec, client->getServerHostAndPort());
return true;
}
if (static_cast<int>(_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() -
_syncingFilesState.lastSyncedOpTime.getSecs()) >= 0) {
if (++_syncingFilesState.fileBasedInitialSyncCycles <= 3) {
std::cout << "Ahoo ->Inside InitialSync lastSyncedOpTime: "
<< _syncingFilesState.lastSyncedOpTime.toString() << std::endl;
std::cout << "Ahoo ->Inside InitialSync syncSourceLastApliedOpTime: "
<< _syncingFilesState.lastAppliedOpTimeOnSyncSrc.toString()
<< std::endl;
std::cout << "Ahoo ->Inside InitialSync Diff: "
<< (_syncingFilesState.lastAppliedOpTimeOnSyncSrc.getSecs() -
_syncingFilesState.lastSyncedOpTime.getSecs())
<< std::endl;
return false;
}
}
_syncingFilesState.KillBackupCursor(clonerExec, client->getServerHostAndPort());
std::cout << "Ahoo ->Inside InitialSync stop looping: " << std::endl;
return true;
})
.on(clonerExec, token);
}
mongo::Timestamp _getLastAppliedOpTimeLocal(
DBClientBase* conn) {
std::cout << "here 1" << std::endl;
auto res = conn->runCommand(
OpMsgRequest::fromDBAndBody(NamespaceString::kAdminDb, BSON("replSetGetStatus" << 1)));
auto reply = res->getCommandReply();
std::cout << "here 2" << reply << std::endl;
uassertStatusOK(getStatusFromCommandResult(reply));
// Parsing replSetGetStatus's reply to get lastAppliedOpTime.
// ReplSetGetStatus's reply example:
// {
// ...
// "optimes" : {
// ...
// "appliedOpTime" : {
// "ts" : Timestamp(1583385878, 1),
// "t" : NumberLong(3)
// },
// ...
// }
// ...
// }
auto lastAppliedOpTime = _getBSONField(
_getBSONField(_getBSONField(reply, "optimes", "replSetGetStatus's reply").Obj(),
"appliedOpTime",
"replSetGetStatus's reply.optimes")
.Obj(),
"ts",
"replSetGetStatus's reply.optimes.appliedOpTime");
return lastAppliedOpTime.timestamp();
}
void InitialSyncer::_chooseSyncSourceCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::uint32_t chooseSyncSourceAttempt,
@@ -795,6 +1225,73 @@ void InitialSyncer::_chooseSyncSourceCallback(
_syncSource = syncSource.getValue();
{
/// AHOO
// auto client = std::make_unique<DBDirectClient>(true /* autoReconnect */);
// _client;
// uassertStatusOK(client->connect(_syncSource, "FileCopyBasedInitialSyncer", boost::none));
// std::cout << "AHOO ---> Inside InitialSync == client : " << client->toString()
// << std::endl;
// SyncingFilesState _syncingFilesState;
// auto primary = CancellationSource();
// uassertStatusOK(_startSyncingFiles(_clonerExec, client, _syncingFilesState, primary.token())
// .getNoThrow());
// std::set<std::string> files;
// uassertStatusOK(_openBackupCursor(_syncSource, _clonerExec, primary.token(),
// _syncingFilesState, files).getNoThrow());
AggregateCommandRequest aggRequest(
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
{BSON("$backupCursor" << BSONObj())});
// We must set a writeConcern on internal commands.
aggRequest.setWriteConcern(WriteConcernOptions());
auto opctx = makeOpCtx();
auto client = std::make_unique<DBDirectClient>(opctx.get());
auto statusWith = DBClientCursor::fromAggregationRequest(
client.get(), aggRequest, true /* secondaryOk */, false /* useExhaust */);
std::cout << "Ahoo --> WE ARE HERE --> " << statusWith.isOK() << std::endl;
auto cursor = statusWith.getValue().get();
bool firstBatch = false;
auto backupId = UUID::gen();
while (cursor->more()) {
auto batch = cursor->next();
if(!firstBatch) {
firstBatch = true;
backupId = UUID(uassertStatusOK(UUID::parse(batch["metadata"]["backupId"])));
}
std::cout << "Ahoo BackupCursor--> " << batch << std::endl;
}
// std::cout << "Ahoo --> backupId " << backupId << std::endl;
// auto lastOpTime = _getLastAppliedOpTimeLocal(client.get());
// std::cout << "Ahoo --> lastOpTime " << lastOpTime.toString() << std::endl;
// AggregateCommandRequest aggRequestEx(
// NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
// {BSON("$backupCursorExtend" << BSON("backupId" << backupId << "timestamp" << lastOpTime))});
// // We must set a writeConcern on internal commands.
// aggRequestEx.setWriteConcern(WriteConcernOptions());
// // The command may not return immediately because it may wait for the node to have the full
// // oplog history up to the backup point in time.
// aggRequestEx.setMaxTimeMS(180 * 1000);
// std::cout << "Ahoo --> open Extended Cursror " << std::endl;
// statusWith = DBClientCursor::fromAggregationRequest(
// client.get(), aggRequestEx, true /* secondaryOk */, false /* useExhaust */);
// std::cout << "Ahoo --> WE ARE HERE2 --> " << statusWith.isOK() << std::endl;
// auto exCursor = statusWith.getValue().get();
// while (exCursor->more()) {
// auto batch = exCursor->next();
// std::cout << "Ahoo ExtendedBackupCursor--> " << batch << std::endl;
// }
std::cout << "Ahoo --> Kill backupCursor " << std::endl;
cursor->kill();
}
// Schedule rollback ID checker.
_rollbackChecker = std::make_unique<RollbackChecker>(*_attemptExec, _syncSource);
auto scheduleResult = _rollbackChecker->reset([=](const RollbackChecker::Result& result) {

View File

@@ -571,6 +571,10 @@ private:
auto command = _execContext->getCommand();
auto& request = _execContext->getRequest();
if (command &&
(command->getName() == "aggregate" || command->getName() == "getMore" ||
command->getName() == "killCursors"))
std::cout << "Ahoo --> new Request: " << request.body << std::endl;
const auto apiParamsFromClient = initializeAPIParameters(request.body, command);
Client* client = opCtx->getClient();
@@ -1865,6 +1869,9 @@ DbResponse makeCommandResponse(std::shared_ptr<HandleRequest::ExecutionContext>
}
}
if (c &&
(c->getName() == "aggregate" || c->getName() == "getMore" || c->getName() == "killCursors"))
std::cout << "Ahoo ==> Reply: " << replyBuilder->getBodyBuilder().asTempObj() << std::endl;
dbResponse.response = replyBuilder->done();
CurOp::get(opCtx)->debug().responseLength = dbResponse.response.header().dataLen();