1 """Rocket.Chat notification service."""
3 from __future__
import annotations
5 from http
import HTTPStatus
8 from rocketchat_API.APIExceptions.RocketExceptions
import (
9 RocketAuthenticationException,
10 RocketConnectionException,
12 from rocketchat_API.rocketchat
import RocketChat
13 import voluptuous
as vol
17 PLATFORM_SCHEMA
as NOTIFY_PLATFORM_SCHEMA,
18 BaseNotificationService,
25 _LOGGER = logging.getLogger(__name__)
27 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
29 vol.Required(CONF_URL): vol.Url(),
30 vol.Required(CONF_USERNAME): cv.string,
31 vol.Required(CONF_PASSWORD): cv.string,
32 vol.Required(CONF_ROOM): cv.string,
40 discovery_info: DiscoveryInfoType |
None =
None,
41 ) -> RocketChatNotificationService |
None:
42 """Return the notify service."""
44 username = config.get(CONF_USERNAME)
45 password = config.get(CONF_PASSWORD)
47 url = config.get(CONF_URL)
48 room = config.get(CONF_ROOM)
52 except RocketConnectionException:
53 _LOGGER.warning(
"Unable to connect to Rocket.Chat server at %s", url)
54 except RocketAuthenticationException:
56 "Rocket.Chat authentication failed for user %s. Please check your username/password",
64 """Implement the notification service for Rocket.Chat."""
66 def __init__(self, url, username, password, room):
67 """Initialize the service."""
70 self.
_server_server = RocketChat(username, password, server_url=url)
73 """Send a message to Rocket.Chat."""
74 data = kwargs.get(ATTR_DATA)
or {}
75 resp = self.
_server_server.chat_post_message(message, channel=self.
_room_room, **data)
76 if resp.status_code == HTTPStatus.OK:
77 if not resp.json()[
"success"]:
78 _LOGGER.error(
"Unable to post Rocket.Chat message")
81 "Incorrect status code when posting message: %d", resp.status_code
def __init__(self, url, username, password, room)
def send_message(self, message="", **kwargs)
RocketChatNotificationService|None get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)