generated from rochacbruno/python-project-template
-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement fixes for multirun simulation
- Loading branch information
Showing
5 changed files
with
85 additions
and
41 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,62 +1,83 @@ | ||
import gc | ||
import traceback | ||
import time | ||
from typing import Dict | ||
|
||
import tqdm | ||
from joblib import Parallel, delayed | ||
|
||
from .simulation import Simulation | ||
from utils.multi_value_cli import multi_value_cli | ||
from typing import Dict | ||
from joblib import Parallel, delayed | ||
from .simulation import Simulation | ||
|
||
|
||
def __run__(s: Dict): | ||
s = Simulation(s) | ||
|
||
print(f'Simulation started {s.parameters["name"]}') | ||
|
||
start = time.time() | ||
|
||
try: | ||
s.run() | ||
except Exception as e: | ||
print(f'Error in simulation {s.parameters["name"]}: {e}') | ||
print(traceback.format_exc()) | ||
|
||
print(f'Simulation finished {s.parameters["name"]}. Elapsed time: {time.time() - start} seconds.') | ||
|
||
del s | ||
|
||
gc.collect() | ||
|
||
|
||
class MultiSimulation: | ||
|
||
def __init__(self, parameters: Dict): | ||
self.parameters = parameters | ||
|
||
@property | ||
def workflow_id(self) -> str: | ||
return '' | ||
|
||
def run(self): | ||
simulations = self.__fetch_tasks__() | ||
parameters = self.__fetch_tasks__() | ||
parameters = self.__add_debug_info__(parameters) | ||
parameters = self.__fix_names__(parameters) | ||
|
||
self.__add_debug_info__(simulations) | ||
self.__fix_names__(simulations) | ||
print(f'Running {len(parameters)} simulations') | ||
|
||
n_workers = self.parameters.get('n_workers', -1) | ||
|
||
def __run__(s: Simulation): | ||
try: | ||
s.run() | ||
except Exception as e: | ||
print(f'Error in simulation {s.parameters["name"]}: {e}') | ||
|
||
return s | ||
|
||
iter = Parallel( | ||
n_jobs=n_workers, | ||
backend='loky', | ||
return_as='generator', | ||
prefer='processes', | ||
)(delayed(lambda s: __run__(s))(s) for s in simulations) | ||
|
||
for s in tqdm.tqdm(iter, total=len(simulations)): | ||
print(f'Simulation finished {s.parameters["name"]}') | ||
Parallel( | ||
n_jobs=n_workers | ||
)(delayed(__run__)(s) for s in parameters) | ||
|
||
def __fetch_tasks__(self): | ||
result: [Simulation] = [] | ||
result: [Dict] = [] | ||
|
||
for task in self.parameters['tasks']: | ||
match self.parameters['kind']: | ||
case 'task': | ||
result += [Simulation(task['parameters'])] | ||
result += [task['parameters']] | ||
case 'multi_task': | ||
result += multi_value_cli(task['parameters'], lambda p: Simulation(p)) | ||
result += multi_value_cli(task['parameters'], lambda p: p) | ||
case _: | ||
raise ValueError(f"Unknown kind: {self.parameters['kind']}") | ||
|
||
return result | ||
|
||
def __add_debug_info__(self, simulations: [Simulation]): | ||
def __add_debug_info__(self, simulations: [Dict]): | ||
result = simulations | ||
|
||
if self.parameters.get('debug', False): | ||
for simulation in simulations: | ||
simulation.parameters['debug'] = True | ||
for index, _ in enumerate(result): | ||
result[index]['debug'] = True | ||
|
||
return result | ||
|
||
def __fix_names__(self, simulations: [Simulation]): | ||
for i, simulation in enumerate(simulations): | ||
simulation.parameters['name'] = f"{simulation.parameters['name']}_{i}" | ||
def __fix_names__(self, simulations: [Dict]): | ||
result = simulations | ||
|
||
for i, simulation in enumerate(result): | ||
result[i]['name'] = f"{simulation['name']}_{i}" | ||
|
||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters