1 """Webhooks for Home Assistant."""
3 from __future__
import annotations
5 from collections.abc
import Awaitable, Callable, Iterable
6 from http
import HTTPStatus
7 from ipaddress
import ip_address
10 from typing
import TYPE_CHECKING, Any
12 from aiohttp
import StreamReader
13 from aiohttp.hdrs
import METH_GET, METH_HEAD, METH_POST, METH_PUT
14 from aiohttp.web
import Request, Response
15 import voluptuous
as vol
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# HTTP methods used for a webhook when registration is given
# ``allowed_methods=None``.
DEFAULT_METHODS = (METH_POST, METH_PUT)
# Full set of HTTP methods a webhook may opt in to. Registration checks the
# requested methods against this set, and the "webhook/handle" websocket
# command validates its "method" field with ``vol.In(SUPPORTED_METHODS)``.
SUPPORTED_METHODS = (METH_GET, METH_HEAD, METH_POST, METH_PUT)
# URL template for webhook endpoints. ``{webhook_id}`` is filled in through
# ``str.format`` when a path is generated, and the template itself is used as
# the ``url`` of the HTTP view.
URL_WEBHOOK_PATH = "/api/webhook/{webhook_id}"

# Configuration schema for this domain, produced by ``cv.empty_config_schema``.
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
45 handler: Callable[[HomeAssistant, str, Request], Awaitable[Response |
None]],
47 local_only: bool |
None =
False,
48 allowed_methods: Iterable[str] |
None =
None,
50 """Register a webhook."""
51 handlers = hass.data.setdefault(DOMAIN, {})
53 if webhook_id
in handlers:
54 raise ValueError(
"Handler is already defined!")
56 if allowed_methods
is None:
57 allowed_methods = DEFAULT_METHODS
58 allowed_methods = frozenset(allowed_methods)
60 if not allowed_methods.issubset(SUPPORTED_METHODS):
62 f
"Unexpected method: {allowed_methods.difference(SUPPORTED_METHODS)}"
65 handlers[webhook_id] = {
69 "local_only": local_only,
70 "allowed_methods": allowed_methods,
77 """Remove a webhook."""
78 handlers = hass.data.setdefault(DOMAIN, {})
79 handlers.pop(webhook_id,
None)
84 """Generate a webhook_id."""
85 return secrets.token_hex(32)
93 allow_internal: bool =
True,
94 allow_external: bool =
True,
95 allow_ip: bool |
None =
None,
96 prefer_external: bool |
None =
True,
98 """Generate the full URL for a webhook_id."""
102 allow_internal=allow_internal,
103 allow_external=allow_external,
106 prefer_external=prefer_external,
108 f"{async_generate_path(webhook_id)}"
114 """Generate the path component for a webhook_id."""
115 return URL_WEBHOOK_PATH.format(webhook_id=webhook_id)
120 hass: HomeAssistant, webhook_id: str, request: Request | MockRequest
122 """Handle a webhook."""
123 handlers: dict[str, dict[str, Any]] = hass.data.setdefault(DOMAIN, {})
125 content_stream: StreamReader | MockStreamReader
126 if isinstance(request, MockRequest):
127 received_from = request.mock_source
128 content_stream = request.content
129 method_name = request.method
131 received_from = request.remote
132 content_stream = request.content
133 method_name = request.method
136 if (webhook := handlers.get(webhook_id))
is None:
138 "Received message for unregistered webhook %s from %s",
144 content = await content_stream.read(64)
145 _LOGGER.debug(
"%s", content)
146 return Response(status=HTTPStatus.OK)
148 if method_name
not in webhook[
"allowed_methods"]:
149 if method_name == METH_HEAD:
151 return Response(status=HTTPStatus.OK)
154 "Webhook %s only supports %s methods but %s was received from %s",
156 ",".join(webhook[
"allowed_methods"]),
160 return Response(status=HTTPStatus.METHOD_NOT_ALLOWED)
162 if webhook[
"local_only"]
in (
True,
None)
and not isinstance(request, MockRequest):
166 assert isinstance(request, Request)
167 assert request.remote
is not None
170 request_remote = ip_address(request.remote)
172 _LOGGER.debug(
"Unable to parse remote ip %s", request.remote)
173 return Response(status=HTTPStatus.OK)
175 is_local = network.is_local(request_remote)
178 _LOGGER.warning(
"Received remote request for local webhook %s", webhook_id)
179 if webhook[
"local_only"]:
180 return Response(status=HTTPStatus.OK)
181 if not webhook.get(
"warned_about_deprecation"):
182 webhook[
"warned_about_deprecation"] =
True
184 "Deprecation warning: "
185 "Webhook '%s' does not provide a value for local_only. "
186 "This webhook will be blocked after the 2023.11.0 release. "
187 "Use `local_only: false` to keep this webhook operating as-is",
192 response: Response |
None = await webhook[
"handler"](hass, webhook_id, request)
194 response = Response(status=HTTPStatus.OK)
196 _LOGGER.exception(
"Error processing webhook %s", webhook_id)
197 return Response(status=HTTPStatus.OK)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Initialize the webhook component.

    Registers the ``WebhookView`` HTTP view on ``hass.http`` and registers the
    ``websocket_list`` and ``websocket_handle`` commands with the websocket
    API.
    """
    # HTTP entry point for incoming webhook requests.
    hass.http.register_view(WebhookView)
    # Websocket commands: one lists webhooks, one handles a webhook call
    # delivered over the websocket API.
    websocket_api.async_register_command(hass, websocket_list)
    websocket_api.async_register_command(hass, websocket_handle)
210 """Handle incoming webhook requests."""
212 url = URL_WEBHOOK_PATH
214 requires_auth =
False
217 async
def _handle(self, request: Request, webhook_id: str) -> Response:
218 """Handle webhook call."""
219 _LOGGER.debug(
"Handling webhook %s payload for %s", request.method, webhook_id)
220 hass = request.app[KEY_HASS]
229 @websocket_api.websocket_command(
{
"type": "webhook/list",
}
)
236 """Return a list of webhooks."""
237 handlers = hass.data.setdefault(DOMAIN, {})
240 "webhook_id": webhook_id,
241 "domain": info[
"domain"],
242 "name": info[
"name"],
243 "local_only": info[
"local_only"],
244 "allowed_methods": sorted(info[
"allowed_methods"]),
246 for webhook_id, info
in handlers.items()
249 connection.send_message(websocket_api.result_message(msg[
"id"], result))
252 @websocket_api.websocket_command(
{
vol.Required("type"):
"webhook/handle",
253 vol.Required(
"webhook_id"): str,
254 vol.Required(
"method"): vol.In(SUPPORTED_METHODS),
255 vol.Optional(
"body", default=
""): str,
256 vol.Optional(
"headers", default={}): {str: str},
257 vol.Optional(
"query", default=
""): str,
260 @websocket_api.async_response
266 """Handle an incoming webhook via the WS API."""
268 content=msg[
"body"].encode(
"utf-8"),
269 headers=msg[
"headers"],
270 method=msg[
"method"],
271 query_string=msg[
"query"],
272 mock_source=f
"{DOMAIN}/ws",
278 body = response_dict.get(
"body")
280 connection.send_result(
284 "status": response_dict[
"status"],
285 "headers": {
"Content-Type": response.content_type},
288
Response _handle(self, Request request, str webhook_id)
bool async_setup(HomeAssistant hass, ConfigType config)
None websocket_list(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None async_unregister(HomeAssistant hass, str webhook_id)
None async_register(HomeAssistant hass, str domain, str name, str webhook_id, Callable[[HomeAssistant, str, Request], Awaitable[Response|None]] handler, *bool|None local_only=False, Iterable[str]|None allowed_methods=None)
Response async_handle_webhook(HomeAssistant hass, str webhook_id, Request|MockRequest request)
None websocket_handle(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
str async_generate_path(str webhook_id)
str async_generate_url(HomeAssistant hass, str webhook_id, bool allow_internal=True, bool allow_external=True, bool|None allow_ip=None, bool|None prefer_external=True)
bool is_cloud_connection(HomeAssistant hass)
dict[str, Any] serialize_response(web.Response response)