Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for sending data to an Influx database."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from contextlib import suppress
7 from dataclasses import dataclass
8 import logging
9 import math
10 import queue
11 import threading
12 import time
13 from typing import Any
14 
15 from influxdb import InfluxDBClient, exceptions
16 from influxdb_client import InfluxDBClient as InfluxDBClientV2
17 from influxdb_client.client.write_api import ASYNCHRONOUS, SYNCHRONOUS
18 from influxdb_client.rest import ApiException
19 import requests.exceptions
20 import urllib3.exceptions
21 import voluptuous as vol
22 
23 from homeassistant.const import (
24  CONF_DOMAIN,
25  CONF_ENTITY_ID,
26  CONF_HOST,
27  CONF_PASSWORD,
28  CONF_PATH,
29  CONF_PORT,
30  CONF_SSL,
31  CONF_TIMEOUT,
32  CONF_TOKEN,
33  CONF_UNIT_OF_MEASUREMENT,
34  CONF_URL,
35  CONF_USERNAME,
36  CONF_VERIFY_SSL,
37  EVENT_HOMEASSISTANT_STOP,
38  EVENT_STATE_CHANGED,
39  STATE_UNAVAILABLE,
40  STATE_UNKNOWN,
41 )
42 from homeassistant.core import Event, HomeAssistant, State, callback
43 from homeassistant.helpers import event as event_helper, state as state_helper
45 from homeassistant.helpers.entity_values import EntityValues
47  INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
48  convert_include_exclude_filter,
49 )
50 from homeassistant.helpers.typing import ConfigType
51 
52 from .const import (
53  API_VERSION_2,
54  BATCH_BUFFER_SIZE,
55  BATCH_TIMEOUT,
56  CATCHING_UP_MESSAGE,
57  CLIENT_ERROR_V1,
58  CLIENT_ERROR_V2,
59  CODE_INVALID_INPUTS,
60  COMPONENT_CONFIG_SCHEMA_CONNECTION,
61  CONF_API_VERSION,
62  CONF_BUCKET,
63  CONF_COMPONENT_CONFIG,
64  CONF_COMPONENT_CONFIG_DOMAIN,
65  CONF_COMPONENT_CONFIG_GLOB,
66  CONF_DB_NAME,
67  CONF_DEFAULT_MEASUREMENT,
68  CONF_IGNORE_ATTRIBUTES,
69  CONF_MEASUREMENT_ATTR,
70  CONF_ORG,
71  CONF_OVERRIDE_MEASUREMENT,
72  CONF_PRECISION,
73  CONF_RETRY_COUNT,
74  CONF_SSL_CA_CERT,
75  CONF_TAGS,
76  CONF_TAGS_ATTRIBUTES,
77  CONNECTION_ERROR,
78  DEFAULT_API_VERSION,
79  DEFAULT_HOST_V2,
80  DEFAULT_MEASUREMENT_ATTR,
81  DEFAULT_SSL_V2,
82  DOMAIN,
83  EVENT_NEW_STATE,
84  INFLUX_CONF_FIELDS,
85  INFLUX_CONF_MEASUREMENT,
86  INFLUX_CONF_ORG,
87  INFLUX_CONF_STATE,
88  INFLUX_CONF_TAGS,
89  INFLUX_CONF_TIME,
90  INFLUX_CONF_VALUE,
91  QUERY_ERROR,
92  QUEUE_BACKLOG_SECONDS,
93  RE_DECIMAL,
94  RE_DIGIT_TAIL,
95  RESUMED_MESSAGE,
96  RETRY_DELAY,
97  RETRY_INTERVAL,
98  RETRY_MESSAGE,
99  TEST_QUERY_V1,
100  TEST_QUERY_V2,
101  TIMEOUT,
102  WRITE_ERROR,
103  WROTE_MESSAGE,
104 )
105 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
107 
108 
def create_influx_url(conf: dict) -> dict:
    """Build URL used from config inputs and default when necessary.

    Only acts when the configured API version is ``API_VERSION_2``; for any
    other version ``conf`` is returned unchanged.

    For version 2, missing ``CONF_SSL`` / ``CONF_HOST`` entries are filled in
    with ``DEFAULT_SSL_V2`` / ``DEFAULT_HOST_V2``. The URL is then assembled as
    ``http(s)://<host>[:<port>][<path>]`` and stored under ``CONF_URL``.

    Note that ``conf`` is modified in place and the same dict is returned.
    """
    if conf[CONF_API_VERSION] == API_VERSION_2:
        if CONF_SSL not in conf:
            conf[CONF_SSL] = DEFAULT_SSL_V2
        if CONF_HOST not in conf:
            conf[CONF_HOST] = DEFAULT_HOST_V2

        # The scheme is chosen by the (possibly defaulted) SSL flag.
        url = conf[CONF_HOST]
        if conf[CONF_SSL]:
            url = f"https://{url}"
        else:
            url = f"http://{url}"

        if CONF_PORT in conf:
            url = f"{url}:{conf[CONF_PORT]}"

        # The path is appended verbatim; no separator is inserted here.
        if CONF_PATH in conf:
            url = f"{url}{conf[CONF_PATH]}"

        conf[CONF_URL] = url

    return conf
132 
133 
def validate_version_specific_config(conf: dict) -> dict:
    """Ensure correct config fields are provided based on API version used.

    Rules enforced:
    - When the API version is ``API_VERSION_2``, ``CONF_TOKEN`` must be
      present and ``CONF_USERNAME`` must not be.
    - For any other API version, ``CONF_TOKEN`` must not be present.

    Returns ``conf`` unchanged when valid; raises ``vol.Invalid`` otherwise.
    """
    if conf[CONF_API_VERSION] == API_VERSION_2:
        if CONF_TOKEN not in conf:
            raise vol.Invalid(
                f"{CONF_TOKEN} and {CONF_BUCKET} are required when"
                f" {CONF_API_VERSION} is {API_VERSION_2}"
            )

        if CONF_USERNAME in conf:
            raise vol.Invalid(
                f"{CONF_USERNAME} and {CONF_PASSWORD} are only allowed when"
                f" {CONF_API_VERSION} is {DEFAULT_API_VERSION}"
            )

    elif CONF_TOKEN in conf:
        raise vol.Invalid(
            f"{CONF_TOKEN} and {CONF_BUCKET} are only allowed when"
            f" {CONF_API_VERSION} is {API_VERSION_2}"
        )

    return conf
156 
157 
# Per-entity overrides accepted under the component_config* options.
_CUSTOMIZE_ENTITY_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
        vol.Optional(CONF_IGNORE_ATTRIBUTES): vol.All(cv.ensure_list, [cv.string]),
    }
)

# Options that do not depend on the connection / API version, layered on top
# of the shared include/exclude filter schema.
_INFLUX_BASE_SCHEMA = INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
    {
        vol.Optional(CONF_RETRY_COUNT, default=0): cv.positive_int,
        vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string,
        vol.Optional(CONF_MEASUREMENT_ATTR, default=DEFAULT_MEASUREMENT_ATTR): vol.In(
            ["unit_of_measurement", "domain__device_class", "entity_id"]
        ),
        vol.Optional(CONF_OVERRIDE_MEASUREMENT): cv.string,
        vol.Optional(CONF_TAGS, default={}): vol.Schema({cv.string: cv.string}),
        vol.Optional(CONF_TAGS_ATTRIBUTES, default=[]): vol.All(
            cv.ensure_list, [cv.string]
        ),
        vol.Optional(CONF_IGNORE_ATTRIBUTES, default=[]): vol.All(
            cv.ensure_list, [cv.string]
        ),
        vol.Optional(CONF_COMPONENT_CONFIG, default={}): vol.Schema(
            {cv.entity_id: _CUSTOMIZE_ENTITY_SCHEMA}
        ),
        vol.Optional(CONF_COMPONENT_CONFIG_GLOB, default={}): vol.Schema(
            {cv.string: _CUSTOMIZE_ENTITY_SCHEMA}
        ),
        vol.Optional(CONF_COMPONENT_CONFIG_DOMAIN, default={}): vol.Schema(
            {cv.string: _CUSTOMIZE_ENTITY_SCHEMA}
        ),
    }
)

# Full schema: base options plus connection options, then the two
# post-processing steps run in order (version-specific validation first,
# URL construction second).
INFLUX_SCHEMA = vol.All(
    _INFLUX_BASE_SCHEMA.extend(COMPONENT_CONFIG_SCHEMA_CONNECTION),
    validate_version_specific_config,
    create_influx_url,
)

CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: INFLUX_SCHEMA},
    extra=vol.ALLOW_EXTRA,
)
202 
203 
def _generate_event_to_json(conf: dict) -> Callable[[Event], dict[str, Any] | None]:
    """Build event to json converter and add to config.

    Reads the filter, tag, measurement and ignore-attribute options from
    ``conf`` once and returns a closure that converts a single event into the
    dict written to Influx (or ``None`` when the event must be skipped).
    """
    entity_filter = convert_include_exclude_filter(conf)
    # Fall back to an empty mapping so the final tags update cannot fail when
    # no tags are present in conf.
    tags = conf.get(CONF_TAGS) or {}
    tags_attributes: list[str] = conf[CONF_TAGS_ATTRIBUTES]
    default_measurement = conf.get(CONF_DEFAULT_MEASUREMENT)
    measurement_attr: str = conf[CONF_MEASUREMENT_ATTR]
    override_measurement = conf.get(CONF_OVERRIDE_MEASUREMENT)
    global_ignore_attributes = set(conf[CONF_IGNORE_ATTRIBUTES])
    component_config = EntityValues(
        conf[CONF_COMPONENT_CONFIG],
        conf[CONF_COMPONENT_CONFIG_DOMAIN],
        conf[CONF_COMPONENT_CONFIG_GLOB],
    )

    def event_to_json(event: Event) -> dict[str, Any] | None:
        """Convert event into json in format Influx expects.

        Returns ``None`` when the event carries no new state, when the state
        is unknown/unavailable/empty, or when the entity is filtered out.
        """
        state: State | None = event.data.get(EVENT_NEW_STATE)
        if (
            state is None
            or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE, None)
            or not entity_filter(state.entity_id)
        ):
            return None

        # Decide which of the "state" (raw string) and "value" (float) fields
        # to emit:
        # - state parses as a float            -> value only
        # - state_as_number() yields a number  -> state and value
        # - neither works                      -> state only
        try:
            _include_state = _include_value = False

            _state_as_value = float(state.state)
            _include_value = True
        except ValueError:
            try:
                _state_as_value = float(state_helper.state_as_number(state))
                _include_state = _include_value = True
            except ValueError:
                _include_state = True

        include_uom = True
        include_dc = True
        entity_config = component_config.get(state.entity_id)
        # Measurement precedence: per-entity override, global override,
        # value derived from measurement_attr, default measurement, entity_id.
        measurement = entity_config.get(CONF_OVERRIDE_MEASUREMENT)
        if measurement in (None, ""):
            if override_measurement:
                measurement = override_measurement
            else:
                if measurement_attr == "entity_id":
                    measurement = state.entity_id
                elif measurement_attr == "domain__device_class":
                    device_class = state.attributes.get("device_class")
                    if device_class is None:
                        # This entity doesn't have a device_class set, use only domain
                        measurement = state.domain
                    else:
                        measurement = f"{state.domain}__{device_class}"
                        # device_class is part of the measurement name, so it
                        # is not repeated as a field.
                        include_dc = False
                else:
                    measurement = state.attributes.get(measurement_attr)
                if measurement in (None, ""):
                    if default_measurement:
                        measurement = default_measurement
                    else:
                        measurement = state.entity_id
                else:
                    # The unit is only dropped from the fields when it was
                    # the attribute used to name the measurement.
                    include_uom = measurement_attr != "unit_of_measurement"

        json: dict[str, Any] = {
            INFLUX_CONF_MEASUREMENT: measurement,
            INFLUX_CONF_TAGS: {
                CONF_DOMAIN: state.domain,
                CONF_ENTITY_ID: state.object_id,
            },
            INFLUX_CONF_TIME: event.time_fired,
            INFLUX_CONF_FIELDS: {},
        }
        if _include_state:
            json[INFLUX_CONF_FIELDS][INFLUX_CONF_STATE] = state.state
        if _include_value:
            json[INFLUX_CONF_FIELDS][INFLUX_CONF_VALUE] = _state_as_value

        ignore_attributes = set(entity_config.get(CONF_IGNORE_ATTRIBUTES, []))
        ignore_attributes.update(global_ignore_attributes)
        for key, value in state.attributes.items():
            if key in tags_attributes:
                json[INFLUX_CONF_TAGS][key] = value
            elif (
                (key != CONF_UNIT_OF_MEASUREMENT or include_uom)
                and (key != "device_class" or include_dc)
                and key not in ignore_attributes
            ):
                # If the key is already in fields
                if key in json[INFLUX_CONF_FIELDS]:
                    key = f"{key}_"
                # Prevent column data errors in influxDB.
                # For each value we try to cast it as float
                # But if we cannot do it we store the value
                # as string add "_str" postfix to the field key
                try:
                    json[INFLUX_CONF_FIELDS][key] = float(value)
                except (ValueError, TypeError):
                    new_key = f"{key}_str"
                    new_value = str(value)
                    json[INFLUX_CONF_FIELDS][new_key] = new_value

                    # When the string matches RE_DIGIT_TAIL, additionally
                    # store a float built from it after RE_DECIMAL removes
                    # its matches.
                    if RE_DIGIT_TAIL.match(new_value):
                        json[INFLUX_CONF_FIELDS][key] = float(
                            RE_DECIMAL.sub("", new_value)
                        )

                # Infinity and NaN are not valid floats in InfluxDB
                with suppress(KeyError, TypeError):
                    if not math.isfinite(json[INFLUX_CONF_FIELDS][key]):
                        del json[INFLUX_CONF_FIELDS][key]

        # Configured static tags are applied last, so they win over the
        # domain/entity_id tags and attribute-derived tags of the same name.
        json[INFLUX_CONF_TAGS].update(tags)

        return json

    return event_to_json
322 
323 
@dataclass
class InfluxClient:
    """An InfluxDB client wrapper for V1 or V2."""

    # Names of the data repositories reported by the server.
    data_repositories: list[str]
    # Callable that writes the given data to Influx.
    write: Callable[[str], None]
    # Callable that runs a query and returns the result list.
    query: Callable[[str, str], list[Any]]
    # Callable that closes the underlying client.
    close: Callable[[], None]
332 
333 
def get_influx_connection(  # noqa: C901
    conf, test_write=False, test_read=False
) -> InfluxClient:
    """Create the correct influx connection for the API version.

    Builds either a V2 client (when the configured API version is
    ``API_VERSION_2``) or a V1 client, and wraps its write/query/close
    operations in an ``InfluxClient``.

    When ``test_write`` is true a probe write is performed; when ``test_read``
    is true the test query is run and its result names populate
    ``InfluxClient.data_repositories``.

    The wrapped write/query callables raise ``ConnectionError`` for
    connectivity or client errors and ``ValueError`` when the server reports
    ``CODE_INVALID_INPUTS``. The probes use the same callables, so this
    function can raise the same exceptions.
    """
    kwargs = {
        CONF_TIMEOUT: TIMEOUT,
    }
    precision = conf.get(CONF_PRECISION)

    if conf[CONF_API_VERSION] == API_VERSION_2:
        kwargs[CONF_TIMEOUT] = TIMEOUT * 1000
        kwargs[CONF_URL] = conf[CONF_URL]
        kwargs[CONF_TOKEN] = conf[CONF_TOKEN]
        kwargs[INFLUX_CONF_ORG] = conf[CONF_ORG]
        kwargs[CONF_VERIFY_SSL] = conf[CONF_VERIFY_SSL]
        if CONF_SSL_CA_CERT in conf:
            kwargs[CONF_SSL_CA_CERT] = conf[CONF_SSL_CA_CERT]
        bucket = conf.get(CONF_BUCKET)
        influx = InfluxDBClientV2(**kwargs)
        query_api = influx.query_api()
        # The probe write must be synchronous so its error surfaces here.
        initial_write_mode = SYNCHRONOUS if test_write else ASYNCHRONOUS
        write_api = influx.write_api(write_options=initial_write_mode)

        def write_v2(json):
            """Write data to V2 influx."""
            data = {"bucket": bucket, "record": json}

            if precision is not None:
                data["write_precision"] = precision

            try:
                # write_api is looked up in the enclosing scope at call time,
                # so the rebinding after the probe write takes effect here.
                write_api.write(**data)
            except (urllib3.exceptions.HTTPError, OSError) as exc:
                raise ConnectionError(CONNECTION_ERROR % exc) from exc
            except ApiException as exc:
                if exc.status == CODE_INVALID_INPUTS:
                    raise ValueError(WRITE_ERROR % (json, exc)) from exc
                raise ConnectionError(CLIENT_ERROR_V2 % exc) from exc

        def query_v2(query, _=None):
            """Query V2 influx."""
            try:
                return query_api.query(query)
            except (urllib3.exceptions.HTTPError, OSError) as exc:
                raise ConnectionError(CONNECTION_ERROR % exc) from exc
            except ApiException as exc:
                if exc.status == CODE_INVALID_INPUTS:
                    raise ValueError(QUERY_ERROR % (query, exc)) from exc
                raise ConnectionError(CLIENT_ERROR_V2 % exc) from exc

        def close_v2():
            """Close V2 influx client."""
            influx.close()

        buckets = []
        if test_write:
            # Try to write b"" to influx. If we can connect and creds are valid
            # Then invalid inputs is returned. Anything else is a broken config
            with suppress(ValueError):
                write_v2(b"")
            # Switch subsequent writes over to the asynchronous write API.
            write_api = influx.write_api(write_options=ASYNCHRONOUS)

        if test_read:
            tables = query_v2(TEST_QUERY_V2)
            if tables and tables[0].records:
                buckets = [record.values["name"] for record in tables[0].records]

        return InfluxClient(buckets, write_v2, query_v2, close_v2)

    # Else it's a V1 client
    # For V1 the CA cert path is passed through the verify_ssl argument when
    # verification is enabled; otherwise the plain boolean is passed.
    if CONF_SSL_CA_CERT in conf and conf[CONF_VERIFY_SSL]:
        kwargs[CONF_VERIFY_SSL] = conf[CONF_SSL_CA_CERT]
    else:
        kwargs[CONF_VERIFY_SSL] = conf[CONF_VERIFY_SSL]

    if CONF_DB_NAME in conf:
        kwargs[CONF_DB_NAME] = conf[CONF_DB_NAME]

    if CONF_USERNAME in conf:
        kwargs[CONF_USERNAME] = conf[CONF_USERNAME]

    if CONF_PASSWORD in conf:
        kwargs[CONF_PASSWORD] = conf[CONF_PASSWORD]

    if CONF_HOST in conf:
        kwargs[CONF_HOST] = conf[CONF_HOST]

    if CONF_PATH in conf:
        kwargs[CONF_PATH] = conf[CONF_PATH]

    if CONF_PORT in conf:
        kwargs[CONF_PORT] = conf[CONF_PORT]

    if CONF_SSL in conf:
        kwargs[CONF_SSL] = conf[CONF_SSL]

    influx = InfluxDBClient(**kwargs)

    def write_v1(json):
        """Write data to V1 influx."""
        try:
            influx.write_points(json, time_precision=precision)
        except (
            requests.exceptions.RequestException,
            exceptions.InfluxDBServerError,
            OSError,
        ) as exc:
            raise ConnectionError(CONNECTION_ERROR % exc) from exc
        except exceptions.InfluxDBClientError as exc:
            if exc.code == CODE_INVALID_INPUTS:
                raise ValueError(WRITE_ERROR % (json, exc)) from exc
            raise ConnectionError(CLIENT_ERROR_V1 % exc) from exc

    def query_v1(query, database=None):
        """Query V1 influx."""
        try:
            return list(influx.query(query, database=database).get_points())
        except (
            requests.exceptions.RequestException,
            exceptions.InfluxDBServerError,
            OSError,
        ) as exc:
            raise ConnectionError(CONNECTION_ERROR % exc) from exc
        except exceptions.InfluxDBClientError as exc:
            if exc.code == CODE_INVALID_INPUTS:
                raise ValueError(QUERY_ERROR % (query, exc)) from exc
            raise ConnectionError(CLIENT_ERROR_V1 % exc) from exc

    def close_v1():
        """Close the V1 Influx client."""
        influx.close()

    databases = []
    if test_write:
        # Probe the connection by writing an empty list of points.
        write_v1([])

    if test_read:
        databases = [db["name"] for db in query_v1(TEST_QUERY_V1)]

    return InfluxClient(databases, write_v1, query_v1, close_v1)
476 
477 
def _retry_setup(hass: HomeAssistant, config: ConfigType) -> None:
    """Re-run setup, discarding its return value."""
    setup(hass, config)
480 
481 
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the InfluxDB component.

    Opens the Influx connection with a test write. If that raises
    ``ConnectionError`` the error is logged and another attempt is scheduled
    after ``RETRY_INTERVAL``. Otherwise an ``InfluxThread`` is started and
    stored in ``hass.data[DOMAIN]``, and a one-time stop listener is
    registered to shut it down.

    Returns ``True`` in both cases.
    """
    conf = config[DOMAIN]
    try:
        influx = get_influx_connection(conf, test_write=True)
    except ConnectionError as exc:
        _LOGGER.error(RETRY_MESSAGE, exc)
        event_helper.call_later(
            hass, RETRY_INTERVAL, lambda _: _retry_setup(hass, config)
        )
        return True

    event_to_json = _generate_event_to_json(conf)
    max_tries = conf.get(CONF_RETRY_COUNT)
    instance = hass.data[DOMAIN] = InfluxThread(hass, influx, event_to_json, max_tries)
    instance.start()

    def shutdown(event):
        """Shut down the thread."""
        # None is the sentinel that tells the worker loop to stop; wait for
        # the thread to finish before closing the client it writes through.
        instance.queue.put(None)
        instance.join()
        influx.close()

    hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)

    return True
508 
509 
class InfluxThread(threading.Thread):
    """A threaded event handler class.

    State-changed events are placed on a queue by the bus listener and are
    drained in batches by ``run`` on this thread, which converts them with
    ``event_to_json`` and writes them through ``influx``.

    Queue items are one of:
    - ``(monotonic_timestamp, event)`` tuples queued by the listener,
    - a ``threading.Event`` used by ``block_till_done`` as a marker,
    - ``None`` as the shutdown sentinel.
    """

    def __init__(self, hass, influx, event_to_json, max_tries):
        """Initialize the listener."""
        threading.Thread.__init__(self, name=DOMAIN)
        self.queue: queue.SimpleQueue[threading.Event | tuple[float, Event] | None] = (
            queue.SimpleQueue()
        )
        self.influx = influx
        self.event_to_json = event_to_json
        self.max_tries = max_tries
        # Number of events that failed to be written since the last success.
        self.write_errors = 0
        self.shutdown = False
        hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)

    @callback
    def _event_listener(self, event):
        """Listen for new messages on the bus and queue them for Influx."""
        # The enqueue time lets the worker drop events that waited too long.
        item = (time.monotonic(), event)
        self.queue.put(item)

    @staticmethod
    def batch_timeout():
        """Return number of seconds to wait for more events."""
        return BATCH_TIMEOUT

    def get_events_json(self):
        """Return a batch of events formatted for writing.

        Blocks until at least one queue item arrives, then keeps collecting
        until the batch reaches ``BATCH_BUFFER_SIZE``, shutdown is requested,
        or no further item arrives within ``batch_timeout()`` seconds.

        Returns a ``(count, json)`` tuple: the number of queue items consumed
        and the list of converted events.
        """
        # Events older than this many seconds are dropped instead of written.
        queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries * RETRY_DELAY

        count = 0
        json = []

        dropped = 0

        # queue.Empty from a timed-out get() simply ends the batch.
        with suppress(queue.Empty):
            while len(json) < BATCH_BUFFER_SIZE and not self.shutdown:
                # Block indefinitely for the first item, then only wait
                # briefly for more to join the batch.
                timeout = None if count == 0 else self.batch_timeout()
                item = self.queue.get(timeout=timeout)
                count += 1

                if item is None:
                    self.shutdown = True
                elif type(item) is tuple:
                    timestamp, event = item
                    age = time.monotonic() - timestamp

                    if age < queue_seconds:
                        if event_json := self.event_to_json(event):
                            json.append(event_json)
                    else:
                        dropped += 1
                elif isinstance(item, threading.Event):
                    # Marker from block_till_done: everything queued before
                    # it has now been taken off the queue.
                    item.set()

        if dropped:
            _LOGGER.warning(CATCHING_UP_MESSAGE, dropped)

        return count, json

    def write_to_influxdb(self, json):
        """Write preprocessed events to influxdb, with retry.

        Makes up to ``max_tries + 1`` attempts. A ``ValueError`` from the
        client is logged and ends the attempts immediately. A
        ``ConnectionError`` triggers a sleep of ``RETRY_DELAY`` before the
        next attempt; after the final attempt the batch is counted in
        ``write_errors``.
        """
        for retry in range(self.max_tries + 1):
            try:
                self.influx.write(json)

                if self.write_errors:
                    _LOGGER.error(RESUMED_MESSAGE, self.write_errors)
                    self.write_errors = 0

                _LOGGER.debug(WROTE_MESSAGE, len(json))
                break
            except ValueError as err:
                _LOGGER.error(err)
                break
            except ConnectionError as err:
                if retry < self.max_tries:
                    time.sleep(RETRY_DELAY)
                else:
                    # Only log the first failure of an outage; later ones
                    # just accumulate in the counter.
                    if not self.write_errors:
                        _LOGGER.error(err)
                    self.write_errors += len(json)

    def run(self):
        """Process incoming events."""
        while not self.shutdown:
            _, json = self.get_events_json()
            if json:
                self.write_to_influxdb(json)

    def block_till_done(self):
        """Block till all events processed.

        Currently only used for testing.
        """
        event = threading.Event()
        self.queue.put(event)
        event.wait()
def __init__(self, hass, influx, event_to_json, max_tries)
Definition: __init__.py:513
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
InfluxClient get_influx_connection(conf, test_write=False, test_read=False)
Definition: __init__.py:336
Callable[[Event], dict[str, Any]|None] _generate_event_to_json(dict conf)
Definition: __init__.py:204
dict create_influx_url(dict conf)
Definition: __init__.py:109
None _retry_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:478
dict validate_version_specific_config(dict conf)
Definition: __init__.py:134
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:482
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
EntityFilter convert_include_exclude_filter(dict[str, dict[str, list[str]]] config)