Skip to content

Commit

Permalink
Limit the use of EverestConfig in function signatures in everest csv …
Browse files Browse the repository at this point in the history
…exporting
  • Loading branch information
DanSava committed Nov 7, 2024
1 parent 4b18368 commit 2622c1b
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 249 deletions.
4 changes: 1 addition & 3 deletions src/everest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,19 @@
from everest import detached, docs, jobs, templates, util
from everest.bin.utils import export_to_csv, export_with_progress
from everest.config_keys import ConfigKeys
from everest.export import MetaDataColumnNames, export, filter_data, validate_export
from everest.export import MetaDataColumnNames, filter_data

__author__ = "Equinor ASA and TNO"
__all__ = [
"ConfigKeys",
"MetaDataColumnNames",
"detached",
"docs",
"export",
"export_to_csv",
"export_with_progress",
"filter_data",
"jobs",
"load",
"templates",
"util",
"validate_export",
]
14 changes: 11 additions & 3 deletions src/everest/bin/everexport_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from functools import partial

from everest import export_to_csv, validate_export
from everest import export_to_csv, export_with_progress
from everest.config import EverestConfig
from everest.config.export_config import ExportConfig
from everest.strings import EVEREST
Expand All @@ -30,10 +30,18 @@ def everexport_entry(args=None):
batch_list = [int(item) for item in options.batches]
config.export.batches = batch_list

err_msgs, export_ecl = validate_export(config)
err_msgs, export_ecl = config.export.check_for_errors(
optimization_output_path=config.optimization_output_dir,
storage_path=config.storage_dir,
data_file_path=config.model.data_file,
)
for msg in err_msgs:
logger.warning(msg)
export_to_csv(config, export_ecl=export_ecl)

export_to_csv(
data_frame=export_with_progress(config, export_ecl),
export_path=config.export_path,
)


def _build_args_parser():
Expand Down
3 changes: 1 addition & 2 deletions src/everest/bin/everload_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from ert.config import ErtConfig
from ert.storage import open_storage
from everest import MetaDataColumnNames as MDCN
from everest import export
from everest.config import EverestConfig
from everest.config.export_config import ExportConfig
from everest.simulator.everest_to_ert import _everest_to_ert_config_dict
Expand Down Expand Up @@ -162,7 +161,7 @@ def reload_data(ever_config: EverestConfig, backup_path=None):
ert_config = ErtConfig.with_plugins().from_dict(config_dict=ert_config_dict)

# load information about batches from previous run
df = export(ever_config, export_ecl=False)
df = ever_config.export_data(export_ecl=False)
groups = df.groupby(by=MDCN.BATCH)

# backup or delete the previous internal storage
Expand Down
22 changes: 6 additions & 16 deletions src/everest/bin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import colorama
from colorama import Fore
from pandas import DataFrame

from ert.resources import all_shell_script_fm_steps
from ert.simulator.batch_simulator_context import Status
Expand All @@ -20,7 +21,6 @@
get_opt_status,
start_monitor,
)
from everest.export import export
from everest.simulator import JOB_FAILURE, JOB_RUNNING, JOB_SUCCESS
from everest.strings import EVEREST

Expand All @@ -35,24 +35,14 @@ def export_with_progress(config, export_ecl=True):
if ProgressBar is not None:
widgets = [Percentage(), " ", Bar(), " ", Timer(), " ", AdaptiveETA()]
with ProgressBar(max_value=1, widgets=widgets) as bar:
export_data = export(
config=config, export_ecl=export_ecl, progress_callback=bar.update
return config.export_data(
export_ecl=export_ecl, progress_callback=bar.update
)
else:
export_data = export(config=config, export_ecl=export_ecl)

return export_data


def export_to_csv(config: EverestConfig, data_frame=None, export_ecl=True):
if data_frame is None:
data_frame = export_with_progress(config, export_ecl)
return config.export_data(export_ecl=export_ecl)

export_path = config.export_path
output_folder = os.path.dirname(export_path)
if not os.path.exists(output_folder):
os.makedirs(output_folder)

def export_to_csv(data_frame: DataFrame, export_path: str) -> None:
os.makedirs(os.path.dirname(export_path), exist_ok=True)
data_frame.to_csv(export_path, sep=";", index=False)
logging.getLogger(EVEREST).info("Data exported to {}".format(export_path))

Expand Down
48 changes: 48 additions & 0 deletions src/everest/config/everest_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
no_type_check,
)

import pandas as pd
from pydantic import (
AfterValidator,
BaseModel,
Expand All @@ -41,6 +42,12 @@
unique_items,
validate_forward_model_configs,
)
from everest.export import (
MetaDataColumnNames,
export_metadata,
filter_data,
load_simulation_data,
)
from everest.jobs import script_names
from everest.util.forward_models import collect_forward_models

Expand Down Expand Up @@ -828,3 +835,44 @@ def dump(self, fname: Optional[str] = None) -> Optional[str]:
yaml.dump(stripped_conf, out)

return None

def export_data(self, export_ecl=True, progress_callback=lambda _: None):
"""Export everest data into a pandas dataframe. If the config specifies
a data_file and @export_ecl is True, simulation data is included. When
exporting simulation data, only keywords matching elements in @ecl_keywords
are exported. Note that wildcards are allowed.
@progress_callback will be called with a number between 0 and 1 indicating
the fraction of batches that has been loaded.
"""

ecl_keywords = None
# If user exports with a config file that has the SKIP_EXPORT
# set to true export nothing
if self.export is not None:
if self.export.skip_export or self.export.batches == []:
return pd.DataFrame([])

ecl_keywords = self.export.keywords

metadata = export_metadata(self.export, self.optimization_output_dir)
data_file = self.model.data_file
if data_file is None or not export_ecl:
return pd.DataFrame(metadata)

data = load_simulation_data(
output_path=self.output_dir,
metadata=metadata,
progress_callback=progress_callback,
)

if ecl_keywords is not None:
keywords = tuple(ecl_keywords)
# NOTE: Some of these keywords are necessary to export successfully,
# we should not leave this to the user
keywords += tuple(pd.DataFrame(metadata).columns)
keywords += tuple(MetaDataColumnNames.get_all())
keywords_set = set(keywords)
data = filter_data(data, keywords_set)

return data
69 changes: 69 additions & 0 deletions src/everest/config/export_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pydantic import BaseModel, Field, field_validator

from everest.config.validation_utils import check_writable_filepath
from everest.export import available_batches, get_internalized_keys


class ExportConfig(BaseModel, extra="forbid"): # type: ignore
Expand Down Expand Up @@ -41,3 +42,71 @@ class ExportConfig(BaseModel, extra="forbid"): # type: ignore
def validate_output_file_writable(cls, csv_output_filepath): # pylint:disable=E0213
check_writable_filepath(csv_output_filepath)
return csv_output_filepath

def check_for_errors(
self,
optimization_output_path: str,
storage_path: str,
data_file_path: Optional[str],
):
"""
Checks for possible errors when attempting to export current optimization
case.
"""
export_ecl = True
export_errors: List[str] = []

if self.batches:
_available_batches = available_batches(optimization_output_path)
for batch in set(self.batches).difference(_available_batches):
export_errors.append(
"Batch {} not found in optimization "
"results. Skipping for current export."
"".format(batch)
)
self.batches = list(set(self.batches).intersection(_available_batches))

if self.batches == []:
export_errors.append(
"No batches selected for export. "
"Only optimization data will be exported."
)
return export_errors, False

if not data_file_path:
export_ecl = False
export_errors.append(
"No data file found in config."
"Only optimization data will be exported."
)

# If no user defined keywords are present it is no longer possible to check
# availability in internal storage
if self.keywords is None:
return export_errors, export_ecl

if not self.keywords:
export_ecl = False
export_errors.append(
"No eclipse keywords selected for export. Only"
" optimization data will be exported."
)

internal_keys = get_internalized_keys(
config=self,
storage_path=storage_path,
optimization_output_path=optimization_output_path,
batch_ids=set(self.batches) if self.batches else None,
)

extra_keys = set(self.keywords).difference(set(internal_keys))
if extra_keys:
export_ecl = False
export_errors.append(
f"Non-internalized ecl keys selected for export '{' '.join(extra_keys)}'."
" in order to internalize missing keywords "
f"run 'everest load <config_file>'. "
"Only optimization data will be exported."
)

return export_errors, export_ecl
10 changes: 2 additions & 8 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
)

try:
_save_running_config(config)
save_config_path = os.path.join(config.output_dir, config.config_file)
config.dump(save_config_path)
except (OSError, LookupError) as e:
logging.getLogger(EVEREST).error(
"Failed to save optimization config: {}".format(e)
Expand Down Expand Up @@ -121,13 +122,6 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
return _context


def _save_running_config(config: EverestConfig):
assert config.output_dir is not None
assert config.config_file is not None
save_config_path = os.path.join(config.output_dir, config.config_file)
config.dump(save_config_path)


def context_stop_and_wait():
global _context # noqa: PLW0602
if _context:
Expand Down
22 changes: 17 additions & 5 deletions src/everest/detached/jobs/everserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ert.config import QueueSystem
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.run_models.everest_run_model import EverestRunModel
from everest import export_to_csv, validate_export
from everest import export_to_csv, export_with_progress
from everest.config import EverestConfig
from everest.detached import ServerStatus, get_opt_status, update_everserver_status
from everest.simulator import JOB_FAILURE
Expand Down Expand Up @@ -304,10 +304,22 @@ def main():
try:
# Exporting data
update_everserver_status(config, ServerStatus.exporting_to_csv)
err_msgs, export_ecl = validate_export(config)
for msg in err_msgs:
logging.getLogger(EVEREST).warning(msg)
export_to_csv(config, export_ecl=export_ecl)

if config.export is not None:
err_msgs, export_ecl = config.export.check_for_errors(
optimization_output_path=config.optimization_output_dir,
storage_path=config.storage_dir,
data_file_path=config.model.data_file,
)
for msg in err_msgs:
logging.getLogger(EVEREST).warning(msg)
else:
export_ecl = True

export_to_csv(
data_frame=export_with_progress(config, export_ecl),
export_path=config.export_path,
)
except:
update_everserver_status(
config, ServerStatus.failed, message=traceback.format_exc()
Expand Down
Loading

0 comments on commit 2622c1b

Please sign in to comment.