1 """Data coordinator for the dwd_weather_warnings integration."""
3 from __future__ import annotations
5 from dwdwfsapi import DwdWeatherWarningsAPI
13 CONF_REGION_DEVICE_TRACKER,
14 CONF_REGION_IDENTIFIER,
15 DEFAULT_SCAN_INTERVAL,
19 from .exceptions import EntityNotFoundError
20 from .util import get_position_data
22 type DwdWeatherWarningsConfigEntry = ConfigEntry[DwdWeatherWarningsCoordinator]
26 """Custom coordinator for the dwd_weather_warnings integration."""
28 config_entry: DwdWeatherWarningsConfigEntry
29 api: DwdWeatherWarningsAPI
31 def __init__(self, hass: HomeAssistant) -> None:
32 """Initialize the dwd_weather_warnings coordinator."""
34 hass, LOGGER, name=DOMAIN, update_interval=DEFAULT_SCAN_INTERVAL
41 """Set up coordinator."""
42 if region_identifier := self.config_entry.data.get(CONF_REGION_IDENTIFIER):
43 self.api = await self.hass.async_add_executor_job(
44 DwdWeatherWarningsAPI, region_identifier
48 CONF_REGION_DEVICE_TRACKER
52 """Get the latest data from the DWD Weather Warnings API."""
56 except (EntityNotFoundError, AttributeError) as err:
57 raise UpdateFailed(f"Error fetching position: {err!r}") from err
61 distance = location.distance(
68 if distance is None or distance > 50:
73 self.api = await self.hass.async_add_executor_job(
74 DwdWeatherWarningsAPI, position
78 await self.hass.async_add_executor_job(self.api.update)
82 await self.hass.async_add_executor_job(self.api.update)
None __init__(self, HomeAssistant hass)
None _async_update_data(self)
tuple[float, float]|None get_position_data(HomeAssistant hass, str registry_id)