From eee3e0e69a47bb5e6a8d7e993bd09083bd62d938 Mon Sep 17 00:00:00 2001 From: Ashley Gittins Date: Thu, 7 Nov 2024 22:19:04 +1100 Subject: [PATCH] fix: Handling retired or changed scanners (#360) - Refactor _refresh_scanners to be more efficient, and to handle the case where a scanner still exists but is no longer a scanner. - Refactor coordinator.init() to move callbacks out into class methods. - Tamed down lots of debug logging - Altered self.last_config_entry_update to use MONOTONIC_TIME(), and changed cooldown to a const of 10 seconds, and included check for parallel save tasks. - Removed assert that excluded scanners from area-by-mindist --- .../bermuda/bermuda_device_scanner.py | 14 +- custom_components/bermuda/const.py | 2 + custom_components/bermuda/coordinator.py | 408 +++++++++--------- 3 files changed, 219 insertions(+), 205 deletions(-) diff --git a/custom_components/bermuda/bermuda_device_scanner.py b/custom_components/bermuda/bermuda_device_scanner.py index ba21945..6913ce0 100644 --- a/custom_components/bermuda/bermuda_device_scanner.py +++ b/custom_components/bermuda/bermuda_device_scanner.py @@ -191,11 +191,15 @@ def update_advertisement(self, scandata: BluetoothScannerDevice): # I want to know if it does. # AJG 2024-01-11: This does happen. Looks like maybe apple devices? # Changing from warning to debug to quiet users' logs. - _LOGGER.debug( - "Device changed TX-POWER! That was unexpected: %s %sdB", - self.parent_device_address, - scandata.advertisement.tx_power, - ) + # Also happens with esphome set with long beacon interval tx, as it alternates + # between sending some generic advert and the iBeacon advert. ie, it's bogus for that + # case. + # _LOGGER.debug( + # "Device changed TX-POWER! That was unexpected: %s %sdB", + # self.parent_device_address, + # scandata.advertisement.tx_power, + # ) + pass self.tx_power = scandata.advertisement.tx_power # Track each advertisement element as or if they change. diff --git a/custom_components/bermuda/const.py b/custom_components/bermuda/const.py index 5626247..fb003a9 100644 --- a/custom_components/bermuda/const.py +++ b/custom_components/bermuda/const.py @@ -92,6 +92,8 @@ PRUNE_TIME_DEFAULT = 259200 # Max age of regular device entries (3days) PRUNE_TIME_IRK = 3600 # Resolvable Private addresses change often, prune regularly (1h) +SAVEOUT_COOLDOWN = 10 # seconds to delay before re-trying config entry save. + DOCS = {} diff --git a/custom_components/bermuda/coordinator.py b/custom_components/bermuda/coordinator.py index 131c334..5f2c660 100644 --- a/custom_components/bermuda/coordinator.py +++ b/custom_components/bermuda/coordinator.py @@ -17,6 +17,7 @@ BluetoothScannerDevice, ) from homeassistant.components.bluetooth.api import _get_manager +from homeassistant.config_entries import ConfigEntryState from homeassistant.const import EVENT_STATE_CHANGED from homeassistant.core import ( Event, @@ -83,6 +84,7 @@ PRUNE_TIME_DEFAULT, PRUNE_TIME_INTERVAL, PRUNE_TIME_IRK, + SAVEOUT_COOLDOWN, SIGNAL_DEVICE_NEW, UPDATE_INTERVAL, ) @@ -139,15 +141,8 @@ def __init__( self.stamp_last_prune: float = 0 # When we last pruned device list self.member_uuids = {} - def load_manufacturer_ids(): - """Import yaml file containing manufacturer name mappings.""" - file_path = Path(__file__).parent / "manufacturer_identification" / "member_uuids.yaml" - with file_path.open("r") as f: - member_uuids_yaml = yaml.safe_load(f)["uuids"] - self.member_uuids = {hex(member["uuid"])[2:]: member["name"] for member in member_uuids_yaml} - - hass.async_add_executor_job(load_manufacturer_ids) + hass.async_add_executor_job(self.load_manufacturer_ids) super().__init__( hass, _LOGGER, @@ -167,38 +162,9 @@ def load_manufacturer_ids(): self.metadevices: dict[str, BermudaDevice] = {} self._ad_listener_cancel: Cancellable | None = None - self.last_config_entry_update: datetime | None = None - - @callback - def handle_state_changes(ev: Event[EventStateChangedData]): - """Watch for new mac addresses on private ble devices and act.""" - if ev.event_type == EVENT_STATE_CHANGED: - event_entity = ev.data.get("entity_id", "invalid_event_entity") - if event_entity in self.pb_state_sources: - # It's a state change of an entity we are tracking. - new_state = ev.data.get("new_state") - if new_state: - # _LOGGER.debug("New state change! %s", new_state) - # check new_state.attributes.assumed_state - if hasattr(new_state, "attributes"): - new_address = new_state.attributes.get("current_address") - if new_address is not None and new_address.lower() != self.pb_state_sources[event_entity]: - _LOGGER.debug( - "Have a new source address for %s, %s", - event_entity, - new_address, - ) - self.pb_state_sources[event_entity] = new_address.lower() - # Flag that we need new pb checks, and work them out: - self._do_private_device_init = True - # If no sensors have yet been configured, the coordinator - # won't be getting polled for fresh data. Since we have - # found something, we should get it to do that. - # No longer using async_config_entry_first_refresh as it - # breaks - self.hass.add_job(self.async_refresh()) - - self.hass.bus.async_listen(EVENT_STATE_CHANGED, handle_state_changes) + self.last_config_entry_update: float = 0 + + self.hass.bus.async_listen(EVENT_STATE_CHANGED, self.handle_state_changes) # First time around we freshen the restored scanner info by # forcing a scan of the captured info. @@ -208,70 +174,9 @@ def handle_state_changes(ev: Event[EventStateChangedData]): # any there for us to track. self._do_private_device_init = True - @callback - def handle_devreg_changes(ev: Event[EventDeviceRegistryUpdatedData]): - """ - Update our scanner list if the device registry is changed. - - This catches area changes (on scanners) and any new/changed - Private BLE Devices. - """ - # TODO: Ignore the below, and implement filtering. This gets - # called a "fair number" of times each time we get reloaded. - # - # We could try filtering on "updates" and "area" but I doubt - # this will fire all that often, and even when it does fire - # the difference in cycle time appears to be less than 1ms. - _LOGGER.debug( - "Device registry has changed. ev: %s", - ev, - ) - if ev.data["action"] in {"create", "update"}: - device = self._device_registry.async_get(ev.data["device_id"]) - # if this is an "update" we also get a "changes" dict, but we don't - # bother with it yet. - - if device is not None: - # Work out if it's a device that interests us and respond appropriately. - for conn_type, _conn_id in device.connections: - if conn_type == "private_ble_device": - _LOGGER.debug("Trigger updating of Private BLE Devices") - self._do_private_device_init = True - elif conn_type == "ibeacon": - # this was probably us, nothing else to do - pass - else: - # might be a scanner, so let's refresh those - _LOGGER.debug("Trigger updating of Scanner Listings") - self._do_full_scanner_init = True - else: - _LOGGER.error("Received DR update/create but device id does not exist: %s", ev.data["device_id"]) - - elif ev.data["action"] == "remove": - device_found = False - for scanner in self.scanner_list: - if self.devices[scanner].entry_id == ev.data["device_id"]: - _LOGGER.debug("Scanner %s removed, trigger update of scanners.", self.devices[scanner].name) - self._do_full_scanner_init = True - device_found = True - if not device_found: - # If we save the private ble device's device_id into devices[].entry_id - # we could check ev.data["device_id"] against it to decide if we should - # rescan PBLE devices. But right now we don't, so scan 'em anyway. - _LOGGER.debug("Opportunistic trigger of update for Private BLE Devices") - self._do_private_device_init = True - - # The co-ordinator will only get updates if we have created entities already. - # Since this might not always be the case (say, private_ble_device loads after - # we do), then we trigger an update here with the expectation that we got a - # device registry update after the private ble device was created. There might - # be other corner cases where we need to trigger our own update here, so test - # carefully and completely if you are tempted to remove / alter this. - self.hass.add_job(self._async_update_data()) - # Listen for changes to the device registry and handle them. # Primarily for changes to scanners and Private BLE Devices. - hass.bus.async_listen(EVENT_DEVICE_REGISTRY_UPDATED, handle_devreg_changes) + hass.bus.async_listen(EVENT_DEVICE_REGISTRY_UPDATED, self.handle_devreg_changes) self.options = {} @@ -324,7 +229,7 @@ def handle_devreg_changes(ev: Event[EventDeviceRegistryUpdatedData]): setattr(scanner, key, value) self.scanner_list.append(address) - # Set up the dump_devices service + # Register the dump_devices service hass.services.async_register( DOMAIN, "dump_devices", @@ -350,6 +255,103 @@ def handle_devreg_changes(ev: Event[EventDeviceRegistryUpdatedData]): ) ) + def load_manufacturer_ids(self): + """Import yaml file containing manufacturer name mappings.""" + file_path = Path(__file__).parent / "manufacturer_identification" / "member_uuids.yaml" + + with file_path.open("r") as f: + member_uuids_yaml = yaml.safe_load(f)["uuids"] + self.member_uuids = {hex(member["uuid"])[2:]: member["name"] for member in member_uuids_yaml} + + @callback + def handle_state_changes(self, ev: Event[EventStateChangedData]): + """Watch for new mac addresses on private ble devices and act.""" + if ev.event_type == EVENT_STATE_CHANGED: + event_entity = ev.data.get("entity_id", "invalid_event_entity") + if event_entity in self.pb_state_sources: + # It's a state change of an entity we are tracking. + new_state = ev.data.get("new_state") + if new_state: + # _LOGGER.debug("New state change! %s", new_state) + # check new_state.attributes.assumed_state + if hasattr(new_state, "attributes"): + new_address = new_state.attributes.get("current_address") + if new_address is not None and new_address.lower() != self.pb_state_sources[event_entity]: + _LOGGER.debug( + "Have a new source address for %s, %s", + event_entity, + new_address, + ) + self.pb_state_sources[event_entity] = new_address.lower() + # Flag that we need new pb checks, and work them out: + self._do_private_device_init = True + # If no sensors have yet been configured, the coordinator + # won't be getting polled for fresh data. Since we have + # found something, we should get it to do that. + # No longer using async_config_entry_first_refresh as it + # breaks + self.hass.add_job(self.async_refresh()) + + @callback + def handle_devreg_changes(self, ev: Event[EventDeviceRegistryUpdatedData]): + """ + Update our scanner list if the device registry is changed. + + This catches area changes (on scanners) and any new/changed + Private BLE Devices. + """ + # TODO: Ignore the below, and implement filtering. This gets + # called a "fair number" of times each time we get reloaded. + # + # We could try filtering on "updates" and "area" but I doubt + # this will fire all that often, and even when it does fire + # the difference in cycle time appears to be less than 1ms. + _LOGGER.debug( + "Device registry has changed. ev: %s", + ev, + ) + if ev.data["action"] in {"create", "update"}: + device = self._device_registry.async_get(ev.data["device_id"]) + # if this is an "update" we also get a "changes" dict, but we don't + # bother with it yet. + + if device is not None: + # Work out if it's a device that interests us and respond appropriately. + for conn_type, _conn_id in device.connections: + if conn_type == "private_ble_device": + _LOGGER.debug("Trigger updating of Private BLE Devices") + self._do_private_device_init = True + elif conn_type == "ibeacon": + # this was probably us, nothing else to do + pass + else: + # might be a scanner, so let's refresh those + _LOGGER.debug("Trigger updating of Scanner Listings") + self._do_full_scanner_init = True + else: + _LOGGER.error("Received DR update/create but device id does not exist: %s", ev.data["device_id"]) + + elif ev.data["action"] == "remove": + device_found = False + for scanner in self.scanner_list: + if self.devices[scanner].entry_id == ev.data["device_id"]: + _LOGGER.debug("Scanner %s removed, trigger update of scanners.", self.devices[scanner].name) + self._do_full_scanner_init = True + device_found = True + if not device_found: + # If we save the private ble device's device_id into devices[].entry_id + # we could check ev.data["device_id"] against it to decide if we should + # rescan PBLE devices. But right now we don't, so scan 'em anyway. + _LOGGER.debug("Opportunistic trigger of update for Private BLE Devices") + self._do_private_device_init = True + # The co-ordinator will only get updates if we have created entities already. + # Since this might not always be the case (say, private_ble_device loads after + # we do), then we trigger an update here with the expectation that we got a + # device registry update after the private ble device was created. There might + # be other corner cases where we need to trigger our own update here, so test + # carefully and completely if you are tempted to remove / alter this. + self.hass.add_job(self._async_update_data()) + @callback def async_handle_advert( self, @@ -365,14 +367,14 @@ def async_handle_advert( responding to changing rssi values, but it *is* good for seeding our updates in case there are no defined sensors yet (or the defined ones are away). """ - _LOGGER.debug( - "New Advert! change: %s, scanner: %s mac: %s name: %s serviceinfo: %s", - change, - service_info.source, - service_info.address, - service_info.name, - service_info, - ) + # _LOGGER.debug( + # "New Advert! change: %s, scanner: %s mac: %s name: %s serviceinfo: %s", + # change, + # service_info.source, + # service_info.address, + # service_info.name, + # service_info, + # ) # # If there are no configured_devices already present during Bermuda's # initial setup, then no sensors will be created, and no updates will @@ -662,9 +664,10 @@ async def _async_update_data(self): # since the last time we booted. if self._do_full_scanner_init: if not self._refresh_scanners(): - _LOGGER.debug("Failed to refresh scanners, likely config entry not ready.") + # _LOGGER.debug("Failed to refresh scanners, likely config entry not ready.") # don't fail the update, just try again next time. # self.last_update_success = False + pass # set up any beacons and update their data. We do this after all the devices # have had their updates done since any beacon inherits data from its source @@ -1042,8 +1045,6 @@ def _refresh_areas_by_min_distance(self): def _refresh_area_by_min_distance(self, device: BermudaDevice): """Very basic Area setting by finding closest beacon to a given device.""" - # FIXME: Asserts should be avoided in non-tests as running python in optimized mode will skip them - assert device.is_scanner is not True # noqa closest_scanner: BermudaDeviceScanner | None = None for scanner in device.scanners.values(): # Check each scanner and keep note of the closest one based on rssi_distance. @@ -1067,77 +1068,84 @@ def _refresh_scanners(self, scanners: list[BluetoothScannerDevice] | None = None """ Refresh our local (and saved) list of scanners (BLE Proxies). - If self._do_full_scanner_init is true a full scan will be done. - Otherwise, you need to supply a list of scanners that you wish - to refresh. + The scanners list param is ignored and no longer required. We refresh all scanners + each time we are called, since the overhead is now lower and we had prematurely + optimised the routine. We only save out the config entry if it has changed *AND* + we haven't tried to do so in the last SAVEOUT_COOLDOWN seconds (10 seems to be enough, + we only do it when the proxies config has *actually* changed). """ - addresses = set() - update_scannerlist = False + _previous_scannerlist = [device.address for device in self.devices.values() if device.is_scanner] + _purge_scanners = _previous_scannerlist.copy() - if scanners is not None: - for scanner in scanners: - addresses.add(scanner.scanner.source.lower()) + # _LOGGER.error("Preserving %d current scanner entries", len(_previous_scannerlist)) - # If we are doing a full scan, add all the known - # scanner addresses to the list, since that will cover - # the scanners that have been restored from config.data - if self._do_full_scanner_init: + # Find active HaBaseScanners in the backend, and only pay attention to those + # instead of trawling through the device registry first. + # + # scanner_ha: BaseHaScanner from HA's bluetooth backend + # scanner_devreg: DeviceEntry from HA's device_registry + # scanner_b: BermudaDevice entry + # + # Evil: We're acessing private members of bt manager to do it since there's no API call for it. + _allscanners = self._manager._connectable_scanners | self._manager._non_connectable_scanners # noqa: SLF001 + for scanner_ha in _allscanners: + scanner_address = format_mac(scanner_ha.source).lower() + scanner_devreg = self._device_registry.async_get_device(connections={("mac", scanner_address)}) + if scanner_devreg is None: + _LOGGER_SPAM_LESS.error( + "scanner_not_in_devreg", + "Failed to find scanner %s (%s) in Device Registry", + scanner_ha.name, + scanner_ha.source, + ) + continue + # _LOGGER.info("Great! Found scanner: %s (%s)", scanner_ha.name, scanner_ha.source) + # Since this scanner still exists, we won't purge it + if scanner_address in _purge_scanners: + _purge_scanners.remove(scanner_address) + scanner_b = self._get_device(scanner_address) + if scanner_b is None: + # It's a new scanner, we will need to update our saved config. + # _LOGGER.debug("New Scanner: %s", scanner_ha.name) + scanner_b = self._get_or_create_device(scanner_address) + + # We found the device entry and have created our scannerdevice, + # now update any fields that might be new from the device reg: + scanner_b.area_id = scanner_devreg.area_id + scanner_b.entry_id = scanner_devreg.id + if scanner_devreg.name_by_user is not None: + scanner_b.name = scanner_devreg.name_by_user + else: + scanner_b.name = scanner_devreg.name + areas = self.area_reg.async_get_area(scanner_devreg.area_id) if scanner_devreg.area_id else None + if areas is not None and hasattr(areas, "name"): + scanner_b.area_name = areas.name + else: + _LOGGER_SPAM_LESS.warning( + f"no_area_on_update{scanner_b.name}", + "No area name or no area id updating scanner %s, area_id %s", + scanner_b.name, + areas, + ) + scanner_b.is_scanner = True + + # Now un-tag any devices that are no longer scanners + for address in _purge_scanners: + self.devices[address].is_scanner = False update_scannerlist = True - for address in self.scanner_list: - addresses.add(address.lower()) - self._do_full_scanner_init = False - - if len(addresses) > 0: - # FIXME: Really? This can't possibly be a sensible nesting of loops. - # should probably look at the API. Anyway, we are checking any devices - # that have a "mac" or "bluetooth" connection, - devreg = dr.async_get(self.hass) - for dev_entry in devreg.devices.data.values(): - for dev_connection in dev_entry.connections: - if dev_connection[0] in ["mac", "bluetooth"]: - found_address = format_mac(dev_connection[1]) - if found_address in addresses: - scandev = self._get_device(found_address) - if scandev is None: - # It's a new scanner, we will need to update our saved config. - _LOGGER.debug("New Scanner: %s", found_address) - update_scannerlist = True - scandev = self._get_or_create_device(found_address) - # Found the device entry and have created our scannerdevice, - # now update any fields that might be new from the device reg: - scandev_orig = scandev - scandev.area_id = dev_entry.area_id - scandev.entry_id = dev_entry.id - if dev_entry.name_by_user is not None: - scandev.name = dev_entry.name_by_user - else: - scandev.name = dev_entry.name - areas = self.area_reg.async_get_area(dev_entry.area_id) if dev_entry.area_id else None - if areas is not None and hasattr(areas, "name"): - scandev.area_name = areas.name - else: - _LOGGER_SPAM_LESS.warning( - f"no_area_on_update{scandev.name}", - "No area name or no area id updating scanner %s, area_id %s", - scandev.name, - areas, - ) - scandev.is_scanner = True - # If the scanner data we loaded from our saved data appears - # out of date, trigger a full rescan of seen scanners. - if scandev_orig != scandev: - # something changed, let's update the saved list. - _LOGGER.debug( - "Scanner info for %s has changed, we'll update our saved data.", - scandev.name, - ) - update_scannerlist = True + + # Because of the quick check-time and the checks we have on saving the config_entry, + # we'll update on every call: + update_scannerlist = True if update_scannerlist: - # Take the existing list of scanners and save them into config data - # for our next start-up. - for entry in self.hass.config_entries.async_entries(DOMAIN, include_disabled=False, include_ignore=False): - _LOGGER.debug("Loaded entry %s, state: %s", entry.entry_id, entry.state) - self.config_entry = entry + # bail out if the config entry isn't ready yet. + if self.config_entry is None or self.config_entry.state != ConfigEntryState.LOADED: + # _LOGGER.debug("Aborting refresh scanners due to config entry not being ready") + self._do_full_scanner_init = True + return False + + # Build the config_data and self.scanner_list structs fresh + # ready to update our config entry if needed. self.scanner_list.clear() confdata_scanners: dict[str, dict] = {} for device in self.devices.values(): @@ -1145,23 +1153,20 @@ def _refresh_scanners(self, scanners: list[BluetoothScannerDevice] | None = None confdata_scanners[device.address] = device.to_dict() self.scanner_list.append(device.address) - if self.config_entry is None: - _LOGGER.debug("Aborting refresh scanners due to config entry not being ready") - return False - if self.config_entry.data.get(CONFDATA_SCANNERS, {}) == confdata_scanners: - _LOGGER.debug("Scanner configs are identical, not doing update.") + # _LOGGER.debug("Scanner configs are identical, not doing update.") # Return true since we're happy that the config entry # exists and has the current scanner data that we want, # so there's nothing to do. # See #351, #341 + self._do_full_scanner_init = False return True - _LOGGER.debug( - "Replacing config data scanners was %s now %s", - self.config_entry.data.get(CONFDATA_SCANNERS, {}), - confdata_scanners, - ) + # _LOGGER.debug( + # "Replacing config data scanners was %s now %s", + # self.config_entry.data.get(CONFDATA_SCANNERS, {}), + # confdata_scanners, + # ) @callback def async_call_update_entry() -> None: @@ -1170,7 +1175,11 @@ def async_call_update_entry() -> None: We do this via add_job to ensure it runs in the event loop. """ - self.last_config_entry_update = now() + if self.last_config_entry_update > MONOTONIC_TIME() - SAVEOUT_COOLDOWN: + # We are probably not the only instance of ourselves in this queue. + # let's back off for a bit. + return + self.last_config_entry_update = MONOTONIC_TIME() self.hass.config_entries.async_update_entry( self.config_entry, data={ @@ -1178,15 +1187,14 @@ def async_call_update_entry() -> None: CONFDATA_SCANNERS: confdata_scanners, }, ) - - # To prevent strain on the system, let's only update if - # A) we have a new scanner - # B) It has been 30 minutes since our last update - if ( - len(self.config_entry.data.get(CONFDATA_SCANNERS, {})) != len(confdata_scanners) - or self.last_config_entry_update is None - or (now() - self.last_config_entry_update).total_seconds() > 1800 - ): + # Clear the flag for init + self._do_full_scanner_init = False + + # After calling the update there are a lot of cycles while loading etc. + # Cool off for a little before calling again... + if self.last_config_entry_update < MONOTONIC_TIME() - SAVEOUT_COOLDOWN: + self.last_config_entry_update = MONOTONIC_TIME() + _LOGGER.info("Saving out scanner configs") self.hass.add_job(async_call_update_entry) return True