import asyncio import bleak.exc from bleak import BleakClient, BleakGATTCharacteristic from hexdump import hexdump from icecream import ic address = "C8:05:E1:A1:93:00" services = { "read": [], "notify": [], "write": [], "write-without-response": [], "indicate": [], } async def notification_handler(sender: BleakGATTCharacteristic, data: bytearray): ic(sender.description) hexdump(data) async def read_services(client, services): for service in services: try: data = await client.read_gatt_char(service.service_uuid) ic(f"{services}: {data}") except bleak.exc.BleakError: continue await asyncio.sleep(0.1) async def main(address): async with BleakClient(address) as client: for service in client.services: ic(service.description) for char in service.characteristics: ic(char.service_uuid) ic(char.properties) for prop in char.properties: if prop == "notify" and len(char.properties) > 1: continue else: services[prop].append(char) ic(char.handle) ic("—" * 79) line = "[+] {}".format("=" * 70) ic(line) for service in services["notify"]: ic(f">>> {service}") for service in services["notify"]: try: ic(f"[+] start_notify {service}") await client.start_notify(service.uuid, notification_handler) ic(f"[+] handler registered for {service.uuid}") except bleak.exc.BleakError as exc: ic(f"Error: {exc}") continue try: while True: await asyncio.sleep(0.1) await read_services(client, services["read"]) except KeyboardInterrupt: for service in services["notify"]: try: ic(f"[-] stop_notify {service}") await client.stop_notify(service.service_uuid) except bleak.exc.BleakError: continue asyncio.run(main(address))