diff --git a/adapter.py b/adapter.py index c077b3e..031a85d 100644 --- a/adapter.py +++ b/adapter.py @@ -115,7 +115,7 @@ async def wait_till_adapter_present_then_init(self) -> None: await asyncio.sleep(2) self.alias = DEVICE_NAME - self.bluetooth_devices.remove_devices() + await self.bluetooth_devices.remove_devices() self.bluetooth_devices.add_devices() self.initialising_adapter = False @@ -137,11 +137,11 @@ def interfaces_added(self, obj_name: str, interfaces: Container[str]) -> None: def interfaces_removed(self, obj_name: str, interfaces: Container[str]) -> None: if (obj_name==ADAPTER_OBJECT or obj_name==ROOT_OBJECT): self.adapter = None - self.bluetooth_devices.remove_devices() + asyncio.run_coroutine_threadsafe(self.bluetooth_devices.remove_devices(), loop=self.loop) print("Bluetooth adapter removed. Stopping") asyncio.run_coroutine_threadsafe(self.init(), loop=self.loop) elif INPUT_HOST_INTERFACE in interfaces or INPUT_DEVICE_INTERFACE in interfaces: - self.bluetooth_devices.remove_device(obj_name) + asyncio.run_coroutine_threadsafe(self.bluetooth_devices.remove_device(obj_name), loop=self.loop) self.on_interface_changed() def register_agent(self) -> None: diff --git a/bluetooth_devices.py b/bluetooth_devices.py index 80d8ca5..987f71a 100644 --- a/bluetooth_devices.py +++ b/bluetooth_devices.py @@ -3,6 +3,8 @@ import asyncio import socket import os +from concurrent.futures import Future +from contextlib import suppress from subprocess import DEVNULL, PIPE from typing import Awaitable, Callable, Optional, TYPE_CHECKING @@ -37,6 +39,7 @@ def __init__(self, bus: SystemMessageBus, loop: asyncio.AbstractEventLoop, self.interrupt_socket_path: Optional[str] = interrupt_socket_path self.interrupt_socket: Optional[socket.socket] = None self.sockets_connected = False + self._tasks: set[Future[None]] = set() print("BT Device ",object_path," created") asyncio.run_coroutine_threadsafe(self.reconcile_connected_state(1), loop=self.loop) @@ -47,7 +50,7 @@ async def reconcile_connected_state(self, delay: int) -> None: if self.connected and not self.sockets_connected: await self.connect_sockets() elif not self.connected and self.sockets_connected: - self.disconnect_sockets() + await self.disconnect_sockets() except Exception as exc: print("Possibly dbus error during reconcile_connected_state ",exc) @@ -73,8 +76,8 @@ async def connect_sockets(self) -> None: else: self.device_registry.connected_devices.append(self) print("Connected sockets for ",self.object_path) - asyncio.run_coroutine_threadsafe(self.loop_of_fun(True), loop=self.loop) - asyncio.run_coroutine_threadsafe(self.loop_of_fun(False), loop=self.loop) + self._tasks.add(asyncio.run_coroutine_threadsafe(self.loop_of_fun(True), loop=self.loop)) + self._tasks.add(asyncio.run_coroutine_threadsafe(self.loop_of_fun(False), loop=self.loop)) except Exception as err: print("Error while connecting sockets for ",self.object_path,". Will retry in a sec", err) try: @@ -87,7 +90,13 @@ async def connect_sockets(self) -> None: await asyncio.sleep(1) asyncio.run_coroutine_threadsafe(self.connect_sockets(), loop=self.loop) - def disconnect_sockets(self) -> None: + async def disconnect_sockets(self) -> None: + for t in self._tasks: + t.cancel() + # TODO: Reenable if we manage to turn this into tasks (i.e. not use coroutine_threasfe). + #with suppress(asyncio.CancelledError): + # await t + if self.control_socket is not None: self.control_socket.close() self.control_socket = None @@ -112,7 +121,7 @@ async def loop_of_fun(self, is_ctrl: bool) -> None: print("Cannot read data from socket. ", self.object_path ,"Closing sockets") if self is not None: try: - self.disconnect_sockets() + await self.disconnect_sockets() except: print("Error while disconnecting sockets") print("Arranging reconnect") @@ -147,12 +156,12 @@ def device_connected_state_changed(self, _arg1: object, _arg2: object, _arg3: ob if self.device_registry.on_devices_changed_handler is not None: asyncio.run_coroutine_threadsafe(self.device_registry.on_devices_changed_handler(), loop=self.loop) - def finalise(self) -> None: + async def finalise(self) -> None: self.props.PropertiesChanged.disconnect(self.device_connected_state_changed) self.control_socket_path = None self.interrupt_socket_path = None # Close sockets - self.disconnect_sockets() + await self.disconnect_sockets() print("BT Device ",self.object_path," finalised") @@ -216,13 +225,13 @@ async def is_slave(self, device_address: str) -> bool: stdout, stderr = await proc.communicate() return any("SLAVE" in l and device_address in l for l in stdout.decode().split("\n")) - def remove_devices(self) -> None: + async def remove_devices(self) -> None: print("Removing all BT devices") while len(self.all) >0: - self.remove_device(list(self.all)[0]) + await self.remove_device(list(self.all)[0]) - def remove_device(self, device_object_path: str) -> None: + async def remove_device(self, device_object_path: str) -> None: if device_object_path not in self.all: return # No such device device = self.all[device_object_path] @@ -230,7 +239,7 @@ def remove_device(self, device_object_path: str) -> None: list = self.connected_hosts if device.is_host else self.connected_devices if device in list: list.remove(device) - device.finalise() + await device.finalise() del device def switch_host(self) -> None: @@ -255,7 +264,7 @@ def send_message(self, msg: bytes, send_to_hosts: bool, is_control_channel: bool print("Cannot send data to socket of ",target.object_path,". Closing") if target is not None: try: - target.disconnect_sockets() + asyncio.run_coroutine_threadsafe(target.disconnect_sockets(), loop=self.loop) except: print("Error while trying to disconnect sockets") asyncio.run_coroutine_threadsafe(target.reconcile_connected_state(1), loop=self.loop)