1 """Support for scripts."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
7 from dataclasses
import dataclass
9 from typing
import TYPE_CHECKING, Any, cast
11 from propcache
import cached_property
12 import voluptuous
as vol
68 from .config
import ScriptConfig, ValidationStatus
80 from .helpers
import async_get_blueprints
81 from .trace
import trace_script
83 SCRIPT_SERVICE_SCHEMA = vol.Schema(dict)
85 {vol.Optional(ATTR_VARIABLES): {str: cv.match_all}}
87 RELOAD_SERVICE_SCHEMA = vol.Schema({})
91 def is_on(hass: HomeAssistant, entity_id: str) -> bool:
92 """Return if the script is on based on the statemachine."""
93 return hass.states.is_state(entity_id, STATE_ON)
97 hass: HomeAssistant, referenced_id: str, property_name: str
99 """Return all scripts that reference the x."""
100 if DOMAIN
not in hass.data:
103 component: EntityComponent[BaseScriptEntity] = hass.data[DOMAIN]
106 script_entity.entity_id
107 for script_entity
in component.entities
108 if referenced_id
in getattr(script_entity, property_name)
112 def _x_in_script(hass: HomeAssistant, entity_id: str, property_name: str) -> list[str]:
113 """Return all x in a script."""
114 if DOMAIN
not in hass.data:
117 component: EntityComponent[BaseScriptEntity] = hass.data[DOMAIN]
119 if (script_entity := component.get_entity(entity_id))
is None:
122 return list(getattr(script_entity, property_name))
127 """Return all scripts that reference the entity."""
133 """Return all entities in script."""
134 return _x_in_script(hass, entity_id,
"referenced_entities")
139 """Return all scripts that reference the device."""
145 """Return all devices in script."""
146 return _x_in_script(hass, entity_id,
"referenced_devices")
151 """Return all scripts that reference the area."""
157 """Return all areas in a script."""
158 return _x_in_script(hass, entity_id,
"referenced_areas")
163 """Return all scripts that reference the floor."""
169 """Return all floors in a script."""
170 return _x_in_script(hass, entity_id,
"referenced_floors")
175 """Return all scripts that reference the label."""
181 """Return all labels in a script."""
182 return _x_in_script(hass, entity_id,
"referenced_labels")
187 """Return all scripts that reference the blueprint."""
188 if DOMAIN
not in hass.data:
191 component: EntityComponent[BaseScriptEntity] = hass.data[DOMAIN]
194 script_entity.entity_id
195 for script_entity
in component.entities
196 if script_entity.referenced_blueprint == blueprint_path
202 """Return the blueprint the script is based on or None."""
203 if DOMAIN
not in hass.data:
206 component: EntityComponent[BaseScriptEntity] = hass.data[DOMAIN]
208 if (script_entity := component.get_entity(entity_id))
is None:
211 return script_entity.referenced_blueprint
214 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
215 """Load the scripts from the configuration."""
216 hass.data[DOMAIN] = component = EntityComponent[BaseScriptEntity](
230 hass.async_create_task(
234 async
def reload_service(service: ServiceCall) ->
None:
235 """Call a service to reload scripts."""
237 if (conf := await component.async_prepare_reload(skip_reset=
True))
is None:
241 async
def turn_on_service(service: ServiceCall) ->
None:
242 """Call a service to turn script on."""
243 variables = service.data.get(ATTR_VARIABLES)
244 script_entities = await component.async_extract_from_service(service)
245 for script_entity
in script_entities:
246 await script_entity.async_turn_on(
247 variables=variables, context=service.context, wait=
False
250 async
def turn_off_service(service: ServiceCall) ->
None:
251 """Cancel a script."""
253 script_entities = await component.async_extract_from_service(service)
255 if not script_entities:
260 create_eager_task(script_entity.async_turn_off())
261 for script_entity
in script_entities
265 async
def toggle_service(service: ServiceCall) ->
None:
266 """Toggle a script."""
267 script_entities = await component.async_extract_from_service(service)
268 for script_entity
in script_entities:
269 await script_entity.async_toggle(context=service.context, wait=
False)
271 hass.services.async_register(
272 DOMAIN, SERVICE_RELOAD, reload_service, schema=RELOAD_SERVICE_SCHEMA
274 hass.services.async_register(
275 DOMAIN, SERVICE_TURN_ON, turn_on_service, schema=SCRIPT_TURN_ONOFF_SCHEMA
277 hass.services.async_register(
278 DOMAIN, SERVICE_TURN_OFF, turn_off_service, schema=SCRIPT_TURN_ONOFF_SCHEMA
280 hass.services.async_register(
281 DOMAIN, SERVICE_TOGGLE, toggle_service, schema=SCRIPT_TURN_ONOFF_SCHEMA
283 websocket_api.async_register_command(hass, websocket_config)
288 @dataclass(slots=True)
290 """Container for prepared script entity configuration."""
292 config_block: ConfigType
294 raw_blueprint_inputs: ConfigType |
None
295 raw_config: ConfigType |
None
296 validation_error: str |
None
297 validation_status: ValidationStatus
303 ) -> list[ScriptEntityConfig]:
304 """Parse configuration and prepare script entity configuration."""
305 script_configs: list[ScriptEntityConfig] = []
307 conf: dict[str, ConfigType] = config[DOMAIN]
309 for key, config_block
in conf.items():
310 raw_config = cast(ScriptConfig, config_block).raw_config
311 raw_blueprint_inputs = cast(ScriptConfig, config_block).raw_blueprint_inputs
312 validation_error = cast(ScriptConfig, config_block).validation_error
313 validation_status = cast(ScriptConfig, config_block).validation_status
315 script_configs.append(
319 raw_blueprint_inputs,
326 return script_configs
330 hass: HomeAssistant, script_configs: list[ScriptEntityConfig]
331 ) -> list[BaseScriptEntity]:
332 """Create script entities from prepared configuration."""
333 entities: list[BaseScriptEntity] = []
335 for script_config
in script_configs:
336 if script_config.validation_status != ValidationStatus.OK:
340 script_config.raw_config,
341 cast(str, script_config.validation_error),
342 script_config.validation_status,
350 script_config.config_block,
351 script_config.raw_config,
352 script_config.raw_blueprint_inputs,
354 entities.append(entity)
362 component: EntityComponent[BaseScriptEntity],
364 """Process script configuration."""
367 def script_matches_config(
368 script: BaseScriptEntity, config: ScriptEntityConfig
370 return script.unique_id == config.key
and script.raw_config == config.raw_config
373 scripts: list[BaseScriptEntity],
374 script_configs: list[ScriptEntityConfig],
375 ) -> tuple[set[int], set[int]]:
376 """Find matches between a list of script entities and a list of configurations.
378 A script or configuration is only allowed to match at most once to handle
379 the case of multiple scripts with identical configuration.
381 Returns a tuple of sets of indices: ({script_matches}, {config_matches})
383 script_matches: set[int] = set()
384 config_matches: set[int] = set()
386 for script_idx, script
in enumerate(scripts):
387 for config_idx, script_config
in enumerate(script_configs):
388 if config_idx
in config_matches:
391 if script_matches_config(script, script_config):
392 script_matches.add(script_idx)
393 config_matches.add(config_idx)
397 return script_matches, config_matches
400 scripts: list[BaseScriptEntity] =
list(component.entities)
403 script_matches, config_matches = find_matches(scripts, script_configs)
407 script.async_remove()
408 for idx, script
in enumerate(scripts)
409 if idx
not in script_matches
411 await asyncio.gather(*tasks)
414 updated_script_configs = [
415 config
for idx, config
in enumerate(script_configs)
if idx
not in config_matches
418 await component.async_add_entities(entities)
422 """Base class for script entities."""
424 _entity_component_unrecorded_attributes = frozenset(
425 {ATTR_LAST_TRIGGERED, ATTR_MODE, ATTR_CUR, ATTR_MAX, ATTR_LAST_ACTION}
428 raw_config: ConfigType |
None
433 """Return a set of referenced labels."""
438 """Return a set of referenced floors."""
443 """Return a set of referenced areas."""
448 """Return referenced blueprint or None."""
453 """Return a set of referenced devices."""
458 """Return a set of referenced entities."""
461 class UnavailableScriptEntity(BaseScriptEntity):
462 """A non-functional script entity with its state set to unavailable.
464 This class is instantiated when an script fails to validate.
467 _attr_should_poll =
False
468 _attr_available =
False
473 raw_config: ConfigType |
None,
474 validation_error: str,
475 validation_status: ValidationStatus,
477 """Initialize a script entity."""
478 self.
_attr_name_attr_name = raw_config.get(CONF_ALIAS, key)
if raw_config
else key
486 """Return a set of referenced labels."""
491 """Return a set of referenced floors."""
496 """Return a set of referenced areas."""
501 """Return referenced blueprint or None."""
506 """Return a set of referenced devices."""
511 """Return a set of referenced entities."""
515 """Create a repair issue to notify the user the automation has errors."""
520 f
"{self.entity_id}_validation_{self._validation_status}",
522 severity=IssueSeverity.ERROR,
523 translation_key=f
"validation_{self._validation_status}",
524 translation_placeholders={
525 "edit": f
"/config/script/edit/{self.unique_id}",
533 """Run when entity will be removed from hass."""
536 self.
hasshass, DOMAIN, f
"{self.entity_id}_validation_{self._validation_status}"
541 """Representation of a script entity."""
544 _attr_should_poll =
False
552 raw_config: ConfigType |
None,
553 blueprint_inputs: ConfigType |
None,
555 """Initialize the script."""
556 self.
iconiconicon = cfg.get(CONF_ICON)
567 cfg.get(CONF_ALIAS, key),
569 running_description=
"script sequence",
571 script_mode=cfg[CONF_MODE],
572 max_runs=cfg[CONF_MAX],
573 max_exceeded=cfg[CONF_MAX_EXCEEDED],
574 logger=logging.getLogger(f
"{__name__}.{key}"),
575 variables=cfg.get(CONF_VARIABLES),
585 """Return the state attributes."""
586 script = self.
scriptscript
588 ATTR_LAST_TRIGGERED: script.last_triggered,
589 ATTR_MODE: script.script_mode,
590 ATTR_CUR: script.runs,
592 if script.supports_max:
593 attrs[ATTR_MAX] = script.max_runs
594 if script.last_action:
595 attrs[ATTR_LAST_ACTION] = script.last_action
600 """Return true if script is on."""
601 return self.
scriptscript.is_running
605 """Return a set of referenced labels."""
606 return self.
scriptscript.referenced_labels
610 """Return a set of referenced floors."""
611 return self.
scriptscript.referenced_floors
615 """Return a set of referenced areas."""
616 return self.
scriptscript.referenced_areas
620 """Return referenced blueprint or None."""
623 path: str = self.
_blueprint_inputs_blueprint_inputs[CONF_USE_BLUEPRINT][CONF_PATH]
628 """Return a set of referenced devices."""
629 return self.
scriptscript.referenced_devices
633 """Return a set of referenced entities."""
634 return self.
scriptscript.referenced_entities
645 Depending on the script's run mode, this may do nothing, restart the script or
646 fire an additional parallel run.
648 variables: dict[str, Any] |
None = kwargs.get(
"variables")
649 context: Context = kwargs[
"context"]
650 wait: bool = kwargs.get(
"wait",
True)
654 self, variables: dict[str, Any] |
None, context: Context, wait: bool
655 ) -> ServiceResponse:
656 """Start the run of a script."""
658 self.
hasshass.bus.async_fire(
659 EVENT_SCRIPT_STARTED,
663 coro = self.
_async_run_async_run(variables, context)
671 if script_stack := script_stack_cv.get():
672 script_stack_cv.set(script_stack.copy())
674 script_result = await coro
675 return script_result.service_response
if script_result
else None
680 script_stack_cv.set([])
683 self.
hasshass.async_create_task(coro, eager_start=
True)
690 self, variables: dict[str, Any] |
None, context: Context
691 ) -> ScriptRunResult |
None:
705 this = state.as_dict()
706 script_vars = {
"this": this, **(variables
or {})}
710 """Stop running the script.
712 If multiple runs are in progress, all will be stopped.
717 """Execute a service call to script.<script name>."""
719 variables=service.data, context=service.context, wait=
True
721 if service.return_response:
722 return response
or {}
726 """Restore last triggered on startup and register service."""
728 assert self.
unique_idunique_id
is not None
733 hass.services.async_register(
737 schema=SCRIPT_SERVICE_SCHEMA,
738 supports_response=SupportsResponse.OPTIONAL,
745 CONF_FIELDS: self.
fieldsfields,
750 last_triggered := state.attributes.get(
"last_triggered")
755 """Stop script and remove service when it will be removed from HA."""
762 @websocket_api.websocket_command({"type": "script/config", "entity_id": str})
768 """Get script config."""
769 component: EntityComponent[BaseScriptEntity] = hass.data[DOMAIN]
771 script = component.get_entity(msg[
"entity_id"])
774 connection.send_error(
775 msg[
"id"], websocket_api.ERR_NOT_FOUND,
"Entity not found"
779 connection.send_result(
782 "config": script.raw_config,
set[str] referenced_labels(self)
set[str] referenced_areas(self)
set[str] referenced_devices(self)
str|None referenced_blueprint(self)
set[str] referenced_floors(self)
set[str] referenced_entities(self)
None async_will_remove_from_hass(self)
ServiceResponse _async_start_run(self, dict[str, Any]|None variables, Context context, bool wait)
set[str] referenced_devices(self)
None async_turn_on(self, **Any kwargs)
None async_added_to_hass(self)
str|None referenced_blueprint(self)
ScriptRunResult|None _async_run(self, dict[str, Any]|None variables, Context context)
None async_change_listener(self)
dict[str, Any] extra_state_attributes(self)
set[str] referenced_areas(self)
ServiceResponse _service_handler(self, ServiceCall service)
set[str] referenced_labels(self)
set[str] referenced_entities(self)
None async_turn_off(self, **Any kwargs)
set[str] referenced_floors(self)
None __init__(self, HomeAssistant hass, str key, ConfigType cfg, ConfigType|None raw_config, ConfigType|None blueprint_inputs)
set[str] referenced_areas(self)
set[str] referenced_devices(self)
str|None referenced_blueprint(self)
None async_added_to_hass(self)
set[str] referenced_floors(self)
set[str] referenced_entities(self)
set[str] referenced_labels(self)
None async_will_remove_from_hass(self)
None __init__(self, str key, ConfigType|None raw_config, str validation_error, ValidationStatus validation_status)
None async_write_ha_state(self)
str|UndefinedType|None name(self)
None async_set_context(self, Context context)
State|None async_get_last_state(self)
blueprint.DomainBlueprints async_get_blueprints(HomeAssistant hass)
datetime|None parse_datetime(str|None value)
None async_stop(HomeAssistant hass)
None async_create_issue(HomeAssistant hass, str entry_id)
None async_delete_issue(HomeAssistant hass, str entry_id)
Iterator[ScriptTrace] trace_script(HomeAssistant hass, str item_id, dict[str, Any]|None config, dict[str, Any]|None blueprint_inputs, Context context, dict[str, Any] trace_config)
list[str] scripts_with_floor(HomeAssistant hass, str floor_id)
list[str] scripts_with_device(HomeAssistant hass, str device_id)
None websocket_config(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
list[str] _scripts_with_x(HomeAssistant hass, str referenced_id, str property_name)
list[str] labels_in_script(HomeAssistant hass, str entity_id)
bool async_setup(HomeAssistant hass, ConfigType config)
bool is_on(HomeAssistant hass, str entity_id)
str|None blueprint_in_script(HomeAssistant hass, str entity_id)
None _async_process_config(HomeAssistant hass, ConfigType config, EntityComponent[BaseScriptEntity] component)
list[str] areas_in_script(HomeAssistant hass, str entity_id)
list[str] _x_in_script(HomeAssistant hass, str entity_id, str property_name)
list[str] devices_in_script(HomeAssistant hass, str entity_id)
list[str] scripts_with_blueprint(HomeAssistant hass, str blueprint_path)
list[str] scripts_with_label(HomeAssistant hass, str label_id)
list[str] floors_in_script(HomeAssistant hass, str entity_id)
list[str] scripts_with_area(HomeAssistant hass, str area_id)
list[str] scripts_with_entity(HomeAssistant hass, str entity_id)
list[ScriptEntityConfig] _prepare_script_config(HomeAssistant hass, ConfigType config)
list[BaseScriptEntity] _create_script_entities(HomeAssistant hass, list[ScriptEntityConfig] script_configs)
list[str] entities_in_script(HomeAssistant hass, str entity_id)
VolSchemaType make_entity_service_schema(dict|None schema, *int extra=vol.PREVENT_EXTRA)
None async_set_service_schema(HomeAssistant hass, str domain, str service, dict[str, Any] schema)
Generator[None] trace_path(str|list[str] suffix)
dict[str, deque[TraceElement]]|None trace_get(bool clear=True)
def async_run(config_dir)