Home Assistant Unofficial Reference 2024.12.1
scanner.py
Go to the documentation of this file.
1 """Support for Xiaomi Yeelight WiFi color bulb."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import ValuesView
7 import contextlib
8 from datetime import datetime
9 from functools import partial
10 from ipaddress import IPv4Address
11 import logging
12 from typing import Self
13 from urllib.parse import urlparse
14 
15 from async_upnp_client.search import SsdpSearchListener
16 from async_upnp_client.utils import CaseInsensitiveDict
17 
18 from homeassistant import config_entries
19 from homeassistant.components import network, ssdp
20 from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
21 from homeassistant.helpers import discovery_flow
22 from homeassistant.helpers.event import async_call_later, async_track_time_interval
23 from homeassistant.util.async_ import create_eager_task
24 
25 from .const import (
26  DISCOVERY_ATTEMPTS,
27  DISCOVERY_INTERVAL,
28  DISCOVERY_SEARCH_INTERVAL,
29  DISCOVERY_TIMEOUT,
30  DOMAIN,
31  SSDP_ST,
32  SSDP_TARGET,
33 )
34 
35 _LOGGER = logging.getLogger(__name__)
36 
37 
38 @callback
39 def _set_future_if_not_done(future: asyncio.Future[None]) -> None:
40  if not future.done():
41  future.set_result(None)
42 
43 
45  """Scan for Yeelight devices."""
46 
47  _scanner: Self | None = None
48 
49  @classmethod
50  @callback
51  def async_get(cls, hass: HomeAssistant) -> YeelightScanner:
52  """Get scanner instance."""
53  if cls._scanner_scanner is None:
54  cls._scanner_scanner = cls(hass)
55  return cls._scanner_scanner
56 
57  def __init__(self, hass: HomeAssistant) -> None:
58  """Initialize class."""
59  self._hass_hass = hass
60  self._host_discovered_events: dict[str, list[asyncio.Event]] = {}
61  self._unique_id_capabilities: dict[str, CaseInsensitiveDict] = {}
62  self._host_capabilities: dict[str, CaseInsensitiveDict] = {}
63  self._track_interval_track_interval: CALLBACK_TYPE | None = None
64  self._listeners: list[SsdpSearchListener] = []
65  self._setup_future_setup_future: asyncio.Future[None] | None = None
66 
67  async def async_setup(self) -> None:
68  """Set up the scanner."""
69  if self._setup_future_setup_future is not None:
70  await self._setup_future_setup_future
71  return
72 
73  self._setup_future_setup_future = self._hass_hass.loop.create_future()
74  connected_futures: list[asyncio.Future[None]] = []
75  for source_ip in await self._async_build_source_set_async_build_source_set():
76  future = self._hass_hass.loop.create_future()
77  connected_futures.append(future)
78  source = (str(source_ip), 0)
79  self._listeners.append(
80  SsdpSearchListener(
81  callback=self._async_process_entry_async_process_entry,
82  search_target=SSDP_ST,
83  target=SSDP_TARGET,
84  source=source,
85  connect_callback=partial(_set_future_if_not_done, future),
86  )
87  )
88 
89  results = await asyncio.gather(
90  *(
91  create_eager_task(listener.async_start())
92  for listener in self._listeners
93  ),
94  return_exceptions=True,
95  )
96  failed_listeners = []
97  for idx, result in enumerate(results):
98  if not isinstance(result, Exception):
99  continue
100  _LOGGER.warning(
101  "Failed to setup listener for %s: %s",
102  self._listeners[idx].source,
103  result,
104  )
105  failed_listeners.append(self._listeners[idx])
106  _set_future_if_not_done(connected_futures[idx])
107 
108  for listener in failed_listeners:
109  self._listeners.remove(listener)
110 
111  await asyncio.wait(connected_futures)
113  self._hass_hass, self.async_scanasync_scan, DISCOVERY_INTERVAL, cancel_on_shutdown=True
114  )
115  self.async_scanasync_scan()
116  _set_future_if_not_done(self._setup_future_setup_future)
117 
118  async def _async_build_source_set(self) -> set[IPv4Address]:
119  """Build the list of ssdp sources."""
120  adapters = await network.async_get_adapters(self._hass_hass)
121  sources: set[IPv4Address] = set()
122  if network.async_only_default_interface_enabled(adapters):
123  sources.add(IPv4Address("0.0.0.0"))
124  return sources
125 
126  return {
127  source_ip
128  for source_ip in await network.async_get_enabled_source_ips(self._hass_hass)
129  if isinstance(source_ip, IPv4Address) and not source_ip.is_loopback
130  }
131 
132  async def async_discover(self) -> ValuesView[CaseInsensitiveDict]:
133  """Discover bulbs."""
134  _LOGGER.debug("Yeelight discover with interval %s", DISCOVERY_SEARCH_INTERVAL)
135  await self.async_setupasync_setup()
136  for _ in range(DISCOVERY_ATTEMPTS):
137  self.async_scanasync_scan()
138  await asyncio.sleep(DISCOVERY_SEARCH_INTERVAL.total_seconds())
139  return self._unique_id_capabilities.values()
140 
141  @callback
142  def async_scan(self, _: datetime | None = None) -> None:
143  """Send discovery packets."""
144  _LOGGER.debug("Yeelight scanning")
145  for listener in self._listeners:
146  listener.async_search()
147 
148  async def async_get_capabilities(self, host: str) -> CaseInsensitiveDict | None:
149  """Get capabilities via SSDP."""
150  if host in self._host_capabilities:
151  return self._host_capabilities[host]
152 
153  host_event = asyncio.Event()
154  self._host_discovered_events.setdefault(host, []).append(host_event)
155  await self.async_setupasync_setup()
156 
157  for listener in self._listeners:
158  listener.async_search((host, SSDP_TARGET[1]))
159 
160  with contextlib.suppress(TimeoutError):
161  async with asyncio.timeout(DISCOVERY_TIMEOUT):
162  await host_event.wait()
163 
164  self._host_discovered_events[host].remove(host_event)
165  return self._host_capabilities.get(host)
166 
167  def _async_discovered_by_ssdp(self, response: CaseInsensitiveDict) -> None:
168  @callback
169  def _async_start_flow(*_) -> None:
170  discovery_flow.async_create_flow(
171  self._hass_hass,
172  DOMAIN,
173  context={"source": config_entries.SOURCE_SSDP},
175  ssdp_usn="",
176  ssdp_st=SSDP_ST,
177  ssdp_headers=response,
178  upnp={},
179  ),
180  )
181 
182  # Delay starting the flow in case the discovery is the result
183  # of another discovery
185  self._hass_hass, 1, HassJob(_async_start_flow, cancel_on_shutdown=True)
186  )
187 
188  @callback
189  def _async_process_entry(self, headers: CaseInsensitiveDict) -> None:
190  """Process a discovery."""
191  _LOGGER.debug("Discovered via SSDP: %s", headers)
192  unique_id = headers["id"]
193  host = urlparse(headers["location"]).hostname
194  assert host
195  current_entry = self._unique_id_capabilities.get(unique_id)
196  # Make sure we handle ip changes
197  if not current_entry or host != urlparse(current_entry["location"]).hostname:
198  _LOGGER.debug("Yeelight discovered with %s", headers)
199  self._async_discovered_by_ssdp_async_discovered_by_ssdp(headers)
200  self._host_capabilities[host] = headers
201  self._unique_id_capabilities[unique_id] = headers
202  for event in self._host_discovered_events.get(host, []):
203  event.set()
CaseInsensitiveDict|None async_get_capabilities(self, str host)
Definition: scanner.py:148
None async_scan(self, datetime|None _=None)
Definition: scanner.py:142
YeelightScanner async_get(cls, HomeAssistant hass)
Definition: scanner.py:51
ValuesView[CaseInsensitiveDict] async_discover(self)
Definition: scanner.py:132
None _async_discovered_by_ssdp(self, CaseInsensitiveDict response)
Definition: scanner.py:167
None _async_process_entry(self, CaseInsensitiveDict headers)
Definition: scanner.py:189
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None _set_future_if_not_done(asyncio.Future[None] future)
Definition: scanner.py:39
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679