Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data Update coordinator for ZAMG weather data."""
2 
3 from __future__ import annotations
4 
5 from zamg import ZamgData as ZamgDevice
6 from zamg.exceptions import ZamgError, ZamgNoDataError
7 
8 from homeassistant.config_entries import ConfigEntry
9 from homeassistant.core import HomeAssistant
10 from homeassistant.helpers.aiohttp_client import async_get_clientsession
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import CONF_STATION_ID, DOMAIN, LOGGER, MIN_TIME_BETWEEN_UPDATES
14 
15 
17  """Class to manage fetching ZAMG weather data."""
18 
19  config_entry: ConfigEntry
20  data: dict = {}
21  api_fields: list[str] | None = None
22 
23  def __init__(
24  self,
25  hass: HomeAssistant,
26  *,
27  entry: ConfigEntry,
28  ) -> None:
29  """Initialize global ZAMG data updater."""
30  self.zamgzamg = ZamgDevice(session=async_get_clientsession(hass))
31  self.zamgzamg.set_default_station(entry.data[CONF_STATION_ID])
32  super().__init__(
33  hass,
34  LOGGER,
35  name=DOMAIN,
36  update_interval=MIN_TIME_BETWEEN_UPDATES,
37  )
38 
39  async def _async_update_data(self) -> ZamgDevice:
40  """Fetch data from ZAMG api."""
41  try:
42  if self.api_fields:
43  self.zamgzamg.set_parameters(self.api_fields)
44  self.zamgzamg.request_timeout = 60.0
45  device = await self.zamgzamg.update()
46  except ZamgNoDataError as error:
47  raise UpdateFailed("No response from API") from error
48  except ZamgError as error:
49  raise UpdateFailed(f"Invalid response from API: {error}") from error
50  self.datadatadata = device
51  self.datadatadata["last_update"] = self.zamgzamg.last_update
52  self.datadatadata["Name"] = self.zamgzamg.get_station_name
53  return device
None __init__(self, HomeAssistant hass, *ConfigEntry entry)
Definition: coordinator.py:28
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)