Skip to content
This repository has been archived by the owner on Jul 27, 2024. It is now read-only.

Commit

Permalink
#13 实现角度微调闭环控制
Browse files Browse the repository at this point in the history
  • Loading branch information
HuaYuXiao authored Dec 26, 2023
1 parent f5f03e5 commit 88960ac
Showing 1 changed file with 129 additions and 144 deletions.
273 changes: 129 additions & 144 deletions tb4_find_pickup_place/tb4_find_pickup_place/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,10 +871,7 @@ def __init__(self):
self.marker2camera_Matrix = np.eye(4)
self.camera2base_Matrix = np.eye(4)
self.marker2base_Matrix = np.eye(4)
# self.rot_matrix1 = np.array([[1, 0, 0, -0.01],
# [0, 1, 0, 0],
# [0, 0, 1, 0],
# [0, 0, 0, 1]])

self.rot_matrix1 = np.array([[0, 0, 1, 0],
[0, 1, 0, 0],
[-1, 0, 0, 0],
Expand All @@ -884,18 +881,19 @@ def __init__(self):
[-1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, 0, 1]])
self.aruco_update = True

self.fb_sub = self.create_subscription(JointState, "/joint_states", self.js_cb, 10)
self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10)
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)

self.cmd_vel = self.create_publisher('cmd_vel_mux/input/navi', Twist, queue_size=10)
self.place_timer = self.create_timer(0.1, self.place_cb)
self.spin_timer = self.create_timer(0.1, self.spin_cb)

self.cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)

self.ar_pos = None
self.ar_quat = None
# Twist is a datatype for velocity
self.move_cmd = Twist()

self.thred = 0.1
self.joint_pos = []
Expand All @@ -922,24 +920,9 @@ def quat2matrix(self,quat,pos):
return matrix


def js_cb(self, msg):
# if self.pickupFlag == False or self.placeFlag == False:
# return

# print('joint state callback')
if len(msg.name) == 7:
self.joint_pos.clear()
for i in range(7):
self.joint_pos.append(msg.position[i])
# print(self.joint_pos)


# camera2marker matrix
def ar_cb(self, msg):
# print('ar_cb')
if not self.aruco_update:
return
# 更新相机与base的转换
print('ar_cb')
now = rclpy.time.Time()
try:
trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now)
Expand All @@ -963,35 +946,38 @@ def ar_cb(self, msg):
matrix = self.quat2matrix(orientation, position)

self.marker2camera_Matrix = matrix
# print("camera2marker matrix: ", self.marker2camera_Matrix)

#self.marker2base_Matrix = np.dot(self.camera2base_Matrix, np.dot(self.rot_matrix2, np.dot(self.marker2camera_Matrix, self.rot_matrix1)))

self.marker2base_Matrix = np.matmul(np.matmul(self.camera2base_Matrix, self.rot_matrix2), self.marker2camera_Matrix)
self.marker2base_Matrix = np.matmul(self.marker2base_Matrix, self.rot_matrix1)

# print('base2marker matrix:', self.base2marker_Matrix)
#print('m1 ', self.marker2camera_Matrix)
#print('m2 ', self.camera2base_Matrix)
return None


def place_cb(self):
# Twist is a datatype for velocity
move_cmd = Twist()
# let's turn at 0 radians/s
move_cmd.angular.z = radians(45); #45 deg/s in radians/s

def spin_cb(self):
print('spin_cb', self.marker2base_Matrix)
print(np.deg2rad(10.0))
# 检查是否找到物块
if np.array_equal(self.marker2base_Matrix, np.eye(4)):
print('no marker detected!')
elif self.marker2base_Matrix[0, 0] > 0.1:
# publish the velocity
self.cmd_vel.publish(move_cmd)
self.move_cmd.angular.z = 0.3
self.cmd_pub.publish(self.move_cmd)
else:
self.place_timer.destroy()
print(1/0)
return

relative = [self.marker2base_Matrix[0,3],
self.marker2base_Matrix[1,3]]
angle = np.arctan2(relative[1], relative[0])
print('position: ', relative, 'angle: ', angle)
print(np.deg2rad(10.0))
# 判断是否足够接近物块
if abs(angle) < np.deg2rad(10.0):
self.move_cmd.linear.x = 0.0
self.move_cmd.linear.y = 0.0
self.cmd_pub.publish(self.move_cmd)
# self.place_timer.destroy()
print(1/0)
return
else:
# let's turn at 0 radians/s
self.move_cmd.angular.z = 0.3
self.cmd_pub.publish(self.move_cmd)


class Move(Node):
Expand All @@ -1011,7 +997,6 @@ def __init__(self):
[-1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, 0, 1]])
self.aruco_update = True

self.marker_sub = self.create_subscription(PoseArray,"/aruco_poses",self.ar_cb, 10)
self.tf_buffer = Buffer()
Expand Down Expand Up @@ -1053,10 +1038,7 @@ def quat2matrix(self,quat,pos):

# camera2marker matrix
def ar_cb(self, msg):
print('ar_cb', self.aruco_update)
# if not self.aruco_update:
# return
# 更新相机与base的转换
print('ar_cb')
now = rclpy.time.Time()
try:
trans = self.tf_buffer.lookup_transform("px100/base_link","camera_link",now)
Expand All @@ -1065,8 +1047,6 @@ def ar_cb(self, msg):
position = np.array([position.x, position.y, position.z])
orientation = np.array([orientation.x, orientation.y, orientation.z, orientation.w])
self.camera2base_Matrix = self.quat2matrix(orientation, position)
self.aruco_update = False
# print("camera2base updated")
except :
print("pass")
pass
Expand All @@ -1087,8 +1067,7 @@ def ar_cb(self, msg):


def move_cb(self):
print('move_cb')
print(self.marker2base_Matrix)
print('move_cb', self.marker2base_Matrix)
# 检查是否找到物块
if np.array_equal(self.marker2base_Matrix, np.eye(4)):
print('no marker detected!')
Expand All @@ -1114,7 +1093,6 @@ def move_cb(self):
self.cmd_pub.publish(self.move_cmd)



def rclpySpin(controller):
rclpy.spin(controller)

Expand All @@ -1131,108 +1109,115 @@ def main():


# 创建导航器并等待其激活
navigator = BasicNavigator()
navigator.waitUntilNav2Active()
# navigator = BasicNavigator()
# navigator.waitUntilNav2Active()


## 前往S点 ###
navigator.goToPose(PointS)
while not navigator.isTaskComplete():
feedback = navigator.getFeedback()
if feedback:
# Some navigation timeout to demo cancellation
if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
navigator.cancelTask()
# Do something depending on the return code
result = navigator.getResult()
if result == TaskResult.SUCCEEDED:
print('Goal reached')
elif result == TaskResult.CANCELED:
print('Goal was canceled!')
elif result == TaskResult.FAILED:
print('Goal failed!')
else:
print('Goal has an invalid return status!')
# navigator.goToPose(PointS)
# while not navigator.isTaskComplete():
# feedback = navigator.getFeedback()
# if feedback:
# # Some navigation timeout to demo cancellation
# if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
# navigator.cancelTask()
# # Do something depending on the return code
# result = navigator.getResult()
# if result == TaskResult.SUCCEEDED:
# print('Goal reached')
# elif result == TaskResult.CANCELED:
# print('Goal was canceled!')
# elif result == TaskResult.FAILED:
# print('Goal failed!')
# else:
# print('Goal has an invalid return status!')


### 前往A点 ###
navigator.goToPose(PointA)
while not navigator.isTaskComplete():
feedback = navigator.getFeedback()
if feedback:
# Some navigation timeout to demo cancellation
if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
navigator.cancelTask()
# Do something depending on the return code
result = navigator.getResult()
if result == TaskResult.SUCCEEDED:
print('Goal reached')
elif result == TaskResult.CANCELED:
print('Goal was canceled!')
elif result == TaskResult.FAILED:
print('Goal failed!')
else:
print('Goal has an invalid return status!')


move = Move()
move_thread = Thread(target=rclpySpin, args=(move,))
move_thread.start()
move_thread.join()


global start_time
start_time = datetime.datetime.now()
pickup = Pickup()
pickup_thread = Thread(target=rclpySpin, args=(pickup,))
pickup_thread.start()
pickup_thread.join()
# navigator.goToPose(PointA)
# while not navigator.isTaskComplete():
# feedback = navigator.getFeedback()
# if feedback:
# # Some navigation timeout to demo cancellation
# if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
# navigator.cancelTask()
# # Do something depending on the return code
# result = navigator.getResult()
# if result == TaskResult.SUCCEEDED:
# print('Goal reached')
# elif result == TaskResult.CANCELED:
# print('Goal was canceled!')
# elif result == TaskResult.FAILED:
# print('Goal failed!')
# else:
# print('Goal has an invalid return status!')


spin = Spin()
spin_thread = Thread(target=rclpySpin, args=(spin,))
spin_thread.start()
spin_thread.join()



# move = Move()
# move_thread = Thread(target=rclpySpin, args=(move,))
# move_thread.start()
# move_thread.join()


# global start_time
# start_time = datetime.datetime.now()
# pickup = Pickup()
# pickup_thread = Thread(target=rclpySpin, args=(pickup,))
# pickup_thread.start()
# pickup_thread.join()


### 前往B点 ###
navigator.goToPose(PointB)
while not navigator.isTaskComplete():
feedback = navigator.getFeedback()
if feedback:
# Some navigation timeout to demo cancellation
if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
navigator.cancelTask()
# Do something depending on the return code
result = navigator.getResult()
if result == TaskResult.SUCCEEDED:
print('Goal reached')
elif result == TaskResult.CANCELED:
print('Goal was canceled!')
elif result == TaskResult.FAILED:
print('Goal failed!')
else:
print('Goal has an invalid return status!')


place = Place()
place_thread = Thread(target=rclpySpin, args=(place,))
place_thread.start()
place_thread.join()
# navigator.goToPose(PointB)
# while not navigator.isTaskComplete():
# feedback = navigator.getFeedback()
# if feedback:
# # Some navigation timeout to demo cancellation
# if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
# navigator.cancelTask()
# # Do something depending on the return code
# result = navigator.getResult()
# if result == TaskResult.SUCCEEDED:
# print('Goal reached')
# elif result == TaskResult.CANCELED:
# print('Goal was canceled!')
# elif result == TaskResult.FAILED:
# print('Goal failed!')
# else:
# print('Goal has an invalid return status!')


# place = Place()
# place_thread = Thread(target=rclpySpin, args=(place,))
# place_thread.start()
# place_thread.join()


### 前往S点 ###
navigator.goToPose(PointS)
while not navigator.isTaskComplete():
feedback = navigator.getFeedback()
if feedback:
# Some navigation timeout to demo cancellation
if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
navigator.cancelTask()
# Do something depending on the return code
result = navigator.getResult()
if result == TaskResult.SUCCEEDED:
print('Goal reached')
elif result == TaskResult.CANCELED:
print('Goal was canceled!')
elif result == TaskResult.FAILED:
print('Goal failed!')
else:
print('Goal has an invalid return status!')
# navigator.goToPose(PointS)
# while not navigator.isTaskComplete():
# feedback = navigator.getFeedback()
# if feedback:
# # Some navigation timeout to demo cancellation
# if Duration.from_msg(feedback.navigation_time) > Duration(seconds=60.0):
# navigator.cancelTask()
# # Do something depending on the return code
# result = navigator.getResult()
# if result == TaskResult.SUCCEEDED:
# print('Goal reached')
# elif result == TaskResult.CANCELED:
# print('Goal was canceled!')
# elif result == TaskResult.FAILED:
# print('Goal failed!')
# else:
# print('Goal has an invalid return status!')


if __name__ == '__main__':
Expand Down

0 comments on commit 88960ac

Please sign in to comment.