Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Base coordinator."""
2 
3 from asyncio import Semaphore
4 from collections.abc import Awaitable, Callable
5 from datetime import timedelta
6 from logging import Logger
7 
8 from homeassistant.config_entries import ConfigEntry
9 from homeassistant.core import HomeAssistant
10 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
11 
12 
13 class DevoloDataUpdateCoordinator[_DataT](DataUpdateCoordinator[_DataT]):
14  """Class to manage fetching data from devolo Home Network devices."""
15 
16  def __init__(
17  self,
18  hass: HomeAssistant,
19  logger: Logger,
20  *,
21  config_entry: ConfigEntry,
22  name: str,
23  semaphore: Semaphore,
24  update_interval: timedelta,
25  update_method: Callable[[], Awaitable[_DataT]],
26  ) -> None:
27  """Initialize global data updater."""
28  super().__init__(
29  hass,
30  logger,
31  config_entry=config_entry,
32  name=name,
33  update_interval=update_interval,
34  update_method=update_method,
35  )
36  self._semaphore_semaphore = semaphore
37 
38  async def _async_update_data(self) -> _DataT:
39  """Fetch the latest data from the source."""
40  async with self._semaphore_semaphore:
41  return await super()._async_update_data()
None __init__(self, HomeAssistant hass, Logger logger, *ConfigEntry config_entry, str name, Semaphore semaphore, timedelta update_interval, Callable[[], Awaitable[_DataT]] update_method)
Definition: coordinator.py:26