Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Home Assistant wrapper for a pyWeMo device."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from dataclasses import dataclass, fields
7 from datetime import timedelta
8 from functools import partial
9 import logging
10 from typing import TYPE_CHECKING, Literal
11 
12 from pywemo import Insight, LongPressMixin, WeMoDevice
13 from pywemo.exceptions import ActionException, PyWeMoException
14 from pywemo.subscribe import EVENT_TYPE_LONG_PRESS, SubscriptionRegistry
15 
16 from homeassistant.config_entries import ConfigEntry
17 from homeassistant.const import (
18  ATTR_CONFIGURATION_URL,
19  ATTR_IDENTIFIERS,
20  CONF_DEVICE_ID,
21  CONF_NAME,
22  CONF_PARAMS,
23  CONF_TYPE,
24  CONF_UNIQUE_ID,
25 )
26 from homeassistant.core import HomeAssistant, callback
27 from homeassistant.helpers import device_registry as dr
28 from homeassistant.helpers.device_registry import CONNECTION_UPNP, DeviceInfo
29 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
30 
31 from .const import DOMAIN, WEMO_SUBSCRIPTION_EVENT
32 from .models import async_wemo_data
33 
34 _LOGGER = logging.getLogger(__name__)
35 
36 # Literal values must match options.error keys from strings.json.
37 type ErrorStringKey = Literal["long_press_requires_subscription"]
38 # Literal values must match options.step.init.data keys from strings.json.
39 type OptionsFieldKey = Literal["enable_subscription", "enable_long_press"]
40 
41 
class OptionsValidationError(Exception):
    """Error validating options."""

    def __init__(
        self, field_key: OptionsFieldKey, error_key: ErrorStringKey, message: str
    ) -> None:
        """Store field_key and error_key so the exception handler can use them.

        The field_key and error_key strings must be the same as in strings.json.

        Args:
          field_key: Name of the options.step.init.data key that corresponds to
            this error. field_key must also match one of the field names inside
            the Options class.
          error_key: Name of the options.error key that corresponds to this error.
          message: Message for the Exception class.

        """
        super().__init__(message)
        # Kept as plain attributes so the handler can read them off the
        # caught exception.
        self.field_key = field_key
        self.error_key = error_key
62 
63 
@dataclass(frozen=True)
class Options:
    """Configuration options for the DeviceCoordinator class.

    Note: The field names must match the keys (OptionsFieldKey)
    from options.step.init.data in strings.json.
    """

    # Subscribe to device local push updates.
    enable_subscription: bool = True

    # Register for device long-press events.
    enable_long_press: bool = True

    def __post_init__(self) -> None:
        """Validate parameters.

        Raises:
          OptionsValidationError: If long-press events are enabled while
            subscriptions are disabled.

        """
        # Long-press events are only valid when subscriptions are enabled.
        if not self.enable_subscription and self.enable_long_press:
            raise OptionsValidationError(
                "enable_subscription",
                "long_press_requires_subscription",
                "Local push update subscriptions must be enabled to use long-press events",
            )
86 
87 
class DeviceCoordinator(DataUpdateCoordinator[None]):
    """Home Assistant wrapper for a pyWeMo device."""

    # None until async_set_options has completed for the first time.
    options: Options | None = None

    def __init__(self, hass: HomeAssistant, wemo: WeMoDevice) -> None:
        """Initialize DeviceCoordinator.

        Args:
          hass: The Home Assistant instance.
          wemo: The pyWeMo device wrapped by this coordinator.

        """
        super().__init__(
            hass,
            _LOGGER,
            name=wemo.name,
            update_interval=timedelta(seconds=30),
        )
        self.hass = hass
        self.wemo = wemo
        # Assigned later by async_setup.
        self.device_id: str | None = None
        self.device_info = _create_device_info(wemo)
        self.supports_long_press = isinstance(wemo, LongPressMixin)
        # Held by _async_locked_update while a state fetch is running.
        self.update_lock = asyncio.Lock()

    @callback
    def async_setup(self, device_id: str) -> None:
        """Set up the device coordinator."""
        self.device_id = device_id

    def subscription_callback(
        self, _device: WeMoDevice, event_type: str, params: str
    ) -> None:
        """Receives push notifications from WeMo devices."""
        _LOGGER.debug("Subscription event (%s) for %s", event_type, self.wemo.name)
        if event_type == EVENT_TYPE_LONG_PRESS:
            # Long-press events go out on the event bus; they do not update state.
            self.hass.bus.fire(
                WEMO_SUBSCRIPTION_EVENT,
                {
                    CONF_DEVICE_ID: self.device_id,
                    CONF_NAME: self.wemo.name,
                    CONF_TYPE: event_type,
                    CONF_PARAMS: params,
                    CONF_UNIQUE_ID: self.wemo.serial_number,
                },
            )
        else:
            updated = self.wemo.subscription_update(event_type, params)
            # NOTE(review): presumably invoked off the event loop (pywemo
            # subscription thread), hence the thread-safe hand-off - confirm.
            self.hass.loop.call_soon_threadsafe(
                partial(
                    self.hass.async_create_background_task,
                    self._async_subscription_callback(updated),
                    f"{self.name} subscription_callback",
                    eager_start=True,
                )
            )

    async def async_shutdown(self) -> None:
        """Unregister push subscriptions and remove from coordinators dict."""
        if self._shutdown_requested:
            return
        await super().async_shutdown()
        if TYPE_CHECKING:
            # mypy doesn't know that the device_id is set in async_setup.
            assert self.device_id is not None
        del _async_coordinators(self.hass)[self.device_id]
        assert self.options  # Always set by async_register_device.
        if self.options.enable_subscription:
            await self._async_set_enable_subscription(False)
        # Check that the device is available (last_update_success) before disabling long
        # press. That avoids long shutdown times for devices that are no longer connected.
        if self.options.enable_long_press and self.last_update_success:
            await self._async_set_enable_long_press(False)

    async def _async_set_enable_subscription(self, enable_subscription: bool) -> None:
        """Turn on/off push updates from the device."""
        registry = _async_registry(self.hass)
        if enable_subscription:
            registry.on(self.wemo, None, self.subscription_callback)
            await self.hass.async_add_executor_job(registry.register, self.wemo)
        elif self.options is not None:
            # options is still None during the first async_set_options pass,
            # so the unregister call is skipped then.
            await self.hass.async_add_executor_job(registry.unregister, self.wemo)

    async def _async_set_enable_long_press(self, enable_long_press: bool) -> None:
        """Turn on/off long-press events from the device.

        A PyWeMoException is logged and marks the device as not supporting
        long press, so later calls return immediately.
        """
        if not (isinstance(self.wemo, LongPressMixin) and self.supports_long_press):
            return
        try:
            if enable_long_press:
                await self.hass.async_add_executor_job(
                    self.wemo.ensure_long_press_virtual_device
                )
            elif self.options is not None:
                # Skipped during the first async_set_options pass, when
                # options is still None.
                await self.hass.async_add_executor_job(
                    self.wemo.remove_long_press_virtual_device
                )
        except PyWeMoException:
            # Name the operation that actually failed; this handler covers
            # both the enable and the disable path.
            _LOGGER.exception(
                "Failed to %s long press support for device: %s",
                "enable" if enable_long_press else "disable",
                self.wemo.name,
            )
            self.supports_long_press = False

    async def async_set_options(
        self, hass: HomeAssistant, config_entry: ConfigEntry
    ) -> None:
        """Update the configuration options for the device.

        Args:
          hass: Unused in the body.
          config_entry: Entry whose options are unpacked into Options.

        Raises:
          OptionsValidationError: If the new options fail Options validation.

        """
        options = Options(**config_entry.options)
        _LOGGER.debug(
            "async_set_options old(%s) new(%s)", repr(self.options), repr(options)
        )
        for field in fields(options):
            new_value = getattr(options, field.name)
            if self.options is None or getattr(self.options, field.name) != new_value:
                # The value changed, call the _async_set_* method for the option.
                await getattr(self, f"_async_set_{field.name}")(new_value)
        # Stored last so the _async_set_* methods above still see the old options.
        self.options = options

    async def _async_subscription_callback(self, updated: bool) -> None:
        """Update the state by the Wemo device."""
        # If an update is in progress, we don't do anything.
        if self.update_lock.locked():
            return
        try:
            # Force a fetch only when the push event did not update the state.
            await self._async_locked_update(not updated)
        except UpdateFailed as err:
            self.last_exception = err
            # Log only on the transition from success to failure.
            if self.last_update_success:
                _LOGGER.exception("Subscription callback failed")
                self.last_update_success = False
        except Exception as err:
            self.last_exception = err
            self.last_update_success = False
            _LOGGER.exception("Unexpected error fetching %s data", self.name)
        else:
            self.async_set_updated_data(None)

    @property
    def should_poll(self) -> bool:
        """Return True if polling is needed to update the state for the device.

        The alternative, when this returns False, is to rely on the subscription
        "push updates" to update the device state in Home Assistant.
        """
        if isinstance(self.wemo, Insight) and self.wemo.get_state() == 0:
            # The WeMo Insight device does not send subscription updates for the
            # insight_params values when the device is off. Polling is required in
            # this case so the Sensor entities are properly populated.
            return True

        return not (
            _async_registry(self.hass).is_subscribed(self.wemo)
            and self.last_update_success
        )

    async def _async_update_data(self) -> None:
        """Update WeMo state."""
        # No need to poll if the device will push updates.
        # The device_id will not be set until after the first
        # update so we should not check should_poll until after
        # the device_id is set.
        if self.device_id and not self.should_poll:
            return

        # If an update is in progress, we don't do anything.
        if self.update_lock.locked():
            return

        await self._async_locked_update(True)

    async def _async_locked_update(self, force_update: bool) -> None:
        """Try updating within an async lock.

        Raises:
          UpdateFailed: If pyWeMo raises ActionException while fetching state.

        """
        async with self.update_lock:
            try:
                await self.hass.async_add_executor_job(
                    self.wemo.get_state, force_update
                )
            except ActionException as err:
                raise UpdateFailed("WeMo update failed") from err
261 
262 
def _create_device_info(wemo: WeMoDevice) -> DeviceInfo:
    """Create device information. Modify if special device.

    Devices whose model name is "dli emulated belkin socket" (compared
    case-insensitively) also get a configuration URL built from the device
    host, and an identifier that omits the last character of the serial number.
    """
    info = _device_info(wemo)
    if wemo.model_name.lower() == "dli emulated belkin socket":
        info[ATTR_CONFIGURATION_URL] = f"http://{wemo.host}"
        # NOTE(review): presumably the trailing serial character distinguishes
        # outlets on one physical unit - confirm.
        info[ATTR_IDENTIFIERS] = {(DOMAIN, wemo.serial_number[:-1])}
    return info
270 
271 
def _device_info(wemo: WeMoDevice) -> DeviceInfo:
    """Build the DeviceInfo for a WeMo device from its pyWeMo attributes."""
    return DeviceInfo(
        connections={(CONNECTION_UPNP, wemo.udn)},
        identifiers={(DOMAIN, wemo.serial_number)},
        manufacturer="Belkin",
        model=wemo.model_name,
        model_id=wemo.model,
        name=wemo.name,
        sw_version=wemo.firmware_version,
    )
282 
283 
async def async_register_device(
    hass: HomeAssistant, config_entry: ConfigEntry, wemo: WeMoDevice
) -> DeviceCoordinator:
    """Register a device with home assistant and enable pywemo event callbacks.

    Raises:
      Exception: The coordinator's last_exception, re-raised when the initial
        refresh fails.

    """
    device = DeviceCoordinator(hass, wemo)
    await device.async_refresh()
    # Surface a failed first refresh to the caller.
    if not device.last_update_success and device.last_exception:
        raise device.last_exception
    device_registry = dr.async_get(hass)
    entry = device_registry.async_get_or_create(
        config_entry_id=config_entry.entry_id, **_create_device_info(wemo)
    )
    device.async_setup(device_id=entry.id)
    _async_coordinators(hass)[entry.id] = device

    # Re-apply options when the config entry changes; the listener is
    # registered for removal on unload.
    config_entry.async_on_unload(
        config_entry.add_update_listener(device.async_set_options)
    )
    await device.async_set_options(hass, config_entry)

    return device
305 
306 
@callback
def async_get_coordinator(hass: HomeAssistant, device_id: str) -> DeviceCoordinator:
    """Return DeviceCoordinator for device_id.

    Raises:
      KeyError: If no coordinator is registered under device_id.

    """
    return _async_coordinators(hass)[device_id]
311 
312 
@callback
def _async_coordinators(hass: HomeAssistant) -> dict[str, DeviceCoordinator]:
    """Return the device_id -> DeviceCoordinator mapping from the WeMo data."""
    return async_wemo_data(hass).config_entry_data.device_coordinators
316 
317 
@callback
def _async_registry(hass: HomeAssistant) -> SubscriptionRegistry:
    """Return the pyWeMo SubscriptionRegistry stored in the WeMo data."""
    return async_wemo_data(hass).registry
None _async_set_enable_long_press(self, bool enable_long_press)
Definition: coordinator.py:166
None _async_set_enable_subscription(self, bool enable_subscription)
Definition: coordinator.py:157
None __init__(self, HomeAssistant hass, WeMoDevice wemo)
Definition: coordinator.py:93
None async_set_options(self, HomeAssistant hass, ConfigEntry config_entry)
Definition: coordinator.py:187
None subscription_callback(self, WeMoDevice _device, str event_type, str params)
Definition: coordinator.py:115
None __init__(self, OptionsFieldKey field_key, ErrorStringKey error_key, str message)
Definition: coordinator.py:47
str|float get_state(dict[str, float] data, str key)
Definition: sensor.py:26
DeviceInfo _device_info(WeMoDevice wemo)
Definition: coordinator.py:272
dict[str, DeviceCoordinator] _async_coordinators(HomeAssistant hass)
Definition: coordinator.py:314
DeviceCoordinator async_get_coordinator(HomeAssistant hass, str device_id)
Definition: coordinator.py:308
DeviceCoordinator async_register_device(HomeAssistant hass, ConfigEntry config_entry, WeMoDevice wemo)
Definition: coordinator.py:286
SubscriptionRegistry _async_registry(HomeAssistant hass)
Definition: coordinator.py:319
DeviceInfo _create_device_info(WeMoDevice wemo)
Definition: coordinator.py:263
WemoData async_wemo_data(HomeAssistant hass)
Definition: models.py:43