1 """Config flow for Homeassistant Analytics integration."""
3 from __future__
import annotations
7 from python_homeassistant_analytics
import (
8 HomeassistantAnalyticsClient,
9 HomeassistantAnalyticsConnectionError,
11 from python_homeassistant_analytics.models
import IntegrationType
12 import voluptuous
as vol
30 CONF_TRACKED_CUSTOM_INTEGRATIONS,
31 CONF_TRACKED_INTEGRATIONS,
36 INTEGRATION_TYPES_WITHOUT_ANALYTICS = (
37 IntegrationType.BRAND,
38 IntegrationType.ENTITY,
39 IntegrationType.VIRTUAL,
44 """Handle a config flow for Homeassistant Analytics."""
49 config_entry: ConfigEntry,
50 ) -> HomeassistantAnalyticsOptionsFlowHandler:
51 """Get the options flow for this handler."""
55 self, user_input: dict[str, Any] |
None =
None
56 ) -> ConfigFlowResult:
57 """Handle the initial step."""
58 errors: dict[str, str] = {}
59 if user_input
is not None:
62 not user_input.get(CONF_TRACKED_ADDONS),
63 not user_input.get(CONF_TRACKED_INTEGRATIONS),
64 not user_input.get(CONF_TRACKED_CUSTOM_INTEGRATIONS),
67 errors[
"base"] =
"no_integrations_selected"
70 title=
"Home Assistant Analytics Insights",
73 CONF_TRACKED_ADDONS: user_input.get(CONF_TRACKED_ADDONS, []),
74 CONF_TRACKED_INTEGRATIONS: user_input.get(
75 CONF_TRACKED_INTEGRATIONS, []
77 CONF_TRACKED_CUSTOM_INTEGRATIONS: user_input.get(
78 CONF_TRACKED_CUSTOM_INTEGRATIONS, []
83 client = HomeassistantAnalyticsClient(
87 addons = await client.get_addons()
88 integrations = await client.get_integrations()
89 custom_integrations = await client.get_custom_integrations()
90 except HomeassistantAnalyticsConnectionError:
91 LOGGER.exception(
"Error connecting to Home Assistant analytics")
94 LOGGER.exception(
"Unexpected error")
100 label=integration.title,
102 for domain, integration
in integrations.items()
103 if integration.integration_type
not in INTEGRATION_TYPES_WITHOUT_ANALYTICS
108 data_schema=vol.Schema(
112 options=
list(addons),
126 options=
list(custom_integrations),
137 """Handle Homeassistant Analytics options."""
140 self, user_input: dict[str, Any] |
None =
None
141 ) -> ConfigFlowResult:
142 """Manage the options."""
143 errors: dict[str, str] = {}
144 if user_input
is not None:
147 not user_input.get(CONF_TRACKED_ADDONS),
148 not user_input.get(CONF_TRACKED_INTEGRATIONS),
149 not user_input.get(CONF_TRACKED_CUSTOM_INTEGRATIONS),
152 errors[
"base"] =
"no_integrations_selected"
157 CONF_TRACKED_ADDONS: user_input.get(CONF_TRACKED_ADDONS, []),
158 CONF_TRACKED_INTEGRATIONS: user_input.get(
159 CONF_TRACKED_INTEGRATIONS, []
161 CONF_TRACKED_CUSTOM_INTEGRATIONS: user_input.get(
162 CONF_TRACKED_CUSTOM_INTEGRATIONS, []
167 client = HomeassistantAnalyticsClient(
171 addons = await client.get_addons()
172 integrations = await client.get_integrations()
173 custom_integrations = await client.get_custom_integrations()
174 except HomeassistantAnalyticsConnectionError:
175 LOGGER.exception(
"Error connecting to Home Assistant analytics")
176 return self.
async_abortasync_abort(reason=
"cannot_connect")
181 label=integration.title,
183 for domain, integration
in integrations.items()
184 if integration.integration_type
not in INTEGRATION_TYPES_WITHOUT_ANALYTICS
194 options=
list(addons),
208 options=
list(custom_integrations),
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
HomeassistantAnalyticsOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)