# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices # E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U # SKYROKU SM-PW701U Wi-Fi Plug Smart Plug # Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa # # This would not exist without the protocol reverse engineering from # https://github.com/codetheweb/tuyapi by codetheweb and blackrozes # # Tested with Python 2.7 and Python 3.6.1 only import base64 from hashlib import md5 import json import logging import socket import sys import time import colorsys import binascii try: #raise ImportError import Crypto from Crypto.Cipher import AES # PyCrypto except ImportError: Crypto = AES = None import pyaes # https://github.com/ricmoo/pyaes version_tuple = (7, 0, 7) version = version_string = __version__ = '%d.%d.%d' % version_tuple __author__ = 'rospogrigio' log = logging.getLogger(__name__) logging.basicConfig() # TODO include function name/line numbers in log #log.setLevel(level=logging.DEBUG) # Debug hack! log.debug('%s version %s', __name__, version) log.debug('Python %s on %s', sys.version, sys.platform) if Crypto is None: log.debug('Using pyaes version %r', pyaes.VERSION) log.debug('Using pyaes from %r', pyaes.__file__) else: log.debug('Using PyCrypto %r', Crypto.version_info) log.debug('Using PyCrypto from %r', Crypto.__file__) SET = 'set' STATUS = 'status' PROTOCOL_VERSION_BYTES_31 = b'3.1' PROTOCOL_VERSION_BYTES_33 = b'3.3' IS_PY2 = sys.version_info[0] == 2 class AESCipher(object): def __init__(self, key): #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ self.bs = 16 self.key = key def encrypt(self, raw, use_base64 = True): if Crypto: raw = self._pad(raw) cipher = AES.new(self.key, mode=AES.MODE_ECB) crypted_text = cipher.encrypt(raw) else: _ = self._pad(raw) cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 crypted_text = cipher.feed(raw) crypted_text += cipher.feed() # flush final block #print('crypted_text %r' % crypted_text) #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) if use_base64: return base64.b64encode(crypted_text) else: return crypted_text def decrypt(self, enc, use_base64=True): if use_base64: enc = base64.b64decode(enc) #print('enc (%d) %r' % (len(enc), enc)) #enc = self._unpad(enc) #enc = self._pad(enc) #print('upadenc (%d) %r' % (len(enc), enc)) if Crypto: cipher = AES.new(self.key, AES.MODE_ECB) raw = cipher.decrypt(enc) #print('raw (%d) %r' % (len(raw), raw)) return self._unpad(raw).decode('utf-8') #return self._unpad(cipher.decrypt(enc)).decode('utf-8') else: cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 plain_text = cipher.feed(enc) plain_text += cipher.feed() # flush final block return plain_text def _pad(self, s): padnum = self.bs - len(s) % self.bs return s + padnum * chr(padnum).encode() @staticmethod def _unpad(s): return s[:-ord(s[len(s)-1:])] def bin2hex(x, pretty=False): if pretty: space = ' ' else: space = '' if IS_PY2: result = ''.join('%02X%s' % (ord(y), space) for y in x) else: result = ''.join('%02X%s' % (y, space) for y in x) return result def hex2bin(x): if IS_PY2: return x.decode('hex') else: return bytes.fromhex(x) # This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi # device20 or device22 are to be used depending on the length of dev_id (20 or 22 chars) payload_dict = { "device20": { "status": { "hexByte": "0a", "command": {"gwId": "", "devId": ""} }, "set": { "hexByte": "07", "command": {"devId": "", "uid": "", "t": ""} }, "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) "suffix": "000000000000aa55" }, "device22": { "status": { "hexByte": "0d", "command": {"devId": "", "uid": "", "t": ""} }, "set": { "hexByte": "07", "command": {"devId": "", "uid": "", "t": ""} }, "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) "suffix": "000000000000aa55" } } class XenonDevice(object): def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): """ Represents a Tuya device. Args: dev_id (str): The device id. address (str): The network address. local_key (str, optional): The encryption key. Defaults to None. dev_type (str, optional): The device type. It will be used as key for lookups in payload_dict. Defaults to None. Attributes: port (int): The port to connect to. """ self.id = dev_id self.address = address self.local_key = local_key self.local_key = local_key.encode('latin1') self.dev_type = dev_type self.connection_timeout = connection_timeout self.version = 3.1 self.port = 6668 # default - do not expect caller to pass in def __repr__(self): return '%r' % ((self.id, self.address),) # FIXME can do better than this def _send_receive(self, payload): """ Send single buffer `payload` and receive a single buffer. Args: payload(bytes): Data to send. """ try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.settimeout(self.connection_timeout) s.connect((self.address, self.port)) except Exception as e: print('Failed to connect to %s. Raising Exception.' % (self.address)) raise e try: s.send(payload) except Exception as e: print('Failed to send payload to %s. Raising Exception.' % (self.address)) #s.close() raise e try: data = s.recv(1024) # print("FIRST: Received %d bytes" % len(data) ) # sometimes the first packet does not contain data (typically 28 bytes): need to read again if len(data) < 40: time.sleep(0.1) data = s.recv(1024) # print("SECOND: Received %d bytes" % len(data) ) except Exception as e: print('Failed to receive data from %s. Raising Exception.' % (self.address)) #s.close() raise e s.close() return data def set_version(self, version): self.version = version def generate_payload(self, command, data=None): """ Generate the payload to send. Args: command(str): The type of command. This is one of the entries from payload_dict data(dict, optional): The data to be send. This is what will be passed via the 'dps' entry """ json_data = payload_dict[self.dev_type][command]['command'] command_hb = payload_dict[self.dev_type][command]['hexByte'] if 'gwId' in json_data: json_data['gwId'] = self.id if 'devId' in json_data: json_data['devId'] = self.id if 'uid' in json_data: json_data['uid'] = self.id # still use id, no seperate uid if 't' in json_data: json_data['t'] = str(int(time.time())) if data is not None: json_data['dps'] = data if command_hb == '0d': json_data['dps'] = {"1": None,"101": None,"102": None} # Create byte buffer from hex data json_payload = json.dumps(json_data) #print(json_payload) json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! json_payload = json_payload.encode('utf-8') log.debug('json_payload=%r', json_payload) #print('json_payload = ', json_payload, ' cmd = ', command_hb) if self.version == 3.3: self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new json_payload = self.cipher.encrypt(json_payload, False) self.cipher = None if command_hb != '0a': # add the 3.3 header json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload elif command == SET: # need to encrypt #print('json_payload %r' % json_payload) self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new json_payload = self.cipher.encrypt(json_payload) #print('crypted json_payload %r' % json_payload) preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key #print('preMd5String %r' % preMd5String) m = md5() m.update(preMd5String) #print(repr(m.digest())) hexdigest = m.hexdigest() #print(hexdigest) #print(hexdigest[8:][:16]) json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload #print('data_to_send') #print(json_payload) #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) #print('json_payload %r' % repr(json_payload)) #print('json_payload len %r' % len(json_payload)) #print(bin2hex(json_payload)) self.cipher = None # expect to connect and then disconnect to set new postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) #print('postfix_payload %r' % postfix_payload) #print('postfix_payload %r' % len(postfix_payload)) #print('postfix_payload %x' % len(postfix_payload)) #print('postfix_payload %r' % hex(len(postfix_payload))) assert len(postfix_payload) <= 0xff postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + payload_dict[self.dev_type][command]['hexByte'] + '000000' + postfix_payload_hex_len ) + postfix_payload # calc the CRC of everything except where the CRC goes and the suffix hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X') buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] #print('command', command) #print('prefix') #print(payload_dict[self.dev_type][command]['prefix']) #print(repr(buffer)) #print(bin2hex(buffer, pretty=False)) #print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) )) #print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer))) return buffer class Device(XenonDevice): def __init__(self, dev_id, address, local_key=None, dev_type=None): super(Device, self).__init__(dev_id, address, local_key, dev_type) def status(self): log.debug('status() entry (dev_type is %s)', self.dev_type) # open device, send request, then close connection payload = self.generate_payload('status') data = self._send_receive(payload) log.debug('status received data=%r', data) result = data[20:-8] # hard coded offsets if self.dev_type != 'device20': result = result[15:] log.debug('result=%r', result) #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer #print('result %r' % result) if result.startswith(b'{'): # this is the regular expected code path if not isinstance(result, str): result = result.decode() result = json.loads(result) elif result.startswith(PROTOCOL_VERSION_BYTES_31): # got an encrypted payload, happens occasionally # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} # NOTE dps.2 may or may not be present result = result[len(PROTOCOL_VERSION_BYTES_31):] # remove version header result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload cipher = AESCipher(self.local_key) result = cipher.decrypt(result) log.debug('decrypted result=%r', result) if not isinstance(result, str): result = result.decode() result = json.loads(result) elif self.version == 3.3: cipher = AESCipher(self.local_key) result = cipher.decrypt(result, False) log.debug('decrypted result=%r', result) if not isinstance(result, str): result = result.decode() result = json.loads(result) else: log.error('Unexpected status() payload=%r', result) # if self.dev_type == 'cover': # result = result.encode('utf-8') # log.debug('encoded result=%r', result) return result def set_status(self, on, switch=1): """ Set status of the device to 'on' or 'off'. Args: on(bool): True for 'on', False for 'off'. switch(int): The switch to set """ # open device, send request, then close connection if isinstance(switch, int): switch = str(switch) # index and payload is a string payload = self.generate_payload(SET, {switch:on}) #print('payload %r' % payload) data = self._send_receive(payload) log.debug('set_status received data=%r', data) return data def set_value(self, index, value): """ Set int value of any index. Args: index(int): index to set value(int): new value for the index """ # open device, send request, then close connection if isinstance(index, int): index = str(index) # index and payload is a string payload = self.generate_payload(SET, { index: value}) data = self._send_receive(payload) return data def turn_on(self, switch=1): """Turn the device on""" self.set_status(True, switch) def turn_off(self, switch=1): """Turn the device off""" self.set_status(False, switch) def set_timer(self, num_secs): """ Set a timer. Args: num_secs(int): Number of seconds """ # FIXME / TODO support schemas? Accept timer id number as parameter? # Dumb heuristic; Query status, pick last device id as that is probably the timer status = self.status() devices = status['dps'] devices_numbers = list(devices.keys()) devices_numbers.sort() dps_id = devices_numbers[-1] payload = self.generate_payload(SET, {dps_id:num_secs}) data = self._send_receive(payload) log.debug('set_timer received data=%r', data) return data class OutletDevice(Device): def __init__(self, dev_id, address, local_key=None): if len(dev_id) == 22: dev_type = 'device22' else: dev_type = 'device20' super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) class CoverEntity(Device): DPS_INDEX_MOVE = '1' DPS_INDEX_BL = '101' DPS_2_STATE = { '1':'movement', '101':'backlight', } def __init__(self, dev_id, address, local_key=None): dev_type = 'device22' print('%s version %s' % ( __name__, version)) print('Python %s on %s' % (sys.version, sys.platform)) if Crypto is None: print('Using pyaes version ', pyaes.VERSION) print('Using pyaes from ', pyaes.__file__) else: print('Using PyCrypto ', Crypto.version_info) print('Using PyCrypto from ', Crypto.__file__) super(CoverEntity, self).__init__(dev_id, address, local_key, dev_type) def open_cover(self, switch=1): """Turn the device on""" self.set_status('on', switch) def close_cover(self, switch=1): """Turn the device off""" self.set_status('off', switch) def stop_cover(self, switch=1): """Turn the device off""" self.set_status('stop', switch) class BulbDevice(Device): DPS_INDEX_ON = '1' DPS_INDEX_MODE = '2' DPS_INDEX_BRIGHTNESS = '3' DPS_INDEX_COLOURTEMP = '4' DPS_INDEX_COLOUR = '5' DPS = 'dps' DPS_MODE_COLOUR = 'colour' DPS_MODE_WHITE = 'white' DPS_2_STATE = { '1':'is_on', '2':'mode', '3':'brightness', '4':'colourtemp', '5':'colour', } def __init__(self, dev_id, address, local_key=None): dev_type = 'device20' super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) @staticmethod def _rgb_to_hexvalue(r, g, b): """ Convert an RGB value to the hex representation expected by tuya. Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: rrggbb0hhhssvv While r, g and b are just hexadecimal values of the corresponding Red, Green and Blue values, the h, s and v values (which are values between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. Args: r(int): Value for the colour red as int from 0-255. g(int): Value for the colour green as int from 0-255. b(int): Value for the colour blue as int from 0-255. """ rgb = [r,g,b] hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) hexvalue = "" for value in rgb: temp = str(hex(int(value))).replace("0x","") if len(temp) == 1: temp = "0" + temp hexvalue = hexvalue + temp hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] hexvalue_hsv = "" for value in hsvarray: temp = str(hex(int(value))).replace("0x","") if len(temp) == 1: temp = "0" + temp hexvalue_hsv = hexvalue_hsv + temp if len(hexvalue_hsv) == 7: hexvalue = hexvalue + "0" + hexvalue_hsv else: hexvalue = hexvalue + "00" + hexvalue_hsv return hexvalue @staticmethod def _hexvalue_to_rgb(hexvalue): """ Converts the hexvalue used by tuya for colour representation into an RGB value. Args: hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() """ r = int(hexvalue[0:2], 16) g = int(hexvalue[2:4], 16) b = int(hexvalue[4:6], 16) return (r, g, b) @staticmethod def _hexvalue_to_hsv(hexvalue): """ Converts the hexvalue used by tuya for colour representation into an HSV value. Args: hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() """ h = int(hexvalue[7:10], 16) / 360 s = int(hexvalue[10:12], 16) / 255 v = int(hexvalue[12:14], 16) / 255 return (h, s, v) def set_colour(self, r, g, b): """ Set colour of an rgb bulb. Args: r(int): Value for the colour red as int from 0-255. g(int): Value for the colour green as int from 0-255. b(int): Value for the colour blue as int from 0-255. """ if not 0 <= r <= 255: raise ValueError("The value for red needs to be between 0 and 255.") if not 0 <= g <= 255: raise ValueError("The value for green needs to be between 0 and 255.") if not 0 <= b <= 255: raise ValueError("The value for blue needs to be between 0 and 255.") #print(BulbDevice) hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) payload = self.generate_payload(SET, { self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, self.DPS_INDEX_COLOUR: hexvalue}) data = self._send_receive(payload) return data def set_white(self, brightness, colourtemp): """ Set white coloured theme of an rgb bulb. Args: brightness(int): Value for the brightness (25-255). colourtemp(int): Value for the colour temperature (0-255). """ if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") if not 0 <= colourtemp <= 255: raise ValueError("The colour temperature needs to be between 0 and 255.") payload = self.generate_payload(SET, { self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, self.DPS_INDEX_BRIGHTNESS: brightness, self.DPS_INDEX_COLOURTEMP: colourtemp}) data = self._send_receive(payload) return data def set_brightness(self, brightness): """ Set the brightness value of an rgb bulb. Args: brightness(int): Value for the brightness (25-255). """ if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness}) data = self._send_receive(payload) return data def set_colourtemp(self, colourtemp): """ Set the colour temperature of an rgb bulb. Args: colourtemp(int): Value for the colour temperature (0-255). """ if not 0 <= colourtemp <= 255: raise ValueError("The colour temperature needs to be between 0 and 255.") payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp}) data = self._send_receive(payload) return data def brightness(self): """Return brightness value""" return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] def colourtemp(self): """Return colour temperature""" return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] def colour_rgb(self): """Return colour as RGB value""" hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] return BulbDevice._hexvalue_to_rgb(hexvalue) def colour_hsv(self): """Return colour as HSV value""" hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] return BulbDevice._hexvalue_to_hsv(hexvalue) def state(self): status = self.status() state = {} for key in status[self.DPS].keys(): if(int(key)<=5): state[self.DPS_2_STATE[key]]=status[self.DPS][key] return state