Cleaned up pytuya code

This commit is contained in:
rospogrigio
2020-09-07 14:46:35 +02:00
parent 3d0505ac41
commit 63d1ef26d4

View File

@@ -1,13 +1,58 @@
# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices
# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug
# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa
#
# This would not exist without the protocol reverse engineering from
# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
#
# Tested with Python 2.7 and Python 3.6.1 only
# TinyTuya Module
# -*- coding: utf-8 -*-
"""
Python module to interface with Tuya WiFi smart devices
Mostly derived from Shenzhen Xenon ESP8266MOD WiFi smart devices
E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
Author: clach04
Maintained by: rospogrigio
For more information see https://github.com/clach04/python-tuya
Classes
OutletDevice(dev_id, address, local_key=None)
CoverDevice(dev_id, address, local_key=None)
BulbDevice(dev_id, address, local_key=None)
dev_id (str): Device ID e.g. 01234567891234567890
address (str): Device Network IP Address e.g. 10.0.1.99
local_key (str, optional): The encryption key. Defaults to None.
Functions
json = status() # returns json payload
set_version(version) # 3.1 [default] or 3.3
set_dpsUsed(dpsUsed)
set_status(on, switch=1) # Set status of the device to 'on' or 'off' (bool)
set_value(index, value) # Set int value of any index.
turn_on(switch=1):
turn_off(switch=1):
set_timer(num_secs):
CoverDevice:
open_cover(switch=1):
close_cover(switch=1):
stop_cover(switch=1):
BulbDevice
set_colour(r, g, b):
set_white(brightness, colourtemp):
set_brightness(brightness):
set_colourtemp(colourtemp):
result = brightness():
result = colourtemp():
(r, g, b) = colour_rgb():
(h,s,v) = colour_hsv()
result = state():
Credits
* TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
For protocol reverse engineering
* PyTuya https://github.com/clach04/python-tuya by clach04
The origin of this python module (now abandoned)
* LocalTuya https://github.com/rospogrigio/localtuya-homeassistant by rospogrigio
Updated pytuya to support devices with Device IDs of 22 characters
"""
import base64
from hashlib import md5
@@ -34,7 +79,7 @@ __author__ = 'rospogrigio'
log = logging.getLogger(__name__)
logging.basicConfig() # TODO include function name/line numbers in log
#log.setLevel(level=logging.DEBUG) # Debug hack!
#log.setLevel(level=logging.DEBUG) # Uncomment to Debug
log.debug('%s version %s', __name__, version)
log.debug('Python %s on %s', sys.version, sys.platform)
@@ -55,7 +100,6 @@ IS_PY2 = sys.version_info[0] == 2
class AESCipher(object):
def __init__(self, key):
#self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/
self.bs = 16
self.key = key
def encrypt(self, raw, use_base64 = True):
@@ -68,7 +112,6 @@ class AESCipher(object):
cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16
crypted_text = cipher.feed(raw)
crypted_text += cipher.feed() # flush final block
#print('crypted_text %r' % crypted_text)
#print('crypted_text (%d) %r' % (len(crypted_text), crypted_text))
if use_base64:
return base64.b64encode(crypted_text)
@@ -267,33 +310,17 @@ class XenonDevice(object):
json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload
elif command == SET:
# need to encrypt
#print('json_payload %r' % json_payload)
self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload)
#print('crypted json_payload %r' % json_payload)
preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key
#print('preMd5String %r' % preMd5String)
m = md5()
m.update(preMd5String)
#print(repr(m.digest()))
hexdigest = m.hexdigest()
#print(hexdigest)
#print(hexdigest[8:][:16])
json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload
#print('data_to_send')
#print(json_payload)
#print('crypted json_payload (%d) %r' % (len(json_payload), json_payload))
#print('json_payload %r' % repr(json_payload))
#print('json_payload len %r' % len(json_payload))
#print(bin2hex(json_payload))
self.cipher = None # expect to connect and then disconnect to set new
postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix'])
#print('postfix_payload %r' % postfix_payload)
#print('postfix_payload %r' % len(postfix_payload))
#print('postfix_payload %x' % len(postfix_payload))
#print('postfix_payload %r' % hex(len(postfix_payload)))
assert len(postfix_payload) <= 0xff
postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff)
buffer = hex2bin( payload_dict[self.dev_type]['prefix'] +
@@ -304,11 +331,6 @@ class XenonDevice(object):
# calc the CRC of everything except where the CRC goes and the suffix
hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X')
buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:]
#print('command', command)
#print('prefix')
#print(payload_dict[self.dev_type][command]['prefix'])
#print(repr(buffer))
#print(bin2hex(buffer, pretty=False))
#print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) ))
#print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer)))
return buffer
@@ -359,9 +381,6 @@ class Device(XenonDevice):
else:
log.error('Unexpected status() payload=%r', result)
# if self.dev_type == 'cover':
# result = result.encode('utf-8')
# log.debug('encoded result=%r', result)
return result
def set_status(self, on, switch=1):