Home Assistant Unofficial Reference 2024.12.1
loader.py
Go to the documentation of this file.
1 """Custom loader."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Iterator
6 import fnmatch
7 from io import StringIO, TextIOWrapper
8 import logging
9 import os
10 from pathlib import Path
11 from typing import Any, TextIO, overload
12 
13 import yaml
14 
15 try:
16  from yaml import CSafeLoader as FastestAvailableSafeLoader
17 
18  HAS_C_LOADER = True
19 except ImportError:
20  HAS_C_LOADER = False
21  from yaml import ( # type: ignore[assignment]
22  SafeLoader as FastestAvailableSafeLoader,
23  )
24 
25 from propcache import cached_property
26 
27 from homeassistant.exceptions import HomeAssistantError
28 
29 from .const import SECRET_YAML
30 from .objects import Input, NodeDictClass, NodeListClass, NodeStrClass
31 
32 # mypy: allow-untyped-calls, no-warn-return-any
33 
34 JSON_TYPE = list | dict | str
35 
36 _LOGGER = logging.getLogger(__name__)
37 
38 
class YamlTypeError(HomeAssistantError):
    """Raised by load_yaml_dict if top level data is not a dict."""
41 
42 
class Secrets:
    """Store secrets while loading YAML.

    Each secrets file is parsed at most once per instance; the parsed content
    is cached under the full path of the file.
    """

    def __init__(self, config_dir: Path) -> None:
        """Initialize secrets.

        config_dir is the top-most directory that is searched for a secrets
        file; lookups never continue above it.
        """
        self.config_dir = config_dir
        # Parsed secrets, keyed by the path of the secrets file they came from.
        self._cache: dict[Path, dict[str, str]] = {}

    def get(self, requester_path: str, secret: str) -> str:
        """Return the value of a secret.

        The search starts in the directory containing requester_path and moves
        up one directory at a time. The first secrets file that defines the
        secret wins. The search stops at the first directory that is not
        inside config_dir.

        Raise HomeAssistantError if no searched file defines the secret.
        """
        # Path.parents is finite, so the walk also terminates when config_dir
        # is itself the filesystem root.
        for secret_dir in Path(requester_path).parents:
            if not secret_dir.is_relative_to(self.config_dir):
                # We went above the config dir
                break

            secrets = self._load_secret_yaml(secret_dir)

            if secret in secrets:
                _LOGGER.debug(
                    "Secret %s retrieved from secrets.yaml in folder %s",
                    secret,
                    secret_dir,
                )
                return secrets[secret]

        raise HomeAssistantError(f"Secret {secret} not defined")

    def _load_secret_yaml(self, secret_dir: Path) -> dict[str, str]:
        """Load the secrets yaml from path.

        A missing file is treated as an empty mapping. The result, including
        the empty one, is cached so the file is not read again.

        Raise HomeAssistantError if the file content is not a dictionary.
        """
        if (secret_path := secret_dir / SECRET_YAML) in self._cache:
            return self._cache[secret_path]

        _LOGGER.debug("Loading %s", secret_path)
        try:
            secrets = load_yaml(str(secret_path))

            if not isinstance(secrets, dict):
                raise HomeAssistantError("Secrets is not a dictionary")

            if "logger" in secrets:
                # "logger" is a control key, not a secret: it can only turn on
                # debug logging and is removed before the secrets are cached.
                logger = str(secrets["logger"]).lower()
                if logger == "debug":
                    _LOGGER.setLevel(logging.DEBUG)
                else:
                    _LOGGER.error(
                        (
                            "Error in secrets.yaml: 'logger: debug' expected, but"
                            " 'logger: %s' found"
                        ),
                        logger,
                    )
                del secrets["logger"]
        except FileNotFoundError:
            secrets = {}

        self._cache[secret_path] = secrets

        return secrets
108 
109 
111  """Mixin class with extensions for YAML loader."""
112 
113  name: str
114  stream: Any
115 
116  @cached_property
117  def get_name(self) -> str:
118  """Get the name of the loader."""
119  return self.name
120 
121  @cached_property
122  def get_stream_name(self) -> str:
123  """Get the name of the stream."""
124  return getattr(self.stream, "name", "")
125 
126 
class FastSafeLoader(FastestAvailableSafeLoader, _LoaderMixin):
    """The fastest available safe loader, either C or Python."""

    def __init__(self, stream: Any, secrets: Secrets | None = None) -> None:
        """Initialize a safe line loader.

        stream is the YAML source handed to the underlying loader; secrets is
        the optional store used to resolve !secret tags.
        """
        # Stored under the attribute names the mixin properties read.
        self.stream = stream

        # Set name in same way as the Python loader does in yaml.reader.__init__
        if isinstance(stream, str):
            self.name = "<unicode string>"
        elif isinstance(stream, bytes):
            self.name = "<byte string>"
        else:
            self.name = getattr(stream, "name", "<file>")

        super().__init__(stream)
        self.secrets = secrets
144 
145 
class PythonSafeLoader(yaml.SafeLoader, _LoaderMixin):
    """Python safe loader."""

    def __init__(self, stream: Any, secrets: Secrets | None = None) -> None:
        """Initialize a safe line loader.

        secrets is the optional store used to resolve !secret tags.
        """
        super().__init__(stream)
        self.secrets = secrets
153 
154 
155 type LoaderType = FastSafeLoader | PythonSafeLoader
156 
157 
def load_yaml(
    fname: str | os.PathLike[str], secrets: Secrets | None = None
) -> JSON_TYPE | None:
    """Load a YAML file.

    The file is opened as UTF-8 text and passed to parse_yaml.

    If opening the file raises an OSError it will be wrapped in a HomeAssistantError,
    except for FileNotFoundError which will be re-raised.
    A UnicodeDecodeError is logged and wrapped in a HomeAssistantError as well.
    """
    try:
        with open(fname, encoding="utf-8") as conf_file:
            return parse_yaml(conf_file, secrets)
    except UnicodeDecodeError as exc:
        _LOGGER.error("Unable to read file %s: %s", fname, exc)
        raise HomeAssistantError(exc) from exc
    except FileNotFoundError:
        # Must come before the OSError handler: FileNotFoundError is an
        # OSError subclass and callers handle it themselves.
        raise
    except OSError as exc:
        raise HomeAssistantError(exc) from exc
176 
177 
def load_yaml_dict(
    fname: str | os.PathLike[str], secrets: Secrets | None = None
) -> dict:
    """Load a YAML file and ensure the top level is a dict.

    Raise YamlTypeError if the top level is not a dict.
    Return an empty dict if the file is empty.
    """
    loaded_yaml = load_yaml(fname, secrets)
    if loaded_yaml is None:
        # load_yaml returns None for an empty file.
        return {}
    if not isinstance(loaded_yaml, dict):
        raise YamlTypeError(f"YAML file {fname} does not contain a dict")
    return loaded_yaml
192 
193 
def parse_yaml(
    content: str | TextIO | StringIO, secrets: Secrets | None = None
) -> JSON_TYPE:
    """Parse YAML with the fastest available loader.

    When the C loader is available it is tried first. If it raises a
    yaml.YAMLError, the content is parsed a second time with the Python
    loader, whose exceptions are more readable.
    """
    if not HAS_C_LOADER:
        return _parse_yaml_python(content, secrets)
    try:
        return _parse_yaml(FastSafeLoader, content, secrets)
    except yaml.YAMLError:
        # Loading failed, so we now load with the Python loader which has more
        # readable exceptions
        if isinstance(content, (StringIO, TextIO, TextIOWrapper)):
            # Rewind the stream so we can try again
            content.seek(0, 0)
        return _parse_yaml_python(content, secrets)
209 
210 
def _parse_yaml_python(
    content: str | TextIO | StringIO, secrets: Secrets | None = None
) -> JSON_TYPE:
    """Parse YAML with the python loader (this is very slow).

    Raise HomeAssistantError, after logging the problem, if the content is not
    valid YAML.
    """
    try:
        return _parse_yaml(PythonSafeLoader, content, secrets)
    except yaml.YAMLError as exc:
        _LOGGER.error(str(exc))
        raise HomeAssistantError(exc) from exc
220 
221 
def _parse_yaml(
    loader: type[FastSafeLoader | PythonSafeLoader],
    content: str | TextIO,
    secrets: Secrets | None = None,
) -> JSON_TYPE:
    """Load a YAML file.

    yaml.load calls its Loader argument with the stream only, so the loader
    class is wrapped in a lambda that also passes the secrets store.
    """
    return yaml.load(content, Loader=lambda stream: loader(stream, secrets))  # type: ignore[arg-type]
229 
230 
@overload
def _add_reference(
    obj: list | NodeListClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeListClass: ...


@overload
def _add_reference(
    obj: str | NodeStrClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeStrClass: ...


@overload
def _add_reference(
    obj: dict | NodeDictClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeDictClass: ...


def _add_reference(
    obj: dict | list | str | NodeDictClass | NodeListClass | NodeStrClass,
    loader: LoaderType,
    node: yaml.nodes.Node,
) -> NodeDictClass | NodeListClass | NodeStrClass:
    """Add file reference information to an object.

    A list, str or dict is first converted to the matching Node*Class; any
    other object is passed on unchanged. The file name and line number are
    then attached by _add_reference_to_node_class.
    """
    if isinstance(obj, list):
        obj = NodeListClass(obj)
    elif isinstance(obj, str):
        obj = NodeStrClass(obj)
    elif isinstance(obj, dict):
        obj = NodeDictClass(obj)
    return _add_reference_to_node_class(obj, loader, node)
262 
263 
264 @overload
266  obj: NodeListClass, loader: LoaderType, node: yaml.nodes.Node
267 ) -> NodeListClass: ...
268 
269 
270 @overload
272  obj: NodeStrClass, loader: LoaderType, node: yaml.nodes.Node
273 ) -> NodeStrClass: ...
274 
275 
276 @overload
278  obj: NodeDictClass, loader: LoaderType, node: yaml.nodes.Node
279 ) -> NodeDictClass: ...
280 
281 
283  obj: NodeDictClass | NodeListClass | NodeStrClass,
284  loader: LoaderType,
285  node: yaml.nodes.Node,
286 ) -> NodeDictClass | NodeListClass | NodeStrClass:
287  """Add file reference information to a node class object."""
288  try: # suppress is much slower
289  obj.__config_file__ = loader.get_name
290  obj.__line__ = node.start_mark.line + 1
291  except AttributeError:
292  pass
293  return obj
294 
295 
296 def _raise_if_no_value[NodeT: yaml.nodes.Node, _R](
297  func: Callable[[LoaderType, NodeT], _R],
298 ) -> Callable[[LoaderType, NodeT], _R]:
299  def wrapper(loader: LoaderType, node: NodeT) -> _R:
300  if not node.value:
301  raise HomeAssistantError(
302  f"{node.start_mark}: {node.tag} needs an argument."
303  )
304  return func(loader, node)
305 
306  return wrapper
307 
308 
@_raise_if_no_value
def _include_yaml(loader: LoaderType, node: yaml.nodes.Node) -> JSON_TYPE:
    """Load another YAML file and embed it using the !include tag.

    Example:
        device_tracker: !include device_tracker.yaml

    The path is resolved relative to the directory of the loader name. An
    empty included file is embedded as an empty NodeDictClass.

    Raise HomeAssistantError if the included file does not exist.
    """
    fname = os.path.join(os.path.dirname(loader.get_name), node.value)
    try:
        loaded_yaml = load_yaml(fname, loader.secrets)
    except FileNotFoundError as exc:
        raise HomeAssistantError(
            f"{node.start_mark}: Unable to read file {fname}"
        ) from exc
    if loaded_yaml is None:
        loaded_yaml = NodeDictClass()
    return _add_reference(loaded_yaml, loader, node)
327 
328 
329 def _is_file_valid(name: str) -> bool:
330  """Decide if a file is valid."""
331  return not name.startswith(".")
332 
333 
def _find_files(directory: str, pattern: str) -> Iterator[str]:
    """Recursively yield the paths of files in a directory matching pattern.

    Files and directories whose name starts with a dot are skipped. Within
    each directory the files are yielded in sorted order; the order in which
    subdirectories are visited is left to os.walk.
    """
    for root, dirs, files in os.walk(directory, topdown=True):
        # Pruning in place keeps os.walk from descending into skipped dirs.
        dirs[:] = [d for d in dirs if _is_file_valid(d)]
        for basename in sorted(files):
            if _is_file_valid(basename) and fnmatch.fnmatch(basename, pattern):
                yield os.path.join(root, basename)
342 
343 
@_raise_if_no_value
def _include_dir_named_yaml(loader: LoaderType, node: yaml.nodes.Node) -> NodeDictClass:
    """Load multiple files from directory as a dictionary.

    Each *.yaml file found below the directory becomes one entry, keyed by
    its file name without extension. The secrets file is skipped.
    """
    mapping = NodeDictClass()
    loc = os.path.join(os.path.dirname(loader.get_name), node.value)
    for fname in _find_files(loc, "*.yaml"):
        basename = os.path.basename(fname)
        if basename == SECRET_YAML:
            continue
        loaded_yaml = load_yaml(fname, loader.secrets)
        if loaded_yaml is None:
            # Special case, an empty file included by !include_dir_named is treated
            # as an empty dictionary
            loaded_yaml = NodeDictClass()
        mapping[os.path.splitext(basename)[0]] = loaded_yaml
    return _add_reference_to_node_class(mapping, loader, node)
360 
361 
@_raise_if_no_value
def _include_dir_merge_named_yaml(
    loader: LoaderType, node: yaml.nodes.Node
) -> NodeDictClass:
    """Load multiple files from directory as a merged dictionary.

    Only files whose top level is a dict contribute; keys from files found
    later replace equal keys from earlier files. The secrets file is skipped.
    """
    mapping = NodeDictClass()
    loc = os.path.join(os.path.dirname(loader.get_name), node.value)
    for fname in _find_files(loc, "*.yaml"):
        if os.path.basename(fname) == SECRET_YAML:
            continue
        loaded_yaml = load_yaml(fname, loader.secrets)
        if isinstance(loaded_yaml, dict):
            mapping.update(loaded_yaml)
    return _add_reference_to_node_class(mapping, loader, node)
376 
377 
@_raise_if_no_value
def _include_dir_list_yaml(
    loader: LoaderType, node: yaml.nodes.Node
) -> list[JSON_TYPE]:
    """Load multiple files from directory as a list.

    Each non-empty *.yaml file found below the directory contributes one list
    item. The secrets file and empty files are skipped.
    """
    loc = os.path.join(os.path.dirname(loader.get_name), node.value)
    return [
        loaded_yaml
        for f in _find_files(loc, "*.yaml")
        # The name check must come first so the secrets file is never parsed.
        if os.path.basename(f) != SECRET_YAML
        and (loaded_yaml := load_yaml(f, loader.secrets)) is not None
    ]
390 
391 
@_raise_if_no_value
def _include_dir_merge_list_yaml(
    loader: LoaderType, node: yaml.nodes.Node
) -> JSON_TYPE:
    """Load multiple files from directory as a merged list.

    Only files whose top level is a list contribute; their items are
    concatenated in the order the files are found. The secrets file is
    skipped.
    """
    loc: str = os.path.join(os.path.dirname(loader.get_name), node.value)
    merged_list: list[JSON_TYPE] = []
    for fname in _find_files(loc, "*.yaml"):
        if os.path.basename(fname) == SECRET_YAML:
            continue
        loaded_yaml = load_yaml(fname, loader.secrets)
        if isinstance(loaded_yaml, list):
            merged_list.extend(loaded_yaml)
    return _add_reference(merged_list, loader, node)
406 
407 
def _handle_mapping_tag(
    loader: LoaderType, node: yaml.nodes.MappingNode
) -> NodeDictClass:
    """Load YAML mappings into an ordered dictionary to preserve key order.

    A duplicate key is logged as a warning and the later value wins.

    Raise yaml.MarkedYAMLError if a key is not hashable.
    """
    loader.flatten_mapping(node)
    nodes = loader.construct_pairs(node)

    # Line on which each key was last seen, used for the duplicate warning.
    seen: dict[Any, int] = {}
    for (key, _), (child_node, _) in zip(nodes, node.value, strict=False):
        line = child_node.start_mark.line

        try:
            hash(key)
        except TypeError as exc:
            fname = loader.get_stream_name
            raise yaml.MarkedYAMLError(
                context=f'invalid key: "{key}"',
                context_mark=yaml.Mark(
                    fname,
                    0,
                    line,
                    -1,
                    None,
                    None,  # type: ignore[arg-type]
                ),
            ) from exc

        if key in seen:
            fname = loader.get_stream_name
            _LOGGER.warning(
                'YAML file %s contains duplicate key "%s". Check lines %d and %d',
                fname,
                key,
                seen[key],
                line,
            )
        seen[key] = line

    return _add_reference_to_node_class(NodeDictClass(nodes), loader, node)
447 
448 
def _construct_seq(loader: LoaderType, node: yaml.nodes.Node) -> JSON_TYPE:
    """Add line number and file name to Load YAML sequence."""
    # The unpacking requires construct_yaml_seq to produce exactly one item.
    (obj,) = loader.construct_yaml_seq(node)
    return _add_reference(obj, loader, node)
453 
454 
456  loader: LoaderType, node: yaml.nodes.ScalarNode
457 ) -> str | int | float | None:
458  """Add line number and file name to Load YAML sequence."""
459  obj = node.value
460  if not isinstance(obj, str):
461  return obj
462  return _add_reference_to_node_class(NodeStrClass(obj), loader, node)
463 
464 
465 def _env_var_yaml(loader: LoaderType, node: yaml.nodes.Node) -> str:
466  """Load environment variables and embed it into the configuration YAML."""
467  args = node.value.split()
468 
469  # Check for a default value
470  if len(args) > 1:
471  return os.getenv(args[0], " ".join(args[1:]))
472  if args[0] in os.environ:
473  return os.environ[args[0]]
474  _LOGGER.error("Environment variable %s not defined", node.value)
475  raise HomeAssistantError(node.value)
476 
477 
def secret_yaml(loader: LoaderType, node: yaml.nodes.Node) -> JSON_TYPE:
    """Load secrets and embed it into the configuration YAML.

    The lookup is delegated to the loader's secrets store, which receives the
    loader name and the tag value.

    Raise HomeAssistantError if the loader has no secrets store.
    """
    if loader.secrets is None:
        raise HomeAssistantError("Secrets not supported in this YAML file")

    return loader.secrets.get(loader.get_name, node.value)
484 
485 
def add_constructor(tag: Any, constructor: Any) -> None:
    """Add to constructor to all loaders.

    The constructor is registered for the tag on both FastSafeLoader and
    PythonSafeLoader.
    """
    for yaml_loader in (FastSafeLoader, PythonSafeLoader):
        yaml_loader.add_constructor(tag, constructor)
490 
491 
# Register the custom tag handlers, and the replacements for the default
# mapping/scalar/sequence constructors, on both loader classes.
add_constructor("!include", _include_yaml)
add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _handle_mapping_tag)
add_constructor(yaml.resolver.BaseResolver.DEFAULT_SCALAR_TAG, _handle_scalar_tag)
add_constructor(yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG, _construct_seq)
add_constructor("!env_var", _env_var_yaml)
add_constructor("!secret", secret_yaml)
add_constructor("!include_dir_list", _include_dir_list_yaml)
add_constructor("!include_dir_merge_list", _include_dir_merge_list_yaml)
add_constructor("!include_dir_named", _include_dir_named_yaml)
add_constructor("!include_dir_merge_named", _include_dir_merge_named_yaml)
add_constructor("!input", Input.from_node)
None __init__(self, Any stream, Secrets|None secrets=None)
Definition: loader.py:130
None __init__(self, Any stream, Secrets|None secrets=None)
Definition: loader.py:149
dict[str, str] _load_secret_yaml(self, Path secret_dir)
Definition: loader.py:77
str get(self, str requester_path, str secret)
Definition: loader.py:51
None __init__(self, Path config_dir)
Definition: loader.py:46
None open(self, **Any kwargs)
Definition: lock.py:86
JSON_TYPE _parse_yaml_python(str|TextIO|StringIO content, Secrets|None secrets=None)
Definition: loader.py:213
None add_constructor(Any tag, Any constructor)
Definition: loader.py:486
bool _is_file_valid(str name)
Definition: loader.py:329
NodeDictClass _handle_mapping_tag(LoaderType loader, yaml.nodes.MappingNode node)
Definition: loader.py:410
JSON_TYPE _include_dir_merge_list_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:395
str|int|float|None _handle_scalar_tag(LoaderType loader, yaml.nodes.ScalarNode node)
Definition: loader.py:457
Iterator[str] _find_files(str directory, str pattern)
Definition: loader.py:334
str _env_var_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:465
NodeListClass _add_reference_to_node_class(NodeListClass obj, LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:267
JSON_TYPE|None load_yaml(str|os.PathLike[str] fname, Secrets|None secrets=None)
Definition: loader.py:160
JSON_TYPE parse_yaml(str|TextIO|StringIO content, Secrets|None secrets=None)
Definition: loader.py:196
dict load_yaml_dict(str|os.PathLike[str] fname, Secrets|None secrets=None)
Definition: loader.py:180
list[JSON_TYPE] _include_dir_list_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:381
NodeDictClass _include_dir_merge_named_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:365
JSON_TYPE _construct_seq(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:449
NodeDictClass _include_dir_named_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:345
JSON_TYPE _include_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:310
JSON_TYPE _parse_yaml(type[FastSafeLoader|PythonSafeLoader] loader, str|TextIO content, Secrets|None secrets=None)
Definition: loader.py:226
JSON_TYPE secret_yaml(LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:478
NodeListClass _add_reference(list|NodeListClass obj, LoaderType loader, yaml.nodes.Node node)
Definition: loader.py:234