Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for transmission integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 import transmission_rpc
9 from transmission_rpc.session import SessionStats
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import CONF_HOST
13 from homeassistant.core import HomeAssistant
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
15 
16 from .const import (
17  CONF_LIMIT,
18  CONF_ORDER,
19  DEFAULT_LIMIT,
20  DEFAULT_ORDER,
21  DEFAULT_SCAN_INTERVAL,
22  DOMAIN,
23  EVENT_DOWNLOADED_TORRENT,
24  EVENT_REMOVED_TORRENT,
25  EVENT_STARTED_TORRENT,
26 )
27 
28 _LOGGER = logging.getLogger(__name__)
29 
30 
# NOTE(review): the ``class`` statement (listing line 31) was missing from the
# extracted source. The name and base class below are reconstructed from the
# imports and the ``_async_update_data`` return type -- confirm against the
# rest of the integration before merging.
class TransmissionDataUpdateCoordinator(DataUpdateCoordinator[SessionStats]):
    """Transmission dataupdate coordinator class.

    Refreshes session statistics, the torrent list and the session settings
    from a ``transmission_rpc.Client`` every ``DEFAULT_SCAN_INTERVAL`` seconds,
    and fires events on the Home Assistant bus when a torrent starts
    downloading, starts seeding, or disappears from the client.
    """

    config_entry: ConfigEntry

    def __init__(
        self, hass: HomeAssistant, entry: ConfigEntry, api: transmission_rpc.Client
    ) -> None:
        """Initialize the Transmission RPC API.

        Args:
            hass: The Home Assistant instance, forwarded to the base class.
            entry: Config entry; ``entry.data[CONF_HOST]`` names the client and
                ``entry.options`` supplies the ``limit`` / ``order`` values.
            api: Client used for every RPC call made by this coordinator.
        """
        self.config_entry = entry
        self.api = api
        # ``host`` must be set before ``super().__init__`` because it is
        # interpolated into the coordinator name below.
        self.host = entry.data[CONF_HOST]
        # Session settings from the last successful update; None until then.
        self._session: transmission_rpc.Session | None = None
        # Snapshots from the previous update, compared against the current
        # torrent list to decide which events to fire.
        self._all_torrents: list[transmission_rpc.Torrent] = []
        self._completed_torrents: list[transmission_rpc.Torrent] = []
        self._started_torrents: list[transmission_rpc.Torrent] = []
        # Torrent list as returned by the most recent ``get_torrents`` call.
        self.torrents: list[transmission_rpc.Torrent] = []
        super().__init__(
            hass,
            name=f"{DOMAIN} - {self.host}",
            logger=_LOGGER,
            update_interval=timedelta(seconds=DEFAULT_SCAN_INTERVAL),
        )

    @property
    def limit(self) -> int:
        """Return the ``CONF_LIMIT`` option, or ``DEFAULT_LIMIT`` if unset."""
        return self.config_entry.options.get(CONF_LIMIT, DEFAULT_LIMIT)

    @property
    def order(self) -> str:
        """Return the ``CONF_ORDER`` option, or ``DEFAULT_ORDER`` if unset."""
        return self.config_entry.options.get(CONF_ORDER, DEFAULT_ORDER)

    async def _async_update_data(self) -> SessionStats:
        """Update transmission data.

        The RPC client is called synchronously, so the work is delegated to
        ``update`` via ``async_add_executor_job``.
        """
        return await self.hass.async_add_executor_job(self.update)

    def update(self) -> SessionStats:
        """Get the latest data from Transmission instance.

        Returns:
            The session statistics reported by the client.

        Raises:
            UpdateFailed: If any of the RPC calls raises
                ``transmission_rpc.TransmissionError``.
        """
        try:
            data = self.api.session_stats()
            self.torrents = self.api.get_torrents()
            self._session = self.api.get_session()
        except transmission_rpc.TransmissionError as err:
            raise UpdateFailed("Unable to connect to Transmission client") from err

        # Compare the fresh torrent list with the previous snapshots and fire
        # the matching events; each check also refreshes its own snapshot.
        self.check_completed_torrent()
        self.check_started_torrent()
        self.check_removed_torrent()

        return data

    def init_torrent_list(self) -> None:
        """Initialize torrent lists.

        Fetches the current torrents and seeds the completed ("seeding") and
        started ("downloading") snapshots from them without firing any event.
        """
        self.torrents = self.api.get_torrents()
        self._completed_torrents = [
            torrent for torrent in self.torrents if torrent.status == "seeding"
        ]
        self._started_torrents = [
            torrent for torrent in self.torrents if torrent.status == "downloading"
        ]

    def check_completed_torrent(self) -> None:
        """Fire ``EVENT_DOWNLOADED_TORRENT`` for torrents that began seeding.

        A torrent counts as newly completed when its status is "seeding" and
        its id was not in the previous completed snapshot. The snapshot is
        then replaced with the current seeding torrents.
        """
        old_completed_torrents = {torrent.id for torrent in self._completed_torrents}

        current_completed_torrents = [
            torrent for torrent in self.torrents if torrent.status == "seeding"
        ]

        for torrent in current_completed_torrents:
            if torrent.id not in old_completed_torrents:
                self.hass.bus.fire(
                    EVENT_DOWNLOADED_TORRENT, {"name": torrent.name, "id": torrent.id}
                )

        self._completed_torrents = current_completed_torrents

    def check_started_torrent(self) -> None:
        """Fire ``EVENT_STARTED_TORRENT`` for torrents that began downloading.

        A torrent counts as newly started when its status is "downloading" and
        its id was not in the previous started snapshot. The snapshot is then
        replaced with the current downloading torrents.
        """
        old_started_torrents = {torrent.id for torrent in self._started_torrents}

        current_started_torrents = [
            torrent for torrent in self.torrents if torrent.status == "downloading"
        ]

        for torrent in current_started_torrents:
            if torrent.id not in old_started_torrents:
                self.hass.bus.fire(
                    EVENT_STARTED_TORRENT, {"name": torrent.name, "id": torrent.id}
                )

        self._started_torrents = current_started_torrents

    def check_removed_torrent(self) -> None:
        """Fire ``EVENT_REMOVED_TORRENT`` for torrents no longer present.

        Every torrent from the previous full snapshot whose id is absent from
        the current list produces one event. The snapshot is then replaced
        with a copy of the current list.
        """
        current_torrents = {torrent.id for torrent in self.torrents}

        for torrent in self._all_torrents:
            if torrent.id not in current_torrents:
                self.hass.bus.fire(
                    EVENT_REMOVED_TORRENT, {"name": torrent.name, "id": torrent.id}
                )

        # Copy so that later reassignment or mutation of ``self.torrents``
        # cannot alias the snapshot.
        self._all_torrents = self.torrents.copy()

    def start_torrents(self) -> None:
        """Start all torrents; does nothing when no torrents are known."""
        if not self.torrents:
            return
        self.api.start_all()

    def stop_torrents(self) -> None:
        """Stop all known torrents; does nothing when no torrents are known."""
        if not self.torrents:
            return
        torrent_ids: list[int | str] = [torrent.id for torrent in self.torrents]
        self.api.stop_torrent(torrent_ids)

    def set_alt_speed_enabled(self, is_enabled: bool) -> None:
        """Set the alternative speed flag on the client session."""
        self.api.set_session(alt_speed_enabled=is_enabled)

    def get_alt_speed_enabled(self) -> bool | None:
        """Get the alternative speed flag.

        Returns:
            The flag from the session fetched by the last successful update,
            or None when no session has been fetched yet.
        """
        if self._session is None:
            return None

        return self._session.alt_speed_enabled
None __init__(self, HomeAssistant hass, ConfigEntry entry, transmission_rpc.Client api)
Definition: coordinator.py:38