1 """Config flow for Neato Botvac."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
12 from .const
import NEATO_DOMAIN
16 config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=NEATO_DOMAIN
18 """Config flow to handle Neato Botvac OAuth2 authentication."""
25 return logging.getLogger(__name__)
28 self, user_input: dict[str, Any] |
None =
None
29 ) -> ConfigFlowResult:
30 """Create an entry for the flow."""
31 current_entries = self._async_current_entries()
32 if self.
sourcesource != SOURCE_REAUTH
and current_entries:
34 return self.async_abort(reason=
"already_configured")
39 self, entry_data: Mapping[str, Any]
40 ) -> ConfigFlowResult:
41 """Perform reauth upon migration of old entries."""
45 self, user_input: dict[str, Any] |
None =
None
46 ) -> ConfigFlowResult:
47 """Confirm reauth upon migration of old entries."""
48 if user_input
is None:
49 return self.async_show_form(step_id=
"reauth_confirm")
53 """Create an entry for the flow. Update an entry if one already exist."""
54 current_entries = self._async_current_entries()
55 if self.
sourcesource == SOURCE_REAUTH
and current_entries:
57 self.hass.config_entries.async_update_entry(
58 current_entries[0], title=self.flow_impl.name, data=data
60 self.hass.async_create_task(
61 self.hass.config_entries.async_reload(current_entries[0].entry_id)
63 return self.async_abort(reason=
"reauth_successful")
64 return self.async_create_entry(title=self.flow_impl.name, data=data)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
logging.Logger logger(self)
ConfigFlowResult async_oauth_create_entry(self, dict[str, Any] data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)