1 """Provides data updates from the Control4 controller for platforms."""
3 from collections
import defaultdict
7 from pyControl4.account
import C4Account
8 from pyControl4.director
import C4Director
9 from pyControl4.error_handling
import BadToken
16 from .const
import CONF_ACCOUNT, CONF_CONTROLLER_UNIQUE_ID, CONF_DIRECTOR, DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 hass: HomeAssistant, entry: ConfigEntry, variable_names: set[str]
23 ) -> dict[int, dict[str, Any]]:
24 """Retrieve data from the Control4 director."""
25 director: C4Director = hass.data[DOMAIN][entry.entry_id][CONF_DIRECTOR]
26 data = await director.getAllItemVariableValue(variable_names)
27 result_dict: defaultdict[int, dict[str, Any]] = defaultdict(dict)
29 result_dict[item[
"id"]][item[
"varName"]] = item[
"value"]
30 return dict(result_dict)
34 hass: HomeAssistant, entry: ConfigEntry, variable_names: set[str]
35 ) -> dict[int, dict[str, Any]]:
36 """Try to Retrieve data from the Control4 director for update_coordinator."""
40 _LOGGER.debug(
"Updating Control4 director token")
46 """Store updated authentication and director tokens in hass.data."""
48 account_session = aiohttp_client.async_get_clientsession(hass)
50 account = C4Account(config[CONF_USERNAME], config[CONF_PASSWORD], account_session)
51 await account.getAccountBearerToken()
53 controller_unique_id = config[CONF_CONTROLLER_UNIQUE_ID]
54 director_token_dict = await account.getDirectorBearerToken(controller_unique_id)
55 director_session = aiohttp_client.async_get_clientsession(hass, verify_ssl=
False)
57 director = C4Director(
58 config[CONF_HOST], director_token_dict[CONF_TOKEN], director_session
61 _LOGGER.debug(
"Saving new tokens in hass data")
62 entry_data = hass.data[DOMAIN][entry.entry_id]
63 entry_data[CONF_ACCOUNT] = account
64 entry_data[CONF_DIRECTOR] = director
dict[int, dict[str, Any]] update_variables_for_config_entry(HomeAssistant hass, ConfigEntry entry, set[str] variable_names)
dict[int, dict[str, Any]] _update_variables_for_config_entry(HomeAssistant hass, ConfigEntry entry, set[str] variable_names)
def refresh_tokens(HomeAssistant hass, ConfigEntry entry)