SERVER-25131: release collection/db locks on collection clone failure.

This commit is contained in:
Scott Hernandez
2016-09-02 09:39:06 -04:00
parent b3aba5d4d1
commit 815e16eace
6 changed files with 149 additions and 45 deletions

View File

@@ -0,0 +1,38 @@
/**
* Tests that initial sync can complete after a failed insert to a cloned collection.
*/
(function() {
var name = 'initial_sync_fail_insert_once';
var replSet = new ReplSetTest({
name: name,
nodes: 2,
});
replSet.startSet();
replSet.initiate();
var primary = replSet.getPrimary();
var secondary = replSet.getSecondary();
var coll = primary.getDB('test').getCollection(name);
assert.writeOK(coll.insert({_id: 0, x: 1}, {writeConcern: {w: 2}}));
jsTest.log("Enabling Failpoint failCollectionInserts on " + tojson(secondary));
assert.commandWorked(secondary.getDB("admin").adminCommand({
configureFailPoint: "failCollectionInserts",
mode: {times: 2},
data: {collectionNS: coll.getFullName()}
}));
jsTest.log("Issuing RESYNC command to " + tojson(secondary));
assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1}));
replSet.awaitReplication();
replSet.awaitSecondaryNodes();
assert.eq(1, secondary.getDB("test")[name].count());
assert.docEq({_id: 0, x: 1}, secondary.getDB("test")[name].findOne());
jsTest.log("Stopping repl set test; finished.");
replSet.stopSet();
})();

View File

@@ -190,6 +190,7 @@ error_code("IncompatibleServerVersion", 188)
error_code("PrimarySteppedDown", 189)
error_code("MasterSlaveConnectionFailure", 190)
error_code("BalancerLostDistributedLock", 191)
error_code("FailPointEnabled", 192)
# Non-sequential error codes (for compatibility only)
error_code("SocketException", 9001)

View File

@@ -68,11 +68,16 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/auth/user_document_parser.h" // XXX-ANDY
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
// Used below to fail during inserts.
MONGO_FP_DECLARE(failCollectionInserts);
const auto bannedExpressionsInValidators = std::set<StringData>{
"$geoNear", "$near", "$nearSphere", "$text", "$where",
};
@@ -369,6 +374,20 @@ Status Collection::insertDocuments(OperationContext* txn,
OpDebug* opDebug,
bool enforceQuota,
bool fromMigrate) {
MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) {
const BSONObj& data = extraData.getData();
const auto collElem = data["collectionNS"];
// If the failpoint specifies no collection or matches the existing one, fail.
if (!collElem || _ns == collElem.str()) {
const std::string msg = str::stream()
<< "Failpoint (failCollectionInserts) has been enabled (" << data
<< "), so rejecting insert (first doc): " << *begin;
log() << msg;
return {ErrorCodes::FailPointEnabled, msg};
}
}
// Should really be done in the collection object at creation and updated on index create.
const bool hasIdIndex = _indexCatalog.findIdIndex(txn);
@@ -418,6 +437,20 @@ Status Collection::insertDocument(OperationContext* txn,
const BSONObj& doc,
const std::vector<MultiIndexBlock*>& indexBlocks,
bool enforceQuota) {
MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) {
const BSONObj& data = extraData.getData();
const auto collElem = data["collectionNS"];
// If the failpoint specifies no collection or matches the existing one, fail.
if (!collElem || _ns == collElem.str()) {
const std::string msg = str::stream()
<< "Failpoint (failCollectionInserts) has been enabled (" << data
<< "), so rejecting insert: " << doc;
log() << msg;
return {ErrorCodes::FailPointEnabled, msg};
}
}
{
auto status = checkValidation(txn, doc);
if (!status.isOK())

View File

@@ -46,6 +46,7 @@
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -64,8 +65,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn,
_txn(txn),
_coll(coll),
_nss{coll->ns()},
_idIndexBlock{txn, coll},
_secondaryIndexesBlock{txn, coll},
_idIndexBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)),
_secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)),
_idIndexSpec(idIndexSpec) {
invariant(txn);
invariant(coll);
@@ -91,18 +92,21 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn,
invariant(coll);
invariant(txn->getClient() == &cc());
if (secondaryIndexSpecs.size()) {
_hasSecondaryIndexes = true;
_secondaryIndexesBlock.ignoreUniqueConstraint();
auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs);
_secondaryIndexesBlock->ignoreUniqueConstraint();
auto status = _secondaryIndexesBlock->init(secondaryIndexSpecs);
if (!status.isOK()) {
return status;
}
} else {
_secondaryIndexesBlock.reset();
}
if (!_idIndexSpec.isEmpty()) {
auto status = _idIndexBlock.init(_idIndexSpec);
auto status = _idIndexBlock->init(_idIndexSpec);
if (!status.isOK()) {
return status;
}
} else {
_idIndexBlock.reset();
}
return Status::OK();
@@ -111,36 +115,37 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn,
Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
int count = 0;
return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status {
invariant(txn);
return _runTaskReleaseResourcesOnFailure(
[begin, end, &count, this](OperationContext* txn) -> Status {
invariant(txn);
for (auto iter = begin; iter != end; ++iter) {
std::vector<MultiIndexBlock*> indexers;
if (!_idIndexSpec.isEmpty()) {
indexers.push_back(&_idIndexBlock);
}
if (_hasSecondaryIndexes) {
indexers.push_back(&_secondaryIndexesBlock);
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
const auto status = _coll->insertDocument(txn, *iter, indexers, false);
if (!status.isOK()) {
return status;
for (auto iter = begin; iter != end; ++iter) {
std::vector<MultiIndexBlock*> indexers;
if (_idIndexBlock) {
indexers.push_back(_idIndexBlock.get());
}
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
_txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
if (_secondaryIndexesBlock) {
indexers.push_back(_secondaryIndexesBlock.get());
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
const auto status = _coll->insertDocument(txn, *iter, indexers, false);
if (!status.isOK()) {
return status;
}
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
_txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
++count;
}
return Status::OK();
});
++count;
}
return Status::OK();
});
}
Status CollectionBulkLoaderImpl::commit() {
return _runner->runSynchronousTask(
return _runTaskReleaseResourcesOnFailure(
[this](OperationContext* txn) -> Status {
_stats.startBuildingIndexes = Date_t::now();
LOG(2) << "Creating indexes for ns: " << _nss.ns();
@@ -149,9 +154,9 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit before deleting dups, so the dups will be removed from secondary indexes when
// deleted.
if (_hasSecondaryIndexes) {
if (_secondaryIndexesBlock) {
std::set<RecordId> secDups;
auto status = _secondaryIndexesBlock.doneInserting(&secDups);
auto status = _secondaryIndexesBlock->doneInserting(&secDups);
if (!status.isOK()) {
return status;
}
@@ -163,18 +168,18 @@ Status CollectionBulkLoaderImpl::commit() {
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
_secondaryIndexesBlock.commit();
_secondaryIndexesBlock->commit();
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
_txn, "CollectionBulkLoaderImpl::commit", _nss.ns());
}
if (!_idIndexSpec.isEmpty()) {
if (_idIndexBlock) {
// Delete dups.
std::set<RecordId> dups;
// Do not do inside a WriteUnitOfWork (required by doneInserting).
auto status = _idIndexBlock.doneInserting(&dups);
auto status = _idIndexBlock->doneInserting(&dups);
if (!status.isOK()) {
return status;
}
@@ -196,7 +201,7 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit _id index, without dups.
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
_idIndexBlock.commit();
_idIndexBlock->commit();
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
@@ -206,15 +211,40 @@ Status CollectionBulkLoaderImpl::commit() {
LOG(2) << "Done creating indexes for ns: " << _nss.ns()
<< ", stats: " << _stats.toString();
// release locks.
_autoColl.reset(nullptr);
_autoDB.reset(nullptr);
_coll = nullptr;
_releaseResources();
return Status::OK();
},
TaskRunner::NextAction::kDisposeOperationContext);
}
void CollectionBulkLoaderImpl::_releaseResources() {
if (_secondaryIndexesBlock) {
_secondaryIndexesBlock.reset();
}
if (_idIndexBlock) {
_idIndexBlock.reset();
}
// release locks.
_coll = nullptr;
_autoColl.reset(nullptr);
_autoDB.reset(nullptr);
}
Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(
TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) {
auto newTask = [this, &task](OperationContext* txn) -> Status {
ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this);
const auto status = task(txn);
if (status.isOK()) {
guard.Dismiss();
}
return status;
};
return _runner->runSynchronousTask(newTask, nextAction);
}
CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const {
return _stats;
}

View File

@@ -84,6 +84,11 @@ public:
virtual BSONObj toBSON() const override;
private:
void _releaseResources();
Status _runTaskReleaseResourcesOnFailure(
TaskRunner::SynchronousTask task,
TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext);
std::unique_ptr<OldThreadPool> _threadPool;
std::unique_ptr<TaskRunner> _runner;
std::unique_ptr<AutoGetCollection> _autoColl;
@@ -91,9 +96,8 @@ private:
OperationContext* _txn = nullptr;
Collection* _coll = nullptr;
NamespaceString _nss;
MultiIndexBlock _idIndexBlock;
MultiIndexBlock _secondaryIndexesBlock;
bool _hasSecondaryIndexes = false;
std::unique_ptr<MultiIndexBlock> _idIndexBlock;
std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock;
BSONObj _idIndexSpec;
Stats _stats;
};

View File

@@ -227,9 +227,7 @@ Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextActi
} else {
// Run supplied function.
try {
log() << "starting to run synchronous task on runner.";
returnStatus = func(txn);
log() << "done running the synchronous task.";
} catch (...) {
returnStatus = exceptionToStatus();
error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus);