Introduced cover support

This commit is contained in:
root
2020-03-23 10:15:26 +01:00
parent 161e0d9bfb
commit 1d9b5eb07a
4 changed files with 1322 additions and 697 deletions

View File

@@ -0,0 +1,208 @@
"""
Simple platform to control LOCALLY Tuya cover devices.
Sample config yaml
switch:
- platform: localtuya
host: 192.168.0.123
local_key: 1234567891234567
device_id: 123456789123456789abcd
name: Cover guests
protocol_version: 3.3
id: 1
"""
import logging
import requests
import voluptuous as vol
from homeassistant.components.cover import (
CoverDevice,
PLATFORM_SCHEMA,
SUPPORT_CLOSE,
SUPPORT_OPEN,
SUPPORT_STOP,
)
"""from . import DATA_TUYA, TuyaDevice"""
"""from homeassistant.components.cover import CoverDevice, PLATFORM_SCHEMA"""
from homeassistant.const import (CONF_HOST, CONF_ID, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME)
import homeassistant.helpers.config_validation as cv
from time import time, sleep
from threading import Lock
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'localtuyacover'
REQUIREMENTS = ['pytuya==7.0.7']
CONF_DEVICE_ID = 'device_id'
CONF_LOCAL_KEY = 'local_key'
CONF_PROTOCOL_VERSION = 'protocol_version'
DEFAULT_ID = '1'
DEFAULT_PROTOCOL_VERSION = 3.3
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_ICON): cv.icon,
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(CONF_LOCAL_KEY): cv.string,
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float),
vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up Tuya cover devices."""
from . import pytuya
covers = []
localtuyadevice = pytuya.CoverDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY))
localtuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION)))
cover_device = TuyaCoverCache(localtuyadevice)
covers.append(
TuyaDevice(
cover_device,
config.get(CONF_NAME),
config.get(CONF_ICON),
config.get(CONF_ID),
)
)
print('Setup localtuya cover [{}] with device ID [{}] '.format(config.get(CONF_NAME), config.get(CONF_ID)))
add_entities(covers)
class TuyaCoverCache:
"""Cache wrapper for pytuya.CoverDevice"""
def __init__(self, device):
"""Initialize the cache."""
self._cached_status = ''
self._cached_status_time = 0
self._device = device
self._lock = Lock()
def __get_status(self):
for i in range(20):
try:
status = self._device.status()
return status
except ConnectionError:
if i+1 == 3:
raise ConnectionError("Failed to update status.")
def set_status(self, state, switchid):
"""Change the Tuya switch status and clear the cache."""
self._cached_status = ''
self._cached_status_time = 0
for i in range(20):
try:
return self._device.set_status(state, switchid)
except ConnectionError:
if i+1 == 5:
raise ConnectionError("Failed to set status.")
def status(self):
"""Get state of Tuya switch and cache the results."""
self._lock.acquire()
try:
now = time()
if not self._cached_status or now - self._cached_status_time > 30:
sleep(0.5)
self._cached_status = self.__get_status()
self._cached_status_time = time()
return self._cached_status
finally:
self._lock.release()
class TuyaDevice(CoverDevice):
"""Tuya cover devices."""
def __init__(self, device, name, icon, switchid):
self._device = device
self._name = name
self._icon = icon
self._switch_id = switchid
#self.entity_id = ENTITY_ID_FORMAT.format(_device.object_id())
self._status = self._device.status()
self._state = self._status['dps'][self._switch_id]
print('Initialized tuya cover [{}] with switch status [{}] and state [{}]'.format(self._name, self._status, self._state))
@property
def name(self):
"""Get name of Tuya switch."""
return self._name
@property
def supported_features(self):
"""Flag supported features."""
supported_features = SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_STOP
return supported_features
@property
def icon(self):
"""Return the icon."""
return self._icon
@property
def is_opening(self):
#self.update()
state = self._state
#print('is_opening() : state [{}]'.format(state))
if state == 'on':
return True
return False
@property
def is_closing(self):
#self.update()
state = self._state
#print('is_closing() : state [{}]'.format(state))
if state == 'off':
return True
return False
@property
def is_closed(self):
"""Return if the cover is closed or not."""
#self.update()
state = self._state
#print('is_closed() : state [{}]'.format(state))
if state == 'off':
return False
if state == 'on':
return True
return None
def open_cover(self, **kwargs):
"""Open the cover."""
self._device.set_status('on', self._switch_id)
# self._state = 'on'
# self._device._device.open_cover()
def close_cover(self, **kwargs):
"""Close cover."""
self._device.set_status('off', self._switch_id)
# self._state = 'off'
# self._device._device.close_cover()
def stop_cover(self, **kwargs):
"""Stop the cover."""
self._device.set_status('stop', self._switch_id)
# self._state = 'stop'
# self._device._device.stop_cover()
def update(self):
"""Get state of Tuya switch."""
self._status = self._device.status()
self._state = self._status['dps'][self._switch_id]
#print('update() : state [{}]'.format(self._state))

View File

@@ -0,0 +1,288 @@
"""
Simple platform to control LOCALLY Tuya switch devices.
Sample config yaml
switch:
- platform: localtuya
host: 192.168.0.1
local_key: 1234567891234567
device_id: 12345678912345671234
name: tuya_01
protocol_version: 3.3
"""
import voluptuous as vol
from homeassistant.const import (CONF_HOST, CONF_ID, CONF_SWITCHES, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME)
import homeassistant.helpers.config_validation as cv
from time import time, sleep
from threading import Lock
import logging
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_TEMP,
ATTR_HS_COLOR,
ENTITY_ID_FORMAT,
SUPPORT_BRIGHTNESS,
SUPPORT_COLOR,
SUPPORT_COLOR_TEMP,
Light,
PLATFORM_SCHEMA
)
from homeassistant.util import color as colorutil
import socket
REQUIREMENTS = ['pytuya==7.0.4']
CONF_DEVICE_ID = 'device_id'
CONF_LOCAL_KEY = 'local_key'
CONF_PROTOCOL_VERSION = 'protocol_version'
# IMPORTANT, id is used as key for state and turning on and off, 1 was fine switched apparently but my bulbs need 20, other feature attributes count up from this, e.g. 21 mode, 22 brightnes etc, see my pytuya modification.
DEFAULT_ID = '1'
DEFAULT_PROTOCOL_VERSION = 3.3
MIN_MIRED = 153
MAX_MIRED = 370
UPDATE_RETRY_LIMIT = 3
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_ICON): cv.icon,
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(CONF_LOCAL_KEY): cv.string,
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float),
vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string,
})
log = logging.getLogger(__name__)
log.setLevel(level=logging.DEBUG) # Debug hack!
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up of the Tuya switch."""
from . import pytuya
lights = []
pytuyadevice = pytuya.BulbDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY))
pytuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION)))
bulb_device = TuyaCache(pytuyadevice)
lights.append(
TuyaDevice(
bulb_device,
config.get(CONF_NAME),
config.get(CONF_ICON),
config.get(CONF_ID)
)
)
add_devices(lights)
class TuyaCache:
"""Cache wrapper for pytuya.BulbDevice"""
def __init__(self, device):
"""Initialize the cache."""
self._cached_status = ''
self._cached_status_time = 0
self._device = device
self._lock = Lock()
def __get_status(self, switchid):
for _ in range(UPDATE_RETRY_LIMIT):
try:
status = self._device.status()['dps'][switchid]
return status
except ConnectionError:
pass
except socket.timeout:
pass
log.warn(
"Failed to get status after {} tries".format(UPDATE_RETRY_LIMIT))
def set_status(self, state, switchid):
"""Change the Tuya switch status and clear the cache."""
self._cached_status = ''
self._cached_status_time = 0
for _ in range(UPDATE_RETRY_LIMIT):
try:
return self._device.set_status(state, switchid)
except ConnectionError:
pass
except socket.timeout:
pass
log.warn(
"Failed to set status after {} tries".format(UPDATE_RETRY_LIMIT))
def status(self, switchid):
"""Get state of Tuya switch and cache the results."""
self._lock.acquire()
try:
now = time()
if not self._cached_status or now - self._cached_status_time > 30:
sleep(0.5)
self._cached_status = self.__get_status(switchid)
self._cached_status_time = time()
return self._cached_status
finally:
self._lock.release()
def cached_status(self):
return self._cached_status
def support_color(self):
return self._device.support_color()
def support_color_temp(self):
return self._device.support_color_temp()
def brightness(self):
for _ in range(UPDATE_RETRY_LIMIT):
try:
return self._device.brightness()
except ConnectionError:
pass
except socket.timeout:
pass
log.warn(
"Failed to get brightness after {} tries".format(UPDATE_RETRY_LIMIT))
def color_temp(self):
for _ in range(UPDATE_RETRY_LIMIT):
try:
return self._device.colourtemp()
except ConnectionError:
pass
except socket.timeout:
pass
log.warn(
"Failed to get color temp after {} tries".format(UPDATE_RETRY_LIMIT))
def set_brightness(self, brightness):
for _ in range(UPDATE_RETRY_LIMIT):
try:
return self._device.set_brightness(brightness)
except ConnectionError:
pass
except socket.timeout:
pass
log.warn(
"Failed to set brightness after {} tries".format(UPDATE_RETRY_LIMIT))
def set_color_temp(self, color_temp):
for _ in range(UPDATE_RETRY_LIMIT):
try:
return self._device.set_colourtemp(color_temp)
except ConnectionError:
pass
except socket.timeout:
pass
log.warn(
"Failed to set color temp after {} tries".format(UPDATE_RETRY_LIMIT))
def state(self):
self._device.state();
def turn_on(self):
self._device.turn_on();
def turn_off(self):
self._device.turn_off();
class TuyaDevice(Light):
"""Representation of a Tuya switch."""
def __init__(self, device, name, icon, bulbid):
"""Initialize the Tuya switch."""
self._device = device
self._name = name
self._state = False
self._brightness = 127
self._color_temp = 127
self._icon = icon
self._bulb_id = bulbid
@property
def name(self):
"""Get name of Tuya switch."""
return self._name
@property
def is_on(self):
"""Check if Tuya switch is on."""
return self._state
@property
def icon(self):
"""Return the icon."""
return self._icon
def update(self):
"""Get state of Tuya switch."""
status = self._device.status(self._bulb_id)
self._state = status
try:
brightness = int(self._device.brightness())
if brightness > 254:
brightness = 255
if brightness < 25:
brightness = 25
self._brightness = brightness
except TypeError:
pass
self._color_temp = self._device.color_temp()
@property
def brightness(self):
"""Return the brightness of the light."""
return self._brightness
# @property
# def hs_color(self):
# """Return the hs_color of the light."""
# return (self._device.color_hsv()[0],self._device.color_hsv()[1])
@property
def color_temp(self):
"""Return the color_temp of the light."""
try:
return int(MAX_MIRED - (((MAX_MIRED - MIN_MIRED) / 255) * self._color_temp))
except TypeError:
pass
@property
def min_mireds(self):
"""Return color temperature min mireds."""
return MIN_MIRED
@property
def max_mireds(self):
"""Return color temperature max mireds."""
return MAX_MIRED
def turn_on(self, **kwargs):
"""Turn on or control the light."""
log.debug("Turning on, state: " + str(self._device.cached_status()))
if not self._device.cached_status():
self._device.set_status(True, self._bulb_id)
if ATTR_BRIGHTNESS in kwargs:
converted_brightness = int(kwargs[ATTR_BRIGHTNESS])
if converted_brightness <= 25:
converted_brightness = 25
self._device.set_brightness(converted_brightness)
if ATTR_HS_COLOR in kwargs:
raise ValueError(" TODO implement RGB from HS")
if ATTR_COLOR_TEMP in kwargs:
color_temp = int(255 - (255 / (MAX_MIRED - MIN_MIRED)) * (int(kwargs[ATTR_COLOR_TEMP]) - MIN_MIRED))
self._device.set_color_temp(color_temp)
def turn_off(self, **kwargs):
"""Turn Tuya switch off."""
self._device.set_status(False, self._bulb_id)
@property
def supported_features(self):
"""Flag supported features."""
supports = SUPPORT_BRIGHTNESS
#if self._device.support_color():
# supports = supports | SUPPORT_COLOR
supports = supports | SUPPORT_COLOR_TEMP
return supports

View File

@@ -1,10 +1,9 @@
# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices # Python module to interface locally with Tuya WiFi smart devices (switches, lights and covers)
# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug
# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa
# #
# This would not exist without the protocol reverse engineering from # Developed by merging the code from:
# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes # https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
# https://github.com/mileperhour/localtuya-homeassistant
# https://github.com/NameLessJedi/localtuya-homeassistant
# #
# Tested with Python 2.7 and Python 3.6.1 only # Tested with Python 2.7 and Python 3.6.1 only
@@ -17,6 +16,7 @@ import socket
import sys import sys
import time import time
import colorsys import colorsys
import binascii
try: try:
#raise ImportError #raise ImportError
@@ -27,14 +27,15 @@ except ImportError:
import pyaes # https://github.com/ricmoo/pyaes import pyaes # https://github.com/ricmoo/pyaes
version_tuple = (7, 0, 2) version_tuple = (7, 0, 7)
version = version_string = __version__ = '%d.%d.%d' % version_tuple version = version_string = __version__ = '%d.%d.%d' % version_tuple
__author__ = 'clach04' __author__ = 'rospogrigio'
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
logging.basicConfig() # TODO include function name/line numbers in log logging.basicConfig() # TODO include function name/line numbers in log
#log.setLevel(level=logging.DEBUG) # Debug hack! log.setLevel(level=logging.DEBUG) # Debug hack!
log.info('%s version %s', __name__, version)
log.info('Python %s on %s', sys.version, sys.platform) log.info('Python %s on %s', sys.version, sys.platform)
if Crypto is None: if Crypto is None:
log.info('Using pyaes version %r', pyaes.VERSION) log.info('Using pyaes version %r', pyaes.VERSION)
@@ -44,8 +45,10 @@ else:
log.info('Using PyCrypto from %r', Crypto.__file__) log.info('Using PyCrypto from %r', Crypto.__file__)
SET = 'set' SET = 'set'
STATUS = 'status'
PROTOCOL_VERSION_BYTES = b'3.1' PROTOCOL_VERSION_BYTES_31 = b'3.1'
PROTOCOL_VERSION_BYTES_33 = b'3.3'
IS_PY2 = sys.version_info[0] == 2 IS_PY2 = sys.version_info[0] == 2
@@ -54,7 +57,7 @@ class AESCipher(object):
#self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/
self.bs = 16 self.bs = 16
self.key = key self.key = key
def encrypt(self, raw): def encrypt(self, raw, use_base64 = True):
if Crypto: if Crypto:
raw = self._pad(raw) raw = self._pad(raw)
cipher = AES.new(self.key, mode=AES.MODE_ECB) cipher = AES.new(self.key, mode=AES.MODE_ECB)
@@ -66,11 +69,14 @@ class AESCipher(object):
crypted_text += cipher.feed() # flush final block crypted_text += cipher.feed() # flush final block
#print('crypted_text %r' % crypted_text) #print('crypted_text %r' % crypted_text)
#print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text))
crypted_text_b64 = base64.b64encode(crypted_text) if use_base64:
#print('crypted_text_b64 (%d) %r' % (len(crypted_text_b64), crypted_text_b64)) return base64.b64encode(crypted_text)
return crypted_text_b64 else:
def decrypt(self, enc): return crypted_text
enc = base64.b64decode(enc)
def decrypt(self, enc, use_base64=True):
if use_base64:
enc = base64.b64decode(enc)
#print('enc (%d) %r' % (len(enc), enc)) #print('enc (%d) %r' % (len(enc), enc))
#enc = self._unpad(enc) #enc = self._unpad(enc)
#enc = self._pad(enc) #enc = self._pad(enc)
@@ -125,6 +131,18 @@ payload_dict = {
}, },
"prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte)
"suffix": "000000000000aa55" "suffix": "000000000000aa55"
},
"cover": {
"status": {
"hexByte": "0d",
"command": {"devId": "", "uid": "", "t": ""}
},
"set": {
"hexByte": "07",
"command": {"devId": "", "uid": "", "t": ""}
},
"prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte)
"suffix": "000000000000aa55"
} }
} }
@@ -150,13 +168,14 @@ class XenonDevice(object):
self.local_key = local_key.encode('latin1') self.local_key = local_key.encode('latin1')
self.dev_type = dev_type self.dev_type = dev_type
self.connection_timeout = connection_timeout self.connection_timeout = connection_timeout
self.version = 3.1
self.port = 6668 # default - do not expect caller to pass in self.port = 6668 # default - do not expect caller to pass in
def __repr__(self): def __repr__(self):
return '%r' % ((self.id, self.address),) # FIXME can do better than this return '%r' % ((self.id, self.address),) # FIXME can do better than this
def _send_receive(self, payload): def _send_receive(self, payload, times=1):
""" """
Send single buffer `payload` and receive a single buffer. Send single buffer `payload` and receive a single buffer.
@@ -169,9 +188,17 @@ class XenonDevice(object):
s.connect((self.address, self.port)) s.connect((self.address, self.port))
s.send(payload) s.send(payload)
data = s.recv(1024) data = s.recv(1024)
#print("FIRST: Received %d bytes" % len(data) )
if times > 1:
time.sleep(0.1)
data = s.recv(1024)
#print("SECOND: Received %d bytes" % len(data) )
s.close() s.close()
return data return data
def set_version(self, version):
self.version = version
def generate_payload(self, command, data=None): def generate_payload(self, command, data=None):
""" """
Generate the payload to send. Generate the payload to send.
@@ -183,6 +210,7 @@ class XenonDevice(object):
This is what will be passed via the 'dps' entry This is what will be passed via the 'dps' entry
""" """
json_data = payload_dict[self.dev_type][command]['command'] json_data = payload_dict[self.dev_type][command]['command']
command_hb = payload_dict[self.dev_type][command]['hexByte']
if 'gwId' in json_data: if 'gwId' in json_data:
json_data['gwId'] = self.id json_data['gwId'] = self.id
@@ -195,6 +223,8 @@ class XenonDevice(object):
if data is not None: if data is not None:
json_data['dps'] = data json_data['dps'] = data
if command_hb == '0d':
json_data['dps'] = {"1": None}
# Create byte buffer from hex data # Create byte buffer from hex data
json_payload = json.dumps(json_data) json_payload = json.dumps(json_data)
@@ -202,14 +232,22 @@ class XenonDevice(object):
json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond!
json_payload = json_payload.encode('utf-8') json_payload = json_payload.encode('utf-8')
log.debug('json_payload=%r', json_payload) log.debug('json_payload=%r', json_payload)
#print('json_payload = ', json_payload, ' cmd = ', command_hb)
if command == SET: if self.version == 3.3:
self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload, False)
self.cipher = None
if command_hb != '0a':
# add the 3.3 header
json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload
elif command == SET:
# need to encrypt # need to encrypt
#print('json_payload %r' % json_payload) #print('json_payload %r' % json_payload)
self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload) json_payload = self.cipher.encrypt(json_payload)
#print('crypted json_payload %r' % json_payload) #print('crypted json_payload %r' % json_payload)
preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key
#print('preMd5String %r' % preMd5String) #print('preMd5String %r' % preMd5String)
m = md5() m = md5()
m.update(preMd5String) m.update(preMd5String)
@@ -217,7 +255,7 @@ class XenonDevice(object):
hexdigest = m.hexdigest() hexdigest = m.hexdigest()
#print(hexdigest) #print(hexdigest)
#print(hexdigest[8:][:16]) #print(hexdigest[8:][:16])
json_payload = PROTOCOL_VERSION_BYTES + hexdigest[8:][:16].encode('latin1') + json_payload json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload
#print('data_to_send') #print('data_to_send')
#print(json_payload) #print(json_payload)
#print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload))
@@ -238,13 +276,17 @@ class XenonDevice(object):
payload_dict[self.dev_type][command]['hexByte'] + payload_dict[self.dev_type][command]['hexByte'] +
'000000' + '000000' +
postfix_payload_hex_len ) + postfix_payload postfix_payload_hex_len ) + postfix_payload
# calc the CRC of everything except where the CRC goes and the suffix
hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X')
buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:]
#print('command', command) #print('command', command)
#print('prefix') #print('prefix')
#print(payload_dict[self.dev_type][command]['prefix']) #print(payload_dict[self.dev_type][command]['prefix'])
#print(repr(buffer)) #print(repr(buffer))
#print(bin2hex(buffer, pretty=True))
#print(bin2hex(buffer, pretty=False)) #print(bin2hex(buffer, pretty=False))
#print('full buffer(%d) %r' % (len(buffer), buffer)) #print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) ))
#print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer)))
return buffer return buffer
class Device(XenonDevice): class Device(XenonDevice):
@@ -252,14 +294,21 @@ class Device(XenonDevice):
super(Device, self).__init__(dev_id, address, local_key, dev_type) super(Device, self).__init__(dev_id, address, local_key, dev_type)
def status(self): def status(self):
log.debug('status() entry') if self.dev_type == 'cover':
recv_times = 2
else:
recv_times = 1
log.debug('status() entry (dev_type is %s, recv_times = %d)', self.dev_type, recv_times)
# open device, send request, then close connection # open device, send request, then close connection
payload = self.generate_payload('status') payload = self.generate_payload('status')
data = self._send_receive(payload) data = self._send_receive(payload, recv_times)
log.debug('status received data=%r', data) log.debug('status received data=%r', data)
result = data[20:-8] # hard coded offsets result = data[20:-8] # hard coded offsets
if self.dev_type == 'cover':
result = result[15:]
log.debug('result=%r', result) log.debug('result=%r', result)
#result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer
#print('result %r' % result) #print('result %r' % result)
@@ -268,11 +317,11 @@ class Device(XenonDevice):
if not isinstance(result, str): if not isinstance(result, str):
result = result.decode() result = result.decode()
result = json.loads(result) result = json.loads(result)
elif result.startswith(PROTOCOL_VERSION_BYTES): elif result.startswith(PROTOCOL_VERSION_BYTES_31):
# got an encrypted payload, happens occasionally # got an encrypted payload, happens occasionally
# expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
# NOTE dps.2 may or may not be present # NOTE dps.2 may or may not be present
result = result[len(PROTOCOL_VERSION_BYTES):] # remove version header result = result[len(PROTOCOL_VERSION_BYTES_31):] # remove version header
result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload
cipher = AESCipher(self.local_key) cipher = AESCipher(self.local_key)
result = cipher.decrypt(result) result = cipher.decrypt(result)
@@ -280,9 +329,19 @@ class Device(XenonDevice):
if not isinstance(result, str): if not isinstance(result, str):
result = result.decode() result = result.decode()
result = json.loads(result) result = json.loads(result)
elif self.version == 3.3:
cipher = AESCipher(self.local_key)
result = cipher.decrypt(result, False)
log.debug('decrypted result=%r', result)
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
else: else:
log.error('Unexpected status() payload=%r', result) log.error('Unexpected status() payload=%r', result)
# if self.dev_type == 'cover':
# result = result.encode('utf-8')
# log.debug('encoded result=%r', result)
return result return result
def set_status(self, on, switch=1): def set_status(self, on, switch=1):
@@ -304,6 +363,25 @@ class Device(XenonDevice):
return data return data
def set_value(self, index, value):
"""
Set int value of any index.
Args:
index(int): index to set
value(int): new value for the index
"""
# open device, send request, then close connection
if isinstance(index, int):
index = str(index) # index and payload is a string
payload = self.generate_payload(SET, {
index: value})
data = self._send_receive(payload)
return data
def turn_on(self, switch=1): def turn_on(self, switch=1):
"""Turn the device on""" """Turn the device on"""
self.set_status(True, switch) self.set_status(True, switch)
@@ -339,6 +417,43 @@ class OutletDevice(Device):
dev_type = 'device' dev_type = 'device'
super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type)
class CoverDevice(Device):
DPS_INDEX_MOVE = '1'
DPS_INDEX_BL = '101'
DPS = 'dps'
DPS_2_STATE = {
'1':'movement',
'101':'backlight',
}
def __init__(self, dev_id, address, local_key=None):
dev_type = 'cover'
print('%s version %s' % ( __name__, version))
print('Python %s on %s' % (sys.version, sys.platform))
if Crypto is None:
print('Using pyaes version ', pyaes.VERSION)
print('Using pyaes from ', pyaes.__file__)
else:
print('Using PyCrypto ', Crypto.version_info)
print('Using PyCrypto from ', Crypto.__file__)
super(CoverDevice, self).__init__(dev_id, address, local_key, dev_type)
def open_cover(self, switch=1):
"""Turn the device on"""
self.set_status('on', switch)
def close_cover(self, switch=1):
"""Turn the device off"""
self.set_status('off', switch)
def stop_cover(self, switch=1):
"""Turn the device off"""
self.set_status('stop', switch)
class BulbDevice(Device): class BulbDevice(Device):
DPS_INDEX_ON = '1' DPS_INDEX_ON = '1'
DPS_INDEX_MODE = '2' DPS_INDEX_MODE = '2'
@@ -350,6 +465,14 @@ class BulbDevice(Device):
DPS_MODE_COLOUR = 'colour' DPS_MODE_COLOUR = 'colour'
DPS_MODE_WHITE = 'white' DPS_MODE_WHITE = 'white'
DPS_2_STATE = {
'1':'is_on',
'2':'mode',
'3':'brightness',
'4':'colourtemp',
'5':'colour',
}
def __init__(self, dev_id, address, local_key=None): def __init__(self, dev_id, address, local_key=None):
dev_type = 'device' dev_type = 'device'
super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type)
@@ -441,7 +564,7 @@ class BulbDevice(Device):
if not 0 <= b <= 255: if not 0 <= b <= 255:
raise ValueError("The value for blue needs to be between 0 and 255.") raise ValueError("The value for blue needs to be between 0 and 255.")
print(BulbDevice) #print(BulbDevice)
hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b)
payload = self.generate_payload(SET, { payload = self.generate_payload(SET, {
@@ -519,11 +642,10 @@ class BulbDevice(Device):
def state(self): def state(self):
status = self.status() status = self.status()
state = { state = {}
'is_on' : status[self.DPS][self.DPS_INDEX_ON],
'mode' : status[self.DPS][self.DPS_INDEX_MODE], for key in status[self.DPS].keys():
'brightness' : status[self.DPS][self.DPS_INDEX_BRIGHTNESS], if(int(key)<=5):
'colourtemp' : status[self.DPS][self.DPS_INDEX_COLOURTEMP], state[self.DPS_2_STATE[key]]=status[self.DPS][key]
'colour' : status[self.DPS][self.DPS_INDEX_COLOUR],
}
return state return state

View File

@@ -1,79 +1,74 @@
""" """
Simple platform to control **SOME** Tuya switch devices. Simple platform to control LOCALLY Tuya switch devices.
For more details about this platform, please refer to the documentation at Sample config yaml
https://home-assistant.io/components/switch.tuya/
switch:
- platform: localtuya
host: 192.168.0.1
local_key: 1234567891234567
device_id: 12345678912345671234
name: tuya_01
protocol_version: 3.3
""" """
import voluptuous as vol import voluptuous as vol
from homeassistant.components.switch import SwitchDevice, PLATFORM_SCHEMA from homeassistant.components.switch import SwitchDevice, PLATFORM_SCHEMA
from homeassistant.const import (CONF_NAME, CONF_HOST, CONF_ID, CONF_SWITCHES, CONF_FRIENDLY_NAME, CONF_ICON) from homeassistant.const import (CONF_HOST, CONF_ID, CONF_SWITCHES, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME)
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from time import time from time import time, sleep
from threading import Lock from threading import Lock
REQUIREMENTS = ['pytuya==7.0.4'] REQUIREMENTS = ['pytuya==7.0.7']
CONF_DEVICE_ID = 'device_id' CONF_DEVICE_ID = 'device_id'
CONF_LOCAL_KEY = 'local_key' CONF_LOCAL_KEY = 'local_key'
CONF_PROTOCOL_VERSION = 'protocol_version'
CONF_CURRENT = 'current'
CONF_CURRENT_CONSUMPTION = 'current_consumption'
CONF_VOLTAGE = 'voltage'
DEFAULT_ID = '1' DEFAULT_ID = '1'
DEFAULT_PROTOCOL_VERSION = 3.3
ATTR_CURRENT = 'current' ATTR_CURRENT = 'current'
ATTR_CURRENT_CONSUMPTION = 'current_consumption' ATTR_CURRENT_CONSUMPTION = 'current_consumption'
ATTR_VOLTAGE = 'voltage' ATTR_VOLTAGE = 'voltage'
SWITCH_SCHEMA = vol.Schema({
vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string,
vol.Optional(CONF_FRIENDLY_NAME): cv.string,
})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_ICON): cv.icon, vol.Optional(CONF_ICON): cv.icon,
vol.Required(CONF_HOST): cv.string, vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_DEVICE_ID): cv.string, vol.Required(CONF_DEVICE_ID): cv.string,
vol.Required(CONF_LOCAL_KEY): cv.string, vol.Required(CONF_LOCAL_KEY): cv.string,
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float),
vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string,
vol.Optional(CONF_SWITCHES, default={}): vol.Optional(CONF_CURRENT, default='4'): cv.string,
vol.Schema({cv.slug: SWITCH_SCHEMA}), vol.Optional(CONF_CURRENT_CONSUMPTION, default='5'): cv.string,
vol.Optional(CONF_VOLTAGE, default='6'): cv.string,
}) })
def setup_platform(hass, config, add_devices, discovery_info=None): def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up of the Tuya switch.""" """Set up of the Tuya switch."""
import pytuya from . import pytuya
devices = config.get(CONF_SWITCHES)
switches = [] switches = []
pytuyadevice = pytuya.OutletDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY))
pytuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION)))
outlet_device = TuyaCache( outlet_device = TuyaCache(pytuyadevice)
pytuya.OutletDevice( switches.append(
config.get(CONF_DEVICE_ID), TuyaDevice(
config.get(CONF_HOST), outlet_device,
config.get(CONF_LOCAL_KEY) config.get(CONF_NAME),
) config.get(CONF_ICON),
config.get(CONF_ID),
config.get(CONF_CURRENT),
config.get(CONF_CURRENT_CONSUMPTION),
config.get(CONF_VOLTAGE)
)
) )
#print('Setup localtuya switch [{}] with device ID [{}] '.format(config.get(CONF_NAME), config.get(CONF_ID)))
for object_id, device_config in devices.items():
switches.append(
TuyaDevice(
outlet_device,
device_config.get(CONF_FRIENDLY_NAME, object_id),
device_config.get(CONF_ICON),
device_config.get(CONF_ID)
)
)
name = config.get(CONF_NAME)
if name:
switches.append(
TuyaDevice(
outlet_device,
name,
config.get(CONF_ICON),
config.get(CONF_ID)
)
)
add_devices(switches) add_devices(switches)
@@ -88,7 +83,7 @@ class TuyaCache:
self._lock = Lock() self._lock = Lock()
def __get_status(self): def __get_status(self):
for i in range(3): for i in range(20):
try: try:
status = self._device.status() status = self._device.status()
return status return status
@@ -100,14 +95,20 @@ class TuyaCache:
"""Change the Tuya switch status and clear the cache.""" """Change the Tuya switch status and clear the cache."""
self._cached_status = '' self._cached_status = ''
self._cached_status_time = 0 self._cached_status_time = 0
return self._device.set_status(state, switchid) for i in range(20):
try:
return self._device.set_status(state, switchid)
except ConnectionError:
if i+1 == 5:
raise ConnectionError("Failed to set status.")
def status(self): def status(self):
"""Get state of Tuya switch and cache the results.""" """Get state of Tuya switch and cache the results."""
self._lock.acquire() self._lock.acquire()
try: try:
now = time() now = time()
if not self._cached_status or now - self._cached_status_time > 20: if not self._cached_status or now - self._cached_status_time > 30:
sleep(0.5)
self._cached_status = self.__get_status() self._cached_status = self.__get_status()
self._cached_status_time = time() self._cached_status_time = time()
return self._cached_status return self._cached_status
@@ -117,14 +118,18 @@ class TuyaCache:
class TuyaDevice(SwitchDevice): class TuyaDevice(SwitchDevice):
"""Representation of a Tuya switch.""" """Representation of a Tuya switch."""
def __init__(self, device, name, icon, switchid): def __init__(self, device, name, icon, switchid, attr_current, attr_consumption, attr_voltage):
"""Initialize the Tuya switch.""" """Initialize the Tuya switch."""
self._device = device self._device = device
self._name = name self._name = name
self._state = False
self._icon = icon self._icon = icon
self._switchid = switchid self._switch_id = switchid
self._attr_current = attr_current
self._attr_consumption = attr_consumption
self._attr_voltage = attr_voltage
self._status = self._device.status() self._status = self._device.status()
self._state = self._status['dps'][self._switch_id]
print('Initialized tuya switch [{}] with switch status [{}] and state [{}]'.format(self._name, self._status, self._state))
@property @property
def name(self): def name(self):
@@ -140,9 +145,13 @@ class TuyaDevice(SwitchDevice):
def device_state_attributes(self): def device_state_attributes(self):
attrs = {} attrs = {}
try: try:
attrs[ATTR_CURRENT] = "{}".format(self._status['dps']['104']) attrs[ATTR_CURRENT] = "{}".format(self._status['dps'][self._attr_current])
attrs[ATTR_CURRENT_CONSUMPTION] = "{}".format(self._status['dps']['105']/10) #print('attrs[ATTR_CURRENT]: [{}]'.format(attrs[ATTR_CURRENT]))
attrs[ATTR_VOLTAGE] = "{}".format(self._status['dps']['106']/10) attrs[ATTR_CURRENT_CONSUMPTION] = "{}".format(self._status['dps'][self._attr_consumption]/10)
#print('attrs[ATTR_CURRENT_CONSUMPTION]: [{}]'.format(attrs[ATTR_CURRENT_CONSUMPTION]))
attrs[ATTR_VOLTAGE] = "{}".format(self._status['dps'][self._attr_voltage]/10)
#print('attrs[ATTR_VOLTAGE]: [{}]'.format(attrs[ATTR_VOLTAGE]))
except KeyError: except KeyError:
pass pass
return attrs return attrs
@@ -154,15 +163,13 @@ class TuyaDevice(SwitchDevice):
def turn_on(self, **kwargs): def turn_on(self, **kwargs):
"""Turn Tuya switch on.""" """Turn Tuya switch on."""
self._device.set_status(True, self._switchid) self._device.set_status(True, self._switch_id)
def turn_off(self, **kwargs): def turn_off(self, **kwargs):
"""Turn Tuya switch off.""" """Turn Tuya switch off."""
self._device.set_status(False, self._switchid) self._device.set_status(False, self._switch_id)
def update(self): def update(self):
"""Get state of Tuya switch.""" """Get state of Tuya switch."""
status = self._device.status() self._status = self._device.status()
self._status= status self._state = self._status['dps'][self._switch_id]
self._state = status['dps'][self._switchid]