From ab2c26b6d3312f367438cfab9fdd3af932c4ce1e Mon Sep 17 00:00:00 2001 From: DanSava Date: Mon, 11 Nov 2024 17:10:04 +0200 Subject: [PATCH] Refactor usage of EverestConfig in everest server functionality --- src/everest/api/everest_data_api.py | 5 +- src/everest/bin/everest_script.py | 33 ++++- src/everest/bin/kill_script.py | 9 +- src/everest/bin/monitor_script.py | 21 +++- src/everest/bin/utils.py | 33 ++--- src/everest/bin/visualization_script.py | 6 +- src/everest/config/server_config.py | 2 +- src/everest/detached/__init__.py | 114 ++++++------------ src/everest/detached/jobs/everserver.py | 65 ++++++---- src/everest/util/forward_models.py | 2 +- .../entry_points/test_everest_entry.py | 14 ++- .../functional/test_main_everest_entry.py | 14 ++- tests/everest/test_detached.py | 74 +++++++----- tests/everest/test_everest_output.py | 20 ++- tests/everest/test_everserver.py | 45 +++++-- tests/everest/test_logging.py | 4 +- tests/everest/test_util.py | 8 +- tests/everest/utils/__init__.py | 6 +- 18 files changed, 270 insertions(+), 205 deletions(-) diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index ef95e3a0387..c6837ecda73 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -4,7 +4,7 @@ from seba_sqlite.snapshot import SebaSnapshot from ert.storage import open_storage -from everest.config import EverestConfig +from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status @@ -204,7 +204,8 @@ def output_folder(self): @property def everest_csv(self): - state = everserver_status(self._config) + status_path = ServerConfig.get_everserver_status_path(self._config.output_dir) + state = everserver_status(status_path) if state["status"] == ServerStatus.completed: return self._config.export_path else: diff --git a/src/everest/bin/everest_script.py b/src/everest/bin/everest_script.py index e7e7806f59c..6631eaf81e2 100755 --- a/src/everest/bin/everest_script.py +++ b/src/everest/bin/everest_script.py @@ -4,6 +4,7 @@ import asyncio import json import logging +import os import signal import threading from functools import partial @@ -17,6 +18,7 @@ start_server, wait_for_server, ) +from everest.strings import EVEREST from everest.util import ( makedirs_if_needed, version_info, @@ -86,8 +88,10 @@ def _build_args_parser(): async def run_everest(options): logger = logging.getLogger("everest_main") - server_state = everserver_status(options.config) - + everserver_status_path = ServerConfig.get_everserver_status_path( + options.config.output_dir + ) + server_state = everserver_status(everserver_status_path) if server_is_running(*ServerConfig.get_server_context(options.config.output_dir)): config_file = options.config.config_file print( @@ -105,13 +109,26 @@ async def run_everest(options): logger.info("Everest forward model contains job {}".format(job_name)) makedirs_if_needed(options.config.output_dir, roll_if_exists=True) + try: + output_dir = options.config.output_dir + config_file = options.config.config_file + save_config_path = os.path.join(output_dir, config_file) + options.config.dump(save_config_path) + except (OSError, LookupError) as e: + logging.getLogger(EVEREST).error( + "Failed to save optimization config: {}".format(e) + ) await start_server(options.config, options.debug) print("Waiting for server ...") - wait_for_server(options.config, timeout=600) + wait_for_server(options.config.output_dir, timeout=600) print("Everest server found!") - run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs) + run_detached_monitor( + server_context=ServerConfig.get_server_context(options.config.output_dir), + optimization_output_dir=options.config.optimization_output_dir, + show_all_jobs=options.show_all_jobs, + ) - server_state = everserver_status(options.config) + server_state = everserver_status(everserver_status_path) server_state_info = server_state["message"] if server_state["status"] == ServerStatus.failed: logger.error("Everest run failed with: {}".format(server_state_info)) @@ -120,7 +137,11 @@ async def run_everest(options): logger.info("Everest run finished with: {}".format(server_state_info)) print(server_state_info) else: - report_on_previous_run(options.config) + report_on_previous_run( + config_file=options.config.config_file, + everserver_status_path=everserver_status_path, + optimization_output_dir=options.config.optimization_output_dir, + ) if __name__ == "__main__": diff --git a/src/everest/bin/kill_script.py b/src/everest/bin/kill_script.py index 1717f108a3b..c2b6ff99736 100755 --- a/src/everest/bin/kill_script.py +++ b/src/everest/bin/kill_script.py @@ -70,13 +70,12 @@ def _handle_keyboard_interrupt(signal, frame, after=False): def kill_everest(options): - if not server_is_running( - *ServerConfig.get_server_context(options.config.output_dir) - ): + server_context = ServerConfig.get_server_context(options.config.output_dir) + if not server_is_running(*server_context): print("Server is not running.") return - stopping = stop_server(options.config) + stopping = stop_server(server_context) if threading.current_thread() is threading.main_thread(): signal.signal(signal.SIGINT, partial(_handle_keyboard_interrupt, after=True)) @@ -85,7 +84,7 @@ def kill_everest(options): return try: print("Waiting for server to stop ...") - wait_for_server_to_stop(options.config, timeout=60) + wait_for_server_to_stop(server_context, timeout=60) print("Server stopped.") except: logging.debug(traceback.format_exc()) diff --git a/src/everest/bin/monitor_script.py b/src/everest/bin/monitor_script.py index a3187f24fa5..8322f617a18 100755 --- a/src/everest/bin/monitor_script.py +++ b/src/everest/bin/monitor_script.py @@ -61,11 +61,16 @@ def _build_args_parser(): def monitor_everest(options): config: EverestConfig = options.config - server_state = everserver_status(options.config) - - if server_is_running(*ServerConfig.get_server_context(config.output_dir)): - run_detached_monitor(config, show_all_jobs=options.show_all_jobs) - server_state = everserver_status(config) + status_path = ServerConfig.get_everserver_status_path(config.output_dir) + server_state = everserver_status(status_path) + server_context = ServerConfig.get_server_context(config.output_dir) + if server_is_running(*server_context): + run_detached_monitor( + server_context=server_context, + optimization_output_dir=config.optimization_output_dir, + show_all_jobs=options.show_all_jobs, + ) + server_state = everserver_status(status_path) if server_state["status"] == ServerStatus.failed: raise SystemExit(server_state["message"]) if server_state["message"] is not None: @@ -78,7 +83,11 @@ def monitor_everest(options): f" `everest run {config_file}`" ) else: - report_on_previous_run(config) + report_on_previous_run( + config_file=config.config_file, + everserver_status_path=status_path, + optimization_output_dir=config.optimization_output_dir, + ) if __name__ == "__main__": diff --git a/src/everest/bin/utils.py b/src/everest/bin/utils.py index 188da9cd6c0..0e5939d5fbc 100644 --- a/src/everest/bin/utils.py +++ b/src/everest/bin/utils.py @@ -4,7 +4,7 @@ import traceback from dataclasses import dataclass, field from itertools import groupby -from typing import ClassVar, Dict, List +from typing import ClassVar, Dict, List, Tuple import colorama from colorama import Fore @@ -140,8 +140,7 @@ class _DetachedMonitor: INDENT = 2 FLOAT_FMT = ".5g" - def __init__(self, config, show_all_jobs): - self._config = config + def __init__(self, show_all_jobs): self._show_all_jobs: bool = show_all_jobs self._clear_lines = 0 self._batches_done = set() @@ -300,19 +299,26 @@ def _clear(self): print(colorama.Cursor.UP(), end=colorama.ansi.clear_line()) -def run_detached_monitor(config: EverestConfig, show_all_jobs: bool = False): - monitor = _DetachedMonitor(config, show_all_jobs) - start_monitor(config, callback=monitor.update) - opt_status = get_opt_status(config.optimization_output_dir) +def run_detached_monitor( + server_context: Tuple[str, str, Tuple[str, str]], + optimization_output_dir: str, + show_all_jobs: bool = False, +): + monitor = _DetachedMonitor(show_all_jobs) + start_monitor(server_context, callback=monitor.update) + opt_status = get_opt_status(optimization_output_dir) if opt_status.get("cli_monitor_data"): msg, _ = monitor.get_opt_progress(opt_status) if msg.strip(): print(f"{msg}\n") -def report_on_previous_run(config: EverestConfig): - server_state = everserver_status(config) - config_file = config.config_file +def report_on_previous_run( + config_file: str, + everserver_status_path: str, + optimization_output_dir: str, +): + server_state = everserver_status(everserver_status_path) if server_state["status"] == ServerStatus.failed: error_msg = server_state["message"] print( @@ -321,14 +327,13 @@ def report_on_previous_run(config: EverestConfig): f"` everest run --new-run {config_file}`\n" ) else: - output_dir = config.output_dir - opt_status = get_opt_status(config.optimization_output_dir) + opt_status = get_opt_status(optimization_output_dir) if opt_status.get("cli_monitor_data"): - monitor = _DetachedMonitor(config, show_all_jobs=False) + monitor = _DetachedMonitor(show_all_jobs=False) msg, _ = monitor.get_opt_progress(opt_status) print(msg + "\n") print( - f"Optimization completed, results in {output_dir}\n" + f"Optimization completed.\n" "\nTo re-run the optimization use command:\n" f" `everest run --new-run {config_file}`\n" "To export the results use command:\n" diff --git a/src/everest/bin/visualization_script.py b/src/everest/bin/visualization_script.py index 782d6b1b5c5..958983f9e10 100644 --- a/src/everest/bin/visualization_script.py +++ b/src/everest/bin/visualization_script.py @@ -4,7 +4,7 @@ from functools import partial from everest.api import EverestDataAPI -from everest.config import EverestConfig +from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status from everest.plugins.everest_plugin_manager import EverestPluginManager @@ -27,7 +27,9 @@ def visualization_entry(args=None): options = parser.parse_args(args) config = options.config_file - server_state = everserver_status(config) + server_state = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) if server_state["status"] != ServerStatus.never_run: pm = EverestPluginManager() pm.hook.visualize_data(api=EverestDataAPI(config)) diff --git a/src/everest/config/server_config.py b/src/everest/config/server_config.py index 19f219aa4b4..50fc7911634 100644 --- a/src/everest/config/server_config.py +++ b/src/everest/config/server_config.py @@ -62,7 +62,7 @@ def get_server_url(output_dir: str) -> str: return f"https://{server_info['host']}:{server_info['port']}" @staticmethod - def get_server_context(output_dir: str) -> Tuple[str, bool, Tuple[str, str]]: + def get_server_context(output_dir: str) -> Tuple[str, str, Tuple[str, str]]: """Returns a tuple with - url of the server - path to the .cert file diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index 0655b7895a8..ff8c1be0de7 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -24,10 +24,9 @@ from ert.scheduler import create_driver from ert.scheduler.driver import Driver, FailedSubmit from ert.scheduler.event import StartedEvent -from everest.config import EverestConfig, ServerConfig +from everest.config import EverestConfig, ServerConfig, SimulatorConfig from everest.config_keys import ConfigKeys as CK from everest.strings import ( - EVEREST, EVEREST_SERVER_CONFIG, OPT_PROGRESS_ENDPOINT, OPT_PROGRESS_ID, @@ -35,7 +34,6 @@ SIM_PROGRESS_ID, STOP_ENDPOINT, ) -from everest.util import configure_logger # Specifies how many times to try a http request within the specified timeout. _HTTP_REQUEST_RETRY = 10 @@ -54,35 +52,7 @@ async def start_server(config: EverestConfig, debug: bool = False) -> Driver: """ Start an Everest server running the optimization defined in the config """ - if server_is_running( - *ServerConfig.get_server_context(config.output_dir) - ): # better safe than sorry - return - - log_dir = config.log_dir - - configure_logger( - name="res", - file_path=os.path.join(log_dir, "everest_server.log"), - log_level=logging.INFO, - log_to_azure=True, - ) - - configure_logger( - name=__name__, - file_path=os.path.join(log_dir, "simulations.log"), - log_level=logging.INFO, - ) - - try: - save_config_path = os.path.join(config.output_dir, config.config_file) - config.dump(save_config_path) - except (OSError, LookupError) as e: - logging.getLogger(EVEREST).error( - "Failed to save optimization config: {}".format(e) - ) - - driver = create_driver(get_server_queue_options(config)) + driver = create_driver(get_server_queue_options(config.simulator, config.server)) try: args = ["--config-file", str(config.config_path)] if debug: @@ -96,13 +66,13 @@ async def start_server(config: EverestConfig, debug: bool = False) -> Driver: return driver -def stop_server(config: EverestConfig, retries: int = 5): +def stop_server(server_context: Tuple[str, str, Tuple[str, str]], retries: int = 5): """ Stop server if found and it is running. """ for retry in range(retries): try: - url, cert, auth = ServerConfig.get_server_context(config.output_dir) + url, cert, auth = server_context stop_endpoint = "/".join([url, STOP_ENDPOINT]) response = requests.post( stop_endpoint, @@ -124,18 +94,19 @@ def extract_errors_from_file(path: str): return re.findall(r"(Error \w+.*)", content) -def wait_for_server(config: EverestConfig, timeout: int) -> None: +def wait_for_server(output_dir: str, timeout: int) -> None: """ Checks everest server has started _HTTP_REQUEST_RETRY times. Waits progressively longer between each check. Raise an exception when the timeout is reached. """ - if not server_is_running(*ServerConfig.get_server_context(config.output_dir)): + everserver_status_path = ServerConfig.get_everserver_status_path(output_dir) + if not server_is_running(*ServerConfig.get_server_context(output_dir)): sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1) for retry_count in range(_HTTP_REQUEST_RETRY): # Failure may occur before contact with the server is established: - status = everserver_status(config) + status = everserver_status(everserver_status_path) if status["status"] == ServerStatus.completed: # For very small cases the optimization will finish and bring down the # server before we can verify that it is running. @@ -148,12 +119,11 @@ def wait_for_server(config: EverestConfig, timeout: int) -> None: sleep_time = sleep_time_increment * (2**retry_count) time.sleep(sleep_time) - if server_is_running(*ServerConfig.get_server_context(config.output_dir)): + if server_is_running(*ServerConfig.get_server_context(output_dir)): return # If number of retries reached and server is not running - throw exception - if not server_is_running(*ServerConfig.get_server_context(config.output_dir)): - raise RuntimeError("Failed to start server within configured timeout.") + raise RuntimeError("Failed to start server within configured timeout.") def get_opt_status(output_folder): @@ -187,29 +157,27 @@ def get_opt_status(output_folder): } -def wait_for_server_to_stop(config: EverestConfig, timeout): +def wait_for_server_to_stop(server_context: Tuple[str, str, Tuple[str, str]], timeout): """ Checks everest server has stoped _HTTP_REQUEST_RETRY times. Waits progressively longer between each check. Raise an exception when the timeout is reached. """ - if server_is_running(*ServerConfig.get_server_context(config.output_dir)): + if server_is_running(*server_context): sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1) for retry_count in range(_HTTP_REQUEST_RETRY): sleep_time = sleep_time_increment * (2**retry_count) time.sleep(sleep_time) - if not server_is_running( - *ServerConfig.get_server_context(config.output_dir) - ): + if not server_is_running(*server_context): return # If number of retries reached and server still running - throw exception - if server_is_running(*ServerConfig.get_server_context(config.output_dir)): + if server_is_running(*server_context): raise Exception("Failed to stop server within configured timeout.") -def server_is_running(url: str, cert: bool, auth: Tuple[str, str]): +def server_is_running(url: str, cert: str, auth: Tuple[str, str]): try: response = requests.get( url, @@ -225,24 +193,16 @@ def server_is_running(url: str, cert: bool, auth: Tuple[str, str]): return True -def get_optimization_status(config: EverestConfig): - seba_snapshot = SebaSnapshot(config.optimization_output_dir) - snapshot = seba_snapshot.get_snapshot(filter_out_gradient=True) - - return { - "objective_history": snapshot.expected_single_objective, - "control_history": snapshot.optimization_controls, - } - - -def start_monitor(config: EverestConfig, callback, polling_interval=5): +def start_monitor( + server_context: Tuple[str, str, Tuple[str, str]], callback, polling_interval=5 +): """ Checks status on Everest server and calls callback when status changes Monitoring stops when the server stops answering. It can also be interrupted by returning True from the callback """ - url, cert, auth = ServerConfig.get_server_context(config.output_dir) + url, cert, auth = server_context sim_endpoint = "/".join([url, SIM_PROGRESS_ENDPOINT]) opt_endpoint = "/".join([url, OPT_PROGRESS_ENDPOINT]) @@ -292,14 +252,17 @@ def start_monitor(config: EverestConfig, callback, polling_interval=5): } -def _find_res_queue_system(config: EverestConfig): +def _find_res_queue_system( + simulator: Optional[SimulatorConfig], + server: Optional[ServerConfig], +): queue_system_simulator: Literal["lsf", "local", "slurm", "torque"] = "local" - if config.simulator is not None: - queue_system_simulator = config.simulator.queue_system or queue_system_simulator + if simulator is not None and simulator.queue_system is not None: + queue_system_simulator = simulator.queue_system queue_system = queue_system_simulator - if config.server is not None: - queue_system = config.server.queue_system or queue_system + if server is not None: + queue_system = server.queue_system or queue_system if queue_system_simulator == CK.LOCAL and queue_system_simulator != queue_system: raise ValueError( @@ -312,10 +275,12 @@ def _find_res_queue_system(config: EverestConfig): return QueueSystem(queue_system.upper()) -def get_server_queue_options(config: EverestConfig) -> QueueOptions: - queue_system = _find_res_queue_system(config) - - ever_queue_config = config.server if config.server is not None else config.simulator +def get_server_queue_options( + simulator: Optional[SimulatorConfig], + server: Optional[ServerConfig], +) -> QueueOptions: + queue_system = _find_res_queue_system(simulator, server) + ever_queue_config = server if server is not None else simulator if queue_system == QueueSystem.LSF: queue = LsfQueueOptions( @@ -376,17 +341,17 @@ def decode(obj): def update_everserver_status( - config: EverestConfig, status: ServerStatus, message: Optional[str] = None + everserver_status_path: str, status: ServerStatus, message: Optional[str] = None ): """Update the everest server status with new status information""" new_status = {"status": status, "message": message} - path = ServerConfig.get_everserver_status_path(config.output_dir) + path = everserver_status_path if not os.path.exists(os.path.dirname(path)): os.makedirs(os.path.dirname(path)) with open(path, "w", encoding="utf-8") as outfile: json.dump(new_status, outfile, cls=ServerStatusEncoder) elif os.path.exists(path): - server_status = everserver_status(config) + server_status = everserver_status(path) if server_status["message"] is not None: if message is not None: new_status["message"] = "{}\n{}".format( @@ -398,7 +363,7 @@ def update_everserver_status( json.dump(new_status, outfile, cls=ServerStatusEncoder) -def everserver_status(config: EverestConfig): +def everserver_status(everserver_status_path: str): """Returns a dictionary representing the everest server status. If the status file is not found we assume the server has never ran before, and will return a status of ServerStatus.never_run @@ -408,9 +373,8 @@ def everserver_status(config: EverestConfig): 'message': None } """ - path = ServerConfig.get_everserver_status_path(config.output_dir) - if os.path.exists(path): - with open(path, "r", encoding="utf-8") as f: + if os.path.exists(everserver_status_path): + with open(everserver_status_path, "r", encoding="utf-8") as f: return json.load(f, object_hook=ServerStatusEncoder.decode) else: return {"status": ServerStatus.never_run, "message": None} diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 6f7e3145f28..f75c06a1508 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -168,10 +168,7 @@ def _find_open_port(host, lower, upper): raise Exception(msg) -def _write_hostfile(config: EverestConfig, host, port, cert, auth): - # host_file_path = config.hostfile_path - host_file_path = ServerConfig.get_hostfile_path(config.output_dir) - +def _write_hostfile(host_file_path, host, port, cert, auth): if not os.path.exists(os.path.dirname(host_file_path)): os.makedirs(os.path.dirname(host_file_path)) data = { @@ -186,10 +183,11 @@ def _write_hostfile(config: EverestConfig, host, port, cert, auth): f.write(json_string) -def _configure_loggers(config: EverestConfig): - detached_node_dir = ServerConfig.get_detached_node_dir(config.output_dir) - everest_logs_dir = config.log_dir - +def _configure_loggers( + detached_node_dir: str, + everest_logs_dir: str, + logging_level: int, +): configure_logger( name="res", file_path=os.path.join(detached_node_dir, "simulations.log"), @@ -205,20 +203,20 @@ def _configure_loggers(config: EverestConfig): configure_logger( name=EVEREST, file_path=os.path.join(everest_logs_dir, "everest.log"), - log_level=config.logging_level, + log_level=logging_level, log_to_azure=True, ) configure_logger( name="forward_models", file_path=os.path.join(everest_logs_dir, "forward_models.log"), - log_level=config.logging_level, + log_level=logging_level, ) configure_logger( name="ropt", file_path=os.path.join(everest_logs_dir, "ropt.log"), - log_level=config.logging_level, + log_level=logging_level, ) @@ -230,10 +228,17 @@ def main(): config = EverestConfig.load_file(options.config_file) if options.debug: config.logging_level = "debug" + detached_dir = ServerConfig.get_detached_node_dir(config.output_dir) + status_path = ServerConfig.get_everserver_status_path(config.output_dir) + host_file = ServerConfig.get_hostfile_path(config.output_dir) try: - _configure_loggers(config) - update_everserver_status(config, ServerStatus.starting) + _configure_loggers( + detached_node_dir=detached_dir, + everest_logs_dir=config.log_dir, + logging_level=config.logging_level, + ) + update_everserver_status(status_path, ServerStatus.starting) logging.getLogger(EVEREST).info(version_info()) logging.getLogger(EVEREST).info( "Output directory: {}".format(config.output_dir) @@ -241,10 +246,12 @@ def main(): logging.getLogger(EVEREST).debug(str(options)) authentication = _generate_authentication() - cert_path, key_path, key_pw = _generate_certificate(config) + cert_path, key_path, key_pw = _generate_certificate( + ServerConfig.get_certificate_dir(config.output_dir) + ) host = get_machine_name() port = _find_open_port(host, lower=5000, upper=5800) - _write_hostfile(config, host, port, cert_path, authentication) + _write_hostfile(host_file, host, port, cert_path, authentication) shared_data = { SIM_PROGRESS_ENDPOINT: {}, @@ -268,12 +275,14 @@ def main(): everserver_instance.start() except: update_everserver_status( - config, ServerStatus.failed, message=traceback.format_exc() + status_path, + ServerStatus.failed, + message=traceback.format_exc(), ) return try: - update_everserver_status(config, ServerStatus.running) + update_everserver_status(status_path, ServerStatus.running) run_model = EverestRunModel.create( config, @@ -291,22 +300,26 @@ def main(): status, message = _get_optimization_status(run_model.exit_code, shared_data) if status != ServerStatus.completed: - update_everserver_status(config, status, message) + update_everserver_status(status_path, status, message) return except: if shared_data[STOP_ENDPOINT]: update_everserver_status( - config, ServerStatus.stopped, message="Optimization aborted." + status_path, + ServerStatus.stopped, + message="Optimization aborted.", ) else: update_everserver_status( - config, ServerStatus.failed, message=traceback.format_exc() + status_path, + ServerStatus.failed, + message=traceback.format_exc(), ) return try: # Exporting data - update_everserver_status(config, ServerStatus.exporting_to_csv) + update_everserver_status(status_path, ServerStatus.exporting_to_csv) if config.export is not None: err_msgs, export_ecl = check_for_errors( @@ -326,11 +339,12 @@ def main(): ) except: update_everserver_status( - config, ServerStatus.failed, message=traceback.format_exc() + status_path, + ServerStatus.failed, + message=traceback.format_exc(), ) return - - update_everserver_status(config, ServerStatus.completed, message=message) + update_everserver_status(status_path, ServerStatus.completed, message=message) def _get_optimization_status(exit_code, shared_data): @@ -375,7 +389,7 @@ def _failed_realizations_messages(shared_data): return messages -def _generate_certificate(config: EverestConfig): +def _generate_certificate(cert_folder: str): """Generate a private key and a certificate signed with it Both the certificate and the key are written to files in the folder given @@ -416,7 +430,6 @@ def _generate_certificate(config: EverestConfig): ) # Write certificate and key to disk - cert_folder = ServerConfig.get_certificate_dir(config.output_dir) makedirs_if_needed(cert_folder) cert_path = os.path.join(cert_folder, cert_name + ".crt") with open(cert_path, "wb") as f: diff --git a/src/everest/util/forward_models.py b/src/everest/util/forward_models.py index 23be79b5b95..3ae8d8af0c0 100644 --- a/src/everest/util/forward_models.py +++ b/src/everest/util/forward_models.py @@ -23,7 +23,7 @@ def lint_forward_model_job(job: str, args) -> List[str]: def check_forward_model_objective( forward_model_steps: List[str], objectives: Set[str] ) -> None: - if not objectives: + if not objectives or not forward_model_steps: return fm_outputs = pm.hook.custom_forward_model_outputs( forward_model_steps=forward_model_steps, diff --git a/tests/everest/entry_points/test_everest_entry.py b/tests/everest/entry_points/test_everest_entry.py index 53a47d63f27..4756d5d374a 100644 --- a/tests/everest/entry_points/test_everest_entry.py +++ b/tests/everest/entry_points/test_everest_entry.py @@ -65,10 +65,10 @@ def build_job( raise Exception("Stop! Hands in the air!") -def run_detached_monitor_mock( - config, show_all_jobs=False, status=ServerStatus.completed, error=None -): - update_everserver_status(config, status, message=error) +def run_detached_monitor_mock(status=ServerStatus.completed, error=None, **kwargs): + optimization_output = kwargs.get("optimization_output_dir") + path = os.path.join(optimization_output, "../detached_node_output/.session/status") + update_everserver_status(path, status, message=error) @patch("everest.bin.everest_script.run_detached_monitor") @@ -470,7 +470,8 @@ def test_complete_status_for_normal_run( ): everest_entry([CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status_path = ServerConfig.get_everserver_status_path(config.output_dir) + status = everserver_status(status_path) expected_status = ServerStatus.completed expected_error = None @@ -488,7 +489,8 @@ def test_complete_status_for_normal_run_monitor( ): monitor_entry([CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status_path = ServerConfig.get_everserver_status_path(config.output_dir) + status = everserver_status(status_path) expected_status = ServerStatus.completed expected_error = None diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 49c363892e2..bf28d46aa34 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -11,7 +11,7 @@ from everest import __version__ as everest_version from everest.bin.main import start_everest -from everest.config import EverestConfig +from everest.config import EverestConfig, ServerConfig from everest.detached import ( ServerStatus, everserver_status, @@ -85,7 +85,9 @@ def test_everest_entry_run(copy_math_func_test_data_to_tmp): start_everest(["everest", "run", CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) assert status["status"] == ServerStatus.completed @@ -102,7 +104,9 @@ def test_everest_entry_run(copy_math_func_test_data_to_tmp): start_everest(["everest", "monitor", CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) assert status["status"] == ServerStatus.completed @@ -112,7 +116,9 @@ def test_everest_entry_monitor_no_run(copy_math_func_test_data_to_tmp): start_everest(["everest", "monitor", CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) assert status["status"] == ServerStatus.never_run diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py index 839f8bd54db..fc1a4207bfc 100644 --- a/tests/everest/test_detached.py +++ b/tests/everest/test_detached.py @@ -47,17 +47,17 @@ @pytest.mark.xdist_group(name="starts_everest") async def test_https_requests(copy_math_func_test_data_to_tmp): everest_config = EverestConfig.load_file("config_minimal_slow.yml") - + status_path = ServerConfig.get_everserver_status_path(everest_config.output_dir) expected_server_status = ServerStatus.never_run - assert expected_server_status == everserver_status(everest_config)["status"] + assert expected_server_status == everserver_status(status_path)["status"] makedirs_if_needed(everest_config.output_dir, roll_if_exists=True) await start_server(everest_config) try: - wait_for_server(everest_config, 120) + wait_for_server(everest_config.output_dir, 120) except SystemExit as e: raise e - server_status = everserver_status(everest_config) + server_status = everserver_status(status_path) assert ServerStatus.running == server_status["status"] url, cert, auth = ServerConfig.get_server_context(everest_config.output_dir) @@ -82,55 +82,55 @@ async def test_https_requests(copy_math_func_test_data_to_tmp): assert server_is_running( *ServerConfig.get_server_context(everest_config.output_dir) ) - - if stop_server(everest_config): - wait_for_server_to_stop(everest_config, 60) - server_status = everserver_status(everest_config) + server_context = ServerConfig.get_server_context(everest_config.output_dir) + if stop_server(server_context): + wait_for_server_to_stop(server_context, 60) + server_status = everserver_status(status_path) # Possible the case completed while waiting for the server to stop assert server_status["status"] in [ ServerStatus.stopped, ServerStatus.completed, ] - assert not server_is_running( - *ServerConfig.get_server_context(everest_config.output_dir) - ) + assert not server_is_running(*server_context) else: - server_status = everserver_status(everest_config) + server_status = everserver_status(status_path) assert ServerStatus.stopped == server_status["status"] def test_server_status(copy_math_func_test_data_to_tmp): config = EverestConfig.load_file("config_minimal.yml") - + everserver_status_path = ServerConfig.get_everserver_status_path(config.output_dir) # Check status file does not exist before initial status update - assert not os.path.exists( - ServerConfig.get_everserver_status_path(config.output_dir) - ) - update_everserver_status(config, ServerStatus.starting) + assert not os.path.exists(everserver_status_path) + update_everserver_status(everserver_status_path, ServerStatus.starting) # Check status file exists after initial status update - assert os.path.exists(ServerConfig.get_everserver_status_path(config.output_dir)) + assert os.path.exists(everserver_status_path) # Check we can read the server status from disk - status = everserver_status(config) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.starting assert status["message"] is None err_msg_1 = "Danger the universe is preparing for implosion!!!" - update_everserver_status(config, ServerStatus.failed, message=err_msg_1) - status = everserver_status(config) + update_everserver_status( + everserver_status_path, ServerStatus.failed, message=err_msg_1 + ) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.failed assert status["message"] == err_msg_1 err_msg_2 = "Danger exotic matter detected!!!" - update_everserver_status(config, ServerStatus.failed, message=err_msg_2) - status = everserver_status(config) + update_everserver_status( + everserver_status_path, ServerStatus.failed, message=err_msg_2 + ) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.failed assert status["message"] == "{}\n{}".format(err_msg_1, err_msg_2) - update_everserver_status(config, ServerStatus.completed) - status = everserver_status(config) + update_everserver_status(everserver_status_path, ServerStatus.completed) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.completed assert status["message"] is not None assert status["message"] == "{}\n{}".format(err_msg_1, err_msg_2) @@ -141,7 +141,7 @@ def test_wait_for_server(server_is_running_mock, caplog, monkeypatch): config = EverestConfig.with_defaults() with pytest.raises(RuntimeError, match="Failed to start .* timeout"): - wait_for_server(config, timeout=1) + wait_for_server(config.output_dir, timeout=1) assert not caplog.messages @@ -183,7 +183,9 @@ def _get_reference_config(): def test_detached_mode_config_base(copy_math_func_test_data_to_tmp): everest_config, _ = _get_reference_config() - queue_config = get_server_queue_options(everest_config) + queue_config = get_server_queue_options( + everest_config.simulator, everest_config.server + ) assert queue_config == LocalQueueOptions(max_running=1) @@ -207,7 +209,9 @@ def test_everserver_queue_config_equal_to_run_config( if name is not None: simulator_config.update({"name": name}) everest_config.simulator = SimulatorConfig(**simulator_config) - server_queue_option = get_server_queue_options(everest_config) + server_queue_option = get_server_queue_options( + everest_config.simulator, everest_config.server + ) ert_config = _everest_to_ert_config_dict(everest_config) run_queue_option = ert_config["QUEUE_OPTION"] @@ -231,7 +235,9 @@ def test_detached_mode_config_only_sim(copy_math_func_test_data_to_tmp, queue_sy queue_options = [(queue_system.upper(), "MAX_RUNNING", 1)] reference.setdefault("QUEUE_OPTION", []).extend(queue_options) everest_config.simulator = SimulatorConfig(**{CK.QUEUE_SYSTEM: queue_system}) - queue_config = get_server_queue_options(everest_config) + queue_config = get_server_queue_options( + everest_config.simulator, everest_config.server + ) assert str(queue_config.name.name).lower() == queue_system @@ -244,7 +250,7 @@ def test_detached_mode_config_error(copy_math_func_test_data_to_tmp): everest_config.server = ServerConfig(name="server", queue_system="lsf") with pytest.raises(ValueError, match="so must the everest server"): - get_server_queue_options(everest_config) + get_server_queue_options(everest_config.simulator, everest_config.server) @pytest.mark.parametrize( @@ -271,14 +277,16 @@ def test_detached_mode_config_error(copy_math_func_test_data_to_tmp): ], ) def test_find_queue_system(config: EverestConfig, expected_result): - result = _find_res_queue_system(config) + result = _find_res_queue_system(config.simulator, config.server) assert result == expected_result def test_generate_queue_options_no_config(): config = EverestConfig.with_defaults(**{}) - assert get_server_queue_options(config) == LocalQueueOptions(max_running=1) + assert get_server_queue_options( + config.simulator, config.server + ) == LocalQueueOptions(max_running=1) @pytest.mark.parametrize( @@ -296,7 +304,7 @@ def test_generate_queue_options_no_config(): ) def test_generate_queue_options_use_simulator_values(queue_options, expected_result): config = EverestConfig.with_defaults(**{"simulator": queue_options}) - assert get_server_queue_options(config) == expected_result + assert get_server_queue_options(config.simulator, config.server) == expected_result @pytest.mark.timeout(5) # Simulation might not finish diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 4ac98fe9dad..fa3025fdec5 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -1,14 +1,16 @@ import fnmatch import os import shutil +from unittest.mock import patch import pytest from ert.config import ErtConfig from ert.run_models.everest_run_model import EverestRunModel from ert.storage import open_storage +from everest.bin.everest_script import everest_entry from everest.config import EverestConfig -from everest.detached import start_server +from everest.detached import ServerStatus, start_server from everest.simulator.everest_to_ert import _everest_to_ert_config_dict from everest.strings import ( DEFAULT_OUTPUT_DIR, @@ -97,12 +99,20 @@ def useless_cb(*args, **kwargs): assert len(fnmatch.filter(final_files, "everest_output*")) == 2 -async def test_save_running_config(copy_math_func_test_data_to_tmp): +@patch("everest.bin.everest_script.server_is_running", return_value=False) +@patch("everest.bin.everest_script.run_detached_monitor") +@patch("everest.bin.everest_script.wait_for_server") +@patch("everest.bin.everest_script.start_server") +@patch( + "everest.bin.everest_script.everserver_status", + return_value={"status": ServerStatus.never_run, "message": None}, +) +def test_save_running_config(_, _1, _2, _3, _4, copy_math_func_test_data_to_tmp): + """Test everest detached, when an optimization has already run""" + # optimization already run, notify the user file_name = "config_minimal.yml" config = EverestConfig.load_file(file_name) - makedirs_if_needed(config.output_dir, roll_if_exists=True) - await start_server(config) - + everest_entry([file_name]) saved_config_path = os.path.join(config.output_dir, file_name) assert os.path.exists(saved_config_path) diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index db3d81e7d95..0455af8bc87 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -1,6 +1,8 @@ +import json import os import ssl from functools import partial +from pathlib import Path from unittest.mock import patch from ropt.enums import OptimizerExitCode @@ -19,7 +21,8 @@ def configure_everserver_logger(*args, **kwargs): def check_status(*args, **kwargs): - status = everserver_status(args[0]) + everest_server_status_path = str(Path(args[0]).parent / "status") + status = everserver_status(everest_server_status_path) assert status["status"] == kwargs["status"] @@ -50,8 +53,10 @@ def set_shared_status(context_status, event, shared_data, progress): def test_certificate_generation(copy_math_func_test_data_to_tmp): - everest_config = EverestConfig.load_file("config_minimal.yml") - cert, key, pw = everserver._generate_certificate(everest_config) + config = EverestConfig.load_file("config_minimal.yml") + cert, key, pw = everserver._generate_certificate( + ServerConfig.get_certificate_dir(config.output_dir) + ) # check that files are written assert os.path.exists(cert) @@ -62,8 +67,9 @@ def test_certificate_generation(copy_math_func_test_data_to_tmp): ctx.load_cert_chain(cert, key, pw) # raise on error -def test_hostfile_storage(copy_math_func_test_data_to_tmp): - config = EverestConfig.load_file("config_minimal.yml") +def test_hostfile_storage(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + host_file_path = "detach/.session/host_file" expected_result = { "host": "hostname.1.2.3", @@ -71,8 +77,10 @@ def test_hostfile_storage(copy_math_func_test_data_to_tmp): "cert": "/a/b/c.cert", "auth": "1234", } - everserver._write_hostfile(config, **expected_result) - result = ServerConfig.get_server_info(config.output_dir) + everserver._write_hostfile(host_file_path, **expected_result) + assert os.path.exists(host_file_path) + with open(host_file_path, encoding="utf-8") as f: + result = json.load(f) assert result == expected_result @@ -85,7 +93,9 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) assert status["status"] == ServerStatus.failed assert "Exception: Configuring logger failed" in status["message"] @@ -111,7 +121,8 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): "ert.run_models.everest_run_model.EverestRunModel.run_experiment", autospec=True, side_effect=lambda self, evaluator_server_config, restart=False: check_status( - self.everest_config, status=ServerStatus.running + ServerConfig.get_hostfile_path(self.everest_config.output_dir), + status=ServerStatus.running, ), ) @patch( @@ -125,7 +136,9 @@ def test_everserver_status_running_complete( config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) assert status["status"] == ServerStatus.completed assert status["message"] == "Optimization completed." @@ -173,7 +186,9 @@ def test_everserver_status_failed_job( config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) # The server should fail and store a user-friendly message. assert status["status"] == ServerStatus.failed @@ -211,7 +226,9 @@ def test_everserver_status_exception( config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) # The server should fail, and store the exception that # start_optimization raised. @@ -242,7 +259,9 @@ def test_everserver_status_max_batch_num( config_file = "config_one_batch.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) # The server should complete without error. assert status["status"] == ServerStatus.completed diff --git a/tests/everest/test_logging.py b/tests/everest/test_logging.py index 7269457f633..51f7b60bfd3 100644 --- a/tests/everest/test_logging.py +++ b/tests/everest/test_logging.py @@ -34,7 +34,7 @@ async def server_running(): makedirs_if_needed(everest_config.output_dir, roll_if_exists=True) driver = await start_server(everest_config, debug=True) try: - wait_for_server(everest_config, 120) + wait_for_server(everest_config.output_dir, 120) except SystemExit as e: raise e await server_running() @@ -48,12 +48,10 @@ async def server_running(): everest_log_path = os.path.join(everest_logs_dir_path, "everest.log") forward_model_log_path = os.path.join(everest_logs_dir_path, "forward_models.log") - simulation_log_path = os.path.join(everest_logs_dir_path, "simulations.log") assert os.path.exists(everest_output_path) assert os.path.exists(everest_logs_dir_path) assert os.path.exists(forward_model_log_path) - assert os.path.exists(simulation_log_path) assert os.path.exists(everest_log_path) assert os.path.exists(endpoint_log_path) diff --git a/tests/everest/test_util.py b/tests/everest/test_util.py index edfbf58e191..3570f6aedaa 100644 --- a/tests/everest/test_util.py +++ b/tests/everest/test_util.py @@ -150,6 +150,12 @@ def test_report_on_previous_run(_, change_to_tmpdir): f.write(" ") config = EverestConfig.with_defaults(**{ConfigKeys.CONFIGPATH: "config_file"}) with capture_streams() as (out, _): - report_on_previous_run(config) + report_on_previous_run( + config_file=config.config_file, + everserver_status_path=ServerConfig.get_everserver_status_path( + config.output_dir + ), + optimization_output_dir=config.optimization_output_dir, + ) lines = [line.strip() for line in out.getvalue().split("\n")] assert lines[0] == "Optimization run failed, with error: mock error" diff --git a/tests/everest/utils/__init__.py b/tests/everest/utils/__init__.py index 26772311f1b..84ad48afd89 100644 --- a/tests/everest/utils/__init__.py +++ b/tests/everest/utils/__init__.py @@ -11,7 +11,7 @@ import pytest from everest.bin.main import start_everest -from everest.config import EverestConfig +from everest.config import EverestConfig, ServerConfig from everest.detached import ServerStatus, everserver_status from everest.jobs import script_names from everest.util import has_opm @@ -142,6 +142,8 @@ def create_cached_mocked_test_case(request, monkeypatch) -> pathlib.Path: monkeypatch.chdir("mocked_run") start_everest(["everest", "run", config_file]) config = EverestConfig.load_file(config_file) - status = everserver_status(config) + status = everserver_status( + ServerConfig.get_everserver_status_path(config.output_dir) + ) assert status["status"] == ServerStatus.completed return cache_path / "mocked_run"