diff --git a/custom_components/localtuya/cover.py b/custom_components/localtuya/cover.py new file mode 100644 index 0000000..539d56e --- /dev/null +++ b/custom_components/localtuya/cover.py @@ -0,0 +1,208 @@ +""" +Simple platform to control LOCALLY Tuya cover devices. + +Sample config yaml + +switch: + - platform: localtuya + host: 192.168.0.123 + local_key: 1234567891234567 + device_id: 123456789123456789abcd + name: Cover guests + protocol_version: 3.3 + id: 1 + +""" +import logging +import requests + +import voluptuous as vol + +from homeassistant.components.cover import ( + CoverDevice, + PLATFORM_SCHEMA, + SUPPORT_CLOSE, + SUPPORT_OPEN, + SUPPORT_STOP, +) + +"""from . import DATA_TUYA, TuyaDevice""" +"""from homeassistant.components.cover import CoverDevice, PLATFORM_SCHEMA""" +from homeassistant.const import (CONF_HOST, CONF_ID, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME) +import homeassistant.helpers.config_validation as cv +from time import time, sleep +from threading import Lock + +_LOGGER = logging.getLogger(__name__) + +DEFAULT_NAME = 'localtuyacover' + +REQUIREMENTS = ['pytuya==7.0.7'] + +CONF_DEVICE_ID = 'device_id' +CONF_LOCAL_KEY = 'local_key' +CONF_PROTOCOL_VERSION = 'protocol_version' + +DEFAULT_ID = '1' +DEFAULT_PROTOCOL_VERSION = 3.3 + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Optional(CONF_ICON): cv.icon, + vol.Required(CONF_HOST): cv.string, + vol.Required(CONF_DEVICE_ID): cv.string, + vol.Required(CONF_LOCAL_KEY): cv.string, + vol.Required(CONF_NAME): cv.string, + vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float), + vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, +}) + + + + +def setup_platform(hass, config, add_entities, discovery_info=None): + """Set up Tuya cover devices.""" + from . import pytuya + + covers = [] + localtuyadevice = pytuya.CoverDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY)) + localtuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION))) + + cover_device = TuyaCoverCache(localtuyadevice) + covers.append( + TuyaDevice( + cover_device, + config.get(CONF_NAME), + config.get(CONF_ICON), + config.get(CONF_ID), + ) + ) + print('Setup localtuya cover [{}] with device ID [{}] '.format(config.get(CONF_NAME), config.get(CONF_ID))) + + add_entities(covers) + + +class TuyaCoverCache: + """Cache wrapper for pytuya.CoverDevice""" + + def __init__(self, device): + """Initialize the cache.""" + self._cached_status = '' + self._cached_status_time = 0 + self._device = device + self._lock = Lock() + + def __get_status(self): + for i in range(20): + try: + status = self._device.status() + return status + except ConnectionError: + if i+1 == 3: + raise ConnectionError("Failed to update status.") + + def set_status(self, state, switchid): + """Change the Tuya switch status and clear the cache.""" + self._cached_status = '' + self._cached_status_time = 0 + for i in range(20): + try: + return self._device.set_status(state, switchid) + except ConnectionError: + if i+1 == 5: + raise ConnectionError("Failed to set status.") + + def status(self): + """Get state of Tuya switch and cache the results.""" + self._lock.acquire() + try: + now = time() + if not self._cached_status or now - self._cached_status_time > 30: + sleep(0.5) + self._cached_status = self.__get_status() + self._cached_status_time = time() + return self._cached_status + finally: + self._lock.release() + +class TuyaDevice(CoverDevice): + """Tuya cover devices.""" + + def __init__(self, device, name, icon, switchid): + self._device = device + self._name = name + self._icon = icon + self._switch_id = switchid + #self.entity_id = ENTITY_ID_FORMAT.format(_device.object_id()) + self._status = self._device.status() + self._state = self._status['dps'][self._switch_id] + print('Initialized tuya cover [{}] with switch status [{}] and state [{}]'.format(self._name, self._status, self._state)) + + @property + def name(self): + """Get name of Tuya switch.""" + return self._name + + @property + def supported_features(self): + """Flag supported features.""" + supported_features = SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_STOP + return supported_features + + @property + def icon(self): + """Return the icon.""" + return self._icon + + @property + def is_opening(self): + #self.update() + state = self._state + #print('is_opening() : state [{}]'.format(state)) + if state == 'on': + return True + return False + + @property + def is_closing(self): + #self.update() + state = self._state + #print('is_closing() : state [{}]'.format(state)) + if state == 'off': + return True + return False + + @property + def is_closed(self): + """Return if the cover is closed or not.""" + #self.update() + state = self._state + #print('is_closed() : state [{}]'.format(state)) + if state == 'off': + return False + if state == 'on': + return True + return None + + def open_cover(self, **kwargs): + """Open the cover.""" + self._device.set_status('on', self._switch_id) +# self._state = 'on' +# self._device._device.open_cover() + + def close_cover(self, **kwargs): + """Close cover.""" + self._device.set_status('off', self._switch_id) +# self._state = 'off' +# self._device._device.close_cover() + + def stop_cover(self, **kwargs): + """Stop the cover.""" + self._device.set_status('stop', self._switch_id) +# self._state = 'stop' +# self._device._device.stop_cover() + + def update(self): + """Get state of Tuya switch.""" + self._status = self._device.status() + self._state = self._status['dps'][self._switch_id] + #print('update() : state [{}]'.format(self._state)) diff --git a/custom_components/localtuya/light.py b/custom_components/localtuya/light.py new file mode 100644 index 0000000..a9dd6b6 --- /dev/null +++ b/custom_components/localtuya/light.py @@ -0,0 +1,288 @@ +""" +Simple platform to control LOCALLY Tuya switch devices. + +Sample config yaml + +switch: + - platform: localtuya + host: 192.168.0.1 + local_key: 1234567891234567 + device_id: 12345678912345671234 + name: tuya_01 + protocol_version: 3.3 +""" +import voluptuous as vol +from homeassistant.const import (CONF_HOST, CONF_ID, CONF_SWITCHES, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME) +import homeassistant.helpers.config_validation as cv +from time import time, sleep +from threading import Lock +import logging +from homeassistant.components.light import ( + ATTR_BRIGHTNESS, + ATTR_COLOR_TEMP, + ATTR_HS_COLOR, + ENTITY_ID_FORMAT, + SUPPORT_BRIGHTNESS, + SUPPORT_COLOR, + SUPPORT_COLOR_TEMP, + Light, + PLATFORM_SCHEMA +) +from homeassistant.util import color as colorutil +import socket + +REQUIREMENTS = ['pytuya==7.0.4'] + +CONF_DEVICE_ID = 'device_id' +CONF_LOCAL_KEY = 'local_key' +CONF_PROTOCOL_VERSION = 'protocol_version' +# IMPORTANT, id is used as key for state and turning on and off, 1 was fine switched apparently but my bulbs need 20, other feature attributes count up from this, e.g. 21 mode, 22 brightnes etc, see my pytuya modification. +DEFAULT_ID = '1' +DEFAULT_PROTOCOL_VERSION = 3.3 +MIN_MIRED = 153 +MAX_MIRED = 370 +UPDATE_RETRY_LIMIT = 3 + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Optional(CONF_ICON): cv.icon, + vol.Required(CONF_HOST): cv.string, + vol.Required(CONF_DEVICE_ID): cv.string, + vol.Required(CONF_LOCAL_KEY): cv.string, + vol.Required(CONF_NAME): cv.string, + vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float), + vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, +}) +log = logging.getLogger(__name__) +log.setLevel(level=logging.DEBUG) # Debug hack! + + +def setup_platform(hass, config, add_devices, discovery_info=None): + """Set up of the Tuya switch.""" + from . import pytuya + + lights = [] + pytuyadevice = pytuya.BulbDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY)) + pytuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION))) + + bulb_device = TuyaCache(pytuyadevice) + lights.append( + TuyaDevice( + bulb_device, + config.get(CONF_NAME), + config.get(CONF_ICON), + config.get(CONF_ID) + ) + ) + + add_devices(lights) + +class TuyaCache: + """Cache wrapper for pytuya.BulbDevice""" + + def __init__(self, device): + """Initialize the cache.""" + self._cached_status = '' + self._cached_status_time = 0 + self._device = device + self._lock = Lock() + + def __get_status(self, switchid): + for _ in range(UPDATE_RETRY_LIMIT): + try: + status = self._device.status()['dps'][switchid] + return status + except ConnectionError: + pass + except socket.timeout: + pass + log.warn( + "Failed to get status after {} tries".format(UPDATE_RETRY_LIMIT)) + + def set_status(self, state, switchid): + """Change the Tuya switch status and clear the cache.""" + self._cached_status = '' + self._cached_status_time = 0 + for _ in range(UPDATE_RETRY_LIMIT): + try: + return self._device.set_status(state, switchid) + except ConnectionError: + pass + except socket.timeout: + pass + log.warn( + "Failed to set status after {} tries".format(UPDATE_RETRY_LIMIT)) + + def status(self, switchid): + """Get state of Tuya switch and cache the results.""" + self._lock.acquire() + try: + now = time() + if not self._cached_status or now - self._cached_status_time > 30: + sleep(0.5) + self._cached_status = self.__get_status(switchid) + self._cached_status_time = time() + return self._cached_status + finally: + self._lock.release() + + def cached_status(self): + return self._cached_status + + def support_color(self): + return self._device.support_color() + + def support_color_temp(self): + return self._device.support_color_temp() + + def brightness(self): + for _ in range(UPDATE_RETRY_LIMIT): + try: + return self._device.brightness() + except ConnectionError: + pass + except socket.timeout: + pass + log.warn( + "Failed to get brightness after {} tries".format(UPDATE_RETRY_LIMIT)) + + def color_temp(self): + for _ in range(UPDATE_RETRY_LIMIT): + try: + return self._device.colourtemp() + except ConnectionError: + pass + except socket.timeout: + pass + log.warn( + "Failed to get color temp after {} tries".format(UPDATE_RETRY_LIMIT)) + + def set_brightness(self, brightness): + for _ in range(UPDATE_RETRY_LIMIT): + try: + return self._device.set_brightness(brightness) + except ConnectionError: + pass + except socket.timeout: + pass + log.warn( + "Failed to set brightness after {} tries".format(UPDATE_RETRY_LIMIT)) + + def set_color_temp(self, color_temp): + for _ in range(UPDATE_RETRY_LIMIT): + try: + return self._device.set_colourtemp(color_temp) + except ConnectionError: + pass + except socket.timeout: + pass + log.warn( + "Failed to set color temp after {} tries".format(UPDATE_RETRY_LIMIT)) + + def state(self): + self._device.state(); + + def turn_on(self): + self._device.turn_on(); + + def turn_off(self): + self._device.turn_off(); + +class TuyaDevice(Light): + """Representation of a Tuya switch.""" + + def __init__(self, device, name, icon, bulbid): + """Initialize the Tuya switch.""" + self._device = device + self._name = name + self._state = False + self._brightness = 127 + self._color_temp = 127 + self._icon = icon + self._bulb_id = bulbid + + @property + def name(self): + """Get name of Tuya switch.""" + return self._name + + @property + def is_on(self): + """Check if Tuya switch is on.""" + return self._state + + @property + def icon(self): + """Return the icon.""" + return self._icon + + def update(self): + """Get state of Tuya switch.""" + status = self._device.status(self._bulb_id) + self._state = status + try: + brightness = int(self._device.brightness()) + if brightness > 254: + brightness = 255 + if brightness < 25: + brightness = 25 + self._brightness = brightness + except TypeError: + pass + self._color_temp = self._device.color_temp() + + @property + def brightness(self): + """Return the brightness of the light.""" + return self._brightness + +# @property +# def hs_color(self): +# """Return the hs_color of the light.""" +# return (self._device.color_hsv()[0],self._device.color_hsv()[1]) + + @property + def color_temp(self): + """Return the color_temp of the light.""" + try: + return int(MAX_MIRED - (((MAX_MIRED - MIN_MIRED) / 255) * self._color_temp)) + except TypeError: + pass + + @property + def min_mireds(self): + """Return color temperature min mireds.""" + return MIN_MIRED + + @property + def max_mireds(self): + """Return color temperature max mireds.""" + return MAX_MIRED + + def turn_on(self, **kwargs): + """Turn on or control the light.""" + log.debug("Turning on, state: " + str(self._device.cached_status())) + if not self._device.cached_status(): + self._device.set_status(True, self._bulb_id) + if ATTR_BRIGHTNESS in kwargs: + converted_brightness = int(kwargs[ATTR_BRIGHTNESS]) + if converted_brightness <= 25: + converted_brightness = 25 + self._device.set_brightness(converted_brightness) + if ATTR_HS_COLOR in kwargs: + raise ValueError(" TODO implement RGB from HS") + if ATTR_COLOR_TEMP in kwargs: + color_temp = int(255 - (255 / (MAX_MIRED - MIN_MIRED)) * (int(kwargs[ATTR_COLOR_TEMP]) - MIN_MIRED)) + self._device.set_color_temp(color_temp) + + def turn_off(self, **kwargs): + """Turn Tuya switch off.""" + self._device.set_status(False, self._bulb_id) + + @property + def supported_features(self): + """Flag supported features.""" + supports = SUPPORT_BRIGHTNESS + #if self._device.support_color(): + # supports = supports | SUPPORT_COLOR + supports = supports | SUPPORT_COLOR_TEMP + return supports diff --git a/custom_components/localtuya/pytuya/__init__.py b/custom_components/localtuya/pytuya/__init__.py index 18be395..0ad7fdd 100644 --- a/custom_components/localtuya/pytuya/__init__.py +++ b/custom_components/localtuya/pytuya/__init__.py @@ -1,529 +1,651 @@ -# Python module to interface with Shenzhen Xenon ESP8266MOD WiFi smart devices -# E.g. https://wikidevi.com/wiki/Xenon_SM-PW701U -# SKYROKU SM-PW701U Wi-Fi Plug Smart Plug -# Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging Works with Alexa -# -# This would not exist without the protocol reverse engineering from -# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes -# -# Tested with Python 2.7 and Python 3.6.1 only - - -import base64 -from hashlib import md5 -import json -import logging -import socket -import sys -import time -import colorsys - -try: - #raise ImportError - import Crypto - from Crypto.Cipher import AES # PyCrypto -except ImportError: - Crypto = AES = None - import pyaes # https://github.com/ricmoo/pyaes - - -version_tuple = (7, 0, 2) -version = version_string = __version__ = '%d.%d.%d' % version_tuple -__author__ = 'clach04' - -log = logging.getLogger(__name__) -logging.basicConfig() # TODO include function name/line numbers in log -#log.setLevel(level=logging.DEBUG) # Debug hack! - -log.info('Python %s on %s', sys.version, sys.platform) -if Crypto is None: - log.info('Using pyaes version %r', pyaes.VERSION) - log.info('Using pyaes from %r', pyaes.__file__) -else: - log.info('Using PyCrypto %r', Crypto.version_info) - log.info('Using PyCrypto from %r', Crypto.__file__) - -SET = 'set' - -PROTOCOL_VERSION_BYTES = b'3.1' - -IS_PY2 = sys.version_info[0] == 2 - -class AESCipher(object): - def __init__(self, key): - #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ - self.bs = 16 - self.key = key - def encrypt(self, raw): - if Crypto: - raw = self._pad(raw) - cipher = AES.new(self.key, mode=AES.MODE_ECB) - crypted_text = cipher.encrypt(raw) - else: - _ = self._pad(raw) - cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 - crypted_text = cipher.feed(raw) - crypted_text += cipher.feed() # flush final block - #print('crypted_text %r' % crypted_text) - #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) - crypted_text_b64 = base64.b64encode(crypted_text) - #print('crypted_text_b64 (%d) %r' % (len(crypted_text_b64), crypted_text_b64)) - return crypted_text_b64 - def decrypt(self, enc): - enc = base64.b64decode(enc) - #print('enc (%d) %r' % (len(enc), enc)) - #enc = self._unpad(enc) - #enc = self._pad(enc) - #print('upadenc (%d) %r' % (len(enc), enc)) - if Crypto: - cipher = AES.new(self.key, AES.MODE_ECB) - raw = cipher.decrypt(enc) - #print('raw (%d) %r' % (len(raw), raw)) - return self._unpad(raw).decode('utf-8') - #return self._unpad(cipher.decrypt(enc)).decode('utf-8') - else: - cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 - plain_text = cipher.feed(enc) - plain_text += cipher.feed() # flush final block - return plain_text - def _pad(self, s): - padnum = self.bs - len(s) % self.bs - return s + padnum * chr(padnum).encode() - @staticmethod - def _unpad(s): - return s[:-ord(s[len(s)-1:])] - - -def bin2hex(x, pretty=False): - if pretty: - space = ' ' - else: - space = '' - if IS_PY2: - result = ''.join('%02X%s' % (ord(y), space) for y in x) - else: - result = ''.join('%02X%s' % (y, space) for y in x) - return result - - -def hex2bin(x): - if IS_PY2: - return x.decode('hex') - else: - return bytes.fromhex(x) - -# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi -payload_dict = { - "device": { - "status": { - "hexByte": "0a", - "command": {"gwId": "", "devId": ""} - }, - "set": { - "hexByte": "07", - "command": {"devId": "", "uid": "", "t": ""} - }, - "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) - "suffix": "000000000000aa55" - } -} - -class XenonDevice(object): - def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): - """ - Represents a Tuya device. - - Args: - dev_id (str): The device id. - address (str): The network address. - local_key (str, optional): The encryption key. Defaults to None. - dev_type (str, optional): The device type. - It will be used as key for lookups in payload_dict. - Defaults to None. - - Attributes: - port (int): The port to connect to. - """ - self.id = dev_id - self.address = address - self.local_key = local_key - self.local_key = local_key.encode('latin1') - self.dev_type = dev_type - self.connection_timeout = connection_timeout - - self.port = 6668 # default - do not expect caller to pass in - - def __repr__(self): - return '%r' % ((self.id, self.address),) # FIXME can do better than this - - def _send_receive(self, payload): - """ - Send single buffer `payload` and receive a single buffer. - - Args: - payload(bytes): Data to send. - """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) - s.connect((self.address, self.port)) - s.send(payload) - data = s.recv(1024) - s.close() - return data - - def generate_payload(self, command, data=None): - """ - Generate the payload to send. - - Args: - command(str): The type of command. - This is one of the entries from payload_dict - data(dict, optional): The data to be send. - This is what will be passed via the 'dps' entry - """ - json_data = payload_dict[self.dev_type][command]['command'] - - if 'gwId' in json_data: - json_data['gwId'] = self.id - if 'devId' in json_data: - json_data['devId'] = self.id - if 'uid' in json_data: - json_data['uid'] = self.id # still use id, no seperate uid - if 't' in json_data: - json_data['t'] = str(int(time.time())) - - if data is not None: - json_data['dps'] = data - - # Create byte buffer from hex data - json_payload = json.dumps(json_data) - #print(json_payload) - json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! - json_payload = json_payload.encode('utf-8') - log.debug('json_payload=%r', json_payload) - - if command == SET: - # need to encrypt - #print('json_payload %r' % json_payload) - self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new - json_payload = self.cipher.encrypt(json_payload) - #print('crypted json_payload %r' % json_payload) - preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key - #print('preMd5String %r' % preMd5String) - m = md5() - m.update(preMd5String) - #print(repr(m.digest())) - hexdigest = m.hexdigest() - #print(hexdigest) - #print(hexdigest[8:][:16]) - json_payload = PROTOCOL_VERSION_BYTES + hexdigest[8:][:16].encode('latin1') + json_payload - #print('data_to_send') - #print(json_payload) - #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) - #print('json_payload %r' % repr(json_payload)) - #print('json_payload len %r' % len(json_payload)) - #print(bin2hex(json_payload)) - self.cipher = None # expect to connect and then disconnect to set new - - - postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) - #print('postfix_payload %r' % postfix_payload) - #print('postfix_payload %r' % len(postfix_payload)) - #print('postfix_payload %x' % len(postfix_payload)) - #print('postfix_payload %r' % hex(len(postfix_payload))) - assert len(postfix_payload) <= 0xff - postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) - buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + - payload_dict[self.dev_type][command]['hexByte'] + - '000000' + - postfix_payload_hex_len ) + postfix_payload - #print('command', command) - #print('prefix') - #print(payload_dict[self.dev_type][command]['prefix']) - #print(repr(buffer)) - #print(bin2hex(buffer, pretty=True)) - #print(bin2hex(buffer, pretty=False)) - #print('full buffer(%d) %r' % (len(buffer), buffer)) - return buffer - -class Device(XenonDevice): - def __init__(self, dev_id, address, local_key=None, dev_type=None): - super(Device, self).__init__(dev_id, address, local_key, dev_type) - - def status(self): - log.debug('status() entry') - # open device, send request, then close connection - payload = self.generate_payload('status') - - data = self._send_receive(payload) - log.debug('status received data=%r', data) - - result = data[20:-8] # hard coded offsets - log.debug('result=%r', result) - #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer - #print('result %r' % result) - if result.startswith(b'{'): - # this is the regular expected code path - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - elif result.startswith(PROTOCOL_VERSION_BYTES): - # got an encrypted payload, happens occasionally - # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} - # NOTE dps.2 may or may not be present - result = result[len(PROTOCOL_VERSION_BYTES):] # remove version header - result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload - cipher = AESCipher(self.local_key) - result = cipher.decrypt(result) - log.debug('decrypted result=%r', result) - if not isinstance(result, str): - result = result.decode() - result = json.loads(result) - else: - log.error('Unexpected status() payload=%r', result) - - return result - - def set_status(self, on, switch=1): - """ - Set status of the device to 'on' or 'off'. - - Args: - on(bool): True for 'on', False for 'off'. - switch(int): The switch to set - """ - # open device, send request, then close connection - if isinstance(switch, int): - switch = str(switch) # index and payload is a string - payload = self.generate_payload(SET, {switch:on}) - #print('payload %r' % payload) - - data = self._send_receive(payload) - log.debug('set_status received data=%r', data) - - return data - - def turn_on(self, switch=1): - """Turn the device on""" - self.set_status(True, switch) - - def turn_off(self, switch=1): - """Turn the device off""" - self.set_status(False, switch) - - def set_timer(self, num_secs): - """ - Set a timer. - - Args: - num_secs(int): Number of seconds - """ - # FIXME / TODO support schemas? Accept timer id number as parameter? - - # Dumb heuristic; Query status, pick last device id as that is probably the timer - status = self.status() - devices = status['dps'] - devices_numbers = list(devices.keys()) - devices_numbers.sort() - dps_id = devices_numbers[-1] - - payload = self.generate_payload(SET, {dps_id:num_secs}) - - data = self._send_receive(payload) - log.debug('set_timer received data=%r', data) - return data - -class OutletDevice(Device): - def __init__(self, dev_id, address, local_key=None): - dev_type = 'device' - super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) - -class BulbDevice(Device): - DPS_INDEX_ON = '1' - DPS_INDEX_MODE = '2' - DPS_INDEX_BRIGHTNESS = '3' - DPS_INDEX_COLOURTEMP = '4' - DPS_INDEX_COLOUR = '5' - - DPS = 'dps' - DPS_MODE_COLOUR = 'colour' - DPS_MODE_WHITE = 'white' - - def __init__(self, dev_id, address, local_key=None): - dev_type = 'device' - super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) - - @staticmethod - def _rgb_to_hexvalue(r, g, b): - """ - Convert an RGB value to the hex representation expected by tuya. - - Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: - rrggbb0hhhssvv - - While r, g and b are just hexadecimal values of the corresponding - Red, Green and Blue values, the h, s and v values (which are values - between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - rgb = [r,g,b] - hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) - - hexvalue = "" - for value in rgb: - temp = str(hex(int(value))).replace("0x","") - if len(temp) == 1: - temp = "0" + temp - hexvalue = hexvalue + temp - - hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] - hexvalue_hsv = "" - for value in hsvarray: - temp = str(hex(int(value))).replace("0x","") - if len(temp) == 1: - temp = "0" + temp - hexvalue_hsv = hexvalue_hsv + temp - if len(hexvalue_hsv) == 7: - hexvalue = hexvalue + "0" + hexvalue_hsv - else: - hexvalue = hexvalue + "00" + hexvalue_hsv - - return hexvalue - - @staticmethod - def _hexvalue_to_rgb(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into - an RGB value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - r = int(hexvalue[0:2], 16) - g = int(hexvalue[2:4], 16) - b = int(hexvalue[4:6], 16) - - return (r, g, b) - - @staticmethod - def _hexvalue_to_hsv(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into - an HSV value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - h = int(hexvalue[7:10], 16) / 360 - s = int(hexvalue[10:12], 16) / 255 - v = int(hexvalue[12:14], 16) / 255 - - return (h, s, v) - - def set_colour(self, r, g, b): - """ - Set colour of an rgb bulb. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - if not 0 <= r <= 255: - raise ValueError("The value for red needs to be between 0 and 255.") - if not 0 <= g <= 255: - raise ValueError("The value for green needs to be between 0 and 255.") - if not 0 <= b <= 255: - raise ValueError("The value for blue needs to be between 0 and 255.") - - print(BulbDevice) - hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) - - payload = self.generate_payload(SET, { - self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, - self.DPS_INDEX_COLOUR: hexvalue}) - data = self._send_receive(payload) - return data - - def set_white(self, brightness, colourtemp): - """ - Set white coloured theme of an rgb bulb. - - Args: - brightness(int): Value for the brightness (25-255). - colourtemp(int): Value for the colour temperature (0-255). - """ - if not 25 <= brightness <= 255: - raise ValueError("The brightness needs to be between 25 and 255.") - if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") - - payload = self.generate_payload(SET, { - self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, - self.DPS_INDEX_BRIGHTNESS: brightness, - self.DPS_INDEX_COLOURTEMP: colourtemp}) - - data = self._send_receive(payload) - return data - - def set_brightness(self, brightness): - """ - Set the brightness value of an rgb bulb. - - Args: - brightness(int): Value for the brightness (25-255). - """ - if not 25 <= brightness <= 255: - raise ValueError("The brightness needs to be between 25 and 255.") - - payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness}) - data = self._send_receive(payload) - return data - - def set_colourtemp(self, colourtemp): - """ - Set the colour temperature of an rgb bulb. - - Args: - colourtemp(int): Value for the colour temperature (0-255). - """ - if not 0 <= colourtemp <= 255: - raise ValueError("The colour temperature needs to be between 0 and 255.") - - payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp}) - data = self._send_receive(payload) - return data - - def brightness(self): - """Return brightness value""" - return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] - - def colourtemp(self): - """Return colour temperature""" - return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] - - def colour_rgb(self): - """Return colour as RGB value""" - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_rgb(hexvalue) - - def colour_hsv(self): - """Return colour as HSV value""" - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_hsv(hexvalue) - - def state(self): - status = self.status() - state = { - 'is_on' : status[self.DPS][self.DPS_INDEX_ON], - 'mode' : status[self.DPS][self.DPS_INDEX_MODE], - 'brightness' : status[self.DPS][self.DPS_INDEX_BRIGHTNESS], - 'colourtemp' : status[self.DPS][self.DPS_INDEX_COLOURTEMP], - 'colour' : status[self.DPS][self.DPS_INDEX_COLOUR], - } - return state +# Python module to interface locally with Tuya WiFi smart devices (switches, lights and covers) +# +# Developed by merging the code from: +# https://github.com/codetheweb/tuyapi by codetheweb and blackrozes +# https://github.com/mileperhour/localtuya-homeassistant +# https://github.com/NameLessJedi/localtuya-homeassistant +# +# Tested with Python 2.7 and Python 3.6.1 only + + +import base64 +from hashlib import md5 +import json +import logging +import socket +import sys +import time +import colorsys +import binascii + +try: + #raise ImportError + import Crypto + from Crypto.Cipher import AES # PyCrypto +except ImportError: + Crypto = AES = None + import pyaes # https://github.com/ricmoo/pyaes + + +version_tuple = (7, 0, 7) +version = version_string = __version__ = '%d.%d.%d' % version_tuple +__author__ = 'rospogrigio' + +log = logging.getLogger(__name__) +logging.basicConfig() # TODO include function name/line numbers in log +log.setLevel(level=logging.DEBUG) # Debug hack! + +log.info('%s version %s', __name__, version) +log.info('Python %s on %s', sys.version, sys.platform) +if Crypto is None: + log.info('Using pyaes version %r', pyaes.VERSION) + log.info('Using pyaes from %r', pyaes.__file__) +else: + log.info('Using PyCrypto %r', Crypto.version_info) + log.info('Using PyCrypto from %r', Crypto.__file__) + +SET = 'set' +STATUS = 'status' + +PROTOCOL_VERSION_BYTES_31 = b'3.1' +PROTOCOL_VERSION_BYTES_33 = b'3.3' + +IS_PY2 = sys.version_info[0] == 2 + +class AESCipher(object): + def __init__(self, key): + #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ + self.bs = 16 + self.key = key + def encrypt(self, raw, use_base64 = True): + if Crypto: + raw = self._pad(raw) + cipher = AES.new(self.key, mode=AES.MODE_ECB) + crypted_text = cipher.encrypt(raw) + else: + _ = self._pad(raw) + cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + crypted_text = cipher.feed(raw) + crypted_text += cipher.feed() # flush final block + #print('crypted_text %r' % crypted_text) + #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) + if use_base64: + return base64.b64encode(crypted_text) + else: + return crypted_text + + def decrypt(self, enc, use_base64=True): + if use_base64: + enc = base64.b64decode(enc) + #print('enc (%d) %r' % (len(enc), enc)) + #enc = self._unpad(enc) + #enc = self._pad(enc) + #print('upadenc (%d) %r' % (len(enc), enc)) + if Crypto: + cipher = AES.new(self.key, AES.MODE_ECB) + raw = cipher.decrypt(enc) + #print('raw (%d) %r' % (len(raw), raw)) + return self._unpad(raw).decode('utf-8') + #return self._unpad(cipher.decrypt(enc)).decode('utf-8') + else: + cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + plain_text = cipher.feed(enc) + plain_text += cipher.feed() # flush final block + return plain_text + def _pad(self, s): + padnum = self.bs - len(s) % self.bs + return s + padnum * chr(padnum).encode() + @staticmethod + def _unpad(s): + return s[:-ord(s[len(s)-1:])] + + +def bin2hex(x, pretty=False): + if pretty: + space = ' ' + else: + space = '' + if IS_PY2: + result = ''.join('%02X%s' % (ord(y), space) for y in x) + else: + result = ''.join('%02X%s' % (y, space) for y in x) + return result + + +def hex2bin(x): + if IS_PY2: + return x.decode('hex') + else: + return bytes.fromhex(x) + +# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi +payload_dict = { + "device": { + "status": { + "hexByte": "0a", + "command": {"gwId": "", "devId": ""} + }, + "set": { + "hexByte": "07", + "command": {"devId": "", "uid": "", "t": ""} + }, + "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) + "suffix": "000000000000aa55" + }, + "cover": { + "status": { + "hexByte": "0d", + "command": {"devId": "", "uid": "", "t": ""} + }, + "set": { + "hexByte": "07", + "command": {"devId": "", "uid": "", "t": ""} + }, + "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) + "suffix": "000000000000aa55" + } +} + +class XenonDevice(object): + def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): + """ + Represents a Tuya device. + + Args: + dev_id (str): The device id. + address (str): The network address. + local_key (str, optional): The encryption key. Defaults to None. + dev_type (str, optional): The device type. + It will be used as key for lookups in payload_dict. + Defaults to None. + + Attributes: + port (int): The port to connect to. + """ + self.id = dev_id + self.address = address + self.local_key = local_key + self.local_key = local_key.encode('latin1') + self.dev_type = dev_type + self.connection_timeout = connection_timeout + self.version = 3.1 + + self.port = 6668 # default - do not expect caller to pass in + + def __repr__(self): + return '%r' % ((self.id, self.address),) # FIXME can do better than this + + def _send_receive(self, payload, times=1): + """ + Send single buffer `payload` and receive a single buffer. + + Args: + payload(bytes): Data to send. + """ + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + s.settimeout(self.connection_timeout) + s.connect((self.address, self.port)) + s.send(payload) + data = s.recv(1024) + #print("FIRST: Received %d bytes" % len(data) ) + if times > 1: + time.sleep(0.1) + data = s.recv(1024) + #print("SECOND: Received %d bytes" % len(data) ) + s.close() + return data + + def set_version(self, version): + self.version = version + + def generate_payload(self, command, data=None): + """ + Generate the payload to send. + + Args: + command(str): The type of command. + This is one of the entries from payload_dict + data(dict, optional): The data to be send. + This is what will be passed via the 'dps' entry + """ + json_data = payload_dict[self.dev_type][command]['command'] + command_hb = payload_dict[self.dev_type][command]['hexByte'] + + if 'gwId' in json_data: + json_data['gwId'] = self.id + if 'devId' in json_data: + json_data['devId'] = self.id + if 'uid' in json_data: + json_data['uid'] = self.id # still use id, no seperate uid + if 't' in json_data: + json_data['t'] = str(int(time.time())) + + if data is not None: + json_data['dps'] = data + if command_hb == '0d': + json_data['dps'] = {"1": None} + + # Create byte buffer from hex data + json_payload = json.dumps(json_data) + #print(json_payload) + json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! + json_payload = json_payload.encode('utf-8') + log.debug('json_payload=%r', json_payload) + #print('json_payload = ', json_payload, ' cmd = ', command_hb) + + if self.version == 3.3: + self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new + json_payload = self.cipher.encrypt(json_payload, False) + self.cipher = None + if command_hb != '0a': + # add the 3.3 header + json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload + elif command == SET: + # need to encrypt + #print('json_payload %r' % json_payload) + self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new + json_payload = self.cipher.encrypt(json_payload) + #print('crypted json_payload %r' % json_payload) + preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key + #print('preMd5String %r' % preMd5String) + m = md5() + m.update(preMd5String) + #print(repr(m.digest())) + hexdigest = m.hexdigest() + #print(hexdigest) + #print(hexdigest[8:][:16]) + json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload + #print('data_to_send') + #print(json_payload) + #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) + #print('json_payload %r' % repr(json_payload)) + #print('json_payload len %r' % len(json_payload)) + #print(bin2hex(json_payload)) + self.cipher = None # expect to connect and then disconnect to set new + + + postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) + #print('postfix_payload %r' % postfix_payload) + #print('postfix_payload %r' % len(postfix_payload)) + #print('postfix_payload %x' % len(postfix_payload)) + #print('postfix_payload %r' % hex(len(postfix_payload))) + assert len(postfix_payload) <= 0xff + postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) + buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + + payload_dict[self.dev_type][command]['hexByte'] + + '000000' + + postfix_payload_hex_len ) + postfix_payload + + # calc the CRC of everything except where the CRC goes and the suffix + hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X') + buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:] + #print('command', command) + #print('prefix') + #print(payload_dict[self.dev_type][command]['prefix']) + #print(repr(buffer)) + #print(bin2hex(buffer, pretty=False)) + #print('full buffer(%d) %r' % (len(buffer), bin2hex(buffer, pretty=True) )) + #print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer))) + return buffer + +class Device(XenonDevice): + def __init__(self, dev_id, address, local_key=None, dev_type=None): + super(Device, self).__init__(dev_id, address, local_key, dev_type) + + def status(self): + if self.dev_type == 'cover': + recv_times = 2 + else: + recv_times = 1 + log.debug('status() entry (dev_type is %s, recv_times = %d)', self.dev_type, recv_times) + # open device, send request, then close connection + payload = self.generate_payload('status') + + data = self._send_receive(payload, recv_times) + log.debug('status received data=%r', data) + + result = data[20:-8] # hard coded offsets + if self.dev_type == 'cover': + result = result[15:] + + log.debug('result=%r', result) + #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer + #print('result %r' % result) + if result.startswith(b'{'): + # this is the regular expected code path + if not isinstance(result, str): + result = result.decode() + result = json.loads(result) + elif result.startswith(PROTOCOL_VERSION_BYTES_31): + # got an encrypted payload, happens occasionally + # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} + # NOTE dps.2 may or may not be present + result = result[len(PROTOCOL_VERSION_BYTES_31):] # remove version header + result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload + cipher = AESCipher(self.local_key) + result = cipher.decrypt(result) + log.debug('decrypted result=%r', result) + if not isinstance(result, str): + result = result.decode() + result = json.loads(result) + elif self.version == 3.3: + cipher = AESCipher(self.local_key) + result = cipher.decrypt(result, False) + log.debug('decrypted result=%r', result) + if not isinstance(result, str): + result = result.decode() + result = json.loads(result) + else: + log.error('Unexpected status() payload=%r', result) + +# if self.dev_type == 'cover': +# result = result.encode('utf-8') +# log.debug('encoded result=%r', result) + return result + + def set_status(self, on, switch=1): + """ + Set status of the device to 'on' or 'off'. + + Args: + on(bool): True for 'on', False for 'off'. + switch(int): The switch to set + """ + # open device, send request, then close connection + if isinstance(switch, int): + switch = str(switch) # index and payload is a string + payload = self.generate_payload(SET, {switch:on}) + #print('payload %r' % payload) + + data = self._send_receive(payload) + log.debug('set_status received data=%r', data) + + return data + + def set_value(self, index, value): + """ + Set int value of any index. + + Args: + index(int): index to set + value(int): new value for the index + """ + # open device, send request, then close connection + if isinstance(index, int): + index = str(index) # index and payload is a string + + payload = self.generate_payload(SET, { + index: value}) + + data = self._send_receive(payload) + + return data + + def turn_on(self, switch=1): + """Turn the device on""" + self.set_status(True, switch) + + def turn_off(self, switch=1): + """Turn the device off""" + self.set_status(False, switch) + + def set_timer(self, num_secs): + """ + Set a timer. + + Args: + num_secs(int): Number of seconds + """ + # FIXME / TODO support schemas? Accept timer id number as parameter? + + # Dumb heuristic; Query status, pick last device id as that is probably the timer + status = self.status() + devices = status['dps'] + devices_numbers = list(devices.keys()) + devices_numbers.sort() + dps_id = devices_numbers[-1] + + payload = self.generate_payload(SET, {dps_id:num_secs}) + + data = self._send_receive(payload) + log.debug('set_timer received data=%r', data) + return data + +class OutletDevice(Device): + def __init__(self, dev_id, address, local_key=None): + dev_type = 'device' + super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) + + +class CoverDevice(Device): + DPS_INDEX_MOVE = '1' + DPS_INDEX_BL = '101' + + DPS = 'dps' + + DPS_2_STATE = { + '1':'movement', + '101':'backlight', + } + + def __init__(self, dev_id, address, local_key=None): + dev_type = 'cover' + print('%s version %s' % ( __name__, version)) + print('Python %s on %s' % (sys.version, sys.platform)) + if Crypto is None: + print('Using pyaes version ', pyaes.VERSION) + print('Using pyaes from ', pyaes.__file__) + else: + print('Using PyCrypto ', Crypto.version_info) + print('Using PyCrypto from ', Crypto.__file__) + super(CoverDevice, self).__init__(dev_id, address, local_key, dev_type) + + def open_cover(self, switch=1): + """Turn the device on""" + self.set_status('on', switch) + + def close_cover(self, switch=1): + """Turn the device off""" + self.set_status('off', switch) + + def stop_cover(self, switch=1): + """Turn the device off""" + self.set_status('stop', switch) + + +class BulbDevice(Device): + DPS_INDEX_ON = '1' + DPS_INDEX_MODE = '2' + DPS_INDEX_BRIGHTNESS = '3' + DPS_INDEX_COLOURTEMP = '4' + DPS_INDEX_COLOUR = '5' + + DPS = 'dps' + DPS_MODE_COLOUR = 'colour' + DPS_MODE_WHITE = 'white' + + DPS_2_STATE = { + '1':'is_on', + '2':'mode', + '3':'brightness', + '4':'colourtemp', + '5':'colour', + } + + def __init__(self, dev_id, address, local_key=None): + dev_type = 'device' + super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) + + @staticmethod + def _rgb_to_hexvalue(r, g, b): + """ + Convert an RGB value to the hex representation expected by tuya. + + Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: + rrggbb0hhhssvv + + While r, g and b are just hexadecimal values of the corresponding + Red, Green and Blue values, the h, s and v values (which are values + between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. + + Args: + r(int): Value for the colour red as int from 0-255. + g(int): Value for the colour green as int from 0-255. + b(int): Value for the colour blue as int from 0-255. + """ + rgb = [r,g,b] + hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) + + hexvalue = "" + for value in rgb: + temp = str(hex(int(value))).replace("0x","") + if len(temp) == 1: + temp = "0" + temp + hexvalue = hexvalue + temp + + hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] + hexvalue_hsv = "" + for value in hsvarray: + temp = str(hex(int(value))).replace("0x","") + if len(temp) == 1: + temp = "0" + temp + hexvalue_hsv = hexvalue_hsv + temp + if len(hexvalue_hsv) == 7: + hexvalue = hexvalue + "0" + hexvalue_hsv + else: + hexvalue = hexvalue + "00" + hexvalue_hsv + + return hexvalue + + @staticmethod + def _hexvalue_to_rgb(hexvalue): + """ + Converts the hexvalue used by tuya for colour representation into + an RGB value. + + Args: + hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() + """ + r = int(hexvalue[0:2], 16) + g = int(hexvalue[2:4], 16) + b = int(hexvalue[4:6], 16) + + return (r, g, b) + + @staticmethod + def _hexvalue_to_hsv(hexvalue): + """ + Converts the hexvalue used by tuya for colour representation into + an HSV value. + + Args: + hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() + """ + h = int(hexvalue[7:10], 16) / 360 + s = int(hexvalue[10:12], 16) / 255 + v = int(hexvalue[12:14], 16) / 255 + + return (h, s, v) + + def set_colour(self, r, g, b): + """ + Set colour of an rgb bulb. + + Args: + r(int): Value for the colour red as int from 0-255. + g(int): Value for the colour green as int from 0-255. + b(int): Value for the colour blue as int from 0-255. + """ + if not 0 <= r <= 255: + raise ValueError("The value for red needs to be between 0 and 255.") + if not 0 <= g <= 255: + raise ValueError("The value for green needs to be between 0 and 255.") + if not 0 <= b <= 255: + raise ValueError("The value for blue needs to be between 0 and 255.") + + #print(BulbDevice) + hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) + + payload = self.generate_payload(SET, { + self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, + self.DPS_INDEX_COLOUR: hexvalue}) + data = self._send_receive(payload) + return data + + def set_white(self, brightness, colourtemp): + """ + Set white coloured theme of an rgb bulb. + + Args: + brightness(int): Value for the brightness (25-255). + colourtemp(int): Value for the colour temperature (0-255). + """ + if not 25 <= brightness <= 255: + raise ValueError("The brightness needs to be between 25 and 255.") + if not 0 <= colourtemp <= 255: + raise ValueError("The colour temperature needs to be between 0 and 255.") + + payload = self.generate_payload(SET, { + self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, + self.DPS_INDEX_BRIGHTNESS: brightness, + self.DPS_INDEX_COLOURTEMP: colourtemp}) + + data = self._send_receive(payload) + return data + + def set_brightness(self, brightness): + """ + Set the brightness value of an rgb bulb. + + Args: + brightness(int): Value for the brightness (25-255). + """ + if not 25 <= brightness <= 255: + raise ValueError("The brightness needs to be between 25 and 255.") + + payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness}) + data = self._send_receive(payload) + return data + + def set_colourtemp(self, colourtemp): + """ + Set the colour temperature of an rgb bulb. + + Args: + colourtemp(int): Value for the colour temperature (0-255). + """ + if not 0 <= colourtemp <= 255: + raise ValueError("The colour temperature needs to be between 0 and 255.") + + payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp}) + data = self._send_receive(payload) + return data + + def brightness(self): + """Return brightness value""" + return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] + + def colourtemp(self): + """Return colour temperature""" + return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] + + def colour_rgb(self): + """Return colour as RGB value""" + hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] + return BulbDevice._hexvalue_to_rgb(hexvalue) + + def colour_hsv(self): + """Return colour as HSV value""" + hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] + return BulbDevice._hexvalue_to_hsv(hexvalue) + + def state(self): + status = self.status() + state = {} + + for key in status[self.DPS].keys(): + if(int(key)<=5): + state[self.DPS_2_STATE[key]]=status[self.DPS][key] + + return state diff --git a/custom_components/localtuya/switch.py b/custom_components/localtuya/switch.py index c9c805f..76e2be0 100644 --- a/custom_components/localtuya/switch.py +++ b/custom_components/localtuya/switch.py @@ -1,168 +1,175 @@ -""" -Simple platform to control **SOME** Tuya switch devices. - -For more details about this platform, please refer to the documentation at -https://home-assistant.io/components/switch.tuya/ -""" -import voluptuous as vol -from homeassistant.components.switch import SwitchDevice, PLATFORM_SCHEMA -from homeassistant.const import (CONF_NAME, CONF_HOST, CONF_ID, CONF_SWITCHES, CONF_FRIENDLY_NAME, CONF_ICON) -import homeassistant.helpers.config_validation as cv -from time import time -from threading import Lock - -REQUIREMENTS = ['pytuya==7.0.4'] - -CONF_DEVICE_ID = 'device_id' -CONF_LOCAL_KEY = 'local_key' - -DEFAULT_ID = '1' - -ATTR_CURRENT = 'current' -ATTR_CURRENT_CONSUMPTION = 'current_consumption' -ATTR_VOLTAGE = 'voltage' - -SWITCH_SCHEMA = vol.Schema({ - vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, - vol.Optional(CONF_FRIENDLY_NAME): cv.string, -}) - -PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ - vol.Optional(CONF_NAME): cv.string, - vol.Optional(CONF_ICON): cv.icon, - vol.Required(CONF_HOST): cv.string, - vol.Required(CONF_DEVICE_ID): cv.string, - vol.Required(CONF_LOCAL_KEY): cv.string, - vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, - vol.Optional(CONF_SWITCHES, default={}): - vol.Schema({cv.slug: SWITCH_SCHEMA}), -}) - - -def setup_platform(hass, config, add_devices, discovery_info=None): - """Set up of the Tuya switch.""" - import pytuya - - devices = config.get(CONF_SWITCHES) - switches = [] - - outlet_device = TuyaCache( - pytuya.OutletDevice( - config.get(CONF_DEVICE_ID), - config.get(CONF_HOST), - config.get(CONF_LOCAL_KEY) - ) - ) - - for object_id, device_config in devices.items(): - switches.append( - TuyaDevice( - outlet_device, - device_config.get(CONF_FRIENDLY_NAME, object_id), - device_config.get(CONF_ICON), - device_config.get(CONF_ID) - ) - ) - - name = config.get(CONF_NAME) - if name: - switches.append( - TuyaDevice( - outlet_device, - name, - config.get(CONF_ICON), - config.get(CONF_ID) - ) - ) - - add_devices(switches) - -class TuyaCache: - """Cache wrapper for pytuya.OutletDevice""" - - def __init__(self, device): - """Initialize the cache.""" - self._cached_status = '' - self._cached_status_time = 0 - self._device = device - self._lock = Lock() - - def __get_status(self): - for i in range(3): - try: - status = self._device.status() - return status - except ConnectionError: - if i+1 == 3: - raise ConnectionError("Failed to update status.") - - def set_status(self, state, switchid): - """Change the Tuya switch status and clear the cache.""" - self._cached_status = '' - self._cached_status_time = 0 - return self._device.set_status(state, switchid) - - def status(self): - """Get state of Tuya switch and cache the results.""" - self._lock.acquire() - try: - now = time() - if not self._cached_status or now - self._cached_status_time > 20: - self._cached_status = self.__get_status() - self._cached_status_time = time() - return self._cached_status - finally: - self._lock.release() - -class TuyaDevice(SwitchDevice): - """Representation of a Tuya switch.""" - - def __init__(self, device, name, icon, switchid): - """Initialize the Tuya switch.""" - self._device = device - self._name = name - self._state = False - self._icon = icon - self._switchid = switchid - self._status = self._device.status() - - @property - def name(self): - """Get name of Tuya switch.""" - return self._name - - @property - def is_on(self): - """Check if Tuya switch is on.""" - return self._state - - @property - def device_state_attributes(self): - attrs = {} - try: - attrs[ATTR_CURRENT] = "{}".format(self._status['dps']['104']) - attrs[ATTR_CURRENT_CONSUMPTION] = "{}".format(self._status['dps']['105']/10) - attrs[ATTR_VOLTAGE] = "{}".format(self._status['dps']['106']/10) - except KeyError: - pass - return attrs - - @property - def icon(self): - """Return the icon.""" - return self._icon - - def turn_on(self, **kwargs): - """Turn Tuya switch on.""" - self._device.set_status(True, self._switchid) - - def turn_off(self, **kwargs): - """Turn Tuya switch off.""" - self._device.set_status(False, self._switchid) - - def update(self): - """Get state of Tuya switch.""" - status = self._device.status() - self._status= status - self._state = status['dps'][self._switchid] - +""" +Simple platform to control LOCALLY Tuya switch devices. + +Sample config yaml + +switch: + - platform: localtuya + host: 192.168.0.1 + local_key: 1234567891234567 + device_id: 12345678912345671234 + name: tuya_01 + protocol_version: 3.3 +""" +import voluptuous as vol +from homeassistant.components.switch import SwitchDevice, PLATFORM_SCHEMA +from homeassistant.const import (CONF_HOST, CONF_ID, CONF_SWITCHES, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME) +import homeassistant.helpers.config_validation as cv +from time import time, sleep +from threading import Lock + +REQUIREMENTS = ['pytuya==7.0.7'] + +CONF_DEVICE_ID = 'device_id' +CONF_LOCAL_KEY = 'local_key' +CONF_PROTOCOL_VERSION = 'protocol_version' +CONF_CURRENT = 'current' +CONF_CURRENT_CONSUMPTION = 'current_consumption' +CONF_VOLTAGE = 'voltage' + +DEFAULT_ID = '1' +DEFAULT_PROTOCOL_VERSION = 3.3 + +ATTR_CURRENT = 'current' +ATTR_CURRENT_CONSUMPTION = 'current_consumption' +ATTR_VOLTAGE = 'voltage' + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Optional(CONF_ICON): cv.icon, + vol.Required(CONF_HOST): cv.string, + vol.Required(CONF_DEVICE_ID): cv.string, + vol.Required(CONF_LOCAL_KEY): cv.string, + vol.Required(CONF_NAME): cv.string, + vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float), + vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, + vol.Optional(CONF_CURRENT, default='4'): cv.string, + vol.Optional(CONF_CURRENT_CONSUMPTION, default='5'): cv.string, + vol.Optional(CONF_VOLTAGE, default='6'): cv.string, +}) + + +def setup_platform(hass, config, add_devices, discovery_info=None): + """Set up of the Tuya switch.""" + from . import pytuya + + switches = [] + pytuyadevice = pytuya.OutletDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY)) + pytuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION))) + + outlet_device = TuyaCache(pytuyadevice) + switches.append( + TuyaDevice( + outlet_device, + config.get(CONF_NAME), + config.get(CONF_ICON), + config.get(CONF_ID), + config.get(CONF_CURRENT), + config.get(CONF_CURRENT_CONSUMPTION), + config.get(CONF_VOLTAGE) + ) + ) + #print('Setup localtuya switch [{}] with device ID [{}] '.format(config.get(CONF_NAME), config.get(CONF_ID))) + + add_devices(switches) + +class TuyaCache: + """Cache wrapper for pytuya.OutletDevice""" + + def __init__(self, device): + """Initialize the cache.""" + self._cached_status = '' + self._cached_status_time = 0 + self._device = device + self._lock = Lock() + + def __get_status(self): + for i in range(20): + try: + status = self._device.status() + return status + except ConnectionError: + if i+1 == 3: + raise ConnectionError("Failed to update status.") + + def set_status(self, state, switchid): + """Change the Tuya switch status and clear the cache.""" + self._cached_status = '' + self._cached_status_time = 0 + for i in range(20): + try: + return self._device.set_status(state, switchid) + except ConnectionError: + if i+1 == 5: + raise ConnectionError("Failed to set status.") + + def status(self): + """Get state of Tuya switch and cache the results.""" + self._lock.acquire() + try: + now = time() + if not self._cached_status or now - self._cached_status_time > 30: + sleep(0.5) + self._cached_status = self.__get_status() + self._cached_status_time = time() + return self._cached_status + finally: + self._lock.release() + +class TuyaDevice(SwitchDevice): + """Representation of a Tuya switch.""" + + def __init__(self, device, name, icon, switchid, attr_current, attr_consumption, attr_voltage): + """Initialize the Tuya switch.""" + self._device = device + self._name = name + self._icon = icon + self._switch_id = switchid + self._attr_current = attr_current + self._attr_consumption = attr_consumption + self._attr_voltage = attr_voltage + self._status = self._device.status() + self._state = self._status['dps'][self._switch_id] + print('Initialized tuya switch [{}] with switch status [{}] and state [{}]'.format(self._name, self._status, self._state)) + + @property + def name(self): + """Get name of Tuya switch.""" + return self._name + + @property + def is_on(self): + """Check if Tuya switch is on.""" + return self._state + + @property + def device_state_attributes(self): + attrs = {} + try: + attrs[ATTR_CURRENT] = "{}".format(self._status['dps'][self._attr_current]) + #print('attrs[ATTR_CURRENT]: [{}]'.format(attrs[ATTR_CURRENT])) + attrs[ATTR_CURRENT_CONSUMPTION] = "{}".format(self._status['dps'][self._attr_consumption]/10) + #print('attrs[ATTR_CURRENT_CONSUMPTION]: [{}]'.format(attrs[ATTR_CURRENT_CONSUMPTION])) + attrs[ATTR_VOLTAGE] = "{}".format(self._status['dps'][self._attr_voltage]/10) + #print('attrs[ATTR_VOLTAGE]: [{}]'.format(attrs[ATTR_VOLTAGE])) + + except KeyError: + pass + return attrs + + @property + def icon(self): + """Return the icon.""" + return self._icon + + def turn_on(self, **kwargs): + """Turn Tuya switch on.""" + self._device.set_status(True, self._switch_id) + + def turn_off(self, **kwargs): + """Turn Tuya switch off.""" + self._device.set_status(False, self._switch_id) + + def update(self): + """Get state of Tuya switch.""" + self._status = self._device.status() + self._state = self._status['dps'][self._switch_id]