1 """Notification service for Google Mail integration."""
3 from __future__
import annotations
6 from email.mime.text
import MIMEText
9 from googleapiclient.http
import HttpRequest
17 BaseNotificationService,
22 from .api
import AsyncConfigEntryAuth
23 from .const
import ATTR_BCC, ATTR_CC, ATTR_FROM, ATTR_ME, ATTR_SEND, DATA_AUTH
29 discovery_info: DiscoveryInfoType |
None =
None,
30 ) -> GMailNotificationService |
None:
31 """Get the notification service."""
36 """Define the Google Mail notification logic."""
38 def __init__(self, config: dict[str, Any]) ->
None:
39 """Initialize the service."""
40 self.auth: AsyncConfigEntryAuth = config[DATA_AUTH]
44 data: dict[str, Any] = kwargs.get(ATTR_DATA)
or {}
45 title = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
47 email = MIMEText(message,
"html")
48 if to_addrs := kwargs.get(ATTR_TARGET):
49 email[
"To"] =
", ".join(to_addrs)
50 email[
"From"] = data.get(ATTR_FROM, ATTR_ME)
51 email[
"Subject"] = title
52 email[ATTR_CC] =
", ".join(data.get(ATTR_CC, []))
53 email[ATTR_BCC] =
", ".join(data.get(ATTR_BCC, []))
55 encoded_message = base64.urlsafe_b64encode(email.as_bytes()).decode()
56 body = {
"raw": encoded_message}
59 if data.get(ATTR_SEND)
is False:
60 msg = users.drafts().
create(userId=email[
"From"], body={ATTR_MESSAGE: body})
63 raise ValueError(
"recipient address required")
64 msg = users.messages().send(userId=email[
"From"], body=body)
65 await self.hass.async_add_executor_job(msg.execute)
None async_send_message(self, str message, **Any kwargs)
None __init__(self, dict[str, Any] config)
GMailNotificationService|None async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
str get_resource(str domain_name, ConfigType domain_data)
None create(HomeAssistant hass, str message, str|None title=None, str|None notification_id=None)