-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDeltaArray.py
134 lines (104 loc) · 4.88 KB
/
DeltaArray.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python
import time
import serial
import math
import numpy as np
class DeltaArray:
def __init__(self, port):
self.ser = serial.Serial(port, 57600) # open serial port
self.current_joint_positions = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.current_joint_velocities = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
self.minimum_joint_position = 0.005
self.maximum_joint_position = 0.0988
self.done_moving = False
time.sleep(1.0)
self.ser.reset_input_buffer() #clear incoming usb data
self.ser.reset_output_buffer() #clear outgoing usb data
def reset(self):
self.ser.write(b'<r>')
def stop(self):
self.ser.write(b'<s,1>')
def start(self):
self.ser.write(b'<s,0>')
def move_joint_position(self, desired_joint_positions, durations):
desired_joint_positions = np.clip(desired_joint_positions,self.minimum_joint_position,self.maximum_joint_position)
combined_array = np.hstack((np.array(durations).reshape(-1,1),np.array(desired_joint_positions)))
num_trajectory_points = combined_array.shape[0]
compressed_array = np.float32(combined_array.flatten())
msg = '<p,' + str(num_trajectory_points) + ','
for number in compressed_array:
msg += str(round(number, 4)) + ','
msg = msg[:-1] + '>'
#num_messages = int(np.ceil(len(msg) / 60.0))
# for i in range(num_messages):
# print("outgoing message: " + msg)
self.ser.write(msg.encode())
time.sleep(0.1)
def move_joint_speed_position(self, desired_joint_positions, speeds):
desired_joint_positions = np.clip(desired_joint_positions,self.minimum_joint_position,self.maximum_joint_position)
combined_array = np.hstack((np.array(speeds),np.array(desired_joint_positions)))
num_trajectory_points = combined_array.shape[0]
compressed_array = np.float32(combined_array.flatten())
msg = '<m,' + str(num_trajectory_points) + ','
for number in compressed_array:
msg += str(round(number, 4)) + ','
msg = msg[:-1] + '>'
num_messages = int(np.ceil(len(msg) / 60.0))
print("MSG: "+msg)
for i in range(num_messages):
self.ser.write(msg[i*60:(i+1)*60].encode())
time.sleep(0.1)
def move_joint_velocity(self, desired_joint_velocities, durations):
combined_array = np.hstack((np.array(durations).reshape(-1,1),np.array(desired_joint_velocities)))
num_trajectory_points = combined_array.shape[0]
compressed_array = np.float32(combined_array.flatten())
msg = '<v,' + str(num_trajectory_points) + ','
for number in compressed_array:
msg += str(round(number, 9)) + ','
msg = msg[:-1] + '>'
num_messages = int(np.ceil(len(msg) / 60.0))
for i in range(num_messages):
self.ser.write(msg[i*60:(i+1)*60].encode())
time.sleep(0.1)
def close(self):
self.ser.close()
def update_joint_positions_and_velocities(self):
#line = self.ser.readline() # read and throw away first incomplete line
while True:
line = self.ser.readline() #CHANGE: test complete lines
#print(line)
try:
stringline = bytes.decode(line)
#print(stringline)
if stringline[0] == 'j':
numbers = np.array(stringline[2:].strip().split(','))
for i in range(12):
self.current_joint_positions[i] = numbers[i*2]
self.current_joint_velocities[i] = numbers[i*2+1]
return False
elif stringline[0] == 'd':
#print(stringline[0:])
#breakpoint()
print('done')
self.done_moving = True
return True
except:
pass # Readline did not return a valid string.
def get_joint_positions(self):
self.ser.reset_input_buffer() #deprecated
self.update_joint_positions_and_velocities()
return self.current_joint_positions
def get_joint_velocities(self):
self.ser.reset_input_buffer() #deprecated
self.update_joint_positions_and_velocities()
return self.current_joint_velocities
def wait_until_done_moving(self, timeout = 6):
self.done_moving = False
start_time = time.time()
elapsed_time = 0.0
#pid loop is not well-tuned, timeout of 6 seconds may not be enough
self.ser.reset_input_buffer()
while not self.done_moving:# and elapsed_time < timeout:
self.done_moving = self.update_joint_positions_and_velocities()
elapsed_time = time.time() - start_time
print(elapsed_time)