1 """The discovery flow helper."""
3 from __future__
import annotations
5 from collections.abc
import Coroutine
7 from typing
import TYPE_CHECKING, Any, NamedTuple, Self
19 DISCOVERY_FLOW_DISPATCHER: HassKey[FlowDispatcher] =
HassKey(
20 "discovery_flow_dispatcher"
24 @dataclasses.dataclass(kw_only=True, slots=True)
26 """Serializable discovery key."""
29 key: str | tuple[str, ...]
34 """Construct from JSON dict."""
35 if type(key := json_dict[
"key"])
is list:
37 return cls(domain=json_dict[
"domain"], key=key, version=json_dict[
"version"])
45 context: ConfigFlowContext,
48 discovery_key: DiscoveryKey |
None =
None,
50 """Create a discovery flow."""
51 dispatcher: FlowDispatcher |
None =
None
52 if DISCOVERY_FLOW_DISPATCHER
in hass.data:
53 dispatcher = hass.data[DISCOVERY_FLOW_DISPATCHER]
54 elif hass.state
is not CoreState.running:
55 dispatcher = hass.data[DISCOVERY_FLOW_DISPATCHER] =
FlowDispatcher(hass)
56 dispatcher.async_setup()
59 context = context | {
"discovery_key": discovery_key}
61 if not dispatcher
or dispatcher.started:
63 hass.async_create_background_task(
64 init_coro, f
"discovery flow {domain} {context}", eager_start=
True
68 dispatcher.async_create(domain, context, data)
73 hass: HomeAssistant, domain: str, context: ConfigFlowContext, data: Any
74 ) -> Coroutine[
None,
None, ConfigFlowResult] |
None:
75 """Create a discovery flow."""
81 hass.config_entries.flow.async_has_matching_discovery_flow(
88 return hass.config_entries.flow.async_init(domain, context=context, data=data)
92 """Key for pending flows."""
99 """Value for pending flows."""
101 context: ConfigFlowContext
106 """Dispatch discovery flows."""
109 """Init the discovery dispatcher."""
112 self.
pending_flowspending_flows: dict[PendingFlowKey, list[PendingFlowValue]] = {}
116 """Set up the flow disptcher."""
117 self.
hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self.
_async_start_async_start)
120 """Start processing pending flows."""
126 for flow_key, flows
in pending_flows.items()
127 for flow_values
in flows
140 def async_create(self, domain: str, context: ConfigFlowContext, data: Any) ->
None:
141 """Create and add or queue a flow."""
144 existing = self.
pending_flowspending_flows.setdefault(key, [])
145 if not any(existing_values.data == data
for existing_values
in existing):
146 existing.append(values)
Self from_json_dict(cls, dict[str, Any] json_dict)
None __init__(self, HomeAssistant hass)
None _async_start(self, Event event)
None async_create(self, str domain, ConfigFlowContext context, Any data)
Coroutine[None, None, ConfigFlowResult]|None _async_init_flow(HomeAssistant hass, str domain, ConfigFlowContext context, Any data)
None async_create_flow(HomeAssistant hass, str domain, ConfigFlowContext context, Any data, *DiscoveryKey|None discovery_key=None)
Any gather_with_limited_concurrency(int limit, *Any tasks, bool return_exceptions=False)