Home Assistant Unofficial Reference 2024.12.1
device_tracker.py
Go to the documentation of this file.
1 """Support for Aruba Access Points."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import re
7 from typing import Any
8 
9 import pexpect
10 import voluptuous as vol
11 
13  DOMAIN as DEVICE_TRACKER_DOMAIN,
14  PLATFORM_SCHEMA as DEVICE_TRACKER_PLATFORM_SCHEMA,
15  DeviceScanner,
16 )
17 from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
18 from homeassistant.core import HomeAssistant
20 from homeassistant.helpers.typing import ConfigType
21 
22 _LOGGER = logging.getLogger(__name__)
23 
# Matches one client row in the access point's "show clients" output.
# Three whitespace-separated columns are captured:
#   name - optional run of non-whitespace characters (may be empty)
#   ip   - dotted-quad IPv4 address (digit groups are not range-checked)
#   mac  - six lowercase-hex octets separated by ":" or "-"
# The pattern requires whitespace after the MAC column as well.
_DEVICES_REGEX = re.compile(
    r"(?P<name>([^\s]+)?)\s+"
    r"(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s+"
    r"(?P<mac>([0-9a-f]{2}[:-]){5}([0-9a-f]{2}))\s+"
)
29 
# Platform configuration: host, password and username are all mandatory
# strings, added on top of the base device_tracker platform schema.
PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
    {
        vol.Required(option): cv.string
        for option in (CONF_HOST, CONF_PASSWORD, CONF_USERNAME)
    }
)
37 
38 
def get_scanner(hass: HomeAssistant, config: ConfigType) -> ArubaDeviceScanner | None:
    """Create an Aruba scanner from the device_tracker platform config.

    The scanner queries the access point once while it is constructed.
    It is returned only if that first query succeeded; otherwise None is
    returned.
    """
    aruba_scanner = ArubaDeviceScanner(config[DEVICE_TRACKER_DOMAIN])
    if not aruba_scanner.success_init:
        return None
    return aruba_scanner
44 
45 
class ArubaDeviceScanner(DeviceScanner):
    """Device scanner that queries an Aruba Access Point for its clients.

    Each poll opens an SSH session through ``pexpect``, runs
    ``show clients`` and parses the output with ``_DEVICES_REGEX``.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """Store the connection settings and probe the access point once.

        ``success_init`` records whether that first query returned data.
        """
        self.host: str = config[CONF_HOST]
        self.username: str = config[CONF_USERNAME]
        self.password: str = config[CONF_PASSWORD]

        # Result of the most recent successful poll, keyed by client IP.
        self.last_results: dict[str, dict[str, str]] = {}

        # Test the router is accessible.
        data = self.get_aruba_data()
        self.success_init = data is not None

    def scan_devices(self) -> list[str]:
        """Scan for new devices and return a list with found device IDs.

        The IDs are the upper-cased MAC addresses from the latest results.
        """
        self._update_info()
        return [client["mac"] for client in self.last_results.values()]

    def get_device_name(self, device: str) -> str | None:
        """Return the name of the given device or None if we don't know.

        ``device`` is compared against the stored (upper-cased) MAC.
        """
        return next(
            (
                client["name"]
                for client in self.last_results.values()
                if client["mac"] == device
            ),
            None,
        )

    def _update_info(self) -> bool:
        """Ensure the information from the Aruba Access Point is up to date.

        Return boolean if scanning successful.
        """
        if not self.success_init:
            return False

        # An empty result ({}) is falsy too, so a poll that finds no clients
        # keeps the previous results.
        # NOTE(review): confirm that keeping stale results here is intended.
        if not (data := self.get_aruba_data()):
            return False

        self.last_results = data
        return True

    def get_aruba_data(self) -> dict[str, dict[str, str]] | None:
        """Retrieve data from Aruba Access Point and return parsed result.

        Returns a dict keyed by client IP whose values hold ``ip``, ``mac``
        (upper-cased) and ``name``. Returns None when the SSH login fails or
        the session times out or ends before the expected prompts appear.
        """
        # Index into the expect() pattern list below -> message to log.
        failures = {
            1: "Timeout",
            2: "Unexpected response from router",
            4: "Host key changed",
            5: "Connection refused by server",
            6: "Connection timed out",
        }

        connect = f"ssh {self.username}@{self.host} -o HostKeyAlgorithms=ssh-rsa"
        ssh = pexpect.spawn(connect)
        try:
            query = ssh.expect(
                [
                    "password:",
                    pexpect.TIMEOUT,
                    pexpect.EOF,
                    "continue connecting (yes/no)?",
                    "Host key verification failed.",
                    "Connection refused",
                    "Connection timed out",
                ],
                timeout=120,
            )
            if query in failures:
                _LOGGER.error(failures[query])
                return None
            if query == 3:
                # Unknown host key: accept it, then wait for the password prompt.
                ssh.sendline("yes")
                ssh.expect("password:")
            ssh.sendline(self.password)
            ssh.expect("#")
            ssh.sendline("show clients")
            ssh.expect("#")
            # Everything printed before the second prompt is the command output.
            devices_result = ssh.before.split(b"\r\n")
            ssh.sendline("exit")
        except (pexpect.TIMEOUT, pexpect.EOF) as err:
            # Raised by the later expect() calls, e.g. when the password is
            # rejected and the "#" prompt never shows up.
            _LOGGER.error(
                "SSH session to %s ended before the expected prompt (%s)",
                self.host,
                type(err).__name__,
            )
            return None
        finally:
            # Always release the child process and its pty, on every path.
            ssh.close()

        devices: dict[str, dict[str, str]] = {}
        for device in devices_result:
            if match := _DEVICES_REGEX.search(device.decode("utf-8")):
                devices[match.group("ip")] = {
                    "ip": match.group("ip"),
                    "mac": match.group("mac").upper(),
                    "name": match.group("name"),
                }
        return devices
ArubaDeviceScanner|None get_scanner(HomeAssistant hass, ConfigType config)