Skip to content

Commit

Permalink
Added dynamic joint velocity and cartesian velocity control.
Browse files Browse the repository at this point in the history
  • Loading branch information
firephinx committed Jun 4, 2024
1 parent d467fee commit 132bd44
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 42 deletions.
72 changes: 72 additions & 0 deletions examples/run_dynamic_cartesian_velocities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import numpy as np
import time

from frankapy import FrankaConstants as FC
from frankapy import FrankaArm, SensorDataMessageType
from frankapy.proto_utils import sensor_proto2ros_msg, make_sensor_group_msg
from frankapy.proto import CartesianVelocitySensorMessage, ShouldTerminateSensorMessage
from franka_interface_msgs.msg import SensorDataGroup

from frankapy.utils import min_jerk
import rospy


if __name__ == "__main__":
fa = FrankaArm()
fa.reset_joints()

rospy.loginfo('Generating Trajectory')
p = fa.get_pose()
p.translation[2] -= 0.2
fa.goto_pose(p, duration=5, block=False)

cartesian_accelerations = [1, 1, 1, 0.1, 0.1, 0.1]

T = 5
dt = 0.01
ts = np.arange(0, T, dt)
cartesian_velocities = []

for i in range(len(ts)):
cartesian_velocities.append(fa.get_robot_state()['cartesian_velocities'])
time.sleep(dt)

fa.wait_for_skill()

print(cartesian_velocities)

pub = rospy.Publisher(FC.DEFAULT_SENSOR_PUBLISHER_TOPIC, SensorDataGroup, queue_size=1000)
rate = rospy.Rate(1 / dt)

fa.reset_joints()

rospy.loginfo('Initializing Sensor Publisher')

rospy.loginfo('Publishing cartesian velocity trajectory...')
# To ensure skill doesn't end before completing trajectory, make the buffer time much longer than needed
fa.execute_cartesian_velocities(cartesian_velocities[0], cartesian_accelerations, duration=T, dynamic=True, buffer_time=10)
init_time = rospy.Time.now().to_time()
for i in range(len(ts)):
traj_gen_proto_msg = CartesianVelocitySensorMessage(
id=i, timestamp=rospy.Time.now().to_time() - init_time,
cartesian_velocities=cartesian_velocities[i]
)
ros_msg = make_sensor_group_msg(
trajectory_generator_sensor_msg=sensor_proto2ros_msg(
traj_gen_proto_msg, SensorDataMessageType.CARTESIAN_VELOCITY)
)

rospy.loginfo('Publishing: ID {}'.format(traj_gen_proto_msg.id))
pub.publish(ros_msg)
time.sleep(dt)

# Stop the skill
# Alternatively can call fa.stop_skill()
term_proto_msg = ShouldTerminateSensorMessage(timestamp=rospy.Time.now().to_time() - init_time, should_terminate=True)
ros_msg = make_sensor_group_msg(
termination_handler_sensor_msg=sensor_proto2ros_msg(
term_proto_msg, SensorDataMessageType.SHOULD_TERMINATE)
)
pub.publish(ros_msg)

rospy.loginfo('Done')
69 changes: 69 additions & 0 deletions examples/run_dynamic_joint_velocities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import numpy as np
import time

from frankapy import FrankaConstants as FC
from frankapy import FrankaArm, SensorDataMessageType
from frankapy.proto_utils import sensor_proto2ros_msg, make_sensor_group_msg
from frankapy.proto import JointVelocitySensorMessage, ShouldTerminateSensorMessage
from franka_interface_msgs.msg import SensorDataGroup

from frankapy.utils import min_jerk
import rospy

if __name__ == "__main__":
fa = FrankaArm()
fa.reset_joints()

rospy.loginfo('Generating Trajectory')
p = fa.get_pose()
p.translation[2] -= 0.2
fa.goto_pose(p, duration=5, block=False)

joint_accelerations = [1, 1, 1, 1, 1, 1, 1]

T = 5
dt = 0.01
ts = np.arange(0, T, dt)
joint_velocities = []

for i in range(len(ts)):
joint_velocities.append(fa.get_robot_state()['joint_velocities'])
time.sleep(dt)

fa.wait_for_skill()

pub = rospy.Publisher(FC.DEFAULT_SENSOR_PUBLISHER_TOPIC, SensorDataGroup, queue_size=1000)
rate = rospy.Rate(1 / dt)

fa.reset_joints()

rospy.loginfo('Initializing Sensor Publisher')

rospy.loginfo('Publishing joints trajectory...')
# To ensure skill doesn't end before completing trajectory, make the buffer time much longer than needed
fa.execute_joint_velocities(joint_velocities[0], joint_accelerations, duration=T, dynamic=True, buffer_time=10)
init_time = rospy.Time.now().to_time()
for i in range(len(ts)):
traj_gen_proto_msg = JointVelocitySensorMessage(
id=i, timestamp=rospy.Time.now().to_time() - init_time,
joint_velocities=joint_velocities[i]
)
ros_msg = make_sensor_group_msg(
trajectory_generator_sensor_msg=sensor_proto2ros_msg(
traj_gen_proto_msg, SensorDataMessageType.JOINT_VELOCITY)
)

rospy.loginfo('Publishing: ID {}'.format(traj_gen_proto_msg.id))
pub.publish(ros_msg)
time.sleep(dt)

# Stop the skill
# Alternatively can call fa.stop_skill()
term_proto_msg = ShouldTerminateSensorMessage(timestamp=rospy.Time.now().to_time() - init_time, should_terminate=True)
ros_msg = make_sensor_group_msg(
termination_handler_sensor_msg=sensor_proto2ros_msg(
term_proto_msg, SensorDataMessageType.SHOULD_TERMINATE)
)
pub.publish(ros_msg)

rospy.loginfo('Done')
12 changes: 0 additions & 12 deletions examples/run_dynamic_pose.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
T = 5
dt = 0.02
ts = np.arange(0, T, dt)
current_gripper_width = 0.08
has_closed = False

weights = [min_jerk_weight(t, T) for t in ts]
Expand Down Expand Up @@ -64,17 +63,6 @@
feedback_controller_sensor_msg=sensor_proto2ros_msg(
fb_ctrlr_proto, SensorDataMessageType.CARTESIAN_IMPEDANCE)
)

if not has_closed:
current_gripper_width -= 0.0005
else:
current_gripper_width += 0.0005

if current_gripper_width < 0.002:
has_closed = True

fa.goto_gripper(current_gripper_width, block=False)


rospy.loginfo('Publishing: ID {}'.format(traj_gen_proto_msg.id))
pub.publish(ros_msg)
Expand Down
190 changes: 190 additions & 0 deletions frankapy/franka_arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,196 @@ def goto_joints(self,
if dynamic:
sleep(FC.DYNAMIC_SKILL_WAIT_TIME)

def execute_cartesian_velocities(self,
cartesian_velocities,
cartesian_accelerations,
duration=5,
dynamic=False,
buffer_time=FC.DEFAULT_TERM_BUFFER_TIME,
force_thresholds=None,
torque_thresholds=None,
cartesian_impedances=None,
joint_impedances=None,
block=True,
ignore_errors=True,
ignore_virtual_walls=False,
skill_desc='ExecutecartesianVelocities'):
"""
Commands Arm to execute cartesian velocities
Parameters
----------
cartesian_velocities : :obj:`list`
A list of 6 numbers that correspond to cartesian velocities in m/s and rad/s.
cartesian_accelerations : :obj:`list`
A list of 6 numbers that correspond to cartesian accelerations in m/s^2 and rad/s^2.
duration : :obj:`float`
How much time this robot motion should take.
dynamic : :obj:`bool`
Flag that states whether the skill is dynamic. If True, it
will use our cartesian impedance controller and sensor values.
buffer_time : :obj:`float`
How much extra time the termination handler will wait
before stopping the skill after duration has passed.
force_thresholds : :obj:`list`
List of 6 floats corresponding to force limits on translation
(xyz) and rotation about (xyz) axes. Default is None.
If None then will not stop on contact.
torque_thresholds : :obj:`list`
List of 7 floats corresponding to torque limits on each joint.
Default is None. If None then will not stop on contact.
cartesian_impedances : :obj:`list`
List of 6 floats corresponding to impedances on translation
(xyz) and rotation about (xyz) axes. Default is None. If None
then will use default impedances.
joint_impedances : :obj:`list`
List of 7 floats corresponding to impedances on each joint.
This is used when use_impedance is False. Default is None.
If None then will use default impedances.
block : :obj:`bool`
Function blocks by default. If False, the function becomes
asynchronous and can be preempted.
ignore_errors : :obj:`bool`
Function ignores errors by default. If False, errors and some
exceptions can be thrown.
ignore_virtual_walls : :obj:`bool`
Function checks for collisions with virtual walls by default.
If False, the robot no longer checks, which may be dangerous.
skill_desc : :obj:`str`
Skill description to use for logging on the Control PC.
Raises:
ValueError: If is_cartesians_reachable(cartesians) returns False
"""

cartesian_velocities = np.array(cartesian_velocities).tolist()
cartesian_accelerations = np.array(cartesian_accelerations).tolist()

if dynamic:
skill = Skill(SkillType.CartesianVelocitySkill,
TrajectoryGeneratorType.PassThroughCartesianVelocityTrajectoryGenerator,
feedback_controller_type=FeedbackControllerType.SetInternalImpedanceFeedbackController,
termination_handler_type=TerminationHandlerType.TimeTerminationHandler,
skill_desc=skill_desc)
block = False
else:
print("Unimplemented Cartesian Velocity Skill")

skill.add_initial_sensor_values(FC.EMPTY_SENSOR_VALUES)

skill.set_cartesian_impedances(False, cartesian_impedances, cartesian_impedances)

if not skill.check_for_contact_params(buffer_time, force_thresholds, torque_thresholds):
if dynamic:
skill.add_time_termination_params(buffer_time)

skill.add_goal_cartesian_velocities(duration, cartesian_velocities, cartesian_accelerations)
goal = skill.create_goal()

self._send_goal(goal,
cb=lambda x: skill.feedback_callback(x),
block=block,
ignore_errors=ignore_errors)

if dynamic:
sleep(FC.DYNAMIC_SKILL_WAIT_TIME)

def execute_joint_velocities(self,
joint_velocities,
joint_accelerations,
duration=5,
dynamic=False,
buffer_time=FC.DEFAULT_TERM_BUFFER_TIME,
force_thresholds=None,
torque_thresholds=None,
cartesian_impedances=None,
joint_impedances=None,
block=True,
ignore_errors=True,
ignore_virtual_walls=False,
skill_desc='ExecuteJointVelocities'):
"""
Commands Arm to execute joint velocities
Parameters
----------
joint_velocities : :obj:`list`
A list of 7 numbers that correspond to joint velocities in radians/second.
joint_accelerations : :obj:`list`
A list of 7 numbers that correspond to joint accelerations in radians/second^2.
duration : :obj:`float`
How much time this robot motion should take.
dynamic : :obj:`bool`
Flag that states whether the skill is dynamic. If True, it
will use our joint impedance controller and sensor values.
buffer_time : :obj:`float`
How much extra time the termination handler will wait
before stopping the skill after duration has passed.
force_thresholds : :obj:`list`
List of 6 floats corresponding to force limits on translation
(xyz) and rotation about (xyz) axes. Default is None.
If None then will not stop on contact.
torque_thresholds : :obj:`list`
List of 7 floats corresponding to torque limits on each joint.
Default is None. If None then will not stop on contact.
cartesian_impedances : :obj:`list`
List of 6 floats corresponding to impedances on translation
(xyz) and rotation about (xyz) axes. Default is None. If None
then will use default impedances.
joint_impedances : :obj:`list`
List of 7 floats corresponding to impedances on each joint.
This is used when use_impedance is False. Default is None.
If None then will use default impedances.
block : :obj:`bool`
Function blocks by default. If False, the function becomes
asynchronous and can be preempted.
ignore_errors : :obj:`bool`
Function ignores errors by default. If False, errors and some
exceptions can be thrown.
ignore_virtual_walls : :obj:`bool`
Function checks for collisions with virtual walls by default.
If False, the robot no longer checks, which may be dangerous.
skill_desc : :obj:`str`
Skill description to use for logging on the Control PC.
Raises:
ValueError: If is_joints_reachable(joints) returns False
"""

joint_velocities = np.array(joint_velocities).tolist()
joint_accelerations = np.array(joint_accelerations).tolist()

if dynamic:
skill = Skill(SkillType.JointVelocitySkill,
TrajectoryGeneratorType.PassThroughJointVelocityTrajectoryGenerator,
feedback_controller_type=FeedbackControllerType.SetInternalImpedanceFeedbackController,
termination_handler_type=TerminationHandlerType.TimeTerminationHandler,
skill_desc=skill_desc)
block = False
else:
print("Unimplemented Joint Velocity Skill")

skill.add_initial_sensor_values(FC.EMPTY_SENSOR_VALUES)

skill.set_joint_impedances(False, cartesian_impedances, joint_impedances, None, None)

if not skill.check_for_contact_params(buffer_time, force_thresholds, torque_thresholds):
if dynamic:
skill.add_time_termination_params(buffer_time)
else:
skill.add_joint_threshold_params(buffer_time, FC.DEFAULT_JOINT_THRESHOLDS)

skill.add_goal_joint_velocities(duration, joint_velocities, joint_accelerations)
goal = skill.create_goal()

self._send_goal(goal,
cb=lambda x: skill.feedback_callback(x),
block=block,
ignore_errors=ignore_errors)

if dynamic:
sleep(FC.DYNAMIC_SKILL_WAIT_TIME)

def execute_joint_dmp(self,
joint_dmp_info,
duration,
Expand Down
2 changes: 2 additions & 0 deletions frankapy/franka_arm_state_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def get_data(self):
if self._offline:
logging.warn('In offline mode - FrankaArmStateClient will return 0 values.')
return {
'cartesian_velocities': np.zeros(6),
'pose': franka_pose_to_rigid_transform(np.eye(4)),
'pose_desired': franka_pose_to_rigid_transform(np.eye(4)),
'joint_torques': np.zeros(7),
Expand All @@ -42,6 +43,7 @@ def get_data(self):
ros_data = self._get_current_robot_state().robot_state

data = {
'cartesian_velocities': np.array(ros_data.O_dP_EE_c),
'pose': franka_pose_to_rigid_transform(ros_data.O_T_EE),
'pose_desired': franka_pose_to_rigid_transform(ros_data.O_T_EE_d),
'joint_torques': np.array(ros_data.tau_J),
Expand Down
Loading

0 comments on commit 132bd44

Please sign in to comment.