Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Flux LED/MagicLight integration coordinator."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from typing import Final
8 
9 from flux_led.aio import AIOWifiLedBulb
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.core import HomeAssistant
13 from homeassistant.helpers.debounce import Debouncer
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
15 
16 from .const import FLUX_LED_EXCEPTIONS
17 
18 _LOGGER = logging.getLogger(__name__)
19 
20 
21 REQUEST_REFRESH_DELAY: Final = 2.0
22 
23 
25  """DataUpdateCoordinator to gather data for a specific flux_led device."""
26 
27  def __init__(
28  self, hass: HomeAssistant, device: AIOWifiLedBulb, entry: ConfigEntry
29  ) -> None:
30  """Initialize DataUpdateCoordinator to gather data for specific device."""
31  self.devicedevice = device
32  self.titletitle = entry.title
33  self.entryentry = entry
34  self.force_next_updateforce_next_update = False
35  super().__init__(
36  hass,
37  _LOGGER,
38  name=self.devicedevice.ipaddr,
39  update_interval=timedelta(seconds=10),
40  # We don't want an immediate refresh since the device
41  # takes a moment to reflect the state change
42  request_refresh_debouncer=Debouncer(
43  hass, _LOGGER, cooldown=REQUEST_REFRESH_DELAY, immediate=False
44  ),
45  always_update=False,
46  )
47 
48  async def _async_update_data(self) -> None:
49  """Fetch all device and sensor data from api."""
50  try:
51  await self.devicedevice.async_update(force=self.force_next_updateforce_next_update)
52  except FLUX_LED_EXCEPTIONS as ex:
53  raise UpdateFailed(ex) from ex
54  finally:
55  self.force_next_updateforce_next_update = False
None __init__(self, HomeAssistant hass, AIOWifiLedBulb device, ConfigEntry entry)
Definition: coordinator.py:29