-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuzz.py
288 lines (243 loc) · 8.92 KB
/
buzz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""Module responsable for controlling Buzz Controllers"""
import time
import os
import argparse
import tkinter as tk
from typing import List
from abc import abstractmethod
from threading import Thread
from easyhid import Enumeration
import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
class BuzzBase:
"""Class for controlling Buzz Controllers"""
def __init__(self, host: str, port: int):
self.server_url = f"http://{host}:{port}"
self.light_array = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
self.button_state = [
{
"red": False,
"yellow": False,
"green": False,
"orange": False,
"blue": False,
},
{
"red": False,
"yellow": False,
"green": False,
"orange": False,
"blue": False,
},
{
"red": False,
"yellow": False,
"green": False,
"orange": False,
"blue": False,
},
{
"red": False,
"yellow": False,
"green": False,
"orange": False,
"blue": False,
},
]
def _reset_state(self):
"""
Clears the state of the buttons
"""
for i in range(4):
for j in ["red", "yellow", "green", "orange", "blue"]:
self.button_state[i][j] = False
def _handle_buzz_no_confirm(self):
"""
Handles the data from the buzzes
"""
for i in range(0, 4):
for color, value in self.button_state[i].items():
if value:
try:
requests.post(
f"{self.server_url}/buzz",
json={"controller": i, "color": color},
timeout=0.00000000000001,
)
except requests.exceptions.ReadTimeout:
pass
break
def _handle_buzz(self):
"""
Handles the data from the buzzes
"""
for i in range(0, 4):
for color, value in self.button_state[i].items():
if value:
requests.post(
f"{self.server_url}/buzz",
json={"controller": i, "color": color},
timeout=10,
)
break
@abstractmethod
def start(self):
"""
Thread that reads the buzzes and calls _handle_buzz
"""
@abstractmethod
def update_controllers_lights(self):
"""turn the lights of the given controllers off"""
class Buzz(BuzzBase):
"""Class for controlling Buzz Controllers"""
def __init__(self, host: str, port: int):
super().__init__(host, port)
en = Enumeration()
devices = en.find(manufacturer="Namtai")
self.dev = devices[0]
self.dev.open()
self.dev.set_nonblocking(True)
def update_controllers_lights(self):
"""Update the lights of the controllers"""
self.dev.write(self.light_array)
def __parse_buzz_data(self, data):
"""
Parses the data from the buzz
"""
self.button_state[0]["red"] = (data[2] & 0x01) != 0 # red
self.button_state[0]["yellow"] = (data[2] & 0x02) != 0 # yellow
self.button_state[0]["green"] = (data[2] & 0x04) != 0 # green
self.button_state[0]["orange"] = (data[2] & 0x08) != 0 # orange
self.button_state[0]["blue"] = (data[2] & 0x10) != 0 # blue
self.button_state[1]["red"] = (data[2] & 0x20) != 0 # red
self.button_state[1]["yellow"] = (data[2] & 0x40) != 0 # yellow
self.button_state[1]["green"] = (data[2] & 0x80) != 0 # green
self.button_state[1]["orange"] = (data[3] & 0x01) != 0 # orange
self.button_state[1]["blue"] = (data[3] & 0x02) != 0 # blue
self.button_state[2]["red"] = (data[3] & 0x04) != 0 # red
self.button_state[2]["yellow"] = (data[3] & 0x08) != 0 # yellow
self.button_state[2]["green"] = (data[3] & 0x10) != 0 # green
self.button_state[2]["orange"] = (data[3] & 0x20) != 0 # orange
self.button_state[2]["blue"] = (data[3] & 0x40) != 0 # blue
self.button_state[3]["red"] = (data[3] & 0x80) != 0 # red
self.button_state[3]["yellow"] = (data[4] & 0x01) != 0 # yellow
self.button_state[3]["green"] = (data[4] & 0x02) != 0 # green
self.button_state[3]["orange"] = (data[4] & 0x04) != 0 # orange
self.button_state[3]["blue"] = (data[4] & 0x08) != 0 # blue
def start(self):
"""
Thread that reads the buzzes
"""
while True:
data = self.dev.read(5)
if data:
self._reset_state()
self.__parse_buzz_data(data)
self._handle_buzz()
time.sleep(0.001)
class VirtualBuzz(BuzzBase):
"""Class for Buzz controlling Virtual Buzz Controllers"""
def __init__(self, host: str, port: int):
super().__init__(host, port)
self.red_buttons = []
def __update_button_state(self, controller: int, color: str):
"""
Trigger when a button is pressed.
Args:
controller (int): The controller pressed.
color (str): The color of the button pressed.
"""
super()._reset_state()
self.button_state[controller][color] = True
super()._handle_buzz_no_confirm()
def __create_controller_frame(self, root: tk.Tk, controller_index: int):
"""Creates a virtual buzz controller.
Args:
root (tk.Tk): Canvas to add the controller to.
controller_index (int): Index of the controller.
"""
frame = tk.Frame(root, padx=10, pady=10)
frame.pack(side=tk.LEFT, expand=True, fill=tk.BOTH)
b = tk.Button(
frame,
text="Red",
bg="#8B0000",
command=lambda: self.__update_button_state(controller_index, "red"),
)
b.pack(pady=5)
# Create the red button
self.red_buttons.append(b)
# Create the remaining color buttons
colors = ["yellow", "green", "orange", "blue"]
for color in colors:
color_button = tk.Button(
frame,
text=color.capitalize(),
bg=color,
command=lambda c=color: self.__update_button_state(controller_index, c),
)
color_button.pack(pady=5)
def update_controllers_lights(self):
"""
Updates the colors of the red circle buttons based on the light array.
"""
for i in range(4):
new_color = "#FF0000" if self.light_array[i + 1] == 0xFF else "#8B0000"
self.red_buttons[i].config(bg=new_color)
def start(self):
"""
Thread that creates the virtual buzz controllers.
"""
root = tk.Tk()
root.title("Virtual Buzz Controllers")
for i in range(4):
self.__create_controller_frame(root, i)
root.mainloop()
if __name__ == "__main__":
load_dotenv("backend/.env", override=True)
SERVER_PORT = int(os.getenv("SERVER_PORT", "8000"))
CONTROLLER_PORT = int(os.getenv("CONTROLLER_PORT", "8001"))
parser = argparse.ArgumentParser(description="Jeopardy Backend Controller")
parser.add_argument(
"--virtual", action="store_true", help="Enable virtual buzz buttons."
)
args = parser.parse_args()
buzzController = (
VirtualBuzz("localhost", SERVER_PORT)
if args.virtual
else Buzz("localhost", SERVER_PORT)
)
t = Thread(target=buzzController.start, args=())
t.start()
app = FastAPI()
class LightRequest(BaseModel):
"""Request model"""
controllers: List[int]
class Reponse(BaseModel):
"""Response model"""
status: str
@app.post("/on", response_model=Reponse)
def post_on(body: LightRequest):
"""Turn the lights of the given controllers on
Args:
light_request (LightRequest): controllers
"""
for i in body.controllers:
buzzController.light_array[i + 1] = 0xFF
buzzController.update_controllers_lights()
return {"status": "success"}
@app.post("/off", response_model=Reponse)
def post_off(body: LightRequest):
"""Turn the lights of the given controllers off
Args:
light_request (LightRequest): controllers
"""
for i in body.controllers:
buzzController.light_array[i + 1] = 0x00
buzzController.update_controllers_lights()
return {"status": "success"}
uvicorn.run(app, host="0.0.0.0", port=CONTROLLER_PORT)
t.join()