Home Assistant Unofficial Reference 2024.12.1
discovery_flow.py
Go to the documentation of this file.
1 """The discovery flow helper."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Coroutine
6 import dataclasses
7 from typing import TYPE_CHECKING, Any, NamedTuple, Self
8 
9 from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
10 from homeassistant.core import CoreState, Event, HomeAssistant, callback
11 from homeassistant.loader import bind_hass
12 from homeassistant.util.async_ import gather_with_limited_concurrency
13 from homeassistant.util.hass_dict import HassKey
14 
15 if TYPE_CHECKING:
16  from homeassistant.config_entries import ConfigFlowContext, ConfigFlowResult
17 
# Concurrency limit passed to gather_with_limited_concurrency when
# FlowDispatcher._async_start initialises the queued flows.
FLOW_INIT_LIMIT = 20
# Key in hass.data under which async_create_flow stores the FlowDispatcher.
DISCOVERY_FLOW_DISPATCHER: HassKey[FlowDispatcher] = HassKey(
    "discovery_flow_dispatcher"
)
22 
23 
24 @dataclasses.dataclass(kw_only=True, slots=True)
26  """Serializable discovery key."""
27 
28  domain: str
29  key: str | tuple[str, ...]
30  version: int
31 
32  @classmethod
33  def from_json_dict(cls, json_dict: dict[str, Any]) -> Self:
34  """Construct from JSON dict."""
35  if type(key := json_dict["key"]) is list:
36  key = tuple(key)
37  return cls(domain=json_dict["domain"], key=key, version=json_dict["version"])
38 
39 
@bind_hass
@callback
def async_create_flow(
    hass: HomeAssistant,
    domain: str,
    context: ConfigFlowContext,
    data: Any,
    *,
    discovery_key: DiscoveryKey | None = None,
) -> None:
    """Create a discovery flow.

    The flow is initialised immediately as a background task unless a
    dispatcher exists that has not started yet; in that case the request is
    handed to the dispatcher's ``async_create`` instead.

    Args:
        hass: The Home Assistant instance.
        domain: Domain the flow is created for.
        context: Flow context passed on to the flow initialisation.
        data: Discovery data passed on to the flow initialisation.
        discovery_key: Optional key; when truthy it is added to the context
            under ``"discovery_key"``.
    """
    dispatcher: FlowDispatcher | None = None
    if DISCOVERY_FLOW_DISPATCHER in hass.data:
        dispatcher = hass.data[DISCOVERY_FLOW_DISPATCHER]
    elif hass.state is not CoreState.running:
        # No dispatcher registered and the core is not in the running state:
        # create one, store it in hass.data, and let it set itself up.
        dispatcher = hass.data[DISCOVERY_FLOW_DISPATCHER] = FlowDispatcher(hass)
        dispatcher.async_setup()

    if discovery_key:
        # ``|`` produces a new mapping, so the caller's context is not mutated.
        context = context | {"discovery_key": discovery_key}

    if not dispatcher or dispatcher.started:
        # _async_init_flow returns None when the flow should not be created.
        if init_coro := _async_init_flow(hass, domain, context, data):
            hass.async_create_background_task(
                init_coro, f"discovery flow {domain} {context}", eager_start=True
            )
        return

    dispatcher.async_create(domain, context, data)
69 
70 
@callback
def _async_init_flow(
    hass: HomeAssistant, domain: str, context: ConfigFlowContext, data: Any
) -> Coroutine[None, None, ConfigFlowResult] | None:
    """Create a discovery flow.

    Args:
        hass: The Home Assistant instance.
        domain: Domain the flow is created for.
        context: Flow context forwarded to ``flow.async_init``.
        data: Discovery data forwarded to ``flow.async_init``.

    Returns:
        The result of ``hass.config_entries.flow.async_init`` (a coroutine,
        per the return annotation), or ``None`` when a matching discovery
        flow already exists or Home Assistant is stopping.
    """
    # Avoid spawning flows that have the same initial discovery data
    # as ones in progress as it may cause additional device probing
    # which can overload devices since zeroconf/ssdp updates can happen
    # multiple times in the same minute
    if (
        hass.config_entries.flow.async_has_matching_discovery_flow(
            domain, context, data
        )
        or hass.is_stopping
    ):
        return None

    return hass.config_entries.flow.async_init(domain, context=context, data=data)
89 
90 
class PendingFlowKey(NamedTuple):
    """Key for pending flows."""

    # Domain the queued flows belong to.
    domain: str
    # Filled from context["source"] by FlowDispatcher.async_create.
    source: str
96 
97 
class PendingFlowValue(NamedTuple):
    """Value for pending flows."""

    # Flow context to pass on when the queued flow is initialised.
    context: ConfigFlowContext
    # Discovery data; FlowDispatcher.async_create compares it to drop duplicates.
    data: Any
103 
104 
class FlowDispatcher:
    """Dispatch discovery flows.

    Flows requested through ``async_create`` are queued until the
    ``EVENT_HOMEASSISTANT_STARTED`` event fires, at which point every queued
    flow is initialised with at most ``FLOW_INIT_LIMIT`` running at once.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Init the discovery dispatcher."""
        self.hass = hass
        # Set to True once _async_start begins draining the queue.
        self.started = False
        self.pending_flows: dict[PendingFlowKey, list[PendingFlowValue]] = {}

    @callback
    def async_setup(self) -> None:
        """Set up the flow dispatcher."""
        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self._async_start)

    async def _async_start(self, event: Event) -> None:
        """Start processing pending flows."""
        # Swap the queue out and mark the dispatcher started before awaiting,
        # so nothing is processed from a queue that is still being filled.
        pending_flows = self.pending_flows
        self.pending_flows = {}
        self.started = True
        # _async_init_flow returns None for flows that should be skipped;
        # only the truthy results are awaited.
        init_coros = (
            init_coro
            for flow_key, flows in pending_flows.items()
            for flow_values in flows
            if (
                init_coro := _async_init_flow(
                    self.hass,
                    flow_key.domain,
                    flow_values.context,
                    flow_values.data,
                )
            )
        )
        await gather_with_limited_concurrency(FLOW_INIT_LIMIT, *init_coros)

    @callback
    def async_create(self, domain: str, context: ConfigFlowContext, data: Any) -> None:
        """Create and add or queue a flow.

        The flow is appended to the queue for its (domain, source) pair
        unless an entry with equal ``data`` is already queued there.
        """
        key = PendingFlowKey(domain, context["source"])
        values = PendingFlowValue(context, data)
        existing = self.pending_flows.setdefault(key, [])
        if not any(existing_values.data == data for existing_values in existing):
            existing.append(values)
Self from_json_dict(cls, dict[str, Any] json_dict)
None async_create(self, str domain, ConfigFlowContext context, Any data)
Coroutine[None, None, ConfigFlowResult]|None _async_init_flow(HomeAssistant hass, str domain, ConfigFlowContext context, Any data)
None async_create_flow(HomeAssistant hass, str domain, ConfigFlowContext context, Any data, *, DiscoveryKey|None discovery_key=None)
Any gather_with_limited_concurrency(int limit, *Any tasks, bool return_exceptions=False)
Definition: async_.py:103