1 """Coordinator object for the Rachio integration."""
3 from datetime
import datetime, timedelta
5 from operator
import itemgetter
8 from rachiopy
import Rachio
9 from requests.exceptions
import Timeout
21 KEY_PROGRAM_RUN_SUMMARIES,
26 _LOGGER = logging.getLogger(__name__)
36 """Coordinator Class for Rachio Hose Timers."""
42 config_entry: ConfigEntry,
46 """Initialize the Rachio Update Coordinator."""
53 config_entry=config_entry,
54 name=f
"{DOMAIN} update coordinator",
57 update_interval=
timedelta(minutes=(base_count + 1)),
60 hass, _LOGGER, cooldown=UPDATE_DELAY_TIME, immediate=
False
65 """Update smart hose timer data."""
67 data = await self.
hasshasshass.async_add_executor_job(
70 except Timeout
as err:
71 raise UpdateFailed(f
"Could not connect to the Rachio API: {err}")
from err
72 return {valve[KEY_ID]: valve
for valve
in data[1][KEY_VALVES]}
76 """Coordinator for fetching hose timer schedules."""
82 config_entry: ConfigEntry,
85 """Initialize a Rachio schedule coordinator."""
92 config_entry=config_entry,
93 name=f
"{DOMAIN} schedule update coordinator",
98 """Retrieve data for the past week and the next 60 days."""
99 _now: datetime = dt_util.now()
102 start: dict[str, int] = {
103 YEAR: _time_start.year,
104 MONTH: _time_start.month,
105 DAY: _time_start.day,
107 end: dict[str, int] = {
108 YEAR: _time_end.year,
109 MONTH: _time_end.month,
114 schedule = await self.
hasshasshass.async_add_executor_job(
115 self.
rachiorachio.summary.get_valve_day_views,
120 except Timeout
as err:
121 raise UpdateFailed(f
"Could not connect to the Rachio API: {err}")
from err
124 for event
in schedule[1][KEY_DAY_VIEWS]:
125 events.extend(event[KEY_PROGRAM_RUN_SUMMARIES])
126 return sorted(events, key=itemgetter(KEY_START_TIME))
list[dict[str, Any]] _async_update_data(self)
None __init__(self, HomeAssistant hass, Rachio rachio, ConfigEntry config_entry, base_station)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, Rachio rachio, ConfigEntry config_entry, base_station, int base_count)