1 """Helper methods for various modules."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine, Iterable, KeysView, Mapping
7 from datetime
import datetime, timedelta
8 from functools
import wraps
13 from typing
import Any
15 import slugify
as unicode_slug
17 from .dt
import as_local, utcnow
# Fragments that must never appear in a bare filename: a tilde, a ".."
# parent reference, or either path separator ("/" or "\").
RE_SANITIZE_FILENAME = re.compile(r"(~|\.\.|/|\\)")
# Fragments that must never appear in a path: a tilde, or a run of two or
# more consecutive dots. Path separators are allowed here.
RE_SANITIZE_PATH = re.compile(r"(~|\.(\.)+)")
def raise_if_invalid_filename(filename: str) -> None:
    """Check if a filename is valid.

    Raises a ValueError if the filename is invalid.
    """
    # If stripping the forbidden fragments changes the string, at least one
    # of them was present, so the name is rejected.
    if RE_SANITIZE_FILENAME.sub("", filename) != filename:
        raise ValueError(f"{filename} is not a safe filename")
def raise_if_invalid_path(path: str) -> None:
    """Check if a path is valid.

    Raises a ValueError if the path is invalid.
    """
    # If stripping the forbidden fragments changes the string, at least one
    # of them was present, so the path is rejected.
    if RE_SANITIZE_PATH.sub("", path) != path:
        raise ValueError(f"{path} is not a safe path")
def slugify(text: str | None, *, separator: str = "_") -> str:
    """Slugify a given text.

    The text is passed to ``unicode_slug.slugify`` with the given separator.
    If the resulting slug is empty, "unknown" is returned instead.
    """
    if text == "" or text is None:
        # NOTE(review): the statement for this branch is not visible in the
        # excerpt; an empty string is assumed here - confirm against upstream.
        return ""
    slug = unicode_slug.slugify(text, separator=separator)
    return "unknown" if slug == "" else slug
50 """Help creating a more readable string representation of objects."""
51 if isinstance(inp, Mapping):
53 f
"{repr_helper(key)}={repr_helper(item)}" for key, item
in inp.items()
55 if isinstance(inp, datetime):
62 value: _T |
None, to_type: Callable[[_T], _U], default: _U |
None =
None
64 """Convert value to to_type, returns default if fails."""
66 return default
if value
is None else to_type(value)
67 except (ValueError, TypeError):
def ensure_unique_string(
    preferred_string: str, current_strings: Iterable[str] | KeysView[str]
) -> str:
    """Return a string that is not present in current_strings.

    If preferred string exists will append _2, _3, ..
    """
    test_string = preferred_string
    # Copy into a set once so that each membership test in the loop is cheap
    # and one-shot iterables are only consumed a single time.
    current_strings_set = set(current_strings)

    # The first suffix tried is _2, so the counter starts at 1 and is bumped
    # before each candidate is built.
    tries = 1

    while test_string in current_strings_set:
        tries += 1
        test_string = f"{preferred_string}_{tries}"

    return test_string
def get_random_string(length: int = 10) -> str:
    """Return a random string with letters and digits.

    Characters are drawn independently from ASCII letters and digits using
    random.SystemRandom. A length of 0 yields an empty string.
    """
    generator = random.SystemRandom()
    source_chars = string.ascii_letters + string.digits

    return "".join(generator.choice(source_chars) for _ in range(length))
101 """A class for throttling the execution of tasks.
103 This method decorator adds a cooldown to a method to prevent it from being
104 called more than 1 time within the timedelta interval `min_time` after it
107 Calling a method a second time during the interval will return None.
109 Pass keyword argument `no_throttle=True` to the wrapped method to make
110 the call not throttled.
112 Decorator takes in an optional second timedelta interval to throttle the
115 Adds a datetime attribute `last_call` to the method.
119 self, min_time: timedelta, limit_no_throttle: timedelta |
None =
None
121 """Initialize the throttle."""
126 """Caller for the throttle."""
128 if asyncio.iscoroutinefunction(method):
130 async
def throttled_value() -> None:
131 """Stand-in function for when real func is being throttled."""
135 def throttled_value() -> None:
136 """Stand-in function for when real func is being throttled."""
138 if self.limit_no_throttle
is not None:
139 method = Throttle(self.limit_no_throttle)(method)
152 not hasattr(method,
"__self__")
153 and "." not in method.__qualname__.rpartition(
".<locals>.")[-1]
157 def wrapper(*args: Any, **kwargs: Any) -> Callable | Coroutine:
158 """Wrap that allows wrapped to be called only once per min_time.
160 If we cannot acquire the lock, it is running so return None.
162 if hasattr(method,
"__self__"):
163 host = getattr(method,
"__self__")
167 host = args[0]
if args
else wrapper
169 if not hasattr(host,
"_throttle"):
172 if id(self)
not in host._throttle:
173 host._throttle[id(self)] = [threading.Lock(),
None]
174 throttle = host._throttle[id(self)]
176 if not throttle[0].acquire(
False):
177 return throttled_value()
180 force = kwargs.pop(
"no_throttle",
False)
or not throttle[1]
183 if force
or utcnow() - throttle[1] > self.min_time:
184 result = method(*args, **kwargs)
188 return throttled_value()
190 throttle[0].release()
Callable __call__(self, Callable method)
None __init__(self, timedelta min_time, timedelta|None limit_no_throttle=None)
dt.datetime as_local(dt.datetime dattim)
str get_random_string(int length=10)
str slugify(str|None text, *str separator="_")
str ensure_unique_string(str preferred_string, Iterable[str]|KeysView[str] current_strings)
None raise_if_invalid_path(str path)
None raise_if_invalid_filename(str filename)