Files
mongo/db/repl/heartbeat.cpp

382 lines
14 KiB
C++
Raw Normal View History

2010-05-18 16:09:46 -04:00
/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pch.h"
2010-05-29 15:45:47 -04:00
#include "rs.h"
2010-05-18 16:09:46 -04:00
#include "health.h"
#include "../../util/background.h"
#include "../../client/dbclient.h"
#include "../commands.h"
#include "../../util/concurrency/value.h"
#include "../../util/concurrency/task.h"
2010-05-18 17:21:11 -04:00
#include "../../util/concurrency/msg.h"
2010-05-18 16:09:46 -04:00
#include "../../util/mongoutils/html.h"
#include "../../util/goodies.h"
#include "../../util/ramlog.h"
#include "../helpers/dblogger.h"
#include "connections.h"
2010-05-19 09:02:27 -04:00
#include "../../util/unittest.h"
#include "../instance.h"
#include "../repl.h"
2010-05-18 16:09:46 -04:00
2011-01-04 00:40:41 -05:00
namespace mongo {
2010-05-18 17:21:11 -04:00
2010-05-18 16:09:46 -04:00
using namespace bson;
2010-07-26 16:04:54 -04:00
// Defined elsewhere. While true, the replSetHeartbeat command below fails with
// an "is blind" error and requestHeartbeat() returns false without connecting.
extern bool replSetBlind;
// Defined elsewhere. The heartbeat command records the "from" host of an
// incoming heartbeat in replSettings.discoveredSeeds while the set is still
// initializing.
extern ReplSettings replSettings;
2011-07-29 13:32:08 -04:00
// Storage for the static counter; ReplSetHealthPollTask::up() increments it
// once for every heartbeat that is answered successfully.
unsigned int HeartbeatInfo::numPings;
/**
 * How long this member has been considered down.
 *
 * @return 0 while the member is up, and also 0 while downSince is still
 *         unset (we are still waiting on the first heartbeat); otherwise
 *         jsTime() minus downSince, expressed in jsTime()'s units.
 */
long long HeartbeatInfo::timeDown() const {
    // Both "healthy" and "no down time recorded yet" report zero.
    if( up() || downSince == 0 ) {
        return 0;
    }
    return jsTime() - downSince;
}
2010-05-18 16:09:46 -04:00
/* { replSetHeartbeat : <setname> } */
2010-06-01 15:17:38 -04:00
class CmdReplSetHeartbeat : public ReplSetCommand {
2010-05-18 16:09:46 -04:00
public:
2010-06-01 15:17:38 -04:00
CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( replSetBlind ) {
if (theReplSet) {
errmsg = str::stream() << theReplSet->selfFullName() << " is blind";
}
2010-07-26 16:04:54 -04:00
return false;
}
2010-07-26 16:04:54 -04:00
2011-01-04 00:40:41 -05:00
/* we don't call ReplSetCommand::check() here because heartbeat
2010-06-01 15:17:38 -04:00
checks many things that are pre-initialization. */
2010-05-18 16:09:46 -04:00
if( !replSet ) {
2010-06-01 15:17:38 -04:00
errmsg = "not running with --replSet";
2010-05-18 16:09:46 -04:00
return false;
}
if (!checkAuth(errmsg, result)) {
return false;
2010-05-18 16:09:46 -04:00
}
2010-08-27 16:38:56 -04:00
/* we want to keep heartbeat connections open when relinquishing primary. tag them here. */
{
2011-04-05 02:24:16 -04:00
AbstractMessagingPort *mp = cc().port();
2011-01-04 00:40:41 -05:00
if( mp )
2010-08-27 16:38:56 -04:00
mp->tag |= 1;
}
2011-01-04 00:40:41 -05:00
if( cmdObj["pv"].Int() != 1 ) {
2010-05-18 16:09:46 -04:00
errmsg = "incompatible replset protocol version";
return false;
}
2010-08-02 15:21:26 -04:00
{
string s = string(cmdObj.getStringField("replSetHeartbeat"));
if( cmdLine.ourSetName() != s ) {
errmsg = "repl set names do not match";
2011-05-15 11:10:51 -04:00
log() << "replSet set names do not match, our cmdline: " << cmdLine._replSet << rsLog;
log() << "replSet s: " << s << rsLog;
2010-08-02 15:21:26 -04:00
result.append("mismatch", true);
return false;
}
2010-05-18 16:09:46 -04:00
}
2010-06-29 13:17:57 -04:00
2010-05-18 16:09:46 -04:00
result.append("rs", true);
2011-01-04 00:40:41 -05:00
if( cmdObj["checkEmpty"].trueValue() ) {
result.append("hasData", replHasDatabases());
2010-06-29 13:17:57 -04:00
}
if( theReplSet == 0 ) {
string from( cmdObj.getStringField("from") );
if( !from.empty() ) {
replSettings.discoveredSeeds.insert(from);
}
2010-05-18 16:09:46 -04:00
errmsg = "still initializing";
return false;
}
2011-01-04 00:40:41 -05:00
if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
2010-05-18 16:09:46 -04:00
errmsg = "repl set names do not match (2)";
result.append("mismatch", true);
return false;
}
result.append("set", theReplSet->name());
2010-07-22 17:50:54 -04:00
result.append("state", theReplSet->state().s);
2011-04-12 13:56:32 -04:00
result.append("e", theReplSet->iAmElectable());
2010-07-16 14:42:04 -04:00
result.append("hbmsg", theReplSet->hbmsg());
2010-08-18 14:11:42 -04:00
result.append("time", (long long) time(0));
2010-06-29 13:42:53 -04:00
result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
2010-05-18 16:09:46 -04:00
int v = theReplSet->config().version;
result.append("v", v);
if( v > cmdObj["v"].Int() )
result << "config" << theReplSet->config().asBson();
2010-06-29 13:17:57 -04:00
2010-05-18 16:09:46 -04:00
return true;
}
} cmdReplSetHeartbeat;
/**
 * Send a replSetHeartbeat command to another member and collect its reply.
 *
 * @param setName         replica set name, sent as the command's value
 * @param from            sent in the "from" field of the command
 * @param memberFullName  host the ScopedConn is opened to
 * @param result          out: the reply from the remote member
 * @param myCfgVersion    sent in the "v" field
 * @param theirCfgVersion not written by this function
 * @param checkEmpty      sent in the "checkEmpty" field
 * @return false right away when replSetBlind is set; otherwise whatever
 *         runCommand() on the "admin" database returns.
 *
 * Raises uassert 15900 when called while holding the db write lock together
 * with the replica set lock.
 */
bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result,
                      int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
    if( replSetBlind )
        return false;

    BSONObj hbCmd = BSON(
        "replSetHeartbeat" << setName
        << "v" << myCfgVersion
        << "pv" << 1
        << "checkEmpty" << checkEmpty
        << "from" << from );

    // Outbound calls that wait on the network are generally a bad idea under
    // a write lock. Heartbeats can be slow (multisecond to respond), so we
    // do not want to be locked here, at least not without thinking
    // carefully about it first.
    const bool tooMuchLock =
        dbMutex.isWriteLocked() && theReplSet != 0 && theReplSet->lockedByMe();
    uassert(15900, "can't heartbeat: too much lock", !tooMuchLock);

    ScopedConn remote(memberFullName);
    return remote.runCommand("admin", hbCmd, result, 0);
}
/**
* Poll every other set member to check its status.
*
* A detail about local machines and authentication: suppose we have 2
* members, A and B, on the same machine using different keyFiles. A is
* primary. If we're just starting the set, there are no admin users, so A
* and B can access each other because it's local access.
*
* Then we add a user to A. B cannot sync this user from A, because as soon
* as we add a an admin user, A requires auth. However, A can still
* heartbeat B, because B *doesn't* have an admin user. So A can reach B
* but B cannot reach A.
*
* Once B is restarted with the correct keyFile, everything should work as
* expected.
*/
class ReplSetHealthPollTask : public task::Task {
private:
2010-05-20 12:40:22 -04:00
HostAndPort h;
2010-05-20 13:52:30 -04:00
HeartbeatInfo m;
int tries;
const int threshold;
2010-05-20 13:52:30 -04:00
public:
ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
: h(hh), m(mm), tries(0), threshold(15) { }
2010-05-18 16:09:46 -04:00
string name() const { return "rsHealthPoll"; }
2011-01-04 00:40:41 -05:00
void doWork() {
if ( !theReplSet ) {
2011-07-16 12:43:51 -04:00
LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog;
return;
}
2010-05-20 13:52:30 -04:00
HeartbeatInfo mem = m;
HeartbeatInfo old = mem;
2011-01-04 00:40:41 -05:00
try {
2010-05-18 16:09:46 -04:00
BSONObj info;
int theirConfigVersion = -10000;
2010-07-29 15:35:32 -04:00
bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
2011-06-09 16:16:18 -04:00
// weight new ping with old pings
// on the first ping, just use the ping value
if (old.ping != 0) {
mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
}
2010-05-18 16:09:46 -04:00
if( ok ) {
up(info, mem);
}
else if (!info["errmsg"].eoo() &&
info["errmsg"].str() == "need to login") {
authIssue(mem);
2010-05-18 16:09:46 -04:00
}
2011-01-04 00:40:41 -05:00
else {
2010-05-18 16:09:46 -04:00
down(mem, info.getStringField("errmsg"));
}
}
catch(DBException& e) {
down(mem, e.what());
}
catch(...) {
2011-05-15 11:10:51 -04:00
down(mem, "replSet unexpected exception in ReplSetHealthPollTask");
2010-05-18 16:09:46 -04:00
}
m = mem;
2010-07-26 16:19:08 -04:00
2010-05-24 08:06:05 -04:00
theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
2010-05-24 17:11:47 -04:00
static time_t last = 0;
time_t now = time(0);
2010-08-25 13:51:41 -04:00
bool changed = mem.changed(old);
2011-01-04 00:40:41 -05:00
if( changed ) {
if( old.hbstate != mem.hbstate )
2011-05-15 11:10:51 -04:00
log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog;
2010-08-25 13:51:41 -04:00
}
if( changed || now-last>4 ) {
2010-05-24 17:11:47 -04:00
last = now;
2010-06-01 16:25:47 -04:00
theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
2010-05-18 16:09:46 -04:00
}
}
private:
bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
if (tries++ % threshold == (threshold - 1)) {
ScopedConn conn(h.toString());
conn.reconnect();
}
Timer timer;
time_t before = curTimeMicros64() / 1000000;
bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
h.toString(), info, theReplSet->config().version, theirConfigVersion);
mem.ping = (unsigned int)timer.millis();
// we set this on any response - we don't get this far if
// couldn't connect because exception is thrown
time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
if ( info["time"].isNumber() ) {
long long t = info["time"].numberLong();
if( t > after )
mem.skew = (int) (t - after);
else if( t < before )
mem.skew = (int) (t - before); // negative
}
else {
// it won't be there if remote hasn't initialized yet
if( info.hasElement("time") )
warning() << "heatbeat.time isn't a number: " << info << endl;
mem.skew = INT_MIN;
}
{
be state = info["state"];
if( state.ok() )
mem.hbstate = MemberState(state.Int());
}
return ok;
}
void authIssue(HeartbeatInfo& mem) {
mem.authIssue = true;
mem.hbstate = MemberState::RS_UNKNOWN;
// set health to 0 so that this doesn't count towards majority
mem.health = 0.0;
theReplSet->rmFromElectable(mem.id());
}
2010-05-20 13:52:30 -04:00
void down(HeartbeatInfo& mem, string msg) {
mem.authIssue = false;
2010-05-18 16:09:46 -04:00
mem.health = 0.0;
mem.ping = 0;
if( mem.upSince || mem.downSince == 0 ) {
2010-05-18 16:09:46 -04:00
mem.upSince = 0;
mem.downSince = jsTime();
mem.hbstate = MemberState::RS_DOWN;
2010-11-07 14:49:20 -05:00
log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
2010-05-18 16:09:46 -04:00
}
mem.lastHeartbeatMsg = msg;
2011-04-12 13:56:32 -04:00
theReplSet->rmFromElectable(mem.id());
2010-05-18 16:09:46 -04:00
}
void up(const BSONObj& info, HeartbeatInfo& mem) {
HeartbeatInfo::numPings++;
mem.authIssue = false;
if( mem.upSince == 0 ) {
log() << "replSet member " << h.toString() << " is up" << rsLog;
mem.upSince = mem.lastHeartbeat;
}
mem.health = 1.0;
mem.lastHeartbeatMsg = info["hbmsg"].String();
if( info.hasElement("opTime") )
mem.opTime = info["opTime"].Date();
// see if this member is in the electable set
if( info["e"].eoo() ) {
// for backwards compatibility
const Member *member = theReplSet->findById(mem.id());
if (member && member->config().potentiallyHot()) {
theReplSet->addToElectable(mem.id());
}
else {
theReplSet->rmFromElectable(mem.id());
}
}
// add this server to the electable set if it is within 10
// seconds of the latest optime we know of
else if( info["e"].trueValue() &&
mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
if (lastOp > 0 && mem.opTime >= lastOp - 10) {
theReplSet->addToElectable(mem.id());
}
}
else {
theReplSet->rmFromElectable(mem.id());
}
be cfg = info["config"];
if( cfg.ok() ) {
// received a new config
boost::function<void()> f =
boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
theReplSet->mgr->send(f);
}
}
2010-05-18 16:09:46 -04:00
};
2011-01-04 00:40:41 -05:00
void ReplSetImpl::endOldHealthTasks() {
2010-07-20 14:58:51 -04:00
unsigned sz = healthTasks.size();
for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ )
(*i)->halt();
healthTasks.clear();
2011-01-04 00:40:41 -05:00
if( sz )
2010-07-20 14:58:51 -04:00
DEV log() << "replSet debug: cleared old tasks " << sz << endl;
}
/**
 * Create a health poll task for the given member, register it in
 * healthTasks and schedule it to repeat.
 *
 * @param m member whose host and current heartbeat info seed the task
 */
void ReplSetImpl::startHealthTaskFor(Member *m) {
    const int repeatArg = 2000; // second argument handed to task::repeat()

    ReplSetHealthPollTask * const poller =
        new ReplSetHealthPollTask(m->h(), m->hbinfo());
    healthTasks.insert(poller);

    task::repeat(poller, repeatArg);
}
2010-07-19 13:08:24 -04:00
void startSyncThread();
2011-01-04 00:40:41 -05:00
/** called during repl set startup. caller expects it to return fairly quickly.
note ReplSet object is only created once we get a config - so this won't run
2010-05-18 16:09:46 -04:00
until the initiation.
*/
2010-06-01 16:25:47 -04:00
void ReplSetImpl::startThreads() {
2010-07-28 11:06:04 -04:00
task::fork(mgr);
2010-06-01 16:25:47 -04:00
mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
2010-07-19 13:08:24 -04:00
boost::thread t(startSyncThread);
2011-05-09 11:33:40 -04:00
2011-06-09 15:05:34 -04:00
task::fork(ghost);
2011-05-09 11:33:40 -04:00
// member heartbeats are started in ReplSetImpl::initFromConfig
2010-05-18 16:09:46 -04:00
}
}
/* todo:
stop bg job and delete on removefromset
*/