1 """Core components of Home Assistant.
3 Home Assistant is a Home Automation framework for observing the state
of entities and reacting to changes.
7 from __future__
import annotations
10 from collections
import UserDict, defaultdict
11 from collections.abc
import (
20 import concurrent.futures
21 from dataclasses
import dataclass
30 from time
import monotonic
43 from propcache
import cached_property, under_cached_property
44 from typing_extensions
import TypeVar
45 import voluptuous
as vol
53 COMPRESSED_STATE_ATTRIBUTES,
54 COMPRESSED_STATE_CONTEXT,
55 COMPRESSED_STATE_LAST_CHANGED,
56 COMPRESSED_STATE_LAST_UPDATED,
57 COMPRESSED_STATE_STATE,
59 EVENT_CORE_CONFIG_UPDATE,
60 EVENT_HOMEASSISTANT_CLOSE,
61 EVENT_HOMEASSISTANT_FINAL_WRITE,
62 EVENT_HOMEASSISTANT_START,
63 EVENT_HOMEASSISTANT_STARTED,
64 EVENT_HOMEASSISTANT_STOP,
65 EVENT_LOGGING_CHANGED,
66 EVENT_SERVICE_REGISTERED,
67 EVENT_SERVICE_REMOVED,
71 MAX_EXPECTED_ENTITY_IDS,
72 MAX_LENGTH_EVENT_EVENT_TYPE,
73 MAX_LENGTH_STATE_STATE,
76 from .exceptions
import (
78 InvalidEntityFormatError,
82 ServiceValidationError,
85 from .helpers.deprecation
import (
86 DeferredDeprecatedAlias,
87 DeprecatedConstantEnum,
88 EnumWithDeprecatedMembers,
89 all_with_deprecated_constants,
90 check_if_deprecated_constant,
91 dir_with_deprecated_constants,
93 from .helpers.json
import json_bytes, json_fragment
94 from .helpers.typing
import VolSchemaType
95 from .util
import dt
as dt_util
96 from .util.async_
import (
99 get_scheduled_timer_handles,
100 run_callback_threadsafe,
101 shutdown_run_callback_threadsafe,
103 from .util.event_type
import EventType
104 from .util.executor
import InterruptibleThreadPoolExecutor
105 from .util.hass_dict
import HassDict
106 from .util.json
import JsonObjectType
107 from .util.read_only_dict
import ReadOnlyDict
108 from .util.timeout
import TimeoutManager
109 from .util.ulid
import ulid_at_time, ulid_now
113 from .auth
import AuthManager
114 from .components.http
import HomeAssistantHTTP
115 from .config_entries
import ConfigEntries
116 from .helpers.entity
import StateInfo
# Per-stage limits passed to ``self.timeout.async_timeout`` while stopping.
# NOTE(review): presumably seconds — confirm against TimeoutManager.

# Stage 0: waiting for the registered shutdown jobs to complete.
STOPPING_STAGE_SHUTDOWN_TIMEOUT = 20
# Stage 1: waiting for integrations after EVENT_HOMEASSISTANT_STOP is fired.
STOP_STAGE_SHUTDOWN_TIMEOUT = 100
# Stage 2: waiting for final writes after EVENT_HOMEASSISTANT_FINAL_WRITE.
FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT = 60
# Stage 3: waiting for EVENT_HOMEASSISTANT_CLOSE to be processed.
CLOSE_STAGE_SHUTDOWN_TIMEOUT = 30
125 _DataT = TypeVar(
"_DataT", bound=Mapping[str, Any], default=Mapping[str, Any])
126 type CALLBACK_TYPE = Callable[[],
None]
128 DOMAIN =
"homeassistant"
131 BLOCK_LOG_TIMEOUT = 60
133 type ServiceResponse = JsonObjectType |
None
134 type EntityServiceResponse = dict[str, ServiceResponse]
139 metaclass=EnumWithDeprecatedMembers,
141 "DEFAULT": (
"core_config.ConfigSource.DEFAULT",
"2025.11.0"),
142 "DISCOVERED": (
"core_config.ConfigSource.DISCOVERED",
"2025.11.0"),
143 "STORAGE": (
"core_config.ConfigSource.STORAGE",
"2025.11.0"),
144 "YAML": (
"core_config.ConfigSource.YAML",
"2025.11.0"),
147 """Source of core configuration."""
150 DISCOVERED =
"discovered"
156 """Base class for EVENT_STATE_CHANGED and EVENT_STATE_REPORTED data."""
159 new_state: State |
None
163 """EVENT_STATE_CHANGED data.
165 A state changed event is fired when on state write the state is changed.
168 old_state: State |
None
172 """EVENT_STATE_REPORTED data.
174 A state reported event is fired when on state write the state is unchanged.
177 old_last_reported: datetime.datetime
182 ConfigSource.DISCOVERED,
"2025.1"
190 from .
import core_config
197 _deprecated_core_config,
"homeassistant.core_config.Config",
"2025.11"
# Seconds to wait for pending startup tasks before logging a warning and
# continuing with startup anyway.
TIMEOUT_EVENT_START = 15

# Event types that are not delivered to MATCH_ALL listeners when fired.
EVENTS_EXCLUDED_FROM_MATCH_ALL = {
    EVENT_HOMEASSISTANT_CLOSE,
    EVENT_STATE_REPORTED,
}

_LOGGER = logging.getLogger(__name__)
213 @functools.lru_cache(MAX_EXPECTED_ENTITY_IDS)
215 """Split a state entity ID into domain and object ID."""
216 domain, _, object_id = entity_id.partition(
".")
217 if not domain
or not object_id:
218 raise ValueError(f
"Invalid entity ID {entity_id}")
219 return domain, object_id
# Slug made of digits, lowercase ASCII letters and underscores that neither
# starts nor ends with an underscore.
_OBJECT_ID = r"(?!_)[\da-z_]+(?<!_)"
# Same slug, preceded by a lookahead that rejects a double underscore found
# anywhere after the first character of the remaining input.
_DOMAIN = r"(?!.+__)" + _OBJECT_ID
# Whole-string matchers for "<domain>" and "<domain>.<object_id>".
VALID_DOMAIN = re.compile(rf"^{_DOMAIN}$")
VALID_ENTITY_ID = re.compile(rf"^{_DOMAIN}\.{_OBJECT_ID}$")
228 @functools.lru_cache(64)
"""Test if a domain is a valid format."""
231 return VALID_DOMAIN.match(domain)
is not None
234 @functools.lru_cache(512)
236 """Test if an entity ID is a valid format.
238 Format: <domain>.<entity> where both are slugs.
240 return VALID_ENTITY_ID.match(entity_id)
is not None
"""Validate a state, raise if it is not valid."""
245 if len(state) > MAX_LENGTH_STATE_STATE:
247 f
"Invalid state with length {len(state)}. "
248 "State max length is 255 characters."
253 def callback[_CallableT: Callable[..., Any]](func: _CallableT) -> _CallableT:
254 """Annotation to mark method as safe to call from within the event loop."""
255 setattr(func,
"_hass_callback",
True)
260 """Check if function is safe to be called in the event loop."""
261 return getattr(func,
"_hass_callback",
False)
is True
265 """Check if function is safe to be called in the event loop.
267 This version of is_callback will also check if the target is a partial
268 and walk the chain of partials to find the original function.
270 check_target = target
271 while isinstance(check_target, functools.partial):
272 check_target = check_target.func
277 """Container which makes a HomeAssistant instance available to the event loop."""
279 hass: HomeAssistant |
None =
None
287 """Return the HomeAssistant instance.
289 Raises HomeAssistantError when called from the wrong thread.
291 This should be used where it's very cumbersome or downright impossible to pass
292 hass to the code which needs it.
300 """Return the HomeAssistant instance or None.
302 Returns None when called from the wrong thread.
316 """Find release channel based on version number."""
317 version = __version__
318 if "dev0" in version:
319 return ReleaseChannel.DEV
321 return ReleaseChannel.NIGHTLY
323 return ReleaseChannel.BETA
324 return ReleaseChannel.STABLE
329 """Represent a job type."""
331 Coroutinefunction = 1
337 """Represent a job to be run later.
339 We check the callable type in advance
340 so we can avoid checking it every time
344 __slots__ = (
"target",
"name",
"_cancel_on_shutdown",
"_cache")
348 target: Callable[_P, _R_co],
349 name: str |
None =
None,
351 cancel_on_shutdown: bool |
None =
None,
352 job_type: HassJobType |
None =
None,
354 """Create a job object."""
355 self.target: Final = target
358 self._cache: dict[str, Any] = {}
362 self._cache[
"job_type"] = job_type
364 @under_cached_property
366 """Return the job type."""
371 """Return if the job should be cancelled on shutdown."""
375 """Return the job."""
376 return f
"<Job {self.name} {self.job_type} {self.target}>"
379 @dataclass(frozen=True)
381 """Container for a HassJob and arguments."""
383 job: HassJob[..., Coroutine[Any, Any, Any] | Any]
388 """Determine the job type from the callable."""
390 check_target = target
391 while isinstance(check_target, functools.partial):
392 check_target = check_target.func
394 if asyncio.iscoroutinefunction(check_target):
395 return HassJobType.Coroutinefunction
397 return HassJobType.Callback
398 if asyncio.iscoroutine(check_target):
399 raise ValueError(
"Coroutine not allowed to be passed to HassJob")
400 return HassJobType.Executor
404 """Represent the current state of Home Assistant."""
406 not_running =
"NOT_RUNNING"
407 starting =
"STARTING"
409 stopping =
"STOPPING"
410 final_write =
"FINAL_WRITE"
414 """Return the event."""
419 """Root object of the Home Assistant home automation."""
422 http: HomeAssistantHTTP =
None
423 config_entries: ConfigEntries =
None
426 """Set the _hass thread local data."""
432 """Return the representation."""
433 return f
"<HomeAssistant {self.state}>"
436 """Initialize new Home Assistant object."""
441 from .core_config
import Config
445 self.
looploop = asyncio.get_running_loop()
446 self.
_tasks_tasks: set[asyncio.Future[Any]] = set()
447 self._background_tasks: set[asyncio.Future[Any]] = set()
451 self.
configconfig = Config(self, config_dir)
452 self.
configconfig.async_initialize()
455 self.
statestate: CoreState = CoreState.not_running
458 self.
_stopped_stopped: asyncio.Event |
None =
None
461 self.
_stop_future_stop_future: concurrent.futures.Future[
None] |
None =
None
462 self._shutdown_jobs: list[HassJobWithArgs] = []
464 max_workers=1, thread_name_prefix=
"ImportExecutor"
469 """Report and raise if we are not running in the event loop thread."""
472 from .helpers
import frame
474 frame.report_non_thread_safe_operation(what)
478 """Return all active tasks.
480 This property is used in bootstrap to log all active tasks
481 so we can identify what is blocking startup.
483 This property is marked as private to avoid accidental use
484 as it is not guaranteed to be present in future versions.
490 """Return if Home Assistant is running."""
491 return self.
statestate
in (CoreState.starting, CoreState.running)
495 """Return if Home Assistant is stopping."""
496 return self.
statestate
in (CoreState.stopping, CoreState.final_write)
499 """Set the current state."""
501 for prop
in (
"is_running",
"is_stopping"):
502 self.__dict__.pop(prop,
None)
505 """Start Home Assistant.
507 Note: This function is only used for testing.
508 For regular use, use "await hass.run()".
511 _future = asyncio.run_coroutine_threadsafe(self.
async_startasync_start(), self.
looploop)
514 _LOGGER.info(
"Starting Home Assistant core loop")
515 self.
looploop.run_forever()
521 async
def async_run(self, *, attach_signals: bool =
True) -> int:
522 """Home Assistant main entry point.
524 Start Home Assistant and block until stopped.
526 This method is a coroutine.
528 if self.
statestate
is not CoreState.not_running:
529 raise RuntimeError(
"Home Assistant is already running")
537 from .helpers.signal
import async_register_signal_handling
545 """Finalize startup from inside the event loop.
547 This method is a coroutine.
549 _LOGGER.info(
"Starting Home Assistant")
551 self.
set_stateset_state(CoreState.starting)
552 self.
busbus.async_fire_internal(EVENT_CORE_CONFIG_UPDATE)
553 self.
busbus.async_fire_internal(EVENT_HOMEASSISTANT_START)
556 pending: set[asyncio.Future[Any]] |
None =
None
558 _done, pending = await asyncio.wait(
559 self.
_tasks_tasks, timeout=TIMEOUT_EVENT_START
565 "Something is blocking Home Assistant from wrapping up the start up"
566 " phase. We're going to continue anyway. Please report the"
568 " https://github.com/home-assistant/core/issues: %s"
569 " The system is waiting for tasks: %s"
571 ", ".join(self.
configconfig.components),
576 await asyncio.sleep(0)
578 if self.
statestate
is not CoreState.starting:
580 "Home Assistant startup has been interrupted. "
581 "Its state may be inconsistent"
585 self.
set_stateset_state(CoreState.running)
586 self.
busbus.async_fire_internal(EVENT_CORE_CONFIG_UPDATE)
587 self.
busbus.async_fire_internal(EVENT_HOMEASSISTANT_STARTED)
590 self, target: Callable[[*_Ts], Any] | Coroutine[Any, Any, Any], *args: *_Ts
592 """Add a job to be executed by the event loop or by an executor.
594 If the job is either a coroutine or decorated with @callback, it will be
595 run by the event loop, if not it will be run by an executor.
597 target: target to call.
598 args: parameters for method to call.
601 raise ValueError(
"Don't call add_job with None")
602 if asyncio.iscoroutine(target):
603 self.
looploop.call_soon_threadsafe(
604 functools.partial(self.async_create_task, target, eager_start=
True)
607 self.
looploop.call_soon_threadsafe(
608 functools.partial(self._async_add_hass_job,
HassJob(target), *args)
613 def async_add_job[_R, *_Ts](
615 target: Callable[[*_Ts], Coroutine[Any, Any, _R]],
617 eager_start: bool =
False,
618 ) -> asyncio.Future[_R] |
None: ...
622 def async_add_job[_R, *_Ts](
624 target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R],
626 eager_start: bool =
False,
627 ) -> asyncio.Future[_R] |
None: ...
631 def async_add_job[_R](
633 target: Coroutine[Any, Any, _R],
635 eager_start: bool =
False,
636 ) -> asyncio.Future[_R] |
None: ...
639 def async_add_job[_R, *_Ts](
641 target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R]
642 | Coroutine[Any, Any, _R],
644 eager_start: bool =
False,
645 ) -> asyncio.Future[_R] |
None:
646 """Add a job to be executed by the event loop or by an executor.
648 If the job is either a coroutine or decorated with @callback, it will be
649 run by the event loop, if not it will be run by an executor.
651 This method must be run in the event loop.
653 target: target to call.
654 args: parameters for method to call.
657 from .helpers
import frame
660 "calls `async_add_job`, which should be reviewed against "
661 "https://developers.home-assistant.io/blog/2024/03/13/deprecate_add_run_job"
662 " for replacement options",
663 core_behavior=frame.ReportBehavior.LOG,
664 breaks_in_ha_version=
"2025.4",
668 raise ValueError(
"Don't call async_add_job with None")
670 if asyncio.iscoroutine(target):
671 return self.async_create_task(target, eager_start=eager_start)
673 return self._async_add_hass_job(
HassJob(target), *args)
677 def async_add_hass_job[_R](
679 hassjob: HassJob[..., Coroutine[Any, Any, _R]],
681 eager_start: bool =
False,
682 background: bool =
False,
683 ) -> asyncio.Future[_R] |
None: ...
687 def async_add_hass_job[_R](
689 hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
691 eager_start: bool =
False,
692 background: bool =
False,
693 ) -> asyncio.Future[_R] |
None: ...
696 def async_add_hass_job[_R](
698 hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
700 eager_start: bool =
False,
701 background: bool =
False,
702 ) -> asyncio.Future[_R] |
None:
703 """Add a HassJob from within the event loop.
705 If eager_start is True, coroutine functions will be scheduled eagerly.
If background is True, the task will be created as a background task.
708 This method must be run in the event loop.
709 hassjob: HassJob to call.
710 args: parameters for method to call.
713 from .helpers
import frame
716 "calls `async_add_hass_job`, which should be reviewed against "
717 "https://developers.home-assistant.io/blog/2024/04/07/deprecate_add_hass_job"
718 " for replacement options",
719 core_behavior=frame.ReportBehavior.LOG,
720 breaks_in_ha_version=
"2025.5",
723 return self._async_add_hass_job(hassjob, *args, background=background)
727 def _async_add_hass_job[_R](
729 hassjob: HassJob[..., Coroutine[Any, Any, _R]],
731 background: bool =
False,
732 ) -> asyncio.Future[_R] |
None: ...
736 def _async_add_hass_job[_R](
738 hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
740 background: bool =
False,
741 ) -> asyncio.Future[_R] |
None: ...
744 def _async_add_hass_job[_R](
746 hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
748 background: bool =
False,
749 ) -> asyncio.Future[_R] |
None:
750 """Add a HassJob from within the event loop.
752 If eager_start is True, coroutine functions will be scheduled eagerly.
If background is True, the task will be created as a background task.
755 This method must be run in the event loop.
756 hassjob: HassJob to call.
757 args: parameters for method to call.
759 task: asyncio.Future[_R]
764 if hassjob.job_type
is HassJobType.Coroutinefunction:
766 hassjob = cast(HassJob[..., Coroutine[Any, Any, _R]], hassjob)
767 task = create_eager_task(
768 hassjob.target(*args), name=hassjob.name, loop=self.
looploop
772 elif hassjob.job_type
is HassJobType.Callback:
774 hassjob = cast(HassJob[..., _R], hassjob)
775 self.
looploop.call_soon(hassjob.target, *args)
779 hassjob = cast(HassJob[..., _R], hassjob)
780 task = self.
looploop.run_in_executor(
None, hassjob.target, *args)
782 task_bucket = self._background_tasks
if background
else self.
_tasks_tasks
783 task_bucket.add(task)
784 task.add_done_callback(task_bucket.remove)
789 self, target: Coroutine[Any, Any, Any], name: str |
None =
None
791 """Add task to the executor pool.
793 target: target to call.
795 self.
looploop.call_soon_threadsafe(
797 self.async_create_task_internal, target, name, eager_start=
True
802 def async_create_task[_R](
804 target: Coroutine[Any, Any, _R],
805 name: str |
None =
None,
806 eager_start: bool =
True,
807 ) -> asyncio.Task[_R]:
808 """Create a task from within the event loop.
810 This method must be run in the event loop. If you are using this in your
811 integration, use the create task methods on the config entry instead.
813 target: target to call.
816 from .helpers
import frame
818 frame.report_non_thread_safe_operation(
"hass.async_create_task")
819 return self.async_create_task_internal(target, name, eager_start)
822 def async_create_task_internal[_R](
824 target: Coroutine[Any, Any, _R],
825 name: str |
None =
None,
826 eager_start: bool =
True,
827 ) -> asyncio.Task[_R]:
828 """Create a task from within the event loop, internal use only.
830 This method is intended to only be used by core internally
831 and should not be considered a stable API. We will make
832 breaking changes to this function in the future and it
833 should not be used in integrations.
835 This method must be run in the event loop. If you are using this in your
836 integration, use the create task methods on the config entry instead.
838 target: target to call.
841 task = create_eager_task(target, name=name, loop=self.
looploop)
849 task.add_done_callback(self.
_tasks_tasks.remove)
853 def async_create_background_task[_R](
854 self, target: Coroutine[Any, Any, _R], name: str, eager_start: bool =
True
855 ) -> asyncio.Task[_R]:
856 """Create a task from within the event loop.
858 This type of task is for background tasks that usually run for
859 the lifetime of Home Assistant or an integration's setup.
861 A background task is different from a normal task:
863 - Will not block startup
864 - Will be automatically cancelled on shutdown
865 - Calls to async_block_till_done will not wait for completion
867 If you are using this in your integration, use the create task
868 methods on the config entry instead.
870 This method must be run in the event loop.
873 task = create_eager_task(target, name=name, loop=self.
looploop)
880 self._background_tasks.
add(task)
881 task.add_done_callback(self._background_tasks.remove)
885 def async_add_executor_job[*_Ts, _T](
886 self, target: Callable[[*_Ts], _T], *args: *_Ts
887 ) -> asyncio.Future[_T]:
888 """Add an executor job from within the event loop."""
889 task = self.
looploop.run_in_executor(
None, target, *args)
891 tracked = asyncio.current_task()
in self.
_tasks_tasks
892 task_bucket = self.
_tasks_tasks
if tracked
else self._background_tasks
893 task_bucket.add(task)
894 task.add_done_callback(task_bucket.remove)
899 def async_add_import_executor_job[*_Ts, _T](
900 self, target: Callable[[*_Ts], _T], *args: *_Ts
901 ) -> asyncio.Future[_T]:
902 """Add an import executor job from within the event loop.
904 The future returned from this method must be awaited in the event loop.
910 def async_run_hass_job[_R](
912 hassjob: HassJob[..., Coroutine[Any, Any, _R]],
914 background: bool =
False,
915 ) -> asyncio.Future[_R] |
None: ...
919 def async_run_hass_job[_R](
921 hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
923 background: bool =
False,
924 ) -> asyncio.Future[_R] |
None: ...
927 def async_run_hass_job[_R](
929 hassjob: HassJob[..., Coroutine[Any, Any, _R] | _R],
931 background: bool =
False,
932 ) -> asyncio.Future[_R] |
None:
933 """Run a HassJob from within the event loop.
935 This method must be run in the event loop.
If background is True, the task will be created as a background task.
940 args: parameters for method to call.
946 if hassjob.job_type
is HassJobType.Callback:
948 hassjob = cast(HassJob[..., _R], hassjob)
949 hassjob.target(*args)
952 return self._async_add_hass_job(hassjob, *args, background=background)
956 def async_run_job[_R, *_Ts](
957 self, target: Callable[[*_Ts], Coroutine[Any, Any, _R]], *args: *_Ts
958 ) -> asyncio.Future[_R] |
None: ...
962 def async_run_job[_R, *_Ts](
963 self, target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R], *args: *_Ts
964 ) -> asyncio.Future[_R] |
None: ...
968 def async_run_job[_R](
969 self, target: Coroutine[Any, Any, _R], *args: Any
970 ) -> asyncio.Future[_R] |
None: ...
973 def async_run_job[_R, *_Ts](
975 target: Callable[[*_Ts], Coroutine[Any, Any, _R] | _R]
976 | Coroutine[Any, Any, _R],
978 ) -> asyncio.Future[_R] |
None:
979 """Run a job from within the event loop.
981 This method must be run in the event loop.
983 target: target to call.
984 args: parameters for method to call.
987 from .helpers
import frame
990 "calls `async_run_job`, which should be reviewed against "
991 "https://developers.home-assistant.io/blog/2024/03/13/deprecate_add_run_job"
992 " for replacement options",
993 core_behavior=frame.ReportBehavior.LOG,
994 breaks_in_ha_version=
"2025.4",
997 if asyncio.iscoroutine(target):
998 return self.async_create_task(target, eager_start=
True)
1000 return self.async_run_hass_job(
HassJob(target), *args)
1003 """Block until all pending work is done."""
1004 asyncio.run_coroutine_threadsafe(
1010 """Block until all pending work is done."""
1012 await asyncio.sleep(0)
1013 start_time: float |
None =
None
1014 current_task = asyncio.current_task()
1018 self.
_tasks_tasks | self._background_tasks
1019 if wait_background_tasks
1022 if task
is not current_task
and not cancelling(task)
1026 if start_time
is None:
1030 elif start_time == 0:
1033 start_time = monotonic()
1034 elif monotonic() - start_time > BLOCK_LOG_TIMEOUT:
1039 _LOGGER.debug(
"Waiting for task: %s", task)
1042 self, pending: Collection[asyncio.Future[Any]]
1044 """Await and log tasks that take a long time."""
1047 _, pending = await asyncio.wait(pending, timeout=BLOCK_LOG_TIMEOUT)
1050 wait_time += BLOCK_LOG_TIMEOUT
1051 for task
in pending:
1052 _LOGGER.debug(
"Waited %s seconds for task: %s", wait_time, task)
1057 self, hassjob: HassJob[..., Coroutine[Any, Any, Any]], *args: Any
1058 ) -> CALLBACK_TYPE: ...
1063 self, hassjob: HassJob[..., Coroutine[Any, Any, Any] | Any], *args: Any
1064 ) -> CALLBACK_TYPE: ...
1068 self, hassjob: HassJob[..., Coroutine[Any, Any, Any] | Any], *args: Any
1070 """Add a HassJob which will be executed on shutdown.
1072 This method must be run in the event loop.
1075 args: parameters for method to call.
1077 Returns function to remove the job.
1080 self._shutdown_jobs.append(job_with_args)
1083 def remove_job() -> None:
1084 self._shutdown_jobs.
remove(job_with_args)
1089 """Stop Home Assistant and shuts down all threads."""
1090 if self.
statestate
is CoreState.not_running:
1098 async
def async_stop(self, exit_code: int = 0, *, force: bool =
False) ->
None:
1099 """Stop Home Assistant and shuts down all threads.
1101 The "force" flag commands async_stop to proceed regardless of
1102 Home Assistant's current state. You should not set this flag
1103 unless you're testing.
1105 This method is a coroutine.
1110 if self.
statestate
is CoreState.not_running:
1112 if self.
statestate
in [CoreState.stopping, CoreState.final_write]:
1113 _LOGGER.info(
"Additional call to async_stop was ignored")
1115 if self.
statestate
is CoreState.starting:
1118 "Stopping Home Assistant before startup has completed may fail"
1123 async
with self.timeout.async_timeout(STOPPING_STAGE_SHUTDOWN_TIMEOUT):
1124 tasks: list[asyncio.Future[Any]] = []
1125 for job
in self._shutdown_jobs:
1126 task_or_none = self.async_run_hass_job(job.job, *job.args)
1127 if not task_or_none:
1129 tasks.append(task_or_none)
1131 await asyncio.gather(*tasks, return_exceptions=
True)
1132 except TimeoutError:
1134 "Timed out waiting for shutdown jobs to complete, the shutdown will"
1144 running_tasks = self.
_tasks_tasks
1150 for task
in self._background_tasks:
1152 task.add_done_callback(self.
_tasks_tasks.remove)
1153 task.cancel(
"Home Assistant is stopping")
1158 self.
set_stateset_state(CoreState.stopping)
1159 self.
busbus.async_fire_internal(EVENT_HOMEASSISTANT_STOP)
1161 async
with self.timeout.async_timeout(STOP_STAGE_SHUTDOWN_TIMEOUT):
1163 except TimeoutError:
1165 "Timed out waiting for integrations to stop, the shutdown will"
1171 self.
set_stateset_state(CoreState.final_write)
1172 self.
busbus.async_fire_internal(EVENT_HOMEASSISTANT_FINAL_WRITE)
1174 async
with self.timeout.async_timeout(FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT):
1176 except TimeoutError:
1178 "Timed out waiting for final writes to complete, the shutdown will"
1184 self.
set_stateset_state(CoreState.not_running)
1185 self.
busbus.async_fire_internal(EVENT_HOMEASSISTANT_CLOSE)
1190 for task
in list(running_tasks):
1197 "Task %s was still running after final writes shutdown stage; "
1198 "Integrations should cancel non-critical tasks when receiving "
1199 "the stop event to prevent delaying shutdown",
1202 task.cancel(
"Home Assistant final writes shutdown stage")
1204 async
with asyncio.timeout(0.1):
1206 except asyncio.CancelledError:
1208 except TimeoutError:
1211 "Task %s could not be canceled during final shutdown stage", task
1214 _LOGGER.exception(
"Task %s error during final shutdown stage", task)
1224 async
with self.timeout.async_timeout(CLOSE_STAGE_SHUTDOWN_TIMEOUT):
1226 except TimeoutError:
1228 "Timed out waiting for close event to be processed, the shutdown will"
1233 self.
set_stateset_state(CoreState.stopped)
1236 if self.
_stopped_stopped
is not None:
1240 """Cancel timer handles marked as cancellable."""
1243 not handle.cancelled()
1244 and (args := handle._args)
1245 and type(job := args[0])
is HassJob
1246 and job.cancel_on_shutdown
1251 """Log all running tasks."""
1252 for task
in self.
_tasks_tasks:
1253 _LOGGER.warning(
"Shutdown stage '%s': still running: %s", stage, task)
1257 """The context that triggered something."""
1259 __slots__ = (
"id",
"user_id",
"parent_id",
"origin_event",
"_cache")
1263 user_id: str |
None =
None,
1264 parent_id: str |
None =
None,
1265 id: str |
None =
None,
1267 """Init the context."""
1268 self.
idid = id
or ulid_now()
1271 self.origin_event: Event[Any] |
None =
None
1272 self._cache: dict[str, Any] = {}
1275 """Compare contexts."""
1276 return isinstance(other, Context)
and self.
idid == other.id
1279 """Create a shallow copy of this context."""
1283 """Create a deep copy of this context."""
1286 @under_cached_property
1288 """Return a dictionary representation of the context.
1290 Callers should be careful to not mutate the returned dictionary
1291 as it will mutate the cached version.
1296 "user_id": self.
user_iduser_id,
1299 def as_dict(self) -> ReadOnlyDict[str, str | None]:
1300 """Return a ReadOnlyDict representation of the context."""
1303 @under_cached_property
1305 """Return a ReadOnlyDict representation of the context."""
1308 @under_cached_property
1310 """Return a JSON fragment of the context."""
1315 """Represent the origin of an event."""
1321 """Return the event."""
1326 """Return the index of the origin."""
1327 return next((idx
for idx, origin
in enumerate(EventOrigin)
if origin
is self))
1331 """Representation of an event within the bus."""
1337 "time_fired_timestamp",
1344 event_type: EventType[_DataT] | str,
1345 data: _DataT |
None =
None,
1346 origin: EventOrigin = EventOrigin.local,
1347 time_fired_timestamp: float |
None =
None,
1348 context: Context |
None =
None,
1350 """Initialize a new event."""
1352 self.data: _DataT = data
or {}
1358 if not context.origin_event:
1359 context.origin_event = self
1360 self._cache: dict[str, Any] = {}
1362 @under_cached_property
1364 """Return time fired as a timestamp."""
1367 @under_cached_property
1369 """Create a dict representation of this Event.
1371 Callers should be careful to not mutate the returned dictionary
1372 as it will mutate the cached version.
1377 "origin": self.
originorigin.value,
1378 "time_fired": self.
time_firedtime_fired.isoformat(),
1382 "context": self.
contextcontext._as_dict,
1386 """Create a ReadOnlyDict representation of this Event.
1392 @under_cached_property
1394 """Create a ReadOnlyDict representation of this Event."""
1396 data = as_dict[
"data"]
1397 context = as_dict[
"context"]
1402 if type(data)
is not ReadOnlyDict:
1404 if type(context)
is not ReadOnlyDict:
1408 @under_cached_property
1410 """Return an event as a JSON fragment."""
1414 """Return the representation."""
1419 event_type: EventType[_DataT] | str, origin: EventOrigin, data: _DataT |
None
1421 """Return the representation."""
1423 return f
"<Event {event_type}[{str(origin)[0]}]: {util.repr_helper(data)}>"
1425 return f
"<Event {event_type}[{str(origin)[0]}]>"
# Entry stored per registered listener: the job to run for an event and an
# optional predicate on the event data (None means no filtering).
_FilterableJobType = tuple[
    HassJob[[Event[_DataT]], Coroutine[Any, Any, None] | None],
    Callable[[_DataT], bool] | None,
]
1434 @dataclass(slots=True)
1437 listener_job: HassJob[[Event[_DataT]], Coroutine[Any, Any,
None] |
None]
1438 remove: CALLBACK_TYPE |
None =
None
1442 """Remove listener from event bus and then fire listener."""
1443 if not self.
removeremove:
1448 self.hass.async_run_hass_job(self.listener_job, event)
1451 """Return the representation of the listener and source module."""
1452 module = inspect.getmodule(self.listener_job.target)
1454 return f
"<_OneTimeListener {module.__name__}:{self.listener_job.target}>"
1455 return f
"<_OneTimeListener {self.listener_job.target}>"
# Shared empty list used as the fallback when an event type has no listeners
# (and in place of the match-all listeners for excluded event types).
# It is a single module-level object, so treat it as read-only.
EMPTY_LIST: list[Any] = []
1462 @functools.lru_cache
1464 """Verify the length of the event type and raise if too long."""
1465 if len(event_type) > MAX_LENGTH_EVENT_EVENT_TYPE:
1470 """Allow the firing of and listening for events."""
1472 __slots__ = (
"_debug",
"_hass",
"_listeners",
"_match_all_listeners")
1475 """Initialize a new event bus."""
1476 self._listeners: defaultdict[
1477 EventType[Any] | str, list[_FilterableJobType[Any]]
1478 ] = defaultdict(list)
1479 self._match_all_listeners: list[_FilterableJobType[Any]] = []
1480 self._listeners[MATCH_ALL] = self._match_all_listeners
1487 """Handle logging change."""
1488 self.
_debug_debug = _LOGGER.isEnabledFor(logging.DEBUG)
1492 """Return dictionary with events and the number of listeners.
1494 This method must be run in the event loop.
1496 return {key: len(listeners)
for key, listeners
in self._listeners.items()}
1500 """Return dictionary with events and the number of listeners."""
1501 return run_callback_threadsafe(self.
_hass_hass.loop, self.
async_listenersasync_listeners).result()
1505 event_type: EventType[_DataT] | str,
1506 event_data: _DataT |
None =
None,
1507 origin: EventOrigin = EventOrigin.local,
1508 context: Context |
None =
None,
1510 """Fire an event."""
1512 self.
_hass_hass.loop.call_soon_threadsafe(
1519 event_type: EventType[_DataT] | str,
1520 event_data: _DataT |
None =
None,
1521 origin: EventOrigin = EventOrigin.local,
1522 context: Context |
None =
None,
1523 time_fired: float |
None =
None,
1527 This method must be run in the event loop.
1530 if self.
_hass_hass.loop_thread_id != threading.get_ident():
1531 from .helpers
import frame
1533 frame.report_non_thread_safe_operation(
"hass.bus.async_fire")
1535 event_type, event_data, origin, context, time_fired
1541 event_type: EventType[_DataT] | str,
1542 event_data: _DataT |
None =
None,
1543 origin: EventOrigin = EventOrigin.local,
1544 context: Context |
None =
None,
1545 time_fired: float |
None =
None,
1547 """Fire an event, for internal use only.
1549 This method is intended to only be used by core internally
1550 and should not be considered a stable API. We will make
1551 breaking changes to this function in the future and it
1552 should not be used in integrations.
1554 This method must be run in the event loop.
1558 "Bus:Handling %s",
_event_repr(event_type, origin, event_data)
1561 listeners = self._listeners.
get(event_type, EMPTY_LIST)
1562 if event_type
not in EVENTS_EXCLUDED_FROM_MATCH_ALL:
1563 match_all_listeners = self._match_all_listeners
1565 match_all_listeners = EMPTY_LIST
1567 event: Event[_DataT] |
None =
None
1568 for job, event_filter
in listeners + match_all_listeners:
1569 if event_filter
is not None:
1571 if event_data
is None or not event_filter(event_data):
1574 _LOGGER.exception(
"Error in event filter")
1587 self.
_hass_hass.async_run_hass_job(job, event)
1589 _LOGGER.exception(
"Error running job: %s", job)
1593 event_type: EventType[_DataT] | str,
1594 listener: Callable[[Event[_DataT]], Coroutine[Any, Any,
None] |
None],
1596 """Listen for all events or events of a specific type.
1598 To listen to all events specify the constant ``MATCH_ALL``
1601 async_remove_listener = run_callback_threadsafe(
1605 def remove_listener() -> None:
1606 """Remove the listener."""
1607 run_callback_threadsafe(self.
_hass_hass.loop, async_remove_listener).result()
1609 return remove_listener
1614 event_type: EventType[_DataT] | str,
1615 listener: Callable[[Event[_DataT]], Coroutine[Any, Any,
None] |
None],
1616 event_filter: Callable[[_DataT], bool] |
None =
None,
1617 run_immediately: bool | object = _SENTINEL,
1619 """Listen for all events or events of a specific type.
1621 To listen to all events specify the constant ``MATCH_ALL``
1624 An optional event_filter, which must be a callable decorated with
1625 @callback that returns a boolean value, determines if the
1626 listener callable should run.
1628 If run_immediately is passed:
1629 - callbacks will be run right away instead of using call_soon.
1630 - coroutine functions will be scheduled eagerly.
1632 This method must be run in the event loop.
1634 if run_immediately
in (
True,
False):
1636 from .helpers
import frame
1639 "calls `async_listen` with run_immediately",
1640 core_behavior=frame.ReportBehavior.LOG,
1641 breaks_in_ha_version=
"2025.5",
1646 filterable_job = (
HassJob(listener, f
"listen {event_type}"), event_filter)
1647 if event_type == EVENT_STATE_REPORTED:
1648 if not event_filter:
1650 f
"Event filter is required for event {event_type}"
1657 event_type: EventType[_DataT] | str,
1658 filterable_job: _FilterableJobType[_DataT],
1660 """Listen for all events or events of a specific type."""
1661 self._listeners[event_type].append(filterable_job)
1662 return functools.partial(
1668 event_type: EventType[_DataT] | str,
1669 listener: Callable[[Event[_DataT]], Coroutine[Any, Any,
None] |
None],
1671 """Listen once for event of a specific type.
1673 To listen to all events specify the constant ``MATCH_ALL``
1676 Returns function to unsubscribe the listener.
1678 async_remove_listener = run_callback_threadsafe(
1682 def remove_listener() -> None:
1683 """Remove the listener."""
1684 run_callback_threadsafe(self.
_hass_hass.loop, async_remove_listener).result()
1686 return remove_listener
1691 event_type: EventType[_DataT] | str,
1692 listener: Callable[[Event[_DataT]], Coroutine[Any, Any,
None] |
None],
1693 run_immediately: bool | object = _SENTINEL,
1695 """Listen once for event of a specific type.
1697 To listen to all events specify the constant ``MATCH_ALL``
1700 Returns registered listener that can be used with remove_listener.
1702 This method must be run in the event loop.
1704 if run_immediately
in (
True,
False):
1706 from .helpers
import frame
1709 "calls `async_listen_once` with run_immediately",
1710 core_behavior=frame.ReportBehavior.LOG,
1711 breaks_in_ha_version=
"2025.5",
1722 f
"onetime listen {event_type} {listener}",
1723 job_type=HassJobType.Callback,
1728 one_time_listener.remove = remove
1734 event_type: EventType[_DataT] | str,
1735 filterable_job: _FilterableJobType[_DataT],
1737 """Remove a listener of a specific event_type.
1739 This method must be run in the event loop.
1742 self._listeners[event_type].
remove(filterable_job)
1745 if not self._listeners[event_type]
and event_type != MATCH_ALL:
1746 self._listeners.pop(event_type)
1747 except (KeyError, ValueError):
1751 "Unable to remove unknown job listener %s", filterable_job
1756 """Compressed dict of a state."""
1759 a: ReadOnlyDict[str, Any]
1760 c: str | dict[str, Any]
1762 lu: NotRequired[float]
1766 """Object to represent a state within the state machine.
1768 entity_id: the entity that is represented.
1769 state: the state of the entity
1770 attributes: extra information on entity and state
1771 last_changed: last time the state was changed.
1772 last_reported: last time the state was reported.
1773 last_updated: last time the state or attributes were changed.
1774 context: Context in which it was created
1775 domain: Domain of this state.
1776 object_id: Object id of this state.
1790 "last_updated_timestamp",
1798 attributes: Mapping[str, Any] |
None =
None,
1799 last_changed: datetime.datetime |
None =
None,
1800 last_reported: datetime.datetime |
None =
None,
1801 last_updated: datetime.datetime |
None =
None,
1802 context: Context |
None =
None,
1803 validate_entity_id: bool |
None =
True,
1804 state_info: StateInfo |
None =
None,
1805 last_updated_timestamp: float |
None =
None,
1807 """Initialize a new state."""
1808 self._cache: dict[str, Any] = {}
1813 f
"Invalid entity id encountered: {entity_id}. "
1814 "Format should be <domain>.<object_id>"
1824 if type(attributes)
is not ReadOnlyDict:
1838 if not last_updated_timestamp:
1839 last_updated_timestamp = last_updated.timestamp()
1842 self._cache[
"last_changed_timestamp"] = last_updated_timestamp
1847 self._cache[
"last_reported_timestamp"] = last_updated_timestamp
1849 @under_cached_property
1851 """Name of this state."""
1856 @under_cached_property
1858 """Timestamp of last change."""
1861 @under_cached_property
1863 """Timestamp of last report."""
1866 @under_cached_property
1868 """Return a dict representation of the State.
1870 Callers should be careful to not mutate the returned dictionary
1871 as it will mutate the cached version.
1873 last_changed_isoformat = self.
last_changedlast_changed.isoformat()
1875 last_updated_isoformat = last_changed_isoformat
1877 last_updated_isoformat = self.
last_updatedlast_updated.isoformat()
1879 last_reported_isoformat = last_changed_isoformat
1881 last_reported_isoformat = self.
last_reportedlast_reported.isoformat()
1884 "state": self.
statestate,
1886 "last_changed": last_changed_isoformat,
1887 "last_reported": last_reported_isoformat,
1888 "last_updated": last_updated_isoformat,
1892 "context": self.
contextcontext._as_dict,
1897 ) -> ReadOnlyDict[str, datetime.datetime | Collection[Any]]:
1898 """Return a ReadOnlyDict representation of the State.
1902 Can be used for JSON serialization.
1903 Ensures: state == State.from_dict(state.as_dict())
1907 @under_cached_property
1910 ) -> ReadOnlyDict[str, datetime.datetime | Collection[Any]]:
1911 """Return a ReadOnlyDict representation of the State."""
1913 context = as_dict[
"context"]
1918 if type(context)
is not ReadOnlyDict:
1922 @under_cached_property
1924 """Return a JSON string of the State."""
1927 @under_cached_property
1929 """Return a JSON fragment of the State."""
1932 @under_cached_property
1934 """Build a compressed dict of a state for adds.
1936 Omits the lu (last_updated) if it matches (lc) last_changed.
1938 Sends c (context) as a string if it only contains an id.
1940 state_context = self.
contextcontext
1941 if state_context.parent_id
is None and state_context.user_id
is None:
1942 context: dict[str, Any] | str = state_context.id
1947 context = state_context._as_dict
1948 compressed_state: CompressedState = {
1949 COMPRESSED_STATE_STATE: self.
statestate,
1950 COMPRESSED_STATE_ATTRIBUTES: self.
attributesattributes,
1951 COMPRESSED_STATE_CONTEXT: context,
1955 compressed_state[COMPRESSED_STATE_LAST_UPDATED] = (
1958 return compressed_state
1960 @under_cached_property
1962 """Build a compressed JSON key value pair of a state for adds.
1964 The JSON string is a key value pair of the entity_id and the compressed state.
1966 It is used for sending multiple states in a single message.
1971 def from_dict(cls, json_dict: dict[str, Any]) -> Self |
None:
1972 """Initialize a state from a dict.
1976 Ensures: state == State.from_json_dict(state.to_json_dict())
1978 if not (json_dict
and "entity_id" in json_dict
and "state" in json_dict):
1981 last_changed = json_dict.get(
"last_changed")
1982 if isinstance(last_changed, str):
1983 last_changed = dt_util.parse_datetime(last_changed)
1985 last_updated = json_dict.get(
"last_updated")
1986 if isinstance(last_updated, str):
1987 last_updated = dt_util.parse_datetime(last_updated)
1989 last_reported = json_dict.get(
"last_reported")
1990 if isinstance(last_reported, str):
1991 last_reported = dt_util.parse_datetime(last_reported)
1993 if context := json_dict.get(
"context"):
1994 context =
Context(id=context.get(
"id"), user_id=context.get(
"user_id"))
1997 json_dict[
"entity_id"],
1999 json_dict.get(
"attributes"),
2000 last_changed=last_changed,
2001 last_reported=last_reported,
2002 last_updated=last_updated,
2007 """Mark the state as old.
2009 We give up the original reference to the context to ensure
2010 the context can be garbage collected by replacing it with
2011 a new one with the same id to ensure the old state
2012 can still be examined for comparison against the new state.
2014 Since we are always going to fire a EVENT_STATE_CHANGED event
2015 after we remove a state from the state machine we need to make
2016 sure we don't end up holding a reference to the original context
2017 since it can never be garbage collected as each event would
2018 reference the previous one.
2025 """Return the representation of the states."""
2026 attrs = f
"; {util.repr_helper(self.attributes)}" if self.
attributesattributes
else ""
2029 f
"<state {self.entity_id}={self.state}{attrs}"
2030 f
" @ {dt_util.as_local(self.last_changed).isoformat()}>"
2035 """Container for states, maps entity_id -> State.
2037 Maintains an additional index:
2038 - domain -> dict[str, State]
2042 """Initialize the container."""
2044 self._domain_index: defaultdict[str, dict[str, State]] = defaultdict(dict)
2047 """Return the underlying values to avoid __iter__ overhead."""
2048 return self.data.
values()
2052 self.data[key] = entry
2053 self._domain_index[entry.domain][entry.entity_id] = entry
2056 """Remove an item."""
2058 del self._domain_index[entry.domain][entry.entity_id]
2062 """Get all entity_ids for a domain."""
2064 if key
not in self._domain_index:
2066 return self._domain_index[key].keys()
2069 """Get all states for a domain."""
2071 if key
not in self._domain_index:
2073 return self._domain_index[key].
values()
2077 """Helper class that tracks the state of different entities."""
2079 __slots__ = (
"_states",
"_states_data",
"_reservations",
"_bus",
"_loop")
2081 def __init__(self, bus: EventBus, loop: asyncio.events.AbstractEventLoop) ->
None:
2082 """Initialize state machine."""
2087 self._reservations: set[str] =
set()
2091 def entity_ids(self, domain_filter: str |
None =
None) -> list[str]:
2092 """List of entity ids that are being tracked."""
2093 future = run_callback_threadsafe(
2096 return future.result()
2100 self, domain_filter: str | Iterable[str] |
None =
None
2102 """List of entity ids that are being tracked.
2104 This method must be run in the event loop.
2106 if domain_filter
is None:
2109 if isinstance(domain_filter, str):
2110 return list(self.
_states_states.domain_entity_ids(domain_filter.lower()))
2112 entity_ids: list[str] = []
2113 for domain
in domain_filter:
2114 entity_ids.extend(self.
_states_states.domain_entity_ids(domain))
2119 self, domain_filter: str | Iterable[str] |
None =
None
2121 """Count the entity ids that are being tracked.
2123 This method must be run in the event loop.
2125 if domain_filter
is None:
2128 if isinstance(domain_filter, str):
2129 return len(self.
_states_states.domain_entity_ids(domain_filter.lower()))
2132 len(self.
_states_states.domain_entity_ids(domain))
for domain
in domain_filter
2135 def all(self, domain_filter: str | Iterable[str] |
None =
None) -> list[State]:
2136 """Create a list of all states."""
2137 return run_callback_threadsafe(
2143 self, domain_filter: str | Iterable[str] |
None =
None
2145 """Create a list of all states matching the filter.
2147 This method must be run in the event loop.
2149 if domain_filter
is None:
2152 if isinstance(domain_filter, str):
2153 return list(self.
_states_states.domain_states(domain_filter.lower()))
2155 states: list[State] = []
2156 for domain
in domain_filter:
2157 states.extend(self.
_states_states.domain_states(domain))
2160 def get(self, entity_id: str) -> State |
None:
2161 """Retrieve state of entity_id or None if not found.
2169 def is_state(self, entity_id: str, state: str) -> bool:
2170 """Test if entity exists and is in specified state.
2174 state_obj = self.
getget(entity_id)
2175 return state_obj
is not None and state_obj.state == state
2178 """Remove the state of an entity.
2180 Returns boolean to indicate if an entity was removed.
2182 return run_callback_threadsafe(
2187 def async_remove(self, entity_id: str, context: Context |
None =
None) -> bool:
2188 """Remove the state of an entity.
2190 Returns boolean to indicate if an entity was removed.
2192 This method must be run in the event loop.
2194 entity_id = entity_id.lower()
2195 old_state = self.
_states_states.pop(entity_id,
None)
2196 self._reservations.discard(entity_id)
2198 if old_state
is None:
2202 state_changed_data: EventStateChangedData = {
2203 "entity_id": entity_id,
2204 "old_state": old_state,
2207 self.
_bus_bus.async_fire_internal(
2208 EVENT_STATE_CHANGED,
2218 attributes: Mapping[str, Any] |
None =
None,
2219 force_update: bool =
False,
2220 context: Context |
None =
None,
2222 """Set the state of an entity, add entity if it does not exist.
2224 Attributes is an optional dict to specify attributes of this state.
2226 If you just update the attributes and not the state, last changed will
2229 run_callback_threadsafe(
2241 """Reserve a state in the state machine for an entity being added.
2243 This must not fire an event when the state is reserved.
2245 This avoids a race condition where multiple entities with the same
2246 entity_id are added.
2248 entity_id = entity_id.lower()
2249 if entity_id
in self.
_states_data_states_data
or entity_id
in self._reservations:
2251 "async_reserve must not be called once the state is in the state"
2255 self._reservations.
add(entity_id)
2259 """Check to see if an entity_id is available to be used."""
2260 entity_id = entity_id.lower()
2262 entity_id
not in self.
_states_data_states_data
and entity_id
not in self._reservations
2270 attributes: Mapping[str, Any] |
None =
None,
2271 force_update: bool =
False,
2272 context: Context |
None =
None,
2273 state_info: StateInfo |
None =
None,
2274 timestamp: float |
None =
None,
2276 """Set the state of an entity, add entity if it does not exist.
2278 Attributes is an optional dict to specify attributes of this state.
2280 If you just update the attributes and not the state, last changed will
2283 This method must be run in the event loop.
2292 timestamp
or time.time(),
2300 attributes: Mapping[str, Any] |
None,
2302 context: Context |
None,
2303 state_info: StateInfo |
None,
2306 """Set the state of an entity, add entity if it does not exist.
2308 This method is intended to only be used by core internally
2309 and should not be considered a stable API. We will make
2310 breaking changes to this function in the future and it
2311 should not be used in integrations.
2313 This method must be run in the event loop.
2319 old_state: State |
None
2328 same_state = old_state.state == new_state
and not force_update
2329 same_attr = old_state.attributes == attributes
2330 last_changed = old_state.last_changed
if same_state
else None
2343 now = dt_util.utc_from_timestamp(timestamp)
2346 context =
Context(id=ulid_at_time(timestamp))
2348 if same_state
and same_attr:
2350 old_last_reported = old_state.last_reported
2351 old_state.last_reported = now
2352 old_state._cache[
"last_reported_timestamp"] = timestamp
2354 self.
_bus_bus.async_fire_internal(
2355 EVENT_STATE_REPORTED,
2357 "entity_id": entity_id,
2358 "old_last_reported": old_last_reported,
2359 "new_state": old_state,
2362 time_fired=timestamp,
2368 assert old_state
is not None
2369 attributes = old_state.attributes
2385 if old_state
is not None:
2387 self.
_states_states[entity_id] = state
2388 state_changed_data: EventStateChangedData = {
2389 "entity_id": entity_id,
2390 "old_state": old_state,
2393 self.
_bus_bus.async_fire_internal(
2394 EVENT_STATE_CHANGED,
2397 time_fired=timestamp,
2402 """Service call response configuration."""
2405 """The service does not support responses (the default)."""
2407 OPTIONAL =
"optional"
2408 """The service optionally returns response data when asked by the caller."""
2411 """The service is read-only and the caller must always ask for response data."""
2415 """Representation of a callable service."""
2417 __slots__ = [
"job",
"schema",
"domain",
"service",
"supports_response"]
2423 Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
2425 | EntityServiceResponse
2428 schema: VolSchemaType |
None,
2431 context: Context |
None =
None,
2432 supports_response: SupportsResponse = SupportsResponse.NONE,
2433 job_type: HassJobType |
None =
None,
2435 """Initialize a service."""
2436 self.
jobjob =
HassJob(func, f
"service {domain}.{service}", job_type=job_type)
2442 """Representation of a call to a service."""
2444 __slots__ = (
"domain",
"service",
"data",
"context",
"return_response")
2450 data: dict[str, Any] |
None =
None,
2451 context: Context |
None =
None,
2452 return_response: bool =
False,
2454 """Initialize a service call."""
2462 """Return the representation of the service."""
2465 f
"<ServiceCall {self.domain}.{self.service} "
2466 f
"(c:{self.context.id}): {util.repr_helper(self.data)}>"
2469 return f
"<ServiceCall {self.domain}.{self.service} (c:{self.context.id})>"
2473 """Offer the services over the eventbus."""
2475 __slots__ = (
"_services",
"_hass")
2478 """Initialize a service registry."""
2479 self._services: dict[str, dict[str, Service]] = {}
2484 """Return dictionary with per domain a list of available services."""
2485 return run_callback_threadsafe(self.
_hass_hass.loop, self.
async_servicesasync_services).result()
2489 """Return dictionary with per domain a list of available services.
2491 This method makes a copy of the registry. This function is expensive,
2492 and should only be used if has_service is not sufficient.
2494 This method must be run in the event loop.
2496 return {domain: service.copy()
for domain, service
in self._services.items()}
2500 """Return dictionary with per domain a list of available services.
2502 This method makes a copy of the registry for the domain.
2504 This method must be run in the event loop.
2506 return self._services.
get(domain, {}).copy()
2510 """Return dictionary with per domain a list of available services.
2512 This method DOES NOT make a copy of the services like async_services does.
2513 It is only expected to be called from the Home Assistant internals
2514 as a performance optimization when the caller is not going to modify the
2517 This method must be run in the event loop.
2519 return self._services
2522 """Test if specified service exists.
2526 return service.lower()
in self._services.
get(domain.lower(), [])
2529 """Return whether or not the service supports response data.
2531 This exists so that callers can return more helpful error messages given
2532 the context. Will return NONE if the service does not exist as there is
2533 other error handling when calling the service if it does not exist.
2535 if not (handler := self._services[domain.lower()][service.lower()]):
2536 return SupportsResponse.NONE
2537 return handler.supports_response
2543 service_func: Callable[
2545 Coroutine[Any, Any, ServiceResponse] | ServiceResponse |
None,
2547 schema: vol.Schema |
None =
None,
2548 supports_response: SupportsResponse = SupportsResponse.NONE,
2550 """Register a service.
2552 Schema is called to coerce and validate the service data.
2554 run_callback_threadsafe(
2555 self.
_hass_hass.loop,
2569 service_func: Callable[
2571 Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
2573 | EntityServiceResponse
2576 schema: VolSchemaType |
None =
None,
2577 supports_response: SupportsResponse = SupportsResponse.NONE,
2578 job_type: HassJobType |
None =
None,
2580 """Register a service.
2582 Schema is called to coerce and validate the service data.
2584 This method must be run in the event loop.
2586 self.
_hass_hass.verify_event_loop_thread(
"hass.services.async_register")
2588 domain, service, service_func, schema, supports_response, job_type
2596 service_func: Callable[
2598 Coroutine[Any, Any, ServiceResponse | EntityServiceResponse]
2600 | EntityServiceResponse
2603 schema: VolSchemaType |
None =
None,
2604 supports_response: SupportsResponse = SupportsResponse.NONE,
2605 job_type: HassJobType |
None =
None,
2607 """Register a service.
2609 Schema is called to coerce and validate the service data.
2611 This method must be run in the event loop.
2613 domain = domain.lower()
2614 service = service.lower()
2620 supports_response=supports_response,
2624 if domain
in self._services:
2625 self._services[domain][service] = service_obj
2627 self._services[domain] = {service: service_obj}
2629 self.
_hass_hass.bus.async_fire_internal(
2630 EVENT_SERVICE_REGISTERED, {ATTR_DOMAIN: domain, ATTR_SERVICE: service}
2633 def remove(self, domain: str, service: str) ->
None:
2634 """Remove a registered service from service handler."""
2635 run_callback_threadsafe(
2641 """Remove a registered service from service handler.
2643 This method must be run in the event loop.
2645 self.
_hass_hass.verify_event_loop_thread(
"hass.services.async_remove")
2650 """Remove a registered service from service handler.
2652 This method must be run in the event loop.
2654 domain = domain.lower()
2655 service = service.lower()
2657 if service
not in self._services.
get(domain, {}):
2658 _LOGGER.warning(
"Unable to remove unknown service %s/%s", domain, service)
2661 self._services[domain].pop(service)
2663 if not self._services[domain]:
2664 self._services.pop(domain)
2666 self.
_hass_hass.bus.async_fire_internal(
2667 EVENT_SERVICE_REMOVED, {ATTR_DOMAIN: domain, ATTR_SERVICE: service}
2674 service_data: dict[str, Any] |
None =
None,
2675 blocking: bool =
False,
2676 context: Context |
None =
None,
2677 target: dict[str, Any] |
None =
None,
2678 return_response: bool =
False,
2679 ) -> ServiceResponse:
2682 See description of async_call for details.
2684 return asyncio.run_coroutine_threadsafe(
2694 self.
_hass_hass.loop,
2701 service_data: dict[str, Any] |
None =
None,
2702 blocking: bool =
False,
2703 context: Context |
None =
None,
2704 target: dict[str, Any] |
None =
None,
2705 return_response: bool =
False,
2706 ) -> ServiceResponse:
2709 Specify blocking=True to wait until service is executed.
2711 If return_response=True, indicates that the caller can consume return values
2712 from the service, if any. Return values are a dict that can be returned by the
2713 standard JSON serialization process. Return values can only be used with blocking=True.
2715 This method will fire an event to indicate the service has been called.
2717 Because the service is sent as an event you are not allowed to use
2718 the keys ATTR_DOMAIN and ATTR_SERVICE in your service_data.
2720 This method is a coroutine.
2722 context = context
or Context()
2723 service_data = service_data
or {}
2726 handler = self._services[domain][service]
2730 domain = domain.lower()
2731 service = service.lower()
2733 handler = self._services[domain][service]
2740 translation_domain=DOMAIN,
2741 translation_key=
"service_should_be_blocking",
2742 translation_placeholders={
2743 "return_response":
"return_response=True",
2744 "non_blocking_argument":
"blocking=False",
2747 if handler.supports_response
is SupportsResponse.NONE:
2749 translation_domain=DOMAIN,
2750 translation_key=
"service_does_not_support_response",
2751 translation_placeholders={
2752 "return_response":
"return_response=True"
2755 elif handler.supports_response
is SupportsResponse.ONLY:
2757 translation_domain=DOMAIN,
2758 translation_key=
"service_lacks_response_request",
2759 translation_placeholders={
"return_response":
"return_response=True"},
2763 service_data.update(target)
2767 processed_data: dict[str, Any] = handler.schema(service_data)
2770 "Invalid data for service call %s.%s: %s",
2777 processed_data = service_data
2780 domain, service, processed_data, context, return_response
2783 self.
_hass_hass.bus.async_fire_internal(
2786 ATTR_DOMAIN: domain,
2787 ATTR_SERVICE: service,
2788 ATTR_SERVICE_DATA: service_data,
2795 self.
_hass_hass.async_create_task_internal(
2797 f
"service call background {service_call.domain}.{service_call.service}",
2802 response_data = await coro
2803 if not return_response:
2805 if not isinstance(response_data, dict):
2807 translation_domain=DOMAIN,
2808 translation_key=
"service_reponse_invalid",
2809 translation_placeholders={
2810 "response_data_type":
str(type(response_data))
2813 return response_data
2817 coro_or_task: Coroutine[Any, Any, Any] | asyncio.Task[Any],
2818 service_call: ServiceCall,
2820 """Run service call in background, catching and logging any exceptions."""
2823 except Unauthorized:
2825 "Unauthorized service called %s/%s",
2826 service_call.domain,
2827 service_call.service,
2829 except asyncio.CancelledError:
2830 _LOGGER.debug(
"Service was cancelled: %s", service_call)
2832 _LOGGER.exception(
"Error executing service: %s", service_call)
2835 self, handler: Service, service_call: ServiceCall
2836 ) -> ServiceResponse:
2837 """Execute a service."""
2840 if job.job_type
is HassJobType.Coroutinefunction:
2843 Callable[..., Coroutine[Any, Any, ServiceResponse]], target
2845 return await target(service_call)
2846 if job.job_type
is HassJobType.Callback:
2848 target = cast(Callable[..., ServiceResponse], target)
2849 return target(service_call)
2851 target = cast(Callable[..., ServiceResponse], target)
2852 return await self.
_hass_hass.async_add_executor_job(target, service_call)
2856 __getattr__ = functools.partial(check_if_deprecated_constant, module_globals=globals())
2857 __dir__ = functools.partial(
2858 dir_with_deprecated_constants, module_globals_keys=[*globals().keys()]
json_fragment json_fragment(self)
ReadOnlyDict[str, str|None] _as_read_only_dict(self)
None __init__(self, str|None user_id=None, str|None parent_id=None, str|None id=None)
bool __eq__(self, object other)
ReadOnlyDict[str, str|None] as_dict(self)
dict[str, str|None] _as_dict(self)
Context __deepcopy__(self, dict[int, Any] memo)
CALLBACK_TYPE async_listen(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener, Callable[[_DataT], bool]|None event_filter=None, bool|object run_immediately=_SENTINEL)
None async_fire(self, EventType[_DataT]|str event_type, _DataT|None event_data=None, EventOrigin origin=EventOrigin.local, Context|None context=None, float|None time_fired=None)
CALLBACK_TYPE _async_listen_filterable_job(self, EventType[_DataT]|str event_type, _FilterableJobType[_DataT] filterable_job)
None _async_remove_listener(self, EventType[_DataT]|str event_type, _FilterableJobType[_DataT] filterable_job)
None async_fire_internal(self, EventType[_DataT]|str event_type, _DataT|None event_data=None, EventOrigin origin=EventOrigin.local, Context|None context=None, float|None time_fired=None)
CALLBACK_TYPE async_listen_once(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener, bool|object run_immediately=_SENTINEL)
None __init__(self, HomeAssistant hass)
None fire(self, EventType[_DataT]|str event_type, _DataT|None event_data=None, EventOrigin origin=EventOrigin.local, Context|None context=None)
None _async_logging_changed(self, Event|None event=None)
dict[EventType[Any]|str, int] listeners(self)
dict[EventType[Any]|str, int] async_listeners(self)
CALLBACK_TYPE listen_once(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener)
CALLBACK_TYPE listen(self, EventType[_DataT]|str event_type, Callable[[Event[_DataT]], Coroutine[Any, Any, None]|None] listener)
ReadOnlyDict[str, Any] as_dict(self)
datetime.datetime time_fired(self)
None __init__(self, EventType[_DataT]|str event_type, _DataT|None data=None, EventOrigin origin=EventOrigin.local, float|None time_fired_timestamp=None, Context|None context=None)
ReadOnlyDict[str, Any] _as_read_only_dict(self)
json_fragment json_fragment(self)
dict[str, Any] _as_dict(self)
None __init__(self, Callable[_P, _R_co] target, str|None name=None, *bool|None cancel_on_shutdown=None, HassJobType|None job_type=None)
HassJobType job_type(self)
bool|None cancel_on_shutdown(self)
Self __new__(cls, str config_dir)
None _async_log_running_tasks(self, str stage)
CALLBACK_TYPE async_add_shutdown_job(self, HassJob[..., Coroutine[Any, Any, Any]] hassjob, *Any args)
int async_run(self, *bool attach_signals=True)
None async_block_till_done(self, bool wait_background_tasks=False)
None _await_and_log_pending(self, Collection[asyncio.Future[Any]] pending)
set[asyncio.Future[Any]] _active_tasks(self)
None __init__(self, str config_dir)
None set_state(self, CoreState state)
None _cancel_cancellable_timers(self)
None block_till_done(self, bool wait_background_tasks=False)
None async_stop(self, int exit_code=0, *bool force=False)
None create_task(self, Coroutine[Any, Any, Any] target, str|None name=None)
None verify_event_loop_thread(self, str what)
None __init__(self, str domain, str service, dict[str, Any]|None data=None, Context|None context=None, bool return_response=False)
None register(self, str domain, str service, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse]|ServiceResponse|None,] service_func, vol.Schema|None schema=None, SupportsResponse supports_response=SupportsResponse.NONE)
dict[str, dict[str, Service]] async_services_internal(self)
bool has_service(self, str domain, str service)
None remove(self, str domain, str service)
dict[str, dict[str, Service]] async_services(self)
ServiceResponse call(self, str domain, str service, dict[str, Any]|None service_data=None, bool blocking=False, Context|None context=None, dict[str, Any]|None target=None, bool return_response=False)
None _async_remove(self, str domain, str service)
None _async_register(self, str domain, str service, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse|EntityServiceResponse]|ServiceResponse|EntityServiceResponse|None,] service_func, VolSchemaType|None schema=None, SupportsResponse supports_response=SupportsResponse.NONE, HassJobType|None job_type=None)
dict[str, Service] async_services_for_domain(self, str domain)
SupportsResponse supports_response(self, str domain, str service)
None async_register(self, str domain, str service, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse|EntityServiceResponse]|ServiceResponse|EntityServiceResponse|None,] service_func, VolSchemaType|None schema=None, SupportsResponse supports_response=SupportsResponse.NONE, HassJobType|None job_type=None)
None _run_service_call_catch_exceptions(self, Coroutine[Any, Any, Any]|asyncio.Task[Any] coro_or_task, ServiceCall service_call)
None async_remove(self, str domain, str service)
ServiceResponse _execute_service(self, Service handler, ServiceCall service_call)
ServiceResponse async_call(self, str domain, str service, dict[str, Any]|None service_data=None, bool blocking=False, Context|None context=None, dict[str, Any]|None target=None, bool return_response=False)
None __init__(self, HomeAssistant hass)
dict[str, dict[str, Service]] services(self)
None __init__(self, Callable[[ServiceCall], Coroutine[Any, Any, ServiceResponse|EntityServiceResponse]|ServiceResponse|EntityServiceResponse|None,] func, VolSchemaType|None schema, str domain, str service, Context|None context=None, SupportsResponse supports_response=SupportsResponse.NONE, HassJobType|None job_type=None)
list[str] async_entity_ids(self, str|Iterable[str]|None domain_filter=None)
None __init__(self, EventBus bus, asyncio.events.AbstractEventLoop loop)
bool is_state(self, str entity_id, str state)
None async_reserve(self, str entity_id)
bool async_remove(self, str entity_id, Context|None context=None)
None set(self, str entity_id, str new_state, Mapping[str, Any]|None attributes=None, bool force_update=False, Context|None context=None)
None async_set_internal(self, str entity_id, str new_state, Mapping[str, Any]|None attributes, bool force_update, Context|None context, StateInfo|None state_info, float timestamp)
None async_set(self, str entity_id, str new_state, Mapping[str, Any]|None attributes=None, bool force_update=False, Context|None context=None, StateInfo|None state_info=None, float|None timestamp=None)
State|None get(self, str entity_id)
list[State] all(self, str|Iterable[str]|None domain_filter=None)
list[State] async_all(self, str|Iterable[str]|None domain_filter=None)
bool async_available(self, str entity_id)
bool remove(self, str entity_id)
list[str] entity_ids(self, str|None domain_filter=None)
int async_entity_ids_count(self, str|Iterable[str]|None domain_filter=None)
bytes as_compressed_state_json(self)
Self|None from_dict(cls, dict[str, Any] json_dict)
CompressedState as_compressed_state(self)
ReadOnlyDict[str, datetime.datetime|Collection[Any]] as_dict(self)
json_fragment json_fragment(self)
ReadOnlyDict[str, datetime.datetime|Collection[Any]] _as_read_only_dict(self)
None __init__(self, str entity_id, str state, Mapping[str, Any]|None attributes=None, datetime.datetime|None last_changed=None, datetime.datetime|None last_reported=None, datetime.datetime|None last_updated=None, Context|None context=None, bool|None validate_entity_id=True, StateInfo|None state_info=None, float|None last_updated_timestamp=None)
float last_reported_timestamp(self)
float last_changed_timestamp(self)
dict[str, Any] _as_dict(self)
ValuesView[State]|tuple[()] domain_states(self, str key)
ValuesView[State] values(self)
None __setitem__(self, str key, State entry)
KeysView[str]|tuple[()] domain_entity_ids(self, str key)
None __delitem__(self, str key)
None __call__(self, Event[_DataT] event)
bool add(self, _T matcher)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
str _event_repr(EventType[_DataT]|str event_type, EventOrigin origin, _DataT|None data)
bool is_callback_check_partial(Callable[..., Any] target)
bool valid_entity_id(str entity_id)
HomeAssistant|None async_get_hass_or_none()
None _verify_event_type_length_or_raise(EventType[_DataT]|str event_type)
Any _deprecated_core_config()
HomeAssistant async_get_hass()
HassJobType get_hassjob_callable_job_type(Callable[..., Any] target)
bool valid_domain(str domain)
tuple[str, str] split_entity_id(str entity_id)
str validate_state(str state)
bool is_callback(Callable[..., Any] func)
ReleaseChannel get_release_channel()
list[str] all_with_deprecated_constants(dict[str, Any] module_globals)
None async_register_signal_handling(HomeAssistant hass)
list[TimerHandle] get_scheduled_timer_handles(AbstractEventLoop loop)
bool cancelling(Future[Any] task)
None shutdown_run_callback_threadsafe(AbstractEventLoop loop)