Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The enphase_envoy component."""
2 
3 from __future__ import annotations
4 
5 import contextlib
6 import datetime
7 from datetime import timedelta
8 import logging
9 from typing import Any
10 
11 from pyenphase import Envoy, EnvoyError, EnvoyTokenAuth
12 
13 from homeassistant.config_entries import ConfigEntry
14 from homeassistant.const import CONF_NAME, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
15 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
16 from homeassistant.exceptions import ConfigEntryAuthFailed
17 from homeassistant.helpers.event import async_track_time_interval
18 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
19 import homeassistant.util.dt as dt_util
20 
21 from .const import INVALID_AUTH_ERRORS
22 
23 SCAN_INTERVAL = timedelta(seconds=60)
24 
25 TOKEN_REFRESH_CHECK_INTERVAL = timedelta(days=1)
26 STALE_TOKEN_THRESHOLD = timedelta(days=30).total_seconds()
27 NOTIFICATION_ID = "enphase_envoy_notification"
28 
29 _LOGGER = logging.getLogger(__name__)
30 
31 
32 type EnphaseConfigEntry = ConfigEntry[EnphaseUpdateCoordinator]
33 
34 
# NOTE(review): the class statement was missing from this listing and has been
# reconstructed from the EnphaseConfigEntry alias, the docstring and the
# _async_update_data return annotation -- confirm the base-class type parameter.
class EnphaseUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """DataUpdateCoordinator to gather data from any envoy.

    Besides polling the envoy, the coordinator performs setup/authentication
    on the first update, keeps the token saved in the config entry in sync
    with the one in use, and reloads the config entry when the envoy reports
    a firmware version different from the one seen on the previous update.
    """

    # Assigned in _async_setup_and_authenticate after envoy.setup().
    envoy_serial_number: str
    # Firmware seen on the previous successful update; "" until the first one.
    envoy_firmware: str

    def __init__(
        self, hass: HomeAssistant, envoy: Envoy, entry: EnphaseConfigEntry
    ) -> None:
        """Initialize DataUpdateCoordinator for the envoy."""
        self.envoy = envoy
        entry_data = entry.data
        self.entry = entry
        # Credentials are kept so password authentication can be used as a
        # fallback when token authentication fails.
        self.username = entry_data[CONF_USERNAME]
        self.password = entry_data[CONF_PASSWORD]
        self._setup_complete = False
        self.envoy_firmware = ""
        self._cancel_token_refresh: CALLBACK_TYPE | None = None
        super().__init__(
            hass,
            _LOGGER,
            name=entry_data[CONF_NAME],
            update_interval=SCAN_INTERVAL,
            always_update=False,
        )

    @callback
    def _async_refresh_token_if_needed(self, now: datetime.datetime) -> None:
        """Proactively refresh token if it's stale in case cloud services go down.

        ``now`` is the reference time the token's expiry is compared against.
        """
        assert isinstance(self.envoy.auth, EnvoyTokenAuth)
        expire_time = self.envoy.auth.expire_timestamp
        remain = expire_time - now.timestamp()
        fresh = remain > STALE_TOKEN_THRESHOLD
        name = self.name
        _LOGGER.debug("%s: %s seconds remaining on token fresh=%s", name, remain, fresh)
        if not fresh:
            # The task name must be an f-string, otherwise the literal text
            # "{name}" ends up as the task name.
            self.hass.async_create_background_task(
                self._async_try_refresh_token(), f"{name} token refresh"
            )

    async def _async_try_refresh_token(self) -> None:
        """Try to refresh token and save it to the config entry on success."""
        assert isinstance(self.envoy.auth, EnvoyTokenAuth)
        _LOGGER.debug("%s: Trying to refresh token", self.name)
        try:
            await self.envoy.auth.refresh()
        except EnvoyError as err:
            # If we can't refresh the token, we try again later
            # If the token actually ends up expiring, we'll
            # re-authenticate with username/password and get a new token
            # or log an error if that fails
            _LOGGER.debug("%s: Error refreshing token: %s", self.name, err)
            return
        self._async_update_saved_token()

    @callback
    def _async_mark_setup_complete(self) -> None:
        """Mark setup as complete and setup token refresh if needed."""
        self._setup_complete = True
        # Drop any previously registered interval so only one check is active.
        self.async_cancel_token_refresh()
        if not isinstance(self.envoy.auth, EnvoyTokenAuth):
            return
        self._cancel_token_refresh = async_track_time_interval(
            self.hass,
            self._async_refresh_token_if_needed,
            TOKEN_REFRESH_CHECK_INTERVAL,
            cancel_on_shutdown=True,
        )

    async def _async_setup_and_authenticate(self) -> None:
        """Set up and authenticate with the envoy.

        A token saved in the config entry is tried first; if that fails with
        one of INVALID_AUTH_ERRORS, username/password authentication is used
        and the resulting token is saved.
        """
        envoy = self.envoy
        await envoy.setup()
        assert envoy.serial_number is not None
        self.envoy_serial_number = envoy.serial_number
        if token := self.entry.data.get(CONF_TOKEN):
            with contextlib.suppress(*INVALID_AUTH_ERRORS):
                # Always set the username and password
                # so we can refresh the token if needed
                await envoy.authenticate(
                    username=self.username, password=self.password, token=token
                )
                # The token is valid, but we still want
                # to refresh it if it's stale right away
                self._async_refresh_token_if_needed(dt_util.utcnow())
                return
        # token likely expired or firmware changed
        # so we fall through to authenticate with
        # username/password
        await envoy.authenticate(username=self.username, password=self.password)
        # Password auth succeeded, so we can update the token
        # if we are using EnvoyTokenAuth
        self._async_update_saved_token()

    def _async_update_saved_token(self) -> None:
        """Update saved token in config entry."""
        envoy = self.envoy
        if not isinstance(envoy.auth, EnvoyTokenAuth):
            return
        # update token in config entry so we can
        # startup without hitting the Cloud API
        # as long as the token is valid
        _LOGGER.debug("%s: Updating token in config entry from auth", self.name)
        self.hass.config_entries.async_update_entry(
            self.entry,
            data={
                **self.entry.data,
                CONF_TOKEN: envoy.auth.token,
            },
        )

    async def _async_update_data(self) -> dict[str, Any]:
        """Fetch all device and sensor data from api.

        Returns the ``raw`` attribute of the data returned by envoy.update().

        Raises ConfigEntryAuthFailed when authentication fails during setup
        or again after one re-authentication attempt, and UpdateFailed on
        any other EnvoyError.
        """
        envoy = self.envoy
        # Two passes at most: the second one re-runs setup/authentication
        # after an auth error on an already set-up envoy.
        for tries in range(2):
            try:
                if not self._setup_complete:
                    await self._async_setup_and_authenticate()
                    self._async_mark_setup_complete()
                envoy_data = await envoy.update()
            except INVALID_AUTH_ERRORS as err:
                if self._setup_complete and tries == 0:
                    # token likely expired or firmware changed, try to re-authenticate
                    self._setup_complete = False
                    continue
                raise ConfigEntryAuthFailed from err
            except EnvoyError as err:
                raise UpdateFailed(f"Error communicating with API: {err}") from err

            # if we have a firmware version from previous setup, compare to current one
            # when envoy gets new firmware there will be an authentication failure
            # which results in getting fw version again, if so reload the integration.
            if (current_firmware := self.envoy_firmware) and current_firmware != (
                new_firmware := envoy.firmware
            ):
                _LOGGER.warning(
                    "Envoy firmware changed from: %s to: %s, reloading enphase envoy integration",
                    current_firmware,
                    new_firmware,
                )
                # reload the integration to get all established again
                self.hass.async_create_task(
                    self.hass.config_entries.async_reload(self.entry.entry_id)
                )
            # remember firmware version for next time
            self.envoy_firmware = envoy.firmware
            # dump all received data in debug mode to assist troubleshooting
            _LOGGER.debug("Envoy data: %s", envoy_data)
            return envoy_data.raw

        raise RuntimeError("Unreachable code in _async_update_data")  # pragma: no cover

    @callback
    def async_cancel_token_refresh(self) -> None:
        """Cancel token refresh."""
        if self._cancel_token_refresh:
            self._cancel_token_refresh()
            self._cancel_token_refresh = None
None __init__(self, HomeAssistant hass, Envoy envoy, EnphaseConfigEntry entry)
Definition: coordinator.py:43
def authenticate(HomeAssistant hass, host, port, servers)
Definition: config_flow.py:104
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679