1 """A platform which allows you to get information from Tautulli."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from dataclasses
import dataclass
7 from typing
import cast
9 from pytautulli
import (
10 PyTautulliApiActivity,
11 PyTautulliApiHomeStats,
19 SensorEntityDescription,
29 from .
import TautulliConfigEntry
30 from .const
import ATTR_TOP_USER, DOMAIN
31 from .coordinator
import TautulliDataUpdateCoordinator
32 from .entity
import TautulliEntity
def get_top_stats(
    home_stats: PyTautulliApiHomeStats, activity: PyTautulliApiActivity, key: str
) -> str | None:
    """Get top statistics.

    Walk ``home_stats`` and pick the headline value of the entry that
    corresponds to ``key``.

    Args:
        home_stats: Iterable of home-statistics entries; each entry is read
            for its ``stat_id`` and ``rows`` attributes.
        activity: Not used by this function; accepted so it matches the
            three-argument ``value_fn`` callable shape
            ``(home_stats, activity, key)``.
        key: Identifier of the statistic to look up.

    Returns:
        ``rows[0].title`` of the entry whose ``stat_id`` equals ``key``, or
        ``rows[0].user`` of the ``"top_users"`` entry when ``key`` equals
        ``ATTR_TOP_USER``. ``None`` when no entry matches or the matching
        entry has no rows.
    """
    value = None
    for stat in home_stats:
        # An entry with no rows has nothing to report, so both branches
        # require ``stat.rows`` to be non-empty before indexing row 0.
        if stat.rows and stat.stat_id == key:
            value = stat.rows[0].title
        elif stat.rows and stat.stat_id == "top_users" and key == ATTR_TOP_USER:
            # The top-users entry exposes the user name rather than a title.
            value = stat.rows[0].user
    # The loop never exits early: if several entries match, the last wins.
    return value
48 @dataclass(frozen=True, kw_only=True)
50 """Describes a Tautulli sensor."""
52 value_fn: Callable[[PyTautulliApiHomeStats, PyTautulliApiActivity, str], StateType]
55 SENSOR_TYPES: tuple[TautulliSensorEntityDescription, ...] = (
58 translation_key=
"watching_count",
59 native_unit_of_measurement=
"Watching",
60 value_fn=
lambda home_stats, activity, _: cast(int, activity.stream_count),
63 key=
"stream_count_direct_play",
64 translation_key=
"stream_count_direct_play",
65 entity_category=EntityCategory.DIAGNOSTIC,
66 native_unit_of_measurement=
"Streams",
67 entity_registry_enabled_default=
False,
68 value_fn=
lambda home_stats, activity, _: cast(
69 int, activity.stream_count_direct_play
73 key=
"stream_count_direct_stream",
74 translation_key=
"stream_count_direct_stream",
75 entity_category=EntityCategory.DIAGNOSTIC,
76 native_unit_of_measurement=
"Streams",
77 entity_registry_enabled_default=
False,
78 value_fn=
lambda home_stats, activity, _: cast(
79 int, activity.stream_count_direct_stream
83 key=
"stream_count_transcode",
84 translation_key=
"stream_count_transcode",
85 entity_category=EntityCategory.DIAGNOSTIC,
86 native_unit_of_measurement=
"Streams",
87 entity_registry_enabled_default=
False,
88 value_fn=
lambda home_stats, activity, _: cast(
89 int, activity.stream_count_transcode
93 key=
"total_bandwidth",
94 translation_key=
"total_bandwidth",
95 entity_category=EntityCategory.DIAGNOSTIC,
96 native_unit_of_measurement=UnitOfInformation.KILOBITS,
97 device_class=SensorDeviceClass.DATA_SIZE,
98 state_class=SensorStateClass.MEASUREMENT,
99 value_fn=
lambda home_stats, activity, _: cast(int, activity.total_bandwidth),
103 translation_key=
"lan_bandwidth",
104 entity_category=EntityCategory.DIAGNOSTIC,
105 native_unit_of_measurement=UnitOfInformation.KILOBITS,
106 device_class=SensorDeviceClass.DATA_SIZE,
107 entity_registry_enabled_default=
False,
108 state_class=SensorStateClass.MEASUREMENT,
109 value_fn=
lambda home_stats, activity, _: cast(int, activity.lan_bandwidth),
113 translation_key=
"wan_bandwidth",
114 entity_category=EntityCategory.DIAGNOSTIC,
115 native_unit_of_measurement=UnitOfInformation.KILOBITS,
116 device_class=SensorDeviceClass.DATA_SIZE,
117 entity_registry_enabled_default=
False,
118 state_class=SensorStateClass.MEASUREMENT,
119 value_fn=
lambda home_stats, activity, _: cast(int, activity.wan_bandwidth),
123 translation_key=
"top_movies",
124 entity_registry_enabled_default=
False,
125 value_fn=get_top_stats,
129 translation_key=
"top_tv",
130 entity_registry_enabled_default=
False,
131 value_fn=get_top_stats,
135 translation_key=
"top_user",
136 entity_registry_enabled_default=
False,
137 value_fn=get_top_stats,
142 @dataclass(frozen=True, kw_only=True)
144 """Describes a Tautulli session sensor."""
146 value_fn: Callable[[PyTautulliApiSession], StateType]
149 SESSION_SENSOR_TYPES: tuple[TautulliSessionSensorEntityDescription, ...] = (
152 translation_key=
"state",
153 value_fn=
lambda session: cast(str, session.state),
157 translation_key=
"full_title",
158 entity_registry_enabled_default=
False,
159 value_fn=
lambda session: cast(str, session.full_title),
163 translation_key=
"progress",
164 native_unit_of_measurement=PERCENTAGE,
165 entity_registry_enabled_default=
False,
166 value_fn=
lambda session: cast(str, session.progress_percent),
169 key=
"stream_resolution",
170 translation_key=
"stream_resolution",
171 entity_category=EntityCategory.DIAGNOSTIC,
172 entity_registry_enabled_default=
False,
173 value_fn=
lambda session: cast(str, session.stream_video_resolution),
176 key=
"transcode_decision",
177 translation_key=
"transcode_decision",
178 entity_category=EntityCategory.DIAGNOSTIC,
179 entity_registry_enabled_default=
False,
180 value_fn=
lambda session: cast(str, session.transcode_decision),
184 translation_key=
"session_thumb",
185 entity_category=EntityCategory.DIAGNOSTIC,
186 entity_registry_enabled_default=
False,
187 value_fn=
lambda session: cast(str, session.user_thumb),
190 key=
"video_resolution",
191 translation_key=
"video_resolution",
192 entity_category=EntityCategory.DIAGNOSTIC,
193 entity_registry_enabled_default=
False,
194 value_fn=
lambda session: cast(str, session.video_resolution),
async def async_setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    async_add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Create the Tautulli sensor.

    No entities are added here. The platform configuration is handed to a
    config flow started with ``SOURCE_IMPORT`` as its source, and that flow
    is scheduled as a task on ``hass``.

    Args:
        hass: Home Assistant instance used to schedule the task and to
            reach ``config_entries.flow``.
        config: Platform configuration, passed unchanged as the flow's
            ``data``.
        async_add_entities: Unused; part of the platform-setup signature.
        discovery_info: Unused; part of the platform-setup signature.
    """
    hass.async_create_task(
        hass.config_entries.flow.async_init(
            DOMAIN, context={"source": SOURCE_IMPORT}, data=config
        )
    )
215 entry: TautulliConfigEntry,
216 async_add_entities: AddEntitiesCallback,
218 """Set up Tautulli sensor."""
219 data = entry.runtime_data
220 entities: list[TautulliSensor | TautulliSessionSensor] = [
225 for description
in SENSOR_TYPES
234 for description
in SESSION_SENSOR_TYPES
235 for user
in data.users
236 if user.username !=
"Local"
242 """Representation of a Tautulli sensor."""
244 entity_description: TautulliSensorEntityDescription
248 """Return the state of the sensor."""
250 self.coordinator.home_stats,
251 self.coordinator.activity,
257 """Representation of a Tautulli session sensor."""
259 entity_description: TautulliSessionSensorEntityDescription
263 coordinator: TautulliDataUpdateCoordinator,
264 description: EntityDescription,
265 user: PyTautulliApiUser,
267 """Initialize the Tautulli entity."""
268 super().
__init__(coordinator, description, user)
269 entry_id = coordinator.config_entry.entry_id
274 """Return the state of the sensor."""
275 if self.coordinator.activity:
276 for session
in self.coordinator.activity.sessions:
277 if self.
useruser
and session.user_id == self.
useruser.user_id:
StateType native_value(self)
StateType native_value(self)
None __init__(self, TautulliDataUpdateCoordinator coordinator, EntityDescription description, PyTautulliApiUser user)
None async_setup_entry(HomeAssistant hass, TautulliConfigEntry entry, AddEntitiesCallback async_add_entities)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
str|None get_top_stats(PyTautulliApiHomeStats home_stats, PyTautulliApiActivity activity, str key)