forked from peterhinch/micropython-async
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapoll.py
64 lines (53 loc) · 2.43 KB
/
apoll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Demonstration of a device driver using a coroutine to poll a device.
# Runs on Pyboard: displays results from the onboard accelerometer.
# Uses crude filtering to discard noisy data.
# Author: Peter Hinch
# Copyright Peter Hinch 2017 Released under the MIT license
import uasyncio as asyncio
import pyb
import utime as time
class Accelerometer(object):
    """Polled accelerometer driver with crude noise filtering.

    The hardware object must provide x(), y() and z() methods. A new reading
    only replaces the stored vector when its squared distance from that
    vector exceeds threshold_squared.
    """

    # Squared distance a reading must move before it counts as a change.
    threshold_squared = 16

    def __init__(self, accelhw, timeout):
        """Store the hardware object and take an initial reading.

        accelhw: object exposing x(), y() and z() methods.
        timeout: inactivity period (ms) after which timed_out() returns True.
        """
        # Retained so code that reads the attribute keeps working. Timing
        # uses time.ticks_ms() directly rather than the loop's time() method,
        # which not every uasyncio version provides.
        self.loop = asyncio.get_event_loop()
        self.accelhw = accelhw
        self.timeout = timeout
        self.last_change = time.ticks_ms()
        self.coords = self._read()

    def _read(self):
        """Return the current hardware reading as an [x, y, z] list."""
        hw = self.accelhw
        return [hw.x(), hw.y(), hw.z()]

    def dsquared(self, xyz):
        """Return the squared distance between the stored vector and xyz."""
        return sum((p - q) ** 2 for p, q in zip(self.coords, xyz))

    def poll(self):
        """Sample the device, updating the stored vector on a real change.

        Device is noisy: only update if the change exceeds the threshold.
        Returns 0 if the stored vector was updated, otherwise the time (ms)
        since the last change or the last timeout report.
        """
        xyz = self._read()
        # Looked up via self so a subclass or instance may override it.
        if self.dsquared(xyz) > self.threshold_squared:
            self.coords = xyz
            self.last_change = time.ticks_ms()
            return 0
        return time.ticks_diff(time.ticks_ms(), self.last_change)

    def vector(self):
        """Return the most recently accepted [x, y, z] reading."""
        return self.coords

    def timed_out(self):
        """Return True if the timeout has elapsed since the last change.

        Reporting a timeout restarts the timing period, so consecutive
        reports are at least one timeout apart.
        """
        now = time.ticks_ms()
        if time.ticks_diff(now, self.last_change) > self.timeout:
            self.last_change = now
            return True
        return False
async def accel_coro(timeout=2000):
    """Poll the Pyboard accelerometer forever, printing changes and timeouts.

    timeout: passed to Accelerometer as its inactivity timeout (default 2000).
    """
    accelhw = pyb.Accel()  # Instantiate accelerometer hardware
    await asyncio.sleep_ms(30)  # Allow it to settle
    accel = Accelerometer(accelhw, timeout)
    while True:
        if accel.poll() == 0:  # Value has changed
            x, y, z = accel.vector()
            print("Value x:{:3d} y:{:3d} z:{:3d}".format(x, y, z))
        elif accel.timed_out():  # No change for the timeout period
            print("Timeout waiting for accelerometer change")
        await asyncio.sleep_ms(100)  # Poll every 100ms
async def main(delay):
    """Announce the test, wait for delay seconds, then report completion.

    delay: test duration in seconds, passed to asyncio.sleep().
    """
    print('Testing accelerometer for {} secs. Move the Pyboard!'.format(delay))
    # Report the actual duration rather than a hard-coded 20s.
    print('Test runs for {}s.'.format(delay))
    await asyncio.sleep(delay)
    print('Test complete!')
# Script entry point: there is no __main__ guard, so this runs whenever the
# module is executed or imported. The accelerometer polling coroutine is
# scheduled as a background task, then the loop runs until main() completes
# after its 20 second delay.
loop = asyncio.get_event_loop()
loop.create_task(accel_coro())
loop.run_until_complete(main(20))