Receive detect broadcast
This commit is contained in:
parent
81fcc1ec40
commit
4a87703817
48
lw12.py
48
lw12.py
|
@ -17,7 +17,7 @@ from collections import namedtuple
|
|||
|
||||
|
||||
_LW12_PKT_LEN = 9
|
||||
_LW12_PKT_FMT = 'c6sxc'
|
||||
_LW12_PKT_FMT = 'c6scc'
|
||||
_LW12_PKT_PAYLOAD_FMT = 'cccccc'
|
||||
_LW12_PKT_HEAD = b'\x7e'
|
||||
_LW12_PKT_TAIL = b'\xef'
|
||||
|
@ -26,6 +26,12 @@ _LW12_PKT_TAIL = b'\xef'
|
|||
class LW12_MODE(Enum):
|
||||
LIGHT = b'\x04'
|
||||
COLOR = b'\x05'
|
||||
DETECT = b'\x07'
|
||||
IGNORE = b'\xff'
|
||||
|
||||
|
||||
class LW12_DETECT(Enum):
|
||||
SCAN = b'\x09'
|
||||
IGNORE = b'\xff'
|
||||
|
||||
|
||||
|
@ -34,11 +40,13 @@ class LW12_LIGHT(Enum):
|
|||
FLASH = b'\x02'
|
||||
SET = b'\x03'
|
||||
POWER = b'\x04'
|
||||
IGNORE = b'\xff'
|
||||
|
||||
|
||||
class LW12_POWER(Enum):
|
||||
ON = b'\x01'
|
||||
OFF = b'\x00'
|
||||
IGNORE = b'\xff'
|
||||
|
||||
|
||||
class LW12_COLOR_STATIC(Enum):
|
||||
|
@ -82,7 +90,7 @@ class LW12_COLOR_FLASH(Enum):
|
|||
WHITE = b'\x9c'
|
||||
|
||||
|
||||
LW12_Packet = namedtuple('Packet', 'head data tail')
|
||||
LW12_Packet = namedtuple('Packet', 'head data pad tail')
|
||||
LW12_Payload = namedtuple('Payload', 'mode option value r g b')
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
@ -92,23 +100,40 @@ class LW12Controller(object):
|
|||
|
||||
def __init__(self, host, port):
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.socket_receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.socket_receiver.bind(('', 6000))
|
||||
|
||||
def get_remote_socket(self):
|
||||
return (self.host, self.port)
|
||||
|
||||
def send(self, payload, sock=None):
|
||||
data = self._compile_packet(payload)
|
||||
packet = pack(_LW12_PKT_FMT, _LW12_PKT_HEAD, data, _LW12_PKT_TAIL)
|
||||
if payload.mode == LW12_MODE.DETECT:
|
||||
pad = b'\xff'
|
||||
else:
|
||||
pad = b'\x00'
|
||||
packet = pack(_LW12_PKT_FMT, _LW12_PKT_HEAD, data, pad, _LW12_PKT_TAIL)
|
||||
if len(packet) != 9:
|
||||
raise Exception('Invalid data length. Packet malformed')
|
||||
try:
|
||||
from hexdump import hexdump
|
||||
hexdump(packet)
|
||||
print(sock or self.get_remote_socket())
|
||||
return self.socket.sendto(packet, sock or self.get_remote_socket())
|
||||
except OSError as oserr:
|
||||
if oserr.errno == 101:
|
||||
print('Network is unreachable')
|
||||
|
||||
def read(self):
|
||||
buffer_, sender = self.socket_receiver.recvfrom(_LW12_PKT_LEN)
|
||||
print(buffer_)
|
||||
print(sender)
|
||||
|
||||
|
||||
def _compile_packet(self, payload):
|
||||
if len(payload) != 6:
|
||||
raise Exception('Invalid payload length. Packet malformed')
|
||||
|
@ -121,21 +146,21 @@ class LW12Controller(object):
|
|||
payload.r.value, payload.g.value, payload.b.value)
|
||||
|
||||
def light_off(self):
|
||||
payload = LW12_Payload(r=LW12_MODE.IGNORE, g=LW12_MODE.IGNORE, b=LW12_MODE.IGNORE,
|
||||
payload = LW12_Payload(r=LW12_LIGHT.IGNORE, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
||||
mode=LW12_MODE.LIGHT,
|
||||
option=LW12_LIGHT.POWER,
|
||||
value=LW12_POWER.OFF)
|
||||
self.send(payload)
|
||||
|
||||
def light_on(self):
|
||||
payload = LW12_Payload(r=LW12_MODE.IGNORE, g=LW12_MODE.IGNORE, b=LW12_MODE.IGNORE,
|
||||
payload = LW12_Payload(r=LW12_LIGHT.IGNORE, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
||||
mode=LW12_MODE.LIGHT,
|
||||
option=LW12_LIGHT.POWER,
|
||||
value=LW12_POWER.ON)
|
||||
self.send(payload)
|
||||
|
||||
def set_effect_color(self, color):
|
||||
payload = LW12_Payload(r=LW12_LIGHT.SET, g=LW12_MODE.IGNORE, b=LW12_MODE.IGNORE,
|
||||
payload = LW12_Payload(r=LW12_LIGHT.SET, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
||||
mode=LW12_MODE.LIGHT,
|
||||
option=LW12_LIGHT.SET,
|
||||
value=color)
|
||||
|
@ -145,8 +170,17 @@ class LW12Controller(object):
|
|||
# Fail safe handling for integers > 100
|
||||
if 0 < value > 100:
|
||||
value = int(value / 255 * 100)
|
||||
payload = LW12_Payload(r=LW12_MODE.IGNORE, g=LW12_MODE.IGNORE, b=LW12_MODE.IGNORE,
|
||||
payload = LW12_Payload(r=LW12_LIGHT.IGNORE, g=LW12_LIGHT.IGNORE, b=LW12_LIGHT.IGNORE,
|
||||
mode=LW12_MODE.LIGHT,
|
||||
option=option,
|
||||
value=bytes([value]))
|
||||
self.send(payload)
|
||||
|
||||
def scan(self, broadcast_addr='255.255.255.255', broadcast_port=5000):
|
||||
# Fail safe handling for integers > 100
|
||||
payload = LW12_Payload(r=LW12_DETECT.IGNORE, g=LW12_DETECT.IGNORE, b=LW12_DETECT.IGNORE,
|
||||
mode=LW12_MODE.DETECT,
|
||||
option=LW12_DETECT.SCAN,
|
||||
value=LW12_DETECT.IGNORE)
|
||||
self.send(payload, (broadcast_addr, broadcast_port))
|
||||
self.read()
|
||||
|
|
63
test.py
63
test.py
|
@ -11,34 +11,35 @@ time.sleep(.500)
|
|||
print('[+] LW12 LIGHT ON')
|
||||
lw12.light_on()
|
||||
time.sleep(.500)
|
||||
print('[+] LW12_COLOR_STATIC')
|
||||
for color in LW12_COLOR_STATIC:
|
||||
print(' {}'.format(color))
|
||||
lw12.set_effect_color(color)
|
||||
time.sleep(.250)
|
||||
print('[+] LW12_LIGHT.BRIGHTNESS')
|
||||
for brightness in range(0, 101, 5):
|
||||
print(' brightness={}'.format(brightness))
|
||||
lw12.set_light_option(LW12_LIGHT.BRIGHTNESS, brightness)
|
||||
time.sleep(.250)
|
||||
print('[+] LW12_LIGHT.FLASH')
|
||||
print(' speed=100')
|
||||
lw12.set_light_option(LW12_LIGHT.FLASH, 100)
|
||||
print('[+] LW12_COLOR_JUMP')
|
||||
for color in LW12_COLOR_JUMP:
|
||||
print(' {}'.format(color))
|
||||
lw12.set_effect_color(color)
|
||||
time.sleep(2)
|
||||
print('[+] LW12_COLOR_FLASH')
|
||||
for color in LW12_COLOR_FLASH:
|
||||
print(' {}'.format(color))
|
||||
lw12.set_effect_color(color)
|
||||
time.sleep(2)
|
||||
print('[+] LW12_COLOR_GRADIENT')
|
||||
for color in LW12_COLOR_GRADIENT:
|
||||
print(' {}'.format(color))
|
||||
lw12.set_effect_color(color)
|
||||
time.sleep(2)
|
||||
|
||||
lw12.set_effect_color(LW12_COLOR_STATIC.PURPLE)
|
||||
lw12.set_light_option(LW12_LIGHT.BRIGHTNESS, 10)
|
||||
# print('[+] LW12_COLOR_STATIC')
|
||||
# for color in LW12_COLOR_STATIC:
|
||||
# print(' {}'.format(color))
|
||||
# lw12.set_effect_color(color)
|
||||
# time.sleep(.250)
|
||||
# print('[+] LW12_LIGHT.BRIGHTNESS')
|
||||
# for brightness in range(0, 101, 5):
|
||||
# print(' brightness={}'.format(brightness))
|
||||
# lw12.set_light_option(LW12_LIGHT.BRIGHTNESS, brightness)
|
||||
# time.sleep(.250)
|
||||
# print('[+] LW12_LIGHT.FLASH')
|
||||
# print(' speed=100')
|
||||
# lw12.set_light_option(LW12_LIGHT.FLASH, 100)
|
||||
# print('[+] LW12_COLOR_JUMP')
|
||||
# for color in LW12_COLOR_JUMP:
|
||||
# print(' {}'.format(color))
|
||||
# lw12.set_effect_color(color)
|
||||
# time.sleep(2)
|
||||
# print('[+] LW12_COLOR_FLASH')
|
||||
# for color in LW12_COLOR_FLASH:
|
||||
# print(' {}'.format(color))
|
||||
# lw12.set_effect_color(color)
|
||||
# time.sleep(2)
|
||||
# print('[+] LW12_COLOR_GRADIENT')
|
||||
# for color in LW12_COLOR_GRADIENT:
|
||||
# print(' {}'.format(color))
|
||||
# lw12.set_effect_color(color)
|
||||
# time.sleep(2)
|
||||
# lw12.set_effect_color(LW12_COLOR_STATIC.PURPLE)
|
||||
# lw12.set_light_option(LW12_LIGHT.BRIGHTNESS, 10)
|
||||
print('[+] LW12 SCAN')
|
||||
lw12.scan()
|
||||
|
|
Loading…
Reference in New Issue