1 """Telegram platform for notify component."""
3 from __future__
import annotations
7 import voluptuous
as vol
14 PLATFORM_SCHEMA
as NOTIFY_PLATFORM_SCHEMA,
15 BaseNotificationService,
19 ATTR_DISABLE_WEB_PREV,
21 ATTR_MESSAGE_THREAD_ID,
29 from .
import DOMAIN
as TELEGRAM_DOMAIN, PLATFORMS
31 _LOGGER = logging.getLogger(__name__)
33 DOMAIN =
"telegram_bot"
34 ATTR_KEYBOARD =
"keyboard"
35 ATTR_INLINE_KEYBOARD =
"inline_keyboard"
39 ATTR_DOCUMENT =
"document"
41 CONF_CHAT_ID =
"chat_id"
43 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
44 {vol.Required(CONF_CHAT_ID): vol.Coerce(int)}
51 discovery_info: DiscoveryInfoType |
None =
None,
52 ) -> TelegramNotificationService:
53 """Get the Telegram notification service."""
56 chat_id = config.get(CONF_CHAT_ID)
61 """Implement the notification service for Telegram."""
64 """Initialize the service."""
69 """Send a message to a user."""
70 service_data = {ATTR_TARGET: kwargs.get(ATTR_TARGET, self.
_chat_id_chat_id)}
71 if ATTR_TITLE
in kwargs:
72 service_data.update({ATTR_TITLE: kwargs.get(ATTR_TITLE)})
74 service_data.update({ATTR_MESSAGE: message})
75 data = kwargs.get(ATTR_DATA)
78 if data
is not None and ATTR_MESSAGE_TAG
in data:
79 message_tag = data.get(ATTR_MESSAGE_TAG)
80 service_data.update({ATTR_MESSAGE_TAG: message_tag})
83 if data
is not None and ATTR_DISABLE_NOTIF
in data:
84 disable_notification = data.get(ATTR_DISABLE_NOTIF)
85 service_data.update({ATTR_DISABLE_NOTIF: disable_notification})
88 if data
is not None and ATTR_PARSER
in data:
89 parse_mode = data.get(ATTR_PARSER)
90 service_data.update({ATTR_PARSER: parse_mode})
93 if data
is not None and ATTR_DISABLE_WEB_PREV
in data:
94 disable_web_page_preview = data[ATTR_DISABLE_WEB_PREV]
95 service_data.update({ATTR_DISABLE_WEB_PREV: disable_web_page_preview})
98 if data
is not None and ATTR_MESSAGE_THREAD_ID
in data:
99 message_thread_id = data[ATTR_MESSAGE_THREAD_ID]
100 service_data.update({ATTR_MESSAGE_THREAD_ID: message_thread_id})
103 if data
is not None and ATTR_KEYBOARD
in data:
104 keys = data.get(ATTR_KEYBOARD)
105 keys = keys
if isinstance(keys, list)
else [keys]
106 service_data.update(keyboard=keys)
107 elif data
is not None and ATTR_INLINE_KEYBOARD
in data:
108 keys = data.get(ATTR_INLINE_KEYBOARD)
109 keys = keys
if isinstance(keys, list)
else [keys]
110 service_data.update(inline_keyboard=keys)
113 if data
is not None and ATTR_PHOTO
in data:
114 photos = data.get(ATTR_PHOTO)
115 photos = photos
if isinstance(photos, list)
else [photos]
116 for photo_data
in photos:
117 service_data.update(photo_data)
118 self.
hasshass.services.call(DOMAIN,
"send_photo", service_data=service_data)
120 if data
is not None and ATTR_VIDEO
in data:
121 videos = data.get(ATTR_VIDEO)
122 videos = videos
if isinstance(videos, list)
else [videos]
123 for video_data
in videos:
124 service_data.update(video_data)
125 self.
hasshass.services.call(DOMAIN,
"send_video", service_data=service_data)
127 if data
is not None and ATTR_VOICE
in data:
128 voices = data.get(ATTR_VOICE)
129 voices = voices
if isinstance(voices, list)
else [voices]
130 for voice_data
in voices:
131 service_data.update(voice_data)
132 self.
hasshass.services.call(DOMAIN,
"send_voice", service_data=service_data)
134 if data
is not None and ATTR_LOCATION
in data:
135 service_data.update(data.get(ATTR_LOCATION))
136 return self.
hasshass.services.call(
137 DOMAIN,
"send_location", service_data=service_data
139 if data
is not None and ATTR_DOCUMENT
in data:
140 service_data.update(data.get(ATTR_DOCUMENT))
141 return self.
hasshass.services.call(
142 DOMAIN,
"send_document", service_data=service_data
147 "TELEGRAM NOTIFIER calling %s.send_message with %s", DOMAIN, service_data
149 return self.
hasshass.services.call(
150 DOMAIN,
"send_message", service_data=service_data
def send_message(self, message="", **kwargs)
def __init__(self, hass, chat_id)
TelegramNotificationService get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
None setup_reload_service(HomeAssistant hass, str domain, Iterable[str] platforms)