1 """Setting up the Azure Data Explorer ingest client."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
10 from azure.kusto.data
import KustoClient, KustoConnectionStringBuilder
11 from azure.kusto.data.data_format
import DataFormat
12 from azure.kusto.ingest
import (
14 ManagedStreamingIngestClient,
20 CONF_ADX_CLUSTER_INGEST_URI,
21 CONF_ADX_DATABASE_NAME,
26 CONF_USE_QUEUED_CLIENT,
29 _LOGGER = logging.getLogger(__name__)
33 """Class for Azure Data Explorer Client."""
35 def __init__(self, data: Mapping[str, Any]) ->
None:
36 """Create the right class."""
38 self.
_database_database = data[CONF_ADX_DATABASE_NAME]
39 self.
_table_table = data[CONF_ADX_TABLE_NAME]
43 data_format=DataFormat.MULTIJSON,
44 ingestion_mapping_reference=
"ha_json_mapping",
49 KustoConnectionStringBuilder.with_aad_application_key_authentication(
50 data[CONF_ADX_CLUSTER_INGEST_URI],
51 data[CONF_APP_REG_ID],
52 data[CONF_APP_REG_SECRET],
53 data[CONF_AUTHORITY_ID],
59 KustoConnectionStringBuilder.with_aad_application_key_authentication(
60 data[CONF_ADX_CLUSTER_INGEST_URI].replace(
"ingest-",
""),
61 data[CONF_APP_REG_ID],
62 data[CONF_APP_REG_SECRET],
63 data[CONF_AUTHORITY_ID],
67 if data[CONF_USE_QUEUED_CLIENT]
is True:
71 self.
write_clientwrite_client = ManagedStreamingIngestClient(kcsb_ingest)
76 logging.getLogger(
"azure.core.pipeline.policies.http_logging_policy").setLevel(
81 """Test connection, will throw Exception if it cannot connect."""
83 query = f
"{self._table} | take 1"
88 """Send data to Azure Data Explorer."""
90 bytes_stream = io.StringIO(adx_events)
91 stream_descriptor = StreamDescriptor(bytes_stream)
None test_connection(self)
None __init__(self, Mapping[str, Any] data)
None ingest_data(self, str adx_events)