Home Assistant Unofficial Reference 2024.12.1
condition.py
Go to the documentation of this file.
1 """Offer reusable conditions."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import deque
7 from collections.abc import Callable, Container, Generator
8 from contextlib import contextmanager
9 from datetime import datetime, time as dt_time, timedelta
10 import functools as ft
11 import logging
12 import re
13 import sys
14 from typing import Any, Protocol, cast
15 
16 import voluptuous as vol
17 
18 from homeassistant.components import zone as zone_cmp
19 from homeassistant.components.sensor import SensorDeviceClass
20 from homeassistant.const import (
21  ATTR_DEVICE_CLASS,
22  ATTR_GPS_ACCURACY,
23  ATTR_LATITUDE,
24  ATTR_LONGITUDE,
25  CONF_ABOVE,
26  CONF_AFTER,
27  CONF_ATTRIBUTE,
28  CONF_BEFORE,
29  CONF_BELOW,
30  CONF_CONDITION,
31  CONF_DEVICE_ID,
32  CONF_ENABLED,
33  CONF_ENTITY_ID,
34  CONF_FOR,
35  CONF_ID,
36  CONF_MATCH,
37  CONF_STATE,
38  CONF_VALUE_TEMPLATE,
39  CONF_WEEKDAY,
40  CONF_ZONE,
41  ENTITY_MATCH_ALL,
42  ENTITY_MATCH_ANY,
43  STATE_UNAVAILABLE,
44  STATE_UNKNOWN,
45  SUN_EVENT_SUNRISE,
46  SUN_EVENT_SUNSET,
47  WEEKDAYS,
48 )
49 from homeassistant.core import HomeAssistant, State, callback
50 from homeassistant.exceptions import (
51  ConditionError,
52  ConditionErrorContainer,
53  ConditionErrorIndex,
54  ConditionErrorMessage,
55  HomeAssistantError,
56  TemplateError,
57 )
58 from homeassistant.loader import IntegrationNotFound, async_get_integration
59 from homeassistant.util.async_ import run_callback_threadsafe
60 import homeassistant.util.dt as dt_util
61 
62 from . import config_validation as cv, entity_registry as er
63 from .sun import get_astral_event_date
64 from .template import Template, render_complex
65 from .trace import (
66  TraceElement,
67  trace_append_element,
68  trace_path,
69  trace_path_get,
70  trace_stack_cv,
71  trace_stack_pop,
72  trace_stack_push,
73  trace_stack_top,
74 )
75 from .typing import ConfigType, TemplateVarsType
76 
# Name templates used to find the built-in condition factories and validators
# on this module with getattr(); "{}" is replaced by the condition type.
ASYNC_FROM_CONFIG_FORMAT = "async_{}_from_config"
FROM_CONFIG_FORMAT = "{}_from_config"
VALIDATE_CONFIG_FORMAT = "{}_validate_config"

# Maps a condition type to the integration providing it. A value of None
# marks a condition that is implemented in this module (no platform lookup).
_PLATFORM_ALIASES = {
    "and": None,
    "device": "device_automation",
    "not": None,
    "numeric_state": None,
    "or": None,
    "state": None,
    "sun": None,
    "template": None,
    "time": None,
    "trigger": None,
    "zone": None,
}

# Pattern for input_* helper entity IDs; the state condition uses it to decide
# whether a wanted state should be read from another entity.
INPUT_ENTITY_ID = re.compile(
    r"^input_(?:select|text|number|boolean|datetime)\.(?!.+__)(?!_)[\da-z_]+(?<!_)$"
)
98 
99 
class ConditionProtocol(Protocol):
    """Define the format of device_condition modules.

    Each module must define either CONDITION_SCHEMA or async_validate_condition_config.
    """

    # Schema used to validate the condition configuration.
    CONDITION_SCHEMA: vol.Schema

    async def async_validate_condition_config(
        self, hass: HomeAssistant, config: ConfigType
    ) -> ConfigType:
        """Validate config."""

    def async_condition_from_config(
        self, hass: HomeAssistant, config: ConfigType
    ) -> ConditionCheckerType:
        """Evaluate state based on configuration."""
117 
118 
# Signature of a ready-to-call condition checker: takes hass and the template
# variables and returns True/False, or None (returned by disabled conditions).
type ConditionCheckerType = Callable[[HomeAssistant, TemplateVarsType], bool | None]
120 
121 
def condition_trace_append(variables: TemplateVarsType, path: str) -> TraceElement:
    """Create a TraceElement for path, append it to the trace and return it."""
    element = TraceElement(variables, path)
    trace_append_element(element)
    return element
127 
128 
def condition_trace_set_result(result: bool, **kwargs: Any) -> None:
    """Set the result of the TraceElement at the top of the trace stack.

    Extra keyword arguments are passed on to the element's set_result().
    """
    # Tracing is not set up when the condition function is called directly,
    # in which case there is no element to update.
    if node := trace_stack_top(trace_stack_cv):
        node.set_result(result=result, **kwargs)
139 
140 
def condition_trace_update_result(**kwargs: Any) -> None:
    """Update the result of the TraceElement at the top of the trace stack.

    All keyword arguments are passed on to the element's update_result().
    """
    # Tracing is not set up when the condition function is called directly,
    # in which case there is no element to update.
    if node := trace_stack_top(trace_stack_cv):
        node.update_result(**kwargs)
151 
152 
@contextmanager
def trace_condition(variables: TemplateVarsType) -> Generator[TraceElement]:
    """Trace condition evaluation.

    Yields the trace element for the condition. If the element on top of the
    stack is flagged reuse_by_child, that element is reused (and the flag is
    cleared); otherwise a new element is appended and pushed, and popped again
    on exit. Exceptions are recorded on the element and re-raised.
    """
    element = trace_stack_top(trace_stack_cv)
    reused = bool(element and element.reuse_by_child)
    if reused:
        element.reuse_by_child = False
    else:
        element = condition_trace_append(variables, trace_path_get())
        trace_stack_push(trace_stack_cv, element)
    try:
        yield element
    except Exception as ex:
        element.set_error(ex)
        raise
    finally:
        # Only pop what this context manager pushed itself.
        if not reused:
            trace_stack_pop(trace_stack_cv)
172 
173 
def trace_condition_function(condition: ConditionCheckerType) -> ConditionCheckerType:
    """Decorate a condition checker so each call is traced.

    The wrapped call runs inside trace_condition() and its return value is
    stored as ``result`` on the current trace element.
    """

    @ft.wraps(condition)
    def wrapper(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool | None:
        """Trace condition."""
        with trace_condition(variables):
            outcome = condition(hass, variables)
            condition_trace_update_result(result=outcome)
            return outcome

    return wrapper
186 
187 
async def _async_get_condition_platform(
    hass: HomeAssistant, config: ConfigType
) -> ConditionProtocol | None:
    """Return the condition platform for a config, or None for built-ins.

    The condition type is first mapped through _PLATFORM_ALIASES; a mapped
    value of None means the condition is implemented in this module.

    Raises:
        HomeAssistantError: the integration does not exist or it has no
            condition platform.
    """
    platform = config[CONF_CONDITION]
    platform = _PLATFORM_ALIASES.get(platform, platform)
    if platform is None:
        return None
    try:
        integration = await async_get_integration(hass, platform)
    except IntegrationNotFound:
        raise HomeAssistantError(
            f'Invalid condition "{platform}" specified {config}'
        ) from None
    try:
        return await integration.async_get_platform("condition")
    except ImportError:
        raise HomeAssistantError(
            f"Integration '{platform}' does not provide condition support"
        ) from None
207 
208 
async def async_from_config(
    hass: HomeAssistant,
    config: ConfigType,
) -> ConditionCheckerType:
    """Turn a condition configuration into a method.

    Should be run on the event loop.

    Built-in conditions are resolved to a factory on this module named
    ``async_<type>_from_config`` or ``<type>_from_config``; all others use the
    platform's ``async_condition_from_config``. A condition whose ``enabled``
    value is (or renders to) something falsy becomes a checker that always
    returns None.

    Raises:
        HomeAssistantError: the platform cannot be loaded or the ``enabled``
            template fails to render.
    """
    factory: Any = None
    platform = await _async_get_condition_platform(hass, config)

    if platform is None:
        condition = config.get(CONF_CONDITION)
        for fmt in (ASYNC_FROM_CONFIG_FORMAT, FROM_CONFIG_FORMAT):
            factory = getattr(sys.modules[__name__], fmt.format(condition), None)

            if factory:
                break
    else:
        factory = platform.async_condition_from_config

    # Check if condition is not enabled
    if CONF_ENABLED in config:
        enabled = config[CONF_ENABLED]
        if isinstance(enabled, Template):
            try:
                enabled = enabled.async_render(limited=True)
            except TemplateError as err:
                raise HomeAssistantError(
                    f"Error rendering condition enabled template: {err}"
                ) from err
        if not enabled:

            @trace_condition_function
            def disabled_condition(
                hass: HomeAssistant, variables: TemplateVarsType = None
            ) -> bool | None:
                """Condition not enabled, will act as if it didn't exist."""
                return None

            return disabled_condition

    # Check for partials to properly determine if coroutine function
    check_factory = factory
    while isinstance(check_factory, ft.partial):
        check_factory = check_factory.func

    # Coroutine factories receive hass; plain factories only get the config.
    if asyncio.iscoroutinefunction(check_factory):
        return cast(ConditionCheckerType, await factory(hass, config))
    return cast(ConditionCheckerType, factory(config))
259 
260 
async def async_and_from_config(
    hass: HomeAssistant, config: ConfigType
) -> ConditionCheckerType:
    """Create multi condition matcher using 'AND'.

    The returned checker returns False as soon as a sub-condition returns
    False. Sub-conditions that raise ConditionError are collected; if none was
    False and any raised, a ConditionErrorContainer is raised.
    """
    checks = [await async_from_config(hass, entry) for entry in config["conditions"]]

    @trace_condition_function
    def if_and_condition(
        hass: HomeAssistant, variables: TemplateVarsType = None
    ) -> bool:
        """Test and condition."""
        errors = []
        for index, check in enumerate(checks):
            try:
                with trace_path(["conditions", str(index)]):
                    # Only an explicit False fails; None (disabled) is skipped.
                    if check(hass, variables) is False:
                        return False
            except ConditionError as ex:
                errors.append(
                    ConditionErrorIndex("and", index=index, total=len(checks), error=ex)
                )

        # Raise the errors if no check was false
        if errors:
            raise ConditionErrorContainer("and", errors=errors)

        return True

    return if_and_condition
290 
291 
async def async_or_from_config(
    hass: HomeAssistant, config: ConfigType
) -> ConditionCheckerType:
    """Create multi condition matcher using 'OR'.

    The returned checker returns True as soon as a sub-condition returns True.
    Sub-conditions that raise ConditionError are collected; if none was True
    and any raised, a ConditionErrorContainer is raised.
    """
    checks = [await async_from_config(hass, entry) for entry in config["conditions"]]

    @trace_condition_function
    def if_or_condition(
        hass: HomeAssistant, variables: TemplateVarsType = None
    ) -> bool:
        """Test or condition."""
        errors = []
        for index, check in enumerate(checks):
            try:
                with trace_path(["conditions", str(index)]):
                    # Only an explicit True passes; None (disabled) is skipped.
                    if check(hass, variables) is True:
                        return True
            except ConditionError as ex:
                errors.append(
                    ConditionErrorIndex("or", index=index, total=len(checks), error=ex)
                )

        # Raise the errors if no check was true
        if errors:
            raise ConditionErrorContainer("or", errors=errors)

        return False

    return if_or_condition
321 
322 
async def async_not_from_config(
    hass: HomeAssistant, config: ConfigType
) -> ConditionCheckerType:
    """Create multi condition matcher using 'NOT'.

    The returned checker returns False as soon as a sub-condition returns a
    truthy value. Sub-conditions that raise ConditionError are collected; if
    none was truthy and any raised, a ConditionErrorContainer is raised.
    """
    checks = [await async_from_config(hass, entry) for entry in config["conditions"]]

    @trace_condition_function
    def if_not_condition(
        hass: HomeAssistant, variables: TemplateVarsType = None
    ) -> bool:
        """Test not condition."""
        errors = []
        for index, check in enumerate(checks):
            try:
                with trace_path(["conditions", str(index)]):
                    if check(hass, variables):
                        return False
            except ConditionError as ex:
                errors.append(
                    ConditionErrorIndex("not", index=index, total=len(checks), error=ex)
                )

        # Raise the errors if no check matched (matching returns False above)
        if errors:
            raise ConditionErrorContainer("not", errors=errors)

        return True

    return if_not_condition
352 
353 
def numeric_state(
    hass: HomeAssistant,
    entity: str | State | None,
    below: float | str | None = None,
    above: float | str | None = None,
    value_template: Template | None = None,
    variables: TemplateVarsType = None,
) -> bool:
    """Test a numeric state condition.

    Runs async_numeric_state on hass.loop through run_callback_threadsafe and
    waits for its result. No ``attribute`` argument is forwarded.
    """
    return run_callback_threadsafe(
        hass.loop,
        async_numeric_state,
        hass,
        entity,
        below,
        above,
        value_template,
        variables,
    ).result()
373 
374 
def async_numeric_state(
    hass: HomeAssistant,
    entity: str | State | None,
    below: float | str | None = None,
    above: float | str | None = None,
    value_template: Template | None = None,
    variables: TemplateVarsType = None,
    attribute: str | None = None,
) -> bool:
    """Test a numeric state condition.

    The value tested is the entity state, the given attribute, or the rendered
    value_template (which receives the entity as ``state``). ``below`` and
    ``above`` are exclusive limits; each may be a number or the ID of an
    entity whose state supplies the limit.

    Returns False when the attribute is missing, the value is None/unavailable/
    unknown, a limit entity is unavailable/unknown, or a limit is not met.

    Raises:
        ConditionErrorMessage: no entity given, unknown entity or limit
            entity, template error, or a value that is not a number.
    """
    if entity is None:
        raise ConditionErrorMessage("numeric_state", "no entity specified")

    if isinstance(entity, str):
        entity_id = entity

        if (entity := hass.states.get(entity)) is None:
            raise ConditionErrorMessage("numeric_state", f"unknown entity {entity_id}")
    else:
        entity_id = entity.entity_id

    if attribute is not None and attribute not in entity.attributes:
        condition_trace_set_result(
            False,
            message=f"attribute '{attribute}' of entity {entity_id} does not exist",
        )
        return False

    value: Any = None
    if value_template is None:
        if attribute is None:
            value = entity.state
        else:
            value = entity.attributes.get(attribute)
    else:
        # Copy so the caller's variables are not modified.
        variables = dict(variables or {})
        variables["state"] = entity
        try:
            value = value_template.async_render(variables)
        except TemplateError as ex:
            raise ConditionErrorMessage(
                "numeric_state", f"template error: {ex}"
            ) from ex

    # Known states or attribute values that never match the numeric condition
    if value in (None, STATE_UNAVAILABLE, STATE_UNKNOWN):
        condition_trace_set_result(
            False,
            message=f"value '{value}' is non-numeric and treated as False",
        )
        return False

    try:
        fvalue = float(value)
    except (ValueError, TypeError) as ex:
        raise ConditionErrorMessage(
            "numeric_state",
            f"entity {entity_id} state '{value}' cannot be processed as a number",
        ) from ex

    if below is not None:
        if isinstance(below, str):
            # A string limit is an entity ID whose state is the limit.
            if not (below_entity := hass.states.get(below)):
                raise ConditionErrorMessage(
                    "numeric_state", f"unknown 'below' entity {below}"
                )
            if below_entity.state in (
                STATE_UNAVAILABLE,
                STATE_UNKNOWN,
            ):
                return False
            try:
                if fvalue >= float(below_entity.state):
                    condition_trace_set_result(
                        False,
                        state=fvalue,
                        wanted_state_below=float(below_entity.state),
                    )
                    return False
            except (ValueError, TypeError) as ex:
                raise ConditionErrorMessage(
                    "numeric_state",
                    (
                        f"the 'below' entity {below} state '{below_entity.state}'"
                        " cannot be processed as a number"
                    ),
                ) from ex
        elif fvalue >= below:
            condition_trace_set_result(False, state=fvalue, wanted_state_below=below)
            return False

    if above is not None:
        if isinstance(above, str):
            # A string limit is an entity ID whose state is the limit.
            if not (above_entity := hass.states.get(above)):
                raise ConditionErrorMessage(
                    "numeric_state", f"unknown 'above' entity {above}"
                )
            if above_entity.state in (
                STATE_UNAVAILABLE,
                STATE_UNKNOWN,
            ):
                return False
            try:
                if fvalue <= float(above_entity.state):
                    condition_trace_set_result(
                        False,
                        state=fvalue,
                        wanted_state_above=float(above_entity.state),
                    )
                    return False
            except (ValueError, TypeError) as ex:
                raise ConditionErrorMessage(
                    "numeric_state",
                    (
                        f"the 'above' entity {above} state '{above_entity.state}'"
                        " cannot be processed as a number"
                    ),
                ) from ex
        elif fvalue <= above:
            condition_trace_set_result(False, state=fvalue, wanted_state_above=above)
            return False

    condition_trace_set_result(True, state=fvalue)
    return True
499 
500 
def async_numeric_state_from_config(config: ConfigType) -> ConditionCheckerType:
    """Build a numeric_state condition checker from its configuration.

    The checker returns False as soon as one entity fails the test and True
    when all pass. Entities that raise ConditionError are collected; if no
    entity failed and any raised, a ConditionErrorContainer is raised.
    """
    entity_ids = config.get(CONF_ENTITY_ID, [])
    attribute = config.get(CONF_ATTRIBUTE)
    below = config.get(CONF_BELOW)
    above = config.get(CONF_ABOVE)
    value_template = config.get(CONF_VALUE_TEMPLATE)

    @trace_condition_function
    def if_numeric_state(
        hass: HomeAssistant, variables: TemplateVarsType = None
    ) -> bool:
        """Test numeric state condition."""
        errors = []
        for index, entity_id in enumerate(entity_ids):
            try:
                with trace_path(["entity_id", str(index)]), trace_condition(variables):
                    if not async_numeric_state(
                        hass,
                        entity_id,
                        below,
                        above,
                        value_template,
                        variables,
                        attribute,
                    ):
                        return False
            except ConditionError as ex:
                errors.append(
                    ConditionErrorIndex(
                        "numeric_state", index=index, total=len(entity_ids), error=ex
                    )
                )

        # Raise the errors if no check was false
        if errors:
            raise ConditionErrorContainer("numeric_state", errors=errors)

        return True

    return if_numeric_state
542 
543 
def state(
    hass: HomeAssistant,
    entity: str | State | None,
    req_state: Any,
    for_period: timedelta | None = None,
    attribute: str | None = None,
    variables: TemplateVarsType = None,
) -> bool:
    """Test if state matches requirements.

    Async friendly.

    ``req_state`` is a single wanted value or a list of them; one match is
    enough. A wanted string that looks like an input_* entity ID is replaced
    by that entity's current state. With ``for_period`` set, a match also
    requires that entity.last_changed is older than that period.

    Raises:
        ConditionErrorMessage: no entity given, unknown entity, a referenced
            input_* entity that does not exist, or an invalid for_period.
    """
    if entity is None:
        raise ConditionErrorMessage("state", "no entity specified")

    if isinstance(entity, str):
        entity_id = entity

        if (entity := hass.states.get(entity)) is None:
            raise ConditionErrorMessage("state", f"unknown entity {entity_id}")
    else:
        entity_id = entity.entity_id

    if attribute is not None and attribute not in entity.attributes:
        condition_trace_set_result(
            False,
            message=f"attribute '{attribute}' of entity {entity_id} does not exist",
        )
        return False

    assert isinstance(entity, State)

    if attribute is None:
        value: Any = entity.state
    else:
        value = entity.attributes.get(attribute)

    if not isinstance(req_state, list):
        req_state = [req_state]

    is_state = False
    # Pre-set so the trace call below works when req_state is an empty list.
    state_value: Any = None
    for req_state_value in req_state:
        state_value = req_state_value
        if (
            isinstance(req_state_value, str)
            and INPUT_ENTITY_ID.match(req_state_value) is not None
        ):
            if not (state_entity := hass.states.get(req_state_value)):
                raise ConditionErrorMessage(
                    "state", f"the 'state' entity {req_state_value} is unavailable"
                )
            state_value = state_entity.state
        is_state = value == state_value
        if is_state:
            break

    if for_period is None or not is_state:
        condition_trace_set_result(is_state, state=value, wanted_state=state_value)
        return is_state

    try:
        for_period = cv.positive_time_period(render_complex(for_period, variables))
    except TemplateError as ex:
        raise ConditionErrorMessage("state", f"template error: {ex}") from ex
    except vol.Invalid as ex:
        raise ConditionErrorMessage("state", f"schema error: {ex}") from ex

    # The state must have last changed before (now - for_period).
    duration = dt_util.utcnow() - cast(timedelta, for_period)
    duration_ok = duration > entity.last_changed
    condition_trace_set_result(duration_ok, state=value, duration=duration)
    return duration_ok
615 
616 
def state_from_config(config: ConfigType) -> ConditionCheckerType:
    """Build a state condition checker from its configuration.

    With match "all" (the default) every entity must match and the checker
    returns False at the first mismatch; with match "any" one matching entity
    is enough. Entities that raise ConditionError are collected and raised
    together as a ConditionErrorContainer unless a mismatch already returned
    False.
    """
    entity_ids = config.get(CONF_ENTITY_ID, [])
    req_states: str | list[str] = config.get(CONF_STATE, [])
    for_period = config.get(CONF_FOR)
    attribute = config.get(CONF_ATTRIBUTE)
    match = config.get(CONF_MATCH, ENTITY_MATCH_ALL)

    if not isinstance(req_states, list):
        req_states = [req_states]

    @trace_condition_function
    def if_state(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
        """Test if condition."""
        errors = []
        # "any" starts False and needs one hit; otherwise start True.
        result: bool = match != ENTITY_MATCH_ANY
        for index, entity_id in enumerate(entity_ids):
            try:
                with trace_path(["entity_id", str(index)]), trace_condition(variables):
                    if state(
                        hass, entity_id, req_states, for_period, attribute, variables
                    ):
                        result = True
                    elif match == ENTITY_MATCH_ALL:
                        return False
            except ConditionError as ex:
                errors.append(
                    ConditionErrorIndex(
                        "state", index=index, total=len(entity_ids), error=ex
                    )
                )

        # Raise the errors if no check was false
        if errors:
            raise ConditionErrorContainer("state", errors=errors)

        return result

    return if_state
656 
657 
def sun(
    hass: HomeAssistant,
    before: str | None = None,
    after: str | None = None,
    before_offset: timedelta | None = None,
    after_offset: timedelta | None = None,
) -> bool:
    """Test if current time matches sun requirements.

    ``before`` / ``after`` name a sun event (sunrise or sunset); the matching
    offset is added to that event's time before comparing with the current
    UTC time. Returns False when a required event has no time.
    """
    utcnow = dt_util.utcnow()
    today = dt_util.as_local(utcnow).date()
    if not before_offset:
        before_offset = timedelta(0)
    if not after_offset:
        after_offset = timedelta(0)

    sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, today)
    sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, today)

    uses_sunrise = SUN_EVENT_SUNRISE in (before, after)
    uses_sunset = SUN_EVENT_SUNSET in (before, after)

    # NOTE(review): the events are passed to as_local() before the None checks
    # further down; confirm how a missing event behaves here.
    # If an event's local date lies before today, use tomorrow's event instead.
    sunrise_is_past = today > dt_util.as_local(cast(datetime, sunrise)).date()
    if sunrise_is_past and uses_sunrise:
        sunrise = get_astral_event_date(
            hass, SUN_EVENT_SUNRISE, today + timedelta(days=1)
        )

    sunset_is_past = today > dt_util.as_local(cast(datetime, sunset)).date()
    if sunset_is_past and uses_sunset:
        sunset = get_astral_event_date(
            hass, SUN_EVENT_SUNSET, today + timedelta(days=1)
        )

    # Special case: before sunrise OR after sunset
    # This will handle the very rare case in the polar region when the sun rises/sets
    # but does not set/rise.
    # However this entire condition does not handle those full days of darkness
    # or light, the following should be used instead:
    #
    #    condition:
    #      condition: state
    #      entity_id: sun.sun
    #      state: 'above_horizon' (or 'below_horizon')
    #
    if before == SUN_EVENT_SUNRISE and after == SUN_EVENT_SUNSET:
        wanted_time_before = cast(datetime, sunrise) + before_offset
        condition_trace_update_result(wanted_time_before=wanted_time_before)
        wanted_time_after = cast(datetime, sunset) + after_offset
        condition_trace_update_result(wanted_time_after=wanted_time_after)
        return utcnow < wanted_time_before or utcnow > wanted_time_after

    if uses_sunrise and sunrise is None:
        # There is no sunrise today
        condition_trace_set_result(False, message="no sunrise today")
        return False

    if uses_sunset and sunset is None:
        # There is no sunset today
        condition_trace_set_result(False, message="no sunset today")
        return False

    events = {SUN_EVENT_SUNRISE: sunrise, SUN_EVENT_SUNSET: sunset}

    if before in events:
        wanted_time_before = cast(datetime, events[before]) + before_offset
        condition_trace_update_result(wanted_time_before=wanted_time_before)
        if utcnow > wanted_time_before:
            return False

    if after in events:
        wanted_time_after = cast(datetime, events[after]) + after_offset
        condition_trace_update_result(wanted_time_after=wanted_time_after)
        if utcnow < wanted_time_after:
            return False

    return True
740 
741 
def sun_from_config(config: ConfigType) -> ConditionCheckerType:
    """Build a sun condition checker from its configuration."""
    before, after, before_offset, after_offset = (
        config.get(key)
        for key in ("before", "after", "before_offset", "after_offset")
    )

    @trace_condition_function
    def sun_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
        """Validate time based if-condition."""
        # The template variables are not used by the sun test.
        return sun(hass, before, after, before_offset, after_offset)

    return sun_if
755 
756 
def template(
    hass: HomeAssistant, value_template: Template, variables: TemplateVarsType = None
) -> bool:
    """Test if template condition matches.

    Runs async_template on hass.loop through run_callback_threadsafe and waits
    for its result.
    """
    return run_callback_threadsafe(
        hass.loop, async_template, hass, value_template, variables
    ).result()
764 
765 
def async_template(
    hass: HomeAssistant,
    value_template: Template,
    variables: TemplateVarsType = None,
    trace_result: bool = True,
) -> bool:
    """Test if template condition matches.

    The condition matches when the rendered text, lower-cased, equals "true".
    Unless ``trace_result`` is False, the result and the entities collected
    while rendering are stored on the current trace element.

    Raises:
        ConditionErrorMessage: the template failed to render.
    """
    try:
        # parse_result=False keeps the rendered output as a string.
        info = value_template.async_render_to_info(variables, parse_result=False)
        value = info.result()
    except TemplateError as ex:
        raise ConditionErrorMessage("template", str(ex)) from ex

    result = value.lower() == "true"
    if trace_result:
        condition_trace_set_result(result, entities=list(info.entities))
    return result
783 
784 
def async_template_from_config(config: ConfigType) -> ConditionCheckerType:
    """Build a template condition checker from its configuration."""
    value_template = cast(Template, config.get(CONF_VALUE_TEMPLATE))

    @trace_condition_function
    def template_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
        """Validate template based if-condition."""
        matched = async_template(hass, value_template, variables)
        return matched

    return template_if
795 
796 
def time(
    hass: HomeAssistant,
    before: dt_time | str | None = None,
    after: dt_time | str | None = None,
    weekday: str | Container[str] | None = None,
) -> bool:
    """Test if local time condition matches.

    Handle the fact that time is continuous and we may be testing for
    a period that crosses midnight. In that case it is easier to test
    for the opposite. "(23:59 <= now < 00:01)" would be the same as
    "not (00:01 <= now < 23:59)".

    ``before`` / ``after`` may be a time or an entity ID. Supported entities
    are input_datetime, the ``time`` domain and entities with the timestamp
    device class; any other entity, or one without a usable state, makes the
    condition False.

    Raises:
        ConditionErrorMessage: a referenced entity does not exist.
    """
    now = dt_util.now()
    now_time = now.time()
    no_value_states = (STATE_UNAVAILABLE, STATE_UNKNOWN)

    if after is None:
        after = dt_time(0)
    elif isinstance(after, str):
        after_entity = hass.states.get(after)
        if not after_entity:
            raise ConditionErrorMessage("time", f"unknown 'after' entity {after}")
        if after_entity.domain == "input_datetime":
            after = dt_time(
                after_entity.attributes.get("hour", 23),
                after_entity.attributes.get("minute", 59),
                after_entity.attributes.get("second", 59),
            )
        elif (
            after_entity.domain == "time"
            and after_entity.state not in no_value_states
        ):
            after = datetime.strptime(after_entity.state, "%H:%M:%S").time()
        elif (
            after_entity.attributes.get(ATTR_DEVICE_CLASS)
            == SensorDeviceClass.TIMESTAMP
            and after_entity.state not in no_value_states
        ):
            after_parsed = dt_util.parse_datetime(after_entity.state)
            if after_parsed is None:
                return False
            after = dt_util.as_local(after_parsed).time()
        else:
            return False

    if before is None:
        before = dt_time(23, 59, 59, 999999)
    elif isinstance(before, str):
        before_entity = hass.states.get(before)
        if not before_entity:
            raise ConditionErrorMessage("time", f"unknown 'before' entity {before}")
        if before_entity.domain == "input_datetime":
            before = dt_time(
                before_entity.attributes.get("hour", 23),
                before_entity.attributes.get("minute", 59),
                before_entity.attributes.get("second", 59),
            )
        elif before_entity.domain == "time":
            # A state that is not HH:MM:SS makes the condition False.
            try:
                before = datetime.strptime(before_entity.state, "%H:%M:%S").time()
            except ValueError:
                return False
        elif (
            before_entity.attributes.get(ATTR_DEVICE_CLASS)
            == SensorDeviceClass.TIMESTAMP
            and before_entity.state not in no_value_states
        ):
            before_parsed = dt_util.parse_datetime(before_entity.state)
            if before_parsed is None:
                return False
            before = dt_util.as_local(before_parsed).time()
        else:
            return False

    within_one_day = after < before
    condition_trace_update_result(after=after, now_time=now_time, before=before)
    if within_one_day:
        if not after <= now_time < before:
            return False
    elif before <= now_time < after:
        # The window crosses midnight: fail only inside the excluded gap.
        return False

    if weekday is not None:
        now_weekday = WEEKDAYS[now.weekday()]

        condition_trace_update_result(weekday=weekday, now_weekday=now_weekday)
        wrong_single_day = isinstance(weekday, str) and weekday != now_weekday
        if wrong_single_day or now_weekday not in weekday:
            return False

    return True
894 
895 
def time_from_config(config: ConfigType) -> ConditionCheckerType:
    """Build a time condition checker from its configuration."""
    before, after, weekday = (
        config.get(key) for key in (CONF_BEFORE, CONF_AFTER, CONF_WEEKDAY)
    )

    @trace_condition_function
    def time_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
        """Validate time based if-condition."""
        # The template variables are not used by the time test.
        return time(hass, before, after, weekday)

    return time_if
908 
909 
def zone(
    hass: HomeAssistant,
    zone_ent: str | State | None,
    entity: str | State | None,
) -> bool:
    """Test if zone-condition matches.

    Async friendly.

    Both arguments may be an entity ID or a State. Returns False when the
    entity state is unavailable or unknown; otherwise returns the result of
    zone_cmp.in_zone() for the entity's coordinates and GPS accuracy.

    Raises:
        ConditionErrorMessage: zone or entity missing or unknown, or the
            entity has no latitude/longitude attribute.
    """
    if zone_ent is None:
        raise ConditionErrorMessage("zone", "no zone specified")

    if isinstance(zone_ent, str):
        zone_state = hass.states.get(zone_ent)
        if zone_state is None:
            raise ConditionErrorMessage("zone", f"unknown zone {zone_ent}")
    else:
        zone_state = zone_ent

    if entity is None:
        raise ConditionErrorMessage("zone", "no entity specified")

    if isinstance(entity, str):
        entity_id = entity
        entity_state = hass.states.get(entity_id)
        if entity_state is None:
            raise ConditionErrorMessage("zone", f"unknown entity {entity_id}")
    else:
        entity_state = entity
        entity_id = entity_state.entity_id

    if entity_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
        return False

    attrs = entity_state.attributes
    latitude = attrs.get(ATTR_LATITUDE)
    longitude = attrs.get(ATTR_LONGITUDE)

    if latitude is None:
        raise ConditionErrorMessage(
            "zone", f"entity {entity_id} has no 'latitude' attribute"
        )

    if longitude is None:
        raise ConditionErrorMessage(
            "zone", f"entity {entity_id} has no 'longitude' attribute"
        )

    # A missing GPS accuracy is treated as 0.
    accuracy = attrs.get(ATTR_GPS_ACCURACY, 0)
    return zone_cmp.in_zone(zone_state, latitude, longitude, accuracy)
961 
962 
def zone_from_config(config: ConfigType) -> ConditionCheckerType:
    """Build a zone condition checker from its configuration.

    The checker returns True when every configured entity is in at least one
    of the configured zones. Errors raised while matching are collected and
    raised as a ConditionErrorContainer only when the overall result would
    otherwise be False.
    """
    entity_ids = config.get(CONF_ENTITY_ID, [])
    zone_entity_ids = config.get(CONF_ZONE, [])

    @trace_condition_function
    def if_in_zone(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
        """Test if condition."""
        errors = []

        all_ok = True
        for entity_id in entity_ids:
            entity_ok = False
            for zone_entity_id in zone_entity_ids:
                try:
                    if zone(hass, zone_entity_id, entity_id):
                        entity_ok = True
                except ConditionErrorMessage as ex:
                    errors.append(
                        ConditionErrorMessage(
                            "zone",
                            (
                                f"error matching {entity_id} with {zone_entity_id}:"
                                f" {ex.message}"
                            ),
                        )
                    )

            if not entity_ok:
                all_ok = False

        # Raise the errors only if no definitive result was found
        if errors and not all_ok:
            raise ConditionErrorContainer("zone", errors=errors)

        return all_ok

    return if_in_zone
1001 
1002 
async def async_trigger_from_config(
    hass: HomeAssistant, config: ConfigType
) -> ConditionCheckerType:
    """Test a trigger condition.

    The returned checker is true when ``variables`` holds a "trigger" entry
    whose "id" is contained in the configured trigger ID(s).
    """
    trigger_id = config[CONF_ID]

    @trace_condition_function
    def trigger_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
        """Validate trigger based if-condition."""
        return (
            variables is not None
            and "trigger" in variables
            and variables["trigger"].get("id") in trigger_id
        )

    return trigger_if
1019 
1020 
def numeric_state_validate_config(
    hass: HomeAssistant, config: ConfigType
) -> ConfigType:
    """Validate numeric_state condition config.

    Returns a copy of the config whose entity_id value has been passed through
    cv.entity_ids_or_uuids and er.async_validate_entity_ids; the input mapping
    is left untouched.
    """

    registry = er.async_get(hass)
    config = dict(config)
    config[CONF_ENTITY_ID] = er.async_validate_entity_ids(
        registry, cv.entity_ids_or_uuids(config[CONF_ENTITY_ID])
    )
    return config
1032 
1033 
def state_validate_config(hass: HomeAssistant, config: ConfigType) -> ConfigType:
    """Validate state condition config.

    Returns a copy of the config whose entity_id value has been passed through
    cv.entity_ids_or_uuids and er.async_validate_entity_ids; the input mapping
    is left untouched.
    """
    ent_reg = er.async_get(hass)
    validated = dict(config)
    candidates = cv.entity_ids_or_uuids(validated[CONF_ENTITY_ID])
    validated[CONF_ENTITY_ID] = er.async_validate_entity_ids(ent_reg, candidates)
    return validated
1043 
1044 
async def async_validate_condition_config(
    hass: HomeAssistant, config: ConfigType
) -> ConfigType:
    """Validate config.

    and/or/not conditions validate their sub-conditions recursively and store
    the results back into ``config["conditions"]`` (the passed mapping is
    updated in place). Platform conditions use the platform's own validator
    when it has one; built-in numeric_state and state conditions use the
    ``<type>_validate_config`` function of this module. Everything else is
    returned unchanged.
    """
    condition = config[CONF_CONDITION]
    if condition in ("and", "not", "or"):
        conditions = []
        for sub_cond in config["conditions"]:
            sub_cond = await async_validate_condition_config(hass, sub_cond)
            conditions.append(sub_cond)
        config["conditions"] = conditions
        return config

    platform = await _async_get_condition_platform(hass, config)
    if platform is not None and hasattr(platform, "async_validate_condition_config"):
        return await platform.async_validate_condition_config(hass, config)
    if platform is None and condition in ("numeric_state", "state"):
        validator = cast(
            Callable[[HomeAssistant, ConfigType], ConfigType],
            getattr(sys.modules[__name__], VALIDATE_CONFIG_FORMAT.format(condition)),
        )
        return validator(hass, config)

    return config
1069 
1070 
# NOTE(review): this header line was absent from the extracted listing; the
# function name is restored from the upstream Home Assistant source — confirm.
async def async_validate_conditions_config(
    hass: HomeAssistant, conditions: list[ConfigType]
) -> list[ConfigType | Template]:
    """Validate config.

    Validates each condition in order with async_validate_condition_config and
    returns the validated configs as a list.
    """
    # No gather here because async_validate_condition_config is unlikely
    # to suspend and the overhead of creating many tasks is not worth it
    return [await async_validate_condition_config(hass, cond) for cond in conditions]
1078 
1079 
async def async_conditions_from_config(
    hass: HomeAssistant,
    condition_configs: list[ConfigType],
    logger: logging.Logger,
    name: str,
) -> Callable[[TemplateVarsType], bool]:
    """AND all conditions.

    Returns a function that takes only the template variables. It returns
    False as soon as one condition is False. If no condition was False but
    some raised ConditionError, the errors are logged as a warning on
    ``logger`` (tagged with ``name``) and False is returned instead of raising.
    """
    checks: list[ConditionCheckerType] = [
        await async_from_config(hass, condition_config)
        for condition_config in condition_configs
    ]

    def check_conditions(variables: TemplateVarsType = None) -> bool:
        """AND all conditions."""
        errors: list[ConditionErrorIndex] = []
        for index, check in enumerate(checks):
            try:
                with trace_path(["condition", str(index)]):
                    # Only an explicit False fails; None (disabled) is skipped.
                    if check(hass, variables) is False:
                        return False
            except ConditionError as ex:
                errors.append(
                    ConditionErrorIndex(
                        "condition", index=index, total=len(checks), error=ex
                    )
                )

        if errors:
            logger.warning(
                "Error evaluating condition in '%s':\n%s",
                name,
                ConditionErrorContainer("condition", errors=errors),
            )
            return False

        return True

    return check_conditions
1118 
1119 
@callback
def async_extract_entities(config: ConfigType | Template) -> set[str]:
    """Collect the entity IDs referenced by a condition and its sub-conditions.

    Template entries are skipped. and/or/not conditions are expanded into
    their sub-conditions; for every other condition the entity_id value (a
    single ID or a collection of IDs) is added to the result.
    """
    found: set[str] = set()
    pending = deque([config])

    while pending:
        item = pending.popleft()
        if isinstance(item, Template):
            continue

        if item[CONF_CONDITION] in ("and", "not", "or"):
            pending.extend(item["conditions"])
            continue

        entity_ids = item.get(CONF_ENTITY_ID)
        if entity_ids is None:
            continue

        if isinstance(entity_ids, str):
            found.add(entity_ids)
        else:
            found.update(entity_ids)

    return found
1146 
1147 
@callback
def async_extract_devices(config: ConfigType | Template) -> set[str]:
    """Collect the device IDs referenced by a condition and its sub-conditions.

    Template entries are skipped. and/or/not conditions are expanded into
    their sub-conditions; only conditions of type "device" that carry a
    device_id contribute to the result.
    """
    found = set()
    pending = deque([config])

    while pending:
        item = pending.popleft()
        if isinstance(item, Template):
            continue

        kind = item[CONF_CONDITION]
        if kind in ("and", "not", "or"):
            pending.extend(item["conditions"])
        elif kind == "device":
            device_id = item.get(CONF_DEVICE_ID)
            if device_id is not None:
                found.add(device_id)

    return found
ConfigType async_validate_condition_config(self, HomeAssistant hass, ConfigType config)
Definition: condition.py:110
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
condition.ConditionCheckerType async_condition_from_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:51
bool zone(HomeAssistant hass, str|State|None zone_ent, str|State|None entity)
Definition: condition.py:914
bool state(HomeAssistant hass, str|State|None entity, Any req_state, timedelta|None for_period=None, str|None attribute=None, TemplateVarsType variables=None)
Definition: condition.py:551
None condition_trace_set_result(bool result, **Any kwargs)
Definition: condition.py:129
ConditionProtocol|None _async_get_condition_platform(HomeAssistant hass, ConfigType config)
Definition: condition.py:190
ConditionCheckerType async_and_from_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:263
Callable[[TemplateVarsType], bool] async_conditions_from_config(HomeAssistant hass, list[ConfigType] condition_configs, logging.Logger logger, str name)
Definition: condition.py:1085
bool numeric_state(HomeAssistant hass, str|State|None entity, float|str|None below=None, float|str|None above=None, Template|None value_template=None, TemplateVarsType variables=None)
Definition: condition.py:361
ConditionCheckerType async_not_from_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:325
None condition_trace_update_result(**Any kwargs)
Definition: condition.py:141
ConditionCheckerType state_from_config(ConfigType config)
Definition: condition.py:617
bool template(HomeAssistant hass, Template value_template, TemplateVarsType variables=None)
Definition: condition.py:759
set[str] async_extract_devices(ConfigType|Template config)
Definition: condition.py:1149
ConditionCheckerType async_from_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:212
bool async_numeric_state(HomeAssistant hass, str|State|None entity, float|str|None below=None, float|str|None above=None, Template|None value_template=None, TemplateVarsType variables=None, str|None attribute=None)
Definition: condition.py:383
ConditionCheckerType time_from_config(ConfigType config)
Definition: condition.py:896
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Definition: condition.py:802
ConditionCheckerType trace_condition_function(ConditionCheckerType condition)
Definition: condition.py:174
ConditionCheckerType async_or_from_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:294
list[ConfigType|Template] async_validate_conditions_config(HomeAssistant hass, list[ConfigType] conditions)
Definition: condition.py:1073
set[str] async_extract_entities(ConfigType|Template config)
Definition: condition.py:1121
ConfigType async_validate_condition_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:1047
ConditionCheckerType sun_from_config(ConfigType config)
Definition: condition.py:742
ConditionCheckerType async_template_from_config(ConfigType config)
Definition: condition.py:785
TraceElement condition_trace_append(TemplateVarsType variables, str path)
Definition: condition.py:122
ConfigType state_validate_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:1034
bool async_template(HomeAssistant hass, Template value_template, TemplateVarsType variables=None, bool trace_result=True)
Definition: condition.py:771
Generator[TraceElement] trace_condition(TemplateVarsType variables)
Definition: condition.py:154
ConditionCheckerType async_numeric_state_from_config(ConfigType config)
Definition: condition.py:501
ConditionCheckerType zone_from_config(ConfigType config)
Definition: condition.py:963
ConfigType numeric_state_validate_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:1023
bool sun(HomeAssistant hass, str|None before=None, str|None after=None, timedelta|None before_offset=None, timedelta|None after_offset=None)
Definition: condition.py:664
ConditionCheckerType async_trigger_from_config(HomeAssistant hass, ConfigType config)
Definition: condition.py:1005
datetime.datetime|None get_astral_event_date(HomeAssistant hass, str event, datetime.date|datetime.datetime|None date=None)
Definition: sun.py:117
Any render_complex(Any value, TemplateVarsType variables=None, bool limited=False, bool parse_result=True)
Definition: template.py:240
None trace_stack_pop(ContextVar[list[Any]|None] trace_stack_var)
Definition: trace.py:146
Generator[None] trace_path(str|list[str] suffix)
Definition: trace.py:251
None trace_append_element(TraceElement trace_element, int|None maxlen=None)
Definition: trace.py:184
Integration async_get_integration(HomeAssistant hass, str domain)
Definition: loader.py:1354
def check(config_dir, secrets=False)