1 """Coordinator for the RainMachine integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Coroutine
6 from datetime
import timedelta
7 from typing
import TYPE_CHECKING, Any
11 async_dispatcher_connect,
12 async_dispatcher_send,
16 from .const
import LOGGER
18 SIGNAL_REBOOT_COMPLETED =
"rainmachine_reboot_completed_{0}"
19 SIGNAL_REBOOT_REQUESTED =
"rainmachine_reboot_requested_{0}"
22 from .
import RainMachineConfigEntry
26 """Define an extended DataUpdateCoordinator."""
28 config_entry: RainMachineConfigEntry
34 entry: RainMachineConfigEntry,
37 update_interval: timedelta,
38 update_method: Callable[[], Coroutine[Any, Any, dict]],
45 update_interval=update_interval,
46 update_method=update_method,
51 self._signal_handler_unsubs: list[Callable[[],
None]] = []
62 """Initialize the coordinator."""
65 def async_reboot_completed() -> None:
66 """Respond to a reboot completed notification."""
67 LOGGER.debug(
"%s responding to reboot complete", self.
namename)
73 def async_reboot_requested() -> None:
74 """Respond to a reboot request."""
75 LOGGER.debug(
"%s responding to reboot request", self.
namename)
84 self._signal_handler_unsubs.append(
89 def async_check_reboot_complete() -> None:
90 """Check whether an active reboot has been completed."""
92 LOGGER.debug(
"%s discovered reboot complete", self.
namename)
98 def async_teardown() -> None:
99 """Tear the coordinator down appropriately."""
100 for unsub
in self._signal_handler_unsubs:
None __init__(self, HomeAssistant hass, *RainMachineConfigEntry entry, str name, str api_category, timedelta update_interval, Callable[[], Coroutine[Any, Any, dict]] update_method)
None async_initialize(self)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None async_update_listeners(self)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)