Home Assistant Unofficial Reference 2024.12.1
update_coordinator.py
Go to the documentation of this file.
1 """Helpers to help coordinate updates."""
2 
3 from __future__ import annotations
4 
5 from abc import abstractmethod
6 import asyncio
7 from collections.abc import Awaitable, Callable, Coroutine, Generator
8 from datetime import datetime, timedelta
9 import logging
10 from random import randint
11 from time import monotonic
12 from typing import Any, Generic, Protocol
13 import urllib.error
14 
15 import aiohttp
16 from propcache import cached_property
17 import requests
18 from typing_extensions import TypeVar
19 
20 from homeassistant import config_entries
21 from homeassistant.const import EVENT_HOMEASSISTANT_STOP
22 from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
23 from homeassistant.exceptions import (
24  ConfigEntryAuthFailed,
25  ConfigEntryError,
26  ConfigEntryNotReady,
27  HomeAssistantError,
28 )
29 from homeassistant.util.dt import utcnow
30 
31 from . import entity, event
32 from .debounce import Debouncer
33 from .frame import report_usage
34 from .typing import UNDEFINED, UndefinedType
35 
# Cooldown handed to the Debouncer a coordinator builds for itself when the
# caller does not pass one (presumably seconds -- confirm against Debouncer).
REQUEST_REFRESH_DEFAULT_COOLDOWN = 10
# `immediate` flag handed to that same default Debouncer.
REQUEST_REFRESH_DEFAULT_IMMEDIATE = True

# Type of the data a coordinator holds; falls back to dict[str, Any].
_DataT = TypeVar("_DataT", default=dict[str, Any])
# Coordinator type that CoordinatorEntity is parameterized with; must be a
# DataUpdateCoordinator and defaults to one carrying dict[str, Any] data.
_DataUpdateCoordinatorT = TypeVar(
    "_DataUpdateCoordinatorT",
    bound="DataUpdateCoordinator[Any]",
    default="DataUpdateCoordinator[dict[str, Any]]",
)
45 
46 
class UpdateFailed(HomeAssistantError):
    """Raised when an update has failed."""
49 
50 
class BaseDataUpdateCoordinatorProtocol(Protocol):
    """Base protocol type for DataUpdateCoordinator."""

    @callback
    def async_add_listener(
        self, update_callback: CALLBACK_TYPE, context: Any = None
    ) -> Callable[[], None]:
        """Listen for data updates.

        Returns a callable that takes no arguments.
        """
59 
60 
class DataUpdateCoordinator(BaseDataUpdateCoordinatorProtocol, Generic[_DataT]):
    """Class to manage fetching data from single endpoint.

    Setting :attr:`always_update` to ``False`` will cause coordinator to only
    callback listeners when data has changed. This requires that the data
    implements ``__eq__`` or uses a python object that already does.
    """
68 
69  def __init__(
70  self,
71  hass: HomeAssistant,
72  logger: logging.Logger,
73  *,
74  config_entry: config_entries.ConfigEntry | None | UndefinedType = UNDEFINED,
75  name: str,
76  update_interval: timedelta | None = None,
77  update_method: Callable[[], Awaitable[_DataT]] | None = None,
78  setup_method: Callable[[], Awaitable[None]] | None = None,
79  request_refresh_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None,
80  always_update: bool = True,
81  ) -> None:
82  """Initialize global data updater."""
83  self.hasshass = hass
84  self.loggerlogger = logger
85  self.namename = name
86  self.update_methodupdate_method = update_method
87  self.setup_methodsetup_method = setup_method
88  self._update_interval_seconds_update_interval_seconds: float | None = None
89  self.update_intervalupdate_intervalupdate_intervalupdate_interval = update_interval
90  self._shutdown_requested_shutdown_requested = False
91  if config_entry is UNDEFINED:
92  self.config_entryconfig_entry = config_entries.current_entry.get()
93  # This should be deprecated once all core integrations are updated
94  # to pass in the config entry explicitly.
95  else:
96  self.config_entryconfig_entry = config_entry
97  self.always_updatealways_update = always_update
98 
99  # It's None before the first successful update.
100  # Components should call async_config_entry_first_refresh
101  # to make sure the first update was successful.
102  # Set type to just T to remove annoying checks that data is not None
103  # when it was already checked during setup.
104  self.datadata: _DataT = None # type: ignore[assignment]
105 
106  # Pick a random microsecond in range 0.05..0.50 to stagger the refreshes
107  # and avoid a thundering herd.
108  self._microsecond_microsecond = (
109  randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX) / 10**6
110  )
111 
112  self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
113  self._unsub_refresh_unsub_refresh: CALLBACK_TYPE | None = None
114  self._unsub_shutdown_unsub_shutdown: CALLBACK_TYPE | None = None
115  self._request_refresh_task: asyncio.TimerHandle | None = None
116  self.last_update_successlast_update_success = True
117  self.last_exceptionlast_exception: Exception | None = None
118 
119  if request_refresh_debouncer is None:
120  request_refresh_debouncer = Debouncer(
121  hass,
122  logger,
123  cooldown=REQUEST_REFRESH_DEFAULT_COOLDOWN,
124  immediate=REQUEST_REFRESH_DEFAULT_IMMEDIATE,
125  function=self.async_refreshasync_refresh,
126  )
127  else:
128  request_refresh_debouncer.function = self.async_refreshasync_refresh
129 
130  self._debounced_refresh_debounced_refresh = request_refresh_debouncer
131 
132  if self.config_entryconfig_entry:
133  self.config_entryconfig_entry.async_on_unload(self.async_shutdownasync_shutdown)
134 
135  async def async_register_shutdown(self) -> None:
136  """Register shutdown on HomeAssistant stop.
137 
138  Should only be used by coordinators that are not linked to a config entry.
139  """
140  if self.config_entryconfig_entry:
141  raise RuntimeError("This should only be used outside of config entries.")
142 
143  async def _on_hass_stop(_: Event) -> None:
144  """Shutdown coordinator on HomeAssistant stop."""
145  await self.async_shutdownasync_shutdown()
146 
147  self._unsub_shutdown_unsub_shutdown = self.hasshass.bus.async_listen_once(
148  EVENT_HOMEASSISTANT_STOP, _on_hass_stop
149  )
150 
151  @callback
153  self, update_callback: CALLBACK_TYPE, context: Any = None
154  ) -> Callable[[], None]:
155  """Listen for data updates."""
156  schedule_refresh = not self._listeners
157 
158  @callback
159  def remove_listener() -> None:
160  """Remove update listener."""
161  self._listeners.pop(remove_listener)
162  if not self._listeners:
163  self._unschedule_refresh_unschedule_refresh()
164 
165  self._listeners[remove_listener] = (update_callback, context)
166 
167  # This is the first listener, set up interval.
168  if schedule_refresh:
169  self._schedule_refresh_schedule_refresh()
170 
171  return remove_listener
172 
173  @callback
174  def async_update_listeners(self) -> None:
175  """Update all registered listeners."""
176  for update_callback, _ in list(self._listeners.values()):
177  update_callback()
178 
179  async def async_shutdown(self) -> None:
180  """Cancel any scheduled call, and ignore new runs."""
181  self._shutdown_requested_shutdown_requested = True
182  self._async_unsub_refresh_async_unsub_refresh()
183  self._async_unsub_shutdown_async_unsub_shutdown()
184  self._debounced_refresh_debounced_refresh.async_shutdown()
185 
186  @callback
187  def _unschedule_refresh(self) -> None:
188  """Unschedule any pending refresh since there is no longer any listeners."""
189  self._async_unsub_refresh_async_unsub_refresh()
190  self._debounced_refresh_debounced_refresh.async_cancel()
191 
192  def async_contexts(self) -> Generator[Any]:
193  """Return all registered contexts."""
194  yield from (
195  context for _, context in self._listeners.values() if context is not None
196  )
197 
198  def _async_unsub_refresh(self) -> None:
199  """Cancel any scheduled call."""
200  if self._unsub_refresh_unsub_refresh:
201  self._unsub_refresh_unsub_refresh()
202  self._unsub_refresh_unsub_refresh = None
203 
204  def _async_unsub_shutdown(self) -> None:
205  """Cancel any scheduled call."""
206  if self._unsub_shutdown_unsub_shutdown:
207  self._unsub_shutdown_unsub_shutdown()
208  self._unsub_shutdown_unsub_shutdown = None
209 
210  @property
211  def update_interval(self) -> timedelta | None:
212  """Interval between updates."""
213  return self._update_interval_update_interval
214 
215  @update_interval.setter
216  def update_interval(self, value: timedelta | None) -> None:
217  """Set interval between updates."""
218  self._update_interval_update_interval = value
219  self._update_interval_seconds_update_interval_seconds = value.total_seconds() if value else None
220 
221  @callback
222  def _schedule_refresh(self) -> None:
223  """Schedule a refresh."""
224  if self._update_interval_seconds_update_interval_seconds is None:
225  return
226 
227  if self.config_entryconfig_entry and self.config_entryconfig_entry.pref_disable_polling:
228  return
229 
230  # We do not cancel the debouncer here. If the refresh interval is shorter
231  # than the debouncer cooldown, this would cause the debounce to never be called
232  self._async_unsub_refresh_async_unsub_refresh()
233 
234  # We use loop.call_at because DataUpdateCoordinator does
235  # not need an exact update interval which also avoids
236  # calling dt_util.utcnow() on every update.
237  hass = self.hasshass
238  loop = hass.loop
239 
240  next_refresh = (
241  int(loop.time()) + self._microsecond_microsecond + self._update_interval_seconds_update_interval_seconds
242  )
243  self._unsub_refresh_unsub_refresh = loop.call_at(
244  next_refresh, self.__wrap_handle_refresh_interval__wrap_handle_refresh_interval
245  ).cancel
246 
247  @callback
249  """Handle a refresh interval occurrence."""
250  if self.config_entryconfig_entry:
251  self.config_entryconfig_entry.async_create_background_task(
252  self.hasshass,
253  self._handle_refresh_interval_handle_refresh_interval(),
254  name=f"{self.name} - {self.config_entry.title} - refresh",
255  eager_start=True,
256  )
257  else:
258  self.hasshass.async_create_background_task(
259  self._handle_refresh_interval_handle_refresh_interval(),
260  name=f"{self.name} - refresh",
261  eager_start=True,
262  )
263 
264  async def _handle_refresh_interval(self, _now: datetime | None = None) -> None:
265  """Handle a refresh interval occurrence."""
266  self._unsub_refresh_unsub_refresh = None
267  await self._async_refresh_async_refresh(log_failures=True, scheduled=True)
268 
269  async def async_request_refresh(self) -> None:
270  """Request a refresh.
271 
272  Refresh will wait a bit to see if it can batch them.
273  """
274  await self._debounced_refresh_debounced_refresh.async_call()
275 
276  async def _async_update_data(self) -> _DataT:
277  """Fetch the latest data from the source."""
278  if self.update_methodupdate_method is None:
279  raise NotImplementedError("Update method not implemented")
280  return await self.update_methodupdate_method()
281 
282  async def async_config_entry_first_refresh(self) -> None:
283  """Refresh data for the first time when a config entry is setup.
284 
285  Will automatically raise ConfigEntryNotReady if the refresh
286  fails. Additionally logging is handled by config entry setup
287  to ensure that multiple retries do not cause log spam.
288  """
289  if self.config_entryconfig_entry is None:
290  report_usage(
291  "uses `async_config_entry_first_refresh`, which is only supported "
292  "for coordinators with a config entry",
293  breaks_in_ha_version="2025.11",
294  )
295  elif (
296  self.config_entryconfig_entry.state
297  is not config_entries.ConfigEntryState.SETUP_IN_PROGRESS
298  ):
299  report_usage(
300  "uses `async_config_entry_first_refresh`, which is only supported "
301  f"when entry state is {config_entries.ConfigEntryState.SETUP_IN_PROGRESS}, "
302  f"but it is in state {self.config_entry.state}",
303  breaks_in_ha_version="2025.11",
304  )
305  if await self.__wrap_async_setup__wrap_async_setup():
306  await self._async_refresh_async_refresh(
307  log_failures=False, raise_on_auth_failed=True, raise_on_entry_error=True
308  )
309  if self.last_update_successlast_update_success:
310  return
311  ex = ConfigEntryNotReady()
312  ex.__cause__ = self.last_exceptionlast_exception
313  raise ex
314 
315  async def __wrap_async_setup(self) -> bool:
316  """Error handling for _async_setup."""
317  try:
318  await self._async_setup_async_setup()
319  except (
320  TimeoutError,
321  requests.exceptions.Timeout,
322  aiohttp.ClientError,
323  requests.exceptions.RequestException,
324  urllib.error.URLError,
325  UpdateFailed,
326  ) as err:
327  self.last_exceptionlast_exception = err
328 
329  except (ConfigEntryError, ConfigEntryAuthFailed) as err:
330  self.last_exceptionlast_exception = err
331  self.last_update_successlast_update_success = False
332  raise
333 
334  except Exception as err: # pylint: disable=broad-except
335  self.last_exceptionlast_exception = err
336  self.loggerlogger.exception("Unexpected error fetching %s data", self.namename)
337  else:
338  return True
339 
340  self.last_update_successlast_update_success = False
341  return False
342 
343  async def _async_setup(self) -> None:
344  """Set up the coordinator.
345 
346  Can be overwritten by integrations to load data or resources
347  only once during the first refresh.
348  """
349  if self.setup_methodsetup_method is None:
350  return None
351  return await self.setup_methodsetup_method()
352 
353  async def async_refresh(self) -> None:
354  """Refresh data and log errors."""
355  await self._async_refresh_async_refresh(log_failures=True)
356 
357  async def _async_refresh( # noqa: C901
358  self,
359  log_failures: bool = True,
360  raise_on_auth_failed: bool = False,
361  scheduled: bool = False,
362  raise_on_entry_error: bool = False,
363  ) -> None:
364  """Refresh data."""
365  self._async_unsub_refresh_async_unsub_refresh()
366  self._debounced_refresh_debounced_refresh.async_cancel()
367 
368  if self._shutdown_requested_shutdown_requested or scheduled and self.hasshass.is_stopping:
369  return
370 
371  if log_timing := self.loggerlogger.isEnabledFor(logging.DEBUG):
372  start = monotonic()
373 
374  auth_failed = False
375  previous_update_success = self.last_update_successlast_update_success
376  previous_data = self.datadata
377 
378  try:
379  self.datadata = await self._async_update_data_async_update_data()
380 
381  except (TimeoutError, requests.exceptions.Timeout) as err:
382  self.last_exceptionlast_exception = err
383  if self.last_update_successlast_update_success:
384  if log_failures:
385  self.loggerlogger.error("Timeout fetching %s data", self.namename)
386  self.last_update_successlast_update_success = False
387 
388  except (aiohttp.ClientError, requests.exceptions.RequestException) as err:
389  self.last_exceptionlast_exception = err
390  if self.last_update_successlast_update_success:
391  if log_failures:
392  self.loggerlogger.error("Error requesting %s data: %s", self.namename, err)
393  self.last_update_successlast_update_success = False
394 
395  except urllib.error.URLError as err:
396  self.last_exceptionlast_exception = err
397  if self.last_update_successlast_update_success:
398  if log_failures:
399  if err.reason == "timed out":
400  self.loggerlogger.error("Timeout fetching %s data", self.namename)
401  else:
402  self.loggerlogger.error(
403  "Error requesting %s data: %s", self.namename, err
404  )
405  self.last_update_successlast_update_success = False
406 
407  except UpdateFailed as err:
408  self.last_exceptionlast_exception = err
409  if self.last_update_successlast_update_success:
410  if log_failures:
411  self.loggerlogger.error("Error fetching %s data: %s", self.namename, err)
412  self.last_update_successlast_update_success = False
413 
414  except ConfigEntryError as err:
415  self.last_exceptionlast_exception = err
416  if self.last_update_successlast_update_success:
417  if log_failures:
418  self.loggerlogger.error(
419  "Config entry setup failed while fetching %s data: %s",
420  self.namename,
421  err,
422  )
423  self.last_update_successlast_update_success = False
424  if raise_on_entry_error:
425  raise
426 
427  except ConfigEntryAuthFailed as err:
428  auth_failed = True
429  self.last_exceptionlast_exception = err
430  if self.last_update_successlast_update_success:
431  if log_failures:
432  self.loggerlogger.error(
433  "Authentication failed while fetching %s data: %s",
434  self.namename,
435  err,
436  )
437  self.last_update_successlast_update_success = False
438  if raise_on_auth_failed:
439  raise
440 
441  if self.config_entryconfig_entry:
442  self.config_entryconfig_entry.async_start_reauth(self.hasshass)
443  except NotImplementedError as err:
444  self.last_exceptionlast_exception = err
445  raise
446 
447  except Exception as err:
448  self.last_exceptionlast_exception = err
449  self.last_update_successlast_update_success = False
450  self.loggerlogger.exception("Unexpected error fetching %s data", self.namename)
451 
452  else:
453  if not self.last_update_successlast_update_success:
454  self.last_update_successlast_update_success = True
455  self.loggerlogger.info("Fetching %s data recovered", self.namename)
456 
457  finally:
458  if log_timing:
459  self.loggerlogger.debug(
460  "Finished fetching %s data in %.3f seconds (success: %s)",
461  self.namename,
462  monotonic() - start, # pylint: disable=possibly-used-before-assignment
463  self.last_update_successlast_update_success,
464  )
465  if not auth_failed and self._listeners and not self.hasshass.is_stopping:
466  self._schedule_refresh_schedule_refresh()
467 
468  self._async_refresh_finished_async_refresh_finished()
469 
470  if not self.last_update_successlast_update_success and not previous_update_success:
471  return
472 
473  if (
474  self.always_updatealways_update
475  or self.last_update_successlast_update_success != previous_update_success
476  or previous_data != self.datadata
477  ):
478  self.async_update_listenersasync_update_listeners()
479 
    @callback
    def _async_refresh_finished(self) -> None:
        """Handle when a refresh has finished.

        Called when refresh is finished before listeners are updated.

        To be overridden by subclasses. The base implementation does nothing.
        """
488 
489  @callback
490  def async_set_update_error(self, err: Exception) -> None:
491  """Manually set an error, log the message and notify listeners."""
492  self.last_exceptionlast_exception = err
493  if self.last_update_successlast_update_success:
494  self.loggerlogger.error("Error requesting %s data: %s", self.namename, err)
495  self.last_update_successlast_update_success = False
496  self.async_update_listenersasync_update_listeners()
497 
498  @callback
499  def async_set_updated_data(self, data: _DataT) -> None:
500  """Manually update data, notify listeners and reset refresh interval."""
501  self._async_unsub_refresh_async_unsub_refresh()
502  self._debounced_refresh_debounced_refresh.async_cancel()
503 
504  self.datadata = data
505  self.last_update_successlast_update_success = True
506  self.loggerlogger.debug(
507  "Manually updated %s data",
508  self.namename,
509  )
510 
511  if self._listeners:
512  self._schedule_refresh_schedule_refresh()
513 
514  self.async_update_listenersasync_update_listeners()
515 
516 
class TimestampDataUpdateCoordinator(DataUpdateCoordinator[_DataT]):
    """DataUpdateCoordinator which keeps track of the last successful update."""

    # Set to utcnow() after each successful refresh; None until then.
    last_update_success_time: datetime | None = None

    @callback
    def _async_refresh_finished(self) -> None:
        """Handle when a refresh has finished."""
        if self.last_update_success:
            self.last_update_success_time = utcnow()
527 
528 
530  _BaseDataUpdateCoordinatorT: BaseDataUpdateCoordinatorProtocol
531 ](entity.Entity):
532  """Base class for all Coordinator entities."""
533 
534  def __init__(
535  self, coordinator: _BaseDataUpdateCoordinatorT, context: Any = None
536  ) -> None:
537  """Create the entity with a DataUpdateCoordinator."""
538  self.coordinator = coordinator
539  self.coordinator_context = context
540 
541  @cached_property
542  def should_poll(self) -> bool:
543  """No need to poll. Coordinator notifies entity of updates."""
544  return False
545 
546  async def async_added_to_hass(self) -> None:
547  """When entity is added to hass."""
548  await super().async_added_to_hass()
549  self.async_on_remove(
550  self.coordinator.async_add_listener(
551  self._handle_coordinator_update, self.coordinator_context
552  )
553  )
554 
555  @callback
556  def _handle_coordinator_update(self) -> None:
557  """Handle updated data from the coordinator."""
558  self.async_write_ha_state()
559 
560  @abstractmethod
561  async def async_update(self) -> None:
562  """Update the entity.
563 
564  Only used by the generic entity update service.
565  """
566 
567 
class CoordinatorEntity(BaseCoordinatorEntity[_DataUpdateCoordinatorT]):
    """Entity whose state is driven by a DataUpdateCoordinator."""

    def __init__(
        self, coordinator: _DataUpdateCoordinatorT, context: Any = None
    ) -> None:
        """Create the entity with a DataUpdateCoordinator.

        Simply forwards to BaseCoordinatorEntity; the override exists so
        the TypeVar is bound in the correct scope.
        """
        super().__init__(coordinator, context)

    @property
    def available(self) -> bool:
        """Return whether the coordinator's last update succeeded."""
        coordinator = self.coordinator
        return coordinator.last_update_success

    async def async_update(self) -> None:
        """Update the entity.

        Only used by the generic entity update service.
        """
        # Manual update requests are ignored while the entity is disabled.
        if self.enabled:
            await self.coordinator.async_request_refresh()
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None __init__(self, _DataUpdateCoordinatorT coordinator, Any context=None)
None _async_refresh(self, bool log_failures=True, bool raise_on_auth_failed=False, bool scheduled=False, bool raise_on_entry_error=False)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None __init__(self, HomeAssistant hass, logging.Logger logger, *config_entries.ConfigEntry|None|UndefinedType config_entry=UNDEFINED, str name, timedelta|None update_interval=None, Callable[[], Awaitable[_DataT]]|None update_method=None, Callable[[], Awaitable[None]]|None setup_method=None, Debouncer[Coroutine[Any, Any, None]]|None request_refresh_debouncer=None, bool always_update=True)
None async_add_listener(HomeAssistant hass, Callable[[], None] listener)
Definition: __init__.py:82
None report_usage(str what, *str|None breaks_in_ha_version=None, ReportBehavior core_behavior=ReportBehavior.ERROR, ReportBehavior core_integration_behavior=ReportBehavior.LOG, ReportBehavior custom_integration_behavior=ReportBehavior.LOG, set[str]|None exclude_integrations=None, str|None integration_domain=None, int level=logging.WARNING)
Definition: frame.py:195
None __init__(self, _BaseDataUpdateCoordinatorT coordinator, Any context=None)