1 """Weather data coordinator for the AEMET OpenData service."""
3 from __future__
import annotations
5 from asyncio
import timeout
6 from dataclasses
import dataclass
7 from datetime
import timedelta
9 from typing
import Any, Final, cast
11 from aemet_opendata.const
import (
18 from aemet_opendata.exceptions
import AemetError
19 from aemet_opendata.helpers
import dict_nested_value
20 from aemet_opendata.interface
import AEMET
27 from .const
import CONDITIONS_MAP, DOMAIN, FORECAST_MAP
29 _LOGGER = logging.getLogger(__name__)
31 API_TIMEOUT: Final[int] = 120
34 type AemetConfigEntry = ConfigEntry[AemetData]
39 """Aemet runtime data."""
42 coordinator: WeatherUpdateCoordinator
46 """Weather data update coordinator."""
51 entry: AemetConfigEntry,
54 """Initialize coordinator."""
62 update_interval=WEATHER_UPDATE_INTERVAL,
66 """Update coordinator data."""
67 async
with timeout(API_TIMEOUT):
70 except AemetError
as error:
77 AOD_FORECAST_DAILY: self.
aemet_forecastaemet_forecast(data, AOD_FORECAST_DAILY),
78 AOD_FORECAST_HOURLY: self.
aemet_forecastaemet_forecast(data, AOD_FORECAST_HOURLY),
88 """Return the forecast array."""
89 forecasts = dict_nested_value(data, [AOD_TOWN, forecast_mode, AOD_FORECAST])
90 forecast_map = FORECAST_MAP[forecast_mode]
91 forecast_list: list[dict[str, Any]] = []
92 for forecast
in forecasts:
93 cur_forecast: dict[str, Any] = {}
94 for api_key, ha_key
in forecast_map.items():
95 value = forecast[api_key]
96 if api_key == AOD_CONDITION:
97 value = CONDITIONS_MAP.get(value)
98 cur_forecast[ha_key] = value
99 forecast_list += [cur_forecast]
100 return cast(list[Forecast], forecast_list)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, AemetConfigEntry entry, AEMET aemet)
list[Forecast] aemet_forecast(self, dict[str, Any] data, str forecast_mode)
IssData update(pyiss.ISS iss)