diff --git a/blatted/cli/__init__.py b/blatted/cli/__init__.py index fb36f8f..966c44f 100644 --- a/blatted/cli/__init__.py +++ b/blatted/cli/__init__.py @@ -10,8 +10,9 @@ def scanner_cmd() -> None: @click.command(name="monitor") -def monitor_cmd() -> None: - monitor.run() +@click.argument("address") +def monitor_cmd(address) -> None: + monitor.run(address) @click.command(name="tui") diff --git a/blatted/tools/ble/monitor.py b/blatted/tools/ble/monitor.py index 6129468..7ef23f4 100644 --- a/blatted/tools/ble/monitor.py +++ b/blatted/tools/ble/monitor.py @@ -1,18 +1,101 @@ import asyncio +from binascii import hexlify + +from rich.live import Live +from rich.table import Table + import bleak.exc from bleak import BleakClient, BleakGATTCharacteristic -from hexdump import hexdump -from icecream import ic +from bleak.backends.bluezdbus.characteristic import BleakGATTCharacteristicBlueZDBus + +from pydispatch import Dispatcher + +from ...events.event import DeviceDiscovered +from ...tools import context +from ...tools.context import BlattedEnvironment -async def monitor_services(filter: list[str] = []): - await asyncio.sleep(0.1) +environment = context.get_environment() +console = context.get_console() + +table = Table(width=100) +live = Live(table, console=console) +table.add_column("Service", max_width=18) +table.add_column("UUID", max_width=18) +table.add_column("Size", max_width=6) +table.add_column("data", width=58) -def run(uuid_filter: list[str] = []): - print("monitor called") +class Monitor(Dispatcher): + _events_ = ["acquire_notify", "update_service"] + + def __init__(self, ble_address: str, service_uuids: list[str]) -> None: + self.environment = context.get_environment() + self.ble_address = ble_address + self.service_uuids = service_uuids + self.acquired_notify: list[str] = [] + self.bind(acquire_notify=self.on_acquire_notify) + self.bind(update_service=self.on_update_service) + super().__init__() + + async def notify_handler(self, sender: BleakGATTCharacteristic, data: bytearray) -> None: + data_received = (sender, data) + if environment == BlattedEnvironment.CLI: + self.emit("update_service", data=data_received) + elif environment == BlattedEnvironment.TUI: + pass # TODO Implement + + def on_acquire_notify(self, service: BleakGATTCharacteristicBlueZDBus) -> None: + # print(type(service)) + # print(service.description) + # print(service.uuid) + # print(service.service_uuid) + table.add_row(f"{service.description}", f"{service.uuid}", "", "") + + def on_update_service(self, data) -> None: + service, received_data = data + uuid_cells = table.columns[1]._cells + try: + row_index = uuid_cells.index(service.uuid) + table.columns[2]._cells[row_index] = f"{len(received_data)}" + table.columns[3]._cells[row_index] = f"{hexlify(received_data)}" + except ValueError: + pass + + async def run(self) -> None: + stop_event = asyncio.Event() + + async with BleakClient(self.ble_address) as ble_client: + for service in ble_client.services: + for char in service.characteristics: + if "notify" in char.properties: + try: + await ble_client.start_notify(char.uuid, self.notify_handler) + if BlattedEnvironment.CLI: + self.emit("acquire_notify", char) + self.acquired_notify.append(char.uuid) + except bleak.exc.BleakError as exc: + # TODO notify error + print(exc) + + await stop_event.wait() + + for service in self.acquired_notify: + try: + await ble_client.stop_notify(service.uuid) + except bleak.exc.BleakError: + # TODO notify error + continue + +def run(ble_address: str, service_uuids: list[str] = []): + if environment == BlattedEnvironment.CLI: + live.start() + try: - asyncio.run(monitor_services(uuid_filter)) + asyncio.run(Monitor(ble_address, service_uuids).run()) except bleak.exc.BleakDBusError as exc: - print(f"ERROR: {exc}") + console.log(f"ERROR: {exc}") + + if environment == BlattedEnvironment.CLI: + live.stop() diff --git a/blatted/tools/ble/scanner.py b/blatted/tools/ble/scanner.py index 0d53032..534429b 100644 --- a/blatted/tools/ble/scanner.py +++ b/blatted/tools/ble/scanner.py @@ -33,6 +33,7 @@ class Scanner(Dispatcher): self.environment = context.get_environment() self.devices: Dict[str, Any] = {} self.bind(device_discovered=self.on_device_discovered) + super().__init__() def discover_callback( self, device: BLEDevice, advertising_data: AdvertisementData