1 """KNX Websocket API."""
3 from __future__
import annotations
6 from collections.abc
import Awaitable, Callable
7 from functools
import wraps
8 from typing
import TYPE_CHECKING, Any, Final, overload
10 import knx_frontend
as knx_panel
11 import voluptuous
as vol
12 from xknx.telegram
import Telegram
13 from xknxproject.exceptions
import XknxProjectException
24 from .const
import DOMAIN, KNX_MODULE_KEY
25 from .storage.config_store
import ConfigStoreException
26 from .storage.const
import CONF_DATA
27 from .storage.entity_store_schema
import (
28 CREATE_ENTITY_BASE_SCHEMA,
29 UPDATE_ENTITY_BASE_SCHEMA,
31 from .storage.entity_store_validation
import (
32 EntityStoreValidationException,
33 EntityStoreValidationSuccess,
36 from .telegrams
import SIGNAL_KNX_TELEGRAM, TelegramDict
39 from .
import KNXModule
41 URL_BASE: Final =
"/knx_static"
45 """Register the KNX Panel and Websocket API."""
46 websocket_api.async_register_command(hass, ws_info)
47 websocket_api.async_register_command(hass, ws_project_file_process)
48 websocket_api.async_register_command(hass, ws_project_file_remove)
49 websocket_api.async_register_command(hass, ws_group_monitor_info)
50 websocket_api.async_register_command(hass, ws_group_telegrams)
51 websocket_api.async_register_command(hass, ws_subscribe_telegram)
52 websocket_api.async_register_command(hass, ws_get_knx_project)
53 websocket_api.async_register_command(hass, ws_validate_entity)
54 websocket_api.async_register_command(hass, ws_create_entity)
55 websocket_api.async_register_command(hass, ws_update_entity)
56 websocket_api.async_register_command(hass, ws_delete_entity)
57 websocket_api.async_register_command(hass, ws_get_entity_config)
58 websocket_api.async_register_command(hass, ws_get_entity_entries)
59 websocket_api.async_register_command(hass, ws_create_device)
61 if DOMAIN
not in hass.data.get(
"frontend_panels", {}):
62 await hass.http.async_register_static_paths(
66 path=knx_panel.locate_dir(),
67 cache_headers=knx_panel.is_prod_build,
71 await panel_custom.async_register_panel(
73 frontend_url_path=DOMAIN,
74 webcomponent_name=knx_panel.webcomponent_name,
75 sidebar_title=DOMAIN.upper(),
76 sidebar_icon=
"mdi:bus-electric",
77 module_url=f
"{URL_BASE}/{knx_panel.entrypoint_js}",
83 type KnxWebSocketCommandHandler = Callable[
86 type KnxAsyncWebSocketCommandHandler = Callable[
94 func: KnxAsyncWebSocketCommandHandler,
95 ) -> websocket_api.const.AsyncWebSocketCommandHandler: ...
98 func: KnxWebSocketCommandHandler,
99 ) -> websocket_api.const.WebSocketCommandHandler: ...
103 func: KnxAsyncWebSocketCommandHandler | KnxWebSocketCommandHandler,
105 websocket_api.const.AsyncWebSocketCommandHandler
106 | websocket_api.const.WebSocketCommandHandler
108 """Websocket decorator to provide a KNXModule instance."""
110 def _send_not_loaded_error(
111 connection: websocket_api.ActiveConnection, msg_id: int
113 connection.send_error(
115 websocket_api.const.ERR_HOME_ASSISTANT_ERROR,
116 "KNX integration not loaded.",
119 if asyncio.iscoroutinefunction(func):
127 """Add KNX Module to call function."""
129 knx = hass.data[KNX_MODULE_KEY]
131 _send_not_loaded_error(connection, msg[
"id"])
133 await func(hass, knx, connection, msg)
143 """Add KNX Module to call function."""
145 knx = hass.data[KNX_MODULE_KEY]
147 _send_not_loaded_error(connection, msg[
"id"])
149 func(hass, knx, connection, msg)
154 @websocket_api.require_admin
155 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/info",
166 """Handle get info command."""
168 if project_info := knx.project.info:
170 "name": project_info[
"name"],
171 "last_modified": project_info[
"last_modified"],
172 "tool_version": project_info[
"tool_version"],
173 "xknxproject_version": project_info[
"xknxproject_version"],
176 connection.send_result(
179 "version": knx.xknx.version,
180 "connected": knx.xknx.connection_manager.connected.is_set(),
181 "current_address":
str(knx.xknx.current_address),
182 "project": _project_info,
187 @websocket_api.require_admin
188 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/get_knx_project",
191 @websocket_api.async_response
199 """Handle get KNX project."""
200 knxproject = await knx.project.get_knxproject()
201 connection.send_result(
204 "project_loaded": knx.project.loaded,
205 "knxproject": knxproject,
210 @websocket_api.require_admin
211 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/project_file_process",
212 vol.Required(
"file_id"): str,
213 vol.Required(
"password"): str,
216 @websocket_api.async_response
224 """Handle project file process command."""
226 await knx.project.process_project_file(
228 file_id=msg[
"file_id"],
229 password=msg[
"password"],
231 except (ValueError, XknxProjectException)
as err:
233 connection.send_error(
234 msg[
"id"], websocket_api.ERR_HOME_ASSISTANT_ERROR,
str(err)
238 connection.send_result(msg[
"id"])
241 @websocket_api.require_admin
242 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/project_file_remove",
245 @websocket_api.async_response
253 """Handle project file remove command."""
254 await knx.project.remove_project_file()
255 connection.send_result(msg[
"id"])
258 @websocket_api.require_admin
259 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/group_monitor_info",
270 """Handle get info command of group monitor."""
271 recent_telegrams = [*knx.telegrams.recent_telegrams]
272 connection.send_result(
275 "project_loaded": knx.project.loaded,
276 "recent_telegrams": recent_telegrams,
281 @websocket_api.require_admin
282 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/group_telegrams",
293 """Handle get group telegrams command."""
294 connection.send_result(
296 knx.telegrams.last_ga_telegrams,
300 @websocket_api.require_admin
301 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/subscribe_telegrams",
310 """Subscribe to incoming and outgoing KNX telegrams."""
313 def forward_telegram(_telegram: Telegram, telegram_dict: TelegramDict) ->
None:
314 """Forward telegram to websocket subscription."""
315 connection.send_event(
322 signal=SIGNAL_KNX_TELEGRAM,
323 target=forward_telegram,
325 connection.send_result(msg[
"id"])
328 @websocket_api.require_admin
329 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/validate_entity",
330 **CREATE_ENTITY_BASE_SCHEMA,
339 """Validate entity data."""
342 except EntityStoreValidationException
as exc:
343 connection.send_result(msg[
"id"], exc.validation_error)
345 connection.send_result(
350 @websocket_api.require_admin
351 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/create_entity",
352 **CREATE_ENTITY_BASE_SCHEMA,
355 @websocket_api.async_response
363 """Create entity in entity store and load it."""
366 except EntityStoreValidationException
as exc:
367 connection.send_result(msg[
"id"], exc.validation_error)
370 entity_id = await knx.config_store.create_entity(
372 validated_data[CONF_PLATFORM],
373 validated_data[CONF_DATA],
375 except ConfigStoreException
as err:
376 connection.send_error(
377 msg[
"id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR,
str(err)
380 connection.send_result(
385 @websocket_api.require_admin
386 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/update_entity",
387 **UPDATE_ENTITY_BASE_SCHEMA,
390 @websocket_api.async_response
398 """Update entity in entity store and reload it."""
401 except EntityStoreValidationException
as exc:
402 connection.send_result(msg[
"id"], exc.validation_error)
405 await knx.config_store.update_entity(
406 validated_data[CONF_PLATFORM],
407 validated_data[CONF_ENTITY_ID],
408 validated_data[CONF_DATA],
410 except ConfigStoreException
as err:
411 connection.send_error(
412 msg[
"id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR,
str(err)
415 connection.send_result(
420 @websocket_api.require_admin
421 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/delete_entity",
422 vol.Required(CONF_ENTITY_ID): str,
425 @websocket_api.async_response
433 """Delete entity from entity store and remove it."""
435 await knx.config_store.delete_entity(msg[CONF_ENTITY_ID])
436 except ConfigStoreException
as err:
437 connection.send_error(
438 msg[
"id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR,
str(err)
441 connection.send_result(msg[
"id"])
444 @websocket_api.require_admin
445 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/get_entity_entries",
456 """Get entities configured from entity store."""
458 entry.extended_dict
for entry
in knx.config_store.get_entity_entries()
460 connection.send_result(msg[
"id"], entity_entries)
463 @websocket_api.require_admin
464 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/get_entity_config",
465 vol.Required(CONF_ENTITY_ID): str,
476 """Get entity configuration from entity store."""
478 config_info = knx.config_store.get_entity_config(msg[CONF_ENTITY_ID])
479 except ConfigStoreException
as err:
480 connection.send_error(
481 msg[
"id"], websocket_api.const.ERR_HOME_ASSISTANT_ERROR,
str(err)
484 connection.send_result(msg[
"id"], config_info)
487 @websocket_api.require_admin
488 @websocket_api.websocket_command(
{
vol.Required("type"):
"knx/create_device",
489 vol.Required(
"name"): str,
490 vol.Optional(
"area_id"): str,
501 """Create a new KNX device."""
502 identifier = f
"knx_vdev_{ulid_now()}"
503 device_registry = dr.async_get(hass)
504 _device = device_registry.async_get_or_create(
505 config_entry_id=knx.entry.entry_id,
508 identifiers={(DOMAIN, identifier)},
510 device_registry.async_update_device(
512 area_id=msg.get(
"area_id")
or UNDEFINED,
513 configuration_url=f
"homeassistant://knx/entities/view?device_id={_device.id}",
515 connection.send_result(msg[
"id"], _device.dict_repr)
516
dict validate_entity_data(dict entity_data)
None ws_get_entity_entries(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_delete_entity(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_get_knx_project(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_project_file_process(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_get_entity_config(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
websocket_api.const.AsyncWebSocketCommandHandler provide_knx(KnxAsyncWebSocketCommandHandler func)
None ws_validate_entity(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_update_entity(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_subscribe_telegram(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_group_monitor_info(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_project_file_remove(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_info(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_group_telegrams(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None ws_create_entity(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
None register_panel(HomeAssistant hass)
None ws_create_device(HomeAssistant hass, KNXModule knx, websocket_api.ActiveConnection connection, dict msg)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)