1 """Data update coordinator for the ultraheat api."""
7 from ultraheat_api.response
import HeatMeterResponse
8 from ultraheat_api.service
import HeatMeterService
13 from .const
import POLLING_INTERVAL, ULTRAHEAT_TIMEOUT
15 _LOGGER = logging.getLogger(__name__)
19 """Coordinator for getting data from the ultraheat api."""
21 def __init__(self, hass: HomeAssistant, api: HeatMeterService) ->
None:
22 """Initialize my coordinator."""
27 update_interval=POLLING_INTERVAL,
32 """Fetch data from API endpoint."""
34 async
with asyncio.timeout(ULTRAHEAT_TIMEOUT):
35 return await self.
hasshass.async_add_executor_job(self.
apiapi.read)
36 except (FileNotFoundError, serial.SerialException)
as err:
37 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
None __init__(self, HomeAssistant hass, HeatMeterService api)
HeatMeterResponse _async_update_data(self)