1 """Support for RESTful API."""
3 from __future__ import annotations
17 from .const import XML_MIME_TYPES
21 _LOGGER = logging.getLogger(__name__)
25 """Class for handling the data retrieval."""
33 auth: httpx.DigestAuth | tuple[str, str] | None,
34 headers: dict[str, str] | None,
35 params: dict[str, str] | None,
39 timeout: int = DEFAULT_TIMEOUT,
41 """Initialize the data object."""
53 self._async_client: httpx.AsyncClient | None = None
54 self.data: str | None = None
56 self.headers: httpx.Headers | None = None
59 """Set request data."""
72 """If the data is an XML string, convert it to a JSON string."""
73 _LOGGER.debug("Data fetched from resource: %s", self.data)
75 (value := self.data) is not None
77 and (headers := self.headers) is not None
78 and (content_type := headers.get("content-type"))
79 and content_type.startswith(XML_MIME_TYPES)
82 _LOGGER.debug("JSON converted from XML: %s", value)
86 """Get the latest data from REST service with provided method."""
95 rendered_headers = template.render_complex(self._headers, parse_result=False)
96 rendered_params = template.render_complex(self._params)
98 _LOGGER.debug("Updating from %s", self._resource)
103 headers=rendered_headers,
104 params=rendered_params,
105 auth=self._auth,
108 follow_redirects=True,
112 except httpx.TimeoutException as ex:
114 _LOGGER.error("Timeout while fetching data: %s", self._resource)
118 except httpx.RequestError as ex:
121 "Error fetching data: %s failed with %s", self._resource, ex
126 except ssl.SSLError as ex:
129 "Error connecting to %s failed with %s", self._resource, ex
None __init__(self, HomeAssistant hass, str method, str resource, str encoding, httpx.DigestAuth|tuple[str, str]|None auth, dict[str, str]|None headers, dict[str, str]|None params, str|None data, bool verify_ssl, str ssl_cipher_list, int timeout=DEFAULT_TIMEOUT)
None set_payload(self, str payload)
None set_url(self, str url)
str|None data_without_xml(self)
None async_update(self, bool log_errors=True)
httpx.AsyncClient create_async_httpx_client(HomeAssistant hass, bool verify_ssl=True, bool auto_cleanup=True, SSLCipherList ssl_cipher_list=SSLCipherList.PYTHON_DEFAULT, **Any kwargs)