Home Assistant Unofficial Reference 2024.12.1
webhook.py
Go to the documentation of this file.
1 """The Netatmo integration."""
2 
3 import logging
4 
5 from aiohttp.web import Request
6 
7 from homeassistant.const import ATTR_DEVICE_ID, ATTR_ID, ATTR_NAME
8 from homeassistant.core import HomeAssistant
9 from homeassistant.helpers.dispatcher import async_dispatcher_send
10 
11 from .const import (
12  ATTR_EVENT_TYPE,
13  ATTR_FACE_URL,
14  ATTR_HOME_ID,
15  ATTR_IS_KNOWN,
16  ATTR_PERSONS,
17  DATA_DEVICE_IDS,
18  DATA_PERSONS,
19  DEFAULT_PERSON,
20  DOMAIN,
21  EVENT_ID_MAP,
22  NETATMO_EVENT,
23 )
24 
25 _LOGGER = logging.getLogger(__name__)
26 
27 SUBEVENT_TYPE_MAP = {
28  "outdoor": "",
29  "therm_mode": "",
30 }
31 
32 
34  hass: HomeAssistant, webhook_id: str, request: Request
35 ) -> None:
36  """Handle webhook callback."""
37  try:
38  data = await request.json()
39  except ValueError as err:
40  _LOGGER.error("Error in data: %s", err)
41  return
42 
43  _LOGGER.debug("Got webhook data: %s", data)
44 
45  event_type = data.get(ATTR_EVENT_TYPE)
46 
47  if event_type in SUBEVENT_TYPE_MAP:
48  async_send_event(hass, event_type, data)
49 
50  for event_data in data.get(SUBEVENT_TYPE_MAP[event_type], []):
51  async_evaluate_event(hass, event_data)
52 
53  else:
54  async_evaluate_event(hass, data)
55 
56 
57 def async_evaluate_event(hass: HomeAssistant, event_data: dict) -> None:
58  """Evaluate events from webhook."""
59  event_type = event_data.get(ATTR_EVENT_TYPE, "None")
60 
61  if event_type == "person":
62  for person in event_data.get(ATTR_PERSONS, {}):
63  person_event_data = dict(event_data)
64  person_event_data[ATTR_ID] = person.get(ATTR_ID)
65  person_event_data[ATTR_NAME] = hass.data[DOMAIN][DATA_PERSONS][
66  event_data[ATTR_HOME_ID]
67  ].get(person_event_data[ATTR_ID], DEFAULT_PERSON)
68  person_event_data[ATTR_IS_KNOWN] = person.get(ATTR_IS_KNOWN)
69  person_event_data[ATTR_FACE_URL] = person.get(ATTR_FACE_URL)
70 
71  async_send_event(hass, event_type, person_event_data)
72 
73  else:
74  async_send_event(hass, event_type, event_data)
75 
76 
77 def async_send_event(hass: HomeAssistant, event_type: str, data: dict) -> None:
78  """Send events."""
79  _LOGGER.debug("%s: %s", event_type, data)
81  hass,
82  f"signal-{DOMAIN}-webhook-{event_type}",
83  {"type": event_type, "data": data},
84  )
85 
86  event_data = {
87  "type": event_type,
88  "data": data,
89  }
90 
91  if event_type in EVENT_ID_MAP:
92  data_device_id = data[EVENT_ID_MAP[event_type]]
93  event_data[ATTR_DEVICE_ID] = hass.data[DOMAIN][DATA_DEVICE_IDS].get(
94  data_device_id
95  )
96 
97  hass.bus.async_fire(
98  event_type=NETATMO_EVENT,
99  event_data=event_data,
100  )
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_evaluate_event(HomeAssistant hass, dict event_data)
Definition: webhook.py:57
None async_send_event(HomeAssistant hass, str event_type, dict data)
Definition: webhook.py:77
None async_handle_webhook(HomeAssistant hass, str webhook_id, Request request)
Definition: webhook.py:35
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193