1 """Provides the NZBGet DataUpdateCoordinator."""
4 from collections.abc import Mapping
5 from datetime import timedelta
9 from pynzbgetapi import NZBGetAPI, NZBGetAPIException
22 from .const import DOMAIN
24 _LOGGER = logging.getLogger(__name__)
28 """Class to manage fetching NZBGet data."""
34 config: Mapping[str, Any],
36 """Initialize global NZBGet data updater."""
39 config.get(CONF_USERNAME),
40 config.get(CONF_PASSWORD),
42 config[CONF_VERIFY_SSL],
50 hass, _LOGGER, name=DOMAIN, update_interval=timedelta(seconds=5)
54 """Check history for newly completed downloads."""
55 actual_completed_downloads = {
56 (x["Name"], x["Category"], x["Status"]) for x in history
60 tmp_completed_downloads = list(
64 for download in tmp_completed_downloads:
65 self.hass.bus.fire(
66 "nzbget_download_complete",
69 "category": download[1],
70 "status": download[2],
78 """Fetch data from NZBGet."""
81 """Fetch data from NZBGet via sync functions."""
82 status = self.nzbget.status()
83 history = self.nzbget.history()
93 async with asyncio.timeout(4):
94 return await self.hass.async_add_executor_job(_update_data)
95 except NZBGetAPIException as error:
96 raise UpdateFailed(f"Invalid response from API: {error}") from error
None __init__(self, HomeAssistant hass, *Mapping[str, Any] config)
dict _async_update_data(self)
_completed_downloads_init
def _check_completed_downloads(self, history)
_ItemT _update_data(self, _ItemT item, dict update_data)