1 """Helpers for Home Assistant dispatcher & internal component/platform."""
3 from __future__
import annotations
5 from collections
import defaultdict
6 from collections.abc
import Callable, Coroutine
7 from functools
import partial
9 from typing
import Any, overload
16 get_hassjob_callable_job_type,
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Key under which the dispatcher registry is stored in ``hass.data``.
DATA_DISPATCHER = "dispatcher"
29 type _DispatcherDataType[*_Ts] = dict[
30 SignalType[*_Ts] | str,
32 Callable[[*_Ts], Any] | Callable[..., Any],
33 HassJob[...,
None | Coroutine[Any, Any,
None]] |
None,
41 hass: HomeAssistant, signal: SignalType[*_Ts], target: Callable[[*_Ts],
None]
42 ) -> Callable[[],
None]: ...
48 hass: HomeAssistant, signal: str, target: Callable[...,
None]
49 ) -> Callable[[],
None]: ...
55 signal: SignalType[*_Ts],
56 target: Callable[[*_Ts],
None],
57 ) -> Callable[[],
None]:
58 """Connect a callable function to a signal."""
59 async_unsub = run_callback_threadsafe(
60 hass.loop, async_dispatcher_connect, hass, signal, target
63 def remove_dispatcher() -> None:
64 """Remove signal listener."""
65 run_callback_threadsafe(hass.loop, async_unsub).result()
67 return remove_dispatcher
71 def _async_remove_dispatcher[*_Ts](
72 dispatchers: _DispatcherDataType[*_Ts],
73 signal: SignalType[*_Ts] | str,
74 target: Callable[[*_Ts], Any] | Callable[..., Any],
76 """Remove signal listener."""
78 signal_dispatchers = dispatchers[signal]
79 del signal_dispatchers[target]
82 if not signal_dispatchers:
83 del dispatchers[signal]
84 except (KeyError, ValueError):
87 _LOGGER.warning(
"Unable to remove unknown dispatcher %s", target)
94 hass: HomeAssistant, signal: SignalType[*_Ts], target: Callable[[*_Ts], Any]
95 ) -> Callable[[],
None]: ...
102 hass: HomeAssistant, signal: str, target: Callable[..., Any]
103 ) -> Callable[[],
None]: ...
110 signal: SignalType[*_Ts] | str,
111 target: Callable[[*_Ts], Any] | Callable[..., Any],
112 ) -> Callable[[],
None]:
113 """Connect a callable function to a signal.
115 This method must be run in the event loop.
117 if DATA_DISPATCHER
not in hass.data:
118 hass.data[DATA_DISPATCHER] = defaultdict(dict)
119 dispatchers: _DispatcherDataType[*_Ts] = hass.data[DATA_DISPATCHER]
120 dispatchers[signal][target] =
None
125 return partial(_async_remove_dispatcher, dispatchers, signal, target)
131 hass: HomeAssistant, signal: SignalType[*_Ts], *args: *_Ts
142 hass: HomeAssistant, signal: SignalType[*_Ts], *args: *_Ts
144 """Send signal and data."""
145 hass.loop.call_soon_threadsafe(async_dispatcher_send_internal, hass, signal, *args)
149 signal: SignalType[*_Ts] | str,
150 target: Callable[[*_Ts], Any] | Callable[..., Any],
153 """Format error message."""
157 f
"Exception in {getattr(target, "__name__
", None) or target} "
158 f
"when dispatching '{signal}': {args}"
162 def _generate_job[*_Ts](
163 signal: SignalType[*_Ts] | str, target: Callable[[*_Ts], Any] | Callable[..., Any]
164 ) -> HassJob[..., Coroutine[Any, Any,
None] |
None]:
165 """Generate a HassJob for a signal and target."""
167 name = f
"dispatcher {signal}"
168 if job_type
is HassJobType.Callback:
172 return HassJob(target, name, job_type=job_type)
175 target, partial(_format_err, signal, target), job_type=job_type
186 hass: HomeAssistant, signal: SignalType[*_Ts], *args: *_Ts
199 hass: HomeAssistant, signal: SignalType[*_Ts] | str, *args: *_Ts
201 """Send signal and data.
203 This method must be run in the event loop.
214 hass.verify_event_loop_thread(
"async_dispatcher_send")
215 async_dispatcher_send_internal(hass, signal, *args)
220 def async_dispatcher_send_internal[*_Ts](
221 hass: HomeAssistant, signal: SignalType[*_Ts] | str, *args: *_Ts
223 """Send signal and data.
225 This method is intended to only be used by core internally
226 and should not be considered a stable API. We will make
227 breaking changes to this function in the future and it
228 should not be used in integrations.
230 This method must be run in the event loop.
232 if (maybe_dispatchers := hass.data.get(DATA_DISPATCHER))
is None:
234 dispatchers: _DispatcherDataType[*_Ts] = maybe_dispatchers
235 if (target_list := dispatchers.get(signal))
is None:
238 for target, job
in list(target_list.items()):
240 job = _generate_job(signal, target)
241 target_list[target] = job
245 if job.job_type
is HassJobType.Callback:
249 log_exception(partial(_format_err, signal, target), *args)
251 hass.async_run_hass_job(job, *args)
str _format_err(str name, *Any args)
HassJobType get_hassjob_callable_job_type(Callable[..., Any] target)
Callable[[], None] dispatcher_connect(HomeAssistant hass, str signal, Callable[..., None] target)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)