diff --git a/zigpy_cli/radio.py b/zigpy_cli/radio.py index 7926480..81fce2a 100644 --- a/zigpy_cli/radio.py +++ b/zigpy_cli/radio.py @@ -7,6 +7,8 @@ import itertools import json import logging +import random +import time import click import zigpy.state @@ -224,6 +226,124 @@ async def energy_scan(app, num_scans): print() +@radio.command() +@click.pass_obj +@click.option("-e", "--num-energy-scans", type=int, default=10 * 2**8) +@click.option("-n", "--num-network-scans", type=int, default=5) +@click.option("-r", "--randomize", type=bool, default=True) +@click.argument("output", type=click.File("w"), default="-") +@click_coroutine +async def advanced_energy_scan( + app, + output, + num_energy_scans, + num_network_scans, + randomize, +): + import bellows.types + from bellows.zigbee.application import ( + ControllerApplication as EzspControllerApplication, + ) + from bellows.zigbee.util import map_energy_to_rssi as ezsp_map_energy_to_rssi + + await app.startup() + LOGGER.info("Running scan...") + + channels = zigpy.types.Channels.ALL_CHANNELS + scan_counts = {channel: num_energy_scans for channel in channels} + + scan_data = { + "current_channel": app.state.network_info.channel, + "energy_scan": [], + "network_scan": [], + } + + if randomize: + + def iter_channels(): + while scan_counts: + channel = random.choice(tuple(scan_counts)) + scan_counts[channel] -= 1 + + yield channel + + if scan_counts[channel] <= 0: + del scan_counts[channel] + + else: + + def iter_channels(): + for channel, count in scan_counts.items(): + for i in range(count): + yield channel + + with click.progressbar( + iterable=iter_channels(), + length=len(list(channels)) * num_energy_scans, + item_show_func=lambda item: None if item is None else f"Channel {item}", + ) as bar: + for channel in bar: + results = await app.energy_scan( + channels=zigpy.types.Channels.from_channel_list([channel]), + duration_exp=0, + count=1, + ) + + rssi = None + + if isinstance(app, EzspControllerApplication): + rssi = ezsp_map_energy_to_rssi(results[channel]) + + scan_data["energy_scan"].append( + { + "timestamp": time.time(), + "channel": channel, + "energy": results[channel], + "rssi": rssi, + } + ) + + if not isinstance(app, EzspControllerApplication): + json.dump(scan_data, output, separators=(",", ":")) + return + + for channel in channels: + networks = set() + + for attempt in range(num_network_scans): + print( + "Scanning for networks on channel" + f" {channel} ({attempt + 1} / {num_network_scans})" + ) + networks_scan = await app._ezsp.startScan( + scanType=bellows.types.EzspNetworkScanType.ACTIVE_SCAN, + channelMask=zigpy.types.Channels.from_channel_list([channel]), + duration=6, + ) + + for network, lqi, rssi in networks_scan: + if network.replace(allowingJoin=None).freeze() in networks: + continue + + networks.add(network.replace(allowingJoin=None).freeze()) + + print(f"Found network {network}: LQI={lqi}, RSSI={rssi}") + scan_data["network_scan"].append( + { + "channel": channel, + "lqi": lqi, + "rssi": rssi, + "allowing_join": network.allowingJoin, + "extended_pan_id": str(network.extendedPanId), + "nwk_update_id": network.nwkUpdateId, + "pan_id": network.panId, + "stack_profile": network.stackProfile, + } + ) + + json.dump(scan_data, output, separators=(",", ":")) + + @radio.command() @click.pass_obj @click.option("-c", "--channel", type=int)