Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for Linear."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Awaitable, Callable
6 from dataclasses import dataclass
7 from datetime import timedelta
8 import logging
9 from typing import Any
10 
11 from linear_garage_door import Linear
12 from linear_garage_door.errors import InvalidLoginError
13 
14 from homeassistant.config_entries import ConfigEntry
15 from homeassistant.core import HomeAssistant
16 from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
17 from homeassistant.helpers.aiohttp_client import async_get_clientsession
18 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 
23 @dataclass
25  """Linear device dataclass."""
26 
27  name: str
28  subdevices: dict[str, dict[str, str]]
29 
30 
31 class LinearUpdateCoordinator(DataUpdateCoordinator[dict[str, LinearDevice]]):
32  """DataUpdateCoordinator for Linear."""
33 
34  _devices: list[dict[str, Any]] | None = None
35  config_entry: ConfigEntry
36 
37  def __init__(self, hass: HomeAssistant) -> None:
38  """Initialize DataUpdateCoordinator for Linear."""
39  super().__init__(
40  hass,
41  _LOGGER,
42  name="Linear Garage Door",
43  update_interval=timedelta(seconds=60),
44  )
45  self.site_idsite_id = self.config_entryconfig_entry.data["site_id"]
46 
47  async def _async_update_data(self) -> dict[str, LinearDevice]:
48  """Get the data for Linear."""
49 
50  async def update_data(linear: Linear) -> dict[str, Any]:
51  if not self._devices_devices:
52  self._devices_devices = await linear.get_devices(self.site_idsite_id)
53 
54  data = {}
55 
56  for device in self._devices_devices:
57  device_id = str(device["id"])
58  state = await linear.get_device_state(device_id)
59  data[device_id] = LinearDevice(device["name"], state)
60  return data
61 
62  return await self.execute(update_data)
63 
64  async def execute[_T](self, func: Callable[[Linear], Awaitable[_T]]) -> _T:
65  """Execute an API call."""
66  linear = Linear()
67  try:
68  await linear.login(
69  email=self.config_entryconfig_entry.data["email"],
70  password=self.config_entryconfig_entry.data["password"],
71  device_id=self.config_entryconfig_entry.data["device_id"],
72  client_session=async_get_clientsession(self.hasshass),
73  )
74  except InvalidLoginError as err:
75  if (
76  str(err)
77  == "Login error: Login provided is invalid, please check the email and password"
78  ):
79  raise ConfigEntryAuthFailed from err
80  raise ConfigEntryNotReady from err
81  result = await func(linear)
82  await linear.close()
83  return result
def execute(hass, filename, source, data=None, return_response=False)
Definition: __init__.py:194
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)