Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Deluge integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from ssl import SSLError
7 from typing import TYPE_CHECKING, Any
8 
9 from deluge_client.client import DelugeRPCClient, FailedToReconnectException
10 
11 from homeassistant.const import Platform
12 from homeassistant.core import HomeAssistant
13 from homeassistant.exceptions import ConfigEntryAuthFailed
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
15 
16 from .const import LOGGER, DelugeGetSessionStatusKeys
17 
18 if TYPE_CHECKING:
19  from . import DelugeConfigEntry
20 
21 
23  DataUpdateCoordinator[dict[Platform, dict[str, Any]]]
24 ):
25  """Data update coordinator for the Deluge integration."""
26 
27  config_entry: DelugeConfigEntry
28 
29  def __init__(
30  self, hass: HomeAssistant, api: DelugeRPCClient, entry: DelugeConfigEntry
31  ) -> None:
32  """Initialize the coordinator."""
33  super().__init__(
34  hass=hass,
35  logger=LOGGER,
36  name=entry.title,
37  update_interval=timedelta(seconds=30),
38  )
39  self.apiapi = api
40  self.config_entryconfig_entryconfig_entry = entry
41 
42  async def _async_update_data(self) -> dict[Platform, dict[str, Any]]:
43  """Get the latest data from Deluge and updates the state."""
44  data = {}
45  try:
46  _data = await self.hasshass.async_add_executor_job(
47  self.apiapi.call,
48  "core.get_session_status",
49  [iter_member.value for iter_member in list(DelugeGetSessionStatusKeys)],
50  )
51  data[Platform.SENSOR] = {k.decode(): v for k, v in _data.items()}
52  data[Platform.SWITCH] = await self.hasshass.async_add_executor_job(
53  self.apiapi.call, "core.get_torrents_status", {}, ["paused"]
54  )
55  except (
56  ConnectionRefusedError,
57  TimeoutError,
58  SSLError,
59  FailedToReconnectException,
60  ) as ex:
61  raise UpdateFailed(f"Connection to Deluge Daemon Lost: {ex}") from ex
62  except Exception as ex:
63  if type(ex).__name__ == "BadLoginError":
65  "Credentials for Deluge client are not valid"
66  ) from ex
67  LOGGER.error("Unknown error connecting to Deluge: %s", ex)
68  raise
69  return data
None __init__(self, HomeAssistant hass, DelugeRPCClient api, DelugeConfigEntry entry)
Definition: coordinator.py:31