1 """Coordinator for La Marzocco API."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Coroutine
6 from datetime
import timedelta
11 from pylamarzocco.client_bluetooth
import LaMarzoccoBluetoothClient
12 from pylamarzocco.client_cloud
import LaMarzoccoCloudClient
13 from pylamarzocco.client_local
import LaMarzoccoLocalClient
14 from pylamarzocco.exceptions
import AuthFail, RequestNotSuccessful
15 from pylamarzocco.lm_machine
import LaMarzoccoMachine
16 from websockets.protocol
import State
24 from .const
import DOMAIN
27 FIRMWARE_UPDATE_INTERVAL = 3600
28 STATISTICS_UPDATE_INTERVAL = 300
30 _LOGGER = logging.getLogger(__name__)
32 type LaMarzoccoConfigEntry = ConfigEntry[LaMarzoccoUpdateCoordinator]
36 """Class to handle fetching data from the La Marzocco API centrally."""
38 config_entry: LaMarzoccoConfigEntry
43 entry: LaMarzoccoConfigEntry,
44 cloud_client: LaMarzoccoCloudClient,
45 local_client: LaMarzoccoLocalClient |
None,
46 bluetooth_client: LaMarzoccoBluetoothClient |
None,
48 """Initialize coordinator."""
54 update_interval=SCAN_INTERVAL,
59 self.
devicedevice = LaMarzoccoMachine(
63 cloud_client=cloud_client,
64 local_client=local_client,
65 bluetooth_client=bluetooth_client,
73 """Set up the coordinator."""
75 _LOGGER.debug(
"Init WebSocket in background task")
77 self.
config_entryconfig_entry.async_create_background_task(
79 target=self.
devicedevice.websocket_connect(
82 name=
"lm_websocket_task",
85 async
def websocket_close(_: Any |
None =
None) ->
None:
89 and self.
_local_client_local_client.websocket.state
is State.OPEN
95 self.
hasshass.bus.async_listen_once(
96 EVENT_HOMEASSISTANT_STOP, websocket_close
99 self.
config_entryconfig_entry.async_on_unload(websocket_close)
102 """Fetch data from API endpoint."""
103 await self._async_handle_request(self.
devicedevice.get_config)
109 await self._async_handle_request(self.
devicedevice.get_firmware)
116 await self._async_handle_request(self.
devicedevice.get_statistics)
119 _LOGGER.debug(
"Current status: %s",
str(self.
devicedevice.config))
121 async
def _async_handle_request[**_P](
123 func: Callable[_P, Coroutine[
None,
None,
None]],
128 await func(*args, **kwargs)
129 except AuthFail
as ex:
130 _LOGGER.debug(
"Authentication failed", exc_info=
True)
132 translation_domain=DOMAIN, translation_key=
"authentication_failed"
134 except RequestNotSuccessful
as ex:
135 _LOGGER.debug(ex, exc_info=
True)
137 translation_domain=DOMAIN, translation_key=
"api_error"
None __init__(self, HomeAssistant hass, LaMarzoccoConfigEntry entry, LaMarzoccoCloudClient cloud_client, LaMarzoccoLocalClient|None local_client, LaMarzoccoBluetoothClient|None bluetooth_client)
_last_firmware_data_update
_last_statistics_data_update
None _async_update_data(self)
local_connection_configured
None async_set_updated_data(self, _DataT data)
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)