1 """HTML5 Push Messaging notification service."""
3 from __future__
import annotations
5 from contextlib
import suppress
6 from datetime
import datetime, timedelta
7 from functools
import partial
8 from http
import HTTPStatus
12 from urllib.parse
import urlparse
15 from aiohttp.hdrs
import AUTHORIZATION
17 from py_vapid
import Vapid
18 from pywebpush
import WebPusher
19 import voluptuous
as vol
20 from voluptuous.humanize
import humanize_error
29 PLATFORM_SCHEMA
as NOTIFY_PLATFORM_SCHEMA,
30 BaseNotificationService,
49 from .issues
import async_create_html5_issue
51 _LOGGER = logging.getLogger(__name__)
53 REGISTRATIONS_FILE =
"html5_push_registrations.conf"
56 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
58 vol.Optional(
"gcm_sender_id"): cv.string,
59 vol.Optional(
"gcm_api_key"): cv.string,
60 vol.Required(ATTR_VAPID_PUB_KEY): cv.string,
61 vol.Required(ATTR_VAPID_PRV_KEY): cv.string,
62 vol.Required(ATTR_VAPID_EMAIL): cv.string,
66 ATTR_SUBSCRIPTION =
"subscription"
67 ATTR_BROWSER =
"browser"
69 ATTR_ENDPOINT =
"endpoint"
72 ATTR_P256DH =
"p256dh"
73 ATTR_EXPIRATIONTIME =
"expirationTime"
76 ATTR_ACTION =
"action"
77 ATTR_ACTIONS =
"actions"
80 ATTR_DISMISS =
"dismiss"
81 ATTR_PRIORITY =
"priority"
82 DEFAULT_PRIORITY =
"normal"
88 WS_TYPE_APPKEY =
"notify/html5/appkey"
89 SCHEMA_WS_APPKEY = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
90 {vol.Required(
"type"): WS_TYPE_APPKEY}
96 VAPID_CLAIM_VALID_HOURS = 12
98 KEYS_SCHEMA = vol.All(
101 {vol.Required(ATTR_AUTH): cv.string, vol.Required(ATTR_P256DH): cv.string}
105 SUBSCRIPTION_SCHEMA = vol.All(
109 vol.Required(ATTR_ENDPOINT): vol.Url(),
110 vol.Required(ATTR_KEYS): KEYS_SCHEMA,
111 vol.Optional(ATTR_EXPIRATIONTIME): vol.Any(
None, cv.positive_int),
116 DISMISS_SERVICE_SCHEMA = vol.Schema(
118 vol.Optional(ATTR_TARGET): vol.All(cv.ensure_list, [cv.string]),
119 vol.Optional(ATTR_DATA): dict,
123 REGISTER_SCHEMA = vol.Schema(
125 vol.Required(ATTR_SUBSCRIPTION): SUBSCRIPTION_SCHEMA,
126 vol.Required(ATTR_BROWSER): vol.In([
"chrome",
"firefox"]),
127 vol.Optional(ATTR_NAME): cv.string,
131 CALLBACK_EVENT_PAYLOAD_SCHEMA = vol.Schema(
133 vol.Required(ATTR_TAG): cv.string,
134 vol.Required(ATTR_TYPE): vol.In([
"received",
"clicked",
"closed"]),
135 vol.Required(ATTR_TARGET): cv.string,
136 vol.Optional(ATTR_ACTION): cv.string,
137 vol.Optional(ATTR_DATA): dict,
141 NOTIFY_CALLBACK_EVENT =
"html5_notification"
144 HTML5_SHOWNOTIFICATION_PARAMETERS = (
153 "requireInteraction",
163 discovery_info: DiscoveryInfoType |
None =
None,
164 ) -> HTML5NotificationService |
None:
165 """Get the HTML5 push notification service."""
167 existing_config_entry = hass.config_entries.async_entries(DOMAIN)
168 if existing_config_entry:
171 hass.async_create_task(
172 hass.config_entries.flow.async_init(
173 DOMAIN, context={
"source": SOURCE_IMPORT}, data=config
178 if discovery_info
is None:
181 json_path = hass.config.path(REGISTRATIONS_FILE)
183 registrations = await hass.async_add_executor_job(_load_config, json_path)
185 vapid_pub_key = discovery_info[ATTR_VAPID_PUB_KEY]
186 vapid_prv_key = discovery_info[ATTR_VAPID_PRV_KEY]
187 vapid_email = discovery_info[ATTR_VAPID_EMAIL]
189 def websocket_appkey(_hass, connection, msg):
190 connection.send_message(websocket_api.result_message(msg[
"id"], vapid_pub_key))
192 websocket_api.async_register_command(
193 hass, WS_TYPE_APPKEY, websocket_appkey, SCHEMA_WS_APPKEY
200 hass, vapid_prv_key, vapid_email, registrations, json_path
205 """Load configuration."""
206 with suppress(HomeAssistantError):
212 """Accepts push registrations from a browser."""
214 url =
"/api/notify.html5"
215 name =
"api:notify.html5"
218 """Init HTML5PushRegistrationView."""
223 """Accept the POST request for push registrations from a browser."""
225 data = await request.json()
227 return self.json_message(
"Invalid JSON", HTTPStatus.BAD_REQUEST)
230 except vol.Invalid
as ex:
231 return self.json_message(
humanize_error(data, ex), HTTPStatus.BAD_REQUEST)
233 devname = data.get(ATTR_NAME)
234 data.pop(ATTR_NAME,
None)
242 hass = request.app[KEY_HASS]
244 await hass.async_add_executor_job(
247 return self.json_message(
"Push notification subscriber registered.")
248 except HomeAssistantError:
249 if previous_registration
is not None:
250 self.
registrationsregistrations[name] = previous_registration
254 return self.json_message(
255 "Error saving registration.", HTTPStatus.INTERNAL_SERVER_ERROR
259 """Find a registration name matching data or generate a unique one."""
260 endpoint = data.get(ATTR_SUBSCRIPTION).
get(ATTR_ENDPOINT)
261 for key, registration
in self.
registrationsregistrations.items():
262 subscription = registration.get(ATTR_SUBSCRIPTION)
263 if subscription.get(ATTR_ENDPOINT) == endpoint:
268 """Delete a registration."""
270 data = await request.json()
272 return self.json_message(
"Invalid JSON", HTTPStatus.BAD_REQUEST)
274 subscription = data.get(ATTR_SUBSCRIPTION)
278 for key, registration
in self.
registrationsregistrations.items():
279 if registration.get(ATTR_SUBSCRIPTION) == subscription:
285 return self.json_message(
"Registration not found.")
290 hass = request.app[KEY_HASS]
292 await hass.async_add_executor_job(
295 except HomeAssistantError:
297 return self.json_message(
298 "Error saving registration.", HTTPStatus.INTERNAL_SERVER_ERROR
301 return self.json_message(
"Push notification subscriber unregistered.")
305 """Accepts push registrations from a browser."""
307 requires_auth =
False
308 url =
"/api/notify.html5/callback"
309 name =
"api:notify.html5/callback"
312 """Init HTML5PushCallbackView."""
316 """Find the registration that signed this JWT and return it."""
323 target_check = jwt.decode(
324 token, algorithms=[
"ES256",
"HS256"], options={
"verify_signature":
False}
326 if target_check.get(ATTR_TARGET)
in self.
registrationsregistrations:
327 possible_target = self.
registrationsregistrations[target_check[ATTR_TARGET]]
328 key = possible_target[ATTR_SUBSCRIPTION][ATTR_KEYS][ATTR_AUTH]
329 with suppress(jwt.exceptions.DecodeError):
330 return jwt.decode(token, key, algorithms=[
"ES256",
"HS256"])
332 return self.json_message(
333 "No target found in JWT", status_code=HTTPStatus.UNAUTHORIZED
339 """Check the authorization header."""
340 if not (auth := request.headers.get(AUTHORIZATION)):
341 return self.json_message(
342 "Authorization header is expected", status_code=HTTPStatus.UNAUTHORIZED
347 if parts[0].lower() !=
"bearer":
348 return self.json_message(
349 "Authorization header must start with Bearer",
350 status_code=HTTPStatus.UNAUTHORIZED,
353 return self.json_message(
354 "Authorization header must be Bearer token",
355 status_code=HTTPStatus.UNAUTHORIZED,
361 except jwt.exceptions.InvalidTokenError:
362 return self.json_message(
363 "token is invalid", status_code=HTTPStatus.UNAUTHORIZED
368 """Accept the POST request for push registrations event callback."""
370 if not isinstance(auth_check, dict):
374 data = await request.json()
376 return self.json_message(
"Invalid JSON", HTTPStatus.BAD_REQUEST)
379 ATTR_TAG: data.get(ATTR_TAG),
380 ATTR_TYPE: data[ATTR_TYPE],
381 ATTR_TARGET: auth_check[ATTR_TARGET],
384 if data.get(ATTR_ACTION)
is not None:
385 event_payload[ATTR_ACTION] = data.get(ATTR_ACTION)
387 if data.get(ATTR_DATA)
is not None:
388 event_payload[ATTR_DATA] = data.get(ATTR_DATA)
392 except vol.Invalid
as ex:
394 "Callback event payload is not valid: %s",
398 event_name = f
"{NOTIFY_CALLBACK_EVENT}.{event_payload[ATTR_TYPE]}"
399 request.app[KEY_HASS].bus.fire(event_name, event_payload)
400 return self.json({
"status":
"ok",
"event": event_payload[ATTR_TYPE]})
404 """Implement the notification service for HTML5."""
406 def __init__(self, hass, vapid_prv, vapid_email, registrations, json_path):
407 """Initialize the service."""
413 async
def async_dismiss_message(service: ServiceCall) ->
None:
414 """Handle dismissing notification message service calls."""
417 if self.
targetstargets
is not None:
418 kwargs[ATTR_TARGET] = self.
targetstargets
419 elif service.data.get(ATTR_TARGET)
is not None:
420 kwargs[ATTR_TARGET] = service.data.get(ATTR_TARGET)
422 kwargs[ATTR_DATA] = service.data.get(ATTR_DATA)
426 hass.services.async_register(
429 async_dismiss_message,
430 schema=DISMISS_SERVICE_SCHEMA,
435 """Return a dictionary of registered targets."""
436 return {registration: registration
for registration
in self.
registrationsregistrations}
439 """Dismisses a notification."""
440 data = kwargs.get(ATTR_DATA)
441 tag = data.get(ATTR_TAG)
if data
else ""
442 payload = {ATTR_TAG: tag, ATTR_DISMISS:
True, ATTR_DATA: {}}
447 """Dismisses a notification.
449 This method must be run in the event loop.
451 await self.hass.async_add_executor_job(partial(self.
dismissdismiss, **kwargs))
454 """Send a message to a user."""
455 tag =
str(uuid.uuid4())
457 "badge":
"/static/images/notification-badge.png",
460 "icon":
"/static/icons/favicon-192x192.png",
462 ATTR_TITLE: kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT),
465 if data := kwargs.get(ATTR_DATA):
471 for key, val
in data.items():
472 if key
in HTML5_SHOWNOTIFICATION_PARAMETERS:
477 payload[ATTR_DATA] = data_tmp
480 payload[ATTR_DATA].
get(ATTR_URL)
is None
481 and payload.get(ATTR_ACTIONS)
is None
483 payload[ATTR_DATA][ATTR_URL] = URL_ROOT
488 """Send the message."""
490 timestamp =
int(time.time())
491 ttl =
int(kwargs.get(ATTR_TTL, DEFAULT_TTL))
492 priority = kwargs.get(ATTR_PRIORITY, DEFAULT_PRIORITY)
493 if priority
not in [
"normal",
"high"]:
494 priority = DEFAULT_PRIORITY
495 payload[
"timestamp"] = timestamp * 1000
497 if not (targets := kwargs.get(ATTR_TARGET)):
500 for target
in list(targets):
506 "%s is not a valid HTML5 push notification target", target
509 subscription = info[ATTR_SUBSCRIPTION]
510 payload[ATTR_DATA][ATTR_JWT] =
add_jwt(
514 subscription[ATTR_KEYS][ATTR_AUTH],
516 webpusher = WebPusher(info[ATTR_SUBSCRIPTION])
518 endpoint = urlparse(subscription[ATTR_ENDPOINT])
520 "sub": f
"mailto:{self._vapid_email}",
521 "aud": f
"{endpoint.scheme}://{endpoint.netloc}",
522 "exp": timestamp + (VAPID_CLAIM_VALID_HOURS * 60 * 60),
524 vapid_headers = Vapid.from_string(self.
_vapid_prv_vapid_prv).sign(vapid_claims)
525 vapid_headers.update({
"urgency": priority,
"priority": priority})
526 response = webpusher.send(
527 data=json.dumps(payload), headers=vapid_headers, ttl=ttl
530 if response.status_code == 410:
531 _LOGGER.info(
"Notification channel has expired")
535 except HomeAssistantError:
537 _LOGGER.error(
"Error saving registration")
539 _LOGGER.info(
"Configuration saved")
540 elif response.status_code > 399:
542 "There was an issue sending the notification %s: %s",
543 response.status_code,
548 def add_jwt(timestamp, target, tag, jwt_secret):
549 """Create JWT json to put into payload."""
551 jwt_exp = datetime.fromtimestamp(timestamp) +
timedelta(days=JWT_VALID_DAYS)
559 return jwt.encode(jwt_claims, jwt_secret)
def _push_message(self, payload, **kwargs)
def dismiss(self, **kwargs)
def __init__(self, hass, vapid_prv, vapid_email, registrations, json_path)
def send_message(self, message="", **kwargs)
def async_dismiss(self, **kwargs)
def check_authorization_header(self, request)
def decode_jwt(self, token)
def __init__(self, registrations)
def delete(self, request)
def __init__(self, registrations, json_path)
def find_registration_name(self, data, suggested=None)
web.Response get(self, web.Request request, str config_key)
None async_create_html5_issue(HomeAssistant hass, bool import_success)
JsonObjectType _load_config(str filename)
CALLBACK_EVENT_PAYLOAD_SCHEMA
HTML5NotificationService|None async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
def add_jwt(timestamp, target, tag, jwt_secret)
str humanize_error(HomeAssistant hass, vol.Invalid validation_error, str domain, dict config, str|None link, int max_sub_error_length=MAX_VALIDATION_ERROR_ITEM_LENGTH)
None save_json(str filename, list|dict data, bool private=False, *type[json.JSONEncoder]|None encoder=None, bool atomic_writes=False)
JsonObjectType load_json_object(str|PathLike[str] filename, JsonObjectType default=_SENTINEL)
str ensure_unique_string(str preferred_string, Iterable[str]|KeysView[str] current_strings)