Home Assistant Unofficial Reference 2024.12.1
http.py
Go to the documentation of this file.
1 """Support for Google Actions Smart Home Control."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from http import HTTPStatus
7 import logging
8 from typing import Any
9 from uuid import uuid4
10 
11 from aiohttp import ClientError, ClientResponseError
12 from aiohttp.web import Request, Response
13 import jwt
14 
15 from homeassistant.components import webhook
16 from homeassistant.components.http import KEY_HASS, HomeAssistantView
17 from homeassistant.const import CLOUD_NEVER_EXPOSED_ENTITIES
18 from homeassistant.core import HomeAssistant, callback
19 from homeassistant.exceptions import HomeAssistantError
20 from homeassistant.helpers import entity_registry as er
21 from homeassistant.helpers.aiohttp_client import async_get_clientsession
22 from homeassistant.helpers.storage import STORAGE_DIR, Store
23 from homeassistant.util import dt as dt_util, json as json_util
24 
25 from .const import (
26  CONF_CLIENT_EMAIL,
27  CONF_ENTITY_CONFIG,
28  CONF_EXPOSE,
29  CONF_EXPOSE_BY_DEFAULT,
30  CONF_EXPOSED_DOMAINS,
31  CONF_PRIVATE_KEY,
32  CONF_REPORT_STATE,
33  CONF_SECURE_DEVICES_PIN,
34  CONF_SERVICE_ACCOUNT,
35  DOMAIN,
36  GOOGLE_ASSISTANT_API_ENDPOINT,
37  HOMEGRAPH_SCOPE,
38  HOMEGRAPH_TOKEN_URL,
39  REPORT_STATE_BASE_URL,
40  REQUEST_SYNC_BASE_URL,
41  SOURCE_CLOUD,
42  STORE_AGENT_USER_IDS,
43  STORE_GOOGLE_LOCAL_WEBHOOK_ID,
44 )
45 from .helpers import AbstractConfig
46 from .smart_home import async_handle_message
47 
# Module-level logger shared by every helper and class in this file.
_LOGGER = logging.getLogger(__name__)
49 
50 
def _get_homegraph_jwt(time, iss, key):
    """Build the signed JWT assertion used to request a HomeGraph token.

    Args:
        time: Object exposing ``timestamp()``; the value is truncated to whole
            seconds and used as the issued-at time.
        iss: Value stored in the ``iss`` claim.
        key: Signing key handed to ``jwt.encode``.

    Returns:
        Whatever ``jwt.encode`` returns for the RS256-signed claims. The
        ``exp`` claim is always 3600 seconds after ``iat``.
    """
    issued_at = int(time.timestamp())

    claims = {
        "iss": iss,
        "scope": HOMEGRAPH_SCOPE,
        "aud": HOMEGRAPH_TOKEN_URL,
        "iat": issued_at,
        "exp": issued_at + 3600,
    }
    return jwt.encode(claims, key, algorithm="RS256")
62 
63 
async def _get_homegraph_token(
    hass: HomeAssistant, jwt_signed: str
) -> dict[str, Any] | list[Any] | Any:
    """Exchange a signed JWT for a HomeGraph token response.

    The assertion is POSTed as form data to ``HOMEGRAPH_TOKEN_URL`` using the
    client session returned by ``async_get_clientsession(hass)``.

    Args:
        hass: Home Assistant instance used to obtain the client session.
        jwt_signed: Signed JWT, sent both as the bearer credential and as the
            ``assertion`` form field.

    Returns:
        The decoded JSON body of the response.

    Raises:
        aiohttp.ClientResponseError: Propagated from ``raise_for_status()``
            when the endpoint answers with an error status.
    """
    headers = {
        "Authorization": f"Bearer {jwt_signed}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": jwt_signed,
    }

    session = async_get_clientsession(hass)
    async with session.post(HOMEGRAPH_TOKEN_URL, headers=headers, data=data) as res:
        res.raise_for_status()
        return await res.json()
80 
81 
class GoogleConfig(AbstractConfig):
    """Config for manual setup of Google."""

    _store: GoogleConfigStore

    def __init__(self, hass, config):
        """Initialize the config.

        ``config`` is the mapping this class reads its settings from (via
        ``.get``, item access and ``in`` checks).
        """
        super().__init__(hass)
        self._config = config
        # Cached HomeGraph access token and the moment after which it must be
        # refreshed; both are filled in by _async_update_token().
        self._access_token = None
        self._access_token_renew = None

    async def async_initialize(self):
        """Perform async initialization of config."""
        # We need to initialize the store before calling super
        self._store = GoogleConfigStore(self.hass)
        await self._store.async_initialize()

        await super().async_initialize()

        self.async_enable_local_sdk()

    @property
    def enabled(self):
        """Return if Google is enabled."""
        return True

    @property
    def entity_config(self):
        """Return entity config (an empty dict when none is configured)."""
        return self._config.get(CONF_ENTITY_CONFIG) or {}

    @property
    def secure_devices_pin(self):
        """Return the configured secure devices PIN, or None when unset."""
        return self._config.get(CONF_SECURE_DEVICES_PIN)

    @property
    def should_report_state(self):
        """Return if states should be proactively reported."""
        return self._config.get(CONF_REPORT_STATE)

    def get_local_user_id(self, webhook_id):
        """Map webhook ID to a Home Assistant user ID.

        Any action initiated by Google Assistant via the local SDK will be attributed
        to the returned user ID.

        Return None if no user id is found for the webhook_id.
        """
        # Note: The manually setup Google Assistant currently returns the Google agent
        # user ID instead of a valid Home Assistant user ID, so the lookup is the
        # same one used for agent user IDs.
        return self.get_agent_user_id_from_webhook(webhook_id)

    def get_local_webhook_id(self, agent_user_id):
        """Return the webhook ID to be used for actions for a given agent user id via the local SDK."""
        if data := self._store.agent_user_ids.get(agent_user_id):
            return data[STORE_GOOGLE_LOCAL_WEBHOOK_ID]
        return None

    def get_agent_user_id_from_context(self, context):
        """Get agent user ID making request."""
        return context.user_id

    def get_agent_user_id_from_webhook(self, webhook_id):
        """Map webhook ID to a Google agent user ID.

        Return None if no agent user id is found for the webhook_id.
        """
        for agent_user_id, agent_user_data in self._store.agent_user_ids.items():
            if agent_user_data[STORE_GOOGLE_LOCAL_WEBHOOK_ID] == webhook_id:
                return agent_user_id

        return None

    def should_expose(self, state) -> bool:
        """Return if entity should be exposed."""
        expose_by_default = self._config.get(CONF_EXPOSE_BY_DEFAULT)
        exposed_domains = self._config.get(CONF_EXPOSED_DOMAINS)

        if state.attributes.get("view") is not None:
            # Ignore entities that are views
            return False

        if state.entity_id in CLOUD_NEVER_EXPOSED_ENTITIES:
            return False

        entity_registry = er.async_get(self.hass)
        registry_entry = entity_registry.async_get(state.entity_id)
        if registry_entry:
            auxiliary_entity = (
                registry_entry.entity_category is not None
                or registry_entry.hidden_by is not None
            )
        else:
            auxiliary_entity = False

        explicit_expose = self.entity_config.get(state.entity_id, {}).get(CONF_EXPOSE)

        domain_exposed_by_default = (
            expose_by_default and state.domain in exposed_domains
        )

        # Expose an entity by default if the entity's domain is exposed by default
        # and the entity is not a config or diagnostic entity
        entity_exposed_by_default = domain_exposed_by_default and not auxiliary_entity

        # Expose an entity if the entity is exposed by default and
        # the configuration doesn't explicitly exclude it from being
        # exposed, or if the entity is explicitly exposed
        is_default_exposed = entity_exposed_by_default and explicit_expose is not False

        # The and/or chain above can yield None (unset options); coerce so the
        # declared bool return type always holds.
        return bool(is_default_exposed or explicit_expose)

    def should_2fa(self, state):
        """If an entity should have 2FA checked."""
        return True

    async def _async_request_sync_devices(self, agent_user_id: str) -> HTTPStatus:
        """Ask HomeGraph to re-sync devices for one agent user.

        Returns the status from async_call_homegraph_api(), or
        INTERNAL_SERVER_ERROR (after logging) when no service account is
        configured.
        """
        if CONF_SERVICE_ACCOUNT in self._config:
            return await self.async_call_homegraph_api(
                REQUEST_SYNC_BASE_URL, {"agentUserId": agent_user_id}
            )

        _LOGGER.error("No configuration for request_sync available")
        return HTTPStatus.INTERNAL_SERVER_ERROR

    async def async_connect_agent_user(self, agent_user_id: str):
        """Add a synced and known agent_user_id.

        Called before sending a sync response to Google.
        """
        self._store.add_agent_user_id(agent_user_id)

    async def async_disconnect_agent_user(self, agent_user_id: str):
        """Turn off report state and disable further state reporting.

        In this class that amounts to removing the agent_user_id from the store.

        Called when:
         - The user disconnects their account from Google.
         - When the cloud configuration is initialized
         - When sync entities fails with 404
        """
        self._store.pop_agent_user_id(agent_user_id)

    @callback
    def async_get_agent_users(self):
        """Return known agent users."""
        return self._store.agent_user_ids

    async def _async_update_token(self, force=False):
        """Refresh the cached HomeGraph access token when needed.

        A new token is requested when none is cached, when the cached one is
        past its renewal time, or when ``force`` is true. Without a configured
        service account an error is logged and nothing changes.
        """
        if CONF_SERVICE_ACCOUNT not in self._config:
            _LOGGER.error("Trying to get homegraph api token without service account")
            return

        now = dt_util.utcnow()
        if not self._access_token or now > self._access_token_renew or force:
            service_account = self._config[CONF_SERVICE_ACCOUNT]
            token = await _get_homegraph_token(
                self.hass,
                _get_homegraph_jwt(
                    now,
                    service_account[CONF_CLIENT_EMAIL],
                    service_account[CONF_PRIVATE_KEY],
                ),
            )
            self._access_token = token["access_token"]
            self._access_token_renew = now + timedelta(seconds=token["expires_in"])

    async def async_call_homegraph_api(self, url, data):
        """Call a homegraph api with authentication.

        Returns the response status on success. On an error response the
        error's status is logged and returned; on a timeout or client error
        INTERNAL_SERVER_ERROR is returned. An UNAUTHORIZED response triggers
        one forced token renewal followed by a single retry.
        """
        session = async_get_clientsession(self.hass)

        async def _call():
            # Headers are built per call so a retry picks up a renewed token.
            headers = {
                "Authorization": f"Bearer {self._access_token}",
                "X-GFE-SSL": "yes",
            }
            async with session.post(url, headers=headers, json=data) as res:
                _LOGGER.debug(
                    "Response on %s with data %s was %s", url, data, await res.text()
                )
                res.raise_for_status()
                return res.status

        try:
            await self._async_update_token()
            try:
                return await _call()
            except ClientResponseError as error:
                if error.status == HTTPStatus.UNAUTHORIZED:
                    _LOGGER.warning(
                        "Request for %s unauthorized, renewing token and retrying", url
                    )
                    await self._async_update_token(True)
                    return await _call()
                raise
        except ClientResponseError as error:
            _LOGGER.error("Request for %s failed: %d", url, error.status)
            return error.status
        except (TimeoutError, ClientError):
            _LOGGER.error("Could not contact %s", url)
            return HTTPStatus.INTERNAL_SERVER_ERROR

    async def async_report_state(
        self, message: dict[str, Any], agent_user_id: str, event_id: str | None = None
    ) -> HTTPStatus:
        """Send a state report to Google."""
        data = {
            "requestId": uuid4().hex,
            "agentUserId": agent_user_id,
            "payload": message,
        }
        # eventId is only included when the caller supplied one.
        if event_id is not None:
            data["eventId"] = event_id
        return await self.async_call_homegraph_api(REPORT_STATE_BASE_URL, data)
302 
303 
class GoogleConfigStore:
    """A configuration store for google assistant."""

    _STORAGE_VERSION = 1
    _STORAGE_VERSION_MINOR = 2
    _STORAGE_KEY = DOMAIN
    # In-memory copy of the stored payload; assigned by async_initialize().
    _data: dict[str, Any]

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize a configuration store."""
        self._hass = hass
        self._store: Store[dict[str, Any]] = Store(
            hass,
            self._STORAGE_VERSION,
            self._STORAGE_KEY,
            minor_version=self._STORAGE_VERSION_MINOR,
        )

    async def async_initialize(self) -> None:
        """Finish initializing the ConfigStore.

        Loads the stored data (creating an empty structure when nothing is
        stored), gives every agent user entry that lacks one a freshly
        generated local webhook id, and saves if anything changed.
        """
        should_save_data = False
        if (data := await self._store.async_load()) is None:
            # if the store is not found create an empty one
            # Note that the first request is always a cloud request,
            # and that will store the correct agent user id to be used for local requests
            data = {
                STORE_AGENT_USER_IDS: {},
            }
            should_save_data = True

        # Only existing keys are reassigned here, so the dict does not change
        # size while it is being iterated.
        for agent_user_id, agent_user_data in data[STORE_AGENT_USER_IDS].items():
            if STORE_GOOGLE_LOCAL_WEBHOOK_ID not in agent_user_data:
                data[STORE_AGENT_USER_IDS][agent_user_id] = {
                    **agent_user_data,
                    STORE_GOOGLE_LOCAL_WEBHOOK_ID: webhook.async_generate_id(),
                }
                should_save_data = True

        if should_save_data:
            await self._store.async_save(data)

        self._data = data

    @property
    def agent_user_ids(self) -> dict[str, Any]:
        """Return the connected agent user ids mapped to their stored data."""
        return self._data[STORE_AGENT_USER_IDS]

    @callback
    def add_agent_user_id(self, agent_user_id: str) -> None:
        """Add an agent user id to store.

        A new entry gets a generated local webhook id and a delayed save is
        scheduled; an already known id is left untouched.
        """
        if agent_user_id not in self._data[STORE_AGENT_USER_IDS]:
            self._data[STORE_AGENT_USER_IDS][agent_user_id] = {
                STORE_GOOGLE_LOCAL_WEBHOOK_ID: webhook.async_generate_id(),
            }
            self._store.async_delay_save(lambda: self._data, 1.0)

    @callback
    def pop_agent_user_id(self, agent_user_id: str) -> None:
        """Remove agent user id from store.

        A delayed save is scheduled only when the id was actually present.
        """
        if agent_user_id in self._data[STORE_AGENT_USER_IDS]:
            self._data[STORE_AGENT_USER_IDS].pop(agent_user_id, None)
            self._store.async_delay_save(lambda: self._data, 1.0)
367 
368 
class GoogleAssistantView(HomeAssistantView):
    """Handle Google Assistant requests."""

    url = GOOGLE_ASSISTANT_API_ENDPOINT
    name = "api:google_assistant"
    requires_auth = True

    def __init__(self, config):
        """Initialize the Google Assistant request handler.

        ``config`` is kept and handed to async_handle_message() on every POST.
        """
        self.config = config

    async def post(self, request: Request) -> Response:
        """Handle Google Assistant requests.

        The JSON request body is passed to async_handle_message() and its
        result is returned as a JSON response.
        """
        message: dict = await request.json()
        # The authenticated user's id fills both id arguments of the handler.
        user_id = request["hass_user"].id
        result = await async_handle_message(
            request.app[KEY_HASS],
            self.config,
            user_id,
            user_id,
            message,
            SOURCE_CLOUD,
        )
        return self.json(result)
392 
393 
async def async_get_users(hass: HomeAssistant) -> list[str]:
    """Return stored users.

    This is called by the cloud integration to import from the previously shared store.

    The storage file is read directly with ``json_util.load_json`` in the
    executor. An empty list is returned when loading raises
    HomeAssistantError or when the file does not contain a non-empty
    ``data`` -> ``agent_user_ids`` dict.
    """
    path = hass.config.path(STORAGE_DIR, GoogleConfigStore._STORAGE_KEY)  # noqa: SLF001
    try:
        store_data = await hass.async_add_executor_job(json_util.load_json, path)
    except HomeAssistantError:
        return []

    # Validate each level before descending; anything unexpected (wrong type,
    # missing or empty value) is treated as "no users".
    if (
        not isinstance(store_data, dict)
        or not (data := store_data.get("data"))
        or not isinstance(data, dict)
        or not (agent_user_ids := data.get("agent_user_ids"))
        or not isinstance(agent_user_ids, dict)
    ):
        return []
    return list(agent_user_ids)
def async_disconnect_agent_user(self, str agent_user_id)
Definition: http.py:222
def async_connect_agent_user(self, str agent_user_id)
Definition: http.py:215
HTTPStatus async_report_state(self, dict[str, Any] message, str agent_user_id, str|None event_id=None)
Definition: http.py:292
HTTPStatus _async_request_sync_devices(self, str agent_user_id)
Definition: http.py:206
dict[str, Any] async_handle_message(HomeAssistant hass, dict[str, Any] message)
Definition: intent.py:119
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
list[str] async_get_users(HomeAssistant hass)
Definition: http.py:394
dict[str, Any]|list[Any]|Any _get_homegraph_token(HomeAssistant hass, str jwt_signed)
Definition: http.py:66
def _get_homegraph_jwt(time, iss, key)
Definition: http.py:51
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
None async_load(HomeAssistant hass)
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)
Definition: storage.py:444
None async_save(self, _T data)
Definition: storage.py:424