Initial step to have continuous integration

This commit introduces the use of tox, which currently only verifies
that code is formatted according to the black project. More checks will
come in upcoming PRs for codespell, pydocstyle, flake8 and mypy.

A GitHub actions is included that runs tox on new pull requests, so we
get automatic feedback and can block commits that fail any check.
Another (official) action that runs hassfest is included as well, to
make sure we are compatible with Home Assistant.
This commit is contained in:
Pierre Ståhl
2020-09-22 09:38:14 +02:00
parent 83b3b6f21b
commit 30392d3ac5
7 changed files with 266 additions and 143 deletions

31
.github/workflows/ci.yaml vendored Normal file
View File

@@ -0,0 +1,31 @@
name: CI
on: [pull_request, pull_request_target]
jobs:
build:
name: >-
${{ matrix.python-version }}
/
${{ matrix.platform }}
runs-on: ${{ matrix.platform }}
strategy:
matrix:
# https://help.github.com/articles/virtual-environments-for-github-actions
platform:
- ubuntu-latest
python-version:
- 3.7
- 3.8
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade setuptools pip
python -m pip install tox-gh-actions
- name: Run tox
run: tox -q -p auto

14
.github/workflows/hassfest.yaml vendored Normal file
View File

@@ -0,0 +1,14 @@
name: Validate with hassfest
on:
push:
pull_request:
schedule:
- cron: "0 0 * * *"
jobs:
validate:
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: home-assistant/actions/hassfest@master

2
.gitignore vendored
View File

@@ -1,2 +1,4 @@
*~ *~
__pycache__ __pycache__
.tox
tuyadebug/

View File

@@ -55,34 +55,36 @@ except ImportError:
version_tuple = (8, 1, 0) version_tuple = (8, 1, 0)
version = version_string = __version__ = '%d.%d.%d' % version_tuple version = version_string = __version__ = "%d.%d.%d" % version_tuple
__author__ = 'rospogrigio' __author__ = "rospogrigio"
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
logging.basicConfig() # TODO include function name/line numbers in log logging.basicConfig() # TODO include function name/line numbers in log
# log.setLevel(level=logging.DEBUG) # Uncomment to Debug # log.setLevel(level=logging.DEBUG) # Uncomment to Debug
log.debug('%s version %s', __name__, version) log.debug("%s version %s", __name__, version)
log.debug('Python %s on %s', sys.version, sys.platform) log.debug("Python %s on %s", sys.version, sys.platform)
if Crypto is None: if Crypto is None:
log.debug('Using pyaes version %r', pyaes.VERSION) log.debug("Using pyaes version %r", pyaes.VERSION)
log.debug('Using pyaes from %r', pyaes.__file__) log.debug("Using pyaes from %r", pyaes.__file__)
else: else:
log.debug('Using PyCrypto %r', Crypto.version_info) log.debug("Using PyCrypto %r", Crypto.version_info)
log.debug('Using PyCrypto from %r', Crypto.__file__) log.debug("Using PyCrypto from %r", Crypto.__file__)
SET = 'set' SET = "set"
STATUS = 'status' STATUS = "status"
PROTOCOL_VERSION_BYTES_31 = b'3.1' PROTOCOL_VERSION_BYTES_31 = b"3.1"
PROTOCOL_VERSION_BYTES_33 = b'3.3' PROTOCOL_VERSION_BYTES_33 = b"3.3"
IS_PY2 = sys.version_info[0] == 2 IS_PY2 = sys.version_info[0] == 2
class AESCipher(object): class AESCipher(object):
def __init__(self, key): def __init__(self, key):
self.bs = 16 self.bs = 16
self.key = key self.key = key
def encrypt(self, raw, use_base64=True): def encrypt(self, raw, use_base64=True):
if Crypto: if Crypto:
raw = self._pad(raw) raw = self._pad(raw)
@@ -90,7 +92,9 @@ class AESCipher(object):
crypted_text = cipher.encrypt(raw) crypted_text = cipher.encrypt(raw)
else: else:
_ = self._pad(raw) _ = self._pad(raw)
cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 cipher = pyaes.blockfeeder.Encrypter(
pyaes.AESModeOfOperationECB(self.key)
) # no IV, auto pads to 16
crypted_text = cipher.feed(raw) crypted_text = cipher.feed(raw)
crypted_text += cipher.feed() # flush final block crypted_text += cipher.feed() # flush final block
# print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) # print('crypted_text (%d) %r' % (len(crypted_text), crypted_text))
@@ -110,16 +114,20 @@ class AESCipher(object):
cipher = AES.new(self.key, AES.MODE_ECB) cipher = AES.new(self.key, AES.MODE_ECB)
raw = cipher.decrypt(enc) raw = cipher.decrypt(enc)
# print('raw (%d) %r' % (len(raw), raw)) # print('raw (%d) %r' % (len(raw), raw))
return self._unpad(raw).decode('utf-8') return self._unpad(raw).decode("utf-8")
# return self._unpad(cipher.decrypt(enc)).decode('utf-8') # return self._unpad(cipher.decrypt(enc)).decode('utf-8')
else: else:
cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 cipher = pyaes.blockfeeder.Decrypter(
pyaes.AESModeOfOperationECB(self.key)
) # no IV, auto pads to 16
plain_text = cipher.feed(enc) plain_text = cipher.feed(enc)
plain_text += cipher.feed() # flush final block plain_text += cipher.feed() # flush final block
return plain_text return plain_text
def _pad(self, s): def _pad(self, s):
padnum = self.bs - len(s) % self.bs padnum = self.bs - len(s) % self.bs
return s + padnum * chr(padnum).encode() return s + padnum * chr(padnum).encode()
@staticmethod @staticmethod
def _unpad(s): def _unpad(s):
return s[: -ord(s[len(s) - 1 :])] return s[: -ord(s[len(s) - 1 :])]
@@ -127,55 +135,46 @@ class AESCipher(object):
def bin2hex(x, pretty=False): def bin2hex(x, pretty=False):
if pretty: if pretty:
space = ' ' space = " "
else: else:
space = '' space = ""
if IS_PY2: if IS_PY2:
result = ''.join('%02X%s' % (ord(y), space) for y in x) result = "".join("%02X%s" % (ord(y), space) for y in x)
else: else:
result = ''.join('%02X%s' % (y, space) for y in x) result = "".join("%02X%s" % (y, space) for y in x)
return result return result
def hex2bin(x): def hex2bin(x):
if IS_PY2: if IS_PY2:
return x.decode('hex') return x.decode("hex")
else: else:
return bytes.fromhex(x) return bytes.fromhex(x)
# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi : # This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi :
# type_0a devices require the 0a command as the status request # type_0a devices require the 0a command as the status request
# type_0d devices require the 0d command as the status request, and the list of dps used set to null in the request payload (see generate_payload method) # type_0d devices require the 0d command as the status request, and the list of dps used set to null in the request payload (see generate_payload method)
payload_dict = { payload_dict = {
"type_0a": { "type_0a": {
"status": { "status": {"hexByte": "0a", "command": {"gwId": "", "devId": ""}},
"hexByte": "0a", "set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}},
"command": {"gwId": "", "devId": ""}
},
"set": {
"hexByte": "07",
"command": {"devId": "", "uid": "", "t": ""}
},
"prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte)
"suffix": "000000000000aa55" "suffix": "000000000000aa55",
}, },
"type_0d": { "type_0d": {
"status": { "status": {"hexByte": "0d", "command": {"devId": "", "uid": "", "t": ""}},
"hexByte": "0d", "set": {"hexByte": "07", "command": {"devId": "", "uid": "", "t": ""}},
"command": {"devId": "", "uid": "", "t": ""}
},
"set": {
"hexByte": "07",
"command": {"devId": "", "uid": "", "t": ""}
},
"prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte)
"suffix": "000000000000aa55" "suffix": "000000000000aa55",
} },
} }
class TuyaInterface(object): class TuyaInterface(object):
def __init__(self, dev_id, address, local_key, protocol_version, connection_timeout=10): def __init__(
self, dev_id, address, local_key, protocol_version, connection_timeout=10
):
""" """
Represents a Tuya device. Represents a Tuya device.
@@ -190,16 +189,16 @@ class TuyaInterface(object):
self.id = dev_id self.id = dev_id
self.address = address self.address = address
self.local_key = local_key.encode('latin1') self.local_key = local_key.encode("latin1")
self.connection_timeout = connection_timeout self.connection_timeout = connection_timeout
self.version = protocol_version self.version = protocol_version
self.dev_type = 'type_0a' self.dev_type = "type_0a"
self.dps_to_request = {} self.dps_to_request = {}
self.port = 6668 # default - do not expect caller to pass in self.port = 6668 # default - do not expect caller to pass in
def __repr__(self): def __repr__(self):
return '%r' % ((self.id, self.address),) # FIXME can do better than this return "%r" % ((self.id, self.address),) # FIXME can do better than this
def _send_receive(self, payload): def _send_receive(self, payload):
""" """
@@ -214,12 +213,12 @@ class TuyaInterface(object):
s.settimeout(self.connection_timeout) s.settimeout(self.connection_timeout)
s.connect((self.address, self.port)) s.connect((self.address, self.port))
except Exception as e: except Exception as e:
print('Failed to connect to %s. Raising Exception.' % (self.address)) print("Failed to connect to %s. Raising Exception." % (self.address))
raise e raise e
try: try:
s.send(payload) s.send(payload)
except Exception as e: except Exception as e:
print('Failed to send payload to %s. Raising Exception.' % (self.address)) print("Failed to send payload to %s. Raising Exception." % (self.address))
# s.close() # s.close()
raise e raise e
@@ -232,7 +231,7 @@ class TuyaInterface(object):
data = s.recv(1024) data = s.recv(1024)
# print("SECOND: Received %d bytes" % len(data) ) # print("SECOND: Received %d bytes" % len(data) )
except Exception as e: except Exception as e:
print('Failed to receive data from %s. Raising Exception.' % (self.address)) print("Failed to receive data from %s. Raising Exception." % (self.address))
# s.close() # s.close()
raise e raise e
@@ -304,82 +303,111 @@ class TuyaInterface(object):
data(dict, optional): The data to be send. data(dict, optional): The data to be send.
This is what will be passed via the 'dps' entry This is what will be passed via the 'dps' entry
""" """
json_data = payload_dict[self.dev_type][command]['command'] json_data = payload_dict[self.dev_type][command]["command"]
command_hb = payload_dict[self.dev_type][command]['hexByte'] command_hb = payload_dict[self.dev_type][command]["hexByte"]
if 'gwId' in json_data: if "gwId" in json_data:
json_data['gwId'] = self.id json_data["gwId"] = self.id
if 'devId' in json_data: if "devId" in json_data:
json_data['devId'] = self.id json_data["devId"] = self.id
if 'uid' in json_data: if "uid" in json_data:
json_data['uid'] = self.id # still use id, no seperate uid json_data["uid"] = self.id # still use id, no seperate uid
if 't' in json_data: if "t" in json_data:
json_data['t'] = str(int(time.time())) json_data["t"] = str(int(time.time()))
if data is not None: if data is not None:
json_data['dps'] = data json_data["dps"] = data
if command_hb == '0d': if command_hb == "0d":
json_data['dps'] = self.dps_to_request json_data["dps"] = self.dps_to_request
# log.info('******** COMMAND IS %r', self.dps_to_request) # log.info('******** COMMAND IS %r', self.dps_to_request)
# Create byte buffer from hex data # Create byte buffer from hex data
json_payload = json.dumps(json_data) json_payload = json.dumps(json_data)
# print(json_payload) # print(json_payload)
json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! json_payload = json_payload.replace(
json_payload = json_payload.encode('utf-8') " ", ""
log.debug('json_payload=%r', json_payload) ) # if spaces are not removed device does not respond!
json_payload = json_payload.encode("utf-8")
log.debug("json_payload=%r", json_payload)
# print('json_payload = ', json_payload, ' cmd = ', command_hb) # print('json_payload = ', json_payload, ' cmd = ', command_hb)
if self.version == 3.3: if self.version == 3.3:
self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new self.cipher = AESCipher(
self.local_key
) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload, False) json_payload = self.cipher.encrypt(json_payload, False)
self.cipher = None self.cipher = None
if command_hb != '0a': if command_hb != "0a":
# add the 3.3 header # add the 3.3 header
json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload json_payload = (
PROTOCOL_VERSION_BYTES_33
+ b"\0\0\0\0\0\0\0\0\0\0\0\0"
+ json_payload
)
elif command == SET: elif command == SET:
# need to encrypt # need to encrypt
self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new self.cipher = AESCipher(
self.local_key
) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload) json_payload = self.cipher.encrypt(json_payload)
preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key preMd5String = (
b"data="
+ json_payload
+ b"||lpv="
+ PROTOCOL_VERSION_BYTES_31
+ b"||"
+ self.local_key
)
m = md5() m = md5()
m.update(preMd5String) m.update(preMd5String)
hexdigest = m.hexdigest() hexdigest = m.hexdigest()
json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload json_payload = (
PROTOCOL_VERSION_BYTES_31
+ hexdigest[8:][:16].encode("latin1")
+ json_payload
)
self.cipher = None # expect to connect and then disconnect to set new self.cipher = None # expect to connect and then disconnect to set new
postfix_payload = hex2bin(
postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) bin2hex(json_payload) + payload_dict[self.dev_type]["suffix"]
assert len(postfix_payload) <= 0xff )
postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) assert len(postfix_payload) <= 0xFF
buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + postfix_payload_hex_len = "%x" % len(
payload_dict[self.dev_type][command]['hexByte'] + postfix_payload
'000000' + ) # TODO this assumes a single byte 0-255 (0x00-0xff)
postfix_payload_hex_len ) + postfix_payload buffer = (
hex2bin(
payload_dict[self.dev_type]["prefix"]
+ payload_dict[self.dev_type][command]["hexByte"]
+ "000000"
+ postfix_payload_hex_len
)
+ postfix_payload
)
# calc the CRC of everything except where the CRC goes and the suffix # calc the CRC of everything except where the CRC goes and the suffix
hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X') hex_crc = format(binascii.crc32(buffer[:-8]) & 0xFFFFFFFF, "08X")
buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:]
# print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) )) # print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) ))
# print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer))) # print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer)))
return buffer return buffer
def status(self): def status(self):
log.debug('status() entry (dev_type is %s)', self.dev_type) log.debug("status() entry (dev_type is %s)", self.dev_type)
# open device, send request, then close connection # open device, send request, then close connection
payload = self.generate_payload('status') payload = self.generate_payload("status")
data = self._send_receive(payload) data = self._send_receive(payload)
log.debug('status received data=%r', data) log.debug("status received data=%r", data)
result = data[20:-8] # hard coded offsets result = data[20:-8] # hard coded offsets
if self.dev_type != 'type_0a': if self.dev_type != "type_0a":
result = result[15:] result = result[15:]
log.debug('result=%r', result) log.debug("result=%r", result)
# result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer # result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer
# print('result %r' % result) # print('result %r' % result)
if result.startswith(b'{'): if result.startswith(b"{"):
# this is the regular expected code path # this is the regular expected code path
if not isinstance(result, str): if not isinstance(result, str):
result = result.decode() result = result.decode()
@@ -389,27 +417,32 @@ class TuyaInterface(object):
# expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
# NOTE dps.2 may or may not be present # NOTE dps.2 may or may not be present
result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header
result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload result = result[
16:
] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload
cipher = AESCipher(self.local_key) cipher = AESCipher(self.local_key)
result = cipher.decrypt(result) result = cipher.decrypt(result)
print('decrypted result=[{}]'.format(result)) print("decrypted result=[{}]".format(result))
log.debug('decrypted result=%r', result) log.debug("decrypted result=%r", result)
if not isinstance(result, str): if not isinstance(result, str):
result = result.decode() result = result.decode()
result = json.loads(result) result = json.loads(result)
elif self.version == 3.3: elif self.version == 3.3:
cipher = AESCipher(self.local_key) cipher = AESCipher(self.local_key)
result = cipher.decrypt(result, False) result = cipher.decrypt(result, False)
log.debug('decrypted result=%r', result) log.debug("decrypted result=%r", result)
if "data unvalid" in result: if "data unvalid" in result:
self.dev_type = 'type_0d' self.dev_type = "type_0d"
log.debug("'data unvalid' error detected: switching to dev_type %r", self.dev_type) log.debug(
"'data unvalid' error detected: switching to dev_type %r",
self.dev_type,
)
return self.status() return self.status()
if not isinstance(result, str): if not isinstance(result, str):
result = result.decode() result = result.decode()
result = json.loads(result) result = json.loads(result)
else: else:
log.error('Unexpected status() payload=%r', result) log.error("Unexpected status() payload=%r", result)
return result return result
@@ -425,15 +458,13 @@ class TuyaInterface(object):
if isinstance(dps_index, int): if isinstance(dps_index, int):
dps_index = str(dps_index) # index and payload is a string dps_index = str(dps_index) # index and payload is a string
payload = self.generate_payload(SET, { payload = self.generate_payload(SET, {dps_index: value})
dps_index: value})
data = self._send_receive(payload) data = self._send_receive(payload)
log.debug('set_dps received data=%r', data) log.debug("set_dps received data=%r", data)
return data return data
def set_timer(self, num_secs): def set_timer(self, num_secs):
""" """
Set a timer. Set a timer.
@@ -445,7 +476,7 @@ class TuyaInterface(object):
# Dumb heuristic; Query status, pick last device id as that is probably the timer # Dumb heuristic; Query status, pick last device id as that is probably the timer
status = self.status() status = self.status()
devices = status['dps'] devices = status["dps"]
devices_numbers = list(devices.keys()) devices_numbers = list(devices.keys())
devices_numbers.sort() devices_numbers.sort()
dps_id = devices_numbers[-1] dps_id = devices_numbers[-1]
@@ -453,5 +484,5 @@ class TuyaInterface(object):
payload = self.generate_payload(SET, {dps_id: num_secs}) payload = self.generate_payload(SET, {dps_id: num_secs})
data = self._send_receive(payload) data = self._send_receive(payload)
log.debug('set_timer received data=%r', data) log.debug("set_timer received data=%r", data)
return data return data

3
pyproject.toml Normal file
View File

@@ -0,0 +1,3 @@
[tool.black]
target-version = ["py37", "py38"]
include = "custom_components/localtuya/*.py"

2
requirements_test.txt Normal file
View File

@@ -0,0 +1,2 @@
black==20.8b1
mypy==0.782

40
tox.ini Normal file
View File

@@ -0,0 +1,40 @@
[tox]
skipsdist = true
envlist = py{37,38}, lint, typing
skip_missing_interpreters = True
cs_exclude_words =
[gh-actions]
python =
3.7: clean, py37, lint, typing
3.8: clean, py38, lint, typing
[testenv]
passenv = TOXENV CI
whitelist_externals =
true
setenv =
LANG=en_US.UTF-8
PYTHONPATH = {toxinidir}/localtuya-homeassistant
deps =
-r{toxinidir}/requirements_test.txt
commands =
true # TODO: Run tests later
#pytest -n auto --log-level=debug -v --timeout=30 --durations=10 {posargs}
[testenv:lint]
ignore_errors = True
deps =
{[testenv]deps}
commands =
#codespell -q 4 -L {[tox]cs_exclude_words} --skip="*.pyc,*.pyi,*~" custom_components
#flake8 cu
black --fast --check .
#pydocstyle -v custom_components
[testenv:typing]
whitelist_externals =
true
commands =
true
#mypy --ignore-missing-imports --follow-imports=skip custom_components