1 """Helper classes for Google Assistant integration."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
6 from asyncio
import gather
7 from collections.abc
import Callable, Collection, Mapping
8 from datetime
import datetime, timedelta
9 from functools
import lru_cache
10 from http
import HTTPStatus
13 from typing
import Any
15 from aiohttp.web
import json_response
16 from awesomeversion
import AwesomeVersion
22 ATTR_SUPPORTED_FEATURES,
23 CLOUD_NEVER_EXPOSED_ENTITIES,
30 device_registry
as dr,
31 entity_registry
as er,
43 DEVICE_CLASS_TO_GOOGLE_TYPES,
45 DOMAIN_TO_GOOGLE_TYPES,
46 ERR_FUNCTION_NOT_SUPPORTED,
50 from .data_redaction
import async_redact_msg
51 from .error
import SmartHomeError
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# HTTP header carrying the version reported by the local SDK.
LOCAL_SDK_VERSION_HEADER = "HA-Cloud-Version"
# Requests reporting a version below this one are treated as too old by the
# local webhook handler.
LOCAL_SDK_MIN_VERSION = AwesomeVersion("2.1.5")
def _get_registry_entries(
    hass: HomeAssistant, entity_id: str
) -> tuple[
    er.RegistryEntry | None,
    dr.DeviceEntry | None,
    ar.AreaEntry | None,
]:
    """Get registry entries.

    Look up the entity registry entry for ``entity_id`` and, from it, the
    linked device entry and area entry.

    Returns a ``(entity_entry, device_entry, area_entry)`` tuple; each element
    is ``None`` when nothing could be resolved for it.
    """
    ent_reg = er.async_get(hass)
    dev_reg = dr.async_get(hass)
    area_reg = ar.async_get(hass)

    if (entity_entry := ent_reg.async_get(entity_id)) and entity_entry.device_id:
        device_entry = dev_reg.devices.get(entity_entry.device_id)
    else:
        device_entry = None

    # An area assigned directly to the entity wins over the device's area.
    if entity_entry and entity_entry.area_id:
        area_id = entity_entry.area_id
    elif device_entry and device_entry.area_id:
        area_id = device_entry.area_id
    else:
        area_id = None

    if area_id is not None:
        area_entry = area_reg.async_get_area(area_id)
    else:
        area_entry = None

    return entity_entry, device_entry, area_entry
93 """Hold the configuration for Google Assistant."""
95 _unsub_report_state: Callable[[],
None] |
None =
None
97 def __init__(self, hass: HomeAssistant) ->
None:
98 """Initialize abstract config."""
100 self._google_sync_unsub: dict[str, CALLBACK_TYPE] = {}
104 self.is_supported_cache: dict[str, tuple[int |
None, bool]] = {}
105 self._on_deinitialize: list[CALLBACK_TYPE] = []
108 """Perform async initialization of config."""
112 async
def sync_google(_):
113 """Sync entities to Google."""
116 self._on_deinitialize.append(start.async_at_start(self.
hasshass, sync_google))
120 """Remove listeners."""
121 _LOGGER.debug(
"async_deinitialize")
122 while self._on_deinitialize:
123 self._on_deinitialize.pop()()
128 """Return if Google is enabled."""
133 """Return entity config."""
138 """Return entity config."""
142 """Return if we're actively reporting states."""
147 """Return if we're actively accepting local messages."""
153 """Return if states should be proactively reported."""
157 """Return if local is connected."""
166 """Map webhook ID to a Home Assistant user ID.
168 Any action initiated by Google Assistant via the local SDK will be attributed
169 to the returned user ID.
171 Return None if no user id is found for the webhook_id.
176 """Return the webhook ID to be used for actions for a given agent user id via the local SDK."""
180 """Get agent user ID from context."""
184 """Map webhook ID to a Google agent user ID.
186 Return None if no agent user id is found for the webhook_id.
191 """Return if entity should be exposed."""
195 """If an entity should have 2FA checked."""
199 self, message: dict[str, Any], agent_user_id: str, event_id: str |
None =
None
200 ) -> HTTPStatus |
None:
201 """Send a state report to Google."""
204 """Send a state report to Google for all previously synced users."""
213 """Enable proactive mode."""
216 from .report_state
import async_enable_report_state
223 """Disable report state."""
229 """Sync all entities to Google."""
231 self._google_sync_unsub.pop(agent_user_id,
lambda:
None)()
233 if status == HTTPStatus.NOT_FOUND:
238 """Sync all entities to Google for all registered agents."""
248 return max(res, default=204)
251 self, agent_user_id: str, event_id: str, payload: dict[str, Any]
253 """Sync notifications to Google."""
255 self._google_sync_unsub.pop(agent_user_id,
lambda:
None)()
256 status = await self.
async_report_stateasync_report_state(payload, agent_user_id, event_id)
257 assert status
is not None
258 if status == HTTPStatus.NOT_FOUND:
263 self, event_id: str, payload: dict[str, Any]
265 """Sync notification to Google for all registered agents."""
267 return HTTPStatus.NO_CONTENT
275 return max(res, default=HTTPStatus.NO_CONTENT)
279 """Schedule a sync."""
281 async
def _schedule_callback(_now):
282 """Handle a scheduled sync callback."""
283 self._google_sync_unsub.pop(agent_user_id,
None)
286 self._google_sync_unsub.pop(agent_user_id,
lambda:
None)()
289 self.
hasshass, SYNC_DELAY, _schedule_callback
294 """Schedule a sync for all registered agents."""
299 """Trigger a sync with Google.
301 Return value is the HTTP status code of the sync request.
303 raise NotImplementedError
307 """Add a synced and known agent_user_id.
309 Called before sending a sync response to Google.
314 """Turn off report state and disable further state reporting.
317 - The user disconnects their account from Google.
318 - When the cloud configuration is initialized
319 - When sync entities fails with 404
325 """Return known agent users."""
329 """Enable the local SDK."""
330 _LOGGER.debug(
"async_enable_local_sdk")
331 setup_successful =
True
332 setup_webhook_ids = []
335 if self.
hasshass.config.api
and self.
hasshass.config.api.use_ssl:
341 setup_successful =
False
345 "Register webhook handler %s for agent user id %s",
350 webhook.async_register(
353 "Local Support for " + user_agent_id,
358 setup_webhook_ids.append(webhook_id)
361 "Webhook handler %s for agent user id %s is already defined!",
365 setup_successful =
False
368 if not setup_successful:
370 "Local fulfillment failed to setup, falling back to cloud fulfillment"
372 for setup_webhook_id
in setup_webhook_ids:
373 webhook.async_unregister(self.
hasshass, setup_webhook_id)
379 """Disable the local SDK."""
380 _LOGGER.debug(
"async_disable_local_sdk")
387 "Unregister webhook handler %s for agent user id %s",
391 webhook.async_unregister(self.
hasshass, webhook_id)
396 """Handle an incoming local SDK message."""
399 from .
import smart_home
404 version = request.headers.get(
"HA-Cloud-Version")
406 not version
or AwesomeVersion(version) < LOCAL_SDK_MIN_VERSION
410 "Local SDK version is too old (%s), check documentation on how to"
411 " update to the latest version"
417 payload = await request.json()
419 if _LOGGER.isEnabledFor(logging.DEBUG):
421 if isinstance(payload, dict):
422 msgid = payload.get(
"requestId")
424 "Received local message %s from %s (JS %s)",
427 request.headers.get(
"HA-Cloud-Version",
"unknown"),
435 "Cannot process request for webhook %s as no linked agent user is"
441 webhook.async_unregister(self.
hasshass, webhook_id)
445 return json_response(
446 smart_home.api_disabled_response(payload, agent_user_id)
449 result = await smart_home.async_handle_message(
458 if _LOGGER.isEnabledFor(logging.DEBUG):
459 if isinstance(payload, dict):
460 _LOGGER.debug(
"Responding to local message %s", msgid)
462 _LOGGER.debug(
"Empty response to local message %s", msgid)
464 return json_response(result)
468 """Hold data associated with a particular request."""
472 config: AbstractConfig,
476 devices: list[dict] |
None,
478 """Initialize the request data."""
487 """Return if this is a local request."""
488 return self.
sourcesource == SOURCE_LOCAL
def get_google_type(domain, device_class):
    """Google type based on domain and device class.

    The ``(domain, device_class)`` pair is looked up in
    ``DEVICE_CLASS_TO_GOOGLE_TYPES`` first. When that yields ``None`` the
    type registered for the bare ``domain`` in ``DOMAIN_TO_GOOGLE_TYPES`` is
    returned instead; that second lookup is a plain subscription, so no
    default is supplied for an unknown domain.
    """
    typ = DEVICE_CLASS_TO_GOOGLE_TYPES.get((domain, device_class))

    return typ if typ is not None else DOMAIN_TO_GOOGLE_TYPES[domain]
498 @lru_cache(maxsize=4096)
500 """Return all supported traits for state."""
501 domain = state.domain
502 attributes = state.attributes
503 features = attributes.get(ATTR_SUPPORTED_FEATURES, 0)
505 if not isinstance(features, int):
507 "Entity %s contains invalid supported_features value %s",
513 device_class = state.attributes.get(ATTR_DEVICE_CLASS)
516 for Trait
in trait.TRAITS
517 if Trait.supported(domain, features, device_class, attributes)
522 """Adaptation of Entity expressed in Google's terms."""
524 __slots__ = (
"hass",
"config",
"state",
"entity_id",
"_traits")
527 self, hass: HomeAssistant, config: AbstractConfig, state: State
529 """Initialize a Google entity."""
537 """Return the representation."""
538 return f
"<GoogleEntity {self.state.entity_id}: {self.state.name}>"
542 """Return traits for entity."""
543 if self.
_traits_traits
is not None:
545 state = self.
statestate
547 Trait(self.
hasshass, state, self.
configconfig)
554 """If entity should be exposed."""
559 """Return if the entity should be exposed locally."""
563 self.
statestate.domain, self.
statestate.attributes.get(ATTR_DEVICE_CLASS)
565 not in NOT_EXPOSE_LOCAL
571 """Return if entity is supported."""
572 return bool(self.
traitstraits())
576 """Return if the entity might encounter 2FA."""
577 if not self.
configconfig.should_2fa(self.
statestate):
584 """Return if the entity might encounter 2FA based on just traits."""
585 state = self.
statestate
586 domain = state.domain
587 features = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
588 device_class = state.attributes.get(ATTR_DEVICE_CLASS)
591 trait.might_2fa(domain, features, device_class)
for trait
in self.
traitstraits()
595 """Serialize entity for a SYNC response.
597 https://developers.google.com/actions/smarthome/create-app#actiondevicessync
599 state = self.
statestate
600 traits = self.
traitstraits()
601 entity_config = self.
configconfig.entity_config.get(state.entity_id, {})
602 name = (entity_config.get(CONF_NAME)
or state.name).strip()
611 "id": state.entity_id,
612 "name": {
"name": name},
614 "traits": [trait.name
for trait
in traits],
615 "willReportState": self.
configconfig.should_report_state,
617 state.domain, state.attributes.get(ATTR_DEVICE_CLASS)
621 if (config_aliases := entity_config.get(CONF_ALIASES, []))
or (
622 entity_entry
and entity_entry.aliases
624 device[
"name"][
"nicknames"] = [name, *config_aliases]
626 device[
"name"][
"nicknames"].extend(entity_entry.aliases)
630 device[
"otherDeviceIds"] = [{
"deviceId": self.
entity_identity_id}]
631 device[
"customData"] = {
632 "webhookId": self.
configconfig.get_local_webhook_id(agent_user_id),
633 "httpPort":
URL(
get_url(self.
hasshass, allow_external=
False)).port,
634 "uuid": instance_uuid,
639 device[
"attributes"].
update(trt.sync_attributes())
643 device.update(trt.sync_options())
646 if room := entity_config.get(CONF_ROOM_HINT):
647 device[
"roomHint"] = room
648 elif area_entry
and area_entry.name:
649 device[
"roomHint"] = area_entry.name
655 if "matter" in self.
hasshass.config.components
and any(
656 x
for x
in device_entry.identifiers
if x[0] ==
"matter"
665 device[
"matterUniqueId"] = matter_info[
"unique_id"]
666 device[
"matterOriginalVendorId"] = matter_info[
"vendor_id"]
667 device[
"matterOriginalProductId"] = matter_info[
"product_id"]
672 if device_entry.manufacturer:
673 device_info[
"manufacturer"] = device_entry.manufacturer
674 if device_entry.model:
675 device_info[
"model"] = device_entry.model
676 if device_entry.sw_version:
677 device_info[
"swVersion"] = device_entry.sw_version
680 device[
"deviceInfo"] = device_info
686 """Serialize entity for a QUERY response.
688 https://developers.google.com/actions/smarthome/create-app#actiondevicesquery
690 state = self.
statestate
692 if state.state == STATE_UNAVAILABLE:
693 return {
"online":
False}
695 attrs = {
"online":
True}
697 for trt
in self.
traitstraits():
704 """Serialize the payload for notifications to be sent."""
705 notifications: dict[str, Any] = {}
707 for trt
in self.
traitstraits():
708 deep_update(notifications, trt.query_notifications()
or {})
710 return notifications
or None
714 """Serialize entity for a REACHABLE_DEVICE response."""
715 return {
"verificationId": self.
entity_identity_id}
717 async
def execute(self, data, command_payload):
718 """Execute a command.
720 https://developers.google.com/actions/smarthome/create-app#actiondevicesexecute
722 command = command_payload[
"command"]
723 params = command_payload.get(
"params", {})
724 challenge = command_payload.get(
"challenge", {})
726 for trt
in self.
traitstraits():
727 if trt.can_execute(command, params):
728 await trt.execute(command, data, params, challenge)
734 ERR_FUNCTION_NOT_SUPPORTED,
735 f
"Unable to execute {command} for {self.state.entity_id}",
740 """Update the entity with latest info from Home Assistant."""
743 if self.
_traits_traits
is None:
746 for trt
in self.
_traits_traits:
747 trt.state = self.
statestate
def deep_update(target, source):
    """Update a nested dictionary with another nested dictionary.

    Every key of ``source`` is merged into ``target`` in place. When a value
    in ``source`` is a mapping it is merged recursively into the value already
    stored under that key in ``target`` (or into a new empty dict when the key
    is absent); any other value overwrites the existing one.

    Returns ``target`` so the recursive call can store the merged sub-dict.
    """
    for key, value in source.items():
        if isinstance(value, Mapping):
            target[key] = deep_update(target.get(key, {}), value)
        else:
            target[key] = value
    return target
def async_get_google_entity_if_supported_cached(
    hass: HomeAssistant, config: AbstractConfig, state: State
) -> GoogleEntity | None:
    """Return a GoogleEntity if entity is supported checking the cache first.

    This function will check the cache, and call async_get_google_entity_if_supported
    if the entity is not in the cache, which will update the cache.
    """
    entity_id = state.entity_id
    is_supported_cache = config.is_supported_cache
    features: int | None = state.attributes.get(ATTR_SUPPORTED_FEATURES)
    if result := is_supported_cache.get(entity_id):
        cached_features, supported = result
        # The cached verdict is only reused while the supported features the
        # verdict was computed for are unchanged.
        if cached_features == features:
            return GoogleEntity(hass, config, state) if supported else None
    # No usable cache entry: compute the answer, which also refreshes the cache.
    return async_get_google_entity_if_supported(hass, config, state)
def async_get_google_entity_if_supported(
    hass: HomeAssistant, config: AbstractConfig, state: State
) -> GoogleEntity | None:
    """Return a GoogleEntity if entity is supported.

    This function will update the cache, but it does not check the cache first.
    """
    features: int | None = state.attributes.get(ATTR_SUPPORTED_FEATURES)
    entity = GoogleEntity(hass, config, state)
    # An entity counts as supported when it exposes at least one trait.
    is_supported = bool(entity.traits())
    # Store the verdict together with the features it was computed for.
    config.is_supported_cache[state.entity_id] = (features, is_supported)
    return entity if is_supported else None
797 hass: HomeAssistant, config: AbstractConfig
798 ) -> list[GoogleEntity]:
799 """Return all entities that are supported by Google."""
800 entities: list[GoogleEntity] = []
801 is_supported_cache = config.is_supported_cache
802 for state
in hass.states.async_all():
803 entity_id = state.entity_id
804 if entity_id
in CLOUD_NEVER_EXPOSED_ENTITIES:
809 features: int |
None = state.attributes.get(ATTR_SUPPORTED_FEATURES)
810 if result := is_supported_cache.get(entity_id):
811 cached_features, supported = result
812 if cached_features == features:
819 entities.append(entity)
def is_reporting_state(self)
def is_local_sdk_active(self)
def async_connect_agent_user(self, str agent_user_id)
None __init__(self, HomeAssistant hass)
def async_sync_entities(self, str agent_user_id)
None async_deinitialize(self)
bool should_expose(self, state)
None async_schedule_google_sync_all(self)
HTTPStatus|None async_report_state(self, dict[str, Any] message, str agent_user_id, str|None event_id=None)
def async_disconnect_agent_user(self, str agent_user_id)
def should_2fa(self, state)
def get_agent_user_id_from_context(self, context)
None async_disable_local_sdk(self)
HTTPStatus async_sync_notification(self, str agent_user_id, str event_id, dict[str, Any] payload)
int async_sync_entities_all(self)
int _async_request_sync_devices(self, str agent_user_id)
None async_enable_report_state(self)
bool is_local_connected(self)
def async_report_state_all(self, message)
def get_local_webhook_id(self, agent_user_id)
None async_enable_local_sdk(self)
def get_agent_user_id_from_webhook(self, webhook_id)
def get_local_user_id(self, webhook_id)
HTTPStatus async_sync_notification_all(self, str event_id, dict[str, Any] payload)
Collection[str] async_get_agent_users(self)
def secure_devices_pin(self)
None async_disable_report_state(self)
None async_initialize(self)
def should_report_state(self)
def _handle_local_webhook(self, hass, webhook_id, request)
def async_schedule_google_sync(self, str agent_user_id)
def execute(self, data, command_payload)
dict[str, Any]|None notifications_serialize(self)
None __init__(self, HomeAssistant hass, AbstractConfig config, State state)
bool might_2fa_traits(self)
def reachable_device_serialize(self)
def query_serialize(self)
def sync_serialize(self, agent_user_id, instance_uuid)
list[trait._Trait] traits(self)
bool should_expose_local(self)
def is_local_request(self)
None __init__(self, AbstractConfig config, str user_id, str source, str request_id, list[dict]|None devices)
dict[str, Any] async_redact_msg(dict[str, Any] msg, str agent_user_id)
GoogleEntity|None async_get_google_entity_if_supported_cached(HomeAssistant hass, AbstractConfig config, State state)
list[GoogleEntity] async_get_entities(HomeAssistant hass, AbstractConfig config)
GoogleEntity|None async_get_google_entity_if_supported(HomeAssistant hass, AbstractConfig config, State state)
def get_google_type(domain, device_class)
def deep_update(target, source)
tuple[ er.RegistryEntry|None, dr.DeviceEntry|None, ar.AreaEntry|None,] _get_registry_entries(HomeAssistant hass, str entity_id)
list[type[trait._Trait]] supported_traits_for_state(State state)
IssData update(pyiss.ISS iss)
MatterDeviceInfo|None get_matter_device_info(HomeAssistant hass, str device_id)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
str get_url(HomeAssistant hass, *bool require_current_request=False, bool require_ssl=False, bool require_standard_port=False, bool require_cloud=False, bool allow_internal=True, bool allow_external=True, bool allow_cloud=True, bool|None allow_ip=None, bool|None prefer_external=None, bool prefer_cloud=False)
str partial_redact(str|Any x, int unmasked_prefix=4, int unmasked_suffix=4)