Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add notification listener to disconnect once the printer is ready #77

Merged
merged 1 commit into from
Sep 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions catprinter/ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@
'0000af30-0000-1000-8000-00805f9b34fb',
]

TX_CHARACTERISTIC_UUID = '0000ae01-0000-1000-8000-00805f9b34fb'
TX_CHARACTERISTIC_UUID = "0000ae01-0000-1000-8000-00805f9b34fb"
RX_CHARACTERISTIC_UUID = "0000ae02-0000-1000-8000-00805f9b34fb"

PRINTER_READY_NOTIFICATION = b"\x51\x78\xae\x01\x01\x00\x00\x00\xff"

SCAN_TIMEOUT_S = 10

# Wait time after sending each chunk of data through BLE.
WAIT_AFTER_EACH_CHUNK_S = 0.02

# This is a hacky solution so we don't terminate the BLE connection to the printer
# while it's still printing. A better solution is to subscribe to the RX characteristic
# and listen for printer events, so we know exactly when the printing is finished.
WAIT_AFTER_DATA_SENT_S = 30


async def scan(name: Optional[str], timeout: int):
autodiscover = (not name)
Expand Down Expand Up @@ -70,6 +68,21 @@ async def get_device_address(device: Optional[str]):
return await scan(device, timeout=SCAN_TIMEOUT_S)


def notification_receiver_factory(event):
def notification_receiver(sender, data):
logger.debug(f"📡 Received notification: {data}")
if data == PRINTER_READY_NOTIFICATION:
event.set()

return notification_receiver


async def wait_for_printer_ready(event):
logger.info("⏳ Done printing. Waiting for printer to be ready...")
await event.wait()
logger.info("✅ Printer is ready, disconnecting...")


async def run_ble(data, device: Optional[str]):
try:
address = await get_device_address(device)
Expand All @@ -81,10 +94,15 @@ async def run_ble(data, device: Optional[str]):
logger.info(
f'✅ Connected: {client.is_connected}; MTU: {client.mtu_size}')
chunk_size = client.mtu_size - 3
logger.info(
f'⏳ Sending {len(data)} bytes of data in chunks of {chunk_size} bytes...')
event = asyncio.Event()

recieve_notification = notification_receiver_factory(event)

await client.start_notify(RX_CHARACTERISTIC_UUID, recieve_notification)

logger.info(f'⏳ Sending {len(data)} bytes of data in chunks of {chunk_size} bytes...')
for i, chunk in enumerate(chunkify(data, chunk_size)):
await client.write_gatt_char(TX_CHARACTERISTIC_UUID, chunk)
await asyncio.sleep(WAIT_AFTER_EACH_CHUNK_S)
logger.info(f'✅ Done. Waiting {WAIT_AFTER_DATA_SENT_S}s before disconnecting...')
await asyncio.sleep(WAIT_AFTER_DATA_SENT_S)

await wait_for_printer_ready(event)
Loading