Files
localtuya-modded/custom_components/localtuya/discovery.py
2020-09-27 09:48:03 +02:00

87 lines
2.3 KiB
Python

"""Discovery module for Tuya devices.
Entirely based on tuya-convert.py from tuya-convert:
https://github.com/ct-Open-Source/tuya-convert/blob/master/scripts/tuya-discovery.py
"""
import json
import asyncio
import logging
from hashlib import md5
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
_LOGGER = logging.getLogger(__name__)
UDP_KEY = md5(b"yGAdlopoPVldABfn").digest()
def decrypt_udp(message):
"""Decrypt encrypted UDP broadcasts."""
def _unpad(data):
return data[: -ord(data[len(data) - 1 :])]
cipher = Cipher(algorithms.AES(UDP_KEY), modes.ECB(), default_backend())
decryptor = cipher.decryptor()
return _unpad(decryptor.update(message) + decryptor.finalize()).decode()
class TuyaDiscovery(asyncio.DatagramProtocol):
"""Datagram handler listening for Tuya broadcast messages."""
def __init__(self, found_devices):
"""Initialize a new TuyaDiscovery instance."""
self.found_devices = found_devices
def datagram_received(self, data, addr):
"""Handle received broadcast message."""
data = data[20:-8]
try:
data = decrypt_udp(data)
except Exception:
data = data.decode()
decoded = json.loads(data)
if decoded.get("ip") not in self.found_devices:
self.found_devices[decoded.get("ip")] = decoded
_LOGGER.debug("Discovered device: %s", decoded)
async def discover(timeout, loop):
"""Discover and return Tuya devices on the network."""
found_devices = {}
def proto_factory():
return TuyaDiscovery(found_devices)
listener = loop.create_datagram_endpoint(
proto_factory, local_addr=("0.0.0.0", 6666)
)
encrypted_listener = loop.create_datagram_endpoint(
proto_factory, local_addr=("0.0.0.0", 6667)
)
listeners = await asyncio.gather(listener, encrypted_listener)
_LOGGER.debug("Listening to broadcasts on UDP port 6666 and 6667")
try:
await asyncio.sleep(timeout)
finally:
for transport, _ in listeners:
transport.close()
return found_devices
def main():
"""Run discovery and print result."""
loop = asyncio.get_event_loop()
res = loop.run_until_complete(discover(5, loop))
print(res)
if __name__ == "__main__":
main()