83 lines
2.7 KiB
Python
83 lines
2.7 KiB
Python
import asyncio
|
|
from typing import Any, Dict
|
|
|
|
import bleak.exc
|
|
from bleak import BleakScanner
|
|
from bleak.backends.device import BLEDevice
|
|
from bleak.backends.scanner import AdvertisementData
|
|
from pydispatch import Dispatcher
|
|
from rich.live import Live
|
|
from rich.table import Table
|
|
|
|
from ...events.event import DeviceDiscovered
|
|
from ...tools import context
|
|
from ...tools.context import BlattedEnvironment
|
|
|
|
# Shared runtime context, resolved once when the module is imported.
environment = context.get_environment()
console = context.get_console()

# Table listing discovered devices. The "Services" column is sized as the
# table width (100) minus 55; 55 equals the summed max widths of the other
# three columns (18 + 32 + 5).
table = Table(width=100)
table.add_column("Address", max_width=18)
table.add_column("Name", max_width=32)
table.add_column("RSSI", max_width=5)
table.add_column("Services", width=100 - 55)

# Live display that renders the table on the shared console.
live = Live(table, console=console)
|
|
|
|
|
|
class Scanner(Dispatcher):
    """Scan for BLE advertisements and mirror them into the module-level table.

    Each advertisement that carries at least one service UUID is wrapped in a
    ``DeviceDiscovered`` event. In the CLI environment the event is emitted as
    ``device_discovered`` and the live display is refreshed.
    """

    _events_ = ["device_discovered"]

    def __init__(self) -> None:
        """Set up the per-address device registry and bind the event handler."""
        self.environment = context.get_environment()
        # Maps device address -> {"data": first event seen, "seen": hit count}.
        self.devices: dict[str, Any] = {}
        self.bind(device_discovered=self.on_device_discovered)
        super().__init__()

    def discover_callback(
        self, device: BLEDevice, advertising_data: AdvertisementData
    ) -> None:
        """Handle one advertisement passed in by ``BleakScanner``.

        Advertisements without any service UUID are ignored.

        Args:
            device: The device the advertisement came from.
            advertising_data: The advertisement payload for that device.
        """
        if not advertising_data.service_uuids:
            return

        discovered = DeviceDiscovered(device, advertising_data)
        if environment == BlattedEnvironment.CLI:
            self.emit("device_discovered", data=discovered)
            live.update(table)
        elif environment == BlattedEnvironment.TUI:
            pass  # TODO Implement

    def on_device_discovered(self, data: DeviceDiscovered) -> None:
        """Add a table row for a new device or refresh the row of a known one.

        Args:
            data: The discovery event holding the device and its advertisement.
        """
        # NOTE(review): ``adverisement_data`` is spelled exactly as this module
        # has always accessed it; presumably it matches the attribute declared
        # on DeviceDiscovered - confirm before renaming.
        address = f"{data.device.address}"
        # Stringify the name in both paths: a raw ``None`` name placed in a
        # cell cannot be rendered, whereas the text "None" can.
        name = f"{data.device.name}"
        rssi = f"{data.adverisement_data.rssi}"

        if data.device.address not in self.devices:
            table.add_row(
                address,
                name,
                rssi,
                "\n".join(data.adverisement_data.service_uuids),
            )
            self.devices[data.device.address] = {"data": data, "seen": 1}
            return

        self.devices[data.device.address]["seen"] += 1

        # This edits the columns' private ``_cells`` lists in place to update
        # an existing row; the row is located by its address cell.
        row_index = table.columns[0]._cells.index(address)
        table.columns[1]._cells[row_index] = name
        table.columns[2]._cells[row_index] = rssi

    async def run(self) -> None:
        """Scan until cancelled.

        The stop event is never set inside this method, so the wait only ends
        when the surrounding task is cancelled or interrupted.
        """
        stop_event = asyncio.Event()

        async with BleakScanner(self.discover_callback):
            await stop_event.wait()
|
|
|
|
|
|
def run() -> None:
    """Run the scanner, wrapping it in the live table display for the CLI.

    A ``BleakDBusError`` raised by the scan is logged to the console. Any
    other exception (including ``KeyboardInterrupt``) still propagates, but
    the live display is stopped first.
    """
    is_cli = environment == BlattedEnvironment.CLI
    if is_cli:
        live.start()

    try:
        asyncio.run(Scanner().run())
    except bleak.exc.BleakDBusError as exc:
        console.log(f"ERROR: {exc}")
    finally:
        # Stop the display on every exit path, not only after a clean return
        # or a handled D-Bus error.
        if is_cli:
            live.stop()
|