1 """Support for NSW Rural Fire Service Feeds."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import datetime, timedelta
10 from aio_geojson_nsw_rfs_incidents
import NswRuralFireServiceIncidentsFeedManager
11 from aio_geojson_nsw_rfs_incidents.feed_entry
import (
12 NswRuralFireServiceIncidentsFeedEntry,
14 import voluptuous
as vol
17 PLATFORM_SCHEMA
as GEO_LOCATION_PLATFORM_SCHEMA,
26 EVENT_HOMEASSISTANT_START,
27 EVENT_HOMEASSISTANT_STOP,
33 async_dispatcher_connect,
34 async_dispatcher_send,
_LOGGER = logging.getLogger(__name__)

# Attribute names for feed-entry details reported by the event entities.
ATTR_CATEGORY = "category"
ATTR_COUNCIL_AREA = "council_area"
ATTR_EXTERNAL_ID = "external_id"
ATTR_PUBLICATION_DATE = "publication_date"
ATTR_RESPONSIBLE_AGENCY = "responsible_agency"
ATTR_STATUS = "status"

# Platform configuration key for the optional list of categories to filter on.
CONF_CATEGORIES = "categories"

# Default for the radius option when none is configured.
DEFAULT_RADIUS_IN_KM = 20.0

# Signal name templates; ``{}`` is filled with a feed entry's external id.
SIGNAL_DELETE_ENTITY = "nsw_rural_fire_service_feed_delete_{}"
SIGNAL_UPDATE_ENTITY = "nsw_rural_fire_service_feed_update_{}"

# Source identifier assigned to the entities created by this platform.
SOURCE = "nsw_rural_fire_service_feed"

# Category values the platform schema accepts for CONF_CATEGORIES.
VALID_CATEGORIES = [
    "Advice",
    "Emergency Warning",
    "Not Applicable",
    "Watch and Act",
]
65 PLATFORM_SCHEMA = GEO_LOCATION_PLATFORM_SCHEMA.extend(
67 vol.Optional(CONF_CATEGORIES, default=[]): vol.All(
68 cv.ensure_list, [vol.In(VALID_CATEGORIES)]
70 vol.Optional(CONF_LATITUDE): cv.latitude,
71 vol.Optional(CONF_LONGITUDE): cv.longitude,
72 vol.Optional(CONF_RADIUS, default=DEFAULT_RADIUS_IN_KM): vol.Coerce(float),
80 async_add_entities: AddEntitiesCallback,
81 discovery_info: DiscoveryInfoType |
None =
None,
83 """Set up the NSW Rural Fire Service Feed platform."""
84 scan_interval: timedelta = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
85 coordinates: tuple[float, float] = (
86 config.get(CONF_LATITUDE, hass.config.latitude),
87 config.get(CONF_LONGITUDE, hass.config.longitude),
89 radius_in_km: float = config[CONF_RADIUS]
90 categories: list[str] = config[CONF_CATEGORIES]
93 hass, async_add_entities, scan_interval, coordinates, radius_in_km, categories
async def start_feed_manager(event: Event) -> None:
    """Start feed manager.

    Passed to ``hass.bus.async_listen_once`` for
    ``EVENT_HOMEASSISTANT_START``. The ``event`` argument is accepted to
    satisfy the listener signature but is not used.
    """
    # ``manager`` is a free variable supplied by the enclosing scope.
    await manager.async_init()
async def stop_feed_manager(event: Event) -> None:
    """Stop feed manager.

    Passed to ``hass.bus.async_listen_once`` for
    ``EVENT_HOMEASSISTANT_STOP``. The ``event`` argument is accepted to
    satisfy the listener signature but is not used.
    """
    # ``manager`` is a free variable supplied by the enclosing scope.
    await manager.async_stop()
104 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, start_feed_manager)
105 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_feed_manager)
106 hass.async_create_task(manager.async_update())
110 """Feed Entity Manager for NSW Rural Fire Service GeoJSON feed."""
115 async_add_entities: AddEntitiesCallback,
116 scan_interval: timedelta,
117 coordinates: tuple[float, float],
119 categories: list[str],
121 """Initialize the Feed Entity Manager."""
123 websession = aiohttp_client.async_get_clientsession(hass)
130 filter_radius=radius_in_km,
131 filter_categories=categories,
138 """Schedule initial and regular updates based on configured time interval."""
140 async
def update(event_time: datetime) ->
None:
149 _LOGGER.debug(
"Feed entity manager initialized")
154 _LOGGER.debug(
"Feed entity manager updated")
157 """Stop this feed entity manager from refreshing."""
160 _LOGGER.debug(
"Feed entity manager stopped")
163 self, external_id: str
164 ) -> NswRuralFireServiceIncidentsFeedEntry |
None:
165 """Get feed entry by external id."""
166 return self.
_feed_manager_feed_manager.feed_entries.get(external_id)
169 """Generate new entity."""
184 """Represents an external event with NSW Rural Fire Service data."""
186 _attr_should_poll =
False
187 _attr_source = SOURCE
188 _attr_unit_of_measurement = UnitOfLength.KILOMETERS
191 self, feed_manager: NswRuralFireServiceFeedEntityManager, external_id: str
193 """Initialize entity with data from feed entry."""
209 """Call when entity is added to hass."""
212 SIGNAL_DELETE_ENTITY.format(self.
_external_id_external_id),
217 SIGNAL_UPDATE_ENTITY.format(self.
_external_id_external_id),
222 """Call when entity will be removed from hass."""
228 """Remove this entity."""
229 self.
hasshass.async_create_task(self.
async_removeasync_remove(force_remove=
True))
233 """Call update method."""
237 """Update this entity from the data held in the feed manager."""
238 _LOGGER.debug(
"Updating %s", self.
_external_id_external_id)
244 self, feed_entry: NswRuralFireServiceIncidentsFeedEntry
246 """Update the internal state from the provided feed entry."""
252 self.
_category_category = feed_entry.category
254 self.
_location_location = feed_entry.location
256 self.
_status_status = feed_entry.status
257 self.
_type_type = feed_entry.type
258 self.
_fire_fire = feed_entry.fire
259 self.
_size_size = feed_entry.size
264 """Return the icon to use in the frontend."""
267 return "mdi:alarm-light"
271 """Return the device state attributes."""
276 (ATTR_CATEGORY, self.
_category_category),
277 (ATTR_LOCATION, self.
_location_location),
280 (ATTR_STATUS, self.
_status_status),
281 (ATTR_TYPE, self.
_type_type),
282 (ATTR_FIRE, self.
_fire_fire),
283 (ATTR_SIZE, self.
_size_size),
286 if value
or isinstance(value, bool)
_track_time_remove_callback
None _remove_entity(self, str external_id)
None _update_entity(self, str external_id)
NswRuralFireServiceIncidentsFeedEntry|None get_entry(self, str external_id)
None __init__(self, HomeAssistant hass, AddEntitiesCallback async_add_entities, timedelta scan_interval, tuple[float, float] coordinates, float radius_in_km, list[str] categories)
None _generate_entity(self, str external_id)
None _delete_callback(self)
None async_will_remove_from_hass(self)
None _update_callback(self)
dict[str, Any] extra_state_attributes(self)
None __init__(self, NswRuralFireServiceFeedEntityManager feed_manager, str external_id)
None _update_from_feed(self, NswRuralFireServiceIncidentsFeedEntry feed_entry)
None async_added_to_hass(self)
None async_schedule_update_ha_state(self, bool force_refresh=False)
None async_remove(self, *bool force_remove=False)
config_entries.ConfigEntry|None get_entry(HomeAssistant hass, websocket_api.ActiveConnection connection, str entry_id, int msg_id)
IssData update(pyiss.ISS iss)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)