1 """Coordinator for FYTA integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import datetime, timedelta
8 from typing
import TYPE_CHECKING
10 from fyta_cli.fyta_connector
import FytaConnector
11 from fyta_cli.fyta_exceptions
import (
12 FytaAuthentificationError,
17 from fyta_cli.fyta_models
import Plant
25 from .const
import CONF_EXPIRATION, DOMAIN
28 from .
import FytaConfigEntry
30 _LOGGER = logging.getLogger(__name__)
34 """Fyta custom coordinator."""
36 config_entry: FytaConfigEntry
38 def __init__(self, hass: HomeAssistant, fyta: FytaConnector) ->
None:
39 """Initialize my coordinator."""
43 name=
"FYTA Coordinator",
48 self.new_device_callbacks: list[Callable[[int],
None]] = []
52 ) -> dict[int, Plant]:
53 """Fetch data from API endpoint."""
56 self.
fytafyta.expiration
is None
57 or self.
fytafyta.expiration.timestamp() < datetime.now().timestamp()
62 data = await self.
fytafyta.update_all_plants()
63 except (FytaConnectionError, FytaPlantError)
as err:
65 translation_domain=DOMAIN, translation_key=
"update_error"
67 _LOGGER.debug(
"Data successfully updated")
76 """Add new devices, remove non-existing devices."""
81 current_plants := set(self.
fytafyta.plant_list.keys())
86 "Check for new and removed plant(s): old plants: %s; new plants: %s",
88 ", ".join(map(str, current_plants)),
93 _LOGGER.debug(
"Removed plant(s): %s",
", ".join(map(str, removed_plants)))
95 device_registry = dr.async_get(self.
hasshass)
96 for plant_id
in removed_plants:
97 if device := device_registry.async_get_device(
101 f
"{self.config_entry.entry_id}-{plant_id}",
105 device_registry.async_update_device(
107 remove_config_entry_id=self.
config_entryconfig_entry.entry_id,
109 _LOGGER.debug(
"Device removed from device registry: %s", device.id)
113 _LOGGER.debug(
"New plant(s) found: %s",
", ".join(map(str, new_plants)))
114 for plant_id
in new_plants:
115 for callback
in self.new_device_callbacks:
117 _LOGGER.debug(
"Device added: %s", plant_id)
122 """Renew access token for FYTA API."""
125 credentials = await self.
fytafyta.login()
126 except FytaConnectionError
as ex:
128 translation_domain=DOMAIN, translation_key=
"config_entry_not_ready"
130 except (FytaAuthentificationError, FytaPasswordError)
as ex:
132 translation_domain=DOMAIN,
133 translation_key=
"auth_failed",
136 new_config_entry = {**self.
config_entryconfig_entry.data}
137 new_config_entry[CONF_ACCESS_TOKEN] = credentials.access_token
138 new_config_entry[CONF_EXPIRATION] = credentials.expiration.isoformat()
140 self.
hasshass.config_entries.async_update_entry(
144 _LOGGER.debug(
"Credentials successfully updated")
dict[int, Plant] _async_update_data(self)
None _async_add_remove_devices(self)
bool renew_authentication(self)
None __init__(self, HomeAssistant hass, FytaConnector fyta)