Home Assistant Unofficial Reference 2024.12.1
sensor.py
Go to the documentation of this file.
"""InfluxDB component which allows you to get data from an Influx database."""
2 
from __future__ import annotations

import datetime
import logging
from typing import Final

import voluptuous as vol

from homeassistant.components.sensor import (
    PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
    SensorEntity,
)
from homeassistant.const import (
    CONF_API_VERSION,
    CONF_LANGUAGE,
    CONF_NAME,
    CONF_UNIQUE_ID,
    CONF_UNIT_OF_MEASUREMENT,
    CONF_VALUE_TEMPLATE,
    EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import PlatformNotReady, TemplateError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import Throttle

from . import create_influx_url, get_influx_connection, validate_version_specific_config
from .const import (
    API_VERSION_2,
    COMPONENT_CONFIG_SCHEMA_CONNECTION,
    CONF_BUCKET,
    CONF_DB_NAME,
    CONF_FIELD,
    CONF_GROUP_FUNCTION,
    CONF_IMPORTS,
    CONF_MEASUREMENT_NAME,
    CONF_QUERIES,
    CONF_QUERIES_FLUX,
    CONF_QUERY,
    CONF_RANGE_START,
    CONF_RANGE_STOP,
    CONF_WHERE,
    DEFAULT_API_VERSION,
    DEFAULT_FIELD,
    DEFAULT_FUNCTION_FLUX,
    DEFAULT_GROUP_FUNCTION,
    DEFAULT_RANGE_START,
    DEFAULT_RANGE_STOP,
    INFLUX_CONF_VALUE,
    INFLUX_CONF_VALUE_V2,
    LANGUAGE_FLUX,
    LANGUAGE_INFLUXQL,
    MIN_TIME_BETWEEN_UPDATES,
    NO_BUCKET_ERROR,
    NO_DATABASE_ERROR,
    QUERY_MULTIPLE_RESULTS_MESSAGE,
    QUERY_NO_RESULTS_MESSAGE,
    RENDERING_QUERY_ERROR_MESSAGE,
    RENDERING_QUERY_MESSAGE,
    RENDERING_WHERE_ERROR_MESSAGE,
    RENDERING_WHERE_MESSAGE,
    RUNNING_QUERY_MESSAGE,
)
68 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Polling interval for this platform: once every 60 seconds.
SCAN_INTERVAL: Final = datetime.timedelta(seconds=60)
72 
73 
def _merge_connection_config_into_query(conf, query):
    """Merge connection details into each configured query.

    Copies every key of ``conf`` into ``query`` (mutating ``query`` in
    place) unless the query already defines that key or the key is one of
    the query-list keys themselves (``CONF_QUERIES`` / ``CONF_QUERIES_FLUX``).
    Values set on the query therefore take precedence over ``conf``.
    """
    for key, value in conf.items():
        # The query lists are containers for the queries, not connection
        # details, so they are never copied down into an individual query.
        if key in (CONF_QUERIES, CONF_QUERIES_FLUX):
            continue
        # setdefault only writes when the query has no value of its own.
        query.setdefault(key, value)
79 
80 
def validate_query_format_for_version(conf: dict) -> dict:
    """Ensure queries are provided in correct format based on API version.

    For API version 2 the config must contain ``CONF_QUERIES_FLUX``;
    otherwise it must contain ``CONF_QUERIES``. Each query receives the
    connection-level settings it does not define itself and is tagged with
    its query language. The top-level bucket / database name is then removed
    from ``conf``, since every query now carries its own copy.

    Returns the (mutated) ``conf``.
    Raises ``vol.Invalid`` when the query list required for the configured
    API version is missing.
    """
    if conf[CONF_API_VERSION] == API_VERSION_2:
        if CONF_QUERIES_FLUX not in conf:
            raise vol.Invalid(
                f"{CONF_QUERIES_FLUX} is required when {CONF_API_VERSION} is"
                f" {API_VERSION_2}"
            )

        for query in conf[CONF_QUERIES_FLUX]:
            # Must run before the top-level bucket is deleted below, so the
            # query inherits it when it does not name a bucket itself.
            _merge_connection_config_into_query(conf, query)
            query[CONF_LANGUAGE] = LANGUAGE_FLUX

        del conf[CONF_BUCKET]

    else:
        if CONF_QUERIES not in conf:
            raise vol.Invalid(
                f"{CONF_QUERIES} is required when {CONF_API_VERSION} is"
                f" {DEFAULT_API_VERSION}"
            )

        for query in conf[CONF_QUERIES]:
            # Must run before the top-level database name is deleted below.
            _merge_connection_config_into_query(conf, query)
            query[CONF_LANGUAGE] = LANGUAGE_INFLUXQL

        del conf[CONF_DB_NAME]

    return conf
110 
111 
# Options shared by every query-backed sensor, whatever the query language.
_QUERY_SENSOR_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
        vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
        vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
    }
)

# Per-language query schemas, keyed by query language.
_QUERY_SCHEMA = {
    LANGUAGE_INFLUXQL: _QUERY_SENSOR_SCHEMA.extend(
        {
            vol.Optional(CONF_DB_NAME): cv.string,
            vol.Required(CONF_MEASUREMENT_NAME): cv.string,
            vol.Optional(
                CONF_GROUP_FUNCTION, default=DEFAULT_GROUP_FUNCTION
            ): cv.string,
            vol.Optional(CONF_FIELD, default=DEFAULT_FIELD): cv.string,
            vol.Required(CONF_WHERE): cv.template,
        }
    ),
    LANGUAGE_FLUX: _QUERY_SENSOR_SCHEMA.extend(
        {
            vol.Optional(CONF_BUCKET): cv.string,
            vol.Optional(CONF_RANGE_START, default=DEFAULT_RANGE_START): cv.string,
            vol.Optional(CONF_RANGE_STOP, default=DEFAULT_RANGE_STOP): cv.string,
            vol.Required(CONF_QUERY): cv.template,
            vol.Optional(CONF_IMPORTS): vol.All(cv.ensure_list, [cv.string]),
            vol.Optional(CONF_GROUP_FUNCTION): cv.string,
        }
    ),
}

# Platform schema: the sensor platform schema extended with the connection
# options and the two mutually exclusive query lists, followed by the
# validators that run in order on the validated config.
PLATFORM_SCHEMA = vol.All(
    SENSOR_PLATFORM_SCHEMA.extend(COMPONENT_CONFIG_SCHEMA_CONNECTION).extend(
        {
            vol.Exclusive(CONF_QUERIES, "queries"): [_QUERY_SCHEMA[LANGUAGE_INFLUXQL]],
            vol.Exclusive(CONF_QUERIES_FLUX, "queries"): [_QUERY_SCHEMA[LANGUAGE_FLUX]],
        }
    ),
    validate_version_specific_config,
    validate_query_format_for_version,
    create_influx_url,
)
156 
157 
def setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Set up the InfluxDB component.

    Opens a connection (with a read test), creates one ``InfluxSensor`` per
    configured query whose bucket / database is listed in
    ``influx.data_repositories``, and registers a one-time listener that
    closes the connection when Home Assistant stops.

    Raises ``PlatformNotReady`` when the connection attempt raises
    ``ConnectionError``.
    """
    try:
        influx = get_influx_connection(config, test_read=True)
    except ConnectionError as exc:
        _LOGGER.error(exc)
        raise PlatformNotReady from exc

    entities = []
    if CONF_QUERIES_FLUX in config:
        for query in config[CONF_QUERIES_FLUX]:
            if query[CONF_BUCKET] in influx.data_repositories:
                entities.append(InfluxSensor(hass, influx, query))
            else:
                # Unknown bucket: log it and skip this query.
                _LOGGER.error(NO_BUCKET_ERROR, query[CONF_BUCKET])
    else:
        for query in config[CONF_QUERIES]:
            if query[CONF_DB_NAME] in influx.data_repositories:
                entities.append(InfluxSensor(hass, influx, query))
            else:
                # Unknown database: log it and skip this query.
                _LOGGER.error(NO_DATABASE_ERROR, query[CONF_DB_NAME])

    add_entities(entities, update_before_add=True)

    hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, lambda _: influx.close())
188 
189 
class InfluxSensor(SensorEntity):
    """Implementation of a Influxdb sensor."""

    def __init__(self, hass, influx, query):
        """Initialize the sensor.

        ``query`` is one validated query config; its ``CONF_LANGUAGE`` entry
        selects which data helper (Flux or InfluxQL) backs this sensor.
        """
        self._name = query.get(CONF_NAME)
        self._unit_of_measurement = query.get(CONF_UNIT_OF_MEASUREMENT)
        self._value_template = query.get(CONF_VALUE_TEMPLATE)
        self._state = None
        self._hass = hass
        self._attr_unique_id = query.get(CONF_UNIQUE_ID)

        if query[CONF_LANGUAGE] == LANGUAGE_FLUX:
            # NOTE(review): the line naming this class was lost from the
            # listing; InfluxFluxSensorData is reconstructed - confirm.
            self.data = InfluxFluxSensorData(
                influx,
                query.get(CONF_BUCKET),
                query.get(CONF_RANGE_START),
                query.get(CONF_RANGE_STOP),
                query.get(CONF_QUERY),
                query.get(CONF_IMPORTS),
                query.get(CONF_GROUP_FUNCTION),
            )

        else:
            self.data = InfluxQLSensorData(
                influx,
                query.get(CONF_DB_NAME),
                query.get(CONF_GROUP_FUNCTION),
                query.get(CONF_FIELD),
                query.get(CONF_MEASUREMENT_NAME),
                query.get(CONF_WHERE),
            )

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def native_value(self):
        """Return the state of the sensor."""
        return self._state

    @property
    def native_unit_of_measurement(self):
        """Return the unit of measurement of this entity, if any."""
        return self._unit_of_measurement

    def update(self) -> None:
        """Get the latest data from Influxdb and updates the states."""
        self.data.update()
        value = self.data.value
        if self._value_template is not None:
            # The template always receives a string, so a missing value
            # (None) is passed in as the text "None".
            value = self._value_template.render_with_possible_json_value(
                str(value), None
            )

        self._state = value
249 
250 
# NOTE(review): the class header line was lost from the listing; the name
# InfluxFluxSensorData is reconstructed - confirm against the original module.
class InfluxFluxSensorData:
    """Class for handling the data retrieval from Influx with Flux query."""

    def __init__(self, influx, bucket, range_start, range_stop, query, imports, group):
        """Initialize the data object.

        Pre-builds the fixed head (optional imports, bucket and time range)
        and tail (aggregation) of the Flux query; the templated ``query`` is
        rendered and inserted between them on every update.
        """
        self.influx = influx
        self.bucket = bucket
        self.range_start = range_start
        self.range_stop = range_stop
        self.query = query
        self.imports = imports
        self.group = group
        self.value = None
        self.full_query = None

        self.query_prefix = (
            f'from(bucket:"{bucket}") |> range(start: {range_start}, stop:'
            f" {range_stop}) |>"
        )
        if imports is not None:
            # Each import statement is prepended in front of the prefix.
            for i in imports:
                self.query_prefix = f'import "{i}" {self.query_prefix}'

        if group is None:
            self.query_postfix = DEFAULT_FUNCTION_FLUX
        else:
            self.query_postfix = f'|> {group}(column: "{INFLUX_CONF_VALUE_V2}")'

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    def update(self):
        """Get the latest data by querying influx.

        Sets ``self.value`` to the value column of the first record of the
        first table, or to ``None`` when the query fails or returns nothing.
        If the query template fails to render, the previous value is kept.
        """
        _LOGGER.debug(RENDERING_QUERY_MESSAGE, self.query)
        try:
            rendered_query = self.query.render(parse_result=False)
        except TemplateError as ex:
            _LOGGER.error(RENDERING_QUERY_ERROR_MESSAGE, ex)
            return

        self.full_query = f"{self.query_prefix} {rendered_query} {self.query_postfix}"

        _LOGGER.debug(RUNNING_QUERY_MESSAGE, self.full_query)

        try:
            tables = self.influx.query(self.full_query)
        except (ConnectionError, ValueError) as exc:
            _LOGGER.error(exc)
            self.value = None
            return

        # Treat a first table without records like an empty result, so the
        # records[0] lookup below cannot raise IndexError.
        if not tables or not tables[0].records:
            _LOGGER.warning(QUERY_NO_RESULTS_MESSAGE, self.full_query)
            self.value = None
        else:
            if len(tables) > 1 or len(tables[0].records) > 1:
                _LOGGER.warning(QUERY_MULTIPLE_RESULTS_MESSAGE, self.full_query)
            self.value = tables[0].records[0].values[INFLUX_CONF_VALUE_V2]
307 
308 
class InfluxQLSensorData:
    """Class for handling the data retrieval with v1 API."""

    def __init__(self, influx, db_name, group, field, measurement, where):
        """Initialize the data object.

        ``where`` is a template that is rendered into the WHERE clause on
        every update; the other arguments are inserted into the query as-is.
        """
        self.influx = influx
        self.db_name = db_name
        self.group = group
        self.field = field
        self.measurement = measurement
        self.where = where
        self.value = None
        self.query = None

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    def update(self):
        """Get the latest data by querying influx.

        Sets ``self.value`` to the value of the first returned point, or to
        ``None`` when the query fails or returns nothing. If the WHERE
        template fails to render, the previous value is kept.
        """
        _LOGGER.debug(RENDERING_WHERE_MESSAGE, self.where)
        try:
            where_clause = self.where.render(parse_result=False)
        except TemplateError as ex:
            _LOGGER.error(RENDERING_WHERE_ERROR_MESSAGE, ex)
            return

        # The query text is assembled from the sensor's own configuration
        # values, hence the lint suppression on the string-built query.
        self.query = (
            f"select {self.group}({self.field}) as {INFLUX_CONF_VALUE} from"  # noqa: S608
            f" {self.measurement} where {where_clause}"
        )

        _LOGGER.debug(RUNNING_QUERY_MESSAGE, self.query)

        try:
            points = self.influx.query(self.query, self.db_name)
        except (ConnectionError, ValueError) as exc:
            _LOGGER.error(exc)
            self.value = None
            return

        if not points:
            _LOGGER.warning(QUERY_NO_RESULTS_MESSAGE, self.query)
            self.value = None
        else:
            if len(points) > 1:
                _LOGGER.warning(QUERY_MULTIPLE_RESULTS_MESSAGE, self.query)
            self.value = points[0].get(INFLUX_CONF_VALUE)
def __init__(self, influx, bucket, range_start, range_stop, query, imports, group)
Definition: sensor.py:254
def __init__(self, influx, db_name, group, field, measurement, where)
Definition: sensor.py:312
def __init__(self, hass, influx, query)
Definition: sensor.py:193
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
def add_entities(account, async_add_entities, tracked)
Definition: sensor.py:40
dict validate_query_format_for_version(dict conf)
Definition: sensor.py:81
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
Definition: sensor.py:163
def _merge_connection_config_into_query(conf, query)
Definition: sensor.py:74
InfluxClient get_influx_connection(conf, test_write=False, test_read=False)
Definition: __init__.py:336