1 """Helper for httpx."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Coroutine
7 from typing
import Any, Self
18 create_no_verify_ssl_context,
21 from .frame
import warn_use
26 KEEP_ALIVE_TIMEOUT = 15
27 DATA_ASYNC_CLIENT: HassKey[httpx.AsyncClient] =
HassKey(
"httpx_async_client")
28 DATA_ASYNC_CLIENT_NOVERIFY: HassKey[httpx.AsyncClient] =
HassKey(
29 "httpx_async_client_noverify"
31 DEFAULT_LIMITS = limits = httpx.Limits(keepalive_expiry=KEEP_ALIVE_TIMEOUT)
33 f
"{APPLICATION_NAME}/{__version__} "
34 f
"httpx/{httpx.__version__} Python/{sys.version_info[0]}.{sys.version_info[1]}"
36 USER_AGENT =
"User-Agent"
41 def get_async_client(hass: HomeAssistant, verify_ssl: bool =
True) -> httpx.AsyncClient:
42 """Return default httpx AsyncClient.
44 This method must be run in the event loop.
46 key = DATA_ASYNC_CLIENT
if verify_ssl
else DATA_ASYNC_CLIENT_NOVERIFY
48 if (client := hass.data.get(key))
is None:
55 """httpx AsyncClient that suppresses context management."""
58 """Prevent an integration from reopening the client via a context manager."""
async def __aexit__(self, *args: object) -> None:
    """Leave the ``async with`` block without closing the client.

    Exiting the context manager is deliberately a no-op, so an integration
    cannot close the client this way. Any exception details passed in
    ``args`` are ignored.
    """
    # A falsy return value means exceptions raised inside the ``async with``
    # body are not suppressed.
    return None
68 verify_ssl: bool =
True,
69 auto_cleanup: bool =
True,
70 ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT,
72 ) -> httpx.AsyncClient:
73 """Create a new httpx.AsyncClient with kwargs, i.e. for cookies.
75 If auto_cleanup is True (the default), the client will be
76 automatically closed when EVENT_HOMEASSISTANT_CLOSE fires.
78 This method must be run in the event loop.
87 headers={USER_AGENT: SERVER_SOFTWARE},
88 limits=DEFAULT_LIMITS,
92 original_aclose = client.aclose
94 client.aclose = warn_use(
95 client.aclose,
"closes the Home Assistant httpx client"
107 client: httpx.AsyncClient,
108 original_aclose: Callable[[], Coroutine[Any, Any,
None]],
110 """Register httpx AsyncClient aclose on Home Assistant shutdown.
112 This method must be run in the event loop.
async def _async_close_client(event: Event) -> None:
    """Shut the httpx client down when the listened-for event fires.

    The ``event`` argument is accepted because this coroutine is registered
    as a bus listener; it is not used here.
    """
    # ``original_aclose`` comes from the enclosing scope; calling it yields
    # the coroutine that performs the close.
    pending_close = original_aclose()
    await pending_close
119 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_close_client)
httpx.AsyncClient get_async_client(HomeAssistant hass, bool verify_ssl=True)
None _async_register_async_client_shutdown(HomeAssistant hass, httpx.AsyncClient client, Callable[[], Coroutine[Any, Any, None]] original_aclose)
httpx.AsyncClient create_async_httpx_client(HomeAssistant hass, bool verify_ssl=True, bool auto_cleanup=True, SSLCipherList ssl_cipher_list=SSLCipherList.PYTHON_DEFAULT, **Any kwargs)
ssl.SSLContext client_context(SSLCipherList ssl_cipher_list=SSLCipherList.PYTHON_DEFAULT)
ssl.SSLContext create_no_verify_ssl_context(SSLCipherList ssl_cipher_list=SSLCipherList.PYTHON_DEFAULT)