Home Assistant Unofficial Reference 2024.12.1
client.py
Go to the documentation of this file.
1 """Setting up the Azure Data Explorer ingest client."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 import io
7 import logging
8 from typing import Any
9 
10 from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
11 from azure.kusto.data.data_format import DataFormat
12 from azure.kusto.ingest import (
13  IngestionProperties,
14  ManagedStreamingIngestClient,
15  QueuedIngestClient,
16  StreamDescriptor,
17 )
18 
19 from .const import (
20  CONF_ADX_CLUSTER_INGEST_URI,
21  CONF_ADX_DATABASE_NAME,
22  CONF_ADX_TABLE_NAME,
23  CONF_APP_REG_ID,
24  CONF_APP_REG_SECRET,
25  CONF_AUTHORITY_ID,
26  CONF_USE_QUEUED_CLIENT,
27 )
28 
29 _LOGGER = logging.getLogger(__name__)
30 
31 
33  """Class for Azure Data Explorer Client."""
34 
35  def __init__(self, data: Mapping[str, Any]) -> None:
36  """Create the right class."""
37 
38  self._database_database = data[CONF_ADX_DATABASE_NAME]
39  self._table_table = data[CONF_ADX_TABLE_NAME]
40  self._ingestion_properties_ingestion_properties = IngestionProperties(
41  database=self._database_database,
42  table=self._table_table,
43  data_format=DataFormat.MULTIJSON,
44  ingestion_mapping_reference="ha_json_mapping",
45  )
46 
47  # Create client for ingesting data
48  kcsb_ingest = (
49  KustoConnectionStringBuilder.with_aad_application_key_authentication(
50  data[CONF_ADX_CLUSTER_INGEST_URI],
51  data[CONF_APP_REG_ID],
52  data[CONF_APP_REG_SECRET],
53  data[CONF_AUTHORITY_ID],
54  )
55  )
56 
57  # Create client for querying data
58  kcsb_query = (
59  KustoConnectionStringBuilder.with_aad_application_key_authentication(
60  data[CONF_ADX_CLUSTER_INGEST_URI].replace("ingest-", ""),
61  data[CONF_APP_REG_ID],
62  data[CONF_APP_REG_SECRET],
63  data[CONF_AUTHORITY_ID],
64  )
65  )
66 
67  if data[CONF_USE_QUEUED_CLIENT] is True:
68  # Queued is the only option supported on free tier of ADX
69  self.write_clientwrite_client = QueuedIngestClient(kcsb_ingest)
70  else:
71  self.write_clientwrite_client = ManagedStreamingIngestClient(kcsb_ingest)
72 
73  self.query_clientquery_client = KustoClient(kcsb_query)
74 
75  # Reduce the HTTP logging, the default INFO logging is too verbose.
76  logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(
77  logging.WARNING
78  )
79 
80  def test_connection(self) -> None:
81  """Test connection, will throw Exception if it cannot connect."""
82 
83  query = f"{self._table} | take 1"
84 
85  self.query_clientquery_client.execute_query(self._database_database, query)
86 
87  def ingest_data(self, adx_events: str) -> None:
88  """Send data to Azure Data Explorer."""
89 
90  bytes_stream = io.StringIO(adx_events)
91  stream_descriptor = StreamDescriptor(bytes_stream)
92 
93  self.write_clientwrite_client.ingest_from_stream(
94  stream_descriptor, ingestion_properties=self._ingestion_properties_ingestion_properties
95  )