1 """The QBittorrent coordinator."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from qbittorrentapi
import (
13 SyncMainDataDictionary,
16 from qbittorrentapi.torrents
import TorrentStatusesT
22 from .const
import DOMAIN
24 _LOGGER = logging.getLogger(__name__)
28 """Coordinator for updating QBittorrent data."""
30 def __init__(self, hass: HomeAssistant, client: Client) ->
None:
31 """Initialize coordinator."""
35 self.total_torrents: dict[str, int] = {}
36 self.active_torrents: dict[str, int] = {}
37 self.inactive_torrents: dict[str, int] = {}
38 self.paused_torrents: dict[str, int] = {}
39 self.seeding_torrents: dict[str, int] = {}
40 self.started_torrents: dict[str, int] = {}
51 data = await self.
hasshass.async_add_executor_job(self.
clientclient.sync_maindata)
53 await self.
hasshass.async_add_executor_job(
54 self.
clientclient.transfer_speed_limits_mode
58 except (LoginFailed, Forbidden403Error)
as exc:
60 translation_domain=DOMAIN, translation_key=
"login_error"
62 except APIConnectionError
as exc:
64 translation_domain=DOMAIN, translation_key=
"cannot_connect"
69 """Set the alternative speed mode."""
70 self.
clientclient.transfer_toggle_speed_limits_mode(is_enabled)
73 """Toggle the alternative speed mode."""
74 self.
clientclient.transfer_toggle_speed_limits_mode()
77 """Get the alternative speed mode."""
80 async
def get_torrents(self, torrent_filter: TorrentStatusesT) -> TorrentInfoList:
81 """Async method to get QBittorrent torrents."""
83 torrents = await self.
hasshass.async_add_executor_job(
84 lambda: self.
clientclient.torrents_info(torrent_filter)
86 except (LoginFailed, Forbidden403Error)
as exc:
88 translation_domain=DOMAIN, translation_key=
"login_error"
90 except APIConnectionError
as exc:
92 translation_domain=DOMAIN, translation_key=
"cannot_connect"
None toggle_alt_speed_enabled(self)
None set_alt_speed_enabled(self, bool is_enabled)
_is_alternative_mode_enabled
None __init__(self, HomeAssistant hass, Client client)
TorrentInfoList get_torrents(self, TorrentStatusesT torrent_filter)
SyncMainDataDictionary _async_update_data(self)
bool get_alt_speed_enabled(self)