103 lines
3.6 KiB
Python
103 lines
3.6 KiB
Python
import asyncio
|
|
from binascii import hexlify
|
|
|
|
import bleak.exc
|
|
from bleak import BleakClient, BleakGATTCharacteristic
|
|
from bleak.backends.bluezdbus.characteristic import BleakGATTCharacteristicBlueZDBus
|
|
from pydispatch import Dispatcher
|
|
from rich.live import Live
|
|
from rich.table import Table
|
|
|
|
from ...events.event import DeviceDiscovered
|
|
from ...tools import context
|
|
from ...tools.context import BlattedEnvironment
|
|
|
|
# Runtime context shared by everything in this module.
environment = context.get_environment()
console = context.get_console()

# Live-updating table that lists one row per subscribed characteristic.
table = Table(width=100)
live = Live(table, console=console)

# (header, sizing keyword arguments) for each column, in display order.
_COLUMN_LAYOUT = (
    ("Service", {"max_width": 18}),
    ("UUID", {"max_width": 18}),
    ("Size", {"max_width": 6}),
    ("data", {"width": 58}),
)
for _header, _sizing in _COLUMN_LAYOUT:
    table.add_column(_header, **_sizing)
|
|
|
|
|
|
class Monitor(Dispatcher):
    """Subscribe to the notifying characteristics of a BLE device.

    Two events are dispatched:

    * ``acquire_notify`` -- emitted with the characteristic once a
      notification subscription has been started for it.
    * ``update_service`` -- emitted with ``data=(characteristic, payload)``
      whenever a notification arrives while running in the CLI environment.

    Both events are bound to handlers on this class that keep the
    module-level ``table`` in sync.
    """

    _events_ = ["acquire_notify", "update_service"]

    def __init__(self, ble_address: str, service_uuids: list[str]) -> None:
        """Store the connection parameters and wire up the event handlers.

        Args:
            ble_address: Address handed to ``BleakClient`` when connecting.
            service_uuids: Service UUIDs supplied by the caller.
                NOTE(review): this list is stored but never consulted by
                ``run()``, which subscribes to every notifying
                characteristic -- confirm whether filtering was intended.
        """
        super().__init__()
        self.environment = context.get_environment()
        self.ble_address = ble_address
        self.service_uuids = service_uuids
        # UUIDs of the characteristics with an active notification.
        self.acquired_notify: list[str] = []
        self.bind(acquire_notify=self.on_acquire_notify)
        self.bind(update_service=self.on_update_service)

    async def notify_handler(
        self, sender: BleakGATTCharacteristic, data: bytearray
    ) -> None:
        """Forward a received notification as an ``update_service`` event.

        Args:
            sender: Characteristic that produced the notification.
            data: Raw payload of the notification.
        """
        if environment == BlattedEnvironment.CLI:
            self.emit("update_service", data=(sender, data))
        elif environment == BlattedEnvironment.TUI:
            pass  # TODO Implement

    def on_acquire_notify(self, service: BleakGATTCharacteristicBlueZDBus) -> None:
        """Add a table row for a newly subscribed characteristic.

        The size and data cells start empty and are filled in by
        ``on_update_service`` when the first notification arrives.
        """
        table.add_row(f"{service.description}", f"{service.uuid}", "", "")

    def on_update_service(
        self, data: "tuple[BleakGATTCharacteristic, bytearray]"
    ) -> None:
        """Refresh the size and data cells of the row matching a notification.

        Args:
            data: ``(characteristic, payload)`` pair as emitted by
                ``notify_handler``.
        """
        service, received_data = data
        # NOTE: rich exposes no public API for editing an existing row, so
        # the private per-column ``_cells`` lists are patched in place.
        try:
            row_index = table.columns[1]._cells.index(service.uuid)
        except ValueError:
            # Notification from a characteristic without a row: ignore it.
            return
        table.columns[2]._cells[row_index] = f"{len(received_data)}"
        table.columns[3]._cells[row_index] = f"{received_data.hex(' ')}"

    async def run(self) -> None:
        """Connect, subscribe to all notifying characteristics and wait.

        Subscriptions that fail with ``BleakError`` are reported and skipped.
        Subscriptions that succeeded are stopped again when the wait ends,
        including when the task is cancelled.
        """
        # NOTE(review): nothing in this class ever sets the event, so the
        # wait below only ends through cancellation or an exception.
        stop_event = asyncio.Event()

        async with BleakClient(self.ble_address) as ble_client:
            try:
                for service in ble_client.services:
                    for char in service.characteristics:
                        if "notify" not in char.properties:
                            continue
                        try:
                            await ble_client.start_notify(
                                char.uuid, self.notify_handler
                            )
                        except bleak.exc.BleakError as exc:
                            # TODO notify error
                            print(exc)
                            continue
                        self.acquired_notify.append(char.uuid)
                        # Compare against the active environment; the enum
                        # member on its own is not an environment check.
                        if environment == BlattedEnvironment.CLI:
                            self.emit("acquire_notify", char)

                await stop_event.wait()
            finally:
                # ``acquired_notify`` holds UUID strings, so they are passed
                # to ``stop_notify`` directly.
                for char_uuid in self.acquired_notify:
                    try:
                        await ble_client.stop_notify(char_uuid)
                    except bleak.exc.BleakError:
                        # TODO notify error
                        continue
                self.acquired_notify.clear()
|
|
|
|
|
|
# The annotation is quoted so the ``|`` union is never evaluated at runtime,
# which keeps the module importable on Python 3.9.
def run(ble_address: str, service_uuids: "list[str] | None" = None) -> None:
    """Monitor the notifications of a BLE device until interrupted.

    In the CLI environment the module-level live table is started before
    monitoring and always stopped afterwards, even if monitoring raises.

    Args:
        ble_address: Address of the device to connect to.
        service_uuids: Service UUIDs forwarded to ``Monitor``. ``None``
            (the default) is treated as an empty list.
    """
    # A fresh list per call avoids sharing one mutable default between
    # every Monitor created through this function.
    uuids = [] if service_uuids is None else service_uuids
    cli_mode = environment == BlattedEnvironment.CLI

    if cli_mode:
        live.start()
    try:
        asyncio.run(Monitor(ble_address, uuids).run())
    except bleak.exc.BleakDBusError as exc:
        console.log(f"ERROR: {exc}")
    finally:
        # Restore the terminal on every exit path, not just the handled one.
        if cli_mode:
            live.stop()
|