Home Assistant Unofficial Reference 2024.12.1
auth_provider_homeassistant.py
Go to the documentation of this file.
1 """Offer API to configure the Home Assistant auth provider."""
2 
3 from __future__ import annotations
4 
5 from typing import Any
6 
7 import voluptuous as vol
8 
9 from homeassistant.auth.providers import homeassistant as auth_ha
10 from homeassistant.components import websocket_api
11 from homeassistant.core import HomeAssistant, callback
12 from homeassistant.exceptions import Unauthorized
13 
14 
15 @callback
16 def async_setup(hass: HomeAssistant) -> bool:
17  """Enable the Home Assistant views."""
18  websocket_api.async_register_command(hass, websocket_create)
19  websocket_api.async_register_command(hass, websocket_delete)
20  websocket_api.async_register_command(hass, websocket_change_password)
21  websocket_api.async_register_command(hass, websocket_admin_change_password)
22  websocket_api.async_register_command(hass, websocket_admin_change_username)
23  return True
24 
25 
26 @websocket_api.websocket_command( { vol.Required("type"): "config/auth_provider/homeassistant/create",
27  vol.Required("user_id"): str,
28  vol.Required("username"): str,
29  vol.Required("password"): str,
30  }
31 )
32 @websocket_api.require_admin
33 @websocket_api.async_response
34 async def websocket_create(
35  hass: HomeAssistant,
37  msg: dict[str, Any],
38 ) -> None:
39  """Create credentials and attach to a user."""
40  provider = auth_ha.async_get_provider(hass)
41 
42  if (user := await hass.auth.async_get_user(msg["user_id"])) is None:
43  connection.send_error(msg["id"], "not_found", "User not found")
44  return
45 
46  if user.system_generated:
47  connection.send_error(
48  msg["id"],
49  "system_generated",
50  "Cannot add credentials to a system generated user.",
51  )
52  return
53 
54  await provider.async_add_auth(msg["username"], msg["password"])
55 
56  credentials = await provider.async_get_or_create_credentials(
57  {"username": msg["username"]}
58  )
59  await hass.auth.async_link_user(user, credentials)
60 
61  connection.send_result(msg["id"])
62 
63 
64 @websocket_api.websocket_command( { vol.Required("type"): "config/auth_provider/homeassistant/delete",
65  vol.Required("username"): str,
66  }
67 )
68 @websocket_api.require_admin
69 @websocket_api.async_response
70 async def websocket_delete(
71  hass: HomeAssistant,
73  msg: dict[str, Any],
74 ) -> None:
75  """Delete username and related credential."""
76  provider = auth_ha.async_get_provider(hass)
77  credentials = await provider.async_get_or_create_credentials(
78  {"username": msg["username"]}
79  )
80 
81  # if not new, an existing credential exists.
82  # Removing the credential will also remove the auth.
83  if not credentials.is_new:
84  await hass.auth.async_remove_credentials(credentials)
85 
86  connection.send_result(msg["id"])
87  return
88 
89  await provider.async_remove_auth(msg["username"])
90 
91  connection.send_result(msg["id"])
92 
93 
94 @websocket_api.websocket_command( { vol.Required("type"): "config/auth_provider/homeassistant/change_password",
95  vol.Required("current_password"): str,
96  vol.Required("new_password"): str,
97  }
98 )
99 @websocket_api.async_response
100 async def websocket_change_password(
101  hass: HomeAssistant,
102  connection: websocket_api.ActiveConnection,
103  msg: dict[str, Any],
104 ) -> None:
105  """Change current user password."""
106  if (user := connection.user) is None:
107  connection.send_error(msg["id"], "user_not_found", "User not found") # type: ignore[unreachable]
108  return
109 
110  provider = auth_ha.async_get_provider(hass)
111  username = None
112  for credential in user.credentials:
113  if credential.auth_provider_type == provider.type:
114  username = credential.data["username"]
115  break
116 
117  if username is None:
118  connection.send_error(
119  msg["id"], "credentials_not_found", "Credentials not found"
120  )
121  return
122 
123  try:
124  await provider.async_validate_login(username, msg["current_password"])
125  except auth_ha.InvalidAuth:
126  connection.send_error(
127  msg["id"], "invalid_current_password", "Invalid current password"
128  )
129  return
130 
131  await provider.async_change_password(username, msg["new_password"])
132 
133  connection.send_result(msg["id"])
134 
135 
136 @websocket_api.websocket_command( { vol.Required( "type" ): "config/auth_provider/homeassistant/admin_change_password",
137  vol.Required("user_id"): str,
138  vol.Required("password"): str,
139  }
140 )
141 @websocket_api.require_admin
142 @websocket_api.async_response
144  hass: HomeAssistant,
145  connection: websocket_api.ActiveConnection,
146  msg: dict[str, Any],
147 ) -> None:
148  """Change password of any user."""
149  if not connection.user.is_owner:
150  raise Unauthorized(context=connection.context(msg))
151 
152  if (user := await hass.auth.async_get_user(msg["user_id"])) is None:
153  connection.send_error(msg["id"], "user_not_found", "User not found")
154  return
155 
156  provider = auth_ha.async_get_provider(hass)
157 
158  username = None
159  for credential in user.credentials:
160  if credential.auth_provider_type == provider.type:
161  username = credential.data["username"]
162  break
163 
164  if username is None:
165  connection.send_error(
166  msg["id"], "credentials_not_found", "Credentials not found"
167  )
168  return
169 
170  await provider.async_change_password(username, msg["password"])
171  connection.send_result(msg["id"])
172 
173 
174 @websocket_api.websocket_command( { vol.Required( "type" ): "config/auth_provider/homeassistant/admin_change_username",
175  vol.Required("user_id"): str,
176  vol.Required("username"): str,
177  }
178 )
179 @websocket_api.require_admin
180 @websocket_api.async_response
182  hass: HomeAssistant,
183  connection: websocket_api.ActiveConnection,
184  msg: dict[str, Any],
185 ) -> None:
186  """Change the username for any user."""
187  if not connection.user.is_owner:
188  raise Unauthorized(context=connection.context(msg))
189 
190  if (user := await hass.auth.async_get_user(msg["user_id"])) is None:
191  connection.send_error(msg["id"], "user_not_found", "User not found")
192  return
193 
194  provider = auth_ha.async_get_provider(hass)
195  found_credential = None
196  for credential in user.credentials:
197  if credential.auth_provider_type == provider.type:
198  found_credential = credential
199  break
200 
201  if found_credential is None:
202  connection.send_error(
203  msg["id"], "credentials_not_found", "Credentials not found"
204  )
205  return
206 
207  await provider.async_change_username(found_credential, msg["username"])
208  connection.send_result(msg["id"])
209 
None websocket_admin_change_password(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_admin_change_username(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_delete(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_change_password(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_create(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)