1 """Update coordinators for rainbird."""
3 from __future__
import annotations
6 from dataclasses
import dataclass
11 from pyrainbird.async_client
import (
12 AsyncRainbirdController,
14 RainbirdDeviceBusyException,
16 from pyrainbird.data
import ModelAndVersion, Schedule
24 from .const
import DOMAIN, MANUFACTURER, TIMEOUT_SECONDS
26 UPDATE_INTERVAL = datetime.timedelta(minutes=1)
29 CALENDAR_UPDATE_INTERVAL = datetime.timedelta(minutes=15)
33 DEBOUNCER_COOLDOWN = 5
38 _LOGGER = logging.getLogger(__name__)
43 """Data retrieved from a Rain Bird device."""
46 active_zones: set[int]
52 """Create a rainbird async_create_clientsession with a connection limit."""
53 return aiohttp.ClientSession(
54 connector=aiohttp.TCPConnector(limit=CONECTION_LIMIT),
59 """Coordinator for rainbird API calls."""
65 controller: AsyncRainbirdController,
66 unique_id: str |
None,
67 model_info: ModelAndVersion,
69 """Initialize RainbirdUpdateCoordinator."""
74 update_interval=UPDATE_INTERVAL,
76 hass, _LOGGER, cooldown=DEBOUNCER_COOLDOWN, immediate=
False
81 self._zones: set[int] |
None =
None
86 """Return the API client for the device."""
91 """Return the config entry unique id."""
96 """Device name for the rainbird controller."""
97 return f
"{MANUFACTURER} Controller"
101 """Return information about the device."""
106 identifiers={(DOMAIN, self.
_unique_id_unique_id)},
107 manufacturer=MANUFACTURER,
109 sw_version=f
"{self._model_info.major}.{self._model_info.minor}",
113 """Fetch data from Rain Bird device."""
115 async
with asyncio.timeout(TIMEOUT_SECONDS):
117 except RainbirdDeviceBusyException
as err:
119 except RainbirdApiException
as err:
123 """Fetch data from the Rain Bird device.
125 Rainbird devices can only reliably handle a single request at a time,
126 so the requests are sent serially.
128 available_stations = await self.
_controller_controller.get_available_stations()
129 states = await self.
_controller_controller.get_zone_states()
130 rain = await self.
_controller_controller.get_rain_sensor_state()
131 rain_delay = await self.
_controller_controller.get_rain_delay()
133 zones=available_stations.active_set,
134 active_zones=states.active_set,
136 rain_delay=rain_delay,
141 """Coordinator for rainbird irrigation schedule calls."""
143 config_entry: ConfigEntry
149 controller: AsyncRainbirdController,
151 """Initialize ZoneStateUpdateCoordinator."""
157 update_interval=CALENDAR_UPDATE_INTERVAL,
162 """Fetch data from Rain Bird device."""
164 async
with asyncio.timeout(TIMEOUT_SECONDS):
165 return await self.
_controller_controller.get_schedule()
166 except RainbirdApiException
as err:
167 raise UpdateFailed(f
"Error communicating with Device: {err}")
from err
None __init__(self, HomeAssistant hass, str name, AsyncRainbirdController controller)
Schedule _async_update_data(self)
RainbirdDeviceState _async_update_data(self)
None __init__(self, HomeAssistant hass, str name, AsyncRainbirdController controller, str|None unique_id, ModelAndVersion model_info)
RainbirdDeviceState _fetch_data(self)
DeviceInfo|None device_info(self)
AsyncRainbirdController controller(self)
_DataT _async_update_data(self)
aiohttp.ClientSession async_create_clientsession()