1 """Websocket API to interact with the category registry."""
5 import voluptuous
as vol
15 """Register the category registry WS commands."""
16 websocket_api.async_register_command(hass, websocket_list_categories)
17 websocket_api.async_register_command(hass, websocket_create_category)
18 websocket_api.async_register_command(hass, websocket_delete_category)
19 websocket_api.async_register_command(hass, websocket_update_category)
23 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/category_registry/list",
24 vol.Required(
"scope"): str,
29 hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
31 """Handle list categories command."""
32 category_registry = cr.async_get(hass)
33 connection.send_result(
37 for entry
in category_registry.async_list_categories(scope=msg[
"scope"])
42 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/category_registry/create",
43 vol.Required(
"scope"): str,
44 vol.Required(
"name"): str,
45 vol.Optional(
"icon"): vol.Any(cv.icon,
None),
48 @websocket_api.require_admin
51 hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
53 """Create category command."""
54 category_registry = cr.async_get(hass)
61 entry = category_registry.async_create(**data)
62 except ValueError
as err:
63 connection.send_error(msg[
"id"],
"invalid_info",
str(err))
65 connection.send_result(msg[
"id"],
_entry_dict(entry))
68 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/category_registry/delete",
69 vol.Required(
"scope"): str,
70 vol.Required(
"category_id"): str,
73 @websocket_api.require_admin
76 hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
78 """Delete category command."""
79 category_registry = cr.async_get(hass)
82 category_registry.async_delete(
83 scope=msg[
"scope"], category_id=msg[
"category_id"]
86 connection.send_error(msg[
"id"],
"invalid_info",
"Category ID doesn't exist")
88 connection.send_result(msg[
"id"])
91 @websocket_api.websocket_command(
{
vol.Required("type"):
"config/category_registry/update",
92 vol.Required(
"scope"): str,
93 vol.Required(
"category_id"): str,
94 vol.Optional(
"name"): str,
95 vol.Optional(
"icon"): vol.Any(cv.icon,
None),
98 @websocket_api.require_admin
101 hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
103 """Handle update category websocket command."""
104 category_registry = cr.async_get(hass)
111 entry = category_registry.async_update(**data)
112 except ValueError
as err:
113 connection.send_error(msg[
"id"],
"invalid_info",
str(err))
115 connection.send_error(msg[
"id"],
"invalid_info",
"Category ID doesn't exist")
117 connection.send_result(msg[
"id"],
_entry_dict(entry))
121 def _entry_dict(entry: cr.CategoryEntry) -> dict[str, Any]:
122 """Convert entry to API format."""
124 "category_id": entry.category_id,
125 "created_at": entry.created_at.timestamp(),
127 "modified_at": entry.modified_at.timestamp(),
130
bool async_setup(HomeAssistant hass)
None websocket_delete_category(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
None websocket_create_category(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
dict[str, Any] _entry_dict(cr.CategoryEntry entry)
None websocket_list_categories(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
None websocket_update_category(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)