1 """Support for exposing regular REST commands as services."""
3 from __future__
import annotations
5 from http
import HTTPStatus
6 from json.decoder
import JSONDecodeError
11 from aiohttp
import hdrs
12 import voluptuous
as vol
38 DOMAIN =
"rest_command"
40 _LOGGER = logging.getLogger(__name__)
43 DEFAULT_METHOD =
"get"
44 DEFAULT_VERIFY_SSL =
True
46 SUPPORT_REST_METHODS = [
"get",
"patch",
"post",
"put",
"delete"]
48 CONF_CONTENT_TYPE =
"content_type"
50 COMMAND_SCHEMA = vol.Schema(
52 vol.Required(CONF_URL): cv.template,
53 vol.Optional(CONF_METHOD, default=DEFAULT_METHOD): vol.All(
54 vol.Lower, vol.In(SUPPORT_REST_METHODS)
56 vol.Optional(CONF_HEADERS): vol.Schema({cv.string: cv.template}),
57 vol.Inclusive(CONF_USERNAME,
"authentication"): cv.string,
58 vol.Inclusive(CONF_PASSWORD,
"authentication"): cv.string,
59 vol.Optional(CONF_PAYLOAD): cv.template,
60 vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): vol.Coerce(int),
61 vol.Optional(CONF_CONTENT_TYPE): cv.string,
62 vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): cv.boolean,
66 CONFIG_SCHEMA = vol.Schema(
67 {DOMAIN: cv.schema_with_slug_keys(COMMAND_SCHEMA)}, extra=vol.ALLOW_EXTRA
71 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
72 """Set up the REST command component."""
74 async
def reload_service_handler(service: ServiceCall) ->
None:
75 """Remove all rest_commands and load new ones from config."""
82 existing = hass.services.async_services_for_domain(DOMAIN)
83 for existing_service
in existing:
84 if existing_service == SERVICE_RELOAD:
86 hass.services.async_remove(DOMAIN, existing_service)
88 for name, command_config
in conf[DOMAIN].items():
89 async_register_rest_command(name, command_config)
92 def async_register_rest_command(name: str, command_config: dict[str, Any]) ->
None:
93 """Create service for rest command."""
95 timeout = command_config[CONF_TIMEOUT]
96 method = command_config[CONF_METHOD]
98 template_url = command_config[CONF_URL]
101 if CONF_USERNAME
in command_config:
102 username = command_config[CONF_USERNAME]
103 password = command_config.get(CONF_PASSWORD,
"")
104 auth = aiohttp.BasicAuth(username, password=password)
106 template_payload =
None
107 if CONF_PAYLOAD
in command_config:
108 template_payload = command_config[CONF_PAYLOAD]
110 template_headers = command_config.get(CONF_HEADERS, {})
112 content_type = command_config.get(CONF_CONTENT_TYPE)
114 async
def async_service_handler(service: ServiceCall) -> ServiceResponse:
115 """Execute a shell command service."""
119 template_payload.async_render(
120 variables=service.data, parse_result=
False
125 request_url = template_url.async_render(
126 variables=service.data, parse_result=
False
130 for header_name, template_header
in template_headers.items():
131 headers[header_name] = template_header.async_render(
132 variables=service.data, parse_result=
False
136 headers[hdrs.CONTENT_TYPE] = content_type
139 async
with getattr(websession, method)(
143 headers=headers
or None,
146 if response.status < HTTPStatus.BAD_REQUEST:
148 "Success. Url: %s. Status code: %d. Payload: %s",
155 "Error. Url: %s. Status code %d. Payload: %s",
161 if not service.return_response:
166 if response.content_type ==
"application/json":
167 _content = await response.json()
169 _content = await response.text()
170 except (JSONDecodeError, AttributeError)
as err:
172 translation_domain=DOMAIN,
173 translation_key=
"decoding_error",
174 translation_placeholders={
175 "request_url": request_url,
176 "decoding_type":
"JSON",
180 except UnicodeDecodeError
as err:
182 translation_domain=DOMAIN,
183 translation_key=
"decoding_error",
184 translation_placeholders={
185 "request_url": request_url,
186 "decoding_type":
"text",
189 return {
"content": _content,
"status": response.status}
191 except TimeoutError
as err:
193 translation_domain=DOMAIN,
194 translation_key=
"timeout",
195 translation_placeholders={
"request_url": request_url},
198 except aiohttp.ClientError
as err:
199 _LOGGER.error(
"Error fetching data: %s", err)
201 translation_domain=DOMAIN,
202 translation_key=
"client_error",
203 translation_placeholders={
"request_url": request_url},
207 hass.services.async_register(
210 async_service_handler,
211 supports_response=SupportsResponse.OPTIONAL,
214 for name, command_config
in config[DOMAIN].items():
215 async_register_rest_command(name, command_config)
217 hass.services.async_register(
218 DOMAIN, SERVICE_RELOAD, reload_service_handler, schema=vol.Schema({})
bool async_setup(HomeAssistant hass, ConfigType config)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
ConfigType|None async_integration_yaml_config(HomeAssistant hass, str integration_name)