1 """Support for Azure Event Hubs."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from datetime
import datetime
10 from types
import MappingProxyType
11 from typing
import Any
13 from azure.eventhub
import EventData, EventDataBatch
14 from azure.eventhub.aio
import EventHubProducerClient
15 from azure.eventhub.exceptions
import EventHubError
16 import voluptuous
as vol
30 from .client
import AzureEventHubClient
32 CONF_EVENT_HUB_CON_STRING,
33 CONF_EVENT_HUB_INSTANCE_NAME,
34 CONF_EVENT_HUB_NAMESPACE,
35 CONF_EVENT_HUB_SAS_KEY,
36 CONF_EVENT_HUB_SAS_POLICY,
45 type AzureEventHubConfigEntry = ConfigEntry[AzureEventHub]
47 _LOGGER = logging.getLogger(__name__)
49 CONFIG_SCHEMA = vol.Schema(
53 vol.Optional(CONF_EVENT_HUB_INSTANCE_NAME): cv.string,
54 vol.Optional(CONF_EVENT_HUB_CON_STRING): cv.string,
55 vol.Optional(CONF_EVENT_HUB_NAMESPACE): cv.string,
56 vol.Optional(CONF_EVENT_HUB_SAS_POLICY): cv.string,
57 vol.Optional(CONF_EVENT_HUB_SAS_KEY): cv.string,
58 vol.Optional(CONF_SEND_INTERVAL): cv.positive_int,
59 vol.Optional(CONF_MAX_DELAY): cv.positive_int,
60 vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
64 extra=vol.ALLOW_EXTRA,
66 DATA_COMPONENT: HassKey[EntityFilter] =
HassKey(DOMAIN)
69 async
def async_setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
70 """Activate Azure EH component from yaml.
72 Adds an empty filter to hass data.
73 Tries to get a filter from yaml, if present set to hass data.
74 If config is empty after getting the filter, return, otherwise emit
75 deprecated warning and pass the rest to the config flow.
77 if DOMAIN
not in yaml_config:
80 hass.data[DATA_COMPONENT] = yaml_config[DOMAIN].pop(CONF_FILTER)
82 if not yaml_config[DOMAIN]:
85 "Loading Azure Event Hub completely via yaml config is deprecated; Only the"
86 " Filter can be set in yaml, the rest is done through a config flow and has"
87 " been imported, all other keys but filter can be deleted from"
90 hass.async_create_task(
91 hass.config_entries.flow.async_init(
92 DOMAIN, context={
"source": SOURCE_IMPORT}, data=yaml_config[DOMAIN]
99 hass: HomeAssistant, entry: AzureEventHubConfigEntry
101 """Do the setup based on the config entry and the filter from yaml."""
105 hass.data[DATA_COMPONENT],
108 await hub.async_test_connection()
109 except EventHubError
as err:
111 entry.runtime_data = hub
112 entry.async_on_unload(hub.async_stop)
113 entry.async_on_unload(entry.add_update_listener(async_update_listener))
114 await hub.async_start()
119 hass: HomeAssistant, entry: AzureEventHubConfigEntry
121 """Update listener for options."""
122 entry.runtime_data.update_options(entry.options)
126 hass: HomeAssistant, entry: AzureEventHubConfigEntry
128 """Unload a config entry."""
133 """A event handler class for Azure Event Hub."""
139 entities_filter: EntityFilter,
141 """Initialize the listener."""
146 self.
_client_client = AzureEventHubClient.from_input(**self.
_entry_entry.data)
148 self.
_max_delay_max_delay = self.
_entry_entry.options.get(CONF_MAX_DELAY, DEFAULT_MAX_DELAY)
151 self._queue: asyncio.PriorityQueue[
152 tuple[int, tuple[datetime, State |
None]]
153 ] = asyncio.PriorityQueue()
160 This suppresses logging and register the listener and
161 schedules the first send.
163 Suppress the INFO and below logging on the underlying packages,
164 they are very verbose, even at INFO.
166 logging.getLogger(
"azure.eventhub").setLevel(logging.WARNING)
173 """Shut down the AEH by queueing None, calling send, join queue."""
178 await self._queue.put((3, (
utcnow(),
None)))
180 await self._queue.join()
183 """Update options."""
184 self.
_send_interval_send_interval = new_options[CONF_SEND_INTERVAL]
187 """Test the connection to the event hub."""
191 """Schedule the next send."""
198 """Listen for new messages on the bus and queue them for AEH."""
199 if state := event.data.get(
"new_state"):
200 await self._queue.put((2, (event.time_fired, state)))
203 """Write preprocessed events to eventhub, with retry."""
204 async
with self.
_client_client.client
as client:
205 while not self._queue.empty():
206 if event_batch := await self.
fill_batchfill_batch(client):
207 _LOGGER.debug(
"Sending %d event(s)", len(event_batch))
209 await client.send_batch(event_batch)
210 except EventHubError
as exc:
211 _LOGGER.error(
"Error in sending events to Event Hub: %s", exc)
214 async
def fill_batch(self, client: EventHubProducerClient) -> EventDataBatch:
215 """Return a batch of events formatted for sending to Event Hub.
217 Uses get_nowait instead of await get, because the functions batches and
218 doesn't wait for each single event.
220 Throws ValueError on add to batch when the EventDataBatch object reaches
221 max_size. Put the item back in the queue and the next batch will include
224 event_batch = await client.create_batch()
228 _, event = self._queue.get_nowait()
229 except asyncio.QueueEmpty:
231 event_data, dropped = self.
_parse_event_parse_event(*event, dropped)
235 event_batch.add(event_data)
237 self._queue.put_nowait((1, event))
242 "Dropped %d old events, consider filtering messages", dropped
247 self, time_fired: datetime, state: State |
None, dropped: int
248 ) -> tuple[EventData |
None, int]:
249 """Parse event by checking if it needs to be sent, and format it."""
250 self._queue.task_done()
254 if state.state
in FILTER_STATES
or not self.
_entities_filter_entities_filter(state.entity_id):
257 return None, dropped + 1
259 EventData(json.dumps(obj=state, cls=JSONEncoder).encode(
"utf-8")),
EventDataBatch fill_batch(self, EventHubProducerClient client)
None async_listen(self, Event event)
tuple[EventData|None, int] _parse_event(self, datetime time_fired, State|None state, int dropped)
None __init__(self, HomeAssistant hass, ConfigEntry entry, EntityFilter entities_filter)
None async_test_connection(self)
None update_options(self, MappingProxyType[str, Any] new_options)
None _schedule_next_send(self)
None async_update_listener(HomeAssistant hass, AzureEventHubConfigEntry entry)
bool async_setup_entry(HomeAssistant hass, AzureEventHubConfigEntry entry)
bool async_setup(HomeAssistant hass, ConfigType yaml_config)
bool async_unload_entry(HomeAssistant hass, AzureEventHubConfigEntry entry)
str test_connection(HomeAssistant hass, data)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)