809 lines
29 KiB
Python
809 lines
29 KiB
Python
"""Config flow for LocalTuya integration integration."""
|
|
import errno
|
|
import logging
|
|
import time
|
|
from importlib import import_module
|
|
|
|
import homeassistant.helpers.config_validation as cv
|
|
import homeassistant.helpers.entity_registry as er
|
|
import voluptuous as vol
|
|
from homeassistant import config_entries, core, exceptions
|
|
from homeassistant.const import (
|
|
CONF_CLIENT_ID,
|
|
CONF_CLIENT_SECRET,
|
|
CONF_DEVICE_ID,
|
|
CONF_DEVICES,
|
|
CONF_ENTITIES,
|
|
CONF_FRIENDLY_NAME,
|
|
CONF_HOST,
|
|
CONF_ID,
|
|
CONF_NAME,
|
|
CONF_PLATFORM,
|
|
CONF_REGION,
|
|
CONF_SCAN_INTERVAL,
|
|
CONF_USERNAME,
|
|
)
|
|
from homeassistant.core import callback
|
|
|
|
from .cloud_api import TuyaCloudApi
|
|
from .common import pytuya
|
|
from .const import (
|
|
ATTR_UPDATED_AT,
|
|
CONF_ACTION,
|
|
CONF_ADD_DEVICE,
|
|
CONF_DPS_STRINGS,
|
|
CONF_EDIT_DEVICE,
|
|
CONF_LOCAL_KEY,
|
|
CONF_MODEL,
|
|
CONF_NO_CLOUD,
|
|
CONF_PRODUCT_NAME,
|
|
CONF_PROTOCOL_VERSION,
|
|
CONF_RESET_DPIDS,
|
|
CONF_SETUP_CLOUD,
|
|
CONF_USER_ID,
|
|
DATA_CLOUD,
|
|
DATA_DISCOVERY,
|
|
DOMAIN,
|
|
PLATFORMS,
|
|
CONF_MANUAL_DPS,
|
|
)
|
|
from .discovery import discover
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
ENTRIES_VERSION = 2
|
|
|
|
PLATFORM_TO_ADD = "platform_to_add"
|
|
NO_ADDITIONAL_ENTITIES = "no_additional_entities"
|
|
SELECTED_DEVICE = "selected_device"
|
|
|
|
CUSTOM_DEVICE = "..."
|
|
|
|
CONF_ACTIONS = {
|
|
CONF_ADD_DEVICE: "Add a new device",
|
|
CONF_EDIT_DEVICE: "Edit a device",
|
|
CONF_SETUP_CLOUD: "Reconfigure Cloud API account",
|
|
}
|
|
|
|
CONFIGURE_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_ACTION, default=CONF_ADD_DEVICE): vol.In(CONF_ACTIONS),
|
|
}
|
|
)
|
|
|
|
CLOUD_SETUP_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_REGION, default="eu"): vol.In(["eu", "us", "cn", "in"]),
|
|
vol.Optional(CONF_CLIENT_ID): cv.string,
|
|
vol.Optional(CONF_CLIENT_SECRET): cv.string,
|
|
vol.Optional(CONF_USER_ID): cv.string,
|
|
vol.Optional(CONF_USERNAME, default=DOMAIN): cv.string,
|
|
vol.Required(CONF_NO_CLOUD, default=False): bool,
|
|
}
|
|
)
|
|
|
|
CONFIGURE_DEVICE_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_FRIENDLY_NAME): str,
|
|
vol.Required(CONF_LOCAL_KEY): str,
|
|
vol.Required(CONF_HOST): str,
|
|
vol.Required(CONF_DEVICE_ID): str,
|
|
vol.Required(CONF_PROTOCOL_VERSION, default="3.3"): vol.In(
|
|
["3.1", "3.2", "3.3", "3.4"]
|
|
),
|
|
vol.Optional(CONF_SCAN_INTERVAL): int,
|
|
vol.Optional(CONF_MANUAL_DPS): str,
|
|
vol.Optional(CONF_RESET_DPIDS): str,
|
|
}
|
|
)
|
|
|
|
DEVICE_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_HOST): cv.string,
|
|
vol.Required(CONF_DEVICE_ID): cv.string,
|
|
vol.Required(CONF_LOCAL_KEY): cv.string,
|
|
vol.Required(CONF_FRIENDLY_NAME): cv.string,
|
|
vol.Required(CONF_PROTOCOL_VERSION, default="3.3"): vol.In(
|
|
["3.1", "3.2", "3.3", "3.4"]
|
|
),
|
|
vol.Optional(CONF_SCAN_INTERVAL): int,
|
|
vol.Optional(CONF_MANUAL_DPS): cv.string,
|
|
vol.Optional(CONF_RESET_DPIDS): str,
|
|
}
|
|
)
|
|
|
|
PICK_ENTITY_SCHEMA = vol.Schema(
|
|
{vol.Required(PLATFORM_TO_ADD, default="switch"): vol.In(PLATFORMS)}
|
|
)
|
|
|
|
|
|
def devices_schema(discovered_devices, cloud_devices_list, add_custom_device=True):
|
|
"""Create schema for devices step."""
|
|
devices = {}
|
|
for dev_id, dev_host in discovered_devices.items():
|
|
dev_name = dev_id
|
|
if dev_id in cloud_devices_list.keys():
|
|
dev_name = cloud_devices_list[dev_id][CONF_NAME]
|
|
devices[dev_id] = f"{dev_name} ({dev_host})"
|
|
|
|
if add_custom_device:
|
|
devices.update({CUSTOM_DEVICE: CUSTOM_DEVICE})
|
|
|
|
# devices.update(
|
|
# {
|
|
# ent.data[CONF_DEVICE_ID]: ent.data[CONF_FRIENDLY_NAME]
|
|
# for ent in entries
|
|
# }
|
|
# )
|
|
return vol.Schema({vol.Required(SELECTED_DEVICE): vol.In(devices)})
|
|
|
|
|
|
def options_schema(entities):
|
|
"""Create schema for options."""
|
|
entity_names = [
|
|
f"{entity[CONF_ID]}: {entity[CONF_FRIENDLY_NAME]}" for entity in entities
|
|
]
|
|
return vol.Schema(
|
|
{
|
|
vol.Required(CONF_FRIENDLY_NAME): str,
|
|
vol.Required(CONF_HOST): str,
|
|
vol.Required(CONF_LOCAL_KEY): str,
|
|
vol.Required(CONF_PROTOCOL_VERSION, default="3.3"): vol.In(
|
|
["3.1", "3.2", "3.3", "3.4"]
|
|
),
|
|
vol.Optional(CONF_SCAN_INTERVAL): int,
|
|
vol.Optional(CONF_MANUAL_DPS): str,
|
|
vol.Optional(CONF_RESET_DPIDS): str,
|
|
vol.Required(
|
|
CONF_ENTITIES, description={"suggested_value": entity_names}
|
|
): cv.multi_select(entity_names),
|
|
}
|
|
)
|
|
|
|
|
|
def schema_defaults(schema, dps_list=None, **defaults):
|
|
"""Create a new schema with default values filled in."""
|
|
copy = schema.extend({})
|
|
for field, field_type in copy.schema.items():
|
|
if isinstance(field_type, vol.In):
|
|
value = None
|
|
for dps in dps_list or []:
|
|
if dps.startswith(f"{defaults.get(field)} "):
|
|
value = dps
|
|
break
|
|
|
|
if value in field_type.container:
|
|
field.default = vol.default_factory(value)
|
|
continue
|
|
|
|
if field.schema in defaults:
|
|
field.default = vol.default_factory(defaults[field])
|
|
return copy
|
|
|
|
|
|
def dps_string_list(dps_data):
|
|
"""Return list of friendly DPS values."""
|
|
return [f"{id} (value: {value})" for id, value in dps_data.items()]
|
|
|
|
|
|
def gen_dps_strings():
|
|
"""Generate list of DPS values."""
|
|
return [f"{dp} (value: ?)" for dp in range(1, 256)]
|
|
|
|
|
|
def platform_schema(platform, dps_strings, allow_id=True, yaml=False):
|
|
"""Generate input validation schema for a platform."""
|
|
schema = {}
|
|
if yaml:
|
|
# In YAML mode we force the specified platform to match flow schema
|
|
schema[vol.Required(CONF_PLATFORM)] = vol.In([platform])
|
|
if allow_id:
|
|
schema[vol.Required(CONF_ID)] = vol.In(dps_strings)
|
|
schema[vol.Required(CONF_FRIENDLY_NAME)] = str
|
|
return vol.Schema(schema).extend(flow_schema(platform, dps_strings))
|
|
|
|
|
|
def flow_schema(platform, dps_strings):
|
|
"""Return flow schema for a specific platform."""
|
|
integration_module = ".".join(__name__.split(".")[:-1])
|
|
return import_module("." + platform, integration_module).flow_schema(dps_strings)
|
|
|
|
|
|
def strip_dps_values(user_input, dps_strings):
|
|
"""Remove values and keep only index for DPS config items."""
|
|
stripped = {}
|
|
for field, value in user_input.items():
|
|
if value in dps_strings:
|
|
stripped[field] = int(user_input[field].split(" ")[0])
|
|
else:
|
|
stripped[field] = user_input[field]
|
|
return stripped
|
|
|
|
|
|
def config_schema():
|
|
"""Build schema used for setting up component."""
|
|
entity_schemas = [
|
|
platform_schema(platform, range(1, 256), yaml=True) for platform in PLATFORMS
|
|
]
|
|
return vol.Schema(
|
|
{
|
|
DOMAIN: vol.All(
|
|
cv.ensure_list,
|
|
[
|
|
DEVICE_SCHEMA.extend(
|
|
{vol.Required(CONF_ENTITIES): [vol.Any(*entity_schemas)]}
|
|
)
|
|
],
|
|
)
|
|
},
|
|
extra=vol.ALLOW_EXTRA,
|
|
)
|
|
|
|
|
|
async def validate_input(hass: core.HomeAssistant, data):
|
|
"""Validate the user input allows us to connect."""
|
|
detected_dps = {}
|
|
|
|
interface = None
|
|
|
|
reset_ids = None
|
|
try:
|
|
interface = await pytuya.connect(
|
|
data[CONF_HOST],
|
|
data[CONF_DEVICE_ID],
|
|
data[CONF_LOCAL_KEY],
|
|
float(data[CONF_PROTOCOL_VERSION]),
|
|
)
|
|
if CONF_RESET_DPIDS in data:
|
|
reset_ids_str = data[CONF_RESET_DPIDS].split(",")
|
|
reset_ids = []
|
|
for reset_id in reset_ids_str:
|
|
reset_ids.append(int(reset_id.strip()))
|
|
_LOGGER.debug(
|
|
"Reset DPIDs configured: %s (%s)",
|
|
data[CONF_RESET_DPIDS],
|
|
reset_ids,
|
|
)
|
|
try:
|
|
detected_dps = await interface.detect_available_dps()
|
|
except Exception: # pylint: disable=broad-except
|
|
try:
|
|
_LOGGER.debug("Initial state update failed, trying reset command")
|
|
if len(reset_ids) > 0:
|
|
await interface.reset(reset_ids)
|
|
detected_dps = await interface.detect_available_dps()
|
|
except Exception: # pylint: disable=broad-except
|
|
_LOGGER.debug("No DPS able to be detected")
|
|
detected_dps = {}
|
|
|
|
# if manual DPs are set, merge these.
|
|
_LOGGER.debug("Detected DPS: %s", detected_dps)
|
|
if CONF_MANUAL_DPS in data:
|
|
|
|
manual_dps_list = [dps.strip() for dps in data[CONF_MANUAL_DPS].split(",")]
|
|
_LOGGER.debug(
|
|
"Manual DPS Setting: %s (%s)", data[CONF_MANUAL_DPS], manual_dps_list
|
|
)
|
|
# merge the lists
|
|
for new_dps in manual_dps_list + (reset_ids or []):
|
|
# If the DPS not in the detected dps list, then add with a
|
|
# default value indicating that it has been manually added
|
|
if str(new_dps) not in detected_dps:
|
|
detected_dps[new_dps] = -1
|
|
|
|
except (ConnectionRefusedError, ConnectionResetError) as ex:
|
|
raise CannotConnect from ex
|
|
except ValueError as ex:
|
|
raise InvalidAuth from ex
|
|
finally:
|
|
if interface:
|
|
await interface.close()
|
|
|
|
# Indicate an error if no datapoints found as the rest of the flow
|
|
# won't work in this case
|
|
if not detected_dps:
|
|
raise EmptyDpsList
|
|
|
|
_LOGGER.debug("Total DPS: %s", detected_dps)
|
|
|
|
return dps_string_list(detected_dps)
|
|
|
|
|
|
async def attempt_cloud_connection(hass, user_input):
|
|
"""Create device."""
|
|
cloud_api = TuyaCloudApi(
|
|
hass,
|
|
user_input.get(CONF_REGION),
|
|
user_input.get(CONF_CLIENT_ID),
|
|
user_input.get(CONF_CLIENT_SECRET),
|
|
user_input.get(CONF_USER_ID),
|
|
)
|
|
|
|
res = await cloud_api.async_get_access_token()
|
|
if res != "ok":
|
|
_LOGGER.error("Cloud API connection failed: %s", res)
|
|
return cloud_api, {"reason": "authentication_failed", "msg": res}
|
|
|
|
res = await cloud_api.async_get_devices_list()
|
|
if res != "ok":
|
|
_LOGGER.error("Cloud API get_devices_list failed: %s", res)
|
|
return cloud_api, {"reason": "device_list_failed", "msg": res}
|
|
_LOGGER.info("Cloud API connection succeeded.")
|
|
|
|
return cloud_api, {}
|
|
|
|
|
|
class LocaltuyaConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|
"""Handle a config flow for LocalTuya integration."""
|
|
|
|
VERSION = ENTRIES_VERSION
|
|
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
|
|
|
|
@staticmethod
|
|
@callback
|
|
def async_get_options_flow(config_entry):
|
|
"""Get options flow for this handler."""
|
|
return LocalTuyaOptionsFlowHandler(config_entry)
|
|
|
|
def __init__(self):
|
|
"""Initialize a new LocaltuyaConfigFlow."""
|
|
|
|
async def async_step_user(self, user_input=None):
|
|
"""Handle the initial step."""
|
|
errors = {}
|
|
placeholders = {}
|
|
if user_input is not None:
|
|
if user_input.get(CONF_NO_CLOUD):
|
|
for i in [CONF_CLIENT_ID, CONF_CLIENT_SECRET, CONF_USER_ID]:
|
|
user_input[i] = ""
|
|
return await self._create_entry(user_input)
|
|
|
|
cloud_api, res = await attempt_cloud_connection(self.hass, user_input)
|
|
|
|
if not res:
|
|
return await self._create_entry(user_input)
|
|
errors["base"] = res["reason"]
|
|
placeholders = {"msg": res["msg"]}
|
|
|
|
defaults = {}
|
|
defaults.update(user_input or {})
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
data_schema=schema_defaults(CLOUD_SETUP_SCHEMA, **defaults),
|
|
errors=errors,
|
|
description_placeholders=placeholders,
|
|
)
|
|
|
|
async def _create_entry(self, user_input):
|
|
"""Register new entry."""
|
|
# if self._async_current_entries():
|
|
# return self.async_abort(reason="already_configured")
|
|
|
|
await self.async_set_unique_id(user_input.get(CONF_USER_ID))
|
|
user_input[CONF_DEVICES] = {}
|
|
|
|
return self.async_create_entry(
|
|
title=user_input.get(CONF_USERNAME),
|
|
data=user_input,
|
|
)
|
|
|
|
async def async_step_import(self, user_input):
|
|
"""Handle import from YAML."""
|
|
_LOGGER.error(
|
|
"Configuration via YAML file is no longer supported by this integration."
|
|
)
|
|
|
|
|
|
class LocalTuyaOptionsFlowHandler(config_entries.OptionsFlow):
|
|
"""Handle options flow for LocalTuya integration."""
|
|
|
|
def __init__(self, config_entry):
|
|
"""Initialize localtuya options flow."""
|
|
self.config_entry = config_entry
|
|
# self.dps_strings = config_entry.data.get(CONF_DPS_STRINGS, gen_dps_strings())
|
|
# self.entities = config_entry.data[CONF_ENTITIES]
|
|
self.selected_device = None
|
|
self.editing_device = False
|
|
self.device_data = None
|
|
self.dps_strings = []
|
|
self.selected_platform = None
|
|
self.discovered_devices = {}
|
|
self.entities = []
|
|
|
|
async def async_step_init(self, user_input=None):
|
|
"""Manage basic options."""
|
|
# device_id = self.config_entry.data[CONF_DEVICE_ID]
|
|
if user_input is not None:
|
|
if user_input.get(CONF_ACTION) == CONF_SETUP_CLOUD:
|
|
return await self.async_step_cloud_setup()
|
|
if user_input.get(CONF_ACTION) == CONF_ADD_DEVICE:
|
|
return await self.async_step_add_device()
|
|
if user_input.get(CONF_ACTION) == CONF_EDIT_DEVICE:
|
|
return await self.async_step_edit_device()
|
|
|
|
return self.async_show_form(
|
|
step_id="init",
|
|
data_schema=CONFIGURE_SCHEMA,
|
|
)
|
|
|
|
async def async_step_cloud_setup(self, user_input=None):
|
|
"""Handle the initial step."""
|
|
errors = {}
|
|
placeholders = {}
|
|
if user_input is not None:
|
|
if user_input.get(CONF_NO_CLOUD):
|
|
new_data = self.config_entry.data.copy()
|
|
new_data.update(user_input)
|
|
for i in [CONF_CLIENT_ID, CONF_CLIENT_SECRET, CONF_USER_ID]:
|
|
new_data[i] = ""
|
|
self.hass.config_entries.async_update_entry(
|
|
self.config_entry,
|
|
data=new_data,
|
|
)
|
|
return self.async_create_entry(
|
|
title=new_data.get(CONF_USERNAME), data={}
|
|
)
|
|
|
|
cloud_api, res = await attempt_cloud_connection(self.hass, user_input)
|
|
|
|
if not res:
|
|
new_data = self.config_entry.data.copy()
|
|
new_data.update(user_input)
|
|
cloud_devs = cloud_api.device_list
|
|
for dev_id, dev in new_data[CONF_DEVICES].items():
|
|
if CONF_MODEL not in dev and dev_id in cloud_devs:
|
|
model = cloud_devs[dev_id].get(CONF_PRODUCT_NAME)
|
|
new_data[CONF_DEVICES][dev_id][CONF_MODEL] = model
|
|
new_data[ATTR_UPDATED_AT] = str(int(time.time() * 1000))
|
|
|
|
self.hass.config_entries.async_update_entry(
|
|
self.config_entry,
|
|
data=new_data,
|
|
)
|
|
return self.async_create_entry(
|
|
title=new_data.get(CONF_USERNAME), data={}
|
|
)
|
|
errors["base"] = res["reason"]
|
|
placeholders = {"msg": res["msg"]}
|
|
|
|
defaults = self.config_entry.data.copy()
|
|
defaults.update(user_input or {})
|
|
defaults[CONF_NO_CLOUD] = False
|
|
|
|
return self.async_show_form(
|
|
step_id="cloud_setup",
|
|
data_schema=schema_defaults(CLOUD_SETUP_SCHEMA, **defaults),
|
|
errors=errors,
|
|
description_placeholders=placeholders,
|
|
)
|
|
|
|
async def async_step_add_device(self, user_input=None):
|
|
"""Handle adding a new device."""
|
|
# Use cache if available or fallback to manual discovery
|
|
self.editing_device = False
|
|
self.selected_device = None
|
|
errors = {}
|
|
if user_input is not None:
|
|
if user_input[SELECTED_DEVICE] != CUSTOM_DEVICE:
|
|
self.selected_device = user_input[SELECTED_DEVICE]
|
|
|
|
return await self.async_step_configure_device()
|
|
|
|
self.discovered_devices = {}
|
|
data = self.hass.data.get(DOMAIN)
|
|
|
|
if data and DATA_DISCOVERY in data:
|
|
self.discovered_devices = data[DATA_DISCOVERY].devices
|
|
else:
|
|
try:
|
|
self.discovered_devices = await discover()
|
|
except OSError as ex:
|
|
if ex.errno == errno.EADDRINUSE:
|
|
errors["base"] = "address_in_use"
|
|
else:
|
|
errors["base"] = "discovery_failed"
|
|
except Exception: # pylint: disable= broad-except
|
|
_LOGGER.exception("discovery failed")
|
|
errors["base"] = "discovery_failed"
|
|
|
|
devices = {
|
|
dev_id: dev["ip"]
|
|
for dev_id, dev in self.discovered_devices.items()
|
|
if dev["gwId"] not in self.config_entry.data[CONF_DEVICES]
|
|
}
|
|
|
|
return self.async_show_form(
|
|
step_id="add_device",
|
|
data_schema=devices_schema(
|
|
devices, self.hass.data[DOMAIN][DATA_CLOUD].device_list
|
|
),
|
|
errors=errors,
|
|
)
|
|
|
|
async def async_step_edit_device(self, user_input=None):
|
|
"""Handle editing a device."""
|
|
self.editing_device = True
|
|
# Use cache if available or fallback to manual discovery
|
|
errors = {}
|
|
if user_input is not None:
|
|
self.selected_device = user_input[SELECTED_DEVICE]
|
|
dev_conf = self.config_entry.data[CONF_DEVICES][self.selected_device]
|
|
self.dps_strings = dev_conf.get(CONF_DPS_STRINGS, gen_dps_strings())
|
|
self.entities = dev_conf[CONF_ENTITIES]
|
|
|
|
return await self.async_step_configure_device()
|
|
|
|
devices = {}
|
|
for dev_id, configured_dev in self.config_entry.data[CONF_DEVICES].items():
|
|
devices[dev_id] = configured_dev[CONF_HOST]
|
|
|
|
return self.async_show_form(
|
|
step_id="edit_device",
|
|
data_schema=devices_schema(
|
|
devices, self.hass.data[DOMAIN][DATA_CLOUD].device_list, False
|
|
),
|
|
errors=errors,
|
|
)
|
|
|
|
async def async_step_configure_device(self, user_input=None):
|
|
"""Handle input of basic info."""
|
|
errors = {}
|
|
dev_id = self.selected_device
|
|
if user_input is not None:
|
|
try:
|
|
self.device_data = user_input.copy()
|
|
if dev_id is not None:
|
|
# self.device_data[CONF_PRODUCT_KEY] = self.devices[
|
|
# self.selected_device
|
|
# ]["productKey"]
|
|
cloud_devs = self.hass.data[DOMAIN][DATA_CLOUD].device_list
|
|
if dev_id in cloud_devs:
|
|
self.device_data[CONF_MODEL] = cloud_devs[dev_id].get(
|
|
CONF_PRODUCT_NAME
|
|
)
|
|
if self.editing_device:
|
|
self.device_data.update(
|
|
{
|
|
CONF_DEVICE_ID: dev_id,
|
|
CONF_DPS_STRINGS: self.dps_strings,
|
|
CONF_ENTITIES: [],
|
|
}
|
|
)
|
|
if user_input[CONF_ENTITIES]:
|
|
entity_ids = [
|
|
int(entity.split(":")[0])
|
|
for entity in user_input[CONF_ENTITIES]
|
|
]
|
|
device_config = self.config_entry.data[CONF_DEVICES][dev_id]
|
|
self.entities = [
|
|
entity
|
|
for entity in device_config[CONF_ENTITIES]
|
|
if entity[CONF_ID] in entity_ids
|
|
]
|
|
return await self.async_step_configure_entity()
|
|
|
|
self.dps_strings = await validate_input(self.hass, user_input)
|
|
return await self.async_step_pick_entity_type()
|
|
except CannotConnect:
|
|
errors["base"] = "cannot_connect"
|
|
except InvalidAuth:
|
|
errors["base"] = "invalid_auth"
|
|
except EmptyDpsList:
|
|
errors["base"] = "empty_dps"
|
|
except Exception: # pylint: disable=broad-except
|
|
_LOGGER.exception("Unexpected exception")
|
|
errors["base"] = "unknown"
|
|
|
|
defaults = {}
|
|
if self.editing_device:
|
|
# If selected device exists as a config entry, load config from it
|
|
defaults = self.config_entry.data[CONF_DEVICES][dev_id].copy()
|
|
schema = schema_defaults(options_schema(self.entities), **defaults)
|
|
placeholders = {"for_device": f" for device `{dev_id}`"}
|
|
else:
|
|
defaults[CONF_PROTOCOL_VERSION] = "3.3"
|
|
defaults[CONF_HOST] = ""
|
|
defaults[CONF_DEVICE_ID] = ""
|
|
defaults[CONF_LOCAL_KEY] = ""
|
|
defaults[CONF_FRIENDLY_NAME] = ""
|
|
if dev_id is not None:
|
|
# Insert default values from discovery and cloud if present
|
|
device = self.discovered_devices[dev_id]
|
|
defaults[CONF_HOST] = device.get("ip")
|
|
defaults[CONF_DEVICE_ID] = device.get("gwId")
|
|
defaults[CONF_PROTOCOL_VERSION] = device.get("version")
|
|
cloud_devs = self.hass.data[DOMAIN][DATA_CLOUD].device_list
|
|
if dev_id in cloud_devs:
|
|
defaults[CONF_LOCAL_KEY] = cloud_devs[dev_id].get(CONF_LOCAL_KEY)
|
|
defaults[CONF_FRIENDLY_NAME] = cloud_devs[dev_id].get(CONF_NAME)
|
|
schema = schema_defaults(CONFIGURE_DEVICE_SCHEMA, **defaults)
|
|
|
|
placeholders = {"for_device": ""}
|
|
|
|
return self.async_show_form(
|
|
step_id="configure_device",
|
|
data_schema=schema,
|
|
errors=errors,
|
|
description_placeholders=placeholders,
|
|
)
|
|
|
|
async def async_step_pick_entity_type(self, user_input=None):
|
|
"""Handle asking if user wants to add another entity."""
|
|
if user_input is not None:
|
|
if user_input.get(NO_ADDITIONAL_ENTITIES):
|
|
config = {
|
|
**self.device_data,
|
|
CONF_DPS_STRINGS: self.dps_strings,
|
|
CONF_ENTITIES: self.entities,
|
|
}
|
|
|
|
dev_id = self.device_data.get(CONF_DEVICE_ID)
|
|
if dev_id in self.config_entry.data[CONF_DEVICES]:
|
|
self.hass.config_entries.async_update_entry(
|
|
self.config_entry, data=config
|
|
)
|
|
return self.async_abort(
|
|
reason="device_success",
|
|
description_placeholders={
|
|
"dev_name": config.get(CONF_FRIENDLY_NAME),
|
|
"action": "updated",
|
|
},
|
|
)
|
|
|
|
new_data = self.config_entry.data.copy()
|
|
new_data[ATTR_UPDATED_AT] = str(int(time.time() * 1000))
|
|
new_data[CONF_DEVICES].update({dev_id: config})
|
|
|
|
self.hass.config_entries.async_update_entry(
|
|
self.config_entry,
|
|
data=new_data,
|
|
)
|
|
return self.async_create_entry(title="", data={})
|
|
|
|
self.selected_platform = user_input[PLATFORM_TO_ADD]
|
|
return await self.async_step_configure_entity()
|
|
|
|
# Add a checkbox that allows bailing out from config flow if at least one
|
|
# entity has been added
|
|
schema = PICK_ENTITY_SCHEMA
|
|
if self.selected_platform is not None:
|
|
schema = schema.extend(
|
|
{vol.Required(NO_ADDITIONAL_ENTITIES, default=True): bool}
|
|
)
|
|
|
|
return self.async_show_form(step_id="pick_entity_type", data_schema=schema)
|
|
|
|
def available_dps_strings(self):
|
|
"""Return list of DPs use by the device's entities."""
|
|
available_dps = []
|
|
used_dps = [str(entity[CONF_ID]) for entity in self.entities]
|
|
for dp_string in self.dps_strings:
|
|
dp = dp_string.split(" ")[0]
|
|
if dp not in used_dps:
|
|
available_dps.append(dp_string)
|
|
return available_dps
|
|
|
|
async def async_step_entity(self, user_input=None):
|
|
"""Manage entity settings."""
|
|
errors = {}
|
|
if user_input is not None:
|
|
entity = strip_dps_values(user_input, self.dps_strings)
|
|
entity[CONF_ID] = self.current_entity[CONF_ID]
|
|
entity[CONF_PLATFORM] = self.current_entity[CONF_PLATFORM]
|
|
self.device_data[CONF_ENTITIES].append(entity)
|
|
|
|
if len(self.entities) == len(self.device_data[CONF_ENTITIES]):
|
|
self.hass.config_entries.async_update_entry(
|
|
self.config_entry,
|
|
title=self.device_data[CONF_FRIENDLY_NAME],
|
|
data=self.device_data,
|
|
)
|
|
return self.async_create_entry(title="", data={})
|
|
|
|
schema = platform_schema(
|
|
self.current_entity[CONF_PLATFORM], self.dps_strings, allow_id=False
|
|
)
|
|
return self.async_show_form(
|
|
step_id="entity",
|
|
errors=errors,
|
|
data_schema=schema_defaults(
|
|
schema, self.dps_strings, **self.current_entity
|
|
),
|
|
description_placeholders={
|
|
"id": self.current_entity[CONF_ID],
|
|
"platform": self.current_entity[CONF_PLATFORM],
|
|
},
|
|
)
|
|
|
|
async def async_step_configure_entity(self, user_input=None):
|
|
"""Manage entity settings."""
|
|
errors = {}
|
|
if user_input is not None:
|
|
if self.editing_device:
|
|
entity = strip_dps_values(user_input, self.dps_strings)
|
|
entity[CONF_ID] = self.current_entity[CONF_ID]
|
|
entity[CONF_PLATFORM] = self.current_entity[CONF_PLATFORM]
|
|
self.device_data[CONF_ENTITIES].append(entity)
|
|
|
|
if len(self.entities) == len(self.device_data[CONF_ENTITIES]):
|
|
# finished editing device. Let's store the new config entry....
|
|
dev_id = self.device_data[CONF_DEVICE_ID]
|
|
new_data = self.config_entry.data.copy()
|
|
entry_id = self.config_entry.entry_id
|
|
# removing entities from registry (they will be recreated)
|
|
ent_reg = await er.async_get_registry(self.hass)
|
|
reg_entities = {
|
|
ent.unique_id: ent.entity_id
|
|
for ent in er.async_entries_for_config_entry(ent_reg, entry_id)
|
|
if dev_id in ent.unique_id
|
|
}
|
|
for entity_id in reg_entities.values():
|
|
ent_reg.async_remove(entity_id)
|
|
|
|
new_data[CONF_DEVICES][dev_id] = self.device_data
|
|
new_data[ATTR_UPDATED_AT] = str(int(time.time() * 1000))
|
|
self.hass.config_entries.async_update_entry(
|
|
self.config_entry,
|
|
data=new_data,
|
|
)
|
|
return self.async_create_entry(title="", data={})
|
|
else:
|
|
user_input[CONF_PLATFORM] = self.selected_platform
|
|
self.entities.append(strip_dps_values(user_input, self.dps_strings))
|
|
# new entity added. Let's check if there are more left...
|
|
user_input = None
|
|
if len(self.available_dps_strings()) == 0:
|
|
user_input = {NO_ADDITIONAL_ENTITIES: True}
|
|
return await self.async_step_pick_entity_type(user_input)
|
|
|
|
if self.editing_device:
|
|
schema = platform_schema(
|
|
self.current_entity[CONF_PLATFORM], self.dps_strings, allow_id=False
|
|
)
|
|
schema = schema_defaults(schema, self.dps_strings, **self.current_entity)
|
|
placeholders = {
|
|
"entity": f"entity with DP {self.current_entity[CONF_ID]}",
|
|
"platform": self.current_entity[CONF_PLATFORM],
|
|
}
|
|
else:
|
|
available_dps = self.available_dps_strings()
|
|
schema = platform_schema(self.selected_platform, available_dps)
|
|
placeholders = {
|
|
"entity": "an entity",
|
|
"platform": self.selected_platform,
|
|
}
|
|
|
|
return self.async_show_form(
|
|
step_id="configure_entity",
|
|
data_schema=schema,
|
|
errors=errors,
|
|
description_placeholders=placeholders,
|
|
)
|
|
|
|
async def async_step_yaml_import(self, user_input=None):
|
|
"""Manage YAML imports."""
|
|
_LOGGER.error(
|
|
"Configuration via YAML file is no longer supported by this integration."
|
|
)
|
|
# if user_input is not None:
|
|
# return self.async_create_entry(title="", data={})
|
|
# return self.async_show_form(step_id="yaml_import")
|
|
|
|
@property
|
|
def current_entity(self):
|
|
"""Existing configuration for entity currently being edited."""
|
|
return self.entities[len(self.device_data[CONF_ENTITIES])]
|
|
|
|
|
|
class CannotConnect(exceptions.HomeAssistantError):
|
|
"""Error to indicate we cannot connect."""
|
|
|
|
|
|
class InvalidAuth(exceptions.HomeAssistantError):
|
|
"""Error to indicate there is invalid auth."""
|
|
|
|
|
|
class EmptyDpsList(exceptions.HomeAssistantError):
|
|
"""Error to indicate no datapoints found."""
|