Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinators for the PrusaLink integration."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 import asyncio
7 from datetime import timedelta
8 import logging
9 from time import monotonic
10 from typing import TypeVar
11 
12 from httpx import ConnectError
13 from pyprusalink import (
14  JobInfo,
15  LegacyPrinterStatus,
16  PrinterInfo,
17  PrinterStatus,
18  PrusaLink,
19 )
20 from pyprusalink.types import InvalidAuth, PrusaLinkError
21 
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.core import HomeAssistant, callback
24 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
25 
26 from .const import DOMAIN
27 
28 _LOGGER = logging.getLogger(__name__)
29 
30 
31 T = TypeVar("T", PrinterStatus, LegacyPrinterStatus, JobInfo)
32 
33 
35  """Update coordinator for the printer."""
36 
37  config_entry: ConfigEntry
38  expect_change_until = 0.0
39 
def __init__(self, hass: HomeAssistant, api: PrusaLink) -> None:
    """Initialize the update coordinator.

    Args:
        hass: Home Assistant instance, forwarded to the base coordinator.
        api: PrusaLink client kept on the coordinator for the fetch methods.
    """
    # Stored under the attribute name that the ``_fetch_data``
    # implementations in this file read.
    self.apiapi = api

    super().__init__(
        hass,
        _LOGGER,
        name=DOMAIN,
        # Nothing has been fetched yet, so the initial interval is computed
        # from ``None``.
        update_interval=self._get_update_interval(None),
    )
47 
48  async def _async_update_data(self) -> T:
49  """Update the data."""
50  try:
51  async with asyncio.timeout(5):
52  data = await self._fetch_data_fetch_data()
53  except InvalidAuth:
54  raise UpdateFailed("Invalid authentication") from None
55  except PrusaLinkError as err:
56  raise UpdateFailed(str(err)) from err
57  except (TimeoutError, ConnectError) as err:
58  raise UpdateFailed("Cannot connect") from err
59 
60  self.update_intervalupdate_intervalupdate_intervalupdate_intervalupdate_interval = self._get_update_interval_get_update_interval(data)
61  return data
62 
63  @abstractmethod
64  async def _fetch_data(self) -> T:
65  """Fetch the actual data."""
66  raise NotImplementedError
67 
@callback
def expect_change(self) -> None:
    """Expect a change.

    Sets ``expect_change_until`` to 30 seconds from now on the monotonic
    clock.
    """
    self.expect_change_until = monotonic() + 30
72 
73  def _get_update_interval(self, data: T) -> timedelta:
74  """Get new update interval."""
75  if self.expect_change_untilexpect_change_untilexpect_change_until > monotonic():
76  return timedelta(seconds=5)
77 
78  return timedelta(seconds=30)
79 
80 
82  """Printer update coordinator."""
83 
84  async def _fetch_data(self) -> PrinterStatus:
85  """Fetch the printer data."""
86  return await self.apiapi.get_status()
87 
88 
90  """Printer legacy update coordinator."""
91 
92  async def _fetch_data(self) -> LegacyPrinterStatus:
93  """Fetch the printer data."""
94  return await self.apiapi.get_legacy_printer()
95 
96 
98  """Job update coordinator."""
99 
100  async def _fetch_data(self) -> JobInfo:
101  """Fetch the printer data."""
102  return await self.apiapi.get_job()
103 
104 
106  """Info update coordinator."""
107 
108  async def _fetch_data(self) -> PrinterInfo:
109  """Fetch the printer data."""
110  return await self.apiapi.get_info()
dict[str, Any]|None get_info(HomeAssistant hass)
Definition: coordinator.py:69
def get_status(hass, host, port)
Definition: panel.py:387