diff --git a/custom_components/localtuya/pytuya/__init__.py b/custom_components/localtuya/pytuya/__init__.py index 0ce88a5..324ddcf 100644 --- a/custom_components/localtuya/pytuya/__init__.py +++ b/custom_components/localtuya/pytuya/__init__.py @@ -41,9 +41,11 @@ from hashlib import md5 import json import logging import socket -import sys import time import binascii +import struct +from collections import namedtuple +from contextlib import contextmanager from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -52,9 +54,9 @@ version_tuple = (8, 1, 0) version = version_string = __version__ = "%d.%d.%d" % version_tuple __author__ = "rospogrigio" -log = logging.getLogger(__name__) -logging.basicConfig() # TODO include function name/line numbers in log -# log.setLevel(level=logging.DEBUG) # Uncomment to Debug +_LOGGER = logging.getLogger(__name__) + +TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc") SET = "set" STATUS = "status" @@ -62,7 +64,87 @@ STATUS = "status" PROTOCOL_VERSION_BYTES_31 = b"3.1" PROTOCOL_VERSION_BYTES_33 = b"3.3" -IS_PY2 = sys.version_info[0] == 2 +PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + 12 * b"\x00" + +MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length +MESSAGE_RECV_HEADER_FMT = ">5I" # 4*uint32: prefix, seqno, cmd, length, retcode +MESSAGE_END_FMT = ">2I" # 2*uint32: crc, suffix + +PREFIX_VALUE = 0x000055AA +SUFFIX_VALUE = 0x0000AA55 + + +# This is intended to match requests.json payload at +# https://github.com/codetheweb/tuyapi : +# type_0a devices require the 0a command as the status request +# type_0d devices require the 0d command as the status request, and the list of +# dps used set to null in the request payload (see generate_payload method) + +# prefix: # Next byte is command byte ("hexByte") some zero padding, then length +# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for +# length, zero padding implies could be more than one byte) +PAYLOAD_DICT = { + "type_0a": { + "status": {"hexByte": 0x0A, "command": {"gwId": "", "devId": ""}}, + "set": {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}}, + }, + "type_0d": { + "status": {"hexByte": 0x0D, "command": {"devId": "", "uid": "", "t": ""}}, + "set": {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}}, + }, +} + + +@contextmanager +def socketcontext(address, port, timeout): + """Context manager which sets up and tears down socket properly.""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + s.settimeout(timeout) + s.connect((address, port)) + try: + yield s + except Exception: + # This should probably be a warning or error, but since this happens + # every now and the and we do retries on a higher level, use debug level + # to not spam log with errors. + _LOGGER.debug("Failed to connect to %s. Raising Exception.", address) + raise + finally: + s.close() + + +def pack_message(msg): + """Pack a TuyaMessage into bytes.""" + # Create full message excluding CRC and suffix + buffer = ( + struct.pack( + MESSAGE_HEADER_FMT, + PREFIX_VALUE, + msg.seqno, + msg.cmd, + len(msg.payload) + struct.calcsize(MESSAGE_END_FMT), + ) + + msg.payload + ) + + # Calculate CRC, add it together with suffix + buffer += struct.pack(MESSAGE_END_FMT, binascii.crc32(buffer), SUFFIX_VALUE) + + return buffer + + +def unpack_message(data): + """Unpack bytes into a TuyaMessage.""" + header_len = struct.calcsize(MESSAGE_RECV_HEADER_FMT) + end_len = struct.calcsize(MESSAGE_END_FMT) + + _, seqno, cmd, _, retcode = struct.unpack( + MESSAGE_RECV_HEADER_FMT, data[:header_len] + ) + payload = data[header_len:-end_len] + crc, _ = struct.unpack(MESSAGE_END_FMT, data[-end_len:]) + return TuyaMessage(seqno, cmd, retcode, payload, crc) class AESCipher: @@ -77,11 +159,7 @@ class AESCipher: """Encrypt data to be sent to device.""" encryptor = self.cipher.encryptor() crypted_text = encryptor.update(self._pad(raw)) + encryptor.finalize() - - if use_base64: - return base64.b64encode(crypted_text) - else: - return crypted_text + return base64.b64encode(crypted_text) if use_base64 else crypted_text def decrypt(self, enc, use_base64=True): """Decrypt data from device.""" @@ -100,57 +178,11 @@ class AESCipher: return s[: -ord(s[len(s) - 1 :])] -def bin2hex(x, pretty=False): - """Convert binary data to hex string.""" - if pretty: - space = " " - else: - space = "" - if IS_PY2: - result = "".join("%02X%s" % (ord(y), space) for y in x) - else: - result = "".join("%02X%s" % (y, space) for y in x) - return result - - -def hex2bin(x): - """Convert hex string to binary.""" - if IS_PY2: - return x.decode("hex") - else: - return bytes.fromhex(x) - - -# This is intended to match requests.json payload at -# https://github.com/codetheweb/tuyapi : -# type_0a devices require the 0a command as the status request -# type_0d devices require the 0d command as the status request, and the list of -# dps used set to null in the request payload (see generate_payload method) - -# prefix: # Next byte is command byte ("hexByte") some zero padding, then length -# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for -# length, zero padding implies could be more than one byte) -payload_dict = { - "type_0a": { - "status": {"hexByte": "0a", "command": {"gwId": "", "devId": ""}}, - "set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}}, - "prefix": "000055aa00000000000000", - "suffix": "000000000000aa55", - }, - "type_0d": { - "status": {"hexByte": "0d", "command": {"devId": "", "uid": "", "t": ""}}, - "set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}}, - "prefix": "000055aa00000000000000", - "suffix": "000000000000aa55", - }, -} - - class TuyaInterface: """Represent a Tuya device.""" def __init__( - self, dev_id, address, local_key, protocol_version, connection_timeout=10 + self, dev_id, address, local_key, protocol_version, connection_timeout=5 ): """ Initialize a new TuyaInterface. @@ -170,51 +202,56 @@ class TuyaInterface: self.version = protocol_version self.dev_type = "type_0a" self.dps_to_request = {} + self.cipher = AESCipher(self.local_key) + self.seqno = 0 self.port = 6668 # default - do not expect caller to pass in - def __repr__(self): - """Return internal string representation of object.""" - return "%r" % ((self.id, self.address),) # FIXME can do better than this + def exchange(self, command, data=None): + """Send and recive a message, returning response from device.""" + _LOGGER.debug("Sending command %s (device type: %s", self.dev_type) + payload = self._generate_payload(command, data) + dev_type = self.dev_type - def _send_receive(self, payload): - """ - Send single buffer `payload` and receive a single buffer. - - Args: - payload(bytes): Data to send. - """ - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) - s.connect((self.address, self.port)) - except Exception as e: - print("Failed to connect to %s. Raising Exception." % (self.address)) - raise e - try: + with socketcontext(self.address, self.port, self.connection_timeout) as s: s.send(payload) - except Exception as e: - print("Failed to send payload to %s. Raising Exception." % (self.address)) - # s.close() - raise e - - try: data = s.recv(1024) - # print("FIRST: Received %d bytes" % len(data) ) + # sometimes the first packet does not contain data (typically 28 bytes): # need to read again if len(data) < 40: time.sleep(0.1) data = s.recv(1024) - # print("SECOND: Received %d bytes" % len(data) ) - except Exception as e: - print("Failed to receive data from %s. Raising Exception." % (self.address)) - # s.close() - raise e - s.close() - return data + msg = unpack_message(data) + # TODO: Verify stuff, e.g. CRC sequence number + + payload = self._decode_payload(msg.payload) + + # Perform a new exchange (once) if we switched device type + if dev_type != self.dev_type: + _LOGGER.debug( + "Re-send %s due to device type change (%s -> %s)", + command, + dev_type, + self.dev_type, + ) + return self.exchange(command, data) + return payload + + def status(self): + """Return device status.""" + return self.exchange(STATUS) + + def set_dps(self, value, dps_index): + """ + Set value (may be any type: bool, int or string) of any dps index. + + Args: + dps_index(int): dps index to set + value: new value for the dps index + """ + return self.exchange(SET, {str(dps_index): value}) def detect_available_dps(self): """Return which datapoints are supported by the device.""" @@ -223,48 +260,22 @@ class TuyaInterface: # in the ranges [1-25] and [100-110] need to split the bruteforcing in # different steps due to request payload limitation (max. length = 255) detected_dps = {} + ranges = [(2, 11), (11, 21), (21, 31), (100, 111)] - # dps 1 must always be sent, otherwise it might fail in case no dps is found - # in the requested range - self.dps_to_request = {"1": None} - self.add_dps_to_request(range(2, 11)) - try: - data = self.status() - except Exception as e: - print("Failed to get status: [{}]".format(e)) - raise - detected_dps.update(data["dps"]) + for dps_range in ranges: + # dps 1 must always be sent, otherwise it might fail in case no dps is found + # in the requested range + self.dps_to_request = {"1": None} + self.add_dps_to_request(range(*dps_range)) + try: + data = self.status() + except Exception as e: + _LOGGER.warning("Failed to get status: [{}]", e) + raise + detected_dps.update(data["dps"]) - if self.dev_type == "type_0a": - return detected_dps - - self.dps_to_request = {"1": None} - self.add_dps_to_request(range(11, 21)) - try: - data = self.status() - except Exception as e: - print("Failed to get status: [{}]".format(e)) - raise - detected_dps.update(data["dps"]) - - self.dps_to_request = {"1": None} - self.add_dps_to_request(range(21, 31)) - try: - data = self.status() - except Exception as e: - print("Failed to get status: [{}]".format(e)) - raise - detected_dps.update(data["dps"]) - - self.dps_to_request = {"1": None} - self.add_dps_to_request(range(100, 111)) - try: - data = self.status() - except Exception as e: - print("Failed to get status: [{}]".format(e)) - raise - detected_dps.update(data["dps"]) - # print("DATA IS [{}] detected_dps [{}]".format(data,detected_dps)) + if self.dev_type == "type_0a": + return detected_dps return detected_dps @@ -275,7 +286,37 @@ class TuyaInterface: else: self.dps_to_request.update({str(index): None for index in dps_index}) - def generate_payload(self, command, data=None): + def _decode_payload(self, payload): + _LOGGER.debug("decode payload=%r", payload) + + if payload.startswith(PROTOCOL_VERSION_BYTES_31): + payload = payload[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header + # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 + # hexdigest of payload + payload = self.cipher.decrypt(payload[16:]) + elif self.version == 3.3: + if self.dev_type != "type_0a" or payload.startswith( + PROTOCOL_VERSION_BYTES_33 + ): + payload = payload[len(PROTOCOL_33_HEADER) :] + payload = self.cipher.decrypt(payload, False) + + if "data unvalid" in payload: + self.dev_type = "type_0d" + _LOGGER.debug( + "'data unvalid' error detected: switching to dev_type %r", + self.dev_type, + ) + return None + elif not payload.startswith(b"{"): + raise Exception(f"Unexpected payload={payload}") + + if not isinstance(payload, str): + payload = payload.decode() + _LOGGER.debug("decrypted result=%r", payload) + return json.loads(payload) + + def _generate_payload(self, command, data=None): """ Generate the payload to send. @@ -285,8 +326,9 @@ class TuyaInterface: data(dict, optional): The data to be send. This is what will be passed via the 'dps' entry """ - json_data = payload_dict[self.dev_type][command]["command"] - command_hb = payload_dict[self.dev_type][command]["hexByte"] + cmd_data = PAYLOAD_DICT[self.dev_type][command] + json_data = cmd_data["command"] + command_hb = cmd_data["hexByte"] if "gwId" in json_data: json_data["gwId"] = self.id @@ -301,40 +343,20 @@ class TuyaInterface: json_data["dps"] = data if command_hb == "0d": json_data["dps"] = self.dps_to_request - # log.info('******** COMMAND IS %r', self.dps_to_request) - # Create byte buffer from hex data - json_payload = json.dumps(json_data) - # print(json_payload) - json_payload = json_payload.replace( - " ", "" - ) # if spaces are not removed device does not respond! - json_payload = json_payload.encode("utf-8") - log.debug("json_payload=%r", json_payload) - # print('json_payload = ', json_payload, ' cmd = ', command_hb) + payload = json.dumps(json_data).replace(" ", "").encode("utf-8") + _LOGGER.debug("paylod=%r", payload) if self.version == 3.3: - self.cipher = AESCipher( - self.local_key - ) # expect to connect and then disconnect to set new - json_payload = self.cipher.encrypt(json_payload, False) - self.cipher = None - if command_hb != "0a": + payload = self.cipher.encrypt(payload, False) + if command_hb != 0x0A: # add the 3.3 header - json_payload = ( - PROTOCOL_VERSION_BYTES_33 - + b"\0\0\0\0\0\0\0\0\0\0\0\0" - + json_payload - ) + payload = PROTOCOL_33_HEADER + payload elif command == SET: - # need to encrypt - self.cipher = AESCipher( - self.local_key - ) # expect to connect and then disconnect to set new - json_payload = self.cipher.encrypt(json_payload) + payload = self.cipher.encrypt(payload) preMd5String = ( b"data=" - + json_payload + + payload + b"||lpv=" + PROTOCOL_VERSION_BYTES_31 + b"||" @@ -343,130 +365,16 @@ class TuyaInterface: m = md5() m.update(preMd5String) hexdigest = m.hexdigest() - json_payload = ( + payload = ( PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode("latin1") - + json_payload + + payload ) - self.cipher = None # expect to connect and then disconnect to set new - postfix_payload = hex2bin( - bin2hex(json_payload) + payload_dict[self.dev_type]["suffix"] - ) - assert len(postfix_payload) <= 0xFF - postfix_payload_hex_len = "%x" % len( - postfix_payload - ) # TODO this assumes a single byte 0-255 (0x00-0xff) - buffer = ( - hex2bin( - payload_dict[self.dev_type]["prefix"] - + payload_dict[self.dev_type][command]["hexByte"] - + "000000" - + postfix_payload_hex_len - ) - + postfix_payload - ) + msg = TuyaMessage(self.seqno, command_hb, 0, payload, 0) + self.seqno += 1 + return pack_message(msg) - # calc the CRC of everything except where the CRC goes and the suffix - hex_crc = format(binascii.crc32(buffer[:-8]) & 0xFFFFFFFF, "08X") - buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] - return buffer - - def status(self): - """Return device status.""" - log.debug("status() entry (dev_type is %s)", self.dev_type) - # open device, send request, then close connection - payload = self.generate_payload("status") - - data = self._send_receive(payload) - log.debug("status received data=%r", data) - - result = data[20:-8] # hard coded offsets - if self.dev_type != "type_0a": - result = result[15:] - - log.debug("result=%r", result) - # result = data[data.find('{'):data.rfind('}')+1] # naive marker search, - # hope neither { nor } occur in header/footer - # print('result %r' % result) - if result.startswith(b"{"): - # this is the regular expected code path - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - elif result.startswith(PROTOCOL_VERSION_BYTES_31): - # got an encrypted payload, happens occasionally - # expect resulting json to look similar to: - # {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} - # NOTE dps.2 may or may not be present - result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header - # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 - # hexdigest of payload - result = result[16:] - cipher = AESCipher(self.local_key) - result = cipher.decrypt(result) - print("decrypted result=[{}]".format(result)) - log.debug("decrypted result=%r", result) - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - elif self.version == 3.3: - cipher = AESCipher(self.local_key) - result = cipher.decrypt(result, False) - log.debug("decrypted result=%r", result) - if "data unvalid" in result: - self.dev_type = "type_0d" - log.debug( - "'data unvalid' error detected: switching to dev_type %r", - self.dev_type, - ) - return self.status() - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - else: - log.error("Unexpected status() payload=%r", result) - - return result - - def set_dps(self, value, dps_index): - """ - Set value (may be any type: bool, int or string) of any dps index. - - Args: - dps_index(int): dps index to set - value: new value for the dps index - """ - # open device, send request, then close connection - if isinstance(dps_index, int): - dps_index = str(dps_index) # index and payload is a string - - payload = self.generate_payload(SET, {dps_index: value}) - - data = self._send_receive(payload) - log.debug("set_dps received data=%r", data) - - return data - - def set_timer(self, num_secs): - """ - Set a timer. - - Args: - num_secs(int): Number of seconds - """ - # FIXME / TODO support schemas? Accept timer id number as parameter? - - # Dumb heuristic; Query status, pick last device id as that is probably - # the timer - status = self.status() - devices = status["dps"] - devices_numbers = list(devices.keys()) - devices_numbers.sort() - dps_id = devices_numbers[-1] - - payload = self.generate_payload(SET, {dps_id: num_secs}) - - data = self._send_receive(payload) - log.debug("set_timer received data=%r", data) - return data + def __repr__(self): + """Return internal string representation of object.""" + return "%r" % ((self.id, self.address),) # FIXME can do better than this