1 """Support for LIFX."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from functools
import partial
10 from aiolifx
import products
11 from aiolifx.aiolifx
import Light
12 from aiolifx.message
import Message
13 from awesomeversion
import AwesomeVersion
20 ATTR_COLOR_TEMP_KELVIN,
35 INFRARED_BRIGHTNESS_VALUES_MAP,
39 FIX_MAC_FW = AwesomeVersion(
"3.70")
44 """Check if a config entry is the legacy shared one."""
45 return entry.unique_id
is None or entry.unique_id == DOMAIN
50 """Get the legacy config entry."""
51 for entry
in hass.config_entries.async_entries(DOMAIN):
58 """Convert infrared brightness from value to option."""
59 return INFRARED_BRIGHTNESS_VALUES_MAP.get(value,
None)
63 """Convert infrared brightness option to value."""
64 option_values = {v: k
for k, v
in INFRARED_BRIGHTNESS_VALUES_MAP.items()}
65 return option_values.get(option)
69 """Scale an 8 bit level into 16 bits."""
70 return (value << 8) | value
74 """Scale a 16 bit level into 8 bits."""
79 """Return a feature map for this bulb, or a default map if unknown."""
80 features: dict[str, Any] = (
81 products.features_map.get(bulb.product)
or products.features_map[1]
86 def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int |
None] |
None:
87 """Find the desired color from a number of possible inputs.
89 Hue, Saturation, Brightness, Kelvin
91 hue, saturation, brightness, kelvin = [
None] * 4
93 if (color_name := kwargs.get(ATTR_COLOR_NAME))
is not None:
95 hue, saturation = color_util.color_RGB_to_hs(
96 *color_util.color_name_to_rgb(color_name)
100 "Got unknown color %s, falling back to neutral white", color_name
102 hue, saturation = (0, 0)
104 if ATTR_HS_COLOR
in kwargs:
105 hue, saturation = kwargs[ATTR_HS_COLOR]
106 elif ATTR_RGB_COLOR
in kwargs:
107 hue, saturation = color_util.color_RGB_to_hs(*kwargs[ATTR_RGB_COLOR])
108 elif ATTR_XY_COLOR
in kwargs:
109 hue, saturation = color_util.color_xy_to_hs(*kwargs[ATTR_XY_COLOR])
112 assert saturation
is not None
113 hue =
int(hue / 360 * 65535)
114 saturation =
int(saturation / 100 * 65535)
117 if ATTR_KELVIN
in kwargs:
119 "The 'kelvin' parameter is deprecated. Please use 'color_temp_kelvin' for"
122 kelvin = kwargs.pop(ATTR_KELVIN)
125 if ATTR_COLOR_TEMP
in kwargs:
126 kelvin = color_util.color_temperature_mired_to_kelvin(
127 kwargs.pop(ATTR_COLOR_TEMP)
131 if ATTR_COLOR_TEMP_KELVIN
in kwargs:
132 kelvin = kwargs.pop(ATTR_COLOR_TEMP_KELVIN)
135 if ATTR_BRIGHTNESS
in kwargs:
138 if ATTR_BRIGHTNESS_PCT
in kwargs:
139 brightness =
convert_8_to_16(round(255 * kwargs[ATTR_BRIGHTNESS_PCT] / 100))
141 hsbk = [hue, saturation, brightness, kelvin]
142 return None if hsbk == [
None] * 4
else hsbk
146 base: list[float | int |
None], change: list[float | int |
None]
147 ) -> list[float | int |
None]:
148 """Copy change on top of base, except when None.
150 Hue, Saturation, Brightness, Kelvin
152 return [b
if c
is None else c
for b, c
in zip(base, change, strict=
False)]
156 octets = [
int(octet, 16)
for octet
in mac_addr.split(
":")]
157 octets[5] = (octets[5] + offset) % 256
158 return ":".join(f
"{octet:02x}" for octet
in octets)
162 """Check if the firmware version has the off by one mac."""
163 return bool(firmware
and AwesomeVersion(firmware) >= FIX_MAC_FW)
167 """Increment the last byte of the mac address by one for FW>3.70."""
172 """Format the serial number to match the HA device registry."""
173 return dr.format_mac(serial_number)
177 """Check if a mac address matches the serial number."""
178 formatted_mac = dr.format_mac(mac_addr)
186 """Execute a lifx callback method and wait for a response."""
189 [method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
195 methods: list[Callable], attempts: int, overall_timeout: int
197 """Execute multiple lifx callback methods with retries and wait for a response.
199 This functional will the overall timeout by the number of attempts and
200 wait for each method to return a result. If we don't get a result
201 within the split timeout, we will send all methods that did not generate
204 If we don't get a result after all attempts, we will raise an
205 TimeoutError exception.
207 loop = asyncio.get_running_loop()
208 futures: list[asyncio.Future] = [loop.create_future()
for _
in methods]
211 bulb: Light, message: Message |
None, future: asyncio.Future[Message]
213 if message
and not future.done():
214 future.set_result(message)
216 timeout_per_attempt = overall_timeout / attempts
218 for _
in range(attempts):
219 for idx, method
in enumerate(methods):
220 future = futures[idx]
221 if not future.done():
222 method(callb=partial(_callback, future=future))
224 _, pending = await asyncio.wait(futures, timeout=timeout_per_attempt)
228 results: list[Message] = []
229 failed: list[str] = []
230 for idx, future
in enumerate(futures):
231 if not future.done()
or not (result := future.result()):
232 method = methods[idx]
233 failed.append(
str(getattr(method,
"__name__", method)))
235 results.append(result)
238 failed_methods =
", ".join(failed)
239 raise TimeoutError(f
"{failed_methods} timed out after {attempts} attempts")
ConfigEntry|None async_get_legacy_entry(HomeAssistant hass)
bool _off_by_one_mac(str firmware)
list[Message] async_multi_execute_lifx_with_retries(list[Callable] methods, int attempts, int overall_timeout)
Message async_execute_lifx(Callable method)
int convert_16_to_8(int value)
bool async_entry_is_legacy(ConfigEntry entry)
list[float|int|None] merge_hsbk(list[float|int|None] base, list[float|int|None] change)
list[float|int|None]|None find_hsbk(HomeAssistant hass, **Any kwargs)
bool mac_matches_serial_number(str mac_addr, str serial_number)
str get_real_mac_addr(str mac_addr, str firmware)
str|None infrared_brightness_value_to_option(int value)
int|None infrared_brightness_option_to_value(str option)
str _get_mac_offset(str mac_addr, int offset)
int convert_8_to_16(int value)
str formatted_serial(str serial_number)
dict[str, Any] lifx_features(Light bulb)