diff --git a/custom_components/localtuya/discovery.py b/custom_components/localtuya/discovery.py index 672eec0..5a64659 100644 --- a/custom_components/localtuya/discovery.py +++ b/custom_components/localtuya/discovery.py @@ -9,7 +9,8 @@ import asyncio import logging from hashlib import md5 -from Cryptodome.Cipher import AES +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes _LOGGER = logging.getLogger(__name__) @@ -22,7 +23,9 @@ def decrypt_udp(message): def _unpad(data): return data[: -ord(data[len(data) - 1 :])] - return _unpad(AES.new(UDP_KEY, AES.MODE_ECB).decrypt(message)).decode() + cipher = Cipher(algorithms.AES(UDP_KEY), modes.ECB(), default_backend()) + decryptor = cipher.decryptor() + return _unpad(decryptor.update(message) + decryptor.finalize()).decode() class TuyaDiscovery(asyncio.DatagramProtocol): diff --git a/custom_components/localtuya/manifest.json b/custom_components/localtuya/manifest.json index e6c7e86..5710587 100644 --- a/custom_components/localtuya/manifest.json +++ b/custom_components/localtuya/manifest.json @@ -6,6 +6,6 @@ "codeowners": [ "@rospogrigio" ], - "requirements": ["pycryptodome==3.9.8"], + "requirements": ["cryptography==2.9.2"], "config_flow": true } \ No newline at end of file diff --git a/custom_components/localtuya/pytuya/__init__.py b/custom_components/localtuya/pytuya/__init__.py index 8b0ca19..0ce88a5 100644 --- a/custom_components/localtuya/pytuya/__init__.py +++ b/custom_components/localtuya/pytuya/__init__.py @@ -45,14 +45,8 @@ import sys import time import binascii -try: - # raise ImportError - import Crypto - from Crypto.Cipher import AES # PyCrypto -except ImportError: - Crypto = AES = None - import pyaes # https://github.com/ricmoo/pyaes - +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes version_tuple = (8, 1, 0) version = version_string = __version__ = "%d.%d.%d" % version_tuple @@ -62,15 +56,6 @@ log = logging.getLogger(__name__) logging.basicConfig() # TODO include function name/line numbers in log # log.setLevel(level=logging.DEBUG) # Uncomment to Debug -log.debug("%s version %s", __name__, version) -log.debug("Python %s on %s", sys.version, sys.platform) -if Crypto is None: - log.debug("Using pyaes version %r", pyaes.VERSION) - log.debug("Using pyaes from %r", pyaes.__file__) -else: - log.debug("Using PyCrypto %r", Crypto.version_info) - log.debug("Using PyCrypto from %r", Crypto.__file__) - SET = "set" STATUS = "status" @@ -86,22 +71,13 @@ class AESCipher: def __init__(self, key): """Initialize a new AESCipher.""" self.bs = 16 - self.key = key + self.cipher = Cipher(algorithms.AES(key), modes.ECB(), default_backend()) def encrypt(self, raw, use_base64=True): """Encrypt data to be sent to device.""" - if Crypto: - raw = self._pad(raw) - cipher = AES.new(self.key, mode=AES.MODE_ECB) - crypted_text = cipher.encrypt(raw) - else: - _ = self._pad(raw) - cipher = pyaes.blockfeeder.Encrypter( - pyaes.AESModeOfOperationECB(self.key) - ) # no IV, auto pads to 16 - crypted_text = cipher.feed(raw) - crypted_text += cipher.feed() # flush final block - # print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) + encryptor = self.cipher.encryptor() + crypted_text = encryptor.update(self._pad(raw)) + encryptor.finalize() + if use_base64: return base64.b64encode(crypted_text) else: @@ -111,23 +87,9 @@ class AESCipher: """Decrypt data from device.""" if use_base64: enc = base64.b64decode(enc) - # print('enc (%d) %r' % (len(enc), enc)) - # enc = self._unpad(enc) - # enc = self._pad(enc) - # print('upadenc (%d) %r' % (len(enc), enc)) - if Crypto: - cipher = AES.new(self.key, AES.MODE_ECB) - raw = cipher.decrypt(enc) - # print('raw (%d) %r' % (len(raw), raw)) - return self._unpad(raw).decode("utf-8") - # return self._unpad(cipher.decrypt(enc)).decode('utf-8') - else: - cipher = pyaes.blockfeeder.Decrypter( - pyaes.AESModeOfOperationECB(self.key) - ) # no IV, auto pads to 16 - plain_text = cipher.feed(enc) - plain_text += cipher.feed() # flush final block - return plain_text + + decryptor = self.cipher.decryptor() + return self._unpad(decryptor.update(enc) + decryptor.finalize()).decode() def _pad(self, s): padnum = self.bs - len(s) % self.bs diff --git a/requirements_test.txt b/requirements_test.txt index 8c70fa8..8a7dd4e 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -2,4 +2,5 @@ black==20.8b1 codespell==1.17.1 flake8==3.8.3 mypy==0.782 -pydocstyle==5.1.1 \ No newline at end of file +pydocstyle==5.1.1 +cryptography==2.9.2 \ No newline at end of file