Skip to content

Commit

Permalink
feat: Config status and Calibration additions
Browse files Browse the repository at this point in the history
- Add table and nicer formatting to initial config dialog status info.
- Add tables and instructions to calibration steps to make choosing values easier
- Error checking and mac case-folding in _get_bermuda_device_from_registry
- pluralise CONF_RSSI_OFFSET
- add typing to coordinator.scanner_list
- add coordinator.get_active_scanner_summary
- add (copious) instructions to calibration1
- pluralise step name calibration2_scanners
  • Loading branch information
agittins committed Aug 6, 2024
1 parent 3e89777 commit 5b826d7
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 49 deletions.
4 changes: 2 additions & 2 deletions custom_components/bermuda/bermuda_device_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
CONF_DEVICES,
CONF_MAX_VELOCITY,
CONF_REF_POWER,
CONF_RSSI_OFFSET,
CONF_RSSI_OFFSETS,
CONF_SMOOTHING_SAMPLES,
DISTANCE_INFINITE,
DISTANCE_TIMEOUT,
Expand Down Expand Up @@ -143,7 +143,7 @@ def update_advertisement(
self.rssi = scandata.advertisement.rssi
self.hist_rssi.insert(0, self.rssi)
self.rssi_distance_raw = rssi_to_metres(
self.rssi + options.get(CONF_RSSI_OFFSET, {}).get(self.scanner_device_name, 0),
self.rssi + options.get(CONF_RSSI_OFFSETS, {}).get(self.scanner_device_name, 0),
options.get(CONF_REF_POWER),
options.get(CONF_ATTENUATION),
)
Expand Down
153 changes: 120 additions & 33 deletions custom_components/bermuda/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
CONF_MAX_RADIUS,
CONF_MAX_VELOCITY,
CONF_REF_POWER,
CONF_RSSI_OFFSET,
CONF_RSSI_OFFSETS,
CONF_SAVE_AND_CLOSE,
CONF_SCANNER_INFO,
CONF_SCANNERS,
Expand Down Expand Up @@ -141,8 +141,9 @@ async def async_step_init(self, user_input=None): # pylint: disable=unused-argu
messages = {}
active_devices = self.coordinator.count_active_devices()
active_scanners = self.coordinator.count_active_scanners()
messages["device_count"] = f"{active_devices} active of {len(self.devices)}"
messages["scanner_count"] = f"{active_scanners} active of {len(self.coordinator.scanner_list)}"

messages["device_count"] = f"{active_devices} active out of {len(self.devices)}"
messages["scanner_count"] = f"{active_scanners} active out of {len(self.coordinator.scanner_list)}"
if len(self.coordinator.scanner_list) == 0:
messages["status"] = (
"You need to configure some bluetooth scanners before Bermuda will have anything to work with. "
Expand All @@ -157,14 +158,32 @@ async def async_step_init(self, user_input=None): # pylint: disable=unused-argu
else:
messages["status"] = "Life looks good."

# Build a markdown table of scanners so the user can see what's up.
scanner_table = "\n|Scanner|Address|Last advertisement|\n|---|---|---:|\n"
# Use emoji to indicate if age is "good"
for scanner in self.coordinator.get_active_scanner_summary():
age = int(scanner.get("last_stamp_age", 999))
if age < 2:
status = '<ha-icon icon="mdi:check-circle-outline"></ha-icon>'
elif age < 10:
status = '<ha-icon icon="mdi:alert-outline"></ha-icon>'
else:
status = '<ha-icon icon="mdi:skull-crossbones"></ha-icon>'

scanner_table += (
f"| {scanner.get("name", "NAME_ERR")}| [{scanner.get("address", "ADDR_ERR")}]"
f"| {status} {int(scanner.get("last_stamp_age")):d} seconds ago.|\n"
)
messages["status"] += scanner_table

# return await self.async_step_globalopts()
return self.async_show_menu(
step_id="init",
menu_options={
"globalopts": "Global Options",
"selectdevices": "Select Devices",
"calibration1_global": "Calibration 1: Global",
"calibration2_scanner": "Calibration 2: Scanner",
"calibration2_scanners": "Calibration 2: Scanners",
},
description_placeholders=messages,
)
Expand Down Expand Up @@ -299,6 +318,18 @@ async def async_step_selectdevices(self, user_input=None):
return self.async_show_form(step_id="selectdevices", data_schema=vol.Schema(data_schema))

async def async_step_calibration1_global(self, user_input=None):
# FIXME: This is ridiculous. But I can't yet find a better way.
_ugly_token_hack = {
# These are work-arounds for (broken?) placeholder substitutions.
# I've not been able to find out why, but just having <details> in the
# en.json will cause placeholders to break, due to *something* treating
# the html elements as placeholders.
"details": "<details>",
"details_end": "</details>",
"summary": "<summary>",
"summary_end": "</summary>",
}

if user_input is not None:
if user_input[CONF_SAVE_AND_CLOSE]:
self.options.update(
Expand Down Expand Up @@ -358,7 +389,8 @@ async def async_step_calibration1_global(self, user_input=None):
return self.async_show_form(
step_id="calibration1_global",
data_schema=vol.Schema(data_schema),
description_placeholders={"suffix": "After you click Submit, the new distances will be shown here."},
description_placeholders=_ugly_token_hack
| {"suffix": "After you click Submit, the new distances will be shown here."},
)
device = self._get_bermuda_device_from_registry(user_input[CONF_DEVICES])
scanner = device.scanners[user_input[CONF_SCANNERS]]
Expand All @@ -367,16 +399,49 @@ async def async_step_calibration1_global(self, user_input=None):
rssi_to_metres(historical_rssi, self._last_ref_power, self._last_attenuation)
for historical_rssi in scanner.hist_rssi
]

# Build a markdown table showing distance and rssi history for the
# selected device / scanner combination
results_str = f"| {device.name} |"
# Limit the number of columns to what's available up to a max of 5.
cols = min(5, len(distances), len(scanner.hist_rssi))
for i in range(cols):
results_str += f" {i} |"
results_str += "\n|---|"
for i in range(cols): # noqa for unused var i
results_str += "---:|"

results_str += "\n| Estimate (m) |"
for i in range(cols):
results_str += f" `{distances[i]:>5.2f}`|"
results_str += "\n| RSSI Actual |"
for i in range(cols):
results_str += f" `{scanner.hist_rssi[i]:>5}`|"
results_str += "\n"

return self.async_show_form(
step_id="calibration1_global",
data_schema=vol.Schema(data_schema),
description_placeholders={
"suffix": f"Using reference_power of {self._last_ref_power} "
f"and attenuation of {self._last_attenuation}, recent distances are:\n\n{distances}"
description_placeholders=_ugly_token_hack
| {
"suffix": (
f"Recent distances, calculated using `ref_power = {self._last_ref_power}` "
f"and `attenuation = {self._last_attenuation}` (values from new...old):\n\n{results_str}"
),
},
)

async def async_step_calibration2_scanner(self, user_input=None):
async def async_step_calibration2_scanners(self, user_input=None):
"""
Per-scanner calibration of rssi_offset.
Prompts the user to select a configured device, then adjust the offset
so that the estimated distance to each proxy is correct (typically by
placing device at 1m from each proxy in turn).
Distances are recalculated and displayed each time the user presses
Submit, and they check "Save and Close" to save the config.
"""
if user_input is not None:
if user_input[CONF_SAVE_AND_CLOSE]:
self.options.update(user_input[CONF_SCANNER_INFO])
Expand All @@ -388,10 +453,10 @@ async def async_step_calibration2_scanner(self, user_input=None):
return await self.async_step_init()
self._last_scanner_info = user_input[CONF_SCANNER_INFO]
self._last_device = user_input[CONF_DEVICES]
existing_rssi_offsets = self.options.get(CONF_RSSI_OFFSET, {})
existing_rssi_offsets = self.options.get(CONF_RSSI_OFFSETS, {})
rssi_offset_dict = {}
for scanner in self.coordinator.scanner_list:
scanner_name = self.coordinator.devices[scanner].name if scanner in self.coordinator.devices else scanner
scanner_name = self.coordinator.devices[scanner].name
rssi_offset_dict[scanner_name] = existing_rssi_offsets.get(scanner, 0)
data_schema = {
vol.Required(
Expand All @@ -400,49 +465,71 @@ async def async_step_calibration2_scanner(self, user_input=None):
): DeviceSelector(DeviceSelectorConfig(integration=DOMAIN)),
vol.Required(
CONF_SCANNER_INFO,
default={CONF_RSSI_OFFSET: rssi_offset_dict}
default={CONF_RSSI_OFFSETS: rssi_offset_dict}
if not self._last_scanner_info
else self._last_scanner_info,
): ObjectSelector(),
vol.Optional(CONF_SAVE_AND_CLOSE, default=False): vol.Coerce(bool),
}
if user_input is None:
return self.async_show_form(
step_id="calibration2_scanner",
step_id="calibration2_scanners",
data_schema=vol.Schema(data_schema),
description_placeholders={"suffix": "After you click Submit, the new distances will be shown here."},
)
device = self._get_bermuda_device_from_registry(self._last_device)
results = {}
# Gather new estimates for distances using rssi hist and the new offset.
for scanner in self.coordinator.scanner_list:
cur_offset = self._last_scanner_info[CONF_RSSI_OFFSET].get(scanner, 0)
scanner_name = self.coordinator.devices[scanner].name
cur_offset = self._last_scanner_info[CONF_RSSI_OFFSETS].get(scanner_name, 0)
if scanner in device.scanners:
results[device.scanners[scanner].name] = rssi_to_metres(
device.scanners[scanner].rssi + cur_offset,
self.options.get(CONF_REF_POWER, DEFAULT_REF_POWER),
self.options.get(CONF_ATTENUATION, DEFAULT_ATTENUATION),
)
results[scanner_name] = [
rssi_to_metres(
historical_rssi + cur_offset,
self.options.get(CONF_REF_POWER, DEFAULT_REF_POWER),
self.options.get(CONF_ATTENUATION, DEFAULT_ATTENUATION),
)
for historical_rssi in device.scanners[scanner].hist_rssi
]
# Format the results for display (HA has full markdown support!)
results_str = "| Scanner | Measurements (new...old)|\n|---|---|"
for scanner_name, distances in results.items():
results_str += f"\n|{scanner_name}|"
for distance in distances:
# We round to 2 places (1cm) and pad to fit nn.nn
results_str += f" `{distance:>5.2f}`"
results_str += "|"

return self.async_show_form(
step_id="calibration2_scanner",
step_id="calibration2_scanners",
data_schema=vol.Schema(data_schema),
description_placeholders={"suffix": f"Most recent distances are: {results}"},
description_placeholders={"suffix": results_str},
)

def _get_bermuda_device_from_registry(self, registry_id: str) -> BermudaDevice:
def _get_bermuda_device_from_registry(self, registry_id: str) -> BermudaDevice | None:
"""
Given a device registry device id, return the associated MAC address.
Returns None if the id can not be resolved to a mac.
"""
devreg = dr.async_get(self.hass)
device = devreg.async_get(registry_id)
device_address = None
for connection in device.connections:
if connection[0] in {
DOMAIN_PRIVATE_BLE_DEVICE,
dr.CONNECTION_BLUETOOTH,
"ibeacon",
}:
device_address = connection[1]
break
# TODO: IF device_address IS NONE, Something has gone wrong
return self.coordinator.devices[device_address]
return self.coordinator.devices[device_address.lower()]
if device is not None:
for connection in device.connections:
if connection[0] in {
DOMAIN_PRIVATE_BLE_DEVICE,
dr.CONNECTION_BLUETOOTH,
"ibeacon",
}:
device_address = connection[1]
break
if device_address is not None:
return self.coordinator.devices[device_address.lower()]
# We couldn't match the HA device id to a bermuda device mac.
return None

async def _update_options(self):
"""Update config entry options."""
return self.async_create_entry(title=NAME, data=self.options)
2 changes: 1 addition & 1 deletion custom_components/bermuda/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@

CONF_SAVE_AND_CLOSE = "save_and_close"
CONF_SCANNER_INFO = "scanner_info"
CONF_RSSI_OFFSET = "rssi_offset"
CONF_RSSI_OFFSETS = "rssi_offset"

CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL = "update_interval", 10
DOCS[CONF_UPDATE_INTERVAL] = (
Expand Down
40 changes: 30 additions & 10 deletions custom_components/bermuda/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
CONF_MAX_RADIUS,
CONF_MAX_VELOCITY,
CONF_REF_POWER,
CONF_RSSI_OFFSET,
CONF_RSSI_OFFSETS,
CONF_SMOOTHING_SAMPLES,
CONF_UPDATE_INTERVAL,
CONFDATA_SCANNERS,
Expand Down Expand Up @@ -237,7 +237,7 @@ def handle_devreg_changes(ev: Event[EventDeviceRegistryUpdatedData]):
self.options[CONF_REF_POWER] = DEFAULT_REF_POWER
self.options[CONF_SMOOTHING_SAMPLES] = DEFAULT_SMOOTHING_SAMPLES
self.options[CONF_UPDATE_INTERVAL] = DEFAULT_UPDATE_INTERVAL
self.options[CONF_RSSI_OFFSET] = {}
self.options[CONF_RSSI_OFFSETS] = {}

if hasattr(entry, "options"):
# Firstly, on some calls (specifically during reload after settings changes)
Expand All @@ -253,7 +253,7 @@ def handle_devreg_changes(ev: Event[EventDeviceRegistryUpdatedData]):
CONF_MAX_VELOCITY,
CONF_REF_POWER,
CONF_SMOOTHING_SAMPLES,
CONF_RSSI_OFFSET,
CONF_RSSI_OFFSETS,
):
self.options[key] = val

Expand All @@ -266,7 +266,7 @@ def handle_devreg_changes(ev: Event[EventDeviceRegistryUpdatedData]):
# a list of known scanners so we can
# restore the sensor states even if we don't have a full set of
# scanner receipts in the discovery data.
self.scanner_list = []
self.scanner_list: list[str] = []
if hasattr(entry, "data"):
for address, saved in entry.data.get(CONFDATA_SCANNERS, {}).items():
scanner = self._get_or_create_device(address)
Expand Down Expand Up @@ -364,20 +364,40 @@ def count_active_devices(self) -> int:
fresh_count += 1
return fresh_count

def count_active_scanners(self) -> int:
def count_active_scanners(self, max_age=10) -> int:
"""Returns count of scanners that have recently sent updates."""
stamp = MONOTONIC_TIME() - 10 # seconds
stamp = MONOTONIC_TIME() - max_age # seconds
fresh_count = 0
for scanner in self.get_active_scanner_summary():
if scanner.get("last_stamp", 0) > stamp:
fresh_count += 1
return fresh_count

def get_active_scanner_summary(self) -> list[dict]:
"""
Returns a list of dicts suitable for seeing which scanners
are configured in the system and how long it has been since
each has returned an advertisement.
"""
stamp = MONOTONIC_TIME()
results = []
for scanner in self.scanner_list:
scannerdev = self.devices[scanner]
last_stamp: float = 0
for device in self.devices.values():
record = device.scanners.get(scanner, None)
if record is not None and record.stamp is not None:
if record.stamp > last_stamp:
last_stamp = record.stamp
if last_stamp > stamp:
fresh_count += 1
return fresh_count
results.append(
{
"name": scannerdev.name,
"address": scanner,
"last_stamp": last_stamp,
"last_stamp_age": stamp - last_stamp,
}
)
return results

def _get_device(self, address: str) -> BermudaDevice | None:
"""Search for a device entry based on mac address."""
Expand Down Expand Up @@ -1058,7 +1078,7 @@ def _refresh_scanners(self, scanners: list[BluetoothScannerDevice] | None = None
for entry in self.hass.config_entries.async_entries(DOMAIN, include_disabled=False, include_ignore=False):
_LOGGER.debug("Loaded entry %s", entry.entry_id)
self.config_entry = entry
self.scanner_list = []
self.scanner_list.clear()
confdata_scanners: dict[str, dict] = {}
for device in self.devices.values():
if device.is_scanner:
Expand Down
Loading

0 comments on commit 5b826d7

Please sign in to comment.