Home Assistant Unofficial Reference 2024.12.1
totp.py
Go to the documentation of this file.
1 """Time-based One Time Password auth module."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from io import BytesIO
7 from typing import Any, cast
8 
9 import voluptuous as vol
10 
11 from homeassistant.auth.models import User
12 from homeassistant.core import HomeAssistant
13 from homeassistant.data_entry_flow import FlowResult
14 from homeassistant.helpers.storage import Store
15 
16 from . import (
17  MULTI_FACTOR_AUTH_MODULE_SCHEMA,
18  MULTI_FACTOR_AUTH_MODULES,
19  MultiFactorAuthModule,
20  SetupFlow,
21 )
22 
# Pinned third-party packages used by this module; both are imported at
# call time inside the functions that need them (pyotp, pyqrcode).
REQUIREMENTS = ["pyotp==2.8.0", "PyQRCode==1.2.1"]
24 
# Configuration schema for this module: the shared MFA module schema extended
# with no additional keys, and with extra (unknown) keys disallowed.
CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({}, extra=vol.PREVENT_EXTRA)
26 
# Version and key handed to the Store that persists the TOTP data.
STORAGE_VERSION = 1
STORAGE_KEY = "auth_module.totp"
# Top-level key of the stored dict; its value maps user_id -> OTA secret.
STORAGE_USERS = "users"
# NOTE(review): the next two keys are not referenced by the code visible in
# this file; kept for compatibility.
STORAGE_USER_ID = "user_id"
STORAGE_OTA_SECRET = "ota_secret"

# Name of the form field that carries the one-time code entered by the user.
INPUT_FIELD_CODE = "code"

# Fixed secret verified against when no secret is stored for a user, so that
# the lookup-miss path still performs a TOTP verification.
DUMMY_SECRET = "FPPTH34D4E3MI2HG"
36 
37 
def _generate_qr_code(data: str) -> str:
    """Return *data* encoded as a QR code, rendered as inline SVG markup.

    The QR code is written as an SVG document into an in-memory buffer.
    Newlines are then removed, and the XML declaration together with the
    ``xmlns`` attribute of the opening tag is replaced by a bare ``<svg``
    so the result can be embedded directly.
    """
    # pyqrcode is a pinned requirement of this module and is imported at
    # call time rather than at module import.
    import pyqrcode  # pylint: disable=import-outside-toplevel

    qr_code = pyqrcode.create(data)

    with BytesIO() as buffer:
        qr_code.svg(file=buffer, scale=4)
        return str(
            buffer.getvalue()
            .decode("ascii")
            .replace("\n", "")
            .replace(
                (
                    '<?xml version="1.0" encoding="UTF-8"?>'
                    '<svg xmlns="http://www.w3.org/2000/svg"'
                ),
                "<svg",
            )
        )
58 
59 
def _generate_secret_and_qr_code(username: str) -> tuple[str, str, str]:
    """Create a new TOTP secret together with its provisioning data.

    Returns a ``(secret, url, image)`` tuple:

    * ``secret`` - a random base32 secret generated by pyotp,
    * ``url`` - the provisioning URI for *username* with the issuer name
      "Home Assistant",
    * ``image`` - that URI rendered as a QR code by ``_generate_qr_code``.
    """
    # pyotp is a pinned requirement of this module and is imported at call
    # time rather than at module import.
    import pyotp  # pylint: disable=import-outside-toplevel

    ota_secret = pyotp.random_base32()
    url = pyotp.totp.TOTP(ota_secret).provisioning_uri(
        username, issuer_name="Home Assistant"
    )
    image = _generate_qr_code(url)
    return ota_secret, url, image
70 
71 
@MULTI_FACTOR_AUTH_MODULES.register("totp")
class TotpAuthModule(MultiFactorAuthModule):
    """Auth module that validates time-based one-time passwords.

    Per-user secrets are kept in memory in ``_users`` (user_id -> secret)
    and persisted through a ``Store`` under the ``"users"`` key. The data
    is loaded on first use by any of the public coroutines.
    """

    DEFAULT_TITLE = "Time-based One Time Password"
    MAX_RETRY_TIME = 5

    def __init__(self, hass: HomeAssistant, config: dict[str, Any]) -> None:
        """Initialize the user data store."""
        super().__init__(hass, config)
        # user_id -> OTA secret; stays None until _async_load has run.
        self._users: dict[str, str] | None = None
        self._user_store = Store[dict[str, dict[str, str]]](
            hass, STORAGE_VERSION, STORAGE_KEY, private=True, atomic_writes=True
        )
        # Held for the duration of _async_load so that the stored data is
        # read by one caller at a time.
        self._init_lock = asyncio.Lock()

    @property
    def input_schema(self) -> vol.Schema:
        """Return the schema validating login flow input (a required code)."""
        return vol.Schema({vol.Required(INPUT_FIELD_CODE): str})

    async def _async_load(self) -> None:
        """Load the stored user secrets into memory.

        Does nothing if the data has already been loaded by the time the
        lock is acquired. When the store holds no data yet, an empty user
        mapping is used.
        """
        async with self._init_lock:
            # Another caller may have finished loading while we waited.
            if self._users is not None:
                return

            if (data := await self._user_store.async_load()) is None:
                data = cast(dict[str, dict[str, str]], {STORAGE_USERS: {}})

            self._users = data.get(STORAGE_USERS, {})

    async def _async_save(self) -> None:
        """Persist the in-memory user secrets to the store."""
        await self._user_store.async_save({STORAGE_USERS: self._users or {}})

    def _add_ota_secret(self, user_id: str, secret: str | None = None) -> str:
        """Store an OTA secret for *user_id* and return it.

        Uses *secret* when one is given (and non-empty); otherwise a random
        base32 secret is generated. Expects the user data to be loaded.
        """
        import pyotp  # pylint: disable=import-outside-toplevel

        ota_secret: str = secret or pyotp.random_base32()

        self._users[user_id] = ota_secret  # type: ignore[index]
        return ota_secret

    async def async_setup_flow(self, user_id: str) -> SetupFlow:
        """Return a data entry flow handler for setup module.

        Mfa module should extend SetupFlow
        """
        user = await self.hass.auth.async_get_user(user_id)
        assert user is not None
        return TotpSetupFlow(self, self.input_schema, user)

    async def async_setup_user(self, user_id: str, setup_data: Any) -> str:
        """Set up the auth module for *user_id* and return the stored secret.

        ``setup_data.get("secret")`` is used as the secret when present;
        otherwise a new one is generated. The result is saved to the store.
        """
        if self._users is None:
            await self._async_load()

        # Secret creation runs in the executor, off the event loop.
        result = await self.hass.async_add_executor_job(
            self._add_ota_secret, user_id, setup_data.get("secret")
        )

        await self._async_save()
        return result

    async def async_depose_user(self, user_id: str) -> None:
        """Remove the stored secret of *user_id*, saving only if one existed."""
        if self._users is None:
            await self._async_load()

        if self._users.pop(user_id, None):  # type: ignore[union-attr]
            await self._async_save()

    async def async_is_user_setup(self, user_id: str) -> bool:
        """Return whether a secret is stored for *user_id*."""
        if self._users is None:
            await self._async_load()

        return user_id in self._users  # type: ignore[operator]

    async def async_validate(self, user_id: str, user_input: dict[str, Any]) -> bool:
        """Return True if the code in *user_input* is valid for *user_id*."""
        if self._users is None:
            await self._async_load()

        # user_input has been validated in the caller.
        # Setting INPUT_FIELD_CODE as vol.Required is not user friendly, so a
        # missing code falls back to an empty string here.
        return await self.hass.async_add_executor_job(
            self._validate_2fa, user_id, user_input.get(INPUT_FIELD_CODE, "")
        )

    def _validate_2fa(self, user_id: str, code: str) -> bool:
        """Validate a two factor authentication code for *user_id*.

        Returns False when no secret is stored for the user. Expects the
        user data to be loaded.
        """
        import pyotp  # pylint: disable=import-outside-toplevel

        if (ota_secret := self._users.get(user_id)) is None:  # type: ignore[union-attr]
            # Even if we cannot find the user, we still do a verify
            # to make timing the same as if the user was found.
            pyotp.TOTP(DUMMY_SECRET).verify(code, valid_window=1)
            return False

        return bool(pyotp.TOTP(ota_secret).verify(code, valid_window=1))
175 
176 
class TotpSetupFlow(SetupFlow):
    """Handler for the TOTP setup flow.

    The first call to ``async_step_init`` (without input) generates a
    secret, provisioning URL and QR code and shows them in a form; a later
    call with the user's code verifies it against that secret.
    """

    _auth_module: TotpAuthModule
    # The three attributes below are assigned in async_step_init when it is
    # called without user input.
    _ota_secret: str
    _url: str
    _image: str

    def __init__(
        self, auth_module: TotpAuthModule, setup_schema: vol.Schema, user: User
    ) -> None:
        """Initialize the setup flow."""
        super().__init__(auth_module, setup_schema, user.id)
        # Kept to read the user's name when building the provisioning URL.
        self._user = user

    async def async_step_init(
        self, user_input: dict[str, str] | None = None
    ) -> FlowResult:
        """Handle the first step of setup flow.

        Return self.async_show_form(step_id='init') if user_input is None.
        Return self.async_create_entry(data={'result': result}) if finish.
        """
        import pyotp  # pylint: disable=import-outside-toplevel

        errors: dict[str, str] = {}

        if user_input:
            # Verify the submitted code against the secret generated when
            # the form was first shown.
            verified = await self.hass.async_add_executor_job(
                pyotp.TOTP(self._ota_secret).verify, user_input["code"]
            )
            if verified:
                result = await self._auth_module.async_setup_user(
                    self._user_id, {"secret": self._ota_secret}
                )
                return self.async_create_entry(data={"result": result})

            # Wrong code: fall through and show the form again with the
            # same secret and an error.
            errors["base"] = "invalid_code"

        else:
            # No input yet: create a new secret with its URL and QR image.
            (
                self._ota_secret,
                self._url,
                self._image,
            ) = await self._auth_module.hass.async_add_executor_job(
                _generate_secret_and_qr_code,
                str(self._user.name),
            )

        return self.async_show_form(
            step_id="init",
            data_schema=self._setup_schema,
            description_placeholders={
                "code": self._ota_secret,
                "url": self._url,
                "qr_code": self._image,
            },
            errors=errors,
        )
str _add_ota_secret(self, str user_id, str|None secret=None)
Definition: totp.py:108
SetupFlow async_setup_flow(self, str user_id)
Definition: totp.py:117
bool _validate_2fa(self, str user_id, str code)
Definition: totp.py:164
str async_setup_user(self, str user_id, Any setup_data)
Definition: totp.py:126
None async_depose_user(self, str user_id)
Definition: totp.py:138
None __init__(self, HomeAssistant hass, dict[str, Any] config)
Definition: totp.py:79
bool async_is_user_setup(self, str user_id)
Definition: totp.py:146
bool async_validate(self, str user_id, dict[str, Any] user_input)
Definition: totp.py:153
None __init__(self, TotpAuthModule auth_module, vol.Schema setup_schema, User user)
Definition: totp.py:187
FlowResult async_step_init(self, dict[str, str]|None user_input=None)
Definition: totp.py:194
tuple[str, str, str] _generate_secret_and_qr_code(str username)
Definition: totp.py:60
str _generate_qr_code(str data)
Definition: totp.py:38
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_load(HomeAssistant hass)
None async_save(self, _T data)
Definition: storage.py:424