1 """Config flow for Risco integration."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
9 from pyrisco
import CannotConnectError, RiscoCloud, RiscoLocal, UnauthorizedError
10 import voluptuous
as vol
32 CONF_CODE_ARM_REQUIRED,
33 CONF_CODE_DISARM_REQUIRED,
34 CONF_COMMUNICATION_DELAY,
36 CONF_HA_STATES_TO_RISCO,
37 CONF_RISCO_STATES_TO_HA,
38 DEFAULT_ADVANCED_OPTIONS,
41 MAX_COMMUNICATION_DELAY,
46 _LOGGER = logging.getLogger(__name__)
49 CLOUD_SCHEMA = vol.Schema(
51 vol.Required(CONF_USERNAME): str,
52 vol.Required(CONF_PASSWORD): str,
53 vol.Required(CONF_PIN): str,
56 LOCAL_SCHEMA = vol.Schema(
58 vol.Required(CONF_HOST): str,
59 vol.Required(CONF_PORT, default=1000): int,
60 vol.Required(CONF_PIN): str,
64 AlarmControlPanelState.ARMED_AWAY.value,
65 AlarmControlPanelState.ARMED_HOME.value,
66 AlarmControlPanelState.ARMED_NIGHT.value,
67 AlarmControlPanelState.ARMED_CUSTOM_BYPASS.value,
72 hass: HomeAssistant, data: dict[str, Any]
74 """Validate the user input allows us to connect to Risco Cloud.
76 Data has the keys from CLOUD_SCHEMA with values provided by the user.
78 risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN])
85 return {
"title": risco.site_name}
89 hass: HomeAssistant, data: Mapping[str, str]
91 """Validate the user input allows us to connect to a local panel.
93 Data has the keys from LOCAL_SCHEMA with values provided by the user.
101 communication_delay=comm_delay,
104 await risco.connect()
105 except CannotConnectError:
106 if comm_delay >= MAX_COMMUNICATION_DELAY:
113 await risco.disconnect()
114 return {
"title": site_id,
"comm_delay": comm_delay}
118 """Handle a config flow for Risco."""
123 """Init the config flow."""
129 config_entry: ConfigEntry,
130 ) -> RiscoOptionsFlowHandler:
131 """Define the config flow to handle options."""
135 self, user_input: dict[str, Any] |
None =
None
136 ) -> ConfigFlowResult:
137 """Handle the initial step."""
140 menu_options=[
"cloud",
"local"],
144 self, user_input: dict[str, Any] |
None =
None
145 ) -> ConfigFlowResult:
146 """Configure a cloud based alarm."""
147 errors: dict[str, str] = {}
148 if user_input
is not None:
155 except CannotConnectError:
156 errors[
"base"] =
"cannot_connect"
157 except UnauthorizedError:
158 errors[
"base"] =
"invalid_auth"
160 _LOGGER.exception(
"Unexpected exception")
161 errors[
"base"] =
"unknown"
165 self.hass.config_entries.async_update_entry(
168 unique_id=user_input[CONF_USERNAME],
170 await self.hass.config_entries.async_reload(self.
_reauth_entry_reauth_entry.entry_id)
174 step_id=
"cloud", data_schema=CLOUD_SCHEMA, errors=errors
178 self, entry_data: Mapping[str, Any]
179 ) -> ConfigFlowResult:
180 """Handle configuration by re-auth."""
185 self, user_input: dict[str, Any] |
None =
None
186 ) -> ConfigFlowResult:
187 """Configure a local based alarm."""
188 errors: dict[str, str] = {}
189 if user_input
is not None:
192 except CannotConnectError
as ex:
193 _LOGGER.debug(
"Cannot connect", exc_info=ex)
194 errors[
"base"] =
"cannot_connect"
195 except UnauthorizedError:
196 errors[
"base"] =
"invalid_auth"
198 _LOGGER.exception(
"Unexpected exception")
199 errors[
"base"] =
"unknown"
208 CONF_TYPE: TYPE_LOCAL,
209 CONF_COMMUNICATION_DELAY: info[
"comm_delay"],
214 step_id=
"local", data_schema=LOCAL_SCHEMA, errors=errors
219 """Handle a Risco options flow."""
221 def __init__(self, config_entry: ConfigEntry) ->
None:
223 self.
_data_data = {**DEFAULT_OPTIONS, **config_entry.options}
229 CONF_CODE_ARM_REQUIRED, default=self.
_data_data[CONF_CODE_ARM_REQUIRED]
232 CONF_CODE_DISARM_REQUIRED,
233 default=self.
_data_data[CONF_CODE_DISARM_REQUIRED],
238 self.
_data_data = {**DEFAULT_ADVANCED_OPTIONS, **self.
_data_data}
239 schema = schema.extend(
242 CONF_SCAN_INTERVAL, default=self.
_data_data[CONF_SCAN_INTERVAL]
245 CONF_CONCURRENCY, default=self.
_data_data[CONF_CONCURRENCY]
252 self, user_input: dict[str, Any] |
None =
None
253 ) -> ConfigFlowResult:
254 """Manage the options."""
255 if user_input
is not None:
256 self.
_data_data = {**self.
_data_data, **user_input}
262 self, user_input: dict[str, Any] |
None =
None
263 ) -> ConfigFlowResult:
264 """Map Risco states to HA states."""
265 if user_input
is not None:
266 self.
_data_data[CONF_RISCO_STATES_TO_HA] = user_input
269 risco_to_ha = self.
_data_data[CONF_RISCO_STATES_TO_HA]
270 options = vol.Schema(
272 vol.Required(risco_state, default=risco_to_ha[risco_state]): vol.In(
275 for risco_state
in RISCO_STATES
279 return self.
async_show_formasync_show_form(step_id=
"risco_to_ha", data_schema=options)
282 self, user_input: dict[str, Any] |
None =
None
283 ) -> ConfigFlowResult:
284 """Map HA states to Risco states."""
285 if user_input
is not None:
286 self.
_data_data[CONF_HA_STATES_TO_RISCO] = user_input
290 risco_to_ha = self.
_data_data[CONF_RISCO_STATES_TO_HA]
293 for ha_state
in HA_STATES:
294 if ha_state
not in risco_to_ha.values():
299 for risco_state
in RISCO_STATES
300 if risco_to_ha[risco_state] == ha_state
302 current = self.
_data_data[CONF_HA_STATES_TO_RISCO].
get(ha_state)
303 if current
not in values:
305 options[vol.Required(ha_state, default=current)] = vol.In(values)
308 step_id=
"ha_to_risco", data_schema=vol.Schema(options)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
RiscoOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_local(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_step_cloud(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
vol.Schema _options_schema(self)
ConfigFlowResult async_step_risco_to_ha(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_ha_to_risco(self, dict[str, Any]|None user_input=None)
None __init__(self, ConfigEntry config_entry)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
bool show_advanced_options(self)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_menu(self, *str|None step_id=None, Container[str] menu_options, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
web.Response get(self, web.Request request, str config_key)
dict[str, Any] validate_local_input(HomeAssistant hass, Mapping[str, str] data)
dict[str, str] validate_cloud_input(HomeAssistant hass, dict[str, Any] data)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)