Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for the YouTube integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from typing import Any
7 
8 from youtubeaio.helper import first
9 from youtubeaio.types import UnauthorizedError, YouTubeBackendError
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import ATTR_ICON, ATTR_ID
13 from homeassistant.core import HomeAssistant
14 from homeassistant.exceptions import ConfigEntryAuthFailed
15 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
16 
17 from . import AsyncConfigEntryAuth
18 from .const import (
19  ATTR_DESCRIPTION,
20  ATTR_LATEST_VIDEO,
21  ATTR_PUBLISHED_AT,
22  ATTR_SUBSCRIBER_COUNT,
23  ATTR_THUMBNAIL,
24  ATTR_TITLE,
25  ATTR_TOTAL_VIEWS,
26  ATTR_VIDEO_ID,
27  CONF_CHANNELS,
28  DOMAIN,
29  LOGGER,
30 )
31 
32 
34  """A YouTube Data Update Coordinator."""
35 
36  config_entry: ConfigEntry
37 
38  def __init__(self, hass: HomeAssistant, auth: AsyncConfigEntryAuth) -> None:
39  """Initialize the YouTube data coordinator."""
40  self._auth_auth = auth
41  super().__init__(
42  hass,
43  LOGGER,
44  name=DOMAIN,
45  update_interval=timedelta(minutes=15),
46  )
47 
48  async def _async_update_data(self) -> dict[str, Any]:
49  youtube = await self._auth_auth.get_resource()
50  res = {}
51  channel_ids = self.config_entryconfig_entry.options[CONF_CHANNELS]
52  try:
53  async for channel in youtube.get_channels(channel_ids):
54  video = await first(
55  youtube.get_playlist_items(channel.upload_playlist_id, 1)
56  )
57  latest_video = None
58  if video:
59  latest_video = {
60  ATTR_PUBLISHED_AT: video.snippet.added_at,
61  ATTR_TITLE: video.snippet.title,
62  ATTR_DESCRIPTION: video.snippet.description,
63  ATTR_THUMBNAIL: video.snippet.thumbnails.get_highest_quality().url,
64  ATTR_VIDEO_ID: video.content_details.video_id,
65  }
66  res[channel.channel_id] = {
67  ATTR_ID: channel.channel_id,
68  ATTR_TITLE: channel.snippet.title,
69  ATTR_ICON: channel.snippet.thumbnails.get_highest_quality().url,
70  ATTR_LATEST_VIDEO: latest_video,
71  ATTR_SUBSCRIBER_COUNT: channel.statistics.subscriber_count,
72  ATTR_TOTAL_VIEWS: channel.statistics.view_count,
73  }
74  except UnauthorizedError as err:
75  raise ConfigEntryAuthFailed from err
76  except YouTubeBackendError as err:
77  raise UpdateFailed("Couldn't connect to YouTube") from err
78  return res
None __init__(self, HomeAssistant hass, AsyncConfigEntryAuth auth)
Definition: coordinator.py:38
str get_resource(str domain_name, ConfigType domain_data)
Definition: helpers.py:83