1 """Support for IGN Sismologia (Earthquakes) Feeds."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import timedelta
10 from georss_ign_sismologia_client
import (
11 IgnSismologiaFeedEntry,
12 IgnSismologiaFeedManager,
14 import voluptuous
as vol
17 PLATFORM_SCHEMA
as GEO_LOCATION_PLATFORM_SCHEMA,
25 EVENT_HOMEASSISTANT_START,
35 _LOGGER = logging.getLogger(__name__)
37 ATTR_EXTERNAL_ID =
"external_id"
38 ATTR_IMAGE_URL =
"image_url"
39 ATTR_MAGNITUDE =
"magnitude"
40 ATTR_PUBLICATION_DATE =
"publication_date"
41 ATTR_REGION =
"region"
44 CONF_MINIMUM_MAGNITUDE =
"minimum_magnitude"
46 DEFAULT_MINIMUM_MAGNITUDE = 0.0
47 DEFAULT_RADIUS_IN_KM = 50.0
51 SOURCE =
"ign_sismologia"
53 PLATFORM_SCHEMA = GEO_LOCATION_PLATFORM_SCHEMA.extend(
55 vol.Optional(CONF_LATITUDE): cv.latitude,
56 vol.Optional(CONF_LONGITUDE): cv.longitude,
57 vol.Optional(CONF_RADIUS, default=DEFAULT_RADIUS_IN_KM): vol.Coerce(float),
59 CONF_MINIMUM_MAGNITUDE, default=DEFAULT_MINIMUM_MAGNITUDE
68 add_entities: AddEntitiesCallback,
69 discovery_info: DiscoveryInfoType |
None =
None,
71 """Set up the IGN Sismologia Feed platform."""
72 scan_interval: timedelta = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
73 coordinates: tuple[float, float] = (
74 config.get(CONF_LATITUDE, hass.config.latitude),
75 config.get(CONF_LONGITUDE, hass.config.longitude),
77 radius_in_km: float = config[CONF_RADIUS]
78 minimum_magnitude: float = config[CONF_MINIMUM_MAGNITUDE]
81 hass, add_entities, scan_interval, coordinates, radius_in_km, minimum_magnitude
84 def start_feed_manager(event: Event) ->
None:
85 """Start feed manager."""
88 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_feed_manager)
92 """Feed Entity Manager for IGN Sismologia GeoRSS feed."""
97 add_entities: AddEntitiesCallback,
98 scan_interval: timedelta,
99 coordinates: tuple[float, float],
101 minimum_magnitude: float,
103 """Initialize the Feed Entity Manager."""
111 filter_radius=radius_in_km,
112 filter_minimum_magnitude=minimum_magnitude,
118 """Start up this manager."""
123 """Schedule regular updates at the specified interval."""
128 def get_entry(self, external_id: str) -> IgnSismologiaFeedEntry |
None:
129 """Get feed entry by external id."""
130 return self._feed_manager.feed_entries.get(external_id)
133 """Generate new entity."""
148 """Represents an external event with IGN Sismologia feed data."""
150 _attr_icon =
"mdi:pulse"
151 _attr_should_poll =
False
152 _attr_source = SOURCE
153 _attr_unit_of_measurement = UnitOfLength.KILOMETERS
156 self, feed_manager: IgnSismologiaFeedEntityManager, external_id: str
158 """Initialize entity with data from feed entry."""
170 """Call when entity is added to hass."""
173 f
"ign_sismologia_delete_{self._external_id}",
178 f
"ign_sismologia_update_{self._external_id}",
184 """Remove this entity."""
187 self.hass.async_create_task(self.async_remove(force_remove=True))
191 """Call update method."""
195 """Update this entity from the data held in the feed manager."""
196 _LOGGER.debug("Updating %s", self._external_id)
202 """Update the internal state from the provided feed entry."""
203 self._title = feed_entry.title
208 self._region = feed_entry.region
209 self._magnitude = feed_entry.magnitude
211 self._image_url = feed_entry.image_url
215 """Return the name of the entity."""
217 return f
"M {self._magnitude:.1f} - {self._region}"
219 return f
"M {self._magnitude:.1f}"
226 """Return the device state attributes."""
231 (ATTR_TITLE, self._title),
232 (ATTR_REGION, self._region),
237 if value
or isinstance(value, bool)
None __init__(self, HomeAssistant hass, AddEntitiesCallback add_entities, timedelta scan_interval, tuple[float, float] coordinates, float radius_in_km, float minimum_magnitude)
IgnSismologiaFeedEntry|None get_entry(self, str external_id)
None _init_regular_updates(self)
None _generate_entity(self, str external_id)
None _remove_entity(self, str external_id)
None _update_entity(self, str external_id)
None _delete_callback(self)
None __init__(self, IgnSismologiaFeedEntityManager feed_manager, str external_id)
None async_added_to_hass(self)
None _update_callback(self)
dict[str, Any] extra_state_attributes(self)
None _update_from_feed(self, IgnSismologiaFeedEntry feed_entry)
None async_schedule_update_ha_state(self, bool force_refresh=False)
None async_remove(self, *bool force_remove=False)
config_entries.ConfigEntry|None get_entry(HomeAssistant hass, websocket_api.ActiveConnection connection, str entry_id, int msg_id)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
IssData update(pyiss.ISS iss)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)