Home Assistant Unofficial Reference 2024.12.1
device_tracker.py
Go to the documentation of this file.
1 """Support for fetching WiFi associations through SNMP."""
2 
3 from __future__ import annotations
4 
5 import binascii
6 import logging
7 from typing import TYPE_CHECKING
8 
9 from pysnmp.error import PySnmpError
10 from pysnmp.hlapi.asyncio import (
11  CommunityData,
12  Udp6TransportTarget,
13  UdpTransportTarget,
14  UsmUserData,
15  bulkWalkCmd,
16  isEndOfMib,
17 )
18 import voluptuous as vol
19 
21  DOMAIN as DEVICE_TRACKER_DOMAIN,
22  PLATFORM_SCHEMA as DEVICE_TRACKER_PLATFORM_SCHEMA,
23  DeviceScanner,
24 )
25 from homeassistant.const import CONF_HOST
26 from homeassistant.core import HomeAssistant
28 from homeassistant.helpers.typing import ConfigType
29 
30 from .const import (
31  CONF_AUTH_KEY,
32  CONF_BASEOID,
33  CONF_COMMUNITY,
34  CONF_PRIV_KEY,
35  DEFAULT_AUTH_PROTOCOL,
36  DEFAULT_COMMUNITY,
37  DEFAULT_PORT,
38  DEFAULT_PRIV_PROTOCOL,
39  DEFAULT_TIMEOUT,
40  DEFAULT_VERSION,
41  SNMP_VERSIONS,
42 )
43 from .util import RequestArgsType, async_create_request_cmd_args
44 
45 _LOGGER = logging.getLogger(__name__)
46 
47 PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
48  {
49  vol.Required(CONF_BASEOID): cv.string,
50  vol.Required(CONF_HOST): cv.string,
51  vol.Optional(CONF_COMMUNITY, default=DEFAULT_COMMUNITY): cv.string,
52  vol.Inclusive(CONF_AUTH_KEY, "keys"): cv.string,
53  vol.Inclusive(CONF_PRIV_KEY, "keys"): cv.string,
54  }
55 )
56 
57 
59  hass: HomeAssistant, config: ConfigType
60 ) -> SnmpScanner | None:
61  """Validate the configuration and return an SNMP scanner."""
62  scanner = SnmpScanner(config[DEVICE_TRACKER_DOMAIN])
63  await scanner.async_init(hass)
64 
65  return scanner if scanner.success_init else None
66 
67 
68 class SnmpScanner(DeviceScanner):
69  """Queries any SNMP capable Access Point for connected devices."""
70 
71  def __init__(self, config):
72  """Initialize the scanner and test the target device."""
73  host = config[CONF_HOST]
74  community = config[CONF_COMMUNITY]
75  baseoid = config[CONF_BASEOID]
76  authkey = config.get(CONF_AUTH_KEY)
77  authproto = DEFAULT_AUTH_PROTOCOL
78  privkey = config.get(CONF_PRIV_KEY)
79  privproto = DEFAULT_PRIV_PROTOCOL
80 
81  try:
82  # Try IPv4 first.
83  target = UdpTransportTarget((host, DEFAULT_PORT), timeout=DEFAULT_TIMEOUT)
84  except PySnmpError:
85  # Then try IPv6.
86  try:
87  target = Udp6TransportTarget(
88  (host, DEFAULT_PORT), timeout=DEFAULT_TIMEOUT
89  )
90  except PySnmpError as err:
91  _LOGGER.error("Invalid SNMP host: %s", err)
92  return
93 
94  if authkey is not None or privkey is not None:
95  if not authkey:
96  authproto = "none"
97  if not privkey:
98  privproto = "none"
99 
100  self._auth_data_auth_data = UsmUserData(
101  community,
102  authKey=authkey or None,
103  privKey=privkey or None,
104  authProtocol=authproto,
105  privProtocol=privproto,
106  )
107  else:
108  self._auth_data_auth_data = CommunityData(
109  community, mpModel=SNMP_VERSIONS[DEFAULT_VERSION]
110  )
111 
112  self._target_target = target
113  self.request_argsrequest_args: RequestArgsType | None = None
114  self.baseoidbaseoid = baseoid
115  self.last_resultslast_results = []
116  self.success_initsuccess_init = False
117 
118  async def async_init(self, hass: HomeAssistant) -> None:
119  """Make a one-off read to check if the target device is reachable and readable."""
120  self.request_argsrequest_args = await async_create_request_cmd_args(
121  hass, self._auth_data_auth_data, self._target_target, self.baseoidbaseoid
122  )
123  data = await self.async_get_snmp_dataasync_get_snmp_data()
124  self.success_initsuccess_init = data is not None
125 
126  async def async_scan_devices(self):
127  """Scan for new devices and return a list with found device IDs."""
128  await self._async_update_info_async_update_info()
129  return [client["mac"] for client in self.last_resultslast_results if client.get("mac")]
130 
131  async def async_get_device_name(self, device: str) -> str | None:
132  """Return the name of the given device or None if we don't know."""
133  # We have no names
134  return None
135 
136  async def _async_update_info(self):
137  """Ensure the information from the device is up to date.
138 
139  Return boolean if scanning successful.
140  """
141  if not self.success_initsuccess_init:
142  return False
143 
144  if not (data := await self.async_get_snmp_dataasync_get_snmp_data()):
145  return False
146 
147  self.last_resultslast_results = data
148  return True
149 
150  async def async_get_snmp_data(self):
151  """Fetch MAC addresses from access point via SNMP."""
152  devices = []
153  if TYPE_CHECKING:
154  assert self.request_argsrequest_args is not None
155 
156  engine, auth_data, target, context_data, object_type = self.request_argsrequest_args
157  walker = bulkWalkCmd(
158  engine,
159  auth_data,
160  target,
161  context_data,
162  0,
163  50,
164  object_type,
165  lexicographicMode=False,
166  )
167  async for errindication, errstatus, errindex, res in walker:
168  if errindication:
169  _LOGGER.error("SNMPLIB error: %s", errindication)
170  return None
171  if errstatus:
172  _LOGGER.error(
173  "SNMP error: %s at %s",
174  errstatus.prettyPrint(),
175  errindex and res[int(errindex) - 1][0] or "?",
176  )
177  return None
178 
179  for _oid, value in res:
180  if not isEndOfMib(res):
181  try:
182  mac = binascii.hexlify(value.asOctets()).decode("utf-8")
183  except AttributeError:
184  continue
185  _LOGGER.debug("Found MAC address: %s", mac)
186  mac = ":".join([mac[i : i + 2] for i in range(0, len(mac), 2)])
187  devices.append({"mac": mac})
188  return devices
SnmpScanner|None async_get_scanner(HomeAssistant hass, ConfigType config)
RequestArgsType async_create_request_cmd_args(HomeAssistant hass, UsmUserData|CommunityData auth_data, UdpTransportTarget|Udp6TransportTarget target, str object_id)
Definition: util.py:63