From f5f03e5a15f20f114251abb190c571d64c168af3 Mon Sep 17 00:00:00 2001 From: Yuxiao Hua Date: Tue, 26 Dec 2023 21:46:12 +0800 Subject: [PATCH] =?UTF-8?q?#13=20=E5=AE=9E=E7=8E=B0=E4=BD=8D=E7=BD=AE?= =?UTF-8?q?=E5=BE=AE=E8=B0=83=E9=97=AD=E7=8E=AF=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{master.py => main.py} | 993 ++++++++---------- 1 file changed, 449 insertions(+), 544 deletions(-) rename tb4_find_pickup_place/tb4_find_pickup_place/{master.py => main.py} (86%) diff --git a/tb4_find_pickup_place/tb4_find_pickup_place/master.py b/tb4_find_pickup_place/tb4_find_pickup_place/main.py similarity index 86% rename from tb4_find_pickup_place/tb4_find_pickup_place/master.py rename to tb4_find_pickup_place/tb4_find_pickup_place/main.py index 9433554..1a56163 100644 --- a/tb4_find_pickup_place/tb4_find_pickup_place/master.py +++ b/tb4_find_pickup_place/tb4_find_pickup_place/main.py @@ -34,7 +34,7 @@ PointB = PoseStamped( header=Header(frame_id='map'), pose=Pose( - position=Point(x=2.85, y=-3.45, z=0.0), + position=Point(x=2.85, y=-3.2, z=0.0), orientation=Quaternion(x=qB[0], y=qB[1], z=qB[2], w=qB[3]) ) ) @@ -49,32 +49,30 @@ ) -class Pickup(Node): + + +class Place(Node): def __init__(self): - super().__init__("Pickup") - self.marker2camera_Matrix = np.eye(4) - self.camera2base_Matrix = np.eye(4) - self.marker2base_Matrix = np.eye(4) + super().__init__("Place") - self.rot_matrix1 = np.array([[0, 0, 1, 0.02], - [0, 1, 0, 0.025], - [-1, 0, 0, -0.02], + self.rot_matrix1 = np.array([[0, 0, 1, 0], + [0, 1, 0, 0], + [-1, 0, 0, 0], [0, 0, 0, 1]]) self.rot_matrix2 = np.array([[0, 0, 1, 0], [-1, 0, 0, 0], [0, -1, 0, 0], [0, 0, 0, 1]]) - self.aruco_update = True self.fb_sub = self.create_subscription(JointState, "/joint_states", self.js_cb, 10) - self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10) + self.tf_buffer = Buffer() self.tf_listener = TransformListener(self.tf_buffer, self) self.cmd_pub = self.create_publisher(JointSingleCommand, "/px100/commands/joint_single", 10) self.group_pub = self.create_publisher(JointGroupCommand, "/px100/commands/joint_group", 10) - self.pub_timer = self.create_timer(0.1, self.pickup_cb) + self.place_timer = self.create_timer(0.1, self.place_cb) self.pantil_pub = self.create_publisher(PanTiltCmdDeg,"/pan_tilt_cmd_deg",10) self.ar_pos = None @@ -117,6 +115,9 @@ def quat2matrix(self,quat,pos): def js_cb(self, msg): + # if self.pickupFlag == False or self.placeFlag == False: + # return + # print('joint state callback') if len(msg.name) == 7: self.joint_pos.clear() @@ -125,128 +126,26 @@ def js_cb(self, msg): # print(self.joint_pos) - # camera2marker matrix - def ar_cb(self, msg): - # print('ar_cb') - if not self.aruco_update: - return - # 更新相机与base的转换 - now = rclpy.time.Time() - try: - trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now) - position = trans.transform.translation - orientation = trans.transform.rotation - position = np.array([position.x, position.y, position.z]) - orientation = np.array([orientation.x, orientation.y, orientation.z, orientation.w]) - self.camera2base_Matrix = self.quat2matrix(orientation, position) - self.aruco_update = False - # print("camera2base updated") - except : - print("pass") - pass - - # 这里处理接收到的坐标信息 - pose = msg.poses[0] - - position = np.array([pose.position.x, pose.position.y, pose.position.z]) - # 从 Pose 消息中获取方向信息 - orientation = np.array([pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]) - matrix = self.quat2matrix(orientation, position) - - self.marker2camera_Matrix = matrix - - self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix) - self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1) - - return None - - - def pickup_cb(self): - # print('pickup_cb') - # print(self.joint_pos) + def place_cb(self): if len(self.joint_pos) == 7: - # 获取当前时间 - current_time = datetime.datetime.now() - duration = current_time - start_time - print(current_time, duration) - if duration > datetime.timedelta(seconds=40): - print('Time out! Forced exit!') - print(1/0) - - print(self.machine_state) match self.machine_state: + # case "INIT": + # if self.go_home_pos() == True: + # print('Go home pos done!') + # self.machine_state = "NEXT0" case "INIT": - self.pantil_deg_cmd.pitch = 10.0 # 调整云台俯仰角 - self.pantil_deg_cmd.yaw = 0.0 - self.pantil_deg_cmd.speed = 10 - self.pantil_pub.publish(self.pantil_deg_cmd) - if self.go_sleep_pos() == True and self.release(): - print('Go slep pos done!') - self.aruco_update = True - - # print('base2marker_Matrix: ', self.base2marker_Matrix) - if np.array_equal(self.marker2base_Matrix, np.eye(4)): - print('no marker detected!') - self.machine_state = "INIT" - # time.sleep(3.0) - else: - print('marker detected!') - print('base2marker_Matrix: ', self.marker2base_Matrix) - self.machine_state = "NEXT0" - #self.machine_state = "NEXT4" - # time.sleep(3.0) - - case "NEXT0": - if self.set_group_pos([-1.0, 0.0, 0.8, -1.4]) == True: - print('NEXT0 control done!') - # self.machine_state = "NEXT1" - self.machine_state = "NEXT1" - #self.machine_state = "TEST1" - + if self.release(): + print('Release done!') + self.machine_state = "NEXT1" case "NEXT1": - angle = np.arctan2(self.marker2base_Matrix[1, 3], self.marker2base_Matrix[0, 3]) - if self.set_group_pos([angle, -0.3, 0.8, -1.4]) == True: - print('NEXT1 control done!') - self.machine_state = "NEXT2" - - case "NEXT2": - np.set_printoptions(precision=3) - print('base2marker_Matrix: ', self.marker2base_Matrix) - angle = np.arctan2(self.marker2base_Matrix[1, 3], self.marker2base_Matrix[0, 3]) - guess2 = [angle, -0.1, 0.0, 0.0] - mlist, mflag = self.matrix_control(self.marker2base_Matrix, custom_guess=guess2) - print('mlist2:', mlist) - self.fk = self.joint_to_pose(mlist) - np.set_printoptions(precision=3) - print('fk', self.fk) - # error = self.marker2base_Matrix - self.fk - np.set_printoptions(precision=3) - - if self.set_group_pos([mlist[0], mlist[1], mlist[2], mlist[3]]) and self.release(): - print('NEXT2 control done!') - # time.sleep(3.0) - self.grasp(0.7) - self.machine_state = "NEXT3" - time.sleep(1.0) - - case "NEXT3": - if self.set_group_pos([0.0, -0.2, 0.0, -1.2]) == True: - print('NEXT3 control done!') - self.machine_state = "NEXT4" - # time.sleep(3.0) - case "NEXT4": if self.go_sleep_pos() == True: print('Go sleep pos done!') - self.machine_state = "QUIT" - # self.release() - # self.machine_state = "NEXT6" - # time.sleep(1.0) - case 'QUIT': - self.pub_timer.destroy() + self.machine_state = "EXIT" + case 'EXIT': print(1/0) + self.place_timer.destroy() return - - pass + pass def set_single_pos(self, name, pos, blocking=True): @@ -351,40 +250,33 @@ def matrix_control(self, T_sd, custom_guess: list[float]=None, execute: bool=Tru initial_guesses = [custom_guess] for guess in initial_guesses: + theta_list, success = mr.IKinSpace( + Slist=self.robot_des.Slist, + M=self.robot_des.M, + T=T_sd, + thetalist0=guess, + eomg=5.0, + ev=0.02, + ) + solution_found = True + print('success: ',success) + # Check to make sure a solution was found and that no joint limits were violated + if success: + print('success',success) + theta_list = self._wrap_theta_list(theta_list) + # solution_found = self._check_joint_limits(theta_list) + solution_found = True + else: + solution_found = False - thetalist = np.array(guess).copy() - i = 0 - maxiterations = 400 - Tsb = mr.FKinSpace(self.robot_des.M, self.robot_des.Slist, thetalist) - err = Tsb[0:3, 3] - T_sd[0:3, 3] - print(err) - step = 0.005 - while sum(abs(err)) > 0.01 and i < maxiterations: - if abs(err[0]) > 0.005: - if err[0] < 0: - if thetalist[2] < 0: - if thetalist[1] < 0.9: - thetalist = thetalist + np.array([0.0, step, -step, 0.0]) - else: - thetalist = thetalist + np.array([0.0, 0.0, -step, step]) - else: - thetalist = thetalist - np.array([0.0, 0.0, -step, step]) - if abs(err[2]) > 0.005: - if err[2] < 0: - thetalist = thetalist + np.array([0.0, 0.0, -step, -step]) - else: - thetalist = thetalist - np.array([0.0, 0.0, -step, -step]) - Tsb = mr.FKinSpace(self.robot_des.M, self.robot_des.Slist, thetalist) - err = Tsb[0:3, 3] - T_sd[0:3, 3] - i = i + 1 - # print(i) - print('tlist', thetalist) - return thetalist, True + if solution_found: + if execute: + joint_list = [theta_list[0],theta_list[1],theta_list[2], theta_list[3]] + self.set_group_pos(joint_list) + self.T_sb = T_sd + return theta_list, True - # self.core.get_logger().warn('No valid pose could be found. Will not execute') - - return initial_guesses, False - # return theta_list, False + return theta_list, False def waist_control(self, pos): @@ -509,20 +401,19 @@ def _wrap_theta_list(self, theta_list: list[np.ndarray]) -> list[np.ndarray]: -class Spin(Node): - def __init__(self): - super().__init__("Spin") + + +class Pickup(Node): + def __init__(self): + super().__init__("Pickup") self.marker2camera_Matrix = np.eye(4) self.camera2base_Matrix = np.eye(4) self.marker2base_Matrix = np.eye(4) - # self.rot_matrix1 = np.array([[1, 0, 0, -0.01], - # [0, 1, 0, 0], - # [0, 0, 1, 0], - # [0, 0, 0, 1]]) - self.rot_matrix1 = np.array([[0, 0, 1, 0], - [0, 1, 0, 0], - [-1, 0, 0, 0], + + self.rot_matrix1 = np.array([[0, 0, 1, 0.02], + [0, 1, 0, 0.025], + [-1, 0, 0, -0.02], [0, 0, 0, 1]]) self.rot_matrix2 = np.array([[0, 0, 1, 0], @@ -536,12 +427,18 @@ def __init__(self): self.tf_buffer = Buffer() self.tf_listener = TransformListener(self.tf_buffer, self) - self.cmd_vel = self.create_publisher('cmd_vel_mux/input/navi', Twist, queue_size=10) - self.place_timer = self.create_timer(0.1, self.place_cb) + self.cmd_pub = self.create_publisher(JointSingleCommand, "/px100/commands/joint_single", 10) + self.group_pub = self.create_publisher(JointGroupCommand, "/px100/commands/joint_group", 10) + self.pub_timer = self.create_timer(0.1, self.pickup_cb) + self.pantil_pub = self.create_publisher(PanTiltCmdDeg,"/pan_tilt_cmd_deg",10) self.ar_pos = None self.ar_quat = None - + self.pantil_deg_cmd = PanTiltCmdDeg() + self.arm_command = JointSingleCommand() + self.arm_group_command = JointGroupCommand() + + self.cnt = 0 self.thred = 0.1 self.joint_pos = [] self.moving_time = 2.0 @@ -553,6 +450,13 @@ def __init__(self): self.initial_guesses[2][0] = np.deg2rad(30) self.robot_des: mrd.ModernRoboticsDescription = getattr(mrd, 'px100') + self.machine_state = "INIT" + + self.gripper_pressure: float = 0.5 + self.gripper_pressure_lower_limit: int = 150 + self.gripper_pressure_upper_limit: int = 350 + self.gripper_value = self.gripper_pressure_lower_limit + (self.gripper_pressure*(self.gripper_pressure_upper_limit - self.gripper_pressure_lower_limit)) + pass @@ -568,9 +472,6 @@ def quat2matrix(self,quat,pos): def js_cb(self, msg): - # if self.pickupFlag == False or self.placeFlag == False: - # return - # print('joint state callback') if len(msg.name) == 7: self.joint_pos.clear() @@ -608,338 +509,99 @@ def ar_cb(self, msg): matrix = self.quat2matrix(orientation, position) self.marker2camera_Matrix = matrix - # print("camera2marker matrix: ", self.marker2camera_Matrix) - #self.marker2base_Matrix = np.dot(self.camera2base_Matrix, np.dot(self.rot_matrix2, np.dot(self.marker2camera_Matrix, self.rot_matrix1))) - self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix) self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1) - # print('base2marker matrix:', self.base2marker_Matrix) - #print('m1 ', self.marker2camera_Matrix) - #print('m2 ', self.camera2base_Matrix) return None - def place_cb(self): - # Twist is a datatype for velocity - move_cmd = Twist() - # let's turn at 0 radians/s - move_cmd.angular.z = radians(45); #45 deg/s in radians/s - - if np.array_equal(self.marker2base_Matrix, np.eye(4)): - print('no marker detected!') - elif self.marker2base_Matrix[0, 0] > 0.1: - # publish the velocity - self.cmd_vel.publish(move_cmd) - else: - self.place_timer.destroy() - print(1/0) - return - - - -class Move(Node): - def __init__(self): - super().__init__("Move") - - self.marker2camera_Matrix = np.eye(4) - self.camera2base_Matrix = np.eye(4) - self.marker2base_Matrix = np.eye(4) - - self.rot_matrix1 = np.array([[0, 0, 1, 0], - [0, 1, 0, 0], - [-1, 0, 0, 0], - [0, 0, 0, 1]]) - - self.rot_matrix2 = np.array([[0, 0, 1, 0], - [-1, 0, 0, 0], - [0, -1, 0, 0], - [0, 0, 0, 1]]) - self.aruco_update = True - - self.fb_sub = self.create_subscription(JointState, "/joint_states", self.js_cb, 10) - self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10) - self.tf_buffer = Buffer() - self.tf_listener = TransformListener(self.tf_buffer, self) - - self.cmd_pub = self.create_publisher(JointSingleCommand, "/px100/commands/joint_single", 10) - self.group_pub = self.create_publisher(JointGroupCommand, "/px100/commands/joint_group", 10) - self.place_timer = self.create_timer(0.1, self.place_cb) - self.pantil_pub = self.create_publisher(PanTiltCmdDeg,"/pan_tilt_cmd_deg",10) - - self.ar_pos = None - self.ar_quat = None - self.pantil_deg_cmd = PanTiltCmdDeg() - - self.thred = 0.1 - self.joint_pos = [] - self.moving_time = 2.0 - self.num_joints = 4 - self.joint_lower_limits = [-1.5, -0.4, -1.1, -1.4] - self.joint_upper_limits = [1.5, 0.9, 0.8, 1.8] - self.initial_guesses = [[0.0] * self.num_joints] * 3 - self.initial_guesses[1][0] = np.deg2rad(-30) - self.initial_guesses[2][0] = np.deg2rad(30) - self.robot_des: mrd.ModernRoboticsDescription = getattr(mrd, 'px100') - - pass - - - def quat2matrix(self,quat,pos): - position = pos[:,np.newaxis] - share_vector = np.array([0,0,0,1],dtype = float)[np.newaxis,:] - r = R.from_quat(quat) - rotation_matrix = r.as_matrix() - m34 = np.concatenate((rotation_matrix, position),axis = 1) - matrix = np.concatenate((m34,share_vector),axis = 0) - - return matrix - - - def js_cb(self, msg): - # if self.pickupFlag == False or self.placeFlag == False: - # return - - # print('joint state callback') - if len(msg.name) == 7: - self.joint_pos.clear() - for i in range(7): - self.joint_pos.append(msg.position[i]) - # print(self.joint_pos) - - - # camera2marker matrix - def ar_cb(self, msg): - # print('ar_cb') - if not self.aruco_update: - return - # 更新相机与base的转换 - now = rclpy.time.Time() - try: - trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now) - position = trans.transform.translation - orientation = trans.transform.rotation - position = np.array([position.x, position.y, position.z]) - orientation = np.array([orientation.x, orientation.y, orientation.z, orientation.w]) - self.camera2base_Matrix = self.quat2matrix(orientation, position) - self.aruco_update = False - # print("camera2base updated") - except : - print("pass") - pass - - # 这里处理接收到的坐标信息 - pose = msg.poses[0] - - position = np.array([pose.position.x, pose.position.y, pose.position.z]) - # 从 Pose 消息中获取方向信息 - orientation = np.array([pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]) - matrix = self.quat2matrix(orientation, position) - - self.marker2camera_Matrix = matrix - # print("camera2marker matrix: ", self.marker2camera_Matrix) - - #self.marker2base_Matrix = np.dot(self.camera2base_Matrix, np.dot(self.rot_matrix2, np.dot(self.marker2camera_Matrix, self.rot_matrix1))) - - self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix) - self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1) - - # print('base2marker matrix:', self.base2marker_Matrix) - #print('m1 ', self.marker2camera_Matrix) - #print('m2 ', self.camera2base_Matrix) - return None - - - def place_cb(self): - - navigate = BasicNavigator() - navigate.waitUntilNav2Active() - - relative = [self.marker2base_Matrix[0,3], - self.marker2base_Matrix[1,3]] - final = [0.25-(relative[0]-0.2), - -3.3-(relative[1]-0.2)] - - angle = np.arctan2(relative[1], relative[0]) - qFinal = quaternion_from_euler(0.0, 0.0, np.deg2rad(angle-90.0)) - - PointFinal = PoseStamped( - header=Header(frame_id='map'), - pose=Pose( - position=Point(x=final[0], y=final[1], z=0.0), - orientation=Quaternion(x=qFinal[0], y=qFinal[1], z=qFinal[2], w=qFinal[3]) - ) - ) - - navigate.goToPose(PointFinal) - while not navigate.isTaskComplete(): - feedback = navigate.getFeedback() - if feedback: - # Some navigation timeout to demo cancellation - if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0): - navigate.cancelTask() - # Do something depending on the return code - result = navigate.getResult() - if result == TaskResult.SUCCEEDED: - print('Goal reached') - elif result == TaskResult.CANCELED: - print('Goal was canceled!') - elif result == TaskResult.FAILED: - print('Goal failed!') - else: - print('Goal has an invalid return status!') - self.place_timer.destroy() - print(1/0) - pass - - - -class Place(Node): - def __init__(self): - super().__init__("Place") - - self.marker2camera_Matrix = np.eye(4) - self.camera2base_Matrix = np.eye(4) - self.marker2base_Matrix = np.eye(4) - # self.rot_matrix1 = np.array([[1, 0, 0, -0.01], - # [0, 1, 0, 0], - # [0, 0, 1, 0], - # [0, 0, 0, 1]]) - self.rot_matrix1 = np.array([[0, 0, 1, 0], - [0, 1, 0, 0], - [-1, 0, 0, 0], - [0, 0, 0, 1]]) - - self.rot_matrix2 = np.array([[0, 0, 1, 0], - [-1, 0, 0, 0], - [0, -1, 0, 0], - [0, 0, 0, 1]]) - self.aruco_update = True - - self.fb_sub = self.create_subscription(JointState, "/joint_states", self.js_cb, 10) - self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10) - self.tf_buffer = Buffer() - self.tf_listener = TransformListener(self.tf_buffer, self) - - self.cmd_pub = self.create_publisher(JointSingleCommand, "/px100/commands/joint_single", 10) - self.group_pub = self.create_publisher(JointGroupCommand, "/px100/commands/joint_group", 10) - self.place_timer = self.create_timer(0.1, self.place_cb) - self.pantil_pub = self.create_publisher(PanTiltCmdDeg,"/pan_tilt_cmd_deg",10) - - self.ar_pos = None - self.ar_quat = None - self.pantil_deg_cmd = PanTiltCmdDeg() - self.arm_command = JointSingleCommand() - self.arm_group_command = JointGroupCommand() - - self.cnt = 0 - self.thred = 0.1 - self.joint_pos = [] - self.moving_time = 2.0 - self.num_joints = 4 - self.joint_lower_limits = [-1.5, -0.4, -1.1, -1.4] - self.joint_upper_limits = [1.5, 0.9, 0.8, 1.8] - self.initial_guesses = [[0.0] * self.num_joints] * 3 - self.initial_guesses[1][0] = np.deg2rad(-30) - self.initial_guesses[2][0] = np.deg2rad(30) - self.robot_des: mrd.ModernRoboticsDescription = getattr(mrd, 'px100') - - self.machine_state = "INIT" - - self.gripper_pressure: float = 0.5 - self.gripper_pressure_lower_limit: int = 150 - self.gripper_pressure_upper_limit: int = 350 - self.gripper_value = self.gripper_pressure_lower_limit + (self.gripper_pressure*(self.gripper_pressure_upper_limit - self.gripper_pressure_lower_limit)) - - pass - - - def quat2matrix(self,quat,pos): - position = pos[:,np.newaxis] - share_vector = np.array([0,0,0,1],dtype = float)[np.newaxis,:] - r = R.from_quat(quat) - rotation_matrix = r.as_matrix() - m34 = np.concatenate((rotation_matrix, position),axis = 1) - matrix = np.concatenate((m34,share_vector),axis = 0) - - return matrix - - - def js_cb(self, msg): - # if self.pickupFlag == False or self.placeFlag == False: - # return - - # print('joint state callback') - if len(msg.name) == 7: - self.joint_pos.clear() - for i in range(7): - self.joint_pos.append(msg.position[i]) + def pickup_cb(self): + # print('pickup_cb') # print(self.joint_pos) - - - # camera2marker matrix - def ar_cb(self, msg): - # print('ar_cb') - if not self.aruco_update: - return - # 更新相机与base的转换 - now = rclpy.time.Time() - try: - trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now) - position = trans.transform.translation - orientation = trans.transform.rotation - position = np.array([position.x, position.y, position.z]) - orientation = np.array([orientation.x, orientation.y, orientation.z, orientation.w]) - self.camera2base_Matrix = self.quat2matrix(orientation, position) - self.aruco_update = False - # print("camera2base updated") - except : - print("pass") - pass - - # 这里处理接收到的坐标信息 - pose = msg.poses[0] - - position = np.array([pose.position.x, pose.position.y, pose.position.z]) - # 从 Pose 消息中获取方向信息 - orientation = np.array([pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]) - matrix = self.quat2matrix(orientation, position) - - self.marker2camera_Matrix = matrix - # print("camera2marker matrix: ", self.marker2camera_Matrix) - - #self.marker2base_Matrix = np.dot(self.camera2base_Matrix, np.dot(self.rot_matrix2, np.dot(self.marker2camera_Matrix, self.rot_matrix1))) - - self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix) - self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1) - - # print('base2marker matrix:', self.base2marker_Matrix) - #print('m1 ', self.marker2camera_Matrix) - #print('m2 ', self.camera2base_Matrix) - return None - - - def place_cb(self): if len(self.joint_pos) == 7: + # 获取当前时间 + current_time = datetime.datetime.now() + duration = current_time - start_time + print(current_time, duration) + if duration > datetime.timedelta(seconds=30): + print('Time out! Forced exit!') + print(1/0) + + print(self.machine_state) match self.machine_state: - # case "INIT": - # if self.go_home_pos() == True: - # print('Go home pos done!') - # self.machine_state = "NEXT0" case "INIT": - if self.release(): - print('Release done!') - self.machine_state = "EXIT" + self.pantil_deg_cmd.pitch = 10.0 # 调整云台俯仰角 + self.pantil_deg_cmd.yaw = 0.0 + self.pantil_deg_cmd.speed = 10 + self.pantil_pub.publish(self.pantil_deg_cmd) + if self.go_sleep_pos() == True and self.release(): + print('Go slep pos done!') + self.aruco_update = True + + # print('base2marker_Matrix: ', self.base2marker_Matrix) + if np.array_equal(self.marker2base_Matrix, np.eye(4)): + print('no marker detected!') + self.machine_state = "INIT" + # time.sleep(3.0) + else: + print('marker detected!') + print('base2marker_Matrix: ', self.marker2base_Matrix) + self.machine_state = "NEXT0" + #self.machine_state = "NEXT4" + # time.sleep(3.0) + + case "NEXT0": + if self.set_group_pos([-1.0, 0.0, 0.8, -1.4]) == True: + print('NEXT0 control done!') + # self.machine_state = "NEXT1" + self.machine_state = "NEXT1" + #self.machine_state = "TEST1" + case "NEXT1": + angle = np.arctan2(self.marker2base_Matrix[1, 3], self.marker2base_Matrix[0, 3]) + if self.set_group_pos([angle, -0.3, 0.8, -1.4]) == True: + print('NEXT1 control done!') + self.machine_state = "NEXT2" + + case "NEXT2": + np.set_printoptions(precision=3) + print('base2marker_Matrix: ', self.marker2base_Matrix) + angle = np.arctan2(self.marker2base_Matrix[1, 3], self.marker2base_Matrix[0, 3]) + guess2 = [angle, -0.1, 0.0, 0.0] + mlist, mflag = self.matrix_control(self.marker2base_Matrix, custom_guess=guess2) + print('mlist2:', mlist) + self.fk = self.joint_to_pose(mlist) + np.set_printoptions(precision=3) + print('fk', self.fk) + # error = self.marker2base_Matrix - self.fk + np.set_printoptions(precision=3) + + if self.set_group_pos([mlist[0], mlist[1], mlist[2], mlist[3]]) and self.release(): + print('NEXT2 control done!') + # time.sleep(3.0) + self.grasp(0.7) + self.machine_state = "NEXT3" + time.sleep(1.0) + + case "NEXT3": + if self.set_group_pos([0.0, -0.2, 0.0, -1.2]) == True: + print('NEXT3 control done!') + self.machine_state = "NEXT4" + # time.sleep(3.0) + case "NEXT4": if self.go_sleep_pos() == True: print('Go sleep pos done!') - self.machine_state = "EXIT" - case 'EXIT': + self.machine_state = "QUIT" + # self.release() + # self.machine_state = "NEXT6" + # time.sleep(1.0) + case 'QUIT': + self.pub_timer.destroy() print(1/0) - self.place_timer.destroy() return - pass + + pass def set_single_pos(self, name, pos, blocking=True): @@ -1044,55 +706,40 @@ def matrix_control(self, T_sd, custom_guess: list[float]=None, execute: bool=Tru initial_guesses = [custom_guess] for guess in initial_guesses: - theta_list, success = mr.IKinSpace( - Slist=self.robot_des.Slist, - M=self.robot_des.M, - T=T_sd, - thetalist0=guess, - eomg=5.0, - ev=0.02, - ) - solution_found = True - print('success: ',success) - # Check to make sure a solution was found and that no joint limits were violated - if success: - print('success',success) - theta_list = self._wrap_theta_list(theta_list) - # solution_found = self._check_joint_limits(theta_list) - solution_found = True - else: - solution_found = False - if solution_found: - if execute: - joint_list = [theta_list[0],theta_list[1],theta_list[2], theta_list[3]] - self.set_group_pos(joint_list) - self.T_sb = T_sd - return theta_list, True - - # thetalist = np.array(guess).copy() - # i = 0 - # maxiterations = 50 - # Tsb = mr.FKinSpace(self.robot_des.M, self.robot_des.Slist, thetalist) - # err = Tsb[0:2, 3] - T_sd[0:2, 3] - # while sum(abs(err)) > 0.05 and i < maxiterations: - # if abs(err[0]) > 0.01: - # if err[0] < 0: - # thetalist = thetalist + np.array([0.0, 0.0, -0.01, 0.01]) - # else: - # thetalist = thetalist - np.array([0.0, 0.0, -0.01, 0.01]) - # if abs(err[2]) > 0.01: - # if err[2] < 0: - # thetalist = thetalist + np.array([0.0, 0.0, -0.01, 0.0]) - # else: - # thetalist = thetalist - np.array([0.0, 0.0, -0.01, 0.0]) - # i = i + 1 - # return (thetalist, not err) + thetalist = np.array(guess).copy() + i = 0 + maxiterations = 400 + Tsb = mr.FKinSpace(self.robot_des.M, self.robot_des.Slist, thetalist) + err = Tsb[0:3, 3] - T_sd[0:3, 3] + print(err) + step = 0.005 + while sum(abs(err)) > 0.01 and i < maxiterations: + if abs(err[0]) > 0.005: + if err[0] < 0: + if thetalist[2] < 0: + if thetalist[1] < 0.9: + thetalist = thetalist + np.array([0.0, step, -step, 0.0]) + else: + thetalist = thetalist + np.array([0.0, 0.0, -step, step]) + else: + thetalist = thetalist - np.array([0.0, 0.0, -step, step]) + if abs(err[2]) > 0.005: + if err[2] < 0: + thetalist = thetalist + np.array([0.0, 0.0, -step, -step]) + else: + thetalist = thetalist - np.array([0.0, 0.0, -step, -step]) + Tsb = mr.FKinSpace(self.robot_des.M, self.robot_des.Slist, thetalist) + err = Tsb[0:3, 3] - T_sd[0:3, 3] + i = i + 1 + # print(i) + print('tlist', thetalist) + return thetalist, True # self.core.get_logger().warn('No valid pose could be found. Will not execute') - # return initial_guesses, False - return theta_list, False + return initial_guesses, False + # return theta_list, False def waist_control(self, pos): @@ -1217,8 +864,258 @@ def _wrap_theta_list(self, theta_list: list[np.ndarray]) -> list[np.ndarray]: +class Spin(Node): + def __init__(self): + super().__init__("Spin") + + self.marker2camera_Matrix = np.eye(4) + self.camera2base_Matrix = np.eye(4) + self.marker2base_Matrix = np.eye(4) + # self.rot_matrix1 = np.array([[1, 0, 0, -0.01], + # [0, 1, 0, 0], + # [0, 0, 1, 0], + # [0, 0, 0, 1]]) + self.rot_matrix1 = np.array([[0, 0, 1, 0], + [0, 1, 0, 0], + [-1, 0, 0, 0], + [0, 0, 0, 1]]) + + self.rot_matrix2 = np.array([[0, 0, 1, 0], + [-1, 0, 0, 0], + [0, -1, 0, 0], + [0, 0, 0, 1]]) + self.aruco_update = True + + self.fb_sub = self.create_subscription(JointState, "/joint_states", self.js_cb, 10) + self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10) + self.tf_buffer = Buffer() + self.tf_listener = TransformListener(self.tf_buffer, self) + + self.cmd_vel = self.create_publisher('cmd_vel_mux/input/navi', Twist, queue_size=10) + self.place_timer = self.create_timer(0.1, self.place_cb) + + self.ar_pos = None + self.ar_quat = None + + self.thred = 0.1 + self.joint_pos = [] + self.moving_time = 2.0 + self.num_joints = 4 + self.joint_lower_limits = [-1.5, -0.4, -1.1, -1.4] + self.joint_upper_limits = [1.5, 0.9, 0.8, 1.8] + self.initial_guesses = [[0.0] * self.num_joints] * 3 + self.initial_guesses[1][0] = np.deg2rad(-30) + self.initial_guesses[2][0] = np.deg2rad(30) + self.robot_des: mrd.ModernRoboticsDescription = getattr(mrd, 'px100') + + pass + + + def quat2matrix(self,quat,pos): + position = pos[:,np.newaxis] + share_vector = np.array([0,0,0,1],dtype = float)[np.newaxis,:] + r = R.from_quat(quat) + rotation_matrix = r.as_matrix() + m34 = np.concatenate((rotation_matrix, position),axis = 1) + matrix = np.concatenate((m34,share_vector),axis = 0) + + return matrix + + + def js_cb(self, msg): + # if self.pickupFlag == False or self.placeFlag == False: + # return + + # print('joint state callback') + if len(msg.name) == 7: + self.joint_pos.clear() + for i in range(7): + self.joint_pos.append(msg.position[i]) + # print(self.joint_pos) + + + # camera2marker matrix + def ar_cb(self, msg): + # print('ar_cb') + if not self.aruco_update: + return + # 更新相机与base的转换 + now = rclpy.time.Time() + try: + trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now) + position = trans.transform.translation + orientation = trans.transform.rotation + position = np.array([position.x, position.y, position.z]) + orientation = np.array([orientation.x, orientation.y, orientation.z, orientation.w]) + self.camera2base_Matrix = self.quat2matrix(orientation, position) + self.aruco_update = False + # print("camera2base updated") + except : + print("pass") + pass + + # 这里处理接收到的坐标信息 + pose = msg.poses[0] + + position = np.array([pose.position.x, pose.position.y, pose.position.z]) + # 从 Pose 消息中获取方向信息 + orientation = np.array([pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]) + matrix = self.quat2matrix(orientation, position) + + self.marker2camera_Matrix = matrix + # print("camera2marker matrix: ", self.marker2camera_Matrix) + + #self.marker2base_Matrix = np.dot(self.camera2base_Matrix, np.dot(self.rot_matrix2, np.dot(self.marker2camera_Matrix, self.rot_matrix1))) + + self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix) + self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1) + + # print('base2marker matrix:', self.base2marker_Matrix) + #print('m1 ', self.marker2camera_Matrix) + #print('m2 ', self.camera2base_Matrix) + return None + + + def place_cb(self): + # Twist is a datatype for velocity + move_cmd = Twist() + # let's turn at 0 radians/s + move_cmd.angular.z = radians(45); #45 deg/s in radians/s + + if np.array_equal(self.marker2base_Matrix, np.eye(4)): + print('no marker detected!') + elif self.marker2base_Matrix[0, 0] > 0.1: + # publish the velocity + self.cmd_vel.publish(move_cmd) + else: + self.place_timer.destroy() + print(1/0) + return + + + +class Move(Node): + def __init__(self): + super().__init__("Move") + + self.marker2camera_Matrix = np.eye(4) + self.camera2base_Matrix = np.eye(4) + self.marker2base_Matrix = np.eye(4) + + self.rot_matrix1 = np.array([[0, 0, 1, 0], + [0, 1, 0, 0], + [-1, 0, 0, 0], + [0, 0, 0, 1]]) + + self.rot_matrix2 = np.array([[0, 0, 1, 0], + [-1, 0, 0, 0], + [0, -1, 0, 0], + [0, 0, 0, 1]]) + self.aruco_update = True + + self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10) + self.tf_buffer = Buffer() + self.tf_listener = TransformListener(self.tf_buffer, self) + + self.move_timer = self.create_timer(0.1, self.move_cb) + + self.cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10) + + self.ar_pos = None + self.ar_quat = None + # Twist is a datatype for velocity + self.move_cmd = Twist() + + self.thred = 0.1 + self.joint_pos = [] + self.moving_time = 2.0 + self.num_joints = 4 + self.joint_lower_limits = [-1.5, -0.4, -1.1, -1.4] + self.joint_upper_limits = [1.5, 0.9, 0.8, 1.8] + self.initial_guesses = [[0.0] * self.num_joints] * 3 + self.initial_guesses[1][0] = np.deg2rad(-30) + self.initial_guesses[2][0] = np.deg2rad(30) + self.robot_des: mrd.ModernRoboticsDescription = getattr(mrd, 'px100') + + pass + + + def quat2matrix(self,quat,pos): + position = pos[:,np.newaxis] + share_vector = np.array([0,0,0,1],dtype = float)[np.newaxis,:] + r = R.from_quat(quat) + rotation_matrix = r.as_matrix() + m34 = np.concatenate((rotation_matrix, position),axis = 1) + matrix = np.concatenate((m34,share_vector),axis = 0) + + return matrix + + + # camera2marker matrix + def ar_cb(self, msg): + print('ar_cb', self.aruco_update) + # if not self.aruco_update: + # return + # 更新相机与base的转换 + now = rclpy.time.Time() + try: + trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now) + position = trans.transform.translation + orientation = trans.transform.rotation + position = np.array([position.x, position.y, position.z]) + orientation = np.array([orientation.x, orientation.y, orientation.z, orientation.w]) + self.camera2base_Matrix = self.quat2matrix(orientation, position) + self.aruco_update = False + # print("camera2base updated") + except : + print("pass") + pass + + # 这里处理接收到的坐标信息 + pose = msg.poses[0] + + position = np.array([pose.position.x, pose.position.y, pose.position.z]) + # 从 Pose 消息中获取方向信息 + orientation = np.array([pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]) + matrix = self.quat2matrix(orientation, position) + + self.marker2camera_Matrix = matrix + self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix) + self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1) + + return None + + + def move_cb(self): + print('move_cb') + print(self.marker2base_Matrix) + # 检查是否找到物块 + if np.array_equal(self.marker2base_Matrix, np.eye(4)): + print('no marker detected!') + self.move_cmd.linear.x = 0.0 + self.move_cmd.linear.y = 0.0 + self.cmd_pub.publish(self.move_cmd) + else: + relative = [self.marker2base_Matrix[0,3], + self.marker2base_Matrix[1,3]] + print(relative) + # 判断是否足够接近物块 + if (relative[0] < 0.2) and (abs(relative[1]) < 0.2): + self.move_cmd.linear.x = 0.0 + self.move_cmd.linear.y = 0.0 + self.cmd_pub.publish(self.move_cmd) + # self.place_timer.destroy() + print(1/0) + return + else: + # let's turn at 0 radians/s + self.move_cmd.linear.x = relative[0] - 0.2 + self.move_cmd.linear.y = relative[1] - 0.2 + self.cmd_pub.publish(self.move_cmd) + + -def spin(controller): +def rclpySpin(controller): rclpy.spin(controller) @@ -1226,12 +1123,19 @@ def main(): rclpy.init(args=None) + ### 初始状态 ### + place = Place() + place_thread = Thread(target=rclpySpin, args=(place,)) + place_thread.start() + place_thread.join() + + # 创建导航器并等待其激活 navigator = BasicNavigator() navigator.waitUntilNav2Active() - ### 前往S点 ### + ## 前往S点 ### navigator.goToPose(PointS) while not navigator.isTaskComplete(): feedback = navigator.getFeedback() @@ -1271,20 +1175,21 @@ def main(): print('Goal has an invalid return status!') - # move = Move() - # move_thread = Thread(target=spin, args=(move,)) - # move_thread.start() - # move_thread.join() + move = Move() + move_thread = Thread(target=rclpySpin, args=(move,)) + move_thread.start() + move_thread.join() + global start_time start_time = datetime.datetime.now() pickup = Pickup() - pickup_thread = Thread(target=spin, args=(pickup,)) + pickup_thread = Thread(target=rclpySpin, args=(pickup,)) pickup_thread.start() pickup_thread.join() - # ### 前往B点 ### + ### 前往B点 ### navigator.goToPose(PointB) while not navigator.isTaskComplete(): feedback = navigator.getFeedback() @@ -1305,7 +1210,7 @@ def main(): place = Place() - place_thread = Thread(target=spin, args=(place,)) + place_thread = Thread(target=rclpySpin, args=(place,)) place_thread.start() place_thread.join()