Skip to content

Commit

Permalink
Merge pull request #128 from aimclub/optimization/brute_force_parallel
Browse files Browse the repository at this point in the history
Optimization/brute force parallel
  • Loading branch information
ZharkovKirill authored Oct 13, 2023
2 parents 06aafc8 + a850602 commit 218337d
Showing 1 changed file with 92 additions and 27 deletions.
119 changes: 92 additions & 27 deletions rostok/trajectory_optimizer/control_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
from copy import deepcopy
from dataclasses import dataclass, field
from itertools import product
import os

from joblib import Parallel, delayed
from joblib.parallel import TimeoutError
import numpy as np
from scipy.optimize import direct, dual_annealing

Expand Down Expand Up @@ -252,7 +255,7 @@ def simulate_with_control_parameters(self, data, graph, simulation_scenario):
starting_positions,
vis=False,
delay=False)

def calculate_reward(self, graph: GraphGrammar):
"""Constant moment optimization method using scenario simulation and rewarder for calculating objective function.
Expand Down Expand Up @@ -346,30 +349,6 @@ def _reward_with_parameters(self, parameters, graph, simulator_scenario):
reward = self.rewarder.calculate_reward(sim_output)
return -reward

def _parallel_reward_with_parameters(self, input):
"""Objective function to be optimized
Args:
parameters (np.ndarray): Array variables of objective function
graph (GraphGrammar): Graph of mechanism for which the optimization do
simulator_scenario (ParamtrizedAimulation): Simulation scenario in which data is collected for calcule the objective function
Returns:
float: Value of objective function
"""
parameters, graph, simulator_scenario = input
data = self._transform_parameters2data(parameters)
# print(f"Data: correct!, {data}")
sim_output = self.simulate_with_control_parameters(data, graph, simulator_scenario)
if list(
filter(lambda x: isinstance(x, EventFlyingApart),
simulator_scenario.event_container))[0].state:
return 0.03
# print(f"Sim: correct! {sim_output}")
reward = self.rewarder.calculate_reward(sim_output)
# prints(f"Calculate: correct! {reward}")
return parameters, simulator_scenario, reward

def _transform_parameters2data(self, parameters, *args):
"""Method define transofrm algorigm parameters to data control
Expand Down Expand Up @@ -419,12 +398,32 @@ def __init__(self,
rewarder: SimulationReward,
data: TendonControllerParameters,
tendon_forces: list[float],
starting_finger_angles=45):
starting_finger_angles=45,
num_cpu_workers=1,
chunksize=1,
timeout_parallel = 60*5):
"""Brute force optimization of tendon forces for controlling the mechanism. In subclass, it have to override method: bound_parameter, _transform_parameter2data and run_optimization. Number of cpu workers define number of parallel processes.
Args:
simulation_scenario (_type_): Scenario of simulation for virtual experiment
rewarder (SimulationReward): Instance of the class on which the objective function will be calculated
data (TendonControllerParameters): Parameters of control class
tendon_forces (list[float]): List of tendon force for brute force optimization.
starting_finger_angles (int, optional): Initial angle of fingers. Defaults to 45.
num_cpu_workers (int, optional): Number of parallel process. When set to "auto", the algorithm selects the number of workers by itself. Defaults to 1.
chunksize (int, optional): Number of batch for one cpu worker. When set to "auto", the algorithm selects the number of workers by itself. Defaults to 1.
"""
mock_optimization_bounds = (0, 15)
mock_optimization_limit = 10
self.tendon_forces = tendon_forces
super().__init__(simulation_scenario, rewarder, data, starting_finger_angles,
mock_optimization_bounds, mock_optimization_limit)
self.num_cpu_workers = num_cpu_workers
self.chunksize = chunksize
self.timeout_parallel = timeout_parallel

if self.num_cpu_workers == "auto":
self.num_cpu_workers = os.cpu_count() - 2

def run_optimization(self, callback, multi_bound, args):
graph = args[0]
Expand All @@ -436,4 +435,70 @@ def run_optimization(self, callback, multi_bound, args):
res_comp = Result(res, np.array(variant))
results.append(res_comp)
result = min(results, key=lambda i: i.fun)
return result
return result

def _parallel_reward_with_parameters(self, input):
"""Objective function to be optimized
Args:
parameters (np.ndarray): Array variables of objective function
graph (GraphGrammar): Graph of mechanism for which the optimization do
simulator_scenario (ParamtrizedAimulation): Simulation scenario in which data is collected for calcule the objective function
Returns:
float: Value of objective function
"""
parameters, graph, simulator_scenario = input
data = self._transform_parameters2data(parameters)
# print(f"Data: correct!, {data}")
sim_output = self.simulate_with_control_parameters(data, graph, simulator_scenario)
if list(
filter(lambda x: isinstance(x, EventFlyingApart),
simulator_scenario.event_container))[0].state:
return parameters, simulator_scenario, 0.03
reward = self.rewarder.calculate_reward(sim_output)
return parameters, simulator_scenario, reward

def __parallel_calculate_reward(self, graph: GraphGrammar):
multi_bound = self.bound_parameters(graph)

if not multi_bound:
return (0, [])

all_variants_control = list(product(self.tendon_forces, repeat=len(joint_root_paths(graph))))
if isinstance(self.simulation_scenario, list):
object_weight = {sim_scen[0].grasp_object_callback: sim_scen[1] for sim_scen in self.simulation_scenario}
all_simulations = list(product(all_variants_control, self.simulation_scenario))
input_dates = [(np.array(put[0]), graph, put[1][0]) for put in all_simulations]
else:
object_weight = {self.simulation_scenario.grasp_object_callback: 1}
all_simulations = list(product(all_variants_control, [self.simulation_scenario]))
input_dates = [(np.array(put[0]), graph, put[1]) for put in all_simulations]
np.random.shuffle(input_dates)

cpus = len(input_dates) + 1 if len(input_dates) < self.num_cpu_workers else self.num_cpu_workers
print(f"Use CPUs processor: {cpus}, input dates: {len(input_dates)}")
parallel_results = []
try:
parallel_results = Parallel(cpus, backend = "multiprocessing", verbose=100, timeout=self.timeout_parallel, batch_size = self.chunksize)(delayed(self._parallel_reward_with_parameters)(i) for i in input_dates)
except TimeoutError:
print("TIMEOUT")
return (0.01, [])
result_group_object = {sim_scen[0].grasp_object_callback: [] for sim_scen in self.simulation_scenario}
for results in parallel_results:
obj = results[1].grasp_object_callback
result_group_object[obj].append((results[0], results[2]*object_weight[obj]))

reward = 0
control = []
for value in result_group_object.values():
best_res = max(value, key=lambda i: i[1])
reward += best_res[1]
control.append(best_res[0])

return (reward, control)

def calculate_reward(self, graph: GraphGrammar):
if self.num_cpu_workers > 1:
return self.__parallel_calculate_reward(graph)
return super().calculate_reward(graph)

0 comments on commit 218337d

Please sign in to comment.