Home Assistant Unofficial Reference 2024.12.1
device_tracker.py
Go to the documentation of this file.
1 """Support for device tracking of Huawei LTE routers."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import re
7 from typing import Any, cast
8 
9 from stringcase import snakecase
10 
12  DOMAIN as DEVICE_TRACKER_DOMAIN,
13  ScannerEntity,
14 )
15 from homeassistant.config_entries import ConfigEntry
16 from homeassistant.core import HomeAssistant, callback
17 from homeassistant.helpers import entity_registry as er
18 from homeassistant.helpers.dispatcher import async_dispatcher_connect
19 from homeassistant.helpers.entity import Entity
20 from homeassistant.helpers.entity_platform import AddEntitiesCallback
21 
22 from . import Router
23 from .const import (
24  CONF_TRACK_WIRED_CLIENTS,
25  DEFAULT_TRACK_WIRED_CLIENTS,
26  DOMAIN,
27  KEY_LAN_HOST_INFO,
28  KEY_WLAN_HOST_LIST,
29  UPDATE_SIGNAL,
30 )
31 from .entity import HuaweiLteBaseEntity
32 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Tag this platform appends to the router's host-list subscriptions during
# entry setup, so the router polls the host lists for it.
_DEVICE_SCAN = f"{DEVICE_TRACKER_DOMAIN}/device_scan"

# A single host entry taken from one of the router's host lists.
type _HostType = dict[str, Any]
38 
39 
def _get_hosts(
    router: Router, ignore_subscriptions: bool = False
) -> list[_HostType] | None:
    """Return the host list from the first router data key that provides one.

    KEY_LAN_HOST_INFO is consulted before KEY_WLAN_HOST_LIST. Unless
    ignore_subscriptions is true, a key that is not present in
    router.subscriptions is skipped. Returns None when no eligible key has a
    ["Hosts"]["Host"] path in router.data.
    """
    for key in (KEY_LAN_HOST_INFO, KEY_WLAN_HOST_LIST):
        if not ignore_subscriptions and key not in router.subscriptions:
            continue
        try:
            # Keep the try body down to the lookup that can actually raise.
            hosts = router.data[key]["Hosts"]["Host"]
        except KeyError:
            _LOGGER.debug("%s[%s][%s] not in data", key, "Hosts", "Host")
            continue
        return cast(list[_HostType], hosts)
    return None
51 
52 
async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up from config entry.

    Re-creates scanner entities for device tracker entries already present in
    the entity registry, subscribes to the router's host lists, and registers
    a dispatcher listener that adds entities for newly seen hosts.
    """

    # Grab hosts list once to examine whether the initial fetch has got some data for
    # us, i.e. if wlan host list is supported. Only set up a subscription and proceed
    # with adding and tracking entities if it is.
    router = hass.data[DOMAIN].routers[config_entry.entry_id]
    if (hosts := _get_hosts(router, True)) is None:
        return

    # Initialize already tracked entities
    tracked: set[str] = set()
    registry = er.async_get(hass)
    known_entities: list[Entity] = []
    track_wired_clients = router.config_entry.options.get(
        CONF_TRACK_WIRED_CLIENTS, DEFAULT_TRACK_WIRED_CLIENTS
    )
    for entry in registry.entities.get_entries_for_config_entry_id(
        config_entry.entry_id
    ):
        if entry.domain != DEVICE_TRACKER_DOMAIN:
            continue
        # The MAC address is whatever follows the first "-" of the unique id.
        mac = entry.unique_id.partition("-")[2]
        if not track_wired_clients:
            # Do not add known wired clients if not tracking them (any more).
            # Only the first host entry with a matching MAC is considered.
            host = next((x for x in hosts if x.get("MacAddress") == mac), None)
            if host is not None and not _is_wireless(host):
                continue
        tracked.add(entry.unique_id)
        known_entities.append(HuaweiLteScannerEntity(router, mac))
    async_add_entities(known_entities, True)

    # Tell parent router to poll hosts list to gather new devices
    router.subscriptions[KEY_LAN_HOST_INFO].append(_DEVICE_SCAN)
    router.subscriptions[KEY_WLAN_HOST_LIST].append(_DEVICE_SCAN)

    async def _async_maybe_add_new_entities(unique_id: str) -> None:
        """Add new entities if the update signal comes from our router."""
        if config_entry.unique_id == unique_id:
            async_add_new_entities(router, async_add_entities, tracked)

    # Register to handle router data updates; the listener is removed again
    # when the config entry is unloaded.
    disconnect_dispatcher = async_dispatcher_connect(
        hass, UPDATE_SIGNAL, _async_maybe_add_new_entities
    )
    config_entry.async_on_unload(disconnect_dispatcher)

    # Add new entities from initial scan
    async_add_new_entities(router, async_add_entities, tracked)
108 
109 
def _is_wireless(host: _HostType) -> bool:
    """Return whether the host entry is not reported as an Ethernet client."""
    # LAN host info entries carry an "InterfaceType" of "Ethernet" or "Wireless".
    # WLAN host list entries lack it, but are expected to be wireless, hence
    # the default.
    interface_type = cast(str, host.get("InterfaceType", "Wireless"))
    return interface_type != "Ethernet"
114 
115 
def _is_connected(host: _HostType | None) -> bool:
    """Return whether the host entry is considered connected; False for None."""
    if host is None:
        return False
    # LAN host info entries carry an "Active" flag of "1" or "0".
    # WLAN host list entries lack it, but that call appears to return active
    # hosts only, hence the "1" default.
    active = cast(str, host.get("Active", "1"))
    return active != "0"
120 
121 
def _is_us(host: _HostType) -> bool:
    """Try to determine if the host entry is us, the HA instance."""
    # Only LAN host info entries carry "isLocalDevice" ("1" / "0"); WLAN host
    # list entries lack it and are treated as not being us.
    local_flag = cast(str, host.get("isLocalDevice", "0"))
    return local_flag == "1"
126 
127 
@callback
def async_add_new_entities(
    router: Router,
    async_add_entities: AddEntitiesCallback,
    tracked: set[str],
) -> None:
    """Add new entities that are not already being tracked.

    The unique id of every entity created here is added to ``tracked``, which
    is mutated in place. Does nothing when the router has no host list.
    """
    if not (hosts := _get_hosts(router)):
        return

    track_wired_clients = router.config_entry.options.get(
        CONF_TRACK_WIRED_CLIENTS, DEFAULT_TRACK_WIRED_CLIENTS
    )

    new_entities: list[Entity] = []
    for host in hosts:
        # Skip ourselves, disconnected hosts and entries without a MAC address.
        if _is_us(host) or not _is_connected(host) or not host.get("MacAddress"):
            continue
        # Wired clients are only tracked when the option asks for it.
        if not (track_wired_clients or _is_wireless(host)):
            continue
        entity = HuaweiLteScannerEntity(router, host["MacAddress"])
        if entity.unique_id in tracked:
            continue
        tracked.add(entity.unique_id)
        new_entities.append(entity)
    async_add_entities(new_entities, True)
157 
158 
def _better_snakecase(text: str) -> str:
    """Convert text to snake case, keeping runs of capitals together."""
    # Awaiting https://github.com/okunishinishi/python-stringcase/pull/18
    if text == text.upper():
        # All uppercase to all lowercase to get http for HTTP, not h_t_t_p
        return cast(str, snakecase(text.lower()))

    def _lower_middle(match: re.Match[str]) -> str:
        """Lowercase the middle group of an uppercase run."""
        first, middle, last = match.groups()
        return f"{first}{middle.lower()}{last}"

    # Three or more consecutive uppercase with middle part lowercased
    # to get http_response for HTTPResponse, not h_t_t_p_response
    softened = re.sub(
        r"([A-Z])([A-Z]+)([A-Z](?:[^A-Z]|$))",
        _lower_middle,
        text,
    )
    return cast(str, snakecase(softened))
173 
174 
class HuaweiLteScannerEntity(HuaweiLteBaseEntity, ScannerEntity):
    """Huawei LTE router scanner entity."""

    # State refreshed by async_update(); these are the values reported until
    # the first update finds the host.
    _ip_address: str | None = None
    _is_connected: bool = False
    _hostname: str | None = None

    def __init__(self, router: Router, mac_address: str) -> None:
        """Initialize."""
        super().__init__(router)
        self._extra_state_attributes: dict[str, Any] = {}
        self._mac_address = mac_address

    @property
    def name(self) -> str:
        """Return the name of the entity."""
        # Fall back to the MAC address while no hostname is known.
        return self.hostname or self.mac_address

    @property
    def _device_unique_id(self) -> str:
        """Return the MAC address as the device-specific unique id."""
        return self.mac_address

    @property
    def ip_address(self) -> str | None:
        """Return the primary ip address of the device."""
        return self._ip_address

    @property
    def mac_address(self) -> str:
        """Return the mac address of the device."""
        return self._mac_address

    @property
    def hostname(self) -> str | None:
        """Return hostname of the device."""
        return self._hostname

    @property
    def is_connected(self) -> bool:
        """Get whether the entity is connected."""
        return self._is_connected

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Get additional attributes related to entity state."""
        return self._extra_state_attributes

    async def async_update(self) -> None:
        """Update state."""
        # NOTE(review): _available is presumably consumed by the base entity's
        # availability handling - confirm against HuaweiLteBaseEntity.
        if (hosts := _get_hosts(self.router)) is None:
            self._available = False
            return
        self._available = True
        host = next(
            (x for x in hosts if x.get("MacAddress") == self._mac_address), None
        )
        self._is_connected = _is_connected(host)
        if host is None:
            # Host not listed: keep the last known address, hostname and
            # attributes, only the connection state changes.
            return
        # IpAddress can contain multiple semicolon separated addresses.
        # Pick one for model sanity; e.g. the dhcp component to which it is
        # fed, parses and expects to see just one.
        self._ip_address = (host.get("IpAddress") or "").split(";", 2)[0] or None
        self._hostname = host.get("HostName")
        self._extra_state_attributes = {
            _better_snakecase(k): v
            for k, v in host.items()
            if k
            in {
                "AddressSource",
                "AssociatedSsid",
                "InterfaceType",
            }
        }
None async_add_new_entities(Router router, AddEntitiesCallback async_add_entities, set[str] tracked)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
list[_HostType]|None _get_hosts(Router router, bool ignore_subscriptions=False)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103