1 """Time-based One Time Password auth module."""
3 from __future__
import annotations
7 from typing
import Any, cast
9 import voluptuous
as vol
17 MULTI_FACTOR_AUTH_MODULE_SCHEMA,
18 MULTI_FACTOR_AUTH_MODULES,
19 MultiFactorAuthModule,
# Third-party packages this module needs, each pinned to an exact version:
# pyotp provides the TOTP secret generation / verification, PyQRCode renders
# the provisioning URL as a QR code.
REQUIREMENTS = ["pyotp==2.8.0", "PyQRCode==1.2.1"]
# Configuration schema for this module: the shared multi-factor auth module
# schema with no additional keys, and with unknown keys rejected
# (vol.PREVENT_EXTRA).
CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({}, extra=vol.PREVENT_EXTRA)
# Key passed to the Store that persists this module's data.
STORAGE_KEY = "auth_module.totp"
# Key inside the stored payload that holds the per-user mapping.
STORAGE_USERS = "users"
STORAGE_USER_ID = "user_id"
STORAGE_OTA_SECRET = "ota_secret"

# Name of the login-flow form field carrying the one-time code.
INPUT_FIELD_CODE = "code"

# Placeholder secret verified against when a user has no stored secret
# (see _validate_2fa). NOTE(review): presumably this keeps verification
# timing uniform for unknown users — confirm against the full method body.
DUMMY_SECRET = "FPPTH34D4E3MI2HG"
39 """Generate a base64 PNG string represent QR Code image of data."""
42 qr_code = pyqrcode.create(data)
44 with BytesIO()
as buffer:
45 qr_code.svg(file=buffer, scale=4)
52 '<?xml version="1.0" encoding="UTF-8"?>'
53 '<svg xmlns="http://www.w3.org/2000/svg"'
61 """Generate a secret, url, and QR code."""
64 ota_secret = pyotp.random_base32()
65 url = pyotp.totp.TOTP(ota_secret).provisioning_uri(
66 username, issuer_name=
"Home Assistant"
69 return ota_secret, url, image
72 @MULTI_FACTOR_AUTH_MODULES.register("totp")
74 """Auth module validate time-based one time password."""
76 DEFAULT_TITLE =
"Time-based One Time Password"
79 def __init__(self, hass: HomeAssistant, config: dict[str, Any]) ->
None:
80 """Initialize the user data store."""
82 self.
_users_users: dict[str, str] |
None =
None
83 self.
_user_store_user_store = Store[dict[str, dict[str, str]]](
84 hass, STORAGE_VERSION, STORAGE_KEY, private=
True, atomic_writes=
True
90 """Validate login flow input data."""
91 return vol.Schema({vol.Required(INPUT_FIELD_CODE): str})
94 """Load stored data."""
96 if self.
_users_users
is not None:
100 data = cast(dict[str, dict[str, str]], {STORAGE_USERS: {}})
102 self.
_users_users = data.get(STORAGE_USERS, {})
109 """Create a ota_secret for user."""
112 ota_secret: str = secret
or pyotp.random_base32()
114 self.
_users_users[user_id] = ota_secret
118 """Return a data entry flow handler for setup module.
120 Mfa module should extend SetupFlow
122 user = await self.hass.auth.async_get_user(user_id)
123 assert user
is not None
127 """Set up auth module for user."""
128 if self.
_users_users
is None:
131 result = await self.hass.async_add_executor_job(
139 """Depose auth module for user."""
140 if self.
_users_users
is None:
143 if self.
_users_users.pop(user_id,
None):
147 """Return whether user is setup."""
148 if self.
_users_users
is None:
151 return user_id
in self.
_users_users
153 async
def async_validate(self, user_id: str, user_input: dict[str, Any]) -> bool:
154 """Return True if validation passed."""
155 if self.
_users_users
is None:
160 return await self.hass.async_add_executor_job(
161 self.
_validate_2fa_validate_2fa, user_id, user_input.get(INPUT_FIELD_CODE,
"")
165 """Validate two factor authentication code."""
168 if (ota_secret := self.
_users_users.
get(user_id))
is None:
171 pyotp.TOTP(DUMMY_SECRET).verify(code, valid_window=1)
174 return bool(pyotp.TOTP(ota_secret).verify(code, valid_window=1))
178 """Handler for the setup flow."""
180 _auth_module: TotpAuthModule
186 self, auth_module: TotpAuthModule, setup_schema: vol.Schema, user: User
188 """Initialize the setup flow."""
189 super().
__init__(auth_module, setup_schema, user.id)
193 self, user_input: dict[str, str] |
None =
None
195 """Handle the first step of setup flow.
197 Return self.async_show_form(step_id='init') if user_input is None.
198 Return self.async_create_entry(data={'result': result}) if finish.
202 errors: dict[str, str] = {}
205 verified = await self.hass.async_add_executor_job(
206 pyotp.TOTP(self._ota_secret).verify, user_input[
"code"]
209 result = await self._auth_module.async_setup_user(
210 self._user_id, {
"secret": self._ota_secret}
212 return self.async_create_entry(data={
"result": result})
214 errors[
"base"] =
"invalid_code"
221 ) = await self._auth_module.hass.async_add_executor_job(
222 _generate_secret_and_qr_code,
226 return self.async_show_form(
228 data_schema=self._setup_schema,
229 description_placeholders={
230 "code": self._ota_secret,
232 "qr_code": self._image,
str _add_ota_secret(self, str user_id, str|None secret=None)
SetupFlow async_setup_flow(self, str user_id)
bool _validate_2fa(self, str user_id, str code)
str async_setup_user(self, str user_id, Any setup_data)
None async_depose_user(self, str user_id)
vol.Schema input_schema(self)
None __init__(self, HomeAssistant hass, dict[str, Any] config)
bool async_is_user_setup(self, str user_id)
bool async_validate(self, str user_id, dict[str, Any] user_input)
None __init__(self, TotpAuthModule auth_module, vol.Schema setup_schema, User user)
FlowResult async_step_init(self, dict[str, str]|None user_input=None)
tuple[str, str, str] _generate_secret_and_qr_code(str username)
str _generate_qr_code(str data)
web.Response get(self, web.Request request, str config_key)
None async_load(HomeAssistant hass)
None async_save(self, _T data)