1 """The microBees Coordinator."""
4 from dataclasses
import dataclass
5 from datetime
import timedelta
6 from http
import HTTPStatus
10 from microBeesPy
import Actuator, Bee, MicroBees, MicroBeesException, Sensor
16 _LOGGER = logging.getLogger(__name__)
21 """Microbees data from the Coordinator."""
24 actuators: dict[int, Actuator]
25 sensors: dict[int, Sensor]
29 """MicroBees coordinator."""
31 def __init__(self, hass: HomeAssistant, microbees: MicroBees) ->
None:
32 """Initialize microBees coordinator."""
36 name=
"microBees Coordinator",
42 """Fetch data from API endpoint."""
43 async
with asyncio.timeout(10):
45 bees = await self.
microbeesmicrobees.getBees()
46 except aiohttp.ClientResponseError
as err:
47 if err.status
is HTTPStatus.UNAUTHORIZED:
49 "Token not valid, trigger renewal"
51 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
53 except MicroBeesException
as err:
54 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
60 bees_dict[bee.id] = bee
61 for actuator
in bee.actuators:
62 actuators_dict[actuator.id] = actuator
63 for sensor
in bee.sensors:
64 sensors_dict[sensor.id] = sensor
66 bees=bees_dict, actuators=actuators_dict, sensors=sensors_dict
None __init__(self, HomeAssistant hass, MicroBees microbees)
MicroBeesCoordinatorData _async_update_data(self)