Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Helper methods for various modules."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Coroutine, Iterable, KeysView, Mapping
7 from datetime import datetime, timedelta
8 from functools import wraps
9 import random
10 import re
11 import string
12 import threading
13 from typing import Any
14 
15 import slugify as unicode_slug
16 
17 from .dt import as_local, utcnow
18 
19 RE_SANITIZE_FILENAME = re.compile(r"(~|\.\.|/|\\)")
20 RE_SANITIZE_PATH = re.compile(r"(~|\.(\.)+)")
21 
22 
23 def raise_if_invalid_filename(filename: str) -> None:
24  """Check if a filename is valid.
25 
26  Raises a ValueError if the filename is invalid.
27  """
28  if RE_SANITIZE_FILENAME.sub("", filename) != filename:
29  raise ValueError(f"{filename} is not a safe filename")
30 
31 
32 def raise_if_invalid_path(path: str) -> None:
33  """Check if a path is valid.
34 
35  Raises a ValueError if the path is invalid.
36  """
37  if RE_SANITIZE_PATH.sub("", path) != path:
38  raise ValueError(f"{path} is not a safe path")
39 
40 
41 def slugify(text: str | None, *, separator: str = "_") -> str:
42  """Slugify a given text."""
43  if text == "" or text is None:
44  return ""
45  slug = unicode_slug.slugify(text, separator=separator)
46  return "unknown" if slug == "" else slug
47 
48 
49 def repr_helper(inp: Any) -> str:
50  """Help creating a more readable string representation of objects."""
51  if isinstance(inp, Mapping):
52  return ", ".join(
53  f"{repr_helper(key)}={repr_helper(item)}" for key, item in inp.items()
54  )
55  if isinstance(inp, datetime):
56  return as_local(inp).isoformat()
57 
58  return str(inp)
59 
60 
61 def convert[_T, _U](
62  value: _T | None, to_type: Callable[[_T], _U], default: _U | None = None
63 ) -> _U | None:
64  """Convert value to to_type, returns default if fails."""
65  try:
66  return default if value is None else to_type(value)
67  except (ValueError, TypeError):
68  # If value could not be converted
69  return default
70 
71 
73  preferred_string: str, current_strings: Iterable[str] | KeysView[str]
74 ) -> str:
75  """Return a string that is not present in current_strings.
76 
77  If preferred string exists will append _2, _3, ..
78  """
79  test_string = preferred_string
80  current_strings_set = set(current_strings)
81 
82  tries = 1
83 
84  while test_string in current_strings_set:
85  tries += 1
86  test_string = f"{preferred_string}_{tries}"
87 
88  return test_string
89 
90 
91 # Taken from http://stackoverflow.com/a/23728630
92 def get_random_string(length: int = 10) -> str:
93  """Return a random string with letters and digits."""
94  generator = random.SystemRandom()
95  source_chars = string.ascii_letters + string.digits
96 
97  return "".join(generator.choice(source_chars) for _ in range(length))
98 
99 
100 class Throttle:
101  """A class for throttling the execution of tasks.
102 
103  This method decorator adds a cooldown to a method to prevent it from being
104  called more than 1 time within the timedelta interval `min_time` after it
105  returned its result.
106 
107  Calling a method a second time during the interval will return None.
108 
109  Pass keyword argument `no_throttle=True` to the wrapped method to make
110  the call not throttled.
111 
112  Decorator takes in an optional second timedelta interval to throttle the
113  'no_throttle' calls.
114 
115  Adds a datetime attribute `last_call` to the method.
116  """
117 
118  def __init__(
119  self, min_time: timedelta, limit_no_throttle: timedelta | None = None
120  ) -> None:
121  """Initialize the throttle."""
122  self.min_timemin_time = min_time
123  self.limit_no_throttlelimit_no_throttle = limit_no_throttle
124 
125  def __call__(self, method: Callable) -> Callable:
126  """Caller for the throttle."""
127  # Make sure we return a coroutine if the method is async.
128  if asyncio.iscoroutinefunction(method):
129 
130  async def throttled_value() -> None:
131  """Stand-in function for when real func is being throttled."""
132 
133  else:
134 
135  def throttled_value() -> None: # type: ignore[misc]
136  """Stand-in function for when real func is being throttled."""
137 
138  if self.limit_no_throttle is not None:
139  method = Throttle(self.limit_no_throttle)(method)
140 
141  # Different methods that can be passed in:
142  # - a function
143  # - an unbound function on a class
144  # - a method (bound function on a class)
145 
146  # We want to be able to differentiate between function and unbound
147  # methods (which are considered functions).
148  # All methods have the classname in their qualname separated by a '.'
149  # Functions have a '.' in their qualname if defined inline, but will
150  # be prefixed by '.<locals>.' so we strip that out.
151  is_func = (
152  not hasattr(method, "__self__")
153  and "." not in method.__qualname__.rpartition(".<locals>.")[-1]
154  )
155 
156  @wraps(method)
157  def wrapper(*args: Any, **kwargs: Any) -> Callable | Coroutine:
158  """Wrap that allows wrapped to be called only once per min_time.
159 
160  If we cannot acquire the lock, it is running so return None.
161  """
162  if hasattr(method, "__self__"):
163  host = getattr(method, "__self__")
164  elif is_func:
165  host = wrapper
166  else:
167  host = args[0] if args else wrapper
168 
169  if not hasattr(host, "_throttle"):
170  host._throttle = {} # noqa: SLF001
171 
172  if id(self) not in host._throttle: # noqa: SLF001
173  host._throttle[id(self)] = [threading.Lock(), None] # noqa: SLF001
174  throttle = host._throttle[id(self)] # noqa: SLF001
175 
176  if not throttle[0].acquire(False):
177  return throttled_value()
178 
179  # Check if method is never called or no_throttle is given
180  force = kwargs.pop("no_throttle", False) or not throttle[1]
181 
182  try:
183  if force or utcnow() - throttle[1] > self.min_time:
184  result = method(*args, **kwargs)
185  throttle[1] = utcnow()
186  return result # type: ignore[no-any-return]
187 
188  return throttled_value()
189  finally:
190  throttle[0].release()
191 
192  return wrapper
Callable __call__(self, Callable method)
Definition: __init__.py:125
None __init__(self, timedelta min_time, timedelta|None limit_no_throttle=None)
Definition: __init__.py:120
dt.datetime as_local(dt.datetime dattim)
Definition: dt.py:157
str get_random_string(int length=10)
Definition: __init__.py:92
str slugify(str|None text, *str separator="_")
Definition: __init__.py:41
str ensure_unique_string(str preferred_string, Iterable[str]|KeysView[str] current_strings)
Definition: __init__.py:74
None raise_if_invalid_path(str path)
Definition: __init__.py:32
str repr_helper(Any inp)
Definition: __init__.py:49
None raise_if_invalid_filename(str filename)
Definition: __init__.py:23