Files
localtuya-modded/custom_components/localtuya/pytuya/__init__.py

1092 lines
41 KiB
Python

# PyTuya Module
# -*- coding: utf-8 -*-
"""
Python module to interface with Tuya WiFi smart devices.
Author: clach04, postlund
Maintained by: rospogrigio
For more information see https://github.com/clach04/python-tuya
Classes
TuyaInterface(dev_id, address, local_key=None)
dev_id (str): Device ID e.g. 01234567891234567890
address (str): Device Network IP Address e.g. 10.0.1.99
local_key (str, optional): The encryption key. Defaults to None.
Functions
json = status() # returns json payload
set_version(version) # 3.1 [default], 3.2, 3.3 or 3.4
detect_available_dps() # returns a list of available dps provided by the device
update_dps(dps) # sends update dps command
add_dps_to_request(dp_index) # adds dp_index to the list of dps used by the
# device (to be queried in the payload)
set_dp(on, dp_index) # Set value of any dps index.
Credits
* TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
For protocol reverse engineering
* PyTuya https://github.com/clach04/python-tuya by clach04
The origin of this python module (now abandoned)
* Tuya Protocol 3.4 Support by uzlonewolf
Enhancement to TuyaMessage logic for multi-payload messages and Tuya Protocol 3.4 support
* TinyTuya https://github.com/jasonacox/tinytuya by jasonacox
Several CLI tools and code for Tuya devices
"""
import asyncio
import base64
import binascii
import hmac
import json
import logging
import struct
import time
import weakref
from abc import ABC, abstractmethod
from collections import namedtuple
from hashlib import md5,sha256
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# Library version, exposed both as a tuple and as a dotted string under the
# three names historically used by this module.
version_tuple = (10, 0, 0)
__version__ = ".".join(str(part) for part in version_tuple)
version = __version__
version_string = __version__
__author__ = "rospogrigio"

# Base logger of this module; device-specific loggers are derived from it.
_LOGGER = logging.getLogger(__name__)
# Tuya Packet Format
# TuyaHeader: the four 32-bit words at the start of every packet.
TuyaHeader = namedtuple("TuyaHeader", "prefix seqno cmd length")
# MessagePayload: a command id together with its not-yet-framed payload bytes.
MessagePayload = namedtuple("MessagePayload", "cmd payload")
# TuyaMessage: a fully decoded (or to-be-encoded) packet. ``crc_good`` defaults
# to True where the interpreter supports namedtuple defaults.
try:
    TuyaMessage = namedtuple(
        "TuyaMessage", "seqno cmd retcode payload crc crc_good", defaults=(True,)
    )
except TypeError:
    # namedtuple() only accepts ``defaults`` from Python 3.7 on; older
    # interpreters raise TypeError for the unknown keyword argument. Catch
    # exactly that instead of hiding every possible error.
    TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc crc_good")
# TinyTuya Error Response Codes
ERR_JSON = 900
ERR_CONNECT = 901
ERR_TIMEOUT = 902
ERR_RANGE = 903
ERR_PAYLOAD = 904
ERR_OFFLINE = 905
ERR_STATE = 906
ERR_FUNCTION = 907
ERR_DEVTYPE = 908
ERR_CLOUDKEY = 909
ERR_CLOUDRESP = 910
ERR_CLOUDTOKEN = 911
ERR_PARAMS = 912
ERR_CLOUD = 913

# Human readable description of each error code. The ``None`` entry is the
# text used when no error number is given.
error_codes = {
    ERR_JSON: "Invalid JSON Response from Device",
    ERR_CONNECT: "Network Error: Unable to Connect",
    ERR_TIMEOUT: "Timeout Waiting for Device",
    ERR_RANGE: "Specified Value Out of Range",
    ERR_PAYLOAD: "Unexpected Payload from Device",
    ERR_OFFLINE: "Network Error: Device Unreachable",
    ERR_STATE: "Device in Unknown State",
    ERR_FUNCTION: "Function Not Supported by Device",
    ERR_DEVTYPE: "Device22 Detected: Retry Command",
    ERR_CLOUDKEY: "Missing Tuya Cloud Key and Secret",
    ERR_CLOUDRESP: "Invalid JSON Response from Cloud",
    ERR_CLOUDTOKEN: "Unable to Get Cloud Token",
    ERR_PARAMS: "Missing Function Parameters",
    ERR_CLOUD: "Error Response from Tuya Cloud",
    None: "Unknown Error",
}
# Tuya Command Types
# Reference: https://github.com/tuya/tuya-iotos-embeded-sdk-wifi-ble-bk7231n/blob/master/sdk/include/lan_protocol.h
# Each constant is the numeric command id placed in the "cmd" word of a
# message header. The trailing comments give the decimal value (for ids of 10
# and above) followed by the names used in the reference header.
AP_CONFIG = 0x01 # FRM_TP_CFG_WF # only used for ap 3.0 network config
ACTIVE = 0x02 # FRM_TP_ACTV (discard) # WORK_MODE_CMD
SESS_KEY_NEG_START = 0x03 # FRM_SECURITY_TYPE3 # negotiate session key
SESS_KEY_NEG_RESP = 0x04 # FRM_SECURITY_TYPE4 # negotiate session key response
SESS_KEY_NEG_FINISH = 0x05 # FRM_SECURITY_TYPE5 # finalize session key negotiation
UNBIND = 0x06 # FRM_TP_UNBIND_DEV # DATA_QUERT_CMD - issue command
CONTROL = 0x07 # FRM_TP_CMD # STATE_UPLOAD_CMD
STATUS = 0x08 # FRM_TP_STAT_REPORT # STATE_QUERY_CMD
HEART_BEAT = 0x09 # FRM_TP_HB
DP_QUERY = 0x0a # 10 # FRM_QUERY_STAT # UPDATE_START_CMD - get data points
QUERY_WIFI = 0x0b # 11 # FRM_SSID_QUERY (discard) # UPDATE_TRANS_CMD
TOKEN_BIND = 0x0c # 12 # FRM_USER_BIND_REQ # GET_ONLINE_TIME_CMD - system time (GMT)
CONTROL_NEW = 0x0d # 13 # FRM_TP_NEW_CMD # FACTORY_MODE_CMD
ENABLE_WIFI = 0x0e # 14 # FRM_ADD_SUB_DEV_CMD # WIFI_TEST_CMD
WIFI_INFO = 0x0f # 15 # FRM_CFG_WIFI_INFO
DP_QUERY_NEW = 0x10 # 16 # FRM_QUERY_STAT_NEW
SCENE_EXECUTE = 0x11 # 17 # FRM_SCENE_EXEC
UPDATEDPS = 0x12 # 18 # FRM_LAN_QUERY_DP # Request refresh of DPS
UDP_NEW = 0x13 # 19 # FR_TYPE_ENCRYPTION
AP_CONFIG_NEW = 0x14 # 20 # FRM_AP_CFG_WF_V40
BOARDCAST_LPV34 = 0x23 # 35 # FR_TYPE_BOARDCAST_LPV34
LAN_EXT_STREAM = 0x40 # 64 # FRM_LAN_EXT_STREAM
# Version markers as they appear on the wire, and the headers built from them:
# a version marker followed by 12 zero bytes.
PROTOCOL_VERSION_BYTES_31 = b"3.1"
PROTOCOL_VERSION_BYTES_33 = b"3.3"
PROTOCOL_VERSION_BYTES_34 = b"3.4"
PROTOCOL_3x_HEADER = 12 * b"\x00"
PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + PROTOCOL_3x_HEADER
PROTOCOL_34_HEADER = PROTOCOL_VERSION_BYTES_34 + PROTOCOL_3x_HEADER
# struct format strings (big-endian) describing the packet framing.
MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length [, retcode]
MESSAGE_RECV_HEADER_FMT = ">5I" # 5*uint32: prefix, seqno, cmd, length, retcode
MESSAGE_RETCODE_FMT = ">I" # retcode for received messages
MESSAGE_END_FMT = ">2I" # 2*uint32: crc, suffix
MESSAGE_END_FMT_HMAC = ">32sI" # 32s:hmac, uint32:suffix
# Packet start/end markers, both as integers and as their big-endian bytes.
PREFIX_VALUE = 0x000055AA
PREFIX_BIN = b"\x00\x00U\xaa"
SUFFIX_VALUE = 0x0000AA55
SUFFIX_BIN = b"\x00\x00\xaaU"
# Commands whose payload is sent without the protocol version header.
NO_PROTOCOL_HEADER_CMDS = [DP_QUERY, DP_QUERY_NEW, UPDATEDPS, HEART_BEAT, SESS_KEY_NEG_START, SESS_KEY_NEG_RESP, SESS_KEY_NEG_FINISH ]
# Pause between two heartbeats, in seconds (passed to asyncio.sleep).
HEARTBEAT_INTERVAL = 10
# DPS that are known to be safe to use with update_dps (0x12) command
UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi)
# Tuya Device Dictionary - Command and Payload Overrides
#
# This is intended to match requests.json payload at
# https://github.com/codetheweb/tuyapi :
#   'type_0a' devices require the 0a command for the DP_QUERY request
#   'type_0d' devices require the 0d command for the DP_QUERY request and a
#             list of dps used set to Null in the request payload
#
# prefix: Next byte is command byte ("hexByte") some zero padding, then length
# of remaining payload, i.e. command + suffix (unclear if multiple bytes used
# for length, zero padding implies could be more than one byte)
#
# Layout: device type -> command id -> {"command": <JSON template>,
# "command_override": <command id actually sent>}; both inner keys are
# optional.
#
# Any command not defined in payload_dict will be sent as-is with a
# payload of {"gwId": "", "devId": "", "uid": "", "t": ""}
payload_dict = {
    # Default Device
    "type_0a": {
        AP_CONFIG: {  # [BETA] Set Control Values on Device
            "command": {"gwId": "", "devId": "", "uid": "", "t": ""},
        },
        CONTROL: {  # Set Control Values on Device
            "command": {"devId": "", "uid": "", "t": ""},
        },
        STATUS: {  # Get Status from Device
            "command": {"gwId": "", "devId": ""},
        },
        HEART_BEAT: {"command": {"gwId": "", "devId": ""}},
        DP_QUERY: {  # Get Data Points from Device
            "command": {"gwId": "", "devId": "", "uid": "", "t": ""},
        },
        CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": ""}},
        DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": ""}},
        UPDATEDPS: {"command": {"dpId": [18, 19, 20]}},
    },
    # Special Case Device "0d" - Some of these devices
    # Require the 0d command as the DP_QUERY status request and the list of
    # dps requested payload
    "type_0d": {
        DP_QUERY: {  # Get Data Points from Device
            "command_override": CONTROL_NEW,  # Uses CONTROL_NEW command for some reason
            "command": {"devId": "", "uid": "", "t": ""},
        },
    },
    "v3.4": {
        CONTROL: {
            "command_override": CONTROL_NEW,  # Uses CONTROL_NEW command
            "command": {"protocol": 5, "t": "int", "data": ""},
        },
        DP_QUERY: {"command_override": DP_QUERY_NEW},
    },
}
class TuyaLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that tags every message with an abbreviated device id."""

    def process(self, msg, kwargs):
        """Return *msg* prefixed with ``[abc...xyz]`` and *kwargs* unchanged.

        The prefix is built from the first three and the last three characters
        of the ``device_id`` entry of the adapter's ``extra`` mapping.
        """
        device = self.extra["device_id"]
        head, tail = device[0:3], device[-3:]
        return f"[{head}...{tail}] {msg}", kwargs
class ContextualLogger:
    """Mixin offering log methods that carry the device id in every message.

    ``set_logger`` must be called before any of the log methods is used;
    until then no underlying logger is configured.
    """

    def __init__(self):
        """Create the logger mixin without an underlying logger."""
        self._logger = None

    def set_logger(self, logger, device_id):
        """Wrap *logger* so that all messages are tagged with *device_id*."""
        self._logger = TuyaLoggingAdapter(logger, {"device_id": device_id})

    def debug(self, msg, *args):
        """Log *msg* % *args* at DEBUG level."""
        return self._logger.debug(msg, *args)

    def info(self, msg, *args):
        """Log *msg* % *args* at INFO level."""
        return self._logger.info(msg, *args)

    def warning(self, msg, *args):
        """Log *msg* % *args* at WARNING level."""
        return self._logger.warning(msg, *args)

    def error(self, msg, *args):
        """Log *msg* % *args* at ERROR level."""
        return self._logger.error(msg, *args)

    def exception(self, msg, *args):
        """Log *msg* % *args* at ERROR level, including the active traceback."""
        return self._logger.exception(msg, *args)
def pack_message(msg, hmac_key=None):
    """Serialize a TuyaMessage into the bytes sent on the wire.

    Args:
        msg: message providing ``seqno``, ``cmd`` and ``payload`` (bytes).
        hmac_key: when truthy, the packet is authenticated with an
            HMAC-SHA256 keyed with this value; otherwise a CRC32 is used.

    Returns:
        bytes: header + payload + checksum + suffix.
    """
    if hmac_key:
        trailer_fmt = MESSAGE_END_FMT_HMAC
    else:
        trailer_fmt = MESSAGE_END_FMT

    # The length word counts everything that follows the header: the payload
    # plus the checksum/suffix trailer.
    body_len = len(msg.payload) + struct.calcsize(trailer_fmt)
    header = struct.pack(
        MESSAGE_HEADER_FMT, PREFIX_VALUE, msg.seqno, msg.cmd, body_len
    )
    packet = header + msg.payload

    # The checksum covers header and payload, but not the trailer itself.
    if hmac_key:
        checksum = hmac.new(hmac_key, packet, sha256).digest()
    else:
        checksum = binascii.crc32(packet) & 0xFFFFFFFF

    return packet + struct.pack(trailer_fmt, checksum, SUFFIX_VALUE)
def unpack_message(data, hmac_key=None, header=None, no_retcode=False, logger=None):
    """Unpack bytes into a TuyaMessage.

    Args:
        data (bytes): buffer starting at the packet prefix. It may contain
            more than one packet; only the first one is decoded.
        hmac_key: when truthy, the trailer is expected to hold an HMAC-SHA256
            keyed with this value instead of a CRC32.
        header: an already parsed TuyaHeader for *data*; parsed from *data*
            when omitted.
        no_retcode (bool): set when the packet carries no return code word.
        logger: object with a ``debug(msg, *args)`` method used for
            diagnostics. Defaults to this module's logger.

    Returns:
        TuyaMessage: the decoded message. A checksum or suffix mismatch is
        only logged; ``crc_good`` tells whether the checksum matched.

    Raises:
        DecodeError: if *data* is too short for the packet it announces.
    """
    # The signature advertises logger=None, so that default has to work.
    log = logging.getLogger(__name__) if logger is None else logger

    end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT
    # 4-word header plus return code
    header_len = struct.calcsize(MESSAGE_HEADER_FMT)
    retcode_len = 0 if no_retcode else struct.calcsize(MESSAGE_RETCODE_FMT)
    end_len = struct.calcsize(end_fmt)
    headret_len = header_len + retcode_len

    if len(data) < headret_len + end_len:
        log.debug(
            'unpack_message(): not enough data to unpack header! need %d but only have %d',
            headret_len + end_len,
            len(data),
        )
        raise DecodeError('Not enough data to unpack header')

    if header is None:
        header = parse_header(data)

    packet_len = header_len + header.length
    if len(data) < packet_len:
        log.debug(
            'unpack_message(): not enough data to unpack payload! need %d but only have %d',
            packet_len,
            len(data),
        )
        raise DecodeError('Not enough data to unpack payload')

    if header.length < retcode_len + end_len:
        # A length this small cannot hold the return code and the trailer;
        # report it as a decode problem rather than failing inside struct.
        raise DecodeError('Not enough data to unpack payload')

    if no_retcode:
        retcode = 0
    else:
        retcode = struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])[0]

    # the retcode is technically part of the payload, but strip it as we do not want it here
    payload = data[headret_len:packet_len]
    crc, suffix = struct.unpack(end_fmt, payload[-end_len:])

    # The checksum covers everything up to (but excluding) the trailer.
    covered = data[: packet_len - end_len]
    if hmac_key:
        have_crc = hmac.new(hmac_key, covered, sha256).digest()
        # Constant-time comparison, as recommended for MAC values.
        crc_good = hmac.compare_digest(crc, have_crc)
    else:
        have_crc = binascii.crc32(covered) & 0xFFFFFFFF
        crc_good = crc == have_crc

    if suffix != SUFFIX_VALUE:
        log.debug('Suffix prefix wrong! %08X != %08X', suffix, SUFFIX_VALUE)

    if not crc_good:
        if hmac_key:
            log.debug(
                'HMAC checksum wrong! %r != %r',
                binascii.hexlify(have_crc),
                binascii.hexlify(crc),
            )
        else:
            log.debug('CRC wrong! %08X != %08X', have_crc, crc)

    return TuyaMessage(
        header.seqno, header.cmd, retcode, payload[:-end_len], crc, crc_good
    )
class DecodeError(Exception):
    """Raised when received bytes cannot be decoded as a Tuya packet."""


def parse_header(data):
    """Parse and validate the fixed-size header at the start of *data*.

    Args:
        data (bytes): buffer starting at the packet prefix.

    Returns:
        TuyaHeader: the prefix, sequence number, command id and the length of
        the remainder of the packet.

    Raises:
        DecodeError: if *data* is shorter than a header, does not start with
            the expected prefix, or announces more than 1000 bytes.
    """
    header_len = struct.calcsize(MESSAGE_HEADER_FMT)

    if len(data) < header_len:
        raise DecodeError('Not enough data to unpack header')

    prefix, seqno, cmd, payload_len = struct.unpack(
        MESSAGE_HEADER_FMT, data[:header_len]
    )

    if prefix != PREFIX_VALUE:
        raise DecodeError('Header prefix wrong! %08X != %08X' % (prefix, PREFIX_VALUE))

    # sanity check. currently the max payload length is somewhere around 300 bytes
    if payload_len > 1000:
        raise DecodeError(
            'Header claims the packet size is over 1000 bytes! It is most likely corrupt. Claimed size: %d bytes'
            % payload_len
        )

    return TuyaHeader(prefix, seqno, cmd, payload_len)
class AESCipher:
    """AES in ECB mode with the padding scheme used for Tuya communication."""

    def __init__(self, key):
        """Prepare an AES-ECB cipher for *key* (bytes)."""
        self.block_size = 16
        self.cipher = Cipher(algorithms.AES(key), modes.ECB(), default_backend())

    def encrypt(self, raw, use_base64=True, pad=True):
        """Encrypt *raw* (bytes) for sending to the device.

        Args:
            raw (bytes): plaintext.
            use_base64 (bool): return the ciphertext base64-encoded.
            pad (bool): pad *raw* to a whole number of blocks first. When
                False the caller must pass already block-aligned data.

        Returns:
            bytes: the ciphertext, base64-encoded if requested.
        """
        plaintext = self._pad(raw) if pad else raw
        aes = self.cipher.encryptor()
        ciphertext = aes.update(plaintext) + aes.finalize()
        if use_base64:
            return base64.b64encode(ciphertext)
        return ciphertext

    def decrypt(self, enc, use_base64=True, decode_text=True):
        """Decrypt *enc* received from the device and strip its padding.

        Args:
            enc: ciphertext, base64-encoded when *use_base64* is True.
            use_base64 (bool): base64-decode *enc* before decrypting.
            decode_text (bool): return a UTF-8 decoded ``str`` instead of bytes.

        Returns:
            str or bytes: the plaintext.
        """
        ciphertext = base64.b64decode(enc) if use_base64 else enc
        aes = self.cipher.decryptor()
        plaintext = self._unpad(aes.update(ciphertext) + aes.finalize())
        if decode_text:
            return plaintext.decode("utf-8")
        return plaintext

    def _pad(self, data):
        """Append N bytes of value N so the result fills whole blocks."""
        fill = self.block_size - len(data) % self.block_size
        return data + fill * chr(fill).encode()

    @staticmethod
    def _unpad(data):
        """Drop as many trailing bytes as the value of the last byte says."""
        pad_len = ord(data[-1:])
        return data[:-pad_len]
class MessageDispatcher(ContextualLogger):
    """Buffer and dispatcher for Tuya messages.

    Received bytes are accumulated in ``buffer``, split into messages and
    handed either to a client waiting in ``wait_for`` or to the status
    ``listener`` callback.

    ``listeners`` maps a sequence number to one of: an ``asyncio.Semaphore``
    (a client is waiting), the received message (delivered but not yet
    collected), or ``None`` (the wait was aborted).
    """

    # Heartbeats on protocols < 3.3 respond with sequence number 0,
    # so they can't be waited for like other messages.
    # This is a hack to allow waiting for heartbeats.
    HEARTBEAT_SEQNO = -100
    RESET_SEQNO = -101
    SESS_KEY_SEQNO = -102

    def __init__(self, dev_id, listener, version, local_key):
        """Initialize a new MessageDispatcher.

        Args:
            dev_id (str): device id, used to tag log messages.
            listener: callable invoked with every unsolicited status message.
            version: protocol version; 3.4 selects HMAC-protected packets.
            local_key (bytes): key used to verify the HMAC of 3.4 packets.
        """
        super().__init__()
        self.buffer = b""
        self.listeners = {}
        self.listener = listener
        self.version = version
        self.local_key = local_key
        self.set_logger(_LOGGER, dev_id)

    def abort(self):
        """Abort all waiting clients; their ``wait_for`` call returns None."""
        for key, waiter in list(self.listeners.items()):
            self.listeners[key] = None
            # TODO: Received data and semaphore should be stored separately
            if isinstance(waiter, asyncio.Semaphore):
                waiter.release()

    async def wait_for(self, seqno, cmd, timeout=5):
        """Wait for response to a sequence number to be received and return it.

        Args:
            seqno (int): sequence number (or one of the special ``*_SEQNO``
                values) to wait for.
            cmd (int): command id, used for logging only.
            timeout: seconds to wait before giving up.

        Returns:
            The received message, or None if the wait was aborted.

        Raises:
            Exception: if somebody is already waiting for *seqno*.
            asyncio.TimeoutError: if nothing arrived within *timeout*.
        """
        if seqno in self.listeners:
            raise Exception(f"listener exists for {seqno}")

        self.debug("Command %d waiting for sequence number %d", cmd, seqno)
        waiter = asyncio.Semaphore(0)
        self.listeners[seqno] = waiter
        try:
            await asyncio.wait_for(waiter.acquire(), timeout=timeout)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            # Free the slot on cancellation as well; otherwise the next wait
            # for the same (e.g. heartbeat) number would be refused.
            self.listeners.pop(seqno, None)
            raise

        return self.listeners.pop(seqno)

    def add_data(self, data):
        """Add new data to the buffer and dispatch every complete message.

        An incomplete trailing message stays buffered until the remaining
        bytes arrive. If the buffered bytes cannot be decoded, the buffer is
        discarded and the decoding error is re-raised.
        """
        self.buffer += data
        header_len = struct.calcsize(MESSAGE_HEADER_FMT)
        min_len = struct.calcsize(MESSAGE_RECV_HEADER_FMT)

        # Check if enough data for message header
        while len(self.buffer) >= min_len:
            try:
                header = parse_header(self.buffer)
                message_len = header_len + header.length
                if len(self.buffer) < message_len:
                    # Only part of the message has arrived so far.
                    break
                hmac_key = self.local_key if self.version == 3.4 else None
                msg = unpack_message(
                    self.buffer, header=header, hmac_key=hmac_key, logger=self
                )
            except Exception:
                # Keeping undecodable bytes would make every later call fail
                # at the very same position.
                self.buffer = b""
                raise
            self.buffer = self.buffer[message_len:]
            self._dispatch(msg)

    def _deliver(self, key, msg):
        """Hand *msg* to the client waiting on *key*; return whether one was."""
        waiter = self.listeners.get(key)
        if not isinstance(waiter, asyncio.Semaphore):
            # Nobody is waiting: the slot is missing, aborted, or still holds
            # a message that has not been collected yet.
            return False
        self.listeners[key] = msg
        waiter.release()
        return True

    def _dispatch(self, msg):
        """Dispatch a message to someone that is listening."""
        self.debug("Dispatching message CMD %r %s", msg.cmd, msg)
        if msg.seqno in self.listeners:
            if not self._deliver(msg.seqno, msg):
                self.debug(
                    "Nobody is waiting for sequence number %d anymore, dropping %s",
                    msg.seqno,
                    msg,
                )
        elif msg.cmd == HEART_BEAT:
            self.debug("Got heartbeat response")
            self._deliver(self.HEARTBEAT_SEQNO, msg)
        elif msg.cmd == UPDATEDPS:
            self.debug("Got normal updatedps response")
            self._deliver(self.RESET_SEQNO, msg)
        elif msg.cmd == SESS_KEY_NEG_RESP:
            self.debug("Got key negotiation response")
            self._deliver(self.SESS_KEY_SEQNO, msg)
        elif msg.cmd == STATUS:
            if self._deliver(self.RESET_SEQNO, msg):
                self.debug("Got reset status update")
            else:
                self.debug("Got status update")
                self.listener(msg)
        elif msg.cmd == CONTROL_NEW:
            self.debug("Got ACK message for command %d: will ignore it", msg.cmd)
        else:
            self.error(
                "Got message type %d for unknown listener %d: %s",
                msg.cmd,
                msg.seqno,
                msg,
            )
class TuyaListener(ABC):
    """Abstract callback interface for events of a Tuya device."""

    @abstractmethod
    def status_updated(self, status):
        """Handle a status update reported for the device."""

    @abstractmethod
    def disconnected(self):
        """Handle the loss of the connection to the device."""
class EmptyListener(TuyaListener):
    """Listener that ignores every event."""

    def status_updated(self, status):
        """Ignore the status update."""
        return None

    def disconnected(self):
        """Ignore the disconnect notification."""
        return None
class TuyaProtocol(asyncio.Protocol, ContextualLogger):
    """Implementation of the Tuya protocol."""

    def __init__(self, dev_id, local_key, protocol_version, on_connected, listener):
        """
        Initialize a new TuyaProtocol.

        Must be called while an event loop is running.

        Args:
            dev_id (str): The device id.
            local_key (str): The encryption key of the device.
            protocol_version: Protocol version (anything accepted by
                ``float()``); a falsy value selects 3.1.
            on_connected: Future that is resolved once the connection is made.
            listener: Object whose ``status_updated`` and ``disconnected``
                methods are called. Only a weak reference to it is kept.
        """
        super().__init__()
        self.loop = asyncio.get_running_loop()
        self.set_logger(_LOGGER, dev_id)
        self.id = dev_id
        self.local_key = local_key.encode("latin1")
        # local_key is replaced by the session key on 3.4 devices;
        # real_local_key always keeps the key of the device itself.
        self.real_local_key = self.local_key
        self.version = protocol_version
        self.dev_type = "type_0a"
        self.dps_to_request = {}
        self.cipher = AESCipher(self.local_key)
        self.seqno = 1
        self.transport = None
        self.listener = weakref.ref(listener)
        self.on_connected = on_connected
        self.heartbeater = None
        self.dps_cache = {}

        if protocol_version:
            self.set_version(float(protocol_version))
        else:
            # make sure we call our set_version() and not a subclass since some of
            # them (such as BulbDevice) make connections when called
            TuyaProtocol.set_version(self, 3.1)

        # Created after the version has been normalised to a float: the
        # dispatcher compares its version against 3.4 to pick CRC or HMAC.
        self.dispatcher = self._setup_dispatcher()

    def set_version(self, version):
        """Select the protocol version (float) and the matching device type."""
        self.version = version
        self.version_bytes = str(version).encode('latin1')
        self.version_header = self.version_bytes + PROTOCOL_3x_HEADER
        if version == 3.2:  # 3.2 behaves like 3.3 with type_0d
            self.dev_type = "type_0d"
            # The available dps are NOT detected here: detect_available_dps()
            # is a coroutine and cannot run from this synchronous method.
            # Callers have to await it themselves once connected.
        elif version == 3.4:
            self.dev_type = "v3.4"
        elif self.dev_type == "v3.4":
            # Leaving 3.4: go back to the default device type. This has to be
            # a key of payload_dict.
            self.dev_type = "type_0a"

    def error_json(self, number=None, payload=None):
        """Return error details as a dict.

        Args:
            number: one of the ``ERR_*`` codes; unknown codes are reported
                with the generic "Unknown Error" text.
            payload: the offending payload; replaced by an empty string if it
                cannot be represented as JSON.

        Returns:
            dict: with the keys ``Error`` (text), ``Err`` (code as string)
            and ``Payload``.
        """
        try:
            spayload = json.dumps(payload)
        except Exception:  # pylint: disable=broad-except
            spayload = '""'

        message = error_codes.get(number, error_codes[None])
        self.debug("ERROR %s - %s - payload: %s", message, str(number), spayload)
        return {"Error": message, "Err": str(number), "Payload": json.loads(spayload)}

    def _setup_dispatcher(self):
        """Create the dispatcher, wired to forward status updates to the listener."""

        def _status_update(msg):
            if self.seqno > 0:
                self.seqno = msg.seqno + 1
            decoded_message = self._decode_payload(msg.payload)
            # _decode_payload() returns None when the device type was switched.
            if decoded_message and "dps" in decoded_message:
                self.dps_cache.update(decoded_message["dps"])

            listener = self.listener and self.listener()
            if listener is not None:
                listener.status_updated(self.dps_cache)

        return MessageDispatcher(self.id, _status_update, self.version, self.local_key)

    def connection_made(self, transport):
        """Did connect to the device."""
        self.transport = transport
        self.on_connected.set_result(True)

    def start_heartbeat(self):
        """Start the heartbeat transmissions with the device."""

        async def heartbeat_loop():
            """Continuously send heart beat updates."""
            self.debug("Started heartbeat loop")
            while True:
                try:
                    await self.heartbeat()
                    await asyncio.sleep(HEARTBEAT_INTERVAL)
                except asyncio.CancelledError:
                    self.debug("Stopped heartbeat loop")
                    raise
                except asyncio.TimeoutError:
                    self.debug("Heartbeat failed due to timeout, disconnecting")
                    break
                except Exception as ex:  # pylint: disable=broad-except
                    self.exception("Heartbeat failed (%s), disconnecting", ex)
                    break

            # The heartbeat failed: drop the connection. The transport may
            # already have been taken away by close().
            transport = self.transport
            self.transport = None
            if transport is not None:
                transport.close()

        self.heartbeater = self.loop.create_task(heartbeat_loop())

    def data_received(self, data):
        """Received data from device."""
        self.dispatcher.add_data(data)

    def _forget_session_key(self):
        """Go back to the device key so a 3.4 session is negotiated anew."""
        self.local_key = self.real_local_key
        if self.dispatcher is not None:
            self.dispatcher.local_key = self.real_local_key

    def connection_lost(self, exc):
        """Disconnected from device."""
        self.debug("Connection lost: %s", exc)
        self._forget_session_key()
        try:
            listener = self.listener and self.listener()
            if listener is not None:
                listener.disconnected()
        except Exception:  # pylint: disable=broad-except
            self.exception("Failed to call disconnected callback")

    async def close(self):
        """Close connection and abort all outstanding listeners."""
        self.debug("Closing connection")
        self._forget_session_key()
        if self.heartbeater is not None:
            self.heartbeater.cancel()
            try:
                await self.heartbeater
            except asyncio.CancelledError:
                pass
            self.heartbeater = None
        if self.dispatcher is not None:
            self.dispatcher.abort()
            self.dispatcher = None
        if self.transport is not None:
            transport = self.transport
            self.transport = None
            transport.close()

    async def exchange_quick(self, payload, recv_retries):
        """Send *payload* once and wait for a session key response.

        Similar to exchange() but never retries sending and does not decode
        the response.

        Args:
            payload: a MessagePayload (encoded before sending) or bytes that
                are written as they are.
            recv_retries: number of attempts to receive a non-empty response;
                a falsy value means the response is not waited for.

        Returns:
            The received message, or None if nothing usable was received.
        """
        if not self.transport:
            self.debug(
                "[%s] send quick failed, could not get socket: %s", self.id, payload
            )
            return None

        if isinstance(payload, MessagePayload):
            enc_payload = self._encode_message(payload)
        else:
            enc_payload = payload

        try:
            self.transport.write(enc_payload)
        except Exception:  # pylint: disable=broad-except
            # close() is a coroutine and does nothing unless awaited.
            await self.close()
            return None

        while recv_retries:
            # "except Exception" lets a cancellation of the calling task
            # propagate instead of being swallowed and retried.
            try:
                seqno = MessageDispatcher.SESS_KEY_SEQNO
                msg = await self.dispatcher.wait_for(seqno, payload.cmd)
                # for 3.4 devices, we get the starting seqno with the SESS_KEY_NEG_RESP message
                self.seqno = msg.seqno
            except Exception:  # pylint: disable=broad-except
                msg = None

            if msg and len(msg.payload) != 0:
                return msg

            recv_retries -= 1
            if recv_retries == 0:
                self.debug(
                    "received null payload (%r) but out of recv retries, giving up", msg
                )
            else:
                self.debug(
                    "received null payload (%r), fetch new one - %s retries remaining",
                    msg,
                    recv_retries,
                )
        return None

    async def exchange(self, command, dps=None):
        """Send and receive a message, returning response from device.

        Args:
            command (int): command id to send.
            dps: data for the command, see _generate_payload().

        Returns:
            The decoded response (dict), or None if the wait was aborted, the
            response was a bare acknowledgement, or it could not be used.
        """
        if self.version == 3.4 and self.real_local_key == self.local_key:
            self.debug("3.4 device: negotiating a new session key")
            await self._negotiate_session_key()

        self.debug(
            "Sending command %s (device type: %s)",
            command,
            self.dev_type,
        )
        payload = self._generate_payload(command, dps)
        real_cmd = payload.cmd
        dev_type = self.dev_type

        # Wait for special sequence number if heartbeat or reset
        seqno = self.seqno
        if payload.cmd == HEART_BEAT:
            seqno = MessageDispatcher.HEARTBEAT_SEQNO
        elif payload.cmd == UPDATEDPS:
            seqno = MessageDispatcher.RESET_SEQNO

        enc_payload = self._encode_message(payload)
        self.transport.write(enc_payload)
        msg = await self.dispatcher.wait_for(seqno, payload.cmd)
        if msg is None:
            self.debug("Wait was aborted for seqno %d", seqno)
            return None

        # TODO: Verify stuff, e.g. CRC sequence number?
        if real_cmd in [HEART_BEAT, CONTROL_NEW] and len(msg.payload) == 0:
            # device may send one or two messages with empty payload in response
            # to a CONTROL_NEW command, consider it an ACK
            self.debug("ACK received for command %d: ignoring it", real_cmd)
            return None

        payload = self._decode_payload(msg.payload)

        # Perform a new exchange (once) if we switched device type
        if dev_type != self.dev_type:
            self.debug(
                "Re-send %s due to device type change (%s -> %s)",
                command,
                dev_type,
                self.dev_type,
            )
            return await self.exchange(command, dps)
        return payload

    async def status(self):
        """Query the device and return the (updated) cache of dps values."""
        status = await self.exchange(DP_QUERY)
        if status and "dps" in status:
            self.dps_cache.update(status["dps"])
        return self.dps_cache

    async def heartbeat(self):
        """Send a heartbeat message."""
        return await self.exchange(HEART_BEAT)

    async def reset(self, dpIds=None):
        """Send a reset message (3.3 only); returns True on other versions."""
        if self.version == 3.3:
            self.dev_type = "type_0a"
            self.debug("reset switching to dev_type %s", self.dev_type)
            return await self.exchange(UPDATEDPS, dpIds)

        return True

    async def update_dps(self, dps=None):
        """
        Request device to update index (3.3 only).

        The request is sent without waiting for a response.

        Args:
            dps([int]): list of dps to update, default=detected&whitelisted

        Returns:
            bool: always True.
        """
        if self.version == 3.3:
            if dps is None:
                if not self.dps_cache:
                    await self.detect_available_dps()
                if self.dps_cache:
                    dps = [int(dp) for dp in self.dps_cache]
                    # filter non whitelisted dps
                    dps = list(set(dps).intersection(set(UPDATE_DPS_WHITELIST)))
            self.debug("updatedps() entry (dps %s, dps_cache %s)", dps, self.dps_cache)
            payload = self._generate_payload(UPDATEDPS, dps)
            enc_payload = self._encode_message(payload)
            self.transport.write(enc_payload)
        return True

    async def set_dp(self, value, dp_index):
        """
        Set value (may be any type: bool, int or string) of any dps index.

        Args:
            value: new value for the dps index
            dp_index(int): dps index to set
        """
        return await self.exchange(CONTROL, {str(dp_index): value})

    async def set_dps(self, dps):
        """Set values for a set of datapoints."""
        return await self.exchange(CONTROL, dps)

    async def detect_available_dps(self):
        """Return which datapoints are supported by the device."""
        # type_0d devices need a sort of bruteforce querying in order to detect the
        # list of available dps experience shows that the dps available are usually
        # in the ranges [1-25] and [100-110] need to split the bruteforcing in
        # different steps due to request payload limitation (max. length = 255)
        self.dps_cache = {}
        ranges = [(2, 11), (11, 21), (21, 31), (100, 111)]

        for dps_range in ranges:
            # dps 1 must always be sent, otherwise it might fail in case no dps is found
            # in the requested range
            self.dps_to_request = {"1": None}
            self.add_dps_to_request(range(*dps_range))
            try:
                data = await self.status()
            except Exception as ex:
                self.exception("Failed to get status: %s", ex)
                raise
            if "dps" in data:
                self.dps_cache.update(data["dps"])

            # type_0a devices report everything in one go; no need to go on
            if self.dev_type == "type_0a":
                return self.dps_cache
        self.debug("Detected dps: %s", self.dps_cache)
        return self.dps_cache

    def add_dps_to_request(self, dp_indicies):
        """Add a datapoint (DP), or an iterable of them, to be included in requests."""
        if isinstance(dp_indicies, int):
            self.dps_to_request[str(dp_indicies)] = None
        else:
            self.dps_to_request.update({str(index): None for index in dp_indicies})

    def _decode_payload(self, payload):
        """Decrypt and parse the payload of a received message.

        Returns:
            The parsed JSON (normally a dict), an error dict as produced by
            error_json(), or None when a "data unvalid" response made this
            object switch to the "type_0d" device type.
        """
        cipher = AESCipher(self.local_key)

        if self.version == 3.4:
            # 3.4 devices encrypt the version header in addition to the payload
            try:
                payload = cipher.decrypt(payload, False, decode_text=False)
            except Exception:  # pylint: disable=broad-except
                self.debug("incomplete payload=%r (len:%d)", payload, len(payload))
                return self.error_json(ERR_PAYLOAD)

        if payload.startswith(PROTOCOL_VERSION_BYTES_31):
            # Received an encrypted payload
            # Remove version header
            payload = payload[len(PROTOCOL_VERSION_BYTES_31) :]
            # Decrypt payload
            # Remove 16-bytes of MD5 hexdigest of payload
            payload = cipher.decrypt(payload[16:])
        elif self.version >= 3.2:  # 3.2 or 3.3 or 3.4
            # Trim header for non-default device type
            if payload.startswith(self.version_bytes):
                payload = payload[len(self.version_header) :]
            elif self.dev_type == "type_0d" and (len(payload) & 0x0F) != 0:
                payload = payload[len(self.version_header) :]

            if self.version != 3.4:
                try:
                    payload = cipher.decrypt(payload, False)
                except Exception:  # pylint: disable=broad-except
                    self.debug("incomplete payload=%r (len:%d)", payload, len(payload))
                    return self.error_json(ERR_PAYLOAD)

            # Try to detect if type_0d found
            if not isinstance(payload, str):
                try:
                    payload = payload.decode()
                except Exception:  # pylint: disable=broad-except
                    self.debug("payload was not string type and decoding failed")
                    return self.error_json(ERR_JSON, payload)

            if "data unvalid" in payload:
                self.dev_type = "type_0d"
                self.debug(
                    "'data unvalid' error detected: switching to dev_type %r",
                    self.dev_type,
                )
                return None
        elif not payload.startswith(b"{"):
            self.debug("Unexpected payload=%r", payload)
            return self.error_json(ERR_PAYLOAD, payload)

        if not isinstance(payload, str):
            payload = payload.decode()
        self.debug("Deciphered data = %r", payload)
        try:
            json_payload = json.loads(payload)
        except Exception:  # pylint: disable=broad-except
            json_payload = self.error_json(ERR_JSON, payload)

        # v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...}
        if (
            isinstance(json_payload, dict)
            and "dps" not in json_payload
            and isinstance(json_payload.get("data"), dict)
            and "dps" in json_payload["data"]
        ):
            json_payload["dps"] = json_payload["data"]["dps"]

        return json_payload

    async def _negotiate_session_key(self):
        """Negotiate the session key of a 3.4 device.

        On success ``local_key`` (here and in the dispatcher) is replaced by
        the session key.

        Returns:
            bool: True if a session key was established.
        """
        # NOTE(review): the local nonce is a fixed value, not a random one.
        self.local_nonce = b'0123456789abcdef'  # not-so-random random key
        self.remote_nonce = b''
        self.local_key = self.real_local_key

        rkey = await self.exchange_quick(
            MessagePayload(SESS_KEY_NEG_START, self.local_nonce), 2
        )
        if not rkey or not isinstance(rkey, TuyaMessage) or len(rkey.payload) < 48:
            # error
            self.debug("session key negotiation failed on step 1")
            return False

        if rkey.cmd != SESS_KEY_NEG_RESP:
            self.debug("session key negotiation step 2 returned wrong command: %d", rkey.cmd)
            return False

        payload = rkey.payload
        try:
            cipher = AESCipher(self.real_local_key)
            payload = cipher.decrypt(payload, False, decode_text=False)
        except Exception:  # pylint: disable=broad-except
            self.debug(
                "session key step 2 decrypt failed, payload=%r (len:%d)",
                payload,
                len(payload),
            )
            return False

        self.debug("decrypted session key negotiation step 2: payload=%r", payload)

        if len(payload) < 48:
            self.debug("session key negotiation step 2 failed, too short response")
            return False

        # Response layout: 16 bytes remote nonce, then the HMAC of our nonce.
        self.remote_nonce = payload[:16]
        hmac_check = hmac.new(self.local_key, self.local_nonce, sha256).digest()

        if hmac_check != payload[16:48]:
            # NOTE(review): a failed check is only logged, the negotiation
            # carries on regardless - confirm this is intended.
            self.debug(
                "session key negotiation step 2 failed HMAC check! wanted=%r but got=%r",
                binascii.hexlify(hmac_check),
                binascii.hexlify(payload[16:48]),
            )

        rkey_hmac = hmac.new(self.local_key, self.remote_nonce, sha256).digest()
        await self.exchange_quick(MessagePayload(SESS_KEY_NEG_FINISH, rkey_hmac), None)

        # Session key: both nonces XOR'd, then encrypted with the device key.
        self.local_key = bytes(
            [a ^ b for (a, b) in zip(self.local_nonce, self.remote_nonce)]
        )

        cipher = AESCipher(self.real_local_key)
        self.local_key = self.dispatcher.local_key = cipher.encrypt(
            self.local_key, False, pad=False
        )
        self.debug("Session key negotiate success! session key: %r", self.local_key)
        return True

    def _encode_message(self, msg):
        """Add the protocol header (if needed), encrypt and frame *msg*.

        Args:
            msg: MessagePayload to send.

        Returns:
            bytes: the complete packet. The sequence number is consumed.
        """
        hmac_key = None
        payload = msg.payload
        self.cipher = AESCipher(self.local_key)
        if self.version == 3.4:
            hmac_key = self.local_key
            if msg.cmd not in NO_PROTOCOL_HEADER_CMDS:
                # add the 3.x header
                payload = self.version_header + payload
            self.debug('final payload for cmd %r: %r', msg.cmd, payload)
            payload = self.cipher.encrypt(payload, False)
        elif self.version >= 3.2:
            # expect to connect and then disconnect to set new
            payload = self.cipher.encrypt(payload, False)
            if msg.cmd not in NO_PROTOCOL_HEADER_CMDS:
                # add the 3.x header
                payload = self.version_header + payload
        elif msg.cmd == CONTROL:
            # need to encrypt
            payload = self.cipher.encrypt(payload)
            preMd5String = (
                b"data="
                + payload
                + b"||lpv="
                + PROTOCOL_VERSION_BYTES_31
                + b"||"
                + self.local_key
            )
            m = md5()
            m.update(preMd5String)
            hexdigest = m.hexdigest()
            # some tuya libraries strip 8: to :24
            payload = (
                PROTOCOL_VERSION_BYTES_31
                + hexdigest[8:][:16].encode("latin1")
                + payload
            )

        self.cipher = None
        msg = TuyaMessage(self.seqno, msg.cmd, 0, payload, 0, True)
        self.seqno += 1  # increase message sequence number
        return pack_message(msg, hmac_key=hmac_key)

    def _generate_payload(self, command, data=None, gwId=None, devId=None, uid=None):
        """
        Generate the payload to send.

        Args:
            command(int): The type of command.
                This is one of the entries from payload_dict
            data(dict, optional): The data to be send.
                This is what will be passed via the 'dps' entry
            gwId(str, optional): Will be used for gwId
            devId(str, optional): Will be used for devId
            uid(str, optional): Will be used for uid

        Returns:
            MessagePayload: the command id to use and the JSON encoded payload.
        """
        own = payload_dict[self.dev_type].get(command, {})
        # Anything the device type does not define is taken from 'type_0a'.
        fallback = payload_dict['type_0a'].get(command, {})

        template = own.get('command')
        if template is None:
            template = fallback.get('command')

        command_override = own.get('command_override')
        if command_override is None:
            command_override = fallback.get('command_override')
        if command_override is None:
            command_override = command

        if template is None:
            # I have yet to see a device complain about included but unneeded attribs, but they *will*
            # complain about missing attribs, so just include them all unless otherwise specified
            template = {"gwId": "", "devId": "", "uid": "", "t": ""}

        # Work on a copy: the templates in payload_dict are shared by all
        # devices and calls. Filling them in place would leak ids and dps
        # between requests and would overwrite the "int" marker of "t".
        json_data = dict(template)

        if "gwId" in json_data:
            json_data["gwId"] = gwId if gwId is not None else self.id
        if "devId" in json_data:
            json_data["devId"] = devId if devId is not None else self.id
        if "uid" in json_data:
            json_data["uid"] = uid if uid is not None else self.id
        if "t" in json_data:
            # "int" in the template asks for a numeric timestamp
            if json_data["t"] == "int":
                json_data["t"] = int(time.time())
            else:
                json_data["t"] = str(int(time.time()))

        if data is not None:
            if "dpId" in json_data:
                json_data["dpId"] = data
            elif "data" in json_data:
                json_data["data"] = {"dps": data}
            else:
                json_data["dps"] = data
        elif self.dev_type == "type_0d" and command == DP_QUERY:
            json_data["dps"] = self.dps_to_request

        # if spaces are not removed device does not respond!
        # Compact separators drop them between tokens only, so spaces inside
        # string values are sent unchanged.
        payload = json.dumps(json_data, separators=(",", ":")).encode("utf-8")
        self.debug("Sending payload: %s", payload)

        return MessagePayload(command_override, payload)

    def __repr__(self):
        """Return internal string representation of object."""
        return self.id
async def connect(
    address,
    device_id,
    local_key,
    protocol_version,
    listener=None,
    port=6668,
    timeout=5,
):
    """Connect to a device.

    Args:
        address (str): network address of the device.
        device_id (str): the device id.
        local_key (str): the encryption key of the device.
        protocol_version: protocol version spoken by the device.
        listener: object notified about status updates and disconnects; a
            listener that ignores everything is used when omitted.
        port (int): TCP port of the device.
        timeout: seconds allowed for establishing the connection.

    Returns:
        TuyaProtocol: the protocol instance bound to the new connection.

    Raises:
        asyncio.TimeoutError: if no connection was made within *timeout*.
        OSError: if the connection attempt fails.
    """
    loop = asyncio.get_running_loop()
    on_connected = loop.create_future()

    async def _open():
        _, protocol = await loop.create_connection(
            lambda: TuyaProtocol(
                device_id,
                local_key,
                protocol_version,
                on_connected,
                listener or EmptyListener(),
            ),
            address,
            port,
        )
        await on_connected
        return protocol

    # The timeout has to cover the TCP connect itself: by the time
    # create_connection() returns, on_connected is already resolved, so
    # waiting on that future alone never times out.
    return await asyncio.wait_for(_open(), timeout=timeout)