Skip to content

Commit

Permalink
Add timeout to printer done event
Browse files Browse the repository at this point in the history
  • Loading branch information
rbaron committed Sep 21, 2024
1 parent ea92b08 commit 2a4d0dc
Showing 1 changed file with 33 additions and 22 deletions.
55 changes: 33 additions & 22 deletions catprinter/ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# should cover both cases.

POSSIBLE_SERVICE_UUIDS = [
'0000ae30-0000-1000-8000-00805f9b34fb',
'0000af30-0000-1000-8000-00805f9b34fb',
"0000ae30-0000-1000-8000-00805f9b34fb",
"0000af30-0000-1000-8000-00805f9b34fb",
]

TX_CHARACTERISTIC_UUID = "0000ae01-0000-1000-8000-00805f9b34fb"
Expand All @@ -27,42 +27,47 @@
# Wait time after sending each chunk of data through BLE.
WAIT_AFTER_EACH_CHUNK_S = 0.02

# Wait for printer done event timeout.
WAIT_FOR_PRINTER_DONE_TIMEOUT = 30


async def scan(name: Optional[str], timeout: int):
autodiscover = (not name)
autodiscover = not name
if autodiscover:
logger.info('⏳ Trying to auto-discover a printer...')
logger.info("⏳ Trying to auto-discover a printer...")
else:
logger.info(f'⏳ Looking for a BLE device named {name}...')
logger.info(f"⏳ Looking for a BLE device named {name}...")

def filter_fn(device: BLEDevice, adv_data: AdvertisementData):
if autodiscover:
return any(uuid in adv_data.service_uuids
for uuid in POSSIBLE_SERVICE_UUIDS)
return any(
uuid in adv_data.service_uuids for uuid in POSSIBLE_SERVICE_UUIDS
)
else:
return device.name == name

device = await BleakScanner.find_device_by_filter(
filter_fn, timeout=timeout,
filter_fn,
timeout=timeout,
)
if device is None:
raise RuntimeError('Unable to find printer, make sure it is turned on and in range')
logger.info(f'βœ… Got it. Address: {device}')
raise RuntimeError(
"Unable to find printer, make sure it is turned on and in range"
)
logger.info(f"βœ… Got it. Address: {device}")
return device


def chunkify(data, chunk_size):
return (
data[i: i + chunk_size] for i in range(0, len(data), chunk_size)
)
return (data[i : i + chunk_size] for i in range(0, len(data), chunk_size))


async def get_device_address(device: Optional[str]):
# See if we were passed a string that smells like an UUID or MAC address.
if device:
with contextlib.suppress(ValueError):
return str(uuid.UUID(device))
if device.count(':') == 5 and device.replace(':', '').isalnum():
if device.count(":") == 5 and device.replace(":", "").isalnum():
return device

return await scan(device, timeout=SCAN_TIMEOUT_S)
Expand All @@ -87,22 +92,28 @@ async def run_ble(data, device: Optional[str]):
try:
address = await get_device_address(device)
except RuntimeError as e:
logger.error(f'πŸ›‘ {e}')
logger.error(f"πŸ›‘ {e}")
return
logger.info(f'⏳ Connecting to {address}...')
logger.info(f"⏳ Connecting to {address}...")
async with BleakClient(address) as client:
logger.info(
f'βœ… Connected: {client.is_connected}; MTU: {client.mtu_size}')
logger.info(f"βœ… Connected: {client.is_connected}; MTU: {client.mtu_size}")
chunk_size = client.mtu_size - 3
event = asyncio.Event()

recieve_notification = notification_receiver_factory(event)
receive_notification = notification_receiver_factory(event)

await client.start_notify(RX_CHARACTERISTIC_UUID, recieve_notification)
await client.start_notify(RX_CHARACTERISTIC_UUID, receive_notification)

logger.info(f'⏳ Sending {len(data)} bytes of data in chunks of {chunk_size} bytes...')
logger.info(
f"⏳ Sending {len(data)} bytes of data in chunks of {chunk_size} bytes..."
)
for i, chunk in enumerate(chunkify(data, chunk_size)):
await client.write_gatt_char(TX_CHARACTERISTIC_UUID, chunk)
await asyncio.sleep(WAIT_AFTER_EACH_CHUNK_S)

await wait_for_printer_ready(event)
try:
await asyncio.wait_for(
wait_for_printer_ready(event), timeout=WAIT_FOR_PRINTER_DONE_TIMEOUT
)
except asyncio.TimeoutError:
logger.error("πŸ›‘ Timed out while waiting for printer done event. Exiting.")

0 comments on commit 2a4d0dc

Please sign in to comment.