1 """The Thread integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable
8 from typing
import cast
10 from python_otbr_api.mdns
import StateBitmap
11 from zeroconf
import (
12 BadTypeInNameException,
16 instance_name_from_service_info,
18 from zeroconf.asyncio
import AsyncServiceInfo, AsyncZeroconf
23 _LOGGER = logging.getLogger(__name__)
25 KNOWN_BRANDS: dict[str |
None, str] = {
27 "Apple Inc.":
"apple",
28 "Aqara":
"aqara_gateway",
30 "Google Inc.":
"google",
31 "HomeAssistant":
"homeassistant",
32 "Home Assistant":
"homeassistant",
33 "Nanoleaf":
"nanoleaf",
34 "OpenThread":
"openthread",
37 THREAD_TYPE =
"_meshcop._udp.local."
42 @dataclasses.dataclass
44 """Thread router discovery data."""
48 border_agent_id: str |
None
52 model_name: str |
None
53 network_name: str |
None
55 thread_version: str |
None
56 unconfigured: bool |
None
57 vendor_name: str |
None
61 service: AsyncServiceInfo,
64 ) -> ThreadRouterDiscoveryData:
65 """Get a ThreadRouterDiscoveryData from an AsyncServiceInfo."""
67 def try_decode(value: bytes |
None) -> str |
None:
68 """Try decoding UTF-8."""
73 except UnicodeDecodeError:
76 service_properties = service.properties
77 border_agent_id = service_properties.get(b
"id")
78 model_name = try_decode(service_properties.get(b
"mn"))
79 network_name = try_decode(service_properties.get(b
"nn"))
80 server = service.server
81 thread_version = try_decode(service_properties.get(b
"tv"))
82 vendor_name = try_decode(service_properties.get(b
"vn"))
85 brand = KNOWN_BRANDS.get(vendor_name)
86 if brand ==
"homeassistant":
88 if (state_bitmap_b := service_properties.get(b
"sb"))
is not None:
90 state_bitmap = StateBitmap.from_bytes(state_bitmap_b)
91 if not state_bitmap.is_active:
94 _LOGGER.debug(
"Failed to decode state bitmap in service %s", service)
95 if service_properties.get(b
"at")
is None:
99 instance_name=instance_name_from_service_info(service),
100 addresses=service.parsed_addresses(),
101 border_agent_id=border_agent_id.hex()
if border_agent_id
is not None else None,
103 extended_address=ext_addr.hex(),
104 extended_pan_id=ext_pan_id.hex(),
105 model_name=model_name,
106 network_name=network_name,
108 thread_version=thread_version,
109 unconfigured=unconfigured,
110 vendor_name=vendor_name,
115 """Return all meshcop records already in the zeroconf cache."""
118 records = aiozc.zeroconf.cache.async_all_by_details(THREAD_TYPE, TYPE_PTR, CLASS_IN)
119 for record
in records:
120 record = cast(DNSPointer, record)
123 info = AsyncServiceInfo(THREAD_TYPE, record.alias)
124 except BadTypeInNameException
as ex:
126 "Ignoring record with bad type in name: %s: %s", record.alias, ex
130 if not info.load_from_cache(aiozc.zeroconf):
134 service_properties = info.properties
136 if not (xa := service_properties.get(b
"xa")):
137 _LOGGER.debug(
"Ignoring record without xa %s", info)
139 if not (xp := service_properties.get(b
"xp")):
140 _LOGGER.debug(
"Ignoring record without xp %s", info)
149 """mDNS based Thread router discovery."""
152 """Service listener which listens for thread routers."""
157 aiozc: AsyncZeroconf,
158 router_discovered: Callable,
159 router_removed: Callable,
164 self._known_routers: dict[str, tuple[str, ThreadRouterDiscoveryData]] = {}
168 def add_service(self, zc: Zeroconf, type_: str, name: str) ->
None:
169 """Handle service added."""
170 _LOGGER.debug(
"add_service %s", name)
174 """Handle service removed."""
175 _LOGGER.debug(
"remove_service %s", name)
176 if name
not in self._known_routers:
178 extended_mac_address, _ = self._known_routers.pop(name)
182 """Handle service updated."""
183 _LOGGER.debug(
"update_service %s", name)
187 """Add or update a service."""
190 while service
is None and tries < 4:
191 service = await self.
_aiozc_aiozc.async_get_service_info(type_, name)
195 _LOGGER.debug(
"_add_update_service failed to add %s, %s", type_, name)
198 _LOGGER.debug(
"_add_update_service %s %s", name, service)
199 service_properties = service.properties
202 if not (xa := service_properties.get(b
"xa")):
204 "Discovered unsupported Thread router without extended address: %s",
208 if not (xp := service_properties.get(b
"xp")):
210 "Discovered unsupported Thread router without extended pan ID: %s",
216 extended_mac_address = xa.hex()
217 if name
in self._known_routers
and self._known_routers[name] == (
218 extended_mac_address,
222 "_add_update_service suppressing identical update for %s", name
225 self._known_routers[name] = (extended_mac_address, data)
231 router_discovered: Callable[[str, ThreadRouterDiscoveryData],
None],
232 router_removed: Callable[[str],
None],
236 self.
_aiozc_aiozc: AsyncZeroconf |
None =
None
244 """Start discovery."""
245 self.
_aiozc_aiozc = aiozc = await zeroconf.async_get_async_instance(self.
_hass_hass)
249 await aiozc.async_add_service_listener(THREAD_TYPE, self.
_service_listener_service_listener)
252 """Stop discovery."""
def _add_update_service(self, str type_, str name)
None __init__(self, HomeAssistant hass, AsyncZeroconf aiozc, Callable router_discovered, Callable router_removed)
None add_service(self, Zeroconf zc, str type_, str name)
None remove_service(self, Zeroconf zc, str type_, str name)
None update_service(self, Zeroconf zc, str type_, str name)
None __init__(self, HomeAssistant hass, Callable[[str, ThreadRouterDiscoveryData], None] router_discovered, Callable[[str], None] router_removed)
list[ThreadRouterDiscoveryData] async_read_zeroconf_cache(AsyncZeroconf aiozc)
ThreadRouterDiscoveryData async_discovery_data_from_service(AsyncServiceInfo service, bytes ext_addr, bytes ext_pan_id)