Home Assistant Unofficial Reference 2024.12.1
data.py
Go to the documentation of this file.
1 """Base class for protect data."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict
6 from collections.abc import Callable, Generator, Iterable
7 from datetime import datetime, timedelta
8 from functools import partial
9 import logging
10 from typing import TYPE_CHECKING, Any, cast
11 
12 from uiprotect import ProtectApiClient
13 from uiprotect.data import (
14  NVR,
15  Camera,
16  Event,
17  EventType,
18  ModelType,
19  ProtectAdoptableDeviceModel,
20  WSSubscriptionMessage,
21 )
22 from uiprotect.exceptions import ClientError, NotAuthorized
23 from uiprotect.utils import log_event
24 from uiprotect.websocket import WebsocketState
25 
26 from homeassistant.config_entries import ConfigEntry
27 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
28 from homeassistant.helpers import device_registry as dr
30  async_dispatcher_connect,
31  async_dispatcher_send,
32 )
33 from homeassistant.helpers.event import async_track_time_interval
34 
35 from .const import (
36  AUTH_RETRIES,
37  CONF_DISABLE_RTSP,
38  CONF_MAX_MEDIA,
39  DEFAULT_MAX_MEDIA,
40  DEVICES_THAT_ADOPT,
41  DISPATCH_ADD,
42  DISPATCH_ADOPT,
43  DISPATCH_CHANNELS,
44  DOMAIN,
45 )
46 from .utils import async_get_devices_by_type
47 
48 _LOGGER = logging.getLogger(__name__)
49 type ProtectDeviceType = ProtectAdoptableDeviceModel | NVR
50 type UFPConfigEntry = ConfigEntry[ProtectData]
51 
52 
53 @callback
55  hass: HomeAssistant, entry: UFPConfigEntry
56 ) -> bool:
57  """Check if the last update was successful for a config entry."""
58  return hasattr(entry, "runtime_data") and entry.runtime_data.last_update_success
59 
60 
61 @callback
62 def _async_dispatch_id(entry: UFPConfigEntry, dispatch: str) -> str:
63  """Generate entry specific dispatch ID."""
64  return f"{DOMAIN}.{entry.entry_id}.{dispatch}"
65 
66 
68  """Coordinate updates."""
69 
70  def __init__(
71  self,
72  hass: HomeAssistant,
73  protect: ProtectApiClient,
74  update_interval: timedelta,
75  entry: UFPConfigEntry,
76  ) -> None:
77  """Initialize an subscriber."""
78  self._entry_entry = entry
79  self._hass_hass = hass
80  self._update_interval_update_interval = update_interval
81  self._subscriptions: defaultdict[
82  str, set[Callable[[ProtectDeviceType], None]]
83  ] = defaultdict(set)
84  self._pending_camera_ids: set[str] = set()
85  self._unsubs_unsubs: list[CALLBACK_TYPE] = []
86  self._auth_failures_auth_failures = 0
87  self.last_update_successlast_update_success = False
88  self.apiapi = protect
89  self.adopt_signaladopt_signal = _async_dispatch_id(entry, DISPATCH_ADOPT)
90  self.add_signaladd_signal = _async_dispatch_id(entry, DISPATCH_ADD)
91  self.channels_signalchannels_signal = _async_dispatch_id(entry, DISPATCH_CHANNELS)
92 
93  @property
94  def disable_stream(self) -> bool:
95  """Check if RTSP is disabled."""
96  return self._entry_entry.options.get(CONF_DISABLE_RTSP, False)
97 
98  @property
99  def max_events(self) -> int:
100  """Max number of events to load at once."""
101  return self._entry_entry.options.get(CONF_MAX_MEDIA, DEFAULT_MAX_MEDIA)
102 
103  @callback
105  self, add_callback: Callable[[ProtectAdoptableDeviceModel], None]
106  ) -> None:
107  """Add an callback for on device adopt."""
108  self._entry_entry.async_on_unload(
109  async_dispatcher_connect(self._hass_hass, self.adopt_signaladopt_signal, add_callback)
110  )
111 
113  self, device_types: Iterable[ModelType], ignore_unadopted: bool = True
114  ) -> Generator[ProtectAdoptableDeviceModel]:
115  """Get all devices matching types."""
116  bootstrap = self.apiapi.bootstrap
117  for device_type in device_types:
118  for device in async_get_devices_by_type(bootstrap, device_type).values():
119  if ignore_unadopted and not device.is_adopted_by_us:
120  continue
121  yield device
122 
123  def get_cameras(self, ignore_unadopted: bool = True) -> Generator[Camera]:
124  """Get all cameras."""
125  return cast(
126  Generator[Camera], self.get_by_typesget_by_types({ModelType.CAMERA}, ignore_unadopted)
127  )
128 
129  @callback
130  def async_setup(self) -> None:
131  """Subscribe and do the refresh."""
132  self.last_update_successlast_update_success = True
133  self._async_update_change_async_update_change(True, force_update=True)
134  api = self.apiapi
135  self._unsubs_unsubs = [
136  api.subscribe_websocket_state(self._async_websocket_state_changed_async_websocket_state_changed),
137  api.subscribe_websocket(self._async_process_ws_message_async_process_ws_message),
139  self._hass_hass, self._async_poll_async_poll, self._update_interval_update_interval
140  ),
141  ]
142 
143  @callback
144  def _async_websocket_state_changed(self, state: WebsocketState) -> None:
145  """Handle a change in the websocket state."""
146  self._async_update_change_async_update_change(state is WebsocketState.CONNECTED)
147 
149  self,
150  success: bool,
151  force_update: bool = False,
152  exception: Exception | None = None,
153  ) -> None:
154  """Process a change in update success."""
155  was_success = self.last_update_successlast_update_success
156  self.last_update_successlast_update_success = success
157 
158  if not success:
159  level = logging.ERROR if was_success else logging.DEBUG
160  title = self._entry_entry.title
161  _LOGGER.log(level, "%s: Connection lost", title, exc_info=exception)
162  self._async_process_updates_async_process_updates()
163  return
164 
165  self._auth_failures_auth_failures = 0
166  if not was_success:
167  _LOGGER.warning("%s: Connection restored", self._entry_entry.title)
168  self._async_process_updates_async_process_updates()
169  elif force_update:
170  self._async_process_updates_async_process_updates()
171 
172  async def async_stop(self, *args: Any) -> None:
173  """Stop processing data."""
174  for unsub in self._unsubs_unsubs:
175  unsub()
176  self._unsubs_unsubs.clear()
177  await self.apiapi.async_disconnect_ws()
178 
179  async def async_refresh(self) -> None:
180  """Update the data."""
181  try:
182  await self.apiapi.update()
183  except NotAuthorized as ex:
184  if self._auth_failures_auth_failures < AUTH_RETRIES:
185  _LOGGER.exception("Auth error while updating")
186  self._auth_failures_auth_failures += 1
187  else:
188  await self.async_stopasync_stop()
189  _LOGGER.exception("Reauthentication required")
190  self._entry_entry.async_start_reauth(self._hass_hass)
191  self._async_update_change_async_update_change(False, exception=ex)
192  except ClientError as ex:
193  self._async_update_change_async_update_change(False, exception=ex)
194  else:
195  self._async_update_change_async_update_change(True, force_update=True)
196 
197  @callback
198  def async_add_pending_camera_id(self, camera_id: str) -> None:
199  """Add pending camera.
200 
201  A "pending camera" is one that has been adopted by not had its camera channels
202  initialized yet. Will cause Websocket code to check for channels to be
203  initialized for the camera and issue a dispatch once they do.
204  """
205  self._pending_camera_ids.add(camera_id)
206 
207  @callback
208  def _async_add_device(self, device: ProtectAdoptableDeviceModel) -> None:
209  if device.is_adopted_by_us:
210  _LOGGER.debug("Device adopted: %s", device.id)
211  async_dispatcher_send(self._hass_hass, self.adopt_signaladopt_signal, device)
212  else:
213  _LOGGER.debug("New device detected: %s", device.id)
214  async_dispatcher_send(self._hass_hass, self.add_signaladd_signal, device)
215 
216  @callback
217  def _async_remove_device(self, device: ProtectAdoptableDeviceModel) -> None:
218  registry = dr.async_get(self._hass_hass)
219  device_entry = registry.async_get_device(
220  connections={(dr.CONNECTION_NETWORK_MAC, device.mac)}
221  )
222  if device_entry:
223  _LOGGER.debug("Device removed: %s", device.id)
224  registry.async_update_device(
225  device_entry.id, remove_config_entry_id=self._entry_entry.entry_id
226  )
227 
228  @callback
230  self, device: ProtectAdoptableDeviceModel | NVR, changed_data: dict[str, Any]
231  ) -> None:
232  self._async_signal_device_update_async_signal_device_update(device)
233  if (
234  device.model is ModelType.CAMERA
235  and device.id in self._pending_camera_ids
236  and "channels" in changed_data
237  ):
238  self._pending_camera_ids.remove(device.id)
239  async_dispatcher_send(self._hass_hass, self.channels_signalchannels_signal, device)
240 
241  # trigger update for all Cameras with LCD screens when NVR Doorbell settings updates
242  if "doorbell_settings" in changed_data:
243  _LOGGER.debug(
244  "Doorbell messages updated. Updating devices with LCD screens"
245  )
246  self.apiapi.bootstrap.nvr.update_all_messages()
247  for camera in self.get_camerasget_cameras():
248  if camera.feature_flags.has_lcd_screen:
249  self._async_signal_device_update_async_signal_device_update(camera)
250 
251  @callback
252  def _async_process_ws_message(self, message: WSSubscriptionMessage) -> None:
253  """Process a message from the websocket."""
254  if (new_obj := message.new_obj) is None:
255  if isinstance(message.old_obj, ProtectAdoptableDeviceModel):
256  self._async_remove_device_async_remove_device(message.old_obj)
257  return
258 
259  model_type = new_obj.model
260  if model_type is ModelType.EVENT:
261  if TYPE_CHECKING:
262  assert isinstance(new_obj, Event)
263  if _LOGGER.isEnabledFor(logging.DEBUG):
264  log_event(new_obj)
265  if (
266  (new_obj.type is EventType.DEVICE_ADOPTED)
267  and (metadata := new_obj.metadata)
268  and (device_id := metadata.device_id)
269  and (device := self.apiapi.bootstrap.get_device_from_id(device_id))
270  ):
271  self._async_add_device_async_add_device(device)
272  elif camera := new_obj.camera:
273  self._async_signal_device_update_async_signal_device_update(camera)
274  elif light := new_obj.light:
275  self._async_signal_device_update_async_signal_device_update(light)
276  elif sensor := new_obj.sensor:
277  self._async_signal_device_update_async_signal_device_update(sensor)
278  return
279 
280  if model_type is ModelType.LIVEVIEW and len(self.apiapi.bootstrap.viewers) > 0:
281  # alert user viewport needs restart so voice clients can get new options
282  _LOGGER.warning(
283  "Liveviews updated. Restart Home Assistant to update Viewport select"
284  " options"
285  )
286  return
287 
288  if message.old_obj is None and isinstance(new_obj, ProtectAdoptableDeviceModel):
289  self._async_add_device_async_add_device(new_obj)
290  return
291 
292  if getattr(new_obj, "is_adopted_by_us", True) and hasattr(new_obj, "mac"):
293  if TYPE_CHECKING:
294  assert isinstance(new_obj, (ProtectAdoptableDeviceModel, NVR))
295  self._async_update_device_async_update_device(new_obj, message.changed_data)
296 
297  @callback
298  def _async_process_updates(self) -> None:
299  """Process update from the protect data."""
300  self._async_signal_device_update_async_signal_device_update(self.apiapi.bootstrap.nvr)
301  for device in self.get_by_typesget_by_types(DEVICES_THAT_ADOPT):
302  self._async_signal_device_update_async_signal_device_update(device)
303 
304  @callback
305  def _async_poll(self, now: datetime) -> None:
306  """Poll the Protect API."""
307  self._entry_entry.async_create_background_task(
308  self._hass_hass,
309  self.async_refreshasync_refresh(),
310  name=f"{DOMAIN} {self._entry.title} refresh",
311  eager_start=True,
312  )
313 
314  @callback
316  self, mac: str, update_callback: Callable[[ProtectDeviceType], None]
317  ) -> CALLBACK_TYPE:
318  """Add an callback subscriber."""
319  self._subscriptions[mac].add(update_callback)
320  return partial(self._async_unsubscribe_async_unsubscribe, mac, update_callback)
321 
322  @callback
324  self, mac: str, update_callback: Callable[[ProtectDeviceType], None]
325  ) -> None:
326  """Remove a callback subscriber."""
327  self._subscriptions[mac].remove(update_callback)
328  if not self._subscriptions[mac]:
329  del self._subscriptions[mac]
330 
331  @callback
332  def _async_signal_device_update(self, device: ProtectDeviceType) -> None:
333  """Call the callbacks for a device_id."""
334  mac = device.mac
335  if not (subscriptions := self._subscriptions.get(mac)):
336  return
337  _LOGGER.debug("Updating device: %s (%s)", device.name, mac)
338  for update_callback in subscriptions:
339  update_callback(device)
340 
341 
342 @callback
344  hass: HomeAssistant, config_entry_ids: set[str]
345 ) -> ProtectApiClient | None:
346  """Find the UFP instance for the config entry ids."""
347  return next(
348  iter(
349  entry.runtime_data.api
350  for entry_id in config_entry_ids
351  if (entry := hass.config_entries.async_get_entry(entry_id))
352  and entry.domain == DOMAIN
353  and hasattr(entry, "runtime_data")
354  ),
355  None,
356  )
357 
358 
359 @callback
360 def async_get_ufp_entries(hass: HomeAssistant) -> list[UFPConfigEntry]:
361  """Get all the UFP entries."""
362  return cast(
363  list[UFPConfigEntry],
364  [
365  entry
366  for entry in hass.config_entries.async_entries(
367  DOMAIN, include_ignore=True, include_disabled=True
368  )
369  if hasattr(entry, "runtime_data")
370  ],
371  )
372 
373 
374 @callback
375 def async_get_data_for_nvr_id(hass: HomeAssistant, nvr_id: str) -> ProtectData | None:
376  """Find the ProtectData instance for the NVR id."""
377  return next(
378  iter(
379  entry.runtime_data
380  for entry in async_get_ufp_entries(hass)
381  if entry.runtime_data.api.bootstrap.nvr.id == nvr_id
382  ),
383  None,
384  )
385 
386 
387 @callback
389  hass: HomeAssistant, entry_id: str
390 ) -> ProtectData | None:
391  """Find the ProtectData instance for a config entry id."""
392  if (entry := hass.config_entries.async_get_entry(entry_id)) and hasattr(
393  entry, "runtime_data"
394  ):
395  entry = cast(UFPConfigEntry, entry)
396  return entry.runtime_data
397  return None
None _async_remove_device(self, ProtectAdoptableDeviceModel device)
Definition: data.py:217
None _async_update_change(self, bool success, bool force_update=False, Exception|None exception=None)
Definition: data.py:153
None _async_update_device(self, ProtectAdoptableDeviceModel|NVR device, dict[str, Any] changed_data)
Definition: data.py:231
CALLBACK_TYPE async_subscribe(self, str mac, Callable[[ProtectDeviceType], None] update_callback)
Definition: data.py:317
None async_add_pending_camera_id(self, str camera_id)
Definition: data.py:198
Generator[Camera] get_cameras(self, bool ignore_unadopted=True)
Definition: data.py:123
None _async_add_device(self, ProtectAdoptableDeviceModel device)
Definition: data.py:208
None _async_process_ws_message(self, WSSubscriptionMessage message)
Definition: data.py:252
None __init__(self, HomeAssistant hass, ProtectApiClient protect, timedelta update_interval, UFPConfigEntry entry)
Definition: data.py:76
Generator[ProtectAdoptableDeviceModel] get_by_types(self, Iterable[ModelType] device_types, bool ignore_unadopted=True)
Definition: data.py:114
None _async_signal_device_update(self, ProtectDeviceType device)
Definition: data.py:332
None _async_unsubscribe(self, str mac, Callable[[ProtectDeviceType], None] update_callback)
Definition: data.py:325
None _async_websocket_state_changed(self, WebsocketState state)
Definition: data.py:144
None async_subscribe_adopt(self, Callable[[ProtectAdoptableDeviceModel], None] add_callback)
Definition: data.py:106
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
str _async_dispatch_id(UFPConfigEntry entry, str dispatch)
Definition: data.py:62
ProtectData|None async_get_data_for_entry_id(HomeAssistant hass, str entry_id)
Definition: data.py:390
ProtectApiClient|None async_ufp_instance_for_config_entry_ids(HomeAssistant hass, set[str] config_entry_ids)
Definition: data.py:345
bool async_last_update_was_successful(HomeAssistant hass, UFPConfigEntry entry)
Definition: data.py:56
ProtectData|None async_get_data_for_nvr_id(HomeAssistant hass, str nvr_id)
Definition: data.py:375
list[UFPConfigEntry] async_get_ufp_entries(HomeAssistant hass)
Definition: data.py:360
dict[str, ProtectAdoptableDeviceModel] async_get_devices_by_type(Bootstrap bootstrap, ModelType device_type)
Definition: utils.py:75
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679