Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Azure Event Hubs."""
2 
from __future__ import annotations

import asyncio
from collections.abc import Callable
from datetime import datetime
import json
import logging
from types import MappingProxyType
from typing import Any

from azure.eventhub import EventData, EventDataBatch
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
import voluptuous as vol

from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import MATCH_ALL
from homeassistant.core import Event, HomeAssistant, State
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entityfilter import FILTER_SCHEMA, EntityFilter
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.dt import utcnow
from homeassistant.util.hass_dict import HassKey

from .client import AzureEventHubClient
from .const import (
    CONF_EVENT_HUB_CON_STRING,
    CONF_EVENT_HUB_INSTANCE_NAME,
    CONF_EVENT_HUB_NAMESPACE,
    CONF_EVENT_HUB_SAS_KEY,
    CONF_EVENT_HUB_SAS_POLICY,
    CONF_FILTER,
    CONF_MAX_DELAY,
    CONF_SEND_INTERVAL,
    DEFAULT_MAX_DELAY,
    DOMAIN,
    FILTER_STATES,
)
44 
45 type AzureEventHubConfigEntry = ConfigEntry[AzureEventHub]
46 
47 _LOGGER = logging.getLogger(__name__)
48 
49 CONFIG_SCHEMA = vol.Schema(
50  {
51  DOMAIN: vol.Schema(
52  {
53  vol.Optional(CONF_EVENT_HUB_INSTANCE_NAME): cv.string,
54  vol.Optional(CONF_EVENT_HUB_CON_STRING): cv.string,
55  vol.Optional(CONF_EVENT_HUB_NAMESPACE): cv.string,
56  vol.Optional(CONF_EVENT_HUB_SAS_POLICY): cv.string,
57  vol.Optional(CONF_EVENT_HUB_SAS_KEY): cv.string,
58  vol.Optional(CONF_SEND_INTERVAL): cv.positive_int,
59  vol.Optional(CONF_MAX_DELAY): cv.positive_int,
60  vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
61  },
62  )
63  },
64  extra=vol.ALLOW_EXTRA,
65 )
66 DATA_COMPONENT: HassKey[EntityFilter] = HassKey(DOMAIN)
67 
68 
async def async_setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
    """Activate Azure EH component from yaml.

    Stores an entity filter in ``hass.data[DATA_COMPONENT]``: an empty one
    when the domain is absent from YAML, otherwise the filter popped from the
    domain's YAML section. If any other keys remain in that section, a
    deprecation warning is logged and the remaining keys are handed to the
    config flow as an import.

    Always returns True.
    """
    if DOMAIN not in yaml_config:
        hass.data[DATA_COMPONENT] = FILTER_SCHEMA({})
        return True

    # pop() removes the filter so that only connection settings (if any)
    # are left to forward to the import flow below.
    hass.data[DATA_COMPONENT] = yaml_config[DOMAIN].pop(CONF_FILTER)

    if not yaml_config[DOMAIN]:
        return True

    _LOGGER.warning(
        "Loading Azure Event Hub completely via yaml config is deprecated; Only the"
        " Filter can be set in yaml, the rest is done through a config flow and has"
        " been imported, all other keys but filter can be deleted from"
        " configuration.yaml"
    )
    hass.async_create_task(
        hass.config_entries.flow.async_init(
            DOMAIN, context={"source": SOURCE_IMPORT}, data=yaml_config[DOMAIN]
        )
    )
    return True
96 
97 
async def async_setup_entry(
    hass: HomeAssistant, entry: AzureEventHubConfigEntry
) -> bool:
    """Do the setup based on the config entry and the filter from yaml.

    Builds the hub from the entry and the filter stored in
    ``hass.data[DATA_COMPONENT]``, verifies the connection, stores the hub as
    the entry's runtime data, registers unload callbacks and starts the hub.

    Raises:
        ConfigEntryNotReady: when the connection test raises EventHubError.
    """
    hub = AzureEventHub(
        hass,
        entry,
        hass.data[DATA_COMPONENT],
    )
    try:
        await hub.async_test_connection()
    except EventHubError as err:
        raise ConfigEntryNotReady("Could not connect to Azure Event Hub") from err

    entry.runtime_data = hub
    # Both callbacks are registered before the hub starts.
    entry.async_on_unload(hub.async_stop)
    entry.async_on_unload(entry.add_update_listener(async_update_listener))
    await hub.async_start()
    return True
116 
117 
async def async_update_listener(
    hass: HomeAssistant, entry: AzureEventHubConfigEntry
) -> None:
    """Update listener for options.

    Forwards the entry's current options to the hub stored in
    ``entry.runtime_data``.
    """
    entry.runtime_data.update_options(entry.options)
123 
124 
async def async_unload_entry(
    hass: HomeAssistant, entry: AzureEventHubConfigEntry
) -> bool:
    """Unload a config entry.

    Does no work of its own and always returns True.
    """
    return True
130 
131 
class AzureEventHub:
    """An event handler class for Azure Event Hub.

    State changes heard on the Home Assistant bus are placed on a priority
    queue and flushed to the Event Hub in batches every send interval.

    Queue priorities used by this class (lower values are dequeued first):
      1 - an event that did not fit in the previous batch,
      2 - a regular state change,
      3 - the ``None`` sentinel queued by ``async_stop`` to signal shutdown.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        entry: ConfigEntry,
        entities_filter: EntityFilter,
    ) -> None:
        """Initialize the listener.

        The client is built from ``entry.data``. The send interval is a
        required option; the max delay falls back to DEFAULT_MAX_DELAY.
        """
        self.hass = hass
        self._entry = entry
        self._entities_filter = entities_filter

        self._client = AzureEventHubClient.from_input(**self._entry.data)
        self._send_interval = self._entry.options[CONF_SEND_INTERVAL]
        self._max_delay = self._entry.options.get(CONF_MAX_DELAY, DEFAULT_MAX_DELAY)

        self._shutdown = False
        self._queue: asyncio.PriorityQueue[
            tuple[int, tuple[datetime, State | None]]
        ] = asyncio.PriorityQueue()
        # Callables that cancel the bus listener / the pending scheduled send.
        self._listener_remover: Callable[[], None] | None = None
        self._next_send_remover: Callable[[], None] | None = None

    async def async_start(self) -> None:
        """Start the hub.

        This suppresses logging and register the listener and
        schedules the first send.

        Suppress the INFO and below logging on the underlying packages,
        they are very verbose, even at INFO.
        """
        logging.getLogger("azure.eventhub").setLevel(logging.WARNING)
        self._listener_remover = self.hass.bus.async_listen(
            MATCH_ALL, self.async_listen
        )
        self._schedule_next_send()

    async def async_stop(self) -> None:
        """Shut down the AEH by queueing None, calling send, join queue."""
        if self._next_send_remover:
            self._next_send_remover()
        if self._listener_remover:
            self._listener_remover()
        # Priority 3 sorts after everything else, so pending events are
        # dequeued before the shutdown sentinel.
        await self._queue.put((3, (utcnow(), None)))
        await self.async_send(None)
        await self._queue.join()

    def update_options(self, new_options: MappingProxyType[str, Any]) -> None:
        """Update options.

        Only the send interval is refreshed from ``new_options``.
        """
        self._send_interval = new_options[CONF_SEND_INTERVAL]

    async def async_test_connection(self) -> None:
        """Test the connection to the event hub."""
        await self._client.test_connection()

    def _schedule_next_send(self) -> None:
        """Schedule the next send, unless shutdown has been signalled."""
        if not self._shutdown:
            self._next_send_remover = async_call_later(
                self.hass, self._send_interval, self.async_send
            )

    async def async_listen(self, event: Event) -> None:
        """Listen for new messages on the bus and queue them for AEH.

        Only events whose data carries a truthy ``new_state`` are queued.
        """
        if state := event.data.get("new_state"):
            await self._queue.put((2, (event.time_fired, state)))

    async def async_send(self, _: datetime | None) -> None:
        """Write preprocessed events to eventhub and schedule the next send.

        The argument is ignored; it exists so the method can be used as the
        ``async_call_later`` action. Batches are sent until the queue is
        empty. An EventHubError while sending is logged and the failed batch
        is not put back on the queue.
        """
        async with self._client.client as client:
            while not self._queue.empty():
                if event_batch := await self.fill_batch(client):
                    _LOGGER.debug("Sending %d event(s)", len(event_batch))
                    try:
                        await client.send_batch(event_batch)
                    except EventHubError as exc:
                        _LOGGER.error("Error in sending events to Event Hub: %s", exc)
        self._schedule_next_send()

    async def fill_batch(self, client: EventHubProducerClient) -> EventDataBatch:
        """Return a batch of events formatted for sending to Event Hub.

        Uses get_nowait instead of await get, because the functions batches and
        doesn't wait for each single event.

        Throws ValueError on add to batch when the EventDataBatch object reaches
        max_size. Put the item back in the queue and the next batch will include
        it.
        """
        event_batch = await client.create_batch()
        dropped = 0
        while not self._shutdown:
            try:
                _, event = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            event_data, dropped = self._parse_event(*event, dropped)
            if not event_data:
                continue
            try:
                event_batch.add(event_data)
            except ValueError:
                # Batch is full: re-queue with priority 1 so this event is
                # the first one picked up for the next batch.
                self._queue.put_nowait((1, event))
                break

        if dropped:
            _LOGGER.warning(
                "Dropped %d old events, consider filtering messages", dropped
            )
        return event_batch

    def _is_too_old(self, time_fired: datetime, now: datetime) -> bool:
        """Return True if the event waited longer than max delay plus one interval.

        Uses ``total_seconds()`` so that ages of a day or more are measured
        in full and a ``time_fired`` slightly ahead of ``now`` is not
        treated as stale.
        """
        age = (now - time_fired).total_seconds()
        return age > self._max_delay + self._send_interval

    def _parse_event(
        self, time_fired: datetime, state: State | None, dropped: int
    ) -> tuple[EventData | None, int]:
        """Parse event by checking if it needs to be sent, and format it.

        Marks the dequeued item as done on the queue. Returns a tuple of the
        EventData to send (or None when nothing should be sent) and the
        running count of events dropped for being too old.

        A falsy ``state`` is the shutdown sentinel and sets the shutdown flag.
        """
        self._queue.task_done()
        if not state:
            self._shutdown = True
            return None, dropped
        if state.state in FILTER_STATES or not self._entities_filter(state.entity_id):
            return None, dropped
        if self._is_too_old(time_fired, utcnow()):
            return None, dropped + 1
        return (
            EventData(json.dumps(obj=state, cls=JSONEncoder).encode("utf-8")),
            dropped,
        )
EventDataBatch fill_batch(self, EventHubProducerClient client)
Definition: __init__.py:214
tuple[EventData|None, int] _parse_event(self, datetime time_fired, State|None state, int dropped)
Definition: __init__.py:248
None __init__(self, HomeAssistant hass, ConfigEntry entry, EntityFilter entities_filter)
Definition: __init__.py:140
None update_options(self, MappingProxyType[str, Any] new_options)
Definition: __init__.py:182
None async_update_listener(HomeAssistant hass, AzureEventHubConfigEntry entry)
Definition: __init__.py:120
bool async_setup_entry(HomeAssistant hass, AzureEventHubConfigEntry entry)
Definition: __init__.py:100
bool async_setup(HomeAssistant hass, ConfigType yaml_config)
Definition: __init__.py:69
bool async_unload_entry(HomeAssistant hass, AzureEventHubConfigEntry entry)
Definition: __init__.py:127
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597