3 from __future__
import annotations
7 from queue
import Queue
10 import voluptuous
as vol
17 from .minio_helper
import MinioEventThread, create_minio_client
_LOGGER = logging.getLogger(__name__)

# Keys read from the integration's configuration mapping (see CONFIG_SCHEMA
# and setup()).
CONF_ACCESS_KEY = "access_key"
CONF_SECRET_KEY = "secret_key"
CONF_SECURE = "secure"
CONF_LISTEN = "listen"
# Keys of each entry in the CONF_LISTEN list.
CONF_LISTEN_BUCKET = "bucket"
CONF_LISTEN_PREFIX = "prefix"
CONF_LISTEN_SUFFIX = "suffix"
CONF_LISTEN_EVENTS = "events"

# Keys looked up in ``service.data`` by the service handlers.
ATTR_BUCKET = "bucket"
ATTR_FILE_PATH = "file_path"

# Defaults applied by CONFIG_SCHEMA when a listener entry omits the key.
DEFAULT_LISTEN_PREFIX = ""
DEFAULT_LISTEN_SUFFIX = ".*"
DEFAULT_LISTEN_EVENTS = "s3:ObjectCreated:*"
41 CONFIG_SCHEMA = vol.Schema(
45 vol.Required(CONF_HOST): cv.string,
46 vol.Required(CONF_PORT): cv.port,
47 vol.Required(CONF_ACCESS_KEY): cv.string,
48 vol.Required(CONF_SECRET_KEY): cv.string,
49 vol.Required(CONF_SECURE): cv.boolean,
50 vol.Optional(CONF_LISTEN, default=[]): vol.All(
55 vol.Required(CONF_LISTEN_BUCKET): cv.string,
57 CONF_LISTEN_PREFIX, default=DEFAULT_LISTEN_PREFIX
60 CONF_LISTEN_SUFFIX, default=DEFAULT_LISTEN_SUFFIX
63 CONF_LISTEN_EVENTS, default=DEFAULT_LISTEN_EVENTS
72 extra=vol.ALLOW_EXTRA,
# Service-call schema: the payload must provide a bucket and an object key,
# both validated as strings. Used for the "remove" service registration.
BUCKET_KEY_SCHEMA = vol.Schema(
    {vol.Required(ATTR_BUCKET): cv.string, vol.Required(ATTR_KEY): cv.string}
)

# Extension of BUCKET_KEY_SCHEMA that additionally requires a file path.
# Used for the "put" and "get" service registrations.
BUCKET_KEY_FILE_SCHEMA = BUCKET_KEY_SCHEMA.extend(
    {vol.Required(ATTR_FILE_PATH): cv.string}
)
84 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
85 """Set up MinioClient and event listeners."""
88 host = conf[CONF_HOST]
89 port = conf[CONF_PORT]
90 access_key = conf[CONF_ACCESS_KEY]
91 secret_key = conf[CONF_SECRET_KEY]
92 secure = conf[CONF_SECURE]
95 queue = queue_listener.queue
97 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, queue_listener.start_handler)
98 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, queue_listener.stop_handler)
100 def _setup_listener(listener_conf):
101 bucket = listener_conf[CONF_LISTEN_BUCKET]
102 prefix = listener_conf[CONF_LISTEN_PREFIX]
103 suffix = listener_conf[CONF_LISTEN_SUFFIX]
104 events = listener_conf[CONF_LISTEN_EVENTS]
118 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, minio_listener.start_handler)
119 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, minio_listener.stop_handler)
121 for listen_conf
in conf[CONF_LISTEN]:
122 _setup_listener(listen_conf)
def put_file(service: ServiceCall) -> None:
    """Upload file service.

    Reads the bucket, object key and file path from ``service.data`` and
    passes them to ``minio_client.fput_object``. ``hass`` and
    ``minio_client`` are free variables resolved from the surrounding scope.

    Raises:
        ValueError: If ``hass.config.is_allowed_path`` rejects the file path.
    """
    bucket = service.data[ATTR_BUCKET]
    key = service.data[ATTR_KEY]
    file_path = service.data[ATTR_FILE_PATH]

    # Reject the request before touching the client unless the path is
    # approved by hass.config.is_allowed_path.
    if not hass.config.is_allowed_path(file_path):
        raise ValueError(f"Invalid file_path {file_path}")

    minio_client.fput_object(bucket, key, file_path)
def get_file(service: ServiceCall) -> None:
    """Download file service.

    Reads the bucket, object key and file path from ``service.data`` and
    passes them to ``minio_client.fget_object``. ``hass`` and
    ``minio_client`` are free variables resolved from the surrounding scope.

    Raises:
        ValueError: If ``hass.config.is_allowed_path`` rejects the file path.
    """
    bucket = service.data[ATTR_BUCKET]
    key = service.data[ATTR_KEY]
    file_path = service.data[ATTR_FILE_PATH]

    # Reject the request before touching the client unless the path is
    # approved by hass.config.is_allowed_path.
    if not hass.config.is_allowed_path(file_path):
        raise ValueError(f"Invalid file_path {file_path}")

    minio_client.fget_object(bucket, key, file_path)
def remove_file(service: ServiceCall) -> None:
    """Delete file service.

    Reads the bucket and object key from ``service.data`` and passes them to
    ``minio_client.remove_object``. ``minio_client`` is a free variable
    resolved from the surrounding scope.
    """
    bucket = service.data[ATTR_BUCKET]
    key = service.data[ATTR_KEY]

    minio_client.remove_object(bucket, key)
157 hass.services.register(DOMAIN,
"put", put_file, schema=BUCKET_KEY_FILE_SCHEMA)
158 hass.services.register(DOMAIN,
"get", get_file, schema=BUCKET_KEY_FILE_SCHEMA)
159 hass.services.register(DOMAIN,
"remove", remove_file, schema=BUCKET_KEY_SCHEMA)
def get_minio_endpoint(host: str, port: int) -> str:
    """Create minio endpoint from host and port.

    Returns the two values joined by a colon, i.e. ``"<host>:<port>"``.
    """
    return f"{host}:{port}"
170 """Forward events from queue into Home Assistant event bus."""
179 """Listen to queue events, and forward them to Home Assistant event bus."""
180 _LOGGER.debug(
"Running QueueListener")
182 if (event := self.
_queue_queue.
get())
is None:
185 _, file_name = os.path.split(event[ATTR_KEY])
188 "Sending event %s, %s, %s",
193 self.
_hass_hass.bus.fire(DOMAIN, {
"file_name": file_name, **event})
197 """Return wrapped queue."""
201 """Stop run by putting None into queue and join the thread."""
202 _LOGGER.debug(
"Stopping QueueListener")
203 self.
_queue_queue.put(
None)
205 _LOGGER.debug(
"Stopped QueueListener")
208 """Start handler helper method."""
212 """Stop handler helper method."""
217 """MinioEventThread wrapper with helper methods."""
231 """Create Listener."""
244 """Create and start the event thread."""
259 """Issue stop and wait for thread to join."""
def start_handler(self, _)
def stop_handler(self, _)
None __init__(self, Queue queue, str endpoint, str access_key, str secret_key, bool secure, str bucket_name, str prefix, str suffix, list[str] events)
def start_handler(self, _)
def stop_handler(self, _)
web.Response get(self, web.Request request, str config_key)
Minio create_minio_client(str endpoint, str access_key, str secret_key, bool secure)
bool setup(HomeAssistant hass, ConfigType config)
str get_minio_endpoint(str host, int port)