-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdevice_manager.py
125 lines (100 loc) · 3.72 KB
/
device_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import asyncio
class DeviceManager:
    """Track a fixed set of devices and broadcast their connection state.

    Devices are keyed by address. Every state change pushes a fresh snapshot
    (the list returned by ``get_devices``) onto each subscriber queue.
    """

    def __init__(self, devices, adapter_timeout, scan_timeout):
        """Create the manager.

        Args:
            devices: Iterable of mappings; each must contain an ``'address'``
                key and is expanded as keyword arguments into ``Device``.
            adapter_timeout: Object whose awaitable ``wait_event(event)`` is
                used while waiting for a stale device to be reported lost.
            scan_timeout: Object whose awaitable ``wait_event(event)`` is used
                while waiting for a device to be discovered. An
                ``asyncio.TimeoutError`` raised by it is treated as
                "device not found".
        """
        self._adapter = None
        self._devices = {d['address']: Device(**d) for d in devices}
        self._adapter_timeout = adapter_timeout
        self._scan_timeout = scan_timeout
        # Number of connect() calls currently relying on adapter discovery.
        self._discovering_clients = 0
        self._subscriber_queues = set()

    async def connect(self, address):
        """Discover the device at *address*, then pair, trust and connect it.

        Does nothing unless the device is currently ``'disconnected'``. If the
        device is not discovered, its state goes back to ``'disconnected'``.

        Raises:
            KeyError: If *address* is not a managed device.
            Exception: Whatever the adapter, the timeouts or the device proxy
                raise. The device is put back into ``'disconnected'`` first so
                that a later ``connect`` can retry.
        """
        device = self._devices[address]
        if device.state != 'disconnected':
            return
        self._publish_state(device, 'connecting')
        try:
            if device.discovered.is_set():
                # Drop the already-known device and wait until it is reported
                # lost, so the scan below yields a fresh proxy.
                await self._adapter.remove_device(device.dbus_proxy)
                await self._adapter_timeout.wait_event(device.lost)
            await self._discover(device)
            if device.dbus_proxy:
                await device.dbus_proxy.pair()
                await device.dbus_proxy.trust()
                await device.dbus_proxy.connect()
            else:
                self._publish_state(device, 'disconnected')
        except BaseException:
            # Includes task cancellation. Without this reset the device would
            # stay 'connecting' forever and every later connect() would
            # return early.
            if device.state == 'connecting':
                self._publish_state(device, 'disconnected')
            raise

    async def _discover(self, device):
        """Run adapter discovery until *device* is discovered or the scan times out.

        Discovery is shared: the first concurrent caller starts it and the
        last one to leave stops it.
        """
        self._discovering_clients += 1
        if self._discovering_clients == 1:
            try:
                await self._adapter.set_discovery_filter({})
                await self._adapter.start_discovery()
            except BaseException:
                # Discovery never started; release our slot without stopping.
                self._discovering_clients -= 1
                raise
        try:
            await self._scan_timeout.wait_event(device.discovered)
        except asyncio.TimeoutError:
            # Deliberate: a timed-out scan just means the device was not
            # found; connect() handles that through the missing proxy.
            pass
        finally:
            # Always release the slot, even on error or cancellation, so the
            # counter cannot leak and leave discovery running.
            self._discovering_clients -= 1
            if self._discovering_clients == 0:
                await self._adapter.stop_discovery()

    async def disconnect(self, address):
        """Disconnect the device at *address* if it is currently ``'connected'``.

        Raises:
            KeyError: If *address* is not a managed device.
        """
        device = self._devices[address]
        if device.state != 'connected':
            return
        self._publish_state(device, 'disconnecting')
        await device.dbus_proxy.disconnect()

    def get_devices(self):
        """Return a list with the ``as_dict()`` snapshot of every device."""
        return [d.as_dict() for d in self._devices.values()]

    def subscribe(self):
        """Register a new subscriber and seed its queue with the current state.

        Returns:
            The ``Subscriber`` whose queue receives every later snapshot.
        """
        subscriber = Subscriber(self)
        self._subscriber_queues.add(subscriber.queue)
        subscriber.queue.put_nowait(self.get_devices())
        return subscriber

    def unsubscribe(self, queue):
        """Stop publishing to *queue*.

        Raises:
            KeyError: If *queue* is not currently subscribed.
        """
        self._subscriber_queues.remove(queue)

    def add_adapter(self, dbus_proxy):
        """Store *dbus_proxy* as the adapter used for discovery and removal."""
        self._adapter = dbus_proxy

    def add_device(self, address, dbus_proxy, connected):
        """Record that the device at *address* is now known to the adapter.

        Unknown addresses are ignored. Marks the device as discovered and, if
        *connected* is true, publishes the ``'connected'`` state.
        """
        try:
            device = self._devices[address]
        except KeyError:
            return
        device.dbus_proxy = dbus_proxy
        device.discovered.set()
        device.lost.clear()
        if connected:
            self._publish_state(device, 'connected')

    def remove_device(self, address):
        """Record that the device at *address* is gone from the adapter.

        Unknown addresses are ignored. Publishes ``'disconnected'``, marks the
        device as lost and drops its proxy.
        """
        try:
            device = self._devices[address]
        except KeyError:
            return
        self._publish_state(device, 'disconnected')
        device.discovered.clear()
        device.lost.set()
        device.dbus_proxy = None

    def update_device(self, address, connected):
        """Publish ``'connected'`` or ``'disconnected'`` for a managed device.

        Unknown addresses are ignored.
        """
        try:
            device = self._devices[address]
        except KeyError:
            return
        self._publish_state(device, 'connected' if connected else 'disconnected')

    def _publish_state(self, device, state):
        """Set *device* to *state* and push a full snapshot to every subscriber."""
        device.state = state
        devices = self.get_devices()
        for s in self._subscriber_queues:
            s.put_nowait(devices)
class Device:
    """State record for one managed device.

    Holds the display name, address, current state string, the D-Bus proxy
    (``None`` until one is assigned) and two events, ``discovered`` and
    ``lost``. A new device starts out ``'disconnected'`` with ``lost`` set
    and ``discovered`` clear.
    """

    def __init__(self, name, address):
        """Initialise a disconnected, not-yet-discovered device."""
        self.name, self.address = name, address
        self.state = 'disconnected'
        self.dbus_proxy = None
        self.discovered = asyncio.Event()
        # A fresh device counts as lost until it is discovered.
        lost_event = asyncio.Event()
        lost_event.set()
        self.lost = lost_event

    def as_dict(self):
        """Return the name, address and state as a plain dict."""
        return {key: getattr(self, key) for key in ('name', 'address', 'state')}
class Subscriber:
    """Context manager that owns one subscription queue.

    Entering the context yields the queue; leaving it unsubscribes the queue
    from the device manager that created this subscriber.
    """

    def __init__(self, device_manager):
        """Create an empty queue tied to *device_manager*."""
        self._device_manager = device_manager
        self.queue = asyncio.Queue()

    def __enter__(self):
        """Return the queue to read from."""
        return self.queue

    def __exit__(self, *exc_info):
        """Unsubscribe the queue; exceptions from the body are not suppressed."""
        manager = self._device_manager
        manager.unsubscribe(self.queue)