490 lines
23 KiB
Python
490 lines
23 KiB
Python
"""Test hook that periodically makes the primary of a replica set step down."""
|
|
|
|
import collections
|
|
import os.path
|
|
import random
|
|
import threading
|
|
import time
|
|
|
|
import bson
|
|
import pymongo.errors
|
|
|
|
from buildscripts.resmokelib import errors
|
|
from buildscripts.resmokelib import utils
|
|
from buildscripts.resmokelib.testing.hooks import interface
|
|
from buildscripts.resmokelib.testing.fixtures import replicaset
|
|
from buildscripts.resmokelib.testing.fixtures import shardedcluster
|
|
|
|
|
|
class ContinuousStepdown(interface.Hook): # pylint: disable=too-many-instance-attributes
|
|
"""Regularly connect to replica sets and send a replSetStepDown command."""
|
|
|
|
DESCRIPTION = ("Continuous stepdown (steps down the primary of replica sets at regular"
|
|
" intervals)")
|
|
|
|
def __init__( # pylint: disable=too-many-arguments
|
|
self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True,
|
|
stepdown_interval_ms=8000, terminate=False, kill=False,
|
|
use_stepdown_permitted_file=False, use_stepping_down_file=False,
|
|
wait_for_mongos_retarget=False, stepdown_via_heartbeats=True):
|
|
"""Initialize the ContinuousStepdown.
|
|
|
|
Args:
|
|
hook_logger: the logger instance for this hook.
|
|
fixture: the target fixture (a replica set or sharded cluster).
|
|
config_stepdown: whether to stepdown the CSRS.
|
|
shard_stepdown: whether to stepdown the shard replica sets in a sharded cluster.
|
|
stepdown_interval_ms: the number of milliseconds between stepdowns.
|
|
terminate: shut down the node cleanly as a means of stepping it down.
|
|
kill: With a 50% probability, kill the node instead of shutting it down cleanly.
|
|
use_stepdown_permitted_file: use a file to control if stepdown thread should do a stepdown.
|
|
use_stepping_down_file: use a file to denote when stepdown is active.
|
|
wait_for_mongos_retarget: whether to run validate on all mongoses for each collection
|
|
in each database, after pausing the stepdown thread.
|
|
stepdown_via_heartbeats: step up secondaries instead of stepping down primary.
|
|
|
|
Note that the "terminate" and "kill" arguments are named after the "SIGTERM" and
|
|
"SIGKILL" signals that are used to stop the process. On Windows, there are no signals,
|
|
so we use a different means to achieve the same result as sending SIGTERM or SIGKILL.
|
|
"""
|
|
interface.Hook.__init__(self, hook_logger, fixture, ContinuousStepdown.DESCRIPTION)
|
|
|
|
self._fixture = fixture
|
|
self._config_stepdown = config_stepdown
|
|
self._shard_stepdown = shard_stepdown
|
|
self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000
|
|
self._wait_for_mongos_retarget = wait_for_mongos_retarget
|
|
self._stepdown_via_heartbeats = stepdown_via_heartbeats
|
|
|
|
self._rs_fixtures = []
|
|
self._mongos_fixtures = []
|
|
self._stepdown_thread = None
|
|
|
|
# kill implies terminate.
|
|
self._terminate = terminate or kill
|
|
self._kill = kill
|
|
|
|
# The stepdown file names need to match the same construction as found in
|
|
# jstests/concurrency/fsm_libs/resmoke_runner.js.
|
|
dbpath_prefix = fixture.get_dbpath_prefix()
|
|
|
|
if use_stepdown_permitted_file:
|
|
self._stepdown_permitted_file = os.path.join(
|
|
dbpath_prefix, "concurrency_sharded_stepdown_stepdown_permitted")
|
|
else:
|
|
self._stepdown_permitted_file = None
|
|
if use_stepping_down_file:
|
|
self._stepping_down_file = os.path.join(dbpath_prefix,
|
|
"concurrency_sharded_stepdown_stepping_down")
|
|
else:
|
|
self._stepping_down_file = None
|
|
|
|
def before_suite(self, test_report):
|
|
"""Before suite."""
|
|
if not self._rs_fixtures:
|
|
self._add_fixture(self._fixture)
|
|
utils.remove_if_exists(self._stepdown_permitted_file)
|
|
utils.remove_if_exists(self._stepping_down_file)
|
|
self._stepdown_thread = _StepdownThread(
|
|
self.logger, self._mongos_fixtures, self._rs_fixtures, self._stepdown_interval_secs,
|
|
self._terminate, self._kill, self._stepdown_permitted_file, self._stepping_down_file,
|
|
self._wait_for_mongos_retarget, self._stepdown_via_heartbeats)
|
|
self.logger.info("Starting the stepdown thread.")
|
|
self._stepdown_thread.start()
|
|
|
|
def after_suite(self, test_report):
|
|
"""After suite."""
|
|
self.logger.info("Stopping the stepdown thread.")
|
|
self._stepdown_thread.stop()
|
|
self.logger.info("Stepdown thread stopped.")
|
|
|
|
def before_test(self, test, test_report):
|
|
"""Before test."""
|
|
self._check_thread()
|
|
self.logger.info("Resuming the stepdown thread.")
|
|
# Once the stepdown thread has started any files it creates must be deleted within the
|
|
# thread, since the Windows file handle is still open.
|
|
self._stepdown_thread.pause()
|
|
self._stepdown_thread.clean_stepdown_files()
|
|
self._stepdown_thread.resume()
|
|
|
|
def after_test(self, test, test_report):
|
|
"""After test."""
|
|
self._check_thread()
|
|
self.logger.info("Pausing the stepdown thread.")
|
|
self._stepdown_thread.pause()
|
|
self.logger.info("Paused the stepdown thread.")
|
|
|
|
def _check_thread(self):
|
|
if not self._stepdown_thread.is_alive():
|
|
msg = "The stepdown thread is not running."
|
|
self.logger.error(msg)
|
|
raise errors.ServerFailure(msg)
|
|
|
|
def _add_fixture(self, fixture):
|
|
if isinstance(fixture, replicaset.ReplicaSetFixture):
|
|
if not fixture.all_nodes_electable:
|
|
raise ValueError(
|
|
"The replica sets that are the target of the ContinuousStepdown hook must have"
|
|
" the 'all_nodes_electable' option set.")
|
|
self._rs_fixtures.append(fixture)
|
|
elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
|
|
if self._shard_stepdown:
|
|
for shard_fixture in fixture.shards:
|
|
self._add_fixture(shard_fixture)
|
|
if self._config_stepdown:
|
|
self._add_fixture(fixture.configsvr)
|
|
if self._wait_for_mongos_retarget:
|
|
for mongos_fixture in fixture.mongos:
|
|
self._mongos_fixtures.append(mongos_fixture)
|
|
|
|
|
|
class _StepdownThread(threading.Thread): # pylint: disable=too-many-instance-attributes
|
|
def __init__( # pylint: disable=too-many-arguments
|
|
self, logger, mongos_fixtures, rs_fixtures, stepdown_interval_secs, terminate, kill,
|
|
stepdown_permitted_file, stepping_down_file, wait_for_mongos_retarget,
|
|
stepdown_via_heartbeats):
|
|
"""Initialize _StepdownThread."""
|
|
threading.Thread.__init__(self, name="StepdownThread")
|
|
self.daemon = True
|
|
self.logger = logger
|
|
self._mongos_fixtures = mongos_fixtures
|
|
self._rs_fixtures = rs_fixtures
|
|
self._stepdown_interval_secs = stepdown_interval_secs
|
|
# We set the self._stepdown_duration_secs to a very long time, to ensure that the former
|
|
# primary will not step back up on its own and the stepdown thread will cause it step up via
|
|
# replSetStepUp.
|
|
self._stepdown_duration_secs = 24 * 60 * 60 # 24 hours
|
|
self._terminate = terminate
|
|
self._kill = kill
|
|
self._stepdown_permitted_file = stepdown_permitted_file
|
|
self._stepping_down_file = stepping_down_file
|
|
self._should_wait_for_mongos_retarget = wait_for_mongos_retarget
|
|
self._stepdown_via_heartbeats = stepdown_via_heartbeats
|
|
|
|
self._last_exec = time.time()
|
|
# Event set when the thread has been stopped using the 'stop()' method.
|
|
self._is_stopped_evt = threading.Event()
|
|
# Event set when the thread is not paused.
|
|
self._is_resumed_evt = threading.Event()
|
|
self._is_resumed_evt.set()
|
|
# Event set when the thread is not performing stepdowns.
|
|
self._is_idle_evt = threading.Event()
|
|
self._is_idle_evt.set()
|
|
|
|
self._step_up_stats = collections.Counter()
|
|
|
|
def run(self):
|
|
"""Execute the thread."""
|
|
if not self._rs_fixtures:
|
|
self.logger.warning("No replica set on which to run stepdowns.")
|
|
return
|
|
|
|
try:
|
|
while True:
|
|
if self.__is_stopped():
|
|
break
|
|
self._wait_for_permission_or_resume()
|
|
now = time.time()
|
|
if now - self._last_exec > self._stepdown_interval_secs:
|
|
self.logger.info("Starting stepdown of all primaries")
|
|
self._step_down_all()
|
|
# Wait until each replica set has a primary, so the test can make progress.
|
|
self._await_primaries()
|
|
self._last_exec = time.time()
|
|
self.logger.info("Completed stepdown of all primaries in %0d ms",
|
|
(self._last_exec - now) * 1000)
|
|
now = time.time()
|
|
if self._is_permitted():
|
|
# The 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment
|
|
# the last stepdown command was sent.
|
|
wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
|
|
self._wait(wait_secs)
|
|
|
|
except Exception: # pylint: disable=W0703
|
|
# Proactively log the exception when it happens so it will be
|
|
# flushed immediately.
|
|
self.logger.exception("Stepdown Thread threw exception")
|
|
|
|
def stop(self):
|
|
"""Stop the thread."""
|
|
self._is_stopped_evt.set()
|
|
# Unpause to allow the thread to finish.
|
|
self.resume()
|
|
self.join()
|
|
|
|
def __is_stopped(self):
|
|
return self._is_stopped_evt.is_set()
|
|
|
|
def pause(self):
|
|
"""Pause the thread."""
|
|
self._is_resumed_evt.clear()
|
|
# Wait until we are no longer executing stepdowns.
|
|
self._is_idle_evt.wait()
|
|
# Wait until we all the replica sets have primaries.
|
|
self._await_primaries()
|
|
# Wait for Mongos to retarget the primary for each shard and the config server.
|
|
self._do_wait_for_mongos_retarget()
|
|
|
|
def resume(self):
|
|
"""Resume the thread."""
|
|
self._is_resumed_evt.set()
|
|
|
|
self.logger.info(
|
|
"Current statistics about which nodes have been successfully stepped up: %s",
|
|
self._step_up_stats)
|
|
|
|
def _wait_for_permission_or_resume(self):
|
|
# Wait until stop, _stepdown_permitted_file or resume.
|
|
if self._stepdown_permitted_file:
|
|
while not os.path.isfile(self._stepdown_permitted_file) and not self.__is_stopped():
|
|
# Set a short sleep during busy wait time for self._stepdown_permitted_file.
|
|
self._wait(0.1)
|
|
else:
|
|
self._is_resumed_evt.wait()
|
|
|
|
def _wait(self, timeout):
|
|
# Wait until stop or timeout.
|
|
self._is_stopped_evt.wait(timeout)
|
|
|
|
def _await_primaries(self):
|
|
for fixture in self._rs_fixtures:
|
|
fixture.get_primary()
|
|
|
|
def _step_down_all(self):
|
|
self._is_idle_evt.clear()
|
|
self._stepdown_starting()
|
|
try:
|
|
if self._is_permitted():
|
|
for rs_fixture in self._rs_fixtures:
|
|
self._step_down(rs_fixture)
|
|
finally:
|
|
self._stepdown_completed()
|
|
self._is_idle_evt.set()
|
|
|
|
# pylint: disable=R0912,R0915
|
|
def _step_down(self, rs_fixture):
|
|
try:
|
|
primary = rs_fixture.get_primary(timeout_secs=self._stepdown_interval_secs)
|
|
except errors.ServerFailure:
|
|
# We ignore the ServerFailure exception because it means a primary wasn't available.
|
|
# We'll try again after self._stepdown_interval_secs seconds.
|
|
return
|
|
|
|
secondaries = rs_fixture.get_secondaries()
|
|
|
|
# Check that the fixture is still running before stepping down or killing the primary.
|
|
# This ensures we still detect some cases in which the fixture has already crashed.
|
|
if not rs_fixture.is_running():
|
|
raise errors.ServerFailure("ReplicaSetFixture {} expected to be running in"
|
|
" ContinuousStepdown, but wasn't.".format(
|
|
rs_fixture.replset_name))
|
|
|
|
if self._terminate:
|
|
should_kill = self._kill and random.choice([True, False])
|
|
action = "Killing" if should_kill else "Terminating"
|
|
self.logger.info("%s the primary on port %d of replica set '%s'.", action, primary.port,
|
|
rs_fixture.replset_name)
|
|
|
|
# We send the mongod process the signal to exit but don't immediately wait for it to
|
|
# exit because clean shutdown may take a while and we want to restore write availability
|
|
# as quickly as possible.
|
|
primary.mongod.stop(kill=should_kill)
|
|
elif not self._stepdown_via_heartbeats:
|
|
self.logger.info("Stepping down the primary on port %d of replica set '%s'.",
|
|
primary.port, rs_fixture.replset_name)
|
|
try:
|
|
client = primary.mongo_client()
|
|
client.admin.command(
|
|
bson.SON([
|
|
("replSetStepDown", self._stepdown_duration_secs),
|
|
("force", True),
|
|
]))
|
|
except pymongo.errors.AutoReconnect:
|
|
# AutoReconnect exceptions are expected as connections are closed during stepdown.
|
|
pass
|
|
except pymongo.errors.PyMongoError:
|
|
self.logger.exception(
|
|
"Error while stepping down the primary on port %d of replica set '%s'.",
|
|
primary.port, rs_fixture.replset_name)
|
|
raise
|
|
|
|
# We have skipped stepping down the primary if we want to step up secondaries instead. Here,
|
|
# we simply need to pick an arbitrary secondary to run for election which will lead to
|
|
# unconditional stepdown on the primary.
|
|
#
|
|
# If we have terminated/killed/stepped down the primary above, write availability has lost.
|
|
# We pick an arbitrary secondary to run for election immediately in order to avoid a long
|
|
# period where the replica set doesn't have write availability. If none of the secondaries
|
|
# are eligible, or their election attempt fails, then we'll run the replSetStepUp command on
|
|
# 'primary' to ensure we have write availability sooner than the
|
|
# self._stepdown_duration_secs duration expires.
|
|
while secondaries:
|
|
chosen = random.choice(secondaries)
|
|
|
|
self.logger.info("Attempting to step up the secondary on port %d of replica set '%s'.",
|
|
chosen.port, rs_fixture.replset_name)
|
|
|
|
try:
|
|
client = chosen.mongo_client()
|
|
client.admin.command("replSetStepUp")
|
|
break
|
|
except pymongo.errors.OperationFailure:
|
|
# OperationFailure exceptions are expected when the election attempt fails due to
|
|
# not receiving enough votes. This can happen when the 'chosen' secondary's opTime
|
|
# is behind that of other secondaries. We handle this by attempting to elect a
|
|
# different secondary.
|
|
self.logger.info("Failed to step up the secondary on port %d of replica set '%s'.",
|
|
chosen.port, rs_fixture.replset_name)
|
|
secondaries.remove(chosen)
|
|
|
|
if self._terminate:
|
|
self.logger.info("Waiting for the old primary on port %d of replica set '%s' to exit.",
|
|
primary.port, rs_fixture.replset_name)
|
|
|
|
primary.mongod.wait()
|
|
|
|
self.logger.info("Attempting to restart the old primary on port %d of replica set '%s.",
|
|
primary.port, rs_fixture.replset_name)
|
|
|
|
# Restart the mongod on the old primary and wait until we can contact it again. Keep the
|
|
# original preserve_dbpath to restore after restarting the mongod.
|
|
original_preserve_dbpath = primary.preserve_dbpath
|
|
primary.preserve_dbpath = True
|
|
try:
|
|
primary.setup()
|
|
primary.await_ready()
|
|
finally:
|
|
primary.preserve_dbpath = original_preserve_dbpath
|
|
elif not self._stepdown_via_heartbeats:
|
|
# If we chose to step up a secondary instead, the primary was never stepped down via the
|
|
# replSetStepDown command and thus will not have a stepdown period. So we can skip
|
|
# running {replSetFreeze: 0}. Otherwise, the replSetStepDown command run earlier
|
|
# introduced a stepdown period on the former primary and so we have to run the
|
|
# {replSetFreeze: 0} command to ensure the former primary is electable in the next
|
|
# round of _step_down().
|
|
client = primary.mongo_client()
|
|
client.admin.command({"replSetFreeze": 0})
|
|
elif secondaries:
|
|
# We successfully stepped up a secondary, wait for the former primary to step down via
|
|
# heartbeats. We need to wait for the former primary to step down to complete this step
|
|
# down round and to avoid races between the ContinuousStepdown hook and other test hooks
|
|
# that may depend on the health of the replica set.
|
|
self.logger.info(
|
|
"Successfully stepped up the secondary on port %d of replica set '%s'.",
|
|
chosen.port, rs_fixture.replset_name)
|
|
while True:
|
|
try:
|
|
client = primary.mongo_client()
|
|
is_secondary = client.admin.command("isMaster")["secondary"]
|
|
if is_secondary:
|
|
break
|
|
except pymongo.errors.AutoReconnect:
|
|
pass
|
|
self.logger.info("Waiting for primary on port %d of replica set '%s' to step down.",
|
|
primary.port, rs_fixture.replset_name)
|
|
time.sleep(0.2) # Wait a little bit before trying again.
|
|
self.logger.info("Primary on port %d of replica set '%s' stepped down.", primary.port,
|
|
rs_fixture.replset_name)
|
|
|
|
if not secondaries:
|
|
# If we failed to step up one of the secondaries, then we run the replSetStepUp to try
|
|
# and elect the former primary again. This way we don't need to wait
|
|
# self._stepdown_duration_secs seconds to restore write availability to the cluster.
|
|
# Since the former primary may have been killed, we need to wait until it has been
|
|
# restarted by retrying replSetStepUp.
|
|
retry_time_secs = rs_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
|
|
retry_start_time = time.time()
|
|
while True:
|
|
try:
|
|
client = primary.mongo_client()
|
|
client.admin.command("replSetStepUp")
|
|
break
|
|
except pymongo.errors.OperationFailure:
|
|
self._wait(0.2)
|
|
if time.time() - retry_start_time > retry_time_secs:
|
|
raise errors.ServerFailure(
|
|
"The old primary on port {} of replica set {} did not step up in"
|
|
" {} seconds.".format(client.port, rs_fixture.replset_name,
|
|
retry_time_secs))
|
|
|
|
# Bump the counter for the chosen secondary to indicate that the replSetStepUp command
|
|
# executed successfully.
|
|
key = "{}/{}".format(rs_fixture.replset_name,
|
|
chosen.get_internal_connection_string() if secondaries else "none")
|
|
self._step_up_stats[key] += 1
|
|
|
|
def _do_wait_for_mongos_retarget(self): # pylint: disable=too-many-branches
|
|
"""Run collStats on each collection in each database on each mongos.
|
|
|
|
This is to ensure mongos can target the primary for each shard with data, including the
|
|
config servers.
|
|
"""
|
|
if not self._should_wait_for_mongos_retarget:
|
|
return
|
|
|
|
for mongos_fixture in self._mongos_fixtures:
|
|
mongos_conn_str = mongos_fixture.get_internal_connection_string()
|
|
try:
|
|
client = mongos_fixture.mongo_client()
|
|
except pymongo.errors.AutoReconnect:
|
|
pass
|
|
for db in client.database_names():
|
|
self.logger.info("Waiting for mongos %s to retarget db: %s", mongos_conn_str, db)
|
|
start_time = time.time()
|
|
while True:
|
|
try:
|
|
coll_names = client[db].collection_names()
|
|
break
|
|
except pymongo.errors.NotMasterError:
|
|
pass
|
|
retarget_time = time.time() - start_time
|
|
if retarget_time >= 60:
|
|
raise RuntimeError(
|
|
"Timeout waiting for mongos: {} to retarget to db: {}".format(
|
|
mongos_conn_str, db))
|
|
time.sleep(0.2)
|
|
for coll in coll_names:
|
|
while True:
|
|
try:
|
|
client[db].command({"collStats": coll})
|
|
break
|
|
except pymongo.errors.NotMasterError:
|
|
pass
|
|
retarget_time = time.time() - start_time
|
|
if retarget_time >= 60:
|
|
raise RuntimeError(
|
|
"Timeout waiting for mongos: {} to retarget to db: {}".format(
|
|
mongos_conn_str, db))
|
|
time.sleep(0.2)
|
|
retarget_time = time.time() - start_time
|
|
self.logger.info("Finished waiting for mongos: %s to retarget db: %s, in %d ms",
|
|
mongos_conn_str, db, retarget_time * 1000)
|
|
|
|
def _is_permitted(self):
|
|
"""Permit a stepdown if the permitted file is not specified or it exists.
|
|
|
|
The self._permitted_file is created by an external framework, i.e., JS tests.
|
|
"""
|
|
if self._stepdown_permitted_file:
|
|
return os.path.isfile(self._stepdown_permitted_file)
|
|
return self._is_resumed_evt.is_set()
|
|
|
|
def _stepdown_starting(self):
|
|
"""Create self._stepping_down_file, if specified."""
|
|
if self._stepping_down_file:
|
|
if os.path.isfile(self._stepping_down_file):
|
|
raise RuntimeError("Stepping down file {} already exists".format(
|
|
self._stepping_down_file))
|
|
with open(self._stepping_down_file, "w") as fh:
|
|
fh.write("")
|
|
|
|
def _stepdown_completed(self):
|
|
"""Delete self._stepping_down_file, if specified."""
|
|
utils.remove_if_exists(self._stepping_down_file)
|
|
|
|
def clean_stepdown_files(self):
|
|
"""Remove the stepdown files."""
|
|
utils.remove_if_exists(self._stepdown_permitted_file)
|
|
utils.remove_if_exists(self._stepping_down_file)
|