Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Jellyfin integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from typing import Any
7 
8 from jellyfin_apiclient_python import JellyfinClient
9 
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.core import HomeAssistant
12 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
13 
14 from .const import CONF_CLIENT_DEVICE_ID, DOMAIN, LOGGER, USER_APP_NAME
15 
16 
class JellyfinDataUpdateCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
    """Data update coordinator for the Jellyfin integration.

    Fetches the server's session list on a 10 second interval and exposes
    it as a mapping of session id to the raw session dictionary, leaving
    out sessions that belong to this integration's own client.
    """

    config_entry: ConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        api_client: JellyfinClient,
        system_info: dict[str, Any],
        user_id: str,
    ) -> None:
        """Initialize the coordinator.

        Args:
            hass: The Home Assistant instance.
            api_client: Jellyfin client used to query the server.
            system_info: Server system information; must contain the "Id"
                and "Name" keys, and may contain "Version".
            user_id: Id of the Jellyfin user this coordinator acts for.

        Raises:
            KeyError: If ``system_info`` lacks "Id" or "Name", or the config
                entry data lacks ``CONF_CLIENT_DEVICE_ID``.
        """
        super().__init__(
            hass=hass,
            logger=LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=10),
        )
        self.api_client = api_client
        self.server_id: str = system_info["Id"]
        self.server_name: str = system_info["Name"]
        # "Version" is read with .get(), so it may legitimately be absent.
        self.server_version: str | None = system_info.get("Version")
        # NOTE(review): relies on the base class having populated
        # ``self.config_entry`` during super().__init__() - confirm.
        self.client_device_id: str = self.config_entry.data[CONF_CLIENT_DEVICE_ID]
        self.user_id: str = user_id

        # Bookkeeping sets; this class only initialises the first two,
        # ``device_ids`` is refreshed on every update.
        self.session_ids: set[str] = set()
        self.remote_session_ids: set[str] = set()
        self.device_ids: set[str] = set()

    async def _async_update_data(self) -> dict[str, dict[str, Any]]:
        """Get the latest data from Jellyfin.

        Returns:
            Mapping of session "Id" to the session dictionary for every
            session that neither uses this integration's client device id
            nor reports ``USER_APP_NAME`` as its client.
        """
        # The sessions call is handed to the executor rather than being
        # called directly from the coroutine.
        sessions = await self.hass.async_add_executor_job(
            self.api_client.jellyfin.sessions
        )

        sessions_by_id: dict[str, dict[str, Any]] = {
            session["Id"]: session
            for session in sessions
            if session["DeviceId"] != self.client_device_id
            and session["Client"] != USER_APP_NAME
        }

        # Track the device ids of the sessions that survived the filter.
        self.device_ids = {session["DeviceId"] for session in sessions_by_id.values()}

        return sessions_by_id
None __init__(self, HomeAssistant hass, JellyfinClient api_client, dict[str, Any] system_info, str user_id)
Definition: coordinator.py:28