Home Assistant Unofficial Reference 2024.12.1
director_utils.py
Go to the documentation of this file.
1 """Provides data updates from the Control4 controller for platforms."""
2 
3 from collections import defaultdict
4 import logging
5 from typing import Any
6 
7 from pyControl4.account import C4Account
8 from pyControl4.director import C4Director
9 from pyControl4.error_handling import BadToken
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
13 from homeassistant.core import HomeAssistant
14 from homeassistant.helpers import aiohttp_client
15 
16 from .const import CONF_ACCOUNT, CONF_CONTROLLER_UNIQUE_ID, CONF_DIRECTOR, DOMAIN
17 
18 _LOGGER = logging.getLogger(__name__)
19 
20 
22  hass: HomeAssistant, entry: ConfigEntry, variable_names: set[str]
23 ) -> dict[int, dict[str, Any]]:
24  """Retrieve data from the Control4 director."""
25  director: C4Director = hass.data[DOMAIN][entry.entry_id][CONF_DIRECTOR]
26  data = await director.getAllItemVariableValue(variable_names)
27  result_dict: defaultdict[int, dict[str, Any]] = defaultdict(dict)
28  for item in data:
29  result_dict[item["id"]][item["varName"]] = item["value"]
30  return dict(result_dict)
31 
32 
34  hass: HomeAssistant, entry: ConfigEntry, variable_names: set[str]
35 ) -> dict[int, dict[str, Any]]:
36  """Try to Retrieve data from the Control4 director for update_coordinator."""
37  try:
38  return await _update_variables_for_config_entry(hass, entry, variable_names)
39  except BadToken:
40  _LOGGER.debug("Updating Control4 director token")
41  await refresh_tokens(hass, entry)
42  return await _update_variables_for_config_entry(hass, entry, variable_names)
43 
44 
45 async def refresh_tokens(hass: HomeAssistant, entry: ConfigEntry):
46  """Store updated authentication and director tokens in hass.data."""
47  config = entry.data
48  account_session = aiohttp_client.async_get_clientsession(hass)
49 
50  account = C4Account(config[CONF_USERNAME], config[CONF_PASSWORD], account_session)
51  await account.getAccountBearerToken()
52 
53  controller_unique_id = config[CONF_CONTROLLER_UNIQUE_ID]
54  director_token_dict = await account.getDirectorBearerToken(controller_unique_id)
55  director_session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
56 
57  director = C4Director(
58  config[CONF_HOST], director_token_dict[CONF_TOKEN], director_session
59  )
60 
61  _LOGGER.debug("Saving new tokens in hass data")
62  entry_data = hass.data[DOMAIN][entry.entry_id]
63  entry_data[CONF_ACCOUNT] = account
64  entry_data[CONF_DIRECTOR] = director
dict[int, dict[str, Any]] update_variables_for_config_entry(HomeAssistant hass, ConfigEntry entry, set[str] variable_names)
dict[int, dict[str, Any]] _update_variables_for_config_entry(HomeAssistant hass, ConfigEntry entry, set[str] variable_names)
def refresh_tokens(HomeAssistant hass, ConfigEntry entry)