1 """The Matrix bot component."""
3 from __future__
import annotations
6 from collections.abc
import Sequence
11 from typing
import Final, NewType, Required, TypedDict
14 from nio
import AsyncClient, Event, MatrixRoom
15 from nio.events.room_events
import RoomMessageText
16 from nio.responses
import (
22 RoomResolveAliasResponse,
29 import voluptuous
as vol
37 EVENT_HOMEASSISTANT_START,
38 EVENT_HOMEASSISTANT_STOP,
47 from .const
import DOMAIN, FORMAT_HTML, FORMAT_TEXT, SERVICE_SEND_MESSAGE
_LOGGER = logging.getLogger(__name__)

# File name of the bot's session file; async_setup joins it onto
# hass.config.path() to build the path handed to the bot.
SESSION_FILE: Final = ".matrix.conf"

# Keys of the component's YAML configuration.
# CONF_HOMESERVER, CONF_ROOMS and CONF_COMMANDS are used by CONFIG_SCHEMA;
# CONF_WORD and CONF_EXPRESSION are the mutually exclusive "trigger" keys of
# COMMAND_SCHEMA (CONF_ROOMS is also accepted per command).
CONF_HOMESERVER: Final = "homeserver"
CONF_ROOMS: Final = "rooms"
CONF_COMMANDS: Final = "commands"
CONF_WORD: Final = "word"
CONF_EXPRESSION: Final = "expression"
# Validation pattern for the configured username: "@", then any run of
# non-colon characters, then ":" and the remainder.
CONF_USERNAME_REGEX: Final = r"^@[^:]*:.*"

# Validation pattern for a room reference: it must start with "!" or "#",
# followed by non-colon characters, ":" and the remainder.
# The leading class is deliberately "[!#]" and not "[!|#]": inside a character
# class "|" is a literal pipe, not alternation, so the latter also accepted
# values such as "|room:server" that _resolve_room_alias cannot handle (it
# only recognises the "!" and "#" prefixes).
CONF_ROOMS_REGEX: Final = r"^[!#][^:]*:.*"
# Event type fired on the Home Assistant bus when an incoming room message
# matches a configured word or expression command.
EVENT_MATRIX_COMMAND: Final = "matrix_command"

# NOTE(review): not referenced in the visible part of this module — confirm
# whether it is still used elsewhere.
DEFAULT_CONTENT_TYPE: Final = "application/octet-stream"

# Message formats known to the component; DEFAULT_MESSAGE_FORMAT is the
# schema default for ATTR_FORMAT in the send_message service.
MESSAGE_FORMATS = [FORMAT_HTML, FORMAT_TEXT]
DEFAULT_MESSAGE_FORMAT = FORMAT_TEXT

# Optional keys inside the send_message service's ATTR_DATA mapping.
ATTR_FORMAT: Final = "format"
ATTR_IMAGES: Final = "images"
72 WordCommand = NewType(
"WordCommand", str)
73 ExpressionCommand = NewType(
"ExpressionCommand", re.Pattern)
74 RoomAlias = NewType(
"RoomAlias", str)
75 RoomID = NewType(
"RoomID", str)
76 RoomAnyID = RoomID | RoomAlias
80 """Corresponds to a single COMMAND_SCHEMA."""
85 expression: ExpressionCommand
88 COMMAND_SCHEMA = vol.All(
91 vol.Exclusive(CONF_WORD,
"trigger"): cv.string,
92 vol.Exclusive(CONF_EXPRESSION,
"trigger"): cv.is_regex,
93 vol.Required(CONF_NAME): cv.string,
94 vol.Optional(CONF_ROOMS): vol.All(
95 cv.ensure_list, [cv.matches_regex(CONF_ROOMS_REGEX)]
99 cv.has_at_least_one_key(CONF_WORD, CONF_EXPRESSION),
102 CONFIG_SCHEMA = vol.Schema(
106 vol.Required(CONF_HOMESERVER): cv.url,
107 vol.Optional(CONF_VERIFY_SSL, default=
True): cv.boolean,
108 vol.Required(CONF_USERNAME): cv.matches_regex(CONF_USERNAME_REGEX),
109 vol.Required(CONF_PASSWORD): cv.string,
110 vol.Optional(CONF_ROOMS, default=[]): vol.All(
111 cv.ensure_list, [cv.matches_regex(CONF_ROOMS_REGEX)]
113 vol.Optional(CONF_COMMANDS, default=[]): [COMMAND_SCHEMA],
117 extra=vol.ALLOW_EXTRA,
120 SERVICE_SCHEMA_SEND_MESSAGE = vol.Schema(
122 vol.Required(ATTR_MESSAGE): cv.string,
123 vol.Optional(ATTR_DATA, default={}): {
124 vol.Optional(ATTR_FORMAT, default=DEFAULT_MESSAGE_FORMAT): vol.In(
127 vol.Optional(ATTR_IMAGES): vol.All(cv.ensure_list, [cv.string]),
129 vol.Required(ATTR_TARGET): vol.All(
130 cv.ensure_list, [cv.matches_regex(CONF_ROOMS_REGEX)]
136 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
137 """Set up the Matrix bot component."""
138 config = config[DOMAIN]
142 os.path.join(hass.config.path(), SESSION_FILE),
143 config[CONF_HOMESERVER],
144 config[CONF_VERIFY_SSL],
145 config[CONF_USERNAME],
146 config[CONF_PASSWORD],
148 config[CONF_COMMANDS],
150 hass.data[DOMAIN] = matrix_bot
152 hass.services.async_register(
154 SERVICE_SEND_MESSAGE,
155 matrix_bot.handle_send_message,
156 schema=SERVICE_SCHEMA_SEND_MESSAGE,
163 """The Matrix Bot."""
175 listening_rooms: list[RoomAnyID],
176 commands: list[ConfigCommand],
178 """Set up the client."""
193 self._listening_rooms: dict[RoomAnyID, RoomID] = {}
194 self._word_commands: dict[RoomID, dict[WordCommand, ConfigCommand]] = {}
195 self._expression_commands: dict[RoomID, list[ConfigCommand]] = {}
198 async
def stop_client(event: HassEvent) ->
None:
199 """Run once when Home Assistant stops."""
200 if self.
_client_client
is not None:
201 await self.
_client_client.close()
203 self.
hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_client)
205 async
def handle_startup(event: HassEvent) ->
None:
206 """Run once when Home Assistant finished startup."""
214 _LOGGER.debug(
"Starting initial sync for %s", self.
_mx_id_mx_id)
215 await self.
_client_client.sync(timeout=30_000)
216 _LOGGER.debug(
"Finished initial sync for %s", self.
_mx_id_mx_id)
220 _LOGGER.debug(
"Starting sync_forever for %s", self.
_mx_id_mx_id)
221 self.
hasshass.async_create_background_task(
222 self.
_client_client.sync_forever(
224 loop_sleep_time=1_000,
226 name=f
"{self.__class__.__name__}: sync_forever for '{self._mx_id}'",
229 self.
hasshass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, handle_startup)
232 for command
in commands:
234 if rooms := command.get(CONF_ROOMS):
235 command[CONF_ROOMS] = [self._listening_rooms[room]
for room
in rooms]
237 command[CONF_ROOMS] =
list(self._listening_rooms.values())
240 if (word_command := command.get(CONF_WORD))
is not None:
241 for room_id
in command[CONF_ROOMS]:
242 self._word_commands.setdefault(room_id, {})
243 self._word_commands[room_id][word_command] = command
245 for room_id
in command[CONF_ROOMS]:
246 self._expression_commands.setdefault(room_id, [])
247 self._expression_commands[room_id].append(command)
250 """Handle a message sent to a Matrix room."""
252 if not isinstance(message, RoomMessageText):
255 if message.sender == self.
_mx_id_mx_id:
257 _LOGGER.debug(
"Handling message: %s", message.body)
259 room_id =
RoomID(room.room_id)
261 if message.body.startswith(
"!"):
263 pieces = message.body.split()
266 if command := self._word_commands.
get(room_id, {}).
get(word):
268 "command": command[CONF_NAME],
269 "sender": message.sender,
273 self.
hasshass.bus.async_fire(EVENT_MATRIX_COMMAND, message_data)
276 for command
in self._expression_commands.
get(room_id, []):
277 match = command[CONF_EXPRESSION].
match(message.body)
281 "command": command[CONF_NAME],
282 "sender": message.sender,
284 "args": match.groupdict(),
286 self.
hasshass.bus.async_fire(EVENT_MATRIX_COMMAND, message_data)
289 self, room_alias_or_id: RoomAnyID
290 ) -> dict[RoomAnyID, RoomID]:
291 """Resolve a single RoomAlias if needed."""
292 if room_alias_or_id.startswith(
"!"):
293 room_id =
RoomID(room_alias_or_id)
294 _LOGGER.debug(
"Will listen to room_id '%s'", room_id)
295 elif room_alias_or_id.startswith(
"#"):
297 resolve_response = await self.
_client_client.room_resolve_alias(room_alias)
298 if isinstance(resolve_response, RoomResolveAliasResponse):
299 room_id =
RoomID(resolve_response.room_id)
301 "Will listen to room_alias '%s' as room_id '%s'",
307 "Could not resolve '%s' to a room_id: '%s'",
313 return {room_alias_or_id: room_id}
316 """Resolve any RoomAliases into RoomIDs for the purpose of client interactions."""
318 self.
hasshass.async_create_task(
321 for room_alias_or_id
in listening_rooms
323 for resolved_room
in asyncio.as_completed(resolved_rooms):
324 self._listening_rooms |= await resolved_room
326 async
def _join_room(self, room_id: RoomID, room_alias_or_id: RoomAnyID) ->
None:
327 """Join a room or do nothing if already joined."""
328 join_response = await self.
_client_client.join(room_id)
330 if isinstance(join_response, JoinResponse):
331 _LOGGER.debug(
"Joined or already in room '%s'", room_alias_or_id)
332 elif isinstance(join_response, JoinError):
334 "Could not join room '%s': %s",
340 """Join the Matrix rooms that we listen for commands in."""
342 self.
hasshass.async_create_task(
343 self.
_join_room_join_room(room_id, room_alias_or_id), eager_start=
False
345 for room_alias_or_id, room_id
in self._listening_rooms.items()
347 await asyncio.wait(rooms)
350 """Read stored authentication tokens from disk."""
352 return await self.
hasshass.async_add_executor_job(
355 except HomeAssistantError
as ex:
357 "Loading authentication tokens from file '%s' failed: %s",
364 """Store authentication token to session and persistent storage."""
367 await self.
hasshass.async_add_executor_job(
375 """Log in to the Matrix homeserver.
377 Attempts to use the stored access token.
378 If that fails, then tries using the password.
379 If that also fails, raises LocalProtocolError.
384 _LOGGER.debug(
"Restoring login from stored access token")
385 self.
_client_client.restore_login(
386 user_id=self.
_client_client.user_id,
387 device_id=self.
_client_client.device_id,
390 response = await self.
_client_client.whoami()
391 if isinstance(response, WhoamiError):
393 "Restoring login from access token failed: %s, %s",
394 response.status_code,
397 self.
_client_client.access_token = (
400 elif isinstance(response, WhoamiResponse):
402 "Successfully restored login from access token: user_id '%s', device_id '%s'",
408 if not self.
_client_client.logged_in:
409 response = await self.
_client_client.login(password=self.
_password_password)
410 _LOGGER.debug(
"Logging in using password")
412 if isinstance(response, LoginError):
414 "Login by password failed: %s, %s",
415 response.status_code,
419 if not self.
_client_client.logged_in:
421 "Login failed, both token and username/password are invalid"
427 self, target_room: RoomAnyID, message_type: str, content: dict
429 """Wrap _client.room_send and handle ErrorResponses."""
430 response: Response = await self.
_client_client.room_send(
431 room_id=self._listening_rooms.
get(target_room, target_room),
432 message_type=message_type,
435 if isinstance(response, ErrorResponse):
437 "Unable to deliver message to room '%s': %s",
442 _LOGGER.debug(
"Message delivered to room '%s'", target_room)
445 self, target_rooms: Sequence[RoomAnyID], message_type: str, content: dict
447 """Wrap _handle_room_send for multiple target_rooms."""
449 self.
hasshass.async_create_task(
451 target_room=target_room,
452 message_type=message_type,
457 for target_room
in target_rooms
461 self, image_path: str, target_rooms: Sequence[RoomAnyID]
463 """Upload an image, then send it to all target_rooms."""
464 _is_allowed_path = await self.
hasshass.async_add_executor_job(
465 self.
hasshass.config.is_allowed_path, image_path
467 if not _is_allowed_path:
468 _LOGGER.error(
"Path not allowed: %s", image_path)
472 image = await self.
hasshass.async_add_executor_job(Image.open, image_path)
473 (width, height) = image.size
474 mime_type = mimetypes.guess_type(image_path)[0]
475 file_stat = await aiofiles.os.stat(image_path)
477 _LOGGER.debug(
"Uploading file from path, %s", image_path)
478 async
with aiofiles.open(image_path,
"r+b")
as image_file:
479 response, _ = await self.
_client_client.upload(
481 content_type=mime_type,
482 filename=os.path.basename(image_path),
483 filesize=file_stat.st_size,
485 if isinstance(response, UploadError):
486 _LOGGER.error(
"Unable to upload image to the homeserver: %s", response)
488 if isinstance(response, UploadResponse):
489 _LOGGER.debug(
"Successfully uploaded image to the homeserver")
492 "Unknown response received when uploading image to homeserver: %s",
498 "body": os.path.basename(image_path),
500 "size": file_stat.st_size,
501 "mimetype": mime_type,
505 "msgtype":
"m.image",
506 "url": response.content_uri,
510 target_rooms=target_rooms, message_type=
"m.room.message", content=content
514 self, message: str, target_rooms: list[RoomAnyID], data: dict |
None
516 """Send a message to the Matrix server."""
517 content = {
"msgtype":
"m.text",
"body": message}
518 if data
is not None and data.get(ATTR_FORMAT) == FORMAT_HTML:
519 content |= {
"format":
"org.matrix.custom.html",
"formatted_body": message}
522 target_rooms=target_rooms, message_type=
"m.room.message", content=content
527 and (image_paths := data.get(ATTR_IMAGES, []))
528 and len(target_rooms) > 0
531 self.
hasshass.async_create_task(
532 self.
_send_image_send_image(image_path, target_rooms), eager_start=
False
534 for image_path
in image_paths
536 await asyncio.wait(image_tasks)
539 """Handle the send_message service."""
541 service.data[ATTR_MESSAGE],
542 service.data[ATTR_TARGET],
543 service.data.get(ATTR_DATA),
None handle_send_message(self, ServiceCall service)
None _handle_multi_room_send(self, Sequence[RoomAnyID] target_rooms, str message_type, dict content)
None _resolve_room_aliases(self, list[RoomAnyID] listening_rooms)
JsonObjectType _get_auth_tokens(self)
None _handle_room_send(self, RoomAnyID target_room, str message_type, dict content)
None _handle_room_message(self, MatrixRoom room, Event message)
None __init__(self, HomeAssistant hass, str config_file, str homeserver, bool verify_ssl, str username, str password, list[RoomAnyID] listening_rooms, list[ConfigCommand] commands)
None _store_auth_token(self, str token)
None _send_message(self, str message, list[RoomAnyID] target_rooms, dict|None data)
None _join_room(self, RoomID room_id, RoomAnyID room_alias_or_id)
None _load_commands(self, list[ConfigCommand] commands)
None _send_image(self, str image_path, Sequence[RoomAnyID] target_rooms)
dict[RoomAnyID, RoomID] _resolve_room_alias(self, RoomAnyID room_alias_or_id)
list[_T] match(self, BluetoothServiceInfoBleak service_info)
web.Response get(self, web.Request request, str config_key)
bool async_setup(HomeAssistant hass, ConfigType config)