Home Assistant Unofficial Reference 2024.12.1
util.py
Go to the documentation of this file.
1 """Support for LIFX."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from functools import partial
8 from typing import Any
9 
10 from aiolifx import products
11 from aiolifx.aiolifx import Light
12 from aiolifx.message import Message
13 from awesomeversion import AwesomeVersion
14 
16  ATTR_BRIGHTNESS,
17  ATTR_BRIGHTNESS_PCT,
18  ATTR_COLOR_NAME,
19  ATTR_COLOR_TEMP,
20  ATTR_COLOR_TEMP_KELVIN,
21  ATTR_HS_COLOR,
22  ATTR_KELVIN,
23  ATTR_RGB_COLOR,
24  ATTR_XY_COLOR,
25 )
26 from homeassistant.config_entries import ConfigEntry
27 from homeassistant.core import HomeAssistant, callback
28 from homeassistant.helpers import device_registry as dr
29 import homeassistant.util.color as color_util
30 
31 from .const import (
32  _LOGGER,
33  DEFAULT_ATTEMPTS,
34  DOMAIN,
35  INFRARED_BRIGHTNESS_VALUES_MAP,
36  OVERALL_TIMEOUT,
37 )
38 
# Firmware version threshold used by _off_by_one_mac(): firmware at or above
# this version is treated as having the off-by-one MAC address.
FIX_MAC_FW = AwesomeVersion("3.70")
40 
41 
@callback
def async_entry_is_legacy(entry: ConfigEntry) -> bool:
    """Return True when the entry is the legacy shared config entry.

    An entry counts as legacy when it has no unique ID, or when its unique ID
    equals the integration domain.
    """
    unique_id = entry.unique_id
    if unique_id is None:
        return True
    return unique_id == DOMAIN
46 
47 
@callback
def async_get_legacy_entry(hass: HomeAssistant) -> ConfigEntry | None:
    """Return the first legacy config entry for this domain, or None."""
    return next(
        (
            candidate
            for candidate in hass.config_entries.async_entries(DOMAIN)
            if async_entry_is_legacy(candidate)
        ),
        None,
    )
55 
56 
def infrared_brightness_value_to_option(value: int) -> str | None:
    """Look up the option label for an infrared brightness value.

    Returns None when the value is not a key of INFRARED_BRIGHTNESS_VALUES_MAP.
    """
    option = INFRARED_BRIGHTNESS_VALUES_MAP.get(value)
    return option
60 
61 
def infrared_brightness_option_to_value(option: str) -> int | None:
    """Look up the infrared brightness value for an option label.

    Returns None when the option is not a value of
    INFRARED_BRIGHTNESS_VALUES_MAP.
    """
    # Invert the value -> option map so it can be searched by option.
    values_by_option = dict(
        zip(
            INFRARED_BRIGHTNESS_VALUES_MAP.values(),
            INFRARED_BRIGHTNESS_VALUES_MAP.keys(),
        )
    )
    return values_by_option.get(option)
66 
67 
def convert_8_to_16(value: int) -> int:
    """Scale an 8 bit level into 16 bits.

    The input is placed in the high byte and OR-ed with itself in the low
    byte, so 0 maps to 0 and 255 maps to 65535.
    """
    high_byte = value << 8
    return high_byte | value
71 
72 
def convert_16_to_8(value: int) -> int:
    """Scale a 16 bit level into 8 bits by discarding the low byte."""
    high_byte = value >> 8
    return high_byte
76 
77 
def lifx_features(bulb: Light) -> dict[str, Any]:
    """Return the feature map for this bulb's product.

    Falls back to the feature map registered under product 1 when the bulb's
    product has no (or an empty) entry in ``products.features_map``.
    """
    known: dict[str, Any] | None = products.features_map.get(bulb.product)
    if known:
        return known
    fallback: dict[str, Any] = products.features_map[1]
    return fallback
84 
85 
def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int | None] | None:
    """Build an HSBK list from whichever color inputs were supplied.

    The result is ``[hue, saturation, brightness, kelvin]``. Components that
    no keyword argument determined stay ``None``; when all four are
    undetermined, ``None`` is returned instead of a list. ``hass`` is not
    used by this function.
    """
    hue = saturation = brightness = kelvin = None

    # A named color is resolved first; an explicit HS/RGB/XY value below
    # takes precedence over it.
    named_color = kwargs.get(ATTR_COLOR_NAME)
    if named_color is not None:
        try:
            rgb = color_util.color_name_to_rgb(named_color)
            hue, saturation = color_util.color_RGB_to_hs(*rgb)
        except ValueError:
            _LOGGER.warning(
                "Got unknown color %s, falling back to neutral white", named_color
            )
            hue, saturation = 0, 0

    # Only the first of HS, RGB and XY that is present is used.
    if ATTR_HS_COLOR in kwargs:
        hue, saturation = kwargs[ATTR_HS_COLOR]
    elif ATTR_RGB_COLOR in kwargs:
        hue, saturation = color_util.color_RGB_to_hs(*kwargs[ATTR_RGB_COLOR])
    elif ATTR_XY_COLOR in kwargs:
        hue, saturation = color_util.color_xy_to_hs(*kwargs[ATTR_XY_COLOR])

    if hue is not None:
        assert saturation is not None
        # Rescale hue from 0-360 and saturation from 0-100 to 0-65535.
        hue = int(hue / 360 * 65535)
        saturation = int(saturation / 100 * 65535)
        kelvin = 3500

    # Each color temperature input resets saturation to 0; when several are
    # given, the last one checked here wins.
    if ATTR_KELVIN in kwargs:
        _LOGGER.warning(
            "The 'kelvin' parameter is deprecated. Please use 'color_temp_kelvin' for"
            " all service calls"
        )
        kelvin = kwargs.pop(ATTR_KELVIN)
        saturation = 0

    if ATTR_COLOR_TEMP in kwargs:
        mireds = kwargs.pop(ATTR_COLOR_TEMP)
        kelvin = color_util.color_temperature_mired_to_kelvin(mireds)
        saturation = 0

    if ATTR_COLOR_TEMP_KELVIN in kwargs:
        kelvin = kwargs.pop(ATTR_COLOR_TEMP_KELVIN)
        saturation = 0

    if ATTR_BRIGHTNESS in kwargs:
        brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS])

    # A percentage brightness overrides the 8 bit brightness when both exist.
    if ATTR_BRIGHTNESS_PCT in kwargs:
        level = round(255 * kwargs[ATTR_BRIGHTNESS_PCT] / 100)
        brightness = convert_8_to_16(level)

    hsbk = [hue, saturation, brightness, kelvin]
    if hsbk == [None] * 4:
        return None
    return hsbk
143 
144 
146  base: list[float | int | None], change: list[float | int | None]
147 ) -> list[float | int | None]:
148  """Copy change on top of base, except when None.
149 
150  Hue, Saturation, Brightness, Kelvin
151  """
152  return [b if c is None else c for b, c in zip(base, change, strict=False)]
153 
154 
155 def _get_mac_offset(mac_addr: str, offset: int) -> str:
156  octets = [int(octet, 16) for octet in mac_addr.split(":")]
157  octets[5] = (octets[5] + offset) % 256
158  return ":".join(f"{octet:02x}" for octet in octets)
159 
160 
def _off_by_one_mac(firmware: str) -> bool:
    """Return True when the firmware version is at or above FIX_MAC_FW.

    An empty or missing firmware string yields False without parsing it.
    """
    if not firmware:
        return False
    return bool(AwesomeVersion(firmware) >= FIX_MAC_FW)
164 
165 
def get_real_mac_addr(mac_addr: str, firmware: str) -> str:
    """Return the MAC address, incrementing its last byte for firmware 3.70 or newer.

    When the firmware does not have the off-by-one MAC, mac_addr is returned
    unchanged.
    """
    if _off_by_one_mac(firmware):
        return _get_mac_offset(mac_addr, 1)
    return mac_addr
169 
170 
def formatted_serial(serial_number: str) -> str:
    """Return the serial number formatted by the device registry's format_mac."""
    registry_serial = dr.format_mac(serial_number)
    return registry_serial
174 
175 
def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool:
    """Return True when the MAC address corresponds to the serial number.

    The formatted MAC matches if it equals either the formatted serial number
    or the serial number with its last octet incremented by one.
    """
    formatted_mac = dr.format_mac(mac_addr)
    if formatted_serial(serial_number) == formatted_mac:
        return True
    # Otherwise compare against the serial number offset by one.
    return bool(_get_mac_offset(serial_number, 1) == formatted_mac)
183 
184 
async def async_execute_lifx(method: Callable) -> Message:
    """Execute a lifx callback method and wait for a response.

    Runs the single method through async_multi_execute_lifx_with_retries
    with DEFAULT_ATTEMPTS and OVERALL_TIMEOUT, and returns its one response.
    Any exception raised by that helper propagates to the caller.
    """
    responses = await async_multi_execute_lifx_with_retries(
        [method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
    )
    return responses[0]
192 
193 
async def async_multi_execute_lifx_with_retries(
    methods: list[Callable], attempts: int, overall_timeout: int
) -> list[Message]:
    """Execute multiple lifx callback methods with retries and wait for a response.

    This function divides the overall timeout by the number of attempts and
    waits for each method to return a result. If we don't get a result
    within the split timeout, we will send all methods that did not generate
    a response again.

    Each method is called with a ``callb`` keyword argument; the callback
    records the first truthy message it receives for that method.

    Returns the messages in the same order as ``methods``. An empty
    ``methods`` list returns an empty list without waiting.

    If we don't get a result after all attempts, we will raise a
    TimeoutError exception naming the methods that did not respond.
    """
    if not methods:
        # asyncio.wait() raises ValueError when given no futures.
        return []

    loop = asyncio.get_running_loop()
    futures: list[asyncio.Future] = [loop.create_future() for _ in methods]

    def _callback(
        bulb: Light, message: Message | None, future: asyncio.Future[Message]
    ) -> None:
        # Ignore empty messages and late replies for an already resolved future.
        if message and not future.done():
            future.set_result(message)

    timeout_per_attempt = overall_timeout / attempts

    for _ in range(attempts):
        # (Re)send only the methods that have not produced a response yet.
        for idx, method in enumerate(methods):
            future = futures[idx]
            if not future.done():
                method(callb=partial(_callback, future=future))

        _, pending = await asyncio.wait(futures, timeout=timeout_per_attempt)
        if not pending:
            break

    results: list[Message] = []
    failed: list[str] = []
    for idx, future in enumerate(futures):
        if not future.done() or not (result := future.result()):
            method = methods[idx]
            failed.append(str(getattr(method, "__name__", method)))
        else:
            results.append(result)

    if failed:
        failed_methods = ", ".join(failed)
        raise TimeoutError(f"{failed_methods} timed out after {attempts} attempts")

    return results
ConfigEntry|None async_get_legacy_entry(HomeAssistant hass)
Definition: util.py:49
bool _off_by_one_mac(str firmware)
Definition: util.py:161
list[Message] async_multi_execute_lifx_with_retries(list[Callable] methods, int attempts, int overall_timeout)
Definition: util.py:196
Message async_execute_lifx(Callable method)
Definition: util.py:185
int convert_16_to_8(int value)
Definition: util.py:73
bool async_entry_is_legacy(ConfigEntry entry)
Definition: util.py:43
list[float|int|None] merge_hsbk(list[float|int|None] base, list[float|int|None] change)
Definition: util.py:147
list[float|int|None]|None find_hsbk(HomeAssistant hass, **Any kwargs)
Definition: util.py:86
bool mac_matches_serial_number(str mac_addr, str serial_number)
Definition: util.py:176
str get_real_mac_addr(str mac_addr, str firmware)
Definition: util.py:166
str|None infrared_brightness_value_to_option(int value)
Definition: util.py:57
int|None infrared_brightness_option_to_value(str option)
Definition: util.py:62
str _get_mac_offset(str mac_addr, int offset)
Definition: util.py:155
int convert_8_to_16(int value)
Definition: util.py:68
str formatted_serial(str serial_number)
Definition: util.py:171
dict[str, Any] lifx_features(Light bulb)
Definition: util.py:78