Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Mikrotik router class."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 import ssl
8 from typing import Any
9 
10 import librouteros
11 from librouteros.login import plain as login_plain, token as login_token
12 
13 from homeassistant.config_entries import ConfigEntry
14 from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_VERIFY_SSL
15 from homeassistant.core import HomeAssistant
16 from homeassistant.exceptions import ConfigEntryAuthFailed
17 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
18 
19 from .const import (
20  ARP,
21  ATTR_FIRMWARE,
22  ATTR_MODEL,
23  ATTR_SERIAL_NUMBER,
24  CAPSMAN,
25  CONF_ARP_PING,
26  CONF_DETECTION_TIME,
27  CONF_FORCE_DHCP,
28  DEFAULT_DETECTION_TIME,
29  DHCP,
30  DOMAIN,
31  IDENTITY,
32  INFO,
33  IS_CAPSMAN,
34  IS_WIFI,
35  IS_WIFIWAVE2,
36  IS_WIRELESS,
37  MIKROTIK_SERVICES,
38  NAME,
39  WIFI,
40  WIFIWAVE2,
41  WIRELESS,
42 )
43 from .device import Device
44 from .errors import CannotConnect, LoginError
45 
46 _LOGGER = logging.getLogger(__name__)
47 
48 
class MikrotikData:
    """Handle all communication with the Mikrotik API.

    Wraps a connected ``librouteros.Api`` object, caches the hub's identity
    details and keeps ``devices`` (keyed by MAC address) up to date.
    """

    def __init__(
        self, hass: HomeAssistant, config_entry: ConfigEntry, api: librouteros.Api
    ) -> None:
        """Initialize the Mikrotik Client.

        Args:
            hass: The Home Assistant instance.
            config_entry: Config entry holding connection data and options.
            api: An already connected RouterOS API object.
        """
        self.hass = hass
        self.config_entry = config_entry
        self.api = api
        self._host: str = self.config_entry.data[CONF_HOST]
        # Raw DHCP records keyed by MAC address (refreshed on every update).
        self.all_devices: dict[str, dict[str, Any]] = {}
        # Tracked devices keyed by MAC address.
        self.devices: dict[str, Device] = {}
        self.support_capsman: bool = False
        self.support_wireless: bool = False
        self.support_wifiwave2: bool = False
        self.support_wifi: bool = False
        self.hostname: str = ""
        self.model: str = ""
        self.firmware: str = ""
        self.serial_number: str = ""

    @staticmethod
    def load_mac(devices: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
        """Load dictionary using MAC address as key.

        Records without a ``"mac-address"`` key are skipped. When several
        records share a MAC address, the last one wins.
        """
        return {
            device["mac-address"]: device
            for device in devices
            if "mac-address" in device
        }

    @property
    def arp_enabled(self) -> bool:
        """Return arp_ping option setting (defaults to False)."""
        return self.config_entry.options.get(CONF_ARP_PING, False)

    @property
    def force_dhcp(self) -> bool:
        """Return force_dhcp option setting (defaults to False)."""
        return self.config_entry.options.get(CONF_FORCE_DHCP, False)

    def get_info(self, param: str) -> str:
        """Return one hub attribute as a string.

        ``NAME`` is read through the identity command; every other
        attribute is read through the info command, for which "no such
        command prefix" errors are only logged at debug level.

        Returns an empty string when the command yields no data. A record
        lacking ``param`` yields the string ``"None"``.
        """
        cmd = IDENTITY if param == NAME else INFO
        if data := self.command(MIKROTIK_SERVICES[cmd], suppress_errors=(cmd == INFO)):
            return str(data[0].get(param))
        return ""

    def get_hub_details(self) -> None:
        """Get Hub info.

        Populates hostname, model, firmware and serial number, and probes
        which wireless packages respond on the hub.
        """
        self.hostname = self.get_info(NAME)
        self.model = self.get_info(ATTR_MODEL)
        self.firmware = self.get_info(ATTR_FIRMWARE)
        self.serial_number = self.get_info(ATTR_SERIAL_NUMBER)
        # A non-empty reply to the probe command means the feature exists.
        self.support_capsman = bool(
            self.command(MIKROTIK_SERVICES[IS_CAPSMAN], suppress_errors=True)
        )
        self.support_wireless = bool(
            self.command(MIKROTIK_SERVICES[IS_WIRELESS], suppress_errors=True)
        )
        self.support_wifiwave2 = bool(
            self.command(MIKROTIK_SERVICES[IS_WIFIWAVE2], suppress_errors=True)
        )
        self.support_wifi = bool(
            self.command(MIKROTIK_SERVICES[IS_WIFI], suppress_errors=True)
        )

    def get_list_from_interface(self, interface: str) -> dict[str, dict[str, Any]]:
        """Get devices from interface, keyed by MAC address.

        ``interface`` is a key of ``MIKROTIK_SERVICES``. Returns an empty
        dict when the command yields no data.
        """
        if result := self.command(MIKROTIK_SERVICES[interface]):
            return self.load_mac(result)
        return {}

    def restore_device(self, mac: str) -> None:
        """Restore a missing device after restart.

        Raises KeyError when ``mac`` is not present in ``all_devices``.
        """
        self.devices[mac] = Device(mac, self.all_devices[mac])

    def update_devices(self) -> None:
        """Get list of devices with latest status.

        Raises:
            UpdateFailed: The hub could not be reached.
            ConfigEntryAuthFailed: The hub rejected the credentials.
        """
        arp_devices: dict[str, dict[str, Any]] = {}
        device_list: dict[str, dict[str, Any]] = {}
        wireless_devices: dict[str, dict[str, Any]] = {}
        try:
            self.all_devices = self.get_list_from_interface(DHCP)
            # Only the first supported wireless source is queried.
            if self.support_capsman:
                _LOGGER.debug("Hub is a CAPSman manager")
                device_list = wireless_devices = self.get_list_from_interface(CAPSMAN)
            elif self.support_wireless:
                _LOGGER.debug("Hub supports wireless Interface")
                device_list = wireless_devices = self.get_list_from_interface(WIRELESS)
            elif self.support_wifiwave2:
                _LOGGER.debug("Hub supports wifiwave2 Interface")
                device_list = wireless_devices = self.get_list_from_interface(WIFIWAVE2)
            elif self.support_wifi:
                _LOGGER.debug("Hub supports wifi Interface")
                device_list = wireless_devices = self.get_list_from_interface(WIFI)

            if not device_list or self.force_dhcp:
                device_list = self.all_devices
                _LOGGER.debug("Falling back to DHCP for scanning devices")

            if self.arp_enabled:
                _LOGGER.debug("Using arp-ping to check devices")
                arp_devices = self.get_list_from_interface(ARP)

            # get new hub firmware version if updated
            self.firmware = self.get_info(ATTR_FIRMWARE)

        except CannotConnect as err:
            raise UpdateFailed from err
        except LoginError as err:
            raise ConfigEntryAuthFailed from err

        if not device_list:
            return

        for mac, params in device_list.items():
            if mac not in self.devices:
                self.devices[mac] = Device(mac, self.all_devices.get(mac, {}))
            else:
                self.devices[mac].update(params=self.all_devices.get(mac, {}))

            if mac in wireless_devices:
                # if wireless is supported then wireless_params are params
                self.devices[mac].update(
                    wireless_params=wireless_devices[mac], active=True
                )
                continue
            # for wired devices or when forcing dhcp check for active-address
            if not params.get("active-address"):
                self.devices[mac].update(active=False)
                continue
            # ping check the rest of active devices if arp ping is enabled
            active = True
            if self.arp_enabled and mac in arp_devices:
                active = self.do_arp_ping(
                    str(params.get("active-address")),
                    str(arp_devices[mac].get("interface")),
                )
            self.devices[mac].update(active=active)

    def do_arp_ping(self, ip_address: str, interface: str) -> bool:
        """Attempt to arp ping MAC address via interface.

        Sends three ARP pings at 100ms intervals. Returns False only when
        replies were received and every one of them carries a "status"
        key; an empty reply list counts as reachable.
        """
        _LOGGER.debug("pinging - %s", ip_address)
        params = {
            "arp-ping": "yes",
            "interval": "100ms",
            "count": 3,
            "interface": interface,
            "address": ip_address,
        }
        data = self.command("/ping", params)
        # Presumably "status" is only present on failed replies (the log
        # message below calls this a timeout) - confirm against RouterOS.
        if data and all("status" in result for result in data):
            _LOGGER.debug(
                "Mikrotik %s - %s arp_ping timed out", ip_address, interface
            )
            return False
        return True

    def command(
        self,
        cmd: str,
        params: dict[str, Any] | None = None,
        suppress_errors: bool = False,
    ) -> list[dict[str, Any]]:
        """Retrieve data from Mikrotik API.

        Args:
            cmd: RouterOS API command path.
            params: Optional keyword parameters passed with the command.
            suppress_errors: Log "no such command prefix" protocol errors
                at debug level instead of warning level.

        Returns:
            The reply records, or an empty list on a protocol error.

        Raises:
            CannotConnect: The connection failed. A reconnect is attempted
                first so that the next call can succeed; ``get_api`` may
                itself raise CannotConnect or LoginError while doing so.
        """
        _LOGGER.debug("Running command %s", cmd)
        try:
            if params:
                return list(self.api(cmd=cmd, **params))
            return list(self.api(cmd=cmd))
        except (
            librouteros.exceptions.ConnectionClosed,
            OSError,
            TimeoutError,
        ) as api_error:
            _LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
            # try to reconnect
            self.api = get_api(dict(self.config_entry.data))
            # we still have to raise CannotConnect to fail the update.
            raise CannotConnect from api_error
        except librouteros.exceptions.ProtocolError as api_error:
            emsg = "Mikrotik %s failed to retrieve data. cmd=[%s] Error: %s"
            if suppress_errors and "no such command prefix" in str(api_error):
                _LOGGER.debug(emsg, self._host, cmd, api_error)
                return []
            _LOGGER.warning(emsg, self._host, cmd, api_error)
            return []
244 
245 
# NOTE(review): the class statement was missing from this listing. The base
# class follows from the DataUpdateCoordinator import and the
# super().__init__ call below; the class name is reconstructed and should be
# confirmed against the upstream module.
class MikrotikDataUpdateCoordinator(DataUpdateCoordinator[None]):
    """Mikrotik Hub Object.

    Polls the hub every 10 seconds through a ``MikrotikData`` instance and
    exposes the hub's identity details as read-only properties.
    """

    def __init__(
        self, hass: HomeAssistant, config_entry: ConfigEntry, api: librouteros.Api
    ) -> None:
        """Initialize the Mikrotik Client.

        Args:
            hass: The Home Assistant instance.
            config_entry: Config entry holding connection data and options.
            api: An already connected RouterOS API object.
        """
        # Set before super().__init__ because ``self.host`` (used for the
        # coordinator name below) reads ``self.config_entry``.
        self.hass = hass
        self.config_entry: ConfigEntry = config_entry
        self._mk_data = MikrotikData(self.hass, self.config_entry, api)
        super().__init__(
            self.hass,
            _LOGGER,
            name=f"{DOMAIN} - {self.host}",
            update_interval=timedelta(seconds=10),
        )

    @property
    def host(self) -> str:
        """Return the host of this hub."""
        return str(self.config_entry.data[CONF_HOST])

    @property
    def hostname(self) -> str:
        """Return the hostname of the hub."""
        return self._mk_data.hostname

    @property
    def model(self) -> str:
        """Return the model of the hub."""
        return self._mk_data.model

    @property
    def firmware(self) -> str:
        """Return the firmware of the hub."""
        return self._mk_data.firmware

    @property
    def serial_num(self) -> str:
        """Return the serial number of the hub."""
        return self._mk_data.serial_number

    @property
    def option_detection_time(self) -> timedelta:
        """Config entry option defining number of seconds from last seen to away."""
        return timedelta(
            seconds=self.config_entry.options.get(
                CONF_DETECTION_TIME, DEFAULT_DETECTION_TIME
            )
        )

    @property
    def api(self) -> MikrotikData:
        """Represent Mikrotik data object."""
        return self._mk_data

    async def _async_update_data(self) -> None:
        """Update Mikrotik devices information.

        ``update_devices`` is handed to the executor rather than called
        directly from the coroutine.
        """
        await self.hass.async_add_executor_job(self._mk_data.update_devices)
305 
306 
def get_api(entry: dict[str, Any]) -> librouteros.Api:
    """Connect to Mikrotik hub.

    Args:
        entry: Connection settings. Reads ``CONF_HOST``, ``CONF_USERNAME``,
            ``CONF_PASSWORD``, ``CONF_VERIFY_SSL`` and ``"port"``.

    Returns:
        A connected RouterOS API object.

    Raises:
        LoginError: The hub reported an invalid user name or password.
        CannotConnect: Any other connection or library error.
    """
    _LOGGER.debug("Connecting to Mikrotik hub [%s]", entry[CONF_HOST])

    # Both login methods are offered to librouteros.
    _login_method = (login_plain, login_token)
    kwargs: dict[str, Any] = {
        "login_methods": _login_method,
        "port": entry["port"],
        "encoding": "utf8",
    }

    if entry[CONF_VERIFY_SSL]:
        # NOTE(review): despite the option name, this branch wraps the socket
        # in TLS with hostname checking and certificate verification turned
        # OFF. The option therefore acts as "use SSL", not "verify SSL".
        # Left unchanged because enabling verification would alter behaviour
        # for existing setups - confirm the intent before changing it.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        _ssl_wrapper = ssl_context.wrap_socket
        kwargs["ssl_wrapper"] = _ssl_wrapper

    try:
        api = librouteros.connect(
            entry[CONF_HOST],
            entry[CONF_USERNAME],
            entry[CONF_PASSWORD],
            **kwargs,
        )
    except (
        librouteros.exceptions.LibRouterosError,
        OSError,
        TimeoutError,
    ) as api_error:
        _LOGGER.error("Mikrotik %s error: %s", entry[CONF_HOST], api_error)
        # Authentication failures are recognised by the error message text.
        if "invalid user name or password" in str(api_error):
            raise LoginError from api_error
        raise CannotConnect from api_error

    _LOGGER.debug("Connected to %s successfully", entry[CONF_HOST])
    return api
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, librouteros.Api api)
Definition: coordinator.py:251
dict[str, dict[str, Any]] get_list_from_interface(self, str interface)
Definition: coordinator.py:117
bool do_arp_ping(self, str ip_address, str interface)
Definition: coordinator.py:191
list[dict[str, Any]] command(self, str cmd, dict[str, Any]|None params=None, bool suppress_errors=False)
Definition: coordinator.py:220
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, librouteros.Api api)
Definition: coordinator.py:54
dict[str, dict[str, Any]] load_mac(list[dict[str, Any]] devices)
Definition: coordinator.py:72
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
librouteros.Api get_api(dict[str, Any] entry)
Definition: coordinator.py:307