Skip to content

Commit

Permalink
feat: add verification of the packet id (#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ernst79 authored Jan 18, 2024
1 parent 73d36fc commit c428f92
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 4 deletions.
21 changes: 18 additions & 3 deletions src/bthome_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ def __init__(self, bindkey: bytes | None = None) -> None:
# start at zero allow the first message after a restart.
self.encryption_counter = 0.0

# The packet_id is used to filter duplicate messages in BTHome V2.
self.packet_id: float | None = None

# If True, then we know the actual MAC of the device.
# On macOS, we don't unless the device includes it in the advertisement
# (CoreBluetooth uses UUID's generated by CoreBluetooth instead of the MAC)
Expand Down Expand Up @@ -243,7 +246,6 @@ def _parse_bthome_v1(
self.encryption_scheme = EncryptionScheme.NONE
self.set_device_sw_version("BTHome BLE v1")
payload = service_data
packet_id = None # noqa: F841
elif "0000181e-0000-1000-8000-00805f9b34fb" in uuid16:
# Encrypted BTHome BLE format
self.encryption_scheme = EncryptionScheme.BTHOME_BINDKEY
Expand All @@ -264,8 +266,6 @@ def _parse_bthome_v1(
)
except (ValueError, TypeError):
return True

packet_id = parse_uint(service_data[-8:-4]) # noqa: F841
else:
return False

Expand Down Expand Up @@ -430,6 +430,21 @@ def _parse_payload(self, payload: bytes, sw_version: int) -> bool:
if payload_length < next_obj_start:
_LOGGER.debug("Invalid payload data length, payload: %s", payload.hex())
break

# Filter BLE advertisements with packet_id that has already been parsed.
if obj_meas_type == 0:
last_packet_id = self.packet_id
new_packet_id = parse_uint(payload[obj_data_start:next_obj_start])
if new_packet_id == last_packet_id:
_LOGGER.debug(
"New counter_id %i is the same as the previous received counter_id %i. "
"BLE advertisement will be skipped",
new_packet_id,
last_packet_id,
)
break
self.packet_id = new_packet_id

measurements.append(
{
"data format": obj_data_format,
Expand Down
104 changes: 103 additions & 1 deletion tests/test_parser_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,108 @@ def test_too_short_encryption_advertisement(caplog):
assert "Invalid data length (for decryption), adv:" in caplog.text


def test_identical_packet_id(caplog):
"""Test BTHome parser for skipping BLE advertisement with identical counter_id."""
data_string = b"\x40\x00\x09"
advertisement = bytes_to_service_info(
data_string, local_name="ATC_8D18B2", address="A4:C1:38:8D:18:B2"
)

device = BTHomeBluetoothDeviceData()
assert device.update(advertisement) == SensorUpdate(
title="ATC 18B2",
devices={
None: SensorDeviceInfo(
name="ATC 18B2",
manufacturer="Xiaomi",
model="Temperature/Humidity sensor",
sw_version="BTHome BLE v2",
hw_version=None,
)
},
entity_descriptions={
KEY_PACKET_ID: SensorDescription(
device_key=KEY_PACKET_ID,
device_class=SensorDeviceClass.PACKET_ID,
native_unit_of_measurement=None,
),
KEY_SIGNAL_STRENGTH: SensorDescription(
device_key=KEY_SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
},
entity_values={
KEY_PACKET_ID: SensorValue(
device_key=KEY_PACKET_ID, name="Packet Id", native_value=9
),
KEY_SIGNAL_STRENGTH: SensorValue(
device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60
),
},
)
assert device.packet_id == 9

# second advertisement with the same counter_id
device.update(advertisement)
assert device.packet_id == 9
assert (
"New counter_id 9 is the same as the previous received counter_id 9. BLE advertisement "
"will be skipped" in caplog.text
)


def test_increasing_packet_id(caplog):
"""Test BTHome parser for BLE advertisement with increasing counter_id."""
data_string = b"\x40\x00\x09"
advertisement = bytes_to_service_info(
data_string, local_name="ATC_8D18B2", address="A4:C1:38:8D:18:B2"
)

device = BTHomeBluetoothDeviceData()
device.update(advertisement)
assert device.packet_id == 9

data_string_2 = b"\x40\x00\x0A"
advertisement_2 = bytes_to_service_info(
data_string_2, local_name="ATC_8D18B2", address="A4:C1:38:8D:18:B2"
)

assert device.update(advertisement_2) == SensorUpdate(
title="ATC 18B2",
devices={
None: SensorDeviceInfo(
name="ATC 18B2",
manufacturer="Xiaomi",
model="Temperature/Humidity sensor",
sw_version="BTHome BLE v2",
hw_version=None,
)
},
entity_descriptions={
KEY_PACKET_ID: SensorDescription(
device_key=KEY_PACKET_ID,
device_class=SensorDeviceClass.PACKET_ID,
native_unit_of_measurement=None,
),
KEY_SIGNAL_STRENGTH: SensorDescription(
device_key=KEY_SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
},
entity_values={
KEY_PACKET_ID: SensorValue(
device_key=KEY_PACKET_ID, name="Packet Id", native_value=10
),
KEY_SIGNAL_STRENGTH: SensorValue(
device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60
),
},
)
assert device.packet_id == 10


def test_bthome_wrong_object_id(caplog):
"""Test BTHome parser for a non-existing Object ID xFE."""
data_string = b"\x40\xFE\xca\x09"
Expand Down Expand Up @@ -3068,8 +3170,8 @@ def test_bthome_shelly_button(caplog):
)

device = BTHomeBluetoothDeviceData()
assert device.supported(advertisement) is True
update = device.update(advertisement)
assert device.supported(advertisement) is True
assert update == SensorUpdate(
title="BTHome sensor 18B2",
devices={
Expand Down

0 comments on commit c428f92

Please sign in to comment.