1 """ONVIF event abstraction."""
3 from __future__
import annotations
6 from collections.abc
import Callable
9 from aiohttp.web
import Request
10 from httpx
import RemoteProtocolError, RequestError, TransportError
11 from onvif
import ONVIFCamera
12 from onvif.client
import (
14 PullPointManager
as ONVIFPullPointManager,
15 retry_connection_error,
17 from onvif.exceptions
import ONVIFError
18 from onvif.util
import stringify_onvif_error
19 from zeep.exceptions
import Fault, ValidationError, XMLParseError
28 from .const
import DOMAIN, LOGGER
29 from .models
import Event, PullPointManagerState, WebHookManagerState
30 from .parsers
import PARSERS
34 UNHANDLED_TOPICS: set[str] = {
"tns1:MediaControl/VideoEncoderConfiguration"}
36 SUBSCRIPTION_ERRORS = (Fault, TimeoutError, TransportError)
37 CREATE_ERRORS = (ONVIFError, Fault, RequestError, XMLParseError, ValidationError)
38 SET_SYNCHRONIZATION_POINT_ERRORS = (*SUBSCRIPTION_ERRORS, TypeError)
39 UNSUBSCRIBE_ERRORS = (XMLParseError, *SUBSCRIPTION_ERRORS)
40 RENEW_ERRORS = (ONVIFError, RequestError, XMLParseError, *SUBSCRIPTION_ERRORS)
51 SUBSCRIPTION_TIME = dt.timedelta(minutes=10)
58 SUBSCRIPTION_RENEW_INTERVAL = 8 * 60
61 SUBSCRIPTION_ATTEMPTS = 2
64 SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR = 60
66 PULLPOINT_POLL_TIME = dt.timedelta(seconds=60)
67 PULLPOINT_MESSAGE_LIMIT = 100
68 PULLPOINT_COOLDOWN_TIME = 0.75
72 """ONVIF Event Manager."""
78 config_entry: ConfigEntry,
81 """Initialize event manager."""
91 self._uid_by_platform: dict[str, set[str]] = {}
92 self._events: dict[str, Event] = {}
93 self.
_listeners_listeners: list[CALLBACK_TYPE] = []
97 """Return True if event manager is started."""
99 self.
webhook_managerwebhook_manager.state == WebHookManagerState.STARTED
100 or self.
pullpoint_managerpullpoint_manager.state == PullPointManagerState.STARTED
105 """Listen for data updates."""
110 self.
_listeners_listeners.append(update_callback)
113 def remove_listener() -> None:
114 """Remove update listener."""
117 return remove_listener
121 """Remove data update."""
122 if update_callback
in self.
_listeners_listeners:
125 async
def async_start(self, try_pullpoint: bool, try_webhook: bool) -> bool:
126 """Start polling events."""
128 event_via_pull_point = (
132 return events_via_webhook
or event_via_pull_point
135 """Unsubscribe from events."""
142 """Update listeners."""
143 for update_callback
in self.
_listeners_listeners:
147 """Parse notification message."""
149 assert unique_id
is not None
164 topic = msg.Topic._value_1.rstrip(
"/.")
166 if not (parser := PARSERS.get(topic)):
167 if topic
not in UNHANDLED_TOPICS:
169 "%s: No registered handler for event from %s: %s",
174 UNHANDLED_TOPICS.add(topic)
177 event = await parser(unique_id, msg)
181 "%s: Unable to parse event from %s: %s", self.
namename, unique_id, msg
186 self._events[event.uid] = event
189 """Retrieve event for given id."""
190 return self._events.
get(uid)
193 """Retrieve events for given platform."""
194 return [event
for event
in self._events.values()
if event.platform == platform]
197 """Retrieve uids for a given platform."""
198 if (possible_uids := self._uid_by_platform.
get(platform))
is None:
199 uids: set[str] = set()
200 self._uid_by_platform[platform] = uids
206 """Mark webhook as failed."""
207 if self.
pullpoint_managerpullpoint_manager.state != PullPointManagerState.PAUSED:
209 LOGGER.debug(
"%s: Switching to PullPoint for events", self.
namename)
214 """Mark webhook as working."""
215 if self.
pullpoint_managerpullpoint_manager.state != PullPointManagerState.STARTED:
217 LOGGER.debug(
"%s: Switching to webhook for events", self.
namename)
222 """Mark all events as stale when the subscriptions fail since we are out of sync."""
228 """ONVIF PullPoint Manager.
230 If the camera supports webhooks and the webhook is reachable, the pullpoint
231 manager will keep the pull point subscription alive, but will not poll for
232 messages unless the webhook fails.
235 def __init__(self, event_manager: EventManager) ->
None:
236 """Initialize pullpoint manager."""
237 self.
statestate = PullPointManagerState.STOPPED
241 self.
_hass_hass = event_manager.hass
242 self.
_name_name = event_manager.name
249 f
"{self._name}: pull messages",
254 """Start pullpoint subscription."""
256 self.
statestate == PullPointManagerState.STOPPED
257 ),
"PullPoint manager already started"
258 LOGGER.debug(
"%s: Starting PullPoint manager", self.
_name_name)
260 self.
statestate = PullPointManagerState.FAILED
262 self.
statestate = PullPointManagerState.STARTED
268 """Pause pullpoint subscription."""
269 LOGGER.debug(
"%s: Pausing PullPoint manager", self.
_name_name)
270 self.
statestate = PullPointManagerState.PAUSED
282 """Resume pullpoint subscription."""
283 LOGGER.debug(
"%s: Resuming PullPoint manager", self.
_name_name)
284 self.
statestate = PullPointManagerState.STARTED
290 """Unsubscribe from PullPoint and cancel callbacks."""
291 self.
statestate = PullPointManagerState.STOPPED
295 """Start pullpoint subscription."""
298 except CREATE_ERRORS
as err:
300 "%s: Device does not support PullPoint service or has too many subscriptions: %s",
308 """Cancel and unsubscribe from PullPoint."""
314 @retry_connection_error(SUBSCRIPTION_ATTEMPTS)
316 """Create pullpoint subscription."""
318 SUBSCRIPTION_TIME, self.
_event_manager_event_manager.async_mark_events_stale
323 """Unsubscribe the pullpoint subscription."""
326 LOGGER.debug(
"%s: Unsubscribing from PullPoint", self.
_name_name)
329 except UNSUBSCRIBE_ERRORS
as err:
332 "%s: Failed to unsubscribe PullPoint subscription;"
333 " This is normal if the device restarted: %s"
341 """Pull messages from device."""
346 "%s: Pulling PullPoint messages timeout=%s limit=%s",
349 PULLPOINT_MESSAGE_LIMIT,
351 next_pull_delay =
None
354 if self.
_hass_hass.is_running:
355 response = await service.PullMessages(
357 "MessageLimit": PULLPOINT_MESSAGE_LIMIT,
358 "Timeout": PULLPOINT_POLL_TIME,
363 "%s: PullPoint skipped because Home Assistant is not running yet",
366 except RemoteProtocolError
as err:
372 "%s: PullPoint subscription encountered a remote protocol error "
373 "(this is normal for some cameras): %s",
381 "%s: Failed to fetch PullPoint subscription messages: %s",
388 except (XMLParseError, RequestError, TimeoutError, TransportError)
as err:
390 "%s: PullPoint subscription encountered an unexpected error and will be retried "
391 "(this is normal for some cameras): %s",
397 next_pull_delay = SUBSCRIPTION_RESTART_INTERVAL_ON_ERROR
401 if self.
statestate != PullPointManagerState.STARTED:
405 "%s: PullPoint state is %s (likely due to working webhook), skipping PullPoint messages",
416 if (notification_message := response.NotificationMessage)
and (
417 number_of_events := len(notification_message)
420 "%s: continuous PullMessages: %s event(s)",
424 await event_manager.async_parse_messages(notification_message)
425 event_manager.async_callback_listeners()
427 LOGGER.debug(
"%s: continuous PullMessages: no events", self.
_name_name)
431 """Cancel the PullPoint task."""
438 """Schedule async_pull_messages to run.
440 Used as fallback when webhook is not working.
442 Must not check if the webhook is working.
445 if self.
statestate != PullPointManagerState.STARTED:
448 when = delay
if delay
is not None else PULLPOINT_COOLDOWN_TIME
455 self, _now: dt.datetime |
None =
None
457 """Pull messages from device in the background."""
460 "%s: PullPoint message pull is already in process, skipping pull",
467 f
"{self._name} background pull messages",
472 """Manage ONVIF webhook subscriptions.
474 If the camera supports webhooks, we will use that instead of
475 pullpoint subscriptions as soon as we detect that the camera
476 can reach our webhook.
479 def __init__(self, event_manager: EventManager) ->
None:
480 """Initialize webhook manager."""
481 self.
statestate = WebHookManagerState.STOPPED
485 self.
_hass_hass = event_manager.hass
486 config_entry = event_manager.config_entry
491 unique_id = config_entry.unique_id
492 assert unique_id
is not None
493 webhook_id =
format_mac(unique_id).replace(
":",
"").lower()
495 self.
_name_name = event_manager.name
502 """Start polling events."""
503 LOGGER.debug(
"%s: Starting webhook manager", self.
_name_name)
505 self.
statestate == WebHookManagerState.STOPPED
506 ),
"Webhook manager already started"
507 assert self.
_webhook_url_webhook_url
is None,
"Webhook already registered"
510 self.
statestate = WebHookManagerState.FAILED
512 self.
statestate = WebHookManagerState.STARTED
516 """Unsubscribe from events."""
517 self.
statestate = WebHookManagerState.STOPPED
521 @retry_connection_error(SUBSCRIPTION_ATTEMPTS)
523 """Create webhook subscription."""
525 "%s: Creating webhook subscription with URL: %s",
532 interval=SUBSCRIPTION_TIME,
533 subscription_lost_callback=self.
_event_manager_event_manager.async_mark_events_stale,
535 except ValidationError
as err:
539 "%s: validation error while creating webhook subscription: %s",
546 "%s: Webhook subscription created with URL: %s",
555 except CREATE_ERRORS
as err:
558 "%s: Device does not support notification service or too many subscriptions: %s",
567 """Register the webhook for motion events."""
571 base_url =
get_url(self.
_hass_hass, prefer_external=
False)
572 except NoURLAvailableError:
574 base_url =
get_url(self.
_hass_hass, prefer_external=
True)
575 except NoURLAvailableError:
580 webhook.async_register(
583 webhook_path = webhook.async_generate_path(webhook_id)
585 LOGGER.debug(
"%s: Registered webhook: %s", self.
_name_name, webhook_id)
589 """Unregister the webhook for motion events."""
598 self, hass: HomeAssistant, webhook_id: str, request: Request
600 """Handle incoming webhook."""
601 content: bytes |
None =
None
603 content = await request.read()
604 except ConnectionResetError
as ex:
605 LOGGER.error(
"Error reading webhook: %s", ex)
607 except asyncio.CancelledError
as ex:
608 LOGGER.error(
"Error reading webhook: %s", ex)
611 self.
_hass_hass.async_create_background_task(
613 f
"ONVIF event webhook for {self._name}",
617 self, hass: HomeAssistant, webhook_id: str, content: bytes |
None
619 """Process incoming webhook data in the background."""
625 event_manager.async_webhook_failed()
629 "%s: Received webhook before notification manager is setup", self.
_name_name
633 LOGGER.debug(
"%s: Failed to process webhook %s", self.
_name_name, webhook_id)
636 "%s: Processed webhook %s with %s event(s)",
639 len(result.NotificationMessage),
641 event_manager.async_webhook_working()
642 await event_manager.async_parse_messages(result.NotificationMessage)
643 event_manager.async_callback_listeners()
646 """Unsubscribe from the webhook."""
649 LOGGER.debug(
"%s: Unsubscribing from webhook", self.
_name_name)
652 except UNSUBSCRIBE_ERRORS
as err:
655 "%s: Failed to unsubscribe webhook subscription;"
656 " This is normal if the device restarted: %s"
None async_mark_events_stale(self)
None async_parse_messages(self, messages)
None async_remove_listener(self, CALLBACK_TYPE update_callback)
None __init__(self, HomeAssistant hass, ONVIFCamera device, ConfigEntry config_entry, str name)
list[Event] get_platform(self, platform)
bool async_start(self, bool try_pullpoint, bool try_webhook)
None async_webhook_failed(self)
Event|None get_uid(self, str uid)
None async_callback_listeners(self)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback)
None async_webhook_working(self)
set[str] get_uids_by_platform(self, str platform)
None async_schedule_pull_messages(self, float|None delay=None)
None _async_create_pullpoint_subscription(self)
None _async_unsubscribe_pullpoint(self)
bool _async_start_pullpoint(self)
None _async_cancel_and_unsubscribe(self)
None __init__(self, EventManager event_manager)
None _async_pull_messages(self)
None async_cancel_pull_messages(self)
None _async_background_pull_messages_or_reschedule(self, dt.datetime|None _now=None)
None _async_create_webhook_subscription(self)
None _async_handle_webhook(self, HomeAssistant hass, str webhook_id, Request request)
def _async_unregister_webhook(self)
None _async_unsubscribe_webhook(self)
None _async_register_webhook(self)
bool _async_start_webhook(self)
None __init__(self, EventManager event_manager)
None _async_process_webhook(self, HomeAssistant hass, str webhook_id, bytes|None content)
AppriseNotificationService|None get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
bool add(self, _T matcher)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
str stringify_onvif_error(Exception error)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
str get_url(HomeAssistant hass, *bool require_current_request=False, bool require_ssl=False, bool require_standard_port=False, bool require_cloud=False, bool allow_internal=True, bool allow_external=True, bool allow_cloud=True, bool|None allow_ip=None, bool|None prefer_external=None, bool prefer_cloud=False)