Home Assistant Unofficial Reference 2024.12.1
services.py
Go to the documentation of this file.
1 """Google Photos services."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 import mimetypes
7 from pathlib import Path
8 
9 from google_photos_library_api.exceptions import GooglePhotosApiError
10 from google_photos_library_api.model import NewMediaItem, SimpleMediaItem
11 import voluptuous as vol
12 
13 from homeassistant.const import CONF_FILENAME
14 from homeassistant.core import (
15  HomeAssistant,
16  ServiceCall,
17  ServiceResponse,
18  SupportsResponse,
19 )
20 from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
21 from homeassistant.helpers import config_validation as cv
22 
23 from .const import DOMAIN, UPLOAD_SCOPE
24 from .types import GooglePhotosConfigEntry
25 
# Keys read from the upload service call data.
CONF_CONFIG_ENTRY_ID = "config_entry_id"
CONF_ALBUM = "album"

# Name of the service and the schema used to validate its call data.
UPLOAD_SERVICE = "upload"
UPLOAD_SERVICE_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_CONFIG_ENTRY_ID): cv.string,
        # Passed through cv.ensure_list, then validated as a list of strings.
        vol.Required(CONF_FILENAME): vol.All(cv.ensure_list, [cv.string]),
        vol.Required(CONF_ALBUM): cv.string,
    }
)

# Upper bound, in bytes, on the size of a file accepted for upload (20 MiB).
CONTENT_SIZE_LIMIT = 20 * 1024**2
38 
39 
def _read_file_contents(
    hass: HomeAssistant, filenames: list[str]
) -> list[tuple[str, bytes]]:
    """Return the mime type and file contents for each file.

    Each file is validated before it is read: the path must be allowed by
    ``hass.config.is_allowed_path``, must exist, must not exceed
    ``CONTENT_SIZE_LIMIT`` bytes, and must have a guessed mime type that
    starts with ``image`` or ``video``.

    This function performs blocking file I/O; the upload handler invokes it
    through ``hass.async_add_executor_job``.

    Args:
        hass: Home Assistant instance, used for the allowed-path check.
        filenames: Paths of the files to read.

    Returns:
        One ``(mime_type, content)`` tuple per input file, in input order.

    Raises:
        HomeAssistantError: If any file fails one of the checks above. The
            first failing file aborts the whole call.
    """
    results: list[tuple[str, bytes]] = []
    for filename in filenames:
        if not hass.config.is_allowed_path(filename):
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="no_access_to_path",
                translation_placeholders={"filename": filename},
            )
        filename_path = Path(filename)
        if not filename_path.exists():
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="filename_does_not_exist",
                translation_placeholders={"filename": filename},
            )
        # Stat once so the size reported in the error is exactly the size
        # that was compared against the limit.
        file_size = filename_path.stat().st_size
        if file_size > CONTENT_SIZE_LIMIT:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="file_too_large",
                translation_placeholders={
                    "filename": filename,
                    "size": str(file_size),
                    "limit": str(CONTENT_SIZE_LIMIT),
                },
            )
        # The mime type is guessed from the file name only, not its content.
        mime_type, _ = mimetypes.guess_type(filename)
        if mime_type is None or not mime_type.startswith(("image", "video")):
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="filename_is_not_image",
                translation_placeholders={"filename": filename},
            )
        results.append((mime_type, filename_path.read_bytes()))
    return results
78 
79 
def async_register_services(hass: HomeAssistant) -> None:
    """Register Google Photos services.

    Registers the ``upload`` service under ``DOMAIN`` unless a service with
    that name is already registered.
    """

    async def async_handle_upload(call: ServiceCall) -> ServiceResponse:
        """Upload the requested files to Google Photos and add them to an album.

        Reads the files named in the call, gets or creates the target album,
        uploads the file contents, and creates media items from the uploads.

        Returns:
            When the caller requested a response, a dict with ``media_items``
            (one ``{"media_item_id": ...}`` entry per created item that has
            an id) and ``album_id``; otherwise ``None``.

        Raises:
            ServiceValidationError: If the config entry id is unknown.
            HomeAssistantError: If the entry lacks the upload scope, a file
                fails validation, or a Google Photos API call fails.
        """
        config_entry: GooglePhotosConfigEntry | None = (
            hass.config_entries.async_get_entry(call.data[CONF_CONFIG_ENTRY_ID])
        )
        if not config_entry:
            raise ServiceValidationError(
                translation_domain=DOMAIN,
                translation_key="integration_not_found",
                translation_placeholders={"target": DOMAIN},
            )
        # The token scope is stored as a single space-separated string.
        scopes = config_entry.data["token"]["scope"].split(" ")
        if UPLOAD_SCOPE not in scopes:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="missing_upload_permission",
                translation_placeholders={"target": DOMAIN},
            )
        coordinator = config_entry.runtime_data
        client_api = coordinator.client

        # File validation and reading is blocking I/O, so run it in the
        # executor rather than on the event loop.
        file_results = await hass.async_add_executor_job(
            _read_file_contents, hass, call.data[CONF_FILENAME]
        )

        album = call.data[CONF_ALBUM]
        try:
            album_id = await coordinator.get_or_create_album(album)
        except GooglePhotosApiError as err:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="create_album_error",
                translation_placeholders={"message": str(err)},
            ) from err

        # Start all uploads and wait for them together.
        upload_tasks = [
            client_api.upload_content(content, mime_type)
            for mime_type, content in file_results
        ]
        try:
            upload_results = await asyncio.gather(*upload_tasks)
        except GooglePhotosApiError as err:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="upload_error",
                translation_placeholders={"message": str(err)},
            ) from err

        # Turn the upload tokens into media items inside the album.
        try:
            create_result = await client_api.create_media_items(
                [
                    NewMediaItem(
                        SimpleMediaItem(upload_token=upload_result.upload_token)
                    )
                    for upload_result in upload_results
                ],
                album_id=album_id,
            )
        except GooglePhotosApiError as err:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="api_error",
                translation_placeholders={"message": str(err)},
            ) from err

        if not call.return_response:
            return None
        return {
            # One dict per created item. A dict comprehension here would
            # collapse every item onto the single "media_item_id" key and
            # keep only the last id.
            "media_items": [
                {"media_item_id": item_result.media_item.id}
                for item_result in create_result.new_media_item_results
                if item_result.media_item and item_result.media_item.id
            ],
            "album_id": album_id,
        }

    if not hass.services.has_service(DOMAIN, UPLOAD_SERVICE):
        hass.services.async_register(
            DOMAIN,
            UPLOAD_SERVICE,
            async_handle_upload,
            schema=UPLOAD_SERVICE_SCHEMA,
            supports_response=SupportsResponse.OPTIONAL,
        )
None async_register_services(HomeAssistant hass)
Definition: services.py:80
list[tuple[str, bytes]] _read_file_contents(HomeAssistant hass, list[str] filenames)
Definition: services.py:42