From 63d1ef26d4feebff87b367f0c8c6066f8db20c28 Mon Sep 17 00:00:00 2001 From: rospogrigio Date: Mon, 7 Sep 2020 14:46:35 +0200 Subject: [PATCH] Cleaned up pytuya code --- .../localtuya/pytuya/__init__.py | 91 +++++++++++-------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/custom_components/localtuya/pytuya/__init__.py b/custom_components/localtuya/pytuya/__init__.py index 0ac26fb..6273eb0 100644 --- a/custom_components/localtuya/pytuya/__init__.py +++ b/custom_components/localtuya/pytuya/__init__.py @@ -1,13 +1,58 @@ -# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices -# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U -# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug -# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa -# -# This would not exist without the protocol reverse engineering from -# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes -# -# Tested with Python 2.7 and Python 3.6.1 only +# TinyTuya Module +# -*- coding: utf-8 -*- +""" + Python module to interface with Tuya WiFi smart devices + Mostly derived from Shenzhen Xenon ESP8266MOD WiFi smart devices + E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U + Author: clach04 + Maintained by: rospogrigio + + For more information see https://github.com/clach04/python-tuya + + Classes + OutletDevice(dev_id, address, local_key=None) + CoverDevice(dev_id, address, local_key=None) + BulbDevice(dev_id, address, local_key=None) + + dev_id (str): Device ID e.g. 01234567891234567890 + address (str): Device Network IP Address e.g. 10.0.1.99 + local_key (str, optional): The encryption key. Defaults to None. + + Functions + json = status() # returns json payload + set_version(version) # 3.1 [default] or 3.3 + set_dpsUsed(dpsUsed) + set_status(on, switch=1) # Set status of the device to 'on' or 'off' (bool) + set_value(index, value) # Set int value of any index. + turn_on(switch=1): + turn_off(switch=1): + set_timer(num_secs): + + CoverDevice: + open_cover(switch=1): + close_cover(switch=1): + stop_cover(switch=1): + + BulbDevice + set_colour(r, g, b): + set_white(brightness, colourtemp): + set_brightness(brightness): + set_colourtemp(colourtemp): + result = brightness(): + result = colourtemp(): + (r, g, b) = colour_rgb(): + (h,s,v) = colour_hsv() + result = state(): + + Credits + * TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes + For protocol reverse engineering + * PyTuya https://github.com/clach04/python-tuya by clach04 + The origin of this python module (now abandoned) + * LocalTuya https://github.com/rospogrigio/localtuya-homeassistant by rospogrigio + Updated pytuya to support devices with Device IDs of 22 characters +""" import base64 from hashlib import md5 @@ -34,7 +79,7 @@ __author__ = 'rospogrigio' log = logging.getLogger(__name__) logging.basicConfig() # TODO include function name/line numbers in log -#log.setLevel(level=logging.DEBUG) # Debug hack! +#log.setLevel(level=logging.DEBUG) # Uncomment to Debug log.debug('%s version %s', __name__, version) log.debug('Python %s on %s', sys.version, sys.platform) @@ -55,7 +100,6 @@ IS_PY2 = sys.version_info[0] == 2 class AESCipher(object): def __init__(self, key): - #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ self.bs = 16 self.key = key def encrypt(self, raw, use_base64 = True): @@ -68,7 +112,6 @@ class AESCipher(object): cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 crypted_text = cipher.feed(raw) crypted_text += cipher.feed() # flush final block - #print('crypted_text %r' % crypted_text) #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) if use_base64: return base64.b64encode(crypted_text) @@ -267,33 +310,17 @@ class XenonDevice(object): json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload elif command == SET: # need to encrypt - #print('json_payload %r' % json_payload) self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new json_payload = self.cipher.encrypt(json_payload) - #print('crypted json_payload %r' % json_payload) preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key - #print('preMd5String %r' % preMd5String) m = md5() m.update(preMd5String) - #print(repr(m.digest())) hexdigest = m.hexdigest() - #print(hexdigest) - #print(hexdigest[8:][:16]) json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload - #print('data_to_send') - #print(json_payload) - #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) - #print('json_payload %r' % repr(json_payload)) - #print('json_payload len %r' % len(json_payload)) - #print(bin2hex(json_payload)) self.cipher = None # expect to connect and then disconnect to set new postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) - #print('postfix_payload %r' % postfix_payload) - #print('postfix_payload %r' % len(postfix_payload)) - #print('postfix_payload %x' % len(postfix_payload)) - #print('postfix_payload %r' % hex(len(postfix_payload))) assert len(postfix_payload) <= 0xff postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + @@ -304,11 +331,6 @@ class XenonDevice(object): # calc the CRC of everything except where the CRC goes and the suffix hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X') buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] - #print('command', command) - #print('prefix') - #print(payload_dict[self.dev_type][command]['prefix']) - #print(repr(buffer)) - #print(bin2hex(buffer, pretty=False)) #print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) )) #print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer))) return buffer @@ -359,9 +381,6 @@ class Device(XenonDevice): else: log.error('Unexpected status() payload=%r', result) -# if self.dev_type == 'cover': -# result = result.encode('utf-8') -# log.debug('encoded result=%r', result) return result def set_status(self, on, switch=1):