Compare commits

...

9 Commits

Author SHA1 Message Date
Siyuan Zhou
cf4d9f77e5 Revert "SERVER-26403 Clean primary states on stepdown"
This reverts commit 6904d0ac5e.
2016-11-17 20:07:41 -05:00
Spencer T Brody
c7ebfd0fd2 SERVER-27053 Don't acknowledge writes if the term has changed.
(cherry picked from commit a557fd981d235f84d4a0865dc0bb6b5385fc7a21)
2016-11-17 17:26:03 -05:00
Tess Avitabile
5c5fe01994 Revert "SERVER-14662 Reject ambiguous positional projections and updates"
This reverts commit 2eea3f09ae.
2016-11-17 17:06:33 -05:00
Tess Avitabile
b9c8db05bb Revert "SERVER-14662 fix duplicate uassert error code"
This reverts commit 0df1eabb73.
2016-11-17 17:06:15 -05:00
Tess Avitabile
0362de2200 SERVER-27055 initial_sync_id_index.js should not use w:2 write to check initial sync is complete 2016-11-17 17:05:55 -05:00
Judah Schvimer
e9aea74810 SERVER-27080 Temporarily disable resync command on replica sets
(cherry picked from commit 7c569051d2)
2016-11-17 16:43:23 -05:00
Marko Vojvodic
8ab19a65f3 SERVER-27097 Check that db is not null before derefencing in _collectionCount 2016-11-17 16:14:10 -05:00
ADAM David Alan Martin
b7472174d0 SERVER-27048 Fix recursive lock issue in transport.
The LegacySession teardown code has a race where promoted weak
pointers would be the last owners of a type which needs to hold
a lock in destruction.  That same lock is held by the LegacySession
teardown code, thus leading to a deadlock or detectable recursive
locking situation.

(cherry picked from commit 2fae4242b9)
2016-11-16 18:03:00 -05:00
Ernie Hershey
061d53915f SERVER-27066 Update evergreen configuration for new v3.4 branch 2016-11-16 12:50:30 -05:00
25 changed files with 665 additions and 305 deletions

View File

@@ -52,8 +52,8 @@ if not version_parts:
exit(1)
if version_parts[0]:
print "suffix: latest"
print "src_suffix: latest"
print "suffix: v3.4-latest"
print "src_suffix: v3.4-latest"
else:
print "suffix: {0}".format(version_line)
print "src_suffix: r{0}".format(version_line)

View File

@@ -16,6 +16,8 @@ selector:
- jstests/replsets/stepup.js
# The combination of new bridges and PV0 can lead to an improper spanning tree in sync2.js.
- jstests/replsets/sync2.js
# PV0's w:majority guarantees aren't strong enough for this test to pass.
- jstests/replsets/write_concern_after_stepdown_and_stepup.js
executor:
js_test:

View File

@@ -152,7 +152,7 @@ functions:
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
remote_file: mongodb-mongo-master/${build_variant}/${revision}/artifacts/${build_id}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/artifacts/${build_id}.tgz
bucket: mciuploads
extract_to: src
@@ -161,7 +161,7 @@ functions:
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
remote_file: mongodb-mongo-master/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
bucket: mciuploads
local_file: src/mongo-binaries.tgz
@@ -196,7 +196,7 @@ functions:
"get buildnumber" : &get_buildnumber
command: keyval.inc
params:
key: "${build_variant}_master"
key: "${build_variant}_v3.4"
destination: "builder_num"
"run diskstats": &run_diskstats
@@ -297,7 +297,7 @@ functions:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/mongo-debugsymbols.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -309,7 +309,7 @@ functions:
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
remote_file: mongodb-mongo-master/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
bucket: mciuploads
local_file: src/mongo-debugsymbols.tgz
build_variants:
@@ -622,7 +622,7 @@ functions:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: jstests.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/jstestfuzz/${task_id}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/jstestfuzz/${task_id}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -803,7 +803,7 @@ post:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/diagnostic-data.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/ftdc/mongo-diagnostic-data-${task_id}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/ftdc/mongo-diagnostic-data-${task_id}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -834,7 +834,7 @@ post:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: "src/gcov-intermediate-files.tgz"
remote_file: mongodb-mongo-master/${build_variant}/${revision}/gcov/gcov-intermediate-files-${task_id}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/gcov/gcov-intermediate-files-${task_id}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -884,7 +884,7 @@ post:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/jepsen-mongod-logs.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/jepsen/jepsen-mongod-logs-${task_id}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/jepsen/jepsen-mongod-logs-${task_id}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -903,7 +903,7 @@ post:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/jepsen-results.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/jepsen/jepsen-results-${task_id}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/jepsen/jepsen-results-${task_id}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -923,7 +923,7 @@ post:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: mongo-coredumps.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/coredumps/mongo-coredumps-${build_id}-${task_name}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/coredumps/mongo-coredumps-${build_id}-${task_name}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -943,7 +943,7 @@ post:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: diskstats.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/diskstats/mongo-diskstats-${task_id}-${execution}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/diskstats/mongo-diskstats-${task_id}-${execution}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -1102,7 +1102,7 @@ tasks:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/mongodb-binaries.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -1112,7 +1112,7 @@ tasks:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: artifacts.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/artifacts/${build_id}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/artifacts/${build_id}.tgz
bucket: mciuploads
permissions: public-read
content_type: application/tar
@@ -1122,7 +1122,7 @@ tasks:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/mongodb-unittests.tgz
remote_file: mongodb-mongo-master/${build_variant}/${revision}/unittests/${build_id}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/unittests/${build_id}.tgz
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -1133,7 +1133,7 @@ tasks:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
local_file: src/distsrc.${ext|tgz}
remote_file: mongodb-mongo-master/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
bucket: mciuploads
permissions: public-read
content_type: ${content_type|application/x-gzip}
@@ -1331,7 +1331,7 @@ tasks:
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
remote_file: mongodb-mongo-master/${build_variant}/${revision}/unittests/${build_id}.tgz
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/unittests/${build_id}.tgz
bucket: mciuploads
extract_to: src
- func: "run tests"
@@ -2614,7 +2614,7 @@ tasks:
params:
aws_key: ${aws_key}
aws_secret: ${aws_secret}
remote_file: mongodb-mongo-master/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
bucket: mciuploads
local_file: src/distsrc.${ext|tgz}
build_variants: [ linux-64, windows-64-2k8 ]
@@ -3087,7 +3087,7 @@ modules:
- name: enterprise
repo: git@github.com:10gen/mongo-enterprise-modules.git
prefix: src/mongo/db/modules
branch: master
branch: v3.4
- name: rocksdb
repo: git@github.com:mongodb-partners/mongo-rocks.git

View File

@@ -11,3 +11,17 @@ t.drop();
t.save({a: [1, 2]});
t.update({$and: [{a: 1}]}, {$set: {'a.$': 5}});
assert.eq([5, 2], t.findOne().a);
// Make sure dollar sign operator with $and is consistent with no $and case
t.drop();
t.save({a: [1, 2], b: [3, 4]});
t.update({a: 1, b: 4}, {$set: {'a.$': 5}});
// Probably not what we want here, just trying to make sure $and is consistent
assert.eq({a: [1, 5], b: [3, 4]}, t.find({}, {_id: 0}).toArray()[0]);
// Make sure dollar sign operator with $and is consistent with no $and case
t.drop();
t.save({a: [1, 2], b: [3, 4]});
t.update({a: 1, $and: [{b: 4}]}, {$set: {'a.$': 5}});
// Probably not what we want here, just trying to make sure $and is consistent
assert.eq({a: [1, 5], b: [3, 4]}, t.find({}, {_id: 0}).toArray()[0]);

View File

@@ -214,14 +214,6 @@ assert.eq(
t.find({group: 10}, {_id: 0, x: {$elemMatch: {a: 1}}, y: {$elemMatch: {c: 3}}}).toArray()[0],
"multiple $elemMatch on unique fields 1");
assert.eq({"x": [{"y": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}]},
t.find({group: 8}, {_id: 0, x: {$elemMatch: {y: {$elemMatch: {a: 3}}}}}).toArray()[0],
"nested $elemMatch");
assert.throws(function() {
t.find({group: 3, 'x.a': 1}, {'x.$': 1, y: {$elemMatch: {aa: 1}}}).toArray();
}, [], "throw on positional operator with $elemMatch");
if (false) {
assert.eq(2, // SERVER-1243: handle multiple $elemMatch results
t.find({group: 4}, {x: {$elemMatchAll: {a: {$lte: 2}}}}).toArray()[0].x.length,
@@ -254,3 +246,29 @@ a = t.find({group: 3}, {x: {$elemMatch: {a: 1}}}).batchSize(1);
while (a.hasNext()) {
assert.eq(1, a.next().x[0].a, "positional getMore test");
}
// verify the positional update operator matches the same element as the the positional find. this
// is to ensure consistent behavior with updates until SERVER-1013 is resolved, at which point the
// following tests should be updated.
t.update({group: 10, 'x.a': 3, 'y.c': 1}, {$set: {'x.$': 100}}, false, true);
// updated the wrong element, so the following assertions should be true
assert.eq(100,
t.find({group: 10, 'y.c': 1, x: 100}, {'x.$': 1}).toArray()[0].x[0],
"wrong single element match after update");
assert.eq(100,
t.find({group: 10, x: 100, 'y.c': 1}, {'x.$': 1}).toArray()[0].x[0],
"wrong single element match after update");
t.remove({group: 10});
t.insert({group: 10, x: [{a: 1, b: 2}, {a: 3, b: 4}], y: [{c: 1, d: 2}, {c: 3, d: 4}]});
t.update({group: 10, 'y.c': 1, 'x.a': 3}, {$set: {'x.$': 100}}, false, true);
// updated the correct element
assert.eq(100,
t.find({group: 10, 'y.c': 1, x: 100}, {'x.$': 1}).toArray()[0].x[0],
"right single element match after update");
assert.eq(100,
t.find({group: 10, x: 100, 'y.c': 1}, {'x.$': 1}).toArray()[0].x[0],
"right single element match after update");

View File

@@ -11,60 +11,45 @@ t.drop();
// The collection is empty, forcing an upsert. In this case the query has no array position match
// to substiture for the positional operator. SERVER-4713
assert.writeError(t.update({}, {$set: {'a.$.b': 1}}, true));
res = t.update({}, {$set: {'a.$.b': 1}}, true);
assert(res.hasWriteError(), "An error is reported.");
assert.eq(0, t.count(), "No upsert occurred.");
// Save a document to the collection so it is no longer empty.
assert.writeOK(t.save({_id: 0}));
t.save({_id: 0});
// Now, with an existing document, trigger an update rather than an upsert. The query has no array
// position match to substiture for the positional operator. SERVER-6669
assert.writeError(t.update({}, {$set: {'a.$.b': 1}}));
res = t.update({}, {$set: {'a.$.b': 1}});
assert(res.hasWriteError(), "An error is reported.");
assert.eq([{_id: 0}], t.find().toArray(), "No update occurred.");
// Now, try with an update by _id (without a query array match).
assert.writeError(t.update({_id: 0}, {$set: {'a.$.b': 1}}));
res = t.update({_id: 0}, {$set: {'a.$.b': 1}});
assert(res.hasWriteError(), "An error is reported.");
assert.eq([{_id: 0}], t.find().toArray(), "No update occurred.");
// Seed the collection with a document suitable for the following check.
assert.writeOK(t.remove({}));
assert.writeOK(t.save({_id: 0, a: [{b: {c: 1}}]}));
t.remove({});
t.save({_id: 0, a: [{b: {c: 1}}]});
// Now, attempt to apply an update with two nested positional operators. There is a positional
// query match for the first positional operator but not the second. Note that dollar sign
// substitution for multiple positional opertors is not implemented (SERVER-831).
assert.writeError(t.update({'a.b.c': 1}, {$set: {'a.$.b.$.c': 2}}));
res = t.update({'a.b.c': 1}, {$set: {'a.$.b.$.c': 2}});
assert(res.hasWriteError(), "An error is reported");
assert.eq([{_id: 0, a: [{b: {c: 1}}]}], t.find().toArray(), "No update occurred.");
// SERVER-1155 test an update with the positional operator
// that has a regex in the query field
t.drop();
assert.writeOK(t.insert({_id: 1, arr: [{a: "z", b: 1}]}));
assert.writeOK(t.update({"arr.a": /^z$/}, {$set: {"arr.$.b": 2}}, false, true));
t.insert({_id: 1, arr: [{a: "z", b: 1}]});
res = t.update({"arr.a": /^z$/}, {$set: {"arr.$.b": 2}}, false, true);
assert.writeOK(res);
assert.eq(t.findOne().arr[0], {a: "z", b: 2});
t.drop();
assert.writeOK(t.insert({_id: 1, arr: [{a: "z", b: 1}, {a: "abc", b: 2}, {a: "lmn", b: 3}]}));
assert.writeOK(t.update({"arr.a": /l/}, {$inc: {"arr.$.b": 2}}, false, true));
t.insert({_id: 1, arr: [{a: "z", b: 1}, {a: "abc", b: 2}, {a: "lmn", b: 3}]});
res = t.update({"arr.a": /l/}, {$inc: {"arr.$.b": 2}}, false, true);
assert.writeOK(res);
assert.eq(t.findOne().arr[2], {a: "lmn", b: 5});
// Test updates with ambiguous positional operator.
t.drop();
assert.writeOK(t.insert({_id: 0, a: [1, 2]}));
assert.writeError(t.update({$and: [{a: 1}, {a: 2}]}, {$set: {'a.$': 5}}));
assert.eq([{_id: 0, a: [1, 2]}], t.find().toArray(), "No update occurred.");
t.drop();
assert.writeOK(t.insert({_id: 0, a: [1], b: [2]}));
assert.writeError(t.update({a: 1, b: 2}, {$set: {'a.$': 5}}));
assert.eq([{_id: 0, a: [1], b: [2]}], t.find().toArray(), "No update occurred.");
t.drop();
assert.writeOK(t.insert({_id: 0, a: [1], b: [2]}));
assert.writeError(t.update({a: {$elemMatch: {$lt: 2}}, b: 2}, {$set: {'a.$': 5}}));
assert.eq([{_id: 0, a: [1], b: [2]}], t.find().toArray(), "No update occurred.");
t.drop();
assert.writeOK(t.insert({_id: 0, a: [{b: 1}, {c: 2}]}));
assert.writeError(t.update({'a.b': 1, 'a.c': 2}, {$set: {'a.$': 5}}));
assert.eq([{_id: 0, a: [{b: 1}, {c: 2}]}], t.find().toArray(), "No update occurred.");

View File

@@ -31,8 +31,15 @@
const slave = rt.start(false);
const slaveDB = slave.getDB("test");
// Perform a w=2 write to ensure that slave can be read from, and initial sync is complete.
assert.writeOK(masterDB.coll.insert({}, {writeConcern: {w: 2}}));
// Wait for the slave to sync the collections.
assert.soon(function() {
var res = slaveDB.runCommand({listCollections: 1, filter: {name: "collV2"}});
return res.cursor.firstBatch.length === 1;
}, "Collection with v:2 _id index failed to sync on slave");
assert.soon(function() {
var res = slaveDB.runCommand({listCollections: 1, filter: {name: "collV1"}});
return res.cursor.firstBatch.length === 1;
}, "Collection with v:1 _id index failed to sync on slave");
// Check _id index versions on slave.
spec = GetIndexHelpers.findByName(slaveDB.collV2.getIndexes(), "_id_");

View File

@@ -0,0 +1,120 @@
/*
* Tests that heartbeats containing writes from a different branch of history can't cause a stale
* primary to incorrectly acknowledge a w:majority write that's about to be rolled back.
*/
(function() {
'use strict';
var name = "writeConcernStepDownAndBackUp";
var dbName = "wMajorityCheck";
var collName = "stepdownAndBackUp";
var rst = new ReplSetTest({
name: name,
nodes: [
{},
{},
{rsConfig: {priority: 0}},
],
useBridge: true
});
var nodes = rst.startSet();
rst.initiate();
function waitForState(node, state) {
assert.soonNoExcept(function() {
assert.commandWorked(node.adminCommand(
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
return true;
});
}
function waitForPrimary(node) {
assert.soon(function() {
return node.adminCommand('ismaster').ismaster;
});
}
function stepUp(node) {
var primary = rst.getPrimary();
if (primary != node) {
assert.throws(function() {
primary.adminCommand({replSetStepDown: 60 * 5});
});
}
waitForPrimary(node);
}
jsTestLog("Make sure node 0 is primary.");
stepUp(nodes[0]);
var primary = rst.getPrimary();
var secondaries = rst.getSecondaries();
assert.eq(nodes[0], primary);
// Wait for all data bearing nodes to get up to date.
assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
// Stop the secondaries from replicating.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
});
// Stop the primary from being able to complete stepping down.
assert.commandWorked(
nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));
jsTestLog("Do w:majority write that will block waiting for replication.");
var doMajorityWrite = function() {
var res = db.getSiblingDB('wMajorityCheck').stepdownAndBackUp.insert({a: 2}, {
writeConcern: {w: 'majority'}
});
assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
};
var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
jsTest.log("Disconnect primary from all secondaries");
nodes[0].disconnect(nodes[1]);
nodes[0].disconnect(nodes[2]);
jsTest.log("Wait for a new primary to be elected");
// Allow the secondaries to replicate again.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
});
waitForPrimary(nodes[1]);
jsTest.log("Do a write to the new primary");
assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
{a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
jsTest.log("Reconnect the old primary to the rest of the nodes");
// Only allow the old primary to connect to the other nodes, not the other way around.
// This is so that the old priamry will detect that it needs to step down and step itself down,
// rather than one of the other nodes detecting this and sending it a replSetStepDown command,
// which would cause the old primary to kill all operations and close all connections, making
// the way that the insert in the parallel shell fails be nondeterministic. Rather than
// handling all possible failure modes in the parallel shell, allowing heartbeat connectivity in
// only one direction makes it easier for the test to fail deterministically.
nodes[1].acceptConnectionsFrom(nodes[0]);
nodes[2].acceptConnectionsFrom(nodes[0]);
joinMajorityWriter();
// Allow the old primary to finish stepping down so that shutdown can finish.
var res = null;
try {
res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'});
} catch (e) {
// Expected - once we disable the fail point the stepdown will proceed and it's racy whether
// the stepdown closes all connections before or after the configureFailPoint command
// returns
}
if (res) {
assert.commandWorked(res);
}
rst.stopSet();
}());

View File

@@ -0,0 +1,134 @@
/*
* Tests that heartbeats containing writes from a different branch of history can't cause a stale
* primary to incorrectly acknowledge a w:majority write that's about to be rolled back, even if the
* stale primary is re-elected primary before waiting for the write concern acknowledgement.
*/
(function() {
'use strict';
var name = "writeConcernStepDownAndBackUp";
var dbName = "wMajorityCheck";
var collName = "stepdownAndBackUp";
var rst = new ReplSetTest({
name: name,
nodes: [
{},
{},
{rsConfig: {priority: 0}},
],
useBridge: true
});
var nodes = rst.startSet();
rst.initiate();
function waitForState(node, state) {
assert.soonNoExcept(function() {
assert.commandWorked(node.adminCommand(
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
return true;
});
}
function waitForPrimary(node) {
assert.soon(function() {
return node.adminCommand('ismaster').ismaster;
});
}
function stepUp(node) {
var primary = rst.getPrimary();
if (primary != node) {
assert.throws(function() {
primary.adminCommand({replSetStepDown: 60 * 5});
});
}
waitForPrimary(node);
}
jsTestLog("Make sure node 0 is primary.");
stepUp(nodes[0]);
var primary = rst.getPrimary();
var secondaries = rst.getSecondaries();
assert.eq(nodes[0], primary);
// Wait for all data bearing nodes to get up to date.
assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
// Stop the secondaries from replicating.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
});
// Stop the primary from calling into awaitReplication()
assert.commandWorked(nodes[0].adminCommand(
{configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'}));
jsTestLog("Do w:majority write that won't enter awaitReplication() until after the primary " +
"has stepped down and back up");
var doMajorityWrite = function() {
// Run ismaster command with 'hangUpOnStepDown' set to false to mark this connection as
// one that shouldn't be closed when the node steps down. This simulates the scenario where
// the write was coming from a mongos.
assert.commandWorked(db.adminCommand({ismaster: 1, hangUpOnStepDown: false}));
var res = db.getSiblingDB('wMajorityCheck').stepdownAndBackUp.insert({a: 2}, {
writeConcern: {w: 'majority'}
});
assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
};
var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
jsTest.log("Disconnect primary from all secondaries");
nodes[0].disconnect(nodes[1]);
nodes[0].disconnect(nodes[2]);
jsTest.log("Wait for a new primary to be elected");
// Allow the secondaries to replicate again.
secondaries.forEach(function(node) {
assert.commandWorked(
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
});
waitForPrimary(nodes[1]);
jsTest.log("Do a write to the new primary");
assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
{a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
jsTest.log("Reconnect the old primary to the rest of the nodes");
nodes[0].reconnect(nodes[1]);
nodes[0].reconnect(nodes[2]);
jsTest.log("Wait for the old primary to step down, roll back its write, and apply the " +
"new writes from the new primary");
waitForState(nodes[0], ReplSetTest.State.SECONDARY);
rst.awaitReplication();
// At this point all 3 nodes should have the same data
assert.soonNoExcept(function() {
nodes.forEach(function(node) {
assert.eq(null,
node.getDB(dbName).getCollection(collName).findOne({a: 2}),
"Node " + node.host + " contained op that should have been rolled back");
assert.neq(null,
node.getDB(dbName).getCollection(collName).findOne({a: 3}),
"Node " + node.host +
" was missing op from branch of history that should have persisted");
});
return true;
});
jsTest.log("Make the original primary become primary once again");
stepUp(nodes[0]);
jsTest.log("Unblock the thread waiting for replication of the now rolled-back write, ensure " +
"that the write concern failed");
assert.commandWorked(nodes[0].adminCommand(
{configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'off'}));
joinMajorityWriter();
rst.stopSet();
}());

View File

@@ -608,14 +608,16 @@ namespace {
unsigned long long _collectionCount(OperationContext* txn,
const string& ns,
bool callerHoldsGlobalLock) {
Collection* coll;
Collection* coll = nullptr;
boost::optional<AutoGetCollectionForRead> ctx;
// If the global write lock is held, we must avoid using AutoGetCollectionForRead as it may lead
// to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596.
if (callerHoldsGlobalLock) {
Database* db = dbHolder().get(txn, ns);
coll = db->getCollection(ns);
if (db) {
coll = db->getCollection(ns);
}
} else {
ctx.emplace(txn, NamespaceString(ns));
coll = ctx->getCollection();

View File

@@ -267,13 +267,6 @@ Status ProjectionExec::transform(WorkingSetMember* member) const {
matchDetails.requestElemMatchKey();
verify(NULL != _queryExpression);
verify(_queryExpression->matchesBSON(member->obj.value(), &matchDetails));
// Performing a positional projection requires valid MatchDetails. For example,
// ambiguity caused by multiple implicit array traversal predicates can lead to invalid
// match details.
if (!matchDetails.isValid()) {
return Status(ErrorCodes::InternalError, "ambiguous positional projection");
}
}
Status projStatus = transform(member->obj.value(), &bob, &matchDetails);
@@ -406,10 +399,6 @@ Status ProjectionExec::transform(const BSONObj& in,
arrayDetails.requestElemMatchKey();
if (matcher->second->matchesBSON(in, &arrayDetails)) {
// Since we create a special matcher for each $elemMatch projection, we should always
// have valid MatchDetails.
invariant(arrayDetails.isValid());
FieldMap::const_iterator fieldIt = _fields.find(elt.fieldName());
if (_fields.end() == fieldIt) {
return Status(ErrorCodes::BadValue,

View File

@@ -189,12 +189,6 @@ TEST(ProjectionExecTest, TransformPositionalDollar) {
// Invalid position $ projections.
testTransform("{'a.$': 1}", "{a: {$size: 1}}", "{a: [5]}", false, "");
// Ambigous position $ projections.
testTransform("{'a.$': 1}", "{$and: [{a: 1}, {a: 2}]}", "{a: [1, 2]}", false, "");
testTransform("{'a.$': 1}", "{a: 1, b: 2}", "{a: [1], b: [2]}", false, "");
testTransform("{'a.$': 1}", "{a: {$elemMatch: {$lt: 2}}, b: 2}", "{a: [1], b: [2]}", false, "");
testTransform("{'a.$': 1}", "{'a.b': 1, 'a.c': 2}", "{a: [{b: 1}, {c: 2}]}", false, "");
}
//

View File

@@ -508,10 +508,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
dassert(cq);
verify(cq->root()->matchesBSON(oldObj.value(), &matchDetails));
// If we have matched more than one array position, we cannot perform a positional update
// operation.
uassert(34412, "ambiguous positional update operation", matchDetails.isValid());
string matchedField;
if (matchDetails.hasElemMatchKey())
matchedField = matchDetails.elemMatchKey();

View File

@@ -189,16 +189,12 @@ public:
// XXX document
virtual bool equivalent(const MatchExpression* other) const = 0;
/**
* Determine if a document satisfies the tree-predicate.
*
* The caller may optionally provide a non-null MatchDetails as an out-parameter. For matching
*documents, the MatchDetails provide further info on how the document was
*matched---specifically, which array element matched an array predicate.
*
* The caller must check that the MatchDetails is valid via the isValid() method before using.
*/
//
// Determine if a document satisfies the tree-predicate.
//
virtual bool matches(const MatchableDocument* doc, MatchDetails* details = 0) const = 0;
virtual bool matchesBSON(const BSONObj& doc, MatchDetails* details = 0) const;
/**

View File

@@ -38,7 +38,7 @@ namespace mongo {
using std::string;
MatchDetails::MatchDetails() : _elemMatchKeyRequested(false), _isValid(true) {
MatchDetails::MatchDetails() : _elemMatchKeyRequested() {
resetOutput();
}
@@ -58,9 +58,6 @@ std::string MatchDetails::elemMatchKey() const {
void MatchDetails::setElemMatchKey(const std::string& elemMatchKey) {
if (_elemMatchKeyRequested) {
if (_elemMatchKey) {
_isValid = false;
}
_elemMatchKey.reset(new std::string(elemMatchKey));
}
}

View File

@@ -73,14 +73,9 @@ public:
void setElemMatchKey(const std::string& elemMatchKey);
bool isValid() const {
return _isValid;
}
private:
bool _loadedRecord;
bool _elemMatchKeyRequested;
bool _isValid;
std::unique_ptr<std::string> _elemMatchKey;
};
}

View File

@@ -921,9 +921,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
invariant(!_isCatchingUp);
invariant(!_canAcceptNonLocalWrites);
_isWaitingForDrainToComplete = false;
_drainFinishedCond_forTest.notify_all();
_drainFinishedCond.notify_all();
if (!_getMemberState_inlock().primary()) {
// We must have decided not to transition to primary while waiting for the applier to drain.
@@ -931,6 +930,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
return;
}
invariant(!_canAcceptNonLocalWrites);
_canAcceptNonLocalWrites = true;
lk.unlock();
@@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) {
if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
return Status(ErrorCodes::ExceededTimeLimit,
"Timed out waiting to finish draining applier buffer");
}
@@ -1617,9 +1617,35 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
return Status::OK();
}
if (replMode == modeReplSet && !_memberState.primary()) {
return {ErrorCodes::PrimarySteppedDown,
"Primary stepped down while waiting for replication"};
auto checkForStepDown = [&]() -> Status {
if (replMode == modeReplSet && !_memberState.primary()) {
return {ErrorCodes::PrimarySteppedDown,
"Primary stepped down while waiting for replication"};
}
if (opTime.getTerm() != _cachedTerm) {
return {
ErrorCodes::PrimarySteppedDown,
str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm
<< " while waiting for replication, indicating that this node must "
"have stepped down."};
}
if (_stepDownPending) {
return {ErrorCodes::PrimarySteppedDown,
"Received stepdown request while waiting for replication"};
}
return Status::OK();
};
Status stepdownStatus = checkForStepDown();
if (!stepdownStatus.isOK()) {
return stepdownStatus;
}
auto interruptStatus = txn->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
return interruptStatus;
}
if (writeConcern.wMode.empty()) {
@@ -1647,10 +1673,6 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
WaiterInfoGuard waitInfo(
&_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
return {ErrorCodes::PrimarySteppedDown,
"Not primary anymore while waiting for replication - primary stepped down"};
}
if (_inShutdown) {
return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
@@ -1672,6 +1694,11 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
}
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
}
stepdownStatus = checkForStepDown();
if (!stepdownStatus.isOK()) {
return stepdownStatus;
}
}
return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
@@ -2516,11 +2543,10 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
_replicationWaiterList.signalAndRemoveAll_inlock();
// Wake up the optime waiter that is waiting for primary catch-up to finish.
_opTimeWaiterList.signalAndRemoveAll_inlock();
// Clean up primary states.
// _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
// by freshness scan.
_canAcceptNonLocalWrites = false;
_isCatchingUp = false;
_isWaitingForDrainToComplete = false;
_drainFinishedCond_forTest.notify_all();
_stepDownPending = false;
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
result = kActionCloseAllConnections;
} else {
@@ -2654,6 +2680,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto evhStatus =
scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
_finishCatchUpOplog_inlock(true);
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2662,7 +2689,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
LockGuard lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
_finishCatchUpOplog_inlock(false);
_finishCatchUpOplog_inlock(true);
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2729,11 +2756,10 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
}
void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
invariant(_isCatchingUp);
_isCatchingUp = false;
// If the node steps down during the catch-up, we don't go into drain mode.
if (startToDrain) {
invariant(_getMemberState_inlock().primary());
invariant(!_canAcceptNonLocalWrites);
invariant(!_isWaitingForDrainToComplete);
_isWaitingForDrainToComplete = true;
// Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
@@ -3367,7 +3393,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
log() << "stepping down from primary, because a new term has begun: " << term;
_topCoord->prepareForStepDown();
return _stepDownStart();
return _stepDownStart(false);
}
return EventHandle();
}

View File

@@ -910,7 +910,10 @@ private:
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
ReplicationExecutor::EventHandle _stepDownStart();
/**
* Schedules stepdown to run with the global exclusive lock.
*/
ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex);
/**
* Completes a step-down of the current node. Must be run with a global
@@ -949,9 +952,11 @@ private:
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
* value of "responseStatus".
* 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this.
*/
void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
bool hasMutex);
/**
* Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
@@ -1180,6 +1185,14 @@ private:
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
int _rbid; // (M)
// Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat)
// TODO(SERVER-27083): This bool is redundant of the same-named bool in TopologyCoordinatorImpl,
// but due to mutex ordering between _mutex and _topoMutex we can't inspect the
// TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid
// of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should
// consolidate the two bools.
bool _stepDownPending = false; // (M)
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
WaiterList _replicationWaiterList; // (M)
@@ -1209,8 +1222,8 @@ private:
// Current ReplicaSet state.
MemberState _memberState; // (MX)
// Used to signal threads waiting for changes to _memberState. Only used in testing.
stdx::condition_variable _drainFinishedCond_forTest; // (M)
// Used to signal threads waiting for changes to _memberState.
stdx::condition_variable _drainFinishedCond; // (M)
// True if we are waiting for the applier to finish draining.
bool _isWaitingForDrainToComplete; // (M)

View File

@@ -1348,12 +1348,11 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
ASSERT_FALSE(getReplCoord()->isCatchingUp());
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
auto net = getNet();
net->enterNetwork();
net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
net->exitNetwork();
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
@@ -1378,12 +1377,15 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
ASSERT_TRUE(evh.isValid());
getReplExec()->waitForEvent(evh);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
ASSERT_FALSE(getReplCoord()->isCatchingUp());
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
auto net = getNet();
net->enterNetwork();
net->runReadyNetworkOperations();
net->exitNetwork();
auto txn = makeOperationContext();
// Simulate bgsync signaling replCoord to exit drain mode.
// At this point, we see the stepdown and reset the states.
getReplCoord()->signalDrainComplete(txn.get());
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));

View File

@@ -66,6 +66,8 @@ using CBHandle = ReplicationExecutor::CallbackHandle;
using CBHStatus = StatusWith<CBHandle>;
using LockGuard = stdx::lock_guard<stdx::mutex>;
MONGO_FP_DECLARE(blockHeartbeatStepdown);
} // namespace
using executor::RemoteCommandRequest;
@@ -213,7 +215,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
_scheduleHeartbeatToTarget(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
_handleHeartbeatResponseAction(action, hbStatusResponse);
_handleHeartbeatResponseAction(action, hbStatusResponse, false /*we're not holding _mutex*/);
}
void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex,
@@ -233,11 +235,13 @@ void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIn
void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
const HeartbeatResponseAction& action,
const StatusWith<ReplSetHeartbeatResponse>& responseStatus) {
const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
bool hasMutex) {
switch (action.getAction()) {
case HeartbeatResponseAction::NoAction:
// Update the cached member state if different than the current topology member state
if (_memberState != _topCoord->getMemberState()) {
invariant(!hasMutex);
stdx::unique_lock<stdx::mutex> lk(_mutex);
const PostMemberStateUpdateAction postUpdateAction =
_updateMemberStateFromTopologyCoordinator_inlock();
@@ -257,7 +261,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
log() << "Stepping down from primary in response to heartbeat";
_topCoord->prepareForStepDown();
// Don't need to wait for stepdown to finish.
_stepDownStart();
_stepDownStart(hasMutex);
break;
case HeartbeatResponseAction::StepDownRemotePrimary: {
invariant(action.getPrimaryConfigIndex() != _selfIndex);
@@ -311,11 +315,19 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort
}
}
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() {
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart(bool hasMutex) {
{
boost::optional<stdx::lock_guard<stdx::mutex>> lk;
if (!hasMutex) {
lk.emplace(_mutex);
}
_stepDownPending = true;
}
auto finishEvent = _makeEvent();
if (!finishEvent) {
return finishEvent;
}
_replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind(
&ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent));
return finishEvent;
@@ -328,6 +340,19 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
return;
}
if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) {
// Must reschedule rather than block so we don't take up threads in the replication
// executor.
sleepmillis(10);
_replExecutor.scheduleWorkWithGlobalExclusiveLock(
stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
this,
stdx::placeholders::_1,
finishedEvent));
return;
}
LockGuard topoLock(_topoMutex);
invariant(cbData.txn);
@@ -668,7 +693,9 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
_topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock());
// Don't mind potential asynchronous stepdown as this is the last step of
// liveness check.
_handleHeartbeatResponseAction(action, makeStatusWith<ReplSetHeartbeatResponse>());
_handleHeartbeatResponseAction(action,
makeStatusWith<ReplSetHeartbeatResponse>(),
true /*we're holding _mutex*/);
}
}
}

View File

@@ -43,6 +43,7 @@
#include "mongo/db/repl/old_update_position_args.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_settings.h"
@@ -79,15 +80,15 @@ using executor::RemoteCommandResponse;
using unittest::assertGet;
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
// Helper class to wrap Timestamp as an OpTime with term 0.
struct OpTimeWithTermZero {
OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
// Helper class to wrap Timestamp as an OpTime with term 1.
struct OpTimeWithTermOne {
OpTimeWithTermOne(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
operator OpTime() const {
return OpTime(timestamp, 0);
return OpTime(timestamp, 1);
}
operator boost::optional<OpTime>() const {
return OpTime(timestamp, 0);
return OpTime(timestamp, 1);
}
OpTime asOpTime() const {
@@ -601,7 +602,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstASta
init("");
auto txn = makeOperationContext();
OpTimeWithTermZero time(100, 1);
OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -620,7 +621,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas
init(settings);
auto txn = makeOperationContext();
OpTimeWithTermZero time(100, 1);
OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -654,7 +655,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
auto txn = makeOperationContext();
OpTimeWithTermZero time(100, 1);
OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -667,7 +668,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, statusAndDur.status);
}
TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWZero) {
TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWTermOne) {
assertStartSuccess(BSON("_id"
<< "mySet"
<< "version"
@@ -687,7 +688,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
<< 2))),
HostAndPort("node1", 12345));
OpTimeWithTermZero time(100, 1);
OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -696,13 +697,13 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
// Become primary.
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ASSERT(getReplCoord()->getMemberState().primary());
auto txn = makeOperationContext();
;
ReplicationCoordinator::StatusAndDuration statusAndDur =
getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
ASSERT_OK(statusAndDur.status);
@@ -735,12 +736,12 @@ TEST_F(ReplCoordTest,
<< 3))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -815,12 +816,12 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
<< 3))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -1026,6 +1027,8 @@ TEST_F(
// another name if we didn't get a high enough one.
}
auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1);
ReplClientInfo::forClient(txn.get()->getClient()).setLastOp(zeroOpTimeInCurrentTerm);
statusAndDur =
getReplCoord()->awaitReplicationOfLastOpForClient(txn.get(), majorityWriteConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
@@ -1148,14 +1151,14 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1212,14 +1215,14 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wDeadline = getNet()->now() + Milliseconds(50);
@@ -1263,14 +1266,14 @@ TEST_F(ReplCoordTest,
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1313,15 +1316,15 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
const auto txn = makeOperationContext();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1355,14 +1358,14 @@ TEST_F(ReplCoordTest,
<< "node3"))),
HostAndPort("node1"));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1575,7 +1578,7 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
const auto txn = makeOperationContext();
OpTimeWithTermZero optime1(100, 1);
OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
@@ -1589,7 +1592,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) {
OpTimeWithTermZero optime1(100, 1);
OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
@@ -1819,8 +1822,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenNoSecondaryIsCaughtUpWithinStepDownsSecondaryCatchUpPeriod) {
OpTimeWithTermZero optime1(100, 1);
OpTimeWithTermZero optime2(100, 2);
OpTimeWithTermOne optime1(100, 1);
OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
repl->setMyLastAppliedOpTime(optime2);
@@ -1986,8 +1989,8 @@ TEST_F(StepDownTest,
}
TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
OpTimeWithTermZero optime1(100, 1);
OpTimeWithTermZero optime2(100, 2);
OpTimeWithTermOne optime1(100, 1);
OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
repl->setMyLastAppliedOpTime(optime2);
@@ -2142,9 +2145,9 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
<< BSON("_id" << 2 << "host"
<< "test3:1234"))),
HostAndPort("test1", 1234));
OpTimeWithTermZero optime1(100, 1);
OpTimeWithTermZero optime2(100, 2);
OpTimeWithTermZero optime3(2, 1);
OpTimeWithTermOne optime1(100, 1);
OpTimeWithTermOne optime2(100, 2);
OpTimeWithTermOne optime3(2, 1);
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2));
@@ -2177,7 +2180,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
ASSERT_EQUALS(optime3.timestamp,
entry[OldUpdatePositionArgs::kOpTimeFieldName]["ts"].timestamp());
}
ASSERT_EQUALS(0, entry[OldUpdatePositionArgs::kOpTimeFieldName]["t"].Number());
ASSERT_EQUALS(1, entry[OldUpdatePositionArgs::kOpTimeFieldName]["t"].Number());
}
ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes
}
@@ -2200,8 +2203,8 @@ TEST_F(ReplCoordTest,
<< "test3:1234"))),
HostAndPort("test2", 1234));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can't unset maintenance mode if it was never set to begin with.
Status status = getReplCoord()->setMaintenanceMode(false);
@@ -2227,8 +2230,8 @@ TEST_F(ReplCoordTest,
<< "test3:1234"))),
HostAndPort("test2", 1234));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// valid set
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
@@ -2259,8 +2262,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena
<< "test3:1234"))),
HostAndPort("test2", 1234));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can set multiple times
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
@@ -2293,8 +2296,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS
<< "test3:1234"))),
HostAndPort("test2", 1234));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// From rollback, entering and exiting maintenance mode doesn't change perceived
// state.
@@ -2335,8 +2338,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) {
<< "test3:1234"))),
HostAndPort("test2", 1234));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can't modify maintenance mode when PRIMARY
simulateSuccessfulV1Election();
@@ -2374,8 +2377,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection)
<< "test3:1234"))),
HostAndPort("test2", 1234));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// TODO this election shouldn't have to happen.
simulateSuccessfulV1Election();
@@ -2439,8 +2442,8 @@ TEST_F(ReplCoordTest,
<< BSON("_id" << 2 << "host" << client2Host.toString()))),
HostAndPort("node1", 12345));
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2484,8 +2487,8 @@ TEST_F(ReplCoordTest,
<< BSON("_id" << 2 << "host" << client2Host.toString()))),
HostAndPort("node1", 12345));
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2515,8 +2518,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast
OID client = OID::gen();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
getExternalState()->setClientHostAndPort(clientHost);
HandshakeArgs handshake;
@@ -2717,12 +2720,12 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTime time1({100, 1}, 2);
OpTime time2({100, 2}, 2);
OpTime time1({100, 1}, 1);
OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2738,18 +2741,17 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
// receive updatePosition containing ourself, should not process the update for self
UpdatePositionArgs args;
ASSERT_OK(args.initialize(
BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
<< 2
<< UpdatePositionArgs::kMemberIdFieldName
<< 0
<< UpdatePositionArgs::kDurableOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 2)
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 2))))));
ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
<< 2
<< UpdatePositionArgs::kMemberIdFieldName
<< 0
<< UpdatePositionArgs::kDurableOpTimeFieldName
<< time2.toBSON()
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< time2.toBSON())))));
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
@@ -2776,13 +2778,13 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermZero staleTime(10, 0);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2833,12 +2835,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTime time1({100, 1}, 3);
OpTime time2({100, 2}, 3);
OpTime time1({100, 1}, 1);
OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2848,18 +2850,17 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
// receive updatePosition with incorrect config version
UpdatePositionArgs args;
ASSERT_OK(args.initialize(
BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
<< 3
<< UpdatePositionArgs::kMemberIdFieldName
<< 1
<< UpdatePositionArgs::kDurableOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 3)
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 3))))));
ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
<< 3
<< UpdatePositionArgs::kMemberIdFieldName
<< 1
<< UpdatePositionArgs::kDurableOpTimeFieldName
<< time2.toBSON()
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< time2.toBSON())))));
auto txn = makeOperationContext();
@@ -2891,13 +2892,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermZero staleTime(10, 0);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2947,12 +2948,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTime time1({100, 1}, 2);
OpTime time2({100, 2}, 2);
OpTime time1({100, 1}, 1);
OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2962,18 +2963,17 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
// receive updatePosition with nonexistent member id
UpdatePositionArgs args;
ASSERT_OK(args.initialize(
BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
<< 2
<< UpdatePositionArgs::kMemberIdFieldName
<< 9
<< UpdatePositionArgs::kDurableOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 2)
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 2))))));
ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
<< 1
<< UpdatePositionArgs::kUpdateArrayFieldName
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
<< 2
<< UpdatePositionArgs::kMemberIdFieldName
<< 9
<< UpdatePositionArgs::kDurableOpTimeFieldName
<< time2.toBSON()
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< time2.toBSON())))));
auto txn = makeOperationContext();
@@ -3003,13 +3003,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermZero staleTime(10, 0);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -3058,13 +3058,13 @@ TEST_F(ReplCoordTest,
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
OpTimeWithTermZero time1(100, 1);
OpTimeWithTermZero time2(100, 2);
OpTimeWithTermZero staleTime(10, 0);
OpTimeWithTermOne time1(100, 1);
OpTimeWithTermOne time2(100, 2);
OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -3151,11 +3151,11 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) {
disableSnapshots();
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
simulateSuccessfulV1Election();
OpTimeWithTermZero time(100, 2);
OpTimeWithTermOne time(100, 2);
// 3 nodes waiting for time
WriteConcernOptions writeConcern;
@@ -3242,11 +3242,11 @@ TEST_F(
<< 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
simulateSuccessfulV1Election();
OpTimeWithTermZero time(100, 2);
OpTimeWithTermOne time(100, 2);
// 3 nodes waiting for time
WriteConcernOptions writeConcern;
@@ -3316,8 +3316,8 @@ TEST_F(ReplCoordTest,
disableSnapshots();
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 1));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 1));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
simulateSuccessfulV1Election();
OpTime time(Timestamp(100, 2), 1);
@@ -3523,15 +3523,15 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin
<< 0))),
HostAndPort("node1", 12345));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
auto txn = makeOperationContext();
shutdown(txn.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_EQ(status, ErrorCodes::ShutdownInProgress);
}
@@ -3547,14 +3547,14 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
<< 0))),
HostAndPort("node1", 12345));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
const auto txn = makeOperationContext();
killOperation(txn.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_EQ(status, ErrorCodes::Interrupted);
}
@@ -3587,14 +3587,13 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
<< 0))),
HostAndPort("node1", 12345));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
auto txn = makeOperationContext();
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
txn.get(),
ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)));
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)));
}
TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTimeEqualToOurLast) {
@@ -3610,7 +3609,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
HostAndPort("node1", 12345));
OpTimeWithTermZero time(100, 0);
OpTimeWithTermOne time(100, 0);
getReplCoord()->setMyLastAppliedOpTime(time);
getReplCoord()->setMyLastDurableOpTime(time);
@@ -3627,7 +3626,7 @@ TEST_F(ReplCoordTest,
auto txn = makeOperationContext();
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_EQ(status, ErrorCodes::NotAReplicaSet);
}

View File

@@ -83,6 +83,14 @@ public:
// Replica set resync.
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
if (getGlobalReplicationCoordinator()->getSettings().usingReplSets()) {
// Resync is disabled in production on replica sets until it stabilizes (SERVER-27081).
if (!Command::testCommandsEnabled) {
return appendCommandStatus(
result,
Status(ErrorCodes::OperationFailed,
"Replica sets do not support the resync command"));
}
const MemberState memberState = replCoord->getMemberState();
if (memberState.startup()) {
return appendCommandStatus(

View File

@@ -45,6 +45,7 @@
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/protocol.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -59,6 +60,8 @@ static Counter64 gleWtimeouts;
static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts",
&gleWtimeouts);
MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* txn,
const BSONObj& cmdObj,
const std::string& dbName,
@@ -182,6 +185,8 @@ Status waitForWriteConcern(OperationContext* txn,
<< ", write concern: " << writeConcern.toBSON();
auto replCoord = repl::ReplicationCoordinator::get(txn);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
// Next handle blocking on disk
Timer syncTimer;
WriteConcernOptions writeConcernWithPopulatedSyncMode =

View File

@@ -30,6 +30,8 @@
#include "mongo/platform/basic.h"
#include <algorithm>
#include <iterator>
#include <memory>
#include "mongo/transport/transport_layer_legacy.h"
@@ -47,6 +49,14 @@
namespace mongo {
namespace transport {
namespace {
struct lock_weak {
template <typename T>
std::shared_ptr<T> operator()(const std::weak_ptr<T>& p) const {
return p.lock();
}
};
} // namespace
TransportLayerLegacy::ListenerLegacy::ListenerLegacy(const TransportLayerLegacy::Options& opts,
NewConnectionCb callback)
@@ -221,23 +231,40 @@ void TransportLayerLegacy::_closeConnection(Connection* conn) {
Listener::globalTicketHolder.release();
}
// Capture all of the weak pointers behind the lock, to delay their expiry until we leave the
// locking context. This function requires proof of locking, by passing the lock guard.
auto TransportLayerLegacy::lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const
-> std::vector<LegacySessionHandle> {
using std::begin;
using std::end;
std::vector<std::shared_ptr<LegacySession>> result;
std::transform(begin(_sessions), end(_sessions), std::back_inserter(result), lock_weak());
// Skip expired weak pointers.
result.erase(std::remove(begin(result), end(result), nullptr), end(result));
return result;
}
void TransportLayerLegacy::endAllSessions(Session::TagMask tags) {
log() << "legacy transport layer closing all connections";
{
stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
for (auto&& it : _sessions) {
stdx::unique_lock<stdx::mutex> lk(_sessionsMutex);
// We want to capture the shared_ptrs to our sessions in a way which lets us destroy them
// outside of the lock.
const auto sessions = lockAllSessions(lk);
// Attempt to make our weak_ptr into a shared_ptr
auto session = it.lock();
if (session) {
if (session->getTags() & tags) {
log() << "Skip closing connection for connection # "
<< session->conn()->connectionId;
} else {
_closeConnection(session->conn());
}
for (auto&& session : sessions) {
if (session->getTags() & tags) {
log() << "Skip closing connection for connection # "
<< session->conn()->connectionId;
} else {
_closeConnection(session->conn());
}
}
// TODO(SERVER-27069): Revamp this lock to not cover the loop. This unlock was put here
// specifically to minimize risk, just before the release of 3.4. The risk is that we would
// be in the loop without the lock, which most of our testing didn't do. We must unlock
// manually here, because the `sessions` vector must be destroyed *outside* of the lock.
lk.unlock();
}
}

View File

@@ -28,6 +28,8 @@
#pragma once
#include <vector>
#include "mongo/stdx/list.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
@@ -101,6 +103,8 @@ private:
using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>;
using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>;
std::vector<LegacySessionHandle> lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const;
/**
* Connection object, to associate Sessions with AbstractMessagingPorts.
*/
@@ -127,8 +131,8 @@ private:
public:
~LegacySession();
static std::shared_ptr<LegacySession> create(std::unique_ptr<AbstractMessagingPort> amp,
TransportLayerLegacy* tl);
static LegacySessionHandle create(std::unique_ptr<AbstractMessagingPort> amp,
TransportLayerLegacy* tl);
TransportLayer* getTransportLayer() const override {
return _tl;