Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Define a custom coordinator for the Weheat heatpump integration."""
2 
3 from datetime import timedelta
4 
5 from weheat.abstractions.discovery import HeatPumpDiscovery
6 from weheat.abstractions.heat_pump import HeatPump
7 from weheat.exceptions import (
8  ApiException,
9  BadRequestException,
10  ForbiddenException,
11  NotFoundException,
12  ServiceException,
13  UnauthorizedException,
14 )
15 
16 from homeassistant.const import CONF_ACCESS_TOKEN
17 from homeassistant.core import HomeAssistant
18 from homeassistant.exceptions import ConfigEntryAuthFailed
19 from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session
20 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
21 
22 from .const import API_URL, DOMAIN, LOGGER, UPDATE_INTERVAL
23 
# Weheat API errors that ``fetch_data`` converts into ``UpdateFailed``.
# ``UnauthorizedException`` is intentionally not listed here: it is caught
# separately and re-raised as ``ConfigEntryAuthFailed``.
EXCEPTIONS = (
    ServiceException,
    NotFoundException,
    ForbiddenException,
    BadRequestException,
    ApiException,
)
31 
32 
# NOTE(review): the ``class`` statement (original line 33) is missing from the
# extracted listing. The base class follows from the ``DataUpdateCoordinator``
# import and the ``super().__init__`` call below; the class name and the
# ``[HeatPump]`` type parameter are reconstructed -- confirm against upstream.
class WeheatDataUpdateCoordinator(DataUpdateCoordinator[HeatPump]):
    """A custom coordinator for the Weheat heatpump integration."""

    def __init__(
        self,
        hass: HomeAssistant,
        session: OAuth2Session,
        heat_pump: HeatPumpDiscovery.HeatPumpInfo,
    ) -> None:
        """Initialize the data coordinator.

        Args:
            hass: The Home Assistant instance, forwarded to the base coordinator.
            session: OAuth2 session whose access token is used for API calls.
            heat_pump: Discovery info of the heat pump this coordinator polls.
        """
        super().__init__(
            hass,
            logger=LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=UPDATE_INTERVAL),
        )
        # Static discovery info (uuid, name, model) backing the properties below.
        self.heat_pump_info = heat_pump
        # Single HeatPump object that is refreshed in place by ``fetch_data``.
        self._heat_pump_data = HeatPump(API_URL, heat_pump.uuid)

        self.session = session

    @property
    def heatpump_id(self) -> str:
        """Return the heat pump id (the uuid from the discovery info)."""
        return self.heat_pump_info.uuid

    @property
    def readable_name(self) -> str | None:
        """Return the readable name of the heat pump.

        Falls back to the model when no (non-empty) name is set.
        """
        if self.heat_pump_info.name:
            return self.heat_pump_info.name
        return self.heat_pump_info.model

    @property
    def model(self) -> str:
        """Return the model of the heat pump."""
        return self.heat_pump_info.model

    def fetch_data(self) -> HeatPump:
        """Get the data from the API.

        Called through the executor by ``_async_update_data``.

        Returns:
            The coordinator's ``HeatPump`` object after ``get_status`` ran on it.

        Raises:
            ConfigEntryAuthFailed: If the API rejects the access token.
            UpdateFailed: If the API raises any error listed in ``EXCEPTIONS``.
        """
        try:
            self._heat_pump_data.get_status(self.session.token[CONF_ACCESS_TOKEN])
        except UnauthorizedException as error:
            raise ConfigEntryAuthFailed from error
        except EXCEPTIONS as error:
            raise UpdateFailed(error) from error

        return self._heat_pump_data

    async def _async_update_data(self) -> HeatPump:
        """Fetch data from the API.

        Ensures the OAuth2 token is valid, then runs ``fetch_data`` in the
        executor.
        """
        await self.session.async_ensure_token_valid()

        return await self.hass.async_add_executor_job(self.fetch_data)
None __init__(self, HomeAssistant hass, OAuth2Session session, HeatPumpDiscovery.HeatPumpInfo heat_pump)
Definition: coordinator.py:41
def get_status(hass, host, port)
Definition: panel.py:387