Home Assistant Unofficial Reference 2024.12.1
event.py
Go to the documentation of this file.
1 """ONVIF event abstraction."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 import datetime as dt
8 
9 from aiohttp.web import Request
10 from httpx import RemoteProtocolError, RequestError, TransportError
11 from onvif import ONVIFCamera
12 from onvif.client import (
13  NotificationManager,
14  PullPointManager as ONVIFPullPointManager,
15  retry_connection_error,
16 )
17 from onvif.exceptions import ONVIFError
18 from onvif.util import stringify_onvif_error
19 from zeep.exceptions import Fault, ValidationError, XMLParseError
20 
21 from homeassistant.components import webhook
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
24 from homeassistant.helpers.device_registry import format_mac
25 from homeassistant.helpers.event import async_call_later
26 from homeassistant.helpers.network import NoURLAvailableError, get_url
27 
28 from .const import DOMAIN, LOGGER
29 from .models import Event, PullPointManagerState, WebHookManagerState
30 from .parsers import PARSERS
31 
32 # Topics in this list are ignored because we do not want to create
33 # entities for them.
34 UNHANDLED_TOPICS: set[str] = {"tns1:MediaControl/VideoEncoderConfiguration"}
35 
36 SUBSCRIPTION_ERRORS = (Fault, TimeoutError, TransportError)
37 CREATE_ERRORS = (ONVIFError, Fault, RequestError, XMLParseError, ValidationError)
38 SET_SYNCHRONIZATION_POINT_ERRORS = (*SUBSCRIPTION_ERRORS, TypeError)
39 UNSUBSCRIBE_ERRORS = (XMLParseError, *SUBSCRIPTION_ERRORS)
40 RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS)
41 #
42 # We only keep the subscription alive for 10 minutes, and will keep
43 # renewing it every 8 minutes. This is to avoid the camera
44 # accumulating subscriptions which will be impossible to clean up
45 # since ONVIF does not provide a way to list existing subscriptions.
46 #
47 # If we max out the number of subscriptions, the camera will stop
48 # sending events to us, and we will not be able to recover until
49 # the subscriptions expire or the camera is rebooted.
50 #
51 SUBSCRIPTION_TIME = dt.timedelta(minutes=10)
52 
53 # SUBSCRIPTION_RENEW_INTERVAL Must be less than the
54 # overall timeout of 90 * (SUBSCRIPTION_ATTEMPTS) 2 = 180 seconds
55 #
56 # We use 8 minutes between renewals to make sure we never hit the
57 # 10 minute limit even if the first renewal attempt fails
58 SUBSCRIPTION_RENEW_INTERVAL = 8 * 60
59 
60 # The number of attempts to make when creating or renewing a subscription
61 SUBSCRIPTION_ATTEMPTS = 2
62 
63 # The time to wait before trying to restart the subscription if it fails
64 SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR = 60
65 
66 PULLPOINT_POLL_TIME = dt.timedelta(seconds=60)
67 PULLPOINT_MESSAGE_LIMIT = 100
68 PULLPOINT_COOLDOWN_TIME = 0.75
69 
70 
72  """ONVIF Event Manager."""
73 
74  def __init__(
75  self,
76  hass: HomeAssistant,
77  device: ONVIFCamera,
78  config_entry: ConfigEntry,
79  name: str,
80  ) -> None:
81  """Initialize event manager."""
82  self.hasshass = hass
83  self.devicedevice = device
84  self.config_entryconfig_entry = config_entry
85  self.unique_idunique_id = config_entry.unique_id
86  self.namename = name
87 
88  self.webhook_managerwebhook_manager = WebHookManager(self)
89  self.pullpoint_managerpullpoint_manager = PullPointManager(self)
90 
91  self._uid_by_platform: dict[str, set[str]] = {}
92  self._events: dict[str, Event] = {}
93  self._listeners_listeners: list[CALLBACK_TYPE] = []
94 
95  @property
96  def started(self) -> bool:
97  """Return True if event manager is started."""
98  return (
99  self.webhook_managerwebhook_manager.state == WebHookManagerState.STARTED
100  or self.pullpoint_managerpullpoint_manager.state == PullPointManagerState.STARTED
101  )
102 
103  @callback
104  def async_add_listener(self, update_callback: CALLBACK_TYPE) -> Callable[[], None]:
105  """Listen for data updates."""
106  # We always have to listen for events or we will never
107  # know which sensors to create. In practice we always have
108  # a listener anyways since binary_sensor and sensor will
109  # create a listener when they are created.
110  self._listeners_listeners.append(update_callback)
111 
112  @callback
113  def remove_listener() -> None:
114  """Remove update listener."""
115  self.async_remove_listenerasync_remove_listener(update_callback)
116 
117  return remove_listener
118 
119  @callback
120  def async_remove_listener(self, update_callback: CALLBACK_TYPE) -> None:
121  """Remove data update."""
122  if update_callback in self._listeners_listeners:
123  self._listeners_listeners.remove(update_callback)
124 
125  async def async_start(self, try_pullpoint: bool, try_webhook: bool) -> bool:
126  """Start polling events."""
127  # Always start pull point first, since it will populate the event list
128  event_via_pull_point = (
129  try_pullpoint and await self.pullpoint_managerpullpoint_manager.async_start()
130  )
131  events_via_webhook = try_webhook and await self.webhook_managerwebhook_manager.async_start()
132  return events_via_webhook or event_via_pull_point
133 
134  async def async_stop(self) -> None:
135  """Unsubscribe from events."""
136  self._listeners_listeners = []
137  await self.pullpoint_managerpullpoint_manager.async_stop()
138  await self.webhook_managerwebhook_manager.async_stop()
139 
140  @callback
141  def async_callback_listeners(self) -> None:
142  """Update listeners."""
143  for update_callback in self._listeners_listeners:
144  update_callback()
145 
146  async def async_parse_messages(self, messages) -> None:
147  """Parse notification message."""
148  unique_id = self.unique_idunique_id
149  assert unique_id is not None
150  for msg in messages:
151  # Guard against empty message
152  if not msg.Topic:
153  continue
154 
155  # Topic may look like the following
156  #
157  # tns1:RuleEngine/CellMotionDetector/Motion//.
158  # tns1:RuleEngine/CellMotionDetector/Motion
159  # tns1:RuleEngine/CellMotionDetector/Motion/
160  # tns1:UserAlarm/IVA/HumanShapeDetect
161  #
162  # Our parser expects the topic to be
163  # tns1:RuleEngine/CellMotionDetector/Motion
164  topic = msg.Topic._value_1.rstrip("/.") # noqa: SLF001
165 
166  if not (parser := PARSERS.get(topic)):
167  if topic not in UNHANDLED_TOPICS:
168  LOGGER.warning(
169  "%s: No registered handler for event from %s: %s",
170  self.namename,
171  unique_id,
172  msg,
173  )
174  UNHANDLED_TOPICS.add(topic)
175  continue
176 
177  event = await parser(unique_id, msg)
178 
179  if not event:
180  LOGGER.warning(
181  "%s: Unable to parse event from %s: %s", self.namename, unique_id, msg
182  )
183  return
184 
185  self.get_uids_by_platformget_uids_by_platform(event.platform).add(event.uid)
186  self._events[event.uid] = event
187 
188  def get_uid(self, uid: str) -> Event | None:
189  """Retrieve event for given id."""
190  return self._events.get(uid)
191 
192  def get_platform(self, platform) -> list[Event]:
193  """Retrieve events for given platform."""
194  return [event for event in self._events.values() if event.platform == platform]
195 
196  def get_uids_by_platform(self, platform: str) -> set[str]:
197  """Retrieve uids for a given platform."""
198  if (possible_uids := self._uid_by_platform.get(platform)) is None:
199  uids: set[str] = set()
200  self._uid_by_platform[platform] = uids
201  return uids
202  return possible_uids
203 
204  @callback
205  def async_webhook_failed(self) -> None:
206  """Mark webhook as failed."""
207  if self.pullpoint_managerpullpoint_manager.state != PullPointManagerState.PAUSED:
208  return
209  LOGGER.debug("%s: Switching to PullPoint for events", self.namename)
210  self.pullpoint_managerpullpoint_manager.async_resume()
211 
212  @callback
213  def async_webhook_working(self) -> None:
214  """Mark webhook as working."""
215  if self.pullpoint_managerpullpoint_manager.state != PullPointManagerState.STARTED:
216  return
217  LOGGER.debug("%s: Switching to webhook for events", self.namename)
218  self.pullpoint_managerpullpoint_manager.async_pause()
219 
220  @callback
221  def async_mark_events_stale(self) -> None:
222  """Mark all events as stale when the subscriptions fail since we are out of sync."""
223  self._events.clear()
224  self.async_callback_listenersasync_callback_listeners()
225 
226 
228  """ONVIF PullPoint Manager.
229 
230  If the camera supports webhooks and the webhook is reachable, the pullpoint
231  manager will keep the pull point subscription alive, but will not poll for
232  messages unless the webhook fails.
233  """
234 
235  def __init__(self, event_manager: EventManager) -> None:
236  """Initialize pullpoint manager."""
237  self.statestate = PullPointManagerState.STOPPED
238 
239  self._event_manager_event_manager = event_manager
240  self._device_device = event_manager.device
241  self._hass_hass = event_manager.hass
242  self._name_name = event_manager.name
243 
244  self._pullpoint_manager_pullpoint_manager: ONVIFPullPointManager | None = None
245 
246  self._cancel_pull_messages_cancel_pull_messages: CALLBACK_TYPE | None = None
247  self._pull_messages_job_pull_messages_job = HassJob(
248  self._async_background_pull_messages_or_reschedule_async_background_pull_messages_or_reschedule,
249  f"{self._name}: pull messages",
250  )
251  self._pull_messages_task_pull_messages_task: asyncio.Task[None] | None = None
252 
253  async def async_start(self) -> bool:
254  """Start pullpoint subscription."""
255  assert (
256  self.statestate == PullPointManagerState.STOPPED
257  ), "PullPoint manager already started"
258  LOGGER.debug("%s: Starting PullPoint manager", self._name_name)
259  if not await self._async_start_pullpoint_async_start_pullpoint():
260  self.statestate = PullPointManagerState.FAILED
261  return False
262  self.statestate = PullPointManagerState.STARTED
263  self.async_schedule_pull_messagesasync_schedule_pull_messages()
264  return True
265 
266  @callback
267  def async_pause(self) -> None:
268  """Pause pullpoint subscription."""
269  LOGGER.debug("%s: Pausing PullPoint manager", self._name_name)
270  self.statestate = PullPointManagerState.PAUSED
271  # Cancel the renew job so we don't renew the subscription
272  # and stop pulling messages.
273  self.async_cancel_pull_messagesasync_cancel_pull_messages()
274  if self._pullpoint_manager_pullpoint_manager:
275  self._pullpoint_manager_pullpoint_manager.pause()
276  # We do not unsubscribe from the pullpoint subscription and instead
277  # let the subscription expire since some cameras will terminate all
278  # subscriptions if we unsubscribe which will break the webhook.
279 
280  @callback
281  def async_resume(self) -> None:
282  """Resume pullpoint subscription."""
283  LOGGER.debug("%s: Resuming PullPoint manager", self._name_name)
284  self.statestate = PullPointManagerState.STARTED
285  if self._pullpoint_manager_pullpoint_manager:
286  self._pullpoint_manager_pullpoint_manager.resume()
287  self.async_schedule_pull_messagesasync_schedule_pull_messages()
288 
289  async def async_stop(self) -> None:
290  """Unsubscribe from PullPoint and cancel callbacks."""
291  self.statestate = PullPointManagerState.STOPPED
292  await self._async_cancel_and_unsubscribe_async_cancel_and_unsubscribe()
293 
294  async def _async_start_pullpoint(self) -> bool:
295  """Start pullpoint subscription."""
296  try:
297  await self._async_create_pullpoint_subscription_async_create_pullpoint_subscription()
298  except CREATE_ERRORS as err:
299  LOGGER.debug(
300  "%s: Device does not support PullPoint service or has too many subscriptions: %s",
301  self._name_name,
303  )
304  return False
305  return True
306 
307  async def _async_cancel_and_unsubscribe(self) -> None:
308  """Cancel and unsubscribe from PullPoint."""
309  self.async_cancel_pull_messagesasync_cancel_pull_messages()
310  if self._pull_messages_task_pull_messages_task:
311  self._pull_messages_task_pull_messages_task.cancel()
312  await self._async_unsubscribe_pullpoint_async_unsubscribe_pullpoint()
313 
314  @retry_connection_error(SUBSCRIPTION_ATTEMPTS)
315  async def _async_create_pullpoint_subscription(self) -> None:
316  """Create pullpoint subscription."""
317  self._pullpoint_manager_pullpoint_manager = await self._device_device.create_pullpoint_manager(
318  SUBSCRIPTION_TIME, self._event_manager_event_manager.async_mark_events_stale
319  )
320  await self._pullpoint_manager_pullpoint_manager.set_synchronization_point()
321 
322  async def _async_unsubscribe_pullpoint(self) -> None:
323  """Unsubscribe the pullpoint subscription."""
324  if not self._pullpoint_manager_pullpoint_manager or self._pullpoint_manager_pullpoint_manager.closed:
325  return
326  LOGGER.debug("%s: Unsubscribing from PullPoint", self._name_name)
327  try:
328  await self._pullpoint_manager_pullpoint_manager.shutdown()
329  except UNSUBSCRIBE_ERRORS as err:
330  LOGGER.debug(
331  (
332  "%s: Failed to unsubscribe PullPoint subscription;"
333  " This is normal if the device restarted: %s"
334  ),
335  self._name_name,
337  )
338  self._pullpoint_manager_pullpoint_manager = None
339 
340  async def _async_pull_messages(self) -> None:
341  """Pull messages from device."""
342  if self._pullpoint_manager_pullpoint_manager is None:
343  return
344  service = self._pullpoint_manager_pullpoint_manager.get_service()
345  LOGGER.debug(
346  "%s: Pulling PullPoint messages timeout=%s limit=%s",
347  self._name_name,
348  PULLPOINT_POLL_TIME,
349  PULLPOINT_MESSAGE_LIMIT,
350  )
351  next_pull_delay = None
352  response = None
353  try:
354  if self._hass_hass.is_running:
355  response = await service.PullMessages(
356  {
357  "MessageLimit": PULLPOINT_MESSAGE_LIMIT,
358  "Timeout": PULLPOINT_POLL_TIME,
359  }
360  )
361  else:
362  LOGGER.debug(
363  "%s: PullPoint skipped because Home Assistant is not running yet",
364  self._name_name,
365  )
366  except RemoteProtocolError as err:
367  # Either a shutdown event or the camera closed the connection. Because
368  # http://datatracker.ietf.org/doc/html/rfc2616#section-8.1.4 allows the server
369  # to close the connection at any time, we treat this as a normal. Some
370  # cameras may close the connection if there are no messages to pull.
371  LOGGER.debug(
372  "%s: PullPoint subscription encountered a remote protocol error "
373  "(this is normal for some cameras): %s",
374  self._name_name,
376  )
377  except Fault as err:
378  # Device may not support subscriptions so log at debug level
379  # when we get an XMLParseError
380  LOGGER.debug(
381  "%s: Failed to fetch PullPoint subscription messages: %s",
382  self._name_name,
384  )
385  # Treat errors as if the camera restarted. Assume that the pullpoint
386  # subscription is no longer valid.
387  self._pullpoint_manager_pullpoint_manager.resume()
388  except (XMLParseError, RequestError, TimeoutError, TransportError) as err:
389  LOGGER.debug(
390  "%s: PullPoint subscription encountered an unexpected error and will be retried "
391  "(this is normal for some cameras): %s",
392  self._name_name,
394  )
395  # Avoid renewing the subscription too often since it causes problems
396  # for some cameras, mainly the Tapo ones.
397  next_pull_delay = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR
398  finally:
399  self.async_schedule_pull_messagesasync_schedule_pull_messages(next_pull_delay)
400 
401  if self.statestate != PullPointManagerState.STARTED:
402  # If the webhook became started working during the long poll,
403  # and we got paused, our data is stale and we should not process it.
404  LOGGER.debug(
405  "%s: PullPoint state is %s (likely due to working webhook), skipping PullPoint messages",
406  self._name_name,
407  self.statestate,
408  )
409  return
410 
411  if not response:
412  return
413 
414  # Parse response
415  event_manager = self._event_manager_event_manager
416  if (notification_message := response.NotificationMessage) and (
417  number_of_events := len(notification_message)
418  ):
419  LOGGER.debug(
420  "%s: continuous PullMessages: %s event(s)",
421  self._name_name,
422  number_of_events,
423  )
424  await event_manager.async_parse_messages(notification_message)
425  event_manager.async_callback_listeners()
426  else:
427  LOGGER.debug("%s: continuous PullMessages: no events", self._name_name)
428 
429  @callback
430  def async_cancel_pull_messages(self) -> None:
431  """Cancel the PullPoint task."""
432  if self._cancel_pull_messages_cancel_pull_messages:
433  self._cancel_pull_messages_cancel_pull_messages()
434  self._cancel_pull_messages_cancel_pull_messages = None
435 
436  @callback
437  def async_schedule_pull_messages(self, delay: float | None = None) -> None:
438  """Schedule async_pull_messages to run.
439 
440  Used as fallback when webhook is not working.
441 
442  Must not check if the webhook is working.
443  """
444  self.async_cancel_pull_messagesasync_cancel_pull_messages()
445  if self.statestate != PullPointManagerState.STARTED:
446  return
447  if self._pullpoint_manager_pullpoint_manager:
448  when = delay if delay is not None else PULLPOINT_COOLDOWN_TIME
449  self._cancel_pull_messages_cancel_pull_messages = async_call_later(
450  self._hass_hass, when, self._pull_messages_job_pull_messages_job
451  )
452 
453  @callback
455  self, _now: dt.datetime | None = None
456  ) -> None:
457  """Pull messages from device in the background."""
458  if self._pull_messages_task_pull_messages_task and not self._pull_messages_task_pull_messages_task.done():
459  LOGGER.debug(
460  "%s: PullPoint message pull is already in process, skipping pull",
461  self._name_name,
462  )
463  self.async_schedule_pull_messagesasync_schedule_pull_messages()
464  return
465  self._pull_messages_task_pull_messages_task = self._hass_hass.async_create_background_task(
466  self._async_pull_messages_async_pull_messages(),
467  f"{self._name} background pull messages",
468  )
469 
470 
472  """Manage ONVIF webhook subscriptions.
473 
474  If the camera supports webhooks, we will use that instead of
475  pullpoint subscriptions as soon as we detect that the camera
476  can reach our webhook.
477  """
478 
479  def __init__(self, event_manager: EventManager) -> None:
480  """Initialize webhook manager."""
481  self.statestate = WebHookManagerState.STOPPED
482 
483  self._event_manager_event_manager = event_manager
484  self._device_device = event_manager.device
485  self._hass_hass = event_manager.hass
486  config_entry = event_manager.config_entry
487 
488  self._old_webhook_unique_id_old_webhook_unique_id = f"{DOMAIN}_{config_entry.entry_id}"
489  # Some cameras have a limit on the length of the webhook URL
490  # so we use a shorter unique ID for the webhook.
491  unique_id = config_entry.unique_id
492  assert unique_id is not None
493  webhook_id = format_mac(unique_id).replace(":", "").lower()
494  self._webhook_unique_id_webhook_unique_id = f"{DOMAIN}{webhook_id}"
495  self._name_name = event_manager.name
496 
497  self._webhook_url_webhook_url: str | None = None
498 
499  self._notification_manager_notification_manager: NotificationManager | None = None
500 
501  async def async_start(self) -> bool:
502  """Start polling events."""
503  LOGGER.debug("%s: Starting webhook manager", self._name_name)
504  assert (
505  self.statestate == WebHookManagerState.STOPPED
506  ), "Webhook manager already started"
507  assert self._webhook_url_webhook_url is None, "Webhook already registered"
508  self._async_register_webhook_async_register_webhook()
509  if not await self._async_start_webhook_async_start_webhook():
510  self.statestate = WebHookManagerState.FAILED
511  return False
512  self.statestate = WebHookManagerState.STARTED
513  return True
514 
515  async def async_stop(self) -> None:
516  """Unsubscribe from events."""
517  self.statestate = WebHookManagerState.STOPPED
518  await self._async_unsubscribe_webhook_async_unsubscribe_webhook()
519  self._async_unregister_webhook_async_unregister_webhook()
520 
521  @retry_connection_error(SUBSCRIPTION_ATTEMPTS)
522  async def _async_create_webhook_subscription(self) -> None:
523  """Create webhook subscription."""
524  LOGGER.debug(
525  "%s: Creating webhook subscription with URL: %s",
526  self._name_name,
527  self._webhook_url_webhook_url,
528  )
529  try:
530  self._notification_manager_notification_manager = await self._device_device.create_notification_manager(
531  address=self._webhook_url_webhook_url,
532  interval=SUBSCRIPTION_TIME,
533  subscription_lost_callback=self._event_manager_event_manager.async_mark_events_stale,
534  )
535  except ValidationError as err:
536  # This should only happen if there is a problem with the webhook URL
537  # that is causing it to not be well formed.
538  LOGGER.exception(
539  "%s: validation error while creating webhook subscription: %s",
540  self._name_name,
541  err,
542  )
543  raise
544  await self._notification_manager_notification_manager.set_synchronization_point()
545  LOGGER.debug(
546  "%s: Webhook subscription created with URL: %s",
547  self._name_name,
548  self._webhook_url_webhook_url,
549  )
550 
551  async def _async_start_webhook(self) -> bool:
552  """Start webhook."""
553  try:
554  await self._async_create_webhook_subscription_async_create_webhook_subscription()
555  except CREATE_ERRORS as err:
556  self._event_manager_event_manager.async_webhook_failed()
557  LOGGER.debug(
558  "%s: Device does not support notification service or too many subscriptions: %s",
559  self._name_name,
561  )
562  return False
563  return True
564 
565  @callback
566  def _async_register_webhook(self) -> None:
567  """Register the webhook for motion events."""
568  LOGGER.debug("%s: Registering webhook: %s", self._name_name, self._webhook_unique_id_webhook_unique_id)
569 
570  try:
571  base_url = get_url(self._hass_hass, prefer_external=False)
572  except NoURLAvailableError:
573  try:
574  base_url = get_url(self._hass_hass, prefer_external=True)
575  except NoURLAvailableError:
576  return
577 
578  webhook_id = self._webhook_unique_id_webhook_unique_id
579  self._async_unregister_webhook_async_unregister_webhook()
580  webhook.async_register(
581  self._hass_hass, DOMAIN, webhook_id, webhook_id, self._async_handle_webhook_async_handle_webhook
582  )
583  webhook_path = webhook.async_generate_path(webhook_id)
584  self._webhook_url_webhook_url = f"{base_url}{webhook_path}"
585  LOGGER.debug("%s: Registered webhook: %s", self._name_name, webhook_id)
586 
587  @callback
589  """Unregister the webhook for motion events."""
590  LOGGER.debug(
591  "%s: Unregistering webhook %s", self._name_name, self._webhook_unique_id_webhook_unique_id
592  )
593  webhook.async_unregister(self._hass_hass, self._old_webhook_unique_id_old_webhook_unique_id)
594  webhook.async_unregister(self._hass_hass, self._webhook_unique_id_webhook_unique_id)
595  self._webhook_url_webhook_url = None
596 
598  self, hass: HomeAssistant, webhook_id: str, request: Request
599  ) -> None:
600  """Handle incoming webhook."""
601  content: bytes | None = None
602  try:
603  content = await request.read()
604  except ConnectionResetError as ex:
605  LOGGER.error("Error reading webhook: %s", ex)
606  return
607  except asyncio.CancelledError as ex:
608  LOGGER.error("Error reading webhook: %s", ex)
609  raise
610  finally:
611  self._hass_hass.async_create_background_task(
612  self._async_process_webhook_async_process_webhook(hass, webhook_id, content),
613  f"ONVIF event webhook for {self._name}",
614  )
615 
617  self, hass: HomeAssistant, webhook_id: str, content: bytes | None
618  ) -> None:
619  """Process incoming webhook data in the background."""
620  event_manager = self._event_manager_event_manager
621  if content is None:
622  # webhook is marked as not working as something
623  # went wrong. We will mark it as working again
624  # when we receive a valid notification.
625  event_manager.async_webhook_failed()
626  return
627  if not self._notification_manager_notification_manager:
628  LOGGER.debug(
629  "%s: Received webhook before notification manager is setup", self._name_name
630  )
631  return
632  if not (result := self._notification_manager_notification_manager.process(content)):
633  LOGGER.debug("%s: Failed to process webhook %s", self._name_name, webhook_id)
634  return
635  LOGGER.debug(
636  "%s: Processed webhook %s with %s event(s)",
637  self._name_name,
638  webhook_id,
639  len(result.NotificationMessage),
640  )
641  event_manager.async_webhook_working()
642  await event_manager.async_parse_messages(result.NotificationMessage)
643  event_manager.async_callback_listeners()
644 
645  async def _async_unsubscribe_webhook(self) -> None:
646  """Unsubscribe from the webhook."""
647  if not self._notification_manager_notification_manager or self._notification_manager_notification_manager.closed:
648  return
649  LOGGER.debug("%s: Unsubscribing from webhook", self._name_name)
650  try:
651  await self._notification_manager_notification_manager.shutdown()
652  except UNSUBSCRIBE_ERRORS as err:
653  LOGGER.debug(
654  (
655  "%s: Failed to unsubscribe webhook subscription;"
656  " This is normal if the device restarted: %s"
657  ),
658  self._name_name,
660  )
661  self._notification_manager_notification_manager = None
None async_remove_listener(self, CALLBACK_TYPE update_callback)
Definition: event.py:120
None __init__(self, HomeAssistant hass, ONVIFCamera device, ConfigEntry config_entry, str name)
Definition: event.py:80
list[Event] get_platform(self, platform)
Definition: event.py:192
bool async_start(self, bool try_pullpoint, bool try_webhook)
Definition: event.py:125
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback)
Definition: event.py:104
set[str] get_uids_by_platform(self, str platform)
Definition: event.py:196
None async_schedule_pull_messages(self, float|None delay=None)
Definition: event.py:437
None __init__(self, EventManager event_manager)
Definition: event.py:235
None _async_background_pull_messages_or_reschedule(self, dt.datetime|None _now=None)
Definition: event.py:456
None _async_handle_webhook(self, HomeAssistant hass, str webhook_id, Request request)
Definition: event.py:599
None __init__(self, EventManager event_manager)
Definition: event.py:479
None _async_process_webhook(self, HomeAssistant hass, str webhook_id, bytes|None content)
Definition: event.py:618
AppriseNotificationService|None get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:39
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
str stringify_onvif_error(Exception error)
Definition: util.py:17
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597
str get_url(HomeAssistant hass, *bool require_current_request=False, bool require_ssl=False, bool require_standard_port=False, bool require_cloud=False, bool allow_internal=True, bool allow_external=True, bool allow_cloud=True, bool|None allow_ip=None, bool|None prefer_external=None, bool prefer_cloud=False)
Definition: network.py:131