From c428f927b8dbc4f02ef006c15932aca5fa74877f Mon Sep 17 00:00:00 2001 From: Ernst Klamer Date: Thu, 18 Jan 2024 08:40:06 +0100 Subject: [PATCH] feat: add verification of the packet id (#105) --- src/bthome_ble/parser.py | 21 ++++++-- tests/test_parser_v2.py | 104 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 4 deletions(-) diff --git a/src/bthome_ble/parser.py b/src/bthome_ble/parser.py index dc5014f..ae47fed 100644 --- a/src/bthome_ble/parser.py +++ b/src/bthome_ble/parser.py @@ -142,6 +142,9 @@ def __init__(self, bindkey: bytes | None = None) -> None: # start at zero allow the first message after a restart. self.encryption_counter = 0.0 + # The packet_id is used to filter duplicate messages in BTHome V2. + self.packet_id: float | None = None + # If True, then we know the actual MAC of the device. # On macOS, we don't unless the device includes it in the advertisement # (CoreBluetooth uses UUID's generated by CoreBluetooth instead of the MAC) @@ -243,7 +246,6 @@ def _parse_bthome_v1( self.encryption_scheme = EncryptionScheme.NONE self.set_device_sw_version("BTHome BLE v1") payload = service_data - packet_id = None # noqa: F841 elif "0000181e-0000-1000-8000-00805f9b34fb" in uuid16: # Encrypted BTHome BLE format self.encryption_scheme = EncryptionScheme.BTHOME_BINDKEY @@ -264,8 +266,6 @@ def _parse_bthome_v1( ) except (ValueError, TypeError): return True - - packet_id = parse_uint(service_data[-8:-4]) # noqa: F841 else: return False @@ -430,6 +430,21 @@ def _parse_payload(self, payload: bytes, sw_version: int) -> bool: if payload_length < next_obj_start: _LOGGER.debug("Invalid payload data length, payload: %s", payload.hex()) break + + # Filter BLE advertisements with packet_id that has already been parsed. + if obj_meas_type == 0: + last_packet_id = self.packet_id + new_packet_id = parse_uint(payload[obj_data_start:next_obj_start]) + if new_packet_id == last_packet_id: + _LOGGER.debug( + "New counter_id %i is the same as the previous received counter_id %i. " + "BLE advertisement will be skipped", + new_packet_id, + last_packet_id, + ) + break + self.packet_id = new_packet_id + measurements.append( { "data format": obj_data_format, diff --git a/tests/test_parser_v2.py b/tests/test_parser_v2.py index 37ffede..93b53cb 100644 --- a/tests/test_parser_v2.py +++ b/tests/test_parser_v2.py @@ -519,6 +519,108 @@ def test_too_short_encryption_advertisement(caplog): assert "Invalid data length (for decryption), adv:" in caplog.text +def test_identical_packet_id(caplog): + """Test BTHome parser for skipping BLE advertisement with identical counter_id.""" + data_string = b"\x40\x00\x09" + advertisement = bytes_to_service_info( + data_string, local_name="ATC_8D18B2", address="A4:C1:38:8D:18:B2" + ) + + device = BTHomeBluetoothDeviceData() + assert device.update(advertisement) == SensorUpdate( + title="ATC 18B2", + devices={ + None: SensorDeviceInfo( + name="ATC 18B2", + manufacturer="Xiaomi", + model="Temperature/Humidity sensor", + sw_version="BTHome BLE v2", + hw_version=None, + ) + }, + entity_descriptions={ + KEY_PACKET_ID: SensorDescription( + device_key=KEY_PACKET_ID, + device_class=SensorDeviceClass.PACKET_ID, + native_unit_of_measurement=None, + ), + KEY_SIGNAL_STRENGTH: SensorDescription( + device_key=KEY_SIGNAL_STRENGTH, + device_class=SensorDeviceClass.SIGNAL_STRENGTH, + native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT, + ), + }, + entity_values={ + KEY_PACKET_ID: SensorValue( + device_key=KEY_PACKET_ID, name="Packet Id", native_value=9 + ), + KEY_SIGNAL_STRENGTH: SensorValue( + device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60 + ), + }, + ) + assert device.packet_id == 9 + + # second advertisement with the same counter_id + device.update(advertisement) + assert device.packet_id == 9 + assert ( + "New counter_id 9 is the same as the previous received counter_id 9. BLE advertisement " + "will be skipped" in caplog.text + ) + + +def test_increasing_packet_id(caplog): + """Test BTHome parser for BLE advertisement with increasing counter_id.""" + data_string = b"\x40\x00\x09" + advertisement = bytes_to_service_info( + data_string, local_name="ATC_8D18B2", address="A4:C1:38:8D:18:B2" + ) + + device = BTHomeBluetoothDeviceData() + device.update(advertisement) + assert device.packet_id == 9 + + data_string_2 = b"\x40\x00\x0A" + advertisement_2 = bytes_to_service_info( + data_string_2, local_name="ATC_8D18B2", address="A4:C1:38:8D:18:B2" + ) + + assert device.update(advertisement_2) == SensorUpdate( + title="ATC 18B2", + devices={ + None: SensorDeviceInfo( + name="ATC 18B2", + manufacturer="Xiaomi", + model="Temperature/Humidity sensor", + sw_version="BTHome BLE v2", + hw_version=None, + ) + }, + entity_descriptions={ + KEY_PACKET_ID: SensorDescription( + device_key=KEY_PACKET_ID, + device_class=SensorDeviceClass.PACKET_ID, + native_unit_of_measurement=None, + ), + KEY_SIGNAL_STRENGTH: SensorDescription( + device_key=KEY_SIGNAL_STRENGTH, + device_class=SensorDeviceClass.SIGNAL_STRENGTH, + native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT, + ), + }, + entity_values={ + KEY_PACKET_ID: SensorValue( + device_key=KEY_PACKET_ID, name="Packet Id", native_value=10 + ), + KEY_SIGNAL_STRENGTH: SensorValue( + device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60 + ), + }, + ) + assert device.packet_id == 10 + + def test_bthome_wrong_object_id(caplog): """Test BTHome parser for a non-existing Object ID xFE.""" data_string = b"\x40\xFE\xca\x09" @@ -3068,8 +3170,8 @@ def test_bthome_shelly_button(caplog): ) device = BTHomeBluetoothDeviceData() - assert device.supported(advertisement) is True update = device.update(advertisement) + assert device.supported(advertisement) is True assert update == SensorUpdate( title="BTHome sensor 18B2", devices={