1 """Minio helper methods."""
3 from __future__
import annotations
5 from collections.abc
import Iterable
8 from queue
import Queue
12 from typing
import Self
13 from urllib.parse
import unquote
15 from minio
import Minio
16 from urllib3.exceptions
import HTTPError
18 _LOGGER = logging.getLogger(__name__)
20 _METADATA_RE = re.compile(
"x-amz-meta-(.*)", re.IGNORECASE)
24 """Normalize object metadata by stripping the prefix."""
26 for meta_key, meta_value
in metadata.items():
27 if not (match := _METADATA_RE.match(meta_key)):
30 new_metadata[match.group(1).lower()] = meta_value
36 endpoint: str, access_key: str, secret_key: str, secure: bool
38 """Create Minio client."""
40 endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure
45 minio_client, bucket_name: str, prefix: str, suffix: str, events: list[str]
47 """Start listening to minio events. Copied from minio-py."""
48 query = {
"prefix": prefix,
"suffix": suffix,
"events": events}
49 return minio_client._url_open(
50 "GET", bucket_name=bucket_name, query=query, preload_content=
False
55 """Iterator wrapper over notification http response stream."""
67 """Get next not empty line."""
69 line = next(self.
_stream_stream)
71 event = json.loads(line.decode(
"utf-8"))
72 if event[
"Records"]
is not None:
76 """Close the response."""
81 """Thread wrapper around minio notification blocking stream."""
95 """Copy over all Minio client options."""
110 """Start the thread."""
114 """Stop and join the thread."""
118 """Create MinioClient and run the loop."""
119 _LOGGER.debug(
"Running MinioEventThread")
128 _LOGGER.debug(
"Connecting to minio event stream")
142 except json.JSONDecodeError:
145 except HTTPError
as error:
146 _LOGGER.error(
"Failed to connect to Minio endpoint: %s", error)
150 except AttributeError:
156 for event
in event_stream_it:
160 presigned_url = minio_client.presigned_get_object(bucket, key)
163 except Exception
as error:
164 _LOGGER.error(
"Failed to generate presigned url: %s", error)
167 "event_name": event_name,
170 "presigned_url": presigned_url,
171 "metadata": metadata,
173 _LOGGER.debug(
"Queue entry, %s", queue_entry)
174 self.
_queue_queue.put(queue_entry)
177 """Cancel event stream and join the thread."""
178 _LOGGER.debug(
"Stopping event thread")
184 _LOGGER.debug(
"Joining event thread")
186 _LOGGER.debug(
"Event thread joined")
190 """Iterate over file records of notification event.
192 Most of the time it should still be only one record.
194 records = event.get(
"Records", [])
196 for record
in records:
197 event_name = record.get(
"eventName")
198 bucket = record.get(
"s3", {}).
get(
"bucket", {}).
get(
"name")
199 key = record.get(
"s3", {}).
get(
"object", {}).
get(
"key")
201 record.get(
"s3", {}).
get(
"object", {}).
get(
"userMetadata", {})
204 if not bucket
or not key:
205 _LOGGER.warning(
"Invalid bucket and/or key, %s, %s", bucket, key)
210 yield event_name, bucket, key, metadata
def __init__(self, response)
def _iterate_event_stream(self, event_stream_it, minio_client)
def __exit__(self, exc_type, exc_val, exc_tb)
None __init__(self, Queue queue, str endpoint, str access_key, str secret_key, bool secure, str bucket_name, str prefix, str suffix, list[str] events)
web.Response get(self, web.Request request, str config_key)
def get_minio_notification_response(minio_client, str bucket_name, str prefix, str suffix, list[str] events)
Minio create_minio_client(str endpoint, str access_key, str secret_key, bool secure)
def iterate_objects(event)
dict normalize_metadata(dict metadata)