"""Support for LIFX Cloud scenes."""
from __future__ import annotations

import asyncio
from http import HTTPStatus
import logging
from typing import Any

import aiohttp
from aiohttp.hdrs import AUTHORIZATION
import voluptuous as vol
_LOGGER = logging.getLogger(__name__)

# Validation schema for this platform's configuration: the platform key must
# be exactly "lifx_cloud", a token string is required, and the timeout is an
# optional positive integer that falls back to DEFAULT_TIMEOUT.
PLATFORM_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_PLATFORM): "lifx_cloud",
        vol.Required(CONF_TOKEN): cv.string,
        vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
    }
)
async def async_setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    async_add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Set up the scenes stored in the LIFX Cloud.

    Requests the scene list from the LIFX HTTP API with the configured token
    and, on a 200 response, registers one ``LifxCloudScene`` per entry of the
    returned JSON. Timeouts, client errors and non-OK statuses are logged and
    no entities are added.
    """
    token = config.get(CONF_TOKEN)
    timeout = config.get(CONF_TIMEOUT)

    headers: dict[str, str] = {AUTHORIZATION: f"Bearer {token}"}

    url = "https://api.lifx.com/v1/scenes"

    try:
        httpsession = async_get_clientsession(hass)
        async with asyncio.timeout(timeout):
            scenes_resp = await httpsession.get(url, headers=headers)
    except (TimeoutError, aiohttp.ClientError):
        _LOGGER.exception("Error on %s", url)
        # Without a response there is nothing to inspect below.
        return

    status = scenes_resp.status
    if status == HTTPStatus.OK:
        data = await scenes_resp.json()
        devices = [LifxCloudScene(hass, headers, timeout, scene) for scene in data]
        async_add_entities(devices)
        return
    if status == HTTPStatus.UNAUTHORIZED:
        _LOGGER.error("Unauthorized (bad token?) on %s", url)
        return

    # Any other status is reported generically.
    _LOGGER.error("HTTP error %d on %s", scenes_resp.status, url)
# NOTE(review): the class statement and the ``name`` accessor header are not
# visible in the reviewed text; the ``Scene`` base class and the ``@property``
# decorator are assumed from the ``async_activate`` entity API — confirm.
class LifxCloudScene(Scene):
    """Representation of a LIFX Cloud scene."""

    def __init__(self, hass, headers, timeout, scene_data):
        """Initialize the scene.

        ``scene_data`` is one entry of the scene list; its ``"name"`` and
        ``"uuid"`` keys are read here. ``headers`` and ``timeout`` are kept
        for reuse when the scene is activated.
        """
        self.hass = hass
        self._headers = headers
        self._timeout = timeout
        self._name = scene_data["name"]
        self._uuid = scene_data["uuid"]

    @property
    def name(self):
        """Return the name of the scene."""
        return self._name

    async def async_activate(self, **kwargs: Any) -> None:
        """Activate the scene.

        Sends a PUT to the scene's activate endpoint. Timeouts and client
        errors are logged rather than raised.
        """
        url = f"https://api.lifx.com/v1/scenes/scene_id:{self._uuid}/activate"

        try:
            httpsession = async_get_clientsession(self.hass)
            async with asyncio.timeout(self._timeout):
                await httpsession.put(url, headers=self._headers)
        except (TimeoutError, aiohttp.ClientError):
            _LOGGER.exception("Error on %s", url)
# Signature index:
#   async def async_activate(self, **kwargs: Any) -> None
#   def __init__(self, hass, headers, timeout, scene_data)
#   async def async_setup_platform(hass: HomeAssistant, config: ConfigType, async_add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None) -> None
#   def async_get_clientsession(hass: HomeAssistant, verify_ssl: bool = True, family: socket.AddressFamily = socket.AF_UNSPEC, ssl_cipher: ssl_util.SSLCipherList = ssl_util.SSLCipherList.PYTHON_DEFAULT) -> aiohttp.ClientSession