1 """Support for sending data to an Influx database."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from contextlib
import suppress
7 from dataclasses
import dataclass
13 from typing
import Any
15 from influxdb
import InfluxDBClient, exceptions
16 from influxdb_client
import InfluxDBClient
as InfluxDBClientV2
17 from influxdb_client.client.write_api
import ASYNCHRONOUS, SYNCHRONOUS
18 from influxdb_client.rest
import ApiException
19 import requests.exceptions
20 import urllib3.exceptions
21 import voluptuous
as vol
33 CONF_UNIT_OF_MEASUREMENT,
37 EVENT_HOMEASSISTANT_STOP,
47 INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
48 convert_include_exclude_filter,
60 COMPONENT_CONFIG_SCHEMA_CONNECTION,
63 CONF_COMPONENT_CONFIG,
64 CONF_COMPONENT_CONFIG_DOMAIN,
65 CONF_COMPONENT_CONFIG_GLOB,
67 CONF_DEFAULT_MEASUREMENT,
68 CONF_IGNORE_ATTRIBUTES,
69 CONF_MEASUREMENT_ATTR,
71 CONF_OVERRIDE_MEASUREMENT,
80 DEFAULT_MEASUREMENT_ATTR,
85 INFLUX_CONF_MEASUREMENT,
92 QUEUE_BACKLOG_SECONDS,
106 _LOGGER = logging.getLogger(__name__)
110 """Build URL used from config inputs and default when necessary."""
111 if conf[CONF_API_VERSION] == API_VERSION_2:
112 if CONF_SSL
not in conf:
113 conf[CONF_SSL] = DEFAULT_SSL_V2
114 if CONF_HOST
not in conf:
115 conf[CONF_HOST] = DEFAULT_HOST_V2
117 url = conf[CONF_HOST]
119 url = f
"https://{url}"
121 url = f
"http://{url}"
123 if CONF_PORT
in conf:
124 url = f
"{url}:{conf[CONF_PORT]}"
126 if CONF_PATH
in conf:
127 url = f
"{url}{conf[CONF_PATH]}"
135 """Ensure correct config fields are provided based on API version used."""
136 if conf[CONF_API_VERSION] == API_VERSION_2:
137 if CONF_TOKEN
not in conf:
139 f
"{CONF_TOKEN} and {CONF_BUCKET} are required when"
140 f
" {CONF_API_VERSION} is {API_VERSION_2}"
143 if CONF_USERNAME
in conf:
145 f
"{CONF_USERNAME} and {CONF_PASSWORD} are only allowed when"
146 f
" {CONF_API_VERSION} is {DEFAULT_API_VERSION}"
149 elif CONF_TOKEN
in conf:
151 f
"{CONF_TOKEN} and {CONF_BUCKET} are only allowed when"
152 f
" {CONF_API_VERSION} is {API_VERSION_2}"
# Per-entity customization schema. Both keys are optional:
#   * CONF_OVERRIDE_MEASUREMENT must be a string.
#   * CONF_IGNORE_ATTRIBUTES is coerced to a list and every item must be a string.
_CUSTOMIZE_ENTITY_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
        vol.Optional(CONF_IGNORE_ATTRIBUTES): vol.All(cv.ensure_list, [cv.string]),
    }
)
165 _INFLUX_BASE_SCHEMA = INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
167 vol.Optional(CONF_RETRY_COUNT, default=0): cv.positive_int,
168 vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string,
169 vol.Optional(CONF_MEASUREMENT_ATTR, default=DEFAULT_MEASUREMENT_ATTR): vol.In(
170 [
"unit_of_measurement",
"domain__device_class",
"entity_id"]
172 vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
173 vol.Optional(CONF_TAGS, default={}): vol.Schema({cv.string: cv.string}),
174 vol.Optional(CONF_TAGS_ATTRIBUTES, default=[]): vol.All(
175 cv.ensure_list, [cv.string]
177 vol.Optional(CONF_IGNORE_ATTRIBUTES, default=[]): vol.All(
178 cv.ensure_list, [cv.string]
180 vol.Optional(CONF_COMPONENT_CONFIG, default={}): vol.Schema(
181 {cv.entity_id: _CUSTOMIZE_ENTITY_SCHEMA}
183 vol.Optional(CONF_COMPONENT_CONFIG_GLOB, default={}): vol.Schema(
184 {cv.string: _CUSTOMIZE_ENTITY_SCHEMA}
186 vol.Optional(CONF_COMPONENT_CONFIG_DOMAIN, default={}): vol.Schema(
187 {cv.string: _CUSTOMIZE_ENTITY_SCHEMA}
192 INFLUX_SCHEMA = vol.All(
193 _INFLUX_BASE_SCHEMA.extend(COMPONENT_CONFIG_SCHEMA_CONNECTION),
194 validate_version_specific_config,
# Top-level configuration schema: the DOMAIN section is validated with
# INFLUX_SCHEMA, and keys not listed here are accepted (vol.ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: INFLUX_SCHEMA},
    extra=vol.ALLOW_EXTRA,
)
205 """Build event to json converter and add to config."""
207 tags = conf.get(CONF_TAGS)
208 tags_attributes: list[str] = conf[CONF_TAGS_ATTRIBUTES]
209 default_measurement = conf.get(CONF_DEFAULT_MEASUREMENT)
210 measurement_attr: str = conf[CONF_MEASUREMENT_ATTR]
211 override_measurement = conf.get(CONF_OVERRIDE_MEASUREMENT)
212 global_ignore_attributes = set(conf[CONF_IGNORE_ATTRIBUTES])
214 conf[CONF_COMPONENT_CONFIG],
215 conf[CONF_COMPONENT_CONFIG_DOMAIN],
216 conf[CONF_COMPONENT_CONFIG_GLOB],
219 def event_to_json(event: Event) -> dict[str, Any] |
None:
220 """Convert event into json in format Influx expects."""
221 state: State |
None = event.data.get(EVENT_NEW_STATE)
224 or state.state
in (STATE_UNKNOWN,
"", STATE_UNAVAILABLE,
None)
225 or not entity_filter(state.entity_id)
230 _include_state = _include_value =
False
232 _state_as_value =
float(state.state)
233 _include_value =
True
236 _state_as_value =
float(state_helper.state_as_number(state))
237 _include_state = _include_value =
True
239 _include_state =
True
243 entity_config = component_config.get(state.entity_id)
244 measurement = entity_config.get(CONF_OVERRIDE_MEASUREMENT)
245 if measurement
in (
None,
""):
246 if override_measurement:
247 measurement = override_measurement
249 if measurement_attr ==
"entity_id":
250 measurement = state.entity_id
251 elif measurement_attr ==
"domain__device_class":
252 device_class = state.attributes.get(
"device_class")
253 if device_class
is None:
255 measurement = state.domain
257 measurement = f
"{state.domain}__{device_class}"
260 measurement = state.attributes.get(measurement_attr)
261 if measurement
in (
None,
""):
262 if default_measurement:
263 measurement = default_measurement
265 measurement = state.entity_id
267 include_uom = measurement_attr !=
"unit_of_measurement"
269 json: dict[str, Any] = {
270 INFLUX_CONF_MEASUREMENT: measurement,
272 CONF_DOMAIN: state.domain,
273 CONF_ENTITY_ID: state.object_id,
275 INFLUX_CONF_TIME: event.time_fired,
276 INFLUX_CONF_FIELDS: {},
279 json[INFLUX_CONF_FIELDS][INFLUX_CONF_STATE] = state.state
281 json[INFLUX_CONF_FIELDS][INFLUX_CONF_VALUE] = _state_as_value
283 ignore_attributes = set(entity_config.get(CONF_IGNORE_ATTRIBUTES, []))
284 ignore_attributes.update(global_ignore_attributes)
285 for key, value
in state.attributes.items():
286 if key
in tags_attributes:
287 json[INFLUX_CONF_TAGS][key] = value
289 (key != CONF_UNIT_OF_MEASUREMENT
or include_uom)
290 and (key !=
"device_class" or include_dc)
291 and key
not in ignore_attributes
294 if key
in json[INFLUX_CONF_FIELDS]:
301 json[INFLUX_CONF_FIELDS][key] =
float(value)
302 except (ValueError, TypeError):
303 new_key = f
"{key}_str"
304 new_value =
str(value)
305 json[INFLUX_CONF_FIELDS][new_key] = new_value
307 if RE_DIGIT_TAIL.match(new_value):
308 json[INFLUX_CONF_FIELDS][key] =
float(
309 RE_DECIMAL.sub(
"", new_value)
313 with suppress(KeyError, TypeError):
314 if not math.isfinite(json[INFLUX_CONF_FIELDS][key]):
315 del json[INFLUX_CONF_FIELDS][key]
317 json[INFLUX_CONF_TAGS].
update(tags)
326 """An InfluxDB client wrapper for V1 or V2."""
328 data_repositories: list[str]
329 write: Callable[[str],
None]
330 query: Callable[[str, str], list[Any]]
331 close: Callable[[],
None]
335 conf, test_write=False, test_read=False
337 """Create the correct influx connection for the API version."""
339 CONF_TIMEOUT: TIMEOUT,
341 precision = conf.get(CONF_PRECISION)
343 if conf[CONF_API_VERSION] == API_VERSION_2:
344 kwargs[CONF_TIMEOUT] = TIMEOUT * 1000
345 kwargs[CONF_URL] = conf[CONF_URL]
346 kwargs[CONF_TOKEN] = conf[CONF_TOKEN]
347 kwargs[INFLUX_CONF_ORG] = conf[CONF_ORG]
348 kwargs[CONF_VERIFY_SSL] = conf[CONF_VERIFY_SSL]
349 if CONF_SSL_CA_CERT
in conf:
350 kwargs[CONF_SSL_CA_CERT] = conf[CONF_SSL_CA_CERT]
351 bucket = conf.get(CONF_BUCKET)
352 influx = InfluxDBClientV2(**kwargs)
353 query_api = influx.query_api()
354 initial_write_mode = SYNCHRONOUS
if test_write
else ASYNCHRONOUS
355 write_api = influx.write_api(write_options=initial_write_mode)
358 """Write data to V2 influx."""
359 data = {
"bucket": bucket,
"record": json}
361 if precision
is not None:
362 data[
"write_precision"] = precision
365 write_api.write(**data)
366 except (urllib3.exceptions.HTTPError, OSError)
as exc:
367 raise ConnectionError(CONNECTION_ERROR % exc)
from exc
368 except ApiException
as exc:
369 if exc.status == CODE_INVALID_INPUTS:
370 raise ValueError(WRITE_ERROR % (json, exc))
from exc
371 raise ConnectionError(CLIENT_ERROR_V2 % exc)
from exc
def query_v2(query, _=None):
    """Query V2 influx.

    Passes ``query`` to ``query_api.query`` and returns whatever it returns.
    The second argument is accepted and ignored so this function can be
    called with the same shape as ``query_v1(query, database)``.

    Raises:
        ConnectionError: on a urllib3 ``HTTPError`` or an ``OSError``
            (formatted with CONNECTION_ERROR), or on an ``ApiException``
            whose status is not CODE_INVALID_INPUTS (formatted with
            CLIENT_ERROR_V2).
        ValueError: on an ``ApiException`` whose status equals
            CODE_INVALID_INPUTS (formatted with QUERY_ERROR).
    """
    try:
        return query_api.query(query)
    except (urllib3.exceptions.HTTPError, OSError) as exc:
        # Transport-level failure: report it as a connection problem.
        raise ConnectionError(CONNECTION_ERROR % exc) from exc
    except ApiException as exc:
        if exc.status == CODE_INVALID_INPUTS:
            # This status is reported as a bad query, not a connection problem.
            raise ValueError(QUERY_ERROR % (query, exc)) from exc
        raise ConnectionError(CLIENT_ERROR_V2 % exc) from exc
385 """Close V2 influx client."""
392 with suppress(ValueError):
394 write_api = influx.write_api(write_options=ASYNCHRONOUS)
397 tables = query_v2(TEST_QUERY_V2)
398 if tables
and tables[0].records:
399 buckets = [bucket.values[
"name"]
for bucket
in tables[0].records]
403 return InfluxClient(buckets, write_v2, query_v2, close_v2)
406 if CONF_SSL_CA_CERT
in conf
and conf[CONF_VERIFY_SSL]:
407 kwargs[CONF_VERIFY_SSL] = conf[CONF_SSL_CA_CERT]
409 kwargs[CONF_VERIFY_SSL] = conf[CONF_VERIFY_SSL]
411 if CONF_DB_NAME
in conf:
412 kwargs[CONF_DB_NAME] = conf[CONF_DB_NAME]
414 if CONF_USERNAME
in conf:
415 kwargs[CONF_USERNAME] = conf[CONF_USERNAME]
417 if CONF_PASSWORD
in conf:
418 kwargs[CONF_PASSWORD] = conf[CONF_PASSWORD]
420 if CONF_HOST
in conf:
421 kwargs[CONF_HOST] = conf[CONF_HOST]
423 if CONF_PATH
in conf:
424 kwargs[CONF_PATH] = conf[CONF_PATH]
426 if CONF_PORT
in conf:
427 kwargs[CONF_PORT] = conf[CONF_PORT]
430 kwargs[CONF_SSL] = conf[CONF_SSL]
432 influx = InfluxDBClient(**kwargs)
435 """Write data to V1 influx."""
437 influx.write_points(json, time_precision=precision)
439 requests.exceptions.RequestException,
440 exceptions.InfluxDBServerError,
443 raise ConnectionError(CONNECTION_ERROR % exc)
from exc
444 except exceptions.InfluxDBClientError
as exc:
445 if exc.code == CODE_INVALID_INPUTS:
446 raise ValueError(WRITE_ERROR % (json, exc))
from exc
447 raise ConnectionError(CLIENT_ERROR_V1 % exc)
from exc
449 def query_v1(query, database=None):
450 """Query V1 influx."""
452 return list(influx.query(query, database=database).get_points())
454 requests.exceptions.RequestException,
455 exceptions.InfluxDBServerError,
458 raise ConnectionError(CONNECTION_ERROR % exc)
from exc
459 except exceptions.InfluxDBClientError
as exc:
460 if exc.code == CODE_INVALID_INPUTS:
461 raise ValueError(QUERY_ERROR % (query, exc))
from exc
462 raise ConnectionError(CLIENT_ERROR_V1 % exc)
from exc
465 """Close the V1 Influx client."""
473 databases = [db[
"name"]
for db
in query_v1(TEST_QUERY_V1)]
475 return InfluxClient(databases, write_v1, query_v1, close_v1)
482 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
483 """Set up the InfluxDB component."""
484 conf = config[DOMAIN]
487 except ConnectionError
as exc:
488 _LOGGER.error(RETRY_MESSAGE, exc)
489 event_helper.call_later(
490 hass, RETRY_INTERVAL,
lambda _:
_retry_setup(hass, config)
495 max_tries = conf.get(CONF_RETRY_COUNT)
496 instance = hass.data[DOMAIN] =
InfluxThread(hass, influx, event_to_json, max_tries)
500 """Shut down the thread."""
501 instance.queue.put(
None)
505 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
511 """A threaded event handler class."""
513 def __init__(self, hass, influx, event_to_json, max_tries):
514 """Initialize the listener."""
515 threading.Thread.__init__(self, name=DOMAIN)
516 self.queue: queue.SimpleQueue[threading.Event | tuple[float, Event] |
None] = (
524 hass.bus.listen(EVENT_STATE_CHANGED, self.
_event_listener_event_listener)
528 """Listen for new messages on the bus and queue them for Influx."""
529 item = (time.monotonic(), event)
534 """Return number of seconds to wait for more events."""
538 """Return a batch of events formatted for writing."""
539 queue_seconds = QUEUE_BACKLOG_SECONDS + self.
max_triesmax_tries * RETRY_DELAY
546 with suppress(queue.Empty):
547 while len(json) < BATCH_BUFFER_SIZE
and not self.
shutdownshutdown:
548 timeout =
None if count == 0
else self.
batch_timeoutbatch_timeout()
549 item = self.queue.
get(timeout=timeout)
554 elif type(item)
is tuple:
555 timestamp, event = item
556 age = time.monotonic() - timestamp
558 if age < queue_seconds:
560 json.append(event_json)
563 elif isinstance(item, threading.Event):
567 _LOGGER.warning(CATCHING_UP_MESSAGE, dropped)
572 """Write preprocessed events to influxdb, with retry."""
573 for retry
in range(self.
max_triesmax_tries + 1):
575 self.
influxinflux.write(json)
578 _LOGGER.error(RESUMED_MESSAGE, self.
write_errorswrite_errors)
581 _LOGGER.debug(WROTE_MESSAGE, len(json))
583 except ValueError
as err:
586 except ConnectionError
as err:
588 time.sleep(RETRY_DELAY)
595 """Process incoming events."""
602 """Block till all events processed.
604 Currently only used for testing.
606 event = threading.Event()
607 self.queue.put(event)
def write_to_influxdb(self, json)
def __init__(self, hass, influx, event_to_json, max_tries)
def _event_listener(self, event)
def get_events_json(self)
def block_till_done(self)
web.Response get(self, web.Request request, str config_key)
InfluxClient get_influx_connection(conf, test_write=False, test_read=False)
Callable[[Event], dict[str, Any]|None] _generate_event_to_json(dict conf)
dict create_influx_url(dict conf)
None _retry_setup(HomeAssistant hass, ConfigType config)
dict validate_version_specific_config(dict conf)
bool setup(HomeAssistant hass, ConfigType config)
IssData update(pyiss.ISS iss)
EntityFilter convert_include_exclude_filter(dict[str, dict[str, list[str]]] config)