# Source listing of coordinator.py, taken from the
# Home Assistant Unofficial Reference 2024.12.1.
# See the generated reference documentation for this file for details.
"""Message routing coordinators for handling NASweb push notifications."""

from __future__ import annotations

import asyncio
from collections.abc import Callable
from datetime import datetime, timedelta
import logging
import time
from typing import Any

from aiohttp.web import Request, Response
from webio_api import WebioAPI
from webio_api.const import KEY_DEVICE_SERIAL, KEY_OUTPUTS, KEY_TYPE, TYPE_STATUS_UPDATE

from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers import event
from homeassistant.helpers.update_coordinator import BaseDataUpdateCoordinatorProtocol

from .const import STATUS_UPDATE_MAX_TIME_INTERVAL

_LOGGER = logging.getLogger(__name__)


# NOTE(review): the `class` statement and the `handle_webhook_request` def line
# were missing from the extracted listing and have been reconstructed; the class
# name is presumed to be `NotificationCoordinator` -- confirm against upstream.
class NotificationCoordinator:
    """Route incoming push notifications to the matching NASwebCoordinator.

    Coordinators are registered per device serial number; a notification is
    forwarded to the coordinator registered under the serial it carries.
    """

    def __init__(self) -> None:
        """Initialize coordinator with an empty serial -> coordinator registry."""
        self._coordinators: dict[str, NASwebCoordinator] = {}

    def add_coordinator(self, serial: str, coordinator: NASwebCoordinator) -> None:
        """Add NASwebCoordinator to possible notification targets.

        An existing registration for the same serial is replaced.
        """
        self._coordinators[serial] = coordinator
        _LOGGER.debug("Added NASwebCoordinator for NASweb[%s]", serial)

    def remove_coordinator(self, serial: str) -> None:
        """Remove NASwebCoordinator from possible notification targets.

        Raises KeyError if no coordinator is registered for `serial`.
        """
        self._coordinators.pop(serial)
        _LOGGER.debug("Removed NASwebCoordinator for NASweb[%s]", serial)

    def has_coordinators(self) -> bool:
        """Check if there is any registered coordinator for push notifications."""
        return bool(self._coordinators)

    async def check_connection(self, serial: str) -> bool:
        """Wait for first status update to confirm connection with NASweb.

        Polls the coordinator registered for `serial` up to 10 times, sleeping
        one second after each unsuccessful check. Returns True as soon as the
        coordinator reports a received status update, False when no coordinator
        is registered for `serial` or no update arrived during the polling.
        """
        nasweb_coordinator = self._coordinators.get(serial)
        if nasweb_coordinator is None:
            _LOGGER.error("Cannot check connection. No device match serial number")
            return False
        for counter in range(10):
            _LOGGER.debug("Checking connection with: %s (%s)", serial, counter)
            if nasweb_coordinator.is_connection_confirmed():
                return True
            await asyncio.sleep(1)
        return False

    async def handle_webhook_request(
        self, hass: HomeAssistant, webhook_id: str, request: Request
    ) -> Response | None:
        """Handle webhook request from Push API.

        Returns a JSON "ok" response once the notification has been handed to
        the coordinator registered for its serial number, and None when the
        notification cannot be routed (no coordinators, unusable payload,
        missing or unknown serial). `hass` and `webhook_id` are not used here;
        presumably they are required by the webhook handler signature.
        """
        if not self.has_coordinators():
            return None
        # The payload comes from the network: reject bodies that are not JSON
        # objects instead of failing later on `.get`.
        try:
            notification = await request.json()
        except ValueError:
            _LOGGER.warning("Received notification with invalid JSON payload")
            return None
        if not isinstance(notification, dict):
            _LOGGER.warning("Received notification with unexpected payload type")
            return None
        serial = notification.get(KEY_DEVICE_SERIAL, None)
        _LOGGER.debug("Received push: %s", notification)
        if serial is None:
            _LOGGER.warning("Received notification without nasweb identifier")
            return None
        nasweb_coordinator = self._coordinators.get(serial)
        if nasweb_coordinator is None:
            _LOGGER.warning("Received notification for not registered nasweb")
            return None
        await nasweb_coordinator.handle_push_notification(notification)
        return Response(body='{"response": "ok"}', content_type="application/json")


# NOTE(review): the `class` statement and the def lines of `async_add_listener`
# and `_async_unsub_last_update_check` were missing from the extracted listing and
# have been reconstructed. The base class is presumed from the otherwise unused
# `BaseDataUpdateCoordinatorProtocol` import -- confirm against upstream.
class NASwebCoordinator(BaseDataUpdateCoordinatorProtocol):
    """Coordinator managing status of single NASweb device.

    Since status updates are managed through push notifications, this class schedules
    periodic checks to ensure that devices are marked unavailable if updates
    haven't been received for a prolonged period.
    """

    def __init__(
        self, hass: HomeAssistant, webio_api: WebioAPI, name: str = "NASweb[default]"
    ) -> None:
        """Initialize NASweb coordinator.

        Seeds `data` with the outputs currently exposed by `webio_api`.
        """
        self._hass = hass
        self.name = name
        self.webio_api = webio_api
        # Wall-clock time of the last status update received via push; stays None
        # until the first one arrives (see `is_connection_confirmed`). Distinct
        # from `last_update`, which is written by `async_set_updated_data`.
        self._last_update: float | None = None
        job_name = f"NASwebCoordinator[{name}]"
        self._job = HassJob(self._handle_max_update_interval, job_name)
        # Cancel handle of the currently scheduled "no update received" check.
        self._unsub_last_update_check: CALLBACK_TYPE | None = None
        # Keyed by the unsubscribe callable returned from `async_add_listener`.
        self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
        data: dict[str, Any] = {}
        data[KEY_OUTPUTS] = self.webio_api.outputs
        self.async_set_updated_data(data)

    def is_connection_confirmed(self) -> bool:
        """Check whether coordinator received status update from NASweb."""
        return self._last_update is not None

    @callback
    def async_add_listener(
        self, update_callback: CALLBACK_TYPE, context: Any = None
    ) -> Callable[[], None]:
        """Listen for data updates.

        Registers `update_callback` (stored together with `context`) and returns
        a callable that removes the registration again. Adding the first listener
        starts the update-interval check; removing the last one cancels it.
        """
        schedule_update_check = not self._listeners

        @callback
        def remove_listener() -> None:
            """Remove update listener."""
            self._listeners.pop(remove_listener)
            if not self._listeners:
                self._async_unsub_last_update_check()

        self._listeners[remove_listener] = (update_callback, context)
        # This is the first listener, set up interval.
        if schedule_update_check:
            self._schedule_last_update_check()
        return remove_listener

    @callback
    def async_set_updated_data(self, data: dict[str, Any]) -> None:
        """Update data and notify listeners.

        Stores `data`, records the event loop time in `last_update`, restarts the
        update-interval check when listeners exist and calls every listener.
        """
        self.data = data
        self.last_update = self._hass.loop.time()
        _LOGGER.debug("Updated %s data", self.name)
        if self._listeners:
            self._schedule_last_update_check()
        self.async_update_listeners()

    @callback
    def async_update_listeners(self) -> None:
        """Update all registered listeners."""
        # Iterate over a snapshot so a listener may unregister itself while running.
        for update_callback, _ in list(self._listeners.values()):
            update_callback()

    async def _handle_max_update_interval(self, now: datetime) -> None:
        """Handle max update interval occurrence.

        This method is called when `STATUS_UPDATE_MAX_TIME_INTERVAL` has passed without
        receiving a status update. It only needs to trigger state update of entities
        which then change their state accordingly.
        """
        # The scheduled call has fired, so there is nothing left to cancel.
        self._unsub_last_update_check = None
        if self._listeners:
            self.async_update_listeners()

    def _schedule_last_update_check(self) -> None:
        """Schedule a task to trigger entities state update after `STATUS_UPDATE_MAX_TIME_INTERVAL`.

        This method schedules a task (`_handle_max_update_interval`) to be executed after
        `STATUS_UPDATE_MAX_TIME_INTERVAL` seconds without status update, which enables entities
        to change their state to unavailable. After each status update this task is rescheduled.
        """
        # Drop any previously scheduled check before arming a new one.
        self._async_unsub_last_update_check()
        now = self._hass.loop.time()
        next_check = (
            now + timedelta(seconds=STATUS_UPDATE_MAX_TIME_INTERVAL).total_seconds()
        )
        self._unsub_last_update_check = event.async_call_at(
            self._hass,
            self._job,
            next_check,
        )

    def _async_unsub_last_update_check(self) -> None:
        """Cancel any scheduled update check call."""
        if self._unsub_last_update_check:
            self._unsub_last_update_check()
            self._unsub_last_update_check = None

    async def handle_push_notification(self, notification: dict) -> None:
        """Handle incoming push notification from NASweb.

        Only notifications whose type is `TYPE_STATUS_UPDATE` are processed;
        any other type is logged and otherwise ignored.
        """
        msg_type = notification.get(KEY_TYPE)
        _LOGGER.debug("Received push notification: %s", msg_type)

        if msg_type == TYPE_STATUS_UPDATE:
            await self.process_status_update(notification)
            # Marks the connection as confirmed for `is_connection_confirmed`.
            self._last_update = time.time()

    async def process_status_update(self, new_status: dict) -> None:
        """Process status update from NASweb.

        Passes the status to `webio_api` and publishes its outputs as new data.
        """
        self.webio_api.update_device_status(new_status)
        new_data = {KEY_OUTPUTS: self.webio_api.outputs}
        self.async_set_updated_data(new_data)
# Cross-references from the generated documentation:
#   __init__(self, hass: HomeAssistant, webio_api: WebioAPI, name: str = "NASweb[default]") -> None
#       Definition: coordinator.py:89
#   async_add_listener(self, update_callback: CALLBACK_TYPE, context: Any = None) -> Callable[[], None]
#       Definition: coordinator.py:110
#   handle_webhook_request(self, hass: HomeAssistant, webhook_id: str, request: Request) -> Response | None
#       Definition: coordinator.py:61
#   add_coordinator(self, serial: str, coordinator: NASwebCoordinator) -> None
#       Definition: coordinator.py:32
#   get(self, request: web.Request, config_key: str) -> web.Response
#       Definition: view.py:88