Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """SwitchBot Cloud coordinator."""
2 
3 from asyncio import timeout
4 from logging import getLogger
5 from typing import Any
6 
7 from switchbot_api import CannotConnect, Device, Remote, SwitchBotAPI
8 
9 from homeassistant.core import HomeAssistant
10 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
11 
12 from .const import DEFAULT_SCAN_INTERVAL, DOMAIN
13 
14 _LOGGER = getLogger(__name__)
15 
16 type Status = dict[str, Any] | None
17 
18 
20  """SwitchBot Cloud coordinator."""
21 
22  _api: SwitchBotAPI
23  _device_id: str
24 
25  def __init__(
26  self, hass: HomeAssistant, api: SwitchBotAPI, device: Device | Remote
27  ) -> None:
28  """Initialize SwitchBot Cloud."""
29  super().__init__(
30  hass,
31  _LOGGER,
32  name=DOMAIN,
33  update_interval=DEFAULT_SCAN_INTERVAL,
34  )
35  self._api_api = api
36  self._device_id_device_id = device.device_id
37  self._should_poll_should_poll = not isinstance(device, Remote)
38 
39  async def _async_update_data(self) -> Status:
40  """Fetch data from API endpoint."""
41  if not self._should_poll_should_poll:
42  return None
43  try:
44  _LOGGER.debug("Refreshing %s", self._device_id_device_id)
45  async with timeout(10):
46  status: Status = await self._api_api.get_status(self._device_id_device_id)
47  _LOGGER.debug("Refreshing %s with %s", self._device_id_device_id, status)
48  return status
49  except CannotConnect as err:
50  raise UpdateFailed(f"Error communicating with API: {err}") from err
None __init__(self, HomeAssistant hass, SwitchBotAPI api, Device|Remote device)
Definition: coordinator.py:27
def get_status(hass, host, port)
Definition: panel.py:387