1 """Data update coordinator for the Jellyfin integration."""
3 from __future__ import annotations
5 from datetime import timedelta
8 from jellyfin_apiclient_python import JellyfinClient
14 from .const import CONF_CLIENT_DEVICE_ID, DOMAIN, LOGGER, USER_APP_NAME
18 """Data update coordinator for the Jellyfin integration."""
20 config_entry: ConfigEntry
25 api_client: JellyfinClient,
26 system_info: dict[str, Any],
29 """Initialize the coordinator."""
37 self.server_id: str = system_info["Id"]
38 self.server_name: str = system_info["Name"]
39 self.server_version: str | None = system_info.get("Version")
40 self.client_device_id: str = self.config_entry.data[CONF_CLIENT_DEVICE_ID]
41 self.user_id: str = user_id
43 self.session_ids: set[str] = set()
44 self.remote_session_ids: set[str] = set()
48 """Get the latest data from Jellyfin."""
49 sessions = await self.hass.async_add_executor_job(
53 sessions_by_id: dict[str, dict[str, Any]] = {
54 session["Id"]: session
55 for session in sessions
56 if session["DeviceId"] != self.client_device_id
57 and session["Client"] != USER_APP_NAME
60 self.device_ids = {session["DeviceId"] for session in sessions_by_id.values()}
None __init__(self, HomeAssistant hass, JellyfinClient api_client, dict[str, Any] system_info, str user_id)
dict[str, dict[str, Any]] _async_update_data(self)