diff --git a/README.md b/README.md index 09189d0..f7c9f83 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,8 @@ A BLE GATT notification service reverse engineering tool which dumps all data as hexdump. It registers for every service with *notify* flag. + + +## Proof of concepts + +* Working scanner wich dynamically updates `rich.table.Table` cells: [Scanner demo](https://asciinema.org/a/1EbiCyZiUMngZmgATdYCHFZYW) diff --git a/blatted/tools/ble/scanner.py b/blatted/tools/ble/scanner.py index e631beb..0d53032 100644 --- a/blatted/tools/ble/scanner.py +++ b/blatted/tools/ble/scanner.py @@ -1,6 +1,9 @@ import asyncio from typing import Dict, Any +from rich.live import Live +from rich.table import Table + import bleak.exc from bleak import BleakScanner from bleak.backends.device import BLEDevice @@ -9,6 +12,18 @@ from pydispatch import Dispatcher from ...events.event import DeviceDiscovered from ...tools import context +from ...tools.context import BlattedEnvironment + + +environment = context.get_environment() +console = context.get_console() + +table = Table(width=100) +live = Live(table, console=console) +table.add_column("Address", max_width=18) +table.add_column("Name", max_width=32) +table.add_column("RSSI", max_width=5) +table.add_column("Services", width=100-55) class Scanner(Dispatcher): @@ -25,20 +40,25 @@ class Scanner(Dispatcher): if len(advertising_data.service_uuids) > 0: #discovered = DiscoveredDevice(device, advertising_data) discovered = DeviceDiscovered(device, advertising_data) - if context.get_environment() == context.BlattedEnvironment.CLI: + if environment == BlattedEnvironment.CLI: self.emit("device_discovered", data=discovered) - elif context.get_environment() == context.BlattedEnvironment.TUI: + live.update(table) + elif environment == BlattedEnvironment.TUI: pass # TODO Implement def on_device_discovered(self, data: DeviceDiscovered) -> None: if data.device.address not in self.devices: - print(f"new device discovered: {data.device}") + row = table.add_row(f"{data.device.address}", f"{data.device.name}", + f"{data.adverisement_data.rssi}", + "\n".join(data.adverisement_data.service_uuids)) self.devices[data.device.address] = {"data": data, "seen": 1} else: - self.devices[data.device.address]["seen"] += 1 - print( - f"device seen {self.devices[data.device.address]['seen']} times: {data.device}" - ) + known_data = self.devices[data.device.address] + known_data["seen"] += 1 + address_cells = table.columns[0]._cells + row_index = address_cells.index(data.device.address) + table.columns[1]._cells[row_index] = data.device.name + table.columns[2]._cells[row_index] = f"{data.adverisement_data.rssi}" async def run(self) -> None: stop_event = asyncio.Event() @@ -48,8 +68,13 @@ class Scanner(Dispatcher): def run(): - print("scanner called") + if environment == BlattedEnvironment.CLI: + live.start() + try: asyncio.run(Scanner().run()) except bleak.exc.BleakDBusError as exc: - print(f"ERROR: {exc}") + console.log(f"ERROR: {exc}") + + if environment == BlattedEnvironment.CLI: + live.stop() diff --git a/blatted/tools/context.py b/blatted/tools/context.py index e073c39..e4e9478 100644 --- a/blatted/tools/context.py +++ b/blatted/tools/context.py @@ -1,8 +1,7 @@ from contextvars import ContextVar from enum import Enum - -blatted_environment_var: ContextVar = ContextVar("blatted_environment") +from rich.console import Console class BlattedEnvironment(Enum): @@ -10,9 +9,28 @@ class BlattedEnvironment(Enum): TUI = "terminal user interface" -def set_environment(mode: BlattedEnvironment) -> None: +blatted_environment_var: ContextVar = ContextVar("blatted_environment") + +blatted_console_var: ContextVar = ContextVar("blatted_console") +try: + # test if the variable was set + blatted_console_var.get() +except LookupError: + console = Console() + blatted_console_var.set(console) + + +def set_environment(mode: BlattedEnvironment = BlattedEnvironment.CLI) -> None: blatted_environment_var.set(mode) def get_environment() -> BlattedEnvironment: - return blatted_environment_var.get() + try: + return blatted_environment_var.get() + except LookupError: + blatted_environment_var.set(BlattedEnvironment.CLI) + return blatted_environment_var.get() + + +def get_console() -> Console: + return blatted_console_var.get()