um24clab/src/meter/um24c.py

169 lines
4.1 KiB
Python

from time import sleep
from struct import unpack
from enum import Enum
from collections import namedtuple
import bluetooth
# Byte pairs that get_meter_stats() looks for in the received data:
# index 0 marks the start of a status packet, index 1 marks its end.
PACKET_MAGIC = (b'\x09\x63', b'\xff\xf1')
# `struct` layout of one complete status packet, markers included.
# ">" selects big-endian with standard sizes; "x" entries are pad bytes that
# are skipped (the leading and trailing "2x" cover the two marker pairs).
# The format spans 130 bytes and yields 37 values, which get_meter_stats()
# passes positionally to UMeterStatus.
DATA_FORMAT = ">2x2HI2HxB20I2HBx2IHI2x2HIxB2x"
# Module-wide RFCOMM socket shared by all functions below. It stays None
# until connect() assigns a bluetooth.BluetoothSocket to it.
rfcomm = None
# Record holding one decoded status packet. The field order is significant:
# get_meter_stats() fills the tuple positionally from the values unpacked
# with DATA_FORMAT, so the names must stay in this exact sequence.
UMeterStatus = namedtuple(
    'UMeterStatus',
    (
        # Live readings.
        'Voltage',
        'Current',
        'Power',
        'Temperature_Celcius',
        'Temperature_Fahrenheit',
        # Group index, followed by one capacity/energy pair per group 0-9.
        'Group',
        'Accumulated_Capacity0', 'Accumulated_Energy0',
        'Accumulated_Capacity1', 'Accumulated_Energy1',
        'Accumulated_Capacity2', 'Accumulated_Energy2',
        'Accumulated_Capacity3', 'Accumulated_Energy3',
        'Accumulated_Capacity4', 'Accumulated_Energy4',
        'Accumulated_Capacity5', 'Accumulated_Energy5',
        'Accumulated_Capacity6', 'Accumulated_Energy6',
        'Accumulated_Capacity7', 'Accumulated_Energy7',
        'Accumulated_Capacity8', 'Accumulated_Energy8',
        'Accumulated_Capacity9', 'Accumulated_Energy9',
        # USB data lines and mode.
        'DPlus',
        'DMinus',
        'Mode',
        # Recording-related values.
        'Recorded_Capacity',
        'Recorded_Energy',
        'Current_Trigger',
        'Recorded_Time',
        # Display settings and remaining values.
        'Display_timeout',
        'Display_brightness',
        'Load_equivalent_impedance',
        'Current_screen',
    ),
)
class Command(Enum):
    """Single-byte command codes that can be written to the meter.

    `send_command` writes a member's ``value`` to the RFCOMM socket, and
    `get_meter_stats` sends ``GET_STATUS`` before reading the reply packet.
    """

    # Request that get_meter_stats() sends before it reads a status packet.
    GET_STATUS = b'\xf0'
    # NOTE(review): the effect of the commands below is only implied by
    # their names; this module never interprets them. Confirm against the
    # device protocol before relying on it.
    NEXT_SCREEN = b'\xf1'
    ROTATE_SCREEN = b'\xf2'
    NEXT_GROUP = b'\xf3'
    CLEAR_GROUP = b'\xf4'
    # TODO: brightness (no brightness command is defined here yet)
def discover_devices():
    """Discover UM24C Bluetooth USB Power Meters.

    Scan nearby bluetooth devices and keep those whose reported name
    contains ``UM24C``.

    Returns:
        list: ``(address, name)`` pairs (`str`, `str`) exactly as reported by
        ``bluetooth.discover_devices``, one per matching device. The list is
        empty when no meter is found.

    Raises:
        Exception: If the bluetooth lookup fails with ``OSError``, i.e. the
            bluetooth controller cannot be accessed. The original error is
            chained as ``__cause__``.
    """
    try:
        nearby_devices = bluetooth.discover_devices(lookup_names=True)
    except OSError as err:
        raise Exception('Cannot access bluetooth controller.') from err

    # Each entry is an (address, name) pair. Test the *name* for the marker:
    # a membership test on the pair itself would only match a device whose
    # name is exactly 'UM24C'. The isinstance guard skips entries without a
    # usable text name instead of raising on them.
    return [
        bt_device
        for bt_device in nearby_devices
        if isinstance(bt_device[1], str) and 'UM24C' in bt_device[1]
    ]
def connect(dev, port=1):
    """Connect to nearby bluetooth `dev`.

    Open a RFCOMM socket connection to the bluetooth device. The argument
    `dev` is the bluetooth address. The `port` defines the communication
    endpoint of the bluetooth device to connect.

    The module-level `rfcomm` socket is only replaced once the connection
    has been established, so a failed attempt leaves no half-open socket
    behind.

    Args:
        dev (str): Bluetooth device address
        port (int, Default=1): Communication endpoint

    Raises:
        Exception: If a connection is already open.
        Any error raised by the socket's ``connect`` call is propagated
        unchanged after the new socket has been closed.
    """
    global rfcomm
    if is_connected():
        peername, _ = rfcomm.getpeername()
        raise Exception(f'Already connected to {peername}.')

    sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
    try:
        sock.connect((dev, port))
    except BaseException:
        # Release the descriptor before re-raising; otherwise the unconnected
        # socket would stay open with nothing left to close it.
        sock.close()
        raise
    rfcomm = sock
def disconnect():
    """Close the connection to the UM24C Bluetooth USB Power Meter.

    Closes the module-level `rfcomm` socket when `is_connected` reports an
    open connection; otherwise nothing happens.
    """
    if not is_connected():
        return
    rfcomm.close()
def is_connected():
    """Report whether the module-level RFCOMM socket is connected.

    Returns:
        boolean: `False` when no socket has been created, when its file
        descriptor is negative, or when asking for the peer name raises a
        ``BluetoothError``; `True` otherwise.
    """
    # No socket object yet, or one whose descriptor is no longer valid.
    if not rfcomm or rfcomm.fileno() < 0:
        return False

    # A socket that has a peer is considered connected.
    try:
        rfcomm.getpeername()
    except bluetooth.btcommon.BluetoothError:
        return False
    return True
def send_command(command):
    """Send a single command to the meter.

    Writes the command's ``value`` to the module-level `rfcomm` socket and
    then sleeps for 0.25 seconds before returning.

    Args:
        command (Command): Command whose ``value`` bytes are sent.
    """
    rfcomm.send(command.value)
    sleep(0.25)
def _find_packet_start(data, header, trailer, packet_size):
    """Return the index of the first complete packet in `data`, or -1."""
    trailer_offset = packet_size - len(trailer)
    start = data.find(header)
    while start != -1:
        end = start + trailer_offset
        # Check the end marker at its fixed position relative to this start
        # marker, so the same byte pair occurring elsewhere in the buffer
        # cannot cause a valid packet to be rejected.
        if data[end:end + len(trailer)] == trailer:
            return start
        start = data.find(header, start + 1)
    return -1


def get_meter_stats():
    """Request one status packet from the meter and decode it.

    Sends `Command.GET_STATUS` over the module-level `rfcomm` socket, waits
    0.30 seconds, then performs a single read of up to 0x8f bytes. The reply
    is accepted when it contains the start marker ``PACKET_MAGIC[0]`` with
    the end marker ``PACKET_MAGIC[1]`` at the position implied by
    `DATA_FORMAT`.

    Returns:
        UMeterStatus: The decoded values, or `None` when no complete packet
        is found in the received data (including a reply that did not arrive
        within the single read) or when the packet cannot be decoded.
    """
    # Local import: the module itself only imports `unpack` by name.
    import struct

    rfcomm.send(Command.GET_STATUS.value)
    sleep(0.30)
    data = rfcomm.recv(0x8f)

    header, trailer = PACKET_MAGIC
    packet_size = struct.calcsize(DATA_FORMAT)
    pkt_start = _find_packet_start(data, header, trailer, packet_size)
    if pkt_start < 0:
        return None

    packet = data[pkt_start:pkt_start + packet_size]
    try:
        values = [
            # Any raw byte-string field is converted to an unsigned integer
            # after removing leading/trailing NUL bytes.
            int.from_bytes(value.strip(b'\x00'), 'big', signed=False)
            if isinstance(value, bytes) else value
            for value in unpack(DATA_FORMAT, packet)
        ]
        return UMeterStatus(*values)
    except (struct.error, TypeError):
        # struct.error: packet does not fit DATA_FORMAT.
        # TypeError: value count does not match the UMeterStatus fields.
        return None