1 """Support for monitoring the Deluge BitTorrent client API."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from dataclasses
import dataclass
12 SensorEntityDescription,
20 from .
import DelugeConfigEntry
21 from .const
import DelugeGetSessionStatusKeys, DelugeSensorType
22 from .coordinator
import DelugeDataUpdateCoordinator
23 from .entity
import DelugeEntity
26 def get_state(data: dict[str, float], key: str) -> str | float:
27 """Get current download/upload state."""
28 upload = data[DelugeGetSessionStatusKeys.UPLOAD_RATE.value]
29 download = data[DelugeGetSessionStatusKeys.DOWNLOAD_RATE.value]
30 protocol_upload = data[DelugeGetSessionStatusKeys.DHT_UPLOAD_RATE.value]
31 protocol_download = data[DelugeGetSessionStatusKeys.DHT_DOWNLOAD_RATE.value]
34 if key == DelugeSensorType.CURRENT_STATUS_SENSOR:
35 if upload > 0
and download > 0:
36 return "seeding_and_downloading"
37 if upload > 0
and download == 0:
39 if upload == 0
and download > 0:
45 if key == DelugeSensorType.DOWNLOAD_SPEED_SENSOR:
47 elif key == DelugeSensorType.UPLOAD_SPEED_SENSOR:
49 elif key == DelugeSensorType.PROTOCOL_TRAFFIC_DOWNLOAD_SPEED_SENSOR:
50 rate = protocol_download
52 rate = protocol_upload
56 return round(kb_spd, 2
if kb_spd < 0.1
else 1)
59 @dataclass(frozen=True)
61 """Class to describe a Deluge sensor."""
63 value: Callable[[dict[str, float]], Any] =
lambda val: val
66 SENSOR_TYPES: tuple[DelugeSensorEntityDescription, ...] = (
68 key=DelugeSensorType.CURRENT_STATUS_SENSOR.value,
69 translation_key=
"status",
71 data, DelugeSensorType.CURRENT_STATUS_SENSOR.value
73 device_class=SensorDeviceClass.ENUM,
74 options=[
"seeding_and_downloading",
"seeding",
"downloading",
"idle"],
77 key=DelugeSensorType.DOWNLOAD_SPEED_SENSOR.value,
78 translation_key=DelugeSensorType.DOWNLOAD_SPEED_SENSOR.value,
79 device_class=SensorDeviceClass.DATA_RATE,
80 native_unit_of_measurement=UnitOfDataRate.KILOBYTES_PER_SECOND,
81 state_class=SensorStateClass.MEASUREMENT,
83 data, DelugeSensorType.DOWNLOAD_SPEED_SENSOR.value
87 key=DelugeSensorType.UPLOAD_SPEED_SENSOR.value,
88 translation_key=DelugeSensorType.UPLOAD_SPEED_SENSOR.value,
89 device_class=SensorDeviceClass.DATA_RATE,
90 native_unit_of_measurement=UnitOfDataRate.KILOBYTES_PER_SECOND,
91 state_class=SensorStateClass.MEASUREMENT,
92 value=
lambda data:
get_state(data, DelugeSensorType.UPLOAD_SPEED_SENSOR.value),
95 key=DelugeSensorType.PROTOCOL_TRAFFIC_UPLOAD_SPEED_SENSOR.value,
96 translation_key=DelugeSensorType.PROTOCOL_TRAFFIC_UPLOAD_SPEED_SENSOR.value,
97 device_class=SensorDeviceClass.DATA_RATE,
98 native_unit_of_measurement=UnitOfDataRate.KILOBYTES_PER_SECOND,
99 state_class=SensorStateClass.MEASUREMENT,
101 data, DelugeSensorType.PROTOCOL_TRAFFIC_UPLOAD_SPEED_SENSOR.value
105 key=DelugeSensorType.PROTOCOL_TRAFFIC_DOWNLOAD_SPEED_SENSOR.value,
106 translation_key=DelugeSensorType.PROTOCOL_TRAFFIC_DOWNLOAD_SPEED_SENSOR.value,
107 device_class=SensorDeviceClass.DATA_RATE,
108 native_unit_of_measurement=UnitOfDataRate.KILOBYTES_PER_SECOND,
109 state_class=SensorStateClass.MEASUREMENT,
111 data, DelugeSensorType.PROTOCOL_TRAFFIC_DOWNLOAD_SPEED_SENSOR.value
119 entry: DelugeConfigEntry,
120 async_add_entities: AddEntitiesCallback,
122 """Set up the Deluge sensor."""
124 DelugeSensor(entry.runtime_data, description)
for description
in SENSOR_TYPES
129 """Representation of a Deluge sensor."""
131 entity_description: DelugeSensorEntityDescription
135 coordinator: DelugeDataUpdateCoordinator,
136 description: DelugeSensorEntityDescription,
138 """Initialize the sensor."""
141 self.
_attr_unique_id_attr_unique_id = f
"{coordinator.config_entry.entry_id}_{description.key}"
145 """Return the state of the sensor."""
146 return self.
entity_descriptionentity_description.value(self.coordinator.data[Platform.SENSOR])
StateType native_value(self)
None __init__(self, DelugeDataUpdateCoordinator coordinator, DelugeSensorEntityDescription description)
str|float get_state(dict[str, float] data, str key)
None async_setup_entry(HomeAssistant hass, DelugeConfigEntry entry, AddEntitiesCallback async_add_entities)