Home Assistant Unofficial Reference 2024.12.1
helpers.py
Go to the documentation of this file.
1 """Helpers to deal with Cast devices."""
2 
from __future__ import annotations

import configparser
from dataclasses import dataclass
import logging
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import aiohttp
import attr
import pychromecast
from pychromecast import dial
from pychromecast.const import CAST_TYPE_GROUP
from pychromecast.models import CastInfo

from homeassistant.core import HomeAssistant
from homeassistant.helpers import aiohttp_client

from .const import DOMAIN

if TYPE_CHECKING:
    from uuid import UUID

    from homeassistant.components import zeroconf
25 
26 
27 _LOGGER = logging.getLogger(__name__)
28 
29 _PLS_SECTION_PLAYLIST = "playlist"
30 
31 
@attr.s(slots=True, frozen=True)
class ChromecastInfo:
    """Class to hold all data about a chromecast for creating connections.

    This also has the same attributes as the mDNS fields by zeroconf.
    """

    cast_info: CastInfo = attr.ib()
    # None means "not determined yet"; see fill_out_missing_chromecast_info.
    is_dynamic_group = attr.ib(type=bool | None, default=None)

    @property
    def friendly_name(self) -> str:
        """Return the Friendly Name."""
        return self.cast_info.friendly_name

    @property
    def is_audio_group(self) -> bool:
        """Return if the cast is an audio group."""
        return self.cast_info.cast_type == CAST_TYPE_GROUP

    @property
    def uuid(self) -> UUID:
        """Return the UUID."""
        return self.cast_info.uuid

    def fill_out_missing_chromecast_info(self, hass: HomeAssistant) -> ChromecastInfo:
        """Return a new ChromecastInfo object with missing attributes filled in.

        Uses blocking HTTP / HTTPS.

        Cast type and manufacturer are looked up once per model name and
        cached in ``hass.data[DOMAIN]["unknown_models"]``. For audio groups
        whose dynamic-group state is unknown, the multizone status is queried
        to determine it.
        """
        cast_info = self.cast_info
        if self.cast_info.cast_type is None or self.cast_info.manufacturer is None:
            unknown_models = hass.data[DOMAIN]["unknown_models"]
            if self.cast_info.model_name not in unknown_models:
                # Manufacturer and cast type is not available in mDNS data,
                # get it over HTTP
                cast_info = dial.get_cast_type(
                    cast_info,
                    zconf=ChromeCastZeroconf.get_zeroconf(),
                )
                # Cache the result so the same model is only fetched once.
                unknown_models[self.cast_info.model_name] = (
                    cast_info.cast_type,
                    cast_info.manufacturer,
                )

                report_issue = (
                    "create a bug report at "
                    "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue"
                    "+label%3A%22integration%3A+cast%22"
                )

                _LOGGER.debug(
                    (
                        "Fetched cast details for unknown model '%s' manufacturer:"
                        " '%s', type: '%s'. Please %s"
                    ),
                    cast_info.model_name,
                    cast_info.manufacturer,
                    cast_info.cast_type,
                    report_issue,
                )
            else:
                # Reuse the cached details for this model.
                cast_type, manufacturer = unknown_models[self.cast_info.model_name]
                cast_info = CastInfo(
                    cast_info.services,
                    cast_info.uuid,
                    cast_info.model_name,
                    cast_info.friendly_name,
                    cast_info.host,
                    cast_info.port,
                    cast_type,
                    manufacturer,
                )

        if not self.is_audio_group or self.is_dynamic_group is not None:
            # We have all information, no need to check HTTP API.
            # Carry over the already-known dynamic group state instead of
            # resetting it to None.
            return ChromecastInfo(
                cast_info=cast_info,
                is_dynamic_group=self.is_dynamic_group,
            )

        # Fill out missing group information via HTTP API.
        is_dynamic_group = False
        http_group_status = dial.get_multizone_status(
            None,
            services=self.cast_info.services,
            zconf=ChromeCastZeroconf.get_zeroconf(),
        )
        if http_group_status is not None:
            is_dynamic_group = any(
                g.uuid == self.cast_info.uuid for g in http_group_status.dynamic_groups
            )

        return ChromecastInfo(
            cast_info=cast_info,
            is_dynamic_group=is_dynamic_group,
        )
127 
128 
class ChromeCastZeroconf:
    """Class to hold a zeroconf instance."""

    # Shared across the whole process; None until set_zeroconf is called.
    __zconf: zeroconf.HaZeroconf | None = None

    @classmethod
    def set_zeroconf(cls, zconf: zeroconf.HaZeroconf) -> None:
        """Set zeroconf."""
        cls.__zconf = zconf

    @classmethod
    def get_zeroconf(cls) -> zeroconf.HaZeroconf | None:
        """Get zeroconf."""
        return cls.__zconf
143 
144 
class CastStatusListener(
    pychromecast.controllers.media.MediaStatusListener,
    pychromecast.controllers.multizone.MultiZoneManagerListener,
    pychromecast.controllers.receiver.CastStatusListener,
    pychromecast.socket_client.ConnectionStatusListener,
):
    """Helper class to handle pychromecast status callbacks.

    Necessary because a CastDevice entity or dynamic group can create a new
    socket client and therefore callbacks from multiple chromecast connections can
    potentially arrive. This class allows invalidating past chromecast objects.
    """

    def __init__(self, cast_device, chromecast, mz_mgr, mz_only=False):
        """Initialize the status listener.

        Registers this listener with the chromecast and the multizone manager.
        With mz_only set, only the multizone registration for audio groups is
        performed and no status listeners are attached.
        """
        self._cast_device = cast_device
        self._uuid = chromecast.uuid
        self._valid = True
        self._mz_mgr = mz_mgr

        if cast_device._cast_info.is_audio_group:  # noqa: SLF001
            self._mz_mgr.add_multizone(chromecast)
        if mz_only:
            return

        chromecast.register_status_listener(self)
        chromecast.socket_client.media_controller.register_status_listener(self)
        chromecast.register_connection_listener(self)
        if not cast_device._cast_info.is_audio_group:  # noqa: SLF001
            self._mz_mgr.register_listener(chromecast.uuid, self)

    def new_cast_status(self, status):
        """Handle reception of a new CastStatus."""
        if self._valid:
            self._cast_device.new_cast_status(status)

    def new_media_status(self, status):
        """Handle reception of a new MediaStatus."""
        if self._valid:
            self._cast_device.new_media_status(status)

    def load_media_failed(self, queue_item_id, error_code):
        """Handle a failed attempt to load media."""
        if self._valid:
            self._cast_device.load_media_failed(queue_item_id, error_code)

    def new_connection_status(self, status):
        """Handle reception of a new ConnectionStatus."""
        if self._valid:
            self._cast_device.new_connection_status(status)

    def added_to_multizone(self, group_uuid):
        """Handle the cast added to a group."""

    def removed_from_multizone(self, group_uuid):
        """Handle the cast removed from a group."""
        if self._valid:
            # Clear the group's media status on the device.
            self._cast_device.multizone_new_media_status(group_uuid, None)

    def multizone_new_cast_status(self, group_uuid, cast_status):
        """Handle reception of a new CastStatus for a group."""

    def multizone_new_media_status(self, group_uuid, media_status):
        """Handle reception of a new MediaStatus for a group."""
        if self._valid:
            self._cast_device.multizone_new_media_status(group_uuid, media_status)

    def invalidate(self):
        """Invalidate this status listener.

        All following callbacks won't be forwarded.
        """
        if self._cast_device._cast_info.is_audio_group:  # noqa: SLF001
            self._mz_mgr.remove_multizone(self._uuid)
        else:
            self._mz_mgr.deregister_listener(self._uuid, self)
        self._valid = False
222 
223 
class PlaylistError(Exception):
    """Exception wrapper for pls and m3u helpers."""
226 
227 
class PlaylistSupported(PlaylistError):
    """The playlist is supported by cast devices and should not be parsed."""
230 
231 
@dataclass
class PlaylistItem:
    """Playlist item."""

    length: str | None
    title: str | None
    url: str
239 
240 
241 def _is_url(url):
242  """Validate the URL can be parsed and at least has scheme + netloc."""
243  result = urlparse(url)
244  return all([result.scheme, result.netloc])
245 
246 
async def _fetch_playlist(hass, url, supported_content_types):
    """Fetch a playlist from the given url.

    Returns the decoded playlist text (at most 64 KiB is requested from the
    response body), decoded with the response charset or UTF-8 if none is
    given.

    Raises PlaylistSupported if the response content type is one of
    supported_content_types, and PlaylistError on timeout, client error or
    when the body cannot be decoded.
    """
    try:
        session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            charset = resp.charset or "utf-8"
            if resp.content_type in supported_content_types:
                raise PlaylistSupported
            try:
                # NOTE(review): a single read() may return fewer bytes than
                # are available in total - confirm short reads are acceptable.
                playlist_data = (await resp.content.read(64 * 1024)).decode(charset)
            except (LookupError, ValueError) as err:
                # LookupError: the server announced a charset Python does not
                # know; ValueError: the bytes are not valid in that charset.
                raise PlaylistError(f"Could not decode playlist {url}") from err
    except TimeoutError as err:
        raise PlaylistError(f"Timeout while fetching playlist {url}") from err
    except aiohttp.client_exceptions.ClientError as err:
        raise PlaylistError(f"Error while fetching playlist {url}") from err

    return playlist_data
265 
266 
async def parse_m3u(hass, url):
    """Very simple m3u parser.

    Returns a list of PlaylistItem, one per non-comment line of the playlist.

    Raises PlaylistSupported if the playlist is an HLS stream, and
    PlaylistError if an entry is not an absolute URL.

    Based on https://github.com/dvndrsn/M3uParser/blob/master/m3uparser.py
    """
    # From Mozilla gecko source: https://github.com/mozilla/gecko-dev/blob/c4c1adbae87bf2d128c39832d72498550ee1b4b8/dom/media/DecoderTraits.cpp#L47-L52
    hls_content_types = (
        # https://tools.ietf.org/html/draft-pantos-http-live-streaming-19#section-10
        "application/vnd.apple.mpegurl",
        # Additional informal types used by Mozilla gecko not included as they
        # don't reliably indicate HLS streams
    )
    m3u_data = await _fetch_playlist(hass, url, hls_content_types)
    m3u_lines = m3u_data.splitlines()

    playlist = []

    length = None
    title = None

    for line in m3u_lines:
        line = line.strip()
        if line.startswith("#EXTINF:"):
            # Get length and title from #EXTINF line
            info = line.split("#EXTINF:")[1].split(",", 1)
            if len(info) != 2:
                _LOGGER.warning("Ignoring invalid extinf %s in playlist %s", line, url)
                continue
            # The duration may be followed by space-separated attributes;
            # keep only the duration so length is a str like in parse_pls.
            length = info[0].split(" ", 1)[0]
            title = info[1].strip()
        elif line.startswith(("#EXT-X-VERSION:", "#EXT-X-STREAM-INF:")):
            # HLS stream, supported by cast devices
            raise PlaylistSupported("HLS")
        elif line.startswith("#"):
            # Ignore other extensions
            continue
        elif line:
            # Get song path from all other, non-blank lines
            if not _is_url(line):
                raise PlaylistError(f"Invalid item {line} in playlist {url}")
            playlist.append(PlaylistItem(length=length, title=title, url=line))
            # reset the song variables so it doesn't use the same EXTINF more than once
            length = None
            title = None

    return playlist
313 
314 
async def parse_pls(hass, url):
    """Very simple pls parser.

    Returns a list of PlaylistItem built from the File/Title/Length entries
    of the [playlist] section. Entries without a FileN option are skipped
    with a warning.

    Raises PlaylistError if the data is not a valid version 2 pls playlist
    or an entry is not an absolute URL.

    Based on https://github.com/mariob/plsparser/blob/master/src/plsparser.py
    """
    pls_data = await _fetch_playlist(hass, url, ())

    # Interpolation must be off: playlist URLs commonly contain "%"
    # (percent-encoding), which the default interpolation rejects.
    pls_parser = configparser.ConfigParser(interpolation=None)
    try:
        pls_parser.read_string(pls_data, url)
    except configparser.Error as err:
        raise PlaylistError(f"Can't parse playlist {url}") from err

    try:
        is_version_2 = (
            _PLS_SECTION_PLAYLIST in pls_parser
            and pls_parser[_PLS_SECTION_PLAYLIST].getint("Version") == 2
        )
    except ValueError:
        # Version is present but not an integer.
        is_version_2 = False
    if not is_version_2:
        raise PlaylistError(f"Invalid playlist {url}")

    try:
        num_entries = pls_parser.getint(_PLS_SECTION_PLAYLIST, "NumberOfEntries")
    except (configparser.NoOptionError, ValueError) as err:
        raise PlaylistError(f"Invalid NumberOfEntries in playlist {url}") from err

    playlist_section = pls_parser[_PLS_SECTION_PLAYLIST]

    playlist = []
    for entry in range(1, num_entries + 1):
        file_option = f"File{entry}"
        if file_option not in playlist_section:
            _LOGGER.warning("Missing %s in pls from %s", file_option, url)
            continue
        item_url = playlist_section[file_option]
        if not _is_url(item_url):
            raise PlaylistError(f"Invalid item {item_url} in playlist {url}")
        playlist.append(
            PlaylistItem(
                length=playlist_section.get(f"Length{entry}"),
                title=playlist_section.get(f"Title{entry}"),
                url=item_url,
            )
        )
    return playlist
358 
359 
async def parse_playlist(hass, url):
    """Parse an m3u or pls playlist.

    URLs whose path ends in .m3u or .m3u8 are parsed as m3u, everything else
    as pls.

    Raises PlaylistError if the resulting playlist is empty.
    """
    # Look at the path only, so a query string or fragment after the file
    # name, or an upper-case extension, does not hide the playlist type.
    try:
        path = urlparse(url).path
    except ValueError:
        path = url

    if path.lower().endswith((".m3u", ".m3u8")):
        playlist = await parse_m3u(hass, url)
    else:
        playlist = await parse_pls(hass, url)

    if not playlist:
        raise PlaylistError(f"Empty playlist {url}")

    return playlist
def multizone_new_media_status(self, group_uuid, media_status)
Definition: helpers.py:207
def load_media_failed(self, queue_item_id, error_code)
Definition: helpers.py:186
def multizone_new_cast_status(self, group_uuid, cast_status)
Definition: helpers.py:204
def __init__(self, cast_device, chromecast, mz_mgr, mz_only=False)
Definition: helpers.py:158
None set_zeroconf(cls, zeroconf.HaZeroconf zconf)
Definition: helpers.py:135
ChromecastInfo fill_out_missing_chromecast_info(self, HomeAssistant hass)
Definition: helpers.py:57
def _fetch_playlist(hass, url, supported_content_types)
Definition: helpers.py:247