Home Assistant Unofficial Reference 2024.12.1
core.py
Go to the documentation of this file.
1 """Core components of Home Assistant.
2 
3 Home Assistant is a Home Automation framework for observing the state
4 of entities and reacting to changes.
5 """
6 
7 from __future__ import annotations
8 
9 import asyncio
10 from collections import UserDict, defaultdict
11 from collections.abc import (
12  Callable,
13  Collection,
14  Coroutine,
15  Iterable,
16  KeysView,
17  Mapping,
18  ValuesView,
19 )
20 import concurrent.futures
21 from dataclasses import dataclass
22 import datetime
23 import enum
24 import functools
25 import inspect
26 import logging
27 import re
28 import threading
29 import time
30 from time import monotonic
31 from typing import (
32  TYPE_CHECKING,
33  Any,
34  Final,
35  Generic,
36  NotRequired,
37  Self,
38  TypedDict,
39  cast,
40  overload,
41 )
42 
43 from propcache import cached_property, under_cached_property
44 from typing_extensions import TypeVar
45 import voluptuous as vol
46 
47 from . import util
48 from .const import (
49  ATTR_DOMAIN,
50  ATTR_FRIENDLY_NAME,
51  ATTR_SERVICE,
52  ATTR_SERVICE_DATA,
53  COMPRESSED_STATE_ATTRIBUTES,
54  COMPRESSED_STATE_CONTEXT,
55  COMPRESSED_STATE_LAST_CHANGED,
56  COMPRESSED_STATE_LAST_UPDATED,
57  COMPRESSED_STATE_STATE,
58  EVENT_CALL_SERVICE,
59  EVENT_CORE_CONFIG_UPDATE,
60  EVENT_HOMEASSISTANT_CLOSE,
61  EVENT_HOMEASSISTANT_FINAL_WRITE,
62  EVENT_HOMEASSISTANT_START,
63  EVENT_HOMEASSISTANT_STARTED,
64  EVENT_HOMEASSISTANT_STOP,
65  EVENT_LOGGING_CHANGED,
66  EVENT_SERVICE_REGISTERED,
67  EVENT_SERVICE_REMOVED,
68  EVENT_STATE_CHANGED,
69  EVENT_STATE_REPORTED,
70  MATCH_ALL,
71  MAX_EXPECTED_ENTITY_IDS,
72  MAX_LENGTH_EVENT_EVENT_TYPE,
73  MAX_LENGTH_STATE_STATE,
74  __version__,
75 )
76 from .exceptions import (
77  HomeAssistantError,
78  InvalidEntityFormatError,
79  InvalidStateError,
80  MaxLengthExceeded,
81  ServiceNotFound,
82  ServiceValidationError,
83  Unauthorized,
84 )
85 from .helpers.deprecation import (
86  DeferredDeprecatedAlias,
87  DeprecatedConstantEnum,
88  EnumWithDeprecatedMembers,
89  all_with_deprecated_constants,
90  check_if_deprecated_constant,
91  dir_with_deprecated_constants,
92 )
93 from .helpers.json import json_bytes, json_fragment
94 from .helpers.typing import VolSchemaType
95 from .util import dt as dt_util
96 from .util.async_ import (
97  cancelling,
98  create_eager_task,
99  get_scheduled_timer_handles,
100  run_callback_threadsafe,
101  shutdown_run_callback_threadsafe,
102 )
103 from .util.event_type import EventType
104 from .util.executor import InterruptibleThreadPoolExecutor
105 from .util.hass_dict import HassDict
106 from .util.json import JsonObjectType
107 from .util.read_only_dict import ReadOnlyDict
108 from .util.timeout import TimeoutManager
109 from .util.ulid import ulid_at_time, ulid_now
110 
111 # Typing imports that create a circular dependency
112 if TYPE_CHECKING:
113  from .auth import AuthManager
114  from .components.http import HomeAssistantHTTP
115  from .config_entries import ConfigEntries
116  from .helpers.entity import StateInfo
117 
# Per-stage shutdown timeouts, in seconds — presumably consumed by the
# shutdown sequence via the TimeoutManager (the stop logic itself is not
# visible in this chunk; confirm against the stage handlers).
STOPPING_STAGE_SHUTDOWN_TIMEOUT = 20
STOP_STAGE_SHUTDOWN_TIMEOUT = 100
FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT = 60
CLOSE_STAGE_SHUTDOWN_TIMEOUT = 30
122 
123 
# Sentinel object to distinguish "argument not supplied" from an explicit None.
_SENTINEL = object()
_DataT = TypeVar("_DataT", bound=Mapping[str, Any], default=Mapping[str, Any])
# An unsubscribe/cancel callable returned by listener-registration APIs.
type CALLBACK_TYPE = Callable[[], None]

DOMAIN = "homeassistant"

# How long to wait to log tasks that are blocking
BLOCK_LOG_TIMEOUT = 60

# A service call may return a JSON-compatible mapping, or None.
type ServiceResponse = JsonObjectType | None
# Per-entity service responses, keyed by entity id (presumably — confirm
# against the service call implementation).
type EntityServiceResponse = dict[str, ServiceResponse]
135 
136 
# NOTE(review): the `class ConfigSource(` header line was lost in this copy of
# the file; restored here. The name is grounded by the ConfigSource.* uses in
# the _DEPRECATED_SOURCE_* constants below.
class ConfigSource(
    enum.StrEnum,
    metaclass=EnumWithDeprecatedMembers,
    deprecated={
        "DEFAULT": ("core_config.ConfigSource.DEFAULT", "2025.11.0"),
        "DISCOVERED": ("core_config.ConfigSource.DISCOVERED", "2025.11.0"),
        "STORAGE": ("core_config.ConfigSource.STORAGE", "2025.11.0"),
        "YAML": ("core_config.ConfigSource.YAML", "2025.11.0"),
    },
):
    """Source of core configuration."""

    DEFAULT = "default"
    DISCOVERED = "discovered"
    STORAGE = "storage"
    YAML = "yaml"
153 
154 
class EventStateEventData(TypedDict):
    """Base class for EVENT_STATE_CHANGED and EVENT_STATE_REPORTED data."""

    entity_id: str
    new_state: State | None


# NOTE(review): the two subclass header lines below were lost in this copy;
# restored from the docstrings (EVENT_STATE_CHANGED / EVENT_STATE_REPORTED).
class EventStateChangedData(EventStateEventData):
    """EVENT_STATE_CHANGED data.

    A state changed event is fired when on state write the state is changed.
    """

    old_state: State | None


class EventStateReportedData(EventStateEventData):
    """EVENT_STATE_REPORTED data.

    A state reported event is fired when on state write the state is unchanged.
    """

    old_last_reported: datetime.datetime
178 
179 
# SOURCE_* are deprecated as of Home Assistant 2022.2, use ConfigSource instead
# Each wrapper presumably emits a deprecation warning on attribute access via
# the module __getattr__ hook below (see check_if_deprecated_constant import).
_DEPRECATED_SOURCE_DISCOVERED = DeprecatedConstantEnum(
    ConfigSource.DISCOVERED, "2025.1"
)
_DEPRECATED_SOURCE_STORAGE = DeprecatedConstantEnum(ConfigSource.STORAGE, "2025.1")
_DEPRECATED_SOURCE_YAML = DeprecatedConstantEnum(ConfigSource.YAML, "2025.1")
186 
187 
# NOTE(review): the `def` line was lost in this copy of the file; restored to
# match the factory expected by DeferredDeprecatedAlias below.
def _deprecated_core_config() -> Any:
    """Return the Config class, now living in core_config, for the alias below."""
    # pylint: disable-next=import-outside-toplevel
    from . import core_config

    return core_config.Config
193 
194 
# The Config class was moved to core_config in Home Assistant 2024.11
# Accessing `Config` on this module presumably resolves lazily through the
# factory and logs a deprecation warning (removal targeted for 2025.11).
_DEPRECATED_Config = DeferredDeprecatedAlias(
    _deprecated_core_config, "homeassistant.core_config.Config", "2025.11"
)
199 
200 
# How long to wait until things that run on startup have to finish.
TIMEOUT_EVENT_START = 15


# Events never delivered to MATCH_ALL listeners — presumably because they are
# too high-volume (state reported) or shutdown-critical (close); confirm
# against the EventBus dispatch code.
EVENTS_EXCLUDED_FROM_MATCH_ALL = {
    EVENT_HOMEASSISTANT_CLOSE,
    EVENT_STATE_REPORTED,
}

# Module-level logger, per stdlib logging convention.
_LOGGER = logging.getLogger(__name__)
211 
212 
@functools.lru_cache(MAX_EXPECTED_ENTITY_IDS)
def split_entity_id(entity_id: str) -> tuple[str, str]:
    """Split a state entity ID into domain and object ID.

    Raises ValueError when the ID lacks a domain or an object id.
    """
    domain, _sep, object_id = entity_id.partition(".")
    if domain and object_id:
        return domain, object_id
    raise ValueError(f"Invalid entity ID {entity_id}")
220 
221 
# Slug grammar: lowercase ASCII letters, digits and underscores, with no
# leading or trailing underscore; a domain additionally forbids "__".
_OBJECT_ID = r"(?!_)[\da-z_]+(?<!_)"
_DOMAIN = r"(?!.+__)" + _OBJECT_ID
VALID_DOMAIN = re.compile(rf"^{_DOMAIN}$")
VALID_ENTITY_ID = re.compile(rf"^{_DOMAIN}\.{_OBJECT_ID}$")


@functools.lru_cache(64)
def valid_domain(domain: str) -> bool:
    """Test if a domain a valid format."""
    return bool(VALID_DOMAIN.match(domain))


@functools.lru_cache(512)
def valid_entity_id(entity_id: str) -> bool:
    """Test if an entity ID is a valid format.

    Format: <domain>.<entity> where both are slugs.
    """
    return bool(VALID_ENTITY_ID.match(entity_id))
241 
242 
def validate_state(state: str) -> str:
    """Validate a state string and return it.

    Raises InvalidStateError when the state exceeds MAX_LENGTH_STATE_STATE.
    """
    if len(state) > MAX_LENGTH_STATE_STATE:
        # Derive the limit in the message from the constant so the two
        # cannot drift apart (was hard-coded "255").
        raise InvalidStateError(
            f"Invalid state with length {len(state)}. "
            f"State max length is {MAX_LENGTH_STATE_STATE} characters."
        )
    return state
251 
252 
253 def callback[_CallableT: Callable[..., Any]](func: _CallableT) -> _CallableT:
254  """Annotation to mark method as safe to call from within the event loop."""
255  setattr(func, "_hass_callback", True)
256  return func
257 
258 
def is_callback(func: Callable[..., Any]) -> bool:
    """Check if function is safe to be called in the event loop."""
    # Only an explicit True marker (set by @callback) counts.
    return getattr(func, "_hass_callback", None) is True


def is_callback_check_partial(target: Callable[..., Any]) -> bool:
    """Check if function is safe to be called in the event loop.

    This version of is_callback will also check if the target is a partial
    and walk the chain of partials to find the original function.
    """
    unwrapped = target
    while isinstance(unwrapped, functools.partial):
        unwrapped = unwrapped.func
    return is_callback(unwrapped)
274 
275 
class _Hass(threading.local):
    """Container which makes a HomeAssistant instance available to the event loop."""

    # Thread-local: set in HomeAssistant.__new__, so only the thread that
    # created the instance (the event loop thread) sees a non-None value.
    hass: HomeAssistant | None = None


# Module-level thread-local storage read by async_get_hass_or_none().
_hass = _Hass()
283 
284 
@callback
def async_get_hass() -> HomeAssistant:
    """Return the HomeAssistant instance.

    Raises HomeAssistantError when called from the wrong thread.

    This should be used where it's very cumbersome or downright impossible to pass
    hass to the code which needs it.
    """
    hass = async_get_hass_or_none()
    if not hass:
        raise HomeAssistantError("async_get_hass called from the wrong thread")
    return hass
297 
298 
def async_get_hass_or_none() -> HomeAssistant | None:
    """Return the HomeAssistant instance or None.

    Returns None when called from the wrong thread.
    """
    # _hass is thread-local; only the event loop thread has it populated.
    return _hass.hass
305 
306 
class ReleaseChannel(enum.StrEnum):
    """Release channel of a build, derived from the version string.

    See get_release_channel() below for the mapping rules.
    """

    BETA = "beta"
    DEV = "dev"
    NIGHTLY = "nightly"
    STABLE = "stable"
312 
313 
@callback
def get_release_channel(version: str | None = None) -> ReleaseChannel:
    """Find release channel based on version number.

    version: version string to classify; defaults to the running
    Home Assistant version (backward-compatible generalization).
    """
    if version is None:
        version = __version__
    # Order matters: "dev0" marks a dev build, any other "dev" a nightly,
    # and "b" a beta pre-release; otherwise the build is stable.
    if "dev0" in version:
        return ReleaseChannel.DEV
    if "dev" in version:
        return ReleaseChannel.NIGHTLY
    if "b" in version:
        return ReleaseChannel.BETA
    return ReleaseChannel.STABLE
325 
326 
@enum.unique
class HassJobType(enum.Enum):
    """Represent a job type."""

    Coroutinefunction = 1  # async def target; scheduled as a task
    Callback = 2  # @callback-decorated; run directly in the event loop
    Executor = 3  # plain sync callable; run via run_in_executor
334 
335 
336 class HassJob[**_P, _R_co]:
337  """Represent a job to be run later.
338 
339  We check the callable type in advance
340  so we can avoid checking it every time
341  we run the job.
342  """
343 
344  __slots__ = ("target", "name", "_cancel_on_shutdown", "_cache")
345 
346  def __init__(
347  self,
348  target: Callable[_P, _R_co],
349  name: str | None = None,
350  *,
351  cancel_on_shutdown: bool | None = None,
352  job_type: HassJobType | None = None,
353  ) -> None:
354  """Create a job object."""
355  self.target: Final = target
356  self.namename = name
357  self._cancel_on_shutdown_cancel_on_shutdown = cancel_on_shutdown
358  self._cache: dict[str, Any] = {}
359  if job_type:
360  # Pre-set the cached_property so we
361  # avoid the function call
362  self._cache["job_type"] = job_type
363 
364  @under_cached_property
365  def job_type(self) -> HassJobType:
366  """Return the job type."""
367  return get_hassjob_callable_job_type(self.target)
368 
369  @property
370  def cancel_on_shutdown(self) -> bool | None:
371  """Return if the job should be cancelled on shutdown."""
372  return self._cancel_on_shutdown_cancel_on_shutdown
373 
374  def __repr__(self) -> str:
375  """Return the job."""
376  return f"<Job {self.name} {self.job_type} {self.target}>"
377 
378 
@dataclass(frozen=True)
class HassJobWithArgs:
    """Container for a HassJob and arguments."""

    # NOTE(review): the class-name line was lost in this copy; the name is
    # grounded by the `list[HassJobWithArgs]` annotation in HomeAssistant.
    job: HassJob[..., Coroutine[Any, Any, Any] | Any]
    args: Iterable[Any]
385 
386 
def get_hassjob_callable_job_type(target: Callable[..., Any]) -> HassJobType:
    """Determine the job type from the callable."""
    # Unwrap functools.partial chains so we inspect the original callable.
    unwrapped = target
    while isinstance(unwrapped, functools.partial):
        unwrapped = unwrapped.func

    if asyncio.iscoroutinefunction(unwrapped):
        return HassJobType.Coroutinefunction
    if is_callback(unwrapped):
        return HassJobType.Callback
    if asyncio.iscoroutine(unwrapped):
        # A coroutine object (not a function) cannot be re-invoked later.
        raise ValueError("Coroutine not allowed to be passed to HassJob")
    return HassJobType.Executor
401 
402 
class CoreState(enum.Enum):
    """Represent the current state of Home Assistant."""

    # Presumable lifecycle order: not_running -> starting -> running ->
    # stopping -> final_write -> stopped (confirm against the stop sequence).
    not_running = "NOT_RUNNING"
    starting = "STARTING"
    running = "RUNNING"
    stopping = "STOPPING"
    final_write = "FINAL_WRITE"
    stopped = "STOPPED"

    def __str__(self) -> str:
        """Return the event."""
        return self.value
416 
417 
# NOTE(review): the `class HomeAssistant:` header line was lost in this copy;
# restored (grounded by __repr__ and the _hass container docstring).
class HomeAssistant:
    """Root object of the Home Assistant home automation."""

    auth: AuthManager
    http: HomeAssistantHTTP = None  # type: ignore[assignment]
    config_entries: ConfigEntries = None  # type: ignore[assignment]

    def __new__(cls, config_dir: str) -> Self:
        """Set the _hass thread local data."""
        hass = super().__new__(cls)
        # Publish the instance to this thread's _Hass container so
        # async_get_hass() works on the event loop thread.
        _hass.hass = hass
        return hass
430 
431  def __repr__(self) -> str:
432  """Return the representation."""
433  return f"<HomeAssistant {self.state}>"
434 
435  def __init__(self, config_dir: str) -> None:
436  """Initialize new Home Assistant object."""
437  # pylint: disable-next=import-outside-toplevel
438  from . import loader
439 
440  # pylint: disable-next=import-outside-toplevel
441  from .core_config import Config
442 
443  # This is a dictionary that any component can store any data on.
444  self.datadata = HassDict()
445  self.looploop = asyncio.get_running_loop()
446  self._tasks_tasks: set[asyncio.Future[Any]] = set()
447  self._background_tasks: set[asyncio.Future[Any]] = set()
448  self.busbus = EventBus(self)
449  self.servicesservices = ServiceRegistry(self)
450  self.statesstates = StateMachine(self.busbus, self.looploop)
451  self.configconfig = Config(self, config_dir)
452  self.configconfig.async_initialize()
453  self.componentscomponents = loader.Components(self)
454  self.helpershelpers = loader.Helpers(self)
455  self.statestate: CoreState = CoreState.not_running
456  self.exit_codeexit_code: int = 0
457  # If not None, use to signal end-of-loop
458  self._stopped_stopped: asyncio.Event | None = None
459  # Timeout handler for Core/Helper namespace
460  self.timeout: TimeoutManager = TimeoutManager()
461  self._stop_future_stop_future: concurrent.futures.Future[None] | None = None
462  self._shutdown_jobs: list[HassJobWithArgs] = []
464  max_workers=1, thread_name_prefix="ImportExecutor"
465  )
466  self.loop_thread_idloop_thread_id = getattr(self.looploop, "_thread_id")
467 
468  def verify_event_loop_thread(self, what: str) -> None:
469  """Report and raise if we are not running in the event loop thread."""
470  if self.loop_thread_idloop_thread_id != threading.get_ident():
471  # frame is a circular import, so we import it here
472  from .helpers import frame # pylint: disable=import-outside-toplevel
473 
474  frame.report_non_thread_safe_operation(what)
475 
476  @property
477  def _active_tasks(self) -> set[asyncio.Future[Any]]:
478  """Return all active tasks.
479 
480  This property is used in bootstrap to log all active tasks
481  so we can identify what is blocking startup.
482 
483  This property is marked as private to avoid accidental use
484  as it is not guaranteed to be present in future versions.
485  """
486  return self._tasks_tasks
487 
488  @cached_property
489  def is_running(self) -> bool:
490  """Return if Home Assistant is running."""
491  return self.statestate in (CoreState.starting, CoreState.running)
492 
493  @cached_property
494  def is_stopping(self) -> bool:
495  """Return if Home Assistant is stopping."""
496  return self.statestate in (CoreState.stopping, CoreState.final_write)
497 
498  def set_state(self, state: CoreState) -> None:
499  """Set the current state."""
500  self.statestate = state
501  for prop in ("is_running", "is_stopping"):
502  self.__dict__.pop(prop, None)
503 
504  def start(self) -> int:
505  """Start Home Assistant.
506 
507  Note: This function is only used for testing.
508  For regular use, use "await hass.run()".
509  """
510  # Register the async start
511  _future = asyncio.run_coroutine_threadsafe(self.async_startasync_start(), self.looploop)
512  # Run forever
513  # Block until stopped
514  _LOGGER.info("Starting Home Assistant core loop")
515  self.looploop.run_forever()
516  # The future is never retrieved but we still hold a reference to it
517  # to prevent the task from being garbage collected prematurely.
518  del _future
519  return self.exit_codeexit_code
520 
521  async def async_run(self, *, attach_signals: bool = True) -> int:
522  """Home Assistant main entry point.
523 
524  Start Home Assistant and block until stopped.
525 
526  This method is a coroutine.
527  """
528  if self.statestate is not CoreState.not_running:
529  raise RuntimeError("Home Assistant is already running")
530 
531  # _async_stop will set this instead of stopping the loop
532  self._stopped_stopped = asyncio.Event()
533 
534  await self.async_startasync_start()
535  if attach_signals:
536  # pylint: disable-next=import-outside-toplevel
537  from .helpers.signal import async_register_signal_handling
538 
540 
541  await self._stopped_stopped.wait()
542  return self.exit_codeexit_code
543 
544  async def async_start(self) -> None:
545  """Finalize startup from inside the event loop.
546 
547  This method is a coroutine.
548  """
549  _LOGGER.info("Starting Home Assistant")
550 
551  self.set_stateset_state(CoreState.starting)
552  self.busbus.async_fire_internal(EVENT_CORE_CONFIG_UPDATE)
553  self.busbus.async_fire_internal(EVENT_HOMEASSISTANT_START)
554 
555  if not self._tasks_tasks:
556  pending: set[asyncio.Future[Any]] | None = None
557  else:
558  _done, pending = await asyncio.wait(
559  self._tasks_tasks, timeout=TIMEOUT_EVENT_START
560  )
561 
562  if pending:
563  _LOGGER.warning(
564  (
565  "Something is blocking Home Assistant from wrapping up the start up"
566  " phase. We're going to continue anyway. Please report the"
567  " following info at"
568  " https://github.com/home-assistant/core/issues: %s"
569  " The system is waiting for tasks: %s"
570  ),
571  ", ".join(self.configconfig.components),
572  self._tasks_tasks,
573  )
574 
575  # Allow automations to set up the start triggers before changing state
576  await asyncio.sleep(0)
577 
578  if self.statestate is not CoreState.starting:
579  _LOGGER.warning(
580  "Home Assistant startup has been interrupted. "
581  "Its state may be inconsistent"
582  )
583  return
584 
585  self.set_stateset_state(CoreState.running)
586  self.busbus.async_fire_internal(EVENT_CORE_CONFIG_UPDATE)
587  self.busbus.async_fire_internal(EVENT_HOMEASSISTANT_STARTED)
588 
589  def add_job[*_Ts](
590  self, target: Callable[[*_Ts], Any] | Coroutine[Any, Any, Any], *args: *_Ts
591  ) -> None:
592  """Add a job to be executed by the event loop or by an executor.
593 
594  If the job is either a coroutine or decorated with @callback, it will be
595  run by the event loop, if not it will be run by an executor.
596 
597  target: target to call.
598  args: parameters for method to call.
599  """
600  if target is None:
601  raise ValueError("Don't call add_job with None")
602  if asyncio.iscoroutine(target):
603  self.looploop.call_soon_threadsafe(
604  functools.partial(self.async_create_task, target, eager_start=True)
605  )
606  return
607  self.looploop.call_soon_threadsafe(
608  functools.partial(self._async_add_hass_job, HassJob(target), *args)
609  )
610 
    @overload
    @callback
    def async_add_job[_R, *_Ts](
        self,
        target: Callable[[*_Ts], Coroutine[Any, Any, _R]],
        *args: *_Ts,
        eager_start: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @overload
    @callback
    def async_add_job[_R, *_Ts](
        self,
        target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R],
        *args: *_Ts,
        eager_start: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @overload
    @callback
    def async_add_job[_R](
        self,
        target: Coroutine[Any, Any, _R],
        *args: Any,
        eager_start: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @callback
    def async_add_job[_R, *_Ts](
        self,
        target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R]
        | Coroutine[Any, Any, _R],
        *args: *_Ts,
        eager_start: bool = False,
    ) -> asyncio.Future[_R] | None:
        """Add a job to be executed by the event loop or by an executor.

        If the job is either a coroutine or decorated with @callback, it will be
        run by the event loop, if not it will be run by an executor.

        This method must be run in the event loop.

        target: target to call.
        args: parameters for method to call.
        """
        # late import to avoid circular imports
        from .helpers import frame  # pylint: disable=import-outside-toplevel

        # Deprecated entry point: emit a usage report pointing callers at the
        # replacement APIs (scheduled to break in 2025.4).
        frame.report_usage(
            "calls `async_add_job`, which should be reviewed against "
            "https://developers.home-assistant.io/blog/2024/03/13/deprecate_add_run_job"
            " for replacement options",
            core_behavior=frame.ReportBehavior.LOG,
            breaks_in_ha_version="2025.4",
        )

        if target is None:
            raise ValueError("Don't call async_add_job with None")

        # Bare coroutine objects become tasks; callables are dispatched by
        # job type via _async_add_hass_job.
        if asyncio.iscoroutine(target):
            return self.async_create_task(target, eager_start=eager_start)

        return self._async_add_hass_job(HassJob(target), *args)
674 
    @overload
    @callback
    def async_add_hass_job[_R](
        self,
        hassjob: HassJob[..., Coroutine[Any, Any, _R]],
        *args: Any,
        eager_start: bool = False,
        background: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @overload
    @callback
    def async_add_hass_job[_R](
        self,
        hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
        *args: Any,
        eager_start: bool = False,
        background: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @callback
    def async_add_hass_job[_R](
        self,
        hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
        *args: Any,
        eager_start: bool = False,
        background: bool = False,
    ) -> asyncio.Future[_R] | None:
        """Add a HassJob from within the event loop.

        If eager_start is True, coroutine functions will be scheduled eagerly.
        If background is True, the task will created as a background task.

        This method must be run in the event loop.
        hassjob: HassJob to call.
        args: parameters for method to call.
        """
        # late import to avoid circular imports
        from .helpers import frame  # pylint: disable=import-outside-toplevel

        # Deprecated public entry point; delegates to _async_add_hass_job
        # after logging a usage report (scheduled to break in 2025.5).
        frame.report_usage(
            "calls `async_add_hass_job`, which should be reviewed against "
            "https://developers.home-assistant.io/blog/2024/04/07/deprecate_add_hass_job"
            " for replacement options",
            core_behavior=frame.ReportBehavior.LOG,
            breaks_in_ha_version="2025.5",
        )

        return self._async_add_hass_job(hassjob, *args, background=background)
724 
725  @overload
726  @callback
727  def _async_add_hass_job[_R](
728  self,
729  hassjob: HassJob[..., Coroutine[Any, Any, _R]],
730  *args: Any,
731  background: bool = False,
732  ) -> asyncio.Future[_R] | None: ...
733 
734  @overload
735  @callback
736  def _async_add_hass_job[_R](
737  self,
738  hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
739  *args: Any,
740  background: bool = False,
741  ) -> asyncio.Future[_R] | None: ...
742 
743  @callback
744  def _async_add_hass_job[_R](
745  self,
746  hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
747  *args: Any,
748  background: bool = False,
749  ) -> asyncio.Future[_R] | None:
750  """Add a HassJob from within the event loop.
751 
752  If eager_start is True, coroutine functions will be scheduled eagerly.
753  If background is True, the task will created as a background task.
754 
755  This method must be run in the event loop.
756  hassjob: HassJob to call.
757  args: parameters for method to call.
758  """
759  task: asyncio.Future[_R]
760  # This code path is performance sensitive and uses
761  # if TYPE_CHECKING to avoid the overhead of constructing
762  # the type used for the cast. For history see:
763  # https://github.com/home-assistant/core/pull/71960
764  if hassjob.job_type is HassJobType.Coroutinefunction:
765  if TYPE_CHECKING:
766  hassjob = cast(HassJob[..., Coroutine[Any, Any, _R]], hassjob)
767  task = create_eager_task(
768  hassjob.target(*args), name=hassjob.name, loop=self.looploop
769  )
770  if task.done():
771  return task
772  elif hassjob.job_type is HassJobType.Callback:
773  if TYPE_CHECKING:
774  hassjob = cast(HassJob[..., _R], hassjob)
775  self.looploop.call_soon(hassjob.target, *args)
776  return None
777  else:
778  if TYPE_CHECKING:
779  hassjob = cast(HassJob[..., _R], hassjob)
780  task = self.looploop.run_in_executor(None, hassjob.target, *args)
781 
782  task_bucket = self._background_tasks if background else self._tasks_tasks
783  task_bucket.add(task)
784  task.add_done_callback(task_bucket.remove)
785 
786  return task
787 
789  self, target: Coroutine[Any, Any, Any], name: str | None = None
790  ) -> None:
791  """Add task to the executor pool.
792 
793  target: target to call.
794  """
795  self.looploop.call_soon_threadsafe(
796  functools.partial(
797  self.async_create_task_internal, target, name, eager_start=True
798  )
799  )
800 
801  @callback
802  def async_create_task[_R](
803  self,
804  target: Coroutine[Any, Any, _R],
805  name: str | None = None,
806  eager_start: bool = True,
807  ) -> asyncio.Task[_R]:
808  """Create a task from within the event loop.
809 
810  This method must be run in the event loop. If you are using this in your
811  integration, use the create task methods on the config entry instead.
812 
813  target: target to call.
814  """
815  if self.loop_thread_idloop_thread_id != threading.get_ident():
816  from .helpers import frame # pylint: disable=import-outside-toplevel
817 
818  frame.report_non_thread_safe_operation("hass.async_create_task")
819  return self.async_create_task_internal(target, name, eager_start)
820 
821  @callback
822  def async_create_task_internal[_R](
823  self,
824  target: Coroutine[Any, Any, _R],
825  name: str | None = None,
826  eager_start: bool = True,
827  ) -> asyncio.Task[_R]:
828  """Create a task from within the event loop, internal use only.
829 
830  This method is intended to only be used by core internally
831  and should not be considered a stable API. We will make
832  breaking changes to this function in the future and it
833  should not be used in integrations.
834 
835  This method must be run in the event loop. If you are using this in your
836  integration, use the create task methods on the config entry instead.
837 
838  target: target to call.
839  """
840  if eager_start:
841  task = create_eager_task(target, name=name, loop=self.looploop)
842  if task.done():
843  return task
844  else:
845  # Use loop.create_task
846  # to avoid the extra function call in asyncio.create_task.
847  task = self.looploop.create_task(target, name=name)
848  self._tasks_tasks.add(task)
849  task.add_done_callback(self._tasks_tasks.remove)
850  return task
851 
852  @callback
853  def async_create_background_task[_R](
854  self, target: Coroutine[Any, Any, _R], name: str, eager_start: bool = True
855  ) -> asyncio.Task[_R]:
856  """Create a task from within the event loop.
857 
858  This type of task is for background tasks that usually run for
859  the lifetime of Home Assistant or an integration's setup.
860 
861  A background task is different from a normal task:
862 
863  - Will not block startup
864  - Will be automatically cancelled on shutdown
865  - Calls to async_block_till_done will not wait for completion
866 
867  If you are using this in your integration, use the create task
868  methods on the config entry instead.
869 
870  This method must be run in the event loop.
871  """
872  if eager_start:
873  task = create_eager_task(target, name=name, loop=self.looploop)
874  if task.done():
875  return task
876  else:
877  # Use loop.create_task
878  # to avoid the extra function call in asyncio.create_task.
879  task = self.looploop.create_task(target, name=name)
880  self._background_tasks.add(task)
881  task.add_done_callback(self._background_tasks.remove)
882  return task
883 
884  @callback
885  def async_add_executor_job[*_Ts, _T](
886  self, target: Callable[[*_Ts], _T], *args: *_Ts
887  ) -> asyncio.Future[_T]:
888  """Add an executor job from within the event loop."""
889  task = self.looploop.run_in_executor(None, target, *args)
890 
891  tracked = asyncio.current_task() in self._tasks_tasks
892  task_bucket = self._tasks_tasks if tracked else self._background_tasks
893  task_bucket.add(task)
894  task.add_done_callback(task_bucket.remove)
895 
896  return task
897 
898  @callback
899  def async_add_import_executor_job[*_Ts, _T](
900  self, target: Callable[[*_Ts], _T], *args: *_Ts
901  ) -> asyncio.Future[_T]:
902  """Add an import executor job from within the event loop.
903 
904  The future returned from this method must be awaited in the event loop.
905  """
906  return self.looploop.run_in_executor(self.import_executorimport_executor, target, *args)
907 
    @overload
    @callback
    def async_run_hass_job[_R](
        self,
        hassjob: HassJob[..., Coroutine[Any, Any, _R]],
        *args: Any,
        background: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @overload
    @callback
    def async_run_hass_job[_R](
        self,
        hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
        *args: Any,
        background: bool = False,
    ) -> asyncio.Future[_R] | None: ...

    @callback
    def async_run_hass_job[_R](
        self,
        hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
        *args: Any,
        background: bool = False,
    ) -> asyncio.Future[_R] | None:
        """Run a HassJob from within the event loop.

        This method must be run in the event loop.

        If background is True, the task will created as a background task.

        hassjob: HassJob
        args: parameters for method to call.
        """
        # This code path is performance sensitive and uses
        # if TYPE_CHECKING to avoid the overhead of constructing
        # the type used for the cast. For history see:
        # https://github.com/home-assistant/core/pull/71960
        # Callbacks are invoked synchronously, inline, and return no future;
        # all other job types are scheduled via _async_add_hass_job.
        if hassjob.job_type is HassJobType.Callback:
            if TYPE_CHECKING:
                hassjob = cast(HassJob[..., _R], hassjob)
            hassjob.target(*args)
            return None

        return self._async_add_hass_job(hassjob, *args, background=background)
953 
    @overload
    @callback
    def async_run_job[_R, *_Ts](
        self, target: Callable[[*_Ts], Coroutine[Any, Any, _R]], *args: *_Ts
    ) -> asyncio.Future[_R] | None: ...

    @overload
    @callback
    def async_run_job[_R, *_Ts](
        self, target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R], *args: *_Ts
    ) -> asyncio.Future[_R] | None: ...

    @overload
    @callback
    def async_run_job[_R](
        self, target: Coroutine[Any, Any, _R], *args: Any
    ) -> asyncio.Future[_R] | None: ...

    @callback
    def async_run_job[_R, *_Ts](
        self,
        target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R]
        | Coroutine[Any, Any, _R],
        *args: *_Ts,
    ) -> asyncio.Future[_R] | None:
        """Run a job from within the event loop.

        This method must be run in the event loop.

        target: target to call.
        args: parameters for method to call.
        """
        # late import to avoid circular imports
        from .helpers import frame  # pylint: disable=import-outside-toplevel

        # Deprecated entry point: emit a usage report pointing callers at the
        # replacement APIs (scheduled to break in 2025.4).
        frame.report_usage(
            "calls `async_run_job`, which should be reviewed against "
            "https://developers.home-assistant.io/blog/2024/03/13/deprecate_add_run_job"
            " for replacement options",
            core_behavior=frame.ReportBehavior.LOG,
            breaks_in_ha_version="2025.4",
        )

        # Coroutine objects are started eagerly; callables are wrapped in a
        # HassJob and dispatched by their detected job type.
        if asyncio.iscoroutine(target):
            return self.async_create_task(target, eager_start=True)

        return self.async_run_hass_job(HassJob(target), *args)
1001 
1002  def block_till_done(self, wait_background_tasks: bool = False) -> None:
1003  """Block until all pending work is done."""
1004  asyncio.run_coroutine_threadsafe(
1005  self.async_block_till_doneasync_block_till_done(wait_background_tasks=wait_background_tasks),
1006  self.looploop,
1007  ).result()
1008 
1009  async def async_block_till_done(self, wait_background_tasks: bool = False) -> None:
1010  """Block until all pending work is done."""
1011  # To flush out any call_soon_threadsafe
1012  await asyncio.sleep(0)
1013  start_time: float | None = None
1014  current_task = asyncio.current_task()
1015  while tasks := [
1016  task
1017  for task in (
1018  self._tasks_tasks | self._background_tasks
1019  if wait_background_tasks
1020  else self._tasks_tasks
1021  )
1022  if task is not current_task and not cancelling(task)
1023  ]:
1024  await self._await_and_log_pending_await_and_log_pending(tasks)
1025 
1026  if start_time is None:
1027  # Avoid calling monotonic() until we know
1028  # we may need to start logging blocked tasks.
1029  start_time = 0
1030  elif start_time == 0:
1031  # If we have waited twice then we set the start
1032  # time
1033  start_time = monotonic()
1034  elif monotonic() - start_time > BLOCK_LOG_TIMEOUT:
1035  # We have waited at least three loops and new tasks
1036  # continue to block. At this point we start
1037  # logging all waiting tasks.
1038  for task in tasks:
1039  _LOGGER.debug("Waiting for task: %s", task)
1040 
1042  self, pending: Collection[asyncio.Future[Any]]
1043  ) -> None:
1044  """Await and log tasks that take a long time."""
1045  wait_time = 0
1046  while pending:
1047  _, pending = await asyncio.wait(pending, timeout=BLOCK_LOG_TIMEOUT)
1048  if not pending:
1049  return
1050  wait_time += BLOCK_LOG_TIMEOUT
1051  for task in pending:
1052  _LOGGER.debug("Waited %s seconds for task: %s", wait_time, task)
1053 
1054  @overload
1055  @callback
1057  self, hassjob: HassJob[..., Coroutine[Any, Any, Any]], *args: Any
1058  ) -> CALLBACK_TYPE: ...
1059 
1060  @overload
1061  @callback
1063  self, hassjob: HassJob[..., Coroutine[Any, Any, Any] | Any], *args: Any
1064  ) -> CALLBACK_TYPE: ...
1065 
1066  @callback
1068  self, hassjob: HassJob[..., Coroutine[Any, Any, Any] | Any], *args: Any
1069  ) -> CALLBACK_TYPE:
1070  """Add a HassJob which will be executed on shutdown.
1071 
1072  This method must be run in the event loop.
1073 
1074  hassjob: HassJob
1075  args: parameters for method to call.
1076 
1077  Returns function to remove the job.
1078  """
1079  job_with_args = HassJobWithArgs(hassjob, args)
1080  self._shutdown_jobs.append(job_with_args)
1081 
1082  @callback
1083  def remove_job() -> None:
1084  self._shutdown_jobs.remove(job_with_args)
1085 
1086  return remove_job
1087 
1088  def stop(self) -> None:
1089  """Stop Home Assistant and shuts down all threads."""
1090  if self.statestate is CoreState.not_running: # just ignore
1091  return
1092  # The future is never retrieved, and we only hold a reference
1093  # to it to prevent it from being garbage collected.
1094  self._stop_future_stop_future = asyncio.run_coroutine_threadsafe(
1095  self.async_stopasync_stop(), self.looploop
1096  )
1097 
1098  async def async_stop(self, exit_code: int = 0, *, force: bool = False) -> None:
1099  """Stop Home Assistant and shuts down all threads.
1100 
1101  The "force" flag commands async_stop to proceed regardless of
1102  Home Assistant's current state. You should not set this flag
1103  unless you're testing.
1104 
1105  This method is a coroutine.
1106  """
1107  if not force:
1108  # Some tests require async_stop to run,
1109  # regardless of the state of the loop.
1110  if self.statestate is CoreState.not_running: # just ignore
1111  return
1112  if self.statestate in [CoreState.stopping, CoreState.final_write]:
1113  _LOGGER.info("Additional call to async_stop was ignored")
1114  return
1115  if self.statestate is CoreState.starting:
1116  # This may not work
1117  _LOGGER.warning(
1118  "Stopping Home Assistant before startup has completed may fail"
1119  )
1120 
1121  # Stage 1 - Run shutdown jobs
1122  try:
1123  async with self.timeout.async_timeout(STOPPING_STAGE_SHUTDOWN_TIMEOUT):
1124  tasks: list[asyncio.Future[Any]] = []
1125  for job in self._shutdown_jobs:
1126  task_or_none = self.async_run_hass_job(job.job, *job.args)
1127  if not task_or_none:
1128  continue
1129  tasks.append(task_or_none)
1130  if tasks:
1131  await asyncio.gather(*tasks, return_exceptions=True)
1132  except TimeoutError:
1133  _LOGGER.warning(
1134  "Timed out waiting for shutdown jobs to complete, the shutdown will"
1135  " continue"
1136  )
1137  self._async_log_running_tasks_async_log_running_tasks("run shutdown jobs")
1138 
1139  # Stage 2 - Stop integrations
1140 
1141  # Keep holding the reference to the tasks but do not allow them
1142  # to block shutdown. Only tasks created after this point will
1143  # be waited for.
1144  running_tasks = self._tasks_tasks
1145  # Avoid clearing here since we want the remove callbacks to fire
1146  # and remove the tasks from the original set which is now running_tasks
1147  self._tasks_tasks = set()
1148 
1149  # Cancel all background tasks
1150  for task in self._background_tasks:
1151  self._tasks_tasks.add(task)
1152  task.add_done_callback(self._tasks_tasks.remove)
1153  task.cancel("Home Assistant is stopping")
1154  self._cancel_cancellable_timers_cancel_cancellable_timers()
1155 
1156  self.exit_codeexit_code = exit_code
1157 
1158  self.set_stateset_state(CoreState.stopping)
1159  self.busbus.async_fire_internal(EVENT_HOMEASSISTANT_STOP)
1160  try:
1161  async with self.timeout.async_timeout(STOP_STAGE_SHUTDOWN_TIMEOUT):
1162  await self.async_block_till_doneasync_block_till_done()
1163  except TimeoutError:
1164  _LOGGER.warning(
1165  "Timed out waiting for integrations to stop, the shutdown will"
1166  " continue"
1167  )
1168  self._async_log_running_tasks_async_log_running_tasks("stop integrations")
1169 
1170  # Stage 3 - Final write
1171  self.set_stateset_state(CoreState.final_write)
1172  self.busbus.async_fire_internal(EVENT_HOMEASSISTANT_FINAL_WRITE)
1173  try:
1174  async with self.timeout.async_timeout(FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT):
1175  await self.async_block_till_doneasync_block_till_done()
1176  except TimeoutError:
1177  _LOGGER.warning(
1178  "Timed out waiting for final writes to complete, the shutdown will"
1179  " continue"
1180  )
1181  self._async_log_running_tasks_async_log_running_tasks("final write")
1182 
1183  # Stage 4 - Close
1184  self.set_stateset_state(CoreState.not_running)
1185  self.busbus.async_fire_internal(EVENT_HOMEASSISTANT_CLOSE)
1186 
1187  # Make a copy of running_tasks since a task can finish
1188  # while we are awaiting canceled tasks to get their result
1189  # which will result in the set size changing during iteration
1190  for task in list(running_tasks):
1191  if task.done() or cancelling(task):
1192  # Since we made a copy we need to check
1193  # to see if the task finished while we
1194  # were awaiting another task
1195  continue
1196  _LOGGER.warning(
1197  "Task %s was still running after final writes shutdown stage; "
1198  "Integrations should cancel non-critical tasks when receiving "
1199  "the stop event to prevent delaying shutdown",
1200  task,
1201  )
1202  task.cancel("Home Assistant final writes shutdown stage")
1203  try:
1204  async with asyncio.timeout(0.1):
1205  await task
1206  except asyncio.CancelledError:
1207  pass
1208  except TimeoutError:
1209  # Task may be shielded from cancellation.
1210  _LOGGER.exception(
1211  "Task %s could not be canceled during final shutdown stage", task
1212  )
1213  except Exception:
1214  _LOGGER.exception("Task %s error during final shutdown stage", task)
1215 
1216  # Prevent run_callback_threadsafe from scheduling any additional
1217  # callbacks in the event loop as callbacks created on the futures
1218  # it returns will never run after the final `self.async_block_till_done`
1219  # which will cause the futures to block forever when waiting for
1220  # the `result()` which will cause a deadlock when shutting down the executor.
1222 
1223  try:
1224  async with self.timeout.async_timeout(CLOSE_STAGE_SHUTDOWN_TIMEOUT):
1225  await self.async_block_till_doneasync_block_till_done()
1226  except TimeoutError:
1227  _LOGGER.warning(
1228  "Timed out waiting for close event to be processed, the shutdown will"
1229  " continue"
1230  )
1231  self._async_log_running_tasks_async_log_running_tasks("close")
1232 
1233  self.set_stateset_state(CoreState.stopped)
1234  self.import_executorimport_executor.shutdown()
1235 
1236  if self._stopped_stopped is not None:
1237  self._stopped_stopped.set()
1238 
1239  def _cancel_cancellable_timers(self) -> None:
1240  """Cancel timer handles marked as cancellable."""
1241  for handle in get_scheduled_timer_handles(self.looploop):
1242  if (
1243  not handle.cancelled()
1244  and (args := handle._args) # noqa: SLF001
1245  and type(job := args[0]) is HassJob
1246  and job.cancel_on_shutdown
1247  ):
1248  handle.cancel()
1249 
1250  def _async_log_running_tasks(self, stage: str) -> None:
1251  """Log all running tasks."""
1252  for task in self._tasks_tasks:
1253  _LOGGER.warning("Shutdown stage '%s': still running: %s", stage, task)
1254 
1255 
class Context:
    """The context that triggered something."""

    __slots__ = ("id", "user_id", "parent_id", "origin_event", "_cache")

    def __init__(
        self,
        user_id: str | None = None,
        parent_id: str | None = None,
        id: str | None = None,  # pylint: disable=redefined-builtin
    ) -> None:
        """Init the context."""
        self.id = id or ulid_now()
        self.user_id = user_id
        self.parent_id = parent_id
        self.origin_event: Event[Any] | None = None
        self._cache: dict[str, Any] = {}

    def __eq__(self, other: object) -> bool:
        """Compare contexts."""
        # Identity of a context is its id alone; user_id and parent_id
        # are deliberately not part of equality.
        return isinstance(other, Context) and self.id == other.id

    def __copy__(self) -> Context:
        """Create a shallow copy of this context."""
        return Context(user_id=self.user_id, parent_id=self.parent_id, id=self.id)

    def __deepcopy__(self, memo: dict[int, Any]) -> Context:
        """Create a deep copy of this context."""
        return Context(user_id=self.user_id, parent_id=self.parent_id, id=self.id)

    @under_cached_property
    def _as_dict(self) -> dict[str, str | None]:
        """Return a dictionary representation of the context.

        Callers should be careful to not mutate the returned dictionary
        as it will mutate the cached version.
        """
        return {
            "id": self.id,
            "parent_id": self.parent_id,
            "user_id": self.user_id,
        }

    def as_dict(self) -> ReadOnlyDict[str, str | None]:
        """Return a ReadOnlyDict representation of the context."""
        return self._as_read_only_dict

    @under_cached_property
    def _as_read_only_dict(self) -> ReadOnlyDict[str, str | None]:
        """Return a ReadOnlyDict representation of the context."""
        return ReadOnlyDict(self._as_dict)

    @under_cached_property
    def json_fragment(self) -> json_fragment:
        """Return a JSON fragment of the context."""
        return json_fragment(json_bytes(self._as_dict))
1312 
1313 
class EventOrigin(enum.Enum):
    """Represent the origin of an event."""

    local = "LOCAL"
    remote = "REMOTE"

    def __str__(self) -> str:
        """Return the string value of the origin."""
        return self.value

    @cached_property
    def idx(self) -> int:
        """Return the position of this origin in declaration order."""
        return list(EventOrigin).index(self)
1328 
1329 
class Event(Generic[_DataT]):
    """Representation of an event within the bus."""

    __slots__ = (
        "event_type",
        "data",
        "origin",
        "time_fired_timestamp",
        "context",
        "_cache",
    )

    def __init__(
        self,
        event_type: EventType[_DataT] | str,
        data: _DataT | None = None,
        origin: EventOrigin = EventOrigin.local,
        time_fired_timestamp: float | None = None,
        context: Context | None = None,
    ) -> None:
        """Initialize a new event."""
        self.event_type = event_type
        self.data: _DataT = data or {}  # type: ignore[assignment]
        self.origin = origin
        self.time_fired_timestamp = time_fired_timestamp or time.time()
        if not context:
            # Derive a context id from the fired timestamp so related
            # ulids sort chronologically.
            context = Context(id=ulid_at_time(self.time_fired_timestamp))
        self.context = context
        if not context.origin_event:
            context.origin_event = self
        self._cache: dict[str, Any] = {}

    @under_cached_property
    def time_fired(self) -> datetime.datetime:
        """Return time fired as a timestamp."""
        return dt_util.utc_from_timestamp(self.time_fired_timestamp)

    @under_cached_property
    def _as_dict(self) -> dict[str, Any]:
        """Create a dict representation of this Event.

        Callers should be careful to not mutate the returned dictionary
        as it will mutate the cached version.
        """
        return {
            "event_type": self.event_type,
            "data": self.data,
            "origin": self.origin.value,
            "time_fired": self.time_fired.isoformat(),
            # _as_dict is marked as protected
            # to avoid callers outside of this module
            # from misusing it by mistake.
            "context": self.context._as_dict,  # noqa: SLF001
        }

    def as_dict(self) -> ReadOnlyDict[str, Any]:
        """Create a ReadOnlyDict representation of this Event.

        Async friendly.
        """
        return self._as_read_only_dict

    @under_cached_property
    def _as_read_only_dict(self) -> ReadOnlyDict[str, Any]:
        """Create a ReadOnlyDict representation of this Event."""
        as_dict = self._as_dict
        data = as_dict["data"]
        context = as_dict["context"]
        # json_fragment will serialize data from a ReadOnlyDict
        # or a normal dict so its ok to have either. We only
        # mutate the cache if someone asks for the as_dict version
        # to avoid storing multiple copies of the data in memory.
        if type(data) is not ReadOnlyDict:
            as_dict["data"] = ReadOnlyDict(data)
        if type(context) is not ReadOnlyDict:
            as_dict["context"] = ReadOnlyDict(context)
        return ReadOnlyDict(as_dict)

    @under_cached_property
    def json_fragment(self) -> json_fragment:
        """Return an event as a JSON fragment."""
        return json_fragment(json_bytes(self._as_dict))

    def __repr__(self) -> str:
        """Return the representation."""
        return _event_repr(self.event_type, self.origin, self.data)
1416 
1417 
1419  event_type: EventType[_DataT] | str, origin: EventOrigin, data: _DataT | None
1420 ) -> str:
1421  """Return the representation."""
1422  if data:
1423  return f"<Event {event_type}[{str(origin)[0]}]: {util.repr_helper(data)}>"
1424 
1425  return f"<Event {event_type}[{str(origin)[0]}]>"
1426 
1427 
# A (job, event_filter) pair as stored per event_type by EventBus.
# When the filter is not None it decides whether the job runs for an event.
_FilterableJobType = tuple[
    HassJob[[Event[_DataT]], Coroutine[Any, Any, None] | None],  # job
    Callable[[_DataT], bool] | None,  # event_filter
]
1432 
1433 
@dataclass(slots=True)
class _OneTimeListener(Generic[_DataT]):
    # hass: the HomeAssistant instance used to dispatch the wrapped job.
    # listener_job: the job to run exactly once.
    # remove: unsubscribe callback, set by EventBus.async_listen_once after
    # registration; cleared on first call so the listener fires only once.
    hass: HomeAssistant
    listener_job: HassJob[[Event[_DataT]], Coroutine[Any, Any, None] | None]
    remove: CALLBACK_TYPE | None = None

    @callback
    def __call__(self, event: Event[_DataT]) -> None:
        """Remove listener from event bus and then fire listener."""
        if not self.remove:
            # If the listener was already removed, we don't need to do anything
            return
        self.remove()
        self.remove = None
        self.hass.async_run_hass_job(self.listener_job, event)

    def __repr__(self) -> str:
        """Return the representation of the listener and source module."""
        module = inspect.getmodule(self.listener_job.target)
        if module:
            return f"<_OneTimeListener {module.__name__}:{self.listener_job.target}>"
        return f"<_OneTimeListener {self.listener_job.target}>"
1456 
1457 
# Empty list, used by EventBus.async_fire_internal
# Shared sentinel so firing an event with no matching listeners does not
# allocate a fresh list; it must never be mutated.
EMPTY_LIST: list[Any] = []
1460 
1461 
@functools.lru_cache
def _verify_event_type_length_or_raise(event_type: EventType[_DataT] | str) -> None:
    """Validate that an event type fits the length limit.

    Raises MaxLengthExceeded when the event type is longer than
    MAX_LENGTH_EVENT_EVENT_TYPE; results are memoized per event type.
    """
    if len(event_type) <= MAX_LENGTH_EVENT_EVENT_TYPE:
        return
    raise MaxLengthExceeded(event_type, "event_type", MAX_LENGTH_EVENT_EVENT_TYPE)
1467 
1468 
class EventBus:
    """Allow the firing of and listening for events."""

    __slots__ = ("_debug", "_hass", "_listeners", "_match_all_listeners")

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize a new event bus."""
        # Listeners are kept per event_type; MATCH_ALL listeners live in
        # their own list which is aliased into the mapping so that
        # async_listeners() reports them like any other event type.
        self._listeners: defaultdict[
            EventType[Any] | str, list[_FilterableJobType[Any]]
        ] = defaultdict(list)
        self._match_all_listeners: list[_FilterableJobType[Any]] = []
        self._listeners[MATCH_ALL] = self._match_all_listeners
        self._hass = hass
        self._async_logging_changed()
        self.async_listen(EVENT_LOGGING_CHANGED, self._async_logging_changed)

    @callback
    def _async_logging_changed(self, event: Event | None = None) -> None:
        """Handle logging change."""
        # Cache the debug flag so async_fire_internal does not call
        # isEnabledFor() for every fired event.
        self._debug = _LOGGER.isEnabledFor(logging.DEBUG)

    @callback
    def async_listeners(self) -> dict[EventType[Any] | str, int]:
        """Return dictionary with events and the number of listeners.

        This method must be run in the event loop.
        """
        return {key: len(listeners) for key, listeners in self._listeners.items()}

    @property
    def listeners(self) -> dict[EventType[Any] | str, int]:
        """Return dictionary with events and the number of listeners."""
        return run_callback_threadsafe(self._hass.loop, self.async_listeners).result()

    def fire(
        self,
        event_type: EventType[_DataT] | str,
        event_data: _DataT | None = None,
        origin: EventOrigin = EventOrigin.local,
        context: Context | None = None,
    ) -> None:
        """Fire an event."""
        # Validate in the calling thread so the error surfaces to the
        # caller rather than inside the event loop.
        _verify_event_type_length_or_raise(event_type)
        self._hass.loop.call_soon_threadsafe(
            self.async_fire_internal, event_type, event_data, origin, context
        )

    @callback
    def async_fire(
        self,
        event_type: EventType[_DataT] | str,
        event_data: _DataT | None = None,
        origin: EventOrigin = EventOrigin.local,
        context: Context | None = None,
        time_fired: float | None = None,
    ) -> None:
        """Fire an event.

        This method must be run in the event loop.
        """
        _verify_event_type_length_or_raise(event_type)
        if self._hass.loop_thread_id != threading.get_ident():
            # late import to avoid circular imports
            from .helpers import frame  # pylint: disable=import-outside-toplevel

            frame.report_non_thread_safe_operation("hass.bus.async_fire")
        return self.async_fire_internal(
            event_type, event_data, origin, context, time_fired
        )

    @callback
    def async_fire_internal(
        self,
        event_type: EventType[_DataT] | str,
        event_data: _DataT | None = None,
        origin: EventOrigin = EventOrigin.local,
        context: Context | None = None,
        time_fired: float | None = None,
    ) -> None:
        """Fire an event, for internal use only.

        This method is intended to only be used by core internally
        and should not be considered a stable API. We will make
        breaking changes to this function in the future and it
        should not be used in integrations.

        This method must be run in the event loop.
        """
        if self._debug:
            _LOGGER.debug(
                "Bus:Handling %s", _event_repr(event_type, origin, event_data)
            )

        listeners = self._listeners.get(event_type, EMPTY_LIST)
        if event_type not in EVENTS_EXCLUDED_FROM_MATCH_ALL:
            match_all_listeners = self._match_all_listeners
        else:
            match_all_listeners = EMPTY_LIST

        # The Event object is created lazily: if every listener is
        # filtered out it is never allocated.
        event: Event[_DataT] | None = None
        for job, event_filter in listeners + match_all_listeners:
            if event_filter is not None:
                try:
                    if event_data is None or not event_filter(event_data):
                        continue
                except Exception:
                    _LOGGER.exception("Error in event filter")
                    continue

            if not event:
                event = Event(
                    event_type,
                    event_data,
                    origin,
                    time_fired,
                    context,
                )

            try:
                self._hass.async_run_hass_job(job, event)
            except Exception:
                _LOGGER.exception("Error running job: %s", job)

    def listen(
        self,
        event_type: EventType[_DataT] | str,
        listener: Callable[[Event[_DataT]], Coroutine[Any, Any, None] | None],
    ) -> CALLBACK_TYPE:
        """Listen for all events or events of a specific type.

        To listen to all events specify the constant ``MATCH_ALL``
        as event_type.
        """
        async_remove_listener = run_callback_threadsafe(
            self._hass.loop, self.async_listen, event_type, listener
        ).result()

        def remove_listener() -> None:
            """Remove the listener."""
            run_callback_threadsafe(self._hass.loop, async_remove_listener).result()

        return remove_listener

    @callback
    def async_listen(
        self,
        event_type: EventType[_DataT] | str,
        listener: Callable[[Event[_DataT]], Coroutine[Any, Any, None] | None],
        event_filter: Callable[[_DataT], bool] | None = None,
        run_immediately: bool | object = _SENTINEL,
    ) -> CALLBACK_TYPE:
        """Listen for all events or events of a specific type.

        To listen to all events specify the constant ``MATCH_ALL``
        as event_type.

        An optional event_filter, which must be a callable decorated with
        @callback that returns a boolean value, determines if the
        listener callable should run.

        If run_immediately is passed:
        - callbacks will be run right away instead of using call_soon.
        - coroutine functions will be scheduled eagerly.

        This method must be run in the event loop.
        """
        if run_immediately in (True, False):
            # late import to avoid circular imports
            from .helpers import frame  # pylint: disable=import-outside-toplevel

            frame.report_usage(
                "calls `async_listen` with run_immediately",
                core_behavior=frame.ReportBehavior.LOG,
                breaks_in_ha_version="2025.5",
            )

        if event_filter is not None and not is_callback_check_partial(event_filter):
            raise HomeAssistantError(f"Event filter {event_filter} is not a callback")
        filterable_job = (HassJob(listener, f"listen {event_type}"), event_filter)
        if event_type == EVENT_STATE_REPORTED:
            # state_reported fires for every state write; an unfiltered
            # listener would be too expensive, so a filter is mandatory.
            if not event_filter:
                raise HomeAssistantError(
                    f"Event filter is required for event {event_type}"
                )
        return self._async_listen_filterable_job(event_type, filterable_job)

    @callback
    def _async_listen_filterable_job(
        self,
        event_type: EventType[_DataT] | str,
        filterable_job: _FilterableJobType[_DataT],
    ) -> CALLBACK_TYPE:
        """Listen for all events or events of a specific type."""
        self._listeners[event_type].append(filterable_job)
        return functools.partial(
            self._async_remove_listener, event_type, filterable_job
        )

    def listen_once(
        self,
        event_type: EventType[_DataT] | str,
        listener: Callable[[Event[_DataT]], Coroutine[Any, Any, None] | None],
    ) -> CALLBACK_TYPE:
        """Listen once for event of a specific type.

        To listen to all events specify the constant ``MATCH_ALL``
        as event_type.

        Returns function to unsubscribe the listener.
        """
        async_remove_listener = run_callback_threadsafe(
            self._hass.loop, self.async_listen_once, event_type, listener
        ).result()

        def remove_listener() -> None:
            """Remove the listener."""
            run_callback_threadsafe(self._hass.loop, async_remove_listener).result()

        return remove_listener

    @callback
    def async_listen_once(
        self,
        event_type: EventType[_DataT] | str,
        listener: Callable[[Event[_DataT]], Coroutine[Any, Any, None] | None],
        run_immediately: bool | object = _SENTINEL,
    ) -> CALLBACK_TYPE:
        """Listen once for event of a specific type.

        To listen to all events specify the constant ``MATCH_ALL``
        as event_type.

        Returns registered listener that can be used with remove_listener.

        This method must be run in the event loop.
        """
        if run_immediately in (True, False):
            # late import to avoid circular imports
            from .helpers import frame  # pylint: disable=import-outside-toplevel

            frame.report_usage(
                "calls `async_listen_once` with run_immediately",
                core_behavior=frame.ReportBehavior.LOG,
                breaks_in_ha_version="2025.5",
            )

        one_time_listener: _OneTimeListener[_DataT] = _OneTimeListener(
            self._hass, HassJob(listener)
        )
        remove = self._async_listen_filterable_job(
            event_type,
            (
                HassJob(
                    one_time_listener,
                    f"onetime listen {event_type} {listener}",
                    job_type=HassJobType.Callback,
                ),
                None,
            ),
        )
        one_time_listener.remove = remove
        return remove

    @callback
    def _async_remove_listener(
        self,
        event_type: EventType[_DataT] | str,
        filterable_job: _FilterableJobType[_DataT],
    ) -> None:
        """Remove a listener of a specific event_type.

        This method must be run in the event loop.
        """
        try:
            self._listeners[event_type].remove(filterable_job)

            # delete event_type list if empty
            if not self._listeners[event_type] and event_type != MATCH_ALL:
                self._listeners.pop(event_type)
        except (KeyError, ValueError):
            # KeyError is key event_type listener did not exist
            # ValueError if listener did not exist within event_type
            _LOGGER.exception(
                "Unable to remove unknown job listener %s", filterable_job
            )
1753 
1754 
class CompressedState(TypedDict):
    """Compressed dict of a state.

    Keys are single letters to minimize payload size; the matching full
    names are given by the COMPRESSED_STATE_* constants noted per field.
    """

    s: str  # COMPRESSED_STATE_STATE
    a: ReadOnlyDict[str, Any]  # COMPRESSED_STATE_ATTRIBUTES
    c: str | dict[str, Any]  # COMPRESSED_STATE_CONTEXT
    lc: float  # COMPRESSED_STATE_LAST_CHANGED
    lu: NotRequired[float]  # COMPRESSED_STATE_LAST_UPDATED
1763 
1764 
class State:
    """Object to represent a state within the state machine.

    entity_id: the entity that is represented.
    state: the state of the entity
    attributes: extra information on entity and state
    last_changed: last time the state was changed.
    last_reported: last time the state was reported.
    last_updated: last time the state or attributes were changed.
    context: Context in which it was created
    domain: Domain of this state.
    object_id: Object id of this state.
    """

    # __slots__ keeps per-instance memory low; State objects are created
    # for every state write. _cache backs the under_cached_property values.
    __slots__ = (
        "entity_id",
        "state",
        "attributes",
        "last_changed",
        "last_reported",
        "last_updated",
        "context",
        "state_info",
        "domain",
        "object_id",
        "last_updated_timestamp",
        "_cache",
    )
1793 
1795  self,
1796  entity_id: str,
1797  state: str,
1798  attributes: Mapping[str, Any] | None = None,
1799  last_changed: datetime.datetime | None = None,
1800  last_reported: datetime.datetime | None = None,
1801  last_updated: datetime.datetime | None = None,
1802  context: Context | None = None,
1803  validate_entity_id: bool | None = True,
1804  state_info: StateInfo | None = None,
1805  last_updated_timestamp: float | None = None,
1806  ) -> None:
1807  """Initialize a new state."""
1808  self._cache: dict[str, Any] = {}
1809  state = str(state)
1810 
1811  if validate_entity_id and not valid_entity_id(entity_id):
1813  f"Invalid entity id encountered: {entity_id}. "
1814  "Format should be <domain>.<object_id>"
1815  )
1816 
1817  validate_state(state)
1818 
1819  self.entity_identity_id = entity_id
1820  self.statestate = state
1821  # State only creates and expects a ReadOnlyDict so
1822  # there is no need to check for subclassing with
1823  # isinstance here so we can use the faster type check.
1824  if type(attributes) is not ReadOnlyDict:
1825  self.attributesattributes = ReadOnlyDict(attributes or {})
1826  else:
1827  self.attributesattributes = attributes
1828  self.last_reportedlast_reported = last_reported or dt_util.utcnow()
1829  self.last_updatedlast_updated = last_updated or self.last_reportedlast_reported
1830  self.last_changedlast_changed = last_changed or self.last_updatedlast_updated
1831  self.contextcontext = context or Context()
1832  self.state_infostate_info = state_info
1833  self.domain, self.object_idobject_id = split_entity_id(self.entity_identity_id)
1834  # The recorder or the websocket_api will always call the timestamps,
1835  # so we will set the timestamp values here to avoid the overhead of
1836  # the function call in the property we know will always be called.
1837  last_updated = self.last_updatedlast_updated
1838  if not last_updated_timestamp:
1839  last_updated_timestamp = last_updated.timestamp()
1840  self.last_updated_timestamplast_updated_timestamp = last_updated_timestamp
1841  if self.last_changedlast_changed == last_updated:
1842  self._cache["last_changed_timestamp"] = last_updated_timestamp
1843  # If last_reported is the same as last_updated async_set will pass
1844  # the same datetime object for both values so we can use an identity
1845  # check here.
1846  if self.last_reportedlast_reported is last_updated:
1847  self._cache["last_reported_timestamp"] = last_updated_timestamp
1848 
1849  @under_cached_property
1850  def name(self) -> str:
1851  """Name of this state."""
1852  return self.attributesattributes.get(ATTR_FRIENDLY_NAME) or self.object_idobject_id.replace(
1853  "_", " "
1854  )
1855 
1856  @under_cached_property
1857  def last_changed_timestamp(self) -> float:
1858  """Timestamp of last change."""
1859  return self.last_changedlast_changed.timestamp()
1860 
1861  @under_cached_property
1862  def last_reported_timestamp(self) -> float:
1863  """Timestamp of last report."""
1864  return self.last_reportedlast_reported.timestamp()
1865 
1866  @under_cached_property
1867  def _as_dict(self) -> dict[str, Any]:
1868  """Return a dict representation of the State.
1869 
1870  Callers should be careful to not mutate the returned dictionary
1871  as it will mutate the cached version.
1872  """
1873  last_changed_isoformat = self.last_changedlast_changed.isoformat()
1874  if self.last_changedlast_changed == self.last_updatedlast_updated:
1875  last_updated_isoformat = last_changed_isoformat
1876  else:
1877  last_updated_isoformat = self.last_updatedlast_updated.isoformat()
1878  if self.last_changedlast_changed == self.last_reportedlast_reported:
1879  last_reported_isoformat = last_changed_isoformat
1880  else:
1881  last_reported_isoformat = self.last_reportedlast_reported.isoformat()
1882  return {
1883  "entity_id": self.entity_identity_id,
1884  "state": self.statestate,
1885  "attributes": self.attributesattributes,
1886  "last_changed": last_changed_isoformat,
1887  "last_reported": last_reported_isoformat,
1888  "last_updated": last_updated_isoformat,
1889  # _as_dict is marked as protected
1890  # to avoid callers outside of this module
1891  # from misusing it by mistake.
1892  "context": self.contextcontext._as_dict, # noqa: SLF001
1893  }
1894 
1895  def as_dict(
1896  self,
1897  ) -> ReadOnlyDict[str, datetime.datetime | Collection[Any]]:
1898  """Return a ReadOnlyDict representation of the State.
1899 
1900  Async friendly.
1901 
1902  Can be used for JSON serialization.
1903  Ensures: state == State.from_dict(state.as_dict())
1904  """
1905  return self._as_read_only_dict_as_read_only_dict
1906 
1907  @under_cached_property
1909  self,
1910  ) -> ReadOnlyDict[str, datetime.datetime | Collection[Any]]:
1911  """Return a ReadOnlyDict representation of the State."""
1912  as_dict = self._as_dict_as_dict
1913  context = as_dict["context"]
1914  # json_fragment will serialize data from a ReadOnlyDict
1915  # or a normal dict so its ok to have either. We only
1916  # mutate the cache if someone asks for the as_dict version
1917  # to avoid storing multiple copies of the data in memory.
1918  if type(context) is not ReadOnlyDict:
1919  as_dict["context"] = ReadOnlyDict(context)
1920  return ReadOnlyDict(as_dict)
1921 
1922  @under_cached_property
1923  def as_dict_json(self) -> bytes:
1924  """Return a JSON string of the State."""
1925  return json_bytes(self._as_dict_as_dict)
1926 
1927  @under_cached_property
1928  def json_fragment(self) -> json_fragment:
1929  """Return a JSON fragment of the State."""
1930  return json_fragment(self.as_dict_jsonas_dict_json)
1931 
1932  @under_cached_property
1933  def as_compressed_state(self) -> CompressedState:
1934  """Build a compressed dict of a state for adds.
1935 
1936  Omits the lu (last_updated) if it matches (lc) last_changed.
1937 
1938  Sends c (context) as a string if it only contains an id.
1939  """
1940  state_context = self.contextcontext
1941  if state_context.parent_id is None and state_context.user_id is None:
1942  context: dict[str, Any] | str = state_context.id
1943  else:
1944  # _as_dict is marked as protected
1945  # to avoid callers outside of this module
1946  # from misusing it by mistake.
1947  context = state_context._as_dict # noqa: SLF001
1948  compressed_state: CompressedState = {
1949  COMPRESSED_STATE_STATE: self.statestate,
1950  COMPRESSED_STATE_ATTRIBUTES: self.attributesattributes,
1951  COMPRESSED_STATE_CONTEXT: context,
1952  COMPRESSED_STATE_LAST_CHANGED: self.last_changed_timestamplast_changed_timestamp,
1953  }
1954  if self.last_changedlast_changed != self.last_updatedlast_updated:
1955  compressed_state[COMPRESSED_STATE_LAST_UPDATED] = (
1956  self.last_updated_timestamplast_updated_timestamp
1957  )
1958  return compressed_state
1959 
1960  @under_cached_property
1961  def as_compressed_state_json(self) -> bytes:
1962  """Build a compressed JSON key value pair of a state for adds.
1963 
1964  The JSON string is a key value pair of the entity_id and the compressed state.
1965 
1966  It is used for sending multiple states in a single message.
1967  """
1968  return json_bytes({self.entity_identity_id: self.as_compressed_stateas_compressed_state})[1:-1]
1969 
1970  @classmethod
1971  def from_dict(cls, json_dict: dict[str, Any]) -> Self | None:
1972  """Initialize a state from a dict.
1973 
1974  Async friendly.
1975 
1976  Ensures: state == State.from_json_dict(state.to_json_dict())
1977  """
1978  if not (json_dict and "entity_id" in json_dict and "state" in json_dict):
1979  return None
1980 
1981  last_changed = json_dict.get("last_changed")
1982  if isinstance(last_changed, str):
1983  last_changed = dt_util.parse_datetime(last_changed)
1984 
1985  last_updated = json_dict.get("last_updated")
1986  if isinstance(last_updated, str):
1987  last_updated = dt_util.parse_datetime(last_updated)
1988 
1989  last_reported = json_dict.get("last_reported")
1990  if isinstance(last_reported, str):
1991  last_reported = dt_util.parse_datetime(last_reported)
1992 
1993  if context := json_dict.get("context"):
1994  context = Context(id=context.get("id"), user_id=context.get("user_id"))
1995 
1996  return cls(
1997  json_dict["entity_id"],
1998  json_dict["state"],
1999  json_dict.get("attributes"),
2000  last_changed=last_changed,
2001  last_reported=last_reported,
2002  last_updated=last_updated,
2003  context=context,
2004  )
2005 
2006  def expire(self) -> None:
2007  """Mark the state as old.
2008 
2009  We give up the original reference to the context to ensure
2010  the context can be garbage collected by replacing it with
2011  a new one with the same id to ensure the old state
2012  can still be examined for comparison against the new state.
2013 
2014  Since we are always going to fire a EVENT_STATE_CHANGED event
2015  after we remove a state from the state machine we need to make
2016  sure we don't end up holding a reference to the original context
2017  since it can never be garbage collected as each event would
2018  reference the previous one.
2019  """
2020  self.contextcontext = Context(
2021  self.contextcontext.user_id, self.contextcontext.parent_id, self.contextcontext.id
2022  )
2023 
2024  def __repr__(self) -> str:
2025  """Return the representation of the states."""
2026  attrs = f"; {util.repr_helper(self.attributes)}" if self.attributesattributes else ""
2027 
2028  return (
2029  f"<state {self.entity_id}={self.state}{attrs}"
2030  f" @ {dt_util.as_local(self.last_changed).isoformat()}>"
2031  )
2032 
2033 
class States(UserDict[str, State]):
    """Container for states, maps entity_id -> State.

    Maintains an additional index:
      - domain -> dict[str, State]
    """

    def __init__(self) -> None:
        """Initialize the container and its per-domain index."""
        super().__init__()
        self._domain_index: defaultdict[str, dict[str, State]] = defaultdict(dict)

    def values(self) -> ValuesView[State]:
        """Return the underlying values to avoid __iter__ overhead."""
        return self.data.values()

    def __setitem__(self, key: str, entry: State) -> None:
        """Store an entry and index it under its domain."""
        self.data[key] = entry
        self._domain_index[entry.domain][entry.entity_id] = entry

    def __delitem__(self, key: str) -> None:
        """Remove an entry from both the container and the domain index."""
        entry = self[key]
        del self._domain_index[entry.domain][entry.entity_id]
        super().__delitem__(key)

    def domain_entity_ids(self, key: str) -> KeysView[str] | tuple[()]:
        """Get all entity_ids for a domain."""
        # .get() avoids polluting the defaultdict with non-existing domains
        if (domain_map := self._domain_index.get(key)) is None:
            return ()
        return domain_map.keys()

    def domain_states(self, key: str) -> ValuesView[State] | tuple[()]:
        """Get all states for a domain."""
        # .get() avoids polluting the defaultdict with non-existing domains
        if (domain_map := self._domain_index.get(key)) is None:
            return ()
        return domain_map.values()
2074 
2075 
2077  """Helper class that tracks the state of different entities."""
2078 
2079  __slots__ = ("_states", "_states_data", "_reservations", "_bus", "_loop")
2080 
2081  def __init__(self, bus: EventBus, loop: asyncio.events.AbstractEventLoop) -> None:
2082  """Initialize state machine."""
2083  self._states_states = States()
2084  # _states_data is used to access the States backing dict directly to speed
2085  # up read operations
2086  self._states_data_states_data = self._states_states.data
2087  self._reservations: set[str] = set()
2088  self._bus_bus = bus
2089  self._loop_loop = loop
2090 
2091  def entity_ids(self, domain_filter: str | None = None) -> list[str]:
2092  """List of entity ids that are being tracked."""
2093  future = run_callback_threadsafe(
2094  self._loop_loop, self.async_entity_idsasync_entity_ids, domain_filter
2095  )
2096  return future.result()
2097 
2098  @callback
2100  self, domain_filter: str | Iterable[str] | None = None
2101  ) -> list[str]:
2102  """List of entity ids that are being tracked.
2103 
2104  This method must be run in the event loop.
2105  """
2106  if domain_filter is None:
2107  return list(self._states_data_states_data)
2108 
2109  if isinstance(domain_filter, str):
2110  return list(self._states_states.domain_entity_ids(domain_filter.lower()))
2111 
2112  entity_ids: list[str] = []
2113  for domain in domain_filter:
2114  entity_ids.extend(self._states_states.domain_entity_ids(domain))
2115  return entity_ids
2116 
2117  @callback
2119  self, domain_filter: str | Iterable[str] | None = None
2120  ) -> int:
2121  """Count the entity ids that are being tracked.
2122 
2123  This method must be run in the event loop.
2124  """
2125  if domain_filter is None:
2126  return len(self._states_data_states_data)
2127 
2128  if isinstance(domain_filter, str):
2129  return len(self._states_states.domain_entity_ids(domain_filter.lower()))
2130 
2131  return sum(
2132  len(self._states_states.domain_entity_ids(domain)) for domain in domain_filter
2133  )
2134 
2135  def all(self, domain_filter: str | Iterable[str] | None = None) -> list[State]:
2136  """Create a list of all states."""
2137  return run_callback_threadsafe(
2138  self._loop_loop, self.async_allasync_all, domain_filter
2139  ).result()
2140 
2141  @callback
2143  self, domain_filter: str | Iterable[str] | None = None
2144  ) -> list[State]:
2145  """Create a list of all states matching the filter.
2146 
2147  This method must be run in the event loop.
2148  """
2149  if domain_filter is None:
2150  return list(self._states_data_states_data.values())
2151 
2152  if isinstance(domain_filter, str):
2153  return list(self._states_states.domain_states(domain_filter.lower()))
2154 
2155  states: list[State] = []
2156  for domain in domain_filter:
2157  states.extend(self._states_states.domain_states(domain))
2158  return states
2159 
2160  def get(self, entity_id: str) -> State | None:
2161  """Retrieve state of entity_id or None if not found.
2162 
2163  Async friendly.
2164  """
2165  return self._states_data_states_data.get(entity_id) or self._states_data_states_data.get(
2166  entity_id.lower()
2167  )
2168 
2169  def is_state(self, entity_id: str, state: str) -> bool:
2170  """Test if entity exists and is in specified state.
2171 
2172  Async friendly.
2173  """
2174  state_obj = self.getget(entity_id)
2175  return state_obj is not None and state_obj.state == state
2176 
2177  def remove(self, entity_id: str) -> bool:
2178  """Remove the state of an entity.
2179 
2180  Returns boolean to indicate if an entity was removed.
2181  """
2182  return run_callback_threadsafe(
2183  self._loop_loop, self.async_removeasync_remove, entity_id
2184  ).result()
2185 
2186  @callback
2187  def async_remove(self, entity_id: str, context: Context | None = None) -> bool:
2188  """Remove the state of an entity.
2189 
2190  Returns boolean to indicate if an entity was removed.
2191 
2192  This method must be run in the event loop.
2193  """
2194  entity_id = entity_id.lower()
2195  old_state = self._states_states.pop(entity_id, None)
2196  self._reservations.discard(entity_id)
2197 
2198  if old_state is None:
2199  return False
2200 
2201  old_state.expire()
2202  state_changed_data: EventStateChangedData = {
2203  "entity_id": entity_id,
2204  "old_state": old_state,
2205  "new_state": None,
2206  }
2207  self._bus_bus.async_fire_internal(
2208  EVENT_STATE_CHANGED,
2209  state_changed_data,
2210  context=context,
2211  )
2212  return True
2213 
2214  def set(
2215  self,
2216  entity_id: str,
2217  new_state: str,
2218  attributes: Mapping[str, Any] | None = None,
2219  force_update: bool = False,
2220  context: Context | None = None,
2221  ) -> None:
2222  """Set the state of an entity, add entity if it does not exist.
2223 
2224  Attributes is an optional dict to specify attributes of this state.
2225 
2226  If you just update the attributes and not the state, last changed will
2227  not be affected.
2228  """
2229  run_callback_threadsafe(
2230  self._loop_loop,
2231  self.async_setasync_set,
2232  entity_id,
2233  new_state,
2234  attributes,
2235  force_update,
2236  context,
2237  ).result()
2238 
2239  @callback
2240  def async_reserve(self, entity_id: str) -> None:
2241  """Reserve a state in the state machine for an entity being added.
2242 
2243  This must not fire an event when the state is reserved.
2244 
2245  This avoids a race condition where multiple entities with the same
2246  entity_id are added.
2247  """
2248  entity_id = entity_id.lower()
2249  if entity_id in self._states_data_states_data or entity_id in self._reservations:
2250  raise HomeAssistantError(
2251  "async_reserve must not be called once the state is in the state"
2252  " machine."
2253  )
2254 
2255  self._reservations.add(entity_id)
2256 
2257  @callback
2258  def async_available(self, entity_id: str) -> bool:
2259  """Check to see if an entity_id is available to be used."""
2260  entity_id = entity_id.lower()
2261  return (
2262  entity_id not in self._states_data_states_data and entity_id not in self._reservations
2263  )
2264 
2265  @callback
2267  self,
2268  entity_id: str,
2269  new_state: str,
2270  attributes: Mapping[str, Any] | None = None,
2271  force_update: bool = False,
2272  context: Context | None = None,
2273  state_info: StateInfo | None = None,
2274  timestamp: float | None = None,
2275  ) -> None:
2276  """Set the state of an entity, add entity if it does not exist.
2277 
2278  Attributes is an optional dict to specify attributes of this state.
2279 
2280  If you just update the attributes and not the state, last changed will
2281  not be affected.
2282 
2283  This method must be run in the event loop.
2284  """
2285  self.async_set_internalasync_set_internal(
2286  entity_id.lower(),
2287  str(new_state),
2288  attributes or {},
2289  force_update,
2290  context,
2291  state_info,
2292  timestamp or time.time(),
2293  )
2294 
2295  @callback
2297  self,
2298  entity_id: str,
2299  new_state: str,
2300  attributes: Mapping[str, Any] | None,
2301  force_update: bool,
2302  context: Context | None,
2303  state_info: StateInfo | None,
2304  timestamp: float,
2305  ) -> None:
2306  """Set the state of an entity, add entity if it does not exist.
2307 
2308  This method is intended to only be used by core internally
2309  and should not be considered a stable API. We will make
2310  breaking changes to this function in the future and it
2311  should not be used in integrations.
2312 
2313  This method must be run in the event loop.
2314  """
2315  # Most cases the key will be in the dict
2316  # so we optimize for the happy path as
2317  # python 3.11+ has near zero overhead for
2318  # try when it does not raise an exception.
2319  old_state: State | None
2320  try:
2321  old_state = self._states_data_states_data[entity_id]
2322  except KeyError:
2323  old_state = None
2324  same_state = False
2325  same_attr = False
2326  last_changed = None
2327  else:
2328  same_state = old_state.state == new_state and not force_update
2329  same_attr = old_state.attributes == attributes
2330  last_changed = old_state.last_changed if same_state else None
2331 
2332  # It is much faster to convert a timestamp to a utc datetime object
2333  # than converting a utc datetime object to a timestamp since cpython
2334  # does not have a fast path for handling the UTC timezone and has to do
2335  # multiple local timezone conversions.
2336  #
2337  # from_timestamp implementation:
2338  # https://github.com/python/cpython/blob/c90a862cdcf55dc1753c6466e5fa4a467a13ae24/Modules/_datetimemodule.c#L2936
2339  #
2340  # timestamp implementation:
2341  # https://github.com/python/cpython/blob/c90a862cdcf55dc1753c6466e5fa4a467a13ae24/Modules/_datetimemodule.c#L6387
2342  # https://github.com/python/cpython/blob/c90a862cdcf55dc1753c6466e5fa4a467a13ae24/Modules/_datetimemodule.c#L6323
2343  now = dt_util.utc_from_timestamp(timestamp)
2344 
2345  if context is None:
2346  context = Context(id=ulid_at_time(timestamp))
2347 
2348  if same_state and same_attr:
2349  # mypy does not understand this is only possible if old_state is not None
2350  old_last_reported = old_state.last_reported # type: ignore[union-attr]
2351  old_state.last_reported = now # type: ignore[union-attr]
2352  old_state._cache["last_reported_timestamp"] = timestamp # type: ignore[union-attr] # noqa: SLF001
2353  # Avoid creating an EventStateReportedData
2354  self._bus_bus.async_fire_internal( # type: ignore[misc]
2355  EVENT_STATE_REPORTED,
2356  {
2357  "entity_id": entity_id,
2358  "old_last_reported": old_last_reported,
2359  "new_state": old_state,
2360  },
2361  context=context,
2362  time_fired=timestamp,
2363  )
2364  return
2365 
2366  if same_attr:
2367  if TYPE_CHECKING:
2368  assert old_state is not None
2369  attributes = old_state.attributes
2370 
2371  # This is intentionally called with positional only arguments for performance
2372  # reasons
2373  state = State(
2374  entity_id,
2375  new_state,
2376  attributes,
2377  last_changed,
2378  now,
2379  now,
2380  context,
2381  old_state is None,
2382  state_info,
2383  timestamp,
2384  )
2385  if old_state is not None:
2386  old_state.expire()
2387  self._states_states[entity_id] = state
2388  state_changed_data: EventStateChangedData = {
2389  "entity_id": entity_id,
2390  "old_state": old_state,
2391  "new_state": state,
2392  }
2393  self._bus_bus.async_fire_internal(
2394  EVENT_STATE_CHANGED,
2395  state_changed_data,
2396  context=context,
2397  time_fired=timestamp,
2398  )
2399 
2400 
2401 class SupportsResponse(enum.StrEnum):
2402  """Service call response configuration."""
2403 
2404  NONE = "none"
2405  """The service does not support responses (the default)."""
2406 
2407  OPTIONAL = "optional"
2408  """The service optionally returns response data when asked by the caller."""
2409 
2410  ONLY = "only"
2411  """The service is read-only and the caller must always ask for response data."""
2412 
2413 
class Service:
    """Representation of a callable service."""

    # NOTE(review): "domain" and "service" are listed in __slots__ but are
    # never assigned in __init__ — kept for compatibility with the original
    # declaration; confirm before removing.
    __slots__ = ["job", "schema", "domain", "service", "supports_response"]

    def __init__(
        self,
        func: Callable[
            [ServiceCall],
            Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
            | ServiceResponse
            | EntityServiceResponse
            | None,
        ],
        schema: VolSchemaType | None,
        domain: str,
        service: str,
        context: Context | None = None,
        supports_response: SupportsResponse = SupportsResponse.NONE,
        job_type: HassJobType | None = None,
    ) -> None:
        """Initialize a service.

        The handler is wrapped in a HassJob so its job type is resolved once
        at registration time rather than on every call.
        """
        self.job = HassJob(func, f"service {domain}.{service}", job_type=job_type)
        self.schema = schema
        self.supports_response = supports_response
2439 
2440 
2442  """Representation of a call to a service."""
2443 
2444  __slots__ = ("domain", "service", "data", "context", "return_response")
2445 
2447  self,
2448  domain: str,
2449  service: str,
2450  data: dict[str, Any] | None = None,
2451  context: Context | None = None,
2452  return_response: bool = False,
2453  ) -> None:
2454  """Initialize a service call."""
2455  self.domaindomain = domain
2456  self.serviceservice = service
2457  self.datadata = ReadOnlyDict(data or {})
2458  self.contextcontext = context or Context()
2459  self.return_responsereturn_response = return_response
2460 
2461  def __repr__(self) -> str:
2462  """Return the representation of the service."""
2463  if self.datadata:
2464  return (
2465  f"<ServiceCall {self.domain}.{self.service} "
2466  f"(c:{self.context.id}): {util.repr_helper(self.data)}>"
2467  )
2468 
2469  return f"<ServiceCall {self.domain}.{self.service} (c:{self.context.id})>"
2470 
2471 
2473  """Offer the services over the eventbus."""
2474 
2475  __slots__ = ("_services", "_hass")
2476 
2477  def __init__(self, hass: HomeAssistant) -> None:
2478  """Initialize a service registry."""
2479  self._services: dict[str, dict[str, Service]] = {}
2480  self._hass_hass = hass
2481 
2482  @property
2483  def services(self) -> dict[str, dict[str, Service]]:
2484  """Return dictionary with per domain a list of available services."""
2485  return run_callback_threadsafe(self._hass_hass.loop, self.async_servicesasync_services).result()
2486 
2487  @callback
2488  def async_services(self) -> dict[str, dict[str, Service]]:
2489  """Return dictionary with per domain a list of available services.
2490 
2491  This method makes a copy of the registry. This function is expensive,
2492  and should only be used if has_service is not sufficient.
2493 
2494  This method must be run in the event loop.
2495  """
2496  return {domain: service.copy() for domain, service in self._services.items()}
2497 
2498  @callback
2499  def async_services_for_domain(self, domain: str) -> dict[str, Service]:
2500  """Return dictionary with per domain a list of available services.
2501 
2502  This method makes a copy of the registry for the domain.
2503 
2504  This method must be run in the event loop.
2505  """
2506  return self._services.get(domain, {}).copy()
2507 
2508  @callback
2509  def async_services_internal(self) -> dict[str, dict[str, Service]]:
2510  """Return dictionary with per domain a list of available services.
2511 
2512  This method DOES NOT make a copy of the services like async_services does.
2513  It is only expected to be called from the Home Assistant internals
2514  as a performance optimization when the caller is not going to modify the
2515  returned data.
2516 
2517  This method must be run in the event loop.
2518  """
2519  return self._services
2520 
2521  def has_service(self, domain: str, service: str) -> bool:
2522  """Test if specified service exists.
2523 
2524  Async friendly.
2525  """
2526  return service.lower() in self._services.get(domain.lower(), [])
2527 
2528  def supports_response(self, domain: str, service: str) -> SupportsResponse:
2529  """Return whether or not the service supports response data.
2530 
2531  This exists so that callers can return more helpful error messages given
2532  the context. Will return NONE if the service does not exist as there is
2533  other error handling when calling the service if it does not exist.
2534  """
2535  if not (handler := self._services[domain.lower()][service.lower()]):
2536  return SupportsResponse.NONE
2537  return handler.supports_response
2538 
2540  self,
2541  domain: str,
2542  service: str,
2543  service_func: Callable[
2544  [ServiceCall],
2545  Coroutine[Any, Any, ServiceResponse] | ServiceResponse | None,
2546  ],
2547  schema: vol.Schema | None = None,
2548  supports_response: SupportsResponse = SupportsResponse.NONE,
2549  ) -> None:
2550  """Register a service.
2551 
2552  Schema is called to coerce and validate the service data.
2553  """
2554  run_callback_threadsafe(
2555  self._hass_hass.loop,
2556  self._async_register_async_register,
2557  domain,
2558  service,
2559  service_func,
2560  schema,
2561  supports_response,
2562  ).result()
2563 
2564  @callback
2566  self,
2567  domain: str,
2568  service: str,
2569  service_func: Callable[
2570  [ServiceCall],
2571  Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
2572  | ServiceResponse
2573  | EntityServiceResponse
2574  | None,
2575  ],
2576  schema: VolSchemaType | None = None,
2577  supports_response: SupportsResponse = SupportsResponse.NONE,
2578  job_type: HassJobType | None = None,
2579  ) -> None:
2580  """Register a service.
2581 
2582  Schema is called to coerce and validate the service data.
2583 
2584  This method must be run in the event loop.
2585  """
2586  self._hass_hass.verify_event_loop_thread("hass.services.async_register")
2587  self._async_register_async_register(
2588  domain, service, service_func, schema, supports_response, job_type
2589  )
2590 
2591  @callback
2593  self,
2594  domain: str,
2595  service: str,
2596  service_func: Callable[
2597  [ServiceCall],
2598  Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
2599  | ServiceResponse
2600  | EntityServiceResponse
2601  | None,
2602  ],
2603  schema: VolSchemaType | None = None,
2604  supports_response: SupportsResponse = SupportsResponse.NONE,
2605  job_type: HassJobType | None = None,
2606  ) -> None:
2607  """Register a service.
2608 
2609  Schema is called to coerce and validate the service data.
2610 
2611  This method must be run in the event loop.
2612  """
2613  domain = domain.lower()
2614  service = service.lower()
2615  service_obj = Service(
2616  service_func,
2617  schema,
2618  domain,
2619  service,
2620  supports_response=supports_response,
2621  job_type=job_type,
2622  )
2623 
2624  if domain in self._services:
2625  self._services[domain][service] = service_obj
2626  else:
2627  self._services[domain] = {service: service_obj}
2628 
2629  self._hass_hass.bus.async_fire_internal(
2630  EVENT_SERVICE_REGISTERED, {ATTR_DOMAIN: domain, ATTR_SERVICE: service}
2631  )
2632 
2633  def remove(self, domain: str, service: str) -> None:
2634  """Remove a registered service from service handler."""
2635  run_callback_threadsafe(
2636  self._hass_hass.loop, self._async_remove_async_remove, domain, service
2637  ).result()
2638 
2639  @callback
2640  def async_remove(self, domain: str, service: str) -> None:
2641  """Remove a registered service from service handler.
2642 
2643  This method must be run in the event loop.
2644  """
2645  self._hass_hass.verify_event_loop_thread("hass.services.async_remove")
2646  self._async_remove_async_remove(domain, service)
2647 
2648  @callback
2649  def _async_remove(self, domain: str, service: str) -> None:
2650  """Remove a registered service from service handler.
2651 
2652  This method must be run in the event loop.
2653  """
2654  domain = domain.lower()
2655  service = service.lower()
2656 
2657  if service not in self._services.get(domain, {}):
2658  _LOGGER.warning("Unable to remove unknown service %s/%s", domain, service)
2659  return
2660 
2661  self._services[domain].pop(service)
2662 
2663  if not self._services[domain]:
2664  self._services.pop(domain)
2665 
2666  self._hass_hass.bus.async_fire_internal(
2667  EVENT_SERVICE_REMOVED, {ATTR_DOMAIN: domain, ATTR_SERVICE: service}
2668  )
2669 
2670  def call(
2671  self,
2672  domain: str,
2673  service: str,
2674  service_data: dict[str, Any] | None = None,
2675  blocking: bool = False,
2676  context: Context | None = None,
2677  target: dict[str, Any] | None = None,
2678  return_response: bool = False,
2679  ) -> ServiceResponse:
2680  """Call a service.
2681 
2682  See description of async_call for details.
2683  """
2684  return asyncio.run_coroutine_threadsafe(
2685  self.async_callasync_call(
2686  domain,
2687  service,
2688  service_data,
2689  blocking,
2690  context,
2691  target,
2692  return_response,
2693  ),
2694  self._hass_hass.loop,
2695  ).result()
2696 
2697  async def async_call(
2698  self,
2699  domain: str,
2700  service: str,
2701  service_data: dict[str, Any] | None = None,
2702  blocking: bool = False,
2703  context: Context | None = None,
2704  target: dict[str, Any] | None = None,
2705  return_response: bool = False,
2706  ) -> ServiceResponse:
2707  """Call a service.
2708 
2709  Specify blocking=True to wait until service is executed.
2710 
2711  If return_response=True, indicates that the caller can consume return values
2712  from the service, if any. Return values are a dict that can be returned by the
2713  standard JSON serialization process. Return values can only be used with blocking=True.
2714 
2715  This method will fire an event to indicate the service has been called.
2716 
2717  Because the service is sent as an event you are not allowed to use
2718  the keys ATTR_DOMAIN and ATTR_SERVICE in your service_data.
2719 
2720  This method is a coroutine.
2721  """
2722  context = context or Context()
2723  service_data = service_data or {}
2724 
2725  try:
2726  handler = self._services[domain][service]
2727  except KeyError:
2728  # Almost all calls are already lower case, so we avoid
2729  # calling lower() on the arguments in the common case.
2730  domain = domain.lower()
2731  service = service.lower()
2732  try:
2733  handler = self._services[domain][service]
2734  except KeyError:
2735  raise ServiceNotFound(domain, service) from None
2736 
2737  if return_response:
2738  if not blocking:
2739  raise ServiceValidationError(
2740  translation_domain=DOMAIN,
2741  translation_key="service_should_be_blocking",
2742  translation_placeholders={
2743  "return_response": "return_response=True",
2744  "non_blocking_argument": "blocking=False",
2745  },
2746  )
2747  if handler.supports_response is SupportsResponse.NONE:
2748  raise ServiceValidationError(
2749  translation_domain=DOMAIN,
2750  translation_key="service_does_not_support_response",
2751  translation_placeholders={
2752  "return_response": "return_response=True"
2753  },
2754  )
2755  elif handler.supports_response is SupportsResponse.ONLY:
2756  raise ServiceValidationError(
2757  translation_domain=DOMAIN,
2758  translation_key="service_lacks_response_request",
2759  translation_placeholders={"return_response": "return_response=True"},
2760  )
2761 
2762  if target:
2763  service_data.update(target)
2764 
2765  if handler.schema:
2766  try:
2767  processed_data: dict[str, Any] = handler.schema(service_data)
2768  except vol.Invalid:
2769  _LOGGER.debug(
2770  "Invalid data for service call %s.%s: %s",
2771  domain,
2772  service,
2773  service_data,
2774  )
2775  raise
2776  else:
2777  processed_data = service_data
2778 
2779  service_call = ServiceCall(
2780  domain, service, processed_data, context, return_response
2781  )
2782 
2783  self._hass_hass.bus.async_fire_internal(
2784  EVENT_CALL_SERVICE,
2785  {
2786  ATTR_DOMAIN: domain,
2787  ATTR_SERVICE: service,
2788  ATTR_SERVICE_DATA: service_data,
2789  },
2790  context=context,
2791  )
2792 
2793  coro = self._execute_service_execute_service(handler, service_call)
2794  if not blocking:
2795  self._hass_hass.async_create_task_internal(
2796  self._run_service_call_catch_exceptions_run_service_call_catch_exceptions(coro, service_call),
2797  f"service call background {service_call.domain}.{service_call.service}",
2798  eager_start=True,
2799  )
2800  return None
2801 
2802  response_data = await coro
2803  if not return_response:
2804  return None
2805  if not isinstance(response_data, dict):
2806  raise HomeAssistantError(
2807  translation_domain=DOMAIN,
2808  translation_key="service_reponse_invalid",
2809  translation_placeholders={
2810  "response_data_type": str(type(response_data))
2811  },
2812  )
2813  return response_data
2814 
2816  self,
2817  coro_or_task: Coroutine[Any, Any, Any] | asyncio.Task[Any],
2818  service_call: ServiceCall,
2819  ) -> None:
2820  """Run service call in background, catching and logging any exceptions."""
2821  try:
2822  await coro_or_task
2823  except Unauthorized:
2824  _LOGGER.warning(
2825  "Unauthorized service called %s/%s",
2826  service_call.domain,
2827  service_call.service,
2828  )
2829  except asyncio.CancelledError:
2830  _LOGGER.debug("Service was cancelled: %s", service_call)
2831  except Exception:
2832  _LOGGER.exception("Error executing service: %s", service_call)
2833 
2834  async def _execute_service(
2835  self, handler: Service, service_call: ServiceCall
2836  ) -> ServiceResponse:
2837  """Execute a service."""
2838  job = handler.job
2839  target = job.target
2840  if job.job_type is HassJobType.Coroutinefunction:
2841  if TYPE_CHECKING:
2842  target = cast(
2843  Callable[..., Coroutine[Any, Any, ServiceResponse]], target
2844  )
2845  return await target(service_call)
2846  if job.job_type is HassJobType.Callback:
2847  if TYPE_CHECKING:
2848  target = cast(Callable[..., ServiceResponse], target)
2849  return target(service_call)
2850  if TYPE_CHECKING:
2851  target = cast(Callable[..., ServiceResponse], target)
2852  return await self._hass_hass.async_add_executor_job(target, service_call)
2853 
2854 
# These can be removed if no deprecated constants are in this module anymore.
# PEP 562 module-level hooks: deprecated constants resolve with a warning
# instead of breaking imports outright.
__getattr__ = functools.partial(check_if_deprecated_constant, module_globals=globals())
__dir__ = functools.partial(
    dir_with_deprecated_constants, module_globals_keys=[*globals().keys()]
)
__all__ = all_with_deprecated_constants(globals())
json_fragment json_fragment(self)
Definition: core.py:1309
ReadOnlyDict[str, str|None] _as_read_only_dict(self)
Definition: core.py:1304
None __init__(self, str|None user_id=None, str|None parent_id=None, str|None id=None)
Definition: core.py:1266
bool __eq__(self, object other)
Definition: core.py:1274
ReadOnlyDict[str, str|None] as_dict(self)
Definition: core.py:1299
dict[str, str|None] _as_dict(self)
Definition: core.py:1287
Context __deepcopy__(self, dict[int, Any] memo)
Definition: core.py:1282
Context __copy__(self)
Definition: core.py:1278
CALLBACK_TYPE async_listen(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener, Callable[[_DataT], bool]|None event_filter=None, bool|object run_immediately=_SENTINEL)
Definition: core.py:1618
None async_fire(self, EventType[_DataT]|str event_type, _DataT|None event_data=None, EventOrigin origin=EventOrigin.local, Context|None context=None, float|None time_fired=None)
Definition: core.py:1524
CALLBACK_TYPE _async_listen_filterable_job(self, EventType[_DataT]|str event_type, _FilterableJobType[_DataT] filterable_job)
Definition: core.py:1659
None _async_remove_listener(self, EventType[_DataT]|str event_type, _FilterableJobType[_DataT] filterable_job)
Definition: core.py:1736
None async_fire_internal(self, EventType[_DataT]|str event_type, _DataT|None event_data=None, EventOrigin origin=EventOrigin.local, Context|None context=None, float|None time_fired=None)
Definition: core.py:1546
CALLBACK_TYPE async_listen_once(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener, bool|object run_immediately=_SENTINEL)
Definition: core.py:1694
None __init__(self, HomeAssistant hass)
Definition: core.py:1474
None fire(self, EventType[_DataT]|str event_type, _DataT|None event_data=None, EventOrigin origin=EventOrigin.local, Context|None context=None)
Definition: core.py:1509
None _async_logging_changed(self, Event|None event=None)
Definition: core.py:1486
dict[EventType[Any]|str, int] listeners(self)
Definition: core.py:1499
dict[EventType[Any]|str, int] async_listeners(self)
Definition: core.py:1491
CALLBACK_TYPE listen_once(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener)
Definition: core.py:1670
CALLBACK_TYPE listen(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener)
Definition: core.py:1595
ReadOnlyDict[str, Any] as_dict(self)
Definition: core.py:1385
datetime.datetime time_fired(self)
Definition: core.py:1363
str __repr__(self)
Definition: core.py:1413
None __init__(self, EventType[_DataT]|str event_type, _DataT|None data=None, EventOrigin origin=EventOrigin.local, float|None time_fired_timestamp=None, Context|None context=None)
Definition: core.py:1349
ReadOnlyDict[str, Any] _as_read_only_dict(self)
Definition: core.py:1393
json_fragment json_fragment(self)
Definition: core.py:1409
dict[str, Any] _as_dict(self)
Definition: core.py:1368
None __init__(self, Callable[_P, _R_co] target, str|None name=None, *bool|None cancel_on_shutdown=None, HassJobType|None job_type=None)
Definition: core.py:353
HassJobType job_type(self)
Definition: core.py:365
bool|None cancel_on_shutdown(self)
Definition: core.py:370
Self __new__(cls, str config_dir)
Definition: core.py:425
None _async_log_running_tasks(self, str stage)
Definition: core.py:1250
CALLBACK_TYPE async_add_shutdown_job(self, HassJob[..., Coroutine[Any, Any, Any]] hassjob, *Any args)
Definition: core.py:1058
int async_run(self, *bool attach_signals=True)
Definition: core.py:521
None async_block_till_done(self, bool wait_background_tasks=False)
Definition: core.py:1009
None _await_and_log_pending(self, Collection[asyncio.Future[Any]] pending)
Definition: core.py:1043
set[asyncio.Future[Any]] _active_tasks(self)
Definition: core.py:477
None __init__(self, str config_dir)
Definition: core.py:435
None set_state(self, CoreState state)
Definition: core.py:498
None _cancel_cancellable_timers(self)
Definition: core.py:1239
None block_till_done(self, bool wait_background_tasks=False)
Definition: core.py:1002
None async_stop(self, int exit_code=0, *bool force=False)
Definition: core.py:1098
None create_task(self, Coroutine[Any, Any, Any] target, str|None name=None)
Definition: core.py:790
None verify_event_loop_thread(self, str what)
Definition: core.py:468
None __init__(self, str domain, str service, dict[str, Any]|None data=None, Context|None context=None, bool return_response=False)
Definition: core.py:2453
None register(self, str domain, str service, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse]|ServiceResponse|None,] service_func, vol.Schema|None schema=None, SupportsResponse supports_response=SupportsResponse.NONE)
Definition: core.py:2549
dict[str, dict[str, Service]] async_services_internal(self)
Definition: core.py:2509
bool has_service(self, str domain, str service)
Definition: core.py:2521
None remove(self, str domain, str service)
Definition: core.py:2633
dict[str, dict[str, Service]] async_services(self)
Definition: core.py:2488
ServiceResponse call(self, str domain, str service, dict[str, Any]|None service_data=None, bool blocking=False, Context|None context=None, dict[str, Any]|None target=None, bool return_response=False)
Definition: core.py:2679
None _async_remove(self, str domain, str service)
Definition: core.py:2649
None _async_register(self, str domain, str service, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse|EntityServiceResponse]|ServiceResponse|EntityServiceResponse|None,] service_func, VolSchemaType|None schema=None, SupportsResponse supports_response=SupportsResponse.NONE, HassJobType|None job_type=None)
Definition: core.py:2606
dict[str, Service] async_services_for_domain(self, str domain)
Definition: core.py:2499
SupportsResponse supports_response(self, str domain, str service)
Definition: core.py:2528
None async_register(self, str domain, str service, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse|EntityServiceResponse]|ServiceResponse|EntityServiceResponse|None,] service_func, VolSchemaType|None schema=None, SupportsResponse supports_response=SupportsResponse.NONE, HassJobType|None job_type=None)
Definition: core.py:2579
None _run_service_call_catch_exceptions(self, Coroutine[Any, Any, Any]|asyncio.Task[Any] coro_or_task, ServiceCall service_call)
Definition: core.py:2819
None async_remove(self, str domain, str service)
Definition: core.py:2640
ServiceResponse _execute_service(self, Service handler, ServiceCall service_call)
Definition: core.py:2836
ServiceResponse async_call(self, str domain, str service, dict[str, Any]|None service_data=None, bool blocking=False, Context|None context=None, dict[str, Any]|None target=None, bool return_response=False)
Definition: core.py:2706
None __init__(self, HomeAssistant hass)
Definition: core.py:2477
dict[str, dict[str, Service]] services(self)
Definition: core.py:2483
None __init__(self, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse|EntityServiceResponse]|ServiceResponse|EntityServiceResponse|None,] func, VolSchemaType|None schema, str domain, str service, Context|None context=None, SupportsResponse supports_response=SupportsResponse.NONE, HassJobType|None job_type=None)
Definition: core.py:2434
list[str] async_entity_ids(self, str|Iterable[str]|None domain_filter=None)
Definition: core.py:2101
None __init__(self, EventBus bus, asyncio.events.AbstractEventLoop loop)
Definition: core.py:2081
bool is_state(self, str entity_id, str state)
Definition: core.py:2169
None async_reserve(self, str entity_id)
Definition: core.py:2240
bool async_remove(self, str entity_id, Context|None context=None)
Definition: core.py:2187
None set(self, str entity_id, str new_state, Mapping[str, Any]|None attributes=None, bool force_update=False, Context|None context=None)
Definition: core.py:2221
None async_set_internal(self, str entity_id, str new_state, Mapping[str, Any]|None attributes, bool force_update, Context|None context, StateInfo|None state_info, float timestamp)
Definition: core.py:2305
None async_set(self, str entity_id, str new_state, Mapping[str, Any]|None attributes=None, bool force_update=False, Context|None context=None, StateInfo|None state_info=None, float|None timestamp=None)
Definition: core.py:2275
State|None get(self, str entity_id)
Definition: core.py:2160
list[State] all(self, str|Iterable[str]|None domain_filter=None)
Definition: core.py:2135
list[State] async_all(self, str|Iterable[str]|None domain_filter=None)
Definition: core.py:2144
bool async_available(self, str entity_id)
Definition: core.py:2258
bool remove(self, str entity_id)
Definition: core.py:2177
list[str] entity_ids(self, str|None domain_filter=None)
Definition: core.py:2091
int async_entity_ids_count(self, str|Iterable[str]|None domain_filter=None)
Definition: core.py:2120
bytes as_compressed_state_json(self)
Definition: core.py:1961
Self|None from_dict(cls, dict[str, Any] json_dict)
Definition: core.py:1971
CompressedState as_compressed_state(self)
Definition: core.py:1933
ReadOnlyDict[str, datetime.datetime|Collection[Any]] as_dict(self)
Definition: core.py:1897
json_fragment json_fragment(self)
Definition: core.py:1928
ReadOnlyDict[str, datetime.datetime|Collection[Any]] _as_read_only_dict(self)
Definition: core.py:1910
None __init__(self, str entity_id, str state, Mapping[str, Any]|None attributes=None, datetime.datetime|None last_changed=None, datetime.datetime|None last_reported=None, datetime.datetime|None last_updated=None, Context|None context=None, bool|None validate_entity_id=True, StateInfo|None state_info=None, float|None last_updated_timestamp=None)
Definition: core.py:1806
float last_reported_timestamp(self)
Definition: core.py:1862
float last_changed_timestamp(self)
Definition: core.py:1857
bytes as_dict_json(self)
Definition: core.py:1923
None expire(self)
Definition: core.py:2006
dict[str, Any] _as_dict(self)
Definition: core.py:1867
str __repr__(self)
Definition: core.py:2024
ValuesView[State]|tuple[()] domain_states(self, str key)
Definition: core.py:2068
ValuesView[State] values(self)
Definition: core.py:2046
None __setitem__(self, str key, State entry)
Definition: core.py:2050
None __init__(self)
Definition: core.py:2041
KeysView[str]|tuple[()] domain_entity_ids(self, str key)
Definition: core.py:2061
None __delitem__(self, str key)
Definition: core.py:2055
None __call__(self, Event[_DataT] event)
Definition: core.py:1441
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
str _event_repr(EventType[_DataT]|str event_type, EventOrigin origin, _DataT|None data)
Definition: core.py:1420
bool is_callback_check_partial(Callable[..., Any] target)
Definition: core.py:264
bool valid_entity_id(str entity_id)
Definition: core.py:235
HomeAssistant|None async_get_hass_or_none()
Definition: core.py:299
None _verify_event_type_length_or_raise(EventType[_DataT]|str event_type)
Definition: core.py:1463
Any _deprecated_core_config()
Definition: core.py:188
HomeAssistant async_get_hass()
Definition: core.py:286
HassJobType get_hassjob_callable_job_type(Callable[..., Any] target)
Definition: core.py:387
bool valid_domain(str domain)
Definition: core.py:229
tuple[str, str] split_entity_id(str entity_id)
Definition: core.py:214
str validate_state(str state)
Definition: core.py:243
bool is_callback(Callable[..., Any] func)
Definition: core.py:259
ReleaseChannel get_release_channel()
Definition: core.py:315
list[str] all_with_deprecated_constants(dict[str, Any] module_globals)
Definition: deprecation.py:356
None async_register_signal_handling(HomeAssistant hass)
Definition: signal.py:19
list[TimerHandle] get_scheduled_timer_handles(AbstractEventLoop loop)
Definition: async_.py:137
bool cancelling(Future[Any] task)
Definition: async_.py:48
None shutdown_run_callback_threadsafe(AbstractEventLoop loop)
Definition: async_.py:120