diff --git a/app/123.py b/app/123.py new file mode 100644 index 00000000..1ffb8b62 --- /dev/null +++ b/app/123.py @@ -0,0 +1,39 @@ +from rostok.block_builder_api.block_blueprints import EnvironmentBodyBlueprint +from rostok.control_chrono.tendon_controller import TendonControllerParameters, TendonController_2p +from rostok.criterion.criterion_calculation import SimulationReward, TimeCriterion +from rostok.library.rule_sets.simple_designs import get_three_link_one_finger, get_two_link_three_finger +from rostok.simulation_chrono.simulation_scenario import GraspScenario +from rostok.trajectory_optimizer.control_optimizer import MockOptiVar, BruteForceOptimisation1D, TendonForceOptiVar +from rostok.library.obj_grasp import objects +from rostok.utils.numeric_utils import Offset + + +if __name__ == '__main__': + data = TendonControllerParameters() + data.amount_pulley_in_body = 2 + data.pulley_parameters_for_body = { + 0: [Offset(-0.14, True), Offset(0.005, False, True), + Offset(0, True)], + 1: [Offset(-0.14, True), Offset(-0.005, False, True), + Offset(0, True)] + } + data.starting_point_parameters = [Offset(-0.02, False), Offset(0.025, False), Offset(0, True)] + data.tip_parameters = [Offset(-0.3, True), Offset(-0.005, False, True), Offset(0, True)] + + gpesk1 = GraspScenario(0.001, 1, TendonController_2p) + gpesk1.grasp_object_callback = objects.get_object_box(0.1, 0.1, 0.1, 0) + + gpesk2 = GraspScenario(0.001, 1, TendonController_2p) + gpesk2.grasp_object_callback = objects.get_object_box(0.1, 0.1, 0.1, 1) + + + simulation_rewarder = SimulationReward(verbosity=0) + + + + graph = get_three_link_one_finger() + ttt = TendonForceOptiVar(data, simulation_rewarder, -45) + + kukish = BruteForceOptimisation1D([10, 10, 11, 12, 13], [gpesk1, gpesk2], ttt, num_cpu_workers=1) + sosik = kukish.parallel_calculate_reward(graph) + pass \ No newline at end of file diff --git a/app/generate_grasper_cfgs.py b/app/generate_grasper_cfgs.py index a104c52f..6a1bac9a 100644 --- a/app/generate_grasper_cfgs.py +++ b/app/generate_grasper_cfgs.py @@ -1,9 +1,45 @@ -from rostok.simulation_chrono.simulation_scenario import GraspScenario +from dataclasses import dataclass, field +from typing import Type +from rostok.block_builder_api.block_blueprints import EnvironmentBodyBlueprint +from rostok.simulation_chrono.simulation_scenario import GraspScenario, ParametrizedSimulation +import rostok.control_chrono.external_force as f_ext -def create_simulator(): - pass - #return simulation_manager - simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION, obj_external_forces=obj_forces) +@dataclass +class SimulationConfig: + time_step: float = 0.0001 + time_simulation: float = 10 + scenario_cls: Type[ParametrizedSimulation] = GraspScenario + obj_disturbance_forces: list[f_ext.ABCForceCalculator] = field(default_factory=list) -def \ No newline at end of file + +# From tis stuff generates sim_manager + + +@dataclass +class GraspObjective: + object_list: list[EnvironmentBodyBlueprint] = field(default_factory=list) + weight_list: list[float] = field(default_factory=list) + event_time_no_contact: float = 2 + event_flying_apart_time: float = 2 + event_slipout_time: float = 1 + event_grasp_time: float = 5 + event_force_test_time: float = 5 + + time_criterion_weight: float = 1 + instant_contact_link_criterion_weight: float = 1 + instant_force_criterion_weight: float = 1 + instant_cog_criterion_weight: float = 1 + grasp_time_criterion_weight: float = 1 + final_pos_criterion_weight: float = 1 + +@dataclass +class ControlOptimizationParams: + optimisation_bound: tuple = field(default_factory=tuple) + optimisation_iter: int = field(default_factory=int) + +@dataclass +class ControlBruteForceParams: + force_list: list[float] = field(default_factory=list) + staring_angle: float = field(default_factory=float) + diff --git a/app/mcts_run_setup.py b/app/mcts_run_setup.py index 195c0257..5f6482e8 100644 --- a/app/mcts_run_setup.py +++ b/app/mcts_run_setup.py @@ -2,7 +2,7 @@ import hyperparameters as hp -from rostok.control_chrono.tendon_controller import TendonControllerParameters +from rostok.control_chrono.tendon_controller import TendonController_2p, TendonControllerParameters from rostok.criterion.criterion_calculation import ( FinalPositionCriterion, GraspTimeCriterion, InstantContactingLinkCriterion, InstantForceCriterion, InstantObjectCOGCriterion, SimulationReward, @@ -23,7 +23,7 @@ def config_independent_torque(grasp_object_blueprint): # configurate the simulation manager - simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION, tendon = False) + simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION) # simulation_manager.grasp_object_callback = lambda: creator.create_environment_body( # grasp_object_blueprint) simulation_manager.grasp_object_callback = grasp_object_blueprint @@ -78,7 +78,7 @@ def config_with_tendon(grasp_object_blueprint): obj_forces.append(f_ext.NullGravity(0)) obj_forces.append(f_ext.RandomForces(1e6, 100, 0)) obj_forces = f_ext.ExternalForces(obj_forces) - simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION, obj_external_forces=obj_forces) + simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION, TendonController_2p, obj_external_forces=obj_forces) simulation_manager.grasp_object_callback = grasp_object_blueprint #lambda: creator.create_environment_body( #grasp_object_blueprint) event_contact = EventContact() @@ -140,7 +140,7 @@ def config_with_tendon(grasp_object_blueprint): def config_combination_force_tendon_multiobject_parallel(grasp_object_blueprint, weights): # configurate the simulation manager - simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION, True) + simulation_manager = GraspScenario(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION, TendonController_2p) object_callback = grasp_object_blueprint #[ # (lambda obj=obj: creator.create_environment_body(obj)) for obj in grasp_object_blueprint diff --git a/rostok/simulation_chrono/simulation_scenario.py b/rostok/simulation_chrono/simulation_scenario.py index 3c72798c..9a66de2c 100644 --- a/rostok/simulation_chrono/simulation_scenario.py +++ b/rostok/simulation_chrono/simulation_scenario.py @@ -17,7 +17,7 @@ from rostok.block_builder_chrono.block_builder_chrono_api import \ ChronoBlockCreatorInterface as creator from rostok.control_chrono.tendon_controller import TendonController_2p - +from abc import abstractmethod class ParametrizedSimulation: @@ -25,7 +25,12 @@ def __init__(self, step_length, simulation_length): self.step_length = step_length self.simulation_length = simulation_length - def run_simulation(self, graph: GraphGrammar, data): + def run_simulation(self, + graph: GraphGrammar, + controller_data, + starting_positions=None, + vis=False, + delay=False): pass def __repr__(self) -> str: @@ -35,30 +40,23 @@ def __repr__(self) -> str: def __str__(self) -> str: json_data = json.dumps(self, indent=4, cls=RostokJSONEncoder) return json_data - - -class GraspScenario(ParametrizedSimulation): - """ + @abstractmethod + def get_scenario_name(self): + return self.__str__ - - - PROKINUT CONTROLL CLS - - Args: - ParametrizedSimulation (_type_): _description_ - """ +class GraspScenario(ParametrizedSimulation): def __init__(self, step_length, simulation_length, - tendon=True, + controller_cls = ConstController, smc=False, obj_external_forces: Optional[ABCForceCalculator] = None) -> None: super().__init__(step_length, simulation_length) self.grasp_object_callback = None self.event_container: List[SimulationSingleEvent] = [] - self.tendon = tendon + self.controller_cls = controller_cls self.smc = smc self.obj_external_forces = obj_external_forces @@ -71,7 +69,7 @@ def reset_events(self): def run_simulation(self, graph: GraphGrammar, - data, + controller_data, starting_positions=None, vis=False, delay=False): @@ -101,13 +99,12 @@ def run_simulation(self, force_torque_controller=chrono_forces) # add design and determine the outer force - if self.tendon: - simulation.add_design(graph, - data, - TendonController_2p, - starting_positions=starting_positions) - else: - simulation.add_design(graph, data, starting_positions=starting_positions) + + simulation.add_design(graph, + controller_data, + self.controller_cls, + starting_positions=starting_positions) + # setup parameters for the data store n_steps = int(self.simulation_length / self.step_length) @@ -127,4 +124,7 @@ def run_simulation(self, "n_contacts": (SensorCalls.AMOUNT_FORCE, SensorObjectClassification.BODY) } simulation.add_robot_data_type_dict(robot_data_dict) - return simulation.simulate(n_steps, self.step_length, 10000, self.event_container, vis) \ No newline at end of file + return simulation.simulate(n_steps, self.step_length, 10000, self.event_container, vis) + + def get_scenario_name(self): + return str(self.grasp_object_callback) \ No newline at end of file diff --git a/rostok/trajectory_optimizer/control_optimizer.py b/rostok/trajectory_optimizer/control_optimizer.py index 7025a6d3..269dcd76 100644 --- a/rostok/trajectory_optimizer/control_optimizer.py +++ b/rostok/trajectory_optimizer/control_optimizer.py @@ -4,23 +4,40 @@ from dataclasses import dataclass, field from itertools import product import os +from typing import Any, Type from joblib import Parallel, delayed -from joblib.parallel import TimeoutError import numpy as np from scipy.optimize import direct, dual_annealing +from rostok.control_chrono.controller import RobotControllerChrono -from rostok.control_chrono.tendon_controller import TendonControllerParameters +from rostok.control_chrono.tendon_controller import TendonController_2p, TendonControllerParameters from rostok.criterion.criterion_calculation import SimulationReward from rostok.criterion.simulation_flags import EventFlyingApart from rostok.graph_grammar.node import GraphGrammar from rostok.graph_grammar.node_block_typing import ( get_joint_matrix_from_graph, get_joint_vector_from_graph) +from rostok.simulation_chrono.simulation_scenario import GraspScenario, ParametrizedSimulation from rostok.trajectory_optimizer.trajectory_generator import ( joint_root_paths) from rostok.utils.json_encoder import RostokJSONEncoder +def is_valid_graph(graph: GraphGrammar): + n_joints = len(get_joint_vector_from_graph(graph)) + return n_joints > 0 + + +def build_equal_starting_positions(graph: GraphGrammar, starting_finger_angles): + joint_matrix = get_joint_matrix_from_graph(graph) + for i in range(len(joint_matrix)): + for j in range(len(joint_matrix[i])): + if j == 0: + joint_matrix[i][j] = starting_finger_angles + else: + joint_matrix[i][j] = 0 + return joint_matrix + class GraphRewardCalculator: def __init__(self): @@ -43,6 +60,161 @@ def __str__(self) -> str: return json_data +class BasePrepareOptiVar(): + + def __init__(self, + each_control_params: Any, + control_class: Type[RobotControllerChrono], + rewarder: SimulationReward, + params_start_pos=None): + self.each_control_params = each_control_params + self.control_class = control_class + self.rewarder = rewarder + self.is_vis = False + self.params_start_pos = params_start_pos + + @abstractmethod + def x_to_control_params(self, graph: GraphGrammar, x: list): + pass + + + def build_starting_positions(self, graph: GraphGrammar): + return None + + + def is_vis_decision(self, graph: GraphGrammar): + return False + + + def bound_parameters(self, graph: GraphGrammar, bounds: tuple): + """A method for determining the relationship between boundaries and mechanism. + Default implementation + Args: + graph (GraphGrammar): _description_ + + Returns: + _type_: _description_ + """ + n_joints = len(get_joint_vector_from_graph(graph)) + print('n_joints:', n_joints) + multi_bound = [] + for _ in range(n_joints): + multi_bound.append(bounds) + + return multi_bound + + def reward_one_sim_scenario(self, x: list, graph: GraphGrammar, sim: ParametrizedSimulation): + control_data = self.x_to_control_params(graph, x) + start_pos = self.build_starting_positions(graph) # pylint: disable=assignment-from-none + is_vis = self.is_vis_decision(graph) + simout = sim.run_simulation(graph, control_data, start_pos, is_vis) + rew = self.rewarder.calculate_reward(simout) + return rew, x, sim + + + + +class MockOptiVar(BasePrepareOptiVar): + + def reward_one_sim_scenario(self, x: list, + graph: GraphGrammar, + sim: ParametrizedSimulation): + + return np.mean(x), x, sim + + +class TendonForceOptiVar(BasePrepareOptiVar): + + def __init__(self, + each_control_params: TendonControllerParameters, + rewarder: SimulationReward, + params_start_pos=None): + super().__init__(each_control_params, TendonController_2p, rewarder, params_start_pos) + + def x_to_control_params(self, graph: GraphGrammar, x: list): + """Method define transofrm algorigm parameters to data control + + Args: + parameters (list): Parameter list for optimizing + + Returns: + dict: Dictionary of data control + """ + np_array_x = np.array(x) + parameters = np_array_x.round(3) + self.each_control_params.forces = list(parameters) + data = deepcopy(self.each_control_params) + return data + + def build_starting_positions(self, graph: GraphGrammar): + if self.params_start_pos: + return build_equal_starting_positions(graph, self.params_start_pos) + return None + +class BruteForceOptimisation1D(): + def __init__(self, + variants: list, + simulation_scenario: list[ParametrizedSimulation], + prepare_reward: BasePrepareOptiVar, + num_cpu_workers=1, + chunksize=1, + timeout_parallel = 60*5): + self.variants = variants + self.simulation_scenario = simulation_scenario + self.prepare_reward = prepare_reward + self.num_cpu_workers = num_cpu_workers + self.chunksize = chunksize + self.timeout_parallel = timeout_parallel + + def parallel_calculate_reward(self, graph: GraphGrammar): + if not is_valid_graph(graph): + return (0.01, []) + + all_variants_control = list(product(self.variants, repeat=len(joint_root_paths(graph)))) + if isinstance(self.simulation_scenario, list): + all_simulations = list(product(all_variants_control, self.simulation_scenario)) + input_dates = [(np.array(put[0]), graph, put[1]) for put in all_simulations] + else: + all_simulations = list(product(all_variants_control, [self.simulation_scenario])) + input_dates = [(np.array(put[0]), graph, put[1]) for put in all_simulations] + np.random.shuffle(input_dates) + parallel_results = [] + if self.num_cpu_workers > 1: + cpus = len(input_dates) + 1 if len(input_dates) < self.num_cpu_workers else self.num_cpu_workers + print(f"Use CPUs processor: {cpus}, input dates: {len(input_dates)}") + + try: + parallel_results = Parallel( + cpus, + backend="multiprocessing", + verbose=100, + timeout=self.timeout_parallel, + batch_size=self.chunksize)( + delayed(self.prepare_reward.reward_one_sim_scenario)(i[0], i[1], i[2]) for i in input_dates) + except TimeoutError: + print("TIMEOUT") + return (0.01, []) + + else: + for i in input_dates: + res = self.prepare_reward.reward_one_sim_scenario(i[0], i[1], i[2]) + parallel_results.append(res) + + result_group_object = {sim_scen.get_scenario_name(): [] for sim_scen in self.simulation_scenario} + for results in parallel_results: + scen_name = results[2].get_scenario_name() + result_group_object[scen_name].append((results[1], results[0])) + + reward = 0 + control = [] + for value in result_group_object.values(): + best_res = max(value, key=lambda i: i[1]) + reward += best_res[1] + control.append(best_res[0]) + + return (reward, control) + + class CalculatorWithConstTorqueOptimization(GraphRewardCalculator): def __init__(self, @@ -248,14 +420,14 @@ def build_starting_positions(self, graph: GraphGrammar): else: return [] - def simulate_with_control_parameters(self, data, graph, simulation_scenario): + def simulate_with_control_parameters(self, controller_data, graph, simulation_scenario): starting_positions = self.build_starting_positions(graph) return simulation_scenario.run_simulation(graph, - data, + controller_data, starting_positions, vis=False, delay=False) - + def calculate_reward(self, graph: GraphGrammar): """Constant moment optimization method using scenario simulation and rewarder for calculating objective function. @@ -464,7 +636,7 @@ def __parallel_calculate_reward(self, graph: GraphGrammar): if not multi_bound: return (0, []) - + all_variants_control = list(product(self.tendon_forces, repeat=len(joint_root_paths(graph)))) if isinstance(self.simulation_scenario, list): object_weight = {sim_scen[0].grasp_object_callback: sim_scen[1] for sim_scen in self.simulation_scenario} @@ -475,20 +647,26 @@ def __parallel_calculate_reward(self, graph: GraphGrammar): all_simulations = list(product(all_variants_control, [self.simulation_scenario])) input_dates = [(np.array(put[0]), graph, put[1]) for put in all_simulations] np.random.shuffle(input_dates) - + cpus = len(input_dates) + 1 if len(input_dates) < self.num_cpu_workers else self.num_cpu_workers print(f"Use CPUs processor: {cpus}, input dates: {len(input_dates)}") parallel_results = [] try: - parallel_results = Parallel(cpus, backend = "multiprocessing", verbose=100, timeout=self.timeout_parallel, batch_size = self.chunksize)(delayed(self._parallel_reward_with_parameters)(i) for i in input_dates) + parallel_results = Parallel( + cpus, + backend="multiprocessing", + verbose=100, + timeout=self.timeout_parallel, + batch_size=self.chunksize)( + delayed(self._parallel_reward_with_parameters)(i) for i in input_dates) except TimeoutError: - print("TIMEOUT") - return (0.01, []) + print("TIMEOUT") + return (0.01, []) result_group_object = {sim_scen[0].grasp_object_callback: [] for sim_scen in self.simulation_scenario} for results in parallel_results: obj = results[1].grasp_object_callback result_group_object[obj].append((results[0], results[2]*object_weight[obj])) - + reward = 0 control = [] for value in result_group_object.values():