1 """Web socket API for Insteon scenes."""
3 from pyinsteon
import devices
4 from pyinsteon.constants
import ResponseStatus
5 from pyinsteon.managers.scene_manager
import (
7 async_add_or_update_scene,
12 import voluptuous
as vol
17 from ..const
import ID, TYPE
21 """Return a dictionary mapping of a scene."""
23 for addr, links
in scene[
"devices"].items():
25 device_dict[str_addr] = []
27 device_dict[str_addr].append(
32 "has_controller": data.has_controller,
33 "has_responder": data.has_responder,
36 return {
"name": scene[
"name"],
"group": scene[
"group"],
"devices": device_dict}
39 @websocket_api.websocket_command({vol.Required(TYPE):
"insteon/scenes/get"})
40 @websocket_api.require_admin
41 @websocket_api.async_response
47 """Get all Insteon scenes."""
48 scenes = await async_get_scenes(work_dir=hass.config.config_dir)
50 scene_num:
_scene_to_dict(scene)
for scene_num, scene
in scenes.items()
52 connection.send_result(msg[ID], scenes_dict)
55 @websocket_api.websocket_command(
{vol.Required(TYPE):
"insteon/scene/get", vol.Required(
"scene_id"): int}
57 @websocket_api.require_admin
58 @websocket_api.async_response
64 """Get an Insteon scene."""
65 scene_id = msg[
"scene_id"]
66 scene = await async_get_scene(scene_num=scene_id, work_dir=hass.config.config_dir)
70 @websocket_api.websocket_command(
{
vol.Required(TYPE):
"insteon/scene/save",
71 vol.Required(
"name"): str,
72 vol.Required(
"scene_id"): int,
73 vol.Required(
"links"): DeviceLinkSchema,
76 @websocket_api.require_admin
77 @websocket_api.async_response
83 """Save an Insteon scene."""
84 scene_id = msg[
"scene_id"]
88 scene_id, result = await async_add_or_update_scene(
89 scene_num=scene_id, links=links, name=name, work_dir=hass.config.config_dir
91 await devices.async_save(workdir=hass.config.config_dir)
92 connection.send_result(
93 msg[ID], {
"scene_id": scene_id,
"result": result == ResponseStatus.SUCCESS}
97 @websocket_api.websocket_command(
{
vol.Required(TYPE):
"insteon/scene/delete",
98 vol.Required(
"scene_id"): int,
101 @websocket_api.require_admin
102 @websocket_api.async_response
108 """Delete an Insteon scene."""
109 scene_id = msg[
"scene_id"]
111 result = await async_delete_scene(
112 scene_num=scene_id, work_dir=hass.config.config_dir
114 await devices.async_save(workdir=hass.config.config_dir)
115 connection.send_result(
116 msg[ID], {
"scene_id": scene_id,
"result": result == ResponseStatus.SUCCESS}
118
None websocket_save_scene(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict msg)
None websocket_get_scenes(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict msg)
def _scene_to_dict(scene)
None websocket_delete_scene(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict msg)
None websocket_get_scene(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict msg)