# Home Assistant Unofficial Reference 2024.12.1
# coordinator.py
# Go to the documentation of this file.
"""Coordinator for FYTA integration."""

from __future__ import annotations

from collections.abc import Callable
from datetime import datetime, timedelta
import logging
from typing import TYPE_CHECKING

from fyta_cli.fyta_connector import FytaConnector
from fyta_cli.fyta_exceptions import (
    FytaAuthentificationError,
    FytaConnectionError,
    FytaPasswordError,
    FytaPlantError,
)
from fyta_cli.fyta_models import Plant

from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed

from .const import CONF_EXPIRATION, DOMAIN

if TYPE_CHECKING:
    from . import FytaConfigEntry
29 
# Module-level logger, named after this module; FytaCoordinator passes it to
# its base class and uses it for its own debug messages.
_LOGGER = logging.getLogger(__name__)
31 
32 
class FytaCoordinator(DataUpdateCoordinator[dict[int, Plant]]):
    """Fyta custom coordinator.

    Polls the FYTA connector every four minutes. The coordinator data is the
    ``dict[int, Plant]`` returned by ``FytaConnector.update_all_plants``.
    After each successful refresh the ids in ``FytaConnector.plant_list`` are
    compared with those seen on the previous refresh: plants that vanished
    are detached from the device registry, and newly seen plants are passed
    to every callback in ``new_device_callbacks``.
    """

    config_entry: FytaConfigEntry

    def __init__(self, hass: HomeAssistant, fyta: FytaConnector) -> None:
        """Initialize my coordinator.

        Args:
            hass: The Home Assistant instance.
            fyta: Connector used to log in and to fetch plant data.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="FYTA Coordinator",
            update_interval=timedelta(minutes=4),
        )
        self.fyta = fyta
        # Plant ids seen on the previous refresh; empty until first seeded.
        self._plants_last_update: set[int] = set()
        # Each callback is invoked with the id of a newly discovered plant.
        self.new_device_callbacks: list[Callable[[int], None]] = []

    async def _async_update_data(
        self,
    ) -> dict[int, Plant]:
        """Fetch data from API endpoint.

        The access token is renewed first when the connector has no
        expiration set or the expiration lies in the past.

        Returns:
            The plant data returned by the connector.

        Raises:
            UpdateFailed: The connector raised FytaConnectionError or
                FytaPlantError while updating.
            ConfigEntryNotReady: Propagated from renew_authentication.
            ConfigEntryAuthFailed: Propagated from renew_authentication.
        """

        if (
            self.fyta.expiration is None
            or self.fyta.expiration.timestamp() < datetime.now().timestamp()
        ):
            await self.renew_authentication()

        try:
            data = await self.fyta.update_all_plants()
        except (FytaConnectionError, FytaPlantError) as err:
            raise UpdateFailed(
                translation_domain=DOMAIN, translation_key="update_error"
            ) from err
        _LOGGER.debug("Data successfully updated")

        # data must be assigned before _async_add_remove_devices, as it is
        # used to set up possible new devices
        self.data = data
        self._async_add_remove_devices()

        return data

    def _async_add_remove_devices(self) -> None:
        """Add new devices, remove non-existing devices.

        Compares the connector's current plant ids with the ids recorded on
        the previous call and then records the current ids for the next call.
        """
        # An empty baseline is seeded with the current ids, so the comparison
        # below finds no difference and returns without reporting anything.
        # NOTE(review): because "empty" doubles as "not seeded yet", an
        # account that starts with no plants will not have its first plant
        # reported to new_device_callbacks -- confirm this is intended.
        if not self._plants_last_update:
            self._plants_last_update = set(self.fyta.plant_list.keys())

        if (
            current_plants := set(self.fyta.plant_list.keys())
        ) == self._plants_last_update:
            return

        _LOGGER.debug(
            "Check for new and removed plant(s): old plants: %s; new plants: %s",
            ", ".join(map(str, self._plants_last_update)),
            ", ".join(map(str, current_plants)),
        )

        # remove old plants
        if removed_plants := self._plants_last_update - current_plants:
            _LOGGER.debug("Removed plant(s): %s", ", ".join(map(str, removed_plants)))

            device_registry = dr.async_get(self.hass)
            for plant_id in removed_plants:
                # Devices are looked up by the "<entry_id>-<plant_id>"
                # identifier within this integration's domain.
                if device := device_registry.async_get_device(
                    identifiers={
                        (
                            DOMAIN,
                            f"{self.config_entry.entry_id}-{plant_id}",
                        )
                    }
                ):
                    device_registry.async_update_device(
                        device_id=device.id,
                        remove_config_entry_id=self.config_entry.entry_id,
                    )
                    _LOGGER.debug("Device removed from device registry: %s", device.id)

        # add new devices
        if new_plants := current_plants - self._plants_last_update:
            _LOGGER.debug("New plant(s) found: %s", ", ".join(map(str, new_plants)))
            for plant_id in new_plants:
                for callback in self.new_device_callbacks:
                    callback(plant_id)
                    _LOGGER.debug("Device added: %s", plant_id)

        self._plants_last_update = current_plants

    async def renew_authentication(self) -> bool:
        """Renew access token for FYTA API.

        Logs in through the connector and stores the new access token and
        its expiration (ISO format) in the config entry data.

        Returns:
            True once the config entry has been updated.

        Raises:
            ConfigEntryNotReady: The connector raised FytaConnectionError.
            ConfigEntryAuthFailed: The connector raised
                FytaAuthentificationError or FytaPasswordError.
        """

        try:
            credentials = await self.fyta.login()
        except FytaConnectionError as ex:
            raise ConfigEntryNotReady(
                translation_domain=DOMAIN, translation_key="config_entry_not_ready"
            ) from ex
        except (FytaAuthentificationError, FytaPasswordError) as ex:
            raise ConfigEntryAuthFailed(
                translation_domain=DOMAIN,
                translation_key="auth_failed",
            ) from ex

        # Copy the entry data so the existing mapping is not mutated in place.
        new_config_entry = {**self.config_entry.data}
        new_config_entry[CONF_ACCESS_TOKEN] = credentials.access_token
        new_config_entry[CONF_EXPIRATION] = credentials.expiration.isoformat()

        self.hass.config_entries.async_update_entry(
            self.config_entry, data=new_config_entry
        )

        _LOGGER.debug("Credentials successfully updated")

        return True
# None __init__(self, HomeAssistant hass, FytaConnector fyta)
# Definition: coordinator.py:38