From 6174dcf60c9dc5922b4da8b30f20a1655c7e3f38 Mon Sep 17 00:00:00 2001 From: Yury Hayeu Date: Sat, 24 Feb 2024 12:31:02 +0100 Subject: [PATCH] Implement fixes for multirun simulation --- diploma_thesis/configuration/jsp.yml | 10 ++- diploma_thesis/workflow/multi_simulation.py | 87 +++++++++++++-------- diploma_thesis/workflow/simulation.py | 6 ++ diploma_thesis/workflow/tournament.py | 4 + diploma_thesis/workflow/workflow.py | 19 ++++- 5 files changed, 85 insertions(+), 41 deletions(-) diff --git a/diploma_thesis/configuration/jsp.yml b/diploma_thesis/configuration/jsp.yml index 05d6a57..e0edd6e 100644 --- a/diploma_thesis/configuration/jsp.yml +++ b/diploma_thesis/configuration/jsp.yml @@ -81,11 +81,11 @@ task: # Action Set - [ 'model/rules/all.yml', - 'model/rules/marl_as.yml' + 'model/rules/marl.yml' ] # Model - [ - 'model/model/marl_as.yml' + 'model/model/marl_as.yml', 'model/model/marl_mr.yml' ] @@ -98,8 +98,10 @@ task: run: parameters: mods: +# - ['test.yml'] + - [ 'multi.yml' ] +# - [ 'multi.yml', 'concurrent.yml' ] # - [ 'util_70.yml' ] # - [ 'util_80.yml' ] # - [ 'util_90.yml' ] -# - [ 'multi.yml' ] - - [ 'multi.yml', 'concurrent.yml' ] + diff --git a/diploma_thesis/workflow/multi_simulation.py b/diploma_thesis/workflow/multi_simulation.py index 2390f35..820e5f7 100644 --- a/diploma_thesis/workflow/multi_simulation.py +++ b/diploma_thesis/workflow/multi_simulation.py @@ -1,10 +1,32 @@ +import gc +import traceback +import time +from typing import Dict -import tqdm +from joblib import Parallel, delayed -from .simulation import Simulation from utils.multi_value_cli import multi_value_cli -from typing import Dict -from joblib import Parallel, delayed +from .simulation import Simulation + + +def __run__(s: Dict): + s = Simulation(s) + + print(f'Simulation started {s.parameters["name"]}') + + start = time.time() + + try: + s.run() + except Exception as e: + print(f'Error in simulation {s.parameters["name"]}: {e}') + print(traceback.format_exc()) + + print(f'Simulation finished {s.parameters["name"]}. Elapsed time: {time.time() - start} seconds.') + + del s + + gc.collect() class MultiSimulation: @@ -12,51 +34,50 @@ class MultiSimulation: def __init__(self, parameters: Dict): self.parameters = parameters + @property + def workflow_id(self) -> str: + return '' + def run(self): - simulations = self.__fetch_tasks__() + parameters = self.__fetch_tasks__() + parameters = self.__add_debug_info__(parameters) + parameters = self.__fix_names__(parameters) - self.__add_debug_info__(simulations) - self.__fix_names__(simulations) + print(f'Running {len(parameters)} simulations') n_workers = self.parameters.get('n_workers', -1) - def __run__(s: Simulation): - try: - s.run() - except Exception as e: - print(f'Error in simulation {s.parameters["name"]}: {e}') - - return s - - iter = Parallel( - n_jobs=n_workers, - backend='loky', - return_as='generator', - prefer='processes', - )(delayed(lambda s: __run__(s))(s) for s in simulations) - - for s in tqdm.tqdm(iter, total=len(simulations)): - print(f'Simulation finished {s.parameters["name"]}') + Parallel( + n_jobs=n_workers + )(delayed(__run__)(s) for s in parameters) def __fetch_tasks__(self): - result: [Simulation] = [] + result: [Dict] = [] for task in self.parameters['tasks']: match self.parameters['kind']: case 'task': - result += [Simulation(task['parameters'])] + result += [task['parameters']] case 'multi_task': - result += multi_value_cli(task['parameters'], lambda p: Simulation(p)) + result += multi_value_cli(task['parameters'], lambda p: p) case _: raise ValueError(f"Unknown kind: {self.parameters['kind']}") return result - def __add_debug_info__(self, simulations: [Simulation]): + def __add_debug_info__(self, simulations: [Dict]): + result = simulations + if self.parameters.get('debug', False): - for simulation in simulations: - simulation.parameters['debug'] = True + for index, _ in enumerate(result): + result[index]['debug'] = True + + return result - def __fix_names__(self, simulations: [Simulation]): - for i, simulation in enumerate(simulations): - simulation.parameters['name'] = f"{simulation.parameters['name']}_{i}" + def __fix_names__(self, simulations: [Dict]): + result = simulations + + for i, simulation in enumerate(result): + result[i]['name'] = f"{simulation['name']}_{i}" + + return result diff --git a/diploma_thesis/workflow/simulation.py b/diploma_thesis/workflow/simulation.py index 9dee4c1..d01190e 100644 --- a/diploma_thesis/workflow/simulation.py +++ b/diploma_thesis/workflow/simulation.py @@ -24,6 +24,10 @@ def __init__(self, parameters: Dict): self.parameters = parameters + @property + def workflow_id(self) -> str: + return self.parameters.get('name', '') + @property def log_stdout(self): return self.parameters.get('log_stdout', False) @@ -152,6 +156,8 @@ def __to_dataframe__(data): if data.batch_size == torch.Size([]): return pd.DataFrame(columns=['shop_floor_id']) + data = data.to_dict() + return pd.DataFrame(data) machine_reward = __to_dataframe__(reward_cache.machines) diff --git a/diploma_thesis/workflow/tournament.py b/diploma_thesis/workflow/tournament.py index 27dcc96..3dd10fa 100644 --- a/diploma_thesis/workflow/tournament.py +++ b/diploma_thesis/workflow/tournament.py @@ -25,6 +25,10 @@ class Tournament(Workflow): def __init__(self, parameters: Dict): self.parameters = parameters + @property + def workflow_id(self) -> str: + return '' + def run(self): candidates = self.__make_candidates__() criteria = self.__make_criteria__() diff --git a/diploma_thesis/workflow/workflow.py b/diploma_thesis/workflow/workflow.py index 0cb7763..2a8faa2 100644 --- a/diploma_thesis/workflow/workflow.py +++ b/diploma_thesis/workflow/workflow.py @@ -13,9 +13,13 @@ class Workflow(metaclass=ABCMeta): def run(self): pass + @property + @abstractmethod + def workflow_id(self) -> str: + pass + def __make_logger__(self, name: str, filename: str = None, log_stdout: bool = False) -> logging.Logger: - logger = logging.getLogger(name) - logger.setLevel(logging.INFO) + logger = self.__get_logger__(name) formatter = logging.Formatter('%(asctime)s | | %(name)s | %(levelname)s | %(message)s') @@ -39,8 +43,7 @@ def format(self, record): record.time = str(time) return super(_Formatter, self).format(record) - logger = logging.getLogger(name) - logger.setLevel(logging.INFO) + logger = self.__get_logger__(name) formatter = _Formatter('%(asctime)s | %(time)s | %(name)s | %(levelname)s | %(message)s') @@ -67,3 +70,11 @@ def __add_handlers__(self, logger, formatter, filename: str, log_stdout: bool): file_handler.setFormatter(formatter) logger.addHandler(file_handler) + + def __get_logger__(self, name): + workflow_id = self.workflow_id + + logger = logging.Logger(name + '_' + self.workflow_id if len(workflow_id) > 0 else name) + logger.setLevel(logging.INFO) + + return logger