1 """Webhook handlers for mobile_app."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
7 from contextlib
import suppress
8 from functools
import lru_cache, wraps
9 from http
import HTTPStatus
12 from typing
import Any
14 from aiohttp.web
import HTTPBadRequest, Request, Response, json_response
15 from nacl.exceptions
import CryptoError
16 from nacl.secret
import SecretBox
17 import voluptuous
as vol
23 notify
as hass_notify,
43 ATTR_SUPPORTED_FEATURES,
52 config_validation
as cv,
53 device_registry
as dr,
54 entity_registry
as er,
64 ATTR_CAMERA_ENTITY_ID,
71 ATTR_NO_LEGACY_ENCRYPTION,
73 ATTR_SENSOR_ATTRIBUTES,
74 ATTR_SENSOR_DEVICE_CLASS,
76 ATTR_SENSOR_ENTITY_CATEGORY,
80 ATTR_SENSOR_STATE_CLASS,
82 ATTR_SENSOR_TYPE_BINARY_SENSOR,
83 ATTR_SENSOR_TYPE_SENSOR,
84 ATTR_SENSOR_UNIQUE_ID,
87 ATTR_SUPPORTS_ENCRYPTION,
89 ATTR_TEMPLATE_VARIABLES,
90 ATTR_VERTICAL_ACCURACY,
92 ATTR_WEBHOOK_ENCRYPTED,
93 ATTR_WEBHOOK_ENCRYPTED_DATA,
102 ERR_ENCRYPTION_ALREADY_ENABLED,
103 ERR_ENCRYPTION_REQUIRED,
105 ERR_SENSOR_NOT_REGISTERED,
107 SIGNAL_LOCATION_UPDATE,
108 SIGNAL_SENSOR_UPDATE,
110 from .helpers
import (
112 decrypt_payload_legacy,
115 registration_context,
120 _LOGGER = logging.getLogger(__name__)
124 WEBHOOK_COMMANDS: Registry[
125 str, Callable[[HomeAssistant, ConfigEntry, Any], Coroutine[Any, Any, Response]]
128 SENSOR_TYPES = (ATTR_SENSOR_TYPE_BINARY_SENSOR, ATTR_SENSOR_TYPE_SENSOR)
130 WEBHOOK_PAYLOAD_SCHEMA = vol.Any(
133 vol.Required(ATTR_WEBHOOK_TYPE): cv.string,
134 vol.Optional(ATTR_WEBHOOK_DATA): vol.Any(dict, list),
139 vol.Required(ATTR_WEBHOOK_TYPE): cv.string,
140 vol.Required(ATTR_WEBHOOK_ENCRYPTED):
True,
141 vol.Optional(ATTR_WEBHOOK_ENCRYPTED_DATA): cv.string,
146 SENSOR_SCHEMA_FULL = vol.Schema(
148 vol.Optional(ATTR_SENSOR_ATTRIBUTES, default={}): dict,
149 vol.Optional(ATTR_SENSOR_ICON, default=
"mdi:cellphone"): vol.Any(
None, cv.icon),
150 vol.Required(ATTR_SENSOR_STATE): vol.Any(
None, bool, int, float, str),
151 vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
152 vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
158 """Decorate a webhook function with a schema."""
159 if isinstance(schema, dict):
160 schema = vol.Schema(schema)
163 """Wrap function so we validate schema."""
166 async
def validate_and_run(hass, config_entry, data):
167 """Validate input and call handler."""
170 except vol.Invalid
as ex:
171 err = vol.humanize.humanize_error(data, ex)
172 _LOGGER.error(
"Received invalid webhook payload: %s", err)
175 return await func(hass, config_entry, data)
177 return validate_and_run
183 hass: HomeAssistant, webhook_id: str, request: Request
185 """Handle webhook callback."""
186 if webhook_id
in hass.data[DOMAIN][DATA_DELETED_IDS]:
187 return Response(status=410)
189 config_entry: ConfigEntry = hass.data[DOMAIN][DATA_CONFIG_ENTRIES][webhook_id]
191 device_name: str = config_entry.data[ATTR_DEVICE_NAME]
194 req_data = await request.json()
196 _LOGGER.warning(
"Received invalid JSON from mobile_app device: %s", device_name)
200 ATTR_WEBHOOK_ENCRYPTED
not in req_data
201 and config_entry.data[ATTR_SUPPORTS_ENCRYPTION]
204 "Refusing to accept unencrypted webhook from %s",
207 return error_response(ERR_ENCRYPTION_REQUIRED,
"Encryption required")
211 except vol.Invalid
as ex:
212 err = vol.humanize.humanize_error(req_data, ex)
214 "Received invalid webhook from %s with payload: %s", device_name, err
218 webhook_type = req_data[ATTR_WEBHOOK_TYPE]
220 webhook_payload =
None
222 if ATTR_WEBHOOK_ENCRYPTED
in req_data:
223 enc_data = req_data[ATTR_WEBHOOK_ENCRYPTED_DATA]
225 webhook_payload =
decrypt_payload(config_entry.data[CONF_SECRET], enc_data)
226 if ATTR_NO_LEGACY_ENCRYPTION
not in config_entry.data:
227 data = {**config_entry.data, ATTR_NO_LEGACY_ENCRYPTION:
True}
228 hass.config_entries.async_update_entry(config_entry, data=data)
230 if ATTR_NO_LEGACY_ENCRYPTION
not in config_entry.data:
233 config_entry.data[CONF_SECRET], enc_data
237 "Ignoring encrypted payload because unable to decrypt"
240 _LOGGER.warning(
"Ignoring invalid JSON in encrypted payload")
242 _LOGGER.warning(
"Ignoring encrypted payload because unable to decrypt")
243 except ValueError
as err:
244 _LOGGER.warning(
"Ignoring invalid JSON in encrypted payload: %s", err)
246 webhook_payload = req_data.get(ATTR_WEBHOOK_DATA, {})
248 if webhook_payload
is None:
251 if webhook_type
not in WEBHOOK_COMMANDS:
253 "Received invalid webhook from %s of type: %s", device_name, webhook_type
258 "Received webhook payload from %s for type %s: %s",
265 return await asyncio.shield(
266 WEBHOOK_COMMANDS[webhook_type](hass, config_entry, webhook_payload)
270 @WEBHOOK_COMMANDS.register("call_service")
271 @validate_schema(
{
vol.Required(ATTR_DOMAIN): cv.string,
272 vol.Required(ATTR_SERVICE): cv.string,
273 vol.Optional(ATTR_SERVICE_DATA, default={}): dict,
277 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
279 """Handle a call service webhook."""
281 await hass.services.async_call(
284 data[ATTR_SERVICE_DATA],
288 except (vol.Invalid, ServiceNotFound, Exception)
as ex:
291 "Error when calling service during mobile_app "
292 "webhook (device name: %s): %s"
294 config_entry.data[ATTR_DEVICE_NAME],
297 raise HTTPBadRequest
from ex
302 @WEBHOOK_COMMANDS.register("fire_event")
303 @validate_schema(
{
vol.Required(ATTR_EVENT_TYPE): cv.string,
304 vol.Optional(ATTR_EVENT_DATA, default={}): dict,
308 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
310 """Handle a fire event webhook."""
311 event_type: str = data[ATTR_EVENT_TYPE]
314 data[ATTR_EVENT_DATA],
321 @WEBHOOK_COMMANDS.register("conversation_process")
322 @validate_schema(
{
vol.Required("text"): cv.string,
323 vol.Optional(
"language"): cv.string,
324 vol.Optional(
"conversation_id"): cv.string,
328 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
330 """Handle a conversation process webhook."""
331 result = await conversation.async_converse(
334 language=data.get(
"language"),
335 conversation_id=data.get(
"conversation_id"),
341 @WEBHOOK_COMMANDS.register("stream_camera")
342 @validate_schema({vol.Required(ATTR_CAMERA_ENTITY_ID): cv.string})
344 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, str]
346 """Handle a request to HLS-stream a camera."""
347 if (camera_state := hass.states.get(data[ATTR_CAMERA_ENTITY_ID]))
is None:
350 registration=config_entry.data,
351 status=HTTPStatus.BAD_REQUEST,
354 resp: dict[str, Any] = {
355 "mjpeg_path": f
"/api/camera_proxy_stream/{camera_state.entity_id}"
358 if camera_state.attributes[ATTR_SUPPORTED_FEATURES] & CameraEntityFeature.STREAM:
360 resp[
"hls_path"] = await camera.async_request_stream(
361 hass, camera_state.entity_id,
"hls"
363 except HomeAssistantError:
364 resp[
"hls_path"] =
None
366 resp[
"hls_path"] =
None
def _cached_template(template_str: str, hass: HomeAssistant) -> template.Template:
    """Build a Template object for the given template source.

    The template is constructed from ``template_str`` and bound to ``hass``.
    NOTE(review): the name suggests a caching decorator is applied above this
    def; none is visible in this view, so confirm before relying on it.
    """
    compiled = template.Template(template_str, hass)
    return compiled
377 @WEBHOOK_COMMANDS.register("render_template")
378 @validate_schema(
{
str: {
vol.Required(ATTR_TEMPLATE): cv.string,
379 vol.Optional(ATTR_TEMPLATE_VARIABLES, default={}): dict,
384 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
386 """Handle a render template webhook."""
388 for key, item
in data.items():
391 resp[key] = tpl.async_render(item.get(ATTR_TEMPLATE_VARIABLES))
392 except TemplateError
as ex:
393 resp[key] = {
"error":
str(ex)}
398 @WEBHOOK_COMMANDS.register("update_location")
399 @validate_schema(
vol.All(
cv.key_dependency(ATTR_GPS, ATTR_GPS_ACCURACY),
402 vol.Optional(ATTR_LOCATION_NAME): cv.string,
403 vol.Optional(ATTR_GPS): cv.gps,
404 vol.Optional(ATTR_GPS_ACCURACY): cv.positive_int,
405 vol.Optional(ATTR_BATTERY): cv.positive_int,
406 vol.Optional(ATTR_SPEED): cv.positive_int,
407 vol.Optional(ATTR_ALTITUDE): vol.Coerce(float),
408 vol.Optional(ATTR_COURSE): cv.positive_int,
409 vol.Optional(ATTR_VERTICAL_ACCURACY): cv.positive_int,
415 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
417 """Handle an update location webhook."""
419 hass, SIGNAL_LOCATION_UPDATE.format(config_entry.entry_id), data
424 @WEBHOOK_COMMANDS.register("update_registration")
425 @validate_schema(
{
vol.Optional(ATTR_APP_DATA): SCHEMA_APP_DATA,
426 vol.Required(ATTR_APP_VERSION): cv.string,
427 vol.Required(ATTR_DEVICE_NAME): cv.string,
428 vol.Required(ATTR_MANUFACTURER): cv.string,
429 vol.Required(ATTR_MODEL): cv.string,
430 vol.Optional(ATTR_OS_VERSION): cv.string,
434 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
436 """Handle an update registration webhook."""
437 new_registration = {**config_entry.data, **data}
439 device_registry = dr.async_get(hass)
441 device_registry.async_get_or_create(
442 config_entry_id=config_entry.entry_id,
443 identifiers={(DOMAIN, config_entry.data[ATTR_DEVICE_ID])},
444 manufacturer=new_registration[ATTR_MANUFACTURER],
445 model=new_registration[ATTR_MODEL],
446 name=new_registration[ATTR_DEVICE_NAME],
447 sw_version=new_registration[ATTR_OS_VERSION],
450 hass.config_entries.async_update_entry(config_entry, data=new_registration)
452 await hass_notify.async_reload(hass, DOMAIN)
456 registration=new_registration,
460 @WEBHOOK_COMMANDS.register("enable_encryption")
462 hass: HomeAssistant, config_entry: ConfigEntry, data: Any
464 """Handle a encryption enable webhook."""
465 if config_entry.data[ATTR_SUPPORTS_ENCRYPTION]:
467 "Refusing to enable encryption for %s because it is already enabled!",
468 config_entry.data[ATTR_DEVICE_NAME],
471 ERR_ENCRYPTION_ALREADY_ENABLED,
"Encryption already enabled"
474 secret = secrets.token_hex(SecretBox.KEY_SIZE)
478 ATTR_SUPPORTS_ENCRYPTION:
True,
482 hass.config_entries.async_update_entry(config_entry, data=update_data)
484 return json_response({
"secret": secret})
488 """Validate we only set state class for sensors."""
490 ATTR_SENSOR_STATE_CLASS
in value
491 and value[ATTR_SENSOR_TYPE] != ATTR_SENSOR_TYPE_SENSOR
493 raise vol.Invalid(
"state_class only allowed for sensors")
498 def _gen_unique_id(webhook_id: str, sensor_unique_id: str) -> str:
499 """Return a unique sensor ID."""
500 return f
"{webhook_id}_{sensor_unique_id}"
504 """Return the sensor unique ID with the webhook ID prefix and separator removed."""
505 return unique_id[len(webhook_id) + 1 :]
508 @WEBHOOK_COMMANDS.register("register_sensor")
509 @validate_schema(
vol.All(
{
vol.Optional(ATTR_SENSOR_ATTRIBUTES, default={}): dict,
510 vol.Optional(ATTR_SENSOR_DEVICE_CLASS): vol.Any(
512 vol.All(vol.Lower, vol.Coerce(BinarySensorDeviceClass)),
513 vol.All(vol.Lower, vol.Coerce(SensorDeviceClass)),
515 vol.Required(ATTR_SENSOR_NAME): cv.string,
516 vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
517 vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
518 vol.Optional(ATTR_SENSOR_UOM): vol.Any(
None, cv.string),
519 vol.Optional(ATTR_SENSOR_STATE, default=
None): vol.Any(
520 None, bool, int, float, str
522 vol.Optional(ATTR_SENSOR_ENTITY_CATEGORY): vol.Any(
523 None, vol.Coerce(EntityCategory)
525 vol.Optional(ATTR_SENSOR_ICON, default=
"mdi:cellphone"): vol.Any(
528 vol.Optional(ATTR_SENSOR_STATE_CLASS): vol.Any(
529 None, vol.Coerce(SensorStateClass)
531 vol.Optional(ATTR_SENSOR_DISABLED): bool,
533 _validate_state_class_sensor,
537 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, Any]
539 """Handle a register sensor webhook."""
540 entity_type: str = data[ATTR_SENSOR_TYPE]
541 unique_id: str = data[ATTR_SENSOR_UNIQUE_ID]
542 device_name: str = config_entry.data[ATTR_DEVICE_NAME]
544 unique_store_key =
_gen_unique_id(config_entry.data[CONF_WEBHOOK_ID], unique_id)
545 entity_registry = er.async_get(hass)
546 existing_sensor = entity_registry.async_get_entity_id(
547 entity_type, DOMAIN, unique_store_key
550 data[CONF_WEBHOOK_ID] = config_entry.data[CONF_WEBHOOK_ID]
555 "Re-register for %s of existing sensor %s", device_name, unique_id
558 entry = entity_registry.async_get(existing_sensor)
559 assert entry
is not None
560 changes: dict[str, Any] = {}
563 new_name := f
"{device_name} {data[ATTR_SENSOR_NAME]}"
564 ) != entry.original_name:
565 changes[
"original_name"] = new_name
568 should_be_disabled := data.get(ATTR_SENSOR_DISABLED)
569 )
is None or should_be_disabled == entry.disabled:
571 elif should_be_disabled:
572 changes[
"disabled_by"] = er.RegistryEntryDisabler.INTEGRATION
574 changes[
"disabled_by"] =
None
576 for ent_reg_key, data_key
in (
577 (
"device_class", ATTR_SENSOR_DEVICE_CLASS),
578 (
"unit_of_measurement", ATTR_SENSOR_UOM),
579 (
"entity_category", ATTR_SENSOR_ENTITY_CATEGORY),
580 (
"original_icon", ATTR_SENSOR_ICON),
582 if data_key
in data
and getattr(entry, ent_reg_key) != data[data_key]:
583 changes[ent_reg_key] = data[data_key]
586 entity_registry.async_update_entity(existing_sensor, **changes)
590 data[CONF_UNIQUE_ID] = unique_store_key
592 f
"{config_entry.data[ATTR_DEVICE_NAME]} {data[ATTR_SENSOR_NAME]}"
595 register_signal = f
"{DOMAIN}_{data[ATTR_SENSOR_TYPE]}_register"
600 registration=config_entry.data,
601 status=HTTPStatus.CREATED,
605 @WEBHOOK_COMMANDS.register("update_sensor_states")
606 @validate_schema(
vol.All(
cv.ensure_list,
[
# Partial schema, enough to identify schema.
# We don't validate everything because otherwise 1 invalid sensor
# will invalidate all sensors.
vol.Schema(
{
vol.Required(ATTR_SENSOR_TYPE): vol.In(SENSOR_TYPES),
607 vol.Required(ATTR_SENSOR_UNIQUE_ID): cv.string,
609 extra=vol.ALLOW_EXTRA,
615 hass: HomeAssistant, config_entry: ConfigEntry, data: list[dict[str, Any]]
617 """Handle an update sensor states webhook."""
618 device_name: str = config_entry.data[ATTR_DEVICE_NAME]
619 resp: dict[str, Any] = {}
620 entity_registry = er.async_get(hass)
623 entity_type: str = sensor[ATTR_SENSOR_TYPE]
625 unique_id: str = sensor[ATTR_SENSOR_UNIQUE_ID]
627 unique_store_key =
_gen_unique_id(config_entry.data[CONF_WEBHOOK_ID], unique_id)
630 entity_id := entity_registry.async_get_entity_id(
631 entity_type, DOMAIN, unique_store_key
635 "Refusing to update %s non-registered sensor: %s",
639 err_msg = f
"{entity_type} {unique_id} is not registered"
642 "error": {
"code": ERR_SENSOR_NOT_REGISTERED,
"message": err_msg},
648 except vol.Invalid
as err:
649 err_msg = vol.humanize.humanize_error(sensor, err)
651 "Received invalid sensor payload from %s for %s: %s",
658 "error": {
"code": ERR_INVALID_FORMAT,
"message": err_msg},
662 sensor[CONF_WEBHOOK_ID] = config_entry.data[CONF_WEBHOOK_ID]
665 f
"{SIGNAL_SENSOR_UPDATE}-{unique_store_key}",
669 resp[unique_id] = {
"success":
True}
672 entry = entity_registry.async_get(entity_id)
674 if entry
and entry.disabled_by:
675 resp[unique_id][
"is_disabled"] =
True
680 @WEBHOOK_COMMANDS.register("get_zones")
682 hass: HomeAssistant, config_entry: ConfigEntry, data: Any
684 """Handle a get zones webhook."""
686 hass.states.get(entity_id)
687 for entity_id
in sorted(hass.states.async_entity_ids(ZONE_DOMAIN))
692 @WEBHOOK_COMMANDS.register("get_config")
694 hass: HomeAssistant, config_entry: ConfigEntry, data: Any
696 """Handle a get config webhook."""
697 hass_config = hass.config.as_dict()
699 device: dr.DeviceEntry = hass.data[DOMAIN][DATA_DEVICES][
700 config_entry.data[CONF_WEBHOOK_ID]
704 "latitude": hass_config[
"latitude"],
705 "longitude": hass_config[
"longitude"],
706 "elevation": hass_config[
"elevation"],
707 "hass_device_id": device.id,
708 "unit_system": hass_config[
"unit_system"],
709 "location_name": hass_config[
"location_name"],
710 "time_zone": hass_config[
"time_zone"],
711 "components": hass_config[
"components"],
712 "version": hass_config[
"version"],
713 "theme_color": MANIFEST_JSON[
"theme_color"],
716 if CONF_CLOUDHOOK_URL
in config_entry.data:
717 resp[CONF_CLOUDHOOK_URL] = config_entry.data[CONF_CLOUDHOOK_URL]
719 if cloud.async_active_subscription(hass):
721 resp[CONF_REMOTE_UI_URL] = cloud.async_remote_ui_url(hass)
723 webhook_id = config_entry.data[CONF_WEBHOOK_ID]
726 for entry
in er.async_entries_for_config_entry(
727 er.async_get(hass), config_entry.entry_id
729 if entry.domain
in (
"binary_sensor",
"sensor"):
732 unique_id = entry.unique_id
734 entities[unique_id] = {
"disabled": entry.disabled}
736 resp[
"entities"] = entities
741 @WEBHOOK_COMMANDS.register("scan_tag")
742 @validate_schema({vol.Required("tag_id"): cv.string})
744 hass: HomeAssistant, config_entry: ConfigEntry, data: dict[str, str]
746 """Handle a scan tag webhook."""
747 await tag.async_scan_tag(
750 hass.data[DOMAIN][DATA_DEVICES][config_entry.data[CONF_WEBHOOK_ID]].id,
754
JsonValueType|None decrypt_payload(str key, bytes ciphertext)
Response error_response(str code, str message, HTTPStatus status=HTTPStatus.BAD_REQUEST, dict|None headers=None)
dict safe_registration(dict registration)
Context registration_context(Mapping[str, Any] registration)
Response empty_okay_response(dict|None headers=None, HTTPStatus status=HTTPStatus.OK)
Response webhook_response(Any data, *Mapping[str, Any] registration, HTTPStatus status=HTTPStatus.OK, Mapping[str, str]|None headers=None)
JsonValueType|None decrypt_payload_legacy(str key, bytes ciphertext)
str _extract_sensor_unique_id(str webhook_id, str unique_id)
Response webhook_enable_encryption(HomeAssistant hass, ConfigEntry config_entry, Any data)
Response webhook_conversation_process(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
Response webhook_stream_camera(HomeAssistant hass, ConfigEntry config_entry, dict[str, str] data)
Response webhook_update_location(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
Response webhook_render_template(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
Response webhook_update_sensor_states(HomeAssistant hass, ConfigEntry config_entry, list[dict[str, Any]] data)
str _gen_unique_id(str webhook_id, str sensor_unique_id)
Response handle_webhook(HomeAssistant hass, str webhook_id, Request request)
Response webhook_register_sensor(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
Response webhook_scan_tag(HomeAssistant hass, ConfigEntry config_entry, dict[str, str] data)
Response webhook_get_config(HomeAssistant hass, ConfigEntry config_entry, Any data)
dict[str, Any] _validate_state_class_sensor(dict[str, Any] value)
Response webhook_update_registration(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
def validate_schema(schema)
template.Template _cached_template(str template_str, HomeAssistant hass)
Response webhook_call_service(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
Response webhook_get_zones(HomeAssistant hass, ConfigEntry config_entry, Any data)
Response webhook_fire_event(HomeAssistant hass, ConfigEntry config_entry, dict[str, Any] data)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)