1 """The Azure Data Explorer integration."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from dataclasses
import dataclass
8 from datetime
import datetime
12 from azure.kusto.data.exceptions
import KustoAuthenticationError, KustoServiceError
13 import voluptuous
as vol
26 from .client
import AzureDataExplorerClient
36 _LOGGER = logging.getLogger(__name__)
38 CONFIG_SCHEMA = vol.Schema(
42 vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
46 extra=vol.ALLOW_EXTRA,
48 DATA_COMPONENT: HassKey[EntityFilter] =
HassKey(DOMAIN)
54 """Class for capturing a filter test."""
60 async
def async_setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
61 """Activate ADX component from yaml.
63 Adds an empty filter to hass data.
64 Tries to get a filter from yaml, if present set to hass data.
66 if DOMAIN
in yaml_config:
67 hass.data[DATA_COMPONENT] = yaml_config[DOMAIN].pop(CONF_FILTER)
75 """Do the setup based on the config entry and the filter from yaml."""
78 await adx.test_connection()
79 except KustoServiceError
as exp:
81 "Could not find Azure Data Explorer database or table"
83 except KustoAuthenticationError:
86 entry.async_on_unload(adx.async_stop)
87 await adx.async_start()
92 """Unload a config entry."""
97 """An event handler class for Azure Data Explorer."""
104 """Initialize the listener."""
117 self._queue: asyncio.Queue[tuple[datetime, State]] = asyncio.Queue()
122 """Start the component.
124 This registers the listener and
125 schedules the first send.
134 """Shut down the ADX by queueing None, calling send, join queue."""
143 """Test the connection to the Azure Data Explorer service."""
144 await self.
hasshass.async_add_executor_job(self.
_client_client.test_connection)
147 """Schedule the next send."""
156 """Listen for new messages on the bus and queue them for ADX."""
157 if state := event.data.get(
"new_state"):
158 await self._queue.put((event.time_fired, state))
161 """Write preprocessed events to Azure Data Explorer."""
165 while not self._queue.empty():
166 (time_fired, event) = self._queue.get_nowait()
167 adx_event, dropped = self.
_parse_event_parse_event(time_fired, event, dropped)
168 self._queue.task_done()
169 if adx_event
is not None:
170 adx_events.append(adx_event)
174 "Dropped %d old events, consider filtering messages", dropped
178 event_string =
"".join(adx_events)
181 await self.
hasshass.async_add_executor_job(
182 self.
_client_client.ingest_data, event_string
185 except KustoServiceError
as err:
186 _LOGGER.error(
"Could not find database or table: %s", err)
187 except KustoAuthenticationError
as err:
188 _LOGGER.error(
"Could not authenticate to Azure Data Explorer: %s", err)
194 time_fired: datetime,
197 ) -> tuple[str |
None, int]:
198 """Parse event by checking if it needs to be sent, and format it."""
200 if state.state
in FILTER_STATES
or not self.
_entities_filter_entities_filter(state.entity_id):
203 return None, dropped + 1
205 json_event = json.dumps(obj=state, cls=ExtendedJSONEncoder)
207 return (json_event, dropped)
None test_connection(self)
None async_listen(self, Event event)
tuple[str|None, int] _parse_event(self, datetime time_fired, State state, int dropped)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
None _schedule_next_send(self)
bool async_setup(HomeAssistant hass, ConfigType yaml_config)
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)