1 """Data update coordinator for the Deluge integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
6 from ssl
import SSLError
7 from typing
import TYPE_CHECKING, Any
9 from deluge_client.client
import DelugeRPCClient, FailedToReconnectException
16 from .const
import LOGGER, DelugeGetSessionStatusKeys
19 from .
import DelugeConfigEntry
23 DataUpdateCoordinator[dict[Platform, dict[str, Any]]]
25 """Data update coordinator for the Deluge integration."""
27 config_entry: DelugeConfigEntry
30 self, hass: HomeAssistant, api: DelugeRPCClient, entry: DelugeConfigEntry
32 """Initialize the coordinator."""
43 """Get the latest data from Deluge and updates the state."""
46 _data = await self.
hasshass.async_add_executor_job(
48 "core.get_session_status",
49 [iter_member.value
for iter_member
in list(DelugeGetSessionStatusKeys)],
51 data[Platform.SENSOR] = {k.decode(): v
for k, v
in _data.items()}
52 data[Platform.SWITCH] = await self.
hasshass.async_add_executor_job(
53 self.
apiapi.call,
"core.get_torrents_status", {}, [
"paused"]
56 ConnectionRefusedError,
59 FailedToReconnectException,
61 raise UpdateFailed(f
"Connection to Deluge Daemon Lost: {ex}")
from ex
62 except Exception
as ex:
63 if type(ex).__name__ ==
"BadLoginError":
65 "Credentials for Deluge client are not valid"
67 LOGGER.error(
"Unknown error connecting to Deluge: %s", ex)
dict[Platform, dict[str, Any]] _async_update_data(self)
None __init__(self, HomeAssistant hass, DelugeRPCClient api, DelugeConfigEntry entry)