1 """Mixin class for handling harmony callback subscriptions."""
3 from __future__
import annotations
7 from typing
import Any, NamedTuple
11 _LOGGER = logging.getLogger(__name__)
13 type NoParamCallback = HassJob[[], Any] |
None
14 type ActivityCallback = HassJob[[tuple], Any] |
None
18 """Callback type for Harmony Hub notifications."""
20 connected: NoParamCallback
21 disconnected: NoParamCallback
22 config_updated: NoParamCallback
23 activity_starting: ActivityCallback
24 activity_started: ActivityCallback
28 """Base implementation for a subscriber."""
30 def __init__(self, hass: HomeAssistant) ->
None:
31 """Initialize a subscriber."""
34 self._subscriptions: list[HarmonyCallback] = []
38 """Acquire the lock."""
43 """Release the lock."""
49 """Add a callback subscriber."""
50 self._subscriptions.append(update_callbacks)
52 def _unsubscribe() -> None:
59 """Remove a callback subscriber."""
60 self._subscriptions.
remove(update_callback)
63 _LOGGER.debug(
"config_updated")
67 _LOGGER.debug(
"connected")
73 _LOGGER.debug(
"disconnected")
79 _LOGGER.debug(
"activity %s starting", activity_info)
83 _LOGGER.debug(
"activity %s started", activity_info)
88 self, callback_func_name: str, argument: tuple |
None =
None
90 for subscription
in self._subscriptions:
91 if current_callback_job := getattr(subscription, callback_func_name):
93 self.
_hass_hass.async_run_hass_job(current_callback_job, argument)
95 self.
_hass_hass.async_run_hass_job(current_callback_job)
None _disconnected(self, str|None _=None)
None async_unlock_start_activity(self)
None async_unsubscribe(self, HarmonyCallback update_callback)
None _connected(self, str|None _=None)
None _activity_started(self, tuple activity_info)
None _config_updated(self, dict|None _=None)
None __init__(self, HomeAssistant hass)
None _call_callbacks(self, str callback_func_name, tuple|None argument=None)
CALLBACK_TYPE async_subscribe(self, HarmonyCallback update_callbacks)
None async_lock_start_activity(self)
None _activity_starting(self, tuple activity_info)
bool remove(self, _T matcher)