Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Azure Data Explorer integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from dataclasses import dataclass
8 from datetime import datetime
9 import json
10 import logging
11 
12 from azure.kusto.data.exceptions import KustoAuthenticationError, KustoServiceError
13 import voluptuous as vol
14 
15 from homeassistant.config_entries import ConfigEntry
16 from homeassistant.const import MATCH_ALL
17 from homeassistant.core import Event, HomeAssistant, State
18 from homeassistant.exceptions import ConfigEntryError
19 from homeassistant.helpers.entityfilter import FILTER_SCHEMA, EntityFilter
20 from homeassistant.helpers.event import async_call_later
21 from homeassistant.helpers.json import ExtendedJSONEncoder
22 from homeassistant.helpers.typing import ConfigType
23 from homeassistant.util.dt import utcnow
24 from homeassistant.util.hass_dict import HassKey
25 
26 from .client import AzureDataExplorerClient
27 from .const import (
28  CONF_APP_REG_SECRET,
29  CONF_FILTER,
30  CONF_SEND_INTERVAL,
31  DEFAULT_MAX_DELAY,
32  DOMAIN,
33  FILTER_STATES,
34 )
35 
36 _LOGGER = logging.getLogger(__name__)
37 
38 CONFIG_SCHEMA = vol.Schema(
39  {
40  DOMAIN: vol.Schema(
41  {
42  vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
43  },
44  )
45  },
46  extra=vol.ALLOW_EXTRA,
47 )
48 DATA_COMPONENT: HassKey[EntityFilter] = HassKey(DOMAIN)
49 
50 
51 # fixtures for both init and config flow tests
52 @dataclass
53 class FilterTest:
54  """Class for capturing a filter test."""
55 
56  entity_id: str
57  expect_called: bool
58 
59 
60 async def async_setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
61  """Activate ADX component from yaml.
62 
63  Adds an empty filter to hass data.
64  Tries to get a filter from yaml, if present set to hass data.
65  """
66  if DOMAIN in yaml_config:
67  hass.data[DATA_COMPONENT] = yaml_config[DOMAIN].pop(CONF_FILTER)
68  else:
69  hass.data[DATA_COMPONENT] = FILTER_SCHEMA({})
70 
71  return True
72 
73 
74 async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
75  """Do the setup based on the config entry and the filter from yaml."""
76  adx = AzureDataExplorer(hass, entry)
77  try:
78  await adx.test_connection()
79  except KustoServiceError as exp:
80  raise ConfigEntryError(
81  "Could not find Azure Data Explorer database or table"
82  ) from exp
83  except KustoAuthenticationError:
84  return False
85 
86  entry.async_on_unload(adx.async_stop)
87  await adx.async_start()
88  return True
89 
90 
91 async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
92  """Unload a config entry."""
93  return True
94 
95 
97  """A event handler class for Azure Data Explorer."""
98 
99  def __init__(
100  self,
101  hass: HomeAssistant,
102  entry: ConfigEntry,
103  ) -> None:
104  """Initialize the listener."""
105 
106  self.hasshass = hass
107  self._entry_entry = entry
108  self._entities_filter_entities_filter = hass.data[DATA_COMPONENT]
109 
110  self._client_client = AzureDataExplorerClient(entry.data)
111 
112  self._send_interval_send_interval = entry.options[CONF_SEND_INTERVAL]
113  self._client_secret_client_secret = entry.data[CONF_APP_REG_SECRET]
114  self._max_delay_max_delay = DEFAULT_MAX_DELAY
115 
116  self._shutdown_shutdown = False
117  self._queue: asyncio.Queue[tuple[datetime, State]] = asyncio.Queue()
118  self._listener_remover_listener_remover: Callable[[], None] | None = None
119  self._next_send_remover_next_send_remover: Callable[[], None] | None = None
120 
121  async def async_start(self) -> None:
122  """Start the component.
123 
124  This register the listener and
125  schedules the first send.
126  """
127 
128  self._listener_remover_listener_remover = self.hasshass.bus.async_listen(
129  MATCH_ALL, self.async_listenasync_listen
130  )
131  self._schedule_next_send_schedule_next_send()
132 
133  async def async_stop(self) -> None:
134  """Shut down the ADX by queueing None, calling send, join queue."""
135  if self._next_send_remover_next_send_remover:
136  self._next_send_remover_next_send_remover()
137  if self._listener_remover_listener_remover:
138  self._listener_remover_listener_remover()
139  self._shutdown_shutdown = True
140  await self.async_sendasync_send(None)
141 
142  async def test_connection(self) -> None:
143  """Test the connection to the Azure Data Explorer service."""
144  await self.hasshass.async_add_executor_job(self._client_client.test_connection)
145 
146  def _schedule_next_send(self) -> None:
147  """Schedule the next send."""
148  if not self._shutdown_shutdown:
149  if self._next_send_remover_next_send_remover:
150  self._next_send_remover_next_send_remover()
151  self._next_send_remover_next_send_remover = async_call_later(
152  self.hasshass, self._send_interval_send_interval, self.async_sendasync_send
153  )
154 
155  async def async_listen(self, event: Event) -> None:
156  """Listen for new messages on the bus and queue them for ADX."""
157  if state := event.data.get("new_state"):
158  await self._queue.put((event.time_fired, state))
159 
160  async def async_send(self, _) -> None:
161  """Write preprocessed events to Azure Data Explorer."""
162 
163  adx_events = []
164  dropped = 0
165  while not self._queue.empty():
166  (time_fired, event) = self._queue.get_nowait()
167  adx_event, dropped = self._parse_event_parse_event(time_fired, event, dropped)
168  self._queue.task_done()
169  if adx_event is not None:
170  adx_events.append(adx_event)
171 
172  if dropped:
173  _LOGGER.warning(
174  "Dropped %d old events, consider filtering messages", dropped
175  )
176 
177  if adx_events:
178  event_string = "".join(adx_events)
179 
180  try:
181  await self.hasshass.async_add_executor_job(
182  self._client_client.ingest_data, event_string
183  )
184 
185  except KustoServiceError as err:
186  _LOGGER.error("Could not find database or table: %s", err)
187  except KustoAuthenticationError as err:
188  _LOGGER.error("Could not authenticate to Azure Data Explorer: %s", err)
189 
190  self._schedule_next_send_schedule_next_send()
191 
193  self,
194  time_fired: datetime,
195  state: State,
196  dropped: int,
197  ) -> tuple[str | None, int]:
198  """Parse event by checking if it needs to be sent, and format it."""
199 
200  if state.state in FILTER_STATES or not self._entities_filter_entities_filter(state.entity_id):
201  return None, dropped
202  if (utcnow() - time_fired).seconds > DEFAULT_MAX_DELAY + self._send_interval_send_interval:
203  return None, dropped + 1
204 
205  json_event = json.dumps(obj=state, cls=ExtendedJSONEncoder)
206 
207  return (json_event, dropped)
tuple[str|None, int] _parse_event(self, datetime time_fired, State state, int dropped)
Definition: __init__.py:197
None __init__(self, HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:103
bool async_setup(HomeAssistant hass, ConfigType yaml_config)
Definition: __init__.py:60
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:91
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:74
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597