1 """Support for Queensland Bushfire Alert Feeds."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import timedelta
10 from georss_qld_bushfire_alert_client
import (
11 QldBushfireAlertFeedEntry,
12 QldBushfireAlertFeedManager,
14 import voluptuous
as vol
17 PLATFORM_SCHEMA
as GEO_LOCATION_PLATFORM_SCHEMA,
25 EVENT_HOMEASSISTANT_START,
_LOGGER = logging.getLogger(__name__)

# Attribute keys. ATTR_CATEGORY and ATTR_STATUS are paired with the entity's
# category/status values when its state attributes are built; the remaining
# keys are presumably used the same way (their use is not visible here).
ATTR_CATEGORY = "category"
ATTR_EXTERNAL_ID = "external_id"
ATTR_PUBLICATION_DATE = "publication_date"
ATTR_STATUS = "status"
ATTR_UPDATED_DATE = "updated_date"

# Optional platform configuration key holding a list of categories; the
# configured list is handed to the feed manager as ``filter_categories``.
CONF_CATEGORIES = "categories"

# Default for CONF_RADIUS in the platform schema.
DEFAULT_RADIUS_IN_KM = 20.0

# Signal name templates; "{}" is filled in with an entry's external id.
SIGNAL_DELETE_ENTITY = "qld_bushfire_delete_{}"
SIGNAL_UPDATE_ENTITY = "qld_bushfire_update_{}"

# Value assigned to the entity's ``_attr_source``.
SOURCE = "qld_bushfire"
62 PLATFORM_SCHEMA = GEO_LOCATION_PLATFORM_SCHEMA.extend(
64 vol.Optional(CONF_LATITUDE): cv.latitude,
65 vol.Optional(CONF_LONGITUDE): cv.longitude,
66 vol.Optional(CONF_RADIUS, default=DEFAULT_RADIUS_IN_KM): vol.Coerce(float),
67 vol.Optional(CONF_CATEGORIES, default=[]): vol.All(
68 cv.ensure_list, [vol.In(VALID_CATEGORIES)]
77 add_entities: AddEntitiesCallback,
78 discovery_info: DiscoveryInfoType |
None =
None,
80 """Set up the Queensland Bushfire Alert Feed platform."""
81 scan_interval: timedelta = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
82 coordinates: tuple[float, float] = (
83 config.get(CONF_LATITUDE, hass.config.latitude),
84 config.get(CONF_LONGITUDE, hass.config.longitude),
86 radius_in_km: float = config[CONF_RADIUS]
87 categories: list[str] = config[CONF_CATEGORIES]
90 hass, add_entities, scan_interval, coordinates, radius_in_km, categories
93 def start_feed_manager(event: Event) ->
None:
94 """Start feed manager."""
97 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_feed_manager)
101 """Feed Entity Manager for Qld Bushfire Alert GeoRSS feed."""
106 add_entities: AddEntitiesCallback,
107 scan_interval: timedelta,
108 coordinates: tuple[float, float],
110 categories: list[str],
112 """Initialize the Feed Entity Manager."""
119 filter_radius=radius_in_km,
120 filter_categories=categories,
126 """Start up this manager."""
131 """Schedule regular updates at the specified interval."""
136 cancel_on_shutdown=
True,
def get_entry(self, external_id: str) -> QldBushfireAlertFeedEntry | None:
    """Get feed entry by external id.

    Looks ``external_id`` up in the feed manager's ``feed_entries`` using
    ``.get``; the annotated return type allows ``None`` when nothing matches.
    """
    return self._feed_manager.feed_entries.get(external_id)
144 """Generate new entity."""
159 """Represents an external event with Qld Bushfire feed data."""
# Class-level entity attribute defaults (the enclosing class header is not
# part of this view).
_attr_icon = "mdi:fire"
# Polling is switched off for this entity.
_attr_should_poll = False
_attr_source = SOURCE
_attr_unit_of_measurement = UnitOfLength.KILOMETERS
167 self, feed_manager: QldBushfireFeedEntityManager, external_id: str
169 """Initialize entity with data from feed entry."""
180 """Call when entity is added to hass."""
183 SIGNAL_DELETE_ENTITY.format(self.
_external_id_external_id),
188 SIGNAL_UPDATE_ENTITY.format(self.
_external_id_external_id),
194 """Remove this entity."""
197 self.
hasshass.async_create_task(self.
async_removeasync_remove(force_remove=
True))
201 """Call update method."""
205 """Update this entity from the data held in the feed manager."""
206 _LOGGER.debug(
"Updating %s", self.
_external_id_external_id)
212 """Update the internal state from the provided feed entry."""
218 self.
_category_category = feed_entry.category
221 self.
_status_status = feed_entry.status
225 """Return the device state attributes."""
230 (ATTR_CATEGORY, self.
_category_category),
233 (ATTR_STATUS, self.
_status_status),
235 if value
or isinstance(value, bool)
None __init__(self, HomeAssistant hass, AddEntitiesCallback add_entities, timedelta scan_interval, tuple[float, float] coordinates, float radius_in_km, list[str] categories)
None _init_regular_updates(self)
None _remove_entity(self, str external_id)
None _update_entity(self, str external_id)
None _generate_entity(self, str external_id)
QldBushfireAlertFeedEntry|None get_entry(self, str external_id)
None _update_from_feed(self, QldBushfireAlertFeedEntry feed_entry)
None _update_callback(self)
dict[str, Any] extra_state_attributes(self)
None __init__(self, QldBushfireFeedEntityManager feed_manager, str external_id)
None _delete_callback(self)
None async_added_to_hass(self)
None async_schedule_update_ha_state(self, bool force_refresh=False)
None async_remove(self, *bool force_remove=False)
config_entries.ConfigEntry|None get_entry(HomeAssistant hass, websocket_api.ActiveConnection connection, str entry_id, int msg_id)
IssData update(pyiss.ISS iss)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)