forked from Sensirion/libsensors-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_example.py
executable file
·43 lines (33 loc) · 1.17 KB
/
async_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#! /usr/bin/python
# -*- coding: utf-8 -*-
"""
Sample app to asynchronously read out sensor values.
All sensors are polled on their own thread with individual polling frequencies.The program blocks until cancelled
by a keyboard interrupt.
"""
from sensirion_sensors import find_sensor_by_type, SensorReader
import sys
def main():
sfm = find_sensor_by_type('sfm3000')
sdp = find_sensor_by_type('sdp600')
sht = find_sensor_by_type('sht3x')
if not all((sfm, sdp, sht)):
sys.stderr.writelines("couldn't find all sensors\n")
return
def print_sensor_values(sensor_name):
def print_to_console(timestamp, values):
print "{0}: {1}".format(sensor_name, values.values()[0])
return True
return print_to_console
try:
sfm_reader = SensorReader((sfm,), 100, print_sensor_values('sfm'))
sdp_reader = SensorReader((sdp,), 10, print_sensor_values('sdp'))
sht_reader = SensorReader((sht,), 1, print_sensor_values('sht'))
sfm_reader.start()
sdp_reader.start()
sht_reader.run()
finally:
sfm_reader.join()
sdp_reader.join()
if __name__ == '__main__':
main()