1 """Block blocking calls being done in asyncio."""
4 from collections.abc
import Callable
5 from contextlib
import suppress
6 from dataclasses
import dataclass
8 from http.client
import HTTPConnection
11 from pathlib
import Path
12 from ssl
import SSLContext
16 from typing
import Any
18 from .helpers.frame
import get_current_frame
19 from .util.loop
import protect_loop
21 _IN_TESTS =
"unittest" in sys.modules
23 ALLOWED_FILE_PREFIXES = (
"/proc",)
28 return bool((args := mapped_args.get(
"args"))
and args[0]
in sys.modules)
33 args = mapped_args[
"args"]
34 path = args[0]
if type(args[0])
is str
else str(args[0])
35 return path.startswith(ALLOWED_FILE_PREFIXES)
48 with suppress(ValueError):
53 @dataclass(slots=True, frozen=True)
55 """Class to hold information about a blocking call."""
57 original_func: Callable
60 check_allowed: Callable[[dict[str, Any]], bool] |
None
66 _BLOCKING_CALLS: tuple[BlockingCall, ...] = (
68 original_func=HTTPConnection.putrequest,
69 object=HTTPConnection,
70 function=
"putrequest",
77 original_func=time.sleep,
80 check_allowed=_check_sleep_call_allowed,
86 original_func=glob.glob,
95 original_func=glob.iglob,
101 skip_for_tests=
False,
104 original_func=os.walk,
110 skip_for_tests=
False,
113 original_func=os.listdir,
122 original_func=os.scandir,
131 original_func=builtins.open,
134 check_allowed=_check_file_allowed,
140 original_func=importlib.import_module,
142 function=
"import_module",
143 check_allowed=_check_import_call_allowed,
149 original_func=SSLContext.load_default_certs,
151 function=
"load_default_certs",
158 original_func=SSLContext.load_verify_locations,
160 function=
"load_verify_locations",
167 original_func=SSLContext.load_cert_chain,
169 function=
"load_cert_chain",
176 original_func=Path.open,
179 check_allowed=_check_file_allowed,
185 original_func=Path.read_text,
187 function=
"read_text",
188 check_allowed=_check_file_allowed,
194 original_func=Path.read_bytes,
196 function=
"read_bytes",
197 check_allowed=_check_file_allowed,
203 original_func=Path.write_text,
205 function=
"write_text",
206 check_allowed=_check_file_allowed,
212 original_func=Path.write_bytes,
214 function=
"write_bytes",
215 check_allowed=_check_file_allowed,
223 @dataclass(slots=True)
225 """Class to track which calls are blocked."""
227 calls: set[BlockingCall]
234 """Enable the detection of blocking calls in the event loop."""
235 calls = _BLOCKED_CALLS.calls
237 raise RuntimeError(
"Blocking call detection is already enabled")
239 loop_thread_id = threading.get_ident()
240 for blocking_call
in _BLOCKING_CALLS:
241 if _IN_TESTS
and blocking_call.skip_for_tests:
244 protected_function = protect_loop(
245 blocking_call.original_func,
246 strict=blocking_call.strict,
247 strict_core=blocking_call.strict_core,
248 check_allowed=blocking_call.check_allowed,
249 loop_thread_id=loop_thread_id,
251 setattr(blocking_call.object, blocking_call.function, protected_function)
252 calls.add(blocking_call)
bool _check_sleep_call_allowed(dict[str, Any] mapped_args)
bool _check_file_allowed(dict[str, Any] mapped_args)
bool _check_import_call_allowed(dict[str, Any] mapped_args)
FrameType get_current_frame(int depth=0)