1 """The dhcp integration."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from dataclasses
import dataclass
8 from datetime
import timedelta
9 from fnmatch
import translate
10 from functools
import lru_cache
14 from typing
import Any, Final
17 from aiodiscover
import DiscoverHosts
18 from aiodiscover.discovery
import (
19 HOSTNAME
as DISCOVERY_HOSTNAME,
20 IP_ADDRESS
as DISCOVERY_IP_ADDRESS,
21 MAC_ADDRESS
as DISCOVERY_MAC_ADDRESS,
23 from cached_ipaddress
import cached_ip_addresses
25 from homeassistant
import config_entries
31 CONNECTED_DEVICE_REGISTERED,
32 DOMAIN
as DEVICE_TRACKER_DOMAIN,
36 EVENT_HOMEASSISTANT_STARTED,
37 EVENT_HOMEASSISTANT_STOP,
42 EventStateChangedData,
49 config_validation
as cv,
50 device_registry
as dr,
57 async_track_state_added_domain,
58 async_track_time_interval,
63 from .const
import DOMAIN
65 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
67 HOSTNAME: Final =
"hostname"
68 MAC_ADDRESS: Final =
"macaddress"
69 IP_ADDRESS: Final =
"ip"
70 REGISTERED_DEVICES: Final =
"registered_devices"
74 _LOGGER = logging.getLogger(__name__)
77 @dataclass(slots=True)
79 """Prepared info from dhcp entries."""
86 @dataclass(slots=True)
88 """Prepared info from dhcp entries."""
90 registered_devices_domains: set[str]
91 no_oui_matchers: dict[str, list[DHCPMatcher]]
92 oui_matchers: dict[str, list[DHCPMatcher]]
96 integration_matchers: list[DHCPMatcher],
98 """Index the integration matchers.
100 We have three types of matchers:
102 1. Registered devices
103 2. Devices with no OUI - index by first char of lower() hostname
104 3. Devices with OUI - index by OUI
106 registered_devices_domains: set[str] = set()
107 no_oui_matchers: dict[str, list[DHCPMatcher]] = {}
108 oui_matchers: dict[str, list[DHCPMatcher]] = {}
109 for matcher
in integration_matchers:
110 domain = matcher[
"domain"]
111 if REGISTERED_DEVICES
in matcher:
112 registered_devices_domains.add(domain)
115 if mac_address := matcher.get(MAC_ADDRESS):
116 oui_matchers.setdefault(mac_address[:6], []).append(matcher)
119 if hostname := matcher.get(HOSTNAME):
120 first_char = hostname[0].lower()
121 no_oui_matchers.setdefault(first_char, []).append(matcher)
124 registered_devices_domains=registered_devices_domains,
125 no_oui_matchers=no_oui_matchers,
126 oui_matchers=oui_matchers,
130 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
131 """Set up the dhcp component."""
132 watchers: list[WatcherBase] = []
133 address_data: dict[str, dict[str, str]] = {}
139 device_watcher.async_start()
140 watchers.append(device_watcher)
143 hass, address_data, integration_matchers
145 device_tracker_registered_watcher.async_start()
146 watchers.append(device_tracker_registered_watcher)
149 await aiodhcpwatcher.async_init()
151 network_watcher =
NetworkWatcher(hass, address_data, integration_matchers)
152 network_watcher.async_start()
153 watchers.append(network_watcher)
155 dhcp_watcher =
DHCPWatcher(hass, address_data, integration_matchers)
156 await dhcp_watcher.async_start()
157 watchers.append(dhcp_watcher)
160 hass, address_data, integration_matchers
162 rediscovery_watcher.async_start()
163 watchers.append(rediscovery_watcher)
167 for watcher
in watchers:
170 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop)
172 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_initialize)
177 """Base class for dhcp and device tracker watching."""
182 address_data: dict[str, dict[str, str]],
183 integration_matchers: DhcpMatchers,
185 """Initialize class."""
191 self.
_unsub_unsub: Callable[[],
None] |
None =
None
195 """Stop scanning for new devices on the network."""
205 unformatted_mac_address: str,
208 """Process a client."""
209 if (made_ip_address := cached_ip_addresses(ip_address))
is None:
211 _LOGGER.debug(
"Ignoring invalid IP Address: %s", ip_address)
215 made_ip_address.is_link_local
216 or made_ip_address.is_loopback
217 or made_ip_address.is_unspecified
222 formatted_mac =
format_mac(unformatted_mac_address)
226 mac_address = formatted_mac.replace(
":",
"")
227 compressed_ip_address = made_ip_address.compressed
233 and data[IP_ADDRESS] == compressed_ip_address
234 and data[HOSTNAME].startswith(hostname)
240 data = {IP_ADDRESS: compressed_ip_address, HOSTNAME: hostname}
243 lowercase_hostname = hostname.lower()
244 uppercase_mac = mac_address.upper()
247 "Processing updated address data for %s: mac=%s hostname=%s",
253 matched_domains: set[str] = set()
255 registered_devices_domains = matchers.registered_devices_domains
257 dev_reg = dr.async_get(self.
hasshass)
258 if device := dev_reg.async_get_device(
259 connections={(CONNECTION_NETWORK_MAC, formatted_mac)}
261 for entry_id
in device.config_entries:
263 entry := self.
hasshass.config_entries.async_get_entry(entry_id)
264 )
and entry.domain
in registered_devices_domains:
265 matched_domains.add(entry.domain)
267 oui = uppercase_mac[:6]
268 lowercase_hostname_first_char = (
269 lowercase_hostname[0]
if len(lowercase_hostname)
else ""
271 for matcher
in itertools.chain(
272 matchers.no_oui_matchers.get(lowercase_hostname_first_char, ()),
273 matchers.oui_matchers.get(oui, ()),
275 domain = matcher[
"domain"]
277 matcher_hostname := matcher.get(HOSTNAME)
279 lowercase_hostname, matcher_hostname
283 _LOGGER.debug(
"Matched %s against %s", data, matcher)
284 matched_domains.add(domain)
286 if not matched_domains:
294 for domain
in matched_domains:
295 discovery_flow.async_create_flow(
298 {
"source": config_entries.SOURCE_DHCP},
301 hostname=lowercase_hostname,
302 macaddress=mac_address,
304 discovery_key=discovery_key,
309 """Class to query ptr records routers."""
314 address_data: dict[str, dict[str, str]],
315 integration_matchers: DhcpMatchers,
317 """Initialize class."""
318 super().
__init__(hass, address_data, integration_matchers)
324 """Stop scanning for new devices on the network."""
332 """Start scanning for new devices on the network."""
338 name=
"DHCP network watcher",
344 """Start a new discovery task if one is not running."""
348 self.
async_discoverasync_discover(), name=
"dhcp discovery", eager_start=
True
352 """Process discovery."""
356 host[DISCOVERY_IP_ADDRESS],
357 host[DISCOVERY_HOSTNAME],
358 host[DISCOVERY_MAC_ADDRESS],
363 """Class to watch dhcp data from routers."""
367 """Stop watching for new device trackers."""
371 for state
in self.
hasshass.states.async_all(DEVICE_TRACKER_DOMAIN):
376 """Process a device tracker state change event."""
381 """Process a device tracker state."""
382 if state
is None or state.state != STATE_HOME:
385 attributes = state.attributes
387 if attributes.get(ATTR_SOURCE_TYPE) != SourceType.ROUTER:
390 ip_address = attributes.get(ATTR_IP)
391 hostname = attributes.get(ATTR_HOST_NAME,
"")
392 mac_address = attributes.get(ATTR_MAC)
394 if ip_address
is None or mac_address
is None:
401 """Class to watch data from device tracker registrations."""
405 """Stop watching for device tracker registrations."""
412 """Process a device tracker state."""
413 ip_address = data[ATTR_IP]
414 hostname = data[ATTR_HOST_NAME]
or ""
415 mac_address = data[ATTR_MAC]
417 if ip_address
is None or mac_address
is None:
424 """Class to watch dhcp requests."""
428 """Process a dhcp request."""
430 response.ip_address, response.hostname, response.mac_address
434 """Start watching for dhcp packets."""
439 """Class to trigger rediscovery on config entry removal."""
444 entry: config_entries.ConfigEntry,
446 """Handle config entry changes."""
447 for discovery_key
in entry.discovery_keys[DOMAIN]:
448 if discovery_key.version != 1
or not isinstance(discovery_key.key, str):
450 mac_address = discovery_key.key
451 _LOGGER.debug(
"Rediscover service %s", mac_address)
462 """Start watching for config entry removals."""
465 config_entries.signal_discovered_config_entry_removed(DOMAIN),
470 @lru_cache(maxsize=4096, typed=True)
472 """Compile a fnmatch pattern."""
473 return re.compile(translate(pattern))
476 @lru_cache(maxsize=1024, typed=True)
478 """Memorized version of fnmatch that has a larger lru_cache.
480 The default version of fnmatch only has a lru_cache of 256 entries.
481 With many devices we quickly reach that limit and end up compiling
482 the same pattern over and over again.
484 DHCP has its own memorized fnmatch with its own lru_cache
485 since the data is going to be relatively the same
486 since the devices will not change frequently
None _async_process_dhcp_request(self, aiodhcpwatcher.DHCPRequest response)
None _async_process_device_data(self, dict[str, str|None] data)
None _async_process_device_state(self, State|None state)
None _async_process_device_event(self, Event[EventStateChangedData] event)
None async_discover(self)
None __init__(self, HomeAssistant hass, dict[str, dict[str, str]] address_data, DhcpMatchers integration_matchers)
None async_start_discover(self, *Any _)
None _handle_config_entry_removed(self, config_entries.ConfigEntry entry)
None __init__(self, HomeAssistant hass, dict[str, dict[str, str]] address_data, DhcpMatchers integration_matchers)
None async_process_client(self, str ip_address, str hostname, str unformatted_mac_address, bool force=False)
list[_T] match(self, BluetoothServiceInfoBleak service_info)
web.Response get(self, web.Request request, str config_key)
bool _memorized_fnmatch(str name, str pattern)
bool async_setup(HomeAssistant hass, ConfigType config)
DhcpMatchers async_index_integration_matchers(list[DHCPMatcher] integration_matchers)
re.Pattern _compile_fnmatch(str pattern)
None _async_stop(HomeAssistant hass, bool restart)
None _async_initialize(HomeAssistant hass, ConfigEntry entry, YeelightDevice device)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
CALLBACK_TYPE async_track_state_added_domain(HomeAssistant hass, str|Iterable[str] domains, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
list[DHCPMatcher] async_get_dhcp(HomeAssistant hass)