Pytuya library refactoring and code cleaning
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
# TinyTuya Module
|
||||
# PyTuya Module
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Python module to interface with Tuya WiFi smart devices
|
||||
@@ -11,10 +11,7 @@
|
||||
For more information see https://github.com/clach04/python-tuya
|
||||
|
||||
Classes
|
||||
OutletDevice(dev_id, address, local_key=None)
|
||||
CoverDevice(dev_id, address, local_key=None)
|
||||
BulbDevice(dev_id, address, local_key=None)
|
||||
|
||||
PytuyaDevice(dev_id, address, local_key=None)
|
||||
dev_id (str): Device ID e.g. 01234567891234567890
|
||||
address (str): Device Network IP Address e.g. 10.0.1.99
|
||||
local_key (str, optional): The encryption key. Defaults to None.
|
||||
@@ -23,27 +20,10 @@
|
||||
json = status() # returns json payload
|
||||
set_version(version) # 3.1 [default] or 3.3
|
||||
set_dpsUsed(dpsUsed)
|
||||
set_status(on, switch=1) # Set status of the device to 'on' or 'off' (bool)
|
||||
set_dps(on, switch=1) # Set the of the device to 'on' or 'off' (bool)
|
||||
set_value(index, value) # Set int value of any index.
|
||||
turn_on(switch=1):
|
||||
turn_off(switch=1):
|
||||
set_timer(num_secs):
|
||||
|
||||
CoverDevice:
|
||||
open_cover(switch=1):
|
||||
close_cover(switch=1):
|
||||
stop_cover(switch=1):
|
||||
|
||||
BulbDevice
|
||||
set_colour(r, g, b):
|
||||
set_white(brightness, colourtemp):
|
||||
set_brightness(brightness):
|
||||
set_colourtemp(colourtemp):
|
||||
result = brightness():
|
||||
result = colourtemp():
|
||||
(r, g, b) = colour_rgb():
|
||||
(h,s,v) = colour_hsv()
|
||||
result = state():
|
||||
|
||||
Credits
|
||||
* TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
|
||||
@@ -73,7 +53,7 @@ except ImportError:
|
||||
import pyaes # https://github.com/ricmoo/pyaes
|
||||
|
||||
|
||||
version_tuple = (7, 1, 0)
|
||||
version_tuple = (8, 0, 0)
|
||||
version = version_string = __version__ = '%d.%d.%d' % version_tuple
|
||||
__author__ = 'rospogrigio'
|
||||
|
||||
@@ -192,7 +172,12 @@ payload_dict = {
|
||||
}
|
||||
}
|
||||
|
||||
class XenonDevice(object):
|
||||
#class PytuyaDevice(XenonDevice):
|
||||
# def __init__(self, dev_id, address, local_key=None, dev_type=None):
|
||||
# super(PytuyaDevice, self).__init__(dev_id, address, local_key, dev_type)
|
||||
|
||||
#class XenonDevice(object):
|
||||
class PytuyaDevice(object):
|
||||
def __init__(self, dev_id, address, local_key=None, connection_timeout=10):
|
||||
"""
|
||||
Represents a Tuya device.
|
||||
@@ -208,7 +193,6 @@ class XenonDevice(object):
|
||||
|
||||
self.id = dev_id
|
||||
self.address = address
|
||||
self.local_key = local_key
|
||||
self.local_key = local_key.encode('latin1')
|
||||
self.connection_timeout = connection_timeout
|
||||
self.version = 3.1
|
||||
@@ -332,11 +316,7 @@ class XenonDevice(object):
|
||||
#print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) ))
|
||||
#print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer)))
|
||||
return buffer
|
||||
|
||||
class Device(XenonDevice):
|
||||
def __init__(self, dev_id, address, local_key=None, dev_type=None):
|
||||
super(Device, self).__init__(dev_id, address, local_key, dev_type)
|
||||
|
||||
|
||||
def status(self):
|
||||
log.debug('status() entry (dev_type is %s)', self.dev_type)
|
||||
# open device, send request, then close connection
|
||||
@@ -384,52 +364,27 @@ class Device(XenonDevice):
|
||||
log.error('Unexpected status() payload=%r', result)
|
||||
|
||||
return result
|
||||
|
||||
def set_status(self, on, switch=1):
|
||||
"""
|
||||
Set status of the device to 'on' or 'off'.
|
||||
|
||||
Args:
|
||||
on(bool): True for 'on', False for 'off'.
|
||||
switch(int): The switch to set
|
||||
"""
|
||||
# open device, send request, then close connection
|
||||
if isinstance(switch, int):
|
||||
switch = str(switch) # index and payload is a string
|
||||
payload = self.generate_payload(SET, {switch:on})
|
||||
#print('payload %r' % payload)
|
||||
|
||||
data = self._send_receive(payload)
|
||||
log.debug('set_status received data=%r', data)
|
||||
|
||||
return data
|
||||
|
||||
def set_value(self, index, value):
|
||||
def set_dps(self, value, dpsIndex):
|
||||
"""
|
||||
Set int value of any index.
|
||||
|
||||
Args:
|
||||
index(int): index to set
|
||||
value(int): new value for the index
|
||||
dpsIndex(int): dps index to set
|
||||
value: new value for the dps index
|
||||
"""
|
||||
# open device, send request, then close connection
|
||||
if isinstance(index, int):
|
||||
index = str(index) # index and payload is a string
|
||||
if isinstance(dpsIndex, int):
|
||||
dpsIndex = str(dpsIndex) # index and payload is a string
|
||||
|
||||
payload = self.generate_payload(SET, {
|
||||
index: value})
|
||||
dpsIndex: value})
|
||||
|
||||
data = self._send_receive(payload)
|
||||
log.debug('set_dps received data=%r', data)
|
||||
|
||||
return data
|
||||
|
||||
def turn_on(self, switch=1):
|
||||
"""Turn the device on"""
|
||||
self.set_status(True, switch)
|
||||
|
||||
def turn_off(self, switch=1):
|
||||
"""Turn the device off"""
|
||||
self.set_status(False, switch)
|
||||
|
||||
def set_timer(self, num_secs):
|
||||
"""
|
||||
@@ -452,241 +407,3 @@ class Device(XenonDevice):
|
||||
data = self._send_receive(payload)
|
||||
log.debug('set_timer received data=%r', data)
|
||||
return data
|
||||
|
||||
class OutletDevice(Device):
|
||||
def __init__(self, dev_id, address, local_key=None):
|
||||
super(OutletDevice, self).__init__(dev_id, address, local_key)
|
||||
|
||||
class FanDevice(Device):
|
||||
DPS_INDEX_SPEED = '2'
|
||||
|
||||
def __init__(self, dev_id, address, local_key=None):
|
||||
super(FanDevice, self).__init__(dev_id, address, local_key)
|
||||
|
||||
class CoverEntity(Device):
|
||||
DPS_INDEX_MOVE = '1'
|
||||
DPS_INDEX_BL = '101'
|
||||
|
||||
DPS_2_STATE = {
|
||||
'1':'movement',
|
||||
'101':'backlight',
|
||||
}
|
||||
|
||||
def __init__(self, dev_id, address, local_key=None):
|
||||
print('%s version %s' % ( __name__, version))
|
||||
print('Python %s on %s' % (sys.version, sys.platform))
|
||||
if Crypto is None:
|
||||
print('Using pyaes version ', pyaes.VERSION)
|
||||
print('Using pyaes from ', pyaes.__file__)
|
||||
else:
|
||||
print('Using PyCrypto ', Crypto.version_info)
|
||||
print('Using PyCrypto from ', Crypto.__file__)
|
||||
super(CoverEntity, self).__init__(dev_id, address, local_key)
|
||||
|
||||
def open_cover(self, switch=1):
|
||||
"""Turn the device on"""
|
||||
self.set_status('on', switch)
|
||||
|
||||
def close_cover(self, switch=1):
|
||||
"""Turn the device off"""
|
||||
self.set_status('off', switch)
|
||||
|
||||
def stop_cover(self, switch=1):
|
||||
"""Turn the device off"""
|
||||
self.set_status('stop', switch)
|
||||
|
||||
|
||||
class BulbDevice(Device):
|
||||
DPS_INDEX_ON = '1'
|
||||
DPS_INDEX_MODE = '2'
|
||||
DPS_INDEX_BRIGHTNESS = '3'
|
||||
DPS_INDEX_COLOURTEMP = '4'
|
||||
DPS_INDEX_COLOUR = '5'
|
||||
|
||||
DPS = 'dps'
|
||||
DPS_MODE_COLOUR = 'colour'
|
||||
DPS_MODE_WHITE = 'white'
|
||||
|
||||
DPS_2_STATE = {
|
||||
'1':'is_on',
|
||||
'2':'mode',
|
||||
'3':'brightness',
|
||||
'4':'colourtemp',
|
||||
'5':'colour',
|
||||
}
|
||||
|
||||
def __init__(self, dev_id, address, local_key=None):
|
||||
super(BulbDevice, self).__init__(dev_id, address, local_key)
|
||||
|
||||
@staticmethod
|
||||
def _rgb_to_hexvalue(r, g, b):
|
||||
"""
|
||||
Convert an RGB value to the hex representation expected by tuya.
|
||||
|
||||
Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format:
|
||||
rrggbb0hhhssvv
|
||||
|
||||
While r, g and b are just hexadecimal values of the corresponding
|
||||
Red, Green and Blue values, the h, s and v values (which are values
|
||||
between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively.
|
||||
|
||||
Args:
|
||||
r(int): Value for the colour red as int from 0-255.
|
||||
g(int): Value for the colour green as int from 0-255.
|
||||
b(int): Value for the colour blue as int from 0-255.
|
||||
"""
|
||||
rgb = [r,g,b]
|
||||
hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255)
|
||||
|
||||
hexvalue = ""
|
||||
for value in rgb:
|
||||
temp = str(hex(int(value))).replace("0x","")
|
||||
if len(temp) == 1:
|
||||
temp = "0" + temp
|
||||
hexvalue = hexvalue + temp
|
||||
|
||||
hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)]
|
||||
hexvalue_hsv = ""
|
||||
for value in hsvarray:
|
||||
temp = str(hex(int(value))).replace("0x","")
|
||||
if len(temp) == 1:
|
||||
temp = "0" + temp
|
||||
hexvalue_hsv = hexvalue_hsv + temp
|
||||
if len(hexvalue_hsv) == 7:
|
||||
hexvalue = hexvalue + "0" + hexvalue_hsv
|
||||
else:
|
||||
hexvalue = hexvalue + "00" + hexvalue_hsv
|
||||
|
||||
return hexvalue
|
||||
|
||||
@staticmethod
|
||||
def _hexvalue_to_rgb(hexvalue):
|
||||
"""
|
||||
Converts the hexvalue used by tuya for colour representation into
|
||||
an RGB value.
|
||||
|
||||
Args:
|
||||
hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()
|
||||
"""
|
||||
r = int(hexvalue[0:2], 16)
|
||||
g = int(hexvalue[2:4], 16)
|
||||
b = int(hexvalue[4:6], 16)
|
||||
|
||||
return (r, g, b)
|
||||
|
||||
@staticmethod
|
||||
def _hexvalue_to_hsv(hexvalue):
|
||||
"""
|
||||
Converts the hexvalue used by tuya for colour representation into
|
||||
an HSV value.
|
||||
|
||||
Args:
|
||||
hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue()
|
||||
"""
|
||||
h = int(hexvalue[7:10], 16) / 360
|
||||
s = int(hexvalue[10:12], 16) / 255
|
||||
v = int(hexvalue[12:14], 16) / 255
|
||||
|
||||
return (h, s, v)
|
||||
|
||||
def set_colour(self, r, g, b):
|
||||
"""
|
||||
Set colour of an rgb bulb.
|
||||
|
||||
Args:
|
||||
r(int): Value for the colour red as int from 0-255.
|
||||
g(int): Value for the colour green as int from 0-255.
|
||||
b(int): Value for the colour blue as int from 0-255.
|
||||
"""
|
||||
if not 0 <= r <= 255:
|
||||
raise ValueError("The value for red needs to be between 0 and 255.")
|
||||
if not 0 <= g <= 255:
|
||||
raise ValueError("The value for green needs to be between 0 and 255.")
|
||||
if not 0 <= b <= 255:
|
||||
raise ValueError("The value for blue needs to be between 0 and 255.")
|
||||
|
||||
#print(BulbDevice)
|
||||
hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b)
|
||||
|
||||
payload = self.generate_payload(SET, {
|
||||
self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR,
|
||||
self.DPS_INDEX_COLOUR: hexvalue})
|
||||
data = self._send_receive(payload)
|
||||
return data
|
||||
|
||||
def set_white(self, brightness, colourtemp):
|
||||
"""
|
||||
Set white coloured theme of an rgb bulb.
|
||||
|
||||
Args:
|
||||
brightness(int): Value for the brightness (25-255).
|
||||
colourtemp(int): Value for the colour temperature (0-255).
|
||||
"""
|
||||
if not 25 <= brightness <= 255:
|
||||
raise ValueError("The brightness needs to be between 25 and 255.")
|
||||
if not 0 <= colourtemp <= 255:
|
||||
raise ValueError("The colour temperature needs to be between 0 and 255.")
|
||||
|
||||
payload = self.generate_payload(SET, {
|
||||
self.DPS_INDEX_MODE: self.DPS_MODE_WHITE,
|
||||
self.DPS_INDEX_BRIGHTNESS: brightness,
|
||||
self.DPS_INDEX_COLOURTEMP: colourtemp})
|
||||
|
||||
data = self._send_receive(payload)
|
||||
return data
|
||||
|
||||
def set_brightness(self, brightness):
|
||||
"""
|
||||
Set the brightness value of an rgb bulb.
|
||||
|
||||
Args:
|
||||
brightness(int): Value for the brightness (25-255).
|
||||
"""
|
||||
if not 25 <= brightness <= 255:
|
||||
raise ValueError("The brightness needs to be between 25 and 255.")
|
||||
|
||||
payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness})
|
||||
data = self._send_receive(payload)
|
||||
return data
|
||||
|
||||
def set_colourtemp(self, colourtemp):
|
||||
"""
|
||||
Set the colour temperature of an rgb bulb.
|
||||
|
||||
Args:
|
||||
colourtemp(int): Value for the colour temperature (0-255).
|
||||
"""
|
||||
if not 0 <= colourtemp <= 255:
|
||||
raise ValueError("The colour temperature needs to be between 0 and 255.")
|
||||
|
||||
payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp})
|
||||
data = self._send_receive(payload)
|
||||
return data
|
||||
|
||||
def brightness(self):
|
||||
"""Return brightness value"""
|
||||
return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS]
|
||||
|
||||
def colourtemp(self):
|
||||
"""Return colour temperature"""
|
||||
return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP]
|
||||
|
||||
def colour_rgb(self):
|
||||
"""Return colour as RGB value"""
|
||||
hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR]
|
||||
return BulbDevice._hexvalue_to_rgb(hexvalue)
|
||||
|
||||
def colour_hsv(self):
|
||||
"""Return colour as HSV value"""
|
||||
hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR]
|
||||
return BulbDevice._hexvalue_to_hsv(hexvalue)
|
||||
|
||||
def state(self):
|
||||
status = self.status()
|
||||
state = {}
|
||||
|
||||
for key in status[self.DPS].keys():
|
||||
if(int(key)<=5):
|
||||
state[self.DPS_2_STATE[key]]=status[self.DPS][key]
|
||||
|
||||
return state
|
||||
|
Reference in New Issue
Block a user