1 """Support for Google Actions Smart Home Control."""
3 from __future__
import annotations
5 from datetime
import timedelta
6 from http
import HTTPStatus
11 from aiohttp
import ClientError, ClientResponseError
12 from aiohttp.web
import Request, Response
29 CONF_EXPOSE_BY_DEFAULT,
33 CONF_SECURE_DEVICES_PIN,
36 GOOGLE_ASSISTANT_API_ENDPOINT,
39 REPORT_STATE_BASE_URL,
40 REQUEST_SYNC_BASE_URL,
43 STORE_GOOGLE_LOCAL_WEBHOOK_ID,
45 from .helpers
import AbstractConfig
46 from .smart_home
import async_handle_message
48 _LOGGER = logging.getLogger(__name__)
52 now =
int(time.timestamp())
56 "scope": HOMEGRAPH_SCOPE,
57 "aud": HOMEGRAPH_TOKEN_URL,
61 return jwt.encode(jwt_raw, key, algorithm=
"RS256")
65 hass: HomeAssistant, jwt_signed: str
66 ) -> dict[str, Any] | list[Any] | Any:
68 "Authorization": f
"Bearer {jwt_signed}",
69 "Content-Type":
"application/x-www-form-urlencoded",
72 "grant_type":
"urn:ietf:params:oauth:grant-type:jwt-bearer",
73 "assertion": jwt_signed,
77 async
with session.post(HOMEGRAPH_TOKEN_URL, headers=headers, data=data)
as res:
78 res.raise_for_status()
79 return await res.json()
83 """Config for manual setup of Google."""
85 _store: GoogleConfigStore
88 """Initialize the config."""
95 """Perform async initialization of config."""
106 """Return if Google is enabled."""
111 """Return entity config."""
112 return self.
_config_config.
get(CONF_ENTITY_CONFIG)
or {}
116 """Return entity config."""
117 return self.
_config_config.
get(CONF_SECURE_DEVICES_PIN)
121 """Return if states should be proactively reported."""
122 return self.
_config_config.
get(CONF_REPORT_STATE)
125 """Map webhook ID to a Home Assistant user ID.
127 Any action initiated by Google Assistant via the local SDK will be attributed
128 to the returned user ID.
130 Return None if no user id is found for the webhook_id.
134 found_agent_user_id =
None
135 for agent_user_id, agent_user_data
in self.
_store_store.agent_user_ids.items():
136 if agent_user_data[STORE_GOOGLE_LOCAL_WEBHOOK_ID] == webhook_id:
137 found_agent_user_id = agent_user_id
140 return found_agent_user_id
143 """Return the webhook ID to be used for actions for a given agent user id via the local SDK."""
144 if data := self.
_store_store.agent_user_ids.get(agent_user_id):
145 return data[STORE_GOOGLE_LOCAL_WEBHOOK_ID]
149 """Get agent user ID making request."""
150 return context.user_id
153 """Map webhook ID to a Google agent user ID.
155 Return None if no agent user id is found for the webhook_id.
157 for agent_user_id, agent_user_data
in self.
_store_store.agent_user_ids.items():
158 if agent_user_data[STORE_GOOGLE_LOCAL_WEBHOOK_ID] == webhook_id:
164 """Return if entity should be exposed."""
165 expose_by_default = self.
_config_config.
get(CONF_EXPOSE_BY_DEFAULT)
166 exposed_domains = self.
_config_config.
get(CONF_EXPOSED_DOMAINS)
168 if state.attributes.get(
"view")
is not None:
172 if state.entity_id
in CLOUD_NEVER_EXPOSED_ENTITIES:
175 entity_registry = er.async_get(self.
hasshass)
176 registry_entry = entity_registry.async_get(state.entity_id)
179 registry_entry.entity_category
is not None
180 or registry_entry.hidden_by
is not None
183 auxiliary_entity =
False
187 domain_exposed_by_default = (
188 expose_by_default
and state.domain
in exposed_domains
193 entity_exposed_by_default = domain_exposed_by_default
and not auxiliary_entity
198 is_default_exposed = entity_exposed_by_default
and explicit_expose
is not False
200 return is_default_exposed
or explicit_expose
203 """If an entity should have 2FA checked."""
207 if CONF_SERVICE_ACCOUNT
in self.
_config_config:
209 REQUEST_SYNC_BASE_URL, {
"agentUserId": agent_user_id}
212 _LOGGER.error(
"No configuration for request_sync available")
213 return HTTPStatus.INTERNAL_SERVER_ERROR
216 """Add a synced and known agent_user_id.
218 Called before sending a sync response to Google.
220 self.
_store_store.add_agent_user_id(agent_user_id)
223 """Turn off report state and disable further state reporting.
226 - The user disconnects their account from Google.
227 - When the cloud configuration is initialized
228 - When sync entities fails with 404
230 self.
_store_store.pop_agent_user_id(agent_user_id)
234 """Return known agent users."""
235 return self.
_store_store.agent_user_ids
238 if CONF_SERVICE_ACCOUNT
not in self.
_config_config:
239 _LOGGER.error(
"Trying to get homegraph api token without service account")
242 now = dt_util.utcnow()
248 self.
_config_config[CONF_SERVICE_ACCOUNT][CONF_CLIENT_EMAIL],
249 self.
_config_config[CONF_SERVICE_ACCOUNT][CONF_PRIVATE_KEY],
256 """Call a homegraph api with authentication."""
261 "Authorization": f
"Bearer {self._access_token}",
264 async
with session.post(url, headers=headers, json=data)
as res:
266 "Response on %s with data %s was %s", url, data, await res.text()
268 res.raise_for_status()
275 except ClientResponseError
as error:
276 if error.status == HTTPStatus.UNAUTHORIZED:
278 "Request for %s unauthorized, renewing token and retrying", url
283 except ClientResponseError
as error:
284 _LOGGER.error(
"Request for %s failed: %d", url, error.status)
286 except (TimeoutError, ClientError):
287 _LOGGER.error(
"Could not contact %s", url)
288 return HTTPStatus.INTERNAL_SERVER_ERROR
291 self, message: dict[str, Any], agent_user_id: str, event_id: str |
None =
None
293 """Send a state report to Google."""
295 "requestId": uuid4().hex,
296 "agentUserId": agent_user_id,
299 if event_id
is not None:
300 data[
"eventId"] = event_id
305 """A configuration store for google assistant."""
308 _STORAGE_VERSION_MINOR = 2
309 _STORAGE_KEY = DOMAIN
310 _data: dict[str, Any]
313 """Initialize a configuration store."""
315 self._store: Store[dict[str, Any]] =
Store(
323 """Finish initializing the ConfigStore."""
324 should_save_data =
False
325 if (data := await self._store.
async_load())
is None:
330 STORE_AGENT_USER_IDS: {},
332 should_save_data =
True
334 for agent_user_id, agent_user_data
in data[STORE_AGENT_USER_IDS].items():
335 if STORE_GOOGLE_LOCAL_WEBHOOK_ID
not in agent_user_data:
336 data[STORE_AGENT_USER_IDS][agent_user_id] = {
338 STORE_GOOGLE_LOCAL_WEBHOOK_ID: webhook.async_generate_id(),
340 should_save_data =
True
349 """Return a list of connected agent user_ids."""
350 return self.
_data_data[STORE_AGENT_USER_IDS]
354 """Add an agent user id to store."""
355 if agent_user_id
not in self.
_data_data[STORE_AGENT_USER_IDS]:
356 self.
_data_data[STORE_AGENT_USER_IDS][agent_user_id] = {
357 STORE_GOOGLE_LOCAL_WEBHOOK_ID: webhook.async_generate_id(),
363 """Remove agent user id from store."""
364 if agent_user_id
in self.
_data_data[STORE_AGENT_USER_IDS]:
365 self.
_data_data[STORE_AGENT_USER_IDS].pop(agent_user_id,
None)
370 """Handle Google Assistant requests."""
372 url = GOOGLE_ASSISTANT_API_ENDPOINT
373 name =
"api:google_assistant"
377 """Initialize the Google Assistant request handler."""
380 async
def post(self, request: Request) -> Response:
381 """Handle Google Assistant requests."""
382 message: dict = await request.json()
384 request.app[KEY_HASS],
386 request[
"hass_user"].id,
387 request[
"hass_user"].id,
391 return self.json(result)
395 """Return stored users.
397 This is called by the cloud integration to import from the previously shared store.
399 path = hass.config.path(STORAGE_DIR, GoogleConfigStore._STORAGE_KEY)
401 store_data = await hass.async_add_executor_job(json_util.load_json, path)
402 except HomeAssistantError:
406 not isinstance(store_data, dict)
407 or not (data := store_data.get(
"data"))
408 or not isinstance(data, dict)
409 or not (agent_user_ids := data.get(
"agent_user_ids"))
410 or not isinstance(agent_user_ids, dict)
413 return list(agent_user_ids)
None async_enable_local_sdk(self)
Response post(self, Request request)
def __init__(self, config)
None async_initialize(self)
dict[str, Any] agent_user_ids(self)
None add_agent_user_id(self, str agent_user_id)
None __init__(self, HomeAssistant hass)
int _STORAGE_VERSION_MINOR
None pop_agent_user_id(self, str agent_user_id)
def __init__(self, hass, config)
def async_call_homegraph_api(self, url, data)
def get_local_user_id(self, webhook_id)
def secure_devices_pin(self)
def async_disconnect_agent_user(self, str agent_user_id)
def get_local_webhook_id(self, agent_user_id)
def get_agent_user_id_from_webhook(self, webhook_id)
def should_report_state(self)
def _async_update_token(self, force=False)
def async_connect_agent_user(self, str agent_user_id)
HTTPStatus async_report_state(self, dict[str, Any] message, str agent_user_id, str|None event_id=None)
def get_agent_user_id_from_context(self, context)
def async_initialize(self)
def should_2fa(self, state)
HTTPStatus _async_request_sync_devices(self, str agent_user_id)
def async_get_agent_users(self)
bool should_expose(self, state)
dict[str, Any] async_handle_message(HomeAssistant hass, dict[str, Any] message)
web.Response get(self, web.Request request, str config_key)
list[str] async_get_users(HomeAssistant hass)
dict[str, Any]|list[Any]|Any _get_homegraph_token(HomeAssistant hass, str jwt_signed)
def _get_homegraph_jwt(time, iss, key)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
None async_load(HomeAssistant hass)
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)
None async_save(self, _T data)