Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Provides the Canary DataUpdateCoordinator."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import ValuesView
7 from datetime import timedelta
8 import logging
9 
10 from canary.api import Api
11 from canary.model import Location, Reading
12 from requests.exceptions import ConnectTimeout, HTTPError
13 
14 from homeassistant.core import HomeAssistant
15 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
16 
17 from .const import DOMAIN
18 from .model import CanaryData
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 
24  """Class to manage fetching Canary data."""
25 
def __init__(self, hass: HomeAssistant, *, api: Api) -> None:
    """Initialize global Canary data updater.

    Args:
        hass: Home Assistant instance, forwarded to the base coordinator.
        api: Canary API client (keyword-only) used by the sync fetch.
    """
    # NOTE(review): "canarycanary" looks like a doubled "canary" left over
    # from the listing export; the name is kept as-is because _update_data
    # reads the client through this exact attribute - confirm before renaming.
    self.canarycanary = api
    update_interval = timedelta(seconds=30)

    super().__init__(
        hass,
        _LOGGER,
        name=DOMAIN,
        update_interval=update_interval,
    )
37 
def _update_data(self) -> CanaryData:
    """Fetch data from Canary via sync functions.

    Returns:
        A mapping with two keys: "locations" (location id -> location) and
        "readings" (device id -> latest readings, online devices only).
    """
    locations_by_id: dict[str, Location] = {}
    readings_by_device_id: dict[str, ValuesView[Reading]] = {}

    for location in self.canarycanary.get_locations():
        location_id = location.location_id
        locations_by_id[location_id] = location

        for device in location.devices:
            # Readings are requested only for devices that report as online.
            if device.is_online:
                readings_by_device_id[device.device_id] = (
                    self.canarycanary.get_latest_readings(device.device_id)
                )

    return {
        "locations": locations_by_id,
        "readings": readings_by_device_id,
    }
57 
58  async def _async_update_data(self) -> CanaryData:
59  """Fetch data from Canary."""
60 
61  try:
62  async with asyncio.timeout(15):
63  return await self.hasshass.async_add_executor_job(self._update_data_update_data)
64  except (ConnectTimeout, HTTPError) as error:
65  raise UpdateFailed(f"Invalid response from API: {error}") from error