diff --git a/rostok/trajectory_optimizer/control_optimizer.py b/rostok/trajectory_optimizer/control_optimizer.py
index 4f0c687e..7025a6d3 100644
--- a/rostok/trajectory_optimizer/control_optimizer.py
+++ b/rostok/trajectory_optimizer/control_optimizer.py
@@ -3,7 +3,10 @@
 from copy import deepcopy
 from dataclasses import dataclass, field
 from itertools import product
+import os
 
+from joblib import Parallel, delayed
+from joblib.parallel import TimeoutError
 import numpy as np
 from scipy.optimize import direct, dual_annealing
 
@@ -252,7 +255,7 @@ def simulate_with_control_parameters(self, data, graph, simulation_scenario):
                                                   starting_positions,
                                                   vis=False,
                                                   delay=False)
-        
+
     def calculate_reward(self, graph: GraphGrammar):
         """Constant moment optimization method using scenario simulation and rewarder
         for calculating objective function.
@@ -346,30 +349,6 @@ def _reward_with_parameters(self, parameters, graph, simulator_scenario):
         reward = self.rewarder.calculate_reward(sim_output)
         return -reward
 
-    def _parallel_reward_with_parameters(self, input):
-        """Objective function to be optimized
-
-        Args:
-            parameters (np.ndarray): Array variables of objective function
-            graph (GraphGrammar): Graph of mechanism for which the optimization do
-            simulator_scenario (ParamtrizedAimulation): Simulation scenario in which data is collected for calcule the objective function
-
-        Returns:
-            float: Value of objective function
-        """
-        parameters, graph, simulator_scenario = input
-        data = self._transform_parameters2data(parameters)
-        # print(f"Data: correct!, {data}")
-        sim_output = self.simulate_with_control_parameters(data, graph, simulator_scenario)
-        if list(
-                filter(lambda x: isinstance(x, EventFlyingApart),
-                       simulator_scenario.event_container))[0].state:
-            return 0.03
-        # print(f"Sim: correct! {sim_output}")
-        reward = self.rewarder.calculate_reward(sim_output)
-        # prints(f"Calculate: correct! {reward}")
-        return parameters, simulator_scenario, reward
-
     def _transform_parameters2data(self, parameters, *args):
         """Method define transofrm algorigm parameters to data control
 
@@ -419,12 +398,33 @@ def __init__(self,
                  rewarder: SimulationReward,
                  data: TendonControllerParameters,
                  tendon_forces: list[float],
-                 starting_finger_angles=45):
+                 starting_finger_angles=45,
+                 num_cpu_workers=1,
+                 chunksize=1,
+                 timeout_parallel=60 * 5):
+        """Brute force optimization of tendon forces for controlling the mechanism. In a subclass, the methods bound_parameter, _transform_parameter2data and run_optimization have to be overridden. The number of cpu workers defines the number of parallel processes.
+
+        Args:
+            simulation_scenario (_type_): Scenario of simulation for virtual experiment
+            rewarder (SimulationReward): Instance of the class on which the objective function will be calculated
+            data (TendonControllerParameters): Parameters of control class
+            tendon_forces (list[float]): List of tendon forces for brute force optimization.
+            starting_finger_angles (int, optional): Initial angle of fingers. Defaults to 45.
+            num_cpu_workers (int | str, optional): Number of parallel processes. When set to "auto", it is resolved to the CPU count minus two, but never less than 1. Defaults to 1.
+            chunksize (int | str, optional): Handed to joblib.Parallel as batch_size, i.e. the number of tasks dispatched to a worker at once (joblib also accepts "auto"). Defaults to 1.
+            timeout_parallel (int, optional): Handed to joblib.Parallel as timeout (presumably seconds); on timeout the parallel evaluation gives up. Defaults to 60*5.
+        """
         mock_optimization_bounds = (0, 15)
         mock_optimization_limit = 10
         self.tendon_forces = tendon_forces
         super().__init__(simulation_scenario, rewarder, data, starting_finger_angles,
                          mock_optimization_bounds, mock_optimization_limit)
+        self.num_cpu_workers = num_cpu_workers
+        self.chunksize = chunksize
+        self.timeout_parallel = timeout_parallel
+
+        if self.num_cpu_workers == "auto":
+            self.num_cpu_workers = max(1, (os.cpu_count() or 1) - 2)
 
     def run_optimization(self, callback, multi_bound, args):
         graph = args[0]
@@ -436,4 +436,79 @@ def run_optimization(self, callback, multi_bound, args):
             res_comp = Result(res, np.array(variant))
             results.append(res_comp)
         result = min(results, key=lambda i: i.fun)
-        return result
\ No newline at end of file
+        return result
+
+    def _parallel_reward_with_parameters(self, input):
+        """Objective function evaluated for one (control, scenario) pair.
+
+        Args:
+            input (tuple): Triple (parameters, graph, simulator_scenario):
+                array of control variables, graph of the mechanism and
+                simulation scenario in which the data is collected.
+
+        Returns:
+            tuple: (parameters, simulator_scenario, reward); reward is 0.03 when the state of the first EventFlyingApart in the scenario's event_container is set.
+        """
+        parameters, graph, simulator_scenario = input
+        data = self._transform_parameters2data(parameters)
+        # print(f"Data: correct!, {data}")
+        sim_output = self.simulate_with_control_parameters(data, graph, simulator_scenario)
+        if list(
+                filter(lambda x: isinstance(x, EventFlyingApart),
+                       simulator_scenario.event_container))[0].state:
+            return parameters, simulator_scenario, 0.03
+        reward = self.rewarder.calculate_reward(sim_output)
+        return parameters, simulator_scenario, reward
+
+    def __parallel_calculate_reward(self, graph: GraphGrammar):
+        """Evaluate every combination of tendon forces for every scenario in worker processes.
+
+        Args:
+            graph (GraphGrammar): Graph of the mechanism to evaluate.
+
+        Returns:
+            tuple: (reward, control), where reward is the sum over grasp objects of the best weighted reward and control lists the matching parameter arrays. Returns (0, []) when bound_parameters yields nothing and (0.01, []) when the parallel run times out.
+        """
+        multi_bound = self.bound_parameters(graph)
+
+        if not multi_bound:
+            return (0, [])
+
+        all_variants_control = list(product(self.tendon_forces, repeat=len(joint_root_paths(graph))))
+        if isinstance(self.simulation_scenario, list):
+            object_weight = {sim_scen[0].grasp_object_callback: sim_scen[1] for sim_scen in self.simulation_scenario}
+            all_simulations = list(product(all_variants_control, self.simulation_scenario))
+            input_dates = [(np.array(put[0]), graph, put[1][0]) for put in all_simulations]
+        else:
+            object_weight = {self.simulation_scenario.grasp_object_callback: 1}
+            all_simulations = list(product(all_variants_control, [self.simulation_scenario]))
+            input_dates = [(np.array(put[0]), graph, put[1]) for put in all_simulations]
+        np.random.shuffle(input_dates)
+
+        cpus = len(input_dates) + 1 if len(input_dates) < self.num_cpu_workers else self.num_cpu_workers
+        print(f"Use CPUs processor: {cpus}, input dates: {len(input_dates)}")
+        parallel_results = []
+        try:
+            parallel_results = Parallel(cpus, backend="multiprocessing", verbose=100, timeout=self.timeout_parallel, batch_size=self.chunksize)(delayed(self._parallel_reward_with_parameters)(i) for i in input_dates)
+        except TimeoutError:
+            print("TIMEOUT")
+            return (0.01, [])
+        result_group_object = {obj: [] for obj in object_weight}
+        for results in parallel_results:
+            obj = results[1].grasp_object_callback
+            result_group_object[obj].append((results[0], results[2]*object_weight[obj]))
+
+        reward = 0
+        control = []
+        for value in result_group_object.values():
+            best_res = max(value, key=lambda i: i[1])
+            reward += best_res[1]
+            control.append(best_res[0])
+
+        return (reward, control)
+
+    def calculate_reward(self, graph: GraphGrammar):
+        """Use the parallel brute force when more than one worker is configured, else the parent implementation."""
+        if self.num_cpu_workers > 1:
+            return self.__parallel_calculate_reward(graph)
+        return super().calculate_reward(graph)
\ No newline at end of file