Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """Jabber (XMPP) notification service."""
2 
3 from __future__ import annotations
4 
5 from concurrent.futures import TimeoutError as FutTimeoutError
6 from http import HTTPStatus
7 import logging
8 import mimetypes
9 import pathlib
10 import random
11 import string
12 
13 import requests
14 import slixmpp
15 from slixmpp.exceptions import IqError, IqTimeout, XMPPError
16 from slixmpp.plugins.xep_0363.http_upload import (
17  FileTooBig,
18  FileUploadError,
19  UploadServiceNotFound,
20 )
21 from slixmpp.xmlstream.xmlstream import NotConnectedError
22 import voluptuous as vol
23 
25  ATTR_TITLE,
26  ATTR_TITLE_DEFAULT,
27  PLATFORM_SCHEMA as NOTIFY_PLATFORM_SCHEMA,
28  BaseNotificationService,
29 )
30 from homeassistant.const import (
31  CONF_PASSWORD,
32  CONF_RECIPIENT,
33  CONF_RESOURCE,
34  CONF_ROOM,
35  CONF_SENDER,
36 )
37 from homeassistant.core import HomeAssistant
39 import homeassistant.helpers.template as template_helper
40 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
41 
42 _LOGGER = logging.getLogger(__name__)
43 
44 ATTR_DATA = "data"
45 ATTR_PATH = "path"
46 ATTR_PATH_TEMPLATE = "path_template"
47 ATTR_TIMEOUT = "timeout"
48 ATTR_URL = "url"
49 ATTR_URL_TEMPLATE = "url_template"
50 ATTR_VERIFY = "verify"
51 
52 CONF_TLS = "tls"
53 CONF_VERIFY = "verify"
54 
55 DEFAULT_CONTENT_TYPE = "application/octet-stream"
56 DEFAULT_RESOURCE = "home-assistant"
57 XEP_0363_TIMEOUT = 10
58 
59 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
60  {
61  vol.Required(CONF_SENDER): cv.string,
62  vol.Required(CONF_PASSWORD): cv.string,
63  vol.Required(CONF_RECIPIENT): vol.All(cv.ensure_list, [cv.string]),
64  vol.Optional(CONF_RESOURCE, default=DEFAULT_RESOURCE): cv.string,
65  vol.Optional(CONF_ROOM, default=""): cv.string,
66  vol.Optional(CONF_TLS, default=True): cv.boolean,
67  vol.Optional(CONF_VERIFY, default=True): cv.boolean,
68  }
69 )
70 
71 
73  hass: HomeAssistant,
74  config: ConfigType,
75  discovery_info: DiscoveryInfoType | None = None,
76 ) -> XmppNotificationService:
77  """Get the Jabber (XMPP) notification service."""
79  config.get(CONF_SENDER),
80  config.get(CONF_RESOURCE),
81  config.get(CONF_PASSWORD),
82  config.get(CONF_RECIPIENT),
83  config.get(CONF_TLS),
84  config.get(CONF_VERIFY),
85  config.get(CONF_ROOM),
86  hass,
87  )
88 
89 
90 class XmppNotificationService(BaseNotificationService):
91  """Implement the notification service for Jabber (XMPP)."""
92 
93  def __init__(self, sender, resource, password, recipient, tls, verify, room, hass):
94  """Initialize the service."""
95  self._hass_hass = hass
96  self._sender_sender = sender
97  self._resource_resource = resource
98  self._password_password = password
99  self._recipients_recipients = recipient
100  self._tls_tls = tls
101  self._verify_verify = verify
102  self._room_room = room
103 
104  async def async_send_message(self, message="", **kwargs):
105  """Send a message to a user."""
106  title = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
107  text = f"{title}: {message}" if title else message
108  data = kwargs.get(ATTR_DATA)
109  timeout = data.get(ATTR_TIMEOUT, XEP_0363_TIMEOUT) if data else None
110 
111  await async_send_message(
112  f"{self._sender}/{self._resource}",
113  self._password_password,
114  self._recipients_recipients,
115  self._tls_tls,
116  self._verify_verify,
117  self._room_room,
118  self._hass_hass,
119  text,
120  timeout,
121  data,
122  )
123 
124 
125 async def async_send_message( # noqa: C901
126  sender,
127  password,
128  recipients,
129  use_tls,
130  verify_certificate,
131  room,
132  hass,
133  message,
134  timeout=None,
135  data=None,
136 ):
137  """Send a message over XMPP."""
138 
139  class SendNotificationBot(slixmpp.ClientXMPP):
140  """Service for sending Jabber (XMPP) messages."""
141 
142  def __init__(self):
143  """Initialize the Jabber Bot."""
144  super().__init__(sender, password)
145 
146  self.loop = hass.loop
147 
148  self.force_starttls = use_tls
149  self.use_ipv6 = False
150  self.add_event_handler("failed_all_auth", self.disconnect_on_login_fail)
151  self.add_event_handler("session_start", self.start)
152 
153  if room:
154  self.register_plugin("xep_0045") # MUC
155  if not verify_certificate:
156  self.add_event_handler(
157  "ssl_invalid_cert", self.discard_ssl_invalid_cert
158  )
159  if data:
160  # Init XEPs for image sending
161  self.register_plugin("xep_0030") # OOB dep
162  self.register_plugin("xep_0066") # Out of Band Data
163  self.register_plugin("xep_0071") # XHTML IM
164  self.register_plugin("xep_0128") # Service Discovery
165  self.register_plugin("xep_0363") # HTTP upload
166 
167  self.connect(force_starttls=self.force_starttls, use_ssl=False)
168 
169  async def start(self, event):
170  """Start the communication and sends the message."""
171  if room:
172  _LOGGER.debug("Joining room %s", room)
173  await self.plugin["xep_0045"].join_muc_wait(room, sender, seconds=0)
174  # Sending image and message independently from each other
175  if data:
176  await self.send_file(timeout=timeout)
177  if message:
178  self.send_text_message()
179 
180  self.disconnect()
181 
182  async def send_file(self, timeout=None):
183  """Send file via XMPP.
184 
185  Send XMPP file message using OOB (XEP_0066) and
186  HTTP Upload (XEP_0363)
187  """
188  try:
189  # Uploading with XEP_0363
190  _LOGGER.debug("Timeout set to %ss", timeout)
191  url = await self.upload_file(timeout=timeout)
192 
193  _LOGGER.debug("Upload success")
194  for recipient in recipients:
195  if room:
196  _LOGGER.debug("Sending file to %s", room)
197  message = self.Message(sto=room, stype="groupchat")
198  else:
199  _LOGGER.debug("Sending file to %s", recipient)
200  message = self.Message(sto=recipient, stype="chat")
201  message["body"] = url
202  message["oob"]["url"] = url
203  try:
204  message.send()
205  except (IqError, IqTimeout, XMPPError) as ex:
206  _LOGGER.error("Could not send image message %s", ex)
207  if room:
208  break
209  except (IqError, IqTimeout, XMPPError) as ex:
210  _LOGGER.error("Upload error, could not send message %s", ex)
211  except NotConnectedError as ex:
212  _LOGGER.error("Connection error %s", ex)
213  except FileTooBig as ex:
214  _LOGGER.error("File too big for server, could not upload file %s", ex)
215  except UploadServiceNotFound as ex:
216  _LOGGER.error("UploadServiceNotFound, could not upload file %s", ex)
217  except FileUploadError as ex:
218  _LOGGER.error("FileUploadError, could not upload file %s", ex)
219  except requests.exceptions.SSLError as ex:
220  _LOGGER.error("Cannot establish SSL connection %s", ex)
221  except requests.exceptions.ConnectionError as ex:
222  _LOGGER.error("Cannot connect to server %s", ex)
223  except (
224  FileNotFoundError,
225  PermissionError,
226  IsADirectoryError,
227  TimeoutError,
228  ) as ex:
229  _LOGGER.error("Error reading file %s", ex)
230  except FutTimeoutError as ex:
231  _LOGGER.error("The server did not respond in time, %s", ex)
232 
233  async def upload_file(self, timeout=None):
234  """Upload file to Jabber server and return new URL.
235 
236  upload a file with Jabber XEP_0363 from a remote URL or a local
237  file path and return a URL of that file.
238  """
239  if data.get(ATTR_URL_TEMPLATE):
240  _LOGGER.debug("Got url template: %s", data[ATTR_URL_TEMPLATE])
241  templ = template_helper.Template(data[ATTR_URL_TEMPLATE], hass)
242  get_url = template_helper.render_complex(templ, None)
243  url = await self.upload_file_from_url(get_url, timeout=timeout)
244  elif data.get(ATTR_URL):
245  url = await self.upload_file_from_url(data[ATTR_URL], timeout=timeout)
246  elif data.get(ATTR_PATH_TEMPLATE):
247  _LOGGER.debug("Got path template: %s", data[ATTR_PATH_TEMPLATE])
248  templ = template_helper.Template(data[ATTR_PATH_TEMPLATE], hass)
249  get_path = template_helper.render_complex(templ, None)
250  url = await self.upload_file_from_path(get_path, timeout=timeout)
251  elif data.get(ATTR_PATH):
252  url = await self.upload_file_from_path(data[ATTR_PATH], timeout=timeout)
253  else:
254  url = None
255 
256  if url is None:
257  _LOGGER.error("No path or URL found for file")
258  raise FileUploadError("Could not upload file")
259 
260  return url
261 
262  async def upload_file_from_url(self, url, timeout=None):
263  """Upload a file from a URL. Returns a URL.
264 
265  uploaded via XEP_0363 and HTTP and returns the resulting URL
266  """
267  _LOGGER.debug("Getting file from %s", url)
268 
269  def get_url(url):
270  """Return result for GET request to url."""
271  return requests.get(
272  url, verify=data.get(ATTR_VERIFY, True), timeout=timeout
273  )
274 
275  result = await hass.async_add_executor_job(get_url, url)
276 
277  if result.status_code >= HTTPStatus.BAD_REQUEST:
278  _LOGGER.error("Could not load file from %s", url)
279  return None
280 
281  filesize = len(result.content)
282 
283  # we need a file extension, the upload server needs a
284  # filename, if none is provided, through the path we guess
285  # the extension
286  # also setting random filename for privacy
287  if data.get(ATTR_PATH):
288  # using given path as base for new filename. Don't guess type
289  filename = self.get_random_filename(data.get(ATTR_PATH))
290  else:
291  extension = (
292  mimetypes.guess_extension(result.headers["Content-Type"])
293  or ".unknown"
294  )
295  _LOGGER.debug("Got %s extension", extension)
296  filename = self.get_random_filename(None, extension=extension)
297 
298  _LOGGER.debug("Uploading file from URL, %s", filename)
299 
300  return await self["xep_0363"].upload_file(
301  filename,
302  size=filesize,
303  input_file=result.content,
304  content_type=result.headers["Content-Type"],
305  timeout=timeout,
306  )
307 
308  def _read_upload_file(self, path: str) -> bytes:
309  """Read file from path."""
310  with open(path, "rb") as upfile:
311  _LOGGER.debug("Reading file %s", path)
312  return upfile.read()
313 
314  async def upload_file_from_path(self, path: str, timeout=None):
315  """Upload a file from a local file path via XEP_0363."""
316  _LOGGER.debug("Uploading file from path, %s", path)
317 
318  if not hass.config.is_allowed_path(path):
319  raise PermissionError("Could not access file. Path not allowed")
320 
321  input_file = await hass.async_add_executor_job(self._read_upload_file, path)
322  filesize = len(input_file)
323  _LOGGER.debug("Filesize is %s bytes", filesize)
324 
325  if (content_type := mimetypes.guess_type(path)[0]) is None:
326  content_type = DEFAULT_CONTENT_TYPE
327  _LOGGER.debug("Content type is %s", content_type)
328 
329  # set random filename for privacy
330  filename = self.get_random_filename(data.get(ATTR_PATH))
331  _LOGGER.debug("Uploading file with random filename %s", filename)
332 
333  return await self["xep_0363"].upload_file(
334  filename,
335  size=filesize,
336  input_file=input_file,
337  content_type=content_type,
338  timeout=timeout,
339  )
340 
341  def send_text_message(self):
342  """Send a text only message to a room or a recipient."""
343  try:
344  if room:
345  _LOGGER.debug("Sending message to room %s", room)
346  self.send_message(mto=room, mbody=message, mtype="groupchat")
347  else:
348  for recipient in recipients:
349  _LOGGER.debug("Sending message to %s", recipient)
350  self.send_message(mto=recipient, mbody=message, mtype="chat")
351  except (IqError, IqTimeout, XMPPError) as ex:
352  _LOGGER.error("Could not send text message %s", ex)
353  except NotConnectedError as ex:
354  _LOGGER.error("Connection error %s", ex)
355 
356  def get_random_filename(self, filename, extension=None):
357  """Return a random filename, leaving the extension intact."""
358  if extension is None:
359  path = pathlib.Path(filename)
360  if path.suffix:
361  extension = "".join(path.suffixes)
362  else:
363  extension = ".txt"
364  return (
365  "".join(random.choice(string.ascii_letters) for i in range(10))
366  + extension
367  )
368 
369  def disconnect_on_login_fail(self, event):
370  """Disconnect from the server if credentials are invalid."""
371  _LOGGER.warning("Login failed")
372  self.disconnect()
373 
374  @staticmethod
375  def discard_ssl_invalid_cert(event):
376  """Do nothing if ssl certificate is invalid."""
377  _LOGGER.debug("Ignoring invalid SSL certificate as requested")
378 
379  SendNotificationBot()
def async_send_message(self, message="", **kwargs)
Definition: notify.py:104
def __init__(self, sender, resource, password, recipient, tls, verify, room, hass)
Definition: notify.py:93
None __init__(self, _AOSmithCoordinatorT coordinator, str junction_id)
Definition: entity.py:20
None open(self, **Any kwargs)
Definition: lock.py:86
def async_send_message(sender, password, recipients, use_tls, verify_certificate, room, hass, message, timeout=None, data=None)
Definition: notify.py:136
XmppNotificationService async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:76
str get_url(HomeAssistant hass, *bool require_current_request=False, bool require_ssl=False, bool require_standard_port=False, bool require_cloud=False, bool allow_internal=True, bool allow_external=True, bool allow_cloud=True, bool|None allow_ip=None, bool|None prefer_external=None, bool prefer_cloud=False)
Definition: network.py:131