1 """Data update coordinator for the Radarr integration."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
7 from dataclasses
import dataclass
8 from datetime
import date, datetime, timedelta
9 from typing
import TYPE_CHECKING, Generic, TypeVar, cast
11 from aiopyarr
import (
19 from aiopyarr.models.host_configuration
import PyArrHostConfiguration
20 from aiopyarr.radarr_client
import RadarrClient
27 from .const
import DEFAULT_MAX_RECORDS, DOMAIN, LOGGER
30 from .
import RadarrConfigEntry
32 T = TypeVar(
"T", bound=SystemStatus | list[RootFolder] | list[Health] | int |
None)
37 """Mixin for Radarr calendar event."""
44 """A class to describe a Radarr calendar event."""
48 """Data update coordinator for the Radarr integration."""
50 config_entry: RadarrConfigEntry
56 host_configuration: PyArrHostConfiguration,
57 api_client: RadarrClient,
59 """Initialize the coordinator."""
70 """Get the latest data from Radarr."""
74 except exceptions.ArrConnectionException
as ex:
76 except exceptions.ArrAuthenticationException
as ex:
78 "API Key is no longer valid. Please reauthenticate"
83 """Fetch the actual data."""
84 raise NotImplementedError
88 """Status update coordinator for Radarr."""
92 return await self.
api_clientapi_client.async_get_system_status()
96 """Disk space update coordinator for Radarr."""
100 root_folders = await self.
api_clientapi_client.async_get_root_folders()
101 if isinstance(root_folders, RootFolder):
102 return [root_folders]
107 """Health update coordinator."""
110 """Fetch the health data."""
111 health = await self.
api_clientapi_client.async_get_failed_health_checks()
112 if isinstance(health, Health):
118 """Movies update coordinator."""
121 """Fetch the movies data."""
122 return len(cast(list[RadarrMovie], await self.
api_clientapi_client.async_get_movies()))
126 """Queue update coordinator."""
129 """Fetch the movies in queue."""
131 await self.
api_clientapi_client.async_get_queue(page_size=DEFAULT_MAX_RECORDS)
136 """Calendar update coordinator."""
143 host_configuration: PyArrHostConfiguration,
144 api_client: RadarrClient,
147 super().
__init__(hass, host_configuration, api_client)
148 self.
eventevent: RadarrEvent |
None =
None
149 self.
_events_events: list[RadarrEvent] = []
152 """Fetch the calendar."""
154 _date = datetime.today()
155 while self.
eventevent
is None:
157 for event
in self.
_events_events:
158 if event.start >= _date.date():
159 self.
eventevent = event
162 if (_date - datetime.today()).days > 45:
167 self, start_date: datetime, end_date: datetime
168 ) -> list[RadarrEvent]:
169 """Get cached events and request missing dates."""
176 _days = (end_date - start_date).days
177 await asyncio.gather(
180 for d
in ((start_date +
timedelta(days=x)).
date()
for x
in range(_days))
181 if d
not in (event.start
for event
in self.
_events_events)
187 """Return events from specified date."""
190 for evt
in await self.
api_clientapi_client.async_get_calendar(
191 start_date=_date, end_date=_date +
timedelta(days=1)
193 if evt.title
not in (e.summary
for e
in self.
_events_events)
198 """Return a RadarrEvent from an API event."""
199 _date, _type = event.releaseDateType()
204 description=event.overview.replace(
":",
";"),
None __init__(self, HomeAssistant hass, PyArrHostConfiguration host_configuration, RadarrClient api_client)
list[RadarrEvent] async_get_events(self, datetime start_date, datetime end_date)
None _async_get_events(self, date _date)
list[RootFolder] _fetch_data(self)
list[Health] _fetch_data(self)
None __init__(self, HomeAssistant hass, PyArrHostConfiguration host_configuration, RadarrClient api_client)
T _async_update_data(self)
SystemStatus _fetch_data(self)
RadarrEvent _get_calendar_event(RadarrCalendarItem event)