Home Assistant Unofficial Reference 2024.12.1
entity.py
Go to the documentation of this file.
1 """Shared Entity definition for UniFi Protect Integration."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Coroutine, Sequence
6 from dataclasses import dataclass
7 from datetime import datetime
8 from enum import Enum
9 from functools import partial
10 import logging
11 from operator import attrgetter
12 from typing import TYPE_CHECKING, Any, Generic, TypeVar
13 
14 from uiprotect import make_enabled_getter, make_required_getter, make_value_getter
15 from uiprotect.data import (
16  NVR,
17  Event,
18  ModelType,
19  ProtectAdoptableDeviceModel,
20  SmartDetectObjectType,
21  StateType,
22 )
23 
24 from homeassistant.core import callback
26 from homeassistant.helpers.device_registry import DeviceInfo
27 from homeassistant.helpers.entity import Entity, EntityDescription
28 
29 from .const import (
30  ATTR_EVENT_ID,
31  ATTR_EVENT_SCORE,
32  DEFAULT_ATTRIBUTION,
33  DEFAULT_BRAND,
34  DOMAIN,
35 )
36 from .data import ProtectData, ProtectDeviceType
37 
38 _LOGGER = logging.getLogger(__name__)
39 
40 T = TypeVar("T", bound=ProtectAdoptableDeviceModel | NVR)
41 
42 
43 class PermRequired(int, Enum):
44  """Type of permission level required for entity."""
45 
46  NO_WRITE = 1
47  WRITE = 2
48  DELETE = 3
49 
50 
51 @callback
53  data: ProtectData,
54  klass: type[BaseProtectEntity],
55  model_type: ModelType,
56  descs: Sequence[ProtectEntityDescription],
57  unadopted_descs: Sequence[ProtectEntityDescription] | None = None,
58  ufp_device: ProtectAdoptableDeviceModel | None = None,
59 ) -> list[BaseProtectEntity]:
60  if not descs and not unadopted_descs:
61  return []
62 
63  entities: list[BaseProtectEntity] = []
64  devices = (
65  [ufp_device]
66  if ufp_device is not None
67  else data.get_by_types({model_type}, ignore_unadopted=False)
68  )
69  auth_user = data.api.bootstrap.auth_user
70  for device in devices:
71  if TYPE_CHECKING:
72  assert isinstance(device, ProtectAdoptableDeviceModel)
73  if not device.is_adopted_by_us:
74  if unadopted_descs:
75  for description in unadopted_descs:
76  entities.append(
77  klass(
78  data,
79  device=device,
80  description=description,
81  )
82  )
83  _LOGGER.debug(
84  "Adding %s entity %s for %s",
85  klass.__name__,
86  description.name,
87  device.display_name,
88  )
89  continue
90 
91  can_write = device.can_write(auth_user)
92  for description in descs:
93  if (perms := description.ufp_perm) is not None:
94  if perms is PermRequired.WRITE and not can_write:
95  continue
96  if perms is PermRequired.NO_WRITE and can_write:
97  continue
98  if perms is PermRequired.DELETE and not device.can_delete(auth_user):
99  continue
100 
101  if not description.has_required(device):
102  continue
103 
104  entities.append(
105  klass(
106  data,
107  device=device,
108  description=description,
109  )
110  )
111  _LOGGER.debug(
112  "Adding %s entity %s for %s",
113  klass.__name__,
114  description.name,
115  device.display_name,
116  )
117 
118  return entities
119 
120 
121 _ALL_MODEL_TYPES = (
122  ModelType.CAMERA,
123  ModelType.LIGHT,
124  ModelType.SENSOR,
125  ModelType.VIEWPORT,
126  ModelType.DOORLOCK,
127  ModelType.CHIME,
128 )
129 
130 
131 @callback
133  model_type: ModelType,
134  model_descriptions: dict[ModelType, Sequence[ProtectEntityDescription]] | None,
135  all_descs: Sequence[ProtectEntityDescription] | None,
136 ) -> list[ProtectEntityDescription]:
137  """Combine all the descriptions with descriptions a model type."""
138  descs: list[ProtectEntityDescription] = list(all_descs) if all_descs else []
139  if model_descriptions and (model_descs := model_descriptions.get(model_type)):
140  descs.extend(model_descs)
141  return descs
142 
143 
144 @callback
146  data: ProtectData,
147  klass: type[BaseProtectEntity],
148  model_descriptions: dict[ModelType, Sequence[ProtectEntityDescription]]
149  | None = None,
150  all_descs: Sequence[ProtectEntityDescription] | None = None,
151  unadopted_descs: list[ProtectEntityDescription] | None = None,
152  ufp_device: ProtectAdoptableDeviceModel | None = None,
153 ) -> list[BaseProtectEntity]:
154  """Generate a list of all the device entities."""
155  if ufp_device is None:
156  entities: list[BaseProtectEntity] = []
157  for model_type in _ALL_MODEL_TYPES:
158  descs = _combine_model_descs(model_type, model_descriptions, all_descs)
159  entities.extend(
160  _async_device_entities(data, klass, model_type, descs, unadopted_descs)
161  )
162  return entities
163 
164  device_model_type = ufp_device.model
165  assert device_model_type is not None
166  descs = _combine_model_descs(device_model_type, model_descriptions, all_descs)
167  return _async_device_entities(
168  data, klass, device_model_type, descs, unadopted_descs, ufp_device
169  )
170 
171 
173  """Base class for UniFi protect entities."""
174 
175  device: ProtectDeviceType
176 
177  _attr_should_poll = False
178  _attr_attribution = DEFAULT_ATTRIBUTION
179  _state_attrs: tuple[str, ...] = ("_attr_available",)
180  _attr_has_entity_name = True
181  _async_get_ufp_enabled: Callable[[ProtectAdoptableDeviceModel], bool] | None = None
182 
183  def __init__(
184  self,
185  data: ProtectData,
186  device: ProtectDeviceType,
187  description: EntityDescription | None = None,
188  ) -> None:
189  """Initialize the entity."""
190  super().__init__()
191  self.datadata = data
192  self.devicedevice = device
193 
194  if description is None:
195  self._attr_unique_id_attr_unique_id = self.devicedevice.mac
196  self._attr_name_attr_name = None
197  else:
198  self.entity_descriptionentity_description = description
199  self._attr_unique_id_attr_unique_id = f"{self.device.mac}_{description.key}"
200  if isinstance(description, ProtectEntityDescription):
201  self._async_get_ufp_enabled_async_get_ufp_enabled = description.get_ufp_enabled
202 
203  self._async_set_device_info_async_set_device_info()
204  self._state_getters_state_getters = tuple(
205  partial(attrgetter(attr), self) for attr in self._state_attrs
206  )
207 
208  async def async_update(self) -> None:
209  """Update the entity.
210 
211  Only used by the generic entity update service.
212  """
213  await self.datadata.async_refresh()
214 
215  @callback
216  def _async_set_device_info(self) -> None:
217  """Set device info."""
218 
219  @callback
220  def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
221  """Update Entity object from Protect device."""
222  was_available = self._attr_available_attr_available
223  if last_updated_success := self.datadata.last_update_success:
224  self.devicedevice = device
225 
226  if device.model is ModelType.NVR:
227  available = last_updated_success
228  else:
229  if TYPE_CHECKING:
230  assert isinstance(device, ProtectAdoptableDeviceModel)
231  connected = device.state is StateType.CONNECTED or (
232  not device.is_adopted_by_us and device.can_adopt
233  )
234  async_get_ufp_enabled = self._async_get_ufp_enabled_async_get_ufp_enabled
235  enabled = not async_get_ufp_enabled or async_get_ufp_enabled(device)
236  available = last_updated_success and connected and enabled
237 
238  if available != was_available:
239  self._attr_available_attr_available = available
240 
241  @callback
242  def _async_updated_event(self, device: ProtectDeviceType) -> None:
243  """When device is updated from Protect."""
244  previous_attrs = [getter() for getter in self._state_getters_state_getters]
245  self._async_update_device_from_protect_async_update_device_from_protect(device)
246  changed = False
247  for idx, getter in enumerate(self._state_getters_state_getters):
248  if previous_attrs[idx] != getter():
249  changed = True
250  break
251 
252  if changed:
253  if _LOGGER.isEnabledFor(logging.DEBUG):
254  device_name = device.name or ""
255  if hasattr(self, "entity_description") and self.entity_descriptionentity_description.name:
256  device_name += f" {self.entity_description.name}"
257 
258  _LOGGER.debug(
259  "Updating state [%s (%s)] %s -> %s",
260  device_name,
261  device.mac,
262  previous_attrs,
263  tuple((getattr(self, attr)) for attr in self._state_attrs),
264  )
265  self.async_write_ha_stateasync_write_ha_state()
266 
267  async def async_added_to_hass(self) -> None:
268  """When entity is added to hass."""
269  await super().async_added_to_hass()
270  self.async_on_removeasync_on_remove(
271  self.datadata.async_subscribe(self.devicedevice.mac, self._async_updated_event_async_updated_event)
272  )
273  self._async_update_device_from_protect_async_update_device_from_protect(self.devicedevice)
274 
275 
277  """Base class for entities with is_on property."""
278 
279  _state_attrs: tuple[str, ...] = ("_attr_available", "_attr_is_on")
280  _attr_is_on: bool | None
281  entity_description: ProtectEntityDescription
282 
284  self, device: ProtectAdoptableDeviceModel | NVR
285  ) -> None:
286  super()._async_update_device_from_protect(device)
287  was_on = self._attr_is_on_attr_is_on
288  if was_on != (is_on := self.entity_descriptionentity_description.get_ufp_value(device) is True):
289  self._attr_is_on_attr_is_on = is_on
290 
291 
293  """Base class for UniFi protect entities."""
294 
295  @callback
296  def _async_set_device_info(self) -> None:
297  self._attr_device_info_attr_device_info = DeviceInfo(
298  name=self.devicedevice.display_name,
299  manufacturer=DEFAULT_BRAND,
300  model=self.devicedevice.market_name or self.devicedevice.type,
301  model_id=self.devicedevice.type,
302  via_device=(DOMAIN, self.datadata.api.bootstrap.nvr.mac),
303  sw_version=self.devicedevice.firmware_version,
304  connections={(dr.CONNECTION_NETWORK_MAC, self.devicedevice.mac)},
305  configuration_url=self.devicedevice.protect_url,
306  )
307 
308 
310  """Base class for unifi protect entities."""
311 
312  device: NVR
313 
314  @callback
315  def _async_set_device_info(self) -> None:
316  self._attr_device_info_attr_device_info = DeviceInfo(
317  connections={(dr.CONNECTION_NETWORK_MAC, self.devicedevice.mac)},
318  identifiers={(DOMAIN, self.devicedevice.mac)},
319  manufacturer=DEFAULT_BRAND,
320  name=self.devicedevice.display_name,
321  model=self.devicedevice.type,
322  sw_version=str(self.devicedevice.version),
323  configuration_url=self.devicedevice.api.base_url,
324  )
325 
326 
328  """Adds motion event attributes to sensor."""
329 
330  entity_description: ProtectEventMixin
331  _unrecorded_attributes = frozenset({ATTR_EVENT_ID, ATTR_EVENT_SCORE})
332  _event: Event | None = None
333  _event_end: datetime | None = None
334 
335  @callback
336  def _set_event_done(self) -> None:
337  """Clear the event and state."""
338 
339  @callback
340  def _set_event_attrs(self, event: Event) -> None:
341  """Set event attrs."""
342  self._attr_extra_state_attributes_attr_extra_state_attributes = {
343  ATTR_EVENT_ID: event.id,
344  ATTR_EVENT_SCORE: event.score,
345  }
346 
347  @callback
349  # If the event is so short that the detection is received
350  # in the same message as the end of the event we need to write
351  # state and than clear the event and write state again.
352  self.async_write_ha_stateasync_write_ha_state()
353  self._set_event_done_set_event_done()
354  self.async_write_ha_stateasync_write_ha_state()
355 
356  @callback
358  self, prev_event: Event | None, prev_event_end: datetime | None
359  ) -> bool:
360  """Determine if the event has already ended.
361 
362  The event_end time is passed because the prev_event and event object
363  may be the same object, and the uiprotect code will mutate the
364  event object so we need to check the datetime object that was
365  saved from the last time the entity was updated.
366  """
367  return bool(
368  (event := self._event)
369  and event.end
370  and prev_event
371  and prev_event_end
372  and prev_event.id == event.id
373  )
374 
375 
376 @dataclass(frozen=True, kw_only=True)
378  """Base class for protect entity descriptions."""
379 
380  ufp_required_field: str | None = None
381  ufp_value: str | None = None
382  ufp_value_fn: Callable[[T], Any] | None = None
383  ufp_enabled: str | None = None
384  ufp_perm: PermRequired | None = None
385 
386  # The below are set in __post_init__
387  has_required: Callable[[T], bool] = bool
388  get_ufp_enabled: Callable[[T], bool] | None = None
389 
390  def get_ufp_value(self, obj: T) -> Any:
391  """Return value from UniFi Protect device; overridden in __post_init__."""
392  # ufp_value or ufp_value_fn are required, the
393  # RuntimeError is to catch any issues in the code
394  # with new descriptions.
395  raise RuntimeError( # pragma: no cover
396  f"`ufp_value` or `ufp_value_fn` is required for {self}"
397  )
398 
399  def __post_init__(self) -> None:
400  """Override get_ufp_value, has_required, and get_ufp_enabled if required."""
401  _setter = partial(object.__setattr__, self)
402 
403  if (ufp_value := self.ufp_value) is not None:
404  _setter("get_ufp_value", make_value_getter(ufp_value))
405  elif (ufp_value_fn := self.ufp_value_fn) is not None:
406  _setter("get_ufp_value", ufp_value_fn)
407 
408  if (ufp_enabled := self.ufp_enabled) is not None:
409  _setter("get_ufp_enabled", make_enabled_getter(ufp_enabled))
410 
411  if (ufp_required_field := self.ufp_required_field) is not None:
412  _setter("has_required", make_required_getter(ufp_required_field))
413 
414 
415 @dataclass(frozen=True, kw_only=True)
417  """Mixin for events."""
418 
419  ufp_event_obj: str | None = None
420  ufp_obj_type: SmartDetectObjectType | None = None
421 
422  def get_event_obj(self, obj: T) -> Event | None:
423  """Return value from UniFi Protect device."""
424  return None
425 
426  def has_matching_smart(self, event: Event) -> bool:
427  """Determine if the detection type is a match."""
428  return (
429  not (obj_type := self.ufp_obj_type) or obj_type in event.smart_detect_types
430  )
431 
432  def __post_init__(self) -> None:
433  """Override get_event_obj if ufp_event_obj is set."""
434  if (_ufp_event_obj := self.ufp_event_obj) is not None:
435  object.__setattr__(self, "get_event_obj", attrgetter(_ufp_event_obj))
436  super().__post_init__()
437 
438 
439 @dataclass(frozen=True, kw_only=True)
441  """Mixin for settable values."""
442 
443  ufp_set_method: str | None = None
444  ufp_set_method_fn: Callable[[T, Any], Coroutine[Any, Any, None]] | None = None
445 
446  async def ufp_set(self, obj: T, value: Any) -> None:
447  """Set value for UniFi Protect device."""
448  _LOGGER.debug("Setting %s to %s for %s", self.name, value, obj.display_name)
449  if self.ufp_set_method is not None:
450  await getattr(obj, self.ufp_set_method)(value)
451  elif self.ufp_set_method_fn is not None:
452  await self.ufp_set_method_fn(obj, value)
None __init__(self, ProtectData data, ProtectDeviceType device, EntityDescription|None description=None)
Definition: entity.py:188
None _async_update_device_from_protect(self, ProtectDeviceType device)
Definition: entity.py:220
None _async_updated_event(self, ProtectDeviceType device)
Definition: entity.py:242
bool _event_already_ended(self, Event|None prev_event, datetime|None prev_event_end)
Definition: entity.py:359
None _async_update_device_from_protect(self, ProtectAdoptableDeviceModel|NVR device)
Definition: entity.py:285
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
CALLBACK_TYPE async_subscribe(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:194
list[BaseProtectEntity] async_all_device_entities(ProtectData data, type[BaseProtectEntity] klass, dict[ModelType, Sequence[ProtectEntityDescription]]|None model_descriptions=None, Sequence[ProtectEntityDescription]|None all_descs=None, list[ProtectEntityDescription]|None unadopted_descs=None, ProtectAdoptableDeviceModel|None ufp_device=None)
Definition: entity.py:153
list[BaseProtectEntity] _async_device_entities(ProtectData data, type[BaseProtectEntity] klass, ModelType model_type, Sequence[ProtectEntityDescription] descs, Sequence[ProtectEntityDescription]|None unadopted_descs=None, ProtectAdoptableDeviceModel|None ufp_device=None)
Definition: entity.py:59
list[ProtectEntityDescription] _combine_model_descs(ModelType model_type, dict[ModelType, Sequence[ProtectEntityDescription]]|None model_descriptions, Sequence[ProtectEntityDescription]|None all_descs)
Definition: entity.py:136