Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The GeoNet NZ Volcano integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 import logging
7 
8 from aio_geojson_geonetnz_volcano import GeonetnzVolcanoFeedManager
9 import voluptuous as vol
10 
11 from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
12 from homeassistant.const import (
13  CONF_LATITUDE,
14  CONF_LONGITUDE,
15  CONF_RADIUS,
16  CONF_SCAN_INTERVAL,
17  CONF_UNIT_SYSTEM,
18  UnitOfLength,
19 )
20 from homeassistant.core import HomeAssistant, callback
21 from homeassistant.helpers import aiohttp_client, config_validation as cv
22 from homeassistant.helpers.dispatcher import async_dispatcher_send
23 from homeassistant.helpers.event import async_track_time_interval
24 from homeassistant.helpers.typing import ConfigType
25 from homeassistant.util.unit_conversion import DistanceConverter
26 
27 from .config_flow import configured_instances
28 from .const import (
29  DEFAULT_RADIUS,
30  DEFAULT_SCAN_INTERVAL,
31  DOMAIN,
32  FEED,
33  IMPERIAL_UNITS,
34  PLATFORMS,
35 )
36 
_LOGGER = logging.getLogger(__name__)

# Schema for the optional YAML section of this integration. Coordinates may
# be omitted (async_setup falls back to the Home Assistant location), while
# radius and scan interval always resolve to a value through their defaults.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Optional(CONF_LATITUDE): cv.latitude,
                vol.Optional(CONF_LONGITUDE): cv.longitude,
                vol.Optional(CONF_RADIUS, default=DEFAULT_RADIUS): vol.Coerce(float),
                vol.Optional(
                    CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
                ): cv.time_period,
            }
        )
    },
    # Keys belonging to other integrations share the same config mapping.
    extra=vol.ALLOW_EXTRA,
)
54 
55 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Import a YAML-defined GeoNet NZ Volcano setup into a config entry.

    Returns True in every case: when the domain has no YAML section, when an
    instance for the same coordinates is already configured, and after an
    import flow has been scheduled.
    """
    if DOMAIN not in config:
        return True

    yaml_conf = config[DOMAIN]

    # Coordinates are optional in YAML; default to the Home Assistant location.
    latitude = yaml_conf.get(CONF_LATITUDE, hass.config.latitude)
    longitude = yaml_conf.get(CONF_LONGITUDE, hass.config.longitude)
    scan_interval = yaml_conf[CONF_SCAN_INTERVAL]

    # Instances are identified by their "<latitude>, <longitude>" string.
    if f"{latitude}, {longitude}" in configured_instances(hass):
        return True

    import_data = {
        CONF_LATITUDE: latitude,
        CONF_LONGITUDE: longitude,
        CONF_RADIUS: yaml_conf[CONF_RADIUS],
        CONF_SCAN_INTERVAL: scan_interval,
    }
    import_flow = hass.config_entries.flow.async_init(
        DOMAIN,
        context={"source": SOURCE_IMPORT},
        data=import_data,
    )
    # Schedule the flow as a task instead of awaiting it here.
    hass.async_create_task(import_flow)

    return True
85 
86 
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
    """Set up GeoNet NZ Volcano from a config entry.

    Creates a feed entity manager for the entry, registers it under
    ``hass.data[DOMAIN][FEED]`` keyed by the entry id, and initialises it.
    """
    feeds = hass.data.setdefault(DOMAIN, {}).setdefault(FEED, {})

    entry_data = config_entry.data
    radius = entry_data[CONF_RADIUS]
    unit_system = entry_data[CONF_UNIT_SYSTEM]
    if unit_system == IMPERIAL_UNITS:
        # Imperial setups store the radius in miles; the manager takes kilometres.
        radius = DistanceConverter.convert(
            radius, UnitOfLength.MILES, UnitOfLength.KILOMETERS
        )

    # One feed entity manager serves all platforms of this entry.
    manager = GeonetnzVolcanoFeedEntityManager(hass, config_entry, radius, unit_system)
    feeds[config_entry.entry_id] = manager
    _LOGGER.debug("Feed entity manager added for %s", config_entry.entry_id)
    await manager.async_init()
    return True
104 
105 
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a GeoNet NZ Volcano config entry.

    Removes the entry's feed entity manager from ``hass.data``, stops it, and
    returns the result of unloading the entry's platforms.
    """
    feeds = hass.data[DOMAIN][FEED]
    manager = feeds.pop(entry.entry_id)
    await manager.async_stop()
    unloaded = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    return unloaded
111 
112 
class GeonetnzVolcanoFeedEntityManager:
    """Feed Entity Manager for GeoNet NZ Volcano feed.

    Wraps a ``GeonetnzVolcanoFeedManager`` for one config entry, refreshes it
    on a fixed interval, and relays the feed manager's generate/update
    callbacks to the rest of the integration through dispatcher signals.
    """

    def __init__(self, hass, config_entry, radius_in_km, unit_system):
        """Initialize the Feed Entity Manager.

        Args:
            hass: The Home Assistant instance.
            config_entry: Config entry supplying latitude, longitude and the
                scan interval (read as a number of seconds).
            radius_in_km: Filter radius handed to the feed manager.
            unit_system: Unit system identifier passed along with every
                "new entity" signal.
        """
        self._hass = hass
        self._config_entry = config_entry
        coordinates = (
            config_entry.data[CONF_LATITUDE],
            config_entry.data[CONF_LONGITUDE],
        )
        websession = aiohttp_client.async_get_clientsession(hass)
        self._feed_manager = GeonetnzVolcanoFeedManager(
            websession,
            self._generate_entity,
            self._update_entity,
            self._remove_entity,
            coordinates,
            filter_radius=radius_in_km,
        )
        self._config_entry_id = config_entry.entry_id
        self._scan_interval = timedelta(seconds=config_entry.data[CONF_SCAN_INTERVAL])
        self._unit_system = unit_system
        # Cancel handle for the periodic update timer; set by async_init().
        self._track_time_remove_callback = None
        # Unsubscribe callables that async_stop() invokes.
        self.listeners = []

    async def async_init(self):
        """Forward the entry to its platforms and schedule regular updates."""

        await self._hass.config_entries.async_forward_entry_setups(
            self._config_entry, PLATFORMS
        )

        async def update(event_time):
            """Refresh the feed; the timer's timestamp argument is unused."""
            await self.async_update()

        # Trigger updates at regular intervals.
        self._track_time_remove_callback = async_track_time_interval(
            self._hass, update, self._scan_interval
        )

        _LOGGER.debug("Feed entity manager initialized")

    async def async_update(self):
        """Refresh data."""
        await self._feed_manager.update()
        _LOGGER.debug("Feed entity manager updated")

    async def async_stop(self):
        """Stop this feed entity manager from refreshing."""
        for unsub_dispatcher in self.listeners:
            unsub_dispatcher()
        self.listeners = []
        if self._track_time_remove_callback:
            self._track_time_remove_callback()
            # Drop the handle so a repeated stop does not cancel twice.
            self._track_time_remove_callback = None
        _LOGGER.debug("Feed entity manager stopped")

    @callback
    def async_event_new_entity(self):
        """Return manager specific event to signal new entity."""
        return f"geonetnz_volcano_new_sensor_{self._config_entry_id}"

    def get_entry(self, external_id):
        """Get feed entry by external id."""
        return self._feed_manager.feed_entries.get(external_id)

    def last_update(self) -> datetime | None:
        """Return the last update of this feed."""
        return self._feed_manager.last_update

    def last_update_successful(self) -> datetime | None:
        """Return the last successful update of this feed."""
        return self._feed_manager.last_update_successful

    async def _generate_entity(self, external_id):
        """Generate new entity.

        Sends the manager-specific "new entity" signal with this manager, the
        external id and the unit system as arguments.
        """
        async_dispatcher_send(
            self._hass,
            self.async_event_new_entity(),
            self,
            external_id,
            self._unit_system,
        )

    async def _update_entity(self, external_id):
        """Update entity."""
        async_dispatcher_send(self._hass, f"geonetnz_volcano_update_{external_id}")

    async def _remove_entity(self, external_id):
        """Ignore removing entity."""
def __init__(self, hass, config_entry, radius_in_km, unit_system)
Definition: __init__.py:116
set[str] configured_instances(HomeAssistant hass)
Definition: config_flow.py:29
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:106
bool async_setup_entry(HomeAssistant hass, ConfigEntry config_entry)
Definition: __init__.py:87
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:56
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679