Home Assistant Unofficial Reference 2024.12.1
media_source.py
Go to the documentation of this file.
1 """Netatmo Media Source Implementation."""
2 
3 from __future__ import annotations
4 
5 import datetime as dt
6 import logging
7 import re
8 
9 from homeassistant.components.media_player import BrowseError, MediaClass, MediaType
11  BrowseMediaSource,
12  MediaSource,
13  MediaSourceError,
14  MediaSourceItem,
15  PlayMedia,
16  Unresolvable,
17 )
18 from homeassistant.core import HomeAssistant, callback
19 
20 from .const import DATA_CAMERAS, DATA_EVENTS, DOMAIN, MANUFACTURER
21 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# MIME type handed to PlayMedia for every resolved recording URL
# (the M3U/HLS playlist type).
MIME_TYPE = "application/x-mpegURL"
24 
25 
class IncompatibleMediaSource(MediaSourceError):
    """Incompatible media source attributes.

    Raised while building a browse item that can neither be played nor
    expanded; caught when listing a camera's events so that such events are
    skipped.
    """
28 
29 
async def async_get_media_source(hass: HomeAssistant) -> NetatmoSource:
    """Set up Netatmo media source.

    Returns a new ``NetatmoSource`` bound to the given ``hass`` instance.
    """
    return NetatmoSource(hass)
33 
34 
class NetatmoSource(MediaSource):
    """Provide Netatmo camera recordings as media sources."""

    name: str = MANUFACTURER

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize Netatmo source.

        Keeps a reference to ``hass`` and to the event store found at
        ``hass.data[DOMAIN][DATA_EVENTS]``.
        """
        super().__init__(DOMAIN)
        self.hass = hass
        # Accessed below as events[camera_id][event_id] -> event dict.
        self.events = self.hass.data[DOMAIN][DATA_EVENTS]

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve media to a url.

        Raises:
            Unresolvable: if the identifier cannot be parsed, or if it does
                not point at a stored event carrying a ``media_url``.
        """
        _, camera_id, event_id = async_parse_identifier(item)
        try:
            url = self.events[camera_id][event_id]["media_url"]
        except KeyError as err:
            # Unknown camera/event, or an event without a recording: report it
            # the same way the identifier parser reports bad identifiers
            # instead of leaking a bare KeyError.
            raise Unresolvable(
                f"No media found for camera {camera_id!r}, event {event_id!r}."
            ) from err
        return PlayMedia(url, MIME_TYPE)

    async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
        """Return media.

        Raises:
            BrowseError: if the identifier cannot be parsed, or if the camera
                or event it names is not in the event store.
        """
        try:
            source, camera_id, event_id = async_parse_identifier(item)
        except Unresolvable as err:
            raise BrowseError(str(err)) from err

        return self._browse_media(source, camera_id, event_id)

    def _browse_media(
        self, source: str, camera_id: str, event_id: int | None
    ) -> BrowseMediaSource:
        """Browse media.

        Validates that the requested camera and event exist before building
        the response. An empty ``camera_id`` means the root listing and is
        not validated.

        Raises:
            BrowseError: if the camera or the event is unknown.
        """
        if camera_id and camera_id not in self.events:
            raise BrowseError("Camera does not exist.")

        if event_id and event_id not in self.events[camera_id]:
            raise BrowseError("Event does not exist.")

        return self._build_item_response(source, camera_id, event_id)

    def _build_item_response(
        self, source: str, camera_id: str, event_id: int | None = None
    ) -> BrowseMediaSource:
        """Build the browse item for the root, a camera, or a single event.

        Without an event id the item is an expandable directory whose children
        are built recursively: all cameras when ``camera_id`` is empty,
        otherwise the events of that camera. With an event id the item is a
        video, playable only if the event has a ``media_url``.

        Raises:
            IncompatibleMediaSource: if the resulting item can neither be
                played nor expanded.
        """
        if event_id and event_id in self.events[camera_id]:
            event = self.events[camera_id][event_id]
            # fromtimestamp() without a tz yields a naive local-time datetime.
            created = dt.datetime.fromtimestamp(event["event_time"])
            # Tolerate a present-but-empty "snapshot" entry as well as a
            # missing one.
            thumbnail = (event.get("snapshot") or {}).get("url")
            message = remove_html_tags(event.get("message", ""))
            title = f"{created} - {message}"
        else:
            title = self.hass.data[DOMAIN][DATA_CAMERAS].get(camera_id, MANUFACTURER)
            thumbnail = None

        if event_id:
            path = f"{source}/{camera_id}/{event_id}"
        else:
            path = f"{source}/{camera_id}"

        media_class = MediaClass.DIRECTORY if event_id is None else MediaClass.VIDEO

        media = BrowseMediaSource(
            domain=DOMAIN,
            identifier=path,
            media_class=media_class,
            media_content_type=MediaType.VIDEO,
            title=title,
            can_play=bool(
                event_id and self.events[camera_id][event_id].get("media_url")
            ),
            can_expand=event_id is None,
            thumbnail=thumbnail,
        )

        if not media.can_play and not media.can_expand:
            _LOGGER.debug(
                "Camera %s with event %s without media url found", camera_id, event_id
            )
            raise IncompatibleMediaSource

        if not media.can_expand:
            return media

        media.children = []
        # Append first level children
        if not camera_id:
            # Root listing: one child per known camera.
            for cid in self.events:
                child = self._build_item_response(source, cid)
                if child:
                    media.children.append(child)
        else:
            # Camera listing: one child per event, skipping events that have
            # no playable media.
            for eid in self.events[camera_id]:
                try:
                    child = self._build_item_response(source, camera_id, eid)
                except IncompatibleMediaSource:
                    continue
                if child:
                    media.children.append(child)

        return media
135 
136 
# Shortest "<...>" run. DOTALL lets "." cross newlines so that a tag broken
# over several lines is removed as well.
_HTML_TAG_RE = re.compile(r"<.*?>", re.DOTALL)


def remove_html_tags(text: str) -> str:
    """Remove html tags from string.

    Every ``<...>`` span is deleted, including spans that contain line
    breaks; all other text is returned unchanged.
    """
    return _HTML_TAG_RE.sub("", text)
141 
142 
@callback
def async_parse_identifier(
    item: MediaSourceItem,
) -> tuple[str, str, int | None]:
    """Parse identifier.

    Splits ``item.identifier`` of the form ``events[/<camera_id>[/<event_id>]]``
    into ``(source, camera_id, event_id)``. An empty identifier, or one
    without any ``/`` separator, yields the root ``("events", "", None)``.

    Raises:
        Unresolvable: if the first path segment is not ``events`` or the
            event id is not an integer.
    """
    # Strip leading slashes BEFORE looking for a separator; otherwise an
    # identifier such as "/events" passes the check and then fails to unpack.
    identifier = (item.identifier or "").lstrip("/")
    if "/" not in identifier:
        return "events", "", None

    source, path = identifier.split("/", 1)

    if source != "events":
        raise Unresolvable("Unknown source directory.")

    camera_id, separator, raw_event_id = path.partition("/")
    if not separator:
        return source, camera_id, None

    try:
        return source, camera_id, int(raw_event_id)
    except ValueError as err:
        # Callers only handle Unresolvable, so do not leak a bare ValueError.
        raise Unresolvable("Invalid event id.") from err
BrowseMediaSource _browse_media(self, str source, str camera_id, int|None event_id)
Definition: media_source.py:63
BrowseMediaSource async_browse_media(self, MediaSourceItem item)
Definition: media_source.py:52
PlayMedia async_resolve_media(self, MediaSourceItem item)
Definition: media_source.py:46
BrowseMediaSource _build_item_response(self, str source, str camera_id, int|None event_id=None)
Definition: media_source.py:75
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
tuple[str, str, int|None] async_parse_identifier(MediaSourceItem item)
NetatmoSource async_get_media_source(HomeAssistant hass)
Definition: media_source.py:30