-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsps30_mp.py
288 lines (250 loc) · 11.1 KB
/
sps30_mp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
from ph4_sense.adapters import const, sleep_ms
from ph4_sense.sensors.sps30_base import SPS30
from ph4_sense.support.sensor_helper import SensorHelper
try:
from machine import I2C
from ustruct import unpack_from
except ImportError:
from struct import unpack_from
from busio import I2C
# __version__ = "0.0.0-auto.0"
# __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_SPS30.git"
# I2C address the driver uses when the constructor is not given an explicit ``address``.
SPS30_DEFAULT_ADDR = const(0x69)
class SPS30_I2C(SPS30):
# pylint: disable=too-many-instance-attributes
def __init__(
    self,
    i2c: I2C,
    address=SPS30_DEFAULT_ADDR,
    *,
    auto_init=True,
    fp_mode=True,
    delays=True,
    mode_change_delay=3500,
    sensor_helper=None,
    **kwargs,
):
    """Bind the driver to an I2C bus and, unless disabled, bring the sensor up.

    i2c: bus object used for all transfers (writeto / readfrom_into).
    address: I2C address of the sensor.
    auto_init: when true, wake the sensor, reset it, start measuring and
        read the firmware version into ``firmware_version``.
    fp_mode: true selects floating point output, false 16 bit integers.
    delays: when true, the methods sleep after sending their commands.
    mode_change_delay: value handed to sleep_ms() by start() on the first
        start or after a number format change; a falsy value skips that wait.
    sensor_helper: logger-like helper; a fresh SensorHelper is created when
        none (or a falsy one) is given.
    kwargs: accepted and ignored.
    """
    super().__init__()

    # Bus binding.
    self._i2c = i2c
    self._address = address

    # Reusable transfer buffers: responses and outgoing command + arguments.
    self._buffer = bytearray(60)  # 10*(4+2)
    self._cmd_buffer = bytearray(2 + 6)

    # Number format bookkeeping, filled in by _set_fp_mode_fields() below.
    self._fp_mode = None
    self._mode_change_delay = mode_change_delay
    self._m_size = None
    self._m_total_size = None
    self._m_fmt = None

    self._delays = delays
    self._starts = 0
    self.sensor_helper = sensor_helper or SensorHelper()
    self.last_response = None

    self._set_fp_mode_fields(fp_mode)

    if not auto_init:
        return

    # Send wake-up in case device was left in low power sleep mode
    self.wakeup()
    self.reset()
    self.start(fp_mode, stop_first=False)
    self.firmware_version = self.read_firmware_version()
@property
def data_available(self):
    """Tell whether the sensor reports a measurement ready to be read.

    Returns True or False for a flag byte of 0x01 or 0x00, and None when
    the response carries any other value.
    """
    self._sps30_command(self.CMD_READ_DATA_READY_FLAG, rx_size=3)
    self._buffer_check(3)
    flag = self._buffer[1]
    # Only 0 and 1 are accepted as flags; anything else is an invalid response.
    return bool(flag) if flag in (0x00, 0x01) else None
@property
def auto_cleaning_interval(self):
    """Read the auto cleaning interval as an unsigned 32 bit integer."""
    self._sps30_command(self.CMD_RW_AUTO_CLEANING_INTERVAL, rx_size=6)
    self._buffer_check(6)
    # Drop the interleaved CRC byte so the four data bytes are contiguous.
    self._scrunch_buffer(6)
    if self._delays:
        sleep_ms(5)
    (interval,) = unpack_from(">I", self._buffer)
    return interval

@auto_cleaning_interval.setter
def auto_cleaning_interval(self, value):
    """Write the auto cleaning interval in seconds to SPS30 nvram (0 disables feature).

    Data sheet notes for firmware before version 2.2:
    "After writing a new interval, this will be activated immediately.
    However, if the interval register is read out after setting the
    new value, the previous value is returned until the next
    start/reset of the sensor module."
    """
    # The 32 bit value travels as two 16 bit words, most significant first.
    high_word = (value >> 16) & 0xFFFF
    low_word = value & 0xFFFF
    self._sps30_command(
        self.CMD_RW_AUTO_CLEANING_INTERVAL,
        arguments=(high_word, low_word),
    )
    if self._delays:
        sleep_ms(20)
def start(self, use_floating_point=None, *, stop_first=True):
"""Send the start-measurement command to the SPS30.

The constructor already calls this (with stop_first=False) when its
auto_init argument is left at the default value.

If stop_first is True (the default) a stop command is sent first.
A stop is required if the device has previously been started
with a different use_floating_point mode.
Bogus data may be sent by the sensor for approximately one second after
changing the number format and this may cause CRC errors.

use_floating_point: True requests floating point output, False requests
integer output, None keeps the format currently stored in _fp_mode.
"""
if stop_first:
self.stop()
# None means "reuse the number format that is already configured".
request_fp = self._fp_mode if use_floating_point is None else use_floating_point
# Command argument: 0x0300 when floating point is requested, 0x0500 otherwise.
output_format = 0x0300 if request_fp else 0x0500
self._sps30_command(self.CMD_START_MEASUREMENT, arguments=(output_format,))
# Update the parsing fields; the helper returns True when the format actually changed.
mode_changed = self._set_fp_mode_fields(request_fp)
# Data sheet states command execution time < 20ms
if self._delays:
sleep_ms(20)
# Extra settle time on the very first start or after a format change,
# skipped when _mode_change_delay is falsy.
if (mode_changed or self._starts == 0) and self._mode_change_delay:
sleep_ms(self._mode_change_delay)
# Count starts so the first-start case above can be recognised.
self._starts += 1
def clean(self, *, wait=True):
    """Start the fan cleaning and optionally wait for it to complete.

    wait: True waits for FAN_CLEAN_TIME, any other truthy value is used
    as the wait itself (both are handed to sleep_ms), a falsy value
    returns immediately after sending the command.

    Firmware 2.2 sets bit 19 of status register during this operation -
    this is undocumented behaviour.
    """
    self._sps30_command(self.CMD_START_FAN_CLEANING)
    if not wait:
        return
    sleep_ms(self.FAN_CLEAN_TIME if wait is True else wait)
def stop(self):
    """Send the stop-measurement command to the SPS30."""
    self._sps30_command(self.CMD_STOP_MEASUREMENT)
    if not self._delays:
        return
    # Data sheet states command execution time < 20ms; 50 ms is waited here.
    sleep_ms(50)
def reset(self):
    """Soft-reset the SPS30.

    This restores default values and places the sensor in Idle mode as if
    it had just powered up. The sensor must be started after a reset
    before data is read.
    """
    self._sps30_command(self.CMD_SOFT_RESET)
    if not self._delays:
        return
    # Data sheet states command execution time < 100ms
    sleep_ms(100)
def sleep(self):
    """Put the sensor into Sleep-Mode (minimum power consumption)."""
    self._sps30_command(self.CMD_SLEEP)
    if not self._delays:
        return
    # Data sheet states command execution time < 5ms
    sleep_ms(5)
def wakeup(self):
    """Switch the sensor from Sleep-Mode to Idle-Mode.

    The wake-up command is sent twice in a row. An OSError raised by the
    first transmission is ignored; one raised by the second propagates.
    """
    # Data sheet has two methods to wake-up, one way is to
    # intentionally send two consecutive wake-up commands
    for is_second in (False, True):
        try:
            self._sps30_command(self.CMD_WAKEUP)
        except OSError:
            # ignore any Errno 19 for first command only
            if is_second:
                raise
    if not self._delays:
        return
    # Data sheet states command execution time < 5ms
    sleep_ms(5)
def read_firmware_version(self):
    """Read the firmware version and return it as a two element tuple.

    The tuple holds the first two bytes of the response; the third byte
    is their CRC and is verified before returning.
    """
    self._sps30_command(self.CMD_READ_VERSION, rx_size=3, delay=10)
    # Logged before the CRC check so a corrupted answer is still visible.
    self.sensor_helper.log_info("SPS30 firmware: %s", self._buffer)
    self._buffer_check(3)
    response = self._buffer
    return response[0], response[1]
def read_status_register(self):
    """Read the 32 bit device status register and return it as an integer."""
    # The datasheet does not indicate a delay is required between write
    # and read but the Sensirion library does this for some reason
    # https://github.com/Sensirion/embedded-sps/blob/master/sps30-i2c/sps30.c
    # https://github.com/Sensirion/arduino-sps/blob/master/sps30.cpp
    response_len = 6
    self._sps30_command(self.CMD_READ_DEVICE_STATUS_REG, rx_size=response_len)
    self._buffer_check(response_len)
    # Remove the interleaved CRC so the four register bytes are contiguous.
    self._scrunch_buffer(response_len)
    (status,) = unpack_from(">I", self._buffer)
    return status
def clear_status_register(self):
    """Send the command that clears the 32 bit status register."""
    self._sps30_command(self.CMD_CLEAR_DEVICE_STATUS_REG)
    if not self._delays:
        return
    # Data sheet states command execution time < 5ms
    sleep_ms(5)
def _set_fp_mode_fields(self, use_floating_point):
if self._fp_mode == use_floating_point:
return False
self._fp_mode = use_floating_point
self._m_size = 6 if self._fp_mode else 3
self._m_total_size = len(self.FIELD_NAMES) * self._m_size
self._m_parse_size = len(self.FIELD_NAMES) * (self._m_size * 2 // 3)
self._m_fmt = ">" + ("f" if self._fp_mode else "H") * len(self.FIELD_NAMES)
return True
def _base_sps30_command(self, command, arguments=None, *, rx_size=0, delay: int = 5):
"""Set rx_size to None to read arbitrary amount of data up to max of _buffer size"""
self._cmd_buffer[0] = (command >> 8) & 0xFF
self._cmd_buffer[1] = command & 0xFF
tx_size = 2
# Add arguments if any
if arguments is not None:
for arg in arguments:
self._cmd_buffer[tx_size] = (arg >> 8) & 0xFF
tx_size += 1
self._cmd_buffer[tx_size] = arg & 0xFF
tx_size += 1
self._cmd_buffer[tx_size] = self._crc8(self._cmd_buffer, start=tx_size - 2, end=tx_size)
tx_size += 1
# The write_then_readinto method cannot be used as the SPS30
# does not like it based on real tests using self._CMD_READ_VERSION
# This is probably due to lack of support for i2c repeated start
to_send = memoryview(self._cmd_buffer)[:tx_size]
# self.sensor_helper.log_info("SPS30 send %s %s", len(to_send), bytearray(to_send))
self._i2c.writeto(self._address, to_send)
if delay:
sleep_ms(delay)
if rx_size != 0:
recv_buffer = memoryview(self._buffer)[:rx_size]
self._i2c.readfrom_into(self._address, recv_buffer)
# self.last_response = bytearray(recv_buffer)
# self.sensor_helper.log_info("SPS30 received %s", self.last_response)
def _sps30_command(self, command, arguments=None, *, rx_size=0, retry=SPS30.DEFAULT_RETRIES, delay: int = 5):
    """Run _base_sps30_command, retrying when it raises.

    command, arguments, rx_size, delay: passed through unchanged.
    retry: maximum number of attempts. Values below one (or None) are
        treated as one so the command is always sent at least once.

    Every failed attempt is reported through sensor_helper.log_error; the
    exception of the final attempt is re-raised to the caller.
    """
    # A retry count of 0 must not turn the call into a silent no-op.
    attempts = max(1, retry or 0)
    for attempt in range(attempts):
        try:
            return self._base_sps30_command(command, arguments, rx_size=rx_size, delay=delay)
        except Exception as e:
            self.sensor_helper.log_error(f"Attempt {attempt} failed {e}")
            if attempt + 1 >= attempts:
                raise
def _read_into_buffer(self):
data_len = self._m_total_size
self._sps30_command(self.CMD_READ_MEASURED_VALUES, rx_size=data_len)
self._buffer_check(data_len)
def _scrunch_buffer(self, raw_data_len):
"""Move all the data from 0:raw_data_len to one contiguous sequence at
the start of the buffer.
This will overwrite some of the interleaved crcs."""
dst_idx = 2
for src_idx in range(3, raw_data_len, 3):
self._buffer[dst_idx : dst_idx + 2] = self._buffer[src_idx : src_idx + 2]
dst_idx += 2
def _read_parse_data(self, output):
self._scrunch_buffer(self._m_total_size)
# buffer will be longer than the data hence the use of unpack_from
for key, val in zip(self.FIELD_NAMES, unpack_from(self._m_fmt, self._buffer)):
output[key] = val
def _buffer_check(self, raw_data_len):
if raw_data_len % 3 != 0:
raise RuntimeError("Data length not a multiple of three")
for st_chunk in range(0, raw_data_len, 3):
if self._buffer[st_chunk + 2] != self._crc8(self._buffer, st_chunk, st_chunk + 2):
self.sensor_helper.log_error(
f"CRC mismatch, dl {raw_data_len}, buffer: {memoryview(self._buffer)[:raw_data_len]}"
)
raise RuntimeError("CRC mismatch in data at offset " + str(st_chunk))
@staticmethod
def _crc8(buffer, start=None, end=None):
crc = 0xFF
for idx in range(0 if start is None else start, len(buffer) if end is None else end):
crc ^= buffer[idx]
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc = crc << 1
return crc & 0xFF # return the bottom 8 bits