1 """Axis network device entity loader.
3 Central point to load entities for the different platforms.
6 from __future__
import annotations
8 from typing
import TYPE_CHECKING
10 from axis.models.event
import Event, EventOperation, EventTopic
15 from ..entity
import AxisEventDescription, AxisEventEntity
18 from .hub
import AxisHub
22 """Axis network device integration handling platforms for entity registration."""
25 """Initialize the Axis entity loader."""
28 self.registered_events: set[tuple[str, EventTopic, str]] = set()
29 self.topic_to_entity: dict[
34 type[AxisEventEntity],
43 async_add_entities: AddEntitiesCallback,
44 entity_class: type[AxisEventEntity],
45 descriptions: tuple[AxisEventDescription, ...],
47 """Register Axis entity platforms."""
48 topics: tuple[EventTopic, ...]
49 for description
in descriptions:
50 if isinstance(description.event_topic, EventTopic):
51 topics = (description.event_topic,)
53 topics = description.event_topic
55 self.topic_to_entity.setdefault(topic, []).append(
56 (async_add_entities, entity_class, description)
61 """Create Axis entities from event."""
62 event_id = (event.topic, event.topic_base, event.id)
63 if event_id
in self.registered_events:
66 self.registered_events.
add(event_id)
71 )
in self.topic_to_entity[event.topic_base]:
72 if not description.supported_fn(self.
hubhub, event):
78 """Prepare event listener that can populate platform entities."""
79 self.
hubhub.api.event.subscribe(
81 topic_filter=
tuple(self.topic_to_entity.keys()),
82 operation_filter=EventOperation.INITIALIZED,
None register_platform(self, AddEntitiesCallback async_add_entities, type[AxisEventEntity] entity_class, tuple[AxisEventDescription,...] descriptions)
None __init__(self, AxisHub hub)
None initialize_platforms(self)
None _create_entities_from_event(self, Event event)
bool add(self, _T matcher)