Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Nmap Tracker integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from dataclasses import dataclass
7 from datetime import datetime, timedelta
8 from functools import partial
9 import logging
10 from typing import Final
11 
12 import aiooui
13 from getmac import get_mac_address
14 from nmap import PortScanner, PortScannerError
15 
17  CONF_CONSIDER_HOME,
18  CONF_SCAN_INTERVAL,
19  DEFAULT_CONSIDER_HOME,
20 )
21 from homeassistant.config_entries import ConfigEntry
22 from homeassistant.const import CONF_EXCLUDE, CONF_HOSTS, EVENT_HOMEASSISTANT_STARTED
23 from homeassistant.core import CoreState, HomeAssistant, callback
24 from homeassistant.helpers import entity_registry as er
26 from homeassistant.helpers.device_registry import format_mac
27 from homeassistant.helpers.dispatcher import async_dispatcher_send
28 from homeassistant.helpers.event import async_track_time_interval
29 import homeassistant.util.dt as dt_util
30 
31 from .const import (
32  CONF_HOME_INTERVAL,
33  CONF_OPTIONS,
34  DOMAIN,
35  NMAP_TRACKED_DEVICES,
36  PLATFORMS,
37  TRACKER_SCAN_INTERVAL,
38 )
39 
40 # Some version of nmap will fail with 'Assertion failed: htn.toclock_running == true (Target.cc: stopTimeOutClock: 503)\n'
41 NMAP_TRANSIENT_FAILURE: Final = "Assertion failed: htn.toclock_running == true"
42 MAX_SCAN_ATTEMPTS: Final = 16
43 
44 
45 def short_hostname(hostname: str) -> str:
46  """Return the first part of the hostname."""
47  return hostname.split(".")[0]
48 
49 
50 def human_readable_name(hostname: str, vendor: str, mac_address: str) -> str:
51  """Generate a human readable name."""
52  if hostname:
53  return short_hostname(hostname)
54  if vendor:
55  return f"{vendor} {mac_address[-8:]}"
56  return f"Nmap Tracker {mac_address}"
57 
58 
59 @dataclass
60 class NmapDevice:
61  """Class for keeping track of an nmap tracked device."""
62 
63  mac_address: str
64  hostname: str
65  name: str
66  ipv4: str
67  manufacturer: str
68  reason: str
69  last_update: datetime
70  first_offline: datetime | None
71 
72 
74  """Storage class for all nmap trackers."""
75 
76  def __init__(self) -> None:
77  """Initialize the data."""
78  self.tracked: dict[str, NmapDevice] = {}
79  self.ipv4_last_mac: dict[str, str] = {}
80  self.config_entry_owner: dict[str, str] = {}
81 
82 
83 _LOGGER = logging.getLogger(__name__)
84 
85 
86 async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
87  """Set up Nmap Tracker from a config entry."""
88  domain_data = hass.data.setdefault(DOMAIN, {})
89  devices = domain_data.setdefault(NMAP_TRACKED_DEVICES, NmapTrackedDevices())
90  scanner = domain_data[entry.entry_id] = NmapDeviceScanner(hass, entry, devices)
91  await scanner.async_setup()
92  entry.async_on_unload(entry.add_update_listener(_async_update_listener))
93  await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
94  return True
95 
96 
97 async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
98  """Handle options update."""
99  await hass.config_entries.async_reload(entry.entry_id)
100 
101 
102 async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
103  """Unload a config entry."""
104  unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
105 
106  if unload_ok:
107  _async_untrack_devices(hass, entry)
108  hass.data[DOMAIN].pop(entry.entry_id)
109 
110  return unload_ok
111 
112 
113 @callback
114 def _async_untrack_devices(hass: HomeAssistant, entry: ConfigEntry) -> None:
115  """Remove tracking for devices owned by this config entry."""
116  devices = hass.data[DOMAIN][NMAP_TRACKED_DEVICES]
117  remove_mac_addresses = [
118  mac_address
119  for mac_address, entry_id in devices.config_entry_owner.items()
120  if entry_id == entry.entry_id
121  ]
122  for mac_address in remove_mac_addresses:
123  if device := devices.tracked.pop(mac_address, None):
124  devices.ipv4_last_mac.pop(device.ipv4, None)
125  del devices.config_entry_owner[mac_address]
126 
127 
128 def signal_device_update(mac_address) -> str:
129  """Signal specific per nmap tracker entry to signal updates in device."""
130  return f"{DOMAIN}-device-update-{mac_address}"
131 
132 
134  """Scanner for devices using nmap."""
135 
136  def __init__(
137  self, hass: HomeAssistant, entry: ConfigEntry, devices: NmapTrackedDevices
138  ) -> None:
139  """Initialize the scanner."""
140  self.devicesdevices = devices
141  self.home_intervalhome_interval = None
142  self.consider_homeconsider_home = DEFAULT_CONSIDER_HOME
143 
144  self._hass_hass = hass
145  self._entry_entry = entry
146 
147  self._scan_lock_scan_lock = None
148  self._stopping_stopping = False
149  self._scanner_scanner = None
150 
151  self._entry_id_entry_id = entry.entry_id
152  self._hosts_hosts = None
153  self._options_options = None
154  self._exclude_exclude = None
155  self._scan_interval_scan_interval = None
156 
157  self._known_mac_addresses_known_mac_addresses: dict[str, str] = {}
158  self._finished_first_scan_finished_first_scan = False
159  self._last_results_last_results: list[NmapDevice] = []
160 
161  async def async_setup(self):
162  """Set up the tracker."""
163  config = self._entry_entry.options
164  self._scan_interval_scan_interval = timedelta(
165  seconds=config.get(CONF_SCAN_INTERVAL, TRACKER_SCAN_INTERVAL)
166  )
167  hosts_list = cv.ensure_list_csv(config[CONF_HOSTS])
168  self._hosts_hosts = [host for host in hosts_list if host != ""]
169  excludes_list = cv.ensure_list_csv(config[CONF_EXCLUDE])
170  self._exclude_exclude = [exclude for exclude in excludes_list if exclude != ""]
171  self._options_options = config[CONF_OPTIONS]
172  self.home_intervalhome_interval = timedelta(
173  minutes=cv.positive_int(config[CONF_HOME_INTERVAL])
174  )
175  if config.get(CONF_CONSIDER_HOME):
176  self.consider_homeconsider_home = timedelta(
177  seconds=cv.positive_float(config[CONF_CONSIDER_HOME])
178  )
179  self._scan_lock_scan_lock = asyncio.Lock()
180  if self._hass_hass.state is CoreState.running:
181  await self._async_start_scanner_async_start_scanner()
182  return
183 
184  self._entry_entry.async_on_unload(
185  self._hass_hass.bus.async_listen(
186  EVENT_HOMEASSISTANT_STARTED, self._async_start_scanner_async_start_scanner
187  )
188  )
189  registry = er.async_get(self._hass_hass)
190  self._known_mac_addresses_known_mac_addresses = {
191  entry.unique_id: entry.original_name
192  for entry in registry.entities.get_entries_for_config_entry_id(
193  self._entry_id_entry_id
194  )
195  }
196 
197  @property
198  def signal_device_new(self) -> str:
199  """Signal specific per nmap tracker entry to signal new device."""
200  return f"{DOMAIN}-device-new-{self._entry_id}"
201 
202  @property
203  def signal_device_missing(self) -> str:
204  """Signal specific per nmap tracker entry to signal a missing device."""
205  return f"{DOMAIN}-device-missing-{self._entry_id}"
206 
207  @callback
208  def _async_stop(self):
209  """Stop the scanner."""
210  self._stopping_stopping = True
211 
212  async def _async_start_scanner(self, *_):
213  """Start the scanner."""
214  self._entry_entry.async_on_unload(self._async_stop_async_stop)
215  self._entry_entry.async_on_unload(
217  self._hass_hass,
218  self._async_scan_devices_async_scan_devices,
219  self._scan_interval_scan_interval,
220  )
221  )
222  if not aiooui.is_loaded():
223  await aiooui.async_load()
224  self._hass_hass.async_create_task(self._async_scan_devices_async_scan_devices())
225 
226  def _build_options(self):
227  """Build the command line and strip out last results that do not need to be updated."""
228  options = self._options_options
229  if self.home_intervalhome_interval:
230  boundary = dt_util.now() - self.home_intervalhome_interval
231  last_results = [
232  device for device in self._last_results_last_results if device.last_update > boundary
233  ]
234  if last_results:
235  exclude_hosts = self._exclude_exclude + [device.ipv4 for device in last_results]
236  else:
237  exclude_hosts = self._exclude_exclude
238  else:
239  last_results = []
240  exclude_hosts = self._exclude_exclude
241  if exclude_hosts:
242  options += f" --exclude {','.join(exclude_hosts)}"
243  # Report reason
244  if "--reason" not in options:
245  options += " --reason"
246  # Report down hosts
247  if "-v" not in options:
248  options += " -v"
249  self._last_results_last_results = last_results
250  return options
251 
252  async def _async_scan_devices(self, *_):
253  """Scan devices and dispatch."""
254  if self._scan_lock_scan_lock.locked():
255  _LOGGER.debug(
256  "Nmap scanning is taking longer than the scheduled interval: %s",
257  TRACKER_SCAN_INTERVAL,
258  )
259  return
260 
261  async with self._scan_lock_scan_lock:
262  try:
263  await self._async_run_nmap_scan_async_run_nmap_scan()
264  except PortScannerError as ex:
265  _LOGGER.error("Nmap scanning failed: %s", ex)
266 
267  if not self._finished_first_scan_finished_first_scan:
268  self._finished_first_scan_finished_first_scan = True
269  await self._async_mark_missing_devices_as_not_home_async_mark_missing_devices_as_not_home()
270 
272  # After all config entries have finished their first
273  # scan we mark devices that were not found as not_home
274  # from unavailable
275  now = dt_util.now()
276  for mac_address, original_name in self._known_mac_addresses_known_mac_addresses.items():
277  if mac_address in self.devicesdevices.tracked:
278  continue
279  self.devicesdevices.config_entry_owner[mac_address] = self._entry_id_entry_id
280  self.devicesdevices.tracked[mac_address] = NmapDevice(
281  mac_address,
282  None,
283  original_name,
284  None,
285  aiooui.get_vendor(mac_address),
286  "Device not found in initial scan",
287  now,
288  1,
289  )
290  async_dispatcher_send(self._hass_hass, self.signal_device_missingsignal_device_missing, mac_address)
291 
292  def _run_nmap_scan(self):
293  """Run nmap and return the result."""
294  options = self._build_options_build_options()
295  if not self._scanner_scanner:
296  self._scanner_scanner = PortScanner()
297  _LOGGER.debug("Scanning %s with args: %s", self._hosts_hosts, options)
298  for attempt in range(MAX_SCAN_ATTEMPTS):
299  try:
300  result = self._scanner_scanner.scan(
301  hosts=" ".join(self._hosts_hosts),
302  arguments=options,
303  timeout=TRACKER_SCAN_INTERVAL * 10,
304  )
305  break
306  except PortScannerError as ex:
307  if attempt < (MAX_SCAN_ATTEMPTS - 1) and NMAP_TRANSIENT_FAILURE in str(
308  ex
309  ):
310  _LOGGER.debug("Nmap saw transient error %s", NMAP_TRANSIENT_FAILURE)
311  continue
312  raise
313  _LOGGER.debug(
314  "Finished scanning %s with args: %s",
315  self._hosts_hosts,
316  options,
317  )
318  return result
319 
320  @callback
321  def _async_device_offline(self, ipv4: str, reason: str, now: datetime) -> None:
322  """Mark an IP offline."""
323  if not (formatted_mac := self.devicesdevices.ipv4_last_mac.get(ipv4)):
324  return
325  if not (device := self.devicesdevices.tracked.get(formatted_mac)):
326  # Device was unloaded
327  return
328  if not device.first_offline:
329  _LOGGER.debug(
330  "Setting first_offline for %s (%s) to: %s", ipv4, formatted_mac, now
331  )
332  device.first_offline = now
333  return
334  if device.first_offline + self.consider_homeconsider_home > now:
335  _LOGGER.debug(
336  (
337  "Device %s (%s) has NOT been offline (first offline at: %s) long"
338  " enough to be considered not home: %s"
339  ),
340  ipv4,
341  formatted_mac,
342  device.first_offline,
343  self.consider_homeconsider_home,
344  )
345  return
346  _LOGGER.debug(
347  (
348  "Device %s (%s) has been offline (first offline at: %s) long enough to"
349  " be considered not home: %s"
350  ),
351  ipv4,
352  formatted_mac,
353  device.first_offline,
354  self.consider_homeconsider_home,
355  )
356  device.reason = reason
357  async_dispatcher_send(self._hass_hass, signal_device_update(formatted_mac), False)
358  del self.devicesdevices.ipv4_last_mac[ipv4]
359 
360  async def _async_run_nmap_scan(self):
361  """Scan the network for devices and dispatch events."""
362  result = await self._hass_hass.async_add_executor_job(self._run_nmap_scan_run_nmap_scan)
363  if self._stopping_stopping:
364  return
365 
366  devices = self.devicesdevices
367  entry_id = self._entry_id_entry_id
368  now = dt_util.now()
369  for ipv4, info in result["scan"].items():
370  status = info["status"]
371  reason = status["reason"]
372  if status["state"] != "up":
373  self._async_device_offline_async_device_offline(ipv4, reason, now)
374  continue
375  # Mac address only returned if nmap ran as root
376  mac = info["addresses"].get(
377  "mac"
378  ) or await self._hass_hass.async_add_executor_job(
379  partial(get_mac_address, ip=ipv4)
380  )
381  if mac is None:
382  self._async_device_offline_async_device_offline(ipv4, "No MAC address found", now)
383  _LOGGER.warning("No MAC address found for %s", ipv4)
384  continue
385 
386  formatted_mac = format_mac(mac)
387  if (
388  devices.config_entry_owner.setdefault(formatted_mac, entry_id)
389  != entry_id
390  ):
391  continue
392 
393  hostname = info["hostnames"][0]["name"] if info["hostnames"] else ipv4
394  vendor = info.get("vendor", {}).get(mac) or aiooui.get_vendor(mac)
395  name = human_readable_name(hostname, vendor, mac)
396  device = NmapDevice(
397  formatted_mac, hostname, name, ipv4, vendor, reason, now, None
398  )
399 
400  new = formatted_mac not in devices.tracked
401  devices.tracked[formatted_mac] = device
402  devices.ipv4_last_mac[ipv4] = formatted_mac
403  self._last_results_last_results.append(device)
404 
405  if new:
406  async_dispatcher_send(self._hass_hass, self.signal_device_newsignal_device_new, formatted_mac)
407  else:
409  self._hass_hass, signal_device_update(formatted_mac), True
410  )
None _async_device_offline(self, str ipv4, str reason, datetime now)
Definition: __init__.py:321
None __init__(self, HomeAssistant hass, ConfigEntry entry, NmapTrackedDevices devices)
Definition: __init__.py:138
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
str signal_device_update(mac_address)
Definition: __init__.py:128
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:102
None _async_update_listener(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:97
None _async_untrack_devices(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:114
str short_hostname(str hostname)
Definition: __init__.py:45
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:86
str human_readable_name(str hostname, str vendor, str mac_address)
Definition: __init__.py:50
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679