1 """Define Guardian-specific utilities."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
7 from datetime
import timedelta
8 from typing
import Any, cast
10 from aioguardian
import Client
11 from aioguardian.errors
import GuardianError
18 from .const
import LOGGER
22 SIGNAL_REBOOT_REQUESTED =
"guardian_reboot_requested_{0}"
26 """Define an extended DataUpdateCoordinator with some Guardian goodies."""
28 config_entry: ConfigEntry
37 api_coro: Callable[[], Coroutine[Any, Any, dict[str, Any]]],
38 api_lock: asyncio.Lock,
39 valve_controller_uid: str,
45 name=f
"{valve_controller_uid}_{api_name}",
46 update_interval=DEFAULT_UPDATE_INTERVAL,
59 """Execute a "locked" API request against the valve controller."""
63 except GuardianError
as err:
65 return cast(dict[str, Any], resp[
"data"])
68 """Initialize the coordinator."""
71 def async_reboot_requested() -> None:
72 """Respond to a reboot request."""
None async_initialize(self)
None __init__(self, HomeAssistant hass, *ConfigEntry entry, Client client, str api_name, Callable[[], Coroutine[Any, Any, dict[str, Any]]] api_coro, asyncio.Lock api_lock, str valve_controller_uid)
dict[str, Any] _async_update_data(self)
None async_update_listeners(self)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)