Make light entity inherit from LocalTuyaEntity

This commit is contained in:
Pierre Ståhl
2020-09-21 22:07:22 +02:00
committed by rospogrigio
parent b169c86473
commit dd48fc00f5
2 changed files with 79 additions and 201 deletions

View File

@@ -1,5 +1,5 @@
""" """
Simple platform to control LOCALLY Tuya switch devices. Simple platform to control LOCALLY Tuya light devices.
Sample config yaml Sample config yaml
@@ -8,7 +8,6 @@ light:
host: 192.168.0.1 host: 192.168.0.1
local_key: 1234567891234567 local_key: 1234567891234567
device_id: 12345678912345671234 device_id: 12345678912345671234
name: tuya_01
friendly_name: This Light friendly_name: This Light
protocol_version: 3.3 protocol_version: 3.3
""" """
@@ -30,12 +29,14 @@ from homeassistant.components.light import (
ATTR_HS_COLOR, ATTR_HS_COLOR,
SUPPORT_BRIGHTNESS, SUPPORT_BRIGHTNESS,
SUPPORT_COLOR, SUPPORT_COLOR,
SUPPORT_COLOR_TEMP,
) )
from homeassistant.util import color as colorutil
from . import BASE_PLATFORM_SCHEMA, import_from_yaml, prepare_setup_entities from . import (
from .pytuya import TuyaDevice BASE_PLATFORM_SCHEMA,
LocalTuyaEntity,
import_from_yaml,
prepare_setup_entities,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -43,6 +44,12 @@ MIN_MIRED = 153
MAX_MIRED = 370 MAX_MIRED = 370
UPDATE_RETRY_LIMIT = 3 UPDATE_RETRY_LIMIT = 3
DPS_INDEX_ON = "1"
DPS_INDEX_MODE = "2"
DPS_INDEX_BRIGHTNESS = "3"
DPS_INDEX_COLOURTEMP = "4"
DPS_INDEX_COLOUR = "5"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(BASE_PLATFORM_SCHEMA) PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(BASE_PLATFORM_SCHEMA)
@@ -52,10 +59,8 @@ def flow_schema(dps):
async def async_setup_entry(hass, config_entry, async_add_entities): async def async_setup_entry(hass, config_entry, async_add_entities):
"""Setup a Tuya switch based on a config entry.""" """Setup a Tuya light based on a config entry."""
device, entities_to_setup = prepare_setup_entities( device, entities_to_setup = prepare_setup_entities(config_entry, DOMAIN)
config_entry, DOMAIN
)
if not entities_to_setup: if not entities_to_setup:
return return
@@ -67,7 +72,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
lights.append( lights.append(
LocaltuyaLight( LocaltuyaLight(
TuyaCache(device, config_entry.data[CONF_FRIENDLY_NAME]), TuyaCache(device, config_entry.data[CONF_FRIENDLY_NAME]),
device_config[CONF_FRIENDLY_NAME], config_entry,
device_config[CONF_ID], device_config[CONF_ID],
) )
) )
@@ -76,7 +81,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
def setup_platform(hass, config, add_devices, discovery_info=None): def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up of the Tuya switch.""" """Set up of the Tuya light."""
return import_from_yaml(hass, config, DOMAIN) return import_from_yaml(hass, config, DOMAIN)
@@ -96,16 +101,27 @@ class TuyaCache:
"""Return unique device identifier.""" """Return unique device identifier."""
return self._device.id return self._device.id
def __get_status(self, switchid): def __get_status(self):
for _ in range(UPDATE_RETRY_LIMIT): for i in range(5):
try: try:
return self._device.status()["dps"][switchid] status = self._device.status()
except (ConnectionError, socket.timeout): return status
pass except Exception:
_LOGGER.warning("Failed to get status after %d tries", UPDATE_RETRY_LIMIT) print(
"Failed to update status of device [{}]".format(
self._device.address
)
)
sleep(1.0)
if i + 1 == 3:
_LOGGER.error(
"Failed to update status of device %s", self._device.address
)
# return None
raise ConnectionError("Failed to update status .")
def set_dps(self, state, dps_index): def set_dps(self, state, dps_index):
"""Change the Tuya switch status and clear the cache.""" """Change the Tuya light status and clear the cache."""
self._cached_status = "" self._cached_status = ""
self._cached_status_time = 0 self._cached_status_time = 0
for _ in range(UPDATE_RETRY_LIMIT): for _ in range(UPDATE_RETRY_LIMIT):
@@ -117,109 +133,43 @@ class TuyaCache:
pass pass
_LOGGER.warning("Failed to set status after %d tries", UPDATE_RETRY_LIMIT) _LOGGER.warning("Failed to set status after %d tries", UPDATE_RETRY_LIMIT)
def status(self, switchid): def status(self):
"""Get state of Tuya switch and cache the results.""" """Get state of Tuya light and cache the results."""
with self._lock: with self._lock:
now = time() now = time()
if not self._cached_status or now - self._cached_status_time > 15: if not self._cached_status or now - self._cached_status_time > 15:
sleep(0.5) sleep(0.5)
self._cached_status = self.__get_status(switchid) self._cached_status = self.__get_status()
self._cached_status_time = time() self._cached_status_time = time()
return self._cached_status return self._cached_status
def cached_status(self):
return self._cached_status
def state(self): class LocaltuyaLight(LocalTuyaEntity, LightEntity):
self._device.state(); """Representation of a Tuya light."""
class LocaltuyaLight(LightEntity):
"""Representation of a Tuya switch."""
DPS_INDEX_ON = '1'
DPS_INDEX_MODE = '2'
DPS_INDEX_BRIGHTNESS = '3'
DPS_INDEX_COLOURTEMP = '4'
DPS_INDEX_COLOUR = '5'
def __init__(self, device, friendly_name, bulbid): def __init__(
"""Initialize the Tuya switch.""" self,
self._device = device device,
self._available = False config_entry,
self._name = friendly_name lightid,
**kwargs,
):
"""Initialize the Tuya light."""
super().__init__(device, config_entry, lightid, **kwargs)
self._state = False self._state = False
self._brightness = 127 self._brightness = 127
self._color_temp = 127 self._color_temp = 127
self._bulb_id = bulbid
@property
def device_info(self):
return {
"identifiers": {
# Serial numbers are unique identifiers within a specific domain
("LocalTuya", f"local_{self._device.unique_id}")
},
"name": self._device._friendly_name,
"manufacturer": "Tuya generic",
"model": "SmartLight",
"sw_version": "3.3",
}
def state(self):
self._device.state()
@property
def name(self):
"""Get name of Tuya switch."""
return self._name
@property
def unique_id(self):
"""Return unique device identifier."""
return f"local_{self._device.unique_id}"
@property
def available(self):
"""Return if device is available or not."""
return self._available
@property @property
def is_on(self): def is_on(self):
"""Check if Tuya switch is on.""" """Check if Tuya light is on."""
return self._state return self._state
def update(self):
"""Get state of Tuya switch."""
try:
self._update_state()
except:
self._available = False
else:
self._available = True
def _update_state(self):
status = self._device.status(self._bulb_id)
self._state = status
try:
brightness = int(self._device.brightness())
if brightness > 254:
brightness = 255
if brightness < 25:
brightness = 25
self._brightness = brightness
except TypeError:
pass
self._color_temp = self._device.color_temp()
@property @property
def brightness(self): def brightness(self):
"""Return the brightness of the light.""" """Return the brightness of the light."""
return self._brightness return self._brightness
# @property
# def hs_color(self):
# """Return the hs_color of the light."""
# return (self._device.color_hsv()[0],self._device.color_hsv()[1])
@property @property
def color_temp(self): def color_temp(self):
"""Return the color_temp of the light.""" """Return the color_temp of the light."""
@@ -238,118 +188,46 @@ class LocaltuyaLight(LightEntity):
"""Return color temperature max mireds.""" """Return color temperature max mireds."""
return MAX_MIRED return MAX_MIRED
@property
def supported_features(self):
"""Flag supported features."""
supports = SUPPORT_BRIGHTNESS
if self._color_temp is not None:
supports = supports | SUPPORT_COLOR
return supports
def turn_on(self, **kwargs): def turn_on(self, **kwargs):
"""Turn on or control the light.""" """Turn on or control the light."""
_LOGGER.debug("Turning on, state: %s", self._device.cached_status()) self._device.set_dps(True, self._dps_id)
if not self._device.cached_status():
self._device.set_dps(True, self._bulb_id)
if ATTR_BRIGHTNESS in kwargs: if ATTR_BRIGHTNESS in kwargs:
converted_brightness = int(kwargs[ATTR_BRIGHTNESS]) self._device.set_dps(
if converted_brightness <= 25: max(int(kwargs[ATTR_BRIGHTNESS]), 25), DPS_INDEX_BRIGHTNESS
converted_brightness = 25 )
self.set_brightness(converted_brightness)
if ATTR_HS_COLOR in kwargs: if ATTR_HS_COLOR in kwargs:
raise ValueError(" TODO implement RGB from HS") raise ValueError(" TODO implement RGB from HS")
if ATTR_COLOR_TEMP in kwargs: if ATTR_COLOR_TEMP in kwargs:
color_temp = int( color_temp = int(
255 255
- (255 / (MAX_MIRED - MIN_MIRED)) - (255 / (MAX_MIRED - MIN_MIRED))
* (int(kwargs[ATTR_COLOR_TEMP]) - MIN_MIRED) * (int(kwargs[ATTR_COLOR_TEMP]) - MIN_MIRED)
) )
self._device.set_color_temp(color_temp) self._device.set_dps(color_temp, DPS_INDEX_COLOURTEMP)
def turn_off(self, **kwargs): def turn_off(self, **kwargs):
"""Turn Tuya switch off.""" """Turn Tuya light off."""
self._device.set_dps(False, self._bulb_id) self._device.set_dps(False, self._dps_id)
@property def status_updated(self):
def supported_features(self): """Device status was updated."""
"""Flag supported features.""" self._state = self.dps(self._dps_id)
supports = SUPPORT_BRIGHTNESS
if self._device.color_temp() != "999":
supports = supports | SUPPORT_COLOR
# supports = supports | SUPPORT_COLOR_TEMP
return supports
def support_color(self): brightness = self.dps(DPS_INDEX_BRIGHTNESS)
return supported_features() & SUPPORT_COLOR if brightness is not None:
brightness = min(brightness, 255)
def support_color_temp(self): brightness = max(brightness, 25)
return supported_features() & SUPPORT_COLOR_TEMP self._brightness = brightness
def color_temp(self):
for _ in range(UPDATE_RETRY_LIMIT):
try:
return self._state[self.DPS][self.DPS_INDEX_COLOURTEMP]
except ConnectionError:
pass
except KeyError:
return "999"
except socket.timeout:
pass
log.warn(
"Failed to get color temp after {} tries".format(UPDATE_RETRY_LIMIT))
def set_color_temp(self, color_temp):
for _ in range(UPDATE_RETRY_LIMIT):
try:
if not 0 <= colourtemp <= 255:
raise ValueError("The colour temperature needs to be between 0 and 255.")
self._device.set_dps(colourtemp, self.DPS_INDEX_COLOURTEMP)
except ConnectionError:
pass
except KeyError:
pass
except socket.timeout:
pass
log.warn(
"Failed to set color temp after {} tries".format(UPDATE_RETRY_LIMIT))
def set_brightness(self, brightness):
for _ in range(UPDATE_RETRY_LIMIT):
try:
if not 25 <= brightness <= 255:
raise ValueError("The brightness needs to be between 25 and 255.")
self._device.set_dps(brightness, self.DPS_INDEX_BRIGHTNESS)
except ConnectionError:
pass
except KeyError:
pass
except socket.timeout:
pass
log.warn(
"Failed to set brightness after {} tries".format(UPDATE_RETRY_LIMIT))
@staticmethod
def _hexvalue_to_rgb(hexvalue):
"""
Converts the hexvalue used by tuya for colour representation into
an RGB value.
Args:
hexvalue(string): The hex representation generated by TuyaDevice._rgb_to_hexvalue()
"""
r = int(hexvalue[0:2], 16)
g = int(hexvalue[2:4], 16)
b = int(hexvalue[4:6], 16)
return (r, g, b)
@staticmethod
def _hexvalue_to_hsv(hexvalue):
"""
Converts the hexvalue used by tuya for colour representation into
an HSV value.
Args:
hexvalue(string): The hex representation generated by TuyaDevice._rgb_to_hexvalue()
"""
h = int(hexvalue[7:10], 16) / 360
s = int(hexvalue[10:12], 16) / 255
v = int(hexvalue[12:14], 16) / 255
return (h, s, v)
self._color_temp = self.dps(DPS_INDEX_COLOURTEMP)

View File

@@ -174,7 +174,7 @@ class TuyaCache:
) )
return return
# raise ConnectionError("Failed to set status.") # raise ConnectionError("Failed to set status.")
self.update() self.update()
def status(self): def status(self):
@@ -237,5 +237,5 @@ class LocaltuyaSwitch(LocalTuyaEntity, SwitchEntity):
self._device.set_dps(False, self._dps_id) self._device.set_dps(False, self._dps_id)
def status_updated(self): def status_updated(self):
"""Device statua was updated.""" """Device status was updated."""
self._state = self.dps(self._dps_id) self._state = self.dps(self._dps_id)