1 """Message routing coordinators for handling NASweb push notifications."""
3 from __future__ import annotations
6 from collections.abc import Callable
7 from datetime import datetime, timedelta
10 from typing import Any
12 from aiohttp.web import Request, Response
13 from webio_api import WebioAPI
14 from webio_api.const import KEY_DEVICE_SERIAL, KEY_OUTPUTS, KEY_TYPE, TYPE_STATUS_UPDATE
20 from .const import STATUS_UPDATE_MAX_TIME_INTERVAL
22 _LOGGER = logging.getLogger(__name__)
26 """Coordinator redirecting push notifications for this integration to appropriate NASwebCoordinator."""
29 """Initialize coordinator."""
30 self._coordinators: dict[str, NASwebCoordinator] = {}
33 """Add NASwebCoordinator to possible notification targets."""
34 self._coordinators[serial] = coordinator
35 _LOGGER.debug(
"Added NASwebCoordinator for NASweb[%s]", serial)
38 """Remove NASwebCoordinator from possible notification targets."""
39 self._coordinators.pop(serial)
40 _LOGGER.debug(
"Removed NASwebCoordinator for NASweb[%s]", serial)
43 """Check if there is any registered coordinator for push notifications."""
44 return len(self._coordinators) > 0
47 """Wait for first status update to confirm connection with NASweb."""
48 nasweb_coordinator = self._coordinators.get(serial)
49 if nasweb_coordinator is None:
50 _LOGGER.error(
"Cannot check connection. No device match serial number")
52 for counter
in range(10):
53 _LOGGER.debug(
"Checking connection with: %s (%s)", serial, counter)
54 if nasweb_coordinator.is_connection_confirmed():
56 await asyncio.sleep(1)
60 self, hass: HomeAssistant, webhook_id: str, request: Request
62 """Handle webhook request from Push API."""
65 notification = await request.json()
66 serial = notification.get(KEY_DEVICE_SERIAL,
None)
67 _LOGGER.debug(
"Received push: %s", notification)
69 _LOGGER.warning(
"Received notification without nasweb identifier")
71 nasweb_coordinator = self._coordinators.get(serial)
72 if nasweb_coordinator is None:
73 _LOGGER.warning(
"Received notification for not registered nasweb")
75 await nasweb_coordinator.handle_push_notification(notification)
76 return Response(body=
'{"response": "ok"}', content_type=
"application/json")
80 """Coordinator managing status of single NASweb device.
82 Since status updates are managed through push notifications, this class schedules
83 periodic checks to ensure that devices are marked unavailable if updates
84 haven't been received for a prolonged period.
88 self, hass: HomeAssistant, webio_api: WebioAPI, name: str =
"NASweb[default]"
90 """Initialize NASweb coordinator."""
95 job_name = f"NASwebCoordinator[{name}]"
98 self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object |
None]] = {}
99 data: dict[str, Any] = {}
100 data[KEY_OUTPUTS] = self.webio_api.outputs
104 """Check whether coordinator received status update from NASweb."""
109 self, update_callback: CALLBACK_TYPE, context: Any =
None
110 ) -> Callable[[],
None]:
111 """Listen for data updates."""
112 schedule_update_check =
not self._listeners
115 def remove_listener() -> None:
116 """Remove update listener."""
117 self._listeners.pop(remove_listener)
118 if not self._listeners:
121 self._listeners[remove_listener] = (update_callback, context)
123 if schedule_update_check:
125 return remove_listener
129 """Update data and notify listeners."""
132 _LOGGER.debug("Updated %s data", self.name)
139 """Update all registered listeners."""
140 for update_callback, _
in list(self._listeners.values()):
144 """Handle max update interval occurrence.
146 This method is called when `STATUS_UPDATE_MAX_TIME_INTERVAL` has passed without
147 receiving a status update. It only needs to trigger state update of entities
148 which then change their state accordingly.
155 """Schedule a task to trigger entities state update after `STATUS_UPDATE_MAX_TIME_INTERVAL`.
157 This method schedules a task (`_handle_max_update_interval`) to be executed after
158 `STATUS_UPDATE_MAX_TIME_INTERVAL` seconds without status update, which enables entities
159 to change their state to unavailable. After each status update this task is rescheduled.
162 now = self._hass.loop.time()
164 now + timedelta(seconds=STATUS_UPDATE_MAX_TIME_INTERVAL).total_seconds()
173 """Cancel any scheduled update check call."""
179 """Handle incoming push notification from NASweb."""
180 msg_type = notification.get(KEY_TYPE)
181 _LOGGER.debug(
"Received push notification: %s", msg_type)
183 if msg_type == TYPE_STATUS_UPDATE:
188 """Process status update from NASweb."""
189 self.webio_api.update_device_status(new_status)
190 new_data = {KEY_OUTPUTS: self.webio_api.outputs}
None __init__(self, HomeAssistant hass, WebioAPI webio_api, str name="NASweb[default]")
None _handle_max_update_interval(self, datetime now)
None async_update_listeners(self)
None _async_unsub_last_update_check(self)
bool is_connection_confirmed(self)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None async_set_updated_data(self, dict[str, Any] data)
None handle_push_notification(self, dict notification)
None _schedule_last_update_check(self)
None process_status_update(self, dict new_status)
Response|None handle_webhook_request(self, HomeAssistant hass, str webhook_id, Request request)
None remove_coordinator(self, str serial)
None add_coordinator(self, str serial, NASwebCoordinator coordinator)
bool check_connection(self, str serial)
bool has_coordinators(self)
web.Response get(self, web.Request request, str config_key)