1 """Rest API for Home Assistant."""
4 from asyncio
import shield, timeout
5 from functools
import lru_cache
6 from http
import HTTPStatus
10 from aiohttp
import web
11 from aiohttp.web_exceptions
import HTTPBadRequest
12 import voluptuous
as vol
24 EVENT_HOMEASSISTANT_STOP,
26 KEY_DATA_LOGGING
as DATA_LOGGING,
42 InvalidEntityFormatError,
55 _LOGGER = logging.getLogger(__name__)
# Attribute key names used by this module.
ATTR_BASE_URL = "base_url"
ATTR_EXTERNAL_URL = "external_url"
ATTR_INTERNAL_URL = "internal_url"
ATTR_LOCATION_NAME = "location_name"
ATTR_INSTALLATION_TYPE = "installation_type"
ATTR_REQUIRES_API_PASSWORD = "requires_api_password"
ATTR_VERSION = "version"

# Event-stream keep-alive settings: the payload is queued onto the stream
# and the interval is the value handed to asyncio's ``timeout()`` while the
# stream waits for the next item to write.
STREAM_PING_PAYLOAD = "ping"
STREAM_PING_INTERVAL = 50

# NOTE(review): presumably a wait limit in seconds for service calls; its
# use is not visible in this part of the file -- confirm before relying on it.
SERVICE_WAIT_TIMEOUT = 10
71 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Register the API with the HTTP interface.

    All REST views are registered unconditionally, in a fixed order. The
    error-log view is registered only when ``DATA_LOGGING`` is present in
    ``hass.data``.

    Args:
        hass: The Home Assistant instance whose HTTP component receives the
            views.
        config: The configuration passed to the component; not read here.

    Returns:
        ``True`` once every view has been registered.
    """
    # Kept in the original registration order.
    views = (
        APIStatusView,
        APICoreStateView,
        APIEventStream,
        APIConfigView,
        APIStatesView,
        APIEntityStateView,
        APIEventListenersView,
        APIEventView,
        APIServicesView,
        APIDomainServicesView,
        APIComponentsView,
        APITemplateView,
    )
    for view in views:
        hass.http.register_view(view)

    # Only expose the error-log view when DATA_LOGGING is set in hass.data.
    if DATA_LOGGING in hass.data:
        hass.http.register_view(APIErrorLog)

    # The signature promises a bool; without an explicit return the
    # coroutine would yield None.
    return True
96 """View to handle Status requests."""
def get(self, request: web.Request) -> web.Response:
    """Answer a status probe with a fixed "API running." message.

    The request object is not inspected; the reply is built by the view's
    ``json_message`` helper.
    """
    status_text = "API running."
    return self.json_message(status_text)
108 """View to handle core state requests."""
110 url = URL_API_CORE_STATE
111 name =
"api:core:state"
def get(self, request: web.Request) -> web.Response:
    """Return the core state together with the recorder migration flags.

    This endpoint is meant to be a fast, lightweight check that the
    Home Assistant core is running; its primary consumer is the supervisor.

    The JSON body has two keys: ``state`` (the value of ``hass.state``) and
    ``recorder_state`` (a mapping with ``migration_in_progress`` and
    ``migration_is_live``).
    """
    hass = request.app[KEY_HASS]
    # Query the recorder first, in the same order as before, then read the
    # core state.
    recorder_status = {
        "migration_in_progress": recorder.async_migration_in_progress(hass),
        "migration_is_live": recorder.async_migration_is_live(hass),
    }
    payload = {
        "state": hass.state.value,
        "recorder_state": recorder_status,
    }
    return self.json(payload)
129 """View to handle EventStream requests."""
135 async
def get(self, request: web.Request) -> web.StreamResponse:
136 """Provide a streaming interface for the event bus."""
137 hass = request.app[KEY_HASS]
139 to_write: asyncio.Queue[object | str] = asyncio.Queue()
141 restrict: list[EventType[Any] | str] |
None =
None
142 if restrict_str := request.query.get(
"restrict"):
143 restrict = [*restrict_str.split(
","), EVENT_HOMEASSISTANT_STOP]
145 async
def forward_events(event: Event) ->
None:
146 """Forward events to the open request."""
147 if restrict
and event.event_type
not in restrict:
150 _LOGGER.debug(
"STREAM %s FORWARDING %s", id(stop_obj), event)
152 if event.event_type == EVENT_HOMEASSISTANT_STOP:
157 await to_write.put(data)
159 response = web.StreamResponse()
160 response.content_type =
"text/event-stream"
161 await response.prepare(request)
163 unsub_stream = hass.bus.async_listen(MATCH_ALL, forward_events)
166 _LOGGER.debug(
"STREAM %s ATTACHED", id(stop_obj))
169 await to_write.put(STREAM_PING_PAYLOAD)
173 async
with timeout(STREAM_PING_INTERVAL):
174 payload = await to_write.get()
176 if payload
is stop_obj:
179 msg = f
"data: {payload}\n\n"
180 _LOGGER.debug(
"STREAM %s WRITING %s", id(stop_obj), msg.strip())
181 await response.write(msg.encode(
"UTF-8"))
183 await to_write.put(STREAM_PING_PAYLOAD)
185 except asyncio.CancelledError:
186 _LOGGER.debug(
"STREAM %s ABORT", id(stop_obj))
189 _LOGGER.debug(
"STREAM %s RESPONSE CLOSED", id(stop_obj))
196 """View to handle Configuration requests."""
def get(self, request: web.Request) -> web.Response:
    """Return the current configuration serialised as JSON.

    The configuration is taken from the Home Assistant instance stored on
    the aiohttp application and converted with ``config.as_dict()``.
    """
    hass = request.app[KEY_HASS]
    config_dict = hass.config.as_dict()
    return self.json(config_dict)
208 """View to handle States requests."""
214 def get(self, request: web.Request) -> web.Response:
215 """Get current states."""
216 user: User = request[KEY_HASS_USER]
217 hass = request.app[KEY_HASS]
219 states = (state.as_dict_json
for state
in hass.states.async_all())
221 entity_perm = user.permissions.check_entity
224 for state
in hass.states.async_all()
225 if entity_perm(state.entity_id,
"read")
227 response = web.Response(
228 body=b
"".join((b
"[", b
",".join(states), b
"]")),
229 content_type=CONTENT_TYPE_JSON,
230 zlib_executor_size=32768,
232 response.enable_compression()
237 """View to handle EntityState requests."""
239 url =
"/api/states/{entity_id}"
240 name =
"api:entity-state"
243 def get(self, request: web.Request, entity_id: str) -> web.Response:
244 """Retrieve state of entity."""
245 user: User = request[KEY_HASS_USER]
246 hass = request.app[KEY_HASS]
247 if not user.permissions.check_entity(entity_id, POLICY_READ):
250 if state := hass.states.get(entity_id):
252 body=state.as_dict_json,
253 content_type=CONTENT_TYPE_JSON,
255 return self.json_message(
"Entity not found.", HTTPStatus.NOT_FOUND)
257 async
def post(self, request: web.Request, entity_id: str) -> web.Response:
258 """Update state of entity."""
259 user: User = request[KEY_HASS_USER]
260 if not user.is_admin:
262 hass = request.app[KEY_HASS]
264 data = await request.json()
266 return self.json_message(
"Invalid JSON specified.", HTTPStatus.BAD_REQUEST)
268 if (new_state := data.get(
"state"))
is None:
269 return self.json_message(
"No state specified.", HTTPStatus.BAD_REQUEST)
271 attributes = data.get(
"attributes")
272 force_update = data.get(
"force_update",
False)
274 is_new_state = hass.states.get(entity_id)
is None
278 hass.states.async_set(
279 entity_id, new_state, attributes, force_update, self.context(request)
281 except InvalidEntityFormatError:
282 return self.json_message(
283 "Invalid entity ID specified.", HTTPStatus.BAD_REQUEST
285 except InvalidStateError:
286 return self.json_message(
"Invalid state specified.", HTTPStatus.BAD_REQUEST)
289 status_code = HTTPStatus.CREATED
if is_new_state
else HTTPStatus.OK
290 state = hass.states.get(entity_id)
292 resp = self.json(state.as_dict(), status_code)
294 resp.headers.add(
"Location", f
"/api/states/{entity_id}")
299 def delete(self, request: web.Request, entity_id: str) -> web.Response:
301 if not request[KEY_HASS_USER].is_admin:
303 if request.app[KEY_HASS].states.async_remove(entity_id):
304 return self.json_message(
"Entity removed.")
305 return self.json_message(
"Entity not found.", HTTPStatus.NOT_FOUND)
309 """View to handle EventListeners requests."""
312 name =
"api:event-listeners"
315 def get(self, request: web.Request) -> web.Response:
316 """Get event listeners."""
321 """View to handle Event requests."""
323 url =
"/api/events/{event_type}"
327 async
def post(self, request: web.Request, event_type: str) -> web.Response:
329 body = await request.text()
331 event_data: Any =
json_loads(body)
if body
else None
333 return self.json_message(
334 "Event data should be valid JSON.", HTTPStatus.BAD_REQUEST
337 if event_data
is not None and not isinstance(event_data, dict):
338 return self.json_message(
339 "Event data should be a JSON object", HTTPStatus.BAD_REQUEST
344 if event_type == EVENT_STATE_CHANGED
and event_data:
345 for key
in (
"old_state",
"new_state"):
346 state = ha.State.from_dict(event_data[key])
349 event_data[key] = state
351 request.app[KEY_HASS].bus.async_fire(
352 event_type, event_data, ha.EventOrigin.remote, self.context(request)
355 return self.json_message(f
"Event {event_type} fired.")
359 """View to handle Services requests."""
361 url = URL_API_SERVICES
362 name =
"api:services"
364 async
def get(self, request: web.Request) -> web.Response:
365 """Get registered services."""
367 return self.json(services)
371 """View to handle DomainServices requests."""
373 url =
"/api/services/{domain}/{service}"
374 name =
"api:domain-services"
377 self, request: web.Request, domain: str, service: str
381 Returns a list of changed states.
383 hass = request.app[KEY_HASS]
384 body = await request.text()
388 return self.json_message(
389 "Data should be valid JSON.", HTTPStatus.BAD_REQUEST
392 context = self.context(request)
393 if not hass.services.has_service(domain, service):
396 if response_requested :=
"return_response" in request.query:
398 hass.services.supports_response(domain, service)
399 is ha.SupportsResponse.NONE
401 return self.json_message(
402 "Service does not support responses. Remove return_response from request.",
403 HTTPStatus.BAD_REQUEST,
406 hass.services.supports_response(domain, service)
is ha.SupportsResponse.ONLY
408 return self.json_message(
409 "Service call requires responses but caller did not ask for responses. "
410 "Add ?return_response to query parameters.",
411 HTTPStatus.BAD_REQUEST,
414 changed_states: list[json_fragment] = []
417 def _async_save_changed_entities(
418 event: Event[EventStateChangedData],
420 if event.context == context
and (state := event.data[
"new_state"]):
421 changed_states.append(state.json_fragment)
423 cancel_listen = hass.bus.async_listen(
425 _async_save_changed_entities,
430 response = await shield(
431 hass.services.async_call(
437 return_response=response_requested,
440 except (vol.Invalid, ServiceNotFound)
as ex:
441 raise HTTPBadRequest
from ex
445 if response_requested:
447 {
"changed_states": changed_states,
"service_response": response}
450 return self.json(changed_states)
454 """View to handle Components requests."""
456 url = URL_API_COMPONENTS
457 name =
"api:components"
def get(self, request: web.Request) -> web.Response:
    """Return the currently loaded components as JSON.

    The component collection is read from ``config.components`` on the
    Home Assistant instance stored on the aiohttp application.
    """
    hass = request.app[KEY_HASS]
    loaded_components = hass.config.components
    return self.json(loaded_components)
467 """Return a cached template."""
468 return template.Template(template_str, hass)
472 """View to handle Template requests."""
474 url = URL_API_TEMPLATE
475 name =
"api:template"
478 async
def post(self, request: web.Request) -> web.Response:
479 """Render a template."""
481 data = await request.json()
483 return tpl.async_render(variables=data.get(
"variables"), parse_result=
False)
484 except (ValueError, TemplateError)
as ex:
485 return self.json_message(
486 f
"Error rendering template: {ex}", HTTPStatus.BAD_REQUEST
491 """View to fetch the API error log."""
493 url = URL_API_ERROR_LOG
494 name =
"api:error_log"
async def get(self, request: web.Request) -> web.FileResponse:
    """Retrieve API error log.

    Builds a ``web.FileResponse`` from the value stored under
    ``DATA_LOGGING`` in ``hass.data`` and enables compression on it.

    Returns:
        The prepared file response.

    Raises:
        KeyError: If ``DATA_LOGGING`` is not present in ``hass.data``.
    """
    hass = request.app[KEY_HASS]
    # NOTE(review): presumably the stored value is the log file path --
    # confirm against where DATA_LOGGING is populated.
    response = web.FileResponse(hass.data[DATA_LOGGING])
    response.enable_compression()
    # Hand the response back to the caller; the handler is annotated as
    # returning a FileResponse and would otherwise return None.
    return response
506 """Generate services data to JSONify."""
508 return [{
"domain": key,
"services": value}
for key, value
in descriptions.items()]
513 """Generate event data to JSONify."""
515 {
"event": key,
"listener_count": value}
516 for key, value
in hass.bus.async_listeners().items()
web.Response get(self, web.Request request)
web.Response get(self, web.Request request)
web.Response get(self, web.Request request)
web.Response post(self, web.Request request, str domain, str service)
web.Response post(self, web.Request request, str entity_id)
web.Response get(self, web.Request request, str entity_id)
web.Response delete(self, web.Request request, str entity_id)
web.FileResponse get(self, web.Request request)
web.Response get(self, web.Request request)
web.StreamResponse get(self, web.Request request)
web.Response post(self, web.Request request, str event_type)
web.Response get(self, web.Request request)
web.Response get(self, web.Request request)
web.Response get(self, web.Request request)
web.Response post(self, web.Request request)
list[dict[str, Any]] async_events_json(HomeAssistant hass)
template.Template _cached_template(str template_str, HomeAssistant hass)
list[dict[str, Any]] async_services_json(HomeAssistant hass)
bool async_setup(HomeAssistant hass, ConfigType config)
dict[str, dict[str, Any]] async_get_all_descriptions(HomeAssistant hass)