Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """HTML5 Push Messaging notification service."""
2 
3 from __future__ import annotations
4 
5 from contextlib import suppress
6 from datetime import datetime, timedelta
7 from functools import partial
8 from http import HTTPStatus
9 import json
10 import logging
11 import time
12 from urllib.parse import urlparse
13 import uuid
14 
15 from aiohttp.hdrs import AUTHORIZATION
16 import jwt
17 from py_vapid import Vapid
18 from pywebpush import WebPusher
19 import voluptuous as vol
20 from voluptuous.humanize import humanize_error
21 
22 from homeassistant.components import websocket_api
23 from homeassistant.components.http import KEY_HASS, HomeAssistantView
25  ATTR_DATA,
26  ATTR_TARGET,
27  ATTR_TITLE,
28  ATTR_TITLE_DEFAULT,
29  PLATFORM_SCHEMA as NOTIFY_PLATFORM_SCHEMA,
30  BaseNotificationService,
31 )
32 from homeassistant.config_entries import SOURCE_IMPORT
33 from homeassistant.const import ATTR_NAME, URL_ROOT
34 from homeassistant.core import HomeAssistant, ServiceCall
35 from homeassistant.exceptions import HomeAssistantError
36 from homeassistant.helpers import config_validation as cv
37 from homeassistant.helpers.json import save_json
38 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
39 from homeassistant.util import ensure_unique_string
40 from homeassistant.util.json import JsonObjectType, load_json_object
41 
42 from .const import (
43  ATTR_VAPID_EMAIL,
44  ATTR_VAPID_PRV_KEY,
45  ATTR_VAPID_PUB_KEY,
46  DOMAIN,
47  SERVICE_DISMISS,
48 )
49 from .issues import async_create_html5_issue
50 
51 _LOGGER = logging.getLogger(__name__)
52 
53 REGISTRATIONS_FILE = "html5_push_registrations.conf"
54 
55 
56 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
57  {
58  vol.Optional("gcm_sender_id"): cv.string,
59  vol.Optional("gcm_api_key"): cv.string,
60  vol.Required(ATTR_VAPID_PUB_KEY): cv.string,
61  vol.Required(ATTR_VAPID_PRV_KEY): cv.string,
62  vol.Required(ATTR_VAPID_EMAIL): cv.string,
63  }
64 )
65 
66 ATTR_SUBSCRIPTION = "subscription"
67 ATTR_BROWSER = "browser"
68 
69 ATTR_ENDPOINT = "endpoint"
70 ATTR_KEYS = "keys"
71 ATTR_AUTH = "auth"
72 ATTR_P256DH = "p256dh"
73 ATTR_EXPIRATIONTIME = "expirationTime"
74 
75 ATTR_TAG = "tag"
76 ATTR_ACTION = "action"
77 ATTR_ACTIONS = "actions"
78 ATTR_TYPE = "type"
79 ATTR_URL = "url"
80 ATTR_DISMISS = "dismiss"
81 ATTR_PRIORITY = "priority"
82 DEFAULT_PRIORITY = "normal"
83 ATTR_TTL = "ttl"
84 DEFAULT_TTL = 86400
85 
86 ATTR_JWT = "jwt"
87 
88 WS_TYPE_APPKEY = "notify/html5/appkey"
89 SCHEMA_WS_APPKEY = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
90  {vol.Required("type"): WS_TYPE_APPKEY}
91 )
92 
93 # The number of days after the moment a notification is sent that a JWT
94 # is valid.
95 JWT_VALID_DAYS = 7
96 VAPID_CLAIM_VALID_HOURS = 12
97 
98 KEYS_SCHEMA = vol.All(
99  dict,
100  vol.Schema(
101  {vol.Required(ATTR_AUTH): cv.string, vol.Required(ATTR_P256DH): cv.string}
102  ),
103 )
104 
105 SUBSCRIPTION_SCHEMA = vol.All(
106  dict,
107  vol.Schema(
108  {
109  vol.Required(ATTR_ENDPOINT): vol.Url(),
110  vol.Required(ATTR_KEYS): KEYS_SCHEMA,
111  vol.Optional(ATTR_EXPIRATIONTIME): vol.Any(None, cv.positive_int),
112  }
113  ),
114 )
115 
116 DISMISS_SERVICE_SCHEMA = vol.Schema(
117  {
118  vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [cv.string]),
119  vol.Optional(ATTR_DATA): dict,
120  }
121 )
122 
123 REGISTER_SCHEMA = vol.Schema(
124  {
125  vol.Required(ATTR_SUBSCRIPTION): SUBSCRIPTION_SCHEMA,
126  vol.Required(ATTR_BROWSER): vol.In(["chrome", "firefox"]),
127  vol.Optional(ATTR_NAME): cv.string,
128  }
129 )
130 
131 CALLBACK_EVENT_PAYLOAD_SCHEMA = vol.Schema(
132  {
133  vol.Required(ATTR_TAG): cv.string,
134  vol.Required(ATTR_TYPE): vol.In(["received", "clicked", "closed"]),
135  vol.Required(ATTR_TARGET): cv.string,
136  vol.Optional(ATTR_ACTION): cv.string,
137  vol.Optional(ATTR_DATA): dict,
138  }
139 )
140 
141 NOTIFY_CALLBACK_EVENT = "html5_notification"
142 
143 # Badge and timestamp are Chrome specific (not in official spec)
144 HTML5_SHOWNOTIFICATION_PARAMETERS = (
145  "actions",
146  "badge",
147  "body",
148  "dir",
149  "icon",
150  "image",
151  "lang",
152  "renotify",
153  "requireInteraction",
154  "tag",
155  "timestamp",
156  "vibrate",
157 )
158 
159 
161  hass: HomeAssistant,
162  config: ConfigType,
163  discovery_info: DiscoveryInfoType | None = None,
164 ) -> HTML5NotificationService | None:
165  """Get the HTML5 push notification service."""
166  if config:
167  existing_config_entry = hass.config_entries.async_entries(DOMAIN)
168  if existing_config_entry:
169  async_create_html5_issue(hass, True)
170  return None
171  hass.async_create_task(
172  hass.config_entries.flow.async_init(
173  DOMAIN, context={"source": SOURCE_IMPORT}, data=config
174  )
175  )
176  return None
177 
178  if discovery_info is None:
179  return None
180 
181  json_path = hass.config.path(REGISTRATIONS_FILE)
182 
183  registrations = await hass.async_add_executor_job(_load_config, json_path)
184 
185  vapid_pub_key = discovery_info[ATTR_VAPID_PUB_KEY]
186  vapid_prv_key = discovery_info[ATTR_VAPID_PRV_KEY]
187  vapid_email = discovery_info[ATTR_VAPID_EMAIL]
188 
189  def websocket_appkey(_hass, connection, msg):
190  connection.send_message(websocket_api.result_message(msg["id"], vapid_pub_key))
191 
192  websocket_api.async_register_command(
193  hass, WS_TYPE_APPKEY, websocket_appkey, SCHEMA_WS_APPKEY
194  )
195 
196  hass.http.register_view(HTML5PushRegistrationView(registrations, json_path))
197  hass.http.register_view(HTML5PushCallbackView(registrations))
198 
200  hass, vapid_prv_key, vapid_email, registrations, json_path
201  )
202 
203 
204 def _load_config(filename: str) -> JsonObjectType:
205  """Load configuration."""
206  with suppress(HomeAssistantError):
207  return load_json_object(filename)
208  return {}
209 
210 
211 class HTML5PushRegistrationView(HomeAssistantView):
212  """Accepts push registrations from a browser."""
213 
214  url = "/api/notify.html5"
215  name = "api:notify.html5"
216 
217  def __init__(self, registrations, json_path):
218  """Init HTML5PushRegistrationView."""
219  self.registrationsregistrations = registrations
220  self.json_pathjson_path = json_path
221 
222  async def post(self, request):
223  """Accept the POST request for push registrations from a browser."""
224  try:
225  data = await request.json()
226  except ValueError:
227  return self.json_message("Invalid JSON", HTTPStatus.BAD_REQUEST)
228  try:
229  data = REGISTER_SCHEMA(data)
230  except vol.Invalid as ex:
231  return self.json_message(humanize_error(data, ex), HTTPStatus.BAD_REQUEST)
232 
233  devname = data.get(ATTR_NAME)
234  data.pop(ATTR_NAME, None)
235 
236  name = self.find_registration_namefind_registration_name(data, devname)
237  previous_registration = self.registrationsregistrations.get(name)
238 
239  self.registrationsregistrations[name] = data
240 
241  try:
242  hass = request.app[KEY_HASS]
243 
244  await hass.async_add_executor_job(
245  save_json, self.json_pathjson_path, self.registrationsregistrations
246  )
247  return self.json_message("Push notification subscriber registered.")
248  except HomeAssistantError:
249  if previous_registration is not None:
250  self.registrationsregistrations[name] = previous_registration
251  else:
252  self.registrationsregistrations.pop(name)
253 
254  return self.json_message(
255  "Error saving registration.", HTTPStatus.INTERNAL_SERVER_ERROR
256  )
257 
258  def find_registration_name(self, data, suggested=None):
259  """Find a registration name matching data or generate a unique one."""
260  endpoint = data.get(ATTR_SUBSCRIPTION).get(ATTR_ENDPOINT)
261  for key, registration in self.registrationsregistrations.items():
262  subscription = registration.get(ATTR_SUBSCRIPTION)
263  if subscription.get(ATTR_ENDPOINT) == endpoint:
264  return key
265  return ensure_unique_string(suggested or "unnamed device", self.registrationsregistrations)
266 
267  async def delete(self, request):
268  """Delete a registration."""
269  try:
270  data = await request.json()
271  except ValueError:
272  return self.json_message("Invalid JSON", HTTPStatus.BAD_REQUEST)
273 
274  subscription = data.get(ATTR_SUBSCRIPTION)
275 
276  found = None
277 
278  for key, registration in self.registrationsregistrations.items():
279  if registration.get(ATTR_SUBSCRIPTION) == subscription:
280  found = key
281  break
282 
283  if not found:
284  # If not found, unregistering was already done. Return 200
285  return self.json_message("Registration not found.")
286 
287  reg = self.registrationsregistrations.pop(found)
288 
289  try:
290  hass = request.app[KEY_HASS]
291 
292  await hass.async_add_executor_job(
293  save_json, self.json_pathjson_path, self.registrationsregistrations
294  )
295  except HomeAssistantError:
296  self.registrationsregistrations[found] = reg
297  return self.json_message(
298  "Error saving registration.", HTTPStatus.INTERNAL_SERVER_ERROR
299  )
300 
301  return self.json_message("Push notification subscriber unregistered.")
302 
303 
304 class HTML5PushCallbackView(HomeAssistantView):
305  """Accepts push registrations from a browser."""
306 
307  requires_auth = False
308  url = "/api/notify.html5/callback"
309  name = "api:notify.html5/callback"
310 
311  def __init__(self, registrations):
312  """Init HTML5PushCallbackView."""
313  self.registrationsregistrations = registrations
314 
315  def decode_jwt(self, token):
316  """Find the registration that signed this JWT and return it."""
317 
318  # 1. Check claims w/o verifying to see if a target is in there.
319  # 2. If target in claims, attempt to verify against the given name.
320  # 2a. If decode is successful, return the payload.
321  # 2b. If decode is unsuccessful, return a 401.
322 
323  target_check = jwt.decode(
324  token, algorithms=["ES256", "HS256"], options={"verify_signature": False}
325  )
326  if target_check.get(ATTR_TARGET) in self.registrationsregistrations:
327  possible_target = self.registrationsregistrations[target_check[ATTR_TARGET]]
328  key = possible_target[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH]
329  with suppress(jwt.exceptions.DecodeError):
330  return jwt.decode(token, key, algorithms=["ES256", "HS256"])
331 
332  return self.json_message(
333  "No target found in JWT", status_code=HTTPStatus.UNAUTHORIZED
334  )
335 
336  # The following is based on code from Auth0
337  # https://auth0.com/docs/quickstart/backend/python
338  def check_authorization_header(self, request):
339  """Check the authorization header."""
340  if not (auth := request.headers.get(AUTHORIZATION)):
341  return self.json_message(
342  "Authorization header is expected", status_code=HTTPStatus.UNAUTHORIZED
343  )
344 
345  parts = auth.split()
346 
347  if parts[0].lower() != "bearer":
348  return self.json_message(
349  "Authorization header must start with Bearer",
350  status_code=HTTPStatus.UNAUTHORIZED,
351  )
352  if len(parts) != 2:
353  return self.json_message(
354  "Authorization header must be Bearer token",
355  status_code=HTTPStatus.UNAUTHORIZED,
356  )
357 
358  token = parts[1]
359  try:
360  payload = self.decode_jwtdecode_jwt(token)
361  except jwt.exceptions.InvalidTokenError:
362  return self.json_message(
363  "token is invalid", status_code=HTTPStatus.UNAUTHORIZED
364  )
365  return payload
366 
367  async def post(self, request):
368  """Accept the POST request for push registrations event callback."""
369  auth_check = self.check_authorization_headercheck_authorization_header(request)
370  if not isinstance(auth_check, dict):
371  return auth_check
372 
373  try:
374  data = await request.json()
375  except ValueError:
376  return self.json_message("Invalid JSON", HTTPStatus.BAD_REQUEST)
377 
378  event_payload = {
379  ATTR_TAG: data.get(ATTR_TAG),
380  ATTR_TYPE: data[ATTR_TYPE],
381  ATTR_TARGET: auth_check[ATTR_TARGET],
382  }
383 
384  if data.get(ATTR_ACTION) is not None:
385  event_payload[ATTR_ACTION] = data.get(ATTR_ACTION)
386 
387  if data.get(ATTR_DATA) is not None:
388  event_payload[ATTR_DATA] = data.get(ATTR_DATA)
389 
390  try:
391  event_payload = CALLBACK_EVENT_PAYLOAD_SCHEMA(event_payload)
392  except vol.Invalid as ex:
393  _LOGGER.warning(
394  "Callback event payload is not valid: %s",
395  humanize_error(event_payload, ex),
396  )
397 
398  event_name = f"{NOTIFY_CALLBACK_EVENT}.{event_payload[ATTR_TYPE]}"
399  request.app[KEY_HASS].bus.fire(event_name, event_payload)
400  return self.json({"status": "ok", "event": event_payload[ATTR_TYPE]})
401 
402 
403 class HTML5NotificationService(BaseNotificationService):
404  """Implement the notification service for HTML5."""
405 
406  def __init__(self, hass, vapid_prv, vapid_email, registrations, json_path):
407  """Initialize the service."""
408  self._vapid_prv_vapid_prv = vapid_prv
409  self._vapid_email_vapid_email = vapid_email
410  self.registrationsregistrations = registrations
411  self.registrations_json_pathregistrations_json_path = json_path
412 
413  async def async_dismiss_message(service: ServiceCall) -> None:
414  """Handle dismissing notification message service calls."""
415  kwargs = {}
416 
417  if self.targetstargets is not None:
418  kwargs[ATTR_TARGET] = self.targetstargets
419  elif service.data.get(ATTR_TARGET) is not None:
420  kwargs[ATTR_TARGET] = service.data.get(ATTR_TARGET)
421 
422  kwargs[ATTR_DATA] = service.data.get(ATTR_DATA)
423 
424  await self.async_dismissasync_dismiss(**kwargs)
425 
426  hass.services.async_register(
427  DOMAIN,
428  SERVICE_DISMISS,
429  async_dismiss_message,
430  schema=DISMISS_SERVICE_SCHEMA,
431  )
432 
433  @property
434  def targets(self):
435  """Return a dictionary of registered targets."""
436  return {registration: registration for registration in self.registrationsregistrations}
437 
438  def dismiss(self, **kwargs):
439  """Dismisses a notification."""
440  data = kwargs.get(ATTR_DATA)
441  tag = data.get(ATTR_TAG) if data else ""
442  payload = {ATTR_TAG: tag, ATTR_DISMISS: True, ATTR_DATA: {}}
443 
444  self._push_message_push_message(payload, **kwargs)
445 
446  async def async_dismiss(self, **kwargs):
447  """Dismisses a notification.
448 
449  This method must be run in the event loop.
450  """
451  await self.hass.async_add_executor_job(partial(self.dismissdismiss, **kwargs))
452 
453  def send_message(self, message="", **kwargs):
454  """Send a message to a user."""
455  tag = str(uuid.uuid4())
456  payload = {
457  "badge": "/static/images/notification-badge.png",
458  "body": message,
459  ATTR_DATA: {},
460  "icon": "/static/icons/favicon-192x192.png",
461  ATTR_TAG: tag,
462  ATTR_TITLE: kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT),
463  }
464 
465  if data := kwargs.get(ATTR_DATA):
466  # Pick out fields that should go into the notification directly vs
467  # into the notification data dictionary.
468 
469  data_tmp = {}
470 
471  for key, val in data.items():
472  if key in HTML5_SHOWNOTIFICATION_PARAMETERS:
473  payload[key] = val
474  else:
475  data_tmp[key] = val
476 
477  payload[ATTR_DATA] = data_tmp
478 
479  if (
480  payload[ATTR_DATA].get(ATTR_URL) is None
481  and payload.get(ATTR_ACTIONS) is None
482  ):
483  payload[ATTR_DATA][ATTR_URL] = URL_ROOT
484 
485  self._push_message_push_message(payload, **kwargs)
486 
487  def _push_message(self, payload, **kwargs):
488  """Send the message."""
489 
490  timestamp = int(time.time())
491  ttl = int(kwargs.get(ATTR_TTL, DEFAULT_TTL))
492  priority = kwargs.get(ATTR_PRIORITY, DEFAULT_PRIORITY)
493  if priority not in ["normal", "high"]:
494  priority = DEFAULT_PRIORITY
495  payload["timestamp"] = timestamp * 1000 # Javascript ms since epoch
496 
497  if not (targets := kwargs.get(ATTR_TARGET)):
498  targets = self.registrationsregistrations.keys()
499 
500  for target in list(targets):
501  info = self.registrationsregistrations.get(target)
502  try:
503  info = REGISTER_SCHEMA(info)
504  except vol.Invalid:
505  _LOGGER.error(
506  "%s is not a valid HTML5 push notification target", target
507  )
508  continue
509  subscription = info[ATTR_SUBSCRIPTION]
510  payload[ATTR_DATA][ATTR_JWT] = add_jwt(
511  timestamp,
512  target,
513  payload[ATTR_TAG],
514  subscription[ATTR_KEYS][ATTR_AUTH],
515  )
516  webpusher = WebPusher(info[ATTR_SUBSCRIPTION])
517 
518  endpoint = urlparse(subscription[ATTR_ENDPOINT])
519  vapid_claims = {
520  "sub": f"mailto:{self._vapid_email}",
521  "aud": f"{endpoint.scheme}://{endpoint.netloc}",
522  "exp": timestamp + (VAPID_CLAIM_VALID_HOURS * 60 * 60),
523  }
524  vapid_headers = Vapid.from_string(self._vapid_prv_vapid_prv).sign(vapid_claims)
525  vapid_headers.update({"urgency": priority, "priority": priority})
526  response = webpusher.send(
527  data=json.dumps(payload), headers=vapid_headers, ttl=ttl
528  )
529 
530  if response.status_code == 410:
531  _LOGGER.info("Notification channel has expired")
532  reg = self.registrationsregistrations.pop(target)
533  try:
534  save_json(self.registrations_json_pathregistrations_json_path, self.registrationsregistrations)
535  except HomeAssistantError:
536  self.registrationsregistrations[target] = reg
537  _LOGGER.error("Error saving registration")
538  else:
539  _LOGGER.info("Configuration saved")
540  elif response.status_code > 399:
541  _LOGGER.error(
542  "There was an issue sending the notification %s: %s",
543  response.status_code,
544  response.text,
545  )
546 
547 
548 def add_jwt(timestamp, target, tag, jwt_secret):
549  """Create JWT json to put into payload."""
550 
551  jwt_exp = datetime.fromtimestamp(timestamp) + timedelta(days=JWT_VALID_DAYS)
552  jwt_claims = {
553  "exp": jwt_exp,
554  "nbf": timestamp,
555  "iat": timestamp,
556  ATTR_TARGET: target,
557  ATTR_TAG: tag,
558  }
559  return jwt.encode(jwt_claims, jwt_secret)
def __init__(self, hass, vapid_prv, vapid_email, registrations, json_path)
Definition: notify.py:406
def find_registration_name(self, data, suggested=None)
Definition: notify.py:258
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_create_html5_issue(HomeAssistant hass, bool import_success)
Definition: issues.py:19
JsonObjectType _load_config(str filename)
Definition: notify.py:204
HTML5NotificationService|None async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:164
def add_jwt(timestamp, target, tag, jwt_secret)
Definition: notify.py:548
str humanize_error(HomeAssistant hass, vol.Invalid validation_error, str domain, dict config, str|None link, int max_sub_error_length=MAX_VALIDATION_ERROR_ITEM_LENGTH)
Definition: config.py:520
None save_json(str filename, list|dict data, bool private=False, *type[json.JSONEncoder]|None encoder=None, bool atomic_writes=False)
Definition: json.py:202
JsonObjectType load_json_object(str|PathLike[str] filename, JsonObjectType default=_SENTINEL)
Definition: json.py:109
str ensure_unique_string(str preferred_string, Iterable[str]|KeysView[str] current_strings)
Definition: __init__.py:74