Home Assistant Unofficial Reference 2024.12.1
template_entity.py
Go to the documentation of this file.
1 """TemplateEntity utility class."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Mapping
6 import contextlib
7 import itertools
8 import logging
9 from typing import Any, cast
10 
11 from propcache import under_cached_property
12 import voluptuous as vol
13 
14 from homeassistant.components.blueprint import CONF_USE_BLUEPRINT
15 from homeassistant.const import (
16  CONF_ENTITY_PICTURE_TEMPLATE,
17  CONF_FRIENDLY_NAME,
18  CONF_ICON,
19  CONF_ICON_TEMPLATE,
20  CONF_NAME,
21  CONF_PATH,
22  CONF_VARIABLES,
23  STATE_UNKNOWN,
24 )
25 from homeassistant.core import (
26  CALLBACK_TYPE,
27  Context,
28  Event,
29  EventStateChangedData,
30  HomeAssistant,
31  State,
32  callback,
33  validate_state,
34 )
35 from homeassistant.exceptions import TemplateError
37 from homeassistant.helpers.entity import Entity
38 from homeassistant.helpers.event import (
39  TrackTemplate,
40  TrackTemplateResult,
41  TrackTemplateResultInfo,
42  async_track_template_result,
43 )
44 from homeassistant.helpers.script import Script, _VarsType
45 from homeassistant.helpers.start import async_at_start
47  Template,
48  TemplateStateFromEntityId,
49  result_as_boolean,
50 )
52  TEMPLATE_ENTITY_BASE_SCHEMA,
53  make_template_entity_base_schema,
54 )
55 from homeassistant.helpers.typing import ConfigType
56 
57 from .const import (
58  CONF_ATTRIBUTE_TEMPLATES,
59  CONF_ATTRIBUTES,
60  CONF_AVAILABILITY,
61  CONF_AVAILABILITY_TEMPLATE,
62  CONF_PICTURE,
63 )
64 
_LOGGER = logging.getLogger(__name__)

# Schema fragment: optional availability template under the modern key.
TEMPLATE_ENTITY_AVAILABILITY_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_AVAILABILITY): cv.template,
    }
)

# Schema fragment: optional icon template under the modern key.
TEMPLATE_ENTITY_ICON_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_ICON): cv.template,
    }
)

# Common modern schema: attribute templates, availability template and script
# variables, extended with every key of TEMPLATE_ENTITY_BASE_SCHEMA.
TEMPLATE_ENTITY_COMMON_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_ATTRIBUTES): vol.Schema({cv.string: cv.template}),
        vol.Optional(CONF_AVAILABILITY): cv.template,
        vol.Optional(CONF_VARIABLES): cv.SCRIPT_VARIABLES_SCHEMA,
    }
).extend(TEMPLATE_ENTITY_BASE_SCHEMA.schema)
86 
87 
def make_template_entity_common_schema(default_name: str) -> vol.Schema:
    """Return the common template entity schema with a default name.

    The attribute and availability template keys are combined with the base
    schema produced by ``make_template_entity_base_schema(default_name)``.
    """
    common_fields = vol.Schema(
        {
            vol.Optional(CONF_ATTRIBUTES): vol.Schema({cv.string: cv.template}),
            vol.Optional(CONF_AVAILABILITY): cv.template,
        }
    )
    base_schema = make_template_entity_base_schema(default_name)
    return common_fields.extend(base_schema.schema)
96 
97 
# Legacy schema fragment: attribute templates, defaulting to an empty mapping.
TEMPLATE_ENTITY_ATTRIBUTES_SCHEMA_LEGACY = vol.Schema(
    {
        vol.Optional(CONF_ATTRIBUTE_TEMPLATES, default={}): vol.Schema(
            {cv.string: cv.template}
        ),
    }
)

# Legacy schema fragment: optional availability template.
TEMPLATE_ENTITY_AVAILABILITY_SCHEMA_LEGACY = vol.Schema(
    {
        vol.Optional(CONF_AVAILABILITY_TEMPLATE): cv.template,
    }
)

# Common legacy schema: entity picture and icon templates plus the legacy
# availability template key.
TEMPLATE_ENTITY_COMMON_SCHEMA_LEGACY = vol.Schema(
    {
        vol.Optional(CONF_ENTITY_PICTURE_TEMPLATE): cv.template,
        vol.Optional(CONF_ICON_TEMPLATE): cv.template,
    }
).extend(TEMPLATE_ENTITY_AVAILABILITY_SCHEMA_LEGACY.schema)


# Maps each legacy configuration key to the modern key that replaces it.
LEGACY_FIELDS = {
    CONF_ICON_TEMPLATE: CONF_ICON,
    CONF_ENTITY_PICTURE_TEMPLATE: CONF_PICTURE,
    CONF_AVAILABILITY_TEMPLATE: CONF_AVAILABILITY,
    CONF_ATTRIBUTE_TEMPLATES: CONF_ATTRIBUTES,
    CONF_FRIENDLY_NAME: CONF_NAME,
}
127 
128 
def rewrite_common_legacy_to_modern_conf(
    hass: HomeAssistant,
    entity_cfg: dict[str, Any],
    extra_legacy_fields: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Rewrite legacy config.

    Works on a shallow copy of ``entity_cfg``; the caller's dict is not
    mutated. Each legacy key listed in ``LEGACY_FIELDS`` (plus any pairs in
    ``extra_legacy_fields``) is moved to its modern key. String values are
    wrapped in a ``Template`` bound to ``hass`` on the way.

    A legacy key is left untouched when its modern counterpart is already
    present, so an explicit modern value always wins.
    """
    entity_cfg = {**entity_cfg}
    if extra_legacy_fields is None:
        extra_legacy_fields = {}

    for from_key, to_key in itertools.chain(
        LEGACY_FIELDS.items(), extra_legacy_fields.items()
    ):
        if from_key not in entity_cfg or to_key in entity_cfg:
            continue

        val = entity_cfg.pop(from_key)
        if isinstance(val, str):
            val = Template(val, hass)
        entity_cfg[to_key] = val

    # The name may have been supplied as a plain string under the modern key
    # as well; it is always stored as a Template.
    if CONF_NAME in entity_cfg and isinstance(entity_cfg[CONF_NAME], str):
        entity_cfg[CONF_NAME] = Template(entity_cfg[CONF_NAME], hass)

    return entity_cfg
154 
155 
class _TemplateAttribute:
    """Attribute value linked to template result."""

    def __init__(
        self,
        entity: Entity,
        attribute: str,
        template: Template,
        validator: Callable[[Any], Any] | None = None,
        on_update: Callable[[Any], None] | None = None,
        none_on_template_error: bool | None = False,
    ) -> None:
        """Template attribute.

        ``attribute`` is the name of the attribute on ``entity`` that receives
        the rendered value when no ``on_update`` callback is supplied.
        """
        self._entity = entity
        self._attribute = attribute
        self.template = template
        self.validator = validator
        self.on_update = on_update
        self.async_update = None
        self.none_on_template_error = none_on_template_error

    @callback
    def async_setup(self) -> None:
        """Config update path for the attribute.

        Installs the default setter as ``on_update`` when none was supplied.

        Raises:
            AttributeError: no ``on_update`` was supplied and the entity has
                no attribute with the configured name.
        """
        if self.on_update:
            return

        if not hasattr(self._entity, self._attribute):
            raise AttributeError(f"Attribute '{self._attribute}' does not exist.")

        self.on_update = self._default_update

    @callback
    def _default_update(self, result: str | TemplateError) -> None:
        """Store the result on the entity attribute; errors are stored as None."""
        attr_result = None if isinstance(result, TemplateError) else result
        setattr(self._entity, self._attribute, attr_result)

    @callback
    def handle_result(
        self,
        event: Event[EventStateChangedData] | None,
        template: Template,
        last_result: str | TemplateError | None,
        result: str | TemplateError,
    ) -> None:
        """Handle a template result event callback."""
        if isinstance(result, TemplateError):
            _LOGGER.error(
                (
                    "TemplateError('%s') "
                    "while processing template '%s' "
                    "for attribute '%s' in entity '%s'"
                ),
                result,
                self.template,
                self._attribute,
                self._entity.entity_id,
            )
            if self.none_on_template_error:
                # Bypass any custom on_update so the attribute is reset to None.
                self._default_update(result)
            else:
                assert self.on_update
                self.on_update(result)
            return

        if not self.validator:
            assert self.on_update
            self.on_update(result)
            return

        try:
            validated = self.validator(result)
        except vol.Invalid as ex:
            _LOGGER.error(
                (
                    "Error validating template result '%s' "
                    "from template '%s' "
                    "for attribute '%s' in entity %s "
                    "validation message '%s'"
                ),
                result,
                self.template,
                self._attribute,
                self._entity.entity_id,
                ex.msg,
            )
            # A result that fails validation is reported as None.
            assert self.on_update
            self.on_update(None)
            return

        assert self.on_update
        self.on_update(validated)
249 
250 
class TemplateEntity(Entity):  # pylint: disable=hass-enforce-class-module
    """Entity that uses templates to calculate attributes."""

    _attr_available = True
    _attr_entity_picture = None
    _attr_icon = None

    def __init__(
        self,
        hass: HomeAssistant,
        *,
        availability_template: Template | None = None,
        icon_template: Template | None = None,
        entity_picture_template: Template | None = None,
        attribute_templates: dict[str, Template] | None = None,
        config: ConfigType | None = None,
        fallback_name: str | None = None,
        unique_id: str | None = None,
    ) -> None:
        """Template Entity.

        When ``config`` is given, all templates are read from it and the
        individual ``*_template`` keyword arguments are ignored; otherwise the
        keyword arguments are used directly.
        """
        self._template_attrs: dict[Template, list[_TemplateAttribute]] = {}
        self._template_result_info: TrackTemplateResultInfo | None = None
        self._attr_extra_state_attributes = {}
        self._self_ref_update_count = 0
        self._attr_unique_id = unique_id
        self._preview_callback: (
            Callable[
                [
                    str | None,
                    dict[str, Any] | None,
                    dict[str, bool | set[str]] | None,
                    str | None,
                ],
                None,
            ]
            | None
        ) = None
        if config is None:
            self._attribute_templates = attribute_templates
            self._availability_template = availability_template
            self._icon_template = icon_template
            self._entity_picture_template = entity_picture_template
            self._friendly_name_template = None
            self._run_variables = {}
            self._blueprint_inputs = None
        else:
            self._attribute_templates = config.get(CONF_ATTRIBUTES)
            self._availability_template = config.get(CONF_AVAILABILITY)
            self._icon_template = config.get(CONF_ICON)
            self._entity_picture_template = config.get(CONF_PICTURE)
            self._friendly_name_template = config.get(CONF_NAME)
            self._run_variables = config.get(CONF_VARIABLES, {})
            self._blueprint_inputs = config.get("raw_blueprint_inputs")

        class DummyState(State):
            """None-state for template entities not yet added to the state machine."""

            def __init__(self) -> None:
                """Initialize a new state."""
                super().__init__("unknown.unknown", STATE_UNKNOWN)
                self.entity_id = None  # type: ignore[assignment]

            @under_cached_property
            def name(self) -> str:
                """Name of this state."""
                return "<None>"

        variables = {"this": DummyState()}

        # Try to render the name as it can influence the entity ID
        self._attr_name = fallback_name
        if self._friendly_name_template:
            with contextlib.suppress(TemplateError):
                self._attr_name = self._friendly_name_template.async_render(
                    variables=variables, parse_result=False
                )

        # Templates will not render while the entity is unavailable, try to render the
        # icon and picture templates.
        if self._entity_picture_template:
            with contextlib.suppress(TemplateError):
                self._attr_entity_picture = self._entity_picture_template.async_render(
                    variables=variables, parse_result=False
                )

        if self._icon_template:
            with contextlib.suppress(TemplateError):
                self._attr_icon = self._icon_template.async_render(
                    variables=variables, parse_result=False
                )

    @callback
    def _render_variables(self) -> dict:
        """Return the run variables, rendering them first unless already a dict."""
        if isinstance(self._run_variables, dict):
            return self._run_variables

        return self._run_variables.async_render(
            self.hass,
            {
                "this": TemplateStateFromEntityId(self.hass, self.entity_id),
            },
        )

    @callback
    def _update_available(self, result: str | TemplateError) -> None:
        """Store the availability template result.

        A template error marks the entity as available.
        """
        if isinstance(result, TemplateError):
            self._attr_available = True
            return

        self._attr_available = result_as_boolean(result)

    @callback
    def _update_state(self, result: str | TemplateError) -> None:
        """Derive availability from a state result.

        Does nothing when an availability template is configured; otherwise
        the entity is available exactly when the result is not an error.
        """
        if self._availability_template:
            return

        self._attr_available = not isinstance(result, TemplateError)

    @callback
    def _add_attribute_template(
        self, attribute_key: str, attribute_template: Template
    ) -> None:
        """Create a template tracker for the attribute."""

        def _update_attribute(result: str | TemplateError) -> None:
            attr_result = None if isinstance(result, TemplateError) else result
            self._attr_extra_state_attributes[attribute_key] = attr_result

        self.add_template_attribute(
            attribute_key, attribute_template, None, _update_attribute
        )

    @property
    def referenced_blueprint(self) -> str | None:
        """Return referenced blueprint or None."""
        if self._blueprint_inputs is None:
            return None
        return cast(str, self._blueprint_inputs[CONF_USE_BLUEPRINT][CONF_PATH])

    def add_template_attribute(
        self,
        attribute: str,
        template: Template,
        validator: Callable[[Any], Any] | None = None,
        on_update: Callable[[Any], None] | None = None,
        none_on_template_error: bool = False,
    ) -> None:
        """Call in the constructor to add a template linked to an attribute.

        Parameters
        ----------
        attribute
            The name of the attribute to link to. This attribute must exist
            unless a custom on_update method is supplied.
        template
            The template to calculate.
        validator
            Validator function to parse the result and ensure it's valid.
        on_update
            Called to store the template result rather than storing it in
            the supplied attribute. Passed the result of the validator, or None
            if the template or validator resulted in an error.
        none_on_template_error
            If True, the attribute will be set to None if the template errors.

        Raises
        ------
        ValueError
            If this entity's hass or the template's hass is None.

        """
        if self.hass is None:
            raise ValueError("hass cannot be None")
        if template.hass is None:
            raise ValueError("template.hass cannot be None")
        template_attribute = _TemplateAttribute(
            self, attribute, template, validator, on_update, none_on_template_error
        )
        # Several attributes may share one template; they are grouped per template.
        self._template_attrs.setdefault(template, []).append(template_attribute)

    @callback
    def _handle_results(
        self,
        event: Event[EventStateChangedData] | None,
        updates: list[TrackTemplateResult],
    ) -> None:
        """Call back the results to the attributes."""
        if event:
            self.async_set_context(event.context)

        entity_id = event and event.data["entity_id"]

        # Count consecutive updates triggered by this entity's own state change.
        if entity_id and entity_id == self.entity_id:
            self._self_ref_update_count += 1
        else:
            self._self_ref_update_count = 0

        # More self-triggered updates in a row than there are tracked templates
        # is treated as a template loop, and the render is skipped.
        if self._self_ref_update_count > len(self._template_attrs):
            for update in updates:
                _LOGGER.warning(
                    (
                        "Template loop detected while processing event: %s, skipping"
                        " template render for Template[%s]"
                    ),
                    event,
                    update.template.template,
                )
            return

        for update in updates:
            for template_attr in self._template_attrs[update.template]:
                template_attr.handle_result(
                    event, update.template, update.last_result, update.result
                )

        if not self._preview_callback:
            self.async_write_ha_state()
            return

        # Preview mode: report the calculated state (or the error) to the
        # preview callback instead of writing it to the state machine.
        try:
            calculated_state = self._async_calculate_state()
            validate_state(calculated_state.state)
        except Exception as err:  # noqa: BLE001
            self._preview_callback(None, None, None, str(err))
        else:
            assert self._template_result_info
            self._preview_callback(
                calculated_state.state,
                calculated_state.attributes,
                self._template_result_info.listeners,
                None,
            )

    @callback
    def _async_template_startup(
        self,
        _hass: HomeAssistant | None,
        log_fn: Callable[[int, str], None] | None = None,
    ) -> None:
        """Start tracking all registered templates and do an initial refresh."""
        template_var_tups: list[TrackTemplate] = []
        has_availability_template = False

        variables = {
            "this": TemplateStateFromEntityId(self.hass, self.entity_id),
            **self._render_variables(),
        }

        for template, attributes in self._template_attrs.items():
            template_var_tup = TrackTemplate(template, variables)
            is_availability_template = False
            for attribute in attributes:
                if attribute._attribute == "_attr_available":  # noqa: SLF001
                    has_availability_template = True
                    is_availability_template = True
                attribute.async_setup()
            # Insert the availability template first in the list
            if is_availability_template:
                template_var_tups.insert(0, template_var_tup)
            else:
                template_var_tups.append(template_var_tup)

        result_info = async_track_template_result(
            self.hass,
            template_var_tups,
            self._handle_results,
            log_fn=log_fn,
            has_super_template=has_availability_template,
        )
        self.async_on_remove(result_info.async_remove)
        self._template_result_info = result_info
        result_info.async_refresh()

    @callback
    def _async_setup_templates(self) -> None:
        """Set up templates."""
        if self._availability_template is not None:
            self.add_template_attribute(
                "_attr_available",
                self._availability_template,
                None,
                self._update_available,
            )
        if self._attribute_templates is not None:
            for key, value in self._attribute_templates.items():
                self._add_attribute_template(key, value)
        if self._icon_template is not None:
            self.add_template_attribute(
                "_attr_icon", self._icon_template, vol.Or(cv.whitespace, cv.icon)
            )
        if self._entity_picture_template is not None:
            self.add_template_attribute(
                "_attr_entity_picture", self._entity_picture_template, cv.string
            )
        # A static name was already rendered in the constructor and is not tracked.
        if (
            self._friendly_name_template is not None
            and not self._friendly_name_template.is_static
        ):
            self.add_template_attribute(
                "_attr_name", self._friendly_name_template, cv.string
            )

    @callback
    def async_start_preview(
        self,
        preview_callback: Callable[
            [
                str | None,
                Mapping[str, Any] | None,
                dict[str, bool | set[str]] | None,
                str | None,
            ],
            None,
        ],
    ) -> CALLBACK_TYPE:
        """Render a preview."""

        def log_template_error(level: int, msg: str) -> None:
            preview_callback(None, None, None, msg)

        self._preview_callback = preview_callback
        self._async_setup_templates()
        try:
            self._async_template_startup(None, log_template_error)
        except Exception as err:  # noqa: BLE001
            preview_callback(None, None, None, str(err))
        return self._call_on_remove_callbacks

    async def async_added_to_hass(self) -> None:
        """Run when entity about to be added to hass."""
        self._async_setup_templates()

        async_at_start(self.hass, self._async_template_startup)

    async def async_update(self) -> None:
        """Call for forced update."""
        assert self._template_result_info
        self._template_result_info.async_refresh()

    async def async_run_script(
        self,
        script: Script,
        *,
        run_variables: _VarsType | None = None,
        context: Context | None = None,
    ) -> None:
        """Run an action script."""
        if run_variables is None:
            run_variables = {}
        # Later entries win: caller-supplied run_variables override the
        # rendered entity variables, which override "this".
        await script.async_run(
            run_variables={
                "this": TemplateStateFromEntityId(self.hass, self.entity_id),
                **self._render_variables(),
                **run_variables,
            },
            context=context,
        )
None async_run_script(self, Script script, *_VarsType|None run_variables=None, Context|None context=None)
None _async_template_startup(self, HomeAssistant|None _hass, Callable[[int, str], None]|None log_fn=None)
CALLBACK_TYPE async_start_preview(self, Callable[[str|None, Mapping[str, Any]|None, dict[str, bool|set[str]]|None, str|None,], None,] preview_callback)
None add_template_attribute(self, str attribute, Template template, Callable[[Any], Any]|None validator=None, Callable[[Any], None]|None on_update=None, bool none_on_template_error=False)
None _add_attribute_template(self, str attribute_key, Template attribute_template)
None _handle_results(self, Event[EventStateChangedData]|None event, list[TrackTemplateResult] updates)
None __init__(self, HomeAssistant hass, *Template|None availability_template=None, Template|None icon_template=None, Template|None entity_picture_template=None, dict[str, Template]|None attribute_templates=None, ConfigType|None config=None, str|None fallback_name=None, str|None unique_id=None)
None __init__(self, Entity entity, str attribute, Template template, Callable[[Any], Any]|None validator=None, Callable[[Any], None]|None on_update=None, bool|None none_on_template_error=False)
None handle_result(self, Event[EventStateChangedData]|None event, Template template, str|TemplateError|None last_result, str|TemplateError result)
CalculatedState _async_calculate_state(self)
Definition: entity.py:1059
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
str|UndefinedType|None name(self)
Definition: entity.py:738
None async_set_context(self, Context context)
Definition: entity.py:937
dict[str, Any] rewrite_common_legacy_to_modern_conf(HomeAssistant hass, dict[str, Any] entity_cfg, dict[str, str]|None extra_legacy_fields=None)
vol.Schema make_template_entity_common_schema(str default_name)
str validate_state(str state)
Definition: core.py:243
TrackTemplateResultInfo async_track_template_result(HomeAssistant hass, Sequence[TrackTemplate] track_templates, TrackTemplateResultListener action, bool strict=False, Callable[[int, str], None]|None log_fn=None, bool has_super_template=False)
Definition: event.py:1345
CALLBACK_TYPE async_at_start(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)
Definition: start.py:61
bool result_as_boolean(Any|None template_result)
Definition: template.py:1277
vol.Schema make_template_entity_base_schema(str default_name)