1 """The data update coordinator for OctoPrint."""
3 from __future__
import annotations
5 from datetime
import timedelta
7 from typing
import cast
9 from pyoctoprintapi
import ApiError, OctoprintClient, PrinterOffline
10 from pyoctoprintapi.exceptions
import UnauthorizedException
21 from .const
import DOMAIN
23 _LOGGER = logging.getLogger(__name__)
27 """Class to manage fetching Octoprint data."""
29 config_entry: ConfigEntry
34 octoprint: OctoprintClient,
35 config_entry: ConfigEntry,
42 name=f
"octoprint-{config_entry.entry_id}",
43 update_interval=
timedelta(seconds=interval),
48 self.
datadatadata = {
"printer":
None,
"job":
None,
"last_read_time":
None}
51 """Update data via API."""
54 job = await self.
_octoprint_octoprint.get_job_info()
55 except UnauthorizedException
as err:
56 raise ConfigEntryAuthFailed
from err
57 except ApiError
as err:
64 printer = await self.
_octoprint_octoprint.get_printer_info()
65 except PrinterOffline:
67 _LOGGER.debug(
"Unable to retrieve printer information: Printer offline")
69 except UnauthorizedException
as err:
70 raise ConfigEntryAuthFailed
from err
71 except ApiError
as err:
76 return {
"job": job,
"printer": printer,
"last_read_time": dt_util.utcnow()}
82 configuration_url = URL.build(
90 identifiers={(DOMAIN, unique_id)},
91 manufacturer=
"OctoPrint",
93 configuration_url=
str(configuration_url),
DeviceInfo device_info(self)
def _async_update_data(self)
None __init__(self, HomeAssistant hass, OctoprintClient octoprint, ConfigEntry config_entry, int interval)