Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Lidarr integration."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 from datetime import timedelta
7 from typing import Generic, TypeVar, cast
8 
9 from aiopyarr import LidarrAlbum, LidarrQueue, LidarrRootFolder, exceptions
10 from aiopyarr.lidarr_client import LidarrClient
11 from aiopyarr.models.host_configuration import PyArrHostConfiguration
12 
13 from homeassistant.config_entries import ConfigEntry
14 from homeassistant.core import HomeAssistant
15 from homeassistant.exceptions import ConfigEntryAuthFailed
16 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
17 
18 from .const import DEFAULT_MAX_RECORDS, DOMAIN, LOGGER
19 
20 T = TypeVar("T", bound=list[LidarrRootFolder] | LidarrQueue | str | LidarrAlbum | int)
21 
22 
24  """Data update coordinator for the Lidarr integration."""
25 
26  config_entry: ConfigEntry
27 
def __init__(
    self,
    hass: HomeAssistant,
    host_configuration: PyArrHostConfiguration,
    api_client: LidarrClient,
) -> None:
    """Set up the coordinator and remember the Lidarr client objects.

    The base coordinator is initialised with the integration logger, the
    integration domain as its name, and a 30-second update interval.

    Args:
        hass: Home Assistant instance forwarded to the base coordinator.
        host_configuration: Host configuration stored on the coordinator.
        api_client: Lidarr API client stored on the coordinator.
    """
    refresh_interval = timedelta(seconds=30)
    super().__init__(
        hass=hass,
        logger=LOGGER,
        name=DOMAIN,
        update_interval=refresh_interval,
    )
    # NOTE(review): these attribute names look doubled (possibly an artifact
    # of how this listing was produced). The fetch methods in this file read
    # `self.api_clientapi_client`, so the spelling is kept unchanged here.
    self.host_configurationhost_configuration = host_configuration
    self.api_clientapi_client = api_client
43 
async def _async_update_data(self) -> T:
    """Get the latest data from Lidarr.

    Delegates the actual request to ``_fetch_data`` and translates the
    client library's exceptions into the ones the coordinator raises.

    Returns:
        Whatever ``_fetch_data`` returns.

    Raises:
        UpdateFailed: If the client raises ``ArrConnectionException``.
        ConfigEntryAuthFailed: If the client raises
            ``ArrAuthenticationException``.
    """
    try:
        # Call the fetch hook defined on this class; the previous name
        # `_fetch_data_fetch_data` does not exist anywhere in the file.
        return await self._fetch_data()
    except exceptions.ArrConnectionException as ex:
        raise UpdateFailed(ex) from ex
    except exceptions.ArrAuthenticationException as ex:
        # The handler previously had no `raise ...(` opener, leaving only a
        # dangling message and `) from ex`.
        raise ConfigEntryAuthFailed(
            "API Key is no longer valid. Please reauthenticate"
        ) from ex
55 
@abstractmethod
async def _fetch_data(self) -> T:
    """Fetch the data for this coordinator.

    Abstract hook awaited by ``_async_update_data``; this base version
    only raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
60 
61 
63  LidarrDataUpdateCoordinator[list[LidarrRootFolder]]
64 ):
65  """Disk space update coordinator for Lidarr."""
66 
async def _fetch_data(self) -> list[LidarrRootFolder]:
    """Return the root folders reported by the API client.

    The awaited result is only re-typed with ``cast``; it is not
    converted or validated at runtime.
    """
    root_folders = await self.api_clientapi_client.async_get_root_folders()
    return cast(list[LidarrRootFolder], root_folders)
72 
73 
75  """Queue update coordinator."""
76 
async def _fetch_data(self) -> LidarrQueue:
    """Return the queue from the API client.

    The request passes ``DEFAULT_MAX_RECORDS`` as the page size.
    """
    queue = await self.api_clientapi_client.async_get_queue(
        page_size=DEFAULT_MAX_RECORDS
    )
    return queue
80 
81 
83  """Status update coordinator for Lidarr."""
84 
async def _fetch_data(self) -> str:
    """Return the ``version`` attribute of the system status response."""
    system_status = await self.api_clientapi_client.async_get_system_status()
    return system_status.version
88 
89 
91  """Wanted update coordinator."""
92 
async def _fetch_data(self) -> LidarrAlbum:
    """Return the wanted-albums response from the API client.

    The request passes ``DEFAULT_MAX_RECORDS`` as the page size; the
    awaited result is only re-typed with ``cast``.
    """
    wanted = await self.api_clientapi_client.async_get_wanted(
        page_size=DEFAULT_MAX_RECORDS
    )
    return cast(LidarrAlbum, wanted)
99 
100 
102  """Albums update coordinator."""
103 
async def _fetch_data(self) -> int:
    """Return how many albums the API client reports.

    The awaited result is typed as a list of albums via ``cast`` and its
    length is returned.
    """
    albums = cast(
        list[LidarrAlbum], await self.api_clientapi_client.async_get_albums()
    )
    return len(albums)
None __init__(self, HomeAssistant hass, PyArrHostConfiguration host_configuration, LidarrClient api_client)
Definition: coordinator.py:33