Revert "Revert "SERVER-6071 use command on local.slaves instead of cursor""

This reverts commit 6486b4035c.
This commit is contained in:
matt dannenberg
2013-07-15 14:30:17 -04:00
parent 9bf70757db
commit 27c4e7fbd2
21 changed files with 1830 additions and 52 deletions

View File

@@ -0,0 +1,201 @@
/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of
* sync_source_feedback's updatePosition command and the OplogReader-based method of updating the
* primary's knowledge of the secondaries' sync progress. This is done through a modified version of
* the tags.js replicaset js test because tags.js was the test that helped me discover and resolve
* the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms
* running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the
* six replset_primary_updater tests, we run tags.js with each possible permutation of new and old
* nodes along this chain.
*/
if (!_isWindows()) {
function myprint( x ) {
print( "tags output: " + x );
}
load( './jstests/multiVersion/libs/multi_rs.js' )
load( './jstests/libs/test_background_ops.js' )
var oldVersion = "2.4"
var newVersion = "latest"
var nodes = { n1 : { binVersion : oldVersion },
n2 : { binVersion : oldVersion },
n3 : { binVersion : oldVersion },
n4 : { binVersion : oldVersion },
n5 : { binVersion : newVersion } }
// Wait for a primary node...
var num = 5;
var host = getHostName();
var name = "dannentest";
var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} );
var nodes = replTest.startSet();
var port = replTest.ports;
replTest.initiate({_id : name, members :
[
{_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}},
{_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}},
{_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}},
{_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}},
{_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}},
],
settings : {
getLastErrorModes : {
"2 dc and 3 server" : {"dc" : 2, "server" : 3},
"1 and 2" : {"server" : 1}
}
}});
var master = replTest.getMaster();
// make everyone catch up before reconfig
replTest.awaitReplication();
var config = master.getDB("local").system.replset.findOne();
printjson(config);
var modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
config.version++;
config.members[1].priority = 1.5;
config.members[2].priority = 2;
modes["3 or 4"] = {"sf" : 1};
modes["3 and 4"] = {"sf" : 2};
modes["1 and 2"]["2"] = 1;
modes["2"] = {"2" : 1}
try {
master.getDB("admin").runCommand({replSetReconfig : config});
}
catch(e) {
myprint(e);
}
replTest.awaitReplication();
myprint("primary should now be 2");
master = replTest.getMaster();
config = master.getDB("local").system.replset.findOne();
printjson(config);
modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
assert.eq(modes["3 or 4"]["sf"], 1);
assert.eq(modes["3 and 4"]["sf"], 2);
myprint("bridging");
replTest.bridge();
myprint("bridge 1");
replTest.partition(0, 3);
myprint("bridge 2");
replTest.partition(0, 4);
myprint("bridge 3");
replTest.partition(1, 3);
myprint("bridge 4");
replTest.partition(1, 4);
myprint("bridge 5");
replTest.partition(2, 3);
myprint("bridge 6");
replTest.partition(2, 4);
myprint("bridge 7");
replTest.partition(3, 4);
myprint("done bridging");
myprint("paritions: [0-1-2-0] [3] [4]")
myprint("test1");
myprint("2 should be primary");
master = replTest.getMaster();
printjson(master.getDB("admin").runCommand({replSetGetStatus:1}));
var timeout = 20000;
master.getDB("foo").bar.insert({x:1});
var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(1,4);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test3");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(3,4);
myprint("partitions: [0-4-3] [0-1-2-0]");
myprint("31004 should sync from 31001 (31026)");
myprint("31003 should sync from 31004 (31024)");
myprint("test4");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("non-existent w");
result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout});
printjson(result);
assert.eq(result.code, 14830);
assert.eq(result.ok, 0);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test two on the primary");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test5");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
replTest.unPartition(1,3);
replTest.partition(2, 0);
replTest.partition(2, 1);
replTest.stop(2);
myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads");
master = replTest.getMaster();
myprint("test6");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.stopSet();
myprint("\n\ntags.js SUCCESS\n\n");
}

View File

@@ -0,0 +1,201 @@
/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of
* sync_source_feedback's updatePosition command and the OplogReader-based method of updating the
* primary's knowledge of the secondaries' sync progress. This is done through a modified version of
* the tags.js replicaset js test because tags.js was the test that helped me discover and resolve
* the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms
* running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the
* six replset_primary_updater tests, we run tags.js with each possible permutation of new and old
* nodes along this chain.
*/
if (!_isWindows()) {
function myprint( x ) {
print( "tags output: " + x );
}
load( './jstests/multiVersion/libs/multi_rs.js' )
load( './jstests/libs/test_background_ops.js' )
var oldVersion = "2.4"
var newVersion = "latest"
var nodes = { n1 : { binVersion : oldVersion },
n2 : { binVersion : newVersion },
n3 : { binVersion : oldVersion },
n4 : { binVersion : oldVersion },
n5 : { binVersion : oldVersion } }
// Wait for a primary node...
var num = 5;
var host = getHostName();
var name = "dannentest";
var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} );
var nodes = replTest.startSet();
var port = replTest.ports;
replTest.initiate({_id : name, members :
[
{_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}},
{_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}},
{_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}},
{_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}},
{_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}},
],
settings : {
getLastErrorModes : {
"2 dc and 3 server" : {"dc" : 2, "server" : 3},
"1 and 2" : {"server" : 1}
}
}});
var master = replTest.getMaster();
// make everyone catch up before reconfig
replTest.awaitReplication();
var config = master.getDB("local").system.replset.findOne();
printjson(config);
var modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
config.version++;
config.members[1].priority = 1.5;
config.members[2].priority = 2;
modes["3 or 4"] = {"sf" : 1};
modes["3 and 4"] = {"sf" : 2};
modes["1 and 2"]["2"] = 1;
modes["2"] = {"2" : 1}
try {
master.getDB("admin").runCommand({replSetReconfig : config});
}
catch(e) {
myprint(e);
}
replTest.awaitReplication();
myprint("primary should now be 2");
master = replTest.getMaster();
config = master.getDB("local").system.replset.findOne();
printjson(config);
modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
assert.eq(modes["3 or 4"]["sf"], 1);
assert.eq(modes["3 and 4"]["sf"], 2);
myprint("bridging");
replTest.bridge();
myprint("bridge 1");
replTest.partition(0, 3);
myprint("bridge 2");
replTest.partition(0, 4);
myprint("bridge 3");
replTest.partition(1, 3);
myprint("bridge 4");
replTest.partition(1, 4);
myprint("bridge 5");
replTest.partition(2, 3);
myprint("bridge 6");
replTest.partition(2, 4);
myprint("bridge 7");
replTest.partition(3, 4);
myprint("done bridging");
myprint("paritions: [0-1-2-0] [3] [4]")
myprint("test1");
myprint("2 should be primary");
master = replTest.getMaster();
printjson(master.getDB("admin").runCommand({replSetGetStatus:1}));
var timeout = 20000;
master.getDB("foo").bar.insert({x:1});
var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(1,4);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test3");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(3,4);
myprint("partitions: [0-4-3] [0-1-2-0]");
myprint("31004 should sync from 31001 (31026)");
myprint("31003 should sync from 31004 (31024)");
myprint("test4");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("non-existent w");
result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout});
printjson(result);
assert.eq(result.code, 14830);
assert.eq(result.ok, 0);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test two on the primary");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test5");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
replTest.unPartition(1,3);
replTest.partition(2, 0);
replTest.partition(2, 1);
replTest.stop(2);
myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads");
master = replTest.getMaster();
myprint("test6");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.stopSet();
myprint("\n\ntags.js SUCCESS\n\n");
}

View File

@@ -0,0 +1,201 @@
/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of
* sync_source_feedback's updatePosition command and the OplogReader-based method of updating the
* primary's knowledge of the secondaries' sync progress. This is done through a modified version of
* the tags.js replicaset js test because tags.js was the test that helped me discover and resolve
* the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms
* running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the
* six replset_primary_updater tests, we run tags.js with each possible permutation of new and old
* nodes along this chain.
*/
if (!_isWindows()) {
function myprint( x ) {
print( "tags output: " + x );
}
load( './jstests/multiVersion/libs/multi_rs.js' )
load( './jstests/libs/test_background_ops.js' )
var oldVersion = "2.4"
var newVersion = "latest"
var nodes = { n1 : { binVersion : oldVersion },
n2 : { binVersion : oldVersion },
n3 : { binVersion : newVersion },
n4 : { binVersion : oldVersion },
n5 : { binVersion : oldVersion } }
// Wait for a primary node...
var num = 5;
var host = getHostName();
var name = "dannentest";
var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} );
var nodes = replTest.startSet();
var port = replTest.ports;
replTest.initiate({_id : name, members :
[
{_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}},
{_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}},
{_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}},
{_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}},
{_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}},
],
settings : {
getLastErrorModes : {
"2 dc and 3 server" : {"dc" : 2, "server" : 3},
"1 and 2" : {"server" : 1}
}
}});
var master = replTest.getMaster();
// make everyone catch up before reconfig
replTest.awaitReplication();
var config = master.getDB("local").system.replset.findOne();
printjson(config);
var modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
config.version++;
config.members[1].priority = 1.5;
config.members[2].priority = 2;
modes["3 or 4"] = {"sf" : 1};
modes["3 and 4"] = {"sf" : 2};
modes["1 and 2"]["2"] = 1;
modes["2"] = {"2" : 1}
try {
master.getDB("admin").runCommand({replSetReconfig : config});
}
catch(e) {
myprint(e);
}
replTest.awaitReplication();
myprint("primary should now be 2");
master = replTest.getMaster();
config = master.getDB("local").system.replset.findOne();
printjson(config);
modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
assert.eq(modes["3 or 4"]["sf"], 1);
assert.eq(modes["3 and 4"]["sf"], 2);
myprint("bridging");
replTest.bridge();
myprint("bridge 1");
replTest.partition(0, 3);
myprint("bridge 2");
replTest.partition(0, 4);
myprint("bridge 3");
replTest.partition(1, 3);
myprint("bridge 4");
replTest.partition(1, 4);
myprint("bridge 5");
replTest.partition(2, 3);
myprint("bridge 6");
replTest.partition(2, 4);
myprint("bridge 7");
replTest.partition(3, 4);
myprint("done bridging");
myprint("paritions: [0-1-2-0] [3] [4]")
myprint("test1");
myprint("2 should be primary");
master = replTest.getMaster();
printjson(master.getDB("admin").runCommand({replSetGetStatus:1}));
var timeout = 20000;
master.getDB("foo").bar.insert({x:1});
var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(1,4);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test3");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(3,4);
myprint("partitions: [0-4-3] [0-1-2-0]");
myprint("31004 should sync from 31001 (31026)");
myprint("31003 should sync from 31004 (31024)");
myprint("test4");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("non-existent w");
result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout});
printjson(result);
assert.eq(result.code, 14830);
assert.eq(result.ok, 0);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test two on the primary");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test5");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
replTest.unPartition(1,3);
replTest.partition(2, 0);
replTest.partition(2, 1);
replTest.stop(2);
myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads");
master = replTest.getMaster();
myprint("test6");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.stopSet();
myprint("\n\ntags.js SUCCESS\n\n");
}

View File

@@ -0,0 +1,201 @@
/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of
* sync_source_feedback's updatePosition command and the OplogReader-based method of updating the
* primary's knowledge of the secondaries' sync progress. This is done through a modified version of
* the tags.js replicaset js test because tags.js was the test that helped me discover and resolve
* the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms
* running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the
* six replset_primary_updater tests, we run tags.js with each possible permutation of new and old
* nodes along this chain.
*/
if (!_isWindows()) {
function myprint( x ) {
print( "tags output: " + x );
}
load( './jstests/multiVersion/libs/multi_rs.js' )
load( './jstests/libs/test_background_ops.js' )
var oldVersion = "2.4"
var newVersion = "latest"
var nodes = { n1 : { binVersion : oldVersion },
n2 : { binVersion : newVersion },
n3 : { binVersion : oldVersion },
n4 : { binVersion : oldVersion },
n5 : { binVersion : newVersion } }
// Wait for a primary node...
var num = 5;
var host = getHostName();
var name = "dannentest";
var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} );
var nodes = replTest.startSet();
var port = replTest.ports;
replTest.initiate({_id : name, members :
[
{_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}},
{_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}},
{_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}},
{_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}},
{_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}},
],
settings : {
getLastErrorModes : {
"2 dc and 3 server" : {"dc" : 2, "server" : 3},
"1 and 2" : {"server" : 1}
}
}});
var master = replTest.getMaster();
// make everyone catch up before reconfig
replTest.awaitReplication();
var config = master.getDB("local").system.replset.findOne();
printjson(config);
var modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
config.version++;
config.members[1].priority = 1.5;
config.members[2].priority = 2;
modes["3 or 4"] = {"sf" : 1};
modes["3 and 4"] = {"sf" : 2};
modes["1 and 2"]["2"] = 1;
modes["2"] = {"2" : 1}
try {
master.getDB("admin").runCommand({replSetReconfig : config});
}
catch(e) {
myprint(e);
}
replTest.awaitReplication();
myprint("primary should now be 2");
master = replTest.getMaster();
config = master.getDB("local").system.replset.findOne();
printjson(config);
modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
assert.eq(modes["3 or 4"]["sf"], 1);
assert.eq(modes["3 and 4"]["sf"], 2);
myprint("bridging");
replTest.bridge();
myprint("bridge 1");
replTest.partition(0, 3);
myprint("bridge 2");
replTest.partition(0, 4);
myprint("bridge 3");
replTest.partition(1, 3);
myprint("bridge 4");
replTest.partition(1, 4);
myprint("bridge 5");
replTest.partition(2, 3);
myprint("bridge 6");
replTest.partition(2, 4);
myprint("bridge 7");
replTest.partition(3, 4);
myprint("done bridging");
myprint("paritions: [0-1-2-0] [3] [4]")
myprint("test1");
myprint("2 should be primary");
master = replTest.getMaster();
printjson(master.getDB("admin").runCommand({replSetGetStatus:1}));
var timeout = 20000;
master.getDB("foo").bar.insert({x:1});
var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(1,4);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test3");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(3,4);
myprint("partitions: [0-4-3] [0-1-2-0]");
myprint("31004 should sync from 31001 (31026)");
myprint("31003 should sync from 31004 (31024)");
myprint("test4");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("non-existent w");
result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout});
printjson(result);
assert.eq(result.code, 14830);
assert.eq(result.ok, 0);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test two on the primary");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test5");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
replTest.unPartition(1,3);
replTest.partition(2, 0);
replTest.partition(2, 1);
replTest.stop(2);
myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads");
master = replTest.getMaster();
myprint("test6");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.stopSet();
myprint("\n\ntags.js SUCCESS\n\n");
}

View File

@@ -0,0 +1,201 @@
/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of
* sync_source_feedback's updatePosition command and the OplogReader-based method of updating the
* primary's knowledge of the secondaries' sync progress. This is done through a modified version of
* the tags.js replicaset js test because tags.js was the test that helped me discover and resolve
* the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms
* running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the
* six replset_primary_updater tests, we run tags.js with each possible permutation of new and old
* nodes along this chain.
*/
if (!_isWindows()) {
function myprint( x ) {
print( "tags output: " + x );
}
load( './jstests/multiVersion/libs/multi_rs.js' )
load( './jstests/libs/test_background_ops.js' )
var oldVersion = "2.4"
var newVersion = "latest"
var nodes = { n1 : { binVersion : oldVersion },
n2 : { binVersion : oldVersion },
n3 : { binVersion : newVersion },
n4 : { binVersion : oldVersion },
n5 : { binVersion : newVersion } }
// Wait for a primary node...
var num = 5;
var host = getHostName();
var name = "dannentest";
var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} );
var nodes = replTest.startSet();
var port = replTest.ports;
replTest.initiate({_id : name, members :
[
{_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}},
{_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}},
{_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}},
{_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}},
{_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}},
],
settings : {
getLastErrorModes : {
"2 dc and 3 server" : {"dc" : 2, "server" : 3},
"1 and 2" : {"server" : 1}
}
}});
var master = replTest.getMaster();
// make everyone catch up before reconfig
replTest.awaitReplication();
var config = master.getDB("local").system.replset.findOne();
printjson(config);
var modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
config.version++;
config.members[1].priority = 1.5;
config.members[2].priority = 2;
modes["3 or 4"] = {"sf" : 1};
modes["3 and 4"] = {"sf" : 2};
modes["1 and 2"]["2"] = 1;
modes["2"] = {"2" : 1}
try {
master.getDB("admin").runCommand({replSetReconfig : config});
}
catch(e) {
myprint(e);
}
replTest.awaitReplication();
myprint("primary should now be 2");
master = replTest.getMaster();
config = master.getDB("local").system.replset.findOne();
printjson(config);
modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
assert.eq(modes["3 or 4"]["sf"], 1);
assert.eq(modes["3 and 4"]["sf"], 2);
myprint("bridging");
replTest.bridge();
myprint("bridge 1");
replTest.partition(0, 3);
myprint("bridge 2");
replTest.partition(0, 4);
myprint("bridge 3");
replTest.partition(1, 3);
myprint("bridge 4");
replTest.partition(1, 4);
myprint("bridge 5");
replTest.partition(2, 3);
myprint("bridge 6");
replTest.partition(2, 4);
myprint("bridge 7");
replTest.partition(3, 4);
myprint("done bridging");
myprint("paritions: [0-1-2-0] [3] [4]")
myprint("test1");
myprint("2 should be primary");
master = replTest.getMaster();
printjson(master.getDB("admin").runCommand({replSetGetStatus:1}));
var timeout = 20000;
master.getDB("foo").bar.insert({x:1});
var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(1,4);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test3");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(3,4);
myprint("partitions: [0-4-3] [0-1-2-0]");
myprint("31004 should sync from 31001 (31026)");
myprint("31003 should sync from 31004 (31024)");
myprint("test4");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("non-existent w");
result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout});
printjson(result);
assert.eq(result.code, 14830);
assert.eq(result.ok, 0);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test two on the primary");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test5");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
replTest.unPartition(1,3);
replTest.partition(2, 0);
replTest.partition(2, 1);
replTest.stop(2);
myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads");
master = replTest.getMaster();
myprint("test6");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.stopSet();
myprint("\n\ntags.js SUCCESS\n\n");
}

View File

@@ -0,0 +1,201 @@
/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of
* sync_source_feedback's updatePosition command and the OplogReader-based method of updating the
* primary's knowledge of the secondaries' sync progress. This is done through a modified version of
* the tags.js replicaset js test because tags.js was the test that helped me discover and resolve
* the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms
* running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the
* six replset_primary_updater tests, we run tags.js with each possible permutation of new and old
* nodes along this chain.
*/
if (!_isWindows()) {
function myprint( x ) {
print( "tags output: " + x );
}
load( './jstests/multiVersion/libs/multi_rs.js' )
load( './jstests/libs/test_background_ops.js' )
var oldVersion = "2.4"
var newVersion = "latest"
var nodes = { n1 : { binVersion : oldVersion },
n2 : { binVersion : newVersion },
n3 : { binVersion : newVersion },
n4 : { binVersion : oldVersion },
n5 : { binVersion : oldVersion } }
// Wait for a primary node...
var num = 5;
var host = getHostName();
var name = "dannentest";
var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} );
var nodes = replTest.startSet();
var port = replTest.ports;
replTest.initiate({_id : name, members :
[
{_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}},
{_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}},
{_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}},
{_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}},
{_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}},
],
settings : {
getLastErrorModes : {
"2 dc and 3 server" : {"dc" : 2, "server" : 3},
"1 and 2" : {"server" : 1}
}
}});
var master = replTest.getMaster();
// make everyone catch up before reconfig
replTest.awaitReplication();
var config = master.getDB("local").system.replset.findOne();
printjson(config);
var modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
config.version++;
config.members[1].priority = 1.5;
config.members[2].priority = 2;
modes["3 or 4"] = {"sf" : 1};
modes["3 and 4"] = {"sf" : 2};
modes["1 and 2"]["2"] = 1;
modes["2"] = {"2" : 1}
try {
master.getDB("admin").runCommand({replSetReconfig : config});
}
catch(e) {
myprint(e);
}
replTest.awaitReplication();
myprint("primary should now be 2");
master = replTest.getMaster();
config = master.getDB("local").system.replset.findOne();
printjson(config);
modes = config.settings.getLastErrorModes;
assert.eq(typeof modes, "object");
assert.eq(modes["2 dc and 3 server"].dc, 2);
assert.eq(modes["2 dc and 3 server"].server, 3);
assert.eq(modes["1 and 2"]["server"], 1);
assert.eq(modes["3 or 4"]["sf"], 1);
assert.eq(modes["3 and 4"]["sf"], 2);
myprint("bridging");
replTest.bridge();
myprint("bridge 1");
replTest.partition(0, 3);
myprint("bridge 2");
replTest.partition(0, 4);
myprint("bridge 3");
replTest.partition(1, 3);
myprint("bridge 4");
replTest.partition(1, 4);
myprint("bridge 5");
replTest.partition(2, 3);
myprint("bridge 6");
replTest.partition(2, 4);
myprint("bridge 7");
replTest.partition(3, 4);
myprint("done bridging");
myprint("paritions: [0-1-2-0] [3] [4]")
myprint("test1");
myprint("2 should be primary");
master = replTest.getMaster();
printjson(master.getDB("admin").runCommand({replSetGetStatus:1}));
var timeout = 20000;
master.getDB("foo").bar.insert({x:1});
var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(1,4);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("partitions: [1-4] [0-1-2-0] [3]");
myprint("test3");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.unPartition(3,4);
myprint("partitions: [0-4-3] [0-1-2-0]");
myprint("31004 should sync from 31001 (31026)");
myprint("31003 should sync from 31004 (31024)");
myprint("test4");
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("non-existent w");
result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout});
printjson(result);
assert.eq(result.code, 14830);
assert.eq(result.ok, 0);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test two on the primary");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0});
printjson(result);
assert.eq(result.err, null);
myprint("test5");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
replTest.unPartition(1,3);
replTest.partition(2, 0);
replTest.partition(2, 1);
replTest.stop(2);
myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads");
master = replTest.getMaster();
myprint("test6");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout});
printjson(result);
assert.eq(result.err, null);
myprint("test mode 2");
master.getDB("foo").bar.insert({x:1});
result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout});
printjson(result);
assert.eq(result.err, "timeout");
replTest.stopSet();
myprint("\n\ntags.js SUCCESS\n\n");
}

View File

@@ -35,9 +35,8 @@ replTest.partition(4,3);
jsTestLog("Checking that ops still replicate correctly");
master.getDB("foo").bar.insert({x:1});
replTest.awaitReplication();
var result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:1000});
var result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:30000});
assert.eq(null, result.err, tojson(result));
// 4 is connected to 3
@@ -45,9 +44,8 @@ replTest.partition(4,2);
replTest.unPartition(4,3);
master.getDB("foo").bar.insert({x:1});
replTest.awaitReplication();
result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:1000});
result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:30000});
assert.eq(null, result.err, tojson(result));
replTest.stopSet();
replTest.stopSet();

View File

@@ -458,6 +458,7 @@ serverOnlyFiles = [ "db/curop.cpp",
"db/repl/master_slave.cpp",
"db/repl/finding_start_cursor.cpp",
"db/repl/sync.cpp",
"db/repl/sync_source_feedback.cpp",
"db/repl/oplogreader.cpp",
"db/repl/replication_server_status.cpp",
"db/repl/repl_reads_ok.cpp",

View File

@@ -72,6 +72,7 @@
"replSetReconfig",
"replSetStepDown",
"replSetSyncFrom",
"replSetUpdatePosition",
"resync",
"serverStatus",
"setParameter",

View File

@@ -22,6 +22,7 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/rs.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/base/counter.h"
#include "mongo/db/stats/timer_stats.h"
@@ -79,7 +80,6 @@ namespace replset {
_assumingPrimary(false),
_currentSyncTarget(NULL),
_oplogMarkerTarget(NULL),
_oplogMarker(true /* doHandshake */),
_consumedOpTime(0, 0) {
}
@@ -122,6 +122,7 @@ namespace replset {
void BackgroundSync::notifierThread() {
Client::initThread("rsSyncNotifier");
replLocalAuth();
theReplSet->syncSourceFeedback.go();
while (!inShutdown()) {
bool clearTarget = false;
@@ -168,37 +169,44 @@ namespace replset {
}
void BackgroundSync::markOplog() {
LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog;
LOG(3) << "replset markOplog: " << _consumedOpTime << " "
<< theReplSet->lastOpTimeWritten << rsLog;
if (!hasCursor()) {
sleepsecs(1);
return;
if (theReplSet->syncSourceFeedback.supportsUpdater()) {
theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
_consumedOpTime = theReplSet->lastOpTimeWritten;
}
else {
if (!hasCursor()) {
return;
}
if (!_oplogMarker.moreInCurrentBatch()) {
_oplogMarker.more();
if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) {
theReplSet->syncSourceFeedback.more();
}
if (!theReplSet->syncSourceFeedback.more()) {
theReplSet->syncSourceFeedback.tailCheck();
return;
}
// if this member has written the op at optime T
// we want to nextSafe up to and including T
while (_consumedOpTime < theReplSet->lastOpTimeWritten
&& theReplSet->syncSourceFeedback.more()) {
BSONObj temp = theReplSet->syncSourceFeedback.nextSafe();
_consumedOpTime = temp["ts"]._opTime();
}
// call more() to signal the sync target that we've synced T
theReplSet->syncSourceFeedback.more();
}
if (!_oplogMarker.more()) {
_oplogMarker.tailCheck();
sleepsecs(1);
return;
}
// if this member has written the op at optime T, we want to nextSafe up to and including T
while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) {
BSONObj temp = _oplogMarker.nextSafe();
_consumedOpTime = temp["ts"]._opTime();
}
// call more() to signal the sync target that we've synced T
_oplogMarker.more();
}
bool BackgroundSync::hasCursor() {
{
// prevent writers from blocking readers during fsync
SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
// we don't need the local write lock yet, but it's needed by OplogReader::connect
// so we take it preemptively to avoid deadlocking.
Lock::DBWrite lk("local");
@@ -210,25 +218,23 @@ namespace replset {
return false;
}
log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog;
log() << "replset setting oplog notifier to "
<< _currentSyncTarget->fullName() << rsLog;
_oplogMarkerTarget = _currentSyncTarget;
_oplogMarker.resetConnection();
if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) {
LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog;
if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) {
_oplogMarkerTarget = NULL;
return false;
}
}
}
if (!_oplogMarker.haveCursor()) {
if (!theReplSet->syncSourceFeedback.haveCursor()) {
BSONObj fields = BSON("ts" << 1);
_oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields);
theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog,
theReplSet->lastOpTimeWritten, &fields);
}
return _oplogMarker.haveCursor();
return theReplSet->syncSourceFeedback.haveCursor();
}
void BackgroundSync::producerThread() {
@@ -525,6 +531,8 @@ namespace replset {
_currentSyncTarget = target;
}
theReplSet->syncSourceFeedback.connect(target);
return;
}

View File

@@ -86,7 +86,6 @@ namespace replset {
boost::mutex _lastOpMutex;
const Member* _oplogMarkerTarget;
OplogReader _oplogMarker; // not locked, only used by notifier thread
OpTime _consumedOpTime; // not locked, only used by notifier thread
BackgroundSync();

View File

@@ -348,6 +348,15 @@ namespace mongo {
return 0;
}
Member* ReplSetImpl::getMutableMember(unsigned id) {
if( _self && id == _self->id() ) return _self;
for( Member *m = head(); m; m = m->next() )
if( m->id() == id )
return m;
return 0;
}
Member* ReplSetImpl::findByName(const std::string& hostname) const {
if (_self && hostname == _self->fullName()) {
return _self;

View File

@@ -140,7 +140,7 @@ namespace mongo {
if( conn() == 0 ) {
_conn = shared_ptr<DBClientConnection>(new DBClientConnection(false,
0,
30 /* tcp timeout */));
tcp_timeout));
string errmsg;
if ( !_conn->connect(hostName.c_str(), errmsg) ||
(AuthorizationManager::isAuthEnabled() && !replAuthenticate(_conn.get(), true)) ) {

View File

@@ -50,6 +50,9 @@ namespace mongo {
return findOne(ns, Query().sort(reverseNaturalObj));
}
/* SO_TIMEOUT (send/recv time out) for our DBClientConnections */
static const int tcp_timeout = 30;
/* ok to call if already connected */
bool connect(const std::string& hostname);

View File

@@ -24,6 +24,7 @@
#include "mongo/db/cmdline.h"
#include "mongo/db/commands.h"
#include "mongo/db/repl/health.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_server_status.h" // replSettings
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_config.h"
@@ -396,4 +397,42 @@ namespace mongo {
}
} cmdReplSetSyncFrom;
class CmdReplSetUpdatePosition: public ReplSetCommand {
public:
virtual void help( stringstream &help ) const {
help << "internal";
}
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
ActionSet actions;
actions.addAction(ActionType::replSetUpdatePosition);
out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
}
CmdReplSetUpdatePosition() : ReplSetCommand("replSetUpdatePosition") { }
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg,
BSONObjBuilder& result, bool fromRepl) {
if (!check(errmsg, result))
return false;
if (cmdObj.hasField("handshake")) {
// we have received a handshake, not an update message
// handshakes are done here to ensure the receiving end supports the update command
cc().gotHandshake(cmdObj["handshake"].embeddedObject());
// if we aren't primary, pass the handshake along
if (!theReplSet->isPrimary() && theReplSet->syncSourceFeedback.supportsUpdater()) {
theReplSet->syncSourceFeedback.forwardSlaveHandshake();
}
return true;
}
uassert(16888, "optimes field should be an array with an object for each secondary",
cmdObj["optimes"].type() == Array);
BSONArray newTimes = BSONArray(cmdObj["optimes"].Obj());
updateSlaveLocations(newTimes);
return true;
}
} cmdReplSetUpdatePosition;
}

View File

@@ -928,8 +928,11 @@ namespace mongo {
void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) {
// To prevent race conditions with clearing the cache at reconfig time,
// we lock the replset mutex here.
lock lk(this);
ghost->associateSlave(rid, memberId);
{
lock lk(this);
ghost->associateSlave(rid, memberId);
}
syncSourceFeedback.associateMember(rid, memberId);
}
class ReplIndexPrefetch : public ServerParameter {

View File

@@ -26,6 +26,7 @@
#include "mongo/db/repl/rs_exception.h"
#include "mongo/db/repl/rs_member.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/sync_source_feedback.h"
#include "mongo/util/concurrency/list.h"
#include "mongo/util/concurrency/msg.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -341,6 +342,8 @@ namespace mongo {
StateBox box;
SyncSourceFeedback syncSourceFeedback;
OpTime lastOpTimeWritten;
long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
bool forceSyncFrom(const string& host, string& errmsg, BSONObjBuilder& result);
@@ -505,6 +508,7 @@ namespace mongo {
Member* head() const { return _members.head(); }
public:
const Member* findById(unsigned id) const;
Member* getMutableMember(unsigned id);
Member* findByName(const std::string& hostname) const;
private:
void _getTargets(list<Target>&, int &configVersion);

View File

@@ -0,0 +1,310 @@
/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mongo/db/repl/sync_source_feedback.h"
#include "mongo/client/constants.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/security_key.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/rs.h" // theReplSet
namespace mongo {
// used in replAuthenticate
static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
void SyncSourceFeedback::associateMember(const BSONObj& id, const int memberId) {
const OID rid = id["_id"].OID();
boost::unique_lock<boost::mutex> lock(_mtx);
_handshakeNeeded = true;
_members[rid] = theReplSet->getMutableMember(memberId);
_cond.notify_all();
}
bool SyncSourceFeedback::replAuthenticate(bool skipAuthCheck) {
if (!AuthorizationManager::isAuthEnabled()) {
return true;
}
if (!skipAuthCheck && !cc().getAuthorizationSession()->hasInternalAuthorization()) {
log() << "replauthenticate: requires internal authorization, failing" << endl;
return false;
}
if (isInternalAuthSet()) {
return authenticateInternalUser(_connection.get());
}
BSONObj user;
{
Client::ReadContext ctxt("local.");
if(!Helpers::findOne("local.system.users", userReplQuery, user) ||
// try the first user in local
!Helpers::getSingleton("local.system.users", user)) {
log() << "replauthenticate: no user in local.system.users to use"
<< "for authentication" << endl;
return false;
}
}
std::string u = user.getStringField("user");
std::string p = user.getStringField("pwd");
massert(16889, "bad user object? [1]", !u.empty());
massert(16887, "bad user object? [2]", !p.empty());
std::string err;
if( !_connection->auth("local", u.c_str(), p.c_str(), err, false) ) {
log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
return false;
}
return true;
}
void SyncSourceFeedback::ensureMe() {
string myname = getHostName();
{
Client::WriteContext ctx("local");
// local.me is an identifier for a server for getLastError w:2+
if (!Helpers::getSingleton("local.me", _me) ||
!_me.hasField("host") ||
_me["host"].String() != myname) {
// clean out local.me
Helpers::emptyCollection("local.me");
// repopulate
BSONObjBuilder b;
b.appendOID("_id", 0, true);
b.append("host", myname);
_me = b.obj();
Helpers::putSingleton("local.me", _me);
}
}
}
bool SyncSourceFeedback::replHandshake() {
ensureMe();
// handshake for us
BSONObjBuilder cmd;
cmd.append("replSetUpdatePosition", 1);
BSONObjBuilder sub (cmd.subobjStart("handshake"));
sub.appendAs(_me["_id"], "handshake");
sub.append("member", theReplSet->selfId());
sub.append("config", theReplSet->myConfig().asBson());
sub.doneFast();
BSONObj res;
try {
if (!_connection->runCommand("admin", cmd.obj(), res)) {
if (res["errmsg"].str().find("no such cmd") != std::string::npos) {
_supportsUpdater = false;
}
resetConnection();
return false;
}
else {
_supportsUpdater = true;
}
}
catch (const DBException& e) {
log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl;
resetConnection();
return false;
}
// handshakes for those connected to us
{
for (OIDMemberMap::iterator itr = _members.begin();
itr != _members.end(); ++itr) {
BSONObjBuilder slaveCmd;
slaveCmd.append("replSetUpdatePosition", 1);
// outer handshake indicates this is a handshake command
// inner is needed as part of the structure to be passed to gotHandshake
BSONObjBuilder slaveSub (slaveCmd.subobjStart("handshake"));
slaveSub.append("handshake", itr->first);
slaveSub.append("member", itr->second->id());
slaveSub.append("config", itr->second->config().asBson());
slaveSub.doneFast();
BSONObj slaveRes;
try {
if (!_connection->runCommand("admin", slaveCmd.obj(), slaveRes)) {
resetConnection();
return false;
}
}
catch (const DBException& e) {
log() << "SyncSourceFeedback error sending chained handshakes: "
<< e.what() << endl;
resetConnection();
return false;
}
}
}
return true;
}
bool SyncSourceFeedback::_connect(const std::string& hostName) {
if (hasConnection()) {
return true;
}
_connection.reset(new DBClientConnection(false, 0, OplogReader::tcp_timeout));
string errmsg;
if (!_connection->connect(hostName.c_str(), errmsg) ||
(AuthorizationManager::isAuthEnabled() && !replAuthenticate(true))) {
resetConnection();
log() << "repl: " << errmsg << endl;
return false;
}
if (!replHandshake()) {
if (!supportsUpdater()) {
return connectOplogReader(hostName);
}
return false;
}
return true;
}
bool SyncSourceFeedback::connect(const Member* target) {
boost::unique_lock<boost::mutex> lock(_connmtx);
resetConnection();
resetOplogReaderConnection();
_syncTarget = target;
if (_connect(target->fullName())) {
if (!supportsUpdater()) {
return true;
}
}
return false;
}
void SyncSourceFeedback::forwardSlaveHandshake() {
boost::unique_lock<boost::mutex> lock(_mtx);
_handshakeNeeded = true;
}
void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) {
boost::unique_lock<boost::mutex> lock(_mtx);
LOG(1) << "replSet last: " << _slaveMap[rid].toString() << " to " << ot.toString() << endl;
// only update if ot is newer than what we have already
if (ot > _slaveMap[rid]) {
_slaveMap[rid] = ot;
_positionChanged = true;
LOG(2) << "now last is " << _slaveMap[rid].toString() << endl;
_cond.notify_all();
}
}
bool SyncSourceFeedback::updateUpstream() {
if (theReplSet->isPrimary()) {
// primary has no one to update to
return true;
}
BSONObjBuilder cmd;
cmd.append("replSetUpdatePosition", 1);
// create an array containing objects each member connected to us and for ourself
BSONArrayBuilder array (cmd.subarrayStart("optimes"));
OID myID = _me["_id"].OID();
{
for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin();
itr != _slaveMap.end(); ++itr) {
BSONObjBuilder entry(array.subobjStart());
entry.append("_id", itr->first);
entry.append("optime", itr->second);
if (itr->first == myID) {
entry.append("config", theReplSet->myConfig().asBson());
}
else {
entry.append("config", _members[itr->first]->config().asBson());
}
entry.doneFast();
}
}
array.done();
BSONObj res;
bool ok;
try {
ok = _connection->runCommand("admin", cmd.obj(), res);
}
catch (const DBException& e) {
log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
resetConnection();
return false;
}
if (!ok) {
log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl;
resetConnection();
return false;
}
return true;
}
void SyncSourceFeedback::run() {
Client::initThread("SyncSourceFeedbackThread");
while (true) {
{
boost::unique_lock<boost::mutex> lock(_mtx);
while (!_positionChanged && !_handshakeNeeded) {
_cond.wait(lock);
}
boost::unique_lock<boost::mutex> conlock(_connmtx);
const Member* target = replset::BackgroundSync::get()->getSyncTarget();
if (_syncTarget != target) {
resetConnection();
_syncTarget = target;
}
if (!hasConnection()) {
// fix connection if need be
if (!target) {
continue;
}
if (!_connect(target->fullName())) {
continue;
}
else if (!supportsUpdater()) {
_handshakeNeeded = false;
_positionChanged = false;
continue;
}
}
if (_handshakeNeeded) {
if (!replHandshake()) {
_handshakeNeeded = true;
continue;
}
else {
_handshakeNeeded = false;
}
}
if (_positionChanged) {
if (!updateUpstream()) {
_positionChanged = true;
continue;
}
else {
_positionChanged = false;
}
}
}
}
}
}

View File

@@ -0,0 +1,171 @@
/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "mongo/db/repl/oplogreader.h"
#include "mongo/util/background.h"
namespace mongo {
class Member;
class SyncSourceFeedback : public BackgroundJob {
public:
SyncSourceFeedback() : BackgroundJob(false /*don't selfdelete*/),
_syncTarget(NULL),
_oplogReader(new OplogReader(true)),
_supportsUpdater(true) {}
~SyncSourceFeedback() {
delete _oplogReader;
}
/// Adds an entry to _member for a secondary that has connected to us.
void associateMember(const BSONObj& id, const int memberId);
/// Passes handshake up the replication chain, upon receiving a handshake.
void forwardSlaveHandshake();
void updateSelfInMap(const OpTime& ot) {
ensureMe();
updateMap(_me["_id"].OID(), ot);
}
/// Connect to sync target and create OplogReader if needed.
bool connect(const Member* target);
void resetConnection() {
_connection.reset();
}
void resetOplogReaderConnection() {
_oplogReader->resetConnection();
}
/// Used extensively in bgsync, to see if we need to use the OplogReader syncing method.
bool supportsUpdater() const {
// oplogReader will be NULL if new updater is supported
//boost::unique_lock<boost::mutex> lock(_mtx);
return _supportsUpdater;
}
/// Updates the _slaveMap to be forwarded to the sync target.
void updateMap(const mongo::OID& rid, const OpTime& ot);
std::string name() const { return "SyncSourceFeedbackThread"; }
/// Loops forever, passing updates when they are present.
void run();
/* The below methods just fall through to OplogReader and are only used when our sync target
* does not support the update command.
*/
bool connectOplogReader(const std::string& hostName) {
return _oplogReader->connect(hostName);
}
bool connect(const BSONObj& rid, const int from, const string& to) {
return _oplogReader->connect(rid, from, to);
}
void ghostQueryGTE(const char *ns, OpTime t) {
_oplogReader->ghostQueryGTE(ns, t);
}
bool haveCursor() {
return _oplogReader->haveCursor();
}
bool more() {
return _oplogReader->more();
}
bool moreInCurrentBatch() {
return _oplogReader->moreInCurrentBatch();
}
BSONObj nextSafe() {
return _oplogReader->nextSafe();
}
void tailCheck() {
_oplogReader->tailCheck();
}
void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) {
_oplogReader->tailingQueryGTE(ns, t, fields);
}
private:
/// Ensures local.me is populated and populates it if not.
void ensureMe();
/* Generally replAuthenticate will only be called within system threads to fully
* authenticate connections to other nodes in the cluster that will be used as part of
* internal operations. If a user-initiated action results in needing to call
* replAuthenticate, you can call it with skipAuthCheck set to false. Only do this if you
* are certain that the proper auth checks have already run to ensure that the user is
* authorized to do everything that this connection will be used for!
*/
bool replAuthenticate(bool skipAuthCheck);
/* Sends initialization information to our sync target, also determines whether or not they
* support the updater command.
*/
bool replHandshake();
/* Inform the sync target of our current position in the oplog, as well as the positions
* of all secondaries chained through us.
*/
bool updateUpstream();
bool hasConnection() {
return _connection.get();
}
/// Connect to sync target and create OplogReader if needed.
bool _connect(const std::string& hostName);
// stores our OID to be passed along in commands
BSONObj _me;
// the member we are currently syncing from
const Member* _syncTarget;
// holds the oplogReader for use when we fall back to old style updates
OplogReader* _oplogReader;
// our connection to our sync target
boost::scoped_ptr<DBClientConnection> _connection;
// tracks whether we are in fallback mode or not
bool _supportsUpdater;
// protects connection
boost::mutex _connmtx;
// protects cond and maps and the indicator bools
boost::mutex _mtx;
// contains the most recent optime of each member syncing to us
map<mongo::OID, OpTime> _slaveMap;
typedef map<mongo::OID, Member*> OIDMemberMap;
// contains a pointer to each member, which we can look up by oid
OIDMemberMap _members;
// used to alert our thread of changes which need to be passed up the chain
boost::condition _cond;
// used to indicate a position change which has not yet been pushed along
bool _positionChanged;
// used to indicate a connection change which has not yet been shook on
bool _handshakeNeeded;
};
}

View File

@@ -126,20 +126,22 @@ namespace mongo {
scoped_lock mylk(_mutex);
_slaves[ident] = last;
_dirty = true;
if (last > _slaves[ident]) {
_slaves[ident] = last;
_dirty = true;
if (theReplSet && theReplSet->isPrimary()) {
theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
}
if (theReplSet && theReplSet->isPrimary()) {
theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
}
if ( ! _started ) {
// start background thread here since we definitely need it
_started = true;
go();
if ( ! _started ) {
// start background thread here since we definitely need it
_started = true;
go();
}
_threadsWaitingForReplication.notify_all();
}
_threadsWaitingForReplication.notify_all();
}
bool opReplicatedEnough( OpTime op , BSONElement w ) {
@@ -257,6 +259,28 @@ namespace mongo {
const char * SlaveTracking::NS = "local.slaves";
// parse optimes from replUpdatePositionCommand and pass them to SyncSourceFeedback
void updateSlaveLocations(BSONArray optimes) {
BSONForEach(elem, optimes) {
BSONObj entry = elem.Obj();
BSONObj id = BSON("_id" << entry["_id"].OID());
OpTime ot = entry["optime"]._opTime();
BSONObj config = entry["config"].Obj();
// update locally
slaveTracking.update(id, config, "local.oplog.rs", ot);
if (theReplSet && !theReplSet->isPrimary()) {
// pass along if we are not primary
theReplSet->syncSourceFeedback.updateMap(entry["_id"].OID(), ot);
// for to be backwards compatible
theReplSet->ghost->send(boost::bind(&GhostSync::percolate,
theReplSet->ghost,
id,
ot));
}
}
}
void updateSlaveLocation( CurOp& curop, const char * ns , OpTime lastOp ) {
if ( lastOp.isNull() )
return;

View File

@@ -29,6 +29,8 @@ namespace mongo {
class CurOp;
void updateSlaveLocations(BSONArray optimes);
void updateSlaveLocation( CurOp& curop, const char * oplog_ns , OpTime lastOp );
/** @return true if op has made it to w servers */