1 """Helpers to help coordinate updates."""
3 from __future__
import annotations
5 from abc
import abstractmethod
7 from collections.abc
import Awaitable, Callable, Coroutine, Generator
8 from datetime
import datetime, timedelta
10 from random
import randint
11 from time
import monotonic
12 from typing
import Any, Generic, Protocol
16 from propcache
import cached_property
18 from typing_extensions
import TypeVar
20 from homeassistant
import config_entries
24 ConfigEntryAuthFailed,
31 from .
import entity, event
32 from .debounce
import Debouncer
33 from .frame
import report_usage
34 from .typing
import UNDEFINED, UndefinedType
36 REQUEST_REFRESH_DEFAULT_COOLDOWN = 10
37 REQUEST_REFRESH_DEFAULT_IMMEDIATE =
True
39 _DataT = TypeVar(
"_DataT", default=dict[str, Any])
40 _DataUpdateCoordinatorT = TypeVar(
41 "_DataUpdateCoordinatorT",
42 bound=
"DataUpdateCoordinator[Any]",
43 default=
"DataUpdateCoordinator[dict[str, Any]]",
48 """Raised when an update has failed."""
52 """Base protocol type for DataUpdateCoordinator."""
56 self, update_callback: CALLBACK_TYPE, context: Any =
None
57 ) -> Callable[[],
None]:
58 """Listen for data updates."""
62 """Class to manage fetching data from single endpoint.
64 Setting :attr:`always_update` to ``False`` will cause coordinator to only
65 callback listeners when data has changed. This requires that the data
66 implements ``__eq__`` or uses a python object that already does.
72 logger: logging.Logger,
76 update_interval: timedelta |
None =
None,
77 update_method: Callable[[], Awaitable[_DataT]] |
None =
None,
78 setup_method: Callable[[], Awaitable[
None]] |
None =
None,
79 request_refresh_debouncer: Debouncer[Coroutine[Any, Any,
None]] |
None =
None,
80 always_update: bool =
True,
82 """Initialize global data updater."""
91 if config_entry
is UNDEFINED:
104 self.data: _DataT = None
109 randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX) / 10**6
112 self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object |
None]] = {}
115 self._request_refresh_task: asyncio.TimerHandle |
None =
None
119 if request_refresh_debouncer
is None:
123 cooldown=REQUEST_REFRESH_DEFAULT_COOLDOWN,
124 immediate=REQUEST_REFRESH_DEFAULT_IMMEDIATE,
128 request_refresh_debouncer.function = self.async_refresh
136 """Register shutdown on HomeAssistant stop.
138 Should only be used by coordinators that are not linked to a config entry.
141 raise RuntimeError(
"This should only be used outside of config entries.")
143 async
def _on_hass_stop(_: Event) ->
None:
144 """Shutdown coordinator on HomeAssistant stop."""
148 EVENT_HOMEASSISTANT_STOP, _on_hass_stop
153 self, update_callback: CALLBACK_TYPE, context: Any =
None
154 ) -> Callable[[],
None]:
155 """Listen for data updates."""
156 schedule_refresh =
not self._listeners
159 def remove_listener() -> None:
160 """Remove update listener."""
161 self._listeners.pop(remove_listener)
162 if not self._listeners:
165 self._listeners[remove_listener] = (update_callback, context)
171 return remove_listener
175 """Update all registered listeners."""
176 for update_callback, _
in list(self._listeners.values()):
180 """Cancel any scheduled call, and ignore new runs."""
188 """Unschedule any pending refresh since there are no longer any listeners."""
193 """Return all registered contexts."""
195 context
for _, context
in self._listeners.values()
if context
is not None
199 """Cancel any scheduled call."""
205 """Cancel any scheduled call."""
212 """Interval between updates."""
215 @update_interval.setter
217 """Set interval between updates."""
223 """Schedule a refresh."""
249 """Handle a refresh interval occurrence."""
251 self.config_entry.async_create_background_task(
254 name=f
"{self.name} - {self.config_entry.title} - refresh",
258 self.hass.async_create_background_task(
260 name=f
"{self.name} - refresh",
265 """Handle a refresh interval occurrence."""
267 await self._async_refresh(log_failures=True, scheduled=True)
270 """Request a refresh.
272 Refresh will wait a bit to see if it can batch them.
277 """Fetch the latest data from the source."""
279 raise NotImplementedError(
"Update method not implemented")
283 """Refresh data for the first time when a config entry is setup.
285 Will automatically raise ConfigEntryNotReady if the refresh
286 fails. Additionally logging is handled by config entry setup
287 to ensure that multiple retries do not cause log spam.
291 "uses `async_config_entry_first_refresh`, which is only supported "
292 "for coordinators with a config entry",
293 breaks_in_ha_version=
"2025.11",
297 is not config_entries.ConfigEntryState.SETUP_IN_PROGRESS
300 "uses `async_config_entry_first_refresh`, which is only supported "
301 f
"when entry state is {config_entries.ConfigEntryState.SETUP_IN_PROGRESS}, "
302 f
"but it is in state {self.config_entry.state}",
303 breaks_in_ha_version=
"2025.11",
307 log_failures=
False, raise_on_auth_failed=
True, raise_on_entry_error=
True
316 """Error handling for _async_setup."""
321 requests.exceptions.Timeout,
323 requests.exceptions.RequestException,
324 urllib.error.URLError,
329 except (ConfigEntryError, ConfigEntryAuthFailed)
as err:
334 except Exception
as err:
336 self.logger.exception("Unexpected error fetching %s data", self.name)
344 """Set up the coordinator.
346 Can be overwritten by integrations to load data or resources
347 only once during the first refresh.
354 """Refresh data and log errors."""
359 log_failures: bool =
True,
360 raise_on_auth_failed: bool =
False,
361 scheduled: bool =
False,
362 raise_on_entry_error: bool =
False,
371 if log_timing := self.logger.isEnabledFor(logging.DEBUG):
376 previous_data = self.data
381 except (TimeoutError, requests.exceptions.Timeout)
as err:
385 self.logger.error("Timeout fetching %s data", self.name)
388 except (aiohttp.ClientError, requests.exceptions.RequestException)
as err:
392 self.logger.error("Error requesting %s data: %s", self.name, err)
395 except urllib.error.URLError
as err:
399 if err.reason ==
"timed out":
400 self.logger.error("Timeout fetching %s data", self.name)
403 "Error requesting %s data: %s", self.name, err
407 except UpdateFailed
as err:
411 self.logger.error("Error fetching %s data: %s", self.name, err)
414 except ConfigEntryError
as err:
419 "Config entry setup failed while fetching %s data: %s",
424 if raise_on_entry_error:
427 except ConfigEntryAuthFailed
as err:
433 "Authentication failed while fetching %s data: %s",
438 if raise_on_auth_failed:
443 except NotImplementedError
as err:
447 except Exception
as err:
450 self.logger.exception("Unexpected error fetching %s data", self.name)
455 self.logger.info("Fetching %s data recovered", self.name)
460 "Finished fetching %s data in %.3f seconds (success: %s)",
465 if not auth_failed and self._listeners and not self.hass.is_stopping:
476 or previous_data != self.data
482 """Handle when a refresh has finished.
484 Called when refresh is finished before listeners are updated.
486 To be overridden by subclasses.
491 """Manually set an error, log the message and notify listeners."""
494 self.logger.error("Error requesting %s data: %s", self.name, err)
500 """Manually update data, notify listeners and reset refresh interval."""
507 "Manually updated %s data",
518 """DataUpdateCoordinator which keeps track of the last successful update."""
520 last_update_success_time: datetime |
None =
None
524 """Handle when a refresh has finished."""
530 _BaseDataUpdateCoordinatorT: BaseDataUpdateCoordinatorProtocol
532 """Base class for all Coordinator entities."""
535 self, coordinator: _BaseDataUpdateCoordinatorT, context: Any =
None
537 """Create the entity with a DataUpdateCoordinator."""
538 self.coordinator = coordinator
539 self.coordinator_context = context
543 """No need to poll. Coordinator notifies entity of updates."""
547 """When entity is added to hass."""
549 self.async_on_remove(
551 self._handle_coordinator_update, self.coordinator_context
557 """Handle updated data from the coordinator."""
558 self.async_write_ha_state()
562 """Update the entity.
564 Only used by the generic entity update service.
568 class CoordinatorEntity(BaseCoordinatorEntity[_DataUpdateCoordinatorT]):
569 """A class for entities using DataUpdateCoordinator."""
572 self, coordinator: _DataUpdateCoordinatorT, context: Any =
None
574 """Create the entity with a DataUpdateCoordinator.
576 Passthrough to BaseCoordinatorEntity.
578 Necessary to bind TypeVar to correct scope.
580 super().__init__(coordinator, context)
584 """Return if entity is available."""
585 return self.coordinator.last_update_success
588 """Update the entity.
590 Only used by the generic entity update service.
596 await self.coordinator.async_request_refresh()
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None __init__(self, _DataUpdateCoordinatorT coordinator, Any context=None)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)
None async_config_entry_first_refresh(self)
None __wrap_handle_refresh_interval(self)
bool __wrap_async_setup(self)
None _schedule_refresh(self)
None _async_refresh(self, bool log_failures=True, bool raise_on_auth_failed=False, bool scheduled=False, bool raise_on_entry_error=False)
None _async_unsub_shutdown(self)
_DataT _async_update_data(self)
None async_set_updated_data(self, _DataT data)
None async_set_update_error(self, Exception err)
None _unschedule_refresh(self)
None async_shutdown(self)
None _async_refresh_finished(self)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None async_register_shutdown(self)
None _handle_refresh_interval(self, datetime|None _now=None)
None __init__(self, HomeAssistant hass, logging.Logger logger, *config_entries.ConfigEntry|None|UndefinedType config_entry=UNDEFINED, str name, timedelta|None update_interval=None, Callable[[], Awaitable[_DataT]]|None update_method=None, Callable[[], Awaitable[None]]|None setup_method=None, Debouncer[Coroutine[Any, Any, None]]|None request_refresh_debouncer=None, bool always_update=True)
Generator[Any] async_contexts(self)
None async_update_listeners(self)
None async_request_refresh(self)
None _async_unsub_refresh(self)
None _async_refresh_finished(self)
None async_add_listener(HomeAssistant hass, Callable[[], None] listener)
None report_usage(str what, *str|None breaks_in_ha_version=None, ReportBehavior core_behavior=ReportBehavior.ERROR, ReportBehavior core_integration_behavior=ReportBehavior.LOG, ReportBehavior custom_integration_behavior=ReportBehavior.LOG, set[str]|None exclude_integrations=None, str|None integration_domain=None, int level=logging.WARNING)
None __init__(self, _BaseDataUpdateCoordinatorT coordinator, Any context=None)
None _handle_coordinator_update(self)
None async_added_to_hass(self)