Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """Rocket.Chat notification service."""
2 
3 from __future__ import annotations
4 
5 from http import HTTPStatus
6 import logging
7 
8 from rocketchat_API.APIExceptions.RocketExceptions import (
9  RocketAuthenticationException,
10  RocketConnectionException,
11 )
12 from rocketchat_API.rocketchat import RocketChat
13 import voluptuous as vol
14 
16  ATTR_DATA,
17  PLATFORM_SCHEMA as NOTIFY_PLATFORM_SCHEMA,
18  BaseNotificationService,
19 )
20 from homeassistant.const import CONF_PASSWORD, CONF_ROOM, CONF_URL, CONF_USERNAME
21 from homeassistant.core import HomeAssistant
23 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
24 
25 _LOGGER = logging.getLogger(__name__)
26 
# Platform configuration: the base notify schema extended with the four
# settings this platform reads in get_service(). All four are required.
PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
    {
        # Server address, validated as a URL.
        vol.Required(CONF_URL): vol.Url(),
        # Login credentials, validated as strings.
        vol.Required(CONF_USERNAME): cv.string,
        vol.Required(CONF_PASSWORD): cv.string,
        # Room the messages are posted to, validated as a string.
        vol.Required(CONF_ROOM): cv.string,
    }
)
35 
36 
def get_service(
    hass: HomeAssistant,
    config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> RocketChatNotificationService | None:
    """Return the Rocket.Chat notify service, or None if it cannot be created.

    The server URL, username, password and room are read from ``config`` and
    passed to ``RocketChatNotificationService``. ``hass`` and
    ``discovery_info`` are accepted but not used by this function.

    Returns:
        The constructed service, or ``None`` when building it raises
        ``RocketConnectionException`` or ``RocketAuthenticationException``
        (a warning is logged in either case).
    """
    url = config.get(CONF_URL)
    room = config.get(CONF_ROOM)

    username = config.get(CONF_USERNAME)
    password = config.get(CONF_PASSWORD)

    try:
        return RocketChatNotificationService(url, username, password, room)
    except RocketConnectionException:
        _LOGGER.warning("Unable to connect to Rocket.Chat server at %s", url)
    except RocketAuthenticationException:
        _LOGGER.warning(
            "Rocket.Chat authentication failed for user %s. Please check your username/password",
            username,
        )

    # Reached only after one of the handled failures above was logged.
    return None
61 
62 
class RocketChatNotificationService(BaseNotificationService):
    """Notification service that posts messages to a Rocket.Chat room."""

    def __init__(self, url, username, password, room):
        """Create the Rocket.Chat client and remember the target room.

        ``username`` and ``password`` are handed to ``RocketChat`` together
        with ``url`` as the server address; ``room`` is stored for use as
        the channel when sending.
        """
        self._server_server = RocketChat(username, password, server_url=url)
        self._room_room = room

    def send_message(self, message="", **kwargs):
        """Post ``message`` to the configured room and log any failure.

        Extra keyword arguments for the post call are taken from the
        ``ATTR_DATA`` entry of ``kwargs`` when it is present and non-empty.
        Nothing is returned; failures are only reported through the logger.
        """
        extra = kwargs.get(ATTR_DATA) or {}
        response = self._server_server.chat_post_message(
            message, channel=self._room_room, **extra
        )

        # Any status other than 200 is reported and ends processing here.
        if response.status_code != HTTPStatus.OK:
            _LOGGER.error(
                "Incorrect status code when posting message: %d", response.status_code
            )
            return

        # A 200 response can still carry a falsy "success" field in its body.
        if not response.json()["success"]:
            _LOGGER.error("Unable to post Rocket.Chat message")
RocketChatNotificationService|None get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:41