1 """The elkm1 integration discovery."""
3 from __future__
import annotations
6 from dataclasses
import asdict
9 from elkm1_lib.discovery
import AIOELKDiscovery, ElkSystem
11 from homeassistant
import config_entries
16 from .const
import DISCOVER_SCAN_TIMEOUT, DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 return mac_address.replace(
":",
"")[-6:]
31 """Update a config entry from a discovery."""
32 if not entry.unique_id
or ":" not in entry.unique_id:
33 _LOGGER.debug(
"Adding unique id from discovery: %s", device)
34 return hass.config_entries.async_update_entry(
35 entry, unique_id=dr.format_mac(device.mac_address)
37 _LOGGER.debug(
"Unique id is already present from discovery: %s", device)
42 hass: HomeAssistant, timeout: int, address: str |
None =
None
44 """Discover elkm1 devices."""
49 str(broadcast_address)
50 for broadcast_address
in await network.async_get_ipv4_broadcast_addresses(
55 scanner = AIOELKDiscovery()
56 combined_discoveries: dict[str, ElkSystem] = {}
57 for idx, discovered
in enumerate(
60 scanner.async_scan(timeout=timeout, address=target_address)
61 for target_address
in targets
63 return_exceptions=
True,
66 if isinstance(discovered, Exception):
67 _LOGGER.debug(
"Scanning %s failed with error: %s", targets[idx], discovered)
69 if isinstance(discovered, BaseException):
70 raise discovered
from None
71 for device
in discovered:
72 assert isinstance(device, ElkSystem)
73 combined_discoveries[device.ip_address] = device
75 return list(combined_discoveries.values())
79 """Direct discovery at a single ip instead of broadcast."""
83 if device.ip_address == host:
91 discovered_devices: list[ElkSystem],
93 """Trigger config flows for discovered devices."""
94 for device
in discovered_devices:
95 discovery_flow.async_create_flow(
98 context={
"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
ElkSystem|None async_discover_device(HomeAssistant hass, str host)
None async_trigger_discovery(HomeAssistant hass, list[ElkSystem] discovered_devices)
str _short_mac(str mac_address)
list[ElkSystem] async_discover_devices(HomeAssistant hass, int timeout, str|None address=None)
bool async_update_entry_from_discovery(HomeAssistant hass, config_entries.ConfigEntry entry, ElkSystem device)