Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Sonarr integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from typing import TypeVar, cast
7 
8 from aiopyarr import (
9  Command,
10  Diskspace,
11  SonarrCalendar,
12  SonarrQueue,
13  SonarrSeries,
14  SonarrWantedMissing,
15  SystemStatus,
16  exceptions,
17 )
18 from aiopyarr.models.host_configuration import PyArrHostConfiguration
19 from aiopyarr.sonarr_client import SonarrClient
20 
21 from homeassistant.config_entries import ConfigEntry
22 from homeassistant.core import HomeAssistant
23 from homeassistant.exceptions import ConfigEntryAuthFailed
24 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
25 import homeassistant.util.dt as dt_util
26 
27 from .const import CONF_UPCOMING_DAYS, CONF_WANTED_MAX_ITEMS, DOMAIN, LOGGER
28 
29 SonarrDataT = TypeVar(
30  "SonarrDataT",
31  bound=(
32  list[SonarrCalendar]
33  | list[Command]
34  | list[Diskspace]
35  | SonarrQueue
36  | list[SonarrSeries]
37  | SystemStatus
38  | SonarrWantedMissing
39  ),
40 )
41 
42 
44  """Data update coordinator for the Sonarr integration."""
45 
46  config_entry: ConfigEntry
47 
48  def __init__(
49  self,
50  hass: HomeAssistant,
51  host_configuration: PyArrHostConfiguration,
52  api_client: SonarrClient,
53  ) -> None:
54  """Initialize the coordinator."""
55  super().__init__(
56  hass=hass,
57  logger=LOGGER,
58  name=DOMAIN,
59  update_interval=timedelta(seconds=30),
60  )
61  self.api_clientapi_client = api_client
62  self.host_configurationhost_configuration = host_configuration
63  self.system_version: str | None = None
64 
65  async def _async_update_data(self) -> SonarrDataT:
66  """Get the latest data from Sonarr."""
67  try:
68  return await self._fetch_data_fetch_data()
69 
70  except exceptions.ArrConnectionException as ex:
71  raise UpdateFailed(ex) from ex
72  except exceptions.ArrAuthenticationException as ex:
74  "API Key is no longer valid. Please reauthenticate"
75  ) from ex
76 
77  async def _fetch_data(self) -> SonarrDataT:
78  """Fetch the actual data."""
79  raise NotImplementedError
80 
81 
83  """Calendar update coordinator."""
84 
85  async def _fetch_data(self) -> list[SonarrCalendar]:
86  """Fetch the movies data."""
87  local = dt_util.start_of_local_day().replace(microsecond=0)
88  start = dt_util.as_utc(local)
89  end = start + timedelta(days=self.config_entryconfig_entry.options[CONF_UPCOMING_DAYS])
90  return cast(
91  list[SonarrCalendar],
92  await self.api_clientapi_client.async_get_calendar(
93  start_date=start, end_date=end, include_series=True
94  ),
95  )
96 
97 
99  """Commands update coordinator for Sonarr."""
100 
101  async def _fetch_data(self) -> list[Command]:
102  """Fetch the data."""
103  return cast(list[Command], await self.api_clientapi_client.async_get_commands())
104 
105 
107  """Disk space update coordinator for Sonarr."""
108 
109  async def _fetch_data(self) -> list[Diskspace]:
110  """Fetch the data."""
111  return await self.api_clientapi_client.async_get_diskspace()
112 
113 
115  """Queue update coordinator."""
116 
117  async def _fetch_data(self) -> SonarrQueue:
118  """Fetch the data."""
119  return await self.api_clientapi_client.async_get_queue(
120  include_series=True, include_episode=True
121  )
122 
123 
125  """Series update coordinator."""
126 
127  async def _fetch_data(self) -> list[SonarrSeries]:
128  """Fetch the data."""
129  return cast(list[SonarrSeries], await self.api_clientapi_client.async_get_series())
130 
131 
133  """Status update coordinator for Sonarr."""
134 
135  async def _fetch_data(self) -> SystemStatus:
136  """Fetch the data."""
137  return await self.api_clientapi_client.async_get_system_status()
138 
139 
141  """Wanted update coordinator."""
142 
143  async def _fetch_data(self) -> SonarrWantedMissing:
144  """Fetch the data."""
145  return await self.api_clientapi_client.async_get_wanted(
146  page_size=self.config_entryconfig_entry.options[CONF_WANTED_MAX_ITEMS],
147  include_series=True,
148  )
None __init__(self, HomeAssistant hass, PyArrHostConfiguration host_configuration, SonarrClient api_client)
Definition: coordinator.py:53