1 """File for Azure Event Hub models."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
6 from dataclasses
import dataclass
9 from azure.eventhub.aio
import EventHubProducerClient, EventHubSharedKeyCredential
11 from .const
import ADDITIONAL_ARGS, CONF_EVENT_HUB_CON_STRING
13 _LOGGER = logging.getLogger(__name__)
18 """Class for the Azure Event Hub client. Use from_input to initialize."""
20 event_hub_instance_name: str
24 def client(self) -> EventHubProducerClient:
25 """Return the client."""
28 """Test connection, will throw EventHubError when it cannot connect."""
29 async
with self.client
as client:
30 await client.get_eventhub_properties()
34 """Create the right class."""
35 if CONF_EVENT_HUB_CON_STRING
in kwargs:
42 """Class for Connection String based Azure Event Hub Client."""
44 event_hub_connection_string: str
47 def client(self) -> EventHubProducerClient:
48 """Return the client."""
49 return EventHubProducerClient.from_connection_string(
50 conn_str=self.event_hub_connection_string,
51 eventhub_name=self.event_hub_instance_name,
58 """Class for SAS based Azure Event Hub Client."""
60 event_hub_namespace: str
61 event_hub_sas_policy: str
62 event_hub_sas_key: str
65 def client(self) -> EventHubProducerClient:
66 """Get a Event Producer Client."""
67 return EventHubProducerClient(
68 fully_qualified_namespace=(
69 f
"{self.event_hub_namespace}.servicebus.windows.net"
71 eventhub_name=self.event_hub_instance_name,
72 credential=EventHubSharedKeyCredential(
73 policy=self.event_hub_sas_policy, key=self.event_hub_sas_key
EventHubProducerClient client(self)
EventHubProducerClient client(self)
AzureEventHubClient from_input(cls, **kwargs)
None test_connection(self)
EventHubProducerClient client(self)