1 """HMAC-based One-time Password auth module.
3 Sending HOTP through notify service
6 from __future__
import annotations
9 from collections
import OrderedDict
11 from typing
import Any, cast
14 import voluptuous
as vol
24 MULTI_FACTOR_AUTH_MODULE_SCHEMA,
25 MULTI_FACTOR_AUTH_MODULES,
26 MultiFactorAuthModule,
30 REQUIREMENTS = [
"pyotp==2.8.0"]
32 CONF_MESSAGE =
"message"
34 CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend(
36 vol.Optional(CONF_INCLUDE): vol.All(cv.ensure_list, [cv.string]),
37 vol.Optional(CONF_EXCLUDE): vol.All(cv.ensure_list, [cv.string]),
38 vol.Optional(CONF_MESSAGE, default=
"{} is your Home Assistant login code"): str,
40 extra=vol.PREVENT_EXTRA,
44 STORAGE_KEY =
"auth_module.notify"
45 STORAGE_USERS =
"users"
46 STORAGE_USER_ID =
"user_id"
48 INPUT_FIELD_CODE =
"code"
50 _LOGGER = logging.getLogger(__name__)
54 """Generate a secret."""
57 return str(pyotp.random_base32())
61 """Generate a 32 digit number."""
64 return int(pyotp.random_base32(length=32, chars=
list(
"1234567890")))
68 """Generate one time password."""
71 return str(pyotp.HOTP(secret).at(count))
75 """Verify one time password."""
78 return bool(pyotp.HOTP(secret).verify(otp, count))
83 """Store notify setting for one user."""
85 secret: str = attr.ib(factory=_generate_secret)
86 counter: int = attr.ib(factory=_generate_random)
87 notify_service: str |
None = attr.ib(default=
None)
88 target: str |
None = attr.ib(default=
None)
91 type _UsersDict = dict[str, NotifySetting]
94 @MULTI_FACTOR_AUTH_MODULES.register("notify")
96 """Auth module send hmac-based one time password by notify service."""
98 DEFAULT_TITLE =
"Notify One-Time Password"
100 def __init__(self, hass: HomeAssistant, config: dict[str, Any]) ->
None:
101 """Initialize the user data store."""
105 hass, STORAGE_VERSION, STORAGE_KEY, private=
True, atomic_writes=
True
107 self.
_include_include = config.get(CONF_INCLUDE, [])
108 self.
_exclude_exclude = config.get(CONF_EXCLUDE, [])
114 """Validate login flow input data."""
115 return vol.Schema({vol.Required(INPUT_FIELD_CODE): str})
118 """Load stored data."""
124 data = cast(dict[str, dict[str, Any]], {STORAGE_USERS: {}})
128 for user_id, setting
in data.get(STORAGE_USERS, {}).items()
139 user_id: attr.asdict(
141 filter=attr.filters.exclude(
142 attr.fields(NotifySetting).secret,
143 attr.fields(NotifySetting).counter,
146 for user_id, notify_setting
in self.
_user_settings_user_settings.items()
153 """Return list of notify services."""
154 unordered_services = set()
156 for service
in self.hass.services.async_services_for_domain(
"notify"):
157 if service
not in self.
_exclude_exclude:
158 unordered_services.add(service)
161 unordered_services &= set(self.
_include_include)
163 return sorted(unordered_services)
166 """Return a data entry flow handler for setup module.
168 Mfa module should extend SetupFlow
175 """Set up auth module for user."""
181 notify_service=setup_data.get(
"notify_service"),
182 target=setup_data.get(
"target"),
188 """Depose auth module for user."""
197 """Return whether user is setup."""
204 async
def async_validate(self, user_id: str, user_input: dict[str, Any]) -> bool:
205 """Return True if validation passed."""
214 return await self.hass.async_add_executor_job(
216 notify_setting.secret,
217 user_input.get(INPUT_FIELD_CODE,
""),
218 notify_setting.counter,
222 """Generate code and notify user."""
228 raise ValueError(
"Cannot find user_id")
230 def generate_secret_and_one_time_password() -> str:
231 """Generate and send one time password."""
232 assert notify_setting
236 return _generate_otp(notify_setting.secret, notify_setting.counter)
238 code = await self.hass.async_add_executor_job(
239 generate_secret_and_one_time_password
245 """Send code by user's notify service."""
251 _LOGGER.error(
"Cannot find user %s", user_id)
256 notify_setting.notify_service,
257 notify_setting.target,
261 self, code: str, notify_service: str, target: str |
None =
None
263 """Send code by notify service."""
266 data[
"target"] = [target]
268 await self.hass.services.async_call(
"notify", notify_service, data)
272 """Handler for the setup flow."""
276 auth_module: NotifyAuthModule,
277 setup_schema: vol.Schema,
279 available_notify_services: list[str],
281 """Initialize the setup flow."""
282 super().
__init__(auth_module, setup_schema, user_id)
284 self._auth_module: NotifyAuthModule = auth_module
286 self.
_secret_secret: str |
None =
None
287 self.
_count_count: int |
None =
None
289 self.
_target_target: str |
None =
None
292 self, user_input: dict[str, str] |
None =
None
294 """Let user select available notify services."""
295 errors: dict[str, str] = {}
297 hass = self._auth_module.hass
300 self.
_target_target = user_input.get(
"target")
301 self.
_secret_secret = await hass.async_add_executor_job(_generate_secret)
302 self.
_count_count = await hass.async_add_executor_job(_generate_random)
307 return self.async_abort(reason=
"no_available_service")
309 schema: dict[str, Any] = OrderedDict()
311 schema[
"target"] = vol.Optional(str)
313 return self.async_show_form(
314 step_id=
"init", data_schema=vol.Schema(schema), errors=errors
318 self, user_input: dict[str, str] |
None =
None
320 """Verify user can receive one-time password."""
321 errors: dict[str, str] = {}
323 hass = self._auth_module.hass
326 verified = await hass.async_add_executor_job(
327 _verify_otp, self.
_secret_secret, user_input[
"code"], self.
_count_count
330 await self._auth_module.async_setup_user(
334 return self.async_create_entry(data={})
336 errors[
"base"] =
"invalid_code"
339 code = await hass.async_add_executor_job(
345 await self._auth_module.async_notify(
348 except ServiceNotFound:
349 return self.async_abort(reason=
"notify_service_not_exist")
351 return self.async_show_form(
353 data_schema=self._setup_schema,
354 description_placeholders={
"notify_service": self.
_notify_service_notify_service},
vol.Schema input_schema(self)
list[str] aync_get_available_notify_services(self)
bool async_is_user_setup(self, str user_id)
None async_notify(self, str code, str notify_service, str|None target=None)
bool async_validate(self, str user_id, dict[str, Any] user_input)
None async_initialize_login_mfa_step(self, str user_id)
SetupFlow async_setup_flow(self, str user_id)
None async_notify_user(self, str user_id, str code)
None __init__(self, HomeAssistant hass, dict[str, Any] config)
Any async_setup_user(self, str user_id, Any setup_data)
None async_depose_user(self, str user_id)
None __init__(self, NotifyAuthModule auth_module, vol.Schema setup_schema, str user_id, list[str] available_notify_services)
_available_notify_services
FlowResult async_step_init(self, dict[str, str]|None user_input=None)
FlowResult async_step_setup(self, dict[str, str]|None user_input=None)
str _generate_otp(str secret, int count)
bool _verify_otp(str secret, str otp, int count)
web.Response get(self, web.Request request, str config_key)
None async_load(HomeAssistant hass)
None async_save(self, _T data)