Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Auth providers for Home Assistant."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 import logging
7 import types
8 from typing import Any
9 
10 import voluptuous as vol
11 from voluptuous.humanize import humanize_error
12 
13 from homeassistant import requirements
14 from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE
15 from homeassistant.core import HomeAssistant, callback
16 from homeassistant.data_entry_flow import FlowHandler
17 from homeassistant.exceptions import HomeAssistantError
18 from homeassistant.helpers.importlib import async_import_module
19 from homeassistant.util import dt as dt_util
20 from homeassistant.util.decorator import Registry
21 from homeassistant.util.hass_dict import HassKey
22 
23 from ..auth_store import AuthStore
24 from ..const import MFA_SESSION_EXPIRATION
25 from ..models import (
26  AuthFlowContext,
27  AuthFlowResult,
28  Credentials,
29  RefreshToken,
30  User,
31  UserMeta,
32 )
33 
_LOGGER = logging.getLogger(__name__)

# hass.data key under which the set of auth provider names whose
# REQUIREMENTS were already processed is stored.
DATA_REQS: HassKey[set[str]] = HassKey("auth_prov_reqs_processed")

# Registry mapping a provider type name to its AuthProvider subclass.
AUTH_PROVIDERS: Registry[str, type[AuthProvider]] = Registry()

# Base schema shared by all auth provider configurations. Unknown keys are
# allowed (extra=vol.ALLOW_EXTRA) so provider-specific options pass through.
AUTH_PROVIDER_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_TYPE): str,
        vol.Optional(CONF_NAME): str,
        # Specify ID if you have two auth providers for same type.
        vol.Optional(CONF_ID): str,
    },
    extra=vol.ALLOW_EXTRA,
)
48 
49 
class AuthProvider:
    """Provider of user authentication.

    Base class for concrete providers. Subclasses are expected to override
    the methods below that raise NotImplementedError.
    """

    DEFAULT_TITLE = "Unnamed auth provider"

    def __init__(
        self, hass: HomeAssistant, store: AuthStore, config: dict[str, Any]
    ) -> None:
        """Initialize an auth provider.

        Stores the Home Assistant instance, the auth store and the provider
        configuration on the instance for later use.
        """
        self.hass = hass
        self.store = store
        self.config = config

    @property
    def id(self) -> str | None:
        """Return id of the auth provider.

        Optional, can be None.
        """
        return self.config.get(CONF_ID)

    @property
    def type(self) -> str:
        """Return type of the provider."""
        return self.config[CONF_TYPE]  # type: ignore[no-any-return]

    @property
    def name(self) -> str:
        """Return the name of the auth provider.

        Falls back to DEFAULT_TITLE when the config has no name.
        """
        return self.config.get(CONF_NAME, self.DEFAULT_TITLE)  # type: ignore[no-any-return]

    @property
    def support_mfa(self) -> bool:
        """Return whether multi-factor auth supported by the auth provider."""
        return True

    async def async_credentials(self) -> list[Credentials]:
        """Return all credentials of this provider.

        Scans every user in the store and keeps the credentials whose
        provider type and provider id both match this provider.
        """
        users = await self.store.async_get_users()
        return [
            credentials
            for user in users
            for credentials in user.credentials
            if (
                credentials.auth_provider_type == self.type
                and credentials.auth_provider_id == self.id
            )
        ]

    @callback
    def async_create_credentials(self, data: dict[str, str]) -> Credentials:
        """Create credentials bound to this provider's type and id."""
        return Credentials(
            auth_provider_type=self.type, auth_provider_id=self.id, data=data
        )

    # Implement by extending class

    async def async_login_flow(self, context: AuthFlowContext | None) -> LoginFlow:
        """Return the data flow for logging in with auth provider.

        Auth provider should extend LoginFlow and return an instance.
        """
        raise NotImplementedError

    async def async_get_or_create_credentials(
        self, flow_result: Mapping[str, str]
    ) -> Credentials:
        """Get credentials based on the flow result."""
        raise NotImplementedError

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Will be used to populate info when creating a new user.
        """
        raise NotImplementedError

    async def async_initialize(self) -> None:
        """Initialize the auth provider.

        The base implementation does nothing.
        """

    @callback
    def async_validate_refresh_token(
        self, refresh_token: RefreshToken, remote_ip: str | None = None
    ) -> None:
        """Verify a refresh token is still valid.

        Optional hook for an auth provider to verify validity of a refresh token.
        Should raise InvalidAuthError on errors. The base implementation
        accepts every token.
        """
142 
143 
async def auth_provider_from_config(
    hass: HomeAssistant, store: AuthStore, config: dict[str, Any]
) -> AuthProvider:
    """Initialize an auth provider from a config.

    Loads the provider module named by config[CONF_TYPE], validates the
    config against that module's CONFIG_SCHEMA and instantiates the
    provider class registered in AUTH_PROVIDERS under the same name.

    Raises vol.Invalid (after logging a humanized message) when the
    configuration does not pass the module's schema.
    """
    provider_name: str = config[CONF_TYPE]
    module = await load_auth_provider_module(hass, provider_name)

    try:
        config = module.CONFIG_SCHEMA(config)
    except vol.Invalid as err:
        # `config` is still the unvalidated input here because the
        # assignment above never completed.
        _LOGGER.error(
            "Invalid configuration for auth provider %s: %s",
            provider_name,
            humanize_error(config, err),
        )
        raise

    return AUTH_PROVIDERS[provider_name](hass, store, config)
162 
163 
async def load_auth_provider_module(
    hass: HomeAssistant, provider: str
) -> types.ModuleType:
    """Load an auth provider.

    Imports homeassistant.auth.providers.<provider> and, unless pip is
    skipped or the module declares no REQUIREMENTS, processes those
    requirements once per provider name.

    Raises HomeAssistantError when the module cannot be imported.
    """
    try:
        module = await async_import_module(
            hass, f"homeassistant.auth.providers.{provider}"
        )
    except ImportError as err:
        _LOGGER.error("Unable to load auth provider %s: %s", provider, err)
        raise HomeAssistantError(
            f"Unable to load auth provider {provider}: {err}"
        ) from err

    if hass.config.skip_pip or not hasattr(module, "REQUIREMENTS"):
        return module

    # Track processed providers in hass.data so requirements are handled
    # only once per provider name.
    if (processed := hass.data.get(DATA_REQS)) is None:
        processed = hass.data[DATA_REQS] = set()
    elif provider in processed:
        return module

    reqs = module.REQUIREMENTS
    await requirements.async_process_requirements(
        hass, f"auth provider {provider}", reqs
    )

    # Only mark as processed after the requirements call returned normally.
    processed.add(provider)
    return module
193 
194 
class LoginFlow(FlowHandler[AuthFlowContext, AuthFlowResult, tuple[str, str]]):
    """Handler for the login flow."""

    _flow_result = AuthFlowResult

    def __init__(self, auth_provider: AuthProvider) -> None:
        """Initialize the login flow.

        Records the creation time (used to expire the MFA step) and starts
        the invalid-MFA counter at zero.
        """
        self._auth_provider = auth_provider
        self._auth_module_id: str | None = None
        self._auth_manager = auth_provider.hass.auth
        self.available_mfa_modules: dict[str, str] = {}
        self.created_at = dt_util.utcnow()
        self.invalid_mfa_times = 0
        self.user: User | None = None
        self.credential: Credentials | None = None

    async def async_step_init(
        self, user_input: dict[str, str] | None = None
    ) -> AuthFlowResult:
        """Handle the first step of login flow.

        Return self.async_show_form(step_id='init') if user_input is None.
        Return await self.async_finish(flow_result) if login init step pass.
        """
        raise NotImplementedError

    async def async_step_select_mfa_module(
        self, user_input: dict[str, str] | None = None
    ) -> AuthFlowResult:
        """Handle the step of select mfa module.

        A valid selection moves on to the mfa step. When exactly one module
        is available it is selected automatically. Otherwise the selection
        form is shown, with an error if the submitted module was unknown.
        """
        errors: dict[str, str] = {}

        if user_input is not None:
            auth_module = user_input.get("multi_factor_auth_module")
            if auth_module in self.available_mfa_modules:
                self._auth_module_id = auth_module
                return await self.async_step_mfa()
            errors["base"] = "invalid_auth_module"

        if len(self.available_mfa_modules) == 1:
            # Only one choice: take the sole key without building a list.
            self._auth_module_id = next(iter(self.available_mfa_modules))
            return await self.async_step_mfa()

        return self.async_show_form(
            step_id="select_mfa_module",
            data_schema=vol.Schema(
                {"multi_factor_auth_module": vol.In(self.available_mfa_modules)}
            ),
            errors=errors,
        )

    async def async_step_mfa(
        self, user_input: dict[str, str] | None = None
    ) -> AuthFlowResult:
        """Handle the step of mfa validation.

        Without user input the mfa form is shown (after giving the module a
        chance to initialize the step). With user input the code is
        validated; the flow aborts when the session has expired or the
        module's retry limit is reached, and finishes on success.
        """
        assert self.credential
        assert self.user

        errors: dict[str, str] = {}

        assert self._auth_module_id is not None
        auth_module = self._auth_manager.get_auth_mfa_module(self._auth_module_id)
        if auth_module is None:
            # Given an invalid input to async_step_select_mfa_module
            # will show invalid_auth_module error
            return await self.async_step_select_mfa_module(user_input={})

        if user_input is None and hasattr(
            auth_module, "async_initialize_login_mfa_step"
        ):
            try:
                await auth_module.async_initialize_login_mfa_step(self.user.id)
            except HomeAssistantError:
                _LOGGER.exception("Error initializing MFA step")
                return self.async_abort(reason="unknown_error")

        if user_input is not None:
            expires = self.created_at + MFA_SESSION_EXPIRATION
            if dt_util.utcnow() > expires:
                return self.async_abort(reason="login_expired")

            result = await auth_module.async_validate(self.user.id, user_input)
            if not result:
                errors["base"] = "invalid_code"
                self.invalid_mfa_times += 1
                # Chained comparison: abort only when the module defines a
                # positive retry limit and that limit has been reached.
                if self.invalid_mfa_times >= auth_module.MAX_RETRY_TIME > 0:
                    return self.async_abort(reason="too_many_retry")

            if not errors:
                return await self.async_finish(self.credential)

        description_placeholders: dict[str, str] = {
            "mfa_module_name": auth_module.name,
            "mfa_module_id": auth_module.id,
        }

        return self.async_show_form(
            step_id="mfa",
            data_schema=auth_module.input_schema,
            description_placeholders=description_placeholders,
            errors=errors,
        )

    async def async_finish(self, flow_result: Any) -> AuthFlowResult:
        """Handle the pass of login flow.

        Creates the flow entry carrying flow_result as its data.
        """
        return self.async_create_entry(data=flow_result)
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
Definition: __init__.py:117
None __init__(self, HomeAssistant hass, AuthStore store, dict[str, Any] config)
Definition: __init__.py:57
None async_validate_refresh_token(self, RefreshToken refresh_token, str|None remote_ip=None)
Definition: __init__.py:136
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
Definition: __init__.py:123
list[Credentials] async_credentials(self)
Definition: __init__.py:86
Credentials async_create_credentials(self, dict[str, str] data)
Definition: __init__.py:100
LoginFlow async_login_flow(self, AuthFlowContext|None context)
Definition: __init__.py:108
AuthFlowResult async_step_select_mfa_module(self, dict[str, str]|None user_input=None)
Definition: __init__.py:223
None __init__(self, AuthProvider auth_provider)
Definition: __init__.py:200
AuthFlowResult async_finish(self, Any flow_result)
Definition: __init__.py:298
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
Definition: __init__.py:213
AuthFlowResult async_step_mfa(self, dict[str, str]|None user_input=None)
Definition: __init__.py:248
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
AuthProvider auth_provider_from_config(HomeAssistant hass, AuthStore store, dict[str, Any] config)
Definition: __init__.py:146
types.ModuleType load_auth_provider_module(HomeAssistant hass, str provider)
Definition: __init__.py:166
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
list[str] async_get_users(HomeAssistant hass)
Definition: http.py:394
str humanize_error(HomeAssistant hass, vol.Invalid validation_error, str domain, dict config, str|None link, int max_sub_error_length=MAX_VALIDATION_ERROR_ITEM_LENGTH)
Definition: config.py:520
ModuleType async_import_module(HomeAssistant hass, str name)
Definition: importlib.py:30