Compare commits
9 Commits
master
...
r3.4.0-rc4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cf4d9f77e5 | ||
|
|
c7ebfd0fd2 | ||
|
|
5c5fe01994 | ||
|
|
b9c8db05bb | ||
|
|
0362de2200 | ||
|
|
e9aea74810 | ||
|
|
8ab19a65f3 | ||
|
|
b7472174d0 | ||
|
|
061d53915f |
@@ -52,8 +52,8 @@ if not version_parts:
|
||||
exit(1)
|
||||
|
||||
if version_parts[0]:
|
||||
print "suffix: latest"
|
||||
print "src_suffix: latest"
|
||||
print "suffix: v3.4-latest"
|
||||
print "src_suffix: v3.4-latest"
|
||||
else:
|
||||
print "suffix: {0}".format(version_line)
|
||||
print "src_suffix: r{0}".format(version_line)
|
||||
|
||||
@@ -16,6 +16,8 @@ selector:
|
||||
- jstests/replsets/stepup.js
|
||||
# The combination of new bridges and PV0 can lead to an improper spanning tree in sync2.js.
|
||||
- jstests/replsets/sync2.js
|
||||
# PV0's w:majority guarantees aren't strong enough for this test to pass.
|
||||
- jstests/replsets/write_concern_after_stepdown_and_stepup.js
|
||||
|
||||
executor:
|
||||
js_test:
|
||||
|
||||
@@ -152,7 +152,7 @@ functions:
|
||||
params:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/artifacts/${build_id}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/artifacts/${build_id}.tgz
|
||||
bucket: mciuploads
|
||||
extract_to: src
|
||||
|
||||
@@ -161,7 +161,7 @@ functions:
|
||||
params:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
|
||||
bucket: mciuploads
|
||||
local_file: src/mongo-binaries.tgz
|
||||
|
||||
@@ -196,7 +196,7 @@ functions:
|
||||
"get buildnumber" : &get_buildnumber
|
||||
command: keyval.inc
|
||||
params:
|
||||
key: "${build_variant}_master"
|
||||
key: "${build_variant}_v3.4"
|
||||
destination: "builder_num"
|
||||
|
||||
"run diskstats": &run_diskstats
|
||||
@@ -297,7 +297,7 @@ functions:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/mongo-debugsymbols.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -309,7 +309,7 @@ functions:
|
||||
params:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/debugsymbols/debugsymbols-${build_id}.${ext|tgz}
|
||||
bucket: mciuploads
|
||||
local_file: src/mongo-debugsymbols.tgz
|
||||
build_variants:
|
||||
@@ -622,7 +622,7 @@ functions:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: jstests.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/jstestfuzz/${task_id}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/jstestfuzz/${task_id}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -803,7 +803,7 @@ post:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/diagnostic-data.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/ftdc/mongo-diagnostic-data-${task_id}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/ftdc/mongo-diagnostic-data-${task_id}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -834,7 +834,7 @@ post:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: "src/gcov-intermediate-files.tgz"
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/gcov/gcov-intermediate-files-${task_id}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/gcov/gcov-intermediate-files-${task_id}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -884,7 +884,7 @@ post:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/jepsen-mongod-logs.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/jepsen/jepsen-mongod-logs-${task_id}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/jepsen/jepsen-mongod-logs-${task_id}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -903,7 +903,7 @@ post:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/jepsen-results.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/jepsen/jepsen-results-${task_id}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/jepsen/jepsen-results-${task_id}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -923,7 +923,7 @@ post:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: mongo-coredumps.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/coredumps/mongo-coredumps-${build_id}-${task_name}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/coredumps/mongo-coredumps-${build_id}-${task_name}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -943,7 +943,7 @@ post:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: diskstats.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/diskstats/mongo-diskstats-${task_id}-${execution}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/diskstats/mongo-diskstats-${task_id}-${execution}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -1102,7 +1102,7 @@ tasks:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/mongodb-binaries.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/binaries/mongo-${build_id}.${ext|tgz}
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -1112,7 +1112,7 @@ tasks:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: artifacts.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/artifacts/${build_id}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/artifacts/${build_id}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: application/tar
|
||||
@@ -1122,7 +1122,7 @@ tasks:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/mongodb-unittests.tgz
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/unittests/${build_id}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/unittests/${build_id}.tgz
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -1133,7 +1133,7 @@ tasks:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
local_file: src/distsrc.${ext|tgz}
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
|
||||
bucket: mciuploads
|
||||
permissions: public-read
|
||||
content_type: ${content_type|application/x-gzip}
|
||||
@@ -1331,7 +1331,7 @@ tasks:
|
||||
params:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/unittests/${build_id}.tgz
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/unittests/${build_id}.tgz
|
||||
bucket: mciuploads
|
||||
extract_to: src
|
||||
- func: "run tests"
|
||||
@@ -2614,7 +2614,7 @@ tasks:
|
||||
params:
|
||||
aws_key: ${aws_key}
|
||||
aws_secret: ${aws_secret}
|
||||
remote_file: mongodb-mongo-master/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
|
||||
remote_file: mongodb-mongo-v3.4/${build_variant}/${revision}/sources/mongo-src-${build_id}.${ext|tgz}
|
||||
bucket: mciuploads
|
||||
local_file: src/distsrc.${ext|tgz}
|
||||
build_variants: [ linux-64, windows-64-2k8 ]
|
||||
@@ -3087,7 +3087,7 @@ modules:
|
||||
- name: enterprise
|
||||
repo: git@github.com:10gen/mongo-enterprise-modules.git
|
||||
prefix: src/mongo/db/modules
|
||||
branch: master
|
||||
branch: v3.4
|
||||
|
||||
- name: rocksdb
|
||||
repo: git@github.com:mongodb-partners/mongo-rocks.git
|
||||
|
||||
@@ -11,3 +11,17 @@ t.drop();
|
||||
t.save({a: [1, 2]});
|
||||
t.update({$and: [{a: 1}]}, {$set: {'a.$': 5}});
|
||||
assert.eq([5, 2], t.findOne().a);
|
||||
|
||||
// Make sure dollar sign operator with $and is consistent with no $and case
|
||||
t.drop();
|
||||
t.save({a: [1, 2], b: [3, 4]});
|
||||
t.update({a: 1, b: 4}, {$set: {'a.$': 5}});
|
||||
// Probably not what we want here, just trying to make sure $and is consistent
|
||||
assert.eq({a: [1, 5], b: [3, 4]}, t.find({}, {_id: 0}).toArray()[0]);
|
||||
|
||||
// Make sure dollar sign operator with $and is consistent with no $and case
|
||||
t.drop();
|
||||
t.save({a: [1, 2], b: [3, 4]});
|
||||
t.update({a: 1, $and: [{b: 4}]}, {$set: {'a.$': 5}});
|
||||
// Probably not what we want here, just trying to make sure $and is consistent
|
||||
assert.eq({a: [1, 5], b: [3, 4]}, t.find({}, {_id: 0}).toArray()[0]);
|
||||
|
||||
@@ -214,14 +214,6 @@ assert.eq(
|
||||
t.find({group: 10}, {_id: 0, x: {$elemMatch: {a: 1}}, y: {$elemMatch: {c: 3}}}).toArray()[0],
|
||||
"multiple $elemMatch on unique fields 1");
|
||||
|
||||
assert.eq({"x": [{"y": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}]},
|
||||
t.find({group: 8}, {_id: 0, x: {$elemMatch: {y: {$elemMatch: {a: 3}}}}}).toArray()[0],
|
||||
"nested $elemMatch");
|
||||
|
||||
assert.throws(function() {
|
||||
t.find({group: 3, 'x.a': 1}, {'x.$': 1, y: {$elemMatch: {aa: 1}}}).toArray();
|
||||
}, [], "throw on positional operator with $elemMatch");
|
||||
|
||||
if (false) {
|
||||
assert.eq(2, // SERVER-1243: handle multiple $elemMatch results
|
||||
t.find({group: 4}, {x: {$elemMatchAll: {a: {$lte: 2}}}}).toArray()[0].x.length,
|
||||
@@ -254,3 +246,29 @@ a = t.find({group: 3}, {x: {$elemMatch: {a: 1}}}).batchSize(1);
|
||||
while (a.hasNext()) {
|
||||
assert.eq(1, a.next().x[0].a, "positional getMore test");
|
||||
}
|
||||
|
||||
// verify the positional update operator matches the same element as the the positional find. this
|
||||
// is to ensure consistent behavior with updates until SERVER-1013 is resolved, at which point the
|
||||
// following tests should be updated.
|
||||
|
||||
t.update({group: 10, 'x.a': 3, 'y.c': 1}, {$set: {'x.$': 100}}, false, true);
|
||||
// updated the wrong element, so the following assertions should be true
|
||||
assert.eq(100,
|
||||
t.find({group: 10, 'y.c': 1, x: 100}, {'x.$': 1}).toArray()[0].x[0],
|
||||
"wrong single element match after update");
|
||||
|
||||
assert.eq(100,
|
||||
t.find({group: 10, x: 100, 'y.c': 1}, {'x.$': 1}).toArray()[0].x[0],
|
||||
"wrong single element match after update");
|
||||
|
||||
t.remove({group: 10});
|
||||
t.insert({group: 10, x: [{a: 1, b: 2}, {a: 3, b: 4}], y: [{c: 1, d: 2}, {c: 3, d: 4}]});
|
||||
|
||||
t.update({group: 10, 'y.c': 1, 'x.a': 3}, {$set: {'x.$': 100}}, false, true);
|
||||
// updated the correct element
|
||||
assert.eq(100,
|
||||
t.find({group: 10, 'y.c': 1, x: 100}, {'x.$': 1}).toArray()[0].x[0],
|
||||
"right single element match after update");
|
||||
assert.eq(100,
|
||||
t.find({group: 10, x: 100, 'y.c': 1}, {'x.$': 1}).toArray()[0].x[0],
|
||||
"right single element match after update");
|
||||
|
||||
@@ -11,60 +11,45 @@ t.drop();
|
||||
|
||||
// The collection is empty, forcing an upsert. In this case the query has no array position match
|
||||
// to substiture for the positional operator. SERVER-4713
|
||||
assert.writeError(t.update({}, {$set: {'a.$.b': 1}}, true));
|
||||
res = t.update({}, {$set: {'a.$.b': 1}}, true);
|
||||
assert(res.hasWriteError(), "An error is reported.");
|
||||
assert.eq(0, t.count(), "No upsert occurred.");
|
||||
|
||||
// Save a document to the collection so it is no longer empty.
|
||||
assert.writeOK(t.save({_id: 0}));
|
||||
t.save({_id: 0});
|
||||
|
||||
// Now, with an existing document, trigger an update rather than an upsert. The query has no array
|
||||
// position match to substiture for the positional operator. SERVER-6669
|
||||
assert.writeError(t.update({}, {$set: {'a.$.b': 1}}));
|
||||
res = t.update({}, {$set: {'a.$.b': 1}});
|
||||
assert(res.hasWriteError(), "An error is reported.");
|
||||
assert.eq([{_id: 0}], t.find().toArray(), "No update occurred.");
|
||||
|
||||
// Now, try with an update by _id (without a query array match).
|
||||
assert.writeError(t.update({_id: 0}, {$set: {'a.$.b': 1}}));
|
||||
res = t.update({_id: 0}, {$set: {'a.$.b': 1}});
|
||||
assert(res.hasWriteError(), "An error is reported.");
|
||||
assert.eq([{_id: 0}], t.find().toArray(), "No update occurred.");
|
||||
|
||||
// Seed the collection with a document suitable for the following check.
|
||||
assert.writeOK(t.remove({}));
|
||||
assert.writeOK(t.save({_id: 0, a: [{b: {c: 1}}]}));
|
||||
t.remove({});
|
||||
t.save({_id: 0, a: [{b: {c: 1}}]});
|
||||
|
||||
// Now, attempt to apply an update with two nested positional operators. There is a positional
|
||||
// query match for the first positional operator but not the second. Note that dollar sign
|
||||
// substitution for multiple positional opertors is not implemented (SERVER-831).
|
||||
assert.writeError(t.update({'a.b.c': 1}, {$set: {'a.$.b.$.c': 2}}));
|
||||
res = t.update({'a.b.c': 1}, {$set: {'a.$.b.$.c': 2}});
|
||||
assert(res.hasWriteError(), "An error is reported");
|
||||
assert.eq([{_id: 0, a: [{b: {c: 1}}]}], t.find().toArray(), "No update occurred.");
|
||||
|
||||
// SERVER-1155 test an update with the positional operator
|
||||
// that has a regex in the query field
|
||||
t.drop();
|
||||
assert.writeOK(t.insert({_id: 1, arr: [{a: "z", b: 1}]}));
|
||||
assert.writeOK(t.update({"arr.a": /^z$/}, {$set: {"arr.$.b": 2}}, false, true));
|
||||
t.insert({_id: 1, arr: [{a: "z", b: 1}]});
|
||||
res = t.update({"arr.a": /^z$/}, {$set: {"arr.$.b": 2}}, false, true);
|
||||
assert.writeOK(res);
|
||||
assert.eq(t.findOne().arr[0], {a: "z", b: 2});
|
||||
|
||||
t.drop();
|
||||
assert.writeOK(t.insert({_id: 1, arr: [{a: "z", b: 1}, {a: "abc", b: 2}, {a: "lmn", b: 3}]}));
|
||||
assert.writeOK(t.update({"arr.a": /l/}, {$inc: {"arr.$.b": 2}}, false, true));
|
||||
t.insert({_id: 1, arr: [{a: "z", b: 1}, {a: "abc", b: 2}, {a: "lmn", b: 3}]});
|
||||
res = t.update({"arr.a": /l/}, {$inc: {"arr.$.b": 2}}, false, true);
|
||||
assert.writeOK(res);
|
||||
assert.eq(t.findOne().arr[2], {a: "lmn", b: 5});
|
||||
|
||||
// Test updates with ambiguous positional operator.
|
||||
t.drop();
|
||||
assert.writeOK(t.insert({_id: 0, a: [1, 2]}));
|
||||
assert.writeError(t.update({$and: [{a: 1}, {a: 2}]}, {$set: {'a.$': 5}}));
|
||||
assert.eq([{_id: 0, a: [1, 2]}], t.find().toArray(), "No update occurred.");
|
||||
|
||||
t.drop();
|
||||
assert.writeOK(t.insert({_id: 0, a: [1], b: [2]}));
|
||||
assert.writeError(t.update({a: 1, b: 2}, {$set: {'a.$': 5}}));
|
||||
assert.eq([{_id: 0, a: [1], b: [2]}], t.find().toArray(), "No update occurred.");
|
||||
|
||||
t.drop();
|
||||
assert.writeOK(t.insert({_id: 0, a: [1], b: [2]}));
|
||||
assert.writeError(t.update({a: {$elemMatch: {$lt: 2}}, b: 2}, {$set: {'a.$': 5}}));
|
||||
assert.eq([{_id: 0, a: [1], b: [2]}], t.find().toArray(), "No update occurred.");
|
||||
|
||||
t.drop();
|
||||
assert.writeOK(t.insert({_id: 0, a: [{b: 1}, {c: 2}]}));
|
||||
assert.writeError(t.update({'a.b': 1, 'a.c': 2}, {$set: {'a.$': 5}}));
|
||||
assert.eq([{_id: 0, a: [{b: 1}, {c: 2}]}], t.find().toArray(), "No update occurred.");
|
||||
@@ -31,8 +31,15 @@
|
||||
const slave = rt.start(false);
|
||||
const slaveDB = slave.getDB("test");
|
||||
|
||||
// Perform a w=2 write to ensure that slave can be read from, and initial sync is complete.
|
||||
assert.writeOK(masterDB.coll.insert({}, {writeConcern: {w: 2}}));
|
||||
// Wait for the slave to sync the collections.
|
||||
assert.soon(function() {
|
||||
var res = slaveDB.runCommand({listCollections: 1, filter: {name: "collV2"}});
|
||||
return res.cursor.firstBatch.length === 1;
|
||||
}, "Collection with v:2 _id index failed to sync on slave");
|
||||
assert.soon(function() {
|
||||
var res = slaveDB.runCommand({listCollections: 1, filter: {name: "collV1"}});
|
||||
return res.cursor.firstBatch.length === 1;
|
||||
}, "Collection with v:1 _id index failed to sync on slave");
|
||||
|
||||
// Check _id index versions on slave.
|
||||
spec = GetIndexHelpers.findByName(slaveDB.collV2.getIndexes(), "_id_");
|
||||
|
||||
120
jstests/replsets/write_concern_after_stepdown.js
Normal file
120
jstests/replsets/write_concern_after_stepdown.js
Normal file
@@ -0,0 +1,120 @@
|
||||
/*
|
||||
* Tests that heartbeats containing writes from a different branch of history can't cause a stale
|
||||
* primary to incorrectly acknowledge a w:majority write that's about to be rolled back.
|
||||
*/
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
var name = "writeConcernStepDownAndBackUp";
|
||||
var dbName = "wMajorityCheck";
|
||||
var collName = "stepdownAndBackUp";
|
||||
|
||||
var rst = new ReplSetTest({
|
||||
name: name,
|
||||
nodes: [
|
||||
{},
|
||||
{},
|
||||
{rsConfig: {priority: 0}},
|
||||
],
|
||||
useBridge: true
|
||||
});
|
||||
var nodes = rst.startSet();
|
||||
rst.initiate();
|
||||
|
||||
function waitForState(node, state) {
|
||||
assert.soonNoExcept(function() {
|
||||
assert.commandWorked(node.adminCommand(
|
||||
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
function waitForPrimary(node) {
|
||||
assert.soon(function() {
|
||||
return node.adminCommand('ismaster').ismaster;
|
||||
});
|
||||
}
|
||||
|
||||
function stepUp(node) {
|
||||
var primary = rst.getPrimary();
|
||||
if (primary != node) {
|
||||
assert.throws(function() {
|
||||
primary.adminCommand({replSetStepDown: 60 * 5});
|
||||
});
|
||||
}
|
||||
waitForPrimary(node);
|
||||
}
|
||||
|
||||
jsTestLog("Make sure node 0 is primary.");
|
||||
stepUp(nodes[0]);
|
||||
var primary = rst.getPrimary();
|
||||
var secondaries = rst.getSecondaries();
|
||||
assert.eq(nodes[0], primary);
|
||||
// Wait for all data bearing nodes to get up to date.
|
||||
assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
|
||||
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
|
||||
|
||||
// Stop the secondaries from replicating.
|
||||
secondaries.forEach(function(node) {
|
||||
assert.commandWorked(
|
||||
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
|
||||
});
|
||||
// Stop the primary from being able to complete stepping down.
|
||||
assert.commandWorked(
|
||||
nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));
|
||||
|
||||
jsTestLog("Do w:majority write that will block waiting for replication.");
|
||||
var doMajorityWrite = function() {
|
||||
var res = db.getSiblingDB('wMajorityCheck').stepdownAndBackUp.insert({a: 2}, {
|
||||
writeConcern: {w: 'majority'}
|
||||
});
|
||||
assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
|
||||
};
|
||||
|
||||
var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
|
||||
|
||||
jsTest.log("Disconnect primary from all secondaries");
|
||||
nodes[0].disconnect(nodes[1]);
|
||||
nodes[0].disconnect(nodes[2]);
|
||||
|
||||
jsTest.log("Wait for a new primary to be elected");
|
||||
// Allow the secondaries to replicate again.
|
||||
secondaries.forEach(function(node) {
|
||||
assert.commandWorked(
|
||||
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
|
||||
});
|
||||
|
||||
waitForPrimary(nodes[1]);
|
||||
|
||||
jsTest.log("Do a write to the new primary");
|
||||
assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
|
||||
{a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
|
||||
|
||||
jsTest.log("Reconnect the old primary to the rest of the nodes");
|
||||
// Only allow the old primary to connect to the other nodes, not the other way around.
|
||||
// This is so that the old priamry will detect that it needs to step down and step itself down,
|
||||
// rather than one of the other nodes detecting this and sending it a replSetStepDown command,
|
||||
// which would cause the old primary to kill all operations and close all connections, making
|
||||
// the way that the insert in the parallel shell fails be nondeterministic. Rather than
|
||||
// handling all possible failure modes in the parallel shell, allowing heartbeat connectivity in
|
||||
// only one direction makes it easier for the test to fail deterministically.
|
||||
nodes[1].acceptConnectionsFrom(nodes[0]);
|
||||
nodes[2].acceptConnectionsFrom(nodes[0]);
|
||||
|
||||
joinMajorityWriter();
|
||||
|
||||
// Allow the old primary to finish stepping down so that shutdown can finish.
|
||||
var res = null;
|
||||
try {
|
||||
res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'});
|
||||
} catch (e) {
|
||||
// Expected - once we disable the fail point the stepdown will proceed and it's racy whether
|
||||
// the stepdown closes all connections before or after the configureFailPoint command
|
||||
// returns
|
||||
}
|
||||
if (res) {
|
||||
assert.commandWorked(res);
|
||||
}
|
||||
|
||||
rst.stopSet();
|
||||
}());
|
||||
134
jstests/replsets/write_concern_after_stepdown_and_stepup.js
Normal file
134
jstests/replsets/write_concern_after_stepdown_and_stepup.js
Normal file
@@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Tests that heartbeats containing writes from a different branch of history can't cause a stale
|
||||
* primary to incorrectly acknowledge a w:majority write that's about to be rolled back, even if the
|
||||
* stale primary is re-elected primary before waiting for the write concern acknowledgement.
|
||||
*/
|
||||
(function() {
|
||||
'use strict';
|
||||
|
||||
var name = "writeConcernStepDownAndBackUp";
|
||||
var dbName = "wMajorityCheck";
|
||||
var collName = "stepdownAndBackUp";
|
||||
|
||||
var rst = new ReplSetTest({
|
||||
name: name,
|
||||
nodes: [
|
||||
{},
|
||||
{},
|
||||
{rsConfig: {priority: 0}},
|
||||
],
|
||||
useBridge: true
|
||||
});
|
||||
var nodes = rst.startSet();
|
||||
rst.initiate();
|
||||
|
||||
function waitForState(node, state) {
|
||||
assert.soonNoExcept(function() {
|
||||
assert.commandWorked(node.adminCommand(
|
||||
{replSetTest: 1, waitForMemberState: state, timeoutMillis: rst.kDefaultTimeoutMS}));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
function waitForPrimary(node) {
|
||||
assert.soon(function() {
|
||||
return node.adminCommand('ismaster').ismaster;
|
||||
});
|
||||
}
|
||||
|
||||
function stepUp(node) {
|
||||
var primary = rst.getPrimary();
|
||||
if (primary != node) {
|
||||
assert.throws(function() {
|
||||
primary.adminCommand({replSetStepDown: 60 * 5});
|
||||
});
|
||||
}
|
||||
waitForPrimary(node);
|
||||
}
|
||||
|
||||
jsTestLog("Make sure node 0 is primary.");
|
||||
stepUp(nodes[0]);
|
||||
var primary = rst.getPrimary();
|
||||
var secondaries = rst.getSecondaries();
|
||||
assert.eq(nodes[0], primary);
|
||||
// Wait for all data bearing nodes to get up to date.
|
||||
assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
|
||||
{a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
|
||||
|
||||
// Stop the secondaries from replicating.
|
||||
secondaries.forEach(function(node) {
|
||||
assert.commandWorked(
|
||||
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
|
||||
});
|
||||
// Stop the primary from calling into awaitReplication()
|
||||
assert.commandWorked(nodes[0].adminCommand(
|
||||
{configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'}));
|
||||
|
||||
jsTestLog("Do w:majority write that won't enter awaitReplication() until after the primary " +
|
||||
"has stepped down and back up");
|
||||
var doMajorityWrite = function() {
|
||||
// Run ismaster command with 'hangUpOnStepDown' set to false to mark this connection as
|
||||
// one that shouldn't be closed when the node steps down. This simulates the scenario where
|
||||
// the write was coming from a mongos.
|
||||
assert.commandWorked(db.adminCommand({ismaster: 1, hangUpOnStepDown: false}));
|
||||
|
||||
var res = db.getSiblingDB('wMajorityCheck').stepdownAndBackUp.insert({a: 2}, {
|
||||
writeConcern: {w: 'majority'}
|
||||
});
|
||||
assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
|
||||
};
|
||||
|
||||
var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
|
||||
|
||||
jsTest.log("Disconnect primary from all secondaries");
|
||||
nodes[0].disconnect(nodes[1]);
|
||||
nodes[0].disconnect(nodes[2]);
|
||||
|
||||
jsTest.log("Wait for a new primary to be elected");
|
||||
// Allow the secondaries to replicate again.
|
||||
secondaries.forEach(function(node) {
|
||||
assert.commandWorked(
|
||||
node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
|
||||
});
|
||||
|
||||
waitForPrimary(nodes[1]);
|
||||
|
||||
jsTest.log("Do a write to the new primary");
|
||||
assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
|
||||
{a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
|
||||
|
||||
jsTest.log("Reconnect the old primary to the rest of the nodes");
|
||||
nodes[0].reconnect(nodes[1]);
|
||||
nodes[0].reconnect(nodes[2]);
|
||||
|
||||
jsTest.log("Wait for the old primary to step down, roll back its write, and apply the " +
|
||||
"new writes from the new primary");
|
||||
waitForState(nodes[0], ReplSetTest.State.SECONDARY);
|
||||
rst.awaitReplication();
|
||||
|
||||
// At this point all 3 nodes should have the same data
|
||||
assert.soonNoExcept(function() {
|
||||
nodes.forEach(function(node) {
|
||||
assert.eq(null,
|
||||
node.getDB(dbName).getCollection(collName).findOne({a: 2}),
|
||||
"Node " + node.host + " contained op that should have been rolled back");
|
||||
assert.neq(null,
|
||||
node.getDB(dbName).getCollection(collName).findOne({a: 3}),
|
||||
"Node " + node.host +
|
||||
" was missing op from branch of history that should have persisted");
|
||||
});
|
||||
return true;
|
||||
});
|
||||
|
||||
jsTest.log("Make the original primary become primary once again");
|
||||
stepUp(nodes[0]);
|
||||
|
||||
jsTest.log("Unblock the thread waiting for replication of the now rolled-back write, ensure " +
|
||||
"that the write concern failed");
|
||||
assert.commandWorked(nodes[0].adminCommand(
|
||||
{configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'off'}));
|
||||
|
||||
joinMajorityWriter();
|
||||
|
||||
rst.stopSet();
|
||||
}());
|
||||
@@ -608,14 +608,16 @@ namespace {
|
||||
unsigned long long _collectionCount(OperationContext* txn,
|
||||
const string& ns,
|
||||
bool callerHoldsGlobalLock) {
|
||||
Collection* coll;
|
||||
Collection* coll = nullptr;
|
||||
boost::optional<AutoGetCollectionForRead> ctx;
|
||||
|
||||
// If the global write lock is held, we must avoid using AutoGetCollectionForRead as it may lead
|
||||
// to deadlock when waiting for a majority snapshot to be committed. See SERVER-24596.
|
||||
if (callerHoldsGlobalLock) {
|
||||
Database* db = dbHolder().get(txn, ns);
|
||||
coll = db->getCollection(ns);
|
||||
if (db) {
|
||||
coll = db->getCollection(ns);
|
||||
}
|
||||
} else {
|
||||
ctx.emplace(txn, NamespaceString(ns));
|
||||
coll = ctx->getCollection();
|
||||
|
||||
@@ -267,13 +267,6 @@ Status ProjectionExec::transform(WorkingSetMember* member) const {
|
||||
matchDetails.requestElemMatchKey();
|
||||
verify(NULL != _queryExpression);
|
||||
verify(_queryExpression->matchesBSON(member->obj.value(), &matchDetails));
|
||||
|
||||
// Performing a positional projection requires valid MatchDetails. For example,
|
||||
// ambiguity caused by multiple implicit array traversal predicates can lead to invalid
|
||||
// match details.
|
||||
if (!matchDetails.isValid()) {
|
||||
return Status(ErrorCodes::InternalError, "ambiguous positional projection");
|
||||
}
|
||||
}
|
||||
|
||||
Status projStatus = transform(member->obj.value(), &bob, &matchDetails);
|
||||
@@ -406,10 +399,6 @@ Status ProjectionExec::transform(const BSONObj& in,
|
||||
arrayDetails.requestElemMatchKey();
|
||||
|
||||
if (matcher->second->matchesBSON(in, &arrayDetails)) {
|
||||
// Since we create a special matcher for each $elemMatch projection, we should always
|
||||
// have valid MatchDetails.
|
||||
invariant(arrayDetails.isValid());
|
||||
|
||||
FieldMap::const_iterator fieldIt = _fields.find(elt.fieldName());
|
||||
if (_fields.end() == fieldIt) {
|
||||
return Status(ErrorCodes::BadValue,
|
||||
|
||||
@@ -189,12 +189,6 @@ TEST(ProjectionExecTest, TransformPositionalDollar) {
|
||||
|
||||
// Invalid position $ projections.
|
||||
testTransform("{'a.$': 1}", "{a: {$size: 1}}", "{a: [5]}", false, "");
|
||||
|
||||
// Ambigous position $ projections.
|
||||
testTransform("{'a.$': 1}", "{$and: [{a: 1}, {a: 2}]}", "{a: [1, 2]}", false, "");
|
||||
testTransform("{'a.$': 1}", "{a: 1, b: 2}", "{a: [1], b: [2]}", false, "");
|
||||
testTransform("{'a.$': 1}", "{a: {$elemMatch: {$lt: 2}}, b: 2}", "{a: [1], b: [2]}", false, "");
|
||||
testTransform("{'a.$': 1}", "{'a.b': 1, 'a.c': 2}", "{a: [{b: 1}, {c: 2}]}", false, "");
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -508,10 +508,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
|
||||
dassert(cq);
|
||||
verify(cq->root()->matchesBSON(oldObj.value(), &matchDetails));
|
||||
|
||||
// If we have matched more than one array position, we cannot perform a positional update
|
||||
// operation.
|
||||
uassert(34412, "ambiguous positional update operation", matchDetails.isValid());
|
||||
|
||||
string matchedField;
|
||||
if (matchDetails.hasElemMatchKey())
|
||||
matchedField = matchDetails.elemMatchKey();
|
||||
|
||||
@@ -189,16 +189,12 @@ public:
|
||||
// XXX document
|
||||
virtual bool equivalent(const MatchExpression* other) const = 0;
|
||||
|
||||
/**
|
||||
* Determine if a document satisfies the tree-predicate.
|
||||
*
|
||||
* The caller may optionally provide a non-null MatchDetails as an out-parameter. For matching
|
||||
*documents, the MatchDetails provide further info on how the document was
|
||||
*matched---specifically, which array element matched an array predicate.
|
||||
*
|
||||
* The caller must check that the MatchDetails is valid via the isValid() method before using.
|
||||
*/
|
||||
//
|
||||
// Determine if a document satisfies the tree-predicate.
|
||||
//
|
||||
|
||||
virtual bool matches(const MatchableDocument* doc, MatchDetails* details = 0) const = 0;
|
||||
|
||||
virtual bool matchesBSON(const BSONObj& doc, MatchDetails* details = 0) const;
|
||||
|
||||
/**
|
||||
|
||||
@@ -38,7 +38,7 @@ namespace mongo {
|
||||
|
||||
using std::string;
|
||||
|
||||
MatchDetails::MatchDetails() : _elemMatchKeyRequested(false), _isValid(true) {
|
||||
MatchDetails::MatchDetails() : _elemMatchKeyRequested() {
|
||||
resetOutput();
|
||||
}
|
||||
|
||||
@@ -58,9 +58,6 @@ std::string MatchDetails::elemMatchKey() const {
|
||||
|
||||
void MatchDetails::setElemMatchKey(const std::string& elemMatchKey) {
|
||||
if (_elemMatchKeyRequested) {
|
||||
if (_elemMatchKey) {
|
||||
_isValid = false;
|
||||
}
|
||||
_elemMatchKey.reset(new std::string(elemMatchKey));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,14 +73,9 @@ public:
|
||||
|
||||
void setElemMatchKey(const std::string& elemMatchKey);
|
||||
|
||||
bool isValid() const {
|
||||
return _isValid;
|
||||
}
|
||||
|
||||
private:
|
||||
bool _loadedRecord;
|
||||
bool _elemMatchKeyRequested;
|
||||
bool _isValid;
|
||||
std::unique_ptr<std::string> _elemMatchKey;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -921,9 +921,8 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
|
||||
return;
|
||||
}
|
||||
invariant(!_isCatchingUp);
|
||||
invariant(!_canAcceptNonLocalWrites);
|
||||
_isWaitingForDrainToComplete = false;
|
||||
_drainFinishedCond_forTest.notify_all();
|
||||
_drainFinishedCond.notify_all();
|
||||
|
||||
if (!_getMemberState_inlock().primary()) {
|
||||
// We must have decided not to transition to primary while waiting for the applier to drain.
|
||||
@@ -931,6 +930,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
|
||||
return;
|
||||
}
|
||||
|
||||
invariant(!_canAcceptNonLocalWrites);
|
||||
_canAcceptNonLocalWrites = true;
|
||||
|
||||
lk.unlock();
|
||||
@@ -952,7 +952,7 @@ Status ReplicationCoordinatorImpl::waitForDrainFinish(Milliseconds timeout) {
|
||||
|
||||
stdx::unique_lock<stdx::mutex> lk(_mutex);
|
||||
auto pred = [this]() { return !_isCatchingUp && !_isWaitingForDrainToComplete; };
|
||||
if (!_drainFinishedCond_forTest.wait_for(lk, timeout.toSystemDuration(), pred)) {
|
||||
if (!_drainFinishedCond.wait_for(lk, timeout.toSystemDuration(), pred)) {
|
||||
return Status(ErrorCodes::ExceededTimeLimit,
|
||||
"Timed out waiting to finish draining applier buffer");
|
||||
}
|
||||
@@ -1617,9 +1617,35 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
if (replMode == modeReplSet && !_memberState.primary()) {
|
||||
return {ErrorCodes::PrimarySteppedDown,
|
||||
"Primary stepped down while waiting for replication"};
|
||||
auto checkForStepDown = [&]() -> Status {
|
||||
if (replMode == modeReplSet && !_memberState.primary()) {
|
||||
return {ErrorCodes::PrimarySteppedDown,
|
||||
"Primary stepped down while waiting for replication"};
|
||||
}
|
||||
|
||||
if (opTime.getTerm() != _cachedTerm) {
|
||||
return {
|
||||
ErrorCodes::PrimarySteppedDown,
|
||||
str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm
|
||||
<< " while waiting for replication, indicating that this node must "
|
||||
"have stepped down."};
|
||||
}
|
||||
|
||||
if (_stepDownPending) {
|
||||
return {ErrorCodes::PrimarySteppedDown,
|
||||
"Received stepdown request while waiting for replication"};
|
||||
}
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
Status stepdownStatus = checkForStepDown();
|
||||
if (!stepdownStatus.isOK()) {
|
||||
return stepdownStatus;
|
||||
}
|
||||
|
||||
auto interruptStatus = txn->checkForInterruptNoAssert();
|
||||
if (!interruptStatus.isOK()) {
|
||||
return interruptStatus;
|
||||
}
|
||||
|
||||
if (writeConcern.wMode.empty()) {
|
||||
@@ -1647,10 +1673,6 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
|
||||
WaiterInfoGuard waitInfo(
|
||||
&_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
|
||||
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
|
||||
if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
|
||||
return {ErrorCodes::PrimarySteppedDown,
|
||||
"Not primary anymore while waiting for replication - primary stepped down"};
|
||||
}
|
||||
|
||||
if (_inShutdown) {
|
||||
return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
|
||||
@@ -1672,6 +1694,11 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
|
||||
}
|
||||
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
|
||||
}
|
||||
|
||||
stepdownStatus = checkForStepDown();
|
||||
if (!stepdownStatus.isOK()) {
|
||||
return stepdownStatus;
|
||||
}
|
||||
}
|
||||
|
||||
return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
|
||||
@@ -2516,11 +2543,10 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
|
||||
_replicationWaiterList.signalAndRemoveAll_inlock();
|
||||
// Wake up the optime waiter that is waiting for primary catch-up to finish.
|
||||
_opTimeWaiterList.signalAndRemoveAll_inlock();
|
||||
// Clean up primary states.
|
||||
// _isCatchingUp and _isWaitingForDrainToComplete could be cleaned up asynchronously
|
||||
// by freshness scan.
|
||||
_canAcceptNonLocalWrites = false;
|
||||
_isCatchingUp = false;
|
||||
_isWaitingForDrainToComplete = false;
|
||||
_drainFinishedCond_forTest.notify_all();
|
||||
_stepDownPending = false;
|
||||
serverGlobalParams.featureCompatibility.validateFeaturesAsMaster.store(false);
|
||||
result = kActionCloseAllConnections;
|
||||
} else {
|
||||
@@ -2654,6 +2680,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
|
||||
auto evhStatus =
|
||||
scanner->start(&_replExecutor, _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
|
||||
if (evhStatus == ErrorCodes::ShutdownInProgress) {
|
||||
_finishCatchUpOplog_inlock(true);
|
||||
return;
|
||||
}
|
||||
fassertStatusOK(40254, evhStatus.getStatus());
|
||||
@@ -2662,7 +2689,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
|
||||
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
|
||||
LockGuard lk(_mutex);
|
||||
if (cbData.status == ErrorCodes::CallbackCanceled) {
|
||||
_finishCatchUpOplog_inlock(false);
|
||||
_finishCatchUpOplog_inlock(true);
|
||||
return;
|
||||
}
|
||||
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
|
||||
@@ -2729,11 +2756,10 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::_finishCatchUpOplog_inlock(bool startToDrain) {
|
||||
invariant(_isCatchingUp);
|
||||
_isCatchingUp = false;
|
||||
// If the node steps down during the catch-up, we don't go into drain mode.
|
||||
if (startToDrain) {
|
||||
invariant(_getMemberState_inlock().primary());
|
||||
invariant(!_canAcceptNonLocalWrites);
|
||||
invariant(!_isWaitingForDrainToComplete);
|
||||
_isWaitingForDrainToComplete = true;
|
||||
// Signal applier in executor to avoid the deadlock with bgsync's mutex that is required to
|
||||
@@ -3367,7 +3393,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(
|
||||
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
|
||||
log() << "stepping down from primary, because a new term has begun: " << term;
|
||||
_topCoord->prepareForStepDown();
|
||||
return _stepDownStart();
|
||||
return _stepDownStart(false);
|
||||
}
|
||||
return EventHandle();
|
||||
}
|
||||
|
||||
@@ -910,7 +910,10 @@ private:
|
||||
*/
|
||||
void _requestRemotePrimaryStepdown(const HostAndPort& target);
|
||||
|
||||
ReplicationExecutor::EventHandle _stepDownStart();
|
||||
/**
|
||||
* Schedules stepdown to run with the global exclusive lock.
|
||||
*/
|
||||
ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex);
|
||||
|
||||
/**
|
||||
* Completes a step-down of the current node. Must be run with a global
|
||||
@@ -949,9 +952,11 @@ private:
|
||||
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
|
||||
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
|
||||
* value of "responseStatus".
|
||||
* 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this.
|
||||
*/
|
||||
void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
|
||||
const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
|
||||
const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
|
||||
bool hasMutex);
|
||||
|
||||
/**
|
||||
* Scan the SlaveInfoVector and determine the highest OplogEntry present on a majority of
|
||||
@@ -1180,6 +1185,14 @@ private:
|
||||
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
|
||||
int _rbid; // (M)
|
||||
|
||||
// Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat)
|
||||
// TODO(SERVER-27083): This bool is redundant of the same-named bool in TopologyCoordinatorImpl,
|
||||
// but due to mutex ordering between _mutex and _topoMutex we can't inspect the
|
||||
// TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid
|
||||
// of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should
|
||||
// consolidate the two bools.
|
||||
bool _stepDownPending = false; // (M)
|
||||
|
||||
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
|
||||
WaiterList _replicationWaiterList; // (M)
|
||||
|
||||
@@ -1209,8 +1222,8 @@ private:
|
||||
// Current ReplicaSet state.
|
||||
MemberState _memberState; // (MX)
|
||||
|
||||
// Used to signal threads waiting for changes to _memberState. Only used in testing.
|
||||
stdx::condition_variable _drainFinishedCond_forTest; // (M)
|
||||
// Used to signal threads waiting for changes to _memberState.
|
||||
stdx::condition_variable _drainFinishedCond; // (M)
|
||||
|
||||
// True if we are waiting for the applier to finish draining.
|
||||
bool _isWaitingForDrainToComplete; // (M)
|
||||
|
||||
@@ -1348,12 +1348,11 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringFreshnessScan) {
|
||||
ASSERT_TRUE(evh.isValid());
|
||||
getReplExec()->waitForEvent(evh);
|
||||
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
|
||||
ASSERT_FALSE(getReplCoord()->isCatchingUp());
|
||||
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
|
||||
auto net = getNet();
|
||||
net->enterNetwork();
|
||||
net->runUntil(net->now() + config.getCatchUpTimeoutPeriod());
|
||||
net->exitNetwork();
|
||||
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(1, countLogLinesContaining("Stopped transition to primary"));
|
||||
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
|
||||
@@ -1378,12 +1377,15 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
|
||||
ASSERT_TRUE(evh.isValid());
|
||||
getReplExec()->waitForEvent(evh);
|
||||
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
|
||||
ASSERT_FALSE(getReplCoord()->isCatchingUp());
|
||||
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
|
||||
auto net = getNet();
|
||||
net->enterNetwork();
|
||||
net->runReadyNetworkOperations();
|
||||
net->exitNetwork();
|
||||
auto txn = makeOperationContext();
|
||||
// Simulate bgsync signaling replCoord to exit drain mode.
|
||||
// At this point, we see the stepdown and reset the states.
|
||||
getReplCoord()->signalDrainComplete(txn.get());
|
||||
ASSERT_FALSE(getReplCoord()->isWaitingForApplierToDrain());
|
||||
stopCapturingLogMessages();
|
||||
ASSERT_EQUALS(1, countLogLinesContaining("Cannot catch up oplog after becoming primary"));
|
||||
ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase("test"));
|
||||
|
||||
@@ -66,6 +66,8 @@ using CBHandle = ReplicationExecutor::CallbackHandle;
|
||||
using CBHStatus = StatusWith<CBHandle>;
|
||||
using LockGuard = stdx::lock_guard<stdx::mutex>;
|
||||
|
||||
MONGO_FP_DECLARE(blockHeartbeatStepdown);
|
||||
|
||||
} // namespace
|
||||
|
||||
using executor::RemoteCommandRequest;
|
||||
@@ -213,7 +215,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
|
||||
_scheduleHeartbeatToTarget(
|
||||
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
|
||||
|
||||
_handleHeartbeatResponseAction(action, hbStatusResponse);
|
||||
_handleHeartbeatResponseAction(action, hbStatusResponse, false /*we're not holding _mutex*/);
|
||||
}
|
||||
|
||||
void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex,
|
||||
@@ -233,11 +235,13 @@ void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIn
|
||||
|
||||
void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
|
||||
const HeartbeatResponseAction& action,
|
||||
const StatusWith<ReplSetHeartbeatResponse>& responseStatus) {
|
||||
const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
|
||||
bool hasMutex) {
|
||||
switch (action.getAction()) {
|
||||
case HeartbeatResponseAction::NoAction:
|
||||
// Update the cached member state if different than the current topology member state
|
||||
if (_memberState != _topCoord->getMemberState()) {
|
||||
invariant(!hasMutex);
|
||||
stdx::unique_lock<stdx::mutex> lk(_mutex);
|
||||
const PostMemberStateUpdateAction postUpdateAction =
|
||||
_updateMemberStateFromTopologyCoordinator_inlock();
|
||||
@@ -257,7 +261,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
|
||||
log() << "Stepping down from primary in response to heartbeat";
|
||||
_topCoord->prepareForStepDown();
|
||||
// Don't need to wait for stepdown to finish.
|
||||
_stepDownStart();
|
||||
_stepDownStart(hasMutex);
|
||||
break;
|
||||
case HeartbeatResponseAction::StepDownRemotePrimary: {
|
||||
invariant(action.getPrimaryConfigIndex() != _selfIndex);
|
||||
@@ -311,11 +315,19 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort
|
||||
}
|
||||
}
|
||||
|
||||
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() {
|
||||
ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart(bool hasMutex) {
|
||||
{
|
||||
boost::optional<stdx::lock_guard<stdx::mutex>> lk;
|
||||
if (!hasMutex) {
|
||||
lk.emplace(_mutex);
|
||||
}
|
||||
_stepDownPending = true;
|
||||
}
|
||||
auto finishEvent = _makeEvent();
|
||||
if (!finishEvent) {
|
||||
return finishEvent;
|
||||
}
|
||||
|
||||
_replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind(
|
||||
&ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent));
|
||||
return finishEvent;
|
||||
@@ -328,6 +340,19 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
|
||||
return;
|
||||
}
|
||||
|
||||
if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) {
|
||||
// Must reschedule rather than block so we don't take up threads in the replication
|
||||
// executor.
|
||||
sleepmillis(10);
|
||||
_replExecutor.scheduleWorkWithGlobalExclusiveLock(
|
||||
stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
|
||||
this,
|
||||
stdx::placeholders::_1,
|
||||
finishedEvent));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
LockGuard topoLock(_topoMutex);
|
||||
|
||||
invariant(cbData.txn);
|
||||
@@ -668,7 +693,9 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
|
||||
_topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock());
|
||||
// Don't mind potential asynchronous stepdown as this is the last step of
|
||||
// liveness check.
|
||||
_handleHeartbeatResponseAction(action, makeStatusWith<ReplSetHeartbeatResponse>());
|
||||
_handleHeartbeatResponseAction(action,
|
||||
makeStatusWith<ReplSetHeartbeatResponse>(),
|
||||
true /*we're holding _mutex*/);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
#include "mongo/db/repl/old_update_position_args.h"
|
||||
#include "mongo/db/repl/optime.h"
|
||||
#include "mongo/db/repl/read_concern_args.h"
|
||||
#include "mongo/db/repl/repl_client_info.h"
|
||||
#include "mongo/db/repl/repl_set_heartbeat_args.h"
|
||||
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
|
||||
#include "mongo/db/repl/repl_settings.h"
|
||||
@@ -79,15 +80,15 @@ using executor::RemoteCommandResponse;
|
||||
using unittest::assertGet;
|
||||
|
||||
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
|
||||
// Helper class to wrap Timestamp as an OpTime with term 0.
|
||||
struct OpTimeWithTermZero {
|
||||
OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
|
||||
// Helper class to wrap Timestamp as an OpTime with term 1.
|
||||
struct OpTimeWithTermOne {
|
||||
OpTimeWithTermOne(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
|
||||
operator OpTime() const {
|
||||
return OpTime(timestamp, 0);
|
||||
return OpTime(timestamp, 1);
|
||||
}
|
||||
|
||||
operator boost::optional<OpTime>() const {
|
||||
return OpTime(timestamp, 0);
|
||||
return OpTime(timestamp, 1);
|
||||
}
|
||||
|
||||
OpTime asOpTime() const {
|
||||
@@ -601,7 +602,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstASta
|
||||
init("");
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
OpTimeWithTermZero time(100, 1);
|
||||
OpTimeWithTermOne time(100, 1);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
|
||||
@@ -620,7 +621,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas
|
||||
init(settings);
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
OpTimeWithTermZero time(100, 1);
|
||||
OpTimeWithTermOne time(100, 1);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
|
||||
@@ -654,7 +655,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
|
||||
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
OpTimeWithTermZero time(100, 1);
|
||||
OpTimeWithTermOne time(100, 1);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
|
||||
@@ -667,7 +668,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
|
||||
ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, statusAndDur.status);
|
||||
}
|
||||
|
||||
TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWZero) {
|
||||
TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWTermOne) {
|
||||
assertStartSuccess(BSON("_id"
|
||||
<< "mySet"
|
||||
<< "version"
|
||||
@@ -687,7 +688,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
OpTimeWithTermZero time(100, 1);
|
||||
OpTimeWithTermOne time(100, 1);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
|
||||
@@ -696,13 +697,13 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
|
||||
|
||||
// Become primary.
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
ASSERT(getReplCoord()->getMemberState().primary());
|
||||
|
||||
auto txn = makeOperationContext();
|
||||
;
|
||||
|
||||
ReplicationCoordinator::StatusAndDuration statusAndDur =
|
||||
getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
|
||||
ASSERT_OK(statusAndDur.status);
|
||||
@@ -735,12 +736,12 @@ TEST_F(ReplCoordTest,
|
||||
<< 3))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
|
||||
@@ -815,12 +816,12 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
|
||||
<< 3))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
|
||||
@@ -1026,6 +1027,8 @@ TEST_F(
|
||||
// another name if we didn't get a high enough one.
|
||||
}
|
||||
|
||||
auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1);
|
||||
ReplClientInfo::forClient(txn.get()->getClient()).setLastOp(zeroOpTimeInCurrentTerm);
|
||||
statusAndDur =
|
||||
getReplCoord()->awaitReplicationOfLastOpForClient(txn.get(), majorityWriteConcern);
|
||||
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
|
||||
@@ -1148,14 +1151,14 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
|
||||
@@ -1212,14 +1215,14 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wDeadline = getNet()->now() + Milliseconds(50);
|
||||
@@ -1263,14 +1266,14 @@ TEST_F(ReplCoordTest,
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
|
||||
@@ -1313,15 +1316,15 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
const auto txn = makeOperationContext();
|
||||
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
|
||||
@@ -1355,14 +1358,14 @@ TEST_F(ReplCoordTest,
|
||||
<< "node3"))),
|
||||
HostAndPort("node1"));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
WriteConcernOptions writeConcern;
|
||||
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
|
||||
@@ -1575,7 +1578,7 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
|
||||
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
|
||||
const auto txn = makeOperationContext();
|
||||
|
||||
OpTimeWithTermZero optime1(100, 1);
|
||||
OpTimeWithTermOne optime1(100, 1);
|
||||
// All nodes are caught up
|
||||
getReplCoord()->setMyLastAppliedOpTime(optime1);
|
||||
getReplCoord()->setMyLastDurableOpTime(optime1);
|
||||
@@ -1589,7 +1592,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
|
||||
|
||||
TEST_F(StepDownTest,
|
||||
NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) {
|
||||
OpTimeWithTermZero optime1(100, 1);
|
||||
OpTimeWithTermOne optime1(100, 1);
|
||||
// All nodes are caught up
|
||||
getReplCoord()->setMyLastAppliedOpTime(optime1);
|
||||
getReplCoord()->setMyLastDurableOpTime(optime1);
|
||||
@@ -1819,8 +1822,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
|
||||
|
||||
TEST_F(StepDownTest,
|
||||
NodeReturnsExceededTimeLimitWhenNoSecondaryIsCaughtUpWithinStepDownsSecondaryCatchUpPeriod) {
|
||||
OpTimeWithTermZero optime1(100, 1);
|
||||
OpTimeWithTermZero optime2(100, 2);
|
||||
OpTimeWithTermOne optime1(100, 1);
|
||||
OpTimeWithTermOne optime2(100, 2);
|
||||
// No secondary is caught up
|
||||
auto repl = getReplCoord();
|
||||
repl->setMyLastAppliedOpTime(optime2);
|
||||
@@ -1986,8 +1989,8 @@ TEST_F(StepDownTest,
|
||||
}
|
||||
|
||||
TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
|
||||
OpTimeWithTermZero optime1(100, 1);
|
||||
OpTimeWithTermZero optime2(100, 2);
|
||||
OpTimeWithTermOne optime1(100, 1);
|
||||
OpTimeWithTermOne optime2(100, 2);
|
||||
// No secondary is caught up
|
||||
auto repl = getReplCoord();
|
||||
repl->setMyLastAppliedOpTime(optime2);
|
||||
@@ -2142,9 +2145,9 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
|
||||
<< BSON("_id" << 2 << "host"
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test1", 1234));
|
||||
OpTimeWithTermZero optime1(100, 1);
|
||||
OpTimeWithTermZero optime2(100, 2);
|
||||
OpTimeWithTermZero optime3(2, 1);
|
||||
OpTimeWithTermOne optime1(100, 1);
|
||||
OpTimeWithTermOne optime2(100, 2);
|
||||
OpTimeWithTermOne optime3(2, 1);
|
||||
getReplCoord()->setMyLastAppliedOpTime(optime1);
|
||||
getReplCoord()->setMyLastDurableOpTime(optime1);
|
||||
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2));
|
||||
@@ -2177,7 +2180,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
|
||||
ASSERT_EQUALS(optime3.timestamp,
|
||||
entry[OldUpdatePositionArgs::kOpTimeFieldName]["ts"].timestamp());
|
||||
}
|
||||
ASSERT_EQUALS(0, entry[OldUpdatePositionArgs::kOpTimeFieldName]["t"].Number());
|
||||
ASSERT_EQUALS(1, entry[OldUpdatePositionArgs::kOpTimeFieldName]["t"].Number());
|
||||
}
|
||||
ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes
|
||||
}
|
||||
@@ -2200,8 +2203,8 @@ TEST_F(ReplCoordTest,
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
|
||||
// Can't unset maintenance mode if it was never set to begin with.
|
||||
Status status = getReplCoord()->setMaintenanceMode(false);
|
||||
@@ -2227,8 +2230,8 @@ TEST_F(ReplCoordTest,
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
// valid set
|
||||
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
|
||||
ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
|
||||
@@ -2259,8 +2262,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
// Can set multiple times
|
||||
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
|
||||
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
|
||||
@@ -2293,8 +2296,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
|
||||
// From rollback, entering and exiting maintenance mode doesn't change perceived
|
||||
// state.
|
||||
@@ -2335,8 +2338,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) {
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
// Can't modify maintenance mode when PRIMARY
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
@@ -2374,8 +2377,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection)
|
||||
<< "test3:1234"))),
|
||||
HostAndPort("test2", 1234));
|
||||
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
|
||||
// TODO this election shouldn't have to happen.
|
||||
simulateSuccessfulV1Election();
|
||||
@@ -2439,8 +2442,8 @@ TEST_F(ReplCoordTest,
|
||||
<< BSON("_id" << 2 << "host" << client2Host.toString()))),
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
getReplCoord()->setMyLastAppliedOpTime(time2);
|
||||
getReplCoord()->setMyLastDurableOpTime(time2);
|
||||
@@ -2484,8 +2487,8 @@ TEST_F(ReplCoordTest,
|
||||
<< BSON("_id" << 2 << "host" << client2Host.toString()))),
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
getReplCoord()->setMyLastAppliedOpTime(time2);
|
||||
getReplCoord()->setMyLastDurableOpTime(time2);
|
||||
@@ -2515,8 +2518,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast
|
||||
|
||||
|
||||
OID client = OID::gen();
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
|
||||
getExternalState()->setClientHostAndPort(clientHost);
|
||||
HandshakeArgs handshake;
|
||||
@@ -2717,12 +2720,12 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTime time1({100, 1}, 2);
|
||||
OpTime time2({100, 2}, 2);
|
||||
OpTime time1({100, 1}, 1);
|
||||
OpTime time2({100, 2}, 1);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -2738,18 +2741,17 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
|
||||
|
||||
// receive updatePosition containing ourself, should not process the update for self
|
||||
UpdatePositionArgs args;
|
||||
ASSERT_OK(args.initialize(
|
||||
BSON(UpdatePositionArgs::kCommandFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kUpdateArrayFieldName
|
||||
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
|
||||
<< 2
|
||||
<< UpdatePositionArgs::kMemberIdFieldName
|
||||
<< 0
|
||||
<< UpdatePositionArgs::kDurableOpTimeFieldName
|
||||
<< BSON("ts" << time2.getTimestamp() << "t" << 2)
|
||||
<< UpdatePositionArgs::kAppliedOpTimeFieldName
|
||||
<< BSON("ts" << time2.getTimestamp() << "t" << 2))))));
|
||||
ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kUpdateArrayFieldName
|
||||
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
|
||||
<< 2
|
||||
<< UpdatePositionArgs::kMemberIdFieldName
|
||||
<< 0
|
||||
<< UpdatePositionArgs::kDurableOpTimeFieldName
|
||||
<< time2.toBSON()
|
||||
<< UpdatePositionArgs::kAppliedOpTimeFieldName
|
||||
<< time2.toBSON())))));
|
||||
|
||||
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
|
||||
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
|
||||
@@ -2776,13 +2778,13 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermZero staleTime(10, 0);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
OpTimeWithTermOne staleTime(10, 0);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -2833,12 +2835,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTime time1({100, 1}, 3);
|
||||
OpTime time2({100, 2}, 3);
|
||||
OpTime time1({100, 1}, 1);
|
||||
OpTime time2({100, 2}, 1);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -2848,18 +2850,17 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
|
||||
|
||||
// receive updatePosition with incorrect config version
|
||||
UpdatePositionArgs args;
|
||||
ASSERT_OK(args.initialize(
|
||||
BSON(UpdatePositionArgs::kCommandFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kUpdateArrayFieldName
|
||||
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
|
||||
<< 3
|
||||
<< UpdatePositionArgs::kMemberIdFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kDurableOpTimeFieldName
|
||||
<< BSON("ts" << time2.getTimestamp() << "t" << 3)
|
||||
<< UpdatePositionArgs::kAppliedOpTimeFieldName
|
||||
<< BSON("ts" << time2.getTimestamp() << "t" << 3))))));
|
||||
ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kUpdateArrayFieldName
|
||||
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
|
||||
<< 3
|
||||
<< UpdatePositionArgs::kMemberIdFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kDurableOpTimeFieldName
|
||||
<< time2.toBSON()
|
||||
<< UpdatePositionArgs::kAppliedOpTimeFieldName
|
||||
<< time2.toBSON())))));
|
||||
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
@@ -2891,13 +2892,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermZero staleTime(10, 0);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
OpTimeWithTermOne staleTime(10, 0);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -2947,12 +2948,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTime time1({100, 1}, 2);
|
||||
OpTime time2({100, 2}, 2);
|
||||
OpTime time1({100, 1}, 1);
|
||||
OpTime time2({100, 2}, 1);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -2962,18 +2963,17 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
|
||||
|
||||
// receive updatePosition with nonexistent member id
|
||||
UpdatePositionArgs args;
|
||||
ASSERT_OK(args.initialize(
|
||||
BSON(UpdatePositionArgs::kCommandFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kUpdateArrayFieldName
|
||||
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
|
||||
<< 2
|
||||
<< UpdatePositionArgs::kMemberIdFieldName
|
||||
<< 9
|
||||
<< UpdatePositionArgs::kDurableOpTimeFieldName
|
||||
<< BSON("ts" << time2.getTimestamp() << "t" << 2)
|
||||
<< UpdatePositionArgs::kAppliedOpTimeFieldName
|
||||
<< BSON("ts" << time2.getTimestamp() << "t" << 2))))));
|
||||
ASSERT_OK(args.initialize(BSON(UpdatePositionArgs::kCommandFieldName
|
||||
<< 1
|
||||
<< UpdatePositionArgs::kUpdateArrayFieldName
|
||||
<< BSON_ARRAY(BSON(UpdatePositionArgs::kConfigVersionFieldName
|
||||
<< 2
|
||||
<< UpdatePositionArgs::kMemberIdFieldName
|
||||
<< 9
|
||||
<< UpdatePositionArgs::kDurableOpTimeFieldName
|
||||
<< time2.toBSON()
|
||||
<< UpdatePositionArgs::kAppliedOpTimeFieldName
|
||||
<< time2.toBSON())))));
|
||||
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
@@ -3003,13 +3003,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermZero staleTime(10, 0);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
OpTimeWithTermOne staleTime(10, 0);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -3058,13 +3058,13 @@ TEST_F(ReplCoordTest,
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time1(100, 1);
|
||||
OpTimeWithTermZero time2(100, 2);
|
||||
OpTimeWithTermZero staleTime(10, 0);
|
||||
OpTimeWithTermOne time1(100, 1);
|
||||
OpTimeWithTermOne time2(100, 2);
|
||||
OpTimeWithTermOne staleTime(10, 0);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time1);
|
||||
getReplCoord()->setMyLastDurableOpTime(time1);
|
||||
|
||||
@@ -3151,11 +3151,11 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) {
|
||||
disableSnapshots();
|
||||
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time(100, 2);
|
||||
OpTimeWithTermOne time(100, 2);
|
||||
|
||||
// 3 nodes waiting for time
|
||||
WriteConcernOptions writeConcern;
|
||||
@@ -3242,11 +3242,11 @@ TEST_F(
|
||||
<< 2))),
|
||||
HostAndPort("node1", 12345));
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTimeWithTermZero time(100, 2);
|
||||
OpTimeWithTermOne time(100, 2);
|
||||
|
||||
// 3 nodes waiting for time
|
||||
WriteConcernOptions writeConcern;
|
||||
@@ -3316,8 +3316,8 @@ TEST_F(ReplCoordTest,
|
||||
disableSnapshots();
|
||||
|
||||
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 1));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 1));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
|
||||
simulateSuccessfulV1Election();
|
||||
|
||||
OpTime time(Timestamp(100, 2), 1);
|
||||
@@ -3523,15 +3523,15 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin
|
||||
<< 0))),
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
|
||||
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
shutdown(txn.get());
|
||||
|
||||
auto status = getReplCoord()->waitUntilOpTimeForRead(
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
|
||||
ASSERT_EQ(status, ErrorCodes::ShutdownInProgress);
|
||||
}
|
||||
|
||||
@@ -3547,14 +3547,14 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
|
||||
<< 0))),
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
|
||||
|
||||
const auto txn = makeOperationContext();
|
||||
killOperation(txn.get());
|
||||
|
||||
auto status = getReplCoord()->waitUntilOpTimeForRead(
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
|
||||
ASSERT_EQ(status, ErrorCodes::Interrupted);
|
||||
}
|
||||
|
||||
@@ -3587,14 +3587,13 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
|
||||
<< 0))),
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
|
||||
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
|
||||
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
|
||||
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
|
||||
txn.get(),
|
||||
ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)));
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)));
|
||||
}
|
||||
|
||||
TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTimeEqualToOurLast) {
|
||||
@@ -3610,7 +3609,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
|
||||
HostAndPort("node1", 12345));
|
||||
|
||||
|
||||
OpTimeWithTermZero time(100, 0);
|
||||
OpTimeWithTermOne time(100, 0);
|
||||
getReplCoord()->setMyLastAppliedOpTime(time);
|
||||
getReplCoord()->setMyLastDurableOpTime(time);
|
||||
|
||||
@@ -3627,7 +3626,7 @@ TEST_F(ReplCoordTest,
|
||||
auto txn = makeOperationContext();
|
||||
|
||||
auto status = getReplCoord()->waitUntilOpTimeForRead(
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
|
||||
txn.get(), ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
|
||||
ASSERT_EQ(status, ErrorCodes::NotAReplicaSet);
|
||||
}
|
||||
|
||||
|
||||
@@ -83,6 +83,14 @@ public:
|
||||
// Replica set resync.
|
||||
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
|
||||
if (getGlobalReplicationCoordinator()->getSettings().usingReplSets()) {
|
||||
// Resync is disabled in production on replica sets until it stabilizes (SERVER-27081).
|
||||
if (!Command::testCommandsEnabled) {
|
||||
return appendCommandStatus(
|
||||
result,
|
||||
Status(ErrorCodes::OperationFailed,
|
||||
"Replica sets do not support the resync command"));
|
||||
}
|
||||
|
||||
const MemberState memberState = replCoord->getMemberState();
|
||||
if (memberState.startup()) {
|
||||
return appendCommandStatus(
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
#include "mongo/db/storage/storage_engine.h"
|
||||
#include "mongo/db/write_concern_options.h"
|
||||
#include "mongo/rpc/protocol.h"
|
||||
#include "mongo/util/fail_point_service.h"
|
||||
#include "mongo/util/log.h"
|
||||
|
||||
namespace mongo {
|
||||
@@ -59,6 +60,8 @@ static Counter64 gleWtimeouts;
|
||||
static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts",
|
||||
&gleWtimeouts);
|
||||
|
||||
MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
|
||||
|
||||
StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* txn,
|
||||
const BSONObj& cmdObj,
|
||||
const std::string& dbName,
|
||||
@@ -182,6 +185,8 @@ Status waitForWriteConcern(OperationContext* txn,
|
||||
<< ", write concern: " << writeConcern.toBSON();
|
||||
auto replCoord = repl::ReplicationCoordinator::get(txn);
|
||||
|
||||
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
|
||||
|
||||
// Next handle blocking on disk
|
||||
Timer syncTimer;
|
||||
WriteConcernOptions writeConcernWithPopulatedSyncMode =
|
||||
|
||||
@@ -30,6 +30,8 @@
|
||||
|
||||
#include "mongo/platform/basic.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
|
||||
#include "mongo/transport/transport_layer_legacy.h"
|
||||
@@ -47,6 +49,14 @@
|
||||
|
||||
namespace mongo {
|
||||
namespace transport {
|
||||
namespace {
|
||||
struct lock_weak {
|
||||
template <typename T>
|
||||
std::shared_ptr<T> operator()(const std::weak_ptr<T>& p) const {
|
||||
return p.lock();
|
||||
}
|
||||
};
|
||||
} // namespace
|
||||
|
||||
TransportLayerLegacy::ListenerLegacy::ListenerLegacy(const TransportLayerLegacy::Options& opts,
|
||||
NewConnectionCb callback)
|
||||
@@ -221,23 +231,40 @@ void TransportLayerLegacy::_closeConnection(Connection* conn) {
|
||||
Listener::globalTicketHolder.release();
|
||||
}
|
||||
|
||||
// Capture all of the weak pointers behind the lock, to delay their expiry until we leave the
|
||||
// locking context. This function requires proof of locking, by passing the lock guard.
|
||||
auto TransportLayerLegacy::lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const
|
||||
-> std::vector<LegacySessionHandle> {
|
||||
using std::begin;
|
||||
using std::end;
|
||||
std::vector<std::shared_ptr<LegacySession>> result;
|
||||
std::transform(begin(_sessions), end(_sessions), std::back_inserter(result), lock_weak());
|
||||
// Skip expired weak pointers.
|
||||
result.erase(std::remove(begin(result), end(result), nullptr), end(result));
|
||||
return result;
|
||||
}
|
||||
|
||||
void TransportLayerLegacy::endAllSessions(Session::TagMask tags) {
|
||||
log() << "legacy transport layer closing all connections";
|
||||
{
|
||||
stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
|
||||
for (auto&& it : _sessions) {
|
||||
stdx::unique_lock<stdx::mutex> lk(_sessionsMutex);
|
||||
// We want to capture the shared_ptrs to our sessions in a way which lets us destroy them
|
||||
// outside of the lock.
|
||||
const auto sessions = lockAllSessions(lk);
|
||||
|
||||
// Attempt to make our weak_ptr into a shared_ptr
|
||||
auto session = it.lock();
|
||||
if (session) {
|
||||
if (session->getTags() & tags) {
|
||||
log() << "Skip closing connection for connection # "
|
||||
<< session->conn()->connectionId;
|
||||
} else {
|
||||
_closeConnection(session->conn());
|
||||
}
|
||||
for (auto&& session : sessions) {
|
||||
if (session->getTags() & tags) {
|
||||
log() << "Skip closing connection for connection # "
|
||||
<< session->conn()->connectionId;
|
||||
} else {
|
||||
_closeConnection(session->conn());
|
||||
}
|
||||
}
|
||||
// TODO(SERVER-27069): Revamp this lock to not cover the loop. This unlock was put here
|
||||
// specifically to minimize risk, just before the release of 3.4. The risk is that we would
|
||||
// be in the loop without the lock, which most of our testing didn't do. We must unlock
|
||||
// manually here, because the `sessions` vector must be destroyed *outside* of the lock.
|
||||
lk.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "mongo/stdx/list.h"
|
||||
#include "mongo/stdx/memory.h"
|
||||
#include "mongo/stdx/mutex.h"
|
||||
@@ -101,6 +103,8 @@ private:
|
||||
using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>;
|
||||
using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>;
|
||||
|
||||
std::vector<LegacySessionHandle> lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const;
|
||||
|
||||
/**
|
||||
* Connection object, to associate Sessions with AbstractMessagingPorts.
|
||||
*/
|
||||
@@ -127,8 +131,8 @@ private:
|
||||
public:
|
||||
~LegacySession();
|
||||
|
||||
static std::shared_ptr<LegacySession> create(std::unique_ptr<AbstractMessagingPort> amp,
|
||||
TransportLayerLegacy* tl);
|
||||
static LegacySessionHandle create(std::unique_ptr<AbstractMessagingPort> amp,
|
||||
TransportLayerLegacy* tl);
|
||||
|
||||
TransportLayer* getTransportLayer() const override {
|
||||
return _tl;
|
||||
|
||||
Reference in New Issue
Block a user