Introduced pytuya with support for 3.4 protocol

This commit is contained in:
rospogrigio
2023-01-07 22:42:11 +01:00
committed by rospogrigio
parent a2e04b40cf
commit 957cf25dd0

View File

@@ -3,11 +3,8 @@
"""
Python module to interface with Tuya WiFi smart devices.
Mostly derived from Shenzhen Xenon ESP8266MOD WiFi smart devices
E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U
Author: clach04
Maintained by: postlund
Author: clach04, postlund
Maintained by: rospogrigio
For more information see https://github.com/clach04/python-tuya
@@ -19,7 +16,7 @@ Classes
Functions
json = status() # returns json payload
set_version(version) # 3.1 [default] or 3.3
set_version(version) # 3.1 [default], 3.2, 3.3 or 3.4
detect_available_dps() # returns a list of available dps provided by the device
update_dps(dps) # sends update dps command
add_dps_to_request(dp_index) # adds dp_index to the list of dps used by the
@@ -32,13 +29,16 @@ Credits
For protocol reverse engineering
* PyTuya https://github.com/clach04/python-tuya by clach04
The origin of this python module (now abandoned)
* LocalTuya https://github.com/rospogrigio/localtuya-homeassistant by rospogrigio
Updated pytuya to support devices with Device IDs of 22 characters
* Tuya Protocol 3.4 Support by uzlonewolf
Enhancement to TuyaMessage logic for multi-payload messages and Tuya Protocol 3.4 support
* TinyTuya https://github.com/jasonacox/tinytuya by jasonacox
Several CLI tools and code for Tuya devices
"""
import asyncio
import base64
import binascii
import hmac
import json
import logging
import struct
@@ -46,18 +46,58 @@ import time
import weakref
from abc import ABC, abstractmethod
from collections import namedtuple
from hashlib import md5
from hashlib import md5,sha256
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
version_tuple = (9, 0, 0)
version_tuple = (10, 0, 0)
version = version_string = __version__ = "%d.%d.%d" % version_tuple
__author__ = "postlund"
__author__ = "rospogrigio"
_LOGGER = logging.getLogger(__name__)
TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc")
# Tuya Packet Format
TuyaHeader = namedtuple('TuyaHeader', 'prefix seqno cmd length')
MessagePayload = namedtuple("MessagePayload", "cmd payload")
try:
TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc crc_good", defaults=(True,))
except:
TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc crc_good")
# TinyTuya Error Response Codes
ERR_JSON = 900
ERR_CONNECT = 901
ERR_TIMEOUT = 902
ERR_RANGE = 903
ERR_PAYLOAD = 904
ERR_OFFLINE = 905
ERR_STATE = 906
ERR_FUNCTION = 907
ERR_DEVTYPE = 908
ERR_CLOUDKEY = 909
ERR_CLOUDRESP = 910
ERR_CLOUDTOKEN = 911
ERR_PARAMS = 912
ERR_CLOUD = 913
error_codes = {
ERR_JSON: "Invalid JSON Response from Device",
ERR_CONNECT: "Network Error: Unable to Connect",
ERR_TIMEOUT: "Timeout Waiting for Device",
ERR_RANGE: "Specified Value Out of Range",
ERR_PAYLOAD: "Unexpected Payload from Device",
ERR_OFFLINE: "Network Error: Device Unreachable",
ERR_STATE: "Device in Unknown State",
ERR_FUNCTION: "Function Not Supported by Device",
ERR_DEVTYPE: "Device22 Detected: Retry Command",
ERR_CLOUDKEY: "Missing Tuya Cloud Key and Secret",
ERR_CLOUDRESP: "Invalid JSON Response from Cloud",
ERR_CLOUDTOKEN: "Unable to Get Cloud Token",
ERR_PARAMS: "Missing Function Parameters",
ERR_CLOUD: "Error Response from Tuya Cloud",
None: "Unknown Error",
}
SET = "set"
STATUS = "status"
@@ -65,56 +105,107 @@ HEARTBEAT = "heartbeat"
RESET = "reset"
UPDATEDPS = "updatedps" # Request refresh of DPS
# Tuya Command Types
# Reference: https://github.com/tuya/tuya-iotos-embeded-sdk-wifi-ble-bk7231n/blob/master/sdk/include/lan_protocol.h
AP_CONFIG = 0x01 # FRM_TP_CFG_WF # only used for ap 3.0 network config
ACTIVE = 0x02 # FRM_TP_ACTV (discard) # WORK_MODE_CMD
SESS_KEY_NEG_START = 0x03 # FRM_SECURITY_TYPE3 # negotiate session key
SESS_KEY_NEG_RESP = 0x04 # FRM_SECURITY_TYPE4 # negotiate session key response
SESS_KEY_NEG_FINISH = 0x05 # FRM_SECURITY_TYPE5 # finalize session key negotiation
UNBIND = 0x06 # FRM_TP_UNBIND_DEV # DATA_QUERT_CMD - issue command
CONTROL = 0x07 # FRM_TP_CMD # STATE_UPLOAD_CMD
STATUS = 0x08 # FRM_TP_STAT_REPORT # STATE_QUERY_CMD
HEART_BEAT = 0x09 # FRM_TP_HB
DP_QUERY = 0x0a # 10 # FRM_QUERY_STAT # UPDATE_START_CMD - get data points
QUERY_WIFI = 0x0b # 11 # FRM_SSID_QUERY (discard) # UPDATE_TRANS_CMD
TOKEN_BIND = 0x0c # 12 # FRM_USER_BIND_REQ # GET_ONLINE_TIME_CMD - system time (GMT)
CONTROL_NEW = 0x0d # 13 # FRM_TP_NEW_CMD # FACTORY_MODE_CMD
ENABLE_WIFI = 0x0e # 14 # FRM_ADD_SUB_DEV_CMD # WIFI_TEST_CMD
WIFI_INFO = 0x0f # 15 # FRM_CFG_WIFI_INFO
DP_QUERY_NEW = 0x10 # 16 # FRM_QUERY_STAT_NEW
SCENE_EXECUTE = 0x11 # 17 # FRM_SCENE_EXEC
UPDATEDPS = 0x12 # 18 # FRM_LAN_QUERY_DP # Request refresh of DPS
UDP_NEW = 0x13 # 19 # FR_TYPE_ENCRYPTION
AP_CONFIG_NEW = 0x14 # 20 # FRM_AP_CFG_WF_V40
BOARDCAST_LPV34 = 0x23 # 35 # FR_TYPE_BOARDCAST_LPV34
LAN_EXT_STREAM = 0x40 # 64 # FRM_LAN_EXT_STREAM
PROTOCOL_VERSION_BYTES_31 = b"3.1"
PROTOCOL_VERSION_BYTES_33 = b"3.3"
PROTOCOL_VERSION_BYTES_34 = b"3.4"
PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + 12 * b"\x00"
MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length
PROTOCOL_3x_HEADER = 12 * b"\x00"
PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + PROTOCOL_3x_HEADER
PROTOCOL_34_HEADER = PROTOCOL_VERSION_BYTES_34 + PROTOCOL_3x_HEADER
MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length [, retcode]
MESSAGE_RECV_HEADER_FMT = ">5I" # 4*uint32: prefix, seqno, cmd, length, retcode
MESSAGE_RETCODE_FMT = ">I" # retcode for received messages
MESSAGE_END_FMT = ">2I" # 2*uint32: crc, suffix
MESSAGE_END_FMT_HMAC = ">32sI" # 32s:hmac, uint32:suffix
PREFIX_VALUE = 0x000055AA
PREFIX_BIN = b"\x00\x00U\xaa"
SUFFIX_VALUE = 0x0000AA55
SUFFIX_BIN = b"\x00\x00\xaaU"
NO_PROTOCOL_HEADER_CMDS = [DP_QUERY, DP_QUERY_NEW, UPDATEDPS, HEART_BEAT, SESS_KEY_NEG_START, SESS_KEY_NEG_RESP, SESS_KEY_NEG_FINISH ]
HEARTBEAT_INTERVAL = 10
# DPS that are known to be safe to use with update_dps (0x12) command
UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi)
# Tuya Device Dictionary - Command and Payload Overrides
# This is intended to match requests.json payload at
# https://github.com/codetheweb/tuyapi :
# type_0a devices require the 0a command as the status request
# type_0d devices require the 0d command as the status request, and the list of
# dps used set to null in the request payload (see generate_payload method)
# 'type_0a' devices require the 0a command for the DP_QUERY request
# 'type_0d' devices require the 0d command for the DP_QUERY request and a list of
# dps used set to Null in the request payload
# prefix: # Next byte is command byte ("hexByte") some zero padding, then length
# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for
# length, zero padding implies could be more than one byte)
PAYLOAD_DICT = {
# Any command not defined in payload_dict will be sent as-is with a
# payload of {"gwId": "", "devId": "", "uid": "", "t": ""}
payload_dict = {
# Default Device
"type_0a": {
STATUS: {"hexByte": 0x0A, "command": {"gwId": "", "devId": ""}},
SET: {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}},
HEARTBEAT: {"hexByte": 0x09, "command": {}},
UPDATEDPS: {"hexByte": 0x12, "command": {"dpId": [18, 19, 20]}},
RESET: {
"hexByte": 0x12,
"command": {
"gwId": "",
"devId": "",
"uid": "",
"t": "",
"dpId": [18, 19, 20],
AP_CONFIG: { # [BETA] Set Control Values on Device
"command": {"gwId": "", "devId": "", "uid": "", "t": ""},
},
CONTROL: { # Set Control Values on Device
"command": {"devId": "", "uid": "", "t": ""},
},
STATUS: { # Get Status from Device
"command": {"gwId": "", "devId": ""},
},
HEART_BEAT: {"command": {"gwId": "", "devId": ""}},
DP_QUERY: { # Get Data Points from Device
"command": {"gwId": "", "devId": "", "uid": "", "t": ""},
},
CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": ""}},
DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": ""}},
UPDATEDPS: {"command": {"dpId": [18, 19, 20]}},
},
# Special Case Device "0d" - Some of these devices
# Require the 0d command as the DP_QUERY status request and the list of
# dps requested payload
"type_0d": {
STATUS: {"hexByte": 0x0D, "command": {"devId": "", "uid": "", "t": ""}},
SET: {"hexByte": 0x07, "command": {"devId": "", "uid": "", "t": ""}},
HEARTBEAT: {"hexByte": 0x09, "command": {}},
UPDATEDPS: {"hexByte": 0x12, "command": {"dpId": [18, 19, 20]}},
DP_QUERY: { # Get Data Points from Device
"command_override": CONTROL_NEW, # Uses CONTROL_NEW command for some reason
"command": {"devId": "", "uid": "", "t": ""},
},
},
"v3.4": {
CONTROL: {
"command_override": CONTROL_NEW, # Uses CONTROL_NEW command
"command": {"protocol":5, "t": "int", "data": ""}
},
DP_QUERY: { "command_override": DP_QUERY_NEW },
}
}
class TuyaLoggingAdapter(logging.LoggerAdapter):
@@ -158,8 +249,9 @@ class ContextualLogger:
return self._logger.exception(msg, *args)
def pack_message(msg):
def pack_message(msg,hmac_key=None):
"""Pack a TuyaMessage into bytes."""
end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT
# Create full message excluding CRC and suffix
buffer = (
struct.pack(
@@ -167,28 +259,81 @@ def pack_message(msg):
PREFIX_VALUE,
msg.seqno,
msg.cmd,
len(msg.payload) + struct.calcsize(MESSAGE_END_FMT),
len(msg.payload) + struct.calcsize(end_fmt),
)
+ msg.payload
)
if hmac_key:
crc = hmac.new(hmac_key, buffer, sha256).digest()
else:
crc = binascii.crc32(buffer) & 0xFFFFFFFF
# Calculate CRC, add it together with suffix
buffer += struct.pack(MESSAGE_END_FMT, binascii.crc32(buffer), SUFFIX_VALUE)
buffer += struct.pack(
end_fmt, crc, SUFFIX_VALUE
)
return buffer
def unpack_message(data):
def unpack_message(data, hmac_key=None, header=None, no_retcode=False, logger=None):
"""Unpack bytes into a TuyaMessage."""
header_len = struct.calcsize(MESSAGE_RECV_HEADER_FMT)
end_len = struct.calcsize(MESSAGE_END_FMT)
end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT
# 4-word header plus return code
header_len = struct.calcsize(MESSAGE_HEADER_FMT)
retcode_len = 0 if no_retcode else struct.calcsize(MESSAGE_RETCODE_FMT)
end_len = struct.calcsize(end_fmt)
headret_len = header_len + retcode_len
_, seqno, cmd, _, retcode = struct.unpack(
MESSAGE_RECV_HEADER_FMT, data[:header_len]
if len(data) < headret_len+end_len:
logger.debug('unpack_message(): not enough data to unpack header! need %d but only have %d', headret_len+end_len, len(data))
raise DecodeError('Not enough data to unpack header')
if header is None:
header = parse_header(data)
if len(data) < header_len+header.length:
logger.debug('unpack_message(): not enough data to unpack payload! need %d but only have %d', header_len+header.length, len(data))
raise DecodeError('Not enough data to unpack payload')
retcode = 0 if no_retcode else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])[0]
# the retcode is technically part of the payload, but strip it as we do not want it here
payload = data[header_len+retcode_len:header_len+header.length]
crc, suffix = struct.unpack(end_fmt, payload[-end_len:])
if hmac_key:
have_crc = hmac.new(hmac_key, data[:(header_len+header.length)-end_len], sha256).digest()
else:
have_crc = binascii.crc32(data[:(header_len+header.length)-end_len]) & 0xFFFFFFFF
if suffix != SUFFIX_VALUE:
logger.debug('Suffix prefix wrong! %08X != %08X', suffix, SUFFIX_VALUE)
if crc != have_crc:
if hmac_key:
logger.debug('HMAC checksum wrong! %r != %r', binascii.hexlify(have_crc), binascii.hexlify(crc))
else:
logger.debug('CRC wrong! %08X != %08X', have_crc, crc)
return TuyaMessage(header.seqno, header.cmd, retcode, payload[:-end_len], crc, crc == have_crc)
def parse_header(data):
header_len = struct.calcsize(MESSAGE_HEADER_FMT)
if len(data) < header_len:
raise DecodeError('Not enough data to unpack header')
prefix, seqno, cmd, payload_len = struct.unpack(
MESSAGE_HEADER_FMT, data[:header_len]
)
payload = data[header_len:-end_len]
crc, _ = struct.unpack(MESSAGE_END_FMT, data[-end_len:])
return TuyaMessage(seqno, cmd, retcode, payload, crc)
if prefix != PREFIX_VALUE:
#self.debug('Header prefix wrong! %08X != %08X', prefix, PREFIX_VALUE)
raise DecodeError('Header prefix wrong! %08X != %08X' % (prefix, PREFIX_VALUE))
# sanity check. currently the max payload length is somewhere around 300 bytes
if payload_len > 1000:
raise DecodeError('Header claims the packet size is over 1000 bytes! It is most likely corrupt. Claimed size: %d bytes' % payload_len)
return TuyaHeader(prefix, seqno, cmd, payload_len)
class AESCipher:
@@ -199,19 +344,21 @@ class AESCipher:
self.block_size = 16
self.cipher = Cipher(algorithms.AES(key), modes.ECB(), default_backend())
def encrypt(self, raw, use_base64=True):
def encrypt(self, raw, use_base64=True, pad=True):
"""Encrypt data to be sent to device."""
encryptor = self.cipher.encryptor()
crypted_text = encryptor.update(self._pad(raw)) + encryptor.finalize()
if pad: raw = self._pad(raw)
crypted_text = encryptor.update(raw) + encryptor.finalize()
return base64.b64encode(crypted_text) if use_base64 else crypted_text
def decrypt(self, enc, use_base64=True):
def decrypt(self, enc, use_base64=True, decode_text=True):
"""Decrypt data from device."""
if use_base64:
enc = base64.b64decode(enc)
decryptor = self.cipher.decryptor()
return self._unpad(decryptor.update(enc) + decryptor.finalize()).decode()
raw = self._unpad(decryptor.update(enc) + decryptor.finalize())
return raw.decode("utf-8") if decode_text else raw
def _pad(self, data):
padnum = self.block_size - len(data) % self.block_size
@@ -229,13 +376,16 @@ class MessageDispatcher(ContextualLogger):
# other messages. This is a hack to allow waiting for heartbeats.
HEARTBEAT_SEQNO = -100
RESET_SEQNO = -101
SESS_KEY_SEQNO = -102
def __init__(self, dev_id, listener):
def __init__(self, dev_id, listener, version, local_key):
"""Initialize a new MessageBuffer."""
super().__init__()
self.buffer = b""
self.listeners = {}
self.listener = listener
self.version = version
self.local_key = local_key
self.set_logger(_LOGGER, dev_id)
def abort(self):
@@ -248,12 +398,12 @@ class MessageDispatcher(ContextualLogger):
if isinstance(sem, asyncio.Semaphore):
sem.release()
async def wait_for(self, seqno, timeout=5):
async def wait_for(self, seqno, cmd, timeout=5):
"""Wait for response to a sequence number to be received and return it."""
if seqno in self.listeners:
raise Exception(f"listener exists for {seqno}")
self.debug("Waiting for sequence number %d", seqno)
self.debug("Command %d waiting for sequence number %d", cmd, seqno)
self.listeners[seqno] = asyncio.Semaphore(0)
try:
await asyncio.wait_for(self.listeners[seqno].acquire(), timeout=timeout)
@@ -273,51 +423,39 @@ class MessageDispatcher(ContextualLogger):
if len(self.buffer) < header_len:
break
# Parse header and check if enough data according to length in header
_, seqno, cmd, length, retcode = struct.unpack_from(
MESSAGE_RECV_HEADER_FMT, self.buffer
)
if len(self.buffer[header_len - 4 :]) < length:
break
# length includes payload length, retcode, crc and suffix
if (retcode & 0xFFFFFF00) != 0:
payload_start = header_len - 4
payload_length = length - struct.calcsize(MESSAGE_END_FMT)
else:
payload_start = header_len
payload_length = length - 4 - struct.calcsize(MESSAGE_END_FMT)
payload = self.buffer[payload_start : payload_start + payload_length]
crc, _ = struct.unpack_from(
MESSAGE_END_FMT,
self.buffer[payload_start + payload_length : payload_start + length],
)
self.buffer = self.buffer[header_len - 4 + length :]
self._dispatch(TuyaMessage(seqno, cmd, retcode, payload, crc))
header = parse_header(self.buffer)
hmac_key = self.local_key if self.version == '3.4' else None
msg = unpack_message(self.buffer, header=header, hmac_key=hmac_key, logger=self);
self.buffer = self.buffer[header_len - 4 + header.length :]
self._dispatch(msg)
def _dispatch(self, msg):
"""Dispatch a message to someone that is listening."""
self.debug("Dispatching message %s", msg)
self.debug("Dispatching message CMD %r %s", msg.cmd, msg)
if msg.seqno in self.listeners:
self.debug("Dispatching sequence number %d", msg.seqno)
# self.debug("Dispatching sequence number %d", msg.seqno)
sem = self.listeners[msg.seqno]
self.listeners[msg.seqno] = msg
sem.release()
elif msg.cmd == 0x09:
elif msg.cmd == HEART_BEAT:
self.debug("Got heartbeat response")
if self.HEARTBEAT_SEQNO in self.listeners:
sem = self.listeners[self.HEARTBEAT_SEQNO]
self.listeners[self.HEARTBEAT_SEQNO] = msg
sem.release()
elif msg.cmd == 0x12:
elif msg.cmd == UPDATEDPS:
self.debug("Got normal updatedps response")
if self.RESET_SEQNO in self.listeners:
sem = self.listeners[self.RESET_SEQNO]
self.listeners[self.RESET_SEQNO] = msg
sem.release()
elif msg.cmd == 0x08:
elif msg.cmd == SESS_KEY_NEG_RESP:
self.debug("Got key negotiation response")
if self.SESS_KEY_SEQNO in self.listeners:
sem = self.listeners[self.SESS_KEY_SEQNO]
self.listeners[self.SESS_KEY_SEQNO] = msg
sem.release()
elif msg.cmd == STATUS:
if self.RESET_SEQNO in self.listeners:
self.debug("Got reset status update")
sem = self.listeners[self.RESET_SEQNO]
@@ -327,7 +465,10 @@ class MessageDispatcher(ContextualLogger):
self.debug("Got status update")
self.listener(msg)
else:
self.debug(
if msg.cmd == CONTROL_NEW:
self.debug("Got ACK message for command %d: will ignore it", msg.cmd)
else:
self.error(
"Got message type %d for unknown listener %d: %s",
msg.cmd,
msg.seqno,
@@ -377,11 +518,12 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
self.set_logger(_LOGGER, dev_id)
self.id = dev_id
self.local_key = local_key.encode("latin1")
self.real_local_key = self.local_key
self.version = protocol_version
self.dev_type = "type_0a"
self.dps_to_request = {}
self.cipher = AESCipher(self.local_key)
self.seqno = 0
self.seqno = 1
self.transport = None
self.listener = weakref.ref(listener)
self.dispatcher = self._setup_dispatcher()
@@ -389,6 +531,40 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
self.heartbeater = None
self.dps_cache = {}
if protocol_version:
self.set_version(float(protocol_version))
else:
# make sure we call our set_version() and not a subclass since some of
# them (such as BulbDevice) make connections when called
TuyaProtocol.set_version(self, 3.1)
def set_version(self, version):
self.version = version
self.version_bytes = str(version).encode('latin1')
self.version_header = self.version_bytes + PROTOCOL_3x_HEADER
if version == 3.2: # 3.2 behaves like 3.3 with type_0d
#self.version = 3.3
self.dev_type="type_0d"
if self.dps_to_request == {}:
self.detect_available_dps()
elif version == 3.4:
self.dev_type = "v3.4"
elif self.dev_type == "v3.4":
self.dev_type = "default"
def error_json(self, number=None, payload=None):
"""Return error details in JSON"""
try:
spayload = json.dumps(payload)
# spayload = payload.replace('\"','').replace('\'','')
except:
spayload = '""'
vals = (error_codes[number], str(number), spayload)
self.debug("ERROR %s - %s - payload: %s", *vals)
return json.loads('{ "Error":"%s", "Err":"%s", "Payload":%s }' % vals)
def _setup_dispatcher(self):
def _status_update(msg):
decoded_message = self._decode_payload(msg.payload)
@@ -399,7 +575,7 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
if listener is not None:
listener.status_updated(self.dps_cache)
return MessageDispatcher(self.id, _status_update)
return MessageDispatcher(self.id, _status_update, self.version, self.local_key)
def connection_made(self, transport):
"""Did connect to the device."""
@@ -434,11 +610,13 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
def data_received(self, data):
"""Received data from device."""
# self.debug("received data=%r", binascii.hexlify(data))
self.dispatcher.add_data(data)
def connection_lost(self, exc):
"""Disconnected from device."""
self.debug("Connection lost: %s", exc)
self.real_local_key = self.local_key
try:
listener = self.listener and self.listener()
if listener is not None:
@@ -449,6 +627,7 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
async def close(self):
"""Close connection and abort all outstanding listeners."""
self.debug("Closing connection")
self.real_local_key = self.local_key
if self.heartbeater is not None:
self.heartbeater.cancel()
try:
@@ -464,31 +643,78 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
self.transport = None
transport.close()
# similar to exchange() but never retries sending and does not decode the response
async def exchange_quick(self, payload, recv_retries):
if not self.transport:
self.debug("[" + self.id + "] send quick failed, could not get socket: %s", payload)
return None
enc_payload = self._encode_message(payload) if type(payload) == MessagePayload else payload
# self.debug("Quick-dispatching message %s, seqno %s", binascii.hexlify(enc_payload), self.seqno)
try:
self.transport.write(enc_payload)
except:
# self._check_socket_close(True)
self.close()
return None
while recv_retries:
try:
#msg = await self._receive()
seqno = MessageDispatcher.SESS_KEY_SEQNO
# seqno = self.seqno - 1
msg = await self.dispatcher.wait_for(seqno, payload.cmd)
# for 3.4 devices, we get the starting seqno with the SESS_KEY_NEG_RESP message
self.seqno = msg.seqno
except:
msg = None
if msg and len(msg.payload) != 0:
return msg
recv_retries -= 1
if recv_retries == 0:
self.debug("received null payload (%r) but out of recv retries, giving up", msg)
else:
self.debug("received null payload (%r), fetch new one - %s retries remaining", msg, recv_retries)
return None
async def exchange(self, command, dps=None):
"""Send and receive a message, returning response from device."""
if self.version == 3.4 and self.real_local_key == self.local_key:
self.debug("3.4 device: negotiating a new session key")
await self._negotiate_session_key()
self.debug(
"Sending command %s (device type: %s)",
command,
self.dev_type,
)
payload = self._generate_payload(command, dps)
real_cmd = payload.cmd
dev_type = self.dev_type
# self.debug("Exchange: payload %r %r", payload.cmd, payload.payload)
# Wait for special sequence number if heartbeat or reset
seqno = self.seqno - 1
seqno = self.seqno
if command == HEARTBEAT:
if payload.cmd == HEARTBEAT:
seqno = MessageDispatcher.HEARTBEAT_SEQNO
elif command == RESET:
elif payload.cmd == RESET:
seqno = MessageDispatcher.RESET_SEQNO
self.transport.write(payload)
msg = await self.dispatcher.wait_for(seqno)
enc_payload = self._encode_message(payload)
self.transport.write(enc_payload)
msg = await self.dispatcher.wait_for(seqno, payload.cmd )
if msg is None:
self.debug("Wait was aborted for seqno %d", seqno)
return None
# TODO: Verify stuff, e.g. CRC sequence number?
if real_cmd == CONTROL_NEW and len(msg.payload) == 0:
# device may send one or two messages with empty payload in response
# to a CONTROL_NEW command, consider it an ACK
self.debug("ACK received for command %d: ignoring it", real_cmd)
return None
payload = self._decode_payload(msg.payload)
# Perform a new exchange (once) if we switched device type
@@ -504,7 +730,7 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
async def status(self):
"""Return device status."""
status = await self.exchange(STATUS)
status = await self.exchange(DP_QUERY)
if status and "dps" in status:
self.dps_cache.update(status["dps"])
return self.dps_cache
@@ -539,7 +765,8 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
dps = list(set(dps).intersection(set(UPDATE_DPS_WHITELIST)))
self.debug("updatedps() entry (dps %s, dps_cache %s)", dps, self.dps_cache)
payload = self._generate_payload(UPDATEDPS, dps)
self.transport.write(payload)
enc_payload = self._encode_message(payload)
self.transport.write(enc_payload)
return True
async def set_dp(self, value, dp_index):
@@ -550,11 +777,11 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
dp_index(int): dps index to set
value: new value for the dps index
"""
return await self.exchange(SET, {str(dp_index): value})
return await self.exchange(CONTROL, {str(dp_index): value})
async def set_dps(self, dps):
"""Set values for a set of datapoints."""
return await self.exchange(SET, dps)
return await self.exchange(CONTROL, dps)
async def detect_available_dps(self):
"""Return which datapoints are supported by the device."""
@@ -591,38 +818,175 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
self.dps_to_request.update({str(index): None for index in dp_indicies})
def _decode_payload(self, payload):
if not payload:
payload = "{}"
elif payload.startswith(b"{"):
pass
elif payload.startswith(PROTOCOL_VERSION_BYTES_31):
payload = payload[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header
# remove (what I'm guessing, but not confirmed is) 16-bytes of MD5
# hexdigest of payload
payload = self.cipher.decrypt(payload[16:])
elif self.version == 3.3:
if self.dev_type != "type_0a" or payload.startswith(
PROTOCOL_VERSION_BYTES_33
):
payload = payload[len(PROTOCOL_33_HEADER) :]
payload = self.cipher.decrypt(payload, False)
cipher = AESCipher(self.local_key)
if self.version == 3.4:
# 3.4 devices encrypt the version header in addition to the payload
try:
# self.debug("decrypting=%r", payload)
payload = cipher.decrypt(payload, False, decode_text=False)
except:
self.debug("incomplete payload=%r (len:%d)", payload, len(payload))
return self.error_json(ERR_PAYLOAD)
# self.debug("decrypted 3.x payload=%r", payload)
if payload.startswith(PROTOCOL_VERSION_BYTES_31):
# Received an encrypted payload
# Remove version header
payload = payload[len(PROTOCOL_VERSION_BYTES_31) :]
# Decrypt payload
# Remove 16-bytes of MD5 hexdigest of payload
payload = cipher.decrypt(payload[16:])
elif self.version >= 3.2: # 3.2 or 3.3 or 3.4
# Trim header for non-default device type
if payload.startswith( self.version_bytes ):
payload = payload[len(self.version_header) :]
# self.debug("removing 3.x=%r", payload)
elif self.dev_type == "type_0d" and (len(payload) & 0x0F) != 0:
payload = payload[len(self.version_header) :]
# self.debug("removing type_0d 3.x header=%r", payload)
if self.version != 3.4:
try:
# self.debug("decrypting=%r", payload)
payload = cipher.decrypt(payload, False)
except:
self.debug("incomplete payload=%r (len:%d)", payload, len(payload))
return self.error_json(ERR_PAYLOAD)
# self.debug("decrypted 3.x payload=%r", payload)
# Try to detect if type_0d found
if not isinstance(payload, str):
try:
payload = payload.decode()
except:
self.debug("payload was not string type and decoding failed")
return self.error_json(ERR_JSON, payload)
if "data unvalid" in payload:
self.dev_type = "type_0d"
# set at least one DPS
self.dps_to_request = {"1": None}
self.debug(
"switching to dev_type %s",
"'data unvalid' error detected: switching to dev_type %r",
self.dev_type,
)
return None
else:
raise Exception(f"Unexpected payload={payload}")
elif not payload.startswith(b"{"):
self.debug("Unexpected payload=%r", payload)
return self.error_json(ERR_PAYLOAD, payload)
if not isinstance(payload, str):
payload = payload.decode()
self.debug("Decrypted payload: %s", payload)
return json.loads(payload)
self.debug("Deciphered data = %r", payload)
try:
json_payload = json.loads(payload)
except:
json_payload = self.error_json(ERR_JSON, payload)
def _generate_payload(self, command, data=None):
# v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...}
if "dps" not in json_payload and "data" in json_payload and "dps" in json_payload['data']:
json_payload['dps'] = json_payload['data']['dps']
return json_payload
async def _negotiate_session_key(self):
self.local_nonce = b'0123456789abcdef' # not-so-random random key
self.remote_nonce = b''
self.local_key = self.real_local_key
rkey = await self.exchange_quick( MessagePayload(SESS_KEY_NEG_START, self.local_nonce), 2 )
if not rkey or type(rkey) != TuyaMessage or len(rkey.payload) < 48:
# error
self.debug("session key negotiation failed on step 1")
return False
if rkey.cmd != SESS_KEY_NEG_RESP:
self.debug("session key negotiation step 2 returned wrong command: %d", rkey.cmd)
return False
payload = rkey.payload
try:
# self.debug("decrypting %r using %r", payload, self.real_local_key)
cipher = AESCipher(self.real_local_key)
payload = cipher.decrypt(payload, False, decode_text=False)
except:
self.debug("session key step 2 decrypt failed, payload=%r (len:%d)", payload, len(payload))
return False
self.debug("decrypted session key negotiation step 2: payload=%r", payload)
if len(payload) < 48:
self.debug("session key negotiation step 2 failed, too short response")
return False
self.remote_nonce = payload[:16]
hmac_check = hmac.new(self.local_key, self.local_nonce, sha256).digest()
if hmac_check != payload[16:48]:
self.debug("session key negotiation step 2 failed HMAC check! wanted=%r but got=%r", binascii.hexlify(hmac_check), binascii.hexlify(payload[16:48]))
# self.debug("session local nonce: %r remote nonce: %r", self.local_nonce, self.remote_nonce)
rkey_hmac = hmac.new(self.local_key, self.remote_nonce, sha256).digest()
await self.exchange_quick( MessagePayload(SESS_KEY_NEG_FINISH, rkey_hmac), None )
self.local_key = bytes( [ a^b for (a,b) in zip(self.local_nonce,self.remote_nonce) ] )
# self.debug("Session nonce XOR'd: %r" % self.local_key)
cipher = AESCipher(self.real_local_key)
self.local_key = self.dispatcher.local_key = cipher.encrypt(self.local_key, False, pad=False)
self.debug("Session key negotiate success! session key: %r", self.local_key)
return True
# adds protocol header (if needed) and encrypts
def _encode_message( self, msg ):
hmac_key = None
payload = msg.payload
self.cipher = AESCipher(self.local_key)
if self.version == 3.4:
hmac_key = self.local_key
if msg.cmd not in NO_PROTOCOL_HEADER_CMDS:
# add the 3.x header
payload = self.version_header + payload
self.debug('final payload for cmd %r: %r', msg.cmd, payload)
payload = self.cipher.encrypt(payload, False)
elif self.version >= 3.2:
# expect to connect and then disconnect to set new
payload = self.cipher.encrypt(payload, False)
if msg.cmd not in NO_PROTOCOL_HEADER_CMDS:
# add the 3.x header
payload = self.version_header + payload
elif msg.cmd == CONTROL:
# need to encrypt
payload = self.cipher.encrypt(payload)
preMd5String = (
b"data="
+ payload
+ b"||lpv="
+ PROTOCOL_VERSION_BYTES_31
+ b"||"
+ self.local_key
)
m = md5()
m.update(preMd5String)
hexdigest = m.hexdigest()
# some tuya libraries strip 8: to :24
payload = (
PROTOCOL_VERSION_BYTES_31
+ hexdigest[8:][:16].encode("latin1")
+ payload
)
self.cipher = None
msg = TuyaMessage(self.seqno, msg.cmd, 0, payload, 0, True)
self.seqno += 1 # increase message sequence number
buffer = pack_message(msg,hmac_key=hmac_key)
# self.debug("payload encrypted with key %r => %r", self.local_key, binascii.hexlify(buffer))
return buffer
def _generate_payload(self, command, data=None, gwId=None, devId=None, uid=None):
"""
Generate the payload to send.
@@ -631,58 +995,73 @@ class TuyaProtocol(asyncio.Protocol, ContextualLogger):
This is one of the entries from payload_dict
data(dict, optional): The data to be send.
This is what will be passed via the 'dps' entry
gwId(str, optional): Will be used for gwId
devId(str, optional): Will be used for devId
uid(str, optional): Will be used for uid
"""
cmd_data = PAYLOAD_DICT[self.dev_type][command]
json_data = cmd_data["command"]
command_hb = cmd_data["hexByte"]
json_data = command_override = None
if command in payload_dict[self.dev_type]:
if 'command' in payload_dict[self.dev_type][command]:
json_data = payload_dict[self.dev_type][command]['command']
if 'command_override' in payload_dict[self.dev_type][command]:
command_override = payload_dict[self.dev_type][command]['command_override']
if self.dev_type != 'type_0a':
if json_data is None and command in payload_dict['type_0a'] and 'command' in payload_dict['type_0a'][command]:
json_data = payload_dict['type_0a'][command]['command']
if command_override is None and command in payload_dict['type_0a'] and 'command_override' in payload_dict['type_0a'][command]:
command_override = payload_dict['type_0a'][command]['command_override']
if command_override is None:
command_override = command
if json_data is None:
# I have yet to see a device complain about included but unneeded attribs, but they *will*
# complain about missing attribs, so just include them all unless otherwise specified
json_data = {"gwId": "", "devId": "", "uid": "", "t": ""}
cmd_data = ""
if "gwId" in json_data:
if gwId is not None:
json_data["gwId"] = gwId
else:
json_data["gwId"] = self.id
if "devId" in json_data:
if devId is not None:
json_data["devId"] = devId
else:
json_data["devId"] = self.id
if "uid" in json_data:
json_data["uid"] = self.id # still use id, no separate uid
if uid is not None:
json_data["uid"] = uid
else:
json_data["uid"] = self.id
if "t" in json_data:
if json_data['t'] == "int":
json_data["t"] = int(time.time())
else:
json_data["t"] = str(int(time.time()))
if data is not None:
if "dpId" in json_data:
json_data["dpId"] = data
elif "data" in json_data:
json_data["data"] = {"dps": data}
else:
json_data["dps"] = data
elif command_hb == 0x0D:
elif self.dev_type == "type_0d" and command == DP_QUERY:
json_data["dps"] = self.dps_to_request
payload = json.dumps(json_data).replace(" ", "").encode("utf-8")
self.debug("Send payload: %s", payload)
if json_data == "":
payload = ""
else:
payload = json.dumps(json_data)
# if spaces are not removed device does not respond!
payload = payload.replace(" ", "").encode("utf-8")
# self.debug("Sending payload: %s", payload)
if self.version == 3.3:
payload = self.cipher.encrypt(payload, False)
if command_hb not in [0x0A, 0x12]:
# add the 3.3 header
payload = PROTOCOL_33_HEADER + payload
elif command == SET:
payload = self.cipher.encrypt(payload)
to_hash = (
b"data="
+ payload
+ b"||lpv="
+ PROTOCOL_VERSION_BYTES_31
+ b"||"
+ self.local_key
)
hasher = md5()
hasher.update(to_hash)
hexdigest = hasher.hexdigest()
payload = (
PROTOCOL_VERSION_BYTES_31
+ hexdigest[8:][:16].encode("latin1")
+ payload
)
return MessagePayload(command_override, payload)
msg = TuyaMessage(self.seqno, command_hb, 0, payload, 0)
self.seqno += 1
return pack_message(msg)
def __repr__(self):
"""Return internal string representation of object."""