1 """The Netatmo integration."""
5 from aiohttp.web
import Request
25 _LOGGER = logging.getLogger(__name__)
34 hass: HomeAssistant, webhook_id: str, request: Request
36 """Handle webhook callback."""
38 data = await request.json()
39 except ValueError
as err:
40 _LOGGER.error(
"Error in data: %s", err)
43 _LOGGER.debug(
"Got webhook data: %s", data)
45 event_type = data.get(ATTR_EVENT_TYPE)
47 if event_type
in SUBEVENT_TYPE_MAP:
50 for event_data
in data.get(SUBEVENT_TYPE_MAP[event_type], []):
58 """Evaluate events from webhook."""
59 event_type = event_data.get(ATTR_EVENT_TYPE,
"None")
61 if event_type ==
"person":
62 for person
in event_data.get(ATTR_PERSONS, {}):
63 person_event_data =
dict(event_data)
64 person_event_data[ATTR_ID] = person.get(ATTR_ID)
65 person_event_data[ATTR_NAME] = hass.data[DOMAIN][DATA_PERSONS][
66 event_data[ATTR_HOME_ID]
67 ].
get(person_event_data[ATTR_ID], DEFAULT_PERSON)
68 person_event_data[ATTR_IS_KNOWN] = person.get(ATTR_IS_KNOWN)
69 person_event_data[ATTR_FACE_URL] = person.get(ATTR_FACE_URL)
79 _LOGGER.debug(
"%s: %s", event_type, data)
82 f
"signal-{DOMAIN}-webhook-{event_type}",
83 {
"type": event_type,
"data": data},
91 if event_type
in EVENT_ID_MAP:
92 data_device_id = data[EVENT_ID_MAP[event_type]]
93 event_data[ATTR_DEVICE_ID] = hass.data[DOMAIN][DATA_DEVICE_IDS].
get(
98 event_type=NETATMO_EVENT,
99 event_data=event_data,
web.Response get(self, web.Request request, str config_key)
None async_evaluate_event(HomeAssistant hass, dict event_data)
None async_send_event(HomeAssistant hass, str event_type, dict data)
None async_handle_webhook(HomeAssistant hass, str webhook_id, Request request)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)