Home Assistant Unofficial Reference 2024.12.1
api.py
Go to the documentation of this file.
1 """API for Home Connect bound to HASS OAuth."""
2 
3 from asyncio import run_coroutine_threadsafe
4 import logging
5 
6 import homeconnect
7 from homeconnect.api import HomeConnectAppliance, HomeConnectError
8 
9 from homeassistant.config_entries import ConfigEntry
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers import config_entry_oauth2_flow
12 from homeassistant.helpers.dispatcher import dispatcher_send
13 
14 from .const import ATTR_KEY, ATTR_VALUE, BSH_ACTIVE_PROGRAM, SIGNAL_UPDATE_ENTITIES
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 
class ConfigEntryAuth(homeconnect.HomeConnectAPI):
    """Provide Home Connect authentication tied to an OAuth2 based config entry."""

    def __init__(
        self,
        hass: HomeAssistant,
        config_entry: ConfigEntry,
        implementation: config_entry_oauth2_flow.AbstractOAuth2Implementation,
    ) -> None:
        """Initialize Home Connect Auth.

        Builds an OAuth2 session from the config entry and implementation and
        hands the session's current token to the ``HomeConnectAPI`` base class.
        """
        self.hass = hass
        self.config_entry = config_entry
        self.session = config_entry_oauth2_flow.OAuth2Session(
            hass, config_entry, implementation
        )
        super().__init__(self.session.token)
        # Populated by get_devices(); empty until that has been called.
        self.devices: list[HomeConnectDevice] = []

    def refresh_tokens(self) -> dict:
        """Refresh and return new Home Connect tokens using Home Assistant OAuth2 session."""
        # Schedule the token check on Home Assistant's event loop and block the
        # calling thread until it finishes.
        # NOTE(review): this presumably must not be called from the event loop
        # thread itself, since .result() would wait on work that loop has to
        # run - confirm against the callers.
        run_coroutine_threadsafe(
            self.session.async_ensure_token_valid(), self.hass.loop
        ).result()

        return self.session.token

    def get_devices(self) -> "list[HomeConnectDevice]":
        """Get a list of devices.

        Fetches the appliances through ``get_appliances()``, wraps each one in
        a ``HomeConnectDevice``, stores the result on ``self.devices`` and
        returns that same list.
        """
        appliances: list[HomeConnectAppliance] = self.get_appliances()
        self.devices = [HomeConnectDevice(self.hass, app) for app in appliances]
        return self.devices
50 
51 
class HomeConnectDevice:
    """Generic Home Connect device."""

    def __init__(self, hass: HomeAssistant, appliance: HomeConnectAppliance) -> None:
        """Initialize the device class."""
        self.hass = hass
        self.appliance = appliance

    def initialize(self) -> None:
        """Fetch the info needed to initialize the device.

        Each fetch is attempted independently: a ``HomeConnectError`` or
        ``ValueError`` from one call is logged at debug level and does not
        stop the remaining calls. Finally the device subscribes to appliance
        events with ``event_callback`` as the handler.
        """
        try:
            self.appliance.get_status()
        except (HomeConnectError, ValueError):
            _LOGGER.debug("Unable to fetch appliance status. Probably offline")
        try:
            self.appliance.get_settings()
        except (HomeConnectError, ValueError):
            _LOGGER.debug("Unable to fetch settings. Probably offline")
        try:
            program_active = self.appliance.get_programs_active()
        except (HomeConnectError, ValueError):
            _LOGGER.debug("Unable to fetch active programs. Probably offline")
            program_active = None
        # Record the active program in the status mapping only when the
        # response actually carries a program key.
        if program_active and ATTR_KEY in program_active:
            self.appliance.status[BSH_ACTIVE_PROGRAM] = {
                ATTR_VALUE: program_active[ATTR_KEY]
            }
        self.appliance.listen_events(callback=self.event_callback)

    def event_callback(self, appliance: HomeConnectAppliance) -> None:
        """Handle event.

        Logs the update and sends ``SIGNAL_UPDATE_ENTITIES`` through the
        dispatcher with the appliance's ``haId`` as the argument.
        """
        _LOGGER.debug("Update triggered on %s", appliance.name)
        _LOGGER.debug(self.appliance.status)
        dispatcher_send(self.hass, SIGNAL_UPDATE_ENTITIES, appliance.haId)
list[HomeConnectAppliance] get_devices(self)
Definition: api.py:45
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, config_entry_oauth2_flow.AbstractOAuth2Implementation implementation)
Definition: api.py:27
None __init__(self, HomeAssistant hass, HomeConnectAppliance appliance)
Definition: api.py:55
None event_callback(self, HomeConnectAppliance appliance)
Definition: api.py:81
def get_status(hass, host, port)
Definition: panel.py:387
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:137