1 """Data UpdateCoordinator for the Husqvarna Automower integration."""
4 from datetime
import timedelta
7 from aioautomower.exceptions
import (
10 HusqvarnaWSServerHandshakeError,
13 from aioautomower.model
import MowerAttributes
14 from aioautomower.session
import AutomowerSession
21 from .const
import DOMAIN
23 _LOGGER = logging.getLogger(__name__)
24 MAX_WS_RECONNECT_TIME = 600
26 DEFAULT_RECONNECT_TIME = 2
30 """Class to manage fetching Husqvarna data."""
32 config_entry: ConfigEntry
35 self, hass: HomeAssistant, api: AutomowerSession, entry: ConfigEntry
37 """Initialize data updater."""
42 update_interval=SCAN_INTERVAL,
49 """Subscribe for websocket and poll data from the API."""
51 await self.
apiapi.connect()
52 self.
apiapi.register_data_callback(self.
callbackcallback)
56 except ApiException
as err:
58 except AuthException
as err:
62 def callback(self, ws_data: dict[str, MowerAttributes]) ->
None:
63 """Process websocket callbacks and write them to the DataUpdateCoordinator."""
70 automower_client: AutomowerSession,
72 """Listen with the client."""
74 await automower_client.auth.websocket_connect()
77 await automower_client.start_listening()
78 except HusqvarnaWSServerHandshakeError
as err:
80 "Failed to connect to websocket. Trying to reconnect: %s",
83 except TimeoutException
as err:
85 "Failed to listen to websocket. Trying to reconnect: %s",
88 if not hass.is_stopping:
91 entry.async_create_background_task(
93 self.
client_listenclient_listen(hass, entry, automower_client),
dict[str, MowerAttributes] _async_update_data(self)
None callback(self, dict[str, MowerAttributes] ws_data)
None client_listen(self, HomeAssistant hass, ConfigEntry entry, AutomowerSession automower_client)
None __init__(self, HomeAssistant hass, AutomowerSession api, ConfigEntry entry)
None async_set_updated_data(self, _DataT data)
def get_status(hass, host, port)