1 """Offer API to configure the Home Assistant auth provider."""
3 from __future__
import annotations
7 import voluptuous
as vol
17 """Enable the Home Assistant views."""
18 websocket_api.async_register_command(hass, websocket_create)
19 websocket_api.async_register_command(hass, websocket_delete)
20 websocket_api.async_register_command(hass, websocket_change_password)
21 websocket_api.async_register_command(hass, websocket_admin_change_password)
22 websocket_api.async_register_command(hass, websocket_admin_change_username)
26 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/auth_provider/homeassistant/create",
27 vol.Required(
"user_id"): str,
28 vol.Required(
"username"): str,
29 vol.Required(
"password"): str,
32 @websocket_api.require_admin
33 @websocket_api.async_response
39 """Create credentials and attach to a user."""
40 provider = auth_ha.async_get_provider(hass)
42 if (user := await hass.auth.async_get_user(msg[
"user_id"]))
is None:
43 connection.send_error(msg[
"id"],
"not_found",
"User not found")
46 if user.system_generated:
47 connection.send_error(
50 "Cannot add credentials to a system generated user.",
54 await provider.async_add_auth(msg[
"username"], msg[
"password"])
56 credentials = await provider.async_get_or_create_credentials(
57 {
"username": msg[
"username"]}
59 await hass.auth.async_link_user(user, credentials)
61 connection.send_result(msg[
"id"])
64 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/auth_provider/homeassistant/delete",
65 vol.Required(
"username"): str,
68 @websocket_api.require_admin
69 @websocket_api.async_response
75 """Delete username and related credential."""
76 provider = auth_ha.async_get_provider(hass)
77 credentials = await provider.async_get_or_create_credentials(
78 {
"username": msg[
"username"]}
83 if not credentials.is_new:
84 await hass.auth.async_remove_credentials(credentials)
86 connection.send_result(msg[
"id"])
89 await provider.async_remove_auth(msg[
"username"])
91 connection.send_result(msg[
"id"])
94 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/auth_provider/homeassistant/change_password",
95 vol.Required(
"current_password"): str,
96 vol.Required(
"new_password"): str,
99 @websocket_api.async_response
105 """Change current user password."""
106 if (user := connection.user)
is None:
107 connection.send_error(msg[
"id"],
"user_not_found",
"User not found")
110 provider = auth_ha.async_get_provider(hass)
112 for credential
in user.credentials:
113 if credential.auth_provider_type == provider.type:
114 username = credential.data[
"username"]
118 connection.send_error(
119 msg[
"id"],
"credentials_not_found",
"Credentials not found"
124 await provider.async_validate_login(username, msg[
"current_password"])
125 except auth_ha.InvalidAuth:
126 connection.send_error(
127 msg[
"id"],
"invalid_current_password",
"Invalid current password"
131 await provider.async_change_password(username, msg[
"new_password"])
133 connection.send_result(msg[
"id"])
136 @websocket_api.websocket_command(
{
vol.Required(
"type"
):
"config/auth_provider/homeassistant/admin_change_password",
137 vol.Required(
"user_id"): str,
138 vol.Required(
"password"): str,
141 @websocket_api.require_admin
142 @websocket_api.async_response
148 """Change password of any user."""
149 if not connection.user.is_owner:
152 if (user := await hass.auth.async_get_user(msg[
"user_id"]))
is None:
153 connection.send_error(msg[
"id"],
"user_not_found",
"User not found")
156 provider = auth_ha.async_get_provider(hass)
159 for credential
in user.credentials:
160 if credential.auth_provider_type == provider.type:
161 username = credential.data[
"username"]
165 connection.send_error(
166 msg[
"id"],
"credentials_not_found",
"Credentials not found"
170 await provider.async_change_password(username, msg[
"password"])
171 connection.send_result(msg[
"id"])
174 @websocket_api.websocket_command(
{
vol.Required(
"type"
):
"config/auth_provider/homeassistant/admin_change_username",
175 vol.Required(
"user_id"): str,
176 vol.Required(
"username"): str,
179 @websocket_api.require_admin
180 @websocket_api.async_response
186 """Change the username for any user."""
187 if not connection.user.is_owner:
190 if (user := await hass.auth.async_get_user(msg[
"user_id"]))
is None:
191 connection.send_error(msg[
"id"],
"user_not_found",
"User not found")
194 provider = auth_ha.async_get_provider(hass)
195 found_credential =
None
196 for credential
in user.credentials:
197 if credential.auth_provider_type == provider.type:
198 found_credential = credential
201 if found_credential
is None:
202 connection.send_error(
203 msg[
"id"],
"credentials_not_found",
"Credentials not found"
207 await provider.async_change_username(found_credential, msg[
"username"])
208 connection.send_result(msg[
"id"])
209
None websocket_admin_change_password(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bool async_setup(HomeAssistant hass)
None websocket_admin_change_username(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_delete(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_change_password(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None websocket_create(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)