Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The SSDP integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Coroutine, Mapping
7 from dataclasses import dataclass, field
8 from datetime import timedelta
9 from enum import Enum
10 from functools import partial
11 from ipaddress import IPv4Address, IPv6Address
12 import logging
13 import socket
14 from time import time
15 from typing import TYPE_CHECKING, Any
16 from urllib.parse import urljoin
17 import xml.etree.ElementTree as ET
18 
19 from async_upnp_client.aiohttp import AiohttpSessionRequester
20 from async_upnp_client.const import (
21  AddressTupleVXType,
22  DeviceIcon,
23  DeviceInfo,
24  DeviceOrServiceType,
25  SsdpSource,
26 )
27 from async_upnp_client.description_cache import DescriptionCache
28 from async_upnp_client.server import UpnpServer, UpnpServerDevice, UpnpServerService
29 from async_upnp_client.ssdp import (
30  SSDP_PORT,
31  determine_source_target,
32  fix_ipv6_address_scope_id,
33  is_ipv4_address,
34 )
35 from async_upnp_client.ssdp_listener import SsdpDevice, SsdpDeviceTracker, SsdpListener
36 from async_upnp_client.utils import CaseInsensitiveDict
37 
38 from homeassistant import config_entries
39 from homeassistant.components import network
40 from homeassistant.const import (
41  EVENT_HOMEASSISTANT_STARTED,
42  EVENT_HOMEASSISTANT_STOP,
43  MATCH_ALL,
44  __version__ as current_version,
45 )
46 from homeassistant.core import Event, HassJob, HomeAssistant, callback as core_callback
47 from homeassistant.data_entry_flow import BaseServiceInfo
48 from homeassistant.helpers import config_validation as cv, discovery_flow
49 from homeassistant.helpers.aiohttp_client import async_get_clientsession
50 from homeassistant.helpers.dispatcher import async_dispatcher_connect
51 from homeassistant.helpers.event import async_track_time_interval
52 from homeassistant.helpers.instance_id import async_get as async_get_instance_id
53 from homeassistant.helpers.network import NoURLAvailableError, get_url
54 from homeassistant.helpers.system_info import async_get_system_info
55 from homeassistant.helpers.typing import ConfigType
56 from homeassistant.loader import async_get_ssdp, bind_hass
57 from homeassistant.util.async_ import create_eager_task
58 from homeassistant.util.logging import catch_log_exception
59 
DOMAIN = "ssdp"
# Keys under hass.data[DOMAIN] holding the Scanner and Server instances.
SSDP_SCANNER = "scanner"
UPNP_SERVER = "server"
# Port range probed for the UPnP HTTP server; used as range(MIN, MAX),
# so the upper bound itself is excluded.
UPNP_SERVER_MIN_PORT = 40000
UPNP_SERVER_MAX_PORT = 40100
# Interval between periodic SSDP scans.
SCAN_INTERVAL = timedelta(minutes=10)

IPV4_BROADCAST = IPv4Address("255.255.255.255")

# Attributes for accessing info from SSDP response
ATTR_SSDP_LOCATION = "ssdp_location"
ATTR_SSDP_ST = "ssdp_st"
ATTR_SSDP_NT = "ssdp_nt"
ATTR_SSDP_UDN = "ssdp_udn"
ATTR_SSDP_USN = "ssdp_usn"
ATTR_SSDP_EXT = "ssdp_ext"
ATTR_SSDP_SERVER = "ssdp_server"
ATTR_SSDP_BOOTID = "BOOTID.UPNP.ORG"
ATTR_SSDP_NEXTBOOTID = "NEXTBOOTID.UPNP.ORG"
# Attributes for accessing info from retrieved UPnP device description
ATTR_ST = "st"
ATTR_NT = "nt"
ATTR_UPNP_DEVICE_TYPE = "deviceType"
ATTR_UPNP_FRIENDLY_NAME = "friendlyName"
ATTR_UPNP_MANUFACTURER = "manufacturer"
ATTR_UPNP_MANUFACTURER_URL = "manufacturerURL"
ATTR_UPNP_MODEL_DESCRIPTION = "modelDescription"
ATTR_UPNP_MODEL_NAME = "modelName"
ATTR_UPNP_MODEL_NUMBER = "modelNumber"
ATTR_UPNP_MODEL_URL = "modelURL"
ATTR_UPNP_SERIAL = "serialNumber"
ATTR_UPNP_SERVICE_LIST = "serviceList"
ATTR_UPNP_UDN = "UDN"
ATTR_UPNP_UPC = "UPC"
ATTR_UPNP_PRESENTATION_URL = "presentationURL"
# Attributes for accessing info added by Home Assistant
ATTR_HA_MATCHING_DOMAINS = "x_homeassistant_matching_domains"

# Keys for which IntegrationMatchers builds a per-value lookup table.
PRIMARY_MATCH_KEYS = [
    ATTR_UPNP_MANUFACTURER,
    ATTR_ST,
    ATTR_UPNP_DEVICE_TYPE,
    ATTR_NT,
    ATTR_UPNP_MANUFACTURER_URL,
]
105 
_LOGGER = logging.getLogger(__name__)


# Schema produced by cv.empty_config_schema for this domain.
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
110 
111 
@dataclass(slots=True)
class SsdpServiceInfo(BaseServiceInfo):
    """Prepared info from ssdp/upnp entries."""

    # Values taken from the combined SSDP headers.
    ssdp_usn: str
    ssdp_st: str
    # Entries of the UPnP device description.
    upnp: Mapping[str, Any]
    ssdp_location: str | None = None
    ssdp_nt: str | None = None
    ssdp_udn: str | None = None
    ssdp_ext: str | None = None
    ssdp_server: str | None = None
    ssdp_headers: Mapping[str, Any] = field(default_factory=dict)
    ssdp_all_locations: set[str] = field(default_factory=set)
    # Domains of the integrations whose matchers matched this discovery.
    x_homeassistant_matching_domains: set[str] = field(default_factory=set)
127 
128 
129 SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
130 type SsdpHassJobCallback = HassJob[
131  [SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None
132 ]
133 
134 SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
135  SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE,
136  SsdpSource.SEARCH_CHANGED: SsdpChange.ALIVE,
137  SsdpSource.ADVERTISEMENT_ALIVE: SsdpChange.ALIVE,
138  SsdpSource.ADVERTISEMENT_BYEBYE: SsdpChange.BYEBYE,
139  SsdpSource.ADVERTISEMENT_UPDATE: SsdpChange.UPDATE,
140 }
141 
142 
def _format_err(name: str, *args: Any) -> str:
    """Format error message.

    ``name`` identifies the callback; ``args`` is rendered as a tuple.
    """
    return f"Exception in SSDP callback {name}: {args}"
146 
147 
@bind_hass
async def async_register_callback(
    hass: HomeAssistant,
    callback: Callable[[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None],
    match_dict: dict[str, str] | None = None,
) -> Callable[[], None]:
    """Register to receive a callback on ssdp broadcast.

    Returns a callback that can be used to cancel the registration.
    """
    scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
    # Wrap the callback so exceptions it raises are logged via _format_err.
    job = HassJob(
        catch_log_exception(
            callback,
            partial(_format_err, str(callback)),
        ),
        f"ssdp callback {match_dict}",
    )
    return await scanner.async_register_callback(job, match_dict)
167 
168 
@bind_hass
async def async_get_discovery_info_by_udn_st(
    hass: HomeAssistant, udn: str, st: str
) -> SsdpServiceInfo | None:
    """Fetch the discovery info cache.

    Delegates to the Scanner stored in hass.data; returns None when the
    scanner has no entry for the udn/st combination.
    """
    scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
    return await scanner.async_get_discovery_info_by_udn_st(udn, st)
176 
177 
@bind_hass
async def async_get_discovery_info_by_st(
    hass: HomeAssistant, st: str
) -> list[SsdpServiceInfo]:
    """Fetch all the entries matching the st."""
    scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
    return await scanner.async_get_discovery_info_by_st(st)
185 
186 
@bind_hass
async def async_get_discovery_info_by_udn(
    hass: HomeAssistant, udn: str
) -> list[SsdpServiceInfo]:
    """Fetch all the entries matching the udn."""
    scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
    return await scanner.async_get_discovery_info_by_udn(udn)
194 
195 
async def async_build_source_set(hass: HomeAssistant) -> set[IPv4Address | IPv6Address]:
    """Build the list of ssdp sources.

    Returns the enabled source IPs that are neither loopback nor global.
    IPv6 addresses are kept only when they carry a scope id; IPv4 addresses
    are always kept.
    """
    return {
        source_ip
        for source_ip in await network.async_get_enabled_source_ips(hass)
        if not source_ip.is_loopback
        and not source_ip.is_global
        and ((source_ip.version == 6 and source_ip.scope_id) or source_ip.version == 4)
    }
205 
206 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the SSDP integration.

    Builds the integration matchers, stores the Scanner and Server in
    hass.data[DOMAIN], and starts both. Always returns True.
    """

    integration_matchers = IntegrationMatchers()
    integration_matchers.async_setup(await async_get_ssdp(hass))

    scanner = Scanner(hass, integration_matchers)
    server = Server(hass)
    # Stored before starting: the module-level helpers look them up here.
    hass.data[DOMAIN] = {
        SSDP_SCANNER: scanner,
        UPNP_SERVER: server,
    }

    await scanner.async_start()
    await server.async_start()

    return True
224 
225 
@core_callback
def _async_process_callbacks(
    hass: HomeAssistant,
    callbacks: list[SsdpHassJobCallback],
    discovery_info: SsdpServiceInfo,
    ssdp_change: SsdpChange,
) -> None:
    """Run every callback job with the discovery info and change type.

    A failure while scheduling one job is logged and does not prevent the
    remaining jobs from running.
    """
    for callback in callbacks:
        try:
            hass.async_run_hass_job(
                callback, discovery_info, ssdp_change, background=True
            )
        except Exception:
            _LOGGER.exception("Failed to callback info: %s", discovery_info)
240 
241 
@core_callback
def _async_headers_match(
    headers: CaseInsensitiveDict, lower_match_dict: dict[str, str]
) -> bool:
    """Return True if the headers satisfy every entry of the match dict.

    A value of MATCH_ALL only requires the header to be present; any other
    value must equal the header's value. An empty match dict matches
    everything.
    """
    for header, val in lower_match_dict.items():
        if val == MATCH_ALL:
            if header not in headers:
                return False
        elif headers.get_lower(header) != val:
            return False
    return True
253 
254 
class IntegrationMatchers:
    """Optimized integration matching."""

    def __init__(self) -> None:
        """Init optimized integration matching."""
        # primary match key -> match value -> [(domain, full matcher dict)];
        # None until async_setup has run.
        self._match_by_key: (
            dict[str, dict[str, list[tuple[str, dict[str, str]]]]] | None
        ) = None

    @core_callback
    def async_setup(
        self, integration_matchers: dict[str, list[dict[str, str]]]
    ) -> None:
        """Build matchers by key.

        Here we convert the primary match keys into their own
        dicts so we can do lookups of the primary match
        key to find the match dict.
        """
        self._match_by_key = {}
        for key in PRIMARY_MATCH_KEYS:
            matchers_by_key = self._match_by_key[key] = {}
            for domain, matchers in integration_matchers.items():
                for matcher in matchers:
                    if match_value := matcher.get(key):
                        matchers_by_key.setdefault(match_value, []).append(
                            (domain, matcher)
                        )

    @core_callback
    def async_matching_domains(self, info_with_desc: CaseInsensitiveDict) -> set[str]:
        """Find domains matching the passed CaseInsensitiveDict."""
        assert self._match_by_key is not None
        return {
            domain
            for key, matchers_by_key in self._match_by_key.items()
            if (match_value := info_with_desc.get(key))
            for domain, matcher in matchers_by_key.get(match_value, ())
            # Every item of the matcher must be present in the info.
            if info_with_desc.items() >= matcher.items()
        }
295 
296 
class Scanner:
    """Class to manage SSDP searching and SSDP advertisements."""

    def __init__(
        self, hass: HomeAssistant, integration_matchers: IntegrationMatchers
    ) -> None:
        """Initialize class."""
        self.hass = hass
        # Cancels the periodic scan; set in async_start.
        self._cancel_scan: Callable[[], None] | None = None
        self._ssdp_listeners: list[SsdpListener] = []
        # One tracker is passed to every listener, so devices are shared.
        self._device_tracker = SsdpDeviceTracker()
        # (job, lower-cased match dict) pairs of registered callbacks.
        self._callbacks: list[tuple[SsdpHassJobCallback, dict[str, str]]] = []
        self._description_cache: DescriptionCache | None = None
        self.integration_matchers = integration_matchers

    @property
    def _ssdp_devices(self) -> list[SsdpDevice]:
        """Get all seen devices."""
        return list(self._device_tracker.devices.values())

    async def async_register_callback(
        self, callback: SsdpHassJobCallback, match_dict: dict[str, str] | None = None
    ) -> Callable[[], None]:
        """Register a callback.

        Returns a function that removes the registration again.
        """
        if match_dict is None:
            lower_match_dict = {}
        else:
            lower_match_dict = {k.lower(): v for k, v in match_dict.items()}

        # Make sure any entries that happened
        # before the callback was registered are fired
        for ssdp_device in self._ssdp_devices:
            for headers in ssdp_device.all_combined_headers.values():
                if _async_headers_match(headers, lower_match_dict):
                    _async_process_callbacks(
                        self.hass,
                        [callback],
                        await self._async_headers_to_discovery_info(
                            ssdp_device, headers
                        ),
                        SsdpChange.ALIVE,
                    )

        callback_entry = (callback, lower_match_dict)
        self._callbacks.append(callback_entry)

        @core_callback
        def _async_remove_callback() -> None:
            self._callbacks.remove(callback_entry)

        return _async_remove_callback

    async def async_stop(self, *_: Any) -> None:
        """Stop the scanner."""
        assert self._cancel_scan is not None
        self._cancel_scan()

        await self._async_stop_ssdp_listeners()

    async def _async_stop_ssdp_listeners(self) -> None:
        """Stop the SSDP listeners."""
        # return_exceptions=True: one failing listener must not keep the
        # others from being stopped.
        await asyncio.gather(
            *(
                create_eager_task(listener.async_stop())
                for listener in self._ssdp_listeners
            ),
            return_exceptions=True,
        )

    async def async_scan(self, *_: Any) -> None:
        """Scan for new entries using ssdp listeners."""
        await self.async_scan_multicast()
        await self.async_scan_broadcast()

    async def async_scan_multicast(self, *_: Any) -> None:
        """Scan for new entries using multicast target."""
        for ssdp_listener in self._ssdp_listeners:
            await ssdp_listener.async_search()

    async def async_scan_broadcast(self, *_: Any) -> None:
        """Scan for new entries using broadcast target."""
        # Some sonos devices only seem to respond if we send to the broadcast
        # address. This matches pysonos' behavior
        # https://github.com/amelchio/pysonos/blob/d4329b4abb657d106394ae69357805269708c996/pysonos/discovery.py#L120
        for listener in self._ssdp_listeners:
            if is_ipv4_address(listener.source):
                await listener.async_search((str(IPV4_BROADCAST), SSDP_PORT))

    async def async_start(self) -> None:
        """Start the scanners."""
        session = async_get_clientsession(self.hass, verify_ssl=False)
        requester = AiohttpSessionRequester(session, True, 10)
        self._description_cache = DescriptionCache(requester)

        await self._async_start_ssdp_listeners()

        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
        self._cancel_scan = async_track_time_interval(
            self.hass, self.async_scan, SCAN_INTERVAL, name="SSDP scanner"
        )

        async_dispatcher_connect(
            self.hass,
            config_entries.signal_discovered_config_entry_removed(DOMAIN),
            self._handle_config_entry_removed,
        )

        # Trigger the initial-scan.
        await self.async_scan()

    async def _async_start_ssdp_listeners(self) -> None:
        """Start the SSDP Listeners."""
        # Devices are shared between all sources.
        for source_ip in await async_build_source_set(self.hass):
            source_ip_str = str(source_ip)
            if source_ip.version == 6:
                source_tuple: AddressTupleVXType = (
                    source_ip_str,
                    0,
                    0,
                    int(getattr(source_ip, "scope_id")),
                )
            else:
                source_tuple = (source_ip_str, 0)
            source, target = determine_source_target(source_tuple)
            source = fix_ipv6_address_scope_id(source) or source
            self._ssdp_listeners.append(
                SsdpListener(
                    callback=self._ssdp_listener_callback,
                    source=source,
                    target=target,
                    device_tracker=self._device_tracker,
                )
            )
        results = await asyncio.gather(
            *(
                create_eager_task(listener.async_start())
                for listener in self._ssdp_listeners
            ),
            return_exceptions=True,
        )
        # Collect first, remove afterwards: removing while enumerating would
        # shift the indices that pair results with listeners.
        failed_listeners = []
        for idx, result in enumerate(results):
            if isinstance(result, Exception):
                _LOGGER.debug(
                    "Failed to setup listener for %s: %s",
                    self._ssdp_listeners[idx].source,
                    result,
                )
                failed_listeners.append(self._ssdp_listeners[idx])
        for listener in failed_listeners:
            self._ssdp_listeners.remove(listener)

    @core_callback
    def _async_get_matching_callbacks(
        self,
        combined_headers: CaseInsensitiveDict,
    ) -> list[SsdpHassJobCallback]:
        """Return a list of callbacks that match."""
        return [
            callback
            for callback, lower_match_dict in self._callbacks
            if _async_headers_match(combined_headers, lower_match_dict)
        ]

    def _ssdp_listener_callback(
        self,
        ssdp_device: SsdpDevice,
        dst: DeviceOrServiceType,
        source: SsdpSource,
    ) -> None:
        """Handle a device/service change."""
        _LOGGER.debug(
            "SSDP: ssdp_device: %s, dst: %s, source: %s", ssdp_device, dst, source
        )

        assert self._description_cache

        location = ssdp_device.location
        _, info_desc = self._description_cache.peek_description_dict(location)
        if info_desc is None:
            # Fetch info desc in separate task and process from there.
            self.hass.async_create_background_task(
                self._ssdp_listener_process_callback_with_lookup(
                    ssdp_device, dst, source
                ),
                name=f"ssdp_info_desc_lookup_{location}",
                eager_start=True,
            )
            return

        # Info desc known, process directly.
        self._ssdp_listener_process_callback(ssdp_device, dst, source, info_desc)

    async def _ssdp_listener_process_callback_with_lookup(
        self,
        ssdp_device: SsdpDevice,
        dst: DeviceOrServiceType,
        source: SsdpSource,
    ) -> None:
        """Handle a device/service change."""
        location = ssdp_device.location
        self._ssdp_listener_process_callback(
            ssdp_device,
            dst,
            source,
            await self._async_get_description_dict(location),
        )

    def _ssdp_listener_process_callback(
        self,
        ssdp_device: SsdpDevice,
        dst: DeviceOrServiceType,
        source: SsdpSource,
        info_desc: Mapping[str, Any],
        skip_callbacks: bool = False,
    ) -> None:
        """Handle a device/service change."""
        matching_domains: set[str] = set()
        combined_headers = ssdp_device.combined_headers(dst)
        callbacks = self._async_get_matching_callbacks(combined_headers)

        # If there are no changes from a search, do not trigger a config flow
        if source != SsdpSource.SEARCH_ALIVE:
            matching_domains = self.integration_matchers.async_matching_domains(
                CaseInsensitiveDict(combined_headers.as_dict(), **info_desc)
            )

        if (
            not callbacks
            and not matching_domains
            and source != SsdpSource.ADVERTISEMENT_BYEBYE
        ):
            return

        discovery_info = discovery_info_from_headers_and_description(
            ssdp_device, combined_headers, info_desc
        )
        discovery_info.x_homeassistant_matching_domains = matching_domains

        if callbacks and not skip_callbacks:
            ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
            _async_process_callbacks(self.hass, callbacks, discovery_info, ssdp_change)

        # Config flows should only be created for alive/update messages from alive devices
        if source == SsdpSource.ADVERTISEMENT_BYEBYE:
            self._async_dismiss_discoveries(discovery_info)
            return

        _LOGGER.debug("Discovery info: %s", discovery_info)

        if not matching_domains:
            return  # avoid creating DiscoveryKey if there are no matches

        discovery_key = discovery_flow.DiscoveryKey(
            domain=DOMAIN, key=ssdp_device.udn, version=1
        )
        for domain in matching_domains:
            _LOGGER.debug("Discovered %s at %s", domain, ssdp_device.location)
            discovery_flow.async_create_flow(
                self.hass,
                domain,
                {"source": config_entries.SOURCE_SSDP},
                discovery_info,
                discovery_key=discovery_key,
            )

    def _async_dismiss_discoveries(
        self, byebye_discovery_info: SsdpServiceInfo
    ) -> None:
        """Dismiss all discoveries for the given address."""
        for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
            SsdpServiceInfo,
            lambda service_info: bool(
                service_info.ssdp_st == byebye_discovery_info.ssdp_st
                and service_info.ssdp_location == byebye_discovery_info.ssdp_location
            ),
        ):
            self.hass.config_entries.flow.async_abort(flow["flow_id"])

    async def _async_get_description_dict(
        self, location: str | None
    ) -> Mapping[str, str]:
        """Get description dict.

        Uses the cached description when one is present, otherwise fetches
        it. Returns an empty dict when no description is available.
        """
        assert self._description_cache is not None
        cache = self._description_cache

        has_description, description = cache.peek_description_dict(location)
        if has_description:
            return description or {}

        return await cache.async_get_description_dict(location) or {}

    async def _async_headers_to_discovery_info(
        self, ssdp_device: SsdpDevice, headers: CaseInsensitiveDict
    ) -> SsdpServiceInfo:
        """Combine the headers and description into discovery_info.

        Building this is a bit expensive so we only do it on demand.
        """
        location = headers["location"]
        info_desc = await self._async_get_description_dict(location)
        return discovery_info_from_headers_and_description(
            ssdp_device, headers, info_desc
        )

    async def async_get_discovery_info_by_udn_st(
        self, udn: str, st: str
    ) -> SsdpServiceInfo | None:
        """Return discovery_info for a udn and st."""
        for ssdp_device in self._ssdp_devices:
            if ssdp_device.udn == udn:
                if headers := ssdp_device.combined_headers(st):
                    return await self._async_headers_to_discovery_info(
                        ssdp_device, headers
                    )
        return None

    async def async_get_discovery_info_by_st(self, st: str) -> list[SsdpServiceInfo]:
        """Return matching discovery_infos for a st."""
        return [
            await self._async_headers_to_discovery_info(ssdp_device, headers)
            for ssdp_device in self._ssdp_devices
            if (headers := ssdp_device.combined_headers(st))
        ]

    async def async_get_discovery_info_by_udn(self, udn: str) -> list[SsdpServiceInfo]:
        """Return matching discovery_infos for a udn."""
        return [
            await self._async_headers_to_discovery_info(ssdp_device, headers)
            for ssdp_device in self._ssdp_devices
            for headers in ssdp_device.all_combined_headers.values()
            if ssdp_device.udn == udn
        ]

    @core_callback
    def _handle_config_entry_removed(
        self,
        entry: config_entries.ConfigEntry,
    ) -> None:
        """Handle config entry changes."""
        if TYPE_CHECKING:
            assert self._description_cache is not None
        cache = self._description_cache
        for discovery_key in entry.discovery_keys[DOMAIN]:
            # Only version-1 keys with a string key (the UDN) are handled.
            if discovery_key.version != 1 or not isinstance(discovery_key.key, str):
                continue
            udn = discovery_key.key
            _LOGGER.debug("Rediscover service %s", udn)

            for ssdp_device in self._ssdp_devices:
                if ssdp_device.udn != udn:
                    continue
                for dst in ssdp_device.all_combined_headers:
                    has_cached_desc, info_desc = cache.peek_description_dict(
                        ssdp_device.location
                    )
                    if has_cached_desc and info_desc:
                        self._ssdp_listener_process_callback(
                            ssdp_device,
                            dst,
                            SsdpSource.SEARCH,
                            info_desc,
                            True,  # Skip integration callbacks
                        )
662 
663 
def discovery_info_from_headers_and_description(
    ssdp_device: SsdpDevice,
    combined_headers: CaseInsensitiveDict,
    info_desc: Mapping[str, Any],
) -> SsdpServiceInfo:
    """Convert headers and description to discovery_info."""
    ssdp_usn = combined_headers["usn"]
    ssdp_st = combined_headers.get_lower("st")
    # Work on a plain-dict copy so the caller's description is not mutated.
    if isinstance(info_desc, CaseInsensitiveDict):
        upnp_info = {**info_desc.as_dict()}
    else:
        upnp_info = {**info_desc}

    # Increase compatibility: depending on the message type,
    # either the ST (Search Target, from M-SEARCH messages)
    # or NT (Notification Type, from NOTIFY messages) header is mandatory
    if not ssdp_st:
        ssdp_st = combined_headers["nt"]

    # Ensure UPnP "udn" is set
    if ATTR_UPNP_UDN not in upnp_info:
        if udn := _udn_from_usn(ssdp_usn):
            upnp_info[ATTR_UPNP_UDN] = udn

    return SsdpServiceInfo(
        ssdp_usn=ssdp_usn,
        ssdp_st=ssdp_st,
        ssdp_ext=combined_headers.get_lower("ext"),
        ssdp_server=combined_headers.get_lower("server"),
        ssdp_location=combined_headers.get_lower("location"),
        ssdp_udn=combined_headers.get_lower("_udn"),
        ssdp_nt=combined_headers.get_lower("nt"),
        ssdp_headers=combined_headers,
        upnp=upnp_info,
        ssdp_all_locations=set(ssdp_device.locations),
    )
700 
701 
def _udn_from_usn(usn: str | None) -> str | None:
    """Get the UDN from the USN.

    Returns the part before the first "::" when the USN starts with
    "uuid:"; returns None for a missing USN or any other prefix.
    """
    if usn is None:
        return None
    if usn.startswith("uuid:"):
        return usn.split("::")[0]
    return None
709 
710 
class HassUpnpServiceDevice(UpnpServerDevice):
    """Hass Device."""

    # The "filled_later_on" placeholders, the presentation URL and the icon
    # URLs are replaced when the UPnP servers are started.
    DEVICE_DEFINITION = DeviceInfo(
        device_type="urn:home-assistant.io:device:HomeAssistant:1",
        friendly_name="filled_later_on",
        manufacturer="Home Assistant",
        manufacturer_url="https://www.home-assistant.io",
        model_description=None,
        model_name="filled_later_on",
        model_number=current_version,
        model_url="https://www.home-assistant.io",
        serial_number="filled_later_on",
        udn="filled_later_on",
        upc=None,
        presentation_url="https://my.home-assistant.io/",
        url="/device.xml",
        icons=[
            DeviceIcon(
                mimetype="image/png",
                width=1024,
                height=1024,
                depth=24,
                url="/static/icons/favicon-1024x1024.png",
            ),
            DeviceIcon(
                mimetype="image/png",
                width=512,
                height=512,
                depth=24,
                url="/static/icons/favicon-512x512.png",
            ),
            DeviceIcon(
                mimetype="image/png",
                width=384,
                height=384,
                depth=24,
                url="/static/icons/favicon-384x384.png",
            ),
            DeviceIcon(
                mimetype="image/png",
                width=192,
                height=192,
                depth=24,
                url="/static/icons/favicon-192x192.png",
            ),
        ],
        xml=ET.Element("server_device"),
    )
    EMBEDDED_DEVICES: list[type[UpnpServerDevice]] = []
    SERVICES: list[type[UpnpServerService]] = []
762 
763 
async def _async_find_next_available_port(source: AddressTupleVXType) -> int:
    """Get a free TCP port.

    Tries to bind a probe socket on the source address to each port in
    range(UPNP_SERVER_MIN_PORT, UPNP_SERVER_MAX_PORT) and returns the first
    port that binds.

    Raises:
        OSError: if the last port in the range cannot be bound either.
    """
    family = socket.AF_INET if is_ipv4_address(source) else socket.AF_INET6
    # The probe socket exists only to test bind(); the context manager
    # closes it on every exit path instead of leaving that to garbage
    # collection.
    with socket.socket(family, socket.SOCK_STREAM) as test_socket:
        test_socket.setblocking(False)
        test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        for port in range(UPNP_SERVER_MIN_PORT, UPNP_SERVER_MAX_PORT):
            # Keep any trailing address fields (e.g. the IPv6 flowinfo and
            # scope id) and swap in the candidate port.
            addr = (source[0],) + (port,) + source[2:]
            try:
                test_socket.bind(addr)
            except OSError:
                # Only the failure on the last candidate is propagated.
                if port == UPNP_SERVER_MAX_PORT - 1:
                    raise
            else:
                return port

    raise RuntimeError("unreachable")
782 
783 
class Server:
    """Class to be visible via SSDP searching and advertisements."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize class."""
        self.hass = hass
        self._upnp_servers: list[UpnpServer] = []

    async def async_start(self) -> None:
        """Start the server.

        Only registers listeners: the UPnP servers are started on the
        EVENT_HOMEASSISTANT_STARTED event and stopped on
        EVENT_HOMEASSISTANT_STOP.
        """
        bus = self.hass.bus
        bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
        bus.async_listen_once(
            EVENT_HOMEASSISTANT_STARTED,
            self._async_start_upnp_servers,
        )

    async def _async_get_instance_udn(self) -> str:
        """Get Unique Device Name for this instance."""
        instance_id = await async_get_instance_id(self.hass)
        # Regroup the instance id as 8-4-4-4-12 characters, upper-cased.
        return f"uuid:{instance_id[0:8]}-{instance_id[8:12]}-{instance_id[12:16]}-{instance_id[16:20]}-{instance_id[20:32]}".upper()

    async def _async_start_upnp_servers(self, event: Event) -> None:
        """Start the UPnP/SSDP servers."""
        # Update UDN with our instance UDN.
        udn = await self._async_get_instance_udn()
        system_info = await async_get_system_info(self.hass)
        model_name = system_info["installation_type"]
        try:
            presentation_url = get_url(self.hass, allow_ip=True, prefer_external=False)
        except NoURLAvailableError:
            _LOGGER.warning(
                "Could not set up UPnP/SSDP server, as a presentation URL could"
                " not be determined; Please configure your internal URL"
                " in the Home Assistant general configuration"
            )
            return

        serial_number = await async_get_instance_id(self.hass)
        HassUpnpServiceDevice.DEVICE_DEFINITION = (
            HassUpnpServiceDevice.DEVICE_DEFINITION._replace(
                udn=udn,
                friendly_name=f"{self.hass.config.location_name} (Home Assistant)",
                model_name=model_name,
                presentation_url=presentation_url,
                serial_number=serial_number,
            )
        )

        # Update icon URLs.
        for index, icon in enumerate(HassUpnpServiceDevice.DEVICE_DEFINITION.icons):
            new_url = urljoin(presentation_url, icon.url)
            HassUpnpServiceDevice.DEVICE_DEFINITION.icons[index] = icon._replace(
                url=new_url
            )

        # Start a server on all source IPs.
        boot_id = int(time())
        for source_ip in await async_build_source_set(self.hass):
            source_ip_str = str(source_ip)
            if source_ip.version == 6:
                source_tuple: AddressTupleVXType = (
                    source_ip_str,
                    0,
                    0,
                    int(getattr(source_ip, "scope_id")),
                )
            else:
                source_tuple = (source_ip_str, 0)
            source, target = determine_source_target(source_tuple)
            source = fix_ipv6_address_scope_id(source) or source
            http_port = await _async_find_next_available_port(source)
            _LOGGER.debug("Binding UPnP HTTP server to: %s:%s", source_ip, http_port)
            self._upnp_servers.append(
                UpnpServer(
                    source=source,
                    target=target,
                    http_port=http_port,
                    server_device=HassUpnpServiceDevice,
                    boot_id=boot_id,
                )
            )
        results = await asyncio.gather(
            *(upnp_server.async_start() for upnp_server in self._upnp_servers),
            return_exceptions=True,
        )
        # Collect first, remove afterwards: removing while enumerating would
        # shift the indices that pair results with servers.
        failed_servers = []
        for idx, result in enumerate(results):
            if isinstance(result, Exception):
                _LOGGER.debug(
                    "Failed to setup server for %s: %s",
                    self._upnp_servers[idx].source,
                    result,
                )
                failed_servers.append(self._upnp_servers[idx])
        for server in failed_servers:
            self._upnp_servers.remove(server)

    async def async_stop(self, *_: Any) -> None:
        """Stop the server."""
        await self._async_stop_upnp_servers()

    async def _async_stop_upnp_servers(self) -> None:
        """Stop UPnP/SSDP servers."""
        for server in self._upnp_servers:
            await server.async_stop()
None async_setup(self, dict[str, list[dict[str, str]]] integration_matchers)
Definition: __init__.py:267
set[str] async_matching_domains(self, CaseInsensitiveDict info_with_desc)
Definition: __init__.py:285
None _ssdp_listener_callback(self, SsdpDevice ssdp_device, DeviceOrServiceType dst, SsdpSource source)
Definition: __init__.py:467
None __init__(self, HomeAssistant hass, IntegrationMatchers integration_matchers)
Definition: __init__.py:302
Callable[[], None] async_register_callback(self, SsdpHassJobCallback callback, dict[str, str]|None match_dict=None)
Definition: __init__.py:319
list[SsdpServiceInfo] async_get_discovery_info_by_st(self, str st)
Definition: __init__.py:615
None async_scan_broadcast(self, *Any _)
Definition: __init__.py:376
None _ssdp_listener_process_callback_with_lookup(self, SsdpDevice ssdp_device, DeviceOrServiceType dst, SsdpSource source)
Definition: __init__.py:496
SsdpServiceInfo|None async_get_discovery_info_by_udn_st(self, str udn, str st)
Definition: __init__.py:605
list[SsdpDevice] _ssdp_devices(self)
Definition: __init__.py:313
None _handle_config_entry_removed(self, config_entries.ConfigEntry entry)
Definition: __init__.py:636
None async_scan_multicast(self, *Any _)
Definition: __init__.py:371
SsdpServiceInfo _async_headers_to_discovery_info(self, SsdpDevice ssdp_device, CaseInsensitiveDict headers)
Definition: __init__.py:592
None _async_dismiss_discoveries(self, SsdpServiceInfo byebye_discovery_info)
Definition: __init__.py:566
None _ssdp_listener_process_callback(self, SsdpDevice ssdp_device, DeviceOrServiceType dst, SsdpSource source, Mapping[str, Any] info_desc, bool skip_callbacks=False)
Definition: __init__.py:513
Mapping[str, str] _async_get_description_dict(self, str|None location)
Definition: __init__.py:579
list[SsdpHassJobCallback] _async_get_matching_callbacks(self, CaseInsensitiveDict combined_headers)
Definition: __init__.py:454
list[SsdpServiceInfo] async_get_discovery_info_by_udn(self, str udn)
Definition: __init__.py:623
None __init__(self, HomeAssistant hass)
Definition: __init__.py:787
None _async_start_upnp_servers(self, Event event)
Definition: __init__.py:806
bool remove(self, _T matcher)
Definition: match.py:214
int _async_find_next_available_port(AddressTupleVXType source)
Definition: __init__.py:764
None _async_process_callbacks(HomeAssistant hass, list[SsdpHassJobCallback] callbacks, SsdpServiceInfo discovery_info, SsdpChange ssdp_change)
Definition: __init__.py:232
Callable[[], None] async_register_callback(HomeAssistant hass, Callable[[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None]|None] callback, dict[str, str]|None match_dict=None)
Definition: __init__.py:153
list[SsdpServiceInfo] async_get_discovery_info_by_st(HomeAssistant hass, str st)
Definition: __init__.py:181
SsdpServiceInfo|None async_get_discovery_info_by_udn_st(HomeAssistant hass, str udn, str st)
Definition: __init__.py:172
set[IPv4Address|IPv6Address] async_build_source_set(HomeAssistant hass)
Definition: __init__.py:196
bool _async_headers_match(CaseInsensitiveDict headers, dict[str, str] lower_match_dict)
Definition: __init__.py:245
SsdpServiceInfo discovery_info_from_headers_and_description(SsdpDevice ssdp_device, CaseInsensitiveDict combined_headers, Mapping[str, Any] info_desc)
Definition: __init__.py:668
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:207
list[SsdpServiceInfo] async_get_discovery_info_by_udn(HomeAssistant hass, str udn)
Definition: __init__.py:190
str _format_err(str name, *Any args)
Definition: __init__.py:143
str|None _udn_from_usn(str|None usn)
Definition: __init__.py:702
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Definition: condition.py:802
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679
str get_url(HomeAssistant hass, *bool require_current_request=False, bool require_ssl=False, bool require_standard_port=False, bool require_cloud=False, bool allow_internal=True, bool allow_external=True, bool allow_cloud=True, bool|None allow_ip=None, bool|None prefer_external=None, bool prefer_cloud=False)
Definition: network.py:131
dict[str, Any] async_get_system_info(HomeAssistant hass)
Definition: system_info.py:44
dict[str, list[dict[str, str]]] async_get_ssdp(HomeAssistant hass)
Definition: loader.py:609
bool is_ipv4_address(str address)
Definition: network.py:73