um24clab/src/meter/um24c.py

147 lines
3.6 KiB
Python

from time import sleep
from struct import unpack
from collections import namedtuple
import bluetooth
# Two-byte markers for the first and the last two bytes of a status packet.
# Only referenced by the disabled sanity check in get_meter_stats.
PACKET_MAGIC = (bytes.fromhex('0963'), bytes.fromhex('fff1'))

# struct layout of one status packet, split per field group. The unpacked
# values are handed positionally to UMeterStatus, so the field names in the
# comments below follow that tuple's order.
DATA_FORMAT = (
    ">"     # big-endian, no alignment
    "2x"    # 2 skipped bytes (presumably the leading PACKET_MAGIC marker)
    "2H"    # Voltage, Current
    "I"     # Power
    "2H"    # Temperature_Celcius, Temperature_Fahrenheit
    "x"     # skipped byte
    "B"     # Group
    "20I"   # Accumulated_Capacity0, Accumulated_Energy0, ... (10 pairs)
    "2H"    # DPlus, DMinus
    "B"     # Mode
    "x"     # skipped byte
    "2I"    # Recorded_Capacity, Recorded_Energy
    "H"     # Current_Trigger
    "I"     # Recorded_Time
    "2x"    # 2 skipped bytes
    "2H"    # Display_timeout, Display_brightness
    "I"     # Load_equivalent_impedance
    "x"     # skipped byte
    "B"     # Current_screen
    "2x"    # 2 skipped bytes (presumably the trailing PACKET_MAGIC marker)
)

# Module-wide RFCOMM socket; stays None until connect() assigns one.
rfcomm = None
# Record holding one decoded status packet. Field order matters: the values
# unpacked with DATA_FORMAT are passed to this tuple positionally.
UMeterStatus = namedtuple(
    'UMeterStatus',
    (
        'Voltage',
        'Current',
        'Power',
        'Temperature_Celcius',
        'Temperature_Fahrenheit',
        'Group',
        # Ten capacity/energy pairs, numbered 0..9, capacity first.
        *(
            f'Accumulated_{quantity}{slot}'
            for slot in range(10)
            for quantity in ('Capacity', 'Energy')
        ),
        'DPlus',
        'DMinus',
        'Mode',
        'Recorded_Capacity',
        'Recorded_Energy',
        'Current_Trigger',
        'Recorded_Time',
        'Display_timeout',
        'Display_brightness',
        'Load_equivalent_impedance',
        'Current_screen',
    ),
)
# Single request byte that get_meter_stats writes to the socket before it
# reads a status packet back.
UM24C_GET_STATUS = bytes([0xF0])
def discover_devices():
    """Discover UM24C Bluetooth USB Power Meters.

    Scan nearby bluetooth devices and keep those whose device name contains
    ``UM24C``.

    Returns:
        list: The ``(address, name)`` pairs (`str`, `str`) of all matching
        devices, in discovery order. Empty when no meter is found.

    Raises:
        Exception: If the bluetooth controller cannot be accessed. The
            underlying ``OSError`` is chained as the cause.
    """
    try:
        nearby_devices = bluetooth.discover_devices(lookup_names=True)
    except OSError as oserr:
        # Chain the cause so the OS-level reason is not lost in tracebacks.
        raise Exception('Cannot access bluetooth controller.') from oserr

    # Each entry is an (address, name) pair. Match on the name as a
    # substring: a membership test on the pair itself would only accept a
    # name that is exactly 'UM24C'. The truthiness guard skips entries whose
    # name is missing or empty.
    return [
        bt_device
        for bt_device in nearby_devices
        if bt_device[1] and 'UM24C' in bt_device[1]
    ]
def connect(dev, port=1):
    """Connect to nearby bluetooth `dev`.

    Open a RFCOMM socket connection to the bluetooth device and store it in
    the module-level `rfcomm`. The argument `dev` is the bluetooth address.
    The `port` defines the communication endpoint of the bluetooth device to
    connect.

    Args:
        dev (str): Bluetooth device address
        port (int, Default=1): Communication endpoint

    Raises:
        Exception: If a connection is already established.

    Any error raised by the socket while connecting is propagated unchanged;
    in that case the new socket is closed and `rfcomm` is left as it was.
    """
    global rfcomm
    if is_connected():
        peername, _ = rfcomm.getpeername()
        raise Exception(f'Already connected to {peername}.')

    sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
    try:
        sock.connect((dev, port))
    except BaseException:
        # Do not leak the descriptor of a socket that never connected; this
        # also covers an interrupt (Ctrl-C) during the blocking connect.
        sock.close()
        raise
    # Publish the socket only once it is actually connected.
    rfcomm = sock
def disconnect():
    """Close connection to UM24C Bluetooth USB Power Meter.

    Closes the module-level `rfcomm` socket, but only when `is_connected`
    reports an open connection; otherwise this is a no-op.
    """
    if not is_connected():
        return
    rfcomm.close()
def is_connected():
    """Check the RFCOMM connection state.

    The connection counts as established only when a socket object exists,
    its file descriptor is non-negative and the peer name can be queried.

    Returns:
        boolean: Return `True` if connected otherwise return `False`.
    """
    # No socket yet, or a socket without a valid descriptor.
    if not rfcomm or rfcomm.fileno() < 0:
        return False

    # A socket that is open but has no peer fails the peer name lookup.
    try:
        rfcomm.getpeername()
    except bluetooth.btcommon.BluetoothError:
        return False
    return True
def get_meter_stats(timeout=50):
    """Request one status packet from the meter and decode it.

    Sends `UM24C_GET_STATUS` over the module-level `rfcomm` socket, waits,
    reads the reply and unpacks it with `DATA_FORMAT`. A connection must have
    been opened with `connect` first; while `rfcomm` is still `None` the call
    fails on the attribute access.

    Args:
        timeout (int | float, Default=50): Delay between sending the request
            and reading the reply. It is passed unchanged to `time.sleep`,
            i.e. interpreted as seconds.

    Returns:
        UMeterStatus: The decoded values, or `None` when the reply does not
        match `DATA_FORMAT` (for example a short or over-long read).
    """
    # The file only imports `unpack`; pull in the matching error type locally.
    from struct import error as StructError

    rfcomm.send(UM24C_GET_STATUS)
    # NOTE(review): the default of 50 means a 50 second pause here; it looks
    # like milliseconds were intended - confirm with callers before changing.
    sleep(timeout)
    # Reads at most 0x8f bytes in a single call. The framing checks (packet
    # length and PACKET_MAGIC at both ends) were left disabled by the
    # original author and are not enforced here either; a reply of the wrong
    # size is rejected by unpack below.
    data = rfcomm.recv(0x8f)

    try:
        values = [
            # Defensive: normalise any bytes field to an unsigned integer.
            int.from_bytes(value.strip(b'\x00'), 'big', signed=False)
            if isinstance(value, bytes) else value
            for value in unpack(DATA_FORMAT, data)
        ]
        return UMeterStatus(*values)
    except (StructError, TypeError):
        # Undecodable reply: wrong size, non-bytes payload or a field-count
        # mismatch. Anything else (Ctrl-C, real bugs) must propagate.
        return None