Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Enigma2 integration."""
2 
3 import logging
4 
5 from openwebif.api import OpenWebIfDevice, OpenWebIfStatus
6 from yarl import URL
7 
8 from homeassistant.config_entries import ConfigEntry
9 from homeassistant.const import (
10  ATTR_CONNECTIONS,
11  ATTR_IDENTIFIERS,
12  CONF_HOST,
13  CONF_PASSWORD,
14  CONF_PORT,
15  CONF_SSL,
16  CONF_USERNAME,
17  CONF_VERIFY_SSL,
18 )
19 from homeassistant.core import HomeAssistant
20 from homeassistant.helpers.aiohttp_client import async_create_clientsession
22  CONNECTION_NETWORK_MAC,
23  DeviceInfo,
24  format_mac,
25 )
26 from homeassistant.helpers.entity_component import DEFAULT_SCAN_INTERVAL
27 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
28 
29 from .const import CONF_SOURCE_BOUQUET, DOMAIN
30 
31 LOGGER = logging.getLogger(__package__)
32 
33 
35  """The Enigma2 data update coordinator."""
36 
37  device: OpenWebIfDevice
38 
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
    """Initialize the Enigma2 data update coordinator.

    Builds the receiver's base URL from the config entry, opens a client
    session bound to that URL, and creates the OpenWebIf device wrapper
    together with the initial device info.

    Args:
        hass: The Home Assistant instance, passed to the base coordinator
            and to the client-session factory.
        config_entry: Entry whose ``data`` supplies host, port, SSL flags
            and optional credentials, and whose ``options`` may supply
            the source bouquet.
    """

    super().__init__(
        hass, logger=LOGGER, name=DOMAIN, update_interval=DEFAULT_SCAN_INTERVAL
    )

    data = config_entry.data

    # Username and password are optional, hence .get(); the remaining
    # keys are read with [] and raise KeyError when absent.
    base_url = URL.build(
        scheme="https" if data[CONF_SSL] else "http",
        host=data[CONF_HOST],
        port=data[CONF_PORT],
        user=data.get(CONF_USERNAME),
        password=data.get(CONF_PASSWORD),
    )

    # The session must be bound before it is handed to OpenWebIfDevice.
    session = async_create_clientsession(
        hass, verify_ssl=data[CONF_VERIFY_SSL], base_url=base_url
    )

    self.device = OpenWebIfDevice(
        session, source_bouquet=config_entry.options.get(CONF_SOURCE_BOUQUET)
    )

    # Only the URL and host are known here; _async_setup adds model,
    # manufacturer, identifiers and connections.
    self.device_info = DeviceInfo(
        configuration_url=base_url,
        name=data[CONF_HOST],
    )
66 
async def _async_setup(self) -> None:
    """Provide needed data to the device info.

    Fetches the device's "about" payload once, stores the first
    interface's MAC address on the device, and fills the device info
    with model, manufacturer, identifiers and network connections.
    """

    about = await self.device.get_about()
    info = about["info"]
    ifaces = info["ifaces"]

    # NOTE(review): the first interface is read unconditionally, so an
    # empty interface list or a first entry without "mac" raises here.
    self.device.mac_address = ifaces[0]["mac"]
    self.device_info["model"] = info["model"]
    self.device_info["manufacturer"] = info["brand"]

    # Format each usable MAC once; identifiers and connections are built
    # from the same filtered set of interfaces.
    macs = [
        format_mac(iface["mac"])
        for iface in ifaces
        if "mac" in iface and iface["mac"] is not None
    ]
    self.device_info[ATTR_IDENTIFIERS] = {(DOMAIN, mac) for mac in macs}
    self.device_info[ATTR_CONNECTIONS] = {
        (CONNECTION_NETWORK_MAC, mac) for mac in macs
    }
84 
async def _async_update_data(self) -> OpenWebIfStatus:
    """Refresh the device and return its current status."""
    await self.device.update()
    return self.device.status
None __init__(self, HomeAssistant hass, ConfigEntry config_entry)
Definition: coordinator.py:39
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
aiohttp.ClientSession async_create_clientsession()
Definition: coordinator.py:51