1179 lines
43 KiB
Python
1179 lines
43 KiB
Python
# PyTuya Module
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Python module to interface with Tuya WiFi smart devices.
|
|
|
|
Author: clach04, postlund
|
|
Maintained by: rospogrigio
|
|
|
|
For more information see https://github.com/clach04/python-tuya
|
|
|
|
Classes
|
|
TuyaInterface(dev_id, address, local_key=None)
|
|
dev_id (str): Device ID e.g. 01234567891234567890
|
|
address (str): Device Network IP Address e.g. 10.0.1.99
|
|
local_key (str, optional): The encryption key. Defaults to None.
|
|
|
|
Functions
|
|
json = status() # returns json payload
|
|
set_version(version) # 3.1 [default], 3.2, 3.3 or 3.4
|
|
detect_available_dps() # returns a list of available dps provided by the device
|
|
update_dps(dps) # sends update dps command
|
|
add_dps_to_request(dp_index) # adds dp_index to the list of dps used by the
|
|
# device (to be queried in the payload)
|
|
set_dp(on, dp_index) # Set value of any dps index.
|
|
|
|
|
|
Credits
|
|
* TuyaAPI https://github.com/codetheweb/tuyapi by codetheweb and blackrozes
|
|
For protocol reverse engineering
|
|
* PyTuya https://github.com/clach04/python-tuya by clach04
|
|
The origin of this python module (now abandoned)
|
|
* Tuya Protocol 3.4 Support by uzlonewolf
|
|
Enhancement to TuyaMessage logic for multi-payload messages and Tuya Protocol 3.4 support
|
|
* TinyTuya https://github.com/jasonacox/tinytuya by jasonacox
|
|
Several CLI tools and code for Tuya devices
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import binascii
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import struct
|
|
import time
|
|
import weakref
|
|
from abc import ABC, abstractmethod
|
|
from collections import namedtuple
|
|
from hashlib import md5, sha256
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
|
|
version_tuple = (10, 0, 0)
|
|
version = version_string = __version__ = "%d.%d.%d" % version_tuple
|
|
__author__ = "rospogrigio"
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# Tuya Packet Format
|
|
TuyaHeader = namedtuple("TuyaHeader", "prefix seqno cmd length")
|
|
MessagePayload = namedtuple("MessagePayload", "cmd payload")
|
|
try:
|
|
TuyaMessage = namedtuple(
|
|
"TuyaMessage", "seqno cmd retcode payload crc crc_good", defaults=(True,)
|
|
)
|
|
except Exception:
|
|
TuyaMessage = namedtuple("TuyaMessage", "seqno cmd retcode payload crc crc_good")
|
|
|
|
# TinyTuya Error Response Codes
|
|
ERR_JSON = 900
|
|
ERR_CONNECT = 901
|
|
ERR_TIMEOUT = 902
|
|
ERR_RANGE = 903
|
|
ERR_PAYLOAD = 904
|
|
ERR_OFFLINE = 905
|
|
ERR_STATE = 906
|
|
ERR_FUNCTION = 907
|
|
ERR_DEVTYPE = 908
|
|
ERR_CLOUDKEY = 909
|
|
ERR_CLOUDRESP = 910
|
|
ERR_CLOUDTOKEN = 911
|
|
ERR_PARAMS = 912
|
|
ERR_CLOUD = 913
|
|
|
|
error_codes = {
|
|
ERR_JSON: "Invalid JSON Response from Device",
|
|
ERR_CONNECT: "Network Error: Unable to Connect",
|
|
ERR_TIMEOUT: "Timeout Waiting for Device",
|
|
ERR_RANGE: "Specified Value Out of Range",
|
|
ERR_PAYLOAD: "Unexpected Payload from Device",
|
|
ERR_OFFLINE: "Network Error: Device Unreachable",
|
|
ERR_STATE: "Device in Unknown State",
|
|
ERR_FUNCTION: "Function Not Supported by Device",
|
|
ERR_DEVTYPE: "Device22 Detected: Retry Command",
|
|
ERR_CLOUDKEY: "Missing Tuya Cloud Key and Secret",
|
|
ERR_CLOUDRESP: "Invalid JSON Response from Cloud",
|
|
ERR_CLOUDTOKEN: "Unable to Get Cloud Token",
|
|
ERR_PARAMS: "Missing Function Parameters",
|
|
ERR_CLOUD: "Error Response from Tuya Cloud",
|
|
None: "Unknown Error",
|
|
}
|
|
|
|
|
|
class DecodeError(Exception):
|
|
"""Specific Exception caused by decoding error."""
|
|
|
|
pass
|
|
|
|
|
|
# Tuya Command Types
|
|
# Reference:
|
|
# https://github.com/tuya/tuya-iotos-embeded-sdk-wifi-ble-bk7231n/blob/master/sdk/include/lan_protocol.h
|
|
AP_CONFIG = 0x01 # FRM_TP_CFG_WF # only used for ap 3.0 network config
|
|
ACTIVE = 0x02 # FRM_TP_ACTV (discard) # WORK_MODE_CMD
|
|
SESS_KEY_NEG_START = 0x03 # FRM_SECURITY_TYPE3 # negotiate session key
|
|
SESS_KEY_NEG_RESP = 0x04 # FRM_SECURITY_TYPE4 # negotiate session key response
|
|
SESS_KEY_NEG_FINISH = 0x05 # FRM_SECURITY_TYPE5 # finalize session key negotiation
|
|
UNBIND = 0x06 # FRM_TP_UNBIND_DEV # DATA_QUERT_CMD - issue command
|
|
CONTROL = 0x07 # FRM_TP_CMD # STATE_UPLOAD_CMD
|
|
STATUS = 0x08 # FRM_TP_STAT_REPORT # STATE_QUERY_CMD
|
|
HEART_BEAT = 0x09 # FRM_TP_HB
|
|
DP_QUERY = 0x0A # 10 # FRM_QUERY_STAT # UPDATE_START_CMD - get data points
|
|
QUERY_WIFI = 0x0B # 11 # FRM_SSID_QUERY (discard) # UPDATE_TRANS_CMD
|
|
TOKEN_BIND = 0x0C # 12 # FRM_USER_BIND_REQ # GET_ONLINE_TIME_CMD - system time (GMT)
|
|
CONTROL_NEW = 0x0D # 13 # FRM_TP_NEW_CMD # FACTORY_MODE_CMD
|
|
ENABLE_WIFI = 0x0E # 14 # FRM_ADD_SUB_DEV_CMD # WIFI_TEST_CMD
|
|
WIFI_INFO = 0x0F # 15 # FRM_CFG_WIFI_INFO
|
|
DP_QUERY_NEW = 0x10 # 16 # FRM_QUERY_STAT_NEW
|
|
SCENE_EXECUTE = 0x11 # 17 # FRM_SCENE_EXEC
|
|
UPDATEDPS = 0x12 # 18 # FRM_LAN_QUERY_DP # Request refresh of DPS
|
|
UDP_NEW = 0x13 # 19 # FR_TYPE_ENCRYPTION
|
|
AP_CONFIG_NEW = 0x14 # 20 # FRM_AP_CFG_WF_V40
|
|
BOARDCAST_LPV34 = 0x23 # 35 # FR_TYPE_BOARDCAST_LPV34
|
|
LAN_EXT_STREAM = 0x40 # 64 # FRM_LAN_EXT_STREAM
|
|
|
|
|
|
PROTOCOL_VERSION_BYTES_31 = b"3.1"
|
|
PROTOCOL_VERSION_BYTES_33 = b"3.3"
|
|
PROTOCOL_VERSION_BYTES_34 = b"3.4"
|
|
|
|
PROTOCOL_3x_HEADER = 12 * b"\x00"
|
|
PROTOCOL_33_HEADER = PROTOCOL_VERSION_BYTES_33 + PROTOCOL_3x_HEADER
|
|
PROTOCOL_34_HEADER = PROTOCOL_VERSION_BYTES_34 + PROTOCOL_3x_HEADER
|
|
MESSAGE_HEADER_FMT = ">4I" # 4*uint32: prefix, seqno, cmd, length [, retcode]
|
|
MESSAGE_RECV_HEADER_FMT = ">5I" # 4*uint32: prefix, seqno, cmd, length, retcode
|
|
MESSAGE_RETCODE_FMT = ">I" # retcode for received messages
|
|
MESSAGE_END_FMT = ">2I" # 2*uint32: crc, suffix
|
|
MESSAGE_END_FMT_HMAC = ">32sI" # 32s:hmac, uint32:suffix
|
|
PREFIX_VALUE = 0x000055AA
|
|
PREFIX_BIN = b"\x00\x00U\xaa"
|
|
SUFFIX_VALUE = 0x0000AA55
|
|
SUFFIX_BIN = b"\x00\x00\xaaU"
|
|
NO_PROTOCOL_HEADER_CMDS = [
|
|
DP_QUERY,
|
|
DP_QUERY_NEW,
|
|
UPDATEDPS,
|
|
HEART_BEAT,
|
|
SESS_KEY_NEG_START,
|
|
SESS_KEY_NEG_RESP,
|
|
SESS_KEY_NEG_FINISH,
|
|
]
|
|
|
|
HEARTBEAT_INTERVAL = 10
|
|
|
|
# DPS that are known to be safe to use with update_dps (0x12) command
|
|
UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi)
|
|
|
|
# Tuya Device Dictionary - Command and Payload Overrides
|
|
# This is intended to match requests.json payload at
|
|
# https://github.com/codetheweb/tuyapi :
|
|
# 'type_0a' devices require the 0a command for the DP_QUERY request
|
|
# 'type_0d' devices require the 0d command for the DP_QUERY request and a list of
|
|
# dps used set to Null in the request payload
|
|
# prefix: # Next byte is command byte ("hexByte") some zero padding, then length
|
|
# of remaining payload, i.e. command + suffix (unclear if multiple bytes used for
|
|
# length, zero padding implies could be more than one byte)
|
|
|
|
# Any command not defined in payload_dict will be sent as-is with a
|
|
# payload of {"gwId": "", "devId": "", "uid": "", "t": ""}
|
|
|
|
payload_dict = {
|
|
# Default Device
|
|
"type_0a": {
|
|
AP_CONFIG: { # [BETA] Set Control Values on Device
|
|
"command": {"gwId": "", "devId": "", "uid": "", "t": ""},
|
|
},
|
|
CONTROL: { # Set Control Values on Device
|
|
"command": {"devId": "", "uid": "", "t": ""},
|
|
},
|
|
STATUS: { # Get Status from Device
|
|
"command": {"gwId": "", "devId": ""},
|
|
},
|
|
HEART_BEAT: {"command": {"gwId": "", "devId": ""}},
|
|
DP_QUERY: { # Get Data Points from Device
|
|
"command": {"gwId": "", "devId": "", "uid": "", "t": ""},
|
|
},
|
|
CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": ""}},
|
|
DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": ""}},
|
|
UPDATEDPS: {"command": {"dpId": [18, 19, 20]}},
|
|
},
|
|
# Special Case Device "0d" - Some of these devices
|
|
# Require the 0d command as the DP_QUERY status request and the list of
|
|
# dps requested payload
|
|
"type_0d": {
|
|
DP_QUERY: { # Get Data Points from Device
|
|
"command_override": CONTROL_NEW, # Uses CONTROL_NEW command for some reason
|
|
"command": {"devId": "", "uid": "", "t": ""},
|
|
},
|
|
},
|
|
"v3.4": {
|
|
CONTROL: {
|
|
"command_override": CONTROL_NEW, # Uses CONTROL_NEW command
|
|
"command": {"protocol": 5, "t": "int", "data": ""},
|
|
},
|
|
DP_QUERY: {"command_override": DP_QUERY_NEW},
|
|
},
|
|
}
|
|
|
|
|
|
class TuyaLoggingAdapter(logging.LoggerAdapter):
|
|
"""Adapter that adds device id to all log points."""
|
|
|
|
def process(self, msg, kwargs):
|
|
"""Process log point and return output."""
|
|
dev_id = self.extra["device_id"]
|
|
return f"[{dev_id[0:3]}...{dev_id[-3:]}] {msg}", kwargs
|
|
|
|
|
|
class ContextualLogger:
|
|
"""Contextual logger adding device id to log points."""
|
|
|
|
def __init__(self):
|
|
"""Initialize a new ContextualLogger."""
|
|
self._logger = None
|
|
self._enable_debug = False
|
|
|
|
def set_logger(self, logger, device_id, enable_debug=False):
|
|
"""Set base logger to use."""
|
|
self._enable_debug = enable_debug
|
|
self._logger = TuyaLoggingAdapter(logger, {"device_id": device_id})
|
|
|
|
def debug(self, msg, *args):
|
|
"""Debug level log."""
|
|
if not self._enable_debug:
|
|
return
|
|
return self._logger.log(logging.DEBUG, msg, *args)
|
|
|
|
def info(self, msg, *args):
|
|
"""Info level log."""
|
|
return self._logger.log(logging.INFO, msg, *args)
|
|
|
|
def warning(self, msg, *args):
|
|
"""Warning method log."""
|
|
return self._logger.log(logging.WARNING, msg, *args)
|
|
|
|
def error(self, msg, *args):
|
|
"""Error level log."""
|
|
return self._logger.log(logging.ERROR, msg, *args)
|
|
|
|
def exception(self, msg, *args):
|
|
"""Exception level log."""
|
|
return self._logger.exception(msg, *args)
|
|
|
|
|
|
def pack_message(msg, hmac_key=None):
|
|
"""Pack a TuyaMessage into bytes."""
|
|
end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT
|
|
# Create full message excluding CRC and suffix
|
|
buffer = (
|
|
struct.pack(
|
|
MESSAGE_HEADER_FMT,
|
|
PREFIX_VALUE,
|
|
msg.seqno,
|
|
msg.cmd,
|
|
len(msg.payload) + struct.calcsize(end_fmt),
|
|
)
|
|
+ msg.payload
|
|
)
|
|
if hmac_key:
|
|
crc = hmac.new(hmac_key, buffer, sha256).digest()
|
|
else:
|
|
crc = binascii.crc32(buffer) & 0xFFFFFFFF
|
|
# Calculate CRC, add it together with suffix
|
|
buffer += struct.pack(end_fmt, crc, SUFFIX_VALUE)
|
|
return buffer
|
|
|
|
|
|
def unpack_message(data, hmac_key=None, header=None, no_retcode=False, logger=None):
|
|
"""Unpack bytes into a TuyaMessage."""
|
|
end_fmt = MESSAGE_END_FMT_HMAC if hmac_key else MESSAGE_END_FMT
|
|
# 4-word header plus return code
|
|
header_len = struct.calcsize(MESSAGE_HEADER_FMT)
|
|
retcode_len = 0 if no_retcode else struct.calcsize(MESSAGE_RETCODE_FMT)
|
|
end_len = struct.calcsize(end_fmt)
|
|
headret_len = header_len + retcode_len
|
|
|
|
if len(data) < headret_len + end_len:
|
|
logger.debug(
|
|
"unpack_message(): not enough data to unpack header! need %d but only have %d",
|
|
headret_len + end_len,
|
|
len(data),
|
|
)
|
|
raise DecodeError("Not enough data to unpack header")
|
|
|
|
if header is None:
|
|
header = parse_header(data)
|
|
|
|
if len(data) < header_len + header.length:
|
|
logger.debug(
|
|
"unpack_message(): not enough data to unpack payload! need %d but only have %d",
|
|
header_len + header.length,
|
|
len(data),
|
|
)
|
|
raise DecodeError("Not enough data to unpack payload")
|
|
|
|
retcode = (
|
|
0
|
|
if no_retcode
|
|
else struct.unpack(MESSAGE_RETCODE_FMT, data[header_len:headret_len])[0]
|
|
)
|
|
# the retcode is technically part of the payload, but strip it as we do not want it here
|
|
payload = data[header_len + retcode_len : header_len + header.length]
|
|
crc, suffix = struct.unpack(end_fmt, payload[-end_len:])
|
|
|
|
if hmac_key:
|
|
have_crc = hmac.new(
|
|
hmac_key, data[: (header_len + header.length) - end_len], sha256
|
|
).digest()
|
|
else:
|
|
have_crc = (
|
|
binascii.crc32(data[: (header_len + header.length) - end_len]) & 0xFFFFFFFF
|
|
)
|
|
|
|
if suffix != SUFFIX_VALUE:
|
|
logger.debug("Suffix prefix wrong! %08X != %08X", suffix, SUFFIX_VALUE)
|
|
|
|
if crc != have_crc:
|
|
if hmac_key:
|
|
logger.debug(
|
|
"HMAC checksum wrong! %r != %r",
|
|
binascii.hexlify(have_crc),
|
|
binascii.hexlify(crc),
|
|
)
|
|
else:
|
|
logger.debug("CRC wrong! %08X != %08X", have_crc, crc)
|
|
|
|
return TuyaMessage(
|
|
header.seqno, header.cmd, retcode, payload[:-end_len], crc, crc == have_crc
|
|
)
|
|
|
|
|
|
def parse_header(data):
|
|
"""Unpack bytes into a TuyaHeader."""
|
|
header_len = struct.calcsize(MESSAGE_HEADER_FMT)
|
|
|
|
if len(data) < header_len:
|
|
raise DecodeError("Not enough data to unpack header")
|
|
|
|
prefix, seqno, cmd, payload_len = struct.unpack(
|
|
MESSAGE_HEADER_FMT, data[:header_len]
|
|
)
|
|
|
|
if prefix != PREFIX_VALUE:
|
|
# self.debug('Header prefix wrong! %08X != %08X', prefix, PREFIX_VALUE)
|
|
raise DecodeError("Header prefix wrong! %08X != %08X" % (prefix, PREFIX_VALUE))
|
|
|
|
# sanity check. currently the max payload length is somewhere around 300 bytes
|
|
if payload_len > 1000:
|
|
raise DecodeError(
|
|
"Header claims the packet size is over 1000 bytes! It is most likely corrupt. Claimed size: %d bytes"
|
|
% payload_len
|
|
)
|
|
|
|
return TuyaHeader(prefix, seqno, cmd, payload_len)
|
|
|
|
|
|
class AESCipher:
|
|
"""Cipher module for Tuya communication."""
|
|
|
|
def __init__(self, key):
|
|
"""Initialize a new AESCipher."""
|
|
self.block_size = 16
|
|
self.cipher = Cipher(algorithms.AES(key), modes.ECB(), default_backend())
|
|
|
|
def encrypt(self, raw, use_base64=True, pad=True):
|
|
"""Encrypt data to be sent to device."""
|
|
encryptor = self.cipher.encryptor()
|
|
if pad:
|
|
raw = self._pad(raw)
|
|
crypted_text = encryptor.update(raw) + encryptor.finalize()
|
|
return base64.b64encode(crypted_text) if use_base64 else crypted_text
|
|
|
|
def decrypt(self, enc, use_base64=True, decode_text=True):
|
|
"""Decrypt data from device."""
|
|
if use_base64:
|
|
enc = base64.b64decode(enc)
|
|
|
|
decryptor = self.cipher.decryptor()
|
|
raw = self._unpad(decryptor.update(enc) + decryptor.finalize())
|
|
return raw.decode("utf-8") if decode_text else raw
|
|
|
|
def _pad(self, data):
|
|
padnum = self.block_size - len(data) % self.block_size
|
|
return data + padnum * chr(padnum).encode()
|
|
|
|
@staticmethod
|
|
def _unpad(data):
|
|
return data[: -ord(data[len(data) - 1 :])]
|
|
|
|
|
|
class MessageDispatcher(ContextualLogger):
|
|
"""Buffer and dispatcher for Tuya messages."""
|
|
|
|
# Heartbeats on protocols < 3.3 respond with sequence number 0,
|
|
# so they can't be waited for like other messages.
|
|
# This is a hack to allow waiting for heartbeats.
|
|
HEARTBEAT_SEQNO = -100
|
|
RESET_SEQNO = -101
|
|
SESS_KEY_SEQNO = -102
|
|
|
|
def __init__(self, dev_id, listener, protocol_version, local_key, enable_debug):
|
|
"""Initialize a new MessageBuffer."""
|
|
super().__init__()
|
|
self.buffer = b""
|
|
self.listeners = {}
|
|
self.listener = listener
|
|
self.version = protocol_version
|
|
self.local_key = local_key
|
|
self.set_logger(_LOGGER, dev_id, enable_debug)
|
|
|
|
def abort(self):
|
|
"""Abort all waiting clients."""
|
|
for key in self.listeners:
|
|
sem = self.listeners[key]
|
|
self.listeners[key] = None
|
|
|
|
# TODO: Received data and semahore should be stored separately
|
|
if isinstance(sem, asyncio.Semaphore):
|
|
sem.release()
|
|
|
|
async def wait_for(self, seqno, cmd, timeout=5):
|
|
"""Wait for response to a sequence number to be received and return it."""
|
|
if seqno in self.listeners:
|
|
raise Exception(f"listener exists for {seqno}")
|
|
|
|
self.debug("Command %d waiting for seq. number %d", cmd, seqno)
|
|
self.listeners[seqno] = asyncio.Semaphore(0)
|
|
try:
|
|
await asyncio.wait_for(self.listeners[seqno].acquire(), timeout=timeout)
|
|
except asyncio.TimeoutError:
|
|
self.warning("Command %d timed out waiting for sequence number %d", cmd, seqno)
|
|
del self.listeners[seqno]
|
|
raise
|
|
|
|
return self.listeners.pop(seqno)
|
|
|
|
def add_data(self, data):
|
|
"""Add new data to the buffer and try to parse messages."""
|
|
self.buffer += data
|
|
header_len = struct.calcsize(MESSAGE_RECV_HEADER_FMT)
|
|
|
|
while self.buffer:
|
|
# Check if enough data for measage header
|
|
if len(self.buffer) < header_len:
|
|
break
|
|
|
|
header = parse_header(self.buffer)
|
|
hmac_key = self.local_key if self.version == 3.4 else None
|
|
msg = unpack_message(
|
|
self.buffer, header=header, hmac_key=hmac_key, logger=self
|
|
)
|
|
self.buffer = self.buffer[header_len - 4 + header.length :]
|
|
self._dispatch(msg)
|
|
|
|
def _dispatch(self, msg):
|
|
"""Dispatch a message to someone that is listening."""
|
|
self.debug("Dispatching message CMD %r %s", msg.cmd, msg)
|
|
if msg.seqno in self.listeners:
|
|
# self.debug("Dispatching sequence number %d", msg.seqno)
|
|
sem = self.listeners[msg.seqno]
|
|
self.listeners[msg.seqno] = msg
|
|
sem.release()
|
|
elif msg.cmd == HEART_BEAT:
|
|
self.debug("Got heartbeat response")
|
|
if self.HEARTBEAT_SEQNO in self.listeners:
|
|
sem = self.listeners[self.HEARTBEAT_SEQNO]
|
|
self.listeners[self.HEARTBEAT_SEQNO] = msg
|
|
sem.release()
|
|
elif msg.cmd == UPDATEDPS:
|
|
self.debug("Got normal updatedps response")
|
|
if self.RESET_SEQNO in self.listeners:
|
|
sem = self.listeners[self.RESET_SEQNO]
|
|
self.listeners[self.RESET_SEQNO] = msg
|
|
sem.release()
|
|
elif msg.cmd == SESS_KEY_NEG_RESP:
|
|
self.debug("Got key negotiation response")
|
|
if self.SESS_KEY_SEQNO in self.listeners:
|
|
sem = self.listeners[self.SESS_KEY_SEQNO]
|
|
self.listeners[self.SESS_KEY_SEQNO] = msg
|
|
sem.release()
|
|
elif msg.cmd == STATUS:
|
|
if self.RESET_SEQNO in self.listeners:
|
|
self.debug("Got reset status update")
|
|
sem = self.listeners[self.RESET_SEQNO]
|
|
self.listeners[self.RESET_SEQNO] = msg
|
|
sem.release()
|
|
else:
|
|
self.debug("Got status update")
|
|
self.listener(msg)
|
|
else:
|
|
if msg.cmd == CONTROL_NEW:
|
|
self.debug("Got ACK message for command %d: will ignore it", msg.cmd)
|
|
else:
|
|
self.debug(
|
|
"Got message type %d for unknown listener %d: %s",
|
|
msg.cmd,
|
|
msg.seqno,
|
|
msg,
|
|
)
|
|
|
|
|
|
class TuyaListener(ABC):
|
|
"""Listener interface for Tuya device changes."""
|
|
|
|
@abstractmethod
|
|
def status_updated(self, status):
|
|
"""Device updated status."""
|
|
|
|
@abstractmethod
|
|
def disconnected(self):
|
|
"""Device disconnected."""
|
|
|
|
|
|
class EmptyListener(TuyaListener):
|
|
"""Listener doing nothing."""
|
|
|
|
def status_updated(self, status):
|
|
"""Device updated status."""
|
|
|
|
def disconnected(self):
|
|
"""Device disconnected."""
|
|
|
|
|
|
class TuyaProtocol(asyncio.Protocol, ContextualLogger):
|
|
"""Implementation of the Tuya protocol."""
|
|
|
|
def __init__(
|
|
self, dev_id, local_key, protocol_version, enable_debug, on_connected, listener
|
|
):
|
|
"""
|
|
Initialize a new TuyaInterface.
|
|
|
|
Args:
|
|
dev_id (str): The device id.
|
|
address (str): The network address.
|
|
local_key (str, optional): The encryption key. Defaults to None.
|
|
|
|
Attributes:
|
|
port (int): The port to connect to.
|
|
"""
|
|
super().__init__()
|
|
self.loop = asyncio.get_running_loop()
|
|
self.set_logger(_LOGGER, dev_id, enable_debug)
|
|
self.id = dev_id
|
|
self.local_key = local_key.encode("latin1")
|
|
self.real_local_key = self.local_key
|
|
self.dev_type = "type_0a"
|
|
self.dps_to_request = {}
|
|
|
|
if protocol_version:
|
|
self.set_version(float(protocol_version))
|
|
else:
|
|
# make sure we call our set_version() and not a subclass since some of
|
|
# them (such as BulbDevice) make connections when called
|
|
TuyaProtocol.set_version(self, 3.1)
|
|
|
|
self.cipher = AESCipher(self.local_key)
|
|
self.seqno = 1
|
|
self.transport = None
|
|
self.listener = weakref.ref(listener)
|
|
self.dispatcher = self._setup_dispatcher(enable_debug)
|
|
self.on_connected = on_connected
|
|
self.heartbeater = None
|
|
self.dps_cache = {}
|
|
self.local_nonce = b"0123456789abcdef" # not-so-random random key
|
|
self.remote_nonce = b""
|
|
|
|
def set_version(self, protocol_version):
|
|
"""Set the device version and eventually start available DPs detection."""
|
|
self.version = protocol_version
|
|
self.version_bytes = str(protocol_version).encode("latin1")
|
|
self.version_header = self.version_bytes + PROTOCOL_3x_HEADER
|
|
if protocol_version == 3.2: # 3.2 behaves like 3.3 with type_0d
|
|
# self.version = 3.3
|
|
self.dev_type = "type_0d"
|
|
elif protocol_version == 3.4:
|
|
self.dev_type = "v3.4"
|
|
|
|
def error_json(self, number=None, payload=None):
|
|
"""Return error details in JSON."""
|
|
try:
|
|
spayload = json.dumps(payload)
|
|
# spayload = payload.replace('\"','').replace('\'','')
|
|
except Exception:
|
|
spayload = '""'
|
|
|
|
vals = (error_codes[number], str(number), spayload)
|
|
self.debug("ERROR %s - %s - payload: %s", *vals)
|
|
|
|
return json.loads('{ "Error":"%s", "Err":"%s", "Payload":%s }' % vals)
|
|
|
|
def _setup_dispatcher(self, enable_debug):
|
|
def _status_update(msg):
|
|
if msg.seqno > 0:
|
|
self.seqno = msg.seqno + 1
|
|
decoded_message = self._decode_payload(msg.payload)
|
|
if "dps" in decoded_message:
|
|
self.dps_cache.update(decoded_message["dps"])
|
|
|
|
listener = self.listener and self.listener()
|
|
if listener is not None:
|
|
listener.status_updated(self.dps_cache)
|
|
|
|
return MessageDispatcher(
|
|
self.id, _status_update, self.version, self.local_key, enable_debug
|
|
)
|
|
|
|
def connection_made(self, transport):
|
|
"""Did connect to the device."""
|
|
self.transport = transport
|
|
self.on_connected.set_result(True)
|
|
|
|
def start_heartbeat(self):
|
|
"""Start the heartbeat transmissions with the device."""
|
|
|
|
async def heartbeat_loop():
|
|
"""Continuously send heart beat updates."""
|
|
self.debug("Started heartbeat loop")
|
|
while True:
|
|
try:
|
|
await self.heartbeat()
|
|
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
|
except asyncio.CancelledError:
|
|
self.debug("Stopped heartbeat loop")
|
|
raise
|
|
except asyncio.TimeoutError:
|
|
self.debug("Heartbeat failed due to timeout, disconnecting")
|
|
break
|
|
except Exception as ex: # pylint: disable=broad-except
|
|
self.exception("Heartbeat failed (%s), disconnecting", ex)
|
|
break
|
|
|
|
transport = self.transport
|
|
self.transport = None
|
|
transport.close()
|
|
|
|
self.heartbeater = self.loop.create_task(heartbeat_loop())
|
|
|
|
def data_received(self, data):
|
|
"""Received data from device."""
|
|
# self.debug("received data=%r", binascii.hexlify(data))
|
|
self.dispatcher.add_data(data)
|
|
|
|
def connection_lost(self, exc):
|
|
"""Disconnected from device."""
|
|
self.debug("Connection lost: %s", exc)
|
|
self.real_local_key = self.local_key
|
|
try:
|
|
listener = self.listener and self.listener()
|
|
if listener is not None:
|
|
listener.disconnected()
|
|
except Exception: # pylint: disable=broad-except
|
|
self.exception("Failed to call disconnected callback")
|
|
|
|
async def close(self):
|
|
"""Close connection and abort all outstanding listeners."""
|
|
self.debug("Closing connection")
|
|
self.real_local_key = self.local_key
|
|
if self.heartbeater is not None:
|
|
self.heartbeater.cancel()
|
|
try:
|
|
await self.heartbeater
|
|
except asyncio.CancelledError:
|
|
pass
|
|
self.heartbeater = None
|
|
if self.dispatcher is not None:
|
|
self.dispatcher.abort()
|
|
self.dispatcher = None
|
|
if self.transport is not None:
|
|
transport = self.transport
|
|
self.transport = None
|
|
transport.close()
|
|
|
|
async def exchange_quick(self, payload, recv_retries):
|
|
"""Similar to exchange() but never retries sending and does not decode the response."""
|
|
if not self.transport:
|
|
self.debug(
|
|
"[" + self.id + "] send quick failed, could not get socket: %s", payload
|
|
)
|
|
return None
|
|
enc_payload = (
|
|
self._encode_message(payload)
|
|
if isinstance(payload, MessagePayload)
|
|
else payload
|
|
)
|
|
# self.debug("Quick-dispatching message %s, seqno %s", binascii.hexlify(enc_payload), self.seqno)
|
|
|
|
try:
|
|
self.transport.write(enc_payload)
|
|
except Exception:
|
|
# self._check_socket_close(True)
|
|
self.close()
|
|
return None
|
|
while recv_retries:
|
|
try:
|
|
seqno = MessageDispatcher.SESS_KEY_SEQNO
|
|
msg = await self.dispatcher.wait_for(seqno, payload.cmd)
|
|
# for 3.4 devices, we get the starting seqno with the SESS_KEY_NEG_RESP message
|
|
self.seqno = msg.seqno
|
|
except Exception:
|
|
msg = None
|
|
if msg and len(msg.payload) != 0:
|
|
return msg
|
|
recv_retries -= 1
|
|
if recv_retries == 0:
|
|
self.debug(
|
|
"received null payload (%r) but out of recv retries, giving up", msg
|
|
)
|
|
else:
|
|
self.debug(
|
|
"received null payload (%r), fetch new one - %s retries remaining",
|
|
msg,
|
|
recv_retries,
|
|
)
|
|
return None
|
|
|
|
async def exchange(self, command, dps=None):
|
|
"""Send and receive a message, returning response from device."""
|
|
if self.version == 3.4 and self.real_local_key == self.local_key:
|
|
self.debug("3.4 device: negotiating a new session key")
|
|
await self._negotiate_session_key()
|
|
|
|
self.debug(
|
|
"Sending command %s (device type: %s)",
|
|
command,
|
|
self.dev_type,
|
|
)
|
|
payload = self._generate_payload(command, dps)
|
|
real_cmd = payload.cmd
|
|
dev_type = self.dev_type
|
|
# self.debug("Exchange: payload %r %r", payload.cmd, payload.payload)
|
|
|
|
# Wait for special sequence number if heartbeat or reset
|
|
seqno = self.seqno
|
|
|
|
if payload.cmd == HEART_BEAT:
|
|
seqno = MessageDispatcher.HEARTBEAT_SEQNO
|
|
elif payload.cmd == UPDATEDPS:
|
|
seqno = MessageDispatcher.RESET_SEQNO
|
|
|
|
enc_payload = self._encode_message(payload)
|
|
self.transport.write(enc_payload)
|
|
msg = await self.dispatcher.wait_for(seqno, payload.cmd)
|
|
if msg is None:
|
|
self.debug("Wait was aborted for seqno %d", seqno)
|
|
return None
|
|
|
|
# TODO: Verify stuff, e.g. CRC sequence number?
|
|
if real_cmd in [HEART_BEAT, CONTROL, CONTROL_NEW] and len(msg.payload) == 0:
|
|
# device may send messages with empty payload in response
|
|
# to a HEART_BEAT or CONTROL or CONTROL_NEW command: consider them an ACK
|
|
self.debug("ACK received for command %d: ignoring it", real_cmd)
|
|
return None
|
|
payload = self._decode_payload(msg.payload)
|
|
|
|
# Perform a new exchange (once) if we switched device type
|
|
if dev_type != self.dev_type:
|
|
self.debug(
|
|
"Re-send %s due to device type change (%s -> %s)",
|
|
command,
|
|
dev_type,
|
|
self.dev_type,
|
|
)
|
|
return await self.exchange(command, dps)
|
|
return payload
|
|
|
|
async def status(self):
|
|
"""Return device status."""
|
|
status = await self.exchange(DP_QUERY)
|
|
if status and "dps" in status:
|
|
self.dps_cache.update(status["dps"])
|
|
return self.dps_cache
|
|
|
|
async def heartbeat(self):
|
|
"""Send a heartbeat message."""
|
|
return await self.exchange(HEART_BEAT)
|
|
|
|
async def reset(self, dpIds=None):
|
|
"""Send a reset message (3.3 only)."""
|
|
if self.version == 3.3:
|
|
self.dev_type = "type_0a"
|
|
self.debug("reset switching to dev_type %s", self.dev_type)
|
|
return await self.exchange(UPDATEDPS, dpIds)
|
|
|
|
return True
|
|
|
|
async def update_dps(self, dps=None):
|
|
"""
|
|
Request device to update index.
|
|
|
|
Args:
|
|
dps([int]): list of dps to update, default=detected&whitelisted
|
|
"""
|
|
if self.version in [3.2, 3.3]: # 3.2 behaves like 3.3 with type_0d
|
|
if dps is None:
|
|
if not self.dps_cache:
|
|
await self.detect_available_dps()
|
|
if self.dps_cache:
|
|
dps = [int(dp) for dp in self.dps_cache]
|
|
# filter non whitelisted dps
|
|
dps = list(set(dps).intersection(set(UPDATE_DPS_WHITELIST)))
|
|
self.debug("updatedps() entry (dps %s, dps_cache %s)", dps, self.dps_cache)
|
|
payload = self._generate_payload(UPDATEDPS, dps)
|
|
enc_payload = self._encode_message(payload)
|
|
self.transport.write(enc_payload)
|
|
return True
|
|
|
|
async def set_dp(self, value, dp_index):
|
|
"""
|
|
Set value (may be any type: bool, int or string) of any dps index.
|
|
|
|
Args:
|
|
dp_index(int): dps index to set
|
|
value: new value for the dps index
|
|
"""
|
|
return await self.exchange(CONTROL, {str(dp_index): value})
|
|
|
|
async def set_dps(self, dps):
|
|
"""Set values for a set of datapoints."""
|
|
return await self.exchange(CONTROL, dps)
|
|
|
|
async def detect_available_dps(self):
|
|
"""Return which datapoints are supported by the device."""
|
|
# type_0d devices need a sort of bruteforce querying in order to detect the
|
|
# list of available dps experience shows that the dps available are usually
|
|
# in the ranges [1-25] and [100-110] need to split the bruteforcing in
|
|
# different steps due to request payload limitation (max. length = 255)
|
|
self.dps_cache = {}
|
|
ranges = [(2, 11), (11, 21), (21, 31), (100, 111)]
|
|
|
|
for dps_range in ranges:
|
|
# dps 1 must always be sent, otherwise it might fail in case no dps is found
|
|
# in the requested range
|
|
self.dps_to_request = {"1": None}
|
|
self.add_dps_to_request(range(*dps_range))
|
|
try:
|
|
data = await self.status()
|
|
except Exception as ex:
|
|
self.exception("Failed to get status: %s", ex)
|
|
raise
|
|
if "dps" in data:
|
|
self.dps_cache.update(data["dps"])
|
|
|
|
if self.dev_type == "type_0a":
|
|
return self.dps_cache
|
|
self.debug("Detected dps: %s", self.dps_cache)
|
|
return self.dps_cache
|
|
|
|
def add_dps_to_request(self, dp_indicies):
|
|
"""Add a datapoint (DP) to be included in requests."""
|
|
if isinstance(dp_indicies, int):
|
|
self.dps_to_request[str(dp_indicies)] = None
|
|
else:
|
|
self.dps_to_request.update({str(index): None for index in dp_indicies})
|
|
|
|
def _decode_payload(self, payload):
|
|
cipher = AESCipher(self.local_key)
|
|
|
|
if self.version == 3.4:
|
|
# 3.4 devices encrypt the version header in addition to the payload
|
|
try:
|
|
# self.debug("decrypting=%r", payload)
|
|
payload = cipher.decrypt(payload, False, decode_text=False)
|
|
except Exception:
|
|
self.debug("incomplete payload=%r (len:%d)", payload, len(payload))
|
|
return self.error_json(ERR_PAYLOAD)
|
|
|
|
# self.debug("decrypted 3.x payload=%r", payload)
|
|
|
|
if payload.startswith(PROTOCOL_VERSION_BYTES_31):
|
|
# Received an encrypted payload
|
|
# Remove version header
|
|
payload = payload[len(PROTOCOL_VERSION_BYTES_31) :]
|
|
# Decrypt payload
|
|
# Remove 16-bytes of MD5 hexdigest of payload
|
|
payload = cipher.decrypt(payload[16:])
|
|
elif self.version >= 3.2: # 3.2 or 3.3 or 3.4
|
|
# Trim header for non-default device type
|
|
if payload.startswith(self.version_bytes):
|
|
payload = payload[len(self.version_header) :]
|
|
# self.debug("removing 3.x=%r", payload)
|
|
elif self.dev_type == "type_0d" and (len(payload) & 0x0F) != 0:
|
|
payload = payload[len(self.version_header) :]
|
|
# self.debug("removing type_0d 3.x header=%r", payload)
|
|
|
|
if self.version != 3.4:
|
|
try:
|
|
# self.debug("decrypting=%r", payload)
|
|
payload = cipher.decrypt(payload, False)
|
|
except Exception:
|
|
self.debug("incomplete payload=%r (len:%d)", payload, len(payload))
|
|
return self.error_json(ERR_PAYLOAD)
|
|
|
|
# self.debug("decrypted 3.x payload=%r", payload)
|
|
# Try to detect if type_0d found
|
|
|
|
if not isinstance(payload, str):
|
|
try:
|
|
payload = payload.decode()
|
|
except Exception:
|
|
self.debug("payload was not string type and decoding failed")
|
|
return self.error_json(ERR_JSON, payload)
|
|
if "data unvalid" in payload:
|
|
self.dev_type = "type_0d"
|
|
self.debug(
|
|
"'data unvalid' error detected: switching to dev_type %r",
|
|
self.dev_type,
|
|
)
|
|
return None
|
|
elif not payload.startswith(b"{"):
|
|
self.debug("Unexpected payload=%r", payload)
|
|
return self.error_json(ERR_PAYLOAD, payload)
|
|
|
|
if not isinstance(payload, str):
|
|
payload = payload.decode()
|
|
self.debug("Deciphered data = %r", payload)
|
|
try:
|
|
json_payload = json.loads(payload)
|
|
except Exception:
|
|
json_payload = self.error_json(ERR_JSON, payload)
|
|
|
|
# v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...}
|
|
if (
|
|
"dps" not in json_payload
|
|
and "data" in json_payload
|
|
and "dps" in json_payload["data"]
|
|
):
|
|
json_payload["dps"] = json_payload["data"]["dps"]
|
|
|
|
return json_payload
|
|
|
|
async def _negotiate_session_key(self):
|
|
self.local_key = self.real_local_key
|
|
|
|
rkey = await self.exchange_quick(
|
|
MessagePayload(SESS_KEY_NEG_START, self.local_nonce), 2
|
|
)
|
|
if not rkey or not isinstance(rkey, TuyaMessage) or len(rkey.payload) < 48:
|
|
# error
|
|
self.debug("session key negotiation failed on step 1")
|
|
return False
|
|
|
|
if rkey.cmd != SESS_KEY_NEG_RESP:
|
|
self.debug(
|
|
"session key negotiation step 2 returned wrong command: %d", rkey.cmd
|
|
)
|
|
return False
|
|
|
|
payload = rkey.payload
|
|
try:
|
|
# self.debug("decrypting %r using %r", payload, self.real_local_key)
|
|
cipher = AESCipher(self.real_local_key)
|
|
payload = cipher.decrypt(payload, False, decode_text=False)
|
|
except Exception:
|
|
self.debug(
|
|
"session key step 2 decrypt failed, payload=%r (len:%d)",
|
|
payload,
|
|
len(payload),
|
|
)
|
|
return False
|
|
|
|
self.debug("decrypted session key negotiation step 2: payload=%r", payload)
|
|
|
|
if len(payload) < 48:
|
|
self.debug("session key negotiation step 2 failed, too short response")
|
|
return False
|
|
|
|
self.remote_nonce = payload[:16]
|
|
hmac_check = hmac.new(self.local_key, self.local_nonce, sha256).digest()
|
|
|
|
if hmac_check != payload[16:48]:
|
|
self.debug(
|
|
"session key negotiation step 2 failed HMAC check! wanted=%r but got=%r",
|
|
binascii.hexlify(hmac_check),
|
|
binascii.hexlify(payload[16:48]),
|
|
)
|
|
|
|
# self.debug("session local nonce: %r remote nonce: %r", self.local_nonce, self.remote_nonce)
|
|
rkey_hmac = hmac.new(self.local_key, self.remote_nonce, sha256).digest()
|
|
await self.exchange_quick(MessagePayload(SESS_KEY_NEG_FINISH, rkey_hmac), None)
|
|
|
|
self.local_key = bytes(
|
|
[a ^ b for (a, b) in zip(self.local_nonce, self.remote_nonce)]
|
|
)
|
|
# self.debug("Session nonce XOR'd: %r" % self.local_key)
|
|
|
|
cipher = AESCipher(self.real_local_key)
|
|
self.local_key = self.dispatcher.local_key = cipher.encrypt(
|
|
self.local_key, False, pad=False
|
|
)
|
|
self.debug("Session key negotiate success! session key: %r", self.local_key)
|
|
return True
|
|
|
|
# adds protocol header (if needed) and encrypts
|
|
def _encode_message(self, msg):
|
|
hmac_key = None
|
|
payload = msg.payload
|
|
self.cipher = AESCipher(self.local_key)
|
|
if self.version == 3.4:
|
|
hmac_key = self.local_key
|
|
if msg.cmd not in NO_PROTOCOL_HEADER_CMDS:
|
|
# add the 3.x header
|
|
payload = self.version_header + payload
|
|
self.debug("final payload for cmd %r: %r", msg.cmd, payload)
|
|
payload = self.cipher.encrypt(payload, False)
|
|
elif self.version >= 3.2:
|
|
# expect to connect and then disconnect to set new
|
|
payload = self.cipher.encrypt(payload, False)
|
|
if msg.cmd not in NO_PROTOCOL_HEADER_CMDS:
|
|
# add the 3.x header
|
|
payload = self.version_header + payload
|
|
elif msg.cmd == CONTROL:
|
|
# need to encrypt
|
|
payload = self.cipher.encrypt(payload)
|
|
preMd5String = (
|
|
b"data="
|
|
+ payload
|
|
+ b"||lpv="
|
|
+ PROTOCOL_VERSION_BYTES_31
|
|
+ b"||"
|
|
+ self.local_key
|
|
)
|
|
m = md5()
|
|
m.update(preMd5String)
|
|
hexdigest = m.hexdigest()
|
|
# some tuya libraries strip 8: to :24
|
|
payload = (
|
|
PROTOCOL_VERSION_BYTES_31
|
|
+ hexdigest[8:][:16].encode("latin1")
|
|
+ payload
|
|
)
|
|
|
|
self.cipher = None
|
|
msg = TuyaMessage(self.seqno, msg.cmd, 0, payload, 0, True)
|
|
self.seqno += 1 # increase message sequence number
|
|
buffer = pack_message(msg, hmac_key=hmac_key)
|
|
# self.debug("payload encrypted with key %r => %r", self.local_key, binascii.hexlify(buffer))
|
|
return buffer
|
|
|
|
def _generate_payload(self, command, data=None, gwId=None, devId=None, uid=None):
|
|
"""
|
|
Generate the payload to send.
|
|
|
|
Args:
|
|
command(str): The type of command.
|
|
This is one of the entries from payload_dict
|
|
data(dict, optional): The data to be send.
|
|
This is what will be passed via the 'dps' entry
|
|
gwId(str, optional): Will be used for gwId
|
|
devId(str, optional): Will be used for devId
|
|
uid(str, optional): Will be used for uid
|
|
"""
|
|
json_data = command_override = None
|
|
|
|
if command in payload_dict[self.dev_type]:
|
|
if "command" in payload_dict[self.dev_type][command]:
|
|
json_data = payload_dict[self.dev_type][command]["command"]
|
|
if "command_override" in payload_dict[self.dev_type][command]:
|
|
command_override = payload_dict[self.dev_type][command][
|
|
"command_override"
|
|
]
|
|
|
|
if self.dev_type != "type_0a":
|
|
if (
|
|
json_data is None
|
|
and command in payload_dict["type_0a"]
|
|
and "command" in payload_dict["type_0a"][command]
|
|
):
|
|
json_data = payload_dict["type_0a"][command]["command"]
|
|
if (
|
|
command_override is None
|
|
and command in payload_dict["type_0a"]
|
|
and "command_override" in payload_dict["type_0a"][command]
|
|
):
|
|
command_override = payload_dict["type_0a"][command]["command_override"]
|
|
|
|
if command_override is None:
|
|
command_override = command
|
|
if json_data is None:
|
|
# I have yet to see a device complain about included but unneeded attribs, but they *will*
|
|
# complain about missing attribs, so just include them all unless otherwise specified
|
|
json_data = {"gwId": "", "devId": "", "uid": "", "t": ""}
|
|
|
|
if "gwId" in json_data:
|
|
if gwId is not None:
|
|
json_data["gwId"] = gwId
|
|
else:
|
|
json_data["gwId"] = self.id
|
|
if "devId" in json_data:
|
|
if devId is not None:
|
|
json_data["devId"] = devId
|
|
else:
|
|
json_data["devId"] = self.id
|
|
if "uid" in json_data:
|
|
if uid is not None:
|
|
json_data["uid"] = uid
|
|
else:
|
|
json_data["uid"] = self.id
|
|
if "t" in json_data:
|
|
if json_data["t"] == "int":
|
|
json_data["t"] = int(time.time())
|
|
else:
|
|
json_data["t"] = str(int(time.time()))
|
|
|
|
if data is not None:
|
|
if "dpId" in json_data:
|
|
json_data["dpId"] = data
|
|
elif "data" in json_data:
|
|
json_data["data"] = {"dps": data}
|
|
else:
|
|
json_data["dps"] = data
|
|
elif self.dev_type == "type_0d" and command == DP_QUERY:
|
|
json_data["dps"] = self.dps_to_request
|
|
|
|
if json_data == "":
|
|
payload = ""
|
|
else:
|
|
payload = json.dumps(json_data)
|
|
# if spaces are not removed device does not respond!
|
|
payload = payload.replace(" ", "").encode("utf-8")
|
|
self.debug("Sending payload: %s", payload)
|
|
|
|
return MessagePayload(command_override, payload)
|
|
|
|
def __repr__(self):
|
|
"""Return internal string representation of object."""
|
|
return self.id
|
|
|
|
|
|
async def connect(
|
|
address,
|
|
device_id,
|
|
local_key,
|
|
protocol_version,
|
|
enable_debug,
|
|
listener=None,
|
|
port=6668,
|
|
timeout=5,
|
|
):
|
|
"""Connect to a device."""
|
|
loop = asyncio.get_running_loop()
|
|
on_connected = loop.create_future()
|
|
_, protocol = await loop.create_connection(
|
|
lambda: TuyaProtocol(
|
|
device_id,
|
|
local_key,
|
|
protocol_version,
|
|
enable_debug,
|
|
on_connected,
|
|
listener or EmptyListener(),
|
|
),
|
|
address,
|
|
port,
|
|
)
|
|
|
|
await asyncio.wait_for(on_connected, timeout=timeout)
|
|
return protocol
|