1 """Entity manager for generic GeoJSON events."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import datetime
9 from aio_geojson_generic_client
import GenericFeedManager
10 from aio_geojson_generic_client.feed_entry
import GenericFeedEntry
20 DEFAULT_UPDATE_INTERVAL,
26 _LOGGER = logging.getLogger(__name__)
30 """Feed Entity Manager for GeoJSON feeds."""
35 config_entry: ConfigEntry,
37 """Initialize the GeoJSON Feed Manager."""
38 self._hass: HomeAssistant = hass
39 self.entry_id: str = config_entry.entry_id
40 websession = aiohttp_client.async_get_clientsession(hass)
41 self._feed_manager: GenericFeedManager = GenericFeedManager(
47 config_entry.data[CONF_LATITUDE],
48 config_entry.data[CONF_LONGITUDE],
50 config_entry.data[CONF_URL],
51 filter_radius=config_entry.data[CONF_RADIUS],
54 self.
listenerslisteners: list[Callable[[],
None]] = []
55 self.signal_new_entity: str = (
56 f
"{DOMAIN}_new_geolocation_{config_entry.entry_id}"
60 """Schedule initial and regular updates based on configured time interval."""
62 async
def update(event_time: datetime) ->
None:
68 self._hass, update, DEFAULT_UPDATE_INTERVAL
71 _LOGGER.debug(
"Feed entity manager initialized")
75 await self._feed_manager.
update()
76 _LOGGER.debug(
"Feed entity manager updated")
79 """Stop this feed entity manager from refreshing."""
80 for unsub_dispatcher
in self.
listenerslisteners:
85 _LOGGER.debug(
"Feed entity manager stopped")
87 def get_entry(self, external_id: str) -> GenericFeedEntry |
None:
88 """Get feed entry by external id."""
89 return self._feed_manager.feed_entries.get(external_id)
92 """Generate new entity."""
95 self.signal_new_entity,
None _generate_entity(self, str external_id)
None _update_entity(self, str external_id)
GenericFeedEntry|None get_entry(self, str external_id)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry)
_track_time_remove_callback
None _remove_entity(self, str external_id)
IssData update(pyiss.ISS iss)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)