511 lines
18 KiB
Python
511 lines
18 KiB
Python
# PyTuya Module
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Python module to interface with Tuya WiFi smart devices.
|
|
|
|
Mostly derived from Shenzhen Xenon ESP8266MOD WiFi smart devices
|
|
E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
|
|
|
|
Author: clach04
|
|
Maintained by: rospogrigio
|
|
|
|
For more information see https://github.com/clach04/python-tuya
|
|
|
|
Classes
|
|
TuyaInterface(dev_id, address, local_key=None)
|
|
dev_id (str): Device ID e.g. 01234567891234567890
|
|
address (str): Device Network IP Address e.g. 10.0.1.99
|
|
local_key (str, optional): The encryption key. Defaults to None.
|
|
|
|
Functions
|
|
json = status() # returns json payload
|
|
set_version(version) # 3.1 [default] or 3.3
|
|
detect_available_dps() # returns a list of available dps provided by the device
|
|
add_dps_to_request(dps_index) # adds dps_index to the list of dps used by the
|
|
# device (to be queried in the payload)
|
|
set_dps(on, dps_index) # Set value of any dps index.
|
|
set_timer(num_secs):
|
|
|
|
|
|
Credits
|
|
* TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
|
|
For protocol reverse engineering
|
|
* PyTuya https://github.com/clach04/python-tuya by clach04
|
|
The origin of this python module (now abandoned)
|
|
* LocalTuya https://github.com/rospogrigio/localtuya-homeassistant by rospogrigio
|
|
Updated pytuya to support devices with Device IDs of 22 characters
|
|
"""
|
|
|
|
import base64
|
|
from hashlib import md5
|
|
import json
|
|
import logging
|
|
import socket
|
|
import sys
|
|
import time
|
|
import binascii
|
|
|
|
try:
|
|
# raise ImportError
|
|
import Crypto
|
|
from Crypto.Cipher import AES # PyCrypto
|
|
except ImportError:
|
|
Crypto = AES = None
|
|
import pyaes # https://github.com/ricmoo/pyaes
|
|
|
|
|
|
version_tuple = (8, 1, 0)
|
|
version = version_string = __version__ = "%d.%d.%d" % version_tuple
|
|
__author__ = "rospogrigio"
|
|
|
|
log = logging.getLogger(__name__)
|
|
logging.basicConfig() # TODO include function name/line numbers in log
|
|
# log.setLevel(level=logging.DEBUG) # Uncomment to Debug
|
|
|
|
log.debug("%s version %s", __name__, version)
|
|
log.debug("Python %s on %s", sys.version, sys.platform)
|
|
if Crypto is None:
|
|
log.debug("Using pyaes version %r", pyaes.VERSION)
|
|
log.debug("Using pyaes from %r", pyaes.__file__)
|
|
else:
|
|
log.debug("Using PyCrypto %r", Crypto.version_info)
|
|
log.debug("Using PyCrypto from %r", Crypto.__file__)
|
|
|
|
SET = "set"
|
|
STATUS = "status"
|
|
|
|
PROTOCOL_VERSION_BYTES_31 = b"3.1"
|
|
PROTOCOL_VERSION_BYTES_33 = b"3.3"
|
|
|
|
IS_PY2 = sys.version_info[0] == 2
|
|
|
|
|
|
class AESCipher:
|
|
"""Cipher module for Tuya communication."""
|
|
|
|
def __init__(self, key):
|
|
"""Initialize a new AESCipher."""
|
|
self.bs = 16
|
|
self.key = key
|
|
|
|
def encrypt(self, raw, use_base64=True):
|
|
"""Encrypt data to be sent to device."""
|
|
if Crypto:
|
|
raw = self._pad(raw)
|
|
cipher = AES.new(self.key, mode=AES.MODE_ECB)
|
|
crypted_text = cipher.encrypt(raw)
|
|
else:
|
|
_ = self._pad(raw)
|
|
cipher = pyaes.blockfeeder.Encrypter(
|
|
pyaes.AESModeOfOperationECB(self.key)
|
|
) # no IV, auto pads to 16
|
|
crypted_text = cipher.feed(raw)
|
|
crypted_text += cipher.feed() # flush final block
|
|
# print('crypted_text (%d) %r' % (len(crypted_text), crypted_text))
|
|
if use_base64:
|
|
return base64.b64encode(crypted_text)
|
|
else:
|
|
return crypted_text
|
|
|
|
def decrypt(self, enc, use_base64=True):
|
|
"""Decrypt data from device."""
|
|
if use_base64:
|
|
enc = base64.b64decode(enc)
|
|
# print('enc (%d) %r' % (len(enc), enc))
|
|
# enc = self._unpad(enc)
|
|
# enc = self._pad(enc)
|
|
# print('upadenc (%d) %r' % (len(enc), enc))
|
|
if Crypto:
|
|
cipher = AES.new(self.key, AES.MODE_ECB)
|
|
raw = cipher.decrypt(enc)
|
|
# print('raw (%d) %r' % (len(raw), raw))
|
|
return self._unpad(raw).decode("utf-8")
|
|
# return self._unpad(cipher.decrypt(enc)).decode('utf-8')
|
|
else:
|
|
cipher = pyaes.blockfeeder.Decrypter(
|
|
pyaes.AESModeOfOperationECB(self.key)
|
|
) # no IV, auto pads to 16
|
|
plain_text = cipher.feed(enc)
|
|
plain_text += cipher.feed() # flush final block
|
|
return plain_text
|
|
|
|
def _pad(self, s):
|
|
padnum = self.bs - len(s) % self.bs
|
|
return s + padnum * chr(padnum).encode()
|
|
|
|
@staticmethod
|
|
def _unpad(s):
|
|
return s[: -ord(s[len(s) - 1 :])]
|
|
|
|
|
|
def bin2hex(x, pretty=False):
|
|
"""Convert binary data to hex string."""
|
|
if pretty:
|
|
space = " "
|
|
else:
|
|
space = ""
|
|
if IS_PY2:
|
|
result = "".join("%02X%s" % (ord(y), space) for y in x)
|
|
else:
|
|
result = "".join("%02X%s" % (y, space) for y in x)
|
|
return result
|
|
|
|
|
|
def hex2bin(x):
|
|
"""Convert hex string to binary."""
|
|
if IS_PY2:
|
|
return x.decode("hex")
|
|
else:
|
|
return bytes.fromhex(x)
|
|
|
|
|
|
# This is intended to match requests.json payload at
|
|
# https://github.com/codetheweb/tuyapi :
|
|
# type_0a devices require the 0a command as the status request
|
|
# type_0d devices require the 0d command as the status request, and the list of
|
|
# dps used set to null in the request payload (see generate_payload method)
|
|
|
|
# prefix: # Next byte is command byte ("hexByte") some zero padding, then length
|
|
# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for
|
|
# length, zero padding implies could be more than one byte)
|
|
payload_dict = {
|
|
"type_0a": {
|
|
"status": {"hexByte": "0a", "command": {"gwId": "", "devId": ""}},
|
|
"set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}},
|
|
"prefix": "000055aa00000000000000",
|
|
"suffix": "000000000000aa55",
|
|
},
|
|
"type_0d": {
|
|
"status": {"hexByte": "0d", "command": {"devId": "", "uid": "", "t": ""}},
|
|
"set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}},
|
|
"prefix": "000055aa00000000000000",
|
|
"suffix": "000000000000aa55",
|
|
},
|
|
}
|
|
|
|
|
|
class TuyaInterface:
|
|
"""Represent a Tuya device."""
|
|
|
|
def __init__(
|
|
self, dev_id, address, local_key, protocol_version, connection_timeout=10
|
|
):
|
|
"""
|
|
Initialize a new TuyaInterface.
|
|
|
|
Args:
|
|
dev_id (str): The device id.
|
|
address (str): The network address.
|
|
local_key (str, optional): The encryption key. Defaults to None.
|
|
|
|
Attributes:
|
|
port (int): The port to connect to.
|
|
"""
|
|
self.id = dev_id
|
|
self.address = address
|
|
self.local_key = local_key.encode("latin1")
|
|
self.connection_timeout = connection_timeout
|
|
self.version = protocol_version
|
|
self.dev_type = "type_0a"
|
|
self.dps_to_request = {}
|
|
|
|
self.port = 6668 # default - do not expect caller to pass in
|
|
|
|
def __repr__(self):
|
|
"""Return internal string representation of object."""
|
|
return "%r" % ((self.id, self.address),) # FIXME can do better than this
|
|
|
|
def _send_receive(self, payload):
|
|
"""
|
|
Send single buffer `payload` and receive a single buffer.
|
|
|
|
Args:
|
|
payload(bytes): Data to send.
|
|
"""
|
|
try:
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
s.settimeout(self.connection_timeout)
|
|
s.connect((self.address, self.port))
|
|
except Exception as e:
|
|
print("Failed to connect to %s. Raising Exception." % (self.address))
|
|
raise e
|
|
try:
|
|
s.send(payload)
|
|
except Exception as e:
|
|
print("Failed to send payload to %s. Raising Exception." % (self.address))
|
|
# s.close()
|
|
raise e
|
|
|
|
try:
|
|
data = s.recv(1024)
|
|
# print("FIRST: Received %d bytes" % len(data) )
|
|
# sometimes the first packet does not contain data (typically 28 bytes):
|
|
# need to read again
|
|
if len(data) < 40:
|
|
time.sleep(0.1)
|
|
data = s.recv(1024)
|
|
# print("SECOND: Received %d bytes" % len(data) )
|
|
except Exception as e:
|
|
print("Failed to receive data from %s. Raising Exception." % (self.address))
|
|
# s.close()
|
|
raise e
|
|
|
|
s.close()
|
|
return data
|
|
|
|
def detect_available_dps(self):
|
|
"""Return which datapoints are supported by the device."""
|
|
# type_0d devices need a sort of bruteforce querying in order to detect the
|
|
# list of available dps experience shows that the dps available are usually
|
|
# in the ranges [1-25] and [100-110] need to split the bruteforcing in
|
|
# different steps due to request payload limitation (max. length = 255)
|
|
detected_dps = {}
|
|
|
|
# dps 1 must always be sent, otherwise it might fail in case no dps is found
|
|
# in the requested range
|
|
self.dps_to_request = {"1": None}
|
|
self.add_dps_to_request(range(2, 11))
|
|
try:
|
|
data = self.status()
|
|
except Exception as e:
|
|
print("Failed to get status: [{}]".format(e))
|
|
raise
|
|
detected_dps.update(data["dps"])
|
|
|
|
if self.dev_type == "type_0a":
|
|
return detected_dps
|
|
|
|
self.dps_to_request = {"1": None}
|
|
self.add_dps_to_request(range(11, 21))
|
|
try:
|
|
data = self.status()
|
|
except Exception as e:
|
|
print("Failed to get status: [{}]".format(e))
|
|
raise
|
|
detected_dps.update(data["dps"])
|
|
|
|
self.dps_to_request = {"1": None}
|
|
self.add_dps_to_request(range(21, 31))
|
|
try:
|
|
data = self.status()
|
|
except Exception as e:
|
|
print("Failed to get status: [{}]".format(e))
|
|
raise
|
|
detected_dps.update(data["dps"])
|
|
|
|
self.dps_to_request = {"1": None}
|
|
self.add_dps_to_request(range(100, 111))
|
|
try:
|
|
data = self.status()
|
|
except Exception as e:
|
|
print("Failed to get status: [{}]".format(e))
|
|
raise
|
|
detected_dps.update(data["dps"])
|
|
# print("DATA IS [{}] detected_dps [{}]".format(data,detected_dps))
|
|
|
|
return detected_dps
|
|
|
|
def add_dps_to_request(self, dps_index):
|
|
"""Add a datapoint (DP) to be included in requests."""
|
|
if isinstance(dps_index, int):
|
|
self.dps_to_request[str(dps_index)] = None
|
|
else:
|
|
self.dps_to_request.update({str(index): None for index in dps_index})
|
|
|
|
def generate_payload(self, command, data=None):
|
|
"""
|
|
Generate the payload to send.
|
|
|
|
Args:
|
|
command(str): The type of command.
|
|
This is one of the entries from payload_dict
|
|
data(dict, optional): The data to be send.
|
|
This is what will be passed via the 'dps' entry
|
|
"""
|
|
json_data = payload_dict[self.dev_type][command]["command"]
|
|
command_hb = payload_dict[self.dev_type][command]["hexByte"]
|
|
|
|
if "gwId" in json_data:
|
|
json_data["gwId"] = self.id
|
|
if "devId" in json_data:
|
|
json_data["devId"] = self.id
|
|
if "uid" in json_data:
|
|
json_data["uid"] = self.id # still use id, no seperate uid
|
|
if "t" in json_data:
|
|
json_data["t"] = str(int(time.time()))
|
|
|
|
if data is not None:
|
|
json_data["dps"] = data
|
|
if command_hb == "0d":
|
|
json_data["dps"] = self.dps_to_request
|
|
# log.info('******** COMMAND IS %r', self.dps_to_request)
|
|
|
|
# Create byte buffer from hex data
|
|
json_payload = json.dumps(json_data)
|
|
# print(json_payload)
|
|
json_payload = json_payload.replace(
|
|
" ", ""
|
|
) # if spaces are not removed device does not respond!
|
|
json_payload = json_payload.encode("utf-8")
|
|
log.debug("json_payload=%r", json_payload)
|
|
# print('json_payload = ', json_payload, ' cmd = ', command_hb)
|
|
|
|
if self.version == 3.3:
|
|
self.cipher = AESCipher(
|
|
self.local_key
|
|
) # expect to connect and then disconnect to set new
|
|
json_payload = self.cipher.encrypt(json_payload, False)
|
|
self.cipher = None
|
|
if command_hb != "0a":
|
|
# add the 3.3 header
|
|
json_payload = (
|
|
PROTOCOL_VERSION_BYTES_33
|
|
+ b"\0\0\0\0\0\0\0\0\0\0\0\0"
|
|
+ json_payload
|
|
)
|
|
elif command == SET:
|
|
# need to encrypt
|
|
self.cipher = AESCipher(
|
|
self.local_key
|
|
) # expect to connect and then disconnect to set new
|
|
json_payload = self.cipher.encrypt(json_payload)
|
|
preMd5String = (
|
|
b"data="
|
|
+ json_payload
|
|
+ b"||lpv="
|
|
+ PROTOCOL_VERSION_BYTES_31
|
|
+ b"||"
|
|
+ self.local_key
|
|
)
|
|
m = md5()
|
|
m.update(preMd5String)
|
|
hexdigest = m.hexdigest()
|
|
json_payload = (
|
|
PROTOCOL_VERSION_BYTES_31
|
|
+ hexdigest[8:][:16].encode("latin1")
|
|
+ json_payload
|
|
)
|
|
self.cipher = None # expect to connect and then disconnect to set new
|
|
|
|
postfix_payload = hex2bin(
|
|
bin2hex(json_payload) + payload_dict[self.dev_type]["suffix"]
|
|
)
|
|
assert len(postfix_payload) <= 0xFF
|
|
postfix_payload_hex_len = "%x" % len(
|
|
postfix_payload
|
|
) # TODO this assumes a single byte 0-255 (0x00-0xff)
|
|
buffer = (
|
|
hex2bin(
|
|
payload_dict[self.dev_type]["prefix"]
|
|
+ payload_dict[self.dev_type][command]["hexByte"]
|
|
+ "000000"
|
|
+ postfix_payload_hex_len
|
|
)
|
|
+ postfix_payload
|
|
)
|
|
|
|
# calc the CRC of everything except where the CRC goes and the suffix
|
|
hex_crc = format(binascii.crc32(buffer[:-8]) & 0xFFFFFFFF, "08X")
|
|
buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:]
|
|
return buffer
|
|
|
|
def status(self):
|
|
"""Return device status."""
|
|
log.debug("status() entry (dev_type is %s)", self.dev_type)
|
|
# open device, send request, then close connection
|
|
payload = self.generate_payload("status")
|
|
|
|
data = self._send_receive(payload)
|
|
log.debug("status received data=%r", data)
|
|
|
|
result = data[20:-8] # hard coded offsets
|
|
if self.dev_type != "type_0a":
|
|
result = result[15:]
|
|
|
|
log.debug("result=%r", result)
|
|
# result = data[data.find('{'):data.rfind('}')+1] # naive marker search,
|
|
# hope neither { nor } occur in header/footer
|
|
# print('result %r' % result)
|
|
if result.startswith(b"{"):
|
|
# this is the regular expected code path
|
|
if not isinstance(result, str):
|
|
result = result.decode()
|
|
result = json.loads(result)
|
|
elif result.startswith(PROTOCOL_VERSION_BYTES_31):
|
|
# got an encrypted payload, happens occasionally
|
|
# expect resulting json to look similar to:
|
|
# {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
|
|
# NOTE dps.2 may or may not be present
|
|
result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header
|
|
# remove (what I'm guessing, but not confirmed is) 16-bytes of MD5
|
|
# hexdigest of payload
|
|
result = result[16:]
|
|
cipher = AESCipher(self.local_key)
|
|
result = cipher.decrypt(result)
|
|
print("decrypted result=[{}]".format(result))
|
|
log.debug("decrypted result=%r", result)
|
|
if not isinstance(result, str):
|
|
result = result.decode()
|
|
result = json.loads(result)
|
|
elif self.version == 3.3:
|
|
cipher = AESCipher(self.local_key)
|
|
result = cipher.decrypt(result, False)
|
|
log.debug("decrypted result=%r", result)
|
|
if "data unvalid" in result:
|
|
self.dev_type = "type_0d"
|
|
log.debug(
|
|
"'data unvalid' error detected: switching to dev_type %r",
|
|
self.dev_type,
|
|
)
|
|
return self.status()
|
|
if not isinstance(result, str):
|
|
result = result.decode()
|
|
result = json.loads(result)
|
|
else:
|
|
log.error("Unexpected status() payload=%r", result)
|
|
|
|
return result
|
|
|
|
def set_dps(self, value, dps_index):
|
|
"""
|
|
Set value (may be any type: bool, int or string) of any dps index.
|
|
|
|
Args:
|
|
dps_index(int): dps index to set
|
|
value: new value for the dps index
|
|
"""
|
|
# open device, send request, then close connection
|
|
if isinstance(dps_index, int):
|
|
dps_index = str(dps_index) # index and payload is a string
|
|
|
|
payload = self.generate_payload(SET, {dps_index: value})
|
|
|
|
data = self._send_receive(payload)
|
|
log.debug("set_dps received data=%r", data)
|
|
|
|
return data
|
|
|
|
def set_timer(self, num_secs):
|
|
"""
|
|
Set a timer.
|
|
|
|
Args:
|
|
num_secs(int): Number of seconds
|
|
"""
|
|
# FIXME / TODO support schemas? Accept timer id number as parameter?
|
|
|
|
# Dumb heuristic; Query status, pick last device id as that is probably
|
|
# the timer
|
|
status = self.status()
|
|
devices = status["dps"]
|
|
devices_numbers = list(devices.keys())
|
|
devices_numbers.sort()
|
|
dps_id = devices_numbers[-1]
|
|
|
|
payload = self.generate_payload(SET, {dps_id: num_secs})
|
|
|
|
data = self._send_receive(payload)
|
|
log.debug("set_timer received data=%r", data)
|
|
return data
|