1 """DataUpdateCoordinator for the Ambient Weather Network integration."""
3 from __future__
import annotations
5 from datetime
import datetime, timedelta
6 from typing
import Any, cast
8 from aioambient
import OpenAPI
9 from aioambient.errors
import RequestError
17 from .const
import API_LAST_DATA, DOMAIN, LOGGER
18 from .helper
import get_station_name
24 """The Ambient Network Data Update Coordinator."""
26 config_entry: ConfigEntry
28 last_measured: datetime |
None =
None
30 def __init__(self, hass: HomeAssistant, api: OpenAPI) ->
None:
31 """Initialize the coordinator."""
32 super().
__init__(hass, LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL)
36 """Fetch the latest data from the Ambient Network."""
39 response = await self.
apiapi.get_device_details(
42 except RequestError
as ex:
43 raise UpdateFailed(
"Cannot connect to Ambient Network")
from ex
47 if (last_data := response.get(API_LAST_DATA))
is None:
49 f
"Station '{self.config_entry.title}' did not report any data"
54 if (ts := last_data.get(
"created_at"))
is not None or (
55 ts := last_data.get(
"dateutc")
58 ts / 1000, tz=dt_util.DEFAULT_TIME_ZONE
61 return cast(dict[str, Any], last_data)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, OpenAPI api)
str get_station_name(dict[str, Any] station)