2018-03-16 14:53:34 +01:00
|
|
|
#! /usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# vim:fenc=utf-8:ts=8:et:sw=4:sts=4
|
|
|
|
#
|
|
|
|
# Copyright © 2017 jpk <jpk@goatpr0n.de>
|
|
|
|
#
|
|
|
|
# Distributed under terms of the MIT license.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
from struct import (pack, unpack, calcsize)
|
|
|
|
from enum import Enum
|
|
|
|
import socket
|
|
|
|
from collections import namedtuple
|
|
|
|
|
|
|
|
|
|
|
|
_LW12_PKT_LEN = 9
|
2018-03-16 18:15:18 +01:00
|
|
|
_LW12_PKT_FMT = 'c6scc'
|
2018-03-16 14:53:34 +01:00
|
|
|
_LW12_PKT_PAYLOAD_FMT = 'cccccc'
|
|
|
|
_LW12_PKT_HEAD = b'\x7e'
|
|
|
|
_LW12_PKT_TAIL = b'\xef'
|
|
|
|
|
|
|
|
|
|
|
|
class LW12_MODE(Enum):
|
|
|
|
LIGHT = b'\x04'
|
2018-03-16 18:57:27 +01:00
|
|
|
EFFECT = b'\x05'
|
|
|
|
COLOR = b'\x07'
|
2018-03-16 18:15:18 +01:00
|
|
|
IGNORE = b'\xff'
|
|
|
|
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
class LW12_CONTROL(Enum):
|
|
|
|
RGB = b'\x05'
|
|
|
|
SCAN_CTRL = b'\x09'
|
2018-03-16 14:53:34 +01:00
|
|
|
IGNORE = b'\xff'
|
|
|
|
|
|
|
|
|
|
|
|
class LW12_LIGHT(Enum):
|
|
|
|
BRIGHTNESS = b'\x01'
|
|
|
|
FLASH = b'\x02'
|
|
|
|
SET = b'\x03'
|
|
|
|
POWER = b'\x04'
|
2018-03-16 18:57:27 +01:00
|
|
|
DIM = b'\x05' # Unused: To be used with LW12_MODE.EFFECT
|
2018-03-16 18:15:18 +01:00
|
|
|
IGNORE = b'\xff'
|
2018-03-16 14:53:34 +01:00
|
|
|
|
|
|
|
|
|
|
|
class LW12_POWER(Enum):
|
|
|
|
ON = b'\x01'
|
|
|
|
OFF = b'\x00'
|
2018-03-16 18:15:18 +01:00
|
|
|
IGNORE = b'\xff'
|
2018-03-16 14:53:34 +01:00
|
|
|
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
class LW12_EFFECT_STATIC(Enum):
|
2018-03-16 14:53:34 +01:00
|
|
|
RED = b'\x80'
|
|
|
|
BLUE = b'\x81'
|
|
|
|
GREEN = b'\x82'
|
|
|
|
CYAN = b'\x83'
|
|
|
|
YELLOW = b'\x84'
|
|
|
|
PURPLE = b'\x85'
|
|
|
|
WHITE = b'\x86'
|
|
|
|
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
class LW12_EFFECT_JUMP(Enum):
|
2018-03-16 14:53:34 +01:00
|
|
|
TRICOLOR = b'\x87'
|
|
|
|
SEVENCOLOR = b'\x88'
|
|
|
|
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
class LW12_EFFECT_GRADIENT(Enum):
|
2018-03-16 14:53:34 +01:00
|
|
|
TRICOLOR = b'\x89'
|
|
|
|
SEVENCOLOR = b'\x8a'
|
|
|
|
RED = b'\x8b'
|
|
|
|
GREEN = b'\x8c'
|
|
|
|
BLUE = b'\x8d'
|
|
|
|
YELLOW = b'\x8e'
|
|
|
|
CYAN = b'\x8f'
|
|
|
|
PURPLE = b'\x90'
|
|
|
|
WHITE = b'\x91'
|
|
|
|
RED_GREEN = b'\x92'
|
|
|
|
RED_BLUE = b'\x93'
|
|
|
|
GREEN_BLUE = b'\x94'
|
|
|
|
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
class LW12_EFFECT_FLASH(Enum):
|
2018-03-16 14:53:34 +01:00
|
|
|
SEVENCOLOR = b'\x95'
|
|
|
|
RED = b'\x96'
|
|
|
|
GREEN = b'\x97'
|
|
|
|
BLUE = b'\x98'
|
|
|
|
YELLOW = b'\x99'
|
|
|
|
CYAN = b'\x9a'
|
|
|
|
PURPLE = b'\x9b'
|
|
|
|
WHITE = b'\x9c'
|
|
|
|
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
class Color(object):
|
|
|
|
|
|
|
|
def __init__(self, value):
|
|
|
|
self._value = value
|
|
|
|
|
|
|
|
@property
|
|
|
|
def value(self):
|
|
|
|
return bytes([self._value])
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return '<lw12.Color: {}>'.format(self.value)
|
|
|
|
|
|
|
|
|
2018-03-16 18:15:18 +01:00
|
|
|
LW12_Packet = namedtuple('Packet', 'head data pad tail')
|
2018-03-16 14:53:34 +01:00
|
|
|
LW12_Payload = namedtuple('Payload', 'mode option value r g b')
|
|
|
|
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
|
|
|
|
|
|
|
class LW12Controller(object):
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
def __init__(self, host, port, detect=False):
|
2018-03-16 14:53:34 +01:00
|
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
self.host = host
|
|
|
|
self.port = port
|
2018-03-16 18:57:27 +01:00
|
|
|
self._detect_controllers = detect
|
|
|
|
if detect:
|
|
|
|
self.socket_receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
self.socket_receiver.bind(('', 6000))
|
2018-03-16 14:53:34 +01:00
|
|
|
|
|
|
|
def get_remote_socket(self):
|
|
|
|
return (self.host, self.port)
|
|
|
|
|
|
|
|
def send(self, payload, sock=None):
|
|
|
|
data = self._compile_packet(payload)
|
2018-03-16 18:57:27 +01:00
|
|
|
if payload.option == LW12_CONTROL.SCAN_CTRL:
|
2018-03-16 18:15:18 +01:00
|
|
|
pad = b'\xff'
|
|
|
|
else:
|
|
|
|
pad = b'\x00'
|
|
|
|
packet = pack(_LW12_PKT_FMT, _LW12_PKT_HEAD, data, pad, _LW12_PKT_TAIL)
|
2018-03-16 14:53:34 +01:00
|
|
|
if len(packet) != 9:
|
|
|
|
raise Exception('Invalid data length. Packet malformed')
|
|
|
|
try:
|
|
|
|
return self.socket.sendto(packet, sock or self.get_remote_socket())
|
|
|
|
except OSError as oserr:
|
|
|
|
if oserr.errno == 101:
|
|
|
|
print('Network is unreachable')
|
|
|
|
|
2018-03-16 18:15:18 +01:00
|
|
|
def read(self):
|
2018-03-16 18:57:27 +01:00
|
|
|
if not self._detect_controllers:
|
|
|
|
return None
|
2018-03-16 18:15:18 +01:00
|
|
|
buffer_, sender = self.socket_receiver.recvfrom(_LW12_PKT_LEN)
|
|
|
|
print(buffer_)
|
|
|
|
print(sender)
|
|
|
|
|
|
|
|
|
2018-03-16 14:53:34 +01:00
|
|
|
def _compile_packet(self, payload):
|
|
|
|
if len(payload) != 6:
|
|
|
|
raise Exception('Invalid payload length. Packet malformed')
|
|
|
|
try:
|
|
|
|
raw_value = payload.value.value
|
|
|
|
except AttributeError:
|
|
|
|
raw_value = payload.value
|
|
|
|
return pack(_LW12_PKT_PAYLOAD_FMT,
|
|
|
|
payload.mode.value, payload.option.value, raw_value,
|
|
|
|
payload.r.value, payload.g.value, payload.b.value)
|
|
|
|
|
|
|
|
def light_off(self):
|
2018-03-16 18:15:18 +01:00
|
|
|
payload = LW12_Payload(r=LW12_LIGHT.IGNORE, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
2018-03-16 14:53:34 +01:00
|
|
|
mode=LW12_MODE.LIGHT,
|
|
|
|
option=LW12_LIGHT.POWER,
|
|
|
|
value=LW12_POWER.OFF)
|
|
|
|
self.send(payload)
|
|
|
|
|
|
|
|
def light_on(self):
|
2018-03-16 18:15:18 +01:00
|
|
|
payload = LW12_Payload(r=LW12_LIGHT.IGNORE, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
2018-03-16 14:53:34 +01:00
|
|
|
mode=LW12_MODE.LIGHT,
|
|
|
|
option=LW12_LIGHT.POWER,
|
|
|
|
value=LW12_POWER.ON)
|
|
|
|
self.send(payload)
|
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
def set_effect(self, effect):
|
2018-03-16 18:15:18 +01:00
|
|
|
payload = LW12_Payload(r=LW12_LIGHT.SET, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
2018-03-16 14:53:34 +01:00
|
|
|
mode=LW12_MODE.LIGHT,
|
|
|
|
option=LW12_LIGHT.SET,
|
2018-03-16 18:57:27 +01:00
|
|
|
value=effect)
|
2018-03-16 14:53:34 +01:00
|
|
|
self.send(payload)
|
|
|
|
|
|
|
|
def set_light_option(self, option, value):
|
|
|
|
# Fail safe handling for integers > 100
|
|
|
|
if 0 < value > 100:
|
|
|
|
value = int(value / 255 * 100)
|
2018-03-16 18:15:18 +01:00
|
|
|
payload = LW12_Payload(r=LW12_LIGHT.IGNORE, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
2018-03-16 14:53:34 +01:00
|
|
|
mode=LW12_MODE.LIGHT,
|
|
|
|
option=option,
|
|
|
|
value=bytes([value]))
|
|
|
|
self.send(payload)
|
2018-03-16 18:15:18 +01:00
|
|
|
|
2018-03-16 18:57:27 +01:00
|
|
|
def set_color(self, red, green, blue):
|
|
|
|
payload = LW12_Payload(r=Color(red), g=Color(green), b=Color(blue),
|
|
|
|
mode=LW12_MODE.COLOR,
|
|
|
|
option=LW12_CONTROL.RGB,
|
|
|
|
value=LW12_LIGHT.SET)
|
|
|
|
self.send(payload)
|
|
|
|
|
2018-03-16 18:15:18 +01:00
|
|
|
def scan(self, broadcast_addr='255.255.255.255', broadcast_port=5000):
|
2018-03-16 18:57:27 +01:00
|
|
|
if not self._detect_controllers:
|
|
|
|
return None
|
2018-03-16 18:15:18 +01:00
|
|
|
# Fail safe handling for integers > 100
|
|
|
|
payload = LW12_Payload(r=LW12_DETECT.IGNORE, g=LW12_DETECT.IGNORE, b=LW12_DETECT.IGNORE,
|
|
|
|
mode=LW12_MODE.DETECT,
|
2018-03-16 18:57:27 +01:00
|
|
|
option=LW12_DETECT.SCAN_CTRL,
|
2018-03-16 18:15:18 +01:00
|
|
|
value=LW12_DETECT.IGNORE)
|
|
|
|
self.send(payload, (broadcast_addr, broadcast_port))
|
|
|
|
self.read()
|