Initial work on refactoring pytuya

Still lots of things to improve, but it's a start. Will continue with
receive code.
This commit is contained in:
Pierre Ståhl
2020-09-28 10:21:23 +02:00
committed by rospogrigio
parent 1d2fdfdba9
commit b50cd7bf84

View File

@@ -41,9 +41,11 @@ from hashlib import md5
import json import json
import logging import logging
import socket import socket
import sys
import time import time
import binascii import binascii
import struct
from collections import namedtuple
from contextlib import contextmanager
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
@@ -52,9 +54,9 @@ version_tuple = (8, 1, 0)
version = version_string = __version__ = "%d.%d.%d" % version_tuple version = version_string = __version__ = "%d.%d.%d" % version_tuple
__author__ = "rospogrigio" __author__ = "rospogrigio"
log = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
logging.basicConfig() # TODO include function name/line numbers in log
# log.setLevel(level=logging.DEBUG) # Uncomment to Debug TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc")
SET = "set" SET = "set"
STATUS = "status" STATUS = "status"
@@ -62,7 +64,87 @@ STATUS = "status"
PROTOCOL_VERSION_BYTES_31 = b"3.1" PROTOCOL_VERSION_BYTES_31 = b"3.1"
PROTOCOL_VERSION_BYTES_33 = b"3.3" PROTOCOL_VERSION_BYTES_33 = b"3.3"
IS_PY2 = sys.version_info[0] == 2 PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + 12 * b"\x00"
MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length
MESSAGE_RECV_HEADER_FMT = ">5I" # 4*uint32: prefix, seqno, cmd, length, retcode
MESSAGE_END_FMT = ">2I" # 2*uint32: crc, suffix
PREFIX_VALUE = 0x000055AA
SUFFIX_VALUE = 0x0000AA55
# This is intended to match requests.json payload at
# https://github.com/codetheweb/tuyapi :
# type_0a devices require the 0a command as the status request
# type_0d devices require the 0d command as the status request, and the list of
# dps used set to null in the request payload (see generate_payload method)
# prefix: # Next byte is command byte ("hexByte") some zero padding, then length
# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for
# length, zero padding implies could be more than one byte)
PAYLOAD_DICT = {
"type_0a": {
"status": {"hexByte": 0x0A, "command": {"gwId": "", "devId": ""}},
"set": {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}},
},
"type_0d": {
"status": {"hexByte": 0x0D, "command": {"devId": "", "uid": "", "t": ""}},
"set": {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}},
},
}
@contextmanager
def socketcontext(address, port, timeout):
"""Context manager which sets up and tears down socket properly."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.settimeout(timeout)
s.connect((address, port))
try:
yield s
except Exception:
# This should probably be a warning or error, but since this happens
# every now and the and we do retries on a higher level, use debug level
# to not spam log with errors.
_LOGGER.debug("Failed to connect to %s. Raising Exception.", address)
raise
finally:
s.close()
def pack_message(msg):
"""Pack a TuyaMessage into bytes."""
# Create full message excluding CRC and suffix
buffer = (
struct.pack(
MESSAGE_HEADER_FMT,
PREFIX_VALUE,
msg.seqno,
msg.cmd,
len(msg.payload) + struct.calcsize(MESSAGE_END_FMT),
)
+ msg.payload
)
# Calculate CRC, add it together with suffix
buffer += struct.pack(MESSAGE_END_FMT, binascii.crc32(buffer), SUFFIX_VALUE)
return buffer
def unpack_message(data):
"""Unpack bytes into a TuyaMessage."""
header_len = struct.calcsize(MESSAGE_RECV_HEADER_FMT)
end_len = struct.calcsize(MESSAGE_END_FMT)
_, seqno, cmd, _, retcode = struct.unpack(
MESSAGE_RECV_HEADER_FMT, data[:header_len]
)
payload = data[header_len:-end_len]
crc, _ = struct.unpack(MESSAGE_END_FMT, data[-end_len:])
return TuyaMessage(seqno, cmd, retcode, payload, crc)
class AESCipher: class AESCipher:
@@ -77,11 +159,7 @@ class AESCipher:
"""Encrypt data to be sent to device.""" """Encrypt data to be sent to device."""
encryptor = self.cipher.encryptor() encryptor = self.cipher.encryptor()
crypted_text = encryptor.update(self._pad(raw)) + encryptor.finalize() crypted_text = encryptor.update(self._pad(raw)) + encryptor.finalize()
return base64.b64encode(crypted_text) if use_base64 else crypted_text
if use_base64:
return base64.b64encode(crypted_text)
else:
return crypted_text
def decrypt(self, enc, use_base64=True): def decrypt(self, enc, use_base64=True):
"""Decrypt data from device.""" """Decrypt data from device."""
@@ -100,57 +178,11 @@ class AESCipher:
return s[: -ord(s[len(s) - 1 :])] return s[: -ord(s[len(s) - 1 :])]
def bin2hex(x, pretty=False):
"""Convert binary data to hex string."""
if pretty:
space = " "
else:
space = ""
if IS_PY2:
result = "".join("%02X%s" % (ord(y), space) for y in x)
else:
result = "".join("%02X%s" % (y, space) for y in x)
return result
def hex2bin(x):
"""Convert hex string to binary."""
if IS_PY2:
return x.decode("hex")
else:
return bytes.fromhex(x)
# This is intended to match requests.json payload at
# https://github.com/codetheweb/tuyapi :
# type_0a devices require the 0a command as the status request
# type_0d devices require the 0d command as the status request, and the list of
# dps used set to null in the request payload (see generate_payload method)
# prefix: # Next byte is command byte ("hexByte") some zero padding, then length
# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for
# length, zero padding implies could be more than one byte)
payload_dict = {
"type_0a": {
"status": {"hexByte": "0a", "command": {"gwId": "", "devId": ""}},
"set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}},
"prefix": "000055aa00000000000000",
"suffix": "000000000000aa55",
},
"type_0d": {
"status": {"hexByte": "0d", "command": {"devId": "", "uid": "", "t": ""}},
"set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}},
"prefix": "000055aa00000000000000",
"suffix": "000000000000aa55",
},
}
class TuyaInterface: class TuyaInterface:
"""Represent a Tuya device.""" """Represent a Tuya device."""
def __init__( def __init__(
self, dev_id, address, local_key, protocol_version, connection_timeout=10 self, dev_id, address, local_key, protocol_version, connection_timeout=5
): ):
""" """
Initialize a new TuyaInterface. Initialize a new TuyaInterface.
@@ -170,51 +202,56 @@ class TuyaInterface:
self.version = protocol_version self.version = protocol_version
self.dev_type = "type_0a" self.dev_type = "type_0a"
self.dps_to_request = {} self.dps_to_request = {}
self.cipher = AESCipher(self.local_key)
self.seqno = 0
self.port = 6668 # default - do not expect caller to pass in self.port = 6668 # default - do not expect caller to pass in
def __repr__(self): def exchange(self, command, data=None):
"""Return internal string representation of object.""" """Send and recive a message, returning response from device."""
return "%r" % ((self.id, self.address),) # FIXME can do better than this _LOGGER.debug("Sending command %s (device type: %s", self.dev_type)
payload = self._generate_payload(command, data)
dev_type = self.dev_type
def _send_receive(self, payload): with socketcontext(self.address, self.port, self.connection_timeout) as s:
"""
Send single buffer `payload` and receive a single buffer.
Args:
payload(bytes): Data to send.
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.settimeout(self.connection_timeout)
s.connect((self.address, self.port))
except Exception as e:
print("Failed to connect to %s. Raising Exception." % (self.address))
raise e
try:
s.send(payload) s.send(payload)
except Exception as e:
print("Failed to send payload to %s. Raising Exception." % (self.address))
# s.close()
raise e
try:
data = s.recv(1024) data = s.recv(1024)
# print("FIRST: Received %d bytes" % len(data) )
# sometimes the first packet does not contain data (typically 28 bytes): # sometimes the first packet does not contain data (typically 28 bytes):
# need to read again # need to read again
if len(data) < 40: if len(data) < 40:
time.sleep(0.1) time.sleep(0.1)
data = s.recv(1024) data = s.recv(1024)
# print("SECOND: Received %d bytes" % len(data) )
except Exception as e:
print("Failed to receive data from %s. Raising Exception." % (self.address))
# s.close()
raise e
s.close() msg = unpack_message(data)
return data # TODO: Verify stuff, e.g. CRC sequence number
payload = self._decode_payload(msg.payload)
# Perform a new exchange (once) if we switched device type
if dev_type != self.dev_type:
_LOGGER.debug(
"Re-send %s due to device type change (%s -> %s)",
command,
dev_type,
self.dev_type,
)
return self.exchange(command, data)
return payload
def status(self):
"""Return device status."""
return self.exchange(STATUS)
def set_dps(self, value, dps_index):
"""
Set value (may be any type: bool, int or string) of any dps index.
Args:
dps_index(int): dps index to set
value: new value for the dps index
"""
return self.exchange(SET, {str(dps_index): value})
def detect_available_dps(self): def detect_available_dps(self):
"""Return which datapoints are supported by the device.""" """Return which datapoints are supported by the device."""
@@ -223,49 +260,23 @@ class TuyaInterface:
# in the ranges [1-25] and [100-110] need to split the bruteforcing in # in the ranges [1-25] and [100-110] need to split the bruteforcing in
# different steps due to request payload limitation (max. length = 255) # different steps due to request payload limitation (max. length = 255)
detected_dps = {} detected_dps = {}
ranges = [(2, 11), (11, 21), (21, 31), (100, 111)]
for dps_range in ranges:
# dps 1 must always be sent, otherwise it might fail in case no dps is found # dps 1 must always be sent, otherwise it might fail in case no dps is found
# in the requested range # in the requested range
self.dps_to_request = {"1": None} self.dps_to_request = {"1": None}
self.add_dps_to_request(range(2, 11)) self.add_dps_to_request(range(*dps_range))
try: try:
data = self.status() data = self.status()
except Exception as e: except Exception as e:
print("Failed to get status: [{}]".format(e)) _LOGGER.warning("Failed to get status: [{}]", e)
raise raise
detected_dps.update(data["dps"]) detected_dps.update(data["dps"])
if self.dev_type == "type_0a": if self.dev_type == "type_0a":
return detected_dps return detected_dps
self.dps_to_request = {"1": None}
self.add_dps_to_request(range(11, 21))
try:
data = self.status()
except Exception as e:
print("Failed to get status: [{}]".format(e))
raise
detected_dps.update(data["dps"])
self.dps_to_request = {"1": None}
self.add_dps_to_request(range(21, 31))
try:
data = self.status()
except Exception as e:
print("Failed to get status: [{}]".format(e))
raise
detected_dps.update(data["dps"])
self.dps_to_request = {"1": None}
self.add_dps_to_request(range(100, 111))
try:
data = self.status()
except Exception as e:
print("Failed to get status: [{}]".format(e))
raise
detected_dps.update(data["dps"])
# print("DATA IS [{}] detected_dps [{}]".format(data,detected_dps))
return detected_dps return detected_dps
def add_dps_to_request(self, dps_index): def add_dps_to_request(self, dps_index):
@@ -275,7 +286,37 @@ class TuyaInterface:
else: else:
self.dps_to_request.update({str(index): None for index in dps_index}) self.dps_to_request.update({str(index): None for index in dps_index})
def generate_payload(self, command, data=None): def _decode_payload(self, payload):
_LOGGER.debug("decode payload=%r", payload)
if payload.startswith(PROTOCOL_VERSION_BYTES_31):
payload = payload[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header
# remove (what I'm guessing, but not confirmed is) 16-bytes of MD5
# hexdigest of payload
payload = self.cipher.decrypt(payload[16:])
elif self.version == 3.3:
if self.dev_type != "type_0a" or payload.startswith(
PROTOCOL_VERSION_BYTES_33
):
payload = payload[len(PROTOCOL_33_HEADER) :]
payload = self.cipher.decrypt(payload, False)
if "data unvalid" in payload:
self.dev_type = "type_0d"
_LOGGER.debug(
"'data unvalid' error detected: switching to dev_type %r",
self.dev_type,
)
return None
elif not payload.startswith(b"{"):
raise Exception(f"Unexpected payload={payload}")
if not isinstance(payload, str):
payload = payload.decode()
_LOGGER.debug("decrypted result=%r", payload)
return json.loads(payload)
def _generate_payload(self, command, data=None):
""" """
Generate the payload to send. Generate the payload to send.
@@ -285,8 +326,9 @@ class TuyaInterface:
data(dict, optional): The data to be send. data(dict, optional): The data to be send.
This is what will be passed via the 'dps' entry This is what will be passed via the 'dps' entry
""" """
json_data = payload_dict[self.dev_type][command]["command"] cmd_data = PAYLOAD_DICT[self.dev_type][command]
command_hb = payload_dict[self.dev_type][command]["hexByte"] json_data = cmd_data["command"]
command_hb = cmd_data["hexByte"]
if "gwId" in json_data: if "gwId" in json_data:
json_data["gwId"] = self.id json_data["gwId"] = self.id
@@ -301,40 +343,20 @@ class TuyaInterface:
json_data["dps"] = data json_data["dps"] = data
if command_hb == "0d": if command_hb == "0d":
json_data["dps"] = self.dps_to_request json_data["dps"] = self.dps_to_request
# log.info('******** COMMAND IS %r', self.dps_to_request)
# Create byte buffer from hex data payload = json.dumps(json_data).replace(" ", "").encode("utf-8")
json_payload = json.dumps(json_data) _LOGGER.debug("paylod=%r", payload)
# print(json_payload)
json_payload = json_payload.replace(
" ", ""
) # if spaces are not removed device does not respond!
json_payload = json_payload.encode("utf-8")
log.debug("json_payload=%r", json_payload)
# print('json_payload = ', json_payload, ' cmd = ', command_hb)
if self.version == 3.3: if self.version == 3.3:
self.cipher = AESCipher( payload = self.cipher.encrypt(payload, False)
self.local_key if command_hb != 0x0A:
) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload, False)
self.cipher = None
if command_hb != "0a":
# add the 3.3 header # add the 3.3 header
json_payload = ( payload = PROTOCOL_33_HEADER + payload
PROTOCOL_VERSION_BYTES_33
+ b"\0\0\0\0\0\0\0\0\0\0\0\0"
+ json_payload
)
elif command == SET: elif command == SET:
# need to encrypt payload = self.cipher.encrypt(payload)
self.cipher = AESCipher(
self.local_key
) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload)
preMd5String = ( preMd5String = (
b"data=" b"data="
+ json_payload + payload
+ b"||lpv=" + b"||lpv="
+ PROTOCOL_VERSION_BYTES_31 + PROTOCOL_VERSION_BYTES_31
+ b"||" + b"||"
@@ -343,130 +365,16 @@ class TuyaInterface:
m = md5() m = md5()
m.update(preMd5String) m.update(preMd5String)
hexdigest = m.hexdigest() hexdigest = m.hexdigest()
json_payload = ( payload = (
PROTOCOL_VERSION_BYTES_31 PROTOCOL_VERSION_BYTES_31
+ hexdigest[8:][:16].encode("latin1") + hexdigest[8:][:16].encode("latin1")
+ json_payload + payload
)
self.cipher = None # expect to connect and then disconnect to set new
postfix_payload = hex2bin(
bin2hex(json_payload) + payload_dict[self.dev_type]["suffix"]
)
assert len(postfix_payload) <= 0xFF
postfix_payload_hex_len = "%x" % len(
postfix_payload
) # TODO this assumes a single byte 0-255 (0x00-0xff)
buffer = (
hex2bin(
payload_dict[self.dev_type]["prefix"]
+ payload_dict[self.dev_type][command]["hexByte"]
+ "000000"
+ postfix_payload_hex_len
)
+ postfix_payload
) )
# calc the CRC of everything except where the CRC goes and the suffix msg = TuyaMessage(self.seqno, command_hb, 0, payload, 0)
hex_crc = format(binascii.crc32(buffer[:-8]) & 0xFFFFFFFF, "08X") self.seqno += 1
buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] return pack_message(msg)
return buffer
def status(self): def __repr__(self):
"""Return device status.""" """Return internal string representation of object."""
log.debug("status() entry (dev_type is %s)", self.dev_type) return "%r" % ((self.id, self.address),) # FIXME can do better than this
# open device, send request, then close connection
payload = self.generate_payload("status")
data = self._send_receive(payload)
log.debug("status received data=%r", data)
result = data[20:-8] # hard coded offsets
if self.dev_type != "type_0a":
result = result[15:]
log.debug("result=%r", result)
# result = data[data.find('{'):data.rfind('}')+1] # naive marker search,
# hope neither { nor } occur in header/footer
# print('result %r' % result)
if result.startswith(b"{"):
# this is the regular expected code path
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
elif result.startswith(PROTOCOL_VERSION_BYTES_31):
# got an encrypted payload, happens occasionally
# expect resulting json to look similar to:
# {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
# NOTE dps.2 may or may not be present
result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header
# remove (what I'm guessing, but not confirmed is) 16-bytes of MD5
# hexdigest of payload
result = result[16:]
cipher = AESCipher(self.local_key)
result = cipher.decrypt(result)
print("decrypted result=[{}]".format(result))
log.debug("decrypted result=%r", result)
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
elif self.version == 3.3:
cipher = AESCipher(self.local_key)
result = cipher.decrypt(result, False)
log.debug("decrypted result=%r", result)
if "data unvalid" in result:
self.dev_type = "type_0d"
log.debug(
"'data unvalid' error detected: switching to dev_type %r",
self.dev_type,
)
return self.status()
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
else:
log.error("Unexpected status() payload=%r", result)
return result
def set_dps(self, value, dps_index):
"""
Set value (may be any type: bool, int or string) of any dps index.
Args:
dps_index(int): dps index to set
value: new value for the dps index
"""
# open device, send request, then close connection
if isinstance(dps_index, int):
dps_index = str(dps_index) # index and payload is a string
payload = self.generate_payload(SET, {dps_index: value})
data = self._send_receive(payload)
log.debug("set_dps received data=%r", data)
return data
def set_timer(self, num_secs):
"""
Set a timer.
Args:
num_secs(int): Number of seconds
"""
# FIXME / TODO support schemas? Accept timer id number as parameter?
# Dumb heuristic; Query status, pick last device id as that is probably
# the timer
status = self.status()
devices = status["dps"]
devices_numbers = list(devices.keys())
devices_numbers.sort()
dps_id = devices_numbers[-1]
payload = self.generate_payload(SET, {dps_id: num_secs})
data = self._send_receive(payload)
log.debug("set_timer received data=%r", data)
return data