Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Application Credentials integration.
2 
3 This integration provides APIs for managing local OAuth credentials on behalf
4 of other integrations. Integrations register an authorization server, and then
5 the APIs are used to add one or more client credentials. Integrations may also
6 provide credentials from yaml for backwards compatibility.
7 """
8 
9 from __future__ import annotations
10 
11 from dataclasses import dataclass
12 import logging
13 from typing import Any, Protocol
14 
15 import voluptuous as vol
16 
17 from homeassistant.components import websocket_api
18 from homeassistant.components.websocket_api import ActiveConnection
19 from homeassistant.config_entries import ConfigEntry
20 from homeassistant.const import (
21  CONF_CLIENT_ID,
22  CONF_CLIENT_SECRET,
23  CONF_DOMAIN,
24  CONF_ID,
25  CONF_NAME,
26 )
27 from homeassistant.core import HomeAssistant, callback
28 from homeassistant.exceptions import HomeAssistantError
29 from homeassistant.helpers import collection, config_entry_oauth2_flow
31 from homeassistant.helpers.storage import Store
32 from homeassistant.helpers.typing import ConfigType, VolDictType
33 from homeassistant.loader import (
34  IntegrationNotFound,
35  async_get_application_credentials,
36  async_get_integration,
37 )
38 from homeassistant.util import slugify
39 from homeassistant.util.hass_dict import HassKey
40 
41 __all__ = ["ClientCredential", "AuthorizationServer", "async_import_client_credential"]
42 
43 _LOGGER = logging.getLogger(__name__)
44 
45 DOMAIN = "application_credentials"
46 
47 STORAGE_KEY = DOMAIN
48 STORAGE_VERSION = 1
49 DATA_COMPONENT: HassKey[ApplicationCredentialsStorageCollection] = HassKey(DOMAIN)
50 CONF_AUTH_DOMAIN = "auth_domain"
51 DEFAULT_IMPORT_NAME = "Import from configuration.yaml"
52 
53 CREATE_FIELDS: VolDictType = {
54  vol.Required(CONF_DOMAIN): cv.string,
55  vol.Required(CONF_CLIENT_ID): vol.All(cv.string, vol.Strip),
56  vol.Required(CONF_CLIENT_SECRET): vol.All(cv.string, vol.Strip),
57  vol.Optional(CONF_AUTH_DOMAIN): cv.string,
58  vol.Optional(CONF_NAME): cv.string,
59 }
60 UPDATE_FIELDS: VolDictType = {} # Not supported
61 
62 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
63 
64 
65 @dataclass
67  """Represent an OAuth client credential."""
68 
69  client_id: str
70  client_secret: str
71  name: str | None = None
72 
73 
74 @dataclass
76  """Represent an OAuth2 Authorization Server."""
77 
78  authorize_url: str
79  token_url: str
80 
81 
82 class ApplicationCredentialsStorageCollection(collection.DictStorageCollection):
83  """Application credential collection stored in storage."""
84 
85  CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
86 
87  async def _process_create_data(self, data: dict[str, str]) -> dict[str, str]:
88  """Validate the config is valid."""
89  result = self.CREATE_SCHEMACREATE_SCHEMA(data)
90  domain = result[CONF_DOMAIN]
91  if not await _get_platform(self.hass, domain):
92  raise ValueError(f"No application_credentials platform for {domain}")
93  return result
94 
95  @callback
96  def _get_suggested_id(self, info: dict[str, str]) -> str:
97  """Suggest an ID based on the config."""
98  return f"{info[CONF_DOMAIN]}.{info[CONF_CLIENT_ID]}"
99 
100  async def _update_data(
101  self, item: dict[str, str], update_data: dict[str, str]
102  ) -> dict[str, str]:
103  """Return a new updated data object."""
104  raise ValueError("Updates not supported")
105 
106  async def async_delete_item(self, item_id: str) -> None:
107  """Delete item, verifying credential is not in use."""
108  if item_id not in self.data:
109  raise collection.ItemNotFound(item_id)
110 
111  # Cannot delete a credential currently in use by a ConfigEntry
112  current = self.data[item_id]
113  entries = self.hass.config_entries.async_entries(current[CONF_DOMAIN])
114  for entry in entries:
115  if entry.data.get("auth_implementation") == item_id:
116  raise HomeAssistantError(
117  f"Cannot delete credential in use by integration {entry.domain}"
118  )
119 
120  await super().async_delete_item(item_id)
121 
122  async def async_import_item(self, info: dict[str, str]) -> None:
123  """Import an yaml credential if it does not already exist."""
124  suggested_id = self._get_suggested_id_get_suggested_id(info)
125  if self.id_manager.has_id(slugify(suggested_id)):
126  return
127  await self.async_create_item(info)
128 
129  def async_client_credentials(self, domain: str) -> dict[str, ClientCredential]:
130  """Return ClientCredentials in storage for the specified domain."""
131  credentials = {}
132  for item in self.async_items():
133  if item[CONF_DOMAIN] != domain:
134  continue
135  auth_domain = item.get(CONF_AUTH_DOMAIN, item[CONF_ID])
136  credentials[auth_domain] = ClientCredential(
137  client_id=item[CONF_CLIENT_ID],
138  client_secret=item[CONF_CLIENT_SECRET],
139  name=item.get(CONF_NAME),
140  )
141  return credentials
142 
143 
144 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
145  """Set up Application Credentials."""
146  hass.data[DOMAIN] = {}
147 
148  id_manager = collection.IDManager()
149  storage_collection = ApplicationCredentialsStorageCollection(
150  Store(hass, STORAGE_VERSION, STORAGE_KEY),
151  id_manager,
152  )
153  await storage_collection.async_load()
154  hass.data[DATA_COMPONENT] = storage_collection
155 
156  collection.DictStorageCollectionWebsocket(
157  storage_collection, DOMAIN, DOMAIN, CREATE_FIELDS, UPDATE_FIELDS
158  ).async_setup(hass)
159 
160  websocket_api.async_register_command(hass, handle_integration_list)
161  websocket_api.async_register_command(hass, handle_config_entry)
162 
163  config_entry_oauth2_flow.async_add_implementation_provider(
164  hass, DOMAIN, _async_provide_implementation
165  )
166 
167  return True
168 
169 
171  hass: HomeAssistant,
172  domain: str,
173  credential: ClientCredential,
174  auth_domain: str | None = None,
175 ) -> None:
176  """Import an existing credential from configuration.yaml."""
177  if DOMAIN not in hass.data:
178  raise ValueError("Integration 'application_credentials' not setup")
179  item = {
180  CONF_DOMAIN: domain,
181  CONF_CLIENT_ID: credential.client_id,
182  CONF_CLIENT_SECRET: credential.client_secret,
183  CONF_AUTH_DOMAIN: auth_domain if auth_domain else domain,
184  }
185  item[CONF_NAME] = credential.name if credential.name else DEFAULT_IMPORT_NAME
186  await hass.data[DATA_COMPONENT].async_import_item(item)
187 
188 
189 class AuthImplementation(config_entry_oauth2_flow.LocalOAuth2Implementation):
190  """Application Credentials local oauth2 implementation."""
191 
192  def __init__(
193  self,
194  hass: HomeAssistant,
195  auth_domain: str,
196  credential: ClientCredential,
197  authorization_server: AuthorizationServer,
198  ) -> None:
199  """Initialize AuthImplementation."""
200  super().__init__(
201  hass,
202  auth_domain,
203  credential.client_id,
204  credential.client_secret,
205  authorization_server.authorize_url,
206  authorization_server.token_url,
207  )
208  self._name_name = credential.name
209 
210  @property
211  def name(self) -> str:
212  """Name of the implementation."""
213  return self._name_name or self.client_id
214 
215 
217  hass: HomeAssistant, domain: str
218 ) -> list[config_entry_oauth2_flow.AbstractOAuth2Implementation]:
219  """Return registered OAuth implementations."""
220 
221  platform = await _get_platform(hass, domain)
222  if not platform:
223  return []
224 
225  credentials = hass.data[DATA_COMPONENT].async_client_credentials(domain)
226  if hasattr(platform, "async_get_auth_implementation"):
227  return [
228  await platform.async_get_auth_implementation(hass, auth_domain, credential)
229  for auth_domain, credential in credentials.items()
230  ]
231  authorization_server = await platform.async_get_authorization_server(hass)
232  return [
233  AuthImplementation(hass, auth_domain, credential, authorization_server)
234  for auth_domain, credential in credentials.items()
235  ]
236 
237 
239  hass: HomeAssistant,
240  config_entry: ConfigEntry,
241 ) -> str | None:
242  """Return the item id of an application credential for an existing ConfigEntry."""
243  if not await _get_platform(hass, config_entry.domain) or not (
244  auth_domain := config_entry.data.get("auth_implementation")
245  ):
246  return None
247 
248  for item in hass.data[DATA_COMPONENT].async_items():
249  item_id = item[CONF_ID]
250  if (
251  item[CONF_DOMAIN] == config_entry.domain
252  and item.get(CONF_AUTH_DOMAIN, item_id) == auth_domain
253  ):
254  return item_id
255  return None
256 
257 
259  """Define the format that application_credentials platforms may have.
260 
261  Most platforms typically just implement async_get_authorization_server, and
262  the default oauth implementation will be used. Otherwise a platform may
263  implement async_get_auth_implementation to give their use a custom
264  AbstractOAuth2Implementation.
265  """
266 
268  self, hass: HomeAssistant
269  ) -> AuthorizationServer:
270  """Return authorization server, for the default auth implementation."""
271 
273  self, hass: HomeAssistant, auth_domain: str, credential: ClientCredential
274  ) -> config_entry_oauth2_flow.AbstractOAuth2Implementation:
275  """Return a custom auth implementation."""
276 
278  self, hass: HomeAssistant
279  ) -> dict[str, str]:
280  """Return description placeholders for the credentials dialog."""
281 
282 
283 async def _get_platform(
284  hass: HomeAssistant, integration_domain: str
285 ) -> ApplicationCredentialsProtocol | None:
286  """Register an application_credentials platform."""
287  try:
288  integration = await async_get_integration(hass, integration_domain)
289  except IntegrationNotFound as err:
290  _LOGGER.debug("Integration '%s' does not exist: %s", integration_domain, err)
291  return None
292  try:
293  platform = await integration.async_get_platform("application_credentials")
294  except ImportError as err:
295  _LOGGER.debug(
296  "Integration '%s' does not provide application_credentials: %s",
297  integration_domain,
298  err,
299  )
300  return None
301  if not hasattr(platform, "async_get_authorization_server") and not hasattr(
302  platform, "async_get_auth_implementation"
303  ):
304  raise ValueError(
305  f"Integration '{integration_domain}' platform {DOMAIN} did not implement"
306  " 'async_get_authorization_server' or 'async_get_auth_implementation'"
307  )
308  return platform
309 
310 
311 async def _async_integration_config(hass: HomeAssistant, domain: str) -> dict[str, Any]:
312  platform = await _get_platform(hass, domain)
313  if platform and hasattr(platform, "async_get_description_placeholders"):
314  placeholders = await platform.async_get_description_placeholders(hass)
315  return {"description_placeholders": placeholders}
316  return {}
317 
318 
319 @websocket_api.websocket_command( {vol.Required("type"): "application_credentials/config"}
320 )
321 @websocket_api.async_response
322 async def handle_integration_list(
323  hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
324 ) -> None:
325  """Handle integrations command."""
326  domains = await async_get_application_credentials(hass)
327  result = {
328  "domains": domains,
329  "integrations": {
330  domain: await _async_integration_config(hass, domain) for domain in domains
331  },
332  }
333  connection.send_result(msg["id"], result)
334 
335 
336 @websocket_api.websocket_command( { vol.Required("type"): "application_credentials/config_entry",
337  vol.Required("config_entry_id"): str,
338  }
339 )
340 @websocket_api.async_response
341 async def handle_config_entry(
342  hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
343 ) -> None:
344  """Return application credentials information for a config entry."""
345  entry_id = msg["config_entry_id"]
346  config_entry = hass.config_entries.async_get_entry(entry_id)
347  if not config_entry:
348  connection.send_error(
349  msg["id"],
350  "invalid_config_entry_id",
351  f"Config entry not found: {entry_id}",
352  )
353  return
354  result = {}
355  if application_credentials_id := await _async_config_entry_app_credentials(
356  hass, config_entry
357  ):
358  result["application_credentials_id"] = application_credentials_id
359  connection.send_result(msg["id"], result)
360 
config_entry_oauth2_flow.AbstractOAuth2Implementation async_get_auth_implementation(self, HomeAssistant hass, str auth_domain, ClientCredential credential)
Definition: __init__.py:274
dict[str, str] async_get_description_placeholders(self, HomeAssistant hass)
Definition: __init__.py:279
AuthorizationServer async_get_authorization_server(self, HomeAssistant hass)
Definition: __init__.py:269
dict[str, str] _update_data(self, dict[str, str] item, dict[str, str] update_data)
Definition: __init__.py:102
None __init__(self, HomeAssistant hass, str auth_domain, ClientCredential credential, AuthorizationServer authorization_server)
Definition: __init__.py:198
None handle_config_entry(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:346
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:144
list[config_entry_oauth2_flow.AbstractOAuth2Implementation] _async_provide_implementation(HomeAssistant hass, str domain)
Definition: __init__.py:218
ApplicationCredentialsProtocol|None _get_platform(HomeAssistant hass, str integration_domain)
Definition: __init__.py:285
dict[str, Any] _async_integration_config(HomeAssistant hass, str domain)
Definition: __init__.py:311
None async_import_client_credential(HomeAssistant hass, str domain, ClientCredential credential, str|None auth_domain=None)
Definition: __init__.py:175
str|None _async_config_entry_app_credentials(HomeAssistant hass, ConfigEntry config_entry)
Definition: __init__.py:241
None handle_integration_list(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:325
list[str] async_get_application_credentials(HomeAssistant hass)
Definition: loader.py:453
Integration async_get_integration(HomeAssistant hass, str domain)
Definition: loader.py:1354
str slugify(str|None text, *str separator="_")
Definition: __init__.py:41