1 """Coordinators for the Shelly integration."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
7 from dataclasses
import dataclass
8 from datetime
import timedelta
9 from typing
import Any, cast
11 from aioshelly.ble
import async_ensure_ble_enabled, async_stop_scanner
12 from aioshelly.block_device
import BlockDevice, BlockUpdateType
13 from aioshelly.const
import MODEL_NAMES, MODEL_VALVE
14 from aioshelly.exceptions
import (
15 DeviceConnectionError,
17 MacAddressMismatchError,
20 from aioshelly.rpc_device
import RpcDevice, RpcUpdateType
21 from propcache
import cached_property
27 EVENT_HOMEASSISTANT_STOP,
36 from .bluetooth
import async_connect_scanner
42 BATTERY_DEVICES_WITH_PERMANENT_CONNECTION,
43 CONF_BLE_SCANNER_MODE,
46 DUAL_MODE_LIGHT_MODELS,
47 ENTRY_RELOAD_COOLDOWN,
51 MAX_PUSH_UPDATE_FAILURES,
52 MODELS_SUPPORTING_LIGHT_EFFECTS,
58 REST_SENSORS_UPDATE_INTERVAL,
59 RPC_INPUTS_EVENTS_TYPES,
60 RPC_RECONNECT_INTERVAL,
61 RPC_SENSORS_POLLING_INTERVAL,
63 UPDATE_PERIOD_MULTIPLIER,
67 async_create_issue_unsupported_firmware,
68 get_block_device_sleep_period,
72 get_rpc_device_wakeup_period,
74 update_device_fw_info,
80 """Class for sharing data within a given config entry."""
82 platforms: list[Platform]
83 block: ShellyBlockCoordinator |
None =
None
84 rest: ShellyRestCoordinator |
None =
None
85 rpc: ShellyRpcCoordinator |
None =
None
86 rpc_poll: ShellyRpcPollingCoordinator |
None =
None
89 type ShellyConfigEntry = ConfigEntry[ShellyEntryData]
93 DataUpdateCoordinator[
None]
95 """Coordinator for a Shelly device."""
100 entry: ShellyConfigEntry,
102 update_interval: float,
104 """Initialize the Shelly device coordinator."""
107 self.device_id: str |
None =
None
108 self._pending_platforms: list[Platform] |
None =
None
109 device_name = device.name
if device.initialized
else entry.title
110 interval_td =
timedelta(seconds=update_interval)
113 self._came_online_once =
False
114 super().
__init__(hass, LOGGER, name=device_name, update_interval=interval_td)
116 self._debounced_reload: Debouncer[Coroutine[Any, Any,
None]] =
Debouncer(
119 cooldown=ENTRY_RELOAD_COOLDOWN,
121 function=self._async_reload_entry,
123 entry.async_on_unload(self._debounced_reload.async_shutdown)
125 entry.async_on_unload(
126 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
131 """Model of the device."""
132 return cast(str, self.entry.data[
"model"])
136 """Mac address of the device."""
137 return cast(str, self.entry.unique_id)
141 """Firmware version of the device."""
142 return self.device.firmware_version
if self.device.initialized
else ""
146 """Sleep period of the device."""
147 return self.entry.data.get(CONF_SLEEP_PERIOD, 0)
149 def async_setup(self, pending_platforms: list[Platform] |
None =
None) ->
None:
150 """Set up the coordinator."""
151 self._pending_platforms = pending_platforms
152 dev_reg = dr.async_get(self.hass)
153 device_entry = dev_reg.async_get_or_create(
154 config_entry_id=self.entry.entry_id,
156 connections={(CONNECTION_NETWORK_MAC, self.mac)},
157 manufacturer=
"Shelly",
158 model=MODEL_NAMES.get(self.model),
160 sw_version=self.sw_version,
161 hw_version=f
"gen{get_device_entry_gen(self.entry)}",
162 configuration_url=f
"http://{get_host(self.entry.data[CONF_HOST])}:{get_http_port(self.entry.data)}",
164 self.device_id = device_entry.id
167 """Shutdown the coordinator."""
171 """Handle Home Assistant stopping."""
172 LOGGER.debug(
"Stopping RPC device coordinator for %s", self.name)
173 await self.shutdown()
176 """Connect to a Shelly device task."""
177 LOGGER.debug(
"Connecting to Shelly Device - %s", self.name)
179 await self.device.initialize()
181 except (DeviceConnectionError, MacAddressMismatchError)
as err:
183 "Error connecting to Shelly device %s, error: %r", self.name, err
186 except InvalidAuthError:
187 self.entry.async_start_reauth(self.hass)
190 if not self.device.firmware_supported:
194 if not self._pending_platforms:
197 LOGGER.debug(
"Device %s is online, resuming setup", self.name)
198 platforms = self._pending_platforms
199 self._pending_platforms =
None
201 data = {**self.entry.data}
204 old_sleep_period = data[CONF_SLEEP_PERIOD]
205 if isinstance(self.device, RpcDevice):
207 elif isinstance(self.device, BlockDevice):
210 if new_sleep_period != old_sleep_period:
211 data[CONF_SLEEP_PERIOD] = new_sleep_period
212 self.hass.config_entries.async_update_entry(self.entry, data=data)
215 await self.hass.config_entries.async_forward_entry_setups(self.entry, platforms)
221 self._debounced_reload.async_cancel()
222 LOGGER.debug(
"Reloading entry %s", self.name)
223 await self.hass.config_entries.async_reload(self.entry.entry_id)
226 """Shutdown Shelly device and start reauth flow."""
229 self.last_update_success =
False
230 await self.shutdown()
231 self.entry.async_start_reauth(self.hass)
235 """Coordinator for a Shelly block based device."""
238 self, hass: HomeAssistant, entry: ShellyConfigEntry, device: BlockDevice
240 """Initialize the Shelly block device coordinator."""
242 if self.sleep_period:
243 update_interval = UPDATE_PERIOD_MULTIPLIER * self.sleep_period
246 UPDATE_PERIOD_MULTIPLIER * device.settings[
"coiot"][
"update_period"]
248 super().
__init__(hass, entry, device, update_interval)
253 self._last_input_events_count: dict = {}
254 self._last_target_temp: float |
None =
None
256 self._input_event_listeners: list[Callable[[dict[str, Any]],
None]] = []
258 entry.async_on_unload(
264 self, input_event_callback: Callable[[dict[str, Any]],
None]
266 """Subscribe to input events."""
def _unsubscribe() -> None:
    """Remove the subscribed callback from the input-event listener list.

    ``list.remove`` raises ``ValueError`` if the callback is no longer
    registered, e.g. when this function is invoked a second time.
    """
    listeners = self._input_event_listeners
    listeners.remove(input_event_callback)
271 self._input_event_listeners.append(input_event_callback)
277 """Handle device updates."""
278 if not self.device.initialized:
282 if self.
modelmodel
in SHBTN_MODELS
and self._last_input_events_count.
get(1)
is None:
283 for block
in self.device.blocks:
284 if block.type !=
"device":
287 wakeup_event = cast(list, block.wakeupEvent)
288 if len(wakeup_event) == 1
and wakeup_event[0] ==
"button":
289 self._last_input_events_count[1] = -1
295 for block
in self.device.blocks:
296 if block.type ==
"device" and block.cfgChanged
is not None:
297 cfg_changed = cast(int, block.cfgChanged)
305 if self.
modelmodel
in DUAL_MODE_LIGHT_MODELS:
306 if "mode" in block.sensor_ids:
311 if self.
modelmodel
in MODELS_SUPPORTING_LIGHT_EFFECTS:
312 if "effect" in block.sensor_ids:
318 "inputEvent" not in block.sensor_ids
319 or "inputEventCnt" not in block.sensor_ids
321 LOGGER.debug(
"Skipping non-input event block %s", block.description)
324 channel =
int(block.channel
or 0) + 1
325 event_type = block.inputEvent
326 last_event_count = self._last_input_events_count.
get(channel)
327 self._last_input_events_count[channel] = block.inputEventCnt
330 last_event_count
is None
331 or last_event_count == block.inputEventCnt
334 LOGGER.debug(
"Skipping block event %s", event_type)
337 if event_type
in INPUTS_EVENTS_DICT:
338 for event_callback
in self._input_event_listeners:
340 {
"channel": channel,
"event": INPUTS_EVENTS_DICT[event_type]}
342 self.hass.bus.async_fire(
345 ATTR_DEVICE_ID: self.device_id,
346 ATTR_DEVICE: self.device.settings[
"device"][
"hostname"],
347 ATTR_CHANNEL: channel,
348 ATTR_CLICK_TYPE: INPUTS_EVENTS_DICT[event_type],
355 "Config for %s changed, reloading entry in %s seconds",
357 ENTRY_RELOAD_COOLDOWN,
359 self._debounced_reload.async_schedule_call()
364 if self.sleep_period:
367 f
"Sleeping device did not update within {self.sleep_period} seconds interval"
370 LOGGER.debug(
"Polling Shelly Block Device - %s", self.name)
372 await self.device.
update()
373 except DeviceConnectionError
as err:
374 raise UpdateFailed(f
"Error fetching data: {err!r}")
from err
375 except InvalidAuthError:
376 await self.async_shutdown_device_and_start_reauth()
380 self, device_: BlockDevice, update_type: BlockUpdateType
382 """Handle device update."""
383 LOGGER.debug(
"Shelly %s handle update, type: %s", self.name, update_type)
384 if update_type
is BlockUpdateType.ONLINE:
386 self.
entryentry.async_create_background_task(
388 self._async_device_connect_task(),
389 "block device online",
392 elif update_type
is BlockUpdateType.COAP_PERIODIC:
394 ir.async_delete_issue(
397 PUSH_UPDATE_ISSUE_ID.format(unique=self.mac),
400 elif update_type
is BlockUpdateType.COAP_REPLY:
404 "Creating issue %s", PUSH_UPDATE_ISSUE_ID.format(unique=self.mac)
406 ir.async_create_issue(
409 PUSH_UPDATE_ISSUE_ID.format(unique=self.mac),
412 severity=ir.IssueSeverity.ERROR,
413 learn_more_url=
"https://www.home-assistant.io/integrations/shelly/#shelly-device-configuration-generation-1",
414 translation_key=
"push_update_failure",
415 translation_placeholders={
416 "device_name": self.
entryentry.title,
417 "ip_address": self.device.ip_address,
424 self.async_set_updated_data(
None)
426 def async_setup(self, pending_platforms: list[Platform] |
None =
None) ->
None:
427 """Set up the coordinator."""
433 """Coordinator for a Shelly REST device."""
436 self, hass: HomeAssistant, device: BlockDevice, entry: ShellyConfigEntry
438 """Initialize the Shelly REST device coordinator."""
439 update_interval = REST_SENSORS_UPDATE_INTERVAL
441 device.settings[
"device"][
"type"]
442 in BATTERY_DEVICES_WITH_PERMANENT_CONNECTION
445 UPDATE_PERIOD_MULTIPLIER * device.settings[
"coiot"][
"update_period"]
447 super().
__init__(hass, entry, device, update_interval)
451 LOGGER.debug(
"REST update for %s", self.name)
453 await self.device.update_status()
455 if self.device.status[
"uptime"] > 2 * REST_SENSORS_UPDATE_INTERVAL:
457 await self.device.update_shelly()
458 except (DeviceConnectionError, MacAddressMismatchError)
as err:
459 raise UpdateFailed(f
"Error fetching data: {err!r}")
from err
460 except InvalidAuthError:
461 await self.async_shutdown_device_and_start_reauth()
467 """Coordinator for a Shelly RPC based device."""
470 self, hass: HomeAssistant, entry: ShellyConfigEntry, device: RpcDevice
472 """Initialize the Shelly RPC device coordinator."""
474 if self.sleep_period:
475 update_interval = UPDATE_PERIOD_MULTIPLIER * self.sleep_period
477 update_interval = RPC_RECONNECT_INTERVAL
478 super().
__init__(hass, entry, device, update_interval)
481 self._disconnected_callbacks: list[CALLBACK_TYPE] = []
483 self._event_listeners: list[Callable[[dict[str, Any]],
None]] = []
484 self._ota_event_listeners: list[Callable[[dict[str, Any]],
None]] = []
485 self._input_event_listeners: list[Callable[[dict[str, Any]],
None]] = []
490 """Handle device going online."""
491 if not self.sleep_period:
492 await self.async_request_refresh()
495 "Sleepy device %s is online (source: %s), trying to poll and configure",
507 """Check device sleep period & update if changed."""
509 not self.device.initialized
511 or wakeup_period == self.sleep_period
515 data = {**self.
entryentry.data}
516 data[CONF_SLEEP_PERIOD] = wakeup_period
517 self.hass.config_entries.async_update_entry(self.
entryentry, data=data)
519 update_interval = UPDATE_PERIOD_MULTIPLIER * wakeup_period
526 self, ota_event_callback: Callable[[dict[str, Any]],
None]
528 """Subscribe to OTA events."""
def _unsubscribe() -> None:
    """Remove the subscribed callback from the OTA-event listener list.

    ``list.remove`` raises ``ValueError`` if the callback is no longer
    registered, e.g. when this function is invoked a second time.
    """
    listeners = self._ota_event_listeners
    listeners.remove(ota_event_callback)
533 self._ota_event_listeners.append(ota_event_callback)
539 self, input_event_callback: Callable[[dict[str, Any]],
None]
541 """Subscribe to input events."""
def _unsubscribe() -> None:
    """Remove the subscribed callback from the input-event listener list.

    ``list.remove`` raises ``ValueError`` if the callback is no longer
    registered, e.g. when this function is invoked a second time.
    """
    listeners = self._input_event_listeners
    listeners.remove(input_event_callback)
546 self._input_event_listeners.append(input_event_callback)
552 self, event_callback: Callable[[dict[str, Any]],
None]
554 """Subscribe to events."""
def _unsubscribe() -> None:
    """Remove the subscribed callback from the generic event listener list.

    ``list.remove`` raises ``ValueError`` if the callback is no longer
    registered, e.g. when this function is invoked a second time.
    """
    listeners = self._event_listeners
    listeners.remove(event_callback)
559 self._event_listeners.append(event_callback)
564 self, hass: HomeAssistant, entry: ShellyConfigEntry
566 """Reconfigure on update."""
574 """Handle device events."""
575 events: list[dict[str, Any]] = event_data[
"events"]
577 event_type = event.get(
"event")
578 if event_type
is None:
581 for event_callback
in self._event_listeners:
582 event_callback(event)
584 if event_type
in (
"component_added",
"component_removed",
"config_changed"):
587 "Config for %s changed, reloading entry in %s seconds",
589 ENTRY_RELOAD_COOLDOWN,
591 self._debounced_reload.async_schedule_call()
592 elif event_type
in RPC_INPUTS_EVENTS_TYPES:
593 for event_callback
in self._input_event_listeners:
594 event_callback(event)
595 self.hass.bus.async_fire(
598 ATTR_DEVICE_ID: self.device_id,
599 ATTR_DEVICE: self.device.hostname,
600 ATTR_CHANNEL: event[
"id"] + 1,
601 ATTR_CLICK_TYPE: event[
"event"],
605 elif event_type
in (OTA_BEGIN, OTA_ERROR, OTA_PROGRESS, OTA_SUCCESS):
606 for event_callback
in self._ota_event_listeners:
607 event_callback(event)
614 if self.sleep_period:
617 f
"Sleeping device did not update within {self.sleep_period} seconds interval"
621 if self.device.connected:
624 if not await self._async_device_connect_task():
628 """Handle device disconnected."""
636 if self.sleep_period:
641 await self.async_request_refresh()
645 """Run disconnected events.
647 This will be executed on disconnect or when the config entry
650 for disconnected_callback
in self._disconnected_callbacks:
651 disconnected_callback()
652 self._disconnected_callbacks.clear()
655 """Handle device connected."""
662 except DeviceConnectionError
as err:
664 "Error running connected events for device %s: %s", self.name, err
669 """Run connected events.
671 This will be executed on connect or when the config entry
674 if not self.sleep_period:
680 """Set up outbound websocket if it is not enabled."""
681 config = self.device.config
683 (ws_config := config.get(
"ws"))
684 and (
not ws_config[
"server"]
or not ws_config[
"enable"])
688 "Setting up outbound websocket for device %s - %s", self.name, ws_url
690 await self.device.update_outbound_websocket(ws_url)
693 """Connect BLE scanner."""
694 ble_scanner_mode = self.
entryentry.options.get(
695 CONF_BLE_SCANNER_MODE, BLEScannerMode.DISABLED
697 if ble_scanner_mode == BLEScannerMode.DISABLED
and self.
connectedconnected:
698 await async_stop_scanner(self.device)
700 if await async_ensure_ble_enabled(self.device):
704 self._disconnected_callbacks.append(
710 """Handle device going online."""
711 if self.device.connected
or (
714 LOGGER.debug(
"Device %s already connected/connecting", self.name)
718 self._async_device_connect_task(),
725 self, device_: RpcDevice, update_type: RpcUpdateType
727 """Handle device update."""
728 LOGGER.debug(
"Shelly %s handle update, type: %s", self.name, update_type)
729 if update_type
is RpcUpdateType.ONLINE:
732 elif update_type
is RpcUpdateType.INITIALIZED:
733 self.
entryentry.async_create_background_task(
734 self.hass, self.
_async_connected_async_connected(),
"rpc device init", eager_start=
True
737 self.async_set_updated_data(
None)
738 elif update_type
is RpcUpdateType.DISCONNECTED:
739 self.
entryentry.async_create_background_task(
742 "rpc device disconnected",
746 self.async_set_updated_data(
None)
747 elif update_type
is RpcUpdateType.STATUS:
748 self.async_set_updated_data(
None)
749 if self.sleep_period:
751 elif update_type
is RpcUpdateType.EVENT
and (event := self.device.event):
754 def async_setup(self, pending_platforms: list[Platform] |
None =
None) ->
None:
755 """Set up the coordinator."""
758 if self.device.initialized:
760 self.
entryentry.async_create_task(
765 """Shutdown the coordinator."""
766 if self.device.connected:
768 if not self.sleep_period:
769 await async_stop_scanner(self.device)
771 except InvalidAuthError:
772 self.
entryentry.async_start_reauth(self.hass)
774 except DeviceConnectionError
as err:
780 LOGGER.debug(
"Error during shutdown for device %s: %s", self.name, err)
786 """Polling coordinator for a Shelly RPC based device."""
789 self, hass: HomeAssistant, entry: ShellyConfigEntry, device: RpcDevice
791 """Initialize the RPC polling coordinator."""
792 super().
__init__(hass, entry, device, RPC_SENSORS_POLLING_INTERVAL)
796 if not self.device.connected:
799 LOGGER.debug(
"Polling Shelly RPC Device - %s", self.name)
801 await self.device.poll()
802 except (DeviceConnectionError, RpcCallError)
as err:
803 raise UpdateFailed(f
"Device disconnected: {err!r}")
from err
804 except InvalidAuthError:
805 await self.async_shutdown_device_and_start_reauth()
809 hass: HomeAssistant, device_id: str
810 ) -> ShellyBlockCoordinator |
None:
811 """Get a Shelly block device coordinator for the given device id."""
812 dev_reg = dr.async_get(hass)
813 if device := dev_reg.async_get(device_id):
814 for config_entry
in device.config_entries:
815 entry = hass.config_entries.async_get_entry(config_entry)
818 and entry.state
is ConfigEntryState.LOADED
819 and hasattr(entry,
"runtime_data")
820 and isinstance(entry.runtime_data, ShellyEntryData)
821 and (coordinator := entry.runtime_data.block)
829 hass: HomeAssistant, device_id: str
830 ) -> ShellyRpcCoordinator |
None:
831 """Get a Shelly RPC device coordinator for the given device id."""
832 dev_reg = dr.async_get(hass)
833 if device := dev_reg.async_get(device_id):
834 for config_entry
in device.config_entries:
835 entry = hass.config_entries.async_get_entry(config_entry)
838 and entry.state
is ConfigEntryState.LOADED
839 and hasattr(entry,
"runtime_data")
840 and isinstance(entry.runtime_data, ShellyEntryData)
841 and (coordinator := entry.runtime_data.rpc)
849 """Try to reconnect soon."""
852 and entry.state
is ConfigEntryState.LOADED
853 and (coordinator := entry.runtime_data.rpc)
855 entry.async_create_background_task(
857 coordinator.async_device_online(
"zeroconf"),
None _async_device_updates_handler(self)
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, BlockDevice device)
None async_setup(self, list[Platform]|None pending_platforms=None)
None _async_handle_update(self, BlockDevice device_, BlockUpdateType update_type)
None _async_update_data(self)
CALLBACK_TYPE async_subscribe_input_events(self, Callable[[dict[str, Any]], None] input_event_callback)
None __init__(self, HomeAssistant hass, BlockDevice device, ShellyConfigEntry entry)
None _async_update_data(self)
None _async_run_connected_events(self)
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, RpcDevice device)
None _async_update_listener(self, HomeAssistant hass, ShellyConfigEntry entry)
CALLBACK_TYPE async_subscribe_input_events(self, Callable[[dict[str, Any]], None] input_event_callback)
None async_device_online(self, str source)
None _async_connected(self)
bool update_sleep_period(self)
None _async_setup_outbound_websocket(self)
None _async_connect_ble_scanner(self)
None _async_handle_update(self, RpcDevice device_, RpcUpdateType update_type)
None _async_update_data(self)
CALLBACK_TYPE async_subscribe_events(self, Callable[[dict[str, Any]], None] event_callback)
None _async_device_event_handler(self, dict[str, Any] event_data)
None async_setup(self, list[Platform]|None pending_platforms=None)
None _async_handle_rpc_device_online(self)
None _async_run_disconnected_events(self)
None _async_disconnected(self, bool reconnect)
CALLBACK_TYPE async_subscribe_ota_events(self, Callable[[dict[str, Any]], None] ota_event_callback)
None _async_update_data(self)
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, RpcDevice device)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
CALLBACK_TYPE async_connect_scanner(HomeAssistant hass, RuntimeEntryData entry_data, APIClient cli, DeviceInfo device_info, ESPHomeBluetoothCache cache)
IssData update(pyiss.ISS iss)
None async_shutdown_device_and_start_reauth(self)
None _handle_ha_stop(self, Event _event)
None async_reconnect_soon(HomeAssistant hass, ShellyConfigEntry entry)
None async_setup(self, list[Platform]|None pending_platforms=None)
None _async_reload_entry(self)
ShellyRpcCoordinator|None get_rpc_coordinator_by_device_id(HomeAssistant hass, str device_id)
None __init__(self, HomeAssistant hass, ShellyConfigEntry entry, _DeviceT device, float update_interval)
bool _async_device_connect_task(self)
ShellyBlockCoordinator|None get_block_coordinator_by_device_id(HomeAssistant hass, str device_id)
int get_rpc_device_wakeup_period(dict[str, Any] status)
str|None get_rpc_ws_url(HomeAssistant hass)
None async_create_issue_unsupported_firmware(HomeAssistant hass, ConfigEntry entry)
None update_device_fw_info(HomeAssistant hass, BlockDevice|RpcDevice shellydevice, ConfigEntry entry)
int get_block_device_sleep_period(dict[str, Any] settings)