1 """Support for OpenWRT (ubus) routers."""
3 from __future__
import annotations
8 from openwrt.ubus
import Ubus
9 import voluptuous
as vol
12 DOMAIN
as DEVICE_TRACKER_DOMAIN,
13 PLATFORM_SCHEMA
as DEVICE_TRACKER_PLATFORM_SCHEMA,
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Configuration key naming the DHCP software in use; read from the platform
# config by get_scanner().
CONF_DHCP_SOFTWARE = "dhcp_software"

# Value used for CONF_DHCP_SOFTWARE when the option is omitted from the
# platform configuration.
DEFAULT_DHCP_SOFTWARE = "dnsmasq"

# Known DHCP software names. get_scanner() branches explicitly on "dnsmasq"
# and "odhcpd".
# NOTE(review): presumably this list is what the schema's vol.In() validates
# against -- confirm against the full PLATFORM_SCHEMA definition.
DHCP_SOFTWARES = ["dnsmasq", "odhcpd", "none"]
27 PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
29 vol.Required(CONF_HOST): cv.string,
30 vol.Required(CONF_PASSWORD): cv.string,
31 vol.Required(CONF_USERNAME): cv.string,
32 vol.Optional(CONF_DHCP_SOFTWARE, default=DEFAULT_DHCP_SOFTWARE): vol.In(
39 def get_scanner(hass: HomeAssistant, config: ConfigType) -> DeviceScanner |
None:
40 """Validate the configuration and return an ubus scanner."""
41 config = config[DEVICE_TRACKER_DOMAIN]
43 dhcp_sw = config[CONF_DHCP_SOFTWARE]
44 scanner: DeviceScanner
45 if dhcp_sw ==
"dnsmasq":
47 elif dhcp_sw ==
"odhcpd":
52 return scanner
if scanner.success_init
else None
56 """If the remote rebooted, our session was lost, so rebuild one and try again."""
58 def decorator(self, *args, **kwargs):
59 """Wrap the function to refresh session_id on PermissionError."""
61 return func(self, *args, **kwargs)
62 except PermissionError:
64 "Invalid session detected. Trying to refresh session_id and re-run RPC"
68 return func(self, *args, **kwargs)
74 """Class which queries a wireless router running OpenWrt firmware.
76 Adapted from Tomato scanner.
80 """Initialize the scanner."""
81 self.
hosthost = config[CONF_HOST]
87 self.
urlurl = f
"http://{self.host}/ubus"
95 """Scan for new devices and return a list with found device IDs."""
100 """Return empty MAC to name dict. Overridden if DHCP server is set."""
103 @_refresh_on_access_denied
105 """Return the name of the given device or None if we don't know."""
111 return self.
mac2namemac2name.
get(device.upper(),
None)
114 """Return the host to distinguish between multiple routers."""
115 return {
"host": self.
hosthost}
117 @_refresh_on_access_denied
119 """Ensure the information from the router is up to date.
121 Returns a boolean indicating whether the scan was successful.
126 _LOGGER.debug(
"Checking hostapd")
129 hostapd = self.
ubusubus.get_hostapd()
130 self.
hostapdhostapd.extend(hostapd.keys())
135 for hostapd
in self.
hostapdhostapd:
136 if result := self.
ubusubus.get_hostapd_clients(hostapd):
137 results = results + 1
139 for key
in result[
"clients"]:
140 device = result[
"clients"][key]
141 if device[
"authorized"]:
148 """Implement the Ubus device scanning for the dnsmasq DHCP server."""
151 """Initialize the scanner."""
157 if result := self.
ubusubus.get_uci_config(
"dhcp",
"dnsmasq"):
158 values = result[
"values"].values()
159 self.
leasefileleasefile = next(iter(values))[
"leasefile"]
166 for line
in result[
"data"].splitlines():
167 hosts = line.split(
" ")
175 """Implement the Ubus device scanning for the odhcpd DHCP server."""
178 if result := self.
ubusubus.get_dhcp_method(
"ipv4leases"):
180 for device
in result[
"device"].values():
181 for lease
in device[
"leases"]:
184 mac =
":".join(mac[i : i + 2]
for i
in range(0, len(mac), 2))
def _generate_mac2name(self)
def __init__(self, config)
def _generate_mac2name(self)
def get_device_name(self, device)
def _generate_mac2name(self)
def __init__(self, config)
dict[str, str] async_get_extra_attributes(self, str device)
web.Response get(self, web.Request request, str config_key)
def _refresh_on_access_denied(func)
DeviceScanner|None get_scanner(HomeAssistant hass, ConfigType config)