Skip to content

Commit

Permalink
add bermuda trilateration
Browse files Browse the repository at this point in the history
  • Loading branch information
zeridon committed Oct 23, 2024
1 parent 8c5f743 commit c184da3
Show file tree
Hide file tree
Showing 19 changed files with 3,355 additions and 0 deletions.
108 changes: 108 additions & 0 deletions homeassistant-config/custom_components/bermuda/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Custom integration to integrate Bermuda BLE Trilateration with Home Assistant.
For more details about this integration, please refer to
https://github.com/agittins/bermuda
"""

from __future__ import annotations

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import Config
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv

# from homeassistant.helpers.device_registry import EventDeviceRegistryUpdatedData
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.device_registry import format_mac

from .const import _LOGGER
from .const import DOMAIN
from .const import PLATFORMS
from .const import STARTUP_MESSAGE
from .coordinator import BermudaDataUpdateCoordinator

# from .const import _LOGGER_SPAM_LESS

# from typing import TYPE_CHECKING

# from bthome_ble import BTHomeBluetoothDeviceData

# if TYPE_CHECKING:
# from bleak.backends.device import BLEDevice

CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)


async def async_setup(
hass: HomeAssistant, config: Config
): # pylint: disable=unused-argument;
"""Setting up this integration using YAML is not supported."""
return True


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up this integration using UI."""
if hass.data.get(DOMAIN) is None:
_LOGGER.info(STARTUP_MESSAGE)

coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
BermudaDataUpdateCoordinator(hass, entry)
)

await coordinator.async_refresh()

if not coordinator.last_update_success:
_LOGGER.debug("Coordinator last update failed, rasing ConfigEntryNotReady")
raise ConfigEntryNotReady

await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

entry.async_on_unload(entry.add_update_listener(async_reload_entry))
return True


async def async_remove_config_entry_device(
hass: HomeAssistant, config_entry: ConfigEntry, device_entry: DeviceEntry
) -> bool:
"""Remove a config entry from a device."""
coordinator: BermudaDataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
address = None
for ident in device_entry.identifiers:
try:
if ident[0] == DOMAIN:
# the identifier should be the base device address, and
# may have "_range" or some other per-sensor suffix.
# The address might be a mac address, IRK or iBeacon uuid
address = ident[1].split("_")[0]
except KeyError:
pass
if address is not None:
try:
coordinator.devices[format_mac(address)].create_sensor = False
except KeyError:
_LOGGER.warning("Failed to locate device entry for %s", address)
return True
# Even if we don't know this address it probably just means it's stale or from
# a previous version that used weirder names. Allow it.
_LOGGER.warning(
"Didn't find address for %s but allowing deletion to proceed.",
device_entry.name,
)
return True


async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Handle removal of an entry."""
if unload_result := await hass.config_entries.async_unload_platforms(
entry, PLATFORMS
):
_LOGGER.debug("Unloaded platforms.")
hass.data[DOMAIN].pop(entry.entry_id)
return unload_result


async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload config entry."""
await hass.config_entries.async_reload(entry.entry_id)
140 changes: 140 additions & 0 deletions homeassistant-config/custom_components/bermuda/bermuda_device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Bermuda's internal representation of a bluetooth device.
Each discovered bluetooth device (ie, every found transmitter) will
have one of these entries created for it. These are not HA 'devices' but
our own internal thing. They directly correspond to the entries you will
see when calling the dump_devices service call.
Even devices which are not configured/tracked will get entries created
for them, so we can use them to contribute towards measurements."""

from __future__ import annotations

from homeassistant.components.bluetooth import MONOTONIC_TIME
from homeassistant.components.bluetooth import BluetoothScannerDevice
from homeassistant.const import STATE_HOME
from homeassistant.const import STATE_NOT_HOME
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.helpers.device_registry import format_mac

from .bermuda_device_scanner import BermudaDeviceScanner
from .const import BDADDR_TYPE_UNKNOWN
from .const import CONF_DEVICES
from .const import CONF_DEVTRACK_TIMEOUT
from .const import DEFAULT_DEVTRACK_TIMEOUT


class BermudaDevice(dict):
"""This class is to represent a single bluetooth "device" tracked by Bermuda.
"device" in this context means a bluetooth receiver like an ESPHome
running bluetooth_proxy or a bluetooth transmitter such as a beacon,
a thermometer, watch or phone etc.
We're not storing this as an Entity because we don't want all devices to
become entities in homeassistant, since there might be a _lot_ of them.
"""

def __init__(self, address, options):
"""Initial (empty) data"""
self.name: str | None = None
self.local_name: str | None = None
self.prefname: str | None = None # "preferred" name - ideally local_name
self.address: str = address
self.options = options
self.unique_id: str | None = None # mac address formatted.
self.address_type = BDADDR_TYPE_UNKNOWN
self.area_id: str | None = None
self.area_name: str | None = None
self.area_distance: float | None = None # how far this dev is from that area
self.area_rssi: float | None = None # rssi from closest scanner
self.area_scanner: str | None = None # name of closest scanner
self.zone: str = STATE_UNAVAILABLE # STATE_HOME or STATE_NOT_HOME
self.manufacturer: str | None = None
self.connectable: bool = False
self.is_scanner: bool = False
self.beacon_type: set = set()
self.beacon_sources = (
[]
) # list of MAC addresses that have advertised this beacon
self.beacon_unique_id: str | None = (
None # combined uuid_major_minor for *really* unique id
)
self.beacon_uuid: str | None = None
self.beacon_major: str | None = None
self.beacon_minor: str | None = None
self.beacon_power: float | None = None

self.entry_id: str | None = None # used for scanner devices
self.create_sensor: bool = False # Create/update a sensor for this device
self.create_sensor_done: bool = False # Sensor should now exist
self.create_tracker_done: bool = False # device_tracker should now exist
self.last_seen: float = (
0 # stamp from most recent scanner spotting. MONOTONIC_TIME
)
self.scanners: dict[str, BermudaDeviceScanner] = {}

def calculate_data(self):
"""Call after doing update_scanner() calls so that distances
etc can be freshly smoothed and filtered.
"""
for scanner in self.scanners.values():
scanner.calculate_data()

# Update whether the device has been seen recently, for device_tracker:
if (
self.last_seen is not None
and MONOTONIC_TIME()
- self.options.get(CONF_DEVTRACK_TIMEOUT, DEFAULT_DEVTRACK_TIMEOUT)
< self.last_seen
):
self.zone = STATE_HOME
else:
self.zone = STATE_NOT_HOME

if self.address.upper() in self.options.get(CONF_DEVICES, []):
# We are a device we track. Flag for set-up:
self.create_sensor = True

def update_scanner(
self, scanner_device: BermudaDevice, discoveryinfo: BluetoothScannerDevice
):
"""Add/Update a scanner entry on this device, indicating a received advertisement
This gets called every time a scanner is deemed to have received an advert for
this device. It only loads data into the structure, all calculations are done
with calculate_data()
"""
if format_mac(scanner_device.address) in self.scanners:
# Device already exists, update it
self.scanners[format_mac(scanner_device.address)].update_advertisement(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id or "area_not_defined",
self.options,
)
else:
self.scanners[format_mac(scanner_device.address)] = BermudaDeviceScanner(
self.address,
discoveryinfo, # the entire BluetoothScannerDevice struct
scanner_device.area_id or "area_not_defined",
self.options,
)
device_scanner = self.scanners[format_mac(scanner_device.address)]
# Let's see if we should update our last_seen based on this...
if device_scanner.stamp is not None and self.last_seen < device_scanner.stamp:
self.last_seen = device_scanner.stamp

def to_dict(self):
"""Convert class to serialisable dict for dump_devices"""
out = {}
for var, val in vars(self).items():
if var == "scanners":
scanout = {}
for address, scanner in self.scanners.items():
scanout[address] = scanner.to_dict()
val = scanout
out[var] = val
return out
Loading

0 comments on commit c184da3

Please sign in to comment.