From 2a4d0dcd4c066d74b705a0f5424b84b48f423884 Mon Sep 17 00:00:00 2001 From: rbaron Date: Sat, 21 Sep 2024 18:41:01 +0200 Subject: [PATCH] Add timeout to printer done event --- catprinter/ble.py | 55 ++++++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/catprinter/ble.py b/catprinter/ble.py index c7cab54..e8e6af5 100644 --- a/catprinter/ble.py +++ b/catprinter/ble.py @@ -13,8 +13,8 @@ # should cover both cases. POSSIBLE_SERVICE_UUIDS = [ - '0000ae30-0000-1000-8000-00805f9b34fb', - '0000af30-0000-1000-8000-00805f9b34fb', + "0000ae30-0000-1000-8000-00805f9b34fb", + "0000af30-0000-1000-8000-00805f9b34fb", ] TX_CHARACTERISTIC_UUID = "0000ae01-0000-1000-8000-00805f9b34fb" @@ -27,34 +27,39 @@ # Wait time after sending each chunk of data through BLE. WAIT_AFTER_EACH_CHUNK_S = 0.02 +# Wait for printer done event timeout. +WAIT_FOR_PRINTER_DONE_TIMEOUT = 30 + async def scan(name: Optional[str], timeout: int): - autodiscover = (not name) + autodiscover = not name if autodiscover: - logger.info('⏳ Trying to auto-discover a printer...') + logger.info("⏳ Trying to auto-discover a printer...") else: - logger.info(f'⏳ Looking for a BLE device named {name}...') + logger.info(f"⏳ Looking for a BLE device named {name}...") def filter_fn(device: BLEDevice, adv_data: AdvertisementData): if autodiscover: - return any(uuid in adv_data.service_uuids - for uuid in POSSIBLE_SERVICE_UUIDS) + return any( + uuid in adv_data.service_uuids for uuid in POSSIBLE_SERVICE_UUIDS + ) else: return device.name == name device = await BleakScanner.find_device_by_filter( - filter_fn, timeout=timeout, + filter_fn, + timeout=timeout, ) if device is None: - raise RuntimeError('Unable to find printer, make sure it is turned on and in range') - logger.info(f'✅ Got it. Address: {device}') + raise RuntimeError( + "Unable to find printer, make sure it is turned on and in range" + ) + logger.info(f"✅ Got it. Address: {device}") return device def chunkify(data, chunk_size): - return ( - data[i: i + chunk_size] for i in range(0, len(data), chunk_size) - ) + return (data[i : i + chunk_size] for i in range(0, len(data), chunk_size)) async def get_device_address(device: Optional[str]): @@ -62,7 +67,7 @@ async def get_device_address(device: Optional[str]): if device: with contextlib.suppress(ValueError): return str(uuid.UUID(device)) - if device.count(':') == 5 and device.replace(':', '').isalnum(): + if device.count(":") == 5 and device.replace(":", "").isalnum(): return device return await scan(device, timeout=SCAN_TIMEOUT_S) @@ -87,22 +92,28 @@ async def run_ble(data, device: Optional[str]): try: address = await get_device_address(device) except RuntimeError as e: - logger.error(f'🛑 {e}') + logger.error(f"🛑 {e}") return - logger.info(f'⏳ Connecting to {address}...') + logger.info(f"⏳ Connecting to {address}...") async with BleakClient(address) as client: - logger.info( - f'✅ Connected: {client.is_connected}; MTU: {client.mtu_size}') + logger.info(f"✅ Connected: {client.is_connected}; MTU: {client.mtu_size}") chunk_size = client.mtu_size - 3 event = asyncio.Event() - recieve_notification = notification_receiver_factory(event) + receive_notification = notification_receiver_factory(event) - await client.start_notify(RX_CHARACTERISTIC_UUID, recieve_notification) + await client.start_notify(RX_CHARACTERISTIC_UUID, receive_notification) - logger.info(f'⏳ Sending {len(data)} bytes of data in chunks of {chunk_size} bytes...') + logger.info( + f"⏳ Sending {len(data)} bytes of data in chunks of {chunk_size} bytes..." + ) for i, chunk in enumerate(chunkify(data, chunk_size)): await client.write_gatt_char(TX_CHARACTERISTIC_UUID, chunk) await asyncio.sleep(WAIT_AFTER_EACH_CHUNK_S) - await wait_for_printer_ready(event) + try: + await asyncio.wait_for( + wait_for_printer_ready(event), timeout=WAIT_FOR_PRINTER_DONE_TIMEOUT + ) + except asyncio.TimeoutError: + logger.error("🛑 Timed out while waiting for printer done event. Exiting.")