"""Platform to locally control Tuya-based vacuum devices.""" import logging from functools import partial import voluptuous as vol from homeassistant.components.vacuum import ( DOMAIN, STATE_CLEANING, STATE_DOCKED, STATE_IDLE, STATE_RETURNING, STATE_PAUSED, SUPPORT_BATTERY, SUPPORT_FAN_SPEED, SUPPORT_PAUSE, SUPPORT_RETURN_HOME, SUPPORT_START, SUPPORT_STATE, SUPPORT_STATUS, SUPPORT_STOP, SUPPORT_PAUSE, StateVacuumEntity, ) from .common import LocalTuyaEntity, async_setup_entry from .const import ( CONF_POWERGO_DP, CONF_IDLE_STATUS_VALUE, CONF_RETURNING_STATUS_VALUE, CONF_DOCKED_STATUS_VALUE, CONF_BATTERY_DP, CONF_MODE_DP, CONF_MODES, CONF_FAN_SPEED_DP, CONF_FAN_SPEEDS, CONF_CLEAN_TIME_DP, CONF_CLEAN_AREA_DP, CONF_PAUSED_STATE, ) _LOGGER = logging.getLogger(__name__) CLEAN_TIME = "clean_time" CLEAN_AREA = "clean_area" MODES_LIST = "cleaning_mode_list" MODE = "cleaning_mode" DEFAULT_IDLE_STATUS = "standby,sleep,pause" DEFAULT_RETURNING_STATUS = "docking" DEFAULT_DOCKED_STATUS = "charging,chargecompleted" DEFAULT_MODES = "chargego,smart,standby,wall_follow,spiral,single" DEFAULT_FAN_SPEEDS = "low,normal,high" DEFAULT_PAUSED_STATE = "paused" def flow_schema(dps): """Return schema used in config flow.""" return { vol.Required(CONF_IDLE_STATUS_VALUE, default=DEFAULT_IDLE_STATUS): str, vol.Required(CONF_POWERGO_DP): vol.In(dps), vol.Required(CONF_DOCKED_STATUS_VALUE, default=DEFAULT_DOCKED_STATUS): str, vol.Optional(CONF_RETURNING_STATUS_VALUE, default=DEFAULT_RETURNING_STATUS): str, vol.Optional(CONF_BATTERY_DP): vol.In(dps), vol.Optional(CONF_MODE_DP): vol.In(dps), vol.Optional(CONF_MODES, default=DEFAULT_MODES): str, vol.Optional(CONF_FAN_SPEED_DP): vol.In(dps), vol.Optional(CONF_FAN_SPEEDS, default=DEFAULT_FAN_SPEEDS): str, vol.Optional(CONF_CLEAN_TIME_DP): vol.In(dps), vol.Optional(CONF_CLEAN_AREA_DP): vol.In(dps), vol.Optional(CONF_PAUSED_STATE, default=DEFAULT_PAUSED_STATE): str, } class LocaltuyaVacuum(LocalTuyaEntity, StateVacuumEntity): """Tuya vacuum device.""" def __init__(self, device, config_entry, switchid, **kwargs): """Initialize a new LocaltuyaVacuum.""" super().__init__(device, config_entry, switchid ,_LOGGER, **kwargs) self._state = None self._battery_level = None self._attrs = {} self._idle_status_list = [] if self.has_config(CONF_IDLE_STATUS_VALUE): self._idle_status_list = self._config[CONF_IDLE_STATUS_VALUE].split(",") self._modes_list = [] if self.has_config(CONF_MODES): self._modes_list = self._config[CONF_MODES].split(",") self._attrs[MODES_LIST] = self._modes_list self._return_mode = self._modes_list[0] self._docked_status_list = [] if self.has_config(CONF_DOCKED_STATUS_VALUE): self._docked_status_list = self._config[CONF_DOCKED_STATUS_VALUE].split(",") self._fan_speed_list = [] if self.has_config(CONF_FAN_SPEEDS): self._fan_speed_list = self._config[CONF_FAN_SPEEDS].split(",") self._fan_speed = "" self._cleaning_mode = "" print("Initialized vacuum [{}]".format(self.name)) @property def supported_features(self): """Flag supported features.""" supported_features = SUPPORT_START | SUPPORT_PAUSE | SUPPORT_STOP | SUPPORT_STATUS | SUPPORT_STATE if self.has_config(CONF_RETURNING_STATUS_VALUE): supported_features = supported_features | SUPPORT_RETURN_HOME if self.has_config(CONF_FAN_SPEED_DP): supported_features = supported_features | SUPPORT_FAN_SPEED if self.has_config(CONF_BATTERY_DP): supported_features = supported_features | SUPPORT_BATTERY return supported_features @property def state(self): """Return the vacuum state.""" return self._state @property def battery_level(self): """Return the current battery level.""" return self._battery_level @property def device_state_attributes(self): """Return the specific state attributes of this vacuum cleaner.""" return self._attrs @property def fan_speed(self): """Return the current fan speed.""" return self._fan_speed @property def fan_speed_list(self) -> list: """Return the list of available fan speeds.""" return self._fan_speed_list @property def error(self): """Return error message.""" return "" async def async_start(self, **kwargs): """Turn the vacuum on and start cleaning.""" await self._device.set_dp(True, self._config[CONF_POWERGO_DP]) async def async_pause(self, **kwargs): """Stop the vacuum cleaner, do not return to base.""" await self._device.set_dp(False, self._config[CONF_POWERGO_DP]) async def async_return_to_base(self, **kwargs): """Set the vacuum cleaner to return to the dock.""" if self._return_mode: await self._device.set_dp(self._return_mode, self._config[CONF_MODE_DP]) else: _LOGGER.error("Missing command for return home in commands set.") async def async_stop(self, **kwargs): """Turn the vacuum off stopping the cleaning and returning home.""" if self._return_mode: await self._device.set_dp(self._return_mode, self._config[CONF_MODE_DP]) else: _LOGGER.error("Missing command for return home in commands set.") async def async_clean_spot(self, **kwargs): """Perform a spot clean-up.""" return None async def async_locate(self, **kwargs): """Locate the vacuum cleaner.""" return None async def async_set_fan_speed(self, **kwargs): """Set the fan speed.""" fan_speed = kwargs["fan_speed"] await self._device.set_dp(fan_speed, self._config[CONF_FAN_SPEED_DP]) async def async_send_command(self, command, params=None, **kwargs): """Send a command to a vacuum cleaner.""" if command == "set_mode" and 'mode' in params: mode = params['mode'] await self._device.set_dp(mode, self._config[CONF_MODE_DP]) def status_updated(self): """Device status was updated.""" state_value = str(self.dps(self._dp_id)) if state_value in self._idle_status_list: self._state = STATE_IDLE elif state_value in self._docked_status_list: self._state = STATE_DOCKED elif state_value == self._config[CONF_RETURNING_STATUS_VALUE]: self._state = STATE_RETURNING elif state_value == self._config[CONF_PAUSED_STATE]: self._state = STATE_PAUSED else: self._state = STATE_CLEANING if self.has_config(CONF_BATTERY_DP): self._battery_level = self.dps_conf(CONF_BATTERY_DP) self._cleaning_mode = "" if self.has_config(CONF_MODES): self._cleaning_mode = self.dps_conf(CONF_MODE_DP) self._attrs[MODE] = self._cleaning_mode self._fan_speed = "" if self.has_config(CONF_FAN_SPEEDS): self._fan_speed = self.dps_conf(CONF_FAN_SPEED_DP) if self.has_config(CONF_CLEAN_TIME_DP): self._attrs[CLEAN_TIME] = self.dps_conf(CONF_CLEAN_TIME_DP) if self.has_config(CONF_CLEAN_AREA_DP): self._attrs[CLEAN_AREA] = self.dps_conf(CONF_CLEAN_AREA_DP) async_setup_entry = partial(async_setup_entry, DOMAIN, LocaltuyaVacuum, flow_schema)