Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """HMAC-based One-time Password auth module.
2 
3 Sending HOTP through notify service
4 """
5 
6 from __future__ import annotations
7 
8 import asyncio
9 from collections import OrderedDict
10 import logging
11 from typing import Any, cast
12 
13 import attr
14 import voluptuous as vol
15 
16 from homeassistant.const import CONF_EXCLUDE, CONF_INCLUDE
17 from homeassistant.core import HomeAssistant, callback
18 from homeassistant.data_entry_flow import FlowResult
19 from homeassistant.exceptions import ServiceNotFound
20 from homeassistant.helpers import config_validation as cv
21 from homeassistant.helpers.storage import Store
22 
23 from . import (
24  MULTI_FACTOR_AUTH_MODULE_SCHEMA,
25  MULTI_FACTOR_AUTH_MODULES,
26  MultiFactorAuthModule,
27  SetupFlow,
28 )
29 
30 REQUIREMENTS = ["pyotp==2.8.0"]
31 
32 CONF_MESSAGE = "message"
33 
34 CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend(
35  {
36  vol.Optional(CONF_INCLUDE): vol.All(cv.ensure_list, [cv.string]),
37  vol.Optional(CONF_EXCLUDE): vol.All(cv.ensure_list, [cv.string]),
38  vol.Optional(CONF_MESSAGE, default="{} is your Home Assistant login code"): str,
39  },
40  extra=vol.PREVENT_EXTRA,
41 )
42 
43 STORAGE_VERSION = 1
44 STORAGE_KEY = "auth_module.notify"
45 STORAGE_USERS = "users"
46 STORAGE_USER_ID = "user_id"
47 
48 INPUT_FIELD_CODE = "code"
49 
50 _LOGGER = logging.getLogger(__name__)
51 
52 
53 def _generate_secret() -> str:
54  """Generate a secret."""
55  import pyotp # pylint: disable=import-outside-toplevel
56 
57  return str(pyotp.random_base32())
58 
59 
60 def _generate_random() -> int:
61  """Generate a 32 digit number."""
62  import pyotp # pylint: disable=import-outside-toplevel
63 
64  return int(pyotp.random_base32(length=32, chars=list("1234567890")))
65 
66 
67 def _generate_otp(secret: str, count: int) -> str:
68  """Generate one time password."""
69  import pyotp # pylint: disable=import-outside-toplevel
70 
71  return str(pyotp.HOTP(secret).at(count))
72 
73 
74 def _verify_otp(secret: str, otp: str, count: int) -> bool:
75  """Verify one time password."""
76  import pyotp # pylint: disable=import-outside-toplevel
77 
78  return bool(pyotp.HOTP(secret).verify(otp, count))
79 
80 
81 @attr.s(slots=True)
83  """Store notify setting for one user."""
84 
85  secret: str = attr.ib(factory=_generate_secret) # not persistent
86  counter: int = attr.ib(factory=_generate_random) # not persistent
87  notify_service: str | None = attr.ib(default=None)
88  target: str | None = attr.ib(default=None)
89 
90 
91 type _UsersDict = dict[str, NotifySetting]
92 
93 
94 @MULTI_FACTOR_AUTH_MODULES.register("notify")
95 class NotifyAuthModule(MultiFactorAuthModule):
96  """Auth module send hmac-based one time password by notify service."""
97 
98  DEFAULT_TITLE = "Notify One-Time Password"
99 
100  def __init__(self, hass: HomeAssistant, config: dict[str, Any]) -> None:
101  """Initialize the user data store."""
102  super().__init__(hass, config)
103  self._user_settings_user_settings: _UsersDict | None = None
104  self._user_store_user_store = Store[dict[str, dict[str, Any]]](
105  hass, STORAGE_VERSION, STORAGE_KEY, private=True, atomic_writes=True
106  )
107  self._include_include = config.get(CONF_INCLUDE, [])
108  self._exclude_exclude = config.get(CONF_EXCLUDE, [])
109  self._message_template_message_template = config[CONF_MESSAGE]
110  self._init_lock_init_lock = asyncio.Lock()
111 
112  @property
113  def input_schema(self) -> vol.Schema:
114  """Validate login flow input data."""
115  return vol.Schema({vol.Required(INPUT_FIELD_CODE): str})
116 
117  async def _async_load(self) -> None:
118  """Load stored data."""
119  async with self._init_lock_init_lock:
120  if self._user_settings_user_settings is not None:
121  return
122 
123  if (data := await self._user_store_user_store.async_load()) is None:
124  data = cast(dict[str, dict[str, Any]], {STORAGE_USERS: {}})
125 
126  self._user_settings_user_settings = {
127  user_id: NotifySetting(**setting)
128  for user_id, setting in data.get(STORAGE_USERS, {}).items()
129  }
130 
131  async def _async_save(self) -> None:
132  """Save data."""
133  if self._user_settings_user_settings is None:
134  return
135 
136  await self._user_store_user_store.async_save(
137  {
138  STORAGE_USERS: {
139  user_id: attr.asdict(
140  notify_setting,
141  filter=attr.filters.exclude(
142  attr.fields(NotifySetting).secret,
143  attr.fields(NotifySetting).counter,
144  ),
145  )
146  for user_id, notify_setting in self._user_settings_user_settings.items()
147  }
148  }
149  )
150 
151  @callback
152  def aync_get_available_notify_services(self) -> list[str]:
153  """Return list of notify services."""
154  unordered_services = set()
155 
156  for service in self.hass.services.async_services_for_domain("notify"):
157  if service not in self._exclude_exclude:
158  unordered_services.add(service)
159 
160  if self._include_include:
161  unordered_services &= set(self._include_include)
162 
163  return sorted(unordered_services)
164 
165  async def async_setup_flow(self, user_id: str) -> SetupFlow:
166  """Return a data entry flow handler for setup module.
167 
168  Mfa module should extend SetupFlow
169  """
170  return NotifySetupFlow(
171  self, self.input_schemainput_schema, user_id, self.aync_get_available_notify_servicesaync_get_available_notify_services()
172  )
173 
174  async def async_setup_user(self, user_id: str, setup_data: Any) -> Any:
175  """Set up auth module for user."""
176  if self._user_settings_user_settings is None:
177  await self._async_load_async_load()
178  assert self._user_settings_user_settings is not None
179 
180  self._user_settings_user_settings[user_id] = NotifySetting(
181  notify_service=setup_data.get("notify_service"),
182  target=setup_data.get("target"),
183  )
184 
185  await self._async_save_async_save()
186 
187  async def async_depose_user(self, user_id: str) -> None:
188  """Depose auth module for user."""
189  if self._user_settings_user_settings is None:
190  await self._async_load_async_load()
191  assert self._user_settings_user_settings is not None
192 
193  if self._user_settings_user_settings.pop(user_id, None):
194  await self._async_save_async_save()
195 
196  async def async_is_user_setup(self, user_id: str) -> bool:
197  """Return whether user is setup."""
198  if self._user_settings_user_settings is None:
199  await self._async_load_async_load()
200  assert self._user_settings_user_settings is not None
201 
202  return user_id in self._user_settings_user_settings
203 
204  async def async_validate(self, user_id: str, user_input: dict[str, Any]) -> bool:
205  """Return True if validation passed."""
206  if self._user_settings_user_settings is None:
207  await self._async_load_async_load()
208  assert self._user_settings_user_settings is not None
209 
210  if (notify_setting := self._user_settings_user_settings.get(user_id)) is None:
211  return False
212 
213  # user_input has been validate in caller
214  return await self.hass.async_add_executor_job(
215  _verify_otp,
216  notify_setting.secret,
217  user_input.get(INPUT_FIELD_CODE, ""),
218  notify_setting.counter,
219  )
220 
221  async def async_initialize_login_mfa_step(self, user_id: str) -> None:
222  """Generate code and notify user."""
223  if self._user_settings_user_settings is None:
224  await self._async_load_async_load()
225  assert self._user_settings_user_settings is not None
226 
227  if (notify_setting := self._user_settings_user_settings.get(user_id)) is None:
228  raise ValueError("Cannot find user_id")
229 
230  def generate_secret_and_one_time_password() -> str:
231  """Generate and send one time password."""
232  assert notify_setting
233  # secret and counter are not persistent
234  notify_setting.secret = _generate_secret()
235  notify_setting.counter = _generate_random()
236  return _generate_otp(notify_setting.secret, notify_setting.counter)
237 
238  code = await self.hass.async_add_executor_job(
239  generate_secret_and_one_time_password
240  )
241 
242  await self.async_notify_userasync_notify_user(user_id, code)
243 
244  async def async_notify_user(self, user_id: str, code: str) -> None:
245  """Send code by user's notify service."""
246  if self._user_settings_user_settings is None:
247  await self._async_load_async_load()
248  assert self._user_settings_user_settings is not None
249 
250  if (notify_setting := self._user_settings_user_settings.get(user_id)) is None:
251  _LOGGER.error("Cannot find user %s", user_id)
252  return
253 
254  await self.async_notifyasync_notify(
255  code,
256  notify_setting.notify_service, # type: ignore[arg-type]
257  notify_setting.target,
258  )
259 
260  async def async_notify(
261  self, code: str, notify_service: str, target: str | None = None
262  ) -> None:
263  """Send code by notify service."""
264  data = {"message": self._message_template_message_template.format(code)}
265  if target:
266  data["target"] = [target]
267 
268  await self.hass.services.async_call("notify", notify_service, data)
269 
270 
271 class NotifySetupFlow(SetupFlow):
272  """Handler for the setup flow."""
273 
274  def __init__(
275  self,
276  auth_module: NotifyAuthModule,
277  setup_schema: vol.Schema,
278  user_id: str,
279  available_notify_services: list[str],
280  ) -> None:
281  """Initialize the setup flow."""
282  super().__init__(auth_module, setup_schema, user_id)
283  # to fix typing complaint
284  self._auth_module: NotifyAuthModule = auth_module
285  self._available_notify_services_available_notify_services = available_notify_services
286  self._secret_secret: str | None = None
287  self._count_count: int | None = None
288  self._notify_service_notify_service: str | None = None
289  self._target_target: str | None = None
290 
291  async def async_step_init(
292  self, user_input: dict[str, str] | None = None
293  ) -> FlowResult:
294  """Let user select available notify services."""
295  errors: dict[str, str] = {}
296 
297  hass = self._auth_module.hass
298  if user_input:
299  self._notify_service_notify_service = user_input["notify_service"]
300  self._target_target = user_input.get("target")
301  self._secret_secret = await hass.async_add_executor_job(_generate_secret)
302  self._count_count = await hass.async_add_executor_job(_generate_random)
303 
304  return await self.async_step_setupasync_step_setup()
305 
306  if not self._available_notify_services_available_notify_services:
307  return self.async_abort(reason="no_available_service")
308 
309  schema: dict[str, Any] = OrderedDict()
310  schema["notify_service"] = vol.In(self._available_notify_services_available_notify_services)
311  schema["target"] = vol.Optional(str)
312 
313  return self.async_show_form(
314  step_id="init", data_schema=vol.Schema(schema), errors=errors
315  )
316 
317  async def async_step_setup(
318  self, user_input: dict[str, str] | None = None
319  ) -> FlowResult:
320  """Verify user can receive one-time password."""
321  errors: dict[str, str] = {}
322 
323  hass = self._auth_module.hass
324  assert self._secret_secret and self._count_count
325  if user_input:
326  verified = await hass.async_add_executor_job(
327  _verify_otp, self._secret_secret, user_input["code"], self._count_count
328  )
329  if verified:
330  await self._auth_module.async_setup_user(
331  self._user_id,
332  {"notify_service": self._notify_service_notify_service, "target": self._target_target},
333  )
334  return self.async_create_entry(data={})
335 
336  errors["base"] = "invalid_code"
337 
338  # generate code every time, no retry logic
339  code = await hass.async_add_executor_job(
340  _generate_otp, self._secret_secret, self._count_count
341  )
342 
343  assert self._notify_service_notify_service
344  try:
345  await self._auth_module.async_notify(
346  code, self._notify_service_notify_service, self._target_target
347  )
348  except ServiceNotFound:
349  return self.async_abort(reason="notify_service_not_exist")
350 
351  return self.async_show_form(
352  step_id="setup",
353  data_schema=self._setup_schema,
354  description_placeholders={"notify_service": self._notify_service_notify_service},
355  errors=errors,
356  )
None async_notify(self, str code, str notify_service, str|None target=None)
Definition: notify.py:262
bool async_validate(self, str user_id, dict[str, Any] user_input)
Definition: notify.py:204
None async_initialize_login_mfa_step(self, str user_id)
Definition: notify.py:221
SetupFlow async_setup_flow(self, str user_id)
Definition: notify.py:165
None async_notify_user(self, str user_id, str code)
Definition: notify.py:244
None __init__(self, HomeAssistant hass, dict[str, Any] config)
Definition: notify.py:100
Any async_setup_user(self, str user_id, Any setup_data)
Definition: notify.py:174
None __init__(self, NotifyAuthModule auth_module, vol.Schema setup_schema, str user_id, list[str] available_notify_services)
Definition: notify.py:280
FlowResult async_step_init(self, dict[str, str]|None user_input=None)
Definition: notify.py:293
FlowResult async_step_setup(self, dict[str, str]|None user_input=None)
Definition: notify.py:319
str _generate_otp(str secret, int count)
Definition: notify.py:67
bool _verify_otp(str secret, str otp, int count)
Definition: notify.py:74
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_load(HomeAssistant hass)
None async_save(self, _T data)
Definition: storage.py:424