blatted/blatted/tools/ble/scanner.py

56 lines
1.9 KiB
Python

import asyncio
from typing import Dict, Any
import bleak.exc
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from pydispatch import Dispatcher
from ...events.event import DeviceDiscovered
from ...tools import context
class Scanner(Dispatcher):
    """Scan for BLE devices and count how often each one is seen.

    Advertisements carrying at least one service UUID are wrapped in a
    ``DeviceDiscovered`` event. In the CLI environment that event is emitted
    as ``device_discovered``; the class binds its own
    ``on_device_discovered`` handler to it in ``__init__``.
    """

    _events_ = ["device_discovered"]

    def __init__(self) -> None:
        """Record the current environment and subscribe to our own event."""
        self.environment = context.get_environment()
        # Keyed by device address; each value is
        # {"data": <first DeviceDiscovered seen>, "seen": <sighting count>}.
        self.devices: Dict[str, Any] = {}
        self.bind(device_discovered=self.on_device_discovered)

    def discover_callback(
        self, device: BLEDevice, advertising_data: AdvertisementData
    ) -> None:
        """Handle one advertisement; passed to ``BleakScanner`` in ``run``.

        Args:
            device: The device the advertisement came from.
            advertising_data: The advertisement payload for ``device``.
        """
        # Advertisements without any service UUIDs are ignored.
        if not advertising_data.service_uuids:
            return

        discovered = DeviceDiscovered(device, advertising_data)
        # Read the environment once so both branch tests see the same value.
        environment = context.get_environment()
        if environment == context.BlattedEnvironment.CLI:
            self.emit("device_discovered", data=discovered)
        elif environment == context.BlattedEnvironment.TUI:
            pass  # TODO Implement

    def on_device_discovered(self, data: DeviceDiscovered) -> None:
        """Track a discovered device and print how often it has been seen.

        Args:
            data: The discovery event; ``data.device.address`` is used as the
                key into ``self.devices``.
        """
        address = data.device.address
        entry = self.devices.get(address)
        if entry is None:
            print(f"new device discovered: {data.device}")
            self.devices[address] = {"data": data, "seen": 1}
            return

        # Known device: only the counter is updated, the stored event is kept.
        entry["seen"] += 1
        print(f"device seen {entry['seen']} times: {data.device}")

    async def run(self) -> None:
        """Keep a ``BleakScanner`` open with ``discover_callback`` attached.

        The awaited event is local and never set, so this coroutine only ends
        when it is cancelled or an exception propagates out of the scanner
        context.
        """
        stop_event = asyncio.Event()
        async with BleakScanner(self.discover_callback):
            await stop_event.wait()
def run():
    """Entry point: announce the call, then drive a ``Scanner`` under asyncio.

    A ``bleak.exc.BleakDBusError`` raised while setting up or running the
    scan is reported on stdout instead of propagating.
    """
    print("scanner called")
    try:
        # Build the coroutine inside the ``try`` so construction and the
        # scan itself are covered by the same handler.
        scan_coro = Scanner().run()
        asyncio.run(scan_coro)
    except bleak.exc.BleakDBusError as dbus_error:
        print(f"ERROR: {dbus_error}")