Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Article 3events and criteria #126

Merged
merged 5 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions rostok/criterion/criterion_calculation.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from abc import ABC
import json
from bisect import bisect_left
from typing import Dict, List, Optional, Tuple, Union
from abc import ABC
from typing import List

import numpy as np
import pychrono.core as chrono
from scipy.spatial import distance

from rostok.criterion.simulation_flags import (EventContactTimeOut, EventGrasp,
EventSlipOut)
from rostok.simulation_chrono.simulation_utils import SimulationResult
from rostok.criterion.simulation_flags import SimulationSingleEvent, EventContactTimeOut, EventGrasp, EventSlipOut
from rostok.utils.json_encoder import RostokJSONEncoder


Expand Down Expand Up @@ -131,7 +130,7 @@ def calculate_reward(self, simulation_output: SimulationResult):
body_outer_force_center = env_data.get_data("force_center")[0][self.grasp_event.step_n +
1]
if body_outer_force_center is np.nan:
print(body_COG, body_outer_force_center)
return 0
dist = distance.euclidean(body_COG, body_outer_force_center)
return 1 / (1 + dist)
else:
Expand Down Expand Up @@ -294,15 +293,15 @@ def calculate_reward(self, simulation_output, partial=False):
partial_rewards = []
for criterion in self.criteria:
reward = criterion.calculate_reward(simulation_output)
partial_rewards.append(round(reward,3))
partial_rewards.append(round(reward, 3))

if partial:
return partial_rewards
if self.verbosity > 0:
print([round(x, 3) for x in partial_rewards])

total_reward = sum([a * b for a, b in zip(partial_rewards, self.weights)])

if np.isclose(total_reward, 0, atol=1e-3):
total_reward = 0.02
return round(total_reward, 3)
Expand Down
109 changes: 83 additions & 26 deletions rostok/criterion/simulation_flags.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from abc import ABC, abstractmethod
import json
from typing import Dict, List, Optional, Tuple, Union
from abc import ABC, abstractmethod
from enum import Enum
import types
from typing import Optional

import numpy as np
import pychrono.core as chrono
from rostok.utils.json_encoder import RostokJSONEncoder

from rostok.virtual_experiment.sensors import DataStorage, Sensor
from rostok.utils.json_encoder import RostokJSONEncoder
from rostok.virtual_experiment.sensors import Sensor


class EventCommands(Enum):
Expand Down Expand Up @@ -41,7 +40,7 @@ def reset(self):
self.step_n = None

@abstractmethod
def event_check(self, current_time: float, step_n: int, robot_data, env_data):
def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
"""Simulation calls that method each step to check if the event occurred.

Args:
Expand All @@ -63,14 +62,39 @@ def __str__(self) -> str:
class EventContact(SimulationSingleEvent):
"""Event of contact between robot and object

Attributes:
from_body (bool): flag determines the source of contact information.
"""
def __init__(self, take_from_body:bool = False):
super().__init__()
self.from_body = take_from_body

def event_check(self, current_time: float, step_n: int, robot_data, env_data):
if env_data.get_amount_contacts()[0] > 0:
self.state = True
self.step_n = step_n
def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
if self.state:
if __debug__:
raise Exception("The EventContact.event_check is called after occurrence in simulation.")
return EventCommands.CONTINUE

if not self.from_body:
# the contact information from the object
if env_data.get_amount_contacts()[0] > 0:
self.state = True
self.step_n = step_n
else:
# the contact information from the robot
robot_contacts = robot_data.get_amount_contacts()
# it works only with current rule set, where the palm/flat always has the smallest index among the bodies
flat_idx_ = list(robot_contacts.keys())[0]
contacts = 0
Huowl marked this conversation as resolved.
Show resolved Hide resolved
# we calculate only the amount of unique keys, therefore the amount of unique contacting bodies
for key, value in robot_contacts.items():
if key != flat_idx_ and value > 0:
contacts += 1

if contacts > 0:
self.state = True
self.step_n = step_n

return EventCommands.CONTINUE


Expand All @@ -94,6 +118,12 @@ def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_
Returns:
EventCommands: return a command for simulation
"""
if self.state:
if __debug__:
raise Exception("The EventContactTimeOut.event_check is called after occurrence in simulation.")
return EventCommands.STOP

# if the contact has already occurred in simulation, return CONTINUE
if self.contact_event.state:
return EventCommands.CONTINUE

Expand Down Expand Up @@ -122,11 +152,15 @@ def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_
Returns:
EventCommands: return a command for simulation
"""
if self.state:
if __debug__:
raise Exception("The EventFlyingApart.event_check is called after occurrence in simulation.")
return EventCommands.STOP

trajectory_points = robot_data.get_body_trajectory_point()
# It takes the position of the first block in the list, that should be the base body
base_position = trajectory_points[next(iter(trajectory_points))]
for block in trajectory_points.values():
position = block
for position in trajectory_points.values():
if np.linalg.norm(np.array(base_position) - np.array(position)) > self.max_distance:
Huowl marked this conversation as resolved.
Show resolved Hide resolved
self.state = True
self.step_n = step_n
Expand Down Expand Up @@ -158,19 +192,26 @@ def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_
Returns:
EventCommands: return a command for simulation
"""
# Old variant: contact = env_data.get_amount_contacts()[0] > 0
if self.state:
if __debug__:
raise Exception("The EventSlipOut.event_check is called after occurrence in simulation.")
return EventCommands.STOP

robot_contacts = robot_data.get_amount_contacts()
flat_idx_= list(robot_contacts.keys())[0]
# it works only with current rule set, where the palm/flat always has the smallest index among the bodies
flat_idx_ = list(robot_contacts.keys())[0]
contacts = 0
# we calculate only the amount of unique keys, therefore the amount of unique contacting bodies
for key, value in robot_contacts.items():
if key != flat_idx_:
contacts += value
if key != flat_idx_ and value > 0:
contacts += 1

contact = contacts > 0
if contact:
self.time_last_contact = current_time
return EventCommands.CONTINUE

if (not self.time_last_contact is None):
if not (self.time_last_contact is None):
if current_time - self.time_last_contact > self.reference_time:
Huowl marked this conversation as resolved.
Show resolved Hide resolved
self.step_n = step_n
self.state = True
Expand All @@ -191,7 +232,11 @@ class EventGrasp(SimulationSingleEvent):
force_test_time (float): the time period of the force test of the grasp
"""

def __init__(self, grasp_limit_time: float, contact_event: EventContact, verbosity: int = 0, simulation_stop = False):
def __init__(self,
grasp_limit_time: float,
contact_event: EventContact,
verbosity: int = 0,
simulation_stop:bool = False):
super().__init__(verbosity=verbosity)
self.grasp_steps: int = 0
self.grasp_time: Optional[float] = None
Expand All @@ -207,30 +252,42 @@ def reset(self):
def check_grasp_timeout(self, current_time):
if current_time > self.grasp_limit_time:
return True
else:
return False
return False

def check_grasp_current_step(self, env_data: Sensor):
def check_grasp_current_step(self, env_data: Sensor, robot_data: Sensor):
obj_velocity = np.linalg.norm(np.array(env_data.get_velocity()[0]))
if obj_velocity <= 0.01 and env_data.get_amount_contacts()[0] >= 2:
robot_contacts = robot_data.get_amount_contacts()
# it works only with current rule set, where the palm/flat always has the smallest index among the bodies
flat_idx_ = list(robot_contacts.keys())[0]
contacts = 0
# we calculate only the amount of unique keys, therefore the amount of unique contacting bodies
for key, value in robot_contacts.items():
if key != flat_idx_ and value > 0:
contacts += 1

if obj_velocity <= 0.01 and contacts >= 2:
self.grasp_steps += 1
else:
self.grasp_steps = 0

def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
"""Return ACTIVATE if the body was in contact with the robot and after that at some
point doesn't move for at least 10 steps. Return STOP if the grasp didn't occur during grasp_limit_time.


Returns:
EventCommands: return a command for simulation
"""

if self.state:
if __debug__:
raise Exception("The EventGrasp.event_check is called after occurrence in simulation.")
return EventCommands.CONTINUE

if self.check_grasp_timeout(current_time):
return EventCommands.STOP

if self.contact_event.state:
self.check_grasp_current_step(env_data)
self.check_grasp_current_step(env_data, robot_data)

if self.grasp_steps == 10:
self.state = True
Expand All @@ -247,7 +304,7 @@ def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_


class EventStopExternalForce(SimulationSingleEvent):

"""Event that controls external force and stops simulation after reference time has passed."""
def __init__(self, grasp_event: EventGrasp, force_test_time: float):
super().__init__()
self.grasp_event = grasp_event
Expand Down
2 changes: 1 addition & 1 deletion rostok/simulation_chrono/simulation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple

import pychrono as chrono
import pychrono.irrlicht as chronoirr
Expand Down
5 changes: 2 additions & 3 deletions rostok/simulation_chrono/simulation_scenario.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from copy import deepcopy
import json
from typing import Dict, List, Optional, Tuple
from typing import List

import numpy as np
import pychrono as chrono

from rostok.control_chrono.controller import (ConstController, SinControllerChrono)
Expand All @@ -12,7 +11,7 @@
from rostok.simulation_chrono.simulation import (ChronoSystems, EnvCreator, SingleRobotSimulation,
ChronoVisManager)
from rostok.simulation_chrono.simulation_utils import \
set_covering_sphere_based_position, set_covering_ellipsoid_based_position
set_covering_ellipsoid_based_position
from rostok.utils.json_encoder import RostokJSONEncoder
from rostok.virtual_experiment.sensors import (SensorCalls, SensorObjectClassification)
from rostok.block_builder_chrono.block_builder_chrono_api import \
Expand Down
10 changes: 5 additions & 5 deletions rostok/simulation_chrono/simulation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@


def calculate_covering_sphere(obj: ChronoEasyShapeObject):
v1 = chrono.ChVectorD(0, 0, 0)
v2 = chrono.ChVectorD(0, 0, 0)
obj.body.GetTotalAABB(bbmin=v1, bbmax=v2)
local_center = (v1 + v2) * 0.5
radius = ((v2 - v1).Length()) * 0.5
v_1 = chrono.ChVectorD(0, 0, 0)
v_2 = chrono.ChVectorD(0, 0, 0)
obj.body.GetTotalAABB(bbmin=v_1, bbmax=v_2)
local_center = (v_1 + v_2) * 0.5
radius = ((v_2 - v_1).Length()) * 0.5
visual = chrono.ChSphereShape(radius)
visual.SetOpacity(0.3)
obj.body.AddVisualShape(visual, chrono.ChFrameD(local_center))
Expand Down
6 changes: 4 additions & 2 deletions rostok/virtual_experiment/built_graph_chrono.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import pychrono.core as chrono
from pychrono.core import ChQuaternionD, ChVectorD

from rostok.block_builder_api.block_parameters import (DefaultFrame, FrameTransform)
from rostok.block_builder_api.block_parameters import (DefaultFrame,
FrameTransform)
from rostok.block_builder_chrono.block_builder_chrono_api import \
ChronoBlockCreatorInterface as creator
from rostok.block_builder_chrono.block_classes import (BLOCK_CLASS_TYPES, ChronoRevolveJoint,
from rostok.block_builder_chrono.block_classes import (BLOCK_CLASS_TYPES,
ChronoRevolveJoint,
PrimitiveBody)
from rostok.block_builder_chrono.block_connect import place_and_connect
from rostok.graph_grammar.node import GraphGrammar
Expand Down
3 changes: 2 additions & 1 deletion rostok/virtual_experiment/robot_new.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pychrono.core as chrono

from rostok.block_builder_api.block_parameters import (DefaultFrame, FrameTransform)
from rostok.block_builder_api.block_parameters import (DefaultFrame,
FrameTransform)
from rostok.control_chrono.controller import ConstController
from rostok.graph_grammar.node import GraphGrammar
from rostok.virtual_experiment.built_graph_chrono import BuiltGraphChrono
Expand Down
Loading
Loading