1 """The Nmap Tracker integration."""
3 from __future__
import annotations
6 from dataclasses
import dataclass
7 from datetime
import datetime, timedelta
8 from functools
import partial
10 from typing
import Final
13 from getmac
import get_mac_address
14 from nmap
import PortScanner, PortScannerError
19 DEFAULT_CONSIDER_HOME,
37 TRACKER_SCAN_INTERVAL,
41 NMAP_TRANSIENT_FAILURE: Final =
"Assertion failed: htn.toclock_running == true"
42 MAX_SCAN_ATTEMPTS: Final = 16
46 """Return the first part of the hostname."""
47 return hostname.split(
".")[0]
51 """Generate a human readable name."""
55 return f
"{vendor} {mac_address[-8:]}"
56 return f
"Nmap Tracker {mac_address}"
61 """Class for keeping track of an nmap tracked device."""
70 first_offline: datetime |
None
74 """Storage class for all nmap trackers."""
77 """Initialize the data."""
78 self.tracked: dict[str, NmapDevice] = {}
79 self.ipv4_last_mac: dict[str, str] = {}
80 self.config_entry_owner: dict[str, str] = {}
83 _LOGGER = logging.getLogger(__name__)
87 """Set up Nmap Tracker from a config entry."""
88 domain_data = hass.data.setdefault(DOMAIN, {})
91 await scanner.async_setup()
92 entry.async_on_unload(entry.add_update_listener(_async_update_listener))
93 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
98 """Handle options update."""
99 await hass.config_entries.async_reload(entry.entry_id)
103 """Unload a config entry."""
104 unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
108 hass.data[DOMAIN].pop(entry.entry_id)
115 """Remove tracking for devices owned by this config entry."""
116 devices = hass.data[DOMAIN][NMAP_TRACKED_DEVICES]
117 remove_mac_addresses = [
119 for mac_address, entry_id
in devices.config_entry_owner.items()
120 if entry_id == entry.entry_id
122 for mac_address
in remove_mac_addresses:
123 if device := devices.tracked.pop(mac_address,
None):
124 devices.ipv4_last_mac.pop(device.ipv4,
None)
125 del devices.config_entry_owner[mac_address]
129 """Signal specific per nmap tracker entry to signal updates in device."""
130 return f
"{DOMAIN}-device-update-{mac_address}"
134 """Scanner for devices using nmap."""
137 self, hass: HomeAssistant, entry: ConfigEntry, devices: NmapTrackedDevices
139 """Initialize the scanner."""
162 """Set up the tracker."""
163 config = self.
_entry_entry.options
165 seconds=config.get(CONF_SCAN_INTERVAL, TRACKER_SCAN_INTERVAL)
167 hosts_list = cv.ensure_list_csv(config[CONF_HOSTS])
168 self.
_hosts_hosts = [host
for host
in hosts_list
if host !=
""]
169 excludes_list = cv.ensure_list_csv(config[CONF_EXCLUDE])
170 self.
_exclude_exclude = [exclude
for exclude
in excludes_list
if exclude !=
""]
171 self.
_options_options = config[CONF_OPTIONS]
173 minutes=cv.positive_int(config[CONF_HOME_INTERVAL])
175 if config.get(CONF_CONSIDER_HOME):
177 seconds=cv.positive_float(config[CONF_CONSIDER_HOME])
180 if self.
_hass_hass.state
is CoreState.running:
184 self.
_entry_entry.async_on_unload(
185 self.
_hass_hass.bus.async_listen(
189 registry = er.async_get(self.
_hass_hass)
191 entry.unique_id: entry.original_name
192 for entry
in registry.entities.get_entries_for_config_entry_id(
199 """Signal specific per nmap tracker entry to signal new device."""
200 return f
"{DOMAIN}-device-new-{self._entry_id}"
204 """Signal specific per nmap tracker entry to signal a missing device."""
205 return f
"{DOMAIN}-device-missing-{self._entry_id}"
209 """Stop the scanner."""
213 """Start the scanner."""
215 self.
_entry_entry.async_on_unload(
222 if not aiooui.is_loaded():
223 await aiooui.async_load()
227 """Build the command line and strip out last results that do not need to be updated."""
232 device
for device
in self.
_last_results_last_results
if device.last_update > boundary
235 exclude_hosts = self.
_exclude_exclude + [device.ipv4
for device
in last_results]
237 exclude_hosts = self.
_exclude_exclude
240 exclude_hosts = self.
_exclude_exclude
242 options += f
" --exclude {','.join(exclude_hosts)}"
244 if "--reason" not in options:
245 options +=
" --reason"
247 if "-v" not in options:
253 """Scan devices and dispatch."""
256 "Nmap scanning is taking longer than the scheduled interval: %s",
257 TRACKER_SCAN_INTERVAL,
264 except PortScannerError
as ex:
265 _LOGGER.error(
"Nmap scanning failed: %s", ex)
277 if mac_address
in self.
devicesdevices.tracked:
279 self.
devicesdevices.config_entry_owner[mac_address] = self.
_entry_id_entry_id
285 aiooui.get_vendor(mac_address),
286 "Device not found in initial scan",
293 """Run nmap and return the result."""
296 self.
_scanner_scanner = PortScanner()
297 _LOGGER.debug(
"Scanning %s with args: %s", self.
_hosts_hosts, options)
298 for attempt
in range(MAX_SCAN_ATTEMPTS):
300 result = self.
_scanner_scanner.scan(
301 hosts=
" ".join(self.
_hosts_hosts),
303 timeout=TRACKER_SCAN_INTERVAL * 10,
306 except PortScannerError
as ex:
307 if attempt < (MAX_SCAN_ATTEMPTS - 1)
and NMAP_TRANSIENT_FAILURE
in str(
310 _LOGGER.debug(
"Nmap saw transient error %s", NMAP_TRANSIENT_FAILURE)
314 "Finished scanning %s with args: %s",
322 """Mark an IP offline."""
323 if not (formatted_mac := self.
devicesdevices.ipv4_last_mac.get(ipv4)):
325 if not (device := self.
devicesdevices.tracked.get(formatted_mac)):
328 if not device.first_offline:
330 "Setting first_offline for %s (%s) to: %s", ipv4, formatted_mac, now
332 device.first_offline = now
334 if device.first_offline + self.
consider_homeconsider_home > now:
337 "Device %s (%s) has NOT been offline (first offline at: %s) long"
338 " enough to be considered not home: %s"
342 device.first_offline,
348 "Device %s (%s) has been offline (first offline at: %s) long enough to"
349 " be considered not home: %s"
353 device.first_offline,
356 device.reason = reason
358 del self.
devicesdevices.ipv4_last_mac[ipv4]
361 """Scan the network for devices and dispatch events."""
369 for ipv4, info
in result[
"scan"].items():
370 status = info[
"status"]
371 reason = status[
"reason"]
372 if status[
"state"] !=
"up":
376 mac = info[
"addresses"].
get(
378 )
or await self.
_hass_hass.async_add_executor_job(
379 partial(get_mac_address, ip=ipv4)
383 _LOGGER.warning(
"No MAC address found for %s", ipv4)
388 devices.config_entry_owner.setdefault(formatted_mac, entry_id)
393 hostname = info[
"hostnames"][0][
"name"]
if info[
"hostnames"]
else ipv4
394 vendor = info.get(
"vendor", {}).
get(mac)
or aiooui.get_vendor(mac)
397 formatted_mac, hostname, name, ipv4, vendor, reason, now,
None
400 new = formatted_mac
not in devices.tracked
401 devices.tracked[formatted_mac] = device
402 devices.ipv4_last_mac[ipv4] = formatted_mac
str signal_device_new(self)
def _async_mark_missing_devices_as_not_home(self)
def _async_start_scanner(self, *_)
def _async_scan_devices(self, *_)
str signal_device_missing(self)
None _async_device_offline(self, str ipv4, str reason, datetime now)
None __init__(self, HomeAssistant hass, ConfigEntry entry, NmapTrackedDevices devices)
def _async_run_nmap_scan(self)
web.Response get(self, web.Request request, str config_key)
str signal_device_update(mac_address)
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
None _async_update_listener(HomeAssistant hass, ConfigEntry entry)
None _async_untrack_devices(HomeAssistant hass, ConfigEntry entry)
str short_hostname(str hostname)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
str human_readable_name(str hostname, str vendor, str mac_address)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)