1 """Provides the Canary DataUpdateCoordinator."""
3 from __future__ import annotations
6 from collections.abc import ValuesView
7 from datetime import timedelta
10 from canary.api import Api
11 from canary.model import Location, Reading
12 from requests.exceptions import ConnectTimeout, HTTPError
17 from .const import DOMAIN
18 from .model import CanaryData
20 _LOGGER = logging.getLogger(__name__)
24 """Class to manage fetching Canary data."""
26 def __init__(self, hass: HomeAssistant, *, api: Api) -> None:
27 """Initialize global Canary data updater."""
35 update_interval=update_interval,
39 """Fetch data from Canary via sync functions."""
40 locations_by_id: dict[str, Location] = {}
41 readings_by_device_id: dict[str, ValuesView[Reading]] = {}
43 for location in self.canary.get_locations():
44 location_id = location.location_id
45 locations_by_id[location_id] = location
47 for device in location.devices:
49 readings_by_device_id[device.device_id] = (
50 self.canary.get_latest_readings(device.device_id)
54 "locations": locations_by_id,
55 "readings": readings_by_device_id,
59 """Fetch data from Canary."""
62 async with asyncio.timeout(15):
63 return await self.hass.async_add_executor_job(self._update_data)
64 except (ConnectTimeout, HTTPError) as error:
65 raise UpdateFailed(f"Invalid response from API: {error}") from error
None __init__(self, HomeAssistant hass, *, Api api)
CanaryData _update_data(self)
CanaryData _async_update_data(self)