Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The microBees Coordinator."""
2 
3 import asyncio
4 from dataclasses import dataclass
5 from datetime import timedelta
6 from http import HTTPStatus
7 import logging
8 
9 import aiohttp
10 from microBeesPy import Actuator, Bee, MicroBees, MicroBeesException, Sensor
11 
12 from homeassistant.core import HomeAssistant
13 from homeassistant.exceptions import ConfigEntryAuthFailed
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 
@dataclass
class MicroBeesCoordinatorData:
    """Microbees data from the Coordinator.

    Container for one poll of the microBees API, as assembled by
    ``MicroBeesUpdateCoordinator._async_update_data``.
    """

    # Every bee returned by the API, keyed by ``bee.id``.
    bees: dict[int, Bee]
    # Actuators collected from all bees, keyed by ``actuator.id``.
    actuators: dict[int, Actuator]
    # Sensors collected from all bees, keyed by ``sensor.id``.
    sensors: dict[int, Sensor]
26 
27 
class MicroBeesUpdateCoordinator(DataUpdateCoordinator[MicroBeesCoordinatorData]):
    """MicroBees coordinator.

    Configured with a 30 second update interval; each update fetches the bees
    from the microBees client and indexes them, their actuators and their
    sensors by id.
    """

    def __init__(self, hass: HomeAssistant, microbees: MicroBees) -> None:
        """Initialize microBees coordinator.

        Args:
            hass: Home Assistant instance handed to the base coordinator.
            microbees: microBees API client used to fetch the bees.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="microBees Coordinator",
            update_interval=timedelta(seconds=30),
        )
        # NOTE(review): the attribute name looks like a doubled ``microbees``;
        # kept as found because other modules may read it -- confirm.
        self.microbeesmicrobees = microbees

    async def _async_update_data(self) -> MicroBeesCoordinatorData:
        """Fetch data from API endpoint.

        Returns:
            The bees, actuators and sensors from this poll, each keyed by id.

        Raises:
            ConfigEntryAuthFailed: The API answered with HTTP 401.
            UpdateFailed: Any other HTTP error response or a
                ``MicroBeesException`` from the client.
        """
        # Only the network call needs the 10 second budget.
        async with asyncio.timeout(10):
            try:
                bees = await self.microbeesmicrobees.getBees()
            except aiohttp.ClientResponseError as err:
                # Compare by value: the status is a plain integer, so an
                # identity check against the enum member would never match.
                if err.status == HTTPStatus.UNAUTHORIZED:
                    raise ConfigEntryAuthFailed(
                        "Token not valid, trigger renewal"
                    ) from err
                raise UpdateFailed(f"Error communicating with API: {err}") from err
            except MicroBeesException as err:
                raise UpdateFailed(f"Error communicating with API: {err}") from err

        bees_dict: dict[int, Bee] = {}
        actuators_dict: dict[int, Actuator] = {}
        sensors_dict: dict[int, Sensor] = {}
        for bee in bees:
            bees_dict[bee.id] = bee
            # Flatten the per-bee children into id-keyed lookups.
            actuators_dict.update({actuator.id: actuator for actuator in bee.actuators})
            sensors_dict.update({sensor.id: sensor for sensor in bee.sensors})

        return MicroBeesCoordinatorData(
            bees=bees_dict, actuators=actuators_dict, sensors=sensors_dict
        )
None __init__(self, HomeAssistant hass, MicroBees microbees)
Definition: coordinator.py:31