1 """Data Update coordinator for ZAMG weather data."""
3 from __future__
import annotations
5 from zamg
import ZamgData
as ZamgDevice
6 from zamg.exceptions
import ZamgError, ZamgNoDataError
13 from .const
import CONF_STATION_ID, DOMAIN, LOGGER, MIN_TIME_BETWEEN_UPDATES
17 """Class to manage fetching ZAMG weather data."""
19 config_entry: ConfigEntry
21 api_fields: list[str] |
None =
None
29 """Initialize global ZAMG data updater."""
31 self.
zamgzamg.set_default_station(entry.data[CONF_STATION_ID])
36 update_interval=MIN_TIME_BETWEEN_UPDATES,
40 """Fetch data from ZAMG api."""
43 self.
zamgzamg.set_parameters(self.api_fields)
44 self.
zamgzamg.request_timeout = 60.0
46 except ZamgNoDataError
as error:
48 except ZamgError
as error:
49 raise UpdateFailed(f
"Invalid response from API: {error}")
from error
51 self.
datadatadata[
"last_update"] = self.
zamgzamg.last_update
52 self.
datadatadata[
"Name"] = self.
zamgzamg.get_station_name
ZamgDevice _async_update_data(self)
None __init__(self, HomeAssistant hass, *ConfigEntry entry)
IssData update(pyiss.ISS iss)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)