Skip to content

aioble: Add pairing and bonding multitests #1021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions micropython/bluetooth/aioble/multitests/ble_bond.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Test BLE GAP pairing with bonding (persistent pairing) using aioble

import sys

# ruff: noqa: E402
sys.path.append("")

from micropython import const
import machine
import time
import os

import asyncio
import aioble
import aioble.security
import bluetooth

TIMEOUT_MS = 5000

SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")

_FLAG_READ = const(0x0002)
_FLAG_READ_ENCRYPTED = const(0x0200)


# For aioble, we need to directly use the low-level bluetooth API for encrypted characteristics
class EncryptedCharacteristic(aioble.Characteristic):
def __init__(self, service, uuid, **kwargs):
super().__init__(service, uuid, read=True, **kwargs)
# Override flags to add encryption requirement
self.flags |= _FLAG_READ_ENCRYPTED


# Acting in peripheral role.
async def instance0_task():
# Clean up any existing secrets from previous tests
try:
os.remove("ble_secrets.json")
except:
pass

# Load secrets (will be empty initially but enables bond storage)
aioble.security.load_secrets()

service = aioble.Service(SERVICE_UUID)
characteristic = EncryptedCharacteristic(service, CHAR_UUID)
aioble.register_services(service)

multitest.globals(BDADDR=aioble.config("mac"))
multitest.next()

# Write initial characteristic value.
characteristic.write("bonded_data")

# Wait for central to connect to us.
print("advertise")
connection = await aioble.advertise(
20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS
)
print("connected")

# Wait for pairing to complete
print("wait_for_bonding")
start_time = time.ticks_ms()
while not connection.encrypted and time.ticks_diff(time.ticks_ms(), start_time) < TIMEOUT_MS:
await asyncio.sleep_ms(100)

# Give additional time for bonding to complete after encryption
await asyncio.sleep_ms(500)

if connection.encrypted:
print(
"bonded encrypted=1 authenticated={} bonded={}".format(
1 if connection.authenticated else 0, 1 if connection.bonded else 0
)
)
else:
print("bonding_timeout")

# Wait for the central to disconnect.
await connection.disconnected(timeout_ms=TIMEOUT_MS)
print("disconnected")


def instance0():
try:
asyncio.run(instance0_task())
finally:
aioble.stop()


# Acting in central role.
async def instance1_task():
multitest.next()

# Clean up any existing secrets from previous tests
try:
os.remove("ble_secrets.json")
except:
pass

# Load secrets (will be empty initially but enables bond storage)
aioble.security.load_secrets()

# Connect to peripheral.
print("connect")
device = aioble.Device(*BDADDR)
connection = await device.connect(timeout_ms=TIMEOUT_MS)

# Discover characteristics (before pairing).
service = await connection.service(SERVICE_UUID)
print("service", service.uuid)
characteristic = await service.characteristic(CHAR_UUID)
print("characteristic", characteristic.uuid)

# Pair with bonding enabled.
print("bond")
await connection.pair(
bond=True, # Enable bonding
le_secure=True,
mitm=False,
timeout_ms=TIMEOUT_MS,
)

# Give additional time for bonding to complete after encryption
await asyncio.sleep_ms(500)

print(
"bonded encrypted={} authenticated={} bonded={}".format(
1 if connection.encrypted else 0,
1 if connection.authenticated else 0,
1 if connection.bonded else 0,
)
)

# Read the peripheral's characteristic, should be encrypted.
print("read_encrypted")
data = await characteristic.read(timeout_ms=TIMEOUT_MS)
print("read", data)

# Check if secrets were saved
try:
os.stat("ble_secrets.json")
print("secrets_exist", "yes")
except:
print("secrets_exist", "no")

# Disconnect from peripheral.
print("disconnect")
await connection.disconnect(timeout_ms=TIMEOUT_MS)
print("disconnected")


def instance1():
try:
asyncio.run(instance1_task())
finally:
aioble.stop()
17 changes: 17 additions & 0 deletions micropython/bluetooth/aioble/multitests/ble_bond.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
--- instance0 ---
advertise
connected
wait_for_bonding
bonded encrypted=1 authenticated=0 bonded=1
disconnected
--- instance1 ---
connect
service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
characteristic UUID('00000000-1111-2222-3333-444444444444')
bond
bonded encrypted=1 authenticated=0 bonded=1
read_encrypted
read b'bonded_data'
secrets_exist yes
disconnect
disconnected
148 changes: 148 additions & 0 deletions micropython/bluetooth/aioble/multitests/ble_pair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Test BLE GAP pairing and bonding with aioble

import sys

# ruff: noqa: E402
sys.path.append("")

from micropython import const
import machine
import time

import asyncio
import aioble
import bluetooth

TIMEOUT_MS = 5000

SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")

_FLAG_READ = const(0x0002)
_FLAG_READ_ENCRYPTED = const(0x0200)


# For aioble, we need to directly use the low-level bluetooth API for encrypted characteristics
class EncryptedCharacteristic(aioble.Characteristic):
def __init__(self, service, uuid, **kwargs):
super().__init__(service, uuid, read=True, **kwargs)
# Override flags to add encryption requirement
self.flags |= _FLAG_READ_ENCRYPTED


# Acting in peripheral role.
async def instance0_task():
# Clear any existing bond state
import os

try:
os.remove("ble_secrets.json")
except:
pass

service = aioble.Service(SERVICE_UUID)
characteristic = EncryptedCharacteristic(service, CHAR_UUID)
aioble.register_services(service)

multitest.globals(BDADDR=aioble.config("mac"))
multitest.next()

# Write initial characteristic value.
characteristic.write("encrypted_data")

# Wait for central to connect to us.
print("advertise")
connection = await aioble.advertise(
20_000, adv_data=b"\x02\x01\x06\x04\xffMPY", timeout_ms=TIMEOUT_MS
)
print("connected")

# Wait for pairing to complete
print("wait_for_pairing")
start_time = time.ticks_ms()
while not connection.encrypted and time.ticks_diff(time.ticks_ms(), start_time) < TIMEOUT_MS:
await asyncio.sleep_ms(100)

# Give a small delay for bonding state to stabilize
await asyncio.sleep_ms(200)

if connection.encrypted:
print(
"paired encrypted=1 authenticated={} bonded={}".format(
1 if connection.authenticated else 0, 1 if connection.bonded else 0
)
)
else:
print("pairing_timeout")

# Wait for the central to disconnect.
await connection.disconnected(timeout_ms=TIMEOUT_MS)
print("disconnected")


def instance0():
try:
asyncio.run(instance0_task())
finally:
aioble.stop()


# Acting in central role.
async def instance1_task():
multitest.next()

# Clear any existing bond state
import os

try:
os.remove("ble_secrets.json")
except:
pass

# Connect to peripheral.
print("connect")
device = aioble.Device(*BDADDR)
connection = await device.connect(timeout_ms=TIMEOUT_MS)

# Discover characteristics (before pairing).
service = await connection.service(SERVICE_UUID)
print("service", service.uuid)
characteristic = await service.characteristic(CHAR_UUID)
print("characteristic", characteristic.uuid)

# Pair with the peripheral.
print("pair")
await connection.pair(
bond=False, # Don't bond for this test
le_secure=True,
mitm=False,
timeout_ms=TIMEOUT_MS,
)

# Give a small delay for bonding state to stabilize
await asyncio.sleep_ms(200)

print(
"paired encrypted={} authenticated={} bonded={}".format(
1 if connection.encrypted else 0,
1 if connection.authenticated else 0,
1 if connection.bonded else 0,
)
)

# Read the peripheral's characteristic, should be encrypted.
print("read_encrypted")
data = await characteristic.read(timeout_ms=TIMEOUT_MS)
print("read", data)

# Disconnect from peripheral.
print("disconnect")
await connection.disconnect(timeout_ms=TIMEOUT_MS)
print("disconnected")


def instance1():
try:
asyncio.run(instance1_task())
finally:
aioble.stop()
16 changes: 16 additions & 0 deletions micropython/bluetooth/aioble/multitests/ble_pair.py.exp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--- instance0 ---
advertise
connected
wait_for_pairing
paired encrypted=1 authenticated=0 bonded=0
disconnected
--- instance1 ---
connect
service UUID('a5a5a5a5-ffff-9999-1111-5a5a5a5a5a5a')
characteristic UUID('00000000-1111-2222-3333-444444444444')
pair
paired encrypted=1 authenticated=0 bonded=0
read_encrypted
read b'encrypted_data'
disconnect
disconnected
Loading