1 """API for Home Connect bound to HASS OAuth."""
3 from asyncio import run_coroutine_threadsafe
7 from homeconnect.api import HomeConnectAppliance, HomeConnectError
14 from .const import ATTR_KEY, ATTR_VALUE, BSH_ACTIVE_PROGRAM, SIGNAL_UPDATE_ENTITIES
16 _LOGGER = logging.getLogger(__name__)
20 """Provide Home Connect authentication tied to an OAuth2 based config entry."""
25 config_entry: ConfigEntry,
26 implementation: config_entry_oauth2_flow.AbstractOAuth2Implementation,
28 """Initialize Home Connect Auth."""
31 self.session = config_entry_oauth2_flow.OAuth2Session(
32 hass, config_entry, implementation
35 self.devices: list[HomeConnectDevice] = []
38 """Refresh and return new Home Connect tokens using Home Assistant OAuth2 session."""
39 run_coroutine_threadsafe(
40 self.session.async_ensure_token_valid(), self.hass.loop
43 return self.session.token
46 """Get a list of devices."""
47 appl: list[HomeConnectAppliance] = self.get_appliances()
53 """Generic Home Connect device."""
55 def __init__(self, hass: HomeAssistant, appliance: HomeConnectAppliance) -> None:
56 """Initialize the device class."""
61 """Fetch the info needed to initialize the device."""
64 except (HomeConnectError, ValueError):
65 _LOGGER.debug("Unable to fetch appliance status. Probably offline")
68 except (HomeConnectError, ValueError):
69 _LOGGER.debug("Unable to fetch settings. Probably offline")
71 program_active = self.appliance.get_programs_active()
72 except (HomeConnectError, ValueError):
73 _LOGGER.debug("Unable to fetch active programs. Probably offline")
75 if program_active and ATTR_KEY in program_active:
76 self.appliance.status[BSH_ACTIVE_PROGRAM] = {
77 ATTR_VALUE: program_active[ATTR_KEY]
83 _LOGGER.debug("Update triggered on %s", appliance.name)
84 _LOGGER.debug(self.appliance.status)
list[HomeConnectAppliance] get_devices(self)
dict refresh_tokens(self)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, config_entry_oauth2_flow.AbstractOAuth2Implementation implementation)
None __init__(self, HomeAssistant hass, HomeConnectAppliance appliance)
None event_callback(self, HomeConnectAppliance appliance)
def get_status(hass, host, port)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)