Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The dhcp integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from dataclasses import dataclass
8 from datetime import timedelta
9 from fnmatch import translate
10 from functools import lru_cache
11 import itertools
12 import logging
13 import re
14 from typing import Any, Final
15 
16 import aiodhcpwatcher
17 from aiodiscover import DiscoverHosts
18 from aiodiscover.discovery import (
19  HOSTNAME as DISCOVERY_HOSTNAME,
20  IP_ADDRESS as DISCOVERY_IP_ADDRESS,
21  MAC_ADDRESS as DISCOVERY_MAC_ADDRESS,
22 )
23 from cached_ipaddress import cached_ip_addresses
24 
25 from homeassistant import config_entries
27  ATTR_HOST_NAME,
28  ATTR_IP,
29  ATTR_MAC,
30  ATTR_SOURCE_TYPE,
31  CONNECTED_DEVICE_REGISTERED,
32  DOMAIN as DEVICE_TRACKER_DOMAIN,
33  SourceType,
34 )
35 from homeassistant.const import (
36  EVENT_HOMEASSISTANT_STARTED,
37  EVENT_HOMEASSISTANT_STOP,
38  STATE_HOME,
39 )
40 from homeassistant.core import (
41  Event,
42  EventStateChangedData,
43  HomeAssistant,
44  State,
45  callback,
46 )
47 from homeassistant.data_entry_flow import BaseServiceInfo
48 from homeassistant.helpers import (
49  config_validation as cv,
50  device_registry as dr,
51  discovery_flow,
52 )
53 from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC, format_mac
54 from homeassistant.helpers.discovery_flow import DiscoveryKey
55 from homeassistant.helpers.dispatcher import async_dispatcher_connect
56 from homeassistant.helpers.event import (
57  async_track_state_added_domain,
58  async_track_time_interval,
59 )
60 from homeassistant.helpers.typing import ConfigType
61 from homeassistant.loader import DHCPMatcher, async_get_dhcp
62 
63 from .const import DOMAIN
64 
65 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
66 
67 HOSTNAME: Final = "hostname"
68 MAC_ADDRESS: Final = "macaddress"
69 IP_ADDRESS: Final = "ip"
70 REGISTERED_DEVICES: Final = "registered_devices"
71 SCAN_INTERVAL = timedelta(minutes=60)
72 
73 
74 _LOGGER = logging.getLogger(__name__)
75 
76 
77 @dataclass(slots=True)
79  """Prepared info from dhcp entries."""
80 
81  ip: str
82  hostname: str
83  macaddress: str
84 
85 
86 @dataclass(slots=True)
88  """Prepared info from dhcp entries."""
89 
90  registered_devices_domains: set[str]
91  no_oui_matchers: dict[str, list[DHCPMatcher]]
92  oui_matchers: dict[str, list[DHCPMatcher]]
93 
94 
96  integration_matchers: list[DHCPMatcher],
97 ) -> DhcpMatchers:
98  """Index the integration matchers.
99 
100  We have three types of matchers:
101 
102  1. Registered devices
103  2. Devices with no OUI - index by first char of lower() hostname
104  3. Devices with OUI - index by OUI
105  """
106  registered_devices_domains: set[str] = set()
107  no_oui_matchers: dict[str, list[DHCPMatcher]] = {}
108  oui_matchers: dict[str, list[DHCPMatcher]] = {}
109  for matcher in integration_matchers:
110  domain = matcher["domain"]
111  if REGISTERED_DEVICES in matcher:
112  registered_devices_domains.add(domain)
113  continue
114 
115  if mac_address := matcher.get(MAC_ADDRESS):
116  oui_matchers.setdefault(mac_address[:6], []).append(matcher)
117  continue
118 
119  if hostname := matcher.get(HOSTNAME):
120  first_char = hostname[0].lower()
121  no_oui_matchers.setdefault(first_char, []).append(matcher)
122 
123  return DhcpMatchers(
124  registered_devices_domains=registered_devices_domains,
125  no_oui_matchers=no_oui_matchers,
126  oui_matchers=oui_matchers,
127  )
128 
129 
130 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
131  """Set up the dhcp component."""
132  watchers: list[WatcherBase] = []
133  address_data: dict[str, dict[str, str]] = {}
134  integration_matchers = async_index_integration_matchers(await async_get_dhcp(hass))
135  # For the passive classes we need to start listening
136  # for state changes and connect the dispatchers before
137  # everything else starts up or we will miss events
138  device_watcher = DeviceTrackerWatcher(hass, address_data, integration_matchers)
139  device_watcher.async_start()
140  watchers.append(device_watcher)
141 
142  device_tracker_registered_watcher = DeviceTrackerRegisteredWatcher(
143  hass, address_data, integration_matchers
144  )
145  device_tracker_registered_watcher.async_start()
146  watchers.append(device_tracker_registered_watcher)
147 
148  async def _async_initialize(event: Event) -> None:
149  await aiodhcpwatcher.async_init()
150 
151  network_watcher = NetworkWatcher(hass, address_data, integration_matchers)
152  network_watcher.async_start()
153  watchers.append(network_watcher)
154 
155  dhcp_watcher = DHCPWatcher(hass, address_data, integration_matchers)
156  await dhcp_watcher.async_start()
157  watchers.append(dhcp_watcher)
158 
159  rediscovery_watcher = RediscoveryWatcher(
160  hass, address_data, integration_matchers
161  )
162  rediscovery_watcher.async_start()
163  watchers.append(rediscovery_watcher)
164 
165  @callback
166  def _async_stop(event: Event) -> None:
167  for watcher in watchers:
168  watcher.async_stop()
169 
170  hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop)
171 
172  hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_initialize)
173  return True
174 
175 
177  """Base class for dhcp and device tracker watching."""
178 
179  def __init__(
180  self,
181  hass: HomeAssistant,
182  address_data: dict[str, dict[str, str]],
183  integration_matchers: DhcpMatchers,
184  ) -> None:
185  """Initialize class."""
186  super().__init__()
187 
188  self.hasshass = hass
189  self._integration_matchers_integration_matchers = integration_matchers
190  self._address_data_address_data = address_data
191  self._unsub_unsub: Callable[[], None] | None = None
192 
193  @callback
194  def async_stop(self) -> None:
195  """Stop scanning for new devices on the network."""
196  if self._unsub_unsub:
197  self._unsub_unsub()
198  self._unsub_unsub = None
199 
200  @callback
202  self,
203  ip_address: str,
204  hostname: str,
205  unformatted_mac_address: str,
206  force: bool = False,
207  ) -> None:
208  """Process a client."""
209  if (made_ip_address := cached_ip_addresses(ip_address)) is None:
210  # Ignore invalid addresses
211  _LOGGER.debug("Ignoring invalid IP Address: %s", ip_address)
212  return
213 
214  if (
215  made_ip_address.is_link_local
216  or made_ip_address.is_loopback
217  or made_ip_address.is_unspecified
218  ):
219  # Ignore self assigned addresses, loopback, invalid
220  return
221 
222  formatted_mac = format_mac(unformatted_mac_address)
223  # Historically, the MAC address was formatted without colons
224  # and since all consumers of this data are expecting it to be
225  # formatted without colons we will continue to do so
226  mac_address = formatted_mac.replace(":", "")
227  compressed_ip_address = made_ip_address.compressed
228 
229  data = self._address_data_address_data.get(mac_address)
230  if (
231  not force
232  and data
233  and data[IP_ADDRESS] == compressed_ip_address
234  and data[HOSTNAME].startswith(hostname)
235  ):
236  # If the address data is the same no need
237  # to process it
238  return
239 
240  data = {IP_ADDRESS: compressed_ip_address, HOSTNAME: hostname}
241  self._address_data_address_data[mac_address] = data
242 
243  lowercase_hostname = hostname.lower()
244  uppercase_mac = mac_address.upper()
245 
246  _LOGGER.debug(
247  "Processing updated address data for %s: mac=%s hostname=%s",
248  ip_address,
249  uppercase_mac,
250  lowercase_hostname,
251  )
252 
253  matched_domains: set[str] = set()
254  matchers = self._integration_matchers_integration_matchers
255  registered_devices_domains = matchers.registered_devices_domains
256 
257  dev_reg = dr.async_get(self.hasshass)
258  if device := dev_reg.async_get_device(
259  connections={(CONNECTION_NETWORK_MAC, formatted_mac)}
260  ):
261  for entry_id in device.config_entries:
262  if (
263  entry := self.hasshass.config_entries.async_get_entry(entry_id)
264  ) and entry.domain in registered_devices_domains:
265  matched_domains.add(entry.domain)
266 
267  oui = uppercase_mac[:6]
268  lowercase_hostname_first_char = (
269  lowercase_hostname[0] if len(lowercase_hostname) else ""
270  )
271  for matcher in itertools.chain(
272  matchers.no_oui_matchers.get(lowercase_hostname_first_char, ()),
273  matchers.oui_matchers.get(oui, ()),
274  ):
275  domain = matcher["domain"]
276  if (
277  matcher_hostname := matcher.get(HOSTNAME)
278  ) is not None and not _memorized_fnmatch(
279  lowercase_hostname, matcher_hostname
280  ):
281  continue
282 
283  _LOGGER.debug("Matched %s against %s", data, matcher)
284  matched_domains.add(domain)
285 
286  if not matched_domains:
287  return # avoid creating DiscoveryKey if there are no matches
288 
289  discovery_key = DiscoveryKey(
290  domain=DOMAIN,
291  key=mac_address,
292  version=1,
293  )
294  for domain in matched_domains:
295  discovery_flow.async_create_flow(
296  self.hasshass,
297  domain,
298  {"source": config_entries.SOURCE_DHCP},
300  ip=ip_address,
301  hostname=lowercase_hostname,
302  macaddress=mac_address,
303  ),
304  discovery_key=discovery_key,
305  )
306 
307 
309  """Class to query ptr records routers."""
310 
311  def __init__(
312  self,
313  hass: HomeAssistant,
314  address_data: dict[str, dict[str, str]],
315  integration_matchers: DhcpMatchers,
316  ) -> None:
317  """Initialize class."""
318  super().__init__(hass, address_data, integration_matchers)
319  self._discover_hosts_discover_hosts: DiscoverHosts | None = None
320  self._discover_task_discover_task: asyncio.Task | None = None
321 
322  @callback
323  def async_stop(self) -> None:
324  """Stop scanning for new devices on the network."""
325  super().async_stop()
326  if self._discover_task_discover_task:
327  self._discover_task_discover_task.cancel()
328  self._discover_task_discover_task = None
329 
330  @callback
331  def async_start(self) -> None:
332  """Start scanning for new devices on the network."""
333  self._discover_hosts_discover_hosts = DiscoverHosts()
335  self.hasshass,
336  self.async_start_discoverasync_start_discover,
337  SCAN_INTERVAL,
338  name="DHCP network watcher",
339  )
340  self.async_start_discoverasync_start_discover()
341 
342  @callback
343  def async_start_discover(self, *_: Any) -> None:
344  """Start a new discovery task if one is not running."""
345  if self._discover_task_discover_task and not self._discover_task_discover_task.done():
346  return
347  self._discover_task_discover_task = self.hasshass.async_create_background_task(
348  self.async_discoverasync_discover(), name="dhcp discovery", eager_start=True
349  )
350 
351  async def async_discover(self) -> None:
352  """Process discovery."""
353  assert self._discover_hosts_discover_hosts is not None
354  for host in await self._discover_hosts_discover_hosts.async_discover():
355  self.async_process_clientasync_process_client(
356  host[DISCOVERY_IP_ADDRESS],
357  host[DISCOVERY_HOSTNAME],
358  host[DISCOVERY_MAC_ADDRESS],
359  )
360 
361 
363  """Class to watch dhcp data from routers."""
364 
365  @callback
366  def async_start(self) -> None:
367  """Stop watching for new device trackers."""
369  self.hasshass, [DEVICE_TRACKER_DOMAIN], self._async_process_device_event_async_process_device_event
370  )
371  for state in self.hasshass.states.async_all(DEVICE_TRACKER_DOMAIN):
372  self._async_process_device_state_async_process_device_state(state)
373 
374  @callback
375  def _async_process_device_event(self, event: Event[EventStateChangedData]) -> None:
376  """Process a device tracker state change event."""
377  self._async_process_device_state_async_process_device_state(event.data["new_state"])
378 
379  @callback
380  def _async_process_device_state(self, state: State | None) -> None:
381  """Process a device tracker state."""
382  if state is None or state.state != STATE_HOME:
383  return
384 
385  attributes = state.attributes
386 
387  if attributes.get(ATTR_SOURCE_TYPE) != SourceType.ROUTER:
388  return
389 
390  ip_address = attributes.get(ATTR_IP)
391  hostname = attributes.get(ATTR_HOST_NAME, "")
392  mac_address = attributes.get(ATTR_MAC)
393 
394  if ip_address is None or mac_address is None:
395  return
396 
397  self.async_process_clientasync_process_client(ip_address, hostname, mac_address)
398 
399 
401  """Class to watch data from device tracker registrations."""
402 
403  @callback
404  def async_start(self) -> None:
405  """Stop watching for device tracker registrations."""
407  self.hasshass, CONNECTED_DEVICE_REGISTERED, self._async_process_device_data_async_process_device_data
408  )
409 
410  @callback
411  def _async_process_device_data(self, data: dict[str, str | None]) -> None:
412  """Process a device tracker state."""
413  ip_address = data[ATTR_IP]
414  hostname = data[ATTR_HOST_NAME] or ""
415  mac_address = data[ATTR_MAC]
416 
417  if ip_address is None or mac_address is None:
418  return
419 
420  self.async_process_clientasync_process_client(ip_address, hostname, mac_address)
421 
422 
424  """Class to watch dhcp requests."""
425 
426  @callback
427  def _async_process_dhcp_request(self, response: aiodhcpwatcher.DHCPRequest) -> None:
428  """Process a dhcp request."""
429  self.async_process_clientasync_process_client(
430  response.ip_address, response.hostname, response.mac_address
431  )
432 
433  async def async_start(self) -> None:
434  """Start watching for dhcp packets."""
435  self._unsub_unsub_unsub = await aiodhcpwatcher.async_start(self._async_process_dhcp_request_async_process_dhcp_request)
436 
437 
439  """Class to trigger rediscovery on config entry removal."""
440 
441  @callback
443  self,
444  entry: config_entries.ConfigEntry,
445  ) -> None:
446  """Handle config entry changes."""
447  for discovery_key in entry.discovery_keys[DOMAIN]:
448  if discovery_key.version != 1 or not isinstance(discovery_key.key, str):
449  continue
450  mac_address = discovery_key.key
451  _LOGGER.debug("Rediscover service %s", mac_address)
452  if data := self._address_data_address_data.get(mac_address):
453  self.async_process_clientasync_process_client(
454  data[IP_ADDRESS],
455  data[HOSTNAME],
456  mac_address,
457  True, # Force rediscovery
458  )
459 
460  @callback
461  def async_start(self) -> None:
462  """Start watching for config entry removals."""
464  self.hasshass,
465  config_entries.signal_discovered_config_entry_removed(DOMAIN),
466  self._handle_config_entry_removed_handle_config_entry_removed,
467  )
468 
469 
470 @lru_cache(maxsize=4096, typed=True)
471 def _compile_fnmatch(pattern: str) -> re.Pattern:
472  """Compile a fnmatch pattern."""
473  return re.compile(translate(pattern))
474 
475 
476 @lru_cache(maxsize=1024, typed=True)
477 def _memorized_fnmatch(name: str, pattern: str) -> bool:
478  """Memorized version of fnmatch that has a larger lru_cache.
479 
480  The default version of fnmatch only has a lru_cache of 256 entries.
481  With many devices we quickly reach that limit and end up compiling
482  the same pattern over and over again.
483 
484  DHCP has its own memorized fnmatch with its own lru_cache
485  since the data is going to be relatively the same
486  since the devices will not change frequently
487  """
488  return bool(_compile_fnmatch(pattern).match(name))
None _async_process_dhcp_request(self, aiodhcpwatcher.DHCPRequest response)
Definition: __init__.py:427
None _async_process_device_data(self, dict[str, str|None] data)
Definition: __init__.py:411
None _async_process_device_state(self, State|None state)
Definition: __init__.py:380
None _async_process_device_event(self, Event[EventStateChangedData] event)
Definition: __init__.py:375
None __init__(self, HomeAssistant hass, dict[str, dict[str, str]] address_data, DhcpMatchers integration_matchers)
Definition: __init__.py:316
None _handle_config_entry_removed(self, config_entries.ConfigEntry entry)
Definition: __init__.py:445
None __init__(self, HomeAssistant hass, dict[str, dict[str, str]] address_data, DhcpMatchers integration_matchers)
Definition: __init__.py:184
None async_process_client(self, str ip_address, str hostname, str unformatted_mac_address, bool force=False)
Definition: __init__.py:207
list[_T] match(self, BluetoothServiceInfoBleak service_info)
Definition: match.py:246
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool _memorized_fnmatch(str name, str pattern)
Definition: __init__.py:477
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:130
DhcpMatchers async_index_integration_matchers(list[DHCPMatcher] integration_matchers)
Definition: __init__.py:97
re.Pattern _compile_fnmatch(str pattern)
Definition: __init__.py:471
None _async_stop(HomeAssistant hass, bool restart)
Definition: __init__.py:392
None _async_initialize(HomeAssistant hass, ConfigEntry entry, YeelightDevice device)
Definition: __init__.py:146
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
CALLBACK_TYPE async_track_state_added_domain(HomeAssistant hass, str|Iterable[str] domains, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
Definition: event.py:648
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679
list[DHCPMatcher] async_get_dhcp(HomeAssistant hass)
Definition: loader.py:531