-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathoverdrive.py
276 lines (235 loc) · 9.53 KB
/
overdrive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import struct
import threading
import queue
import logging
import bluepy.btle as btle
class Overdrive:
def __init__(self, addr):
"""Initiate an Anki Overdrive connection object,
and call connect() function.
Parameters:
addr -- Bluetooth MAC address for desired Anki Overdrive car.
"""
self.addr = addr
self._peripheral = btle.Peripheral()
self._readChar = None
self._writeChar = None
self._connected = False
self._reconnect = False
self._delegate = OverdriveDelegate(self)
self._writeQueue = queue.Queue()
self._btleSubThread = None
self.speed = 0
self.location = 0
self.piece = 0
self._locationChangeCallbackFunc = None
self._pongCallbackFunc = None
self._transitionCallbackFunc = None
while True:
try:
self.connect()
break
except btle.BTLEException as e:
logging.getLogger("anki.overdrive").error(e.message)
def __del__(self):
"""Deconstructor for an Overdrive object"""
self.disconnect()
def connect(self):
"""Initiate a connection to the Overdrive."""
if self._btleSubThread is not None and threading.current_thread().ident != self._btleSubThread.ident:
return # not allow
self._peripheral.connect(self.addr, btle.ADDR_TYPE_RANDOM)
self._readChar = self._peripheral.getCharacteristics(1, 0xFFFF, "be15bee06186407e83810bd89c4d8df4")[0]
self._writeChar = self._peripheral.getCharacteristics(1, 0xFFFF, "be15bee16186407e83810bd89c4d8df4")[0]
self._delegate.setHandle(self._readChar.getHandle())
self._peripheral.setDelegate(self._delegate)
self.turnOnSdkMode()
self.enableNotify()
self._connected = True
self._reconnect = False
if self._btleSubThread is None:
self._transferExecution()
def _transferExecution(self):
"""Fork a thread for handling BTLE notification, for internal use only."""
self._btleSubThread = threading.Thread(target=self._executor)
self._btleSubThread.start()
def disconnect(self):
"""Disconnect from the Overdrive."""
if self._connected and (self._btleSubThread is None or not self._btleSubThread.is_alive()):
self._disconnect()
self._connected = False
def _disconnect(self):
"""Internal function. Disconnect from the Overdrive."""
try:
self._writeChar.write(b"\x01\x0D")
self._peripheral.disconnect()
except btle.BTLEException as e:
logging.getLogger("anki.overdrive").error(e.message)
def changeSpeed(self, speed, accel):
"""Change speed for Overdrive.
Parameters:
speed -- Desired speed. (from 0 - 1000)
accel -- Desired acceleration. (from 0 - 1000)
"""
command = struct.pack("<BHHB", 0x24, speed, accel, 0x01)
self.sendCommand(command)
def changeLaneRight(self, speed, accel):
"""Switch to adjacent right lane.
Parameters:
speed -- Desired speed. (from 0 - 1000)
accel -- Desired acceleration. (from 0 - 1000)
"""
self.changeLane(speed, accel, 44.5)
def changeLaneLeft(self, speed, accel):
"""Switch to adjacent left lane.
Parameters:
speed -- Desired speed. (from 0 - 1000)
accel -- Desired acceleration. (from 0 - 1000)
"""
self.changeLane(speed, accel, -44.5)
def changeLane(self, speed, accel, offset):
"""Change lane.
Parameters:
speed -- Desired speed. (from 0 - 1000)
accel -- Desired acceleration. (from 0 - 1000)
offset -- Offset from current lane. (negative for left, positive for right)
"""
self.setLane(0.0)
command = struct.pack("<BHHf", 0x25, speed, accel, offset)
self.sendCommand(command)
def setLane(self, offset):
"""Set internal lane offset (unused).
Parameters:
offset -- Desired offset.
"""
command = struct.pack("<Bf", 0x2c, offset)
self.sendCommand(command)
def turnOnSdkMode(self):
"""Turn on SDK mode for Overdrive."""
self.sendCommand(b"\x90\x01\x01")
def enableNotify(self):
"""Repeatly enable notification, until success."""
while True:
self._delegate.notificationsRecvd = 0
self._peripheral.writeCharacteristic(self._readChar.valHandle + 1, b"\x01\x00")
self.ping()
self._peripheral.waitForNotifications(3.0)
if self.getNotificationsReceived() > 0:
break
logging.getLogger("anki.overdrive").error("Set notify failed")
def ping(self):
"""Ping command."""
self.sendCommand(b"\x16")
def _executor(self):
"""Notification thread, for internal use only."""
data = None
while self._connected:
if self._reconnect:
while True:
try:
self.connect()
self._reconnect = False
if data is not None:
self._writeChar.write(data)
break
except btle.BTLEException as e:
logging.getLogger("anki.overdrive").error(e.message)
self._reconnect = True
try:
data = self._writeQueue.get_nowait()
self._writeChar.write(data)
data = None
except queue.Empty:
try:
self._peripheral.waitForNotifications(0.001)
except btle.BTLEException as e:
logging.getLogger("anki.overdrive").error(e.message)
self._reconnect = True
except btle.BTLEException as e:
logging.getLogger("anki.overdrive").error(e.message)
self._reconnect = True
self._disconnect()
self._btleSubThread = None
def getNotificationsReceived(self):
"""Get notifications received count."""
return self._delegate.notificationsRecvd
def sendCommand(self, command):
"""Send raw command to Overdrive
Parameters:
command -- Raw bytes command, without length.
"""
finalCommand = struct.pack("B", len(command)) + command
if self._writeChar is None:
self._reconnect = True
self._writeQueue.put(finalCommand)
def setLocationChangeCallback(self, func):
"""Set location change callback.
Parameters:
func -- Function for callback. (see _locationChangeCallback() for details)
"""
self._locationChangeCallbackFunc = func
def _locationChangeCallback(self, location, piece, speed, clockwise):
"""Location change callback wrapper.
Parameters:
addr -- MAC address of car
location -- Received location ID on piece.
piece -- Received piece ID.
speed -- Measured speed.
clockwise -- Clockwise flag.
"""
if self._locationChangeCallbackFunc is not None:
self._locationChangeCallbackFunc(self.addr, location, piece, speed, clockwise)
def setPongCallback(self, func):
"""Set pong callback.
Parameters:
func -- Function for callback. (see _pongCallback() for details)
"""
self._pongCallbackFunc = func
def _pongCallback(self):
"""Pong callback wrapper.
Parameters:
addr -- MAC address of car
"""
if self._pongCallbackFunc is not None:
self._pongCallbackFunc(self.addr)
def setTransitionCallback(self, func):
"""Set piece transition callback.
Parameters:
func -- Function for callback. (see _transitionCallback() for details)
"""
self._transitionCallbackFunc = func
def _transitionCallback(self):
"""Piece transition callback wrapper.
Parameters:
addr -- MAC address of car
"""
if self._transitionCallbackFunc is not None:
self._transitionCallbackFunc(self.addr)
class OverdriveDelegate(btle.DefaultDelegate):
"""Notification delegate object for Bluepy, for internal use only."""
def __init__(self, overdrive):
self.handle = None
self.notificationsRecvd = 0
self.overdrive = overdrive
btle.DefaultDelegate.__init__(self)
def handleNotification(self, handle, data):
if self.handle == handle:
self.notificationsRecvd += 1
(commandId,) = struct.unpack_from("B", data, 1)
if commandId == 0x27:
# Location position
location, piece, offset, speed, clockwiseVal = struct.unpack_from("<BBfHB", data, 2)
clockwise = False
if clockwiseVal == 0x47:
clockwise = True
threading.Thread(target=self.overdrive._locationChangeCallback, args=(location, piece, speed, clockwise)).start()
if commandId == 0x29:
# Transition notification
piece, piecePrev, offset, direction = struct.unpack_from("<BBfB", data, 2)
threading.Thread(target=self.overdrive._transitionCallback).start()
elif commandId == 0x17:
# Pong
threading.Thread(target=self.overdrive._pongCallback).start()
def setHandle(self, handle):
self.handle = handle
self.notificationsRecvd = 0