1 """Websocket API to interact with the floor registry."""
5 import voluptuous
as vol
# NOTE(review): the header and return statement are absent from this excerpt;
# the signature comes from the file's signature index and ``True`` is assumed
# from the ``bool`` return type. Any decorator the original carried is not
# visible here - confirm before merging.
def async_setup(hass: HomeAssistant) -> bool:
    """Register the floor registry WS commands.

    Each of the four floor handlers (list, create, delete, update) is passed
    to ``websocket_api.async_register_command`` for ``hass``.
    """
    handlers = (
        websocket_list_floors,
        websocket_create_floor,
        websocket_delete_floor,
        websocket_update_floor,
    )
    for handler in handlers:
        websocket_api.async_register_command(hass, handler)
    return True
@websocket_api.websocket_command(
    {
        vol.Required("type"): "config/floor_registry/list",
    }
)
# NOTE(review): a decorator line appears to be missing directly above the
# ``def`` in this excerpt - restore it if the original had one.
def websocket_list_floors(
    hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle list floors command.

    Replies on ``connection`` with one ``_entry_dict`` payload per entry
    returned by the registry's ``async_list_floors()``.
    """
    registry = fr.async_get(hass)
    floors = [_entry_dict(entry) for entry in registry.async_list_floors()]
    # The reply is addressed with the request id, as in the other handlers.
    connection.send_result(msg["id"], floors)
@websocket_api.websocket_command(
    {
        vol.Required("type"): "config/floor_registry/create",
        vol.Required("name"): str,
        vol.Optional("aliases"): list,
        vol.Optional("icon"): vol.Any(str, None),
        vol.Optional("level"): vol.Any(int, None),
    }
)
@websocket_api.require_admin
# NOTE(review): a decorator line appears to be missing directly above the
# ``def`` in this excerpt - restore it if the original had one.
def websocket_create_floor(
    hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
    """Create floor command.

    Forwards the validated message fields to the registry's
    ``async_create``. On success the new entry is sent back through
    ``_entry_dict``; if the registry raises ``ValueError`` an
    ``invalid_info`` error carrying the exception text is sent instead.
    """
    registry = fr.async_get(hass)

    # NOTE(review): the lines that build ``data`` are missing from this
    # excerpt; reconstructed as the message minus the websocket envelope
    # keys ("type", "id") - confirm against the original.
    data = {key: value for key, value in msg.items() if key not in ("type", "id")}

    if "aliases" in data:
        # The schema accepts a list; it is converted to a set before the call.
        data["aliases"] = set(data["aliases"])

    try:
        entry = registry.async_create(**data)
    except ValueError as err:
        connection.send_error(msg["id"], "invalid_info", str(err))
        return

    connection.send_result(msg["id"], _entry_dict(entry))
@websocket_api.websocket_command(
    {
        vol.Required("type"): "config/floor_registry/delete",
        vol.Required("floor_id"): str,
    }
)
@websocket_api.require_admin
# NOTE(review): a decorator line appears to be missing directly above the
# ``def`` in this excerpt - restore it if the original had one.
def websocket_delete_floor(
    hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
    """Delete floor command.

    Asks the registry to delete ``msg["floor_id"]`` and acknowledges with an
    empty result, or replies with an ``invalid_info`` error when the delete
    call fails.
    """
    registry = fr.async_get(hass)

    try:
        registry.async_delete(msg["floor_id"])
    # NOTE(review): the ``except`` line is missing from this excerpt;
    # KeyError is assumed from the "doesn't exist" message - confirm which
    # exception ``async_delete`` raises for an unknown floor ID.
    except KeyError:
        connection.send_error(msg["id"], "invalid_info", "Floor ID doesn't exist")
        return

    connection.send_result(msg["id"])
@websocket_api.websocket_command(
    {
        vol.Required("type"): "config/floor_registry/update",
        vol.Required("floor_id"): str,
        vol.Optional("aliases"): list,
        vol.Optional("icon"): vol.Any(str, None),
        vol.Optional("level"): vol.Any(int, None),
        vol.Optional("name"): str,
    }
)
@websocket_api.require_admin
# NOTE(review): a decorator line appears to be missing directly above the
# ``def`` in this excerpt - restore it if the original had one.
def websocket_update_floor(
    hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle update floor websocket command.

    Forwards ``floor_id`` and whichever optional fields were supplied to the
    registry's ``async_update``. On success the updated entry is sent back
    through ``_entry_dict``; if the registry raises ``ValueError`` an
    ``invalid_info`` error carrying the exception text is sent instead.
    """
    registry = fr.async_get(hass)

    # NOTE(review): the lines that build ``data`` are missing from this
    # excerpt; reconstructed as the message minus the websocket envelope
    # keys ("type", "id") - confirm against the original.
    data = {key: value for key, value in msg.items() if key not in ("type", "id")}

    if "aliases" in data:
        # The schema accepts a list; it is converted to a set before the call.
        data["aliases"] = set(data["aliases"])

    try:
        entry = registry.async_update(**data)
    except ValueError as err:
        connection.send_error(msg["id"], "invalid_info", str(err))
        return

    connection.send_result(msg["id"], _entry_dict(entry))
def _entry_dict(entry: FloorEntry) -> dict[str, Any]:
    """Convert entry to API format.

    Returns a plain dict: ``aliases`` is copied into a list and the
    ``created_at`` / ``modified_at`` values are emitted via ``.timestamp()``.
    """
    return {
        "aliases": list(entry.aliases),
        "created_at": entry.created_at.timestamp(),
        "floor_id": entry.floor_id,
        # NOTE(review): the "icon" and "name" entries are missing from this
        # excerpt; restored to mirror the fields accepted by the create and
        # update schemas - confirm against the original.
        "icon": entry.icon,
        "level": entry.level,
        "modified_at": entry.modified_at.timestamp(),
        "name": entry.name,
    }
134
None websocket_create_floor(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
dict[str, Any] _entry_dict(FloorEntry entry)
None websocket_list_floors(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
None websocket_update_floor(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
None websocket_delete_floor(HomeAssistant hass, ActiveConnection connection, dict[str, Any] msg)
bool async_setup(HomeAssistant hass)