Home Assistant Unofficial Reference 2024.12.1
entity.py
Go to the documentation of this file.
1 """Provide entity classes for group entities."""
2 
3 from __future__ import annotations
4 
5 from abc import abstractmethod
6 from collections.abc import Callable, Collection, Mapping
7 import logging
8 from typing import Any
9 
10 from homeassistant.const import ATTR_ASSUMED_STATE, ATTR_ENTITY_ID, STATE_OFF, STATE_ON
11 from homeassistant.core import (
12  CALLBACK_TYPE,
13  Event,
14  EventStateChangedData,
15  HomeAssistant,
16  State,
17  callback,
18  split_entity_id,
19 )
20 from homeassistant.helpers import start
21 from homeassistant.helpers.entity import Entity, async_generate_entity_id
22 from homeassistant.helpers.entity_component import EntityComponent
23 from homeassistant.helpers.event import async_track_state_change_event
24 
25 from .const import ATTR_AUTO, ATTR_ORDER, DATA_COMPONENT, DOMAIN, GROUP_ORDER, REG_KEY
26 from .registry import GroupIntegrationRegistry, SingleStateType
27 
28 ENTITY_ID_FORMAT = DOMAIN + ".{}"
29 
30 _PACKAGE_LOGGER = logging.getLogger(__package__)
31 
32 _LOGGER = logging.getLogger(__name__)
33 
34 
36  """Representation of a Group of entities."""
37 
38  _unrecorded_attributes = frozenset({ATTR_ENTITY_ID})
39 
40  _attr_should_poll = False
41  _entity_ids: list[str]
42 
43  @callback
45  self,
46  preview_callback: Callable[[str, Mapping[str, Any]], None],
47  ) -> CALLBACK_TYPE:
48  """Render a preview."""
49 
50  for entity_id in self._entity_ids:
51  if (state := self.hasshass.states.get(entity_id)) is None:
52  continue
53  self.async_update_supported_featuresasync_update_supported_features(entity_id, state)
54 
55  @callback
56  def async_state_changed_listener(
57  event: Event[EventStateChangedData] | None,
58  ) -> None:
59  """Handle child updates."""
60  self.async_update_group_stateasync_update_group_state()
61  if event:
62  self.async_update_supported_featuresasync_update_supported_features(
63  event.data["entity_id"], event.data["new_state"]
64  )
65  calculated_state = self._async_calculate_state_async_calculate_state()
66  preview_callback(calculated_state.state, calculated_state.attributes)
67 
68  async_state_changed_listener(None)
70  self.hasshass, self._entity_ids, async_state_changed_listener
71  )
72 
73  async def async_added_to_hass(self) -> None:
74  """Register listeners."""
75  for entity_id in self._entity_ids:
76  if (state := self.hasshass.states.get(entity_id)) is None:
77  continue
78  self.async_update_supported_featuresasync_update_supported_features(entity_id, state)
79 
80  @callback
81  def async_state_changed_listener(
82  event: Event[EventStateChangedData],
83  ) -> None:
84  """Handle child updates."""
85  self.async_set_contextasync_set_context(event.context)
86  self.async_update_supported_featuresasync_update_supported_features(
87  event.data["entity_id"], event.data["new_state"]
88  )
89  self.async_defer_or_update_ha_stateasync_defer_or_update_ha_state()
90 
91  self.async_on_removeasync_on_remove(
93  self.hasshass, self._entity_ids, async_state_changed_listener
94  )
95  )
96  self.async_on_removeasync_on_remove(start.async_at_start(self.hasshass, self._update_at_start_update_at_start))
97 
98  @callback
99  def _update_at_start(self, _: HomeAssistant) -> None:
100  """Update the group state at start."""
101  self.async_update_group_stateasync_update_group_state()
102  self.async_write_ha_stateasync_write_ha_state()
103 
104  @callback
106  """Only update once at start."""
107  if not self.hasshass.is_running:
108  return
109 
110  self.async_update_group_stateasync_update_group_state()
111  self.async_write_ha_stateasync_write_ha_state()
112 
113  @abstractmethod
114  @callback
115  def async_update_group_state(self) -> None:
116  """Abstract method to update the entity."""
117 
118  @callback
120  self,
121  entity_id: str,
122  new_state: State | None,
123  ) -> None:
124  """Update dictionaries with supported features."""
125 
126 
127 class Group(Entity):
128  """Track a group of entity ids."""
129 
130  _unrecorded_attributes = frozenset({ATTR_ENTITY_ID, ATTR_ORDER, ATTR_AUTO})
131 
132  _attr_should_poll = False
133  tracking: tuple[str, ...]
134  trackable: tuple[str, ...]
135  single_state_type_key: SingleStateType | None
136  _registry: GroupIntegrationRegistry
137 
138  def __init__(
139  self,
140  hass: HomeAssistant,
141  name: str,
142  *,
143  created_by_service: bool,
144  entity_ids: Collection[str] | None,
145  icon: str | None,
146  mode: bool | None,
147  order: int | None,
148  ) -> None:
149  """Initialize a group.
150 
151  This Object has factory function for creation.
152  """
153  self.hasshasshass = hass
154  self._attr_name_attr_name = name
155  self._state_state: str | None = None
156  self._attr_icon_attr_icon = icon
157  self._entity_ids_entity_ids = entity_ids
158  self._on_off_on_off: dict[str, bool] = {}
159  self._assumed_assumed: dict[str, bool] = {}
160  self._on_states_on_states: set[str] = set()
161  self.created_by_servicecreated_by_service = created_by_service
162  self.modemode = any
163  if mode:
164  self.modemode = all
165  self._order_order = order
166  self._assumed_state_assumed_state = False
167  self._async_unsub_state_changed_async_unsub_state_changed: CALLBACK_TYPE | None = None
168 
169  @staticmethod
170  @callback
172  hass: HomeAssistant,
173  name: str,
174  *,
175  created_by_service: bool,
176  entity_ids: Collection[str] | None,
177  icon: str | None,
178  mode: bool | None,
179  object_id: str | None,
180  order: int | None,
181  ) -> Group:
182  """Create a group entity."""
183  if order is None:
184  hass.data.setdefault(GROUP_ORDER, 0)
185  order = hass.data[GROUP_ORDER]
186  # Keep track of the group order without iterating
187  # every state in the state machine every time
188  # we setup a new group
189  hass.data[GROUP_ORDER] += 1
190 
191  group = Group(
192  hass,
193  name,
194  created_by_service=created_by_service,
195  entity_ids=entity_ids,
196  icon=icon,
197  mode=mode,
198  order=order,
199  )
200 
201  group.entity_id = async_generate_entity_id(
202  ENTITY_ID_FORMAT, object_id or name, hass=hass
203  )
204 
205  return group
206 
207  @staticmethod
209  hass: HomeAssistant,
210  name: str,
211  *,
212  created_by_service: bool,
213  entity_ids: Collection[str] | None,
214  icon: str | None,
215  mode: bool | None,
216  object_id: str | None,
217  order: int | None,
218  ) -> Group:
219  """Initialize a group.
220 
221  This method must be run in the event loop.
222  """
223  group = Group.async_create_group_entity(
224  hass,
225  name,
226  created_by_service=created_by_service,
227  entity_ids=entity_ids,
228  icon=icon,
229  mode=mode,
230  object_id=object_id,
231  order=order,
232  )
233 
234  # If called before the platform async_setup is called (test cases)
235  await async_get_component(hass).async_add_entities([group])
236  return group
237 
238  def set_name(self, value: str) -> None:
239  """Set Group name."""
240  self._attr_name_attr_name = value
241 
242  @property
243  def state(self) -> str | None:
244  """Return the state of the group."""
245  return self._state_state
246 
247  def set_icon(self, value: str | None) -> None:
248  """Set Icon for group."""
249  self._attr_icon_attr_icon = value
250 
251  @property
252  def extra_state_attributes(self) -> dict[str, Any]:
253  """Return the state attributes for the group."""
254  data = {ATTR_ENTITY_ID: self.trackingtracking, ATTR_ORDER: self._order_order}
255  if self.created_by_servicecreated_by_service:
256  data[ATTR_AUTO] = True
257 
258  return data
259 
260  @property
261  def assumed_state(self) -> bool:
262  """Test if any member has an assumed state."""
263  return self._assumed_state_assumed_state
264 
265  @callback
267  self, entity_ids: Collection[str] | None
268  ) -> None:
269  """Update the member entity IDs.
270 
271  This method must be run in the event loop.
272  """
273  self._async_stop_async_stop()
274  self._set_tracked_set_tracked(entity_ids)
275  self._reset_tracked_state_reset_tracked_state()
276  self._async_start_async_start()
277 
278  def _set_tracked(self, entity_ids: Collection[str] | None) -> None:
279  """Tuple of entities to be tracked."""
280  # tracking are the entities we want to track
281  # trackable are the entities we actually watch
282 
283  if not entity_ids:
284  self.trackingtracking = ()
285  self.trackabletrackable = ()
286  self.single_state_type_keysingle_state_type_key = None
287  return
288 
289  registry = self._registry_registry
290  excluded_domains = registry.exclude_domains
291 
292  tracking: list[str] = []
293  trackable: list[str] = []
294  single_state_type_set: set[SingleStateType] = set()
295  for ent_id in entity_ids:
296  ent_id_lower = ent_id.lower()
297  domain = split_entity_id(ent_id_lower)[0]
298  tracking.append(ent_id_lower)
299  if domain not in excluded_domains:
300  trackable.append(ent_id_lower)
301  if domain in registry.state_group_mapping:
302  single_state_type_set.add(registry.state_group_mapping[domain])
303  elif domain == DOMAIN:
304  # If a group contains another group we check if that group
305  # has a specific single state type
306  if ent_id in registry.state_group_mapping:
307  single_state_type_set.add(registry.state_group_mapping[ent_id])
308  else:
309  single_state_type_set.add(SingleStateType(STATE_ON, STATE_OFF))
310 
311  if len(single_state_type_set) == 1:
312  self.single_state_type_keysingle_state_type_key = next(iter(single_state_type_set))
313  # To support groups with nested groups we store the state type
314  # per group entity_id if there is a single state type
315  registry.state_group_mapping[self.entity_identity_id] = self.single_state_type_keysingle_state_type_key
316  else:
317  self.single_state_type_keysingle_state_type_key = None
318 
319  self.trackabletrackable = tuple(trackable)
320  self.trackingtracking = tuple(tracking)
321 
322  @callback
323  def _async_deregister(self) -> None:
324  """Deregister group entity from the registry."""
325  registry = self._registry_registry
326  if self.entity_identity_id in registry.state_group_mapping:
327  registry.state_group_mapping.pop(self.entity_identity_id)
328 
329  @callback
330  def _async_start(self, _: HomeAssistant | None = None) -> None:
331  """Start tracking members and write state."""
332  self._reset_tracked_state_reset_tracked_state()
333  self._async_start_tracking_async_start_tracking()
334  self.async_write_ha_stateasync_write_ha_state()
335 
336  @callback
337  def _async_start_tracking(self) -> None:
338  """Start tracking members.
339 
340  This method must be run in the event loop.
341  """
342  if self.trackabletrackable and self._async_unsub_state_changed_async_unsub_state_changed is None:
344  self.hasshasshass, self.trackabletrackable, self._async_state_changed_listener_async_state_changed_listener
345  )
346 
347  self._async_update_group_state_async_update_group_state()
348 
349  @callback
350  def _async_stop(self) -> None:
351  """Unregister the group from Home Assistant.
352 
353  This method must be run in the event loop.
354  """
355  if self._async_unsub_state_changed_async_unsub_state_changed:
356  self._async_unsub_state_changed_async_unsub_state_changed()
357  self._async_unsub_state_changed_async_unsub_state_changed = None
358 
359  @callback
360  def async_update_group_state(self) -> None:
361  """Query all members and determine current group state."""
362  self._state_state = None
363  self._async_update_group_state_async_update_group_state()
364 
365  async def async_added_to_hass(self) -> None:
366  """Handle addition to Home Assistant."""
367  self._registry_registry = self.hasshasshass.data[REG_KEY]
368  self._set_tracked_set_tracked(self._entity_ids_entity_ids)
369  self.async_on_removeasync_on_remove(start.async_at_start(self.hasshasshass, self._async_start_async_start))
370  self.async_on_removeasync_on_remove(self._async_deregister_async_deregister)
371 
372  async def async_will_remove_from_hass(self) -> None:
373  """Handle removal from Home Assistant."""
374  self._async_stop_async_stop()
375 
377  self, event: Event[EventStateChangedData]
378  ) -> None:
379  """Respond to a member state changing.
380 
381  This method must be run in the event loop.
382  """
383  # removed
384  if self._async_unsub_state_changed_async_unsub_state_changed is None:
385  return
386 
387  self.async_set_contextasync_set_context(event.context)
388 
389  if (new_state := event.data["new_state"]) is None:
390  # The state was removed from the state machine
391  self._reset_tracked_state_reset_tracked_state()
392 
393  self._async_update_group_state_async_update_group_state(new_state)
394  self.async_write_ha_stateasync_write_ha_state()
395 
396  def _reset_tracked_state(self) -> None:
397  """Reset tracked state."""
398  self._on_off_on_off = {}
399  self._assumed_assumed = {}
400  self._on_states_on_states = set()
401 
402  for entity_id in self.trackabletrackable:
403  if (state := self.hasshasshass.states.get(entity_id)) is not None:
404  self._see_state_see_state(state)
405 
406  def _see_state(self, new_state: State) -> None:
407  """Keep track of the state."""
408  entity_id = new_state.entity_id
409  domain = new_state.domain
410  state = new_state.state
411  registry = self._registry_registry
412  self._assumed_assumed[entity_id] = bool(new_state.attributes.get(ATTR_ASSUMED_STATE))
413 
414  if domain not in registry.on_states_by_domain:
415  # Handle the group of a group case
416  if state in registry.on_off_mapping:
417  self._on_states_on_states.add(state)
418  elif state in registry.off_on_mapping:
419  self._on_states_on_states.add(registry.off_on_mapping[state])
420  self._on_off_on_off[entity_id] = state in registry.on_off_mapping
421  else:
422  entity_on_state = registry.on_states_by_domain[domain]
423  if domain in registry.on_states_by_domain:
424  self._on_states_on_states.update(entity_on_state)
425  self._on_off_on_off[entity_id] = state in entity_on_state
426 
427  @callback
428  def _async_update_group_state(self, tr_state: State | None = None) -> None:
429  """Update group state.
430 
431  Optionally you can provide the only state changed since last update
432  allowing this method to take shortcuts.
433 
434  This method must be run in the event loop.
435  """
436  # To store current states of group entities. Might not be needed.
437  if tr_state:
438  self._see_state_see_state(tr_state)
439 
440  if not self._on_off_on_off:
441  return
442 
443  if (
444  tr_state is None
445  or self._assumed_state_assumed_state
446  and not tr_state.attributes.get(ATTR_ASSUMED_STATE)
447  ):
448  self._assumed_state_assumed_state = self.modemode(self._assumed_assumed.values())
449 
450  elif tr_state.attributes.get(ATTR_ASSUMED_STATE):
451  self._assumed_state_assumed_state = True
452 
453  num_on_states = len(self._on_states_on_states)
454  # If all the entity domains we are tracking
455  # have the same on state we use this state
456  # and its hass.data[REG_KEY].on_off_mapping to off
457  if num_on_states == 1:
458  on_state = next(iter(self._on_states_on_states))
459  # If we do not have an on state for any domains
460  # we use None (which will be STATE_UNKNOWN)
461  elif num_on_states == 0:
462  self._state_state = None
463  return
464  if self.single_state_type_keysingle_state_type_key:
465  on_state = self.single_state_type_keysingle_state_type_key.on_state
466  # If the entity domains have more than one
467  # on state, we use STATE_ON/STATE_OFF
468  else:
469  on_state = STATE_ON
470  group_is_on = self.modemode(self._on_off_on_off.values())
471  if group_is_on:
472  self._state_state = on_state
473  elif self.single_state_type_keysingle_state_type_key:
474  self._state_state = self.single_state_type_keysingle_state_type_key.off_state
475  else:
476  self._state_state = STATE_OFF
477 
478 
479 def async_get_component(hass: HomeAssistant) -> EntityComponent[Group]:
480  """Get the group entity component."""
481  if (component := hass.data.get(DATA_COMPONENT)) is None:
482  component = hass.data[DATA_COMPONENT] = EntityComponent[Group](
483  _PACKAGE_LOGGER, DOMAIN, hass
484  )
485  return component
None _update_at_start(self, HomeAssistant _)
Definition: entity.py:99
CALLBACK_TYPE async_start_preview(self, Callable[[str, Mapping[str, Any]], None] preview_callback)
Definition: entity.py:47
None async_update_supported_features(self, str entity_id, State|None new_state)
Definition: entity.py:123
None _see_state(self, State new_state)
Definition: entity.py:406
dict[str, Any] extra_state_attributes(self)
Definition: entity.py:252
None async_update_tracked_entity_ids(self, Collection[str]|None entity_ids)
Definition: entity.py:268
None set_icon(self, str|None value)
Definition: entity.py:247
None _async_update_group_state(self, State|None tr_state=None)
Definition: entity.py:428
None _async_state_changed_listener(self, Event[EventStateChangedData] event)
Definition: entity.py:378
None _set_tracked(self, Collection[str]|None entity_ids)
Definition: entity.py:278
Group async_create_group(HomeAssistant hass, str name, *bool created_by_service, Collection[str]|None entity_ids, str|None icon, bool|None mode, str|None object_id, int|None order)
Definition: entity.py:218
Group async_create_group_entity(HomeAssistant hass, str name, *bool created_by_service, Collection[str]|None entity_ids, str|None icon, bool|None mode, str|None object_id, int|None order)
Definition: entity.py:181
None _async_start(self, HomeAssistant|None _=None)
Definition: entity.py:330
None __init__(self, HomeAssistant hass, str name, *bool created_by_service, Collection[str]|None entity_ids, str|None icon, bool|None mode, int|None order)
Definition: entity.py:148
CalculatedState _async_calculate_state(self)
Definition: entity.py:1059
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
None async_set_context(self, Context context)
Definition: entity.py:937
bool add(self, _T matcher)
Definition: match.py:185
EntityComponent[Group] async_get_component(HomeAssistant hass)
Definition: entity.py:479
tuple[str, str] split_entity_id(str entity_id)
Definition: core.py:214
str async_generate_entity_id(str entity_id_format, str|None name, Iterable[str]|None current_ids=None, HomeAssistant|None hass=None)
Definition: entity.py:119
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
Definition: event.py:314