Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Minio component."""
2 
from __future__ import annotations

import logging
import os
from queue import Queue
import threading

import voluptuous as vol

from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant, ServiceCall
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType

from .minio_helper import MinioEventThread, create_minio_client
18 
_LOGGER = logging.getLogger(__name__)

# Integration domain: configuration key, service domain and fired event type.
DOMAIN = "minio"

# Connection options of the ``minio`` configuration section.
CONF_HOST = "host"
CONF_PORT = "port"
CONF_ACCESS_KEY = "access_key"
CONF_SECRET_KEY = "secret_key"
CONF_SECURE = "secure"

# Optional list of bucket listeners and the keys of each entry.
CONF_LISTEN = "listen"
CONF_LISTEN_BUCKET = "bucket"
CONF_LISTEN_PREFIX = "prefix"
CONF_LISTEN_SUFFIX = "suffix"
CONF_LISTEN_EVENTS = "events"

# Service call data keys (ATTR_BUCKET and ATTR_KEY are also read from events).
ATTR_BUCKET = "bucket"
ATTR_KEY = "key"
ATTR_FILE_PATH = "file_path"

# Defaults applied to listener entries that omit the optional keys.
DEFAULT_LISTEN_PREFIX = ""
DEFAULT_LISTEN_SUFFIX = ".*"
DEFAULT_LISTEN_EVENTS = "s3:ObjectCreated:*"
40 
# Shape of one entry in the optional ``listen`` list: the bucket is mandatory,
# prefix/suffix/events fall back to the module defaults.
_LISTEN_ENTRY_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_LISTEN_BUCKET): cv.string,
        vol.Optional(CONF_LISTEN_PREFIX, default=DEFAULT_LISTEN_PREFIX): cv.string,
        vol.Optional(CONF_LISTEN_SUFFIX, default=DEFAULT_LISTEN_SUFFIX): cv.string,
        vol.Optional(CONF_LISTEN_EVENTS, default=DEFAULT_LISTEN_EVENTS): cv.string,
    }
)

# Top-level configuration: connection settings are all required; ``listen``
# accepts a single entry or a list and defaults to no listeners. Keys of other
# integrations are tolerated at the top level.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_HOST): cv.string,
                vol.Required(CONF_PORT): cv.port,
                vol.Required(CONF_ACCESS_KEY): cv.string,
                vol.Required(CONF_SECRET_KEY): cv.string,
                vol.Required(CONF_SECURE): cv.boolean,
                vol.Optional(CONF_LISTEN, default=[]): vol.All(
                    cv.ensure_list, [_LISTEN_ENTRY_SCHEMA]
                ),
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)
74 
# Service data identifying one object: used as-is by the ``remove`` service.
BUCKET_KEY_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_BUCKET): cv.string,
        vol.Required(ATTR_KEY): cv.string,
    }
)
78 
# Object identity plus a local file path: used by the ``put`` and ``get`` services.
BUCKET_KEY_FILE_SCHEMA = BUCKET_KEY_SCHEMA.extend(
    {
        vol.Required(ATTR_FILE_PATH): cv.string,
    }
)
82 
83 
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Minio client, the bucket listeners and the file services.

    Reads the ``minio`` section of ``config``, schedules the queue forwarder
    and one ``MinioListener`` per ``listen`` entry to start and stop together
    with Home Assistant, and registers the ``put``, ``get`` and ``remove``
    services.

    Returns:
        Always ``True``.
    """
    domain_conf = config[DOMAIN]

    # The listeners and the service client all use the same endpoint.
    endpoint = get_minio_endpoint(domain_conf[CONF_HOST], domain_conf[CONF_PORT])
    access_key = domain_conf[CONF_ACCESS_KEY]
    secret_key = domain_conf[CONF_SECRET_KEY]
    secure = domain_conf[CONF_SECURE]

    forwarder = QueueListener(hass)
    event_queue = forwarder.queue

    hass.bus.listen_once(EVENT_HOMEASSISTANT_START, forwarder.start_handler)
    hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, forwarder.stop_handler)

    for entry in domain_conf[CONF_LISTEN]:
        bucket_listener = MinioListener(
            event_queue,
            endpoint,
            access_key,
            secret_key,
            secure,
            entry[CONF_LISTEN_BUCKET],
            entry[CONF_LISTEN_PREFIX],
            entry[CONF_LISTEN_SUFFIX],
            entry[CONF_LISTEN_EVENTS],
        )
        hass.bus.listen_once(EVENT_HOMEASSISTANT_START, bucket_listener.start_handler)
        hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, bucket_listener.stop_handler)

    minio_client = create_minio_client(endpoint, access_key, secret_key, secure)

    def _require_allowed(file_path):
        """Raise ValueError unless Home Assistant allows access to file_path."""
        if hass.config.is_allowed_path(file_path):
            return
        raise ValueError(f"Invalid file_path {file_path}")

    def put_file(service: ServiceCall) -> None:
        """Upload file service."""
        data = service.data
        bucket, key, file_path = data[ATTR_BUCKET], data[ATTR_KEY], data[ATTR_FILE_PATH]
        _require_allowed(file_path)
        minio_client.fput_object(bucket, key, file_path)

    def get_file(service: ServiceCall) -> None:
        """Download file service."""
        data = service.data
        bucket, key, file_path = data[ATTR_BUCKET], data[ATTR_KEY], data[ATTR_FILE_PATH]
        _require_allowed(file_path)
        minio_client.fget_object(bucket, key, file_path)

    def remove_file(service: ServiceCall) -> None:
        """Delete file service."""
        minio_client.remove_object(service.data[ATTR_BUCKET], service.data[ATTR_KEY])

    for service_name, handler, schema in (
        ("put", put_file, BUCKET_KEY_FILE_SCHEMA),
        ("get", get_file, BUCKET_KEY_FILE_SCHEMA),
        ("remove", remove_file, BUCKET_KEY_SCHEMA),
    ):
        hass.services.register(DOMAIN, service_name, handler, schema=schema)

    return True
162 
163 
def get_minio_endpoint(host: str, port: int) -> str:
    """Build the ``host:port`` endpoint string for a Minio server.

    Args:
        host: Host name or address of the server.
        port: Port of the server.

    Returns:
        ``host`` and ``port`` joined by a colon.
    """
    endpoint = f"{host}:{port}"
    return endpoint
167 
168 
class QueueListener(threading.Thread):
    """Forward events from queue into Home Assistant event bus.

    The thread blocks on its queue and fires every item it receives as a
    ``DOMAIN`` event. Putting ``None`` into the queue ends the loop.
    """

    def __init__(self, hass) -> None:
        """Create the queue and remember the Home Assistant instance.

        Args:
            hass: Object whose ``bus.fire`` is used to publish the events.
        """
        super().__init__()
        self._hass = hass
        self._queue: Queue = Queue()

    def run(self) -> None:
        """Listen to queue events, and forward them to Home Assistant event bus."""
        _LOGGER.debug("Running QueueListener")
        # ``None`` is the shutdown sentinel put into the queue by ``stop``.
        while (event := self._queue.get()) is not None:
            # Expose the last path component of the object key as ``file_name``.
            file_name = os.path.basename(event[ATTR_KEY])

            _LOGGER.debug(
                "Sending event %s, %s, %s",
                event["event_name"],
                event[ATTR_BUCKET],
                event[ATTR_KEY],
            )
            self._hass.bus.fire(DOMAIN, {"file_name": file_name, **event})

    @property
    def queue(self) -> Queue:
        """Return wrapped queue."""
        return self._queue

    def stop(self) -> None:
        """Stop run by putting None into queue and join the thread."""
        _LOGGER.debug("Stopping QueueListener")
        self._queue.put(None)
        # Joining a thread that was never started raises RuntimeError, so only
        # wait when the thread is actually running.
        if self.is_alive():
            self.join()
        _LOGGER.debug("Stopped QueueListener")

    def start_handler(self, _) -> None:
        """Start the thread; the event argument is ignored."""
        self.start()

    def stop_handler(self, _) -> None:
        """Stop the thread; the event argument is ignored."""
        self.stop()
214 
215 
class MinioListener:
    """MinioEventThread wrapper with helper methods.

    Stores the connection and filter settings at construction time and only
    creates the ``MinioEventThread`` when ``start_handler`` is invoked.
    """

    def __init__(
        self,
        queue: Queue,
        endpoint: str,
        access_key: str,
        secret_key: str,
        secure: bool,
        bucket_name: str,
        prefix: str,
        suffix: str,
        events: list[str],
    ) -> None:
        """Create Listener.

        All arguments are stored unchanged and handed to ``MinioEventThread``
        in the same order when the listener is started.
        """
        self._queue = queue
        self._endpoint = endpoint
        self._access_key = access_key
        self._secret_key = secret_key
        self._secure = secure
        self._bucket_name = bucket_name
        self._prefix = prefix
        self._suffix = suffix
        # NOTE(review): annotated as list[str], but the listener configuration
        # validates this value with cv.string - confirm which shape
        # MinioEventThread expects.
        self._events = events
        # Created lazily by start_handler; None until then.
        self._minio_event_thread = None

    def start_handler(self, _) -> None:
        """Create and start the event thread; the event argument is ignored."""
        self._minio_event_thread = MinioEventThread(
            self._queue,
            self._endpoint,
            self._access_key,
            self._secret_key,
            self._secure,
            self._bucket_name,
            self._prefix,
            self._suffix,
            self._events,
        )
        self._minio_event_thread.start()

    def stop_handler(self, _) -> None:
        """Stop the event thread if one was started; otherwise do nothing."""
        if self._minio_event_thread is not None:
            self._minio_event_thread.stop()
None __init__(self, Queue queue, str endpoint, str access_key, str secret_key, bool secure, str bucket_name, str prefix, str suffix, list[str] events)
Definition: __init__.py:230
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
Minio create_minio_client(str endpoint, str access_key, str secret_key, bool secure)
Definition: minio_helper.py:37
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:84
str get_minio_endpoint(str host, int port)
Definition: __init__.py:164