Updating the status of TuyaDevice using the response of the set_dps() call

This commit is contained in:
rospogrigio
2020-09-28 14:56:55 +02:00
committed by rospogrigio
parent d9c6d66bb7
commit 3fc19da259
2 changed files with 67 additions and 6 deletions

View File

@@ -253,6 +253,62 @@ class TuyaInterface:
"""
return self.exchange(SET, {str(dps_index): value})
def _decode_received_data(self, data, is_status):
"""Return device status."""
"""is_status may be True (result of a status request) or False (result of a set_dps request)"""
result = data[20:-8] # hard coded offsets
if self.dev_type != "type_0a":
result = result[15:]
elif not is_status:
result = result[15:]
log.debug("Decrypting %r :", result)
# result = data[data.find('{'):data.rfind('}')+1] # naive marker search,
# hope neither { nor } occur in header/footer
# print('result %r' % result)
if result.startswith(b"{"):
# this is the regular expected code path
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
elif result.startswith(PROTOCOL_VERSION_BYTES_31):
# got an encrypted payload, happens occasionally
# expect resulting json to look similar to:
# {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
# NOTE dps.2 may or may not be present
result = result[len(PROTOCOL_VERSION_BYTES_31) :] # remove version header
# remove (what I'm guessing, but not confirmed is) 16-bytes of MD5
# hexdigest of payload
result = result[16:]
cipher = AESCipher(self.local_key)
result = cipher.decrypt(result)
#print("decrypted result=[{}]".format(result))
log.info("decrypted result=%r", result)
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
elif self.version == 3.3:
# results of a set_dps request must have a further offset
cipher = AESCipher(self.local_key)
result = cipher.decrypt(result, False)
log.debug("decrypted result=%r", result)
if "data unvalid" in result:
self.dev_type = "type_0d"
log.info(
"'data unvalid' error detected: switching to dev_type %r",
self.dev_type,
)
return self.status()
if not isinstance(result, str):
result = result.decode()
result = json.loads(result)
else:
log.error("Unexpected status() payload=%r", result)
return result
def detect_available_dps(self):
"""Return which datapoints are supported by the device."""
# type_0d devices need a sort of bruteforce querying in order to detect the