Home Assistant Unofficial Reference 2024.12.1
client.py
Go to the documentation of this file.
1 """File for Azure Event Hub models."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 from dataclasses import dataclass
7 import logging
8 
9 from azure.eventhub.aio import EventHubProducerClient, EventHubSharedKeyCredential
10 
11 from .const import ADDITIONAL_ARGS, CONF_EVENT_HUB_CON_STRING
12 
13 _LOGGER = logging.getLogger(__name__)
14 
15 
16 @dataclass
18  """Class for the Azure Event Hub client. Use from_input to initialize."""
19 
20  event_hub_instance_name: str
21 
@property
@abstractmethod
def client(self) -> EventHubProducerClient:
    """Return the Event Hub producer client.

    Declared abstract and given no body here, so a concrete
    implementation has to be supplied by an overriding definition.
    """
26 
async def test_connection(self) -> None:
    """Verify that the Event Hub can be reached.

    Enters ``self.client`` as an async context manager and awaits its
    ``get_eventhub_properties`` call. Nothing is returned; an
    EventHubError is thrown when it cannot connect.
    """
    async with self.client as producer:
        await producer.get_eventhub_properties()
31 
32  @classmethod
33  def from_input(cls, **kwargs) -> AzureEventHubClient:
34  """Create the right class."""
# Chooses which client class to build from the keyword arguments: the only
# thing inspected is whether CONF_EVENT_HUB_CON_STRING is one of the keys.
# All kwargs are forwarded unchanged to the constructor that is called.
35  if CONF_EVENT_HUB_CON_STRING in kwargs:
# NOTE(review): the embedded listing numbers jump from 35 to 37, so the
# statement that belongs to this `if` appears to be missing from this
# extraction. Presumably it returned the connection-string based client and
# the line below is the fallback, not the body of the `if` - confirm
# against the original file.
37  return AzureEventHubClientSAS(**kwargs)
38 
39 
40 @dataclass
42  """Class for Connection String based Azure Event Hub Client."""
43 
44  event_hub_connection_string: str
45 
@property
def client(self) -> EventHubProducerClient:
    """Build a producer client from the stored connection string.

    Each access calls ``EventHubProducerClient.from_connection_string``
    with the connection string, the Event Hub instance name and the
    shared ``ADDITIONAL_ARGS`` keyword arguments.
    """
    make_producer = EventHubProducerClient.from_connection_string
    connection_string = self.event_hub_connection_string
    hub_name = self.event_hub_instance_name
    return make_producer(
        conn_str=connection_string,
        eventhub_name=hub_name,
        **ADDITIONAL_ARGS,
    )
54 
55 
56 @dataclass
58  """Class for SAS based Azure Event Hub Client."""
59 
60  event_hub_namespace: str
61  event_hub_sas_policy: str
62  event_hub_sas_key: str
63 
64  @property
65  def client(self) -> EventHubProducerClient:
66  """Get a Event Producer Client."""
67  return EventHubProducerClient(
68  fully_qualified_namespace=(
69  f"{self.event_hub_namespace}.servicebus.windows.net"
70  ),
71  eventhub_name=self.event_hub_instance_name,
72  credential=EventHubSharedKeyCredential(
73  policy=self.event_hub_sas_policy, key=self.event_hub_sas_key
74  ),
75  **ADDITIONAL_ARGS,
76  )