Add config flow support for fan

This commit is contained in:
Pierre Ståhl
2020-09-10 12:07:59 +02:00
parent edc9debf83
commit f9a4b4ad8a
2 changed files with 206 additions and 217 deletions

View File

@@ -20,4 +20,4 @@ CONF_STOP_CMD = 'stop_cmd'
DOMAIN = "localtuya" DOMAIN = "localtuya"
# Platforms in this list must support config flows # Platforms in this list must support config flows
PLATFORMS = ["cover", "light", "switch"] PLATFORMS = ["cover", "fan", "light", "switch"]

View File

@@ -1,216 +1,205 @@
""" """
Simple platform to control LOCALLY Tuya cover devices. Simple platform to control LOCALLY Tuya cover devices.
Sample config yaml Sample config yaml
fan: fan:
- platform: localtuya - platform: localtuya
host: 192.168.0.123 host: 192.168.0.123
local_key: 1234567891234567 local_key: 1234567891234567
device_id: 123456789123456789abcd device_id: 123456789123456789abcd
name: fan guests name: fan guests
friendly_name: fan guests friendly_name: fan guests
protocol_version: 3.3 protocol_version: 3.3
id: 1 id: 1
""" """
import logging import logging
import requests
import voluptuous as vol
import voluptuous as vol
from homeassistant.components.fan import (
from homeassistant.components.fan import (SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH, SPEED_LOW,
FanEntity, SUPPORT_SET_SPEED, SPEED_MEDIUM,
SUPPORT_OSCILLATE, SUPPORT_DIRECTION, PLATFORM_SCHEMA) SPEED_HIGH,
FanEntity,
from homeassistant.const import STATE_OFF SUPPORT_SET_SPEED,
"""from . import DATA_TUYA, TuyaDevice""" SUPPORT_OSCILLATE,
from homeassistant.const import (CONF_HOST, CONF_ID, CONF_FRIENDLY_NAME, CONF_ICON, CONF_NAME) SUPPORT_DIRECTION,
import homeassistant.helpers.config_validation as cv PLATFORM_SCHEMA,
)
_LOGGER = logging.getLogger(__name__) from homeassistant.const import CONF_ID, CONF_FRIENDLY_NAME, STATE_OFF
import homeassistant.helpers.config_validation as cv
DEFAULT_NAME = 'localtuyafan'
from . import BASE_PLATFORM_SCHEMA, prepare_setup_entities, import_from_yaml
REQUIREMENTS = ['pytuya==7.0.9'] from .pytuya import FanDevice
CONF_DEVICE_ID = 'device_id' _LOGGER = logging.getLogger(__name__)
CONF_LOCAL_KEY = 'local_key'
CONF_PROTOCOL_VERSION = 'protocol_version' PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(BASE_PLATFORM_SCHEMA)
DEFAULT_ID = '1'
DEFAULT_PROTOCOL_VERSION = 3.3 def flow_schema(dps):
"""Return schema used in config flow."""
return {}
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_ICON): cv.icon,
vol.Required(CONF_HOST): cv.string, async def async_setup_entry(hass, config_entry, async_add_entities):
vol.Required(CONF_DEVICE_ID): cv.string, """Setup a Tuya fan based on a config entry."""
vol.Required(CONF_LOCAL_KEY): cv.string, device, entities_to_setup = prepare_setup_entities(config_entry, "fan", FanDevice)
vol.Required(CONF_NAME): cv.string, if not entities_to_setup:
vol.Required(CONF_FRIENDLY_NAME): cv.string, return
vol.Required(CONF_PROTOCOL_VERSION, default=DEFAULT_PROTOCOL_VERSION): vol.Coerce(float),
vol.Optional(CONF_ID, default=DEFAULT_ID): cv.string, lights = []
}) for device_config in entities_to_setup:
lights.append(
TuyaDevice(
def setup_platform(hass, config, add_entities, discovery_info=None): device,
"""Set up of the Tuya switch.""" device_config[CONF_FRIENDLY_NAME],
from . import pytuya device_config[CONF_ID],
fans = [] )
localtuyadevice = pytuya.FanDevice(config.get(CONF_DEVICE_ID), config.get(CONF_HOST), config.get(CONF_LOCAL_KEY)) )
localtuyadevice.set_version(float(config.get(CONF_PROTOCOL_VERSION)))
_LOGGER.debug("localtuya fan: setup_platform: %s", localtuyadevice) async_add_entities(lights, True)
fan_device = localtuyadevice
fans.append( def setup_platform(hass, config, add_devices, discovery_info=None):
TuyaDevice( """Set up of the Tuya fan."""
fan_device, return import_from_yaml(hass, config, "fan")
config.get(CONF_NAME),
config.get(CONF_FRIENDLY_NAME),
config.get(CONF_ICON), class TuyaDevice(FanEntity):
config.get(CONF_ID), """A demonstration fan component."""
)
) def __init__(self, device, friendly_name, switchid):
_LOGGER.info("Setup localtuya fan %s with device ID %s ", config.get(CONF_FRIENDLY_NAME), config.get(CONF_DEVICE_ID) ) """Initialize the entity."""
self._device = device
add_entities(fans, True) self._name = friendly_name
self._available = False
class TuyaDevice(FanEntity): self._friendly_name = friendly_name
"""A demonstration fan component.""" self._switch_id = switchid
self._status = self._device.status()
# def __init__(self, hass, name: str, supported_features: int) -> None: self._state = False
def __init__(self, device, name, friendly_name, icon, switchid): self._supported_features = SUPPORT_SET_SPEED | SUPPORT_OSCILLATE
"""Initialize the entity.""" self._speed = STATE_OFF
self._device = device self._oscillating = False
self._name = friendly_name
self._available = False @property
self._friendly_name = friendly_name def oscillating(self):
self._icon = icon """Return current oscillating status."""
self._switch_id = switchid # if self._speed == STATE_OFF:
self._status = self._device.status() # return False
self._state = False _LOGGER.debug("localtuya fan: oscillating = %s", self._oscillating)
self._supported_features = SUPPORT_SET_SPEED | SUPPORT_OSCILLATE return self._oscillating
self._speed = STATE_OFF
self._oscillating = False @property
def name(self) -> str:
@property """Get entity name."""
def oscillating(self): return self._name
"""Return current oscillating status."""
#if self._speed == STATE_OFF: @property
# return False def is_on(self):
_LOGGER.debug("localtuya fan: oscillating = %s", self._oscillating) """Check if Tuya switch is on."""
return self._oscillating return self._state
@property @property
def name(self) -> str: def speed(self) -> str:
"""Get entity name.""" """Return the current speed."""
return self._name return self._speed
@property @property
def is_on(self): def speed_list(self) -> list:
"""Check if Tuya switch is on.""" """Get the list of available speeds."""
return self._state return [STATE_OFF, SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH]
# return ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12']
@property
def speed(self) -> str: def turn_on(self, speed: str = None, **kwargs) -> None:
"""Return the current speed.""" """Turn on the entity."""
return self._speed _LOGGER.debug("localtuya fan: turn_on speed to: %s", speed)
# if speed is None:
@property # speed = SPEED_MEDIUM
def speed_list(self) -> list: # self.set_speed(speed)
"""Get the list of available speeds.""" self._device.turn_on()
return [STATE_OFF, SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH] if speed is not None:
# return ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12'] self.set_speed(speed)
self.schedule_update_ha_state()
def turn_on(self, speed: str = None, **kwargs) -> None:
"""Turn on the entity.""" def turn_off(self, **kwargs) -> None:
_LOGGER.debug("localtuya fan: turn_on speed to: %s", speed) """Turn off the entity."""
# if speed is None: _LOGGER.debug("localtuya fan: turn_off")
# speed = SPEED_MEDIUM self._device.set_status(False, "1")
# self.set_speed(speed) self._state = False
self._device.turn_on() self.schedule_update_ha_state()
if speed is not None:
self.set_speed(speed) def set_speed(self, speed: str) -> None:
self.schedule_update_ha_state() """Set the speed of the fan."""
_LOGGER.debug("localtuya fan: set_speed to: %s", speed)
def turn_off(self, **kwargs) -> None: self._speed = speed
"""Turn off the entity.""" # self.schedule_update_ha_state()
_LOGGER.debug("localtuya fan: turn_off") if speed == STATE_OFF:
self._device.set_status(False, '1') self._device.set_status(False, "1")
self._state = False self._state = False
self.schedule_update_ha_state() elif speed == SPEED_LOW:
self._device.set_value("2", "1")
def set_speed(self, speed: str) -> None: elif speed == SPEED_MEDIUM:
"""Set the speed of the fan.""" self._device.set_value("2", "2")
_LOGGER.debug("localtuya fan: set_speed to: %s", speed) elif speed == SPEED_HIGH:
self._speed = speed self._device.set_value("2", "3")
# self.schedule_update_ha_state() self.schedule_update_ha_state()
if speed == STATE_OFF:
self._device.set_status(False, '1') # def set_direction(self, direction: str) -> None:
self._state = False # """Set the direction of the fan."""
elif speed == SPEED_LOW: # self.direction = direction
self._device.set_value('2', '1') # self.schedule_update_ha_state()
elif speed == SPEED_MEDIUM:
self._device.set_value('2', '2') def oscillate(self, oscillating: bool) -> None:
elif speed == SPEED_HIGH: """Set oscillation."""
self._device.set_value('2', '3') self._oscillating = oscillating
self.schedule_update_ha_state() self._device.set_value("8", oscillating)
self.schedule_update_ha_state()
# def set_direction(self, direction: str) -> None:
# """Set the direction of the fan.""" # @property
# self.direction = direction # def current_direction(self) -> str:
# self.schedule_update_ha_state() # """Fan direction."""
# return self.direction
def oscillate(self, oscillating: bool) -> None:
"""Set oscillation.""" @property
self._oscillating = oscillating def unique_id(self):
self._device.set_value('8', oscillating) """Return unique device identifier."""
self.schedule_update_ha_state() return f"local_{self._device.id}_{self._switch_id}"
# @property @property
# def current_direction(self) -> str: def available(self):
# """Fan direction.""" """Return if device is available or not."""
# return self.direction return self._available
@property @property
def unique_id(self): def supported_features(self) -> int:
"""Return unique device identifier.""" """Flag supported features."""
_LOGGER.debug("localtuya fan unique_id = %s", self._device) return self._supported_features
return self._device.id
def update(self):
@property """Get state of Tuya switch."""
def available(self): success = False
"""Return if device is available or not.""" for i in range(3):
return self._available if success is False:
try:
@property status = self._device.status()
def supported_features(self) -> int: _LOGGER.debug("localtuya fan: status = %s", status)
"""Flag supported features.""" self._state = status["dps"]["1"]
return self._supported_features if status["dps"]["1"] == False:
self._speed = STATE_OFF
def update(self): elif int(status["dps"]["2"]) == 1:
"""Get state of Tuya switch.""" self._speed = SPEED_LOW
success = False elif int(status["dps"]["2"]) == 2:
for i in range(3): self._speed = SPEED_MEDIUM
if success is False: elif int(status["dps"]["2"]) == 3:
try: self._speed = SPEED_HIGH
status = self._device.status() # self._speed = status['dps']['2']
_LOGGER.debug("localtuya fan: status = %s", status) self._oscillating = status["dps"]["8"]
self._state = status['dps']['1'] success = True
if status['dps']['1'] == False: except ConnectionError:
self._speed = STATE_OFF if i + 1 == 3:
elif int(status['dps']['2']) == 1: success = False
self._speed = SPEED_LOW raise ConnectionError("Failed to update status.")
elif int(status['dps']['2']) == 2: self._available = success
self._speed = SPEED_MEDIUM
elif int(status['dps']['2']) == 3:
self._speed = SPEED_HIGH
# self._speed = status['dps']['2']
self._oscillating = status['dps']['8']
success = True
except ConnectionError:
if i+1 == 3:
success = False
raise ConnectionError("Failed to update status.")
self._available = success