From 4a743a6e39dffbc66833a5f51c039b84b1cf2b32 Mon Sep 17 00:00:00 2001 From: DanSava Date: Mon, 11 Nov 2024 17:10:04 +0200 Subject: [PATCH] Refactor usage of EverestConfig in everest server functionality --- src/everest/api/everest_data_api.py | 2 +- src/everest/bin/everest_script.py | 37 ++++++++-- src/everest/bin/kill_script.py | 4 +- src/everest/bin/monitor_script.py | 16 +++- src/everest/bin/utils.py | 33 +++++---- src/everest/bin/visualization_script.py | 2 +- src/everest/detached/__init__.py | 73 ++++++------------ src/everest/detached/jobs/everserver.py | 66 ++++++++++------- .../entry_points/test_everest_entry.py | 6 +- .../functional/test_main_everest_entry.py | 6 +- tests/everest/test_detached.py | 74 ++++++++++++------- tests/everest/test_everest_output.py | 6 +- tests/everest/test_everserver.py | 32 ++++---- tests/everest/test_logging.py | 12 +-- tests/everest/test_util.py | 6 +- tests/everest/utils/__init__.py | 2 +- 16 files changed, 219 insertions(+), 158 deletions(-) diff --git a/src/everest/api/everest_data_api.py b/src/everest/api/everest_data_api.py index ef95e3a0387..308beb7687b 100644 --- a/src/everest/api/everest_data_api.py +++ b/src/everest/api/everest_data_api.py @@ -204,7 +204,7 @@ def output_folder(self): @property def everest_csv(self): - state = everserver_status(self._config) + state = everserver_status(self._config.everserver_status_path) if state["status"] == ServerStatus.completed: return self._config.export_path else: diff --git a/src/everest/bin/everest_script.py b/src/everest/bin/everest_script.py index f00137e69a5..5528b98a6c6 100755 --- a/src/everest/bin/everest_script.py +++ b/src/everest/bin/everest_script.py @@ -6,6 +6,7 @@ import signal import threading from functools import partial +import os from ert.config import ErtConfig from ert.storage import open_storage @@ -19,6 +20,7 @@ wait_for_context, wait_for_server, ) +from everest.strings import EVEREST from everest.plugins.site_config_env import PluginSiteConfigEnv from everest.util import makedirs_if_needed, version_info @@ -82,7 +84,7 @@ def _build_args_parser(): def run_everest(options): logger = logging.getLogger("everest_main") - server_state = everserver_status(options.config) + server_state = everserver_status(options.config.everserver_status_path) if server_is_running(*options.config.server_context): config_file = options.config.config_file @@ -108,16 +110,35 @@ def run_everest(options): ) makedirs_if_needed(options.config.output_dir, roll_if_exists=True) + try: + output_dir = options.config.output_dir + config_file = options.config.config_file + save_config_path = os.path.join(output_dir, config_file) + options.config.dump(save_config_path) + except (OSError, LookupError) as e: + logging.getLogger(EVEREST).error( + "Failed to save optimization config: {}".format(e) + ) with open_storage(ert_config.ens_path, "w") as storage, PluginSiteConfigEnv(): - context = start_server(options.config, ert_config, storage) + context = start_server(ert_config, storage) print("Waiting for server ...") - wait_for_server(options.config, timeout=600, context=context) + #TODO options.config.server_context might not be valid when called 1st time + wait_for_server( + everserver_status_path=options.config.everserver_status_path, + server_context=options.config.server_context, + timeout=600, + context=context + ) print("Everest server found!") - run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs) + run_detached_monitor( + server_context=options.config.server_context, + optimization_output_dir=options.config.optimization_output_dir, + show_all_jobs=options.show_all_jobs + ) wait_for_context() - server_state = everserver_status(options.config) + server_state = everserver_status(options.config.everserver_status_path) server_state_info = server_state["message"] if server_state["status"] == ServerStatus.failed: logger.error("Everest run failed with: {}".format(server_state_info)) @@ -126,7 +147,11 @@ def run_everest(options): logger.info("Everest run finished with: {}".format(server_state_info)) print(server_state_info) else: - report_on_previous_run(options.config) + report_on_previous_run( + config_file=options.config.config_file, + everserver_status_path=options.config.everserver_status_path, + optimization_output_dir=options.config.optimization_output_dir, + ) if __name__ == "__main__": diff --git a/src/everest/bin/kill_script.py b/src/everest/bin/kill_script.py index c9d0c6453e7..5f8284e0b69 100755 --- a/src/everest/bin/kill_script.py +++ b/src/everest/bin/kill_script.py @@ -74,7 +74,7 @@ def kill_everest(options): print("Server is not running.") return - stopping = stop_server(options.config) + stopping = stop_server(options.config.server_context) if threading.current_thread() is threading.main_thread(): signal.signal(signal.SIGINT, partial(_handle_keyboard_interrupt, after=True)) @@ -83,7 +83,7 @@ def kill_everest(options): return try: print("Waiting for server to stop ...") - wait_for_server_to_stop(options.config, timeout=60) + wait_for_server_to_stop(options.config.server_context, timeout=60) print("Server stopped.") except: logging.debug(traceback.format_exc()) diff --git a/src/everest/bin/monitor_script.py b/src/everest/bin/monitor_script.py index 310f4bd80ca..58bdf0325d0 100755 --- a/src/everest/bin/monitor_script.py +++ b/src/everest/bin/monitor_script.py @@ -61,11 +61,15 @@ def _build_args_parser(): def monitor_everest(options): config: EverestConfig = options.config - server_state = everserver_status(options.config) + server_state = everserver_status(options.config.everserver_status_path) if server_is_running(*config.server_context): - run_detached_monitor(config, show_all_jobs=options.show_all_jobs) - server_state = everserver_status(config) + run_detached_monitor( + server_context=config.server_context, + optimization_output_dir=config.optimization_output_dir, + show_all_jobs=options.show_all_jobs + ) + server_state = everserver_status(config.everserver_status_path) if server_state["status"] == ServerStatus.failed: raise SystemExit(server_state["message"]) if server_state["message"] is not None: @@ -78,7 +82,11 @@ def monitor_everest(options): f" `everest run {config_file}`" ) else: - report_on_previous_run(config) + report_on_previous_run( + config_file=config.config_file, + everserver_status_path=config.everserver_status_path, + optimization_output_dir=config.optimization_output_dir, + ) if __name__ == "__main__": diff --git a/src/everest/bin/utils.py b/src/everest/bin/utils.py index 6a21700783e..b08034dcec7 100644 --- a/src/everest/bin/utils.py +++ b/src/everest/bin/utils.py @@ -4,7 +4,7 @@ import traceback from dataclasses import dataclass, field from itertools import groupby -from typing import ClassVar, Dict, List +from typing import ClassVar, Dict, List, Tuple import colorama from colorama import Fore @@ -140,8 +140,7 @@ class _DetachedMonitor: INDENT = 2 FLOAT_FMT = ".5g" - def __init__(self, config, show_all_jobs): - self._config = config + def __init__(self, show_all_jobs): self._show_all_jobs: bool = show_all_jobs self._clear_lines = 0 self._batches_done = set() @@ -300,19 +299,26 @@ def _clear(self): print(colorama.Cursor.UP(), end=colorama.ansi.clear_line()) -def run_detached_monitor(config: EverestConfig, show_all_jobs: bool = False): - monitor = _DetachedMonitor(config, show_all_jobs) - start_monitor(config, callback=monitor.update) - opt_status = get_opt_status(config.optimization_output_dir) +def run_detached_monitor( + server_context: Tuple[str, str, Tuple[str, str]], + optimization_output_dir: str, + show_all_jobs: bool = False, +): + monitor = _DetachedMonitor(show_all_jobs) + start_monitor(server_context, callback=monitor.update) + opt_status = get_opt_status(optimization_output_dir) if opt_status.get("cli_monitor_data"): msg, _ = monitor.get_opt_progress(opt_status) if msg.strip(): print(f"{msg}\n") -def report_on_previous_run(config: EverestConfig): - server_state = everserver_status(config) - config_file = config.config_file +def report_on_previous_run( + config_file: str, + everserver_status_path: str, + optimization_output_dir: str, +): + server_state = everserver_status(everserver_status_path) if server_state["status"] == ServerStatus.failed: error_msg = server_state["message"] print( @@ -321,14 +327,13 @@ def report_on_previous_run(config: EverestConfig): f"` everest run --new-run {config_file}`\n" ) else: - output_dir = config.output_dir - opt_status = get_opt_status(config.optimization_output_dir) + opt_status = get_opt_status(optimization_output_dir) if opt_status.get("cli_monitor_data"): - monitor = _DetachedMonitor(config, show_all_jobs=False) + monitor = _DetachedMonitor(show_all_jobs=False) msg, _ = monitor.get_opt_progress(opt_status) print(msg + "\n") print( - f"Optimization completed, results in {output_dir}\n" + f"Optimization completed.\n" "\nTo re-run the optimization use command:\n" f" `everest run --new-run {config_file}`\n" "To export the results use command:\n" diff --git a/src/everest/bin/visualization_script.py b/src/everest/bin/visualization_script.py index 782d6b1b5c5..ab864960674 100644 --- a/src/everest/bin/visualization_script.py +++ b/src/everest/bin/visualization_script.py @@ -27,7 +27,7 @@ def visualization_entry(args=None): options = parser.parse_args(args) config = options.config_file - server_state = everserver_status(config) + server_state = everserver_status(config.everserver_status_path) if server_state["status"] != ServerStatus.never_run: pm = EverestPluginManager() pm.hook.visualize_data(api=EverestDataAPI(config)) diff --git a/src/everest/detached/__init__.py b/src/everest/detached/__init__.py index c882fef63c4..583707abaf2 100644 --- a/src/everest/detached/__init__.py +++ b/src/everest/detached/__init__.py @@ -19,7 +19,6 @@ from everest.config import EverestConfig from everest.config_keys import ConfigKeys as CK from everest.strings import ( - EVEREST, EVEREST_SERVER_CONFIG, OPT_PROGRESS_ENDPOINT, OPT_PROGRESS_ID, @@ -55,28 +54,10 @@ _context = None -def start_server(config: EverestConfig, ert_config: ErtConfig, storage): +def start_server(ert_config: ErtConfig, storage): """ Start an Everest server running the optimization defined in the config """ - if server_is_running(*config.server_context): # better safe than sorry - return - - log_dir = config.log_dir - - configure_logger( - name="res", - file_path=os.path.join(log_dir, "everest_server.log"), - log_level=logging.INFO, - log_to_azure=True, - ) - - configure_logger( - name=__name__, - file_path=os.path.join(log_dir, "simulations.log"), - log_level=logging.INFO, - ) - global _server # noqa: PLW0603 global _context # noqa: PLW0603 if _context and _context.running(): @@ -85,14 +66,6 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage): "in the same process is not allowed!" ) - try: - save_config_path = os.path.join(config.output_dir, config.config_file) - config.dump(save_config_path) - except (OSError, LookupError) as e: - logging.getLogger(EVEREST).error( - "Failed to save optimization config: {}".format(e) - ) - experiment = storage.create_experiment( name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}", parameters=[], @@ -137,13 +110,13 @@ def wait_for_context(): time.sleep(1) -def stop_server(config: EverestConfig, retries: int = 5): +def stop_server(server_context: Tuple[str, str, Tuple[str, str]], retries: int = 5): """ Stop server if found and it is running. """ for retry in range(retries): try: - url, cert, auth = config.server_context + url, cert, auth = server_context stop_endpoint = "/".join([url, STOP_ENDPOINT]) response = requests.post( stop_endpoint, @@ -166,7 +139,10 @@ def extract_errors_from_file(path: str): def wait_for_server( - config: EverestConfig, timeout: int, context: Optional[BatchContext] = None + everserver_status_path: str, + server_context: Tuple[str, str, Tuple[str, str]], + timeout: int, + context: Optional[BatchContext] = None ) -> None: """ Checks everest server has started _HTTP_REQUEST_RETRY times. Waits @@ -174,11 +150,11 @@ def wait_for_server( Raise an exception when the timeout is reached. """ - if not server_is_running(*config.server_context): + if not server_is_running(*server_context): sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1) for retry_count in range(_HTTP_REQUEST_RETRY): # Failure may occur before contact with the server is established: - status = everserver_status(config) + status = everserver_status(everserver_status_path) if status["status"] == ServerStatus.completed: # For very small cases the optimization will finish and bring down the # server before we can verify that it is running. @@ -196,7 +172,7 @@ def wait_for_server( path = context.job_progress(0).steps[0].std_err_file for err in extract_errors_from_file(path): update_everserver_status( - config, ServerStatus.failed, message=err + everserver_status_path, ServerStatus.failed, message=err ) logging.error(err) raise SystemExit("Failed to start Everest server.") @@ -218,11 +194,11 @@ def wait_for_server( sleep_time = sleep_time_increment * (2**retry_count) time.sleep(sleep_time) - if server_is_running(*config.server_context): + if server_is_running(*server_context): return # If number of retries reached and server is not running - throw exception - if not server_is_running(*config.server_context): + if not server_is_running(*server_context): raise RuntimeError("Failed to start server within configured timeout.") @@ -257,23 +233,23 @@ def get_opt_status(output_folder): } -def wait_for_server_to_stop(config: EverestConfig, timeout): +def wait_for_server_to_stop(server_context: Tuple[str, str, Tuple[str, str]], timeout): """ Checks everest server has stoped _HTTP_REQUEST_RETRY times. Waits progressively longer between each check. Raise an exception when the timeout is reached. """ - if server_is_running(*config.server_context): + if server_is_running(*server_context): sleep_time_increment = float(timeout) / (2**_HTTP_REQUEST_RETRY - 1) for retry_count in range(_HTTP_REQUEST_RETRY): sleep_time = sleep_time_increment * (2**retry_count) time.sleep(sleep_time) - if not server_is_running(*config.server_context): + if not server_is_running(*server_context): return # If number of retries reached and server still running - throw exception - if server_is_running(*config.server_context): + if server_is_running(*server_context): raise Exception("Failed to stop server within configured timeout.") @@ -303,14 +279,14 @@ def get_optimization_status(config: EverestConfig): } -def start_monitor(config: EverestConfig, callback, polling_interval=5): +def start_monitor(server_context: Tuple[str, str, Tuple[str, str]], callback, polling_interval=5): """ Checks status on Everest server and calls callback when status changes Monitoring stops when the server stops answering. It can also be interrupted by returning True from the callback """ - url, cert, auth = config.server_context + url, cert, auth = server_context sim_endpoint = "/".join([url, SIM_PROGRESS_ENDPOINT]) opt_endpoint = "/".join([url, OPT_PROGRESS_ENDPOINT]) @@ -528,17 +504,17 @@ def decode(obj): def update_everserver_status( - config: EverestConfig, status: ServerStatus, message: Optional[str] = None + everserver_status_path: str, status: ServerStatus, message: Optional[str] = None ): """Update the everest server status with new status information""" new_status = {"status": status, "message": message} - path = config.everserver_status_path + path = everserver_status_path if not os.path.exists(os.path.dirname(path)): os.makedirs(os.path.dirname(path)) with open(path, "w", encoding="utf-8") as outfile: json.dump(new_status, outfile, cls=ServerStatusEncoder) elif os.path.exists(path): - server_status = everserver_status(config) + server_status = everserver_status(path) if server_status["message"] is not None: if message is not None: new_status["message"] = "{}\n{}".format( @@ -550,7 +526,7 @@ def update_everserver_status( json.dump(new_status, outfile, cls=ServerStatusEncoder) -def everserver_status(config: EverestConfig): +def everserver_status(everserver_status_path: str): """Returns a dictionary representing the everest server status. If the status file is not found we assume the server has never ran before, and will return a status of ServerStatus.never_run @@ -560,9 +536,8 @@ def everserver_status(config: EverestConfig): 'message': None } """ - path = config.everserver_status_path - if os.path.exists(path): - with open(path, "r", encoding="utf-8") as f: + if os.path.exists(everserver_status_path): + with open(everserver_status_path, "r", encoding="utf-8") as f: return json.load(f, object_hook=ServerStatusEncoder.decode) else: return {"status": ServerStatus.never_run, "message": None} diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 1aeb9124a0e..fa2a2051bb5 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -88,7 +88,7 @@ def _opt_monitor(shared_data=None): return "stop_optimization" -def _everserver_thread(shared_data, server_config): +def _everserver_thread(shared_data, server_config, logger): app = Flask(__name__) def check_user(password): @@ -107,10 +107,11 @@ def decorated(*args, **kwargs): def log(f): @wraps(f) def decorated(*args, **kwargs): + print("here!!!!") url = request.path method = request.method ip = request.environ.get("HTTP_X_REAL_IP", request.remote_addr) - logging.getLogger("everserver").info( + logger.info( "{} entered from {} with HTTP {}".format(url, ip, method) ) return f(*args, **kwargs) @@ -134,12 +135,18 @@ def stop(): @requires_authenticated @log def get_sim_progress(): + logger.info( + "called get_sim_progress" + ) return jsonify(shared_data[SIM_PROGRESS_ENDPOINT]) @app.route("/" + OPT_PROGRESS_ENDPOINT) @requires_authenticated @log def get_opt_progress(): + logger.info( + "called get_opt_progress" + ) progress = get_opt_status(server_config["optimization_output_dir"]) return jsonify(progress) @@ -149,6 +156,7 @@ def get_opt_progress(): server_config["key_path"], server_config["key_passwd"], ) + logger.info("starting flask") app.run(host="0.0.0.0", port=server_config["port"], ssl_context=ctx) @@ -168,8 +176,7 @@ def _find_open_port(host, lower, upper): raise Exception(msg) -def _write_hostfile(config: EverestConfig, host, port, cert, auth): - host_file_path = config.hostfile_path +def _write_hostfile(host_file_path, host, port, cert, auth): if not os.path.exists(os.path.dirname(host_file_path)): os.makedirs(os.path.dirname(host_file_path)) data = { @@ -184,9 +191,11 @@ def _write_hostfile(config: EverestConfig, host, port, cert, auth): f.write(json_string) -def _configure_loggers(config: EverestConfig): - detached_node_dir = config.detached_node_dir - everest_logs_dir = config.log_dir +def _configure_loggers( + detached_node_dir: str, + everest_logs_dir: str, + logging_level: int, +): configure_logger( name="res", @@ -203,20 +212,20 @@ def _configure_loggers(config: EverestConfig): configure_logger( name=EVEREST, file_path=os.path.join(everest_logs_dir, "everest.log"), - log_level=config.logging_level, + log_level=logging_level, log_to_azure=True, ) configure_logger( name="forward_models", file_path=os.path.join(everest_logs_dir, "forward_models.log"), - log_level=config.logging_level, + log_level=logging_level, ) configure_logger( name="ropt", file_path=os.path.join(everest_logs_dir, "ropt.log"), - log_level=config.logging_level, + log_level=logging_level, ) @@ -230,8 +239,12 @@ def main(): config.logging_level = "debug" try: - _configure_loggers(config) - update_everserver_status(config, ServerStatus.starting) + _configure_loggers( + detached_node_dir=config.detached_node_dir, + everest_logs_dir=config.log_dir, + logging_level=config.logging_level, + ) + update_everserver_status(config.everserver_status_path, ServerStatus.starting) logging.getLogger(EVEREST).info(version_info()) logging.getLogger(EVEREST).info( "Output directory: {}".format(config.output_dir) @@ -239,10 +252,10 @@ def main(): logging.getLogger(EVEREST).debug(str(options)) authentication = _generate_authentication() - cert_path, key_path, key_pw = _generate_certificate(config) + cert_path, key_path, key_pw = _generate_certificate(config.certificate_dir) host = get_machine_name() port = _find_open_port(host, lower=5000, upper=5800) - _write_hostfile(config, host, port, cert_path, authentication) + _write_hostfile(config.hostfile_path, host, port, cert_path, authentication) shared_data = { SIM_PROGRESS_ENDPOINT: {}, @@ -260,18 +273,18 @@ def main(): everserver_instance = threading.Thread( target=_everserver_thread, - args=(shared_data, server_config), + args=(shared_data, server_config, logging.getLogger("everserver")), ) everserver_instance.daemon = True everserver_instance.start() except: update_everserver_status( - config, ServerStatus.failed, message=traceback.format_exc() + config.everserver_status_path, ServerStatus.failed, message=traceback.format_exc() ) return try: - update_everserver_status(config, ServerStatus.running) + update_everserver_status(config.everserver_status_path, ServerStatus.running) run_model = EverestRunModel.create( config, @@ -289,22 +302,22 @@ def main(): status, message = _get_optimization_status(run_model.exit_code, shared_data) if status != ServerStatus.completed: - update_everserver_status(config, status, message) + update_everserver_status(config.everserver_status_path, status, message) return except: if shared_data[STOP_ENDPOINT]: update_everserver_status( - config, ServerStatus.stopped, message="Optimization aborted." + config.everserver_status_path, ServerStatus.stopped, message="Optimization aborted." ) else: update_everserver_status( - config, ServerStatus.failed, message=traceback.format_exc() + config.everserver_status_path, ServerStatus.failed, message=traceback.format_exc() ) return try: # Exporting data - update_everserver_status(config, ServerStatus.exporting_to_csv) + update_everserver_status(config.everserver_status_path, ServerStatus.exporting_to_csv) if config.export is not None: err_msgs, export_ecl = check_for_errors( @@ -324,11 +337,13 @@ def main(): ) except: update_everserver_status( - config, ServerStatus.failed, message=traceback.format_exc() + config.everserver_status_path, ServerStatus.failed, message=traceback.format_exc() ) return - - update_everserver_status(config, ServerStatus.completed, message=message) + logging.getLogger("everserver").info( + "DONE!" + ) + update_everserver_status(config.everserver_status_path, ServerStatus.completed, message=message) def _get_optimization_status(exit_code, shared_data): @@ -373,7 +388,7 @@ def _failed_realizations_messages(shared_data): return messages -def _generate_certificate(config: EverestConfig): +def _generate_certificate(cert_folder: str): """Generate a private key and a certificate signed with it Both the certificate and the key are written to files in the folder given @@ -414,7 +429,6 @@ def _generate_certificate(config: EverestConfig): ) # Write certificate and key to disk - cert_folder = config.certificate_dir makedirs_if_needed(cert_folder) cert_path = os.path.join(cert_folder, cert_name + ".crt") with open(cert_path, "wb") as f: diff --git a/tests/everest/entry_points/test_everest_entry.py b/tests/everest/entry_points/test_everest_entry.py index a5693af2788..f6d40076063 100644 --- a/tests/everest/entry_points/test_everest_entry.py +++ b/tests/everest/entry_points/test_everest_entry.py @@ -68,7 +68,7 @@ def build_job( def run_detached_monitor_mock( config, show_all_jobs=False, status=ServerStatus.completed, error=None ): - update_everserver_status(config, status, message=error) + update_everserver_status(config.everserver_status_path, status, message=error) @patch("everest.bin.everest_script.run_detached_monitor") @@ -474,7 +474,7 @@ def test_complete_status_for_normal_run( ): everest_entry([CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) expected_status = ServerStatus.completed expected_error = None @@ -492,7 +492,7 @@ def test_complete_status_for_normal_run_monitor( ): monitor_entry([CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) expected_status = ServerStatus.completed expected_error = None diff --git a/tests/everest/functional/test_main_everest_entry.py b/tests/everest/functional/test_main_everest_entry.py index 279e63bc3b0..b98958d3957 100644 --- a/tests/everest/functional/test_main_everest_entry.py +++ b/tests/everest/functional/test_main_everest_entry.py @@ -87,7 +87,7 @@ def test_everest_entry_run(copy_math_func_test_data_to_tmp): start_everest(["everest", "run", CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) assert status["status"] == ServerStatus.completed @@ -106,7 +106,7 @@ def test_everest_entry_run(copy_math_func_test_data_to_tmp): start_everest(["everest", "monitor", CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) assert status["status"] == ServerStatus.completed @@ -118,7 +118,7 @@ def test_everest_entry_monitor_no_run(copy_math_func_test_data_to_tmp): start_everest(["everest", "monitor", CONFIG_FILE_MINIMAL]) config = EverestConfig.load_file(CONFIG_FILE_MINIMAL) - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) assert status["status"] == ServerStatus.never_run diff --git a/tests/everest/test_detached.py b/tests/everest/test_detached.py index 5955ce8ac04..4637bec44cb 100644 --- a/tests/everest/test_detached.py +++ b/tests/everest/test_detached.py @@ -59,7 +59,7 @@ def job_progress(*args): return job_progress -@pytest.mark.flaky(reruns=5) +# @pytest.mark.flaky(reruns=5) @pytest.mark.integration_test @pytest.mark.fails_on_macos_github_workflow @pytest.mark.xdist_group(name="starts_everest") @@ -67,21 +67,26 @@ def test_https_requests(copy_math_func_test_data_to_tmp): everest_config = EverestConfig.load_file("config_minimal_slow.yml") expected_server_status = ServerStatus.never_run - assert expected_server_status == everserver_status(everest_config)["status"] + assert expected_server_status == everserver_status(everest_config.everserver_status_path)["status"] wait_for_context() ert_config = ErtConfig.with_plugins().from_dict( generate_everserver_ert_config(everest_config) ) makedirs_if_needed(everest_config.output_dir, roll_if_exists=True) with open_storage(ert_config.ens_path, "w") as storage: - start_server(everest_config, ert_config, storage) + context = start_server(ert_config, storage) try: - wait_for_server(everest_config, 120) + wait_for_server( + everserver_status_path=everest_config.everserver_status_path, + server_context=everest_config.server_context, + timeout=120, + context=context + ) except SystemExit as e: context_stop_and_wait() raise e - server_status = everserver_status(everest_config) + server_status = everserver_status(everest_config.everserver_status_path) assert ServerStatus.running == server_status["status"] url, cert, auth = everest_config.server_context @@ -105,10 +110,10 @@ def test_https_requests(copy_math_func_test_data_to_tmp): # Test stopping server assert server_is_running(*everest_config.server_context) - if stop_server(everest_config): - wait_for_server_to_stop(everest_config, 60) + if stop_server(everest_config.server_context): + wait_for_server_to_stop(everest_config.server_context, 60) context_stop_and_wait() - server_status = everserver_status(everest_config) + server_status = everserver_status(everest_config.everserver_status_path) # Possible the case completed while waiting for the server to stop assert server_status["status"] in [ @@ -118,39 +123,39 @@ def test_https_requests(copy_math_func_test_data_to_tmp): assert not server_is_running(*everest_config.server_context) else: context_stop_and_wait() - server_status = everserver_status(everest_config) + server_status = everserver_status(everest_config.everserver_status_path) assert ServerStatus.stopped == server_status["status"] def test_server_status(copy_math_func_test_data_to_tmp): config = EverestConfig.load_file("config_minimal.yml") - + everserver_status_path = config.everserver_status_path # Check status file does not exist before initial status update - assert not os.path.exists(config.everserver_status_path) - update_everserver_status(config, ServerStatus.starting) + assert not os.path.exists(everserver_status_path) + update_everserver_status(everserver_status_path, ServerStatus.starting) # Check status file exists after initial status update - assert os.path.exists(config.everserver_status_path) + assert os.path.exists(everserver_status_path) # Check we can read the server status from disk - status = everserver_status(config) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.starting assert status["message"] is None err_msg_1 = "Danger the universe is preparing for implosion!!!" - update_everserver_status(config, ServerStatus.failed, message=err_msg_1) - status = everserver_status(config) + update_everserver_status(everserver_status_path, ServerStatus.failed, message=err_msg_1) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.failed assert status["message"] == err_msg_1 err_msg_2 = "Danger exotic matter detected!!!" - update_everserver_status(config, ServerStatus.failed, message=err_msg_2) - status = everserver_status(config) + update_everserver_status(everserver_status_path, ServerStatus.failed, message=err_msg_2) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.failed assert status["message"] == "{}\n{}".format(err_msg_1, err_msg_2) - update_everserver_status(config, ServerStatus.completed) - status = everserver_status(config) + update_everserver_status(everserver_status_path, ServerStatus.completed) + status = everserver_status(everserver_status_path) assert status["status"] == ServerStatus.completed assert status["message"] is not None assert status["message"] == "{}\n{}".format(err_msg_1, err_msg_2) @@ -164,12 +169,21 @@ def test_wait_for_server( config = EverestConfig.load_file("valid_yaml_config.yml") with caplog.at_level(logging.DEBUG), pytest.raises(RuntimeError): - wait_for_server(config, timeout=1, context=None) + wait_for_server( + everserver_status_path=config.everserver_status_path, + server_context=config.server_context, + timeout=1, + ) assert not caplog.messages context = MockContext() with caplog.at_level(logging.DEBUG), pytest.raises(SystemExit): - wait_for_server(config, timeout=120, context=context) + wait_for_server( + everserver_status_path=config.everserver_status_path, + server_context=config.server_context, + timeout=120, + context=context, + ) expected_error_msg = ( 'Error when parsing config_file:"DISTANCE3" ' @@ -181,7 +195,7 @@ def test_wait_for_server( assert expected_error_msg in "\n".join(caplog.messages) - server_status = everserver_status(config) + server_status = everserver_status(config.everserver_status_path) assert server_status["status"] == ServerStatus.failed assert server_status["message"] == expected_error_msg @@ -204,7 +218,12 @@ def get_job_state(*args): return JobState.WAITING with caplog.at_level(logging.ERROR), pytest.raises(RuntimeError): - wait_for_server(config, timeout=1, context=_MockContext()) + wait_for_server( + everserver_status_path=config.everserver_status_path, + server_context=config.server_context, + timeout=1, + context=_MockContext(), + ) assert ( "Race condition in wait_for_server, job did fail but is now in WAITING" @@ -230,7 +249,12 @@ def get_job_state(*args): raise IndexError("Some trackback") with caplog.at_level(logging.ERROR), pytest.raises(SystemExit): - wait_for_server(config, timeout=1, context=_MockContext()) + wait_for_server( + everserver_status_path=config.everserver_status_path, + server_context=config.server_context, + timeout=1, + context=_MockContext(), + ) assert any( "Race condition in wait_for_server, failed job removed from scheduler" diff --git a/tests/everest/test_everest_output.py b/tests/everest/test_everest_output.py index 6719795b07c..6c6cf100582 100644 --- a/tests/everest/test_everest_output.py +++ b/tests/everest/test_everest_output.py @@ -79,7 +79,7 @@ def useless_cb(*args, **kwargs): ) makedirs_if_needed(config.output_dir, roll_if_exists=True) with open_storage(ert_config.ens_path, "w") as storage: - start_server(config, ert_config, storage) + start_server(ert_config, storage) start_mock.assert_called_once() (path, folders, files) = next(os.walk(config_folder)) @@ -98,7 +98,7 @@ def useless_cb(*args, **kwargs): assert "storage" not in final_folders makedirs_if_needed(config.output_dir, roll_if_exists=True) with open_storage(ert_config.ens_path, "w") as storage: - start_server(config, ert_config, storage) + start_server(ert_config, storage) assert start_mock.call_count == 2 final_files = os.listdir(config_folder) @@ -115,7 +115,7 @@ def test_save_running_config(start_mock, copy_math_func_test_data_to_tmp): ) makedirs_if_needed(config.output_dir, roll_if_exists=True) with open_storage(ert_config.ens_path, "w") as storage: - start_server(config, ert_config, storage) + start_server(ert_config, storage) start_mock.assert_called_once() saved_config_path = os.path.join(config.output_dir, file_name) diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index c5bd3f50293..fe60e08467b 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -2,7 +2,7 @@ import ssl from functools import partial from unittest.mock import patch - +import json from ropt.enums import OptimizerExitCode from seba_sqlite.snapshot import SebaSnapshot @@ -11,7 +11,7 @@ from everest.detached.jobs import everserver from everest.simulator import JOB_FAILURE, JOB_SUCCESS from everest.strings import OPT_FAILURE_REALIZATIONS, SIM_PROGRESS_ENDPOINT - +from pathlib import Path def configure_everserver_logger(*args, **kwargs): """Mock exception raised""" @@ -19,7 +19,8 @@ def configure_everserver_logger(*args, **kwargs): def check_status(*args, **kwargs): - status = everserver_status(args[0]) + everest_server_status_path = str(Path(args[0]).parent / "status") + status = everserver_status(everest_server_status_path) assert status["status"] == kwargs["status"] @@ -51,7 +52,7 @@ def set_shared_status(context_status, event, shared_data, progress): def test_certificate_generation(copy_math_func_test_data_to_tmp): everest_config = EverestConfig.load_file("config_minimal.yml") - cert, key, pw = everserver._generate_certificate(everest_config) + cert, key, pw = everserver._generate_certificate(everest_config.certificate_dir) # check that files are written assert os.path.exists(cert) @@ -62,8 +63,9 @@ def test_certificate_generation(copy_math_func_test_data_to_tmp): ctx.load_cert_chain(cert, key, pw) # raise on error -def test_hostfile_storage(copy_math_func_test_data_to_tmp): - config = EverestConfig.load_file("config_minimal.yml") +def test_hostfile_storage(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + host_file_path = "detach/.session/host_file" expected_result = { "host": "hostname.1.2.3", @@ -71,8 +73,10 @@ def test_hostfile_storage(copy_math_func_test_data_to_tmp): "cert": "/a/b/c.cert", "auth": "1234", } - everserver._write_hostfile(config, **expected_result) - result = config.server_info + everserver._write_hostfile(host_file_path, **expected_result) + assert os.path.exists(host_file_path) + with open(host_file_path, encoding="utf-8") as f: + result = json.load(f) assert result == expected_result @@ -85,7 +89,7 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) assert status["status"] == ServerStatus.failed assert "Exception: Configuring logger failed" in status["message"] @@ -111,7 +115,7 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp): "ert.run_models.everest_run_model.EverestRunModel.run_experiment", autospec=True, side_effect=lambda self, evaluator_server_config, restart=False: check_status( - self.everest_config, status=ServerStatus.running + self.everest_config.hostfile_path, status=ServerStatus.running ), ) @patch( @@ -125,7 +129,7 @@ def test_everserver_status_running_complete( config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) assert status["status"] == ServerStatus.completed assert status["message"] == "Optimization completed." @@ -173,7 +177,7 @@ def test_everserver_status_failed_job( config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) # The server should fail and store a user-friendly message. assert status["status"] == ServerStatus.failed @@ -211,7 +215,7 @@ def test_everserver_status_exception( config_file = "config_minimal.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) # The server should fail, and store the exception that # start_optimization raised. @@ -242,7 +246,7 @@ def test_everserver_status_max_batch_num( config_file = "config_one_batch.yml" config = EverestConfig.load_file(config_file) everserver.main() - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) # The server should complete without error. assert status["status"] == ServerStatus.completed diff --git a/tests/everest/test_logging.py b/tests/everest/test_logging.py index 6675bcb6323..31e68fc9bad 100644 --- a/tests/everest/test_logging.py +++ b/tests/everest/test_logging.py @@ -23,7 +23,7 @@ def string_exists_in_file(file_path, string): return string in txt -@pytest.mark.flaky(reruns=5) +# @pytest.mark.flaky(reruns=5) @pytest.mark.integration_test @pytest.mark.xdist_group(name="starts_everest") @pytest.mark.fails_on_macos_github_workflow @@ -36,9 +36,13 @@ def test_logging_setup(copy_math_func_test_data_to_tmp): ) makedirs_if_needed(everest_config.output_dir, roll_if_exists=True) with open_storage(ert_config.ens_path, "w") as storage: - start_server(everest_config, ert_config, storage) + start_server(ert_config, storage) try: - wait_for_server(everest_config, 120) + wait_for_server( + everserver_status_path=everest_config.everserver_status_path, + server_context=everest_config.server_context, + timeout=120, + ) wait_for_context() except SystemExit as e: context_stop_and_wait() @@ -52,7 +56,6 @@ def test_logging_setup(copy_math_func_test_data_to_tmp): everest_log_path = os.path.join(everest_logs_dir_path, "everest.log") forward_model_log_path = os.path.join(everest_logs_dir_path, "forward_models.log") - simulation_log_path = os.path.join(everest_logs_dir_path, "simulations.log") everest_server_stderr_path = os.path.join( everest_logs_dir_path, "everest_server.stderr.0" ) @@ -63,7 +66,6 @@ def test_logging_setup(copy_math_func_test_data_to_tmp): assert os.path.exists(everest_output_path) assert os.path.exists(everest_logs_dir_path) assert os.path.exists(forward_model_log_path) - assert os.path.exists(simulation_log_path) assert os.path.exists(everest_log_path) assert os.path.exists(everest_server_stderr_path) assert os.path.exists(everest_server_stdout_path) diff --git a/tests/everest/test_util.py b/tests/everest/test_util.py index 72937e88697..6c1b8798d71 100644 --- a/tests/everest/test_util.py +++ b/tests/everest/test_util.py @@ -150,6 +150,10 @@ def test_report_on_previous_run(_, change_to_tmpdir): f.write(" ") config = EverestConfig.with_defaults(**{ConfigKeys.CONFIGPATH: "config_file"}) with capture_streams() as (out, _): - report_on_previous_run(config) + report_on_previous_run( + config_file=config.config_file, + everserver_status_path=config.everserver_status_path, + optimization_output_dir=config.optimization_output_dir, + ) lines = [line.strip() for line in out.getvalue().split("\n")] assert lines[0] == "Optimization run failed, with error: mock error" diff --git a/tests/everest/utils/__init__.py b/tests/everest/utils/__init__.py index 26772311f1b..71d5beca0e3 100644 --- a/tests/everest/utils/__init__.py +++ b/tests/everest/utils/__init__.py @@ -142,6 +142,6 @@ def create_cached_mocked_test_case(request, monkeypatch) -> pathlib.Path: monkeypatch.chdir("mocked_run") start_everest(["everest", "run", config_file]) config = EverestConfig.load_file(config_file) - status = everserver_status(config) + status = everserver_status(config.everserver_status_path) assert status["status"] == ServerStatus.completed return cache_path / "mocked_run"