Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """Support for azure service bus notification."""
2 
3 from __future__ import annotations
4 
5 import json
6 import logging
7 
8 from azure.servicebus import ServiceBusMessage
9 from azure.servicebus.aio import ServiceBusClient, ServiceBusSender
10 from azure.servicebus.exceptions import (
11  MessagingEntityNotFoundError,
12  ServiceBusConnectionError,
13  ServiceBusError,
14 )
15 import voluptuous as vol
16 
18  ATTR_DATA,
19  ATTR_TARGET,
20  ATTR_TITLE,
21  PLATFORM_SCHEMA as NOTIFY_PLATFORM_SCHEMA,
22  BaseNotificationService,
23 )
24 from homeassistant.const import CONTENT_TYPE_JSON
25 from homeassistant.core import HomeAssistant
27 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
28 
29 CONF_CONNECTION_STRING = "connection_string"
30 CONF_QUEUE_NAME = "queue"
31 CONF_TOPIC_NAME = "topic"
32 
33 ATTR_ASB_MESSAGE = "message"
34 ATTR_ASB_TITLE = "title"
35 ATTR_ASB_TARGET = "target"
36 
37 PLATFORM_SCHEMA = vol.All(
38  cv.has_at_least_one_key(CONF_QUEUE_NAME, CONF_TOPIC_NAME),
39  NOTIFY_PLATFORM_SCHEMA.extend(
40  {
41  vol.Required(CONF_CONNECTION_STRING): cv.string,
42  vol.Exclusive(
43  CONF_QUEUE_NAME, "output", "Can only send to a queue or a topic."
44  ): cv.string,
45  vol.Exclusive(
46  CONF_TOPIC_NAME, "output", "Can only send to a queue or a topic."
47  ): cv.string,
48  }
49  ),
50 )
51 
52 _LOGGER = logging.getLogger(__name__)
53 
54 
56  hass: HomeAssistant,
57  config: ConfigType,
58  discovery_info: DiscoveryInfoType | None = None,
59 ) -> ServiceBusNotificationService | None:
60  """Get the notification service."""
61  connection_string: str = config[CONF_CONNECTION_STRING]
62  queue_name: str | None = config.get(CONF_QUEUE_NAME)
63  topic_name: str | None = config.get(CONF_TOPIC_NAME)
64 
65  # Library can do synchronous IO when creating the clients.
66  # Passes in loop here, but can't run setup on the event loop.
67  servicebus = ServiceBusClient.from_connection_string(
68  connection_string, loop=hass.loop
69  )
70 
71  client: ServiceBusSender | None = None
72  try:
73  if queue_name:
74  client = servicebus.get_queue_sender(queue_name)
75  elif topic_name:
76  client = servicebus.get_topic_sender(topic_name)
77  except (ServiceBusConnectionError, MessagingEntityNotFoundError) as err:
78  _LOGGER.error(
79  "Connection error while creating client for queue/topic '%s'. %s",
80  queue_name or topic_name,
81  err,
82  )
83  return None
84 
85  return ServiceBusNotificationService(client) if client else None
86 
87 
88 class ServiceBusNotificationService(BaseNotificationService):
89  """Implement the notification service for the service bus service."""
90 
91  def __init__(self, client):
92  """Initialize the service."""
93  self._client_client = client
94 
95  async def async_send_message(self, message, **kwargs):
96  """Send a message."""
97  dto = {ATTR_ASB_MESSAGE: message}
98 
99  if ATTR_TITLE in kwargs:
100  dto[ATTR_ASB_TITLE] = kwargs[ATTR_TITLE]
101  if ATTR_TARGET in kwargs:
102  dto[ATTR_ASB_TARGET] = kwargs[ATTR_TARGET]
103 
104  if data := kwargs.get(ATTR_DATA):
105  dto.update(data)
106 
107  queue_message = ServiceBusMessage(
108  json.dumps(dto), content_type=CONTENT_TYPE_JSON
109  )
110  try:
111  await self._client_client.send_messages(queue_message)
112  except ServiceBusError as err:
113  _LOGGER.error(
114  "Could not send service bus notification to %s. %s",
115  self._client_client.name,
116  err,
117  )
ServiceBusNotificationService|None get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:59