-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoak.py
137 lines (109 loc) · 5.62 KB
/
oak.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import argparse
from pathlib import Path
import cv2
import depthai as dai
import numpy as np
import time
import requests
class OakPipeline:
def __init__(self, host_url, api_key):
self.host_url = host_url
self.headers = {'Authorization': f'Bearer {api_key}'}
self.pipeline: dai.Pipeline = self.create_pipeline()
self.label_map = ["background", "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow",
"diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train", "tvmonitor"]
def create_pipeline(self) -> dai.Pipeline:
nnPathDefault = str((Path(__file__).parent / Path('./mobilenet/mobilenet-ssd_openvino_2021.4_6shave.blob')).resolve().absolute())
parser = argparse.ArgumentParser()
parser.add_argument('-nnPath', help="Path to mobilenet detection network blob", default=nnPathDefault)
parser.add_argument('-ff', '--full_frame', action="store_true", help="Perform tracking on full RGB frame", default=False)
args = parser.parse_args()
fullFrameTracking = args.full_frame
# Create pipeline
pipeline = dai.Pipeline()
# Define sources and outputs
camRgb = pipeline.create(dai.node.ColorCamera)
detectionNetwork = pipeline.create(dai.node.MobileNetDetectionNetwork)
objectTracker = pipeline.create(dai.node.ObjectTracker)
xlinkOut = pipeline.create(dai.node.XLinkOut)
trackerOut = pipeline.create(dai.node.XLinkOut)
xlinkOut.setStreamName("preview")
trackerOut.setStreamName("tracklets")
# Properties
camRgb.setPreviewSize(300, 300)
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFps(40)
# testing MobileNet DetectionNetwork
detectionNetwork.setBlobPath(args.nnPath)
detectionNetwork.setConfidenceThreshold(0.5)
detectionNetwork.input.setBlocking(False)
objectTracker.setDetectionLabelsToTrack([12, 15])
objectTracker.setTrackerType(dai.TrackerType.ZERO_TERM_COLOR_HISTOGRAM)
objectTracker.setTrackerIdAssignmentPolicy(dai.TrackerIdAssignmentPolicy.SMALLEST_ID)
# Linking
camRgb.preview.link(detectionNetwork.input)
objectTracker.passthroughTrackerFrame.link(xlinkOut.input)
if fullFrameTracking:
camRgb.video.link(objectTracker.inputTrackerFrame)
else:
detectionNetwork.passthrough.link(objectTracker.inputTrackerFrame)
detectionNetwork.passthrough.link(objectTracker.inputDetectionFrame)
detectionNetwork.out.link(objectTracker.inputDetections)
objectTracker.out.link(trackerOut.input)
return pipeline
def run(self, headless: bool):
with dai.Device(self.pipeline) as device:
preview = device.getOutputQueue("preview", 4, False)
tracklets = device.getOutputQueue("tracklets", 4, False)
startTime = time.monotonic()
counter = 0
fps = 0
frame = None
last_capture_time = 0
while(True):
imgFrame = preview.get()
track = tracklets.get()
counter+=1
current_time = time.monotonic()
if (current_time - startTime) > 1 :
fps = counter / (current_time - startTime)
counter = 0
startTime = current_time
color = (255, 0, 0)
frame = imgFrame.getCvFrame()
trackletsData = track.tracklets
# Check for tracklets and capture every 5 seconds
if trackletsData and current_time - last_capture_time > 5:
filename = f"image_{str(current_time)}.jpg"
self.send_image(frame, filename)
last_capture_time = current_time
if not headless:
for t in trackletsData:
roi = t.roi.denormalize(frame.shape[1], frame.shape[0])
x1 = int(roi.topLeft().x)
y1 = int(roi.topLeft().y)
x2 = int(roi.bottomRight().x)
y2 = int(roi.bottomRight().y)
try:
label = self.label_map[t.label]
except:
label = t.label
cv2.putText(frame, str(label), (x1 + 10, y1 + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
cv2.putText(frame, f"ID: {[t.id]}", (x1 + 10, y1 + 35), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
cv2.putText(frame, t.status.name, (x1 + 10, y1 + 50), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, cv2.FONT_HERSHEY_SIMPLEX)
cv2.putText(frame, "NN fps: {:.2f}".format(fps), (2, frame.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.4, color)
cv2.imshow("tracker", frame)
if cv2.waitKey(1) == ord('q'):
break
def send_image(self, image, filename):
retval, buffer = cv2.imencode('.jpg', image)
if retval:
image_bytes = np.array(buffer).tobytes()
response = requests.post(f'{self.host_url}/process-image', headers=self.headers, files={'image': (filename, image_bytes, 'image/jpeg')})
return response.content
else:
print("Failed to encode image")
return None