Home Assistant Unofficial Reference 2024.12.1
service.py
Go to the documentation of this file.
1 """Service calling related helpers."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Awaitable, Callable, Coroutine, Iterable
7 import dataclasses
8 from enum import Enum
9 from functools import cache, partial
10 import logging
11 from types import ModuleType
12 from typing import TYPE_CHECKING, Any, TypedDict, TypeGuard, cast
13 
14 import voluptuous as vol
15 
16 from homeassistant.auth.permissions.const import CAT_ENTITIES, POLICY_CONTROL
17 from homeassistant.const import (
18  ATTR_AREA_ID,
19  ATTR_DEVICE_ID,
20  ATTR_ENTITY_ID,
21  ATTR_FLOOR_ID,
22  ATTR_LABEL_ID,
23  CONF_ACTION,
24  CONF_ENTITY_ID,
25  CONF_SERVICE_DATA,
26  CONF_SERVICE_DATA_TEMPLATE,
27  CONF_SERVICE_TEMPLATE,
28  CONF_TARGET,
29  ENTITY_MATCH_ALL,
30  ENTITY_MATCH_NONE,
31 )
32 from homeassistant.core import (
33  Context,
34  EntityServiceResponse,
35  HassJob,
36  HassJobType,
37  HomeAssistant,
38  ServiceCall,
39  ServiceResponse,
40  SupportsResponse,
41  callback,
42 )
43 from homeassistant.exceptions import (
44  HomeAssistantError,
45  TemplateError,
46  Unauthorized,
47  UnknownUser,
48 )
49 from homeassistant.loader import Integration, async_get_integrations, bind_hass
50 from homeassistant.util.async_ import create_eager_task
51 from homeassistant.util.hass_dict import HassKey
52 from homeassistant.util.yaml import load_yaml_dict
53 from homeassistant.util.yaml.loader import JSON_TYPE
54 
55 from . import (
56  area_registry,
57  config_validation as cv,
58  device_registry,
59  entity_registry,
60  floor_registry,
61  label_registry,
62  template,
63  translation,
64 )
65 from .group import expand_entity_ids
66 from .selector import TargetSelector
67 from .typing import ConfigType, TemplateVarsType, VolDictType, VolSchemaType
68 
69 if TYPE_CHECKING:
70  from .entity import Entity
71 
72 CONF_SERVICE_ENTITY_ID = "entity_id"
73 
74 _LOGGER = logging.getLogger(__name__)
75 
76 SERVICE_DESCRIPTION_CACHE: HassKey[dict[tuple[str, str], dict[str, Any] | None]] = (
77  HassKey("service_description_cache")
78 )
79 ALL_SERVICE_DESCRIPTIONS_CACHE: HassKey[
80  tuple[set[tuple[str, str]], dict[str, dict[str, Any]]]
81 ] = HassKey("all_service_descriptions_cache")
82 
83 
84 @cache
85 def _base_components() -> dict[str, ModuleType]:
86  """Return a cached lookup of base components."""
87  # pylint: disable-next=import-outside-toplevel
88  from homeassistant.components import (
89  alarm_control_panel,
90  calendar,
91  camera,
92  climate,
93  cover,
94  fan,
95  humidifier,
96  light,
97  lock,
98  media_player,
99  notify,
100  remote,
101  siren,
102  todo,
103  update,
104  vacuum,
105  water_heater,
106  )
107 
108  return {
109  "alarm_control_panel": alarm_control_panel,
110  "calendar": calendar,
111  "camera": camera,
112  "climate": climate,
113  "cover": cover,
114  "fan": fan,
115  "humidifier": humidifier,
116  "light": light,
117  "lock": lock,
118  "media_player": media_player,
119  "notify": notify,
120  "remote": remote,
121  "siren": siren,
122  "todo": todo,
123  "update": update,
124  "vacuum": vacuum,
125  "water_heater": water_heater,
126  }
127 
128 
129 def _validate_option_or_feature(option_or_feature: str, label: str) -> Any:
130  """Validate attribute option or supported feature."""
131  try:
132  domain, enum, option = option_or_feature.split(".", 2)
133  except ValueError as exc:
134  raise vol.Invalid(
135  f"Invalid {label} '{option_or_feature}', expected "
136  "<domain>.<enum>.<member>"
137  ) from exc
138 
139  base_components = _base_components()
140  if not (base_component := base_components.get(domain)):
141  raise vol.Invalid(f"Unknown base component '{domain}'")
142 
143  try:
144  attribute_enum = getattr(base_component, enum)
145  except AttributeError as exc:
146  raise vol.Invalid(f"Unknown {label} enum '{domain}.{enum}'") from exc
147 
148  if not issubclass(attribute_enum, Enum):
149  raise vol.Invalid(f"Expected {label} '{domain}.{enum}' to be an enum")
150 
151  try:
152  return getattr(attribute_enum, option).value
153  except AttributeError as exc:
154  raise vol.Invalid(f"Unknown {label} '{enum}.{option}'") from exc
155 
156 
157 def validate_attribute_option(attribute_option: str) -> Any:
158  """Validate attribute option."""
159  return _validate_option_or_feature(attribute_option, "attribute option")
160 
161 
162 def validate_supported_feature(supported_feature: str) -> Any:
163  """Validate supported feature."""
164  return _validate_option_or_feature(supported_feature, "supported feature")
165 
166 
167 # Basic schemas which translate attribute and supported feature enum names
168 # to their values. Full validation is done by hassfest.services
169 _FIELD_SCHEMA = vol.Schema(
170  {
171  vol.Optional("filter"): {
172  vol.Optional("attribute"): {
173  vol.Required(str): [vol.All(str, validate_attribute_option)],
174  },
175  vol.Optional("supported_features"): [
176  vol.All(str, validate_supported_feature)
177  ],
178  },
179  },
180  extra=vol.ALLOW_EXTRA,
181 )
182 
183 _SECTION_SCHEMA = vol.Schema(
184  {
185  vol.Required("fields"): vol.Schema({str: _FIELD_SCHEMA}),
186  },
187  extra=vol.ALLOW_EXTRA,
188 )
189 
190 _SERVICE_SCHEMA = vol.Schema(
191  {
192  vol.Optional("target"): vol.Any(TargetSelector.CONFIG_SCHEMA, None),
193  vol.Optional("fields"): vol.Schema(
194  {str: vol.Any(_SECTION_SCHEMA, _FIELD_SCHEMA)}
195  ),
196  },
197  extra=vol.ALLOW_EXTRA,
198 )
199 
200 
201 def starts_with_dot(key: str) -> str:
202  """Check if key starts with dot."""
203  if not key.startswith("."):
204  raise vol.Invalid("Key does not start with .")
205  return key
206 
207 
208 _SERVICES_SCHEMA = vol.Schema(
209  {
210  vol.Remove(vol.All(str, starts_with_dot)): object,
211  cv.slug: vol.Any(None, _SERVICE_SCHEMA),
212  }
213 )
214 
215 
216 class ServiceParams(TypedDict):
217  """Type for service call parameters."""
218 
219  domain: str
220  service: str
221  service_data: dict[str, Any]
222  target: dict | None
223 
224 
226  """Class to hold a target selector for a service."""
227 
228  __slots__ = ("entity_ids", "device_ids", "area_ids", "floor_ids", "label_ids")
229 
230  def __init__(self, service_call: ServiceCall) -> None:
231  """Extract ids from service call data."""
232  service_call_data = service_call.data
233  entity_ids: str | list | None = service_call_data.get(ATTR_ENTITY_ID)
234  device_ids: str | list | None = service_call_data.get(ATTR_DEVICE_ID)
235  area_ids: str | list | None = service_call_data.get(ATTR_AREA_ID)
236  floor_ids: str | list | None = service_call_data.get(ATTR_FLOOR_ID)
237  label_ids: str | list | None = service_call_data.get(ATTR_LABEL_ID)
238 
239  self.entity_idsentity_ids = (
240  set(cv.ensure_list(entity_ids)) if _has_match(entity_ids) else set()
241  )
242  self.device_idsdevice_ids = (
243  set(cv.ensure_list(device_ids)) if _has_match(device_ids) else set()
244  )
245  self.area_idsarea_ids = set(cv.ensure_list(area_ids)) if _has_match(area_ids) else set()
246  self.floor_idsfloor_ids = (
247  set(cv.ensure_list(floor_ids)) if _has_match(floor_ids) else set()
248  )
249  self.label_idslabel_ids = (
250  set(cv.ensure_list(label_ids)) if _has_match(label_ids) else set()
251  )
252 
253  @property
254  def has_any_selector(self) -> bool:
255  """Determine if any selectors are present."""
256  return bool(
257  self.entity_idsentity_ids
258  or self.device_idsdevice_ids
259  or self.area_idsarea_ids
260  or self.floor_idsfloor_ids
261  or self.label_idslabel_ids
262  )
263 
264 
265 @dataclasses.dataclass(slots=True)
267  """Class to hold the selected entities."""
268 
269  # Entities that were explicitly mentioned.
270  referenced: set[str] = dataclasses.field(default_factory=set)
271 
272  # Entities that were referenced via device/area/floor/label ID.
273  # Should not trigger a warning when they don't exist.
274  indirectly_referenced: set[str] = dataclasses.field(default_factory=set)
275 
276  # Referenced items that could not be found.
277  missing_devices: set[str] = dataclasses.field(default_factory=set)
278  missing_areas: set[str] = dataclasses.field(default_factory=set)
279  missing_floors: set[str] = dataclasses.field(default_factory=set)
280  missing_labels: set[str] = dataclasses.field(default_factory=set)
281 
282  # Referenced devices
283  referenced_devices: set[str] = dataclasses.field(default_factory=set)
284  referenced_areas: set[str] = dataclasses.field(default_factory=set)
285 
286  def log_missing(self, missing_entities: set[str]) -> None:
287  """Log about missing items."""
288  parts = []
289  for label, items in (
290  ("floors", self.missing_floors),
291  ("areas", self.missing_areas),
292  ("devices", self.missing_devices),
293  ("entities", missing_entities),
294  ("labels", self.missing_labels),
295  ):
296  if items:
297  parts.append(f"{label} {', '.join(sorted(items))}")
298 
299  if not parts:
300  return
301 
302  _LOGGER.warning(
303  "Referenced %s are missing or not currently available",
304  ", ".join(parts),
305  )
306 
307 
308 @bind_hass
310  hass: HomeAssistant,
311  config: ConfigType,
312  blocking: bool = False,
313  variables: TemplateVarsType = None,
314  validate_config: bool = True,
315 ) -> None:
316  """Call a service based on a config hash."""
317  asyncio.run_coroutine_threadsafe(
318  async_call_from_config(hass, config, blocking, variables, validate_config),
319  hass.loop,
320  ).result()
321 
322 
323 @bind_hass
325  hass: HomeAssistant,
326  config: ConfigType,
327  blocking: bool = False,
328  variables: TemplateVarsType = None,
329  validate_config: bool = True,
330  context: Context | None = None,
331 ) -> None:
332  """Call a service based on a config hash."""
333  try:
335  hass, config, variables, validate_config
336  )
337  except HomeAssistantError as ex:
338  if blocking:
339  raise
340  _LOGGER.error(ex)
341  else:
342  await hass.services.async_call(**params, blocking=blocking, context=context)
343 
344 
345 @callback
346 @bind_hass
348  hass: HomeAssistant,
349  config: ConfigType,
350  variables: TemplateVarsType = None,
351  validate_config: bool = False,
352 ) -> ServiceParams:
353  """Prepare to call a service based on a config hash."""
354  if validate_config:
355  try:
356  config = cv.SERVICE_SCHEMA(config)
357  except vol.Invalid as ex:
358  raise HomeAssistantError(
359  f"Invalid config for calling service: {ex}"
360  ) from ex
361 
362  if CONF_ACTION in config:
363  domain_service = config[CONF_ACTION]
364  else:
365  domain_service = config[CONF_SERVICE_TEMPLATE]
366 
367  if isinstance(domain_service, template.Template):
368  try:
369  domain_service = domain_service.async_render(variables)
370  domain_service = cv.service(domain_service)
371  except TemplateError as ex:
372  raise HomeAssistantError(
373  f"Error rendering service name template: {ex}"
374  ) from ex
375  except vol.Invalid as ex:
376  raise HomeAssistantError(
377  f"Template rendered invalid service: {domain_service}"
378  ) from ex
379 
380  domain, _, service = domain_service.partition(".")
381 
382  target = {}
383  if CONF_TARGET in config:
384  conf = config[CONF_TARGET]
385  try:
386  if isinstance(conf, template.Template):
387  target.update(conf.async_render(variables))
388  else:
389  target.update(template.render_complex(conf, variables))
390 
391  if CONF_ENTITY_ID in target:
392  registry = entity_registry.async_get(hass)
393  entity_ids = cv.comp_entity_ids_or_uuids(target[CONF_ENTITY_ID])
394  if entity_ids not in (ENTITY_MATCH_ALL, ENTITY_MATCH_NONE):
395  entity_ids = entity_registry.async_validate_entity_ids(
396  registry, entity_ids
397  )
398  target[CONF_ENTITY_ID] = entity_ids
399  except TemplateError as ex:
400  raise HomeAssistantError(
401  f"Error rendering service target template: {ex}"
402  ) from ex
403  except vol.Invalid as ex:
404  raise HomeAssistantError(
405  f"Template rendered invalid entity IDs: {target[CONF_ENTITY_ID]}"
406  ) from ex
407 
408  service_data = {}
409 
410  for conf in (CONF_SERVICE_DATA, CONF_SERVICE_DATA_TEMPLATE):
411  if conf not in config:
412  continue
413  try:
414  render = template.render_complex(config[conf], variables)
415  if not isinstance(render, dict):
416  raise HomeAssistantError(
417  "Error rendering data template: Result is not a Dictionary"
418  )
419  service_data.update(render)
420  except TemplateError as ex:
421  raise HomeAssistantError(f"Error rendering data template: {ex}") from ex
422 
423  if CONF_SERVICE_ENTITY_ID in config:
424  if target:
425  target[ATTR_ENTITY_ID] = config[CONF_SERVICE_ENTITY_ID]
426  else:
427  target = {ATTR_ENTITY_ID: config[CONF_SERVICE_ENTITY_ID]}
428 
429  return {
430  "domain": domain,
431  "service": service,
432  "service_data": service_data,
433  "target": target,
434  }
435 
436 
437 @bind_hass
439  hass: HomeAssistant, service_call: ServiceCall, expand_group: bool = True
440 ) -> set[str]:
441  """Extract a list of entity ids from a service call.
442 
443  Will convert group entity ids to the entity ids it represents.
444  """
445  return asyncio.run_coroutine_threadsafe(
446  async_extract_entity_ids(hass, service_call, expand_group), hass.loop
447  ).result()
448 
449 
450 @bind_hass
451 async def async_extract_entities[_EntityT: Entity](
452  hass: HomeAssistant,
453  entities: Iterable[_EntityT],
454  service_call: ServiceCall,
455  expand_group: bool = True,
456 ) -> list[_EntityT]:
457  """Extract a list of entity objects from a service call.
458 
459  Will convert group entity ids to the entity ids it represents.
460  """
461  data_ent_id = service_call.data.get(ATTR_ENTITY_ID)
462 
463  if data_ent_id == ENTITY_MATCH_ALL:
464  return [entity for entity in entities if entity.available]
465 
466  referenced = async_extract_referenced_entity_ids(hass, service_call, expand_group)
467  combined = referenced.referenced | referenced.indirectly_referenced
468 
469  found = []
470 
471  for entity in entities:
472  if entity.entity_id not in combined:
473  continue
474 
475  combined.remove(entity.entity_id)
476 
477  if not entity.available:
478  continue
479 
480  found.append(entity)
481 
482  referenced.log_missing(referenced.referenced & combined)
483 
484  return found
485 
486 
487 @bind_hass
489  hass: HomeAssistant, service_call: ServiceCall, expand_group: bool = True
490 ) -> set[str]:
491  """Extract a set of entity ids from a service call.
492 
493  Will convert group entity ids to the entity ids it represents.
494  """
495  referenced = async_extract_referenced_entity_ids(hass, service_call, expand_group)
496  return referenced.referenced | referenced.indirectly_referenced
497 
498 
499 def _has_match(ids: str | list[str] | None) -> TypeGuard[str | list[str]]:
500  """Check if ids can match anything."""
501  return ids not in (None, ENTITY_MATCH_NONE)
502 
503 
504 @bind_hass
506  hass: HomeAssistant, service_call: ServiceCall, expand_group: bool = True
507 ) -> SelectedEntities:
508  """Extract referenced entity IDs from a service call."""
509  selector = ServiceTargetSelector(service_call)
510  selected = SelectedEntities()
511 
512  if not selector.has_any_selector:
513  return selected
514 
515  entity_ids: set[str] | list[str] = selector.entity_ids
516  if expand_group:
517  entity_ids = expand_entity_ids(hass, entity_ids)
518 
519  selected.referenced.update(entity_ids)
520 
521  if (
522  not selector.device_ids
523  and not selector.area_ids
524  and not selector.floor_ids
525  and not selector.label_ids
526  ):
527  return selected
528 
529  entities = entity_registry.async_get(hass).entities
530  dev_reg = device_registry.async_get(hass)
531  area_reg = area_registry.async_get(hass)
532 
533  if selector.floor_ids:
534  floor_reg = floor_registry.async_get(hass)
535  for floor_id in selector.floor_ids:
536  if floor_id not in floor_reg.floors:
537  selected.missing_floors.add(floor_id)
538 
539  for area_id in selector.area_ids:
540  if area_id not in area_reg.areas:
541  selected.missing_areas.add(area_id)
542 
543  for device_id in selector.device_ids:
544  if device_id not in dev_reg.devices:
545  selected.missing_devices.add(device_id)
546 
547  if selector.label_ids:
548  label_reg = label_registry.async_get(hass)
549  for label_id in selector.label_ids:
550  if label_id not in label_reg.labels:
551  selected.missing_labels.add(label_id)
552 
553  for entity_entry in entities.get_entries_for_label(label_id):
554  if (
555  entity_entry.entity_category is None
556  and entity_entry.hidden_by is None
557  ):
558  selected.indirectly_referenced.add(entity_entry.entity_id)
559 
560  for device_entry in dev_reg.devices.get_devices_for_label(label_id):
561  selected.referenced_devices.add(device_entry.id)
562 
563  for area_entry in area_reg.areas.get_areas_for_label(label_id):
564  selected.referenced_areas.add(area_entry.id)
565 
566  # Find areas for targeted floors
567  if selector.floor_ids:
568  selected.referenced_areas.update(
569  area_entry.id
570  for floor_id in selector.floor_ids
571  for area_entry in area_reg.areas.get_areas_for_floor(floor_id)
572  )
573 
574  selected.referenced_areas.update(selector.area_ids)
575  selected.referenced_devices.update(selector.device_ids)
576 
577  if not selected.referenced_areas and not selected.referenced_devices:
578  return selected
579 
580  # Add indirectly referenced by device
581  selected.indirectly_referenced.update(
582  entry.entity_id
583  for device_id in selected.referenced_devices
584  for entry in entities.get_entries_for_device_id(device_id)
585  # Do not add entities which are hidden or which are config
586  # or diagnostic entities.
587  if (entry.entity_category is None and entry.hidden_by is None)
588  )
589 
590  # Find devices for targeted areas
591  referenced_devices_by_area: set[str] = set()
592  if selected.referenced_areas:
593  for area_id in selected.referenced_areas:
594  referenced_devices_by_area.update(
595  device_entry.id
596  for device_entry in dev_reg.devices.get_devices_for_area_id(area_id)
597  )
598  selected.referenced_devices.update(referenced_devices_by_area)
599 
600  # Add indirectly referenced by area
601  selected.indirectly_referenced.update(
602  entry.entity_id
603  for area_id in selected.referenced_areas
604  # The entity's area matches a targeted area
605  for entry in entities.get_entries_for_area_id(area_id)
606  # Do not add entities which are hidden or which are config
607  # or diagnostic entities.
608  if entry.entity_category is None and entry.hidden_by is None
609  )
610  # Add indirectly referenced by area through device
611  selected.indirectly_referenced.update(
612  entry.entity_id
613  for device_id in referenced_devices_by_area
614  for entry in entities.get_entries_for_device_id(device_id)
615  # Do not add entities which are hidden or which are config
616  # or diagnostic entities.
617  if (
618  entry.entity_category is None
619  and entry.hidden_by is None
620  and (
621  # The entity's device matches a device referenced
622  # by an area and the entity
623  # has no explicitly set area
624  not entry.area_id
625  )
626  )
627  )
628 
629  return selected
630 
631 
632 @bind_hass
634  hass: HomeAssistant, service_call: ServiceCall, expand_group: bool = True
635 ) -> set[str]:
636  """Extract referenced config entry ids from a service call."""
637  referenced = async_extract_referenced_entity_ids(hass, service_call, expand_group)
638  ent_reg = entity_registry.async_get(hass)
639  dev_reg = device_registry.async_get(hass)
640  config_entry_ids: set[str] = set()
641 
642  # Some devices may have no entities
643  for device_id in referenced.referenced_devices:
644  if (
645  device_id in dev_reg.devices
646  and (device := dev_reg.async_get(device_id)) is not None
647  ):
648  config_entry_ids.update(device.config_entries)
649 
650  for entity_id in referenced.referenced | referenced.indirectly_referenced:
651  entry = ent_reg.async_get(entity_id)
652  if entry is not None and entry.config_entry_id is not None:
653  config_entry_ids.add(entry.config_entry_id)
654 
655  return config_entry_ids
656 
657 
658 def _load_services_file(hass: HomeAssistant, integration: Integration) -> JSON_TYPE:
659  """Load services file for an integration."""
660  try:
661  return cast(
662  JSON_TYPE,
664  load_yaml_dict(str(integration.file_path / "services.yaml"))
665  ),
666  )
667  except FileNotFoundError:
668  _LOGGER.warning(
669  "Unable to find services.yaml for the %s integration", integration.domain
670  )
671  return {}
672  except (HomeAssistantError, vol.Invalid) as ex:
673  _LOGGER.warning(
674  "Unable to parse services.yaml for the %s integration: %s",
675  integration.domain,
676  ex,
677  )
678  return {}
679 
680 
682  hass: HomeAssistant, integrations: Iterable[Integration]
683 ) -> list[JSON_TYPE]:
684  """Load service files for multiple integrations."""
685  return [_load_services_file(hass, integration) for integration in integrations]
686 
687 
688 @callback
690  hass: HomeAssistant, domain: str, service: str
691 ) -> dict[str, Any] | None:
692  """Return the cached description for a service."""
693  return hass.data.get(SERVICE_DESCRIPTION_CACHE, {}).get((domain, service))
694 
695 
696 @bind_hass
698  hass: HomeAssistant,
699 ) -> dict[str, dict[str, Any]]:
700  """Return descriptions (i.e. user documentation) for all service calls."""
701  descriptions_cache = hass.data.setdefault(SERVICE_DESCRIPTION_CACHE, {})
702 
703  # We don't mutate services here so we avoid calling
704  # async_services which makes a copy of every services
705  # dict.
706  services = hass.services.async_services_internal()
707 
708  # See if there are new services not seen before.
709  # Any service that we saw before already has an entry in description_cache.
710  all_services = {
711  (domain, service_name)
712  for domain, services_by_domain in services.items()
713  for service_name in services_by_domain
714  }
715  # If we have a complete cache, check if it is still valid
716  all_cache: tuple[set[tuple[str, str]], dict[str, dict[str, Any]]] | None
717  if all_cache := hass.data.get(ALL_SERVICE_DESCRIPTIONS_CACHE):
718  previous_all_services, previous_descriptions_cache = all_cache
719  # If the services are the same, we can return the cache
720  if previous_all_services == all_services:
721  return previous_descriptions_cache
722 
723  # Files we loaded for missing descriptions
724  loaded: dict[str, JSON_TYPE] = {}
725  # We try to avoid making a copy in the event the cache is good,
726  # but now we must make a copy in case new services get added
727  # while we are loading the missing ones so we do not
728  # add the new ones to the cache without their descriptions
729  services = {domain: service.copy() for domain, service in services.items()}
730 
731  if domains_with_missing_services := {
732  domain for domain, _ in all_services.difference(descriptions_cache)
733  }:
734  ints_or_excs = await async_get_integrations(hass, domains_with_missing_services)
735  integrations: list[Integration] = []
736  for domain, int_or_exc in ints_or_excs.items():
737  if type(int_or_exc) is Integration and int_or_exc.has_services:
738  integrations.append(int_or_exc)
739  continue
740  if TYPE_CHECKING:
741  assert isinstance(int_or_exc, Exception)
742  _LOGGER.error("Failed to load integration: %s", domain, exc_info=int_or_exc)
743 
744  if integrations:
745  contents = await hass.async_add_executor_job(
746  _load_services_files, hass, integrations
747  )
748  loaded = dict(zip(domains_with_missing_services, contents, strict=False))
749 
750  # Load translations for all service domains
751  translations = await translation.async_get_translations(
752  hass, "en", "services", services
753  )
754 
755  # Build response
756  descriptions: dict[str, dict[str, Any]] = {}
757  for domain, services_map in services.items():
758  descriptions[domain] = {}
759  domain_descriptions = descriptions[domain]
760 
761  for service_name, service in services_map.items():
762  cache_key = (domain, service_name)
763  description = descriptions_cache.get(cache_key)
764  if description is not None:
765  domain_descriptions[service_name] = description
766  continue
767 
768  # Cache missing descriptions
769  domain_yaml = loaded.get(domain) or {}
770  # The YAML may be empty for dynamically defined
771  # services (ie shell_command) that never call
772  # service.async_set_service_schema for the dynamic
773  # service
774 
775  yaml_description = (
776  domain_yaml.get(service_name) or {} # type: ignore[union-attr]
777  )
778 
779  # Don't warn for missing services, because it triggers false
780  # positives for things like scripts, that register as a service
781  #
782  # When name & description are in the translations use those;
783  # otherwise fallback to backwards compatible behavior from
784  # the time when we didn't have translations for descriptions yet.
785  # This mimics the behavior of the frontend.
786  description = {
787  "name": translations.get(
788  f"component.{domain}.services.{service_name}.name",
789  yaml_description.get("name", ""),
790  ),
791  "description": translations.get(
792  f"component.{domain}.services.{service_name}.description",
793  yaml_description.get("description", ""),
794  ),
795  "fields": dict(yaml_description.get("fields", {})),
796  }
797 
798  # Translate fields names & descriptions as well
799  for field_name, field_schema in description["fields"].items():
800  if name := translations.get(
801  f"component.{domain}.services.{service_name}.fields.{field_name}.name"
802  ):
803  field_schema["name"] = name
804  if desc := translations.get(
805  f"component.{domain}.services.{service_name}.fields.{field_name}.description"
806  ):
807  field_schema["description"] = desc
808  if example := translations.get(
809  f"component.{domain}.services.{service_name}.fields.{field_name}.example"
810  ):
811  field_schema["example"] = example
812 
813  if "target" in yaml_description:
814  description["target"] = yaml_description["target"]
815 
816  response = service.supports_response
817  if response is not SupportsResponse.NONE:
818  description["response"] = {
819  "optional": response is SupportsResponse.OPTIONAL,
820  }
821 
822  descriptions_cache[cache_key] = description
823 
824  domain_descriptions[service_name] = description
825 
826  hass.data[ALL_SERVICE_DESCRIPTIONS_CACHE] = (all_services, descriptions)
827  return descriptions
828 
829 
830 @callback
831 def remove_entity_service_fields(call: ServiceCall) -> dict[Any, Any]:
832  """Remove entity service fields."""
833  return {
834  key: val
835  for key, val in call.data.items()
836  if key not in cv.ENTITY_SERVICE_FIELDS
837  }
838 
839 
840 @callback
841 @bind_hass
843  hass: HomeAssistant, domain: str, service: str, schema: dict[str, Any]
844 ) -> None:
845  """Register a description for a service."""
846  domain = domain.lower()
847  service = service.lower()
848 
849  descriptions_cache = hass.data.setdefault(SERVICE_DESCRIPTION_CACHE, {})
850 
851  description = {
852  "name": schema.get("name", ""),
853  "description": schema.get("description", ""),
854  "fields": schema.get("fields", {}),
855  }
856 
857  if "target" in schema:
858  description["target"] = schema["target"]
859 
860  if (
861  response := hass.services.supports_response(domain, service)
862  ) != SupportsResponse.NONE:
863  description["response"] = {
864  "optional": response == SupportsResponse.OPTIONAL,
865  }
866 
867  hass.data.pop(ALL_SERVICE_DESCRIPTIONS_CACHE, None)
868  descriptions_cache[(domain, service)] = description
869 
870 
872  call: ServiceCall,
873  entities: dict[str, Entity],
874  entity_perms: Callable[[str, str], bool] | None,
875  target_all_entities: bool,
876  all_referenced: set[str] | None,
877 ) -> list[Entity]:
878  """Get entity candidates that the user is allowed to access."""
879  if entity_perms is not None:
880  # Check the permissions since entity_perms is set
881  if target_all_entities:
882  # If we target all entities, we will select all entities the user
883  # is allowed to control.
884  return [
885  entity
886  for entity_id, entity in entities.items()
887  if entity_perms(entity_id, POLICY_CONTROL)
888  ]
889 
890  assert all_referenced is not None
891  # If they reference specific entities, we will check if they are all
892  # allowed to be controlled.
893  for entity_id in all_referenced:
894  if not entity_perms(entity_id, POLICY_CONTROL):
895  raise Unauthorized(
896  context=call.context,
897  entity_id=entity_id,
898  permission=POLICY_CONTROL,
899  )
900 
901  elif target_all_entities:
902  return list(entities.values())
903 
904  # We have already validated they have permissions to control all_referenced
905  # entities so we do not need to check again.
906  if TYPE_CHECKING:
907  assert all_referenced is not None
908  if (
909  len(all_referenced) == 1
910  and (single_entity := list(all_referenced)[0])
911  and (entity := entities.get(single_entity)) is not None
912  ):
913  return [entity]
914 
915  return [entities[entity_id] for entity_id in all_referenced.intersection(entities)]
916 
917 
918 @bind_hass
920  hass: HomeAssistant,
921  registered_entities: dict[str, Entity],
922  func: str | HassJob,
923  call: ServiceCall,
924  required_features: Iterable[int] | None = None,
925 ) -> EntityServiceResponse | None:
926  """Handle an entity service call.
927 
928  Calls all platforms simultaneously.
929  """
930  entity_perms: Callable[[str, str], bool] | None = None
931  return_response = call.return_response
932 
933  if call.context.user_id:
934  user = await hass.auth.async_get_user(call.context.user_id)
935  if user is None:
936  raise UnknownUser(context=call.context)
937  if not user.is_admin:
938  entity_perms = user.permissions.check_entity
939 
940  target_all_entities = call.data.get(ATTR_ENTITY_ID) == ENTITY_MATCH_ALL
941 
942  if target_all_entities:
943  referenced: SelectedEntities | None = None
944  all_referenced: set[str] | None = None
945  else:
946  # A set of entities we're trying to target.
947  referenced = async_extract_referenced_entity_ids(hass, call, True)
948  all_referenced = referenced.referenced | referenced.indirectly_referenced
949 
950  # If the service function is a string, we'll pass it the service call data
951  if isinstance(func, str):
952  data: dict | ServiceCall = remove_entity_service_fields(call)
953  # If the service function is not a string, we pass the service call
954  else:
955  data = call
956 
957  # A list with entities to call the service on.
958  entity_candidates = _get_permissible_entity_candidates(
959  call,
960  registered_entities,
961  entity_perms,
962  target_all_entities,
963  all_referenced,
964  )
965 
966  if not target_all_entities:
967  assert referenced is not None
968  # Only report on explicit referenced entities
969  missing = referenced.referenced.copy()
970  for entity in entity_candidates:
971  missing.discard(entity.entity_id)
972  referenced.log_missing(missing)
973 
974  entities: list[Entity] = []
975  for entity in entity_candidates:
976  if not entity.available:
977  continue
978 
979  # Skip entities that don't have the required feature.
980  if required_features is not None and (
981  entity.supported_features is None
982  or not any(
983  entity.supported_features & feature_set == feature_set
984  for feature_set in required_features
985  )
986  ):
987  # If entity explicitly referenced, raise an error
988  if referenced is not None and entity.entity_id in referenced.referenced:
989  raise HomeAssistantError(
990  f"Entity {entity.entity_id} does not support this service."
991  )
992 
993  continue
994 
995  entities.append(entity)
996 
997  if not entities:
998  if return_response:
999  raise HomeAssistantError(
1000  "Service call requested response data but did not match any entities"
1001  )
1002  return None
1003 
1004  if len(entities) == 1:
1005  # Single entity case avoids creating task
1006  entity = entities[0]
1007  single_response = await _handle_entity_call(
1008  hass, entity, func, data, call.context
1009  )
1010  if entity.should_poll:
1011  # Context expires if the turn on commands took a long time.
1012  # Set context again so it's there when we update
1013  entity.async_set_context(call.context)
1014  await entity.async_update_ha_state(True)
1015  return {entity.entity_id: single_response} if return_response else None
1016 
1017  # Use asyncio.gather here to ensure the returned results
1018  # are in the same order as the entities list
1019  results: list[ServiceResponse | BaseException] = await asyncio.gather(
1020  *[
1021  entity.async_request_call(
1022  _handle_entity_call(hass, entity, func, data, call.context)
1023  )
1024  for entity in entities
1025  ],
1026  return_exceptions=True,
1027  )
1028 
1029  response_data: EntityServiceResponse = {}
1030  for entity, result in zip(entities, results, strict=False):
1031  if isinstance(result, BaseException):
1032  raise result from None
1033  response_data[entity.entity_id] = result
1034 
1035  tasks: list[asyncio.Task[None]] = []
1036 
1037  for entity in entities:
1038  if not entity.should_poll:
1039  continue
1040 
1041  # Context expires if the turn on commands took a long time.
1042  # Set context again so it's there when we update
1043  entity.async_set_context(call.context)
1044  tasks.append(create_eager_task(entity.async_update_ha_state(True)))
1045 
1046  if tasks:
1047  done, pending = await asyncio.wait(tasks)
1048  assert not pending
1049  for future in done:
1050  future.result() # pop exception if have
1051 
1052  return response_data if return_response and response_data else None
1053 
1054 
1056  hass: HomeAssistant,
1057  entity: Entity,
1058  func: str | HassJob,
1059  data: dict | ServiceCall,
1060  context: Context,
1061 ) -> ServiceResponse:
1062  """Handle calling service method."""
1063  entity.async_set_context(context)
1064 
1065  task: asyncio.Future[ServiceResponse] | None
1066  if isinstance(func, str):
1067  job = HassJob(
1068  partial(getattr(entity, func), **data), # type: ignore[arg-type]
1069  job_type=entity.get_hassjob_type(func),
1070  )
1071  task = hass.async_run_hass_job(job)
1072  else:
1073  task = hass.async_run_hass_job(func, entity, data)
1074 
1075  # Guard because callback functions do not return a task when passed to
1076  # async_run_job.
1077  result: ServiceResponse = None
1078  if task is not None:
1079  result = await task
1080 
1081  if asyncio.iscoroutine(result):
1082  _LOGGER.error( # type: ignore[unreachable]
1083  (
1084  "Service %s for %s incorrectly returns a coroutine object. Await result"
1085  " instead in service handler. Report bug to integration author"
1086  ),
1087  func,
1088  entity.entity_id,
1089  )
1090  result = await result
1091 
1092  return result
1093 
1094 
1096  hass: HomeAssistant,
1097  service_job: HassJob[[ServiceCall], Awaitable[None] | None],
1098  call: ServiceCall,
1099 ) -> None:
1100  """Run an admin service."""
1101  if call.context.user_id:
1102  user = await hass.auth.async_get_user(call.context.user_id)
1103  if user is None:
1104  raise UnknownUser(context=call.context)
1105  if not user.is_admin:
1106  raise Unauthorized(context=call.context)
1107 
1108  result = hass.async_run_hass_job(service_job, call)
1109  if result is not None:
1110  await result
1111 
1112 
1113 @bind_hass
1114 @callback
1116  hass: HomeAssistant,
1117  domain: str,
1118  service: str,
1119  service_func: Callable[[ServiceCall], Awaitable[None] | None],
1120  schema: VolSchemaType = vol.Schema({}, extra=vol.PREVENT_EXTRA),
1121 ) -> None:
1122  """Register a service that requires admin access."""
1123  hass.services.async_register(
1124  domain,
1125  service,
1126  partial(
1127  _async_admin_handler,
1128  hass,
1129  HassJob(service_func, f"admin service {domain}.{service}"),
1130  ),
1131  schema,
1132  )
1133 
1134 
1135 @bind_hass
1136 @callback
1138  hass: HomeAssistant, domain: str
1139 ) -> Callable[[Callable[[ServiceCall], Any]], Callable[[ServiceCall], Any]]:
1140  """Ensure permission to access any entity under domain in service call."""
1141 
1142  def decorator(
1143  service_handler: Callable[[ServiceCall], Any],
1144  ) -> Callable[[ServiceCall], Any]:
1145  """Decorate."""
1146  if not asyncio.iscoroutinefunction(service_handler):
1147  raise HomeAssistantError("Can only decorate async functions.")
1148 
1149  async def check_permissions(call: ServiceCall) -> Any:
1150  """Check user permission and raise before call if unauthorized."""
1151  if not call.context.user_id:
1152  return await service_handler(call)
1153 
1154  user = await hass.auth.async_get_user(call.context.user_id)
1155 
1156  if user is None:
1157  raise UnknownUser(
1158  context=call.context,
1159  permission=POLICY_CONTROL,
1160  user_id=call.context.user_id,
1161  )
1162 
1163  reg = entity_registry.async_get(hass)
1164 
1165  authorized = False
1166 
1167  for entity in reg.entities.values():
1168  if entity.platform != domain:
1169  continue
1170 
1171  if user.permissions.check_entity(entity.entity_id, POLICY_CONTROL):
1172  authorized = True
1173  break
1174 
1175  if not authorized:
1176  raise Unauthorized(
1177  context=call.context,
1178  permission=POLICY_CONTROL,
1179  user_id=call.context.user_id,
1180  perm_category=CAT_ENTITIES,
1181  )
1182 
1183  return await service_handler(call)
1184 
1185  return check_permissions
1186 
1187  return decorator
1188 
1189 
1191  """Helper for reload services.
1192 
1193  The helper has the following purposes:
1194  - Make sure reloads do not happen in parallel
1195  - Avoid redundant reloads of the same target
1196  """
1197 
1199  self,
1200  service_func: Callable[[ServiceCall], Coroutine[Any, Any, Any]],
1201  reload_targets_func: Callable[[ServiceCall], set[_T]],
1202  ) -> None:
1203  """Initialize ReloadServiceHelper."""
1204  self._service_func_service_func = service_func
1205  self._service_running_service_running = False
1206  self._service_condition_service_condition = asyncio.Condition()
1207  self._pending_reload_targets: set[_T] = set()
1208  self._reload_targets_func_reload_targets_func = reload_targets_func
1209 
1210  async def execute_service(self, service_call: ServiceCall) -> None:
1211  """Execute the service.
1212 
1213  If a previous reload task is currently in progress, wait for it to finish first.
1214  Once the previous reload task has finished, one of the waiting tasks will be
1215  assigned to execute the reload of the targets it is assigned to reload. The
1216  other tasks will wait if they should reload the same target, otherwise they
1217  will wait for the next round.
1218  """
1219 
1220  do_reload = False
1221  reload_targets = None
1222  async with self._service_condition_service_condition:
1223  if self._service_running_service_running:
1224  # A previous reload task is already in progress, wait for it to finish,
1225  # because that task may be reloading a stale version of the resource.
1226  await self._service_condition_service_condition.wait()
1227 
1228  while True:
1229  async with self._service_condition_service_condition:
1230  # Once we've passed this point, we assume the version of the resource is
1231  # the one our task was assigned to reload, or a newer one. Regardless of
1232  # which, our task is happy as long as the target is reloaded at least
1233  # once.
1234  if reload_targets is None:
1235  reload_targets = self._reload_targets_func_reload_targets_func(service_call)
1236  self._pending_reload_targets |= reload_targets
1237  if not self._service_running_service_running:
1238  # This task will do a reload
1239  self._service_running_service_running = True
1240  do_reload = True
1241  break
1242  # Another task will perform a reload, wait for it to finish
1243  await self._service_condition_service_condition.wait()
1244  # Check if the reload this task is waiting for has been completed
1245  if reload_targets.isdisjoint(self._pending_reload_targets):
1246  break
1247 
1248  if do_reload:
1249  # Reload, then notify other tasks
1250  await self._service_func_service_func(service_call)
1251  async with self._service_condition_service_condition:
1252  self._service_running_service_running = False
1253  self._pending_reload_targets -= reload_targets
1254  self._service_condition_service_condition.notify_all()
1255 
1256 
1257 @callback
1259  hass: HomeAssistant,
1260  domain: str,
1261  name: str,
1262  *,
1263  entities: dict[str, Entity],
1264  func: str | Callable[..., Any],
1265  job_type: HassJobType | None,
1266  required_features: Iterable[int] | None = None,
1267  schema: VolDictType | VolSchemaType | None,
1268  supports_response: SupportsResponse = SupportsResponse.NONE,
1269 ) -> None:
1270  """Help registering an entity service.
1271 
1272  This is called by EntityComponent.async_register_entity_service and
1273  EntityPlatform.async_register_entity_service and should not be called
1274  directly by integrations.
1275  """
1276  if schema is None or isinstance(schema, dict):
1277  schema = cv.make_entity_service_schema(schema)
1278  elif not cv.is_entity_service_schema(schema):
1279  # pylint: disable-next=import-outside-toplevel
1280  from .frame import ReportBehavior, report_usage
1281 
1282  report_usage(
1283  "registers an entity service with a non entity service schema",
1284  core_behavior=ReportBehavior.LOG,
1285  breaks_in_ha_version="2025.9",
1286  )
1287 
1288  service_func: str | HassJob[..., Any]
1289  service_func = func if isinstance(func, str) else HassJob(func)
1290 
1291  hass.services.async_register(
1292  domain,
1293  name,
1294  partial(
1295  entity_service_call,
1296  hass,
1297  entities,
1298  service_func,
1299  required_features=required_features,
1300  ),
1301  schema,
1302  supports_response,
1303  job_type=job_type,
1304  )
None execute_service(self, ServiceCall service_call)
Definition: service.py:1210
None __init__(self, Callable[[ServiceCall], Coroutine[Any, Any, Any]] service_func, Callable[[ServiceCall], set[_T]] reload_targets_func)
Definition: service.py:1202
None log_missing(self, set[str] missing_entities)
Definition: service.py:286
None __init__(self, ServiceCall service_call)
Definition: service.py:230
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
set[str] async_extract_entities(ConfigType|Template config)
Definition: condition.py:1121
None report_usage(str what, *str|None breaks_in_ha_version=None, ReportBehavior core_behavior=ReportBehavior.ERROR, ReportBehavior core_integration_behavior=ReportBehavior.LOG, ReportBehavior custom_integration_behavior=ReportBehavior.LOG, set[str]|None exclude_integrations=None, str|None integration_domain=None, int level=logging.WARNING)
Definition: frame.py:195
None async_register_entity_service(HomeAssistant hass, str domain, str name, *dict[str, Entity] entities, str|Callable[..., Any] func, HassJobType|None job_type, Iterable[int]|None required_features=None, VolDictType|VolSchemaType|None schema, SupportsResponse supports_response=SupportsResponse.NONE)
Definition: service.py:1269
set[str] async_extract_config_entry_ids(HomeAssistant hass, ServiceCall service_call, bool expand_group=True)
Definition: service.py:635
Callable[[Callable[[ServiceCall], Any]], Callable[[ServiceCall], Any]] verify_domain_control(HomeAssistant hass, str domain)
Definition: service.py:1139
Any validate_supported_feature(str supported_feature)
Definition: service.py:162
ServiceParams async_prepare_call_from_config(HomeAssistant hass, ConfigType config, TemplateVarsType variables=None, bool validate_config=False)
Definition: service.py:352
set[str] extract_entity_ids(HomeAssistant hass, ServiceCall service_call, bool expand_group=True)
Definition: service.py:440
dict[str, ModuleType] _base_components()
Definition: service.py:85
dict[Any, Any] remove_entity_service_fields(ServiceCall call)
Definition: service.py:831
None async_set_service_schema(HomeAssistant hass, str domain, str service, dict[str, Any] schema)
Definition: service.py:844
str starts_with_dot(str key)
Definition: service.py:201
None async_register_admin_service(HomeAssistant hass, str domain, str service, Callable[[ServiceCall], Awaitable[None]|None] service_func, VolSchemaType schema=vol.Schema({}, extra=vol.PREVENT_EXTRA))
Definition: service.py:1121
ServiceResponse _handle_entity_call(HomeAssistant hass, Entity entity, str|HassJob func, dict|ServiceCall data, Context context)
Definition: service.py:1061
None async_call_from_config(HomeAssistant hass, ConfigType config, bool blocking=False, TemplateVarsType variables=None, bool validate_config=True, Context|None context=None)
Definition: service.py:331
dict[str, Any]|None async_get_cached_service_description(HomeAssistant hass, str domain, str service)
Definition: service.py:691
list[Entity] _get_permissible_entity_candidates(ServiceCall call, dict[str, Entity] entities, Callable[[str, str], bool]|None entity_perms, bool target_all_entities, set[str]|None all_referenced)
Definition: service.py:877
None call_from_config(HomeAssistant hass, ConfigType config, bool blocking=False, TemplateVarsType variables=None, bool validate_config=True)
Definition: service.py:315
SelectedEntities async_extract_referenced_entity_ids(HomeAssistant hass, ServiceCall service_call, bool expand_group=True)
Definition: service.py:507
TypeGuard[str|list[str]] _has_match(str|list[str]|None ids)
Definition: service.py:499
set[str] async_extract_entity_ids(HomeAssistant hass, ServiceCall service_call, bool expand_group=True)
Definition: service.py:490
EntityServiceResponse|None entity_service_call(HomeAssistant hass, dict[str, Entity] registered_entities, str|HassJob func, ServiceCall call, Iterable[int]|None required_features=None)
Definition: service.py:925
Any _validate_option_or_feature(str option_or_feature, str label)
Definition: service.py:129
list[JSON_TYPE] _load_services_files(HomeAssistant hass, Iterable[Integration] integrations)
Definition: service.py:683
JSON_TYPE _load_services_file(HomeAssistant hass, Integration integration)
Definition: service.py:658
Any validate_attribute_option(str attribute_option)
Definition: service.py:157
dict[str, dict[str, Any]] async_get_all_descriptions(HomeAssistant hass)
Definition: service.py:699
None _async_admin_handler(HomeAssistant hass, HassJob[[ServiceCall], Awaitable[None]|None] service_job, ServiceCall call)
Definition: service.py:1099
dict[str, Integration|Exception] async_get_integrations(HomeAssistant hass, Iterable[str] domains)
Definition: loader.py:1368
dict load_yaml_dict(str|os.PathLike[str] fname, Secrets|None secrets=None)
Definition: loader.py:180