Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the ultraheat api."""
2 
3 import asyncio
4 import logging
5 
6 import serial
7 from ultraheat_api.response import HeatMeterResponse
8 from ultraheat_api.service import HeatMeterService
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import POLLING_INTERVAL, ULTRAHEAT_TIMEOUT
14 
15 _LOGGER = logging.getLogger(__name__)
16 
17 
class UltraheatCoordinator(DataUpdateCoordinator[HeatMeterResponse]):
    """Coordinator for getting data from the ultraheat api.

    Polls a ``HeatMeterService`` every ``POLLING_INTERVAL`` and exposes the
    latest ``HeatMeterResponse`` through the base coordinator.
    """

    def __init__(self, hass: HomeAssistant, api: HeatMeterService) -> None:
        """Initialize the coordinator.

        Args:
            hass: Home Assistant instance, forwarded to the base coordinator.
            api: Heat meter service whose ``read`` method is called on each
                refresh.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="ultraheat",
            update_interval=POLLING_INTERVAL,
        )
        self.api = api

    async def _async_update_data(self) -> HeatMeterResponse:
        """Fetch data from API endpoint.

        Returns:
            The value returned by ``self.api.read``.

        Raises:
            UpdateFailed: If reading raises ``FileNotFoundError`` or
                ``serial.SerialException``; the original error is chained.
            TimeoutError: If the read does not finish within
                ``ULTRAHEAT_TIMEOUT``. This is not caught here and
                propagates to the caller.
        """
        try:
            async with asyncio.timeout(ULTRAHEAT_TIMEOUT):
                # ``read`` is passed as a plain callable to the executor so it
                # does not run on the event loop. ``self.hass`` is the
                # instance handed to the base class in ``__init__``.
                return await self.hass.async_add_executor_job(self.api.read)
        except (FileNotFoundError, serial.SerialException) as err:
            raise UpdateFailed(f"Error communicating with API: {err}") from err
None __init__(self, HomeAssistant hass, HeatMeterService api)
Definition: coordinator.py:21