Home Assistant Unofficial Reference 2024.12.1
device_tracker.py
Go to the documentation of this file.
1 """Support for OpenWRT (ubus) routers."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import re
7 
8 from openwrt.ubus import Ubus
9 import voluptuous as vol
10 
12  DOMAIN as DEVICE_TRACKER_DOMAIN,
13  PLATFORM_SCHEMA as DEVICE_TRACKER_PLATFORM_SCHEMA,
14  DeviceScanner,
15 )
16 from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
17 from homeassistant.core import HomeAssistant
19 from homeassistant.helpers.typing import ConfigType
20 
21 _LOGGER = logging.getLogger(__name__)
22 
23 CONF_DHCP_SOFTWARE = "dhcp_software"
24 DEFAULT_DHCP_SOFTWARE = "dnsmasq"
25 DHCP_SOFTWARES = ["dnsmasq", "odhcpd", "none"]
26 
27 PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
28  {
29  vol.Required(CONF_HOST): cv.string,
30  vol.Required(CONF_PASSWORD): cv.string,
31  vol.Required(CONF_USERNAME): cv.string,
32  vol.Optional(CONF_DHCP_SOFTWARE, default=DEFAULT_DHCP_SOFTWARE): vol.In(
33  DHCP_SOFTWARES
34  ),
35  }
36 )
37 
38 
39 def get_scanner(hass: HomeAssistant, config: ConfigType) -> DeviceScanner | None:
40  """Validate the configuration and return an ubus scanner."""
41  config = config[DEVICE_TRACKER_DOMAIN]
42 
43  dhcp_sw = config[CONF_DHCP_SOFTWARE]
44  scanner: DeviceScanner
45  if dhcp_sw == "dnsmasq":
46  scanner = DnsmasqUbusDeviceScanner(config)
47  elif dhcp_sw == "odhcpd":
48  scanner = OdhcpdUbusDeviceScanner(config)
49  else:
50  scanner = UbusDeviceScanner(config)
51 
52  return scanner if scanner.success_init else None
53 
54 
56  """If remove rebooted, it lost our session so rebuild one and try again."""
57 
58  def decorator(self, *args, **kwargs):
59  """Wrap the function to refresh session_id on PermissionError."""
60  try:
61  return func(self, *args, **kwargs)
62  except PermissionError:
63  _LOGGER.warning(
64  "Invalid session detected. Trying to refresh session_id and re-run RPC"
65  )
66  self.ubus.connect()
67 
68  return func(self, *args, **kwargs)
69 
70  return decorator
71 
72 
73 class UbusDeviceScanner(DeviceScanner):
74  """Class which queries a wireless router running OpenWrt firmware.
75 
76  Adapted from Tomato scanner.
77  """
78 
79  def __init__(self, config):
80  """Initialize the scanner."""
81  self.hosthost = config[CONF_HOST]
82  self.usernameusername = config[CONF_USERNAME]
83  self.passwordpassword = config[CONF_PASSWORD]
84 
85  self.parse_api_patternparse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
86  self.last_resultslast_results = {}
87  self.urlurl = f"http://{self.host}/ubus"
88 
89  self.ubusubus = Ubus(self.urlurl, self.usernameusername, self.passwordpassword)
90  self.hostapdhostapd = []
91  self.mac2namemac2name = None
92  self.success_initsuccess_init = self.ubusubus.connect() is not None
93 
94  def scan_devices(self):
95  """Scan for new devices and return a list with found device IDs."""
96  self._update_info_update_info()
97  return self.last_resultslast_results
98 
99  def _generate_mac2name(self):
100  """Return empty MAC to name dict. Overridden if DHCP server is set."""
101  self.mac2namemac2name = {}
102 
103  @_refresh_on_access_denied
104  def get_device_name(self, device):
105  """Return the name of the given device or None if we don't know."""
106  if self.mac2namemac2name is None:
107  self._generate_mac2name_generate_mac2name()
108  if self.mac2namemac2name is None:
109  # Generation of mac2name dictionary failed
110  return None
111  return self.mac2namemac2name.get(device.upper(), None)
112 
113  async def async_get_extra_attributes(self, device: str) -> dict[str, str]:
114  """Return the host to distinguish between multiple routers."""
115  return {"host": self.hosthost}
116 
117  @_refresh_on_access_denied
118  def _update_info(self):
119  """Ensure the information from the router is up to date.
120 
121  Returns boolean if scanning successful.
122  """
123  if not self.success_initsuccess_init:
124  return False
125 
126  _LOGGER.debug("Checking hostapd")
127 
128  if not self.hostapdhostapd:
129  hostapd = self.ubusubus.get_hostapd()
130  self.hostapdhostapd.extend(hostapd.keys())
131 
132  self.last_resultslast_results = []
133  results = 0
134  # for each access point
135  for hostapd in self.hostapdhostapd:
136  if result := self.ubusubus.get_hostapd_clients(hostapd):
137  results = results + 1
138  # Check for each device is authorized (valid wpa key)
139  for key in result["clients"]:
140  device = result["clients"][key]
141  if device["authorized"]:
142  self.last_resultslast_results.append(key)
143 
144  return bool(results)
145 
146 
148  """Implement the Ubus device scanning for the dnsmasq DHCP server."""
149 
150  def __init__(self, config):
151  """Initialize the scanner."""
152  super().__init__(config)
153  self.leasefileleasefile = None
154 
156  if self.leasefileleasefile is None:
157  if result := self.ubusubus.get_uci_config("dhcp", "dnsmasq"):
158  values = result["values"].values()
159  self.leasefileleasefile = next(iter(values))["leasefile"]
160  else:
161  return
162 
163  result = self.ubusubus.file_read(self.leasefileleasefile)
164  if result:
165  self.mac2namemac2namemac2name = {}
166  for line in result["data"].splitlines():
167  hosts = line.split(" ")
168  self.mac2namemac2namemac2name[hosts[1].upper()] = hosts[3]
169  else:
170  # Error, handled in the ubus.file_read()
171  return
172 
173 
175  """Implement the Ubus device scanning for the odhcp DHCP server."""
176 
178  if result := self.ubusubus.get_dhcp_method("ipv4leases"):
179  self.mac2namemac2namemac2name = {}
180  for device in result["device"].values():
181  for lease in device["leases"]:
182  mac = lease["mac"] # mac = aabbccddeeff
183  # Convert it to expected format with colon
184  mac = ":".join(mac[i : i + 2] for i in range(0, len(mac), 2))
185  self.mac2namemac2namemac2name[mac.upper()] = lease["hostname"]
186  else:
187  # Error, handled in the ubus.get_dhcp_method()
188  return
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
DeviceScanner|None get_scanner(HomeAssistant hass, ConfigType config)