Home Assistant Unofficial Reference 2024.12.1
subscriber.py
Go to the documentation of this file.
1 """Mixin class for handling harmony callback subscriptions."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 import logging
7 from typing import Any, NamedTuple
8 
9 from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
10 
11 _LOGGER = logging.getLogger(__name__)
12 
13 type NoParamCallback = HassJob[[], Any] | None
14 type ActivityCallback = HassJob[[tuple], Any] | None
15 
16 
class HarmonyCallback(NamedTuple):
    """Callback type for Harmony Hub notifications.

    One instance bundles the jobs a single subscriber wants run for each
    kind of hub event. Any field may be ``None`` when the subscriber has no
    interest in that event. Field order is part of the interface because
    instances can be built positionally.
    """

    # Jobs typed to take no argument.
    connected: NoParamCallback
    disconnected: NoParamCallback
    config_updated: NoParamCallback

    # Jobs typed to take one tuple argument.
    activity_starting: ActivityCallback
    activity_started: ActivityCallback
25 
26 
class HarmonySubscriberMixin:
    """Base implementation for a subscriber.

    Keeps a list of ``HarmonyCallback`` subscriptions, dispatches hub
    events to the matching job of each subscription, and owns the lock that
    ``async_lock_start_activity`` / ``async_unlock_start_activity`` acquire
    and release.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize a subscriber.

        ``hass`` is stored and later used to run the subscribed jobs.
        """
        super().__init__()
        self._hass = hass
        self._subscriptions: list[HarmonyCallback] = []
        self._activity_lock = asyncio.Lock()

    async def async_lock_start_activity(self) -> None:
        """Acquire the lock."""
        await self._activity_lock.acquire()

    @callback
    def async_unlock_start_activity(self) -> None:
        """Release the lock."""
        # asyncio.Lock.release() raises RuntimeError on an unlocked lock,
        # so only release when the lock is actually held.
        if self._activity_lock.locked():
            self._activity_lock.release()

    @callback
    def async_subscribe(self, update_callbacks: HarmonyCallback) -> CALLBACK_TYPE:
        """Add a callback subscriber.

        Returns a zero-argument function that removes the subscription
        again.
        """
        self._subscriptions.append(update_callbacks)

        def _unsubscribe() -> None:
            self.async_unsubscribe(update_callbacks)

        return _unsubscribe

    @callback
    def async_unsubscribe(self, update_callback: HarmonyCallback) -> None:
        """Remove a callback subscriber.

        Raises ``ValueError`` (from ``list.remove``) when the subscription
        is not currently registered.
        """
        self._subscriptions.remove(update_callback)

    def _config_updated(self, _: dict | None = None) -> None:
        """Notify subscribers that the configuration was updated."""
        _LOGGER.debug("config_updated")
        self._call_callbacks("config_updated")

    def _connected(self, _: str | None = None) -> None:
        """Release the lock, mark as available and notify subscribers."""
        _LOGGER.debug("connected")
        self.async_unlock_start_activity()
        # ``_available`` is not initialised in ``__init__``; it is only
        # written here and in ``_disconnected``.
        self._available = True
        self._call_callbacks("connected")

    def _disconnected(self, _: str | None = None) -> None:
        """Release the lock, mark as unavailable and notify subscribers."""
        _LOGGER.debug("disconnected")
        self.async_unlock_start_activity()
        self._available = False
        self._call_callbacks("disconnected")

    def _activity_starting(self, activity_info: tuple) -> None:
        """Notify subscribers that an activity is starting."""
        _LOGGER.debug("activity %s starting", activity_info)
        self._call_callbacks("activity_starting", activity_info)

    def _activity_started(self, activity_info: tuple) -> None:
        """Release the lock and notify subscribers that an activity started."""
        _LOGGER.debug("activity %s started", activity_info)
        self.async_unlock_start_activity()
        self._call_callbacks("activity_started", activity_info)

    def _call_callbacks(
        self, callback_func_name: str, argument: tuple | None = None
    ) -> None:
        """Run the job named ``callback_func_name`` on every subscription.

        Subscriptions whose field for that name is falsy (``None``) are
        skipped. A falsy ``argument`` (``None`` or an empty tuple) means the
        job is run without arguments.
        """
        for subscription in self._subscriptions:
            if current_callback_job := getattr(subscription, callback_func_name):
                if argument:
                    self._hass.async_run_hass_job(current_callback_job, argument)
                else:
                    self._hass.async_run_hass_job(current_callback_job)
None async_unsubscribe(self, HarmonyCallback update_callback)
Definition: subscriber.py:58
None _call_callbacks(self, str callback_func_name, tuple|None argument=None)
Definition: subscriber.py:89
CALLBACK_TYPE async_subscribe(self, HarmonyCallback update_callbacks)
Definition: subscriber.py:48
bool remove(self, _T matcher)
Definition: match.py:214