1 """Support for azure service bus notification."""
3 from __future__
import annotations
8 from azure.servicebus
import ServiceBusMessage
9 from azure.servicebus.aio
import ServiceBusClient, ServiceBusSender
10 from azure.servicebus.exceptions
import (
11 MessagingEntityNotFoundError,
12 ServiceBusConnectionError,
15 import voluptuous
as vol
21 PLATFORM_SCHEMA
as NOTIFY_PLATFORM_SCHEMA,
22 BaseNotificationService,
# Keys read from this platform's configuration mapping.
CONF_CONNECTION_STRING = "connection_string"
CONF_QUEUE_NAME = "queue"
CONF_TOPIC_NAME = "topic"

# Keys of the dict that is JSON-serialized into each outgoing message.
ATTR_ASB_MESSAGE = "message"
ATTR_ASB_TITLE = "title"
ATTR_ASB_TARGET = "target"
37 PLATFORM_SCHEMA = vol.All(
38 cv.has_at_least_one_key(CONF_QUEUE_NAME, CONF_TOPIC_NAME),
39 NOTIFY_PLATFORM_SCHEMA.extend(
41 vol.Required(CONF_CONNECTION_STRING): cv.string,
43 CONF_QUEUE_NAME,
"output",
"Can only send to a queue or a topic."
46 CONF_TOPIC_NAME,
"output",
"Can only send to a queue or a topic."
# Module-level logger, named after this module via __name__.
_LOGGER = logging.getLogger(__name__)
58 discovery_info: DiscoveryInfoType |
None =
None,
59 ) -> ServiceBusNotificationService |
None:
60 """Get the notification service."""
61 connection_string: str = config[CONF_CONNECTION_STRING]
62 queue_name: str |
None = config.get(CONF_QUEUE_NAME)
63 topic_name: str |
None = config.get(CONF_TOPIC_NAME)
67 servicebus = ServiceBusClient.from_connection_string(
68 connection_string, loop=hass.loop
71 client: ServiceBusSender |
None =
None
74 client = servicebus.get_queue_sender(queue_name)
76 client = servicebus.get_topic_sender(topic_name)
77 except (ServiceBusConnectionError, MessagingEntityNotFoundError)
as err:
79 "Connection error while creating client for queue/topic '%s'. %s",
80 queue_name
or topic_name,
89 """Implement the notification service for the service bus service."""
92 """Initialize the service."""
97 dto = {ATTR_ASB_MESSAGE: message}
99 if ATTR_TITLE
in kwargs:
100 dto[ATTR_ASB_TITLE] = kwargs[ATTR_TITLE]
101 if ATTR_TARGET
in kwargs:
102 dto[ATTR_ASB_TARGET] = kwargs[ATTR_TARGET]
104 if data := kwargs.get(ATTR_DATA):
107 queue_message = ServiceBusMessage(
108 json.dumps(dto), content_type=CONTENT_TYPE_JSON
111 await self.
_client_client.send_messages(queue_message)
112 except ServiceBusError
as err:
114 "Could not send service bus notification to %s. %s",
def __init__(self, client)
def async_send_message(self, message, **kwargs)
ServiceBusNotificationService|None get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)