Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """Notification service for Google Mail integration."""
2 
3 from __future__ import annotations
4 
5 import base64
6 from email.mime.text import MIMEText
7 from typing import Any
8 
9 from googleapiclient.http import HttpRequest
10 
12  ATTR_DATA,
13  ATTR_MESSAGE,
14  ATTR_TARGET,
15  ATTR_TITLE,
16  ATTR_TITLE_DEFAULT,
17  BaseNotificationService,
18 )
19 from homeassistant.core import HomeAssistant
20 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
21 
22 from .api import AsyncConfigEntryAuth
23 from .const import ATTR_BCC, ATTR_CC, ATTR_FROM, ATTR_ME, ATTR_SEND, DATA_AUTH
24 
25 
27  hass: HomeAssistant,
28  config: ConfigType,
29  discovery_info: DiscoveryInfoType | None = None,
30 ) -> GMailNotificationService | None:
31  """Get the notification service."""
32  return GMailNotificationService(discovery_info) if discovery_info else None
33 
34 
35 class GMailNotificationService(BaseNotificationService):
36  """Define the Google Mail notification logic."""
37 
38  def __init__(self, config: dict[str, Any]) -> None:
39  """Initialize the service."""
40  self.auth: AsyncConfigEntryAuth = config[DATA_AUTH]
41 
42  async def async_send_message(self, message: str, **kwargs: Any) -> None:
43  """Send a message."""
44  data: dict[str, Any] = kwargs.get(ATTR_DATA) or {}
45  title = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
46 
47  email = MIMEText(message, "html")
48  if to_addrs := kwargs.get(ATTR_TARGET):
49  email["To"] = ", ".join(to_addrs)
50  email["From"] = data.get(ATTR_FROM, ATTR_ME)
51  email["Subject"] = title
52  email[ATTR_CC] = ", ".join(data.get(ATTR_CC, []))
53  email[ATTR_BCC] = ", ".join(data.get(ATTR_BCC, []))
54 
55  encoded_message = base64.urlsafe_b64encode(email.as_bytes()).decode()
56  body = {"raw": encoded_message}
57  msg: HttpRequest
58  users = (await self.auth.get_resource()).users()
59  if data.get(ATTR_SEND) is False:
60  msg = users.drafts().create(userId=email["From"], body={ATTR_MESSAGE: body})
61  else:
62  if not to_addrs:
63  raise ValueError("recipient address required")
64  msg = users.messages().send(userId=email["From"], body=body)
65  await self.hass.async_add_executor_job(msg.execute)
None async_send_message(self, str message, **Any kwargs)
Definition: notify.py:42
GMailNotificationService|None async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:30
str get_resource(str domain_name, ConfigType domain_data)
Definition: helpers.py:83
None create(HomeAssistant hass, str message, str|None title=None, str|None notification_id=None)
Definition: __init__.py:84