[WIP] scanner live table update

This commit is contained in:
jpk 2023-05-19 15:49:25 +02:00
parent 42afd9dc65
commit 02c651aefb
3 changed files with 61 additions and 13 deletions

View File

@ -2,3 +2,8 @@
A BLE GATT notification service reverse engineering tool which dumps all data as hexdump. A BLE GATT notification service reverse engineering tool which dumps all data as hexdump.
It registers for every service with *notify* flag. It registers for every service with *notify* flag.
## Proof of concepts
* Working scanner wich dynamically updates `rich.table.Table` cells: [Scanner demo](https://asciinema.org/a/1EbiCyZiUMngZmgATdYCHFZYW)

View File

@ -1,6 +1,9 @@
import asyncio import asyncio
from typing import Dict, Any from typing import Dict, Any
from rich.live import Live
from rich.table import Table
import bleak.exc import bleak.exc
from bleak import BleakScanner from bleak import BleakScanner
from bleak.backends.device import BLEDevice from bleak.backends.device import BLEDevice
@ -9,6 +12,18 @@ from pydispatch import Dispatcher
from ...events.event import DeviceDiscovered from ...events.event import DeviceDiscovered
from ...tools import context from ...tools import context
from ...tools.context import BlattedEnvironment
environment = context.get_environment()
console = context.get_console()
table = Table(width=100)
live = Live(table, console=console)
table.add_column("Address", max_width=18)
table.add_column("Name", max_width=32)
table.add_column("RSSI", max_width=5)
table.add_column("Services", width=100-55)
class Scanner(Dispatcher): class Scanner(Dispatcher):
@ -25,20 +40,25 @@ class Scanner(Dispatcher):
if len(advertising_data.service_uuids) > 0: if len(advertising_data.service_uuids) > 0:
#discovered = DiscoveredDevice(device, advertising_data) #discovered = DiscoveredDevice(device, advertising_data)
discovered = DeviceDiscovered(device, advertising_data) discovered = DeviceDiscovered(device, advertising_data)
if context.get_environment() == context.BlattedEnvironment.CLI: if environment == BlattedEnvironment.CLI:
self.emit("device_discovered", data=discovered) self.emit("device_discovered", data=discovered)
elif context.get_environment() == context.BlattedEnvironment.TUI: live.update(table)
elif environment == BlattedEnvironment.TUI:
pass # TODO Implement pass # TODO Implement
def on_device_discovered(self, data: DeviceDiscovered) -> None: def on_device_discovered(self, data: DeviceDiscovered) -> None:
if data.device.address not in self.devices: if data.device.address not in self.devices:
print(f"new device discovered: {data.device}") row = table.add_row(f"{data.device.address}", f"{data.device.name}",
f"{data.adverisement_data.rssi}",
"\n".join(data.adverisement_data.service_uuids))
self.devices[data.device.address] = {"data": data, "seen": 1} self.devices[data.device.address] = {"data": data, "seen": 1}
else: else:
self.devices[data.device.address]["seen"] += 1 known_data = self.devices[data.device.address]
print( known_data["seen"] += 1
f"device seen {self.devices[data.device.address]['seen']} times: {data.device}" address_cells = table.columns[0]._cells
) row_index = address_cells.index(data.device.address)
table.columns[1]._cells[row_index] = data.device.name
table.columns[2]._cells[row_index] = f"{data.adverisement_data.rssi}"
async def run(self) -> None: async def run(self) -> None:
stop_event = asyncio.Event() stop_event = asyncio.Event()
@ -48,8 +68,13 @@ class Scanner(Dispatcher):
def run(): def run():
print("scanner called") if environment == BlattedEnvironment.CLI:
live.start()
try: try:
asyncio.run(Scanner().run()) asyncio.run(Scanner().run())
except bleak.exc.BleakDBusError as exc: except bleak.exc.BleakDBusError as exc:
print(f"ERROR: {exc}") console.log(f"ERROR: {exc}")
if environment == BlattedEnvironment.CLI:
live.stop()

View File

@ -1,8 +1,7 @@
from contextvars import ContextVar from contextvars import ContextVar
from enum import Enum from enum import Enum
from rich.console import Console
blatted_environment_var: ContextVar = ContextVar("blatted_environment")
class BlattedEnvironment(Enum): class BlattedEnvironment(Enum):
@ -10,9 +9,28 @@ class BlattedEnvironment(Enum):
TUI = "terminal user interface" TUI = "terminal user interface"
def set_environment(mode: BlattedEnvironment) -> None: blatted_environment_var: ContextVar = ContextVar("blatted_environment")
blatted_console_var: ContextVar = ContextVar("blatted_console")
try:
# test if the variable was set
blatted_console_var.get()
except LookupError:
console = Console()
blatted_console_var.set(console)
def set_environment(mode: BlattedEnvironment = BlattedEnvironment.CLI) -> None:
blatted_environment_var.set(mode) blatted_environment_var.set(mode)
def get_environment() -> BlattedEnvironment: def get_environment() -> BlattedEnvironment:
return blatted_environment_var.get() try:
return blatted_environment_var.get()
except LookupError:
blatted_environment_var.set(BlattedEnvironment.CLI)
return blatted_environment_var.get()
def get_console() -> Console:
return blatted_console_var.get()