From 4a5bf8386786b8fbb70f8de7c5b2a2e1dfbcf3a5 Mon Sep 17 00:00:00 2001 From: rospogrigio Date: Sat, 7 Jan 2023 22:42:11 +0100 Subject: [PATCH] Introduced pytuya with support for 3.4 protocol --- .../localtuya/pytuya/__init__.py | 713 ++++++++++++++---- 1 file changed, 546 insertions(+), 167 deletions(-) diff --git a/custom_components/localtuya/pytuya/__init__.py b/custom_components/localtuya/pytuya/__init__.py index d36cd5e..a417534 100644 --- a/custom_components/localtuya/pytuya/__init__.py +++ b/custom_components/localtuya/pytuya/__init__.py @@ -3,11 +3,8 @@ """ Python module to interface with Tuya WiFi smart devices. -Mostly derived from Shenzhen Xenon ESP8266MOD WiFi smart devices -E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U - -Author: clach04 -Maintained by: postlund +Author: clach04, postlund +Maintained by: rospogrigio For more information see https://github.com/clach04/python-tuya @@ -19,7 +16,7 @@ Classes Functions json = status() # returns json payload - set_version(version) # 3.1 [default] or 3.3 + set_version(version) # 3.1 [default], 3.2, 3.3 or 3.4 detect_available_dps() # returns a list of available dps provided by the device update_dps(dps) # sends update dps command add_dps_to_request(dp_index) # adds dp_index to the list of dps used by the @@ -27,18 +24,21 @@ Functions set_dp(on, dp_index) # Set value of any dps index. -Credits - * TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes - For protocol reverse engineering - * PyTuya https://github.com/clach04/python-tuya by clach04 - The origin of this python module (now abandoned) - * LocalTuya https://github.com/rospogrigio/localtuya-homeassistant by rospogrigio - Updated pytuya to support devices with Device IDs of 22 characters + Credits + * TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes + For protocol reverse engineering + * PyTuya https://github.com/clach04/python-tuya by clach04 + The origin of this python module (now abandoned) + * Tuya Protocol 3.4 Support by uzlonewolf + Enhancement to TuyaMessage logic for multi-payload messages and Tuya Protocol 3.4 support + * TinyTuya https://github.com/jasonacox/tinytuya by jasonacox + Several CLI tools and code for Tuya devices """ import asyncio import base64 import binascii +import hmac import json import logging import struct @@ -46,18 +46,58 @@ import time import weakref from abc import ABC, abstractmethod from collections import namedtuple -from hashlib import md5 +from hashlib import md5,sha256 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -version_tuple = (9, 0, 0) +version_tuple = (10, 0, 0) version = version_string = __version__ = "%d.%d.%d" % version_tuple -__author__ = "postlund" +__author__ = "rospogrigio" _LOGGER = logging.getLogger(__name__) -TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc") +# Tuya Packet Format +TuyaHeader = namedtuple('TuyaHeader', 'prefix seqno cmd length') +MessagePayload = namedtuple("MessagePayload", "cmd payload") +try: + TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc crc_good", defaults=(True,)) +except: + TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc crc_good") + +# TinyTuya Error Response Codes +ERR_JSON = 900 +ERR_CONNECT = 901 +ERR_TIMEOUT = 902 +ERR_RANGE = 903 +ERR_PAYLOAD = 904 +ERR_OFFLINE = 905 +ERR_STATE = 906 +ERR_FUNCTION = 907 +ERR_DEVTYPE = 908 +ERR_CLOUDKEY = 909 +ERR_CLOUDRESP = 910 +ERR_CLOUDTOKEN = 911 +ERR_PARAMS = 912 +ERR_CLOUD = 913 + +error_codes = { + ERR_JSON: "Invalid JSON Response from Device", + ERR_CONNECT: "Network Error: Unable to Connect", + ERR_TIMEOUT: "Timeout Waiting for Device", + ERR_RANGE: "Specified Value Out of Range", + ERR_PAYLOAD: "Unexpected Payload from Device", + ERR_OFFLINE: "Network Error: Device Unreachable", + ERR_STATE: "Device in Unknown State", + ERR_FUNCTION: "Function Not Supported by Device", + ERR_DEVTYPE: "Device22 Detected: Retry Command", + ERR_CLOUDKEY: "Missing Tuya Cloud Key and Secret", + ERR_CLOUDRESP: "Invalid JSON Response from Cloud", + ERR_CLOUDTOKEN: "Unable to Get Cloud Token", + ERR_PARAMS: "Missing Function Parameters", + ERR_CLOUD: "Error Response from Tuya Cloud", + None: "Unknown Error", +} SET = "set" STATUS = "status" @@ -65,58 +105,109 @@ HEARTBEAT = "heartbeat" RESET = "reset" UPDATEDPS = "updatedps" # Request refresh of DPS +# Tuya Command Types +# Reference: https://github.com/tuya/tuya-iotos-embeded-sdk-wifi-ble-bk7231n/blob/master/sdk/include/lan_protocol.h +AP_CONFIG = 0x01 # FRM_TP_CFG_WF # only used for ap 3.0 network config +ACTIVE = 0x02 # FRM_TP_ACTV (discard) # WORK_MODE_CMD +SESS_KEY_NEG_START = 0x03 # FRM_SECURITY_TYPE3 # negotiate session key +SESS_KEY_NEG_RESP = 0x04 # FRM_SECURITY_TYPE4 # negotiate session key response +SESS_KEY_NEG_FINISH = 0x05 # FRM_SECURITY_TYPE5 # finalize session key negotiation +UNBIND = 0x06 # FRM_TP_UNBIND_DEV # DATA_QUERT_CMD - issue command +CONTROL = 0x07 # FRM_TP_CMD # STATE_UPLOAD_CMD +STATUS = 0x08 # FRM_TP_STAT_REPORT # STATE_QUERY_CMD +HEART_BEAT = 0x09 # FRM_TP_HB +DP_QUERY = 0x0a # 10 # FRM_QUERY_STAT # UPDATE_START_CMD - get data points +QUERY_WIFI = 0x0b # 11 # FRM_SSID_QUERY (discard) # UPDATE_TRANS_CMD +TOKEN_BIND = 0x0c # 12 # FRM_USER_BIND_REQ # GET_ONLINE_TIME_CMD - system time (GMT) +CONTROL_NEW = 0x0d # 13 # FRM_TP_NEW_CMD # FACTORY_MODE_CMD +ENABLE_WIFI = 0x0e # 14 # FRM_ADD_SUB_DEV_CMD # WIFI_TEST_CMD +WIFI_INFO = 0x0f # 15 # FRM_CFG_WIFI_INFO +DP_QUERY_NEW = 0x10 # 16 # FRM_QUERY_STAT_NEW +SCENE_EXECUTE = 0x11 # 17 # FRM_SCENE_EXEC +UPDATEDPS = 0x12 # 18 # FRM_LAN_QUERY_DP # Request refresh of DPS +UDP_NEW = 0x13 # 19 # FR_TYPE_ENCRYPTION +AP_CONFIG_NEW = 0x14 # 20 # FRM_AP_CFG_WF_V40 +BOARDCAST_LPV34 = 0x23 # 35 # FR_TYPE_BOARDCAST_LPV34 +LAN_EXT_STREAM = 0x40 # 64 # FRM_LAN_EXT_STREAM + + PROTOCOL_VERSION_BYTES_31 = b"3.1" PROTOCOL_VERSION_BYTES_33 = b"3.3" +PROTOCOL_VERSION_BYTES_34 = b"3.4" -PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + 12 * b"\x00" - -MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length +PROTOCOL_3x_HEADER = 12 * b"\x00" +PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + PROTOCOL_3x_HEADER +PROTOCOL_34_HEADER = PROTOCOL_VERSION_BYTES_34 + PROTOCOL_3x_HEADER +MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length [, retcode] MESSAGE_RECV_HEADER_FMT = ">5I" # 4*uint32: prefix, seqno, cmd, length, retcode +MESSAGE_RETCODE_FMT = ">I" # retcode for received messages MESSAGE_END_FMT = ">2I" # 2*uint32: crc, suffix - +MESSAGE_END_FMT_HMAC = ">32sI" # 32s:hmac, uint32:suffix PREFIX_VALUE = 0x000055AA +PREFIX_BIN = b"\x00\x00U\xaa" SUFFIX_VALUE = 0x0000AA55 +SUFFIX_BIN = b"\x00\x00\xaaU" +NO_PROTOCOL_HEADER_CMDS = [DP_QUERY, DP_QUERY_NEW, UPDATEDPS, HEART_BEAT, SESS_KEY_NEG_START, SESS_KEY_NEG_RESP, SESS_KEY_NEG_FINISH ] HEARTBEAT_INTERVAL = 10 # DPS that are known to be safe to use with update_dps (0x12) command UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi) +# Tuya Device Dictionary - Command and Payload Overrides # This is intended to match requests.json payload at # https://github.com/codetheweb/tuyapi : -# type_0a devices require the 0a command as the status request -# type_0d devices require the 0d command as the status request, and the list of -# dps used set to null in the request payload (see generate_payload method) - +# 'type_0a' devices require the 0a command for the DP_QUERY request +# 'type_0d' devices require the 0d command for the DP_QUERY request and a list of +# dps used set to Null in the request payload # prefix: # Next byte is command byte ("hexByte") some zero padding, then length # of remaining payload, i.e. command + suffix (unclear if multiple bytes used for # length, zero padding implies could be more than one byte) -PAYLOAD_DICT = { + +# Any command not defined in payload_dict will be sent as-is with a +# payload of {"gwId": "", "devId": "", "uid": "", "t": ""} + +payload_dict = { + # Default Device "type_0a": { - STATUS: {"hexByte": 0x0A, "command": {"gwId": "", "devId": ""}}, - SET: {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}}, - HEARTBEAT: {"hexByte": 0x09, "command": {}}, - UPDATEDPS: {"hexByte": 0x12, "command": {"dpId": [18, 19, 20]}}, - RESET: { - "hexByte": 0x12, - "command": { - "gwId": "", - "devId": "", - "uid": "", - "t": "", - "dpId": [18, 19, 20], - }, + AP_CONFIG: { # [BETA] Set Control Values on Device + "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, + }, + CONTROL: { # Set Control Values on Device + "command": {"devId": "", "uid": "", "t": ""}, + }, + STATUS: { # Get Status from Device + "command": {"gwId": "", "devId": ""}, + }, + HEART_BEAT: {"command": {"gwId": "", "devId": ""}}, + DP_QUERY: { # Get Data Points from Device + "command": {"gwId": "", "devId": "", "uid": "", "t": ""}, + }, + CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": ""}}, + DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": ""}}, + UPDATEDPS: {"command": {"dpId": [18, 19, 20]}}, + }, + # Special Case Device "0d" - Some of these devices + # Require the 0d command as the DP_QUERY status request and the list of + # dps requested payload + "type_0d": { + DP_QUERY: { # Get Data Points from Device + "command_override": CONTROL_NEW, # Uses CONTROL_NEW command for some reason + "command": {"devId": "", "uid": "", "t": ""}, }, }, - "type_0d": { - STATUS: {"hexByte": 0x0D, "command": {"devId": "", "uid": "", "t": ""}}, - SET: {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}}, - HEARTBEAT: {"hexByte": 0x09, "command": {}}, - UPDATEDPS: {"hexByte": 0x12, "command": {"dpId": [18, 19, 20]}}, - }, + "v3.4": { + CONTROL: { + "command_override": CONTROL_NEW, # Uses CONTROL_NEW command + "command": {"protocol":5, "t": "int", "data": ""} + }, + DP_QUERY: { "command_override": DP_QUERY_NEW }, + } } + + class TuyaLoggingAdapter(logging.LoggerAdapter): """Adapter that adds device id to all log points.""" @@ -158,8 +249,9 @@ class ContextualLogger: return self._logger.exception(msg, *args) -def pack_message(msg): +def pack_message(msg,hmac_key=None): """Pack a TuyaMessage into bytes.""" + end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT # Create full message excluding CRC and suffix buffer = ( struct.pack( @@ -167,28 +259,81 @@ def pack_message(msg): PREFIX_VALUE, msg.seqno, msg.cmd, - len(msg.payload) + struct.calcsize(MESSAGE_END_FMT), + len(msg.payload) + struct.calcsize(end_fmt), ) + msg.payload ) - + if hmac_key: + crc = hmac.new(hmac_key, buffer, sha256).digest() + else: + crc = binascii.crc32(buffer) & 0xFFFFFFFF # Calculate CRC, add it together with suffix - buffer += struct.pack(MESSAGE_END_FMT, binascii.crc32(buffer), SUFFIX_VALUE) - + buffer += struct.pack( + end_fmt, crc, SUFFIX_VALUE + ) return buffer -def unpack_message(data): +def unpack_message(data, hmac_key=None, header=None, no_retcode=False, logger=None): """Unpack bytes into a TuyaMessage.""" - header_len = struct.calcsize(MESSAGE_RECV_HEADER_FMT) - end_len = struct.calcsize(MESSAGE_END_FMT) + end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT + # 4-word header plus return code + header_len = struct.calcsize(MESSAGE_HEADER_FMT) + retcode_len = 0 if no_retcode else struct.calcsize(MESSAGE_RETCODE_FMT) + end_len = struct.calcsize(end_fmt) + headret_len = header_len + retcode_len - _, seqno, cmd, _, retcode = struct.unpack( - MESSAGE_RECV_HEADER_FMT, data[:header_len] + if len(data) < headret_len+end_len: + logger.debug('unpack_message(): not enough data to unpack header! need %d but only have %d', headret_len+end_len, len(data)) + raise DecodeError('Not enough data to unpack header') + + if header is None: + header = parse_header(data) + + if len(data) < header_len+header.length: + logger.debug('unpack_message(): not enough data to unpack payload! need %d but only have %d', header_len+header.length, len(data)) + raise DecodeError('Not enough data to unpack payload') + + retcode = 0 if no_retcode else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])[0] + # the retcode is technically part of the payload, but strip it as we do not want it here + payload = data[header_len+retcode_len:header_len+header.length] + crc, suffix = struct.unpack(end_fmt, payload[-end_len:]) + + if hmac_key: + have_crc = hmac.new(hmac_key, data[:(header_len+header.length)-end_len], sha256).digest() + else: + have_crc = binascii.crc32(data[:(header_len+header.length)-end_len]) & 0xFFFFFFFF + + if suffix != SUFFIX_VALUE: + logger.debug('Suffix prefix wrong! %08X != %08X', suffix, SUFFIX_VALUE) + + if crc != have_crc: + if hmac_key: + logger.debug('HMAC checksum wrong! %r != %r', binascii.hexlify(have_crc), binascii.hexlify(crc)) + else: + logger.debug('CRC wrong! %08X != %08X', have_crc, crc) + + return TuyaMessage(header.seqno, header.cmd, retcode, payload[:-end_len], crc, crc == have_crc) + +def parse_header(data): + header_len = struct.calcsize(MESSAGE_HEADER_FMT) + + if len(data) < header_len: + raise DecodeError('Not enough data to unpack header') + + prefix, seqno, cmd, payload_len = struct.unpack( + MESSAGE_HEADER_FMT, data[:header_len] ) - payload = data[header_len:-end_len] - crc, _ = struct.unpack(MESSAGE_END_FMT, data[-end_len:]) - return TuyaMessage(seqno, cmd, retcode, payload, crc) + + if prefix != PREFIX_VALUE: + #self.debug('Header prefix wrong! %08X != %08X', prefix, PREFIX_VALUE) + raise DecodeError('Header prefix wrong! %08X != %08X' % (prefix, PREFIX_VALUE)) + + # sanity check. currently the max payload length is somewhere around 300 bytes + if payload_len > 1000: + raise DecodeError('Header claims the packet size is over 1000 bytes! It is most likely corrupt. Claimed size: %d bytes' % payload_len) + + return TuyaHeader(prefix, seqno, cmd, payload_len) class AESCipher: @@ -199,19 +344,21 @@ class AESCipher: self.block_size = 16 self.cipher = Cipher(algorithms.AES(key), modes.ECB(), default_backend()) - def encrypt(self, raw, use_base64=True): + def encrypt(self, raw, use_base64=True, pad=True): """Encrypt data to be sent to device.""" encryptor = self.cipher.encryptor() - crypted_text = encryptor.update(self._pad(raw)) + encryptor.finalize() + if pad: raw = self._pad(raw) + crypted_text = encryptor.update(raw) + encryptor.finalize() return base64.b64encode(crypted_text) if use_base64 else crypted_text - def decrypt(self, enc, use_base64=True): + def decrypt(self, enc, use_base64=True, decode_text=True): """Decrypt data from device.""" if use_base64: enc = base64.b64decode(enc) decryptor = self.cipher.decryptor() - return self._unpad(decryptor.update(enc) + decryptor.finalize()).decode() + raw = self._unpad(decryptor.update(enc) + decryptor.finalize()) + return raw.decode("utf-8") if decode_text else raw def _pad(self, data): padnum = self.block_size - len(data) % self.block_size @@ -229,13 +376,16 @@ class MessageDispatcher(ContextualLogger): # other messages. This is a hack to allow waiting for heartbeats. HEARTBEAT_SEQNO = -100 RESET_SEQNO = -101 + SESS_KEY_SEQNO = -102 - def __init__(self, dev_id, listener): + def __init__(self, dev_id, listener, version, local_key): """Initialize a new MessageBuffer.""" super().__init__() self.buffer = b"" self.listeners = {} self.listener = listener + self.version = version + self.local_key = local_key self.set_logger(_LOGGER, dev_id) def abort(self): @@ -248,12 +398,12 @@ class MessageDispatcher(ContextualLogger): if isinstance(sem, asyncio.Semaphore): sem.release() - async def wait_for(self, seqno, timeout=5): + async def wait_for(self, seqno, cmd, timeout=5): """Wait for response to a sequence number to be received and return it.""" if seqno in self.listeners: raise Exception(f"listener exists for {seqno}") - self.debug("Waiting for sequence number %d", seqno) + self.debug("Command %d waiting for sequence number %d", cmd, seqno) self.listeners[seqno] = asyncio.Semaphore(0) try: await asyncio.wait_for(self.listeners[seqno].acquire(), timeout=timeout) @@ -273,51 +423,39 @@ class MessageDispatcher(ContextualLogger): if len(self.buffer) < header_len: break - # Parse header and check if enough data according to length in header - _, seqno, cmd, length, retcode = struct.unpack_from( - MESSAGE_RECV_HEADER_FMT, self.buffer - ) - if len(self.buffer[header_len - 4 :]) < length: - break - - # length includes payload length, retcode, crc and suffix - if (retcode & 0xFFFFFF00) != 0: - payload_start = header_len - 4 - payload_length = length - struct.calcsize(MESSAGE_END_FMT) - else: - payload_start = header_len - payload_length = length - 4 - struct.calcsize(MESSAGE_END_FMT) - payload = self.buffer[payload_start : payload_start + payload_length] - - crc, _ = struct.unpack_from( - MESSAGE_END_FMT, - self.buffer[payload_start + payload_length : payload_start + length], - ) - - self.buffer = self.buffer[header_len - 4 + length :] - self._dispatch(TuyaMessage(seqno, cmd, retcode, payload, crc)) + header = parse_header(self.buffer) + hmac_key = self.local_key if self.version == '3.4' else None + msg = unpack_message(self.buffer, header=header, hmac_key=hmac_key, logger=self); + self.buffer = self.buffer[header_len - 4 + header.length :] + self._dispatch(msg) def _dispatch(self, msg): """Dispatch a message to someone that is listening.""" - self.debug("Dispatching message %s", msg) + self.debug("Dispatching message CMD %r %s", msg.cmd, msg) if msg.seqno in self.listeners: - self.debug("Dispatching sequence number %d", msg.seqno) + # self.debug("Dispatching sequence number %d", msg.seqno) sem = self.listeners[msg.seqno] self.listeners[msg.seqno] = msg sem.release() - elif msg.cmd == 0x09: + elif msg.cmd == HEART_BEAT: self.debug("Got heartbeat response") if self.HEARTBEAT_SEQNO in self.listeners: sem = self.listeners[self.HEARTBEAT_SEQNO] self.listeners[self.HEARTBEAT_SEQNO] = msg sem.release() - elif msg.cmd == 0x12: + elif msg.cmd == UPDATEDPS: self.debug("Got normal updatedps response") if self.RESET_SEQNO in self.listeners: sem = self.listeners[self.RESET_SEQNO] self.listeners[self.RESET_SEQNO] = msg sem.release() - elif msg.cmd == 0x08: + elif msg.cmd == SESS_KEY_NEG_RESP: + self.debug("Got key negotiation response") + if self.SESS_KEY_SEQNO in self.listeners: + sem = self.listeners[self.SESS_KEY_SEQNO] + self.listeners[self.SESS_KEY_SEQNO] = msg + sem.release() + elif msg.cmd == STATUS: if self.RESET_SEQNO in self.listeners: self.debug("Got reset status update") sem = self.listeners[self.RESET_SEQNO] @@ -327,12 +465,15 @@ class MessageDispatcher(ContextualLogger): self.debug("Got status update") self.listener(msg) else: - self.debug( - "Got message type %d for unknown listener %d: %s", - msg.cmd, - msg.seqno, - msg, - ) + if msg.cmd == CONTROL_NEW: + self.debug("Got ACK message for command %d: will ignore it", msg.cmd) + else: + self.error( + "Got message type %d for unknown listener %d: %s", + msg.cmd, + msg.seqno, + msg, + ) class TuyaListener(ABC): @@ -377,11 +518,12 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): self.set_logger(_LOGGER, dev_id) self.id = dev_id self.local_key = local_key.encode("latin1") + self.real_local_key = self.local_key self.version = protocol_version self.dev_type = "type_0a" self.dps_to_request = {} self.cipher = AESCipher(self.local_key) - self.seqno = 0 + self.seqno = 1 self.transport = None self.listener = weakref.ref(listener) self.dispatcher = self._setup_dispatcher() @@ -389,6 +531,40 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): self.heartbeater = None self.dps_cache = {} + if protocol_version: + self.set_version(float(protocol_version)) + else: + # make sure we call our set_version() and not a subclass since some of + # them (such as BulbDevice) make connections when called + TuyaProtocol.set_version(self, 3.1) + + def set_version(self, version): + self.version = version + self.version_bytes = str(version).encode('latin1') + self.version_header = self.version_bytes + PROTOCOL_3x_HEADER + if version == 3.2: # 3.2 behaves like 3.3 with type_0d + #self.version = 3.3 + self.dev_type="type_0d" + if self.dps_to_request == {}: + self.detect_available_dps() + elif version == 3.4: + self.dev_type = "v3.4" + elif self.dev_type == "v3.4": + self.dev_type = "default" + + def error_json(self, number=None, payload=None): + """Return error details in JSON""" + try: + spayload = json.dumps(payload) + # spayload = payload.replace('\"','').replace('\'','') + except: + spayload = '""' + + vals = (error_codes[number], str(number), spayload) + self.debug("ERROR %s - %s - payload: %s", *vals) + + return json.loads('{ "Error":"%s", "Err":"%s", "Payload":%s }' % vals) + def _setup_dispatcher(self): def _status_update(msg): decoded_message = self._decode_payload(msg.payload) @@ -399,7 +575,7 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): if listener is not None: listener.status_updated(self.dps_cache) - return MessageDispatcher(self.id, _status_update) + return MessageDispatcher(self.id, _status_update, self.version, self.local_key) def connection_made(self, transport): """Did connect to the device.""" @@ -434,11 +610,13 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): def data_received(self, data): """Received data from device.""" + # self.debug("received data=%r", binascii.hexlify(data)) self.dispatcher.add_data(data) def connection_lost(self, exc): """Disconnected from device.""" self.debug("Connection lost: %s", exc) + self.real_local_key = self.local_key try: listener = self.listener and self.listener() if listener is not None: @@ -449,6 +627,7 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): async def close(self): """Close connection and abort all outstanding listeners.""" self.debug("Closing connection") + self.real_local_key = self.local_key if self.heartbeater is not None: self.heartbeater.cancel() try: @@ -464,31 +643,78 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): self.transport = None transport.close() + # similar to exchange() but never retries sending and does not decode the response + async def exchange_quick(self, payload, recv_retries): + + if not self.transport: + self.debug("[" + self.id + "] send quick failed, could not get socket: %s", payload) + return None + enc_payload = self._encode_message(payload) if type(payload) == MessagePayload else payload + # self.debug("Quick-dispatching message %s, seqno %s", binascii.hexlify(enc_payload), self.seqno) + + try: + self.transport.write(enc_payload) + except: + # self._check_socket_close(True) + self.close() + return None + while recv_retries: + try: + #msg = await self._receive() + seqno = MessageDispatcher.SESS_KEY_SEQNO + # seqno = self.seqno - 1 + msg = await self.dispatcher.wait_for(seqno, payload.cmd) + # for 3.4 devices, we get the starting seqno with the SESS_KEY_NEG_RESP message + self.seqno = msg.seqno + except: + msg = None + if msg and len(msg.payload) != 0: + return msg + recv_retries -= 1 + if recv_retries == 0: + self.debug("received null payload (%r) but out of recv retries, giving up", msg) + else: + self.debug("received null payload (%r), fetch new one - %s retries remaining", msg, recv_retries) + return None + async def exchange(self, command, dps=None): """Send and receive a message, returning response from device.""" + + if self.version == 3.4 and self.real_local_key == self.local_key: + self.debug("3.4 device: negotiating a new session key") + await self._negotiate_session_key() + self.debug( "Sending command %s (device type: %s)", command, self.dev_type, ) payload = self._generate_payload(command, dps) + real_cmd = payload.cmd dev_type = self.dev_type + # self.debug("Exchange: payload %r %r", payload.cmd, payload.payload) # Wait for special sequence number if heartbeat or reset - seqno = self.seqno - 1 + seqno = self.seqno - if command == HEARTBEAT: + if payload.cmd == HEARTBEAT: seqno = MessageDispatcher.HEARTBEAT_SEQNO - elif command == RESET: + elif payload.cmd == RESET: seqno = MessageDispatcher.RESET_SEQNO - self.transport.write(payload) - msg = await self.dispatcher.wait_for(seqno) + enc_payload = self._encode_message(payload) + self.transport.write(enc_payload) + msg = await self.dispatcher.wait_for(seqno, payload.cmd ) if msg is None: self.debug("Wait was aborted for seqno %d", seqno) return None # TODO: Verify stuff, e.g. CRC sequence number? + if real_cmd == CONTROL_NEW and len(msg.payload) == 0: + # device may send one or two messages with empty payload in response + # to a CONTROL_NEW command, consider it an ACK + self.debug("ACK received for command %d: ignoring it", real_cmd) + return None payload = self._decode_payload(msg.payload) # Perform a new exchange (once) if we switched device type @@ -504,7 +730,7 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): async def status(self): """Return device status.""" - status = await self.exchange(STATUS) + status = await self.exchange(DP_QUERY) if status and "dps" in status: self.dps_cache.update(status["dps"]) return self.dps_cache @@ -539,7 +765,8 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): dps = list(set(dps).intersection(set(UPDATE_DPS_WHITELIST))) self.debug("updatedps() entry (dps %s, dps_cache %s)", dps, self.dps_cache) payload = self._generate_payload(UPDATEDPS, dps) - self.transport.write(payload) + enc_payload = self._encode_message(payload) + self.transport.write(enc_payload) return True async def set_dp(self, value, dp_index): @@ -550,11 +777,11 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): dp_index(int): dps index to set value: new value for the dps index """ - return await self.exchange(SET, {str(dp_index): value}) + return await self.exchange(CONTROL, {str(dp_index): value}) async def set_dps(self, dps): """Set values for a set of datapoints.""" - return await self.exchange(SET, dps) + return await self.exchange(CONTROL, dps) async def detect_available_dps(self): """Return which datapoints are supported by the device.""" @@ -591,38 +818,175 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): self.dps_to_request.update({str(index): None for index in dp_indicies}) def _decode_payload(self, payload): - if not payload: - payload = "{}" - elif payload.startswith(b"{"): - pass - elif payload.startswith(PROTOCOL_VERSION_BYTES_31): - payload = payload[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header - # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 - # hexdigest of payload - payload = self.cipher.decrypt(payload[16:]) - elif self.version == 3.3: - if self.dev_type != "type_0a" or payload.startswith( - PROTOCOL_VERSION_BYTES_33 - ): - payload = payload[len(PROTOCOL_33_HEADER) :] - payload = self.cipher.decrypt(payload, False) + cipher = AESCipher(self.local_key) + if self.version == 3.4: + # 3.4 devices encrypt the version header in addition to the payload + try: + # self.debug("decrypting=%r", payload) + payload = cipher.decrypt(payload, False, decode_text=False) + except: + self.debug("incomplete payload=%r (len:%d)", payload, len(payload)) + return self.error_json(ERR_PAYLOAD) + + # self.debug("decrypted 3.x payload=%r", payload) + + if payload.startswith(PROTOCOL_VERSION_BYTES_31): + # Received an encrypted payload + # Remove version header + payload = payload[len(PROTOCOL_VERSION_BYTES_31) :] + # Decrypt payload + # Remove 16-bytes of MD5 hexdigest of payload + payload = cipher.decrypt(payload[16:]) + elif self.version >= 3.2: # 3.2 or 3.3 or 3.4 + # Trim header for non-default device type + if payload.startswith( self.version_bytes ): + payload = payload[len(self.version_header) :] + # self.debug("removing 3.x=%r", payload) + elif self.dev_type == "type_0d" and (len(payload) & 0x0F) != 0: + payload = payload[len(self.version_header) :] + # self.debug("removing type_0d 3.x header=%r", payload) + + if self.version != 3.4: + try: + # self.debug("decrypting=%r", payload) + payload = cipher.decrypt(payload, False) + except: + self.debug("incomplete payload=%r (len:%d)", payload, len(payload)) + return self.error_json(ERR_PAYLOAD) + + # self.debug("decrypted 3.x payload=%r", payload) + # Try to detect if type_0d found + + if not isinstance(payload, str): + try: + payload = payload.decode() + except: + self.debug("payload was not string type and decoding failed") + return self.error_json(ERR_JSON, payload) if "data unvalid" in payload: self.dev_type = "type_0d" + # set at least one DPS + self.dps_to_request = {"1": None} self.debug( - "switching to dev_type %s", + "'data unvalid' error detected: switching to dev_type %r", self.dev_type, ) return None - else: - raise Exception(f"Unexpected payload={payload}") + elif not payload.startswith(b"{"): + self.debug("Unexpected payload=%r", payload) + return self.error_json(ERR_PAYLOAD, payload) if not isinstance(payload, str): payload = payload.decode() - self.debug("Decrypted payload: %s", payload) - return json.loads(payload) + self.debug("Deciphered data = %r", payload) + try: + json_payload = json.loads(payload) + except: + json_payload = self.error_json(ERR_JSON, payload) - def _generate_payload(self, command, data=None): + # v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...} + if "dps" not in json_payload and "data" in json_payload and "dps" in json_payload['data']: + json_payload['dps'] = json_payload['data']['dps'] + + return json_payload + + async def _negotiate_session_key(self): + self.local_nonce = b'0123456789abcdef' # not-so-random random key + self.remote_nonce = b'' + self.local_key = self.real_local_key + + rkey = await self.exchange_quick( MessagePayload(SESS_KEY_NEG_START, self.local_nonce), 2 ) + if not rkey or type(rkey) != TuyaMessage or len(rkey.payload) < 48: + # error + self.debug("session key negotiation failed on step 1") + return False + + if rkey.cmd != SESS_KEY_NEG_RESP: + self.debug("session key negotiation step 2 returned wrong command: %d", rkey.cmd) + return False + + payload = rkey.payload + try: + # self.debug("decrypting %r using %r", payload, self.real_local_key) + cipher = AESCipher(self.real_local_key) + payload = cipher.decrypt(payload, False, decode_text=False) + except: + self.debug("session key step 2 decrypt failed, payload=%r (len:%d)", payload, len(payload)) + return False + + self.debug("decrypted session key negotiation step 2: payload=%r", payload) + + if len(payload) < 48: + self.debug("session key negotiation step 2 failed, too short response") + return False + + self.remote_nonce = payload[:16] + hmac_check = hmac.new(self.local_key, self.local_nonce, sha256).digest() + + if hmac_check != payload[16:48]: + self.debug("session key negotiation step 2 failed HMAC check! wanted=%r but got=%r", binascii.hexlify(hmac_check), binascii.hexlify(payload[16:48])) + + # self.debug("session local nonce: %r remote nonce: %r", self.local_nonce, self.remote_nonce) + + rkey_hmac = hmac.new(self.local_key, self.remote_nonce, sha256).digest() + await self.exchange_quick( MessagePayload(SESS_KEY_NEG_FINISH, rkey_hmac), None ) + + self.local_key = bytes( [ a^b for (a,b) in zip(self.local_nonce,self.remote_nonce) ] ) + # self.debug("Session nonce XOR'd: %r" % self.local_key) + + cipher = AESCipher(self.real_local_key) + self.local_key = self.dispatcher.local_key = cipher.encrypt(self.local_key, False, pad=False) + self.debug("Session key negotiate success! session key: %r", self.local_key) + return True + + # adds protocol header (if needed) and encrypts + def _encode_message( self, msg ): + hmac_key = None + payload = msg.payload + self.cipher = AESCipher(self.local_key) + if self.version == 3.4: + hmac_key = self.local_key + if msg.cmd not in NO_PROTOCOL_HEADER_CMDS: + # add the 3.x header + payload = self.version_header + payload + self.debug('final payload for cmd %r: %r', msg.cmd, payload) + payload = self.cipher.encrypt(payload, False) + elif self.version >= 3.2: + # expect to connect and then disconnect to set new + payload = self.cipher.encrypt(payload, False) + if msg.cmd not in NO_PROTOCOL_HEADER_CMDS: + # add the 3.x header + payload = self.version_header + payload + elif msg.cmd == CONTROL: + # need to encrypt + payload = self.cipher.encrypt(payload) + preMd5String = ( + b"data=" + + payload + + b"||lpv=" + + PROTOCOL_VERSION_BYTES_31 + + b"||" + + self.local_key + ) + m = md5() + m.update(preMd5String) + hexdigest = m.hexdigest() + # some tuya libraries strip 8: to :24 + payload = ( + PROTOCOL_VERSION_BYTES_31 + + hexdigest[8:][:16].encode("latin1") + + payload + ) + + self.cipher = None + msg = TuyaMessage(self.seqno, msg.cmd, 0, payload, 0, True) + self.seqno += 1 # increase message sequence number + buffer = pack_message(msg,hmac_key=hmac_key) + # self.debug("payload encrypted with key %r => %r", self.local_key, binascii.hexlify(buffer)) + return buffer + + def _generate_payload(self, command, data=None, gwId=None, devId=None, uid=None): """ Generate the payload to send. @@ -631,58 +995,73 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger): This is one of the entries from payload_dict data(dict, optional): The data to be send. This is what will be passed via the 'dps' entry + gwId(str, optional): Will be used for gwId + devId(str, optional): Will be used for devId + uid(str, optional): Will be used for uid """ - cmd_data = PAYLOAD_DICT[self.dev_type][command] - json_data = cmd_data["command"] - command_hb = cmd_data["hexByte"] + json_data = command_override = None + + if command in payload_dict[self.dev_type]: + if 'command' in payload_dict[self.dev_type][command]: + json_data = payload_dict[self.dev_type][command]['command'] + if 'command_override' in payload_dict[self.dev_type][command]: + command_override = payload_dict[self.dev_type][command]['command_override'] + + if self.dev_type != 'type_0a': + if json_data is None and command in payload_dict['type_0a'] and 'command' in payload_dict['type_0a'][command]: + json_data = payload_dict['type_0a'][command]['command'] + if command_override is None and command in payload_dict['type_0a'] and 'command_override' in payload_dict['type_0a'][command]: + command_override = payload_dict['type_0a'][command]['command_override'] + + if command_override is None: + command_override = command + if json_data is None: + # I have yet to see a device complain about included but unneeded attribs, but they *will* + # complain about missing attribs, so just include them all unless otherwise specified + json_data = {"gwId": "", "devId": "", "uid": "", "t": ""} + cmd_data = "" if "gwId" in json_data: - json_data["gwId"] = self.id + if gwId is not None: + json_data["gwId"] = gwId + else: + json_data["gwId"] = self.id if "devId" in json_data: - json_data["devId"] = self.id + if devId is not None: + json_data["devId"] = devId + else: + json_data["devId"] = self.id if "uid" in json_data: - json_data["uid"] = self.id # still use id, no separate uid + if uid is not None: + json_data["uid"] = uid + else: + json_data["uid"] = self.id if "t" in json_data: - json_data["t"] = str(int(time.time())) + if json_data['t'] == "int": + json_data["t"] = int(time.time()) + else: + json_data["t"] = str(int(time.time())) if data is not None: if "dpId" in json_data: json_data["dpId"] = data + elif "data" in json_data: + json_data["data"] = {"dps": data} else: json_data["dps"] = data - elif command_hb == 0x0D: + elif self.dev_type == "type_0d" and command == DP_QUERY: json_data["dps"] = self.dps_to_request - payload = json.dumps(json_data).replace(" ", "").encode("utf-8") - self.debug("Send payload: %s", payload) + if json_data == "": + payload = "" + else: + payload = json.dumps(json_data) + # if spaces are not removed device does not respond! + payload = payload.replace(" ", "").encode("utf-8") + # self.debug("Sending payload: %s", payload) - if self.version == 3.3: - payload = self.cipher.encrypt(payload, False) - if command_hb not in [0x0A, 0x12]: - # add the 3.3 header - payload = PROTOCOL_33_HEADER + payload - elif command == SET: - payload = self.cipher.encrypt(payload) - to_hash = ( - b"data=" - + payload - + b"||lpv=" - + PROTOCOL_VERSION_BYTES_31 - + b"||" - + self.local_key - ) - hasher = md5() - hasher.update(to_hash) - hexdigest = hasher.hexdigest() - payload = ( - PROTOCOL_VERSION_BYTES_31 - + hexdigest[8:][:16].encode("latin1") - + payload - ) + return MessagePayload(command_override, payload) - msg = TuyaMessage(self.seqno, command_hb, 0, payload, 0) - self.seqno += 1 - return pack_message(msg) def __repr__(self): """Return internal string representation of object."""