1 """Google Photos services."""
3 from __future__
import annotations
7 from pathlib
import Path
9 from google_photos_library_api.exceptions
import GooglePhotosApiError
10 from google_photos_library_api.model
import NewMediaItem, SimpleMediaItem
11 import voluptuous
as vol
23 from .const
import DOMAIN, UPLOAD_SCOPE
24 from .types
import GooglePhotosConfigEntry
# Key in the service call data holding the id of the config entry to use.
CONF_CONFIG_ENTRY_ID = "config_entry_id"
# Name under which the upload service is registered for this domain.
UPLOAD_SERVICE = "upload"

# Validation schema for upload service calls. All three fields are required;
# the filename field is passed through cv.ensure_list and then validated as a
# list of strings.
UPLOAD_SERVICE_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_CONFIG_ENTRY_ID): cv.string,
        vol.Required(CONF_FILENAME): vol.All(cv.ensure_list, [cv.string]),
        vol.Required(CONF_ALBUM): cv.string,
    }
)

# Largest file size, in bytes, that is accepted for upload (20 * 1024 * 1024).
CONTENT_SIZE_LIMIT = 20 * 1024 * 1024
def _read_file_contents(
    hass: HomeAssistant, filenames: list[str]
) -> list[tuple[str, bytes]]:
    """Return the mime types and file contents for each file.

    Every filename is validated in order before it is read. It must:

    * be accepted by ``hass.config.is_allowed_path``,
    * exist on disk,
    * be no larger than ``CONTENT_SIZE_LIMIT`` bytes, and
    * have a guessed mime type that starts with ``image`` or ``video``.

    The first failing check raises a translated error and no further files
    are processed. This function performs blocking file I/O.

    Returns one ``(mime_type, content)`` tuple per filename, in input order.
    """
    # NOTE(review): the `raise ...(` openers and the first placeholder of the
    # "file_too_large" error were not visible in the reviewed excerpt.
    # HomeAssistantError and the "filename" placeholder are assumed from the
    # translation_* keyword arguments - confirm against the full file.
    results: list[tuple[str, bytes]] = []
    for filename in filenames:
        if not hass.config.is_allowed_path(filename):
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="no_access_to_path",
                translation_placeholders={"filename": filename},
            )

        filename_path = Path(filename)
        if not filename_path.exists():
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="filename_does_not_exist",
                translation_placeholders={"filename": filename},
            )

        # Stat once so the size that is checked is the size that is reported.
        size = filename_path.stat().st_size
        if size > CONTENT_SIZE_LIMIT:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="file_too_large",
                translation_placeholders={
                    "filename": filename,
                    "size": str(size),
                    "limit": str(CONTENT_SIZE_LIMIT),
                },
            )

        # The type is guessed from the file name only; content is not sniffed.
        mime_type, _ = mimetypes.guess_type(filename)
        if mime_type is None or not mime_type.startswith(("image", "video")):
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="filename_is_not_image",
                translation_placeholders={"filename": filename},
            )

        results.append((mime_type, filename_path.read_bytes()))
    return results
def async_register_services(hass: HomeAssistant) -> None:
    """Register Google Photos services."""

    # NOTE(review): several lines of this function were not visible in the
    # reviewed excerpt. The exception classes, the `from err` chaining, the
    # NewMediaItem wrapper, the `album_id=` argument, the "media_items"
    # response key and the positional async_register arguments are
    # reconstructed - confirm against the full file.

    async def async_handle_upload(call: ServiceCall) -> ServiceResponse:
        """Upload the requested files and add them to the requested album.

        Looks up the config entry named in the call, checks that its token
        carries the upload scope, reads the files in the executor, resolves
        the album through the coordinator, uploads every file and finally
        creates the media items in that album.

        Returns a dict with the created media item ids and the album id when
        the caller asked for a response, otherwise ``None``. Library errors
        (``GooglePhotosApiError``) are re-raised as translated errors.
        """
        config_entry: GooglePhotosConfigEntry | None = (
            hass.config_entries.async_get_entry(call.data[CONF_CONFIG_ENTRY_ID])
        )
        if not config_entry:
            raise ServiceValidationError(
                translation_domain=DOMAIN,
                translation_key="integration_not_found",
                translation_placeholders={"target": DOMAIN},
            )

        # The token stores its granted scopes as one space-separated string.
        scopes = config_entry.data["token"]["scope"].split(" ")
        if UPLOAD_SCOPE not in scopes:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="missing_upload_permission",
                translation_placeholders={"target": DOMAIN},
            )

        coordinator = config_entry.runtime_data
        client_api = coordinator.client

        # File validation and reading is blocking, so run it in the executor.
        file_results = await hass.async_add_executor_job(
            _read_file_contents, hass, call.data[CONF_FILENAME]
        )

        album = call.data[CONF_ALBUM]
        try:
            album_id = await coordinator.get_or_create_album(album)
        except GooglePhotosApiError as err:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="create_album_error",
                translation_placeholders={"message": str(err)},
            ) from err

        upload_tasks = [
            client_api.upload_content(content, mime_type)
            for mime_type, content in file_results
        ]
        try:
            upload_results = await asyncio.gather(*upload_tasks)
        except GooglePhotosApiError as err:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="upload_error",
                translation_placeholders={"message": str(err)},
            ) from err

        try:
            create_result = await client_api.create_media_items(
                [
                    NewMediaItem(SimpleMediaItem(upload_token=uploaded.upload_token))
                    for uploaded in upload_results
                ],
                album_id=album_id,
            )
        except GooglePhotosApiError as err:
            raise HomeAssistantError(
                translation_domain=DOMAIN,
                translation_key="api_error",
                translation_placeholders={"message": str(err)},
            ) from err

        if call.return_response:
            return {
                # One dict per created item. A dict comprehension with the
                # constant key "media_item_id" would keep only the last id.
                "media_items": [
                    {"media_item_id": item_result.media_item.id}
                    for item_result in create_result.new_media_item_results
                    if item_result.media_item and item_result.media_item.id
                ],
                "album_id": album_id,
            }
        return None

    # Only register once, even when this function is called repeatedly.
    if not hass.services.has_service(DOMAIN, UPLOAD_SERVICE):
        hass.services.async_register(
            DOMAIN,
            UPLOAD_SERVICE,
            async_handle_upload,
            schema=UPLOAD_SERVICE_SCHEMA,
            supports_response=SupportsResponse.OPTIONAL,
        )
None async_register_services(HomeAssistant hass)
list[tuple[str, bytes]] _read_file_contents(HomeAssistant hass, list[str] filenames)