1 """Coordinators for the PrusaLink integration."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
7 from datetime
import timedelta
9 from time
import monotonic
10 from typing
import TypeVar
12 from httpx
import ConnectError
13 from pyprusalink
import (
20 from pyprusalink.types
import InvalidAuth, PrusaLinkError
26 from .const
import DOMAIN
28 _LOGGER = logging.getLogger(__name__)
31 T = TypeVar(
"T", PrinterStatus, LegacyPrinterStatus, JobInfo)
35 """Update coordinator for the printer."""
37 config_entry: ConfigEntry
38 expect_change_until = 0.0
40 def __init__(self, hass: HomeAssistant, api: PrusaLink) ->
None:
41 """Initialize the update coordinator."""
49 """Update the data."""
51 async
with asyncio.timeout(5):
55 except PrusaLinkError
as err:
57 except (TimeoutError, ConnectError)
as err:
65 """Fetch the actual data."""
66 raise NotImplementedError
70 """Expect a change."""
74 """Get new update interval."""
82 """Printer update coordinator."""
85 """Fetch the printer data."""
90 """Printer legacy update coordinator."""
93 """Fetch the printer data."""
94 return await self.
apiapi.get_legacy_printer()
98 """Job update coordinator."""
101 """Fetch the printer data."""
102 return await self.
apiapi.get_job()
106 """Info update coordinator."""
109 """Fetch the printer data."""
PrinterInfo _fetch_data(self)
JobInfo _fetch_data(self)
LegacyPrinterStatus _fetch_data(self)
T _async_update_data(self)
timedelta _get_update_interval(self, T data)
None __init__(self, HomeAssistant hass, PrusaLink api)
float expect_change_until
PrinterStatus _fetch_data(self)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)
dict[str, Any]|None get_info(HomeAssistant hass)
def get_status(hass, host, port)