Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator object for the Rachio integration."""
2 
3 from datetime import datetime, timedelta
4 import logging
5 from operator import itemgetter
6 from typing import Any
7 
8 from rachiopy import Rachio
9 from requests.exceptions import Timeout
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.core import HomeAssistant
13 from homeassistant.helpers.debounce import Debouncer
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
15 from homeassistant.util import dt as dt_util
16 
17 from .const import (
18  DOMAIN,
19  KEY_DAY_VIEWS,
20  KEY_ID,
21  KEY_PROGRAM_RUN_SUMMARIES,
22  KEY_START_TIME,
23  KEY_VALVES,
24 )
25 
26 _LOGGER = logging.getLogger(__name__)
27 
28 DAY = "day"
29 MONTH = "month"
30 YEAR = "year"
31 
32 UPDATE_DELAY_TIME = 8
33 
34 
36  """Coordinator Class for Rachio Hose Timers."""
37 
38  def __init__(
39  self,
40  hass: HomeAssistant,
41  rachio: Rachio,
42  config_entry: ConfigEntry,
43  base_station,
44  base_count: int,
45  ) -> None:
46  """Initialize the Rachio Update Coordinator."""
47  self.hasshasshass = hass
48  self.rachiorachio = rachio
49  self.base_stationbase_station = base_station
50  super().__init__(
51  hass,
52  _LOGGER,
53  config_entry=config_entry,
54  name=f"{DOMAIN} update coordinator",
55  # To avoid exceeding the rate limit, increase polling interval for
56  # each additional base station on the account
57  update_interval=timedelta(minutes=(base_count + 1)),
58  # Debouncer used because the API takes a bit to update state changes
59  request_refresh_debouncer=Debouncer(
60  hass, _LOGGER, cooldown=UPDATE_DELAY_TIME, immediate=False
61  ),
62  )
63 
64  async def _async_update_data(self) -> dict[str, Any]:
65  """Update smart hose timer data."""
66  try:
67  data = await self.hasshasshass.async_add_executor_job(
68  self.rachiorachio.valve.list_valves, self.base_stationbase_station[KEY_ID]
69  )
70  except Timeout as err:
71  raise UpdateFailed(f"Could not connect to the Rachio API: {err}") from err
72  return {valve[KEY_ID]: valve for valve in data[1][KEY_VALVES]}
73 
74 
76  """Coordinator for fetching hose timer schedules."""
77 
78  def __init__(
79  self,
80  hass: HomeAssistant,
81  rachio: Rachio,
82  config_entry: ConfigEntry,
83  base_station,
84  ) -> None:
85  """Initialize a Rachio schedule coordinator."""
86  self.hasshasshass = hass
87  self.rachiorachio = rachio
88  self.base_stationbase_station = base_station
89  super().__init__(
90  hass,
91  _LOGGER,
92  config_entry=config_entry,
93  name=f"{DOMAIN} schedule update coordinator",
94  update_interval=timedelta(minutes=30),
95  )
96 
97  async def _async_update_data(self) -> list[dict[str, Any]]:
98  """Retrieve data for the past week and the next 60 days."""
99  _now: datetime = dt_util.now()
100  _time_start = _now - timedelta(days=7)
101  _time_end = _now + timedelta(days=60)
102  start: dict[str, int] = {
103  YEAR: _time_start.year,
104  MONTH: _time_start.month,
105  DAY: _time_start.day,
106  }
107  end: dict[str, int] = {
108  YEAR: _time_end.year,
109  MONTH: _time_end.month,
110  DAY: _time_end.day,
111  }
112 
113  try:
114  schedule = await self.hasshasshass.async_add_executor_job(
115  self.rachiorachio.summary.get_valve_day_views,
116  self.base_stationbase_station[KEY_ID],
117  start,
118  end,
119  )
120  except Timeout as err:
121  raise UpdateFailed(f"Could not connect to the Rachio API: {err}") from err
122  events = []
123  # Flatten and sort dates
124  for event in schedule[1][KEY_DAY_VIEWS]:
125  events.extend(event[KEY_PROGRAM_RUN_SUMMARIES])
126  return sorted(events, key=itemgetter(KEY_START_TIME))
None __init__(self, HomeAssistant hass, Rachio rachio, ConfigEntry config_entry, base_station)
Definition: coordinator.py:84
None __init__(self, HomeAssistant hass, Rachio rachio, ConfigEntry config_entry, base_station, int base_count)
Definition: coordinator.py:45