Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Matrix bot component."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Sequence
7 import logging
8 import mimetypes
9 import os
10 import re
11 from typing import Final, NewType, Required, TypedDict
12 
13 import aiofiles.os
14 from nio import AsyncClient, Event, MatrixRoom
15 from nio.events.room_events import RoomMessageText
16 from nio.responses import (
17  ErrorResponse,
18  JoinError,
19  JoinResponse,
20  LoginError,
21  Response,
22  RoomResolveAliasResponse,
23  UploadError,
24  UploadResponse,
25  WhoamiError,
26  WhoamiResponse,
27 )
28 from PIL import Image
29 import voluptuous as vol
30 
31 from homeassistant.components.notify import ATTR_DATA, ATTR_MESSAGE, ATTR_TARGET
32 from homeassistant.const import (
33  CONF_NAME,
34  CONF_PASSWORD,
35  CONF_USERNAME,
36  CONF_VERIFY_SSL,
37  EVENT_HOMEASSISTANT_START,
38  EVENT_HOMEASSISTANT_STOP,
39 )
40 from homeassistant.core import Event as HassEvent, HomeAssistant, ServiceCall
41 from homeassistant.exceptions import ConfigEntryAuthFailed, HomeAssistantError
43 from homeassistant.helpers.json import save_json
44 from homeassistant.helpers.typing import ConfigType
45 from homeassistant.util.json import JsonObjectType, load_json_object
46 
47 from .const import DOMAIN, FORMAT_HTML, FORMAT_TEXT, SERVICE_SEND_MESSAGE
48 
_LOGGER = logging.getLogger(__name__)

# Name of the file, inside the Home Assistant config directory, in which
# access tokens are persisted between runs.
SESSION_FILE = ".matrix.conf"

# Keys of the YAML configuration for this component.
CONF_HOMESERVER: Final = "homeserver"
CONF_ROOMS: Final = "rooms"
CONF_COMMANDS: Final = "commands"
CONF_WORD: Final = "word"
CONF_EXPRESSION: Final = "expression"

# A Matrix user ID: "@" + localpart + ":" + server.
CONF_USERNAME_REGEX = r"^@[^:]*:.*"
# A room ID ("!...") or a room alias ("#...").
# NOTE: inside a character class "|" is a literal, not an alternation, so the
# class must be "[!#]"; "[!|#]" would also accept identifiers starting with "|".
CONF_ROOMS_REGEX = r"^[!#][^:]*:.*"

# Event fired on the Home Assistant bus when a configured command matches.
EVENT_MATRIX_COMMAND = "matrix_command"

DEFAULT_CONTENT_TYPE = "application/octet-stream"
65 
# Keys understood inside the optional "data" mapping of a send_message call.
ATTR_FORMAT = "format"  # optional message format
ATTR_IMAGES = "images"  # optional images

# Message formats a caller may request; plain text is used when none is given.
DEFAULT_MESSAGE_FORMAT = FORMAT_TEXT
MESSAGE_FORMATS = [FORMAT_HTML, FORMAT_TEXT]

# Distinct static types for the two kinds of room identifier.
RoomID = NewType("RoomID", str)  # Starts with "!"
RoomAlias = NewType("RoomAlias", str)  # Starts with "#"
RoomAnyID = RoomID | RoomAlias

# Distinct static types for the two kinds of command trigger.
WordCommand = NewType("WordCommand", str)
ExpressionCommand = NewType("ExpressionCommand", re.Pattern)
77 
78 
class ConfigCommand(TypedDict, total=False):
    """One validated entry of the ``commands`` list (see COMMAND_SCHEMA).

    Only ``name`` is mandatory; the remaining keys are optional because the
    TypedDict is declared with ``total=False``.
    """

    # CONF_NAME: reported as "command" in the fired event.
    name: Required[str]
    # CONF_ROOMS: rooms in which the command is active.
    rooms: list[RoomID]
    # CONF_WORD: single-word trigger.
    word: WordCommand
    # CONF_EXPRESSION: regular-expression trigger.
    expression: ExpressionCommand
86 
87 
def _room_list() -> vol.All:
    """Return a fresh validator for a list of room IDs and/or room aliases."""
    return vol.All(cv.ensure_list, [cv.matches_regex(CONF_ROOMS_REGEX)])


# A command is triggered either by a single word or by a regular expression
# (mutually exclusive, at least one required), carries a name, and may be
# restricted to specific rooms.
_COMMAND_FIELDS = vol.Schema(
    {
        vol.Exclusive(CONF_WORD, "trigger"): cv.string,
        vol.Exclusive(CONF_EXPRESSION, "trigger"): cv.is_regex,
        vol.Required(CONF_NAME): cv.string,
        vol.Optional(CONF_ROOMS): _room_list(),
    }
)
COMMAND_SCHEMA = vol.All(
    _COMMAND_FIELDS,
    cv.has_at_least_one_key(CONF_WORD, CONF_EXPRESSION),
)

# Configuration of the component itself (the entry under DOMAIN).
_DOMAIN_FIELDS = vol.Schema(
    {
        vol.Required(CONF_HOMESERVER): cv.url,
        vol.Optional(CONF_VERIFY_SSL, default=True): cv.boolean,
        vol.Required(CONF_USERNAME): cv.matches_regex(CONF_USERNAME_REGEX),
        vol.Required(CONF_PASSWORD): cv.string,
        vol.Optional(CONF_ROOMS, default=[]): _room_list(),
        vol.Optional(CONF_COMMANDS, default=[]): [COMMAND_SCHEMA],
    }
)
CONFIG_SCHEMA = vol.Schema({DOMAIN: _DOMAIN_FIELDS}, extra=vol.ALLOW_EXTRA)

# Payload of the send_message service: the text, the target rooms and an
# optional "data" mapping selecting the format and attaching images.
_SEND_MESSAGE_DATA = {
    vol.Optional(ATTR_FORMAT, default=DEFAULT_MESSAGE_FORMAT): vol.In(
        MESSAGE_FORMATS
    ),
    vol.Optional(ATTR_IMAGES): vol.All(cv.ensure_list, [cv.string]),
}
SERVICE_SCHEMA_SEND_MESSAGE = vol.Schema(
    {
        vol.Required(ATTR_MESSAGE): cv.string,
        vol.Optional(ATTR_DATA, default={}): _SEND_MESSAGE_DATA,
        vol.Required(ATTR_TARGET): _room_list(),
    }
)
134 
135 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Matrix bot component.

    Builds a MatrixBot from the validated configuration stored under DOMAIN,
    publishes it in ``hass.data[DOMAIN]`` and registers the send_message
    service, handled by the bot. Always returns True.
    """
    matrix_config = config[DOMAIN]
    # The session file sits directly in the Home Assistant config directory.
    session_file = os.path.join(hass.config.path(), SESSION_FILE)

    bot = MatrixBot(
        hass,
        session_file,
        matrix_config[CONF_HOMESERVER],
        matrix_config[CONF_VERIFY_SSL],
        matrix_config[CONF_USERNAME],
        matrix_config[CONF_PASSWORD],
        matrix_config[CONF_ROOMS],
        matrix_config[CONF_COMMANDS],
    )
    hass.data[DOMAIN] = bot

    hass.services.async_register(
        DOMAIN,
        SERVICE_SEND_MESSAGE,
        bot.handle_send_message,
        schema=SERVICE_SCHEMA_SEND_MESSAGE,
    )

    return True
160 
161 
class MatrixBot:
    """The Matrix Bot.

    On Home Assistant start the bot logs in to the homeserver, resolves and
    joins the configured rooms and starts syncing. Incoming text messages that
    match a configured command are turned into ``matrix_command`` events; the
    ``send_message`` service sends text (and optionally images) to rooms.
    """

    _client: AsyncClient

    def __init__(
        self,
        hass: HomeAssistant,
        config_file: str,
        homeserver: str,
        verify_ssl: bool,
        username: str,
        password: str,
        listening_rooms: list[RoomAnyID],
        commands: list[ConfigCommand],
    ) -> None:
        """Set up the client.

        Nothing is sent to the homeserver here; login, room joining and
        syncing are deferred to the EVENT_HOMEASSISTANT_START listener.
        """
        self.hass = hass

        self._session_filepath = config_file
        self._access_tokens: JsonObjectType = {}

        self._homeserver = homeserver
        self._verify_tls = verify_ssl
        self._mx_id = username
        self._password = password

        self._client = AsyncClient(
            homeserver=self._homeserver, user=self._mx_id, ssl=self._verify_tls
        )

        # Maps every configured room (alias or ID) to its resolved room ID.
        self._listening_rooms: dict[RoomAnyID, RoomID] = {}
        self._word_commands: dict[RoomID, dict[WordCommand, ConfigCommand]] = {}
        self._expression_commands: dict[RoomID, list[ConfigCommand]] = {}
        self._unparsed_commands = commands

        async def stop_client(event: HassEvent) -> None:
            """Run once when Home Assistant stops."""
            if self._client is not None:
                await self._client.close()

        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_client)

        async def handle_startup(event: HassEvent) -> None:
            """Run once when Home Assistant finished startup."""
            self._access_tokens = await self._get_auth_tokens()
            await self._login()
            await self._resolve_room_aliases(listening_rooms)
            self._load_commands(commands)
            await self._join_rooms()

            # Sync once so that we don't respond to past events.
            _LOGGER.debug("Starting initial sync for %s", self._mx_id)
            await self._client.sync(timeout=30_000)
            _LOGGER.debug("Finished initial sync for %s", self._mx_id)

            self._client.add_event_callback(
                self._handle_room_message, RoomMessageText
            )

            _LOGGER.debug("Starting sync_forever for %s", self._mx_id)
            self.hass.async_create_background_task(
                self._client.sync_forever(
                    timeout=30_000,
                    loop_sleep_time=1_000,
                ),  # milliseconds.
                name=f"{self.__class__.__name__}: sync_forever for '{self._mx_id}'",
            )

        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, handle_startup)

    def _load_commands(self, commands: list[ConfigCommand]) -> None:
        """Index the configured commands by the room IDs they apply to.

        Each command's CONF_ROOMS entry is rewritten in place to the list of
        resolved room IDs. Rooms that are not among the resolved listening
        rooms are logged and skipped instead of raising KeyError.
        """
        for command in commands:
            # Set the command for all listening_rooms, unless otherwise specified.
            if rooms := command.get(CONF_ROOMS):
                room_ids: list[RoomID] = []
                for room in rooms:
                    room_id = self._listening_rooms.get(room)
                    if room_id is None:
                        _LOGGER.error(
                            "Command '%s' refers to room '%s', which is not a "
                            "resolved listening room; ignoring that room",
                            command[CONF_NAME],
                            room,
                        )
                        continue
                    room_ids.append(room_id)
                command[CONF_ROOMS] = room_ids
            else:
                command[CONF_ROOMS] = list(self._listening_rooms.values())

            # COMMAND_SCHEMA guarantees that exactly one of CONF_WORD and
            # CONF_EXPRESSION is set.
            if (word_command := command.get(CONF_WORD)) is not None:
                for room_id in command[CONF_ROOMS]:
                    self._word_commands.setdefault(room_id, {})[word_command] = command
            else:
                for room_id in command[CONF_ROOMS]:
                    self._expression_commands.setdefault(room_id, []).append(command)

    async def _handle_room_message(self, room: MatrixRoom, message: Event) -> None:
        """Handle a message sent to a Matrix room.

        Fires EVENT_MATRIX_COMMAND for a matching "!word" command and for
        every expression command of the room whose pattern matches the body.
        """
        # Corresponds to message type 'm.text' and NOT other RoomMessage
        # subtypes, like 'm.notice' and 'm.emote'.
        if not isinstance(message, RoomMessageText):
            return
        # Don't respond to our own messages.
        if message.sender == self._mx_id:
            return
        _LOGGER.debug("Handling message: %s", message.body)

        room_id = RoomID(room.room_id)

        if message.body.startswith("!"):
            # Could trigger a single-word command.
            pieces = message.body.split()
            word = WordCommand(pieces[0].lstrip("!"))

            if command := self._word_commands.get(room_id, {}).get(word):
                message_data = {
                    "command": command[CONF_NAME],
                    "sender": message.sender,
                    "room": room_id,
                    "args": pieces[1:],
                }
                self.hass.bus.async_fire(EVENT_MATRIX_COMMAND, message_data)

        # After single-word commands, check all regex commands in the room.
        for command in self._expression_commands.get(room_id, []):
            match = command[CONF_EXPRESSION].match(message.body)
            if not match:
                continue
            message_data = {
                "command": command[CONF_NAME],
                "sender": message.sender,
                "room": room_id,
                "args": match.groupdict(),
            }
            self.hass.bus.async_fire(EVENT_MATRIX_COMMAND, message_data)

    async def _resolve_room_alias(
        self, room_alias_or_id: RoomAnyID
    ) -> dict[RoomAnyID, RoomID]:
        """Resolve a single RoomAlias if needed.

        Returns a one-entry mapping from the given identifier to its room ID,
        or an empty mapping when the identifier cannot be resolved.
        """
        if room_alias_or_id.startswith("!"):
            room_id = RoomID(room_alias_or_id)
            _LOGGER.debug("Will listen to room_id '%s'", room_id)
            return {room_alias_or_id: room_id}

        if room_alias_or_id.startswith("#"):
            room_alias = RoomAlias(room_alias_or_id)
            resolve_response = await self._client.room_resolve_alias(room_alias)
            if isinstance(resolve_response, RoomResolveAliasResponse):
                room_id = RoomID(resolve_response.room_id)
                _LOGGER.debug(
                    "Will listen to room_alias '%s' as room_id '%s'",
                    room_alias_or_id,
                    room_id,
                )
                return {room_alias_or_id: room_id}
            _LOGGER.error(
                "Could not resolve '%s' to a room_id: '%s'",
                room_alias_or_id,
                resolve_response,
            )
            return {}

        # Neither a room ID nor an alias: report it rather than failing on an
        # unbound room_id.
        _LOGGER.error(
            "'%s' is neither a room_id nor a room_alias", room_alias_or_id
        )
        return {}

    async def _resolve_room_aliases(self, listening_rooms: list[RoomAnyID]) -> None:
        """Resolve any RoomAliases into RoomIDs for the purpose of client interactions."""
        resolved_rooms = [
            self.hass.async_create_task(
                self._resolve_room_alias(room_alias_or_id), eager_start=False
            )
            for room_alias_or_id in listening_rooms
        ]
        for resolved_room in asyncio.as_completed(resolved_rooms):
            self._listening_rooms |= await resolved_room

    async def _join_room(self, room_id: RoomID, room_alias_or_id: RoomAnyID) -> None:
        """Join a room or do nothing if already joined."""
        join_response = await self._client.join(room_id)

        if isinstance(join_response, JoinResponse):
            _LOGGER.debug("Joined or already in room '%s'", room_alias_or_id)
        elif isinstance(join_response, JoinError):
            _LOGGER.error(
                "Could not join room '%s': %s",
                room_alias_or_id,
                join_response,
            )

    async def _join_rooms(self) -> None:
        """Join the Matrix rooms that we listen for commands in."""
        rooms = [
            self.hass.async_create_task(
                self._join_room(room_id, room_alias_or_id), eager_start=False
            )
            for room_alias_or_id, room_id in self._listening_rooms.items()
        ]
        # asyncio.wait() raises on an empty collection, which happens when no
        # rooms are configured or none could be resolved.
        if not rooms:
            _LOGGER.debug("No rooms to join")
            return
        await asyncio.wait(rooms)

    async def _get_auth_tokens(self) -> JsonObjectType:
        """Read sorted authentication tokens from disk.

        Returns an empty mapping when the session file cannot be loaded.
        """
        try:
            return await self.hass.async_add_executor_job(
                load_json_object, self._session_filepath
            )
        except HomeAssistantError as ex:
            _LOGGER.warning(
                "Loading authentication tokens from file '%s' failed: %s",
                self._session_filepath,
                str(ex),
            )
            return {}

    async def _store_auth_token(self, token: str) -> None:
        """Store authentication token to session and persistent storage."""
        self._access_tokens[self._mx_id] = token

        await self.hass.async_add_executor_job(
            save_json,
            self._session_filepath,
            self._access_tokens,
            True,  # private=True
        )

    async def _login(self) -> None:
        """Log in to the Matrix homeserver.

        Attempts to use the stored access token.
        If that fails, then tries using the password.
        If that also fails, raises ConfigEntryAuthFailed.
        On success the current access token is persisted.
        """

        # If we have an access token
        if (token := self._access_tokens.get(self._mx_id)) is not None:
            _LOGGER.debug("Restoring login from stored access token")
            self._client.restore_login(
                user_id=self._client.user_id,
                device_id=self._client.device_id,
                access_token=token,
            )
            response = await self._client.whoami()
            if isinstance(response, WhoamiError):
                _LOGGER.warning(
                    "Restoring login from access token failed: %s, %s",
                    response.status_code,
                    response.message,
                )
                self._client.access_token = (
                    ""  # Force a soft-logout if the homeserver didn't.
                )
            elif isinstance(response, WhoamiResponse):
                _LOGGER.debug(
                    "Successfully restored login from access token: user_id '%s', device_id '%s'",
                    response.user_id,
                    response.device_id,
                )

        # If the token login did not succeed
        if not self._client.logged_in:
            # Announce the attempt before making it, so the log reads in order.
            _LOGGER.debug("Logging in using password")
            response = await self._client.login(password=self._password)

            if isinstance(response, LoginError):
                _LOGGER.warning(
                    "Login by password failed: %s, %s",
                    response.status_code,
                    response.message,
                )

        if not self._client.logged_in:
            raise ConfigEntryAuthFailed(
                "Login failed, both token and username/password are invalid"
            )

        await self._store_auth_token(self._client.access_token)

    async def _handle_room_send(
        self, target_room: RoomAnyID, message_type: str, content: dict
    ) -> None:
        """Wrap _client.room_send and handle ErrorResponses."""
        response: Response = await self._client.room_send(
            # Use the resolved room ID when the target is a known listening
            # room; otherwise pass the target through unchanged.
            room_id=self._listening_rooms.get(target_room, target_room),
            message_type=message_type,
            content=content,
        )
        if isinstance(response, ErrorResponse):
            _LOGGER.error(
                "Unable to deliver message to room '%s': %s",
                target_room,
                response,
            )
        else:
            _LOGGER.debug("Message delivered to room '%s'", target_room)

    async def _handle_multi_room_send(
        self, target_rooms: Sequence[RoomAnyID], message_type: str, content: dict
    ) -> None:
        """Wrap _handle_room_send for multiple target_rooms."""
        send_tasks = [
            self.hass.async_create_task(
                self._handle_room_send(
                    target_room=target_room,
                    message_type=message_type,
                    content=content,
                ),
                eager_start=False,
            )
            for target_room in target_rooms
        ]
        # asyncio.wait() rejects an empty collection; an empty target list
        # simply means there is nothing to send.
        if not send_tasks:
            _LOGGER.debug("No target rooms given, nothing to send")
            return
        await asyncio.wait(send_tasks)

    @staticmethod
    def _read_image_size(image_path: str) -> tuple[int, int]:
        """Return (width, height) of the image, closing the file afterwards."""
        with Image.open(image_path) as image:
            return image.size

    async def _send_image(
        self, image_path: str, target_rooms: Sequence[RoomAnyID]
    ) -> None:
        """Upload an image, then send it to all target_rooms.

        Paths rejected by ``hass.config.is_allowed_path`` and failed uploads
        are logged and nothing is sent.
        """
        _is_allowed_path = await self.hass.async_add_executor_job(
            self.hass.config.is_allowed_path, image_path
        )
        if not _is_allowed_path:
            _LOGGER.error("Path not allowed: %s", image_path)
            return

        # Get required image metadata. The image is opened and closed inside
        # the executor job so that no file handle is leaked.
        width, height = await self.hass.async_add_executor_job(
            self._read_image_size, image_path
        )
        # guess_type() yields None for unknown extensions; fall back to a
        # generic content type rather than passing None to the upload.
        mime_type = mimetypes.guess_type(image_path)[0] or DEFAULT_CONTENT_TYPE
        file_stat = await aiofiles.os.stat(image_path)

        _LOGGER.debug("Uploading file from path, %s", image_path)
        # The file is only read, so read-only binary mode is sufficient.
        async with aiofiles.open(image_path, "rb") as image_file:
            response, _ = await self._client.upload(
                image_file,
                content_type=mime_type,
                filename=os.path.basename(image_path),
                filesize=file_stat.st_size,
            )
        if isinstance(response, UploadError):
            _LOGGER.error("Unable to upload image to the homeserver: %s", response)
            return
        if isinstance(response, UploadResponse):
            _LOGGER.debug("Successfully uploaded image to the homeserver")
        else:
            _LOGGER.error(
                "Unknown response received when uploading image to homeserver: %s",
                response,
            )
            return

        content = {
            "body": os.path.basename(image_path),
            "info": {
                "size": file_stat.st_size,
                "mimetype": mime_type,
                "w": width,
                "h": height,
            },
            "msgtype": "m.image",
            "url": response.content_uri,
        }

        await self._handle_multi_room_send(
            target_rooms=target_rooms, message_type="m.room.message", content=content
        )

    async def _send_message(
        self, message: str, target_rooms: list[RoomAnyID], data: dict | None
    ) -> None:
        """Send a message to the Matrix server.

        The text is sent first; any images listed under ATTR_IMAGES in
        ``data`` are then uploaded and sent to the same rooms.
        """
        content = {"msgtype": "m.text", "body": message}
        if data is not None and data.get(ATTR_FORMAT) == FORMAT_HTML:
            content |= {"format": "org.matrix.custom.html", "formatted_body": message}

        await self._handle_multi_room_send(
            target_rooms=target_rooms, message_type="m.room.message", content=content
        )

        if (
            data is not None
            and (image_paths := data.get(ATTR_IMAGES, []))
            and target_rooms
        ):
            image_tasks = [
                self.hass.async_create_task(
                    self._send_image(image_path, target_rooms), eager_start=False
                )
                for image_path in image_paths
            ]
            await asyncio.wait(image_tasks)

    async def handle_send_message(self, service: ServiceCall) -> None:
        """Handle the send_message service."""
        await self._send_message(
            service.data[ATTR_MESSAGE],
            service.data[ATTR_TARGET],
            service.data.get(ATTR_DATA),
        )
None handle_send_message(self, ServiceCall service)
Definition: __init__.py:538
None _handle_multi_room_send(self, Sequence[RoomAnyID] target_rooms, str message_type, dict content)
Definition: __init__.py:446
None _resolve_room_aliases(self, list[RoomAnyID] listening_rooms)
Definition: __init__.py:315
JsonObjectType _get_auth_tokens(self)
Definition: __init__.py:349
None _handle_room_send(self, RoomAnyID target_room, str message_type, dict content)
Definition: __init__.py:428
None _handle_room_message(self, MatrixRoom room, Event message)
Definition: __init__.py:249
None __init__(self, HomeAssistant hass, str config_file, str homeserver, bool verify_ssl, str username, str password, list[RoomAnyID] listening_rooms, list[ConfigCommand] commands)
Definition: __init__.py:177
None _store_auth_token(self, str token)
Definition: __init__.py:363
None _send_message(self, str message, list[RoomAnyID] target_rooms, dict|None data)
Definition: __init__.py:515
None _join_room(self, RoomID room_id, RoomAnyID room_alias_or_id)
Definition: __init__.py:326
None _load_commands(self, list[ConfigCommand] commands)
Definition: __init__.py:231
None _send_image(self, str image_path, Sequence[RoomAnyID] target_rooms)
Definition: __init__.py:462
dict[RoomAnyID, RoomID] _resolve_room_alias(self, RoomAnyID room_alias_or_id)
Definition: __init__.py:290
list[_T] match(self, BluetoothServiceInfoBleak service_info)
Definition: match.py:246
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:136