From ca5e2e4e3334c92062cad841a53da7f69711cdcc Mon Sep 17 00:00:00 2001 From: Shreyas Kalyan Date: Thu, 1 Nov 2018 11:53:08 -0400 Subject: [PATCH] SERVER-37664 Add support for doing resmoke.py process management through jasper --- buildscripts/resmoke.py | 114 +++++++++- buildscripts/resmokelib/config.py | 5 + buildscripts/resmokelib/core/jasper.proto | 209 ++++++++++++++++++ .../resmokelib/core/jasper_process.py | 88 ++++++++ buildscripts/resmokelib/core/programs.py | 19 +- buildscripts/resmokelib/parser.py | 6 + .../testing/testcases/cpp_unittest.py | 3 +- .../testcases/mql_model_haskell_test.py | 2 +- 8 files changed, 437 insertions(+), 9 deletions(-) create mode 100644 buildscripts/resmokelib/core/jasper.proto create mode 100644 buildscripts/resmokelib/core/jasper_process.py diff --git a/buildscripts/resmoke.py b/buildscripts/resmoke.py index 1b9865dba1b..2094610d65d 100755 --- a/buildscripts/resmoke.py +++ b/buildscripts/resmoke.py @@ -4,10 +4,22 @@ from __future__ import absolute_import import os.path +import platform import random +import subprocess import sys +import tarfile import time +import pkg_resources +import requests + +try: + import grpc_tools.protoc + import grpc +except ImportError: + pass + # Get relative imports to work when the package is not installed on the PYTHONPATH. 
if __name__ == "__main__" and __package__ is None: sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -23,8 +35,11 @@ from buildscripts.resmokelib import suitesconfig from buildscripts.resmokelib import testing from buildscripts.resmokelib import utils +from buildscripts.resmokelib.core import process +from buildscripts.resmokelib.core import jasper_process -class Resmoke(object): + +class Resmoke(object): # pylint: disable=too-many-instance-attributes """The main class to run tests with resmoke.""" def __init__(self): @@ -34,6 +49,7 @@ class Resmoke(object): self._exec_logger = None self._resmoke_logger = None self._archive = None + self._jasper_server = None self._interrupted = False self._exit_code = 0 @@ -132,8 +148,9 @@ class Resmoke(object): suites = None try: suites = self._get_suites() - self._setup_archival() + if config.SPAWN_USING == "jasper": + self._setup_jasper() self._setup_signal_handler(suites) for suite in suites: @@ -148,6 +165,8 @@ class Resmoke(object): exit_code = max(suite.return_code for suite in suites) self.exit(exit_code) finally: + if config.SPAWN_USING == "jasper": + self._exit_jasper() self._exit_archival() if suites: reportfile.write(suites) @@ -251,6 +270,97 @@ class Resmoke(object): if self._archive and not self._interrupted: self._archive.exit() + # pylint: disable=too-many-instance-attributes,too-many-statements,too-many-locals + def _setup_jasper(self): + """Start up the jasper process manager.""" + root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + proto_file = os.path.join(root_dir, "buildscripts", "resmokelib", "core", "jasper.proto") + try: + well_known_protos_include = pkg_resources.resource_filename("grpc_tools", "_proto") + except ImportError: + raise ImportError("You must run: sys.executable + '-m pip install grpcio grpcio-tools " + "googleapis-common-protos' to use --spawnUsing=jasper.") + + # We use the build/ directory as the output directory because the generated 
files aren't + # meant to be tracked by git or linted. + proto_out = os.path.join(root_dir, "build", "jasper") + + utils.rmtree(proto_out, ignore_errors=True) + os.makedirs(proto_out) + + # We make 'proto_out' into a Python package so we can add it to 'sys.path' and import the + # *pb2*.py modules from it. + with open(os.path.join(proto_out, "__init__.py"), "w"): + pass + + ret = grpc_tools.protoc.main([ + grpc_tools.protoc.__file__, + "--grpc_python_out", + proto_out, + "--python_out", + proto_out, + "--proto_path", + os.path.dirname(proto_file), + "--proto_path", + well_known_protos_include, + os.path.basename(proto_file), + ]) + + if ret != 0: + raise RuntimeError("Failed to generated gRPC files from the jasper.proto file") + + sys.path.append(os.path.dirname(proto_out)) + + from jasper import jasper_pb2 + from jasper import jasper_pb2_grpc + + jasper_process.Process.jasper_pb2 = jasper_pb2 + jasper_process.Process.jasper_pb2_grpc = jasper_pb2_grpc + + curator_path = "build/curator" + git_hash = "1b8c7344aa1daed0846e32204dffb21cfdda208c" + curator_exists = os.path.isfile(curator_path) + curator_same_version = False + if curator_exists: + curator_version = subprocess.check_output([curator_path, "--version"]).split() + curator_same_version = git_hash in curator_version + + if curator_exists and not curator_same_version: + os.remove(curator_path) + self._resmoke_logger.info( + "Found a different version of curator. Downloading version %s of curator to enable" + "process management using jasper.", git_hash) + + if not curator_exists or not curator_same_version: + if sys.platform == "darwin": + os_platform = "macos" + elif sys.platform == "win32": + os_platform = "windows-64" + elif sys.platform.startswith("linux"): + os_platform = "ubuntu1604" + else: + raise OSError("Unrecognized platform. 
" + "This program is meant to be run on MacOS, Windows, or Linux.") + url = ("https://s3.amazonaws.com/boxes.10gen.com/build/curator/" + "curator-dist-%s-%s.tar.gz") % (os_platform, git_hash) + response = requests.get(url, stream=True) + with tarfile.open(mode="r|gz", fileobj=response.raw) as tf: + tf.extractall(path="./build/") + + jasper_port = config.BASE_PORT - 1 + jasper_conn_str = "localhost:%d" % jasper_port + jasper_process.Process.connection_str = jasper_conn_str + jasper_command = [curator_path, "jasper", "grpc", "--port", str(jasper_port)] + self._jasper_server = process.Process(self._resmoke_logger, jasper_command) + self._jasper_server.start() + + channel = grpc.insecure_channel(jasper_conn_str) + grpc.channel_ready_future(channel).result() + + def _exit_jasper(self): + if self._jasper_server: + self._jasper_server.stop() + def exit(self, exit_code): """Exit with the provided exit code.""" self._exit_code = exit_code diff --git a/buildscripts/resmokelib/config.py b/buildscripts/resmokelib/config.py index 4b742f94538..b18120c561f 100644 --- a/buildscripts/resmokelib/config.py +++ b/buildscripts/resmokelib/config.py @@ -79,6 +79,7 @@ DEFAULTS = { "shell_read_mode": None, "shell_write_mode": None, "shuffle": None, + "spawn_using": None, "stagger_jobs": None, "majority_read_concern": None, # Default is set on the commandline. "storage_engine": None, @@ -342,6 +343,10 @@ SHELL_WRITE_MODE = None # alphabetical (case-insensitive) order. SHUFFLE = None +# Possible values are python and jasper. If python, resmoke uses the python built-in subprocess +# or subprocess32 module to spawn processes. If jasper, resmoke uses the jasper module. +SPAWN_USING = None + # If true, the launching of jobs is staggered in resmoke.py. 
STAGGER_JOBS = None diff --git a/buildscripts/resmokelib/core/jasper.proto b/buildscripts/resmokelib/core/jasper.proto new file mode 100644 index 00000000000..a85472a6632 --- /dev/null +++ b/buildscripts/resmokelib/core/jasper.proto @@ -0,0 +1,209 @@ +syntax = "proto3"; + +package jasper; +option go_package = "internal"; + +import "google/protobuf/empty.proto"; + +message Logger { + LogType log_type = 1; + LogOptions log_options = 2; +} + +message OutputOptions { + repeated Logger loggers = 1; + bool suppress_output = 2; + bool suppress_error = 3; + bool redirect_output_to_error = 4; + bool redirect_error_to_output = 5; +} + +enum LogType { + LOGUNKNOWN = 0; + LOGBUILDLOGGERV2 = 1; + LOGBUILDLOGGERV3 = 2; + LOGDEFAULT = 3; + LOGFILE = 4; + LOGINHERIT = 5; + LOGSPLUNK = 6; + LOGSUMOLOGIC = 7; + LOGINMEMORY = 8; +} + +enum LogFormat { + LOGFORMATUNKNOWN = 0; + LOGFORMATDEFAULT = 1; + LOGFORMATJSON = 2; + LOGFORMATPLAIN = 3; +} + +message LogOptions { + BufferOptions buffer_options = 1; + BuildloggerOptions buildlogger_options = 2; + string default_prefix = 3; + string file_name = 4; + LogFormat format = 5; + int64 in_memory_cap = 6; + SplunkOptions splunk_options = 7; + string sumo_endpoint = 8; +} + +message BufferOptions { + bool buffered = 1; + int64 duration = 2; + int64 max_size = 3; +} + +message BuildloggerOptions { + bool create_test = 1; + string url = 2; + int64 number = 3; + string phase = 4; + string builder = 5; + string test = 6; + string command = 7; +} + +message SplunkOptions { + string url = 1; + string token = 2; + string channel = 3; +} + +message CreateOptions { + repeated string args = 1; + string working_directory = 2; + map environment = 3; + bool override_environ = 4; + int64 timeout_seconds = 5; + repeated string tags = 6; + repeated CreateOptions on_success = 7; + repeated CreateOptions on_failure = 8; + repeated CreateOptions on_timeout = 9; + OutputOptions output = 10; +} + +message ProcessInfo { + string id = 1; + int64 pid = 2; + string 
host_id = 3; + bool running = 4; + bool successful = 5; + bool complete = 6; + bool timedout = 7; + CreateOptions options = 8; + int32 exit_code = 9; +} + +message StatusResponse { + string host_id = 1; + bool active = 2; +} + +message Filter { + FilterSpecifications name = 1; +} + +enum FilterSpecifications { + ALL = 0; + RUNNING = 1; + TERMINATED = 2; + FAILED = 3; + SUCCESSFUL = 4; +} + +message SignalProcess { + JasperProcessID ProcessID = 1; + Signals signal = 2; +} + +enum Signals { + UNKNOWN = 0; + TERMINATE = 1; + KILL = 2; + HANGUP = 3; + INIT = 4; + USER1 = 5; + USER2 = 6; +} + + +message TagName { + string value = 1; +} + +message ProcessTags { + string processID = 1; + repeated string tags = 2; +} + +message JasperProcessID { + string value = 1; + +} + +message OperationOutcome { + bool success = 1; + string text = 2; + int32 exit_code = 3; +} + +message BuildOptions { + string target = 1; + string arch = 2; + string edition = 3; + bool debug = 4; +} + +message MongoDBDownloadOptions { + BuildOptions build_options = 1; + string path = 2; + repeated string releases = 3; +} + +message CacheOptions { + bool disabled = 1; + int64 prune_delay = 2; + int64 max_size = 3; +} + +enum ArchiveFormat { + ARCHIVEUNKNOWN = 0; + ARCHIVEAUTO = 1; + ARCHIVETARGZ = 2; + ARCHIVEZIP = 3; +} + +message ArchiveOptions { + bool should_extract = 1; + ArchiveFormat format = 2; + string target_path = 3; +} + +message DownloadInfo { + string url = 1; + string path = 2; + ArchiveOptions archive_opts = 3; +} + +message BuildloggerURLs { + repeated string urls = 1; +} + +service JasperProcessManager { + rpc Status(google.protobuf.Empty) returns (StatusResponse); + rpc Create(CreateOptions) returns (ProcessInfo); + rpc List(Filter) returns (stream ProcessInfo); + rpc Group(TagName) returns (stream ProcessInfo); + rpc Get(JasperProcessID) returns (ProcessInfo); + rpc Wait(JasperProcessID) returns (OperationOutcome); + rpc Signal(SignalProcess) returns (OperationOutcome); + rpc 
Close(google.protobuf.Empty) returns (OperationOutcome); + rpc TagProcess(ProcessTags) returns (OperationOutcome); + rpc ResetTags(JasperProcessID) returns (OperationOutcome); + rpc GetTags(JasperProcessID) returns (ProcessTags); + rpc DownloadFile(DownloadInfo) returns (OperationOutcome); + rpc DownloadMongoDB(MongoDBDownloadOptions) returns (OperationOutcome); + rpc ConfigureCache(CacheOptions) returns (OperationOutcome); + rpc GetBuildloggerURLs(JasperProcessID) returns (BuildloggerURLs); +} diff --git a/buildscripts/resmokelib/core/jasper_process.py b/buildscripts/resmokelib/core/jasper_process.py new file mode 100644 index 00000000000..e00762c663b --- /dev/null +++ b/buildscripts/resmokelib/core/jasper_process.py @@ -0,0 +1,88 @@ +"""A process management system using mongodb/jasper. + +Serves as an alternative to process.py. +""" + +from __future__ import absolute_import + +try: + import grpc +except ImportError: + pass + +from . import process as _process + + +class Process(_process.Process): + """Class for spawning a process using mongodb/jasper.""" + + jasper_pb2 = None + jasper_pb2_grpc = None + connection_str = None + + def __init__(self, logger, args, env=None, env_vars=None): + """Initialize the process with the specified logger, arguments, and environment.""" + _process.Process.__init__(self, logger, args, env=env, env_vars=env_vars) + self._id = None + self._stub = self.jasper_pb2_grpc.JasperProcessManagerStub( + grpc.insecure_channel(self.connection_str)) + self._return_code = None + + def start(self): + """Start the process and the logger pipes for its stdout and stderr.""" + log_type = self.jasper_pb2.LogType.Value("LOGINHERIT") + log_format = self.jasper_pb2.LogFormat.Value("LOGFORMATPLAIN") + log_options = self.jasper_pb2.LogOptions(format=log_format) + logger = self.jasper_pb2.Logger(log_type=log_type, log_options=log_options) + + output_opts = self.jasper_pb2.OutputOptions(loggers=[logger]) + create_options = self.jasper_pb2.CreateOptions( + 
args=self.args, + environment=self.env, + override_environ=True, + timeout_seconds=0, + output=output_opts, + ) + + val = self._stub.Create(create_options) + self.pid = val.pid + self._id = self.jasper_pb2.JasperProcessID(value=val.id) + self._return_code = None + + def stop(self, kill=False): + """Terminate the process.""" + signal = self.jasper_pb2.Signals.Value("TERMINATE") + if kill: + signal = self.jasper_pb2.Signals.Value("KILL") + + signal_process = self.jasper_pb2.SignalProcess(ProcessID=self._id, signal=signal) + try: + val = self._stub.Signal(signal_process) + if not val.success: + raise OSError("Unable to stop process %d." % self.pid) + except grpc.RpcError as err: + err.details = err.details() + if "cannot signal a process that has terminated" not in err.details \ + and "os: process already finished" not in err.details: + raise + + def poll(self): + """Poll.""" + if self._return_code is None: + process = self._stub.Get(self._id) + if not process.running: + self.wait() + return self._return_code + + def wait(self): + """Wait until process has terminated and all output has been consumed by the logger pipes.""" + if self._return_code is None: + try: + wait = self._stub.Wait(self._id) + self._return_code = wait.exit_code + except grpc.RpcError as err: + if "problem encountered while waiting: operation failed" not in err.details(): + raise + wait = self._stub.Get(self._id) + self._return_code = wait.exit_code + return self._return_code diff --git a/buildscripts/resmokelib/core/programs.py b/buildscripts/resmokelib/core/programs.py index 210f0b54a4a..f3518c75e8c 100644 --- a/buildscripts/resmokelib/core/programs.py +++ b/buildscripts/resmokelib/core/programs.py @@ -10,7 +10,8 @@ import os import os.path import stat -from . import process as _process +from . import jasper_process +from . import process from .. import config from .. 
import utils @@ -32,6 +33,14 @@ DEFAULT_EVERGREEN_MONGOD_LOG_COMPONENT_VERBOSITY = { } +def make_process(*args, **kwargs): + """Choose whether to use python built in process or jasper.""" + process_cls = process.Process + if config.SPAWN_USING == "jasper": + process_cls = jasper_process.Process + return process_cls(*args, **kwargs) + + def default_mongod_log_component_verbosity(): """Return the default 'logComponentVerbosity' value to use for mongod processes.""" if config.EVERGREEN_TASK_ID: @@ -140,7 +149,7 @@ def mongod_program( # pylint: disable=too-many-branches _set_keyfile_permissions(kwargs) process_kwargs = utils.default_if_none(process_kwargs, {}) - return _process.Process(logger, args, **process_kwargs) + return make_process(logger, args, **process_kwargs) def mongos_program(logger, executable=None, process_kwargs=None, **kwargs): @@ -164,7 +173,7 @@ def mongos_program(logger, executable=None, process_kwargs=None, **kwargs): _set_keyfile_permissions(kwargs) process_kwargs = utils.default_if_none(process_kwargs, {}) - return _process.Process(logger, args, **process_kwargs) + return make_process(logger, args, **process_kwargs) def mongo_shell_program( # pylint: disable=too-many-branches,too-many-locals,too-many-statements @@ -297,7 +306,7 @@ def mongo_shell_program( # pylint: disable=too-many-branches,too-many-locals,to _set_keyfile_permissions(test_data) process_kwargs = utils.default_if_none(process_kwargs, {}) - return _process.Process(logger, args, **process_kwargs) + return make_process(logger, args, **process_kwargs) def _format_shell_vars(sb, path, value): @@ -357,7 +366,7 @@ def generic_program(logger, args, process_kwargs=None, **kwargs): _apply_kwargs(args, kwargs) process_kwargs = utils.default_if_none(process_kwargs, {}) - return _process.Process(logger, args, **process_kwargs) + return make_process(logger, args, **process_kwargs) def _apply_set_parameters(args, set_parameter): diff --git a/buildscripts/resmokelib/parser.py 
b/buildscripts/resmokelib/parser.py index 1ff39226b5d..3c7969a3f19 100644 --- a/buildscripts/resmokelib/parser.py +++ b/buildscripts/resmokelib/parser.py @@ -84,6 +84,11 @@ def _make_parser(): # pylint: disable=too-many-statements parser.add_option("--genny", dest="genny_executable", metavar="PATH", help="The path to the genny executable for resmoke to use.") + parser.add_option("--spawnUsing", type="choice", dest="spawn_using", choices=("python", + "jasper"), + help=("Allows you to spawn resmoke processes using python or Jasper." + "Defaults to python. Options are 'python' or 'jasper'.")) + parser.add_option("--includeWithAnyTags", action="append", dest="include_with_any_tags", metavar="TAG1,TAG2", help=("Comma separated list of tags. For the jstest portion of the suite(s)," @@ -419,6 +424,7 @@ def _update_config_vars(values): # pylint: disable=too-many-statements _config.SERVICE_EXECUTOR = config.pop("service_executor") _config.SHELL_READ_MODE = config.pop("shell_read_mode") _config.SHELL_WRITE_MODE = config.pop("shell_write_mode") + _config.SPAWN_USING = config.pop("spawn_using") _config.STAGGER_JOBS = config.pop("stagger_jobs") == "on" _config.STORAGE_ENGINE = config.pop("storage_engine") _config.STORAGE_ENGINE_CACHE_SIZE = config.pop("storage_engine_cache_size_gb") diff --git a/buildscripts/resmokelib/testing/testcases/cpp_unittest.py b/buildscripts/resmokelib/testing/testcases/cpp_unittest.py index f9512f8feb5..b9fb427d0da 100644 --- a/buildscripts/resmokelib/testing/testcases/cpp_unittest.py +++ b/buildscripts/resmokelib/testing/testcases/cpp_unittest.py @@ -21,4 +21,5 @@ class CPPUnitTestCase(interface.ProcessTestCase): self.program_options = utils.default_if_none(program_options, {}).copy() def _make_process(self): - return core.process.Process(self.logger, [self.program_executable], **self.program_options) + return core.programs.make_process(self.logger, [self.program_executable], + **self.program_options) diff --git 
a/buildscripts/resmokelib/testing/testcases/mql_model_haskell_test.py b/buildscripts/resmokelib/testing/testcases/mql_model_haskell_test.py index c7fd4845bbe..7911aa1b7d6 100644 --- a/buildscripts/resmokelib/testing/testcases/mql_model_haskell_test.py +++ b/buildscripts/resmokelib/testing/testcases/mql_model_haskell_test.py @@ -38,7 +38,7 @@ class MqlModelHaskellTestCase(interface.ProcessTestCase): self.program_executable = execs[0] def _make_process(self): - return core.process.Process(self.logger, [ + return core.programs.make_process(self.logger, [ self.program_executable, "--test", self.json_test_file, "--prefix", self.top_level_dirname ])