[WIP] monitor live mode

This commit is contained in:
jpk 2023-05-20 14:12:39 +02:00
parent 056f6d82d9
commit 6020b7c38b
3 changed files with 95 additions and 10 deletions

View File

@ -10,8 +10,9 @@ def scanner_cmd() -> None:
@click.command(name="monitor")
def monitor_cmd() -> None:
monitor.run()
@click.argument("address")
def monitor_cmd(address) -> None:
monitor.run(address)
@click.command(name="tui")

View File

@ -1,18 +1,101 @@
import asyncio
from binascii import hexlify
from rich.live import Live
from rich.table import Table
import bleak.exc
from bleak import BleakClient, BleakGATTCharacteristic
from hexdump import hexdump
from icecream import ic
from bleak.backends.bluezdbus.characteristic import BleakGATTCharacteristicBlueZDBus
from pydispatch import Dispatcher
from ...events.event import DeviceDiscovered
from ...tools import context
from ...tools.context import BlattedEnvironment
async def monitor_services(filter: list[str] = []):
await asyncio.sleep(0.1)
environment = context.get_environment()
console = context.get_console()
table = Table(width=100)
live = Live(table, console=console)
table.add_column("Service", max_width=18)
table.add_column("UUID", max_width=18)
table.add_column("Size", max_width=6)
table.add_column("data", width=58)
def run(uuid_filter: list[str] = []):
print("monitor called")
class Monitor(Dispatcher):
_events_ = ["acquire_notify", "update_service"]
def __init__(self, ble_address: str, service_uuids: list[str]) -> None:
self.environment = context.get_environment()
self.ble_address = ble_address
self.service_uuids = service_uuids
self.acquired_notify: list[str] = []
self.bind(acquire_notify=self.on_acquire_notify)
self.bind(update_service=self.on_update_service)
super().__init__()
async def notify_handler(self, sender: BleakGATTCharacteristic, data: bytearray) -> None:
data_received = (sender, data)
if environment == BlattedEnvironment.CLI:
self.emit("update_service", data=data_received)
elif environment == BlattedEnvironment.TUI:
pass # TODO Implement
def on_acquire_notify(self, service: BleakGATTCharacteristicBlueZDBus) -> None:
# print(type(service))
# print(service.description)
# print(service.uuid)
# print(service.service_uuid)
table.add_row(f"{service.description}", f"{service.uuid}", "", "")
def on_update_service(self, data) -> None:
service, received_data = data
uuid_cells = table.columns[1]._cells
try:
row_index = uuid_cells.index(service.uuid)
table.columns[2]._cells[row_index] = f"{len(received_data)}"
table.columns[3]._cells[row_index] = f"{hexlify(received_data)}"
except ValueError:
pass
async def run(self) -> None:
stop_event = asyncio.Event()
async with BleakClient(self.ble_address) as ble_client:
for service in ble_client.services:
for char in service.characteristics:
if "notify" in char.properties:
try:
await ble_client.start_notify(char.uuid, self.notify_handler)
if BlattedEnvironment.CLI:
self.emit("acquire_notify", char)
self.acquired_notify.append(char.uuid)
except bleak.exc.BleakError as exc:
# TODO notify error
print(exc)
await stop_event.wait()
for service in self.acquired_notify:
try:
await ble_client.stop_notify(service.uuid)
except bleak.exc.BleakError:
# TODO notify error
continue
def run(ble_address: str, service_uuids: list[str] = []):
if environment == BlattedEnvironment.CLI:
live.start()
try:
asyncio.run(monitor_services(uuid_filter))
asyncio.run(Monitor(ble_address, service_uuids).run())
except bleak.exc.BleakDBusError as exc:
print(f"ERROR: {exc}")
console.log(f"ERROR: {exc}")
if environment == BlattedEnvironment.CLI:
live.stop()

View File

@ -33,6 +33,7 @@ class Scanner(Dispatcher):
self.environment = context.get_environment()
self.devices: Dict[str, Any] = {}
self.bind(device_discovered=self.on_device_discovered)
super().__init__()
def discover_callback(
self, device: BLEDevice, advertising_data: AdvertisementData