1 """Config flow for Mealie."""
3 from collections.abc
import Mapping
6 from aiomealie
import MealieAuthenticationError, MealieClient, MealieConnectionError
7 import voluptuous
as vol
13 from .const
import DOMAIN, LOGGER, MIN_REQUIRED_MEALIE_VERSION
14 from .utils
import create_version
16 USER_SCHEMA = vol.Schema(
18 vol.Required(CONF_HOST): str,
19 vol.Required(CONF_API_TOKEN): str,
20 vol.Optional(CONF_VERIFY_SSL, default=
True): bool,
23 REAUTH_SCHEMA = vol.Schema(
25 vol.Required(CONF_API_TOKEN): str,
31 """Mealie config flow."""
33 host: str |
None =
None
34 verify_ssl: bool =
True
38 ) -> tuple[dict[str, str], str |
None]:
39 """Check connection to the Mealie API."""
40 assert self.
hosthost
is not None
42 if "/hassio/ingress/" in self.
hosthost:
43 return {
"base":
"ingress_url"},
None
45 client = MealieClient(
51 info = await client.get_user_info()
52 about = await client.get_about()
54 except MealieConnectionError:
55 return {
"base":
"cannot_connect"},
None
56 except MealieAuthenticationError:
57 return {
"base":
"invalid_auth"},
None
59 LOGGER.exception(
"Unexpected error")
60 return {
"base":
"unknown"},
None
61 if version.valid
and version < MIN_REQUIRED_MEALIE_VERSION:
62 return {
"base":
"mealie_version"},
None
63 return {}, info.user_id
66 self, user_input: dict[str, Any] |
None =
None
67 ) -> ConfigFlowResult:
68 """Handle a flow initialized by the user."""
69 errors: dict[str, str] = {}
71 self.
hosthost = user_input[CONF_HOST]
74 user_input[CONF_API_TOKEN],
85 data_schema=USER_SCHEMA,
89 async
def async_step_reauth(
90 self, entry_data: Mapping[str, Any]
91 ) -> ConfigFlowResult:
92 """Perform reauth upon an API authentication error."""
93 self.
hosthost = entry_data[CONF_HOST]
94 self.
verify_sslverify_ssl = entry_data.get(CONF_VERIFY_SSL,
True)
95 return await self.async_step_reauth_confirm()
97 async
def async_step_reauth_confirm(
98 self, user_input: dict[str, Any] |
None =
None
99 ) -> ConfigFlowResult:
100 """Confirm reauth dialog."""
101 errors: dict[str, str] = {}
104 user_input[CONF_API_TOKEN],
111 data_updates={CONF_API_TOKEN: user_input[CONF_API_TOKEN]},
114 step_id=
"reauth_confirm",
115 data_schema=REAUTH_SCHEMA,
119 async
def async_step_reconfigure(
120 self, user_input: dict[str, Any] |
None =
None
121 ) -> ConfigFlowResult:
122 """Handle reconfiguration of the integration."""
123 errors: dict[str, str] = {}
125 self.
hosthost = user_input[CONF_HOST]
126 self.
verify_sslverify_ssl = user_input[CONF_VERIFY_SSL]
128 user_input[CONF_API_TOKEN],
136 CONF_VERIFY_SSL: user_input[CONF_VERIFY_SSL],
137 CONF_HOST: user_input[CONF_HOST],
138 CONF_API_TOKEN: user_input[CONF_API_TOKEN],
142 step_id=
"reconfigure",
143 data_schema=USER_SCHEMA,
tuple[dict[str, str], str|None] check_connection(self, str api_token)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry _get_reauth_entry(self)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_update_reload_and_abort(self, ConfigEntry entry, *str|None|UndefinedType unique_id=UNDEFINED, str|UndefinedType title=UNDEFINED, Mapping[str, Any]|UndefinedType data=UNDEFINED, Mapping[str, Any]|UndefinedType data_updates=UNDEFINED, Mapping[str, Any]|UndefinedType options=UNDEFINED, str|UndefinedType reason=UNDEFINED, bool reload_even_if_entry_is_unchanged=True)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigEntry _get_reconfigure_entry(self)
None _abort_if_unique_id_mismatch(self, *str reason="unique_id_mismatch", Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
AwesomeVersion create_version(str version)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)