# Home Assistant Unofficial Reference 2024.12.1
# coordinator.py
# Source listing taken from the generated documentation page for this file.
1 """Coordinators for the Shelly integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Coroutine
7 from dataclasses import dataclass
8 from datetime import timedelta
9 from typing import Any, cast
10 
11 from aioshelly.ble import async_ensure_ble_enabled, async_stop_scanner
12 from aioshelly.block_device import BlockDevice, BlockUpdateType
13 from aioshelly.const import MODEL_NAMES, MODEL_VALVE
14 from aioshelly.exceptions import (
15  DeviceConnectionError,
16  InvalidAuthError,
17  MacAddressMismatchError,
18  RpcCallError,
19 )
20 from aioshelly.rpc_device import RpcDevice, RpcUpdateType
21 from propcache import cached_property
22 
23 from homeassistant.config_entries import ConfigEntry, ConfigEntryState
24 from homeassistant.const import (
25  ATTR_DEVICE_ID,
26  CONF_HOST,
27  EVENT_HOMEASSISTANT_STOP,
28  Platform,
29 )
30 from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
31 from homeassistant.helpers import device_registry as dr, issue_registry as ir
32 from homeassistant.helpers.debounce import Debouncer
33 from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
34 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
35 
36 from .bluetooth import async_connect_scanner
37 from .const import (
38  ATTR_CHANNEL,
39  ATTR_CLICK_TYPE,
40  ATTR_DEVICE,
41  ATTR_GENERATION,
42  BATTERY_DEVICES_WITH_PERMANENT_CONNECTION,
43  CONF_BLE_SCANNER_MODE,
44  CONF_SLEEP_PERIOD,
45  DOMAIN,
46  DUAL_MODE_LIGHT_MODELS,
47  ENTRY_RELOAD_COOLDOWN,
48  EVENT_SHELLY_CLICK,
49  INPUTS_EVENTS_DICT,
50  LOGGER,
51  MAX_PUSH_UPDATE_FAILURES,
52  MODELS_SUPPORTING_LIGHT_EFFECTS,
53  OTA_BEGIN,
54  OTA_ERROR,
55  OTA_PROGRESS,
56  OTA_SUCCESS,
57  PUSH_UPDATE_ISSUE_ID,
58  REST_SENSORS_UPDATE_INTERVAL,
59  RPC_INPUTS_EVENTS_TYPES,
60  RPC_RECONNECT_INTERVAL,
61  RPC_SENSORS_POLLING_INTERVAL,
62  SHBTN_MODELS,
63  UPDATE_PERIOD_MULTIPLIER,
64  BLEScannerMode,
65 )
66 from .utils import (
67  async_create_issue_unsupported_firmware,
68  get_block_device_sleep_period,
69  get_device_entry_gen,
70  get_host,
71  get_http_port,
72  get_rpc_device_wakeup_period,
73  get_rpc_ws_url,
74  update_device_fw_info,
75 )
76 
77 
@dataclass
class ShellyEntryData:
    """Class for sharing data within a given config entry."""

    # Platform list stored for the config entry (required, no default).
    platforms: list[Platform]
    # One optional slot per coordinator flavour; each one stays None until a
    # coordinator of that kind is assigned to it.
    block: ShellyBlockCoordinator | None = None
    rest: ShellyRestCoordinator | None = None
    rpc: ShellyRpcCoordinator | None = None
    rpc_poll: ShellyRpcPollingCoordinator | None = None
87 
88 
89 type ShellyConfigEntry = ConfigEntry[ShellyEntryData]
90 
91 
92 class ShellyCoordinatorBase[_DeviceT: BlockDevice | RpcDevice](
93  DataUpdateCoordinator[None]
94 ):
95  """Coordinator for a Shelly device."""
96 
97  def __init__(
98  self,
99  hass: HomeAssistant,
100  entry: ShellyConfigEntry,
101  device: _DeviceT,
102  update_interval: float,
103  ) -> None:
104  """Initialize the Shelly device coordinator."""
105  self.entry = entry
106  self.device = device
107  self.device_id: str | None = None
108  self._pending_platforms: list[Platform] | None = None
109  device_name = device.name if device.initialized else entry.title
110  interval_td = timedelta(seconds=update_interval)
111  # The device has come online at least once. In the case of a sleeping RPC
112  # device, this means that the device has connected to the WS server at least once.
113  self._came_online_once = False
114  super().__init__(hass, LOGGER, name=device_name, update_interval=interval_td)
115 
116  self._debounced_reload: Debouncer[Coroutine[Any, Any, None]] = Debouncer(
117  hass,
118  LOGGER,
119  cooldown=ENTRY_RELOAD_COOLDOWN,
120  immediate=False,
121  function=self._async_reload_entry,
122  )
123  entry.async_on_unload(self._debounced_reload.async_shutdown)
124 
125  entry.async_on_unload(
126  hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
127  )
128 
129  @cached_property
130  def model(self) -> str:
131  """Model of the device."""
132  return cast(str, self.entry.data["model"])
133 
134  @cached_property
135  def mac(self) -> str:
136  """Mac address of the device."""
137  return cast(str, self.entry.unique_id)
138 
139  @property
140  def sw_version(self) -> str:
141  """Firmware version of the device."""
142  return self.device.firmware_version if self.device.initialized else ""
143 
144  @property
145  def sleep_period(self) -> int:
146  """Sleep period of the device."""
147  return self.entry.data.get(CONF_SLEEP_PERIOD, 0)
148 
149  def async_setup(self, pending_platforms: list[Platform] | None = None) -> None:
150  """Set up the coordinator."""
151  self._pending_platforms = pending_platforms
152  dev_reg = dr.async_get(self.hass)
153  device_entry = dev_reg.async_get_or_create(
154  config_entry_id=self.entry.entry_id,
155  name=self.name,
156  connections={(CONNECTION_NETWORK_MAC, self.mac)},
157  manufacturer="Shelly",
158  model=MODEL_NAMES.get(self.model),
159  model_id=self.model,
160  sw_version=self.sw_version,
161  hw_version=f"gen{get_device_entry_gen(self.entry)}",
162  configuration_url=f"http://{get_host(self.entry.data[CONF_HOST])}:{get_http_port(self.entry.data)}",
163  )
164  self.device_id = device_entry.id
165 
166  async def shutdown(self) -> None:
167  """Shutdown the coordinator."""
168  await self.device.shutdown()
169 
170  async def _handle_ha_stop(self, _event: Event) -> None:
171  """Handle Home Assistant stopping."""
172  LOGGER.debug("Stopping RPC device coordinator for %s", self.name)
173  await self.shutdown()
174 
175  async def _async_device_connect_task(self) -> bool:
176  """Connect to a Shelly device task."""
177  LOGGER.debug("Connecting to Shelly Device - %s", self.name)
178  try:
179  await self.device.initialize()
180  update_device_fw_info(self.hass, self.device, self.entry)
181  except (DeviceConnectionError, MacAddressMismatchError) as err:
182  LOGGER.debug(
183  "Error connecting to Shelly device %s, error: %r", self.name, err
184  )
185  return False
186  except InvalidAuthError:
187  self.entry.async_start_reauth(self.hass)
188  return False
189 
190  if not self.device.firmware_supported:
191  async_create_issue_unsupported_firmware(self.hass, self.entry)
192  return False
193 
194  if not self._pending_platforms:
195  return True
196 
197  LOGGER.debug("Device %s is online, resuming setup", self.name)
198  platforms = self._pending_platforms
199  self._pending_platforms = None
200 
201  data = {**self.entry.data}
202 
203  # Update sleep_period
204  old_sleep_period = data[CONF_SLEEP_PERIOD]
205  if isinstance(self.device, RpcDevice):
206  new_sleep_period = get_rpc_device_wakeup_period(self.device.status)
207  elif isinstance(self.device, BlockDevice):
208  new_sleep_period = get_block_device_sleep_period(self.device.settings)
209 
210  if new_sleep_period != old_sleep_period:
211  data[CONF_SLEEP_PERIOD] = new_sleep_period
212  self.hass.config_entries.async_update_entry(self.entry, data=data)
213 
214  # Resume platform setup
215  await self.hass.config_entries.async_forward_entry_setups(self.entry, platforms)
216 
217  return True
218 
219  async def _async_reload_entry(self) -> None:
220  """Reload entry."""
221  self._debounced_reload.async_cancel()
222  LOGGER.debug("Reloading entry %s", self.name)
223  await self.hass.config_entries.async_reload(self.entry.entry_id)
224 
225  async def async_shutdown_device_and_start_reauth(self) -> None:
226  """Shutdown Shelly device and start reauth flow."""
227  # not running disconnect events since we have auth error
228  # and won't be able to send commands to the device
229  self.last_update_success = False
230  await self.shutdown()
231  self.entry.async_start_reauth(self.hass)
232 
233 
class ShellyBlockCoordinator(ShellyCoordinatorBase[BlockDevice]):
    """Coordinator for a Shelly block based device."""

    def __init__(
        self, hass: HomeAssistant, entry: ShellyConfigEntry, device: BlockDevice
    ) -> None:
        """Initialize the Shelly block device coordinator."""
        # ``sleep_period`` reads ``self.entry``, so the entry has to be stored
        # before the base initializer runs.
        self.entry = entry
        if self.sleep_period:
            update_interval = UPDATE_PERIOD_MULTIPLIER * self.sleep_period
        else:
            update_interval = (
                UPDATE_PERIOD_MULTIPLIER * device.settings["coiot"]["update_period"]
            )
        super().__init__(hass, entry, device, update_interval)

        self._last_cfg_changed: int | None = None
        self._last_mode: str | None = None
        self._last_effect: str | None = None
        # Last seen input event counter, keyed by 1-based channel number.
        self._last_input_events_count: dict = {}
        self._last_target_temp: float | None = None
        self._push_update_failures: int = 0
        self._input_event_listeners: list[Callable[[dict[str, Any]], None]] = []

        entry.async_on_unload(
            self.async_add_listener(self._async_device_updates_handler)
        )

    @callback
    def async_subscribe_input_events(
        self, input_event_callback: Callable[[dict[str, Any]], None]
    ) -> CALLBACK_TYPE:
        """Subscribe to input events.

        Returns a callable that removes the subscription again.
        """

        def _unsubscribe() -> None:
            self._input_event_listeners.remove(input_event_callback)

        self._input_event_listeners.append(input_event_callback)

        return _unsubscribe

    @callback
    def _async_device_updates_handler(self) -> None:
        """Handle device updates.

        Fires click events for changed input counters and schedules a
        debounced entry reload when the device reports a newer config change
        counter than the one seen last.
        """
        if not self.device.initialized:
            return

        # For buttons which are battery powered - set initial value for last_event_count
        if self.model in SHBTN_MODELS and self._last_input_events_count.get(1) is None:
            for block in self.device.blocks:
                if block.type != "device":
                    continue

                wakeup_event = cast(list, block.wakeupEvent)
                if len(wakeup_event) == 1 and wakeup_event[0] == "button":
                    self._last_input_events_count[1] = -1

                break

        # Check for input events and config change
        cfg_changed = 0
        for block in self.device.blocks:
            if block.type == "device" and block.cfgChanged is not None:
                cfg_changed = cast(int, block.cfgChanged)

            # Shelly TRV sends information about changing the configuration for no
            # reason, reloading the config entry is not needed for it.
            if self.model == MODEL_VALVE:
                self._last_cfg_changed = None

            # For dual mode bulbs ignore change if it is due to mode/effect change
            if self.model in DUAL_MODE_LIGHT_MODELS:
                if "mode" in block.sensor_ids:
                    if self._last_mode != block.mode:
                        self._last_cfg_changed = None
                    self._last_mode = block.mode

            if self.model in MODELS_SUPPORTING_LIGHT_EFFECTS:
                if "effect" in block.sensor_ids:
                    if self._last_effect != block.effect:
                        self._last_cfg_changed = None
                    self._last_effect = block.effect

            if (
                "inputEvent" not in block.sensor_ids
                or "inputEventCnt" not in block.sensor_ids
            ):
                LOGGER.debug("Skipping non-input event block %s", block.description)
                continue

            # Channels are reported 0-based; events use 1-based channels.
            channel = int(block.channel or 0) + 1
            event_type = block.inputEvent
            last_event_count = self._last_input_events_count.get(channel)
            self._last_input_events_count[channel] = block.inputEventCnt

            if (
                last_event_count is None
                or last_event_count == block.inputEventCnt
                or event_type == ""
            ):
                LOGGER.debug("Skipping block event %s", event_type)
                continue

            if event_type in INPUTS_EVENTS_DICT:
                for event_callback in self._input_event_listeners:
                    event_callback(
                        {"channel": channel, "event": INPUTS_EVENTS_DICT[event_type]}
                    )
                self.hass.bus.async_fire(
                    EVENT_SHELLY_CLICK,
                    {
                        ATTR_DEVICE_ID: self.device_id,
                        ATTR_DEVICE: self.device.settings["device"]["hostname"],
                        ATTR_CHANNEL: channel,
                        ATTR_CLICK_TYPE: INPUTS_EVENTS_DICT[event_type],
                        ATTR_GENERATION: 1,
                    },
                )

        if self._last_cfg_changed is not None and cfg_changed > self._last_cfg_changed:
            LOGGER.info(
                "Config for %s changed, reloading entry in %s seconds",
                self.name,
                ENTRY_RELOAD_COOLDOWN,
            )
            self._debounced_reload.async_schedule_call()
        self._last_cfg_changed = cfg_changed

    async def _async_update_data(self) -> None:
        """Fetch data.

        Raises UpdateFailed for sleeping devices and on connection errors; an
        authentication error shuts the device down and starts a reauth flow.
        """
        if self.sleep_period:
            # Sleeping device, no point polling it, just mark it unavailable
            raise UpdateFailed(
                f"Sleeping device did not update within {self.sleep_period} seconds interval"
            )

        LOGGER.debug("Polling Shelly Block Device - %s", self.name)
        try:
            await self.device.update()
        except DeviceConnectionError as err:
            raise UpdateFailed(f"Error fetching data: {err!r}") from err
        except InvalidAuthError:
            await self.async_shutdown_device_and_start_reauth()

    @callback
    def _async_handle_update(
        self, device_: BlockDevice, update_type: BlockUpdateType
    ) -> None:
        """Handle device update.

        Tracks consecutive COAP_REPLY updates and raises a repair issue once
        MAX_PUSH_UPDATE_FAILURES is reached; a COAP_PERIODIC update resets the
        counter and removes that issue again.
        """
        LOGGER.debug("Shelly %s handle update, type: %s", self.name, update_type)
        if update_type is BlockUpdateType.ONLINE:
            self._came_online_once = True
            self.entry.async_create_background_task(
                self.hass,
                self._async_device_connect_task(),
                "block device online",
                eager_start=True,
            )
        elif update_type is BlockUpdateType.COAP_PERIODIC:
            if self._push_update_failures >= MAX_PUSH_UPDATE_FAILURES:
                ir.async_delete_issue(
                    self.hass,
                    DOMAIN,
                    PUSH_UPDATE_ISSUE_ID.format(unique=self.mac),
                )
            self._push_update_failures = 0
        elif update_type is BlockUpdateType.COAP_REPLY:
            self._push_update_failures += 1
            if self._push_update_failures == MAX_PUSH_UPDATE_FAILURES:
                LOGGER.debug(
                    "Creating issue %s", PUSH_UPDATE_ISSUE_ID.format(unique=self.mac)
                )
                ir.async_create_issue(
                    self.hass,
                    DOMAIN,
                    PUSH_UPDATE_ISSUE_ID.format(unique=self.mac),
                    is_fixable=False,
                    is_persistent=False,
                    severity=ir.IssueSeverity.ERROR,
                    learn_more_url="https://www.home-assistant.io/integrations/shelly/#shelly-device-configuration-generation-1",
                    translation_key="push_update_failure",
                    translation_placeholders={
                        "device_name": self.entry.title,
                        "ip_address": self.device.ip_address,
                    },
                )
        if self._push_update_failures:
            LOGGER.debug(
                "Push update failures for %s: %s", self.name, self._push_update_failures
            )
        self.async_set_updated_data(None)

    def async_setup(self, pending_platforms: list[Platform] | None = None) -> None:
        """Set up the coordinator and subscribe to device updates."""
        super().async_setup(pending_platforms)
        self.device.subscribe_updates(self._async_handle_update)
431 
class ShellyRestCoordinator(ShellyCoordinatorBase[BlockDevice]):
    """Coordinator for a Shelly REST device."""

    def __init__(
        self, hass: HomeAssistant, device: BlockDevice, entry: ShellyConfigEntry
    ) -> None:
        """Initialize the Shelly REST device coordinator.

        Note that ``device`` comes before ``entry`` here, unlike in the other
        coordinators of this module.
        """
        update_interval = REST_SENSORS_UPDATE_INTERVAL
        if (
            device.settings["device"]["type"]
            in BATTERY_DEVICES_WITH_PERMANENT_CONNECTION
        ):
            update_interval = (
                UPDATE_PERIOD_MULTIPLIER * device.settings["coiot"]["update_period"]
            )
        super().__init__(hass, entry, device, update_interval)

    async def _async_update_data(self) -> None:
        """Fetch data.

        Raises UpdateFailed on connection or MAC mismatch errors; an
        authentication error shuts the device down and starts a reauth flow.
        """
        LOGGER.debug("REST update for %s", self.name)
        try:
            await self.device.update_status()

            # Stop here once the reported uptime exceeds two REST update
            # intervals; the calls below only run for a lower uptime.
            if self.device.status["uptime"] > 2 * REST_SENSORS_UPDATE_INTERVAL:
                return
            await self.device.update_shelly()
        except (DeviceConnectionError, MacAddressMismatchError) as err:
            raise UpdateFailed(f"Error fetching data: {err!r}") from err
        except InvalidAuthError:
            await self.async_shutdown_device_and_start_reauth()
        else:
            update_device_fw_info(self.hass, self.device, self.entry)
464 
465 
class ShellyRpcCoordinator(ShellyCoordinatorBase[RpcDevice]):
    """Coordinator for a Shelly RPC based device."""

    def __init__(
        self, hass: HomeAssistant, entry: ShellyConfigEntry, device: RpcDevice
    ) -> None:
        """Initialize the Shelly RPC device coordinator."""
        # ``sleep_period`` reads ``self.entry``, so the entry has to be stored
        # before the base initializer runs.
        self.entry = entry
        if self.sleep_period:
            update_interval = UPDATE_PERIOD_MULTIPLIER * self.sleep_period
        else:
            update_interval = RPC_RECONNECT_INTERVAL
        super().__init__(hass, entry, device, update_interval)

        self.connected = False
        # Callbacks to run (and then drop) when the device disconnects.
        self._disconnected_callbacks: list[CALLBACK_TYPE] = []
        # Serializes the connect / disconnect / reconfigure handling.
        self._connection_lock = asyncio.Lock()
        self._event_listeners: list[Callable[[dict[str, Any]], None]] = []
        self._ota_event_listeners: list[Callable[[dict[str, Any]], None]] = []
        self._input_event_listeners: list[Callable[[dict[str, Any]], None]] = []
        self._connect_task: asyncio.Task | None = None
        entry.async_on_unload(entry.add_update_listener(self._async_update_listener))

    async def async_device_online(self, source: str) -> None:
        """Handle device going online.

        ``source`` names what reported the device as online; it is only used
        for logging.
        """
        if not self.sleep_period:
            await self.async_request_refresh()
        elif not self._came_online_once or not self.device.initialized:
            LOGGER.debug(
                "Sleepy device %s is online (source: %s), trying to poll and configure",
                self.name,
                source,
            )
            # Source told us the device is online, try to poll
            # the device and if possible, set up the outbound
            # websocket so the device will send us updates
            # instead of relying on polling it fast enough before
            # it goes to sleep again
            self._async_handle_rpc_device_online()

    def update_sleep_period(self) -> bool:
        """Check device sleep period & update if changed.

        Returns True when a new, non-zero wakeup period was stored in the
        config entry and applied to the update interval, False otherwise.
        """
        if (
            not self.device.initialized
            or not (wakeup_period := get_rpc_device_wakeup_period(self.device.status))
            or wakeup_period == self.sleep_period
        ):
            return False

        data = {**self.entry.data}
        data[CONF_SLEEP_PERIOD] = wakeup_period
        self.hass.config_entries.async_update_entry(self.entry, data=data)

        update_interval = UPDATE_PERIOD_MULTIPLIER * wakeup_period
        self.update_interval = timedelta(seconds=update_interval)

        return True

    @callback
    def async_subscribe_ota_events(
        self, ota_event_callback: Callable[[dict[str, Any]], None]
    ) -> CALLBACK_TYPE:
        """Subscribe to OTA events.

        Returns a callable that removes the subscription again.
        """

        def _unsubscribe() -> None:
            self._ota_event_listeners.remove(ota_event_callback)

        self._ota_event_listeners.append(ota_event_callback)

        return _unsubscribe

    @callback
    def async_subscribe_input_events(
        self, input_event_callback: Callable[[dict[str, Any]], None]
    ) -> CALLBACK_TYPE:
        """Subscribe to input events.

        Returns a callable that removes the subscription again.
        """

        def _unsubscribe() -> None:
            self._input_event_listeners.remove(input_event_callback)

        self._input_event_listeners.append(input_event_callback)

        return _unsubscribe

    @callback
    def async_subscribe_events(
        self, event_callback: Callable[[dict[str, Any]], None]
    ) -> CALLBACK_TYPE:
        """Subscribe to events.

        Returns a callable that removes the subscription again.
        """

        def _unsubscribe() -> None:
            self._event_listeners.remove(event_callback)

        self._event_listeners.append(event_callback)

        return _unsubscribe

    async def _async_update_listener(
        self, hass: HomeAssistant, entry: ShellyConfigEntry
    ) -> None:
        """Reconfigure on update."""
        async with self._connection_lock:
            if self.connected:
                self._async_run_disconnected_events()
                await self._async_run_connected_events()

    @callback
    def _async_device_event_handler(self, event_data: dict[str, Any]) -> None:
        """Handle device events.

        Every event with a type is passed to the generic listeners first and
        is then dispatched by type: config changes schedule a debounced
        reload, input events fire a click event, OTA events go to the OTA
        listeners.
        """
        events: list[dict[str, Any]] = event_data["events"]
        for event in events:
            event_type = event.get("event")
            if event_type is None:
                continue

            for event_callback in self._event_listeners:
                event_callback(event)

            if event_type in ("component_added", "component_removed", "config_changed"):
                self.update_sleep_period()
                LOGGER.info(
                    "Config for %s changed, reloading entry in %s seconds",
                    self.name,
                    ENTRY_RELOAD_COOLDOWN,
                )
                self._debounced_reload.async_schedule_call()
            elif event_type in RPC_INPUTS_EVENTS_TYPES:
                for event_callback in self._input_event_listeners:
                    event_callback(event)
                self.hass.bus.async_fire(
                    EVENT_SHELLY_CLICK,
                    {
                        ATTR_DEVICE_ID: self.device_id,
                        ATTR_DEVICE: self.device.hostname,
                        ATTR_CHANNEL: event["id"] + 1,
                        ATTR_CLICK_TYPE: event["event"],
                        ATTR_GENERATION: 2,
                    },
                )
            elif event_type in (OTA_BEGIN, OTA_ERROR, OTA_PROGRESS, OTA_SUCCESS):
                for event_callback in self._ota_event_listeners:
                    event_callback(event)

    async def _async_update_data(self) -> None:
        """Fetch data.

        Raises UpdateFailed for sleeping devices and when reconnecting to a
        disconnected device fails.
        """
        if self.update_sleep_period() or self.hass.is_stopping:
            return

        if self.sleep_period:
            # Sleeping device, no point polling it, just mark it unavailable
            raise UpdateFailed(
                f"Sleeping device did not update within {self.sleep_period} seconds interval"
            )

        async with self._connection_lock:
            if self.device.connected:  # Already connected
                return

            if not await self._async_device_connect_task():
                raise UpdateFailed("Device reconnect error")

    async def _async_disconnected(self, reconnect: bool) -> None:
        """Handle device disconnected.

        When ``reconnect`` is true, a refresh is requested afterwards.
        """
        async with self._connection_lock:
            if not self.connected:  # Already disconnected
                return
            self.connected = False
            # Sleeping devices send data and disconnect
            # There are no disconnect events for sleeping devices
            # but we do need to make sure self.connected is False
            if self.sleep_period:
                return
            self._async_run_disconnected_events()
        # Try to reconnect right away if triggered by disconnect event
        # NOTE(review): requested after the lock is released; the refresh path
        # (_async_update_data) acquires the same lock.
        if reconnect:
            await self.async_request_refresh()

    @callback
    def _async_run_disconnected_events(self) -> None:
        """Run disconnected events.

        This will be executed on disconnect or when the config entry
        is updated.
        """
        for disconnected_callback in self._disconnected_callbacks:
            disconnected_callback()
        self._disconnected_callbacks.clear()

    async def _async_connected(self) -> None:
        """Handle device connected."""
        async with self._connection_lock:
            if self.connected:  # Already connected
                return
            self.connected = True
            try:
                await self._async_run_connected_events()
            except DeviceConnectionError as err:
                LOGGER.error(
                    "Error running connected events for device %s: %s", self.name, err
                )
                self.last_update_success = False

    async def _async_run_connected_events(self) -> None:
        """Run connected events.

        This will be executed on connect or when the config entry
        is updated.
        """
        if not self.sleep_period:
            await self._async_connect_ble_scanner()
        else:
            await self._async_setup_outbound_websocket()

    async def _async_setup_outbound_websocket(self) -> None:
        """Set up outbound websocket if it is not enabled."""
        config = self.device.config
        if (
            (ws_config := config.get("ws"))
            and (not ws_config["server"] or not ws_config["enable"])
            and (ws_url := get_rpc_ws_url(self.hass))
        ):
            LOGGER.debug(
                "Setting up outbound websocket for device %s - %s", self.name, ws_url
            )
            await self.device.update_outbound_websocket(ws_url)

    async def _async_connect_ble_scanner(self) -> None:
        """Connect BLE scanner."""
        ble_scanner_mode = self.entry.options.get(
            CONF_BLE_SCANNER_MODE, BLEScannerMode.DISABLED
        )
        if ble_scanner_mode == BLEScannerMode.DISABLED and self.connected:
            await async_stop_scanner(self.device)
            return
        if await async_ensure_ble_enabled(self.device):
            # BLE enable required a reboot, don't bother connecting
            # the scanner since it will be disconnected anyway
            return
        # The returned callable is stored so it runs on the next disconnect.
        self._disconnected_callbacks.append(
            await async_connect_scanner(self.hass, self, ble_scanner_mode)
        )

    @callback
    def _async_handle_rpc_device_online(self) -> None:
        """Handle device going online."""
        if self.device.connected or (
            self._connect_task and not self._connect_task.done()
        ):
            LOGGER.debug("Device %s already connected/connecting", self.name)
            return
        self._connect_task = self.entry.async_create_background_task(
            self.hass,
            self._async_device_connect_task(),
            "rpc device online",
            eager_start=True,
        )

    @callback
    def _async_handle_update(
        self, device_: RpcDevice, update_type: RpcUpdateType
    ) -> None:
        """Handle device update."""
        LOGGER.debug("Shelly %s handle update, type: %s", self.name, update_type)
        if update_type is RpcUpdateType.ONLINE:
            self._came_online_once = True
            self._async_handle_rpc_device_online()
        elif update_type is RpcUpdateType.INITIALIZED:
            self.entry.async_create_background_task(
                self.hass, self._async_connected(), "rpc device init", eager_start=True
            )
            # Make sure entities are marked available
            self.async_set_updated_data(None)
        elif update_type is RpcUpdateType.DISCONNECTED:
            self.entry.async_create_background_task(
                self.hass,
                self._async_disconnected(True),
                "rpc device disconnected",
                eager_start=True,
            )
            # Make sure entities are marked as unavailable
            self.async_set_updated_data(None)
        elif update_type is RpcUpdateType.STATUS:
            self.async_set_updated_data(None)
            if self.sleep_period:
                update_device_fw_info(self.hass, self.device, self.entry)
        elif update_type is RpcUpdateType.EVENT and (event := self.device.event):
            self._async_device_event_handler(event)

    def async_setup(self, pending_platforms: list[Platform] | None = None) -> None:
        """Set up the coordinator and subscribe to device updates."""
        super().async_setup(pending_platforms)
        self.device.subscribe_updates(self._async_handle_update)
        if self.device.initialized:
            # If we are already initialized, we are connected
            self.entry.async_create_task(
                self.hass, self._async_connected(), eager_start=True
            )

    async def shutdown(self) -> None:
        """Shutdown the coordinator."""
        if self.device.connected:
            try:
                if not self.sleep_period:
                    await async_stop_scanner(self.device)
                await super().shutdown()
            except InvalidAuthError:
                self.entry.async_start_reauth(self.hass)
                return
            except DeviceConnectionError as err:
                # If the device is restarting or has gone offline before
                # the ping/pong timeout happens, the shutdown command
                # will fail, but we don't care since we are unloading
                # and if we setup again, we will fix anything that is
                # in an inconsistent state at that time.
                LOGGER.debug("Error during shutdown for device %s: %s", self.name, err)
                return
        await self._async_disconnected(False)
783 
784 
class ShellyRpcPollingCoordinator(ShellyCoordinatorBase[RpcDevice]):
    """Polling coordinator for a Shelly RPC based device."""

    def __init__(
        self, hass: HomeAssistant, entry: ShellyConfigEntry, device: RpcDevice
    ) -> None:
        """Initialize the RPC polling coordinator."""
        super().__init__(hass, entry, device, RPC_SENSORS_POLLING_INTERVAL)

    async def _async_update_data(self) -> None:
        """Fetch data.

        Raises UpdateFailed when the device is not connected or the poll
        fails; an authentication error shuts the device down and starts a
        reauth flow.
        """
        if not self.device.connected:
            raise UpdateFailed("Device disconnected")

        LOGGER.debug("Polling Shelly RPC Device - %s", self.name)
        try:
            await self.device.poll()
        except (DeviceConnectionError, RpcCallError) as err:
            raise UpdateFailed(f"Device disconnected: {err!r}") from err
        except InvalidAuthError:
            await self.async_shutdown_device_and_start_reauth()
806 
807 
def get_block_coordinator_by_device_id(
    hass: HomeAssistant, device_id: str
) -> ShellyBlockCoordinator | None:
    """Get a Shelly block device coordinator for the given device id.

    Returns the block coordinator of the first loaded config entry of the
    device whose runtime data is a ShellyEntryData with a block coordinator
    set, or None when there is no such entry (or no such device).
    """
    dev_reg = dr.async_get(hass)
    if device := dev_reg.async_get(device_id):
        for config_entry in device.config_entries:
            entry = hass.config_entries.async_get_entry(config_entry)
            if (
                entry
                and entry.state is ConfigEntryState.LOADED
                and hasattr(entry, "runtime_data")
                and isinstance(entry.runtime_data, ShellyEntryData)
                and (coordinator := entry.runtime_data.block)
            ):
                return coordinator

    return None
826 
827 
def get_rpc_coordinator_by_device_id(
    hass: HomeAssistant, device_id: str
) -> ShellyRpcCoordinator | None:
    """Get a Shelly RPC device coordinator for the given device id.

    Returns the RPC coordinator of the first loaded config entry of the
    device whose runtime data is a ShellyEntryData with an RPC coordinator
    set, or None when there is no such entry (or no such device).
    """
    dev_reg = dr.async_get(hass)
    if device := dev_reg.async_get(device_id):
        for config_entry in device.config_entries:
            entry = hass.config_entries.async_get_entry(config_entry)
            if (
                entry
                and entry.state is ConfigEntryState.LOADED
                and hasattr(entry, "runtime_data")
                and isinstance(entry.runtime_data, ShellyEntryData)
                and (coordinator := entry.runtime_data.rpc)
            ):
                return coordinator

    return None
846 
847 
async def async_reconnect_soon(hass: HomeAssistant, entry: ShellyConfigEntry) -> None:
    """Try to reconnect soon.

    Does nothing while Home Assistant is stopping, when the entry is not
    loaded, or when the entry has no RPC coordinator. Otherwise the RPC
    coordinator is told, from a background task, that the device is online.
    """
    if (
        not hass.is_stopping
        and entry.state is ConfigEntryState.LOADED
        and (coordinator := entry.runtime_data.rpc)
    ):
        entry.async_create_background_task(
            hass,
            coordinator.async_device_online("zeroconf"),
            "reconnect soon",
            eager_start=True,
        )
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, BlockDevice device)
Definition: coordinator.py:239
None async_setup(self, list[Platform]|None pending_platforms=None)
Definition: coordinator.py:426
None _async_handle_update(self, BlockDevice device_, BlockUpdateType update_type)
Definition: coordinator.py:381
CALLBACK_TYPE async_subscribe_input_events(self, Callable[[dict[str, Any]], None] input_event_callback)
Definition: coordinator.py:265
None __init__(self, HomeAssistant hass, BlockDevice device, ShellyConfigEntry entry)
Definition: coordinator.py:437
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, RpcDevice device)
Definition: coordinator.py:471
None _async_update_listener(self, HomeAssistant hass, ShellyConfigEntry entry)
Definition: coordinator.py:565
CALLBACK_TYPE async_subscribe_input_events(self, Callable[[dict[str, Any]], None] input_event_callback)
Definition: coordinator.py:540
None _async_handle_update(self, RpcDevice device_, RpcUpdateType update_type)
Definition: coordinator.py:726
CALLBACK_TYPE async_subscribe_events(self, Callable[[dict[str, Any]], None] event_callback)
Definition: coordinator.py:553
None _async_device_event_handler(self, dict[str, Any] event_data)
Definition: coordinator.py:573
None async_setup(self, list[Platform]|None pending_platforms=None)
Definition: coordinator.py:754
CALLBACK_TYPE async_subscribe_ota_events(self, Callable[[dict[str, Any]], None] ota_event_callback)
Definition: coordinator.py:527
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, RpcDevice device)
Definition: coordinator.py:790
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
CALLBACK_TYPE async_connect_scanner(HomeAssistant hass, RuntimeEntryData entry_data, APIClient cli, DeviceInfo device_info, ESPHomeBluetoothCache cache)
Definition: bluetooth.py:32
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
None _handle_ha_stop(self, Event _event)
Definition: coordinator.py:170
None async_reconnect_soon(HomeAssistant hass, ShellyConfigEntry entry)
Definition: coordinator.py:848
None async_setup(self, list[Platform]|None pending_platforms=None)
Definition: coordinator.py:149
ShellyRpcCoordinator|None get_rpc_coordinator_by_device_id(HomeAssistant hass, str device_id)
Definition: coordinator.py:830
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, _DeviceT device, float update_interval)
Definition: coordinator.py:103
ShellyBlockCoordinator|None get_block_coordinator_by_device_id(HomeAssistant hass, str device_id)
Definition: coordinator.py:810
int get_rpc_device_wakeup_period(dict[str, Any] status)
Definition: utils.py:293
str|None get_rpc_ws_url(HomeAssistant hass)
Definition: utils.py:585
None async_create_issue_unsupported_firmware(HomeAssistant hass, ConfigEntry entry)
Definition: utils.py:465
None update_device_fw_info(HomeAssistant hass, BlockDevice|RpcDevice shellydevice, ConfigEntry entry)
Definition: utils.py:421
int get_block_device_sleep_period(dict[str, Any] settings)
Definition: utils.py:281