Home Assistant Unofficial Reference 2024.12.1
minio_helper.py
Go to the documentation of this file.
1 """Minio helper methods."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Iterable
6 import json
7 import logging
8 from queue import Queue
9 import re
10 import threading
11 import time
12 from typing import Self
13 from urllib.parse import unquote
14 
15 from minio import Minio
16 from urllib3.exceptions import HTTPError
17 
18 _LOGGER = logging.getLogger(__name__)
19 
20 _METADATA_RE = re.compile("x-amz-meta-(.*)", re.IGNORECASE)
21 
22 
23 def normalize_metadata(metadata: dict) -> dict:
24  """Normalize object metadata by stripping the prefix."""
25  new_metadata = {}
26  for meta_key, meta_value in metadata.items():
27  if not (match := _METADATA_RE.match(meta_key)):
28  continue
29 
30  new_metadata[match.group(1).lower()] = meta_value
31 
32  return new_metadata
33 
34 
36  endpoint: str, access_key: str, secret_key: str, secure: bool
37 ) -> Minio:
38  """Create Minio client."""
39  return Minio(
40  endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=secure
41  )
42 
43 
45  minio_client, bucket_name: str, prefix: str, suffix: str, events: list[str]
46 ):
47  """Start listening to minio events. Copied from minio-py."""
48  query = {"prefix": prefix, "suffix": suffix, "events": events}
49  return minio_client._url_open( # noqa: SLF001
50  "GET", bucket_name=bucket_name, query=query, preload_content=False
51  )
52 
53 
54 class MinioEventStreamIterator(Iterable):
55  """Iterator wrapper over notification http response stream."""
56 
57  def __iter__(self) -> Self:
58  """Return self."""
59  return self
60 
61  def __init__(self, response):
62  """Init."""
63  self._response_response = response
64  self._stream_stream = response.stream()
65 
66  def __next__(self):
67  """Get next not empty line."""
68  while True:
69  line = next(self._stream_stream)
70  if line.strip():
71  event = json.loads(line.decode("utf-8"))
72  if event["Records"] is not None:
73  return event
74 
75  def close(self):
76  """Close the response."""
77  self._response_response.close()
78 
79 
80 class MinioEventThread(threading.Thread):
81  """Thread wrapper around minio notification blocking stream."""
82 
83  def __init__(
84  self,
85  queue: Queue,
86  endpoint: str,
87  access_key: str,
88  secret_key: str,
89  secure: bool,
90  bucket_name: str,
91  prefix: str,
92  suffix: str,
93  events: list[str],
94  ) -> None:
95  """Copy over all Minio client options."""
96  super().__init__()
97  self._queue_queue = queue
98  self._endpoint_endpoint = endpoint
99  self._access_key_access_key = access_key
100  self._secret_key_secret_key = secret_key
101  self._secure_secure = secure
102  self._bucket_name_bucket_name = bucket_name
103  self._prefix_prefix = prefix
104  self._suffix_suffix = suffix
105  self._events_events = events
106  self._event_stream_it_event_stream_it = None
107  self._should_stop_should_stop = False
108 
109  def __enter__(self):
110  """Start the thread."""
111  self.start()
112 
113  def __exit__(self, exc_type, exc_val, exc_tb):
114  """Stop and join the thread."""
115  self.stopstop()
116 
117  def run(self):
118  """Create MinioClient and run the loop."""
119  _LOGGER.debug("Running MinioEventThread")
120 
121  self._should_stop_should_stop = False
122 
123  minio_client = create_minio_client(
124  self._endpoint_endpoint, self._access_key_access_key, self._secret_key_secret_key, self._secure_secure
125  )
126 
127  while not self._should_stop_should_stop:
128  _LOGGER.debug("Connecting to minio event stream")
129  response = None
130  try:
132  minio_client,
133  self._bucket_name_bucket_name,
134  self._prefix_prefix,
135  self._suffix_suffix,
136  self._events_events,
137  )
138 
139  self._event_stream_it_event_stream_it = MinioEventStreamIterator(response)
140 
141  self._iterate_event_stream_iterate_event_stream(self._event_stream_it_event_stream_it, minio_client)
142  except json.JSONDecodeError:
143  if response:
144  response.close()
145  except HTTPError as error:
146  _LOGGER.error("Failed to connect to Minio endpoint: %s", error)
147 
148  # Wait before attempting to connect again.
149  time.sleep(1)
150  except AttributeError:
151  # When response is closed, iterator will fail to access
152  # the underlying socket descriptor.
153  break
154 
155  def _iterate_event_stream(self, event_stream_it, minio_client):
156  for event in event_stream_it:
157  for event_name, bucket, key, metadata in iterate_objects(event):
158  presigned_url = ""
159  try:
160  presigned_url = minio_client.presigned_get_object(bucket, key)
161  # Fail gracefully. If for whatever reason this stops working,
162  # it shouldn't prevent it from firing events.
163  except Exception as error: # noqa: BLE001
164  _LOGGER.error("Failed to generate presigned url: %s", error)
165 
166  queue_entry = {
167  "event_name": event_name,
168  "bucket": bucket,
169  "key": key,
170  "presigned_url": presigned_url,
171  "metadata": metadata,
172  }
173  _LOGGER.debug("Queue entry, %s", queue_entry)
174  self._queue_queue.put(queue_entry)
175 
176  def stop(self):
177  """Cancel event stream and join the thread."""
178  _LOGGER.debug("Stopping event thread")
179  self._should_stop_should_stop = True
180  if self._event_stream_it_event_stream_it is not None:
181  self._event_stream_it_event_stream_it.close()
182  self._event_stream_it_event_stream_it = None
183 
184  _LOGGER.debug("Joining event thread")
185  self.join()
186  _LOGGER.debug("Event thread joined")
187 
188 
189 def iterate_objects(event):
190  """Iterate over file records of notification event.
191 
192  Most of the time it should still be only one record.
193  """
194  records = event.get("Records", [])
195 
196  for record in records:
197  event_name = record.get("eventName")
198  bucket = record.get("s3", {}).get("bucket", {}).get("name")
199  key = record.get("s3", {}).get("object", {}).get("key")
200  metadata = normalize_metadata(
201  record.get("s3", {}).get("object", {}).get("userMetadata", {})
202  )
203 
204  if not bucket or not key:
205  _LOGGER.warning("Invalid bucket and/or key, %s, %s", bucket, key)
206  continue
207 
208  key = unquote(key)
209 
210  yield event_name, bucket, key, metadata
def _iterate_event_stream(self, event_stream_it, minio_client)
None __init__(self, Queue queue, str endpoint, str access_key, str secret_key, bool secure, str bucket_name, str prefix, str suffix, list[str] events)
Definition: minio_helper.py:94
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
def get_minio_notification_response(minio_client, str bucket_name, str prefix, str suffix, list[str] events)
Definition: minio_helper.py:46
Minio create_minio_client(str endpoint, str access_key, str secret_key, bool secure)
Definition: minio_helper.py:37