1 """InfluxDB component which allows you to get data from an Influx database."""
3 from __future__
import annotations
7 from typing
import Final
9 import voluptuous
as vol
12 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
20 CONF_UNIT_OF_MEASUREMENT,
22 EVENT_HOMEASSISTANT_STOP,
31 from .
import create_influx_url, get_influx_connection, validate_version_specific_config
34 COMPONENT_CONFIG_SCHEMA_CONNECTION,
40 CONF_MEASUREMENT_NAME,
49 DEFAULT_FUNCTION_FLUX,
50 DEFAULT_GROUP_FUNCTION,
57 MIN_TIME_BETWEEN_UPDATES,
60 QUERY_MULTIPLE_RESULTS_MESSAGE,
61 QUERY_NO_RESULTS_MESSAGE,
62 RENDERING_QUERY_ERROR_MESSAGE,
63 RENDERING_QUERY_MESSAGE,
64 RENDERING_WHERE_ERROR_MESSAGE,
65 RENDERING_WHERE_MESSAGE,
66 RUNNING_QUERY_MESSAGE,
69 _LOGGER = logging.getLogger(__name__)
71 SCAN_INTERVAL: Final = datetime.timedelta(seconds=60)
75 """Merge connection details into each configured query."""
77 if key
not in query
and key
not in [CONF_QUERIES, CONF_QUERIES_FLUX]:
78 query[key] = conf[key]
82 """Ensure queries are provided in correct format based on API version."""
83 if conf[CONF_API_VERSION] == API_VERSION_2:
84 if CONF_QUERIES_FLUX
not in conf:
86 f
"{CONF_QUERIES_FLUX} is required when {CONF_API_VERSION} is"
90 for query
in conf[CONF_QUERIES_FLUX]:
92 query[CONF_LANGUAGE] = LANGUAGE_FLUX
97 if CONF_QUERIES
not in conf:
99 f
"{CONF_QUERIES} is required when {CONF_API_VERSION} is"
100 f
" {DEFAULT_API_VERSION}"
103 for query
in conf[CONF_QUERIES]:
105 query[CONF_LANGUAGE] = LANGUAGE_INFLUXQL
107 del conf[CONF_DB_NAME]
112 _QUERY_SENSOR_SCHEMA = vol.Schema(
114 vol.Required(CONF_NAME): cv.string,
115 vol.Optional(CONF_UNIQUE_ID): cv.string,
116 vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
117 vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
122 LANGUAGE_INFLUXQL: _QUERY_SENSOR_SCHEMA.extend(
124 vol.Optional(CONF_DB_NAME): cv.string,
125 vol.Required(CONF_MEASUREMENT_NAME): cv.string,
127 CONF_GROUP_FUNCTION, default=DEFAULT_GROUP_FUNCTION
129 vol.Optional(CONF_FIELD, default=DEFAULT_FIELD): cv.string,
130 vol.Required(CONF_WHERE): cv.template,
133 LANGUAGE_FLUX: _QUERY_SENSOR_SCHEMA.extend(
135 vol.Optional(CONF_BUCKET): cv.string,
136 vol.Optional(CONF_RANGE_START, default=DEFAULT_RANGE_START): cv.string,
137 vol.Optional(CONF_RANGE_STOP, default=DEFAULT_RANGE_STOP): cv.string,
138 vol.Required(CONF_QUERY): cv.template,
139 vol.Optional(CONF_IMPORTS): vol.All(cv.ensure_list, [cv.string]),
140 vol.Optional(CONF_GROUP_FUNCTION): cv.string,
145 PLATFORM_SCHEMA = vol.All(
146 SENSOR_PLATFORM_SCHEMA.extend(COMPONENT_CONFIG_SCHEMA_CONNECTION).extend(
148 vol.Exclusive(CONF_QUERIES,
"queries"): [_QUERY_SCHEMA[LANGUAGE_INFLUXQL]],
149 vol.Exclusive(CONF_QUERIES_FLUX,
"queries"): [_QUERY_SCHEMA[LANGUAGE_FLUX]],
152 validate_version_specific_config,
153 validate_query_format_for_version,
161 add_entities: AddEntitiesCallback,
162 discovery_info: DiscoveryInfoType |
None =
None,
164 """Set up the InfluxDB component."""
167 except ConnectionError
as exc:
169 raise PlatformNotReady
from exc
172 if CONF_QUERIES_FLUX
in config:
173 for query
in config[CONF_QUERIES_FLUX]:
174 if query[CONF_BUCKET]
in influx.data_repositories:
177 _LOGGER.error(NO_BUCKET_ERROR, query[CONF_BUCKET])
179 for query
in config[CONF_QUERIES]:
180 if query[CONF_DB_NAME]
in influx.data_repositories:
183 _LOGGER.error(NO_DATABASE_ERROR, query[CONF_DB_NAME])
187 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP,
lambda _: influx.close())
191 """Implementation of a Influxdb sensor."""
194 """Initialize the sensor."""
195 self.
_name_name = query.get(CONF_NAME)
202 if query[CONF_LANGUAGE] == LANGUAGE_FLUX:
205 query.get(CONF_BUCKET),
206 query.get(CONF_RANGE_START),
207 query.get(CONF_RANGE_STOP),
208 query.get(CONF_QUERY),
209 query.get(CONF_IMPORTS),
210 query.get(CONF_GROUP_FUNCTION),
216 query.get(CONF_DB_NAME),
217 query.get(CONF_GROUP_FUNCTION),
218 query.get(CONF_FIELD),
219 query.get(CONF_MEASUREMENT_NAME),
220 query.get(CONF_WHERE),
225 """Return the name of the sensor."""
226 return self.
_name_name
230 """Return the state of the sensor."""
235 """Return the unit of measurement of this entity, if any."""
239 """Get the latest data from Influxdb and updates the states."""
241 if (value := self.
datadata.value)
is None:
244 value = self.
_value_template_value_template.render_with_possible_json_value(
252 """Class for handling the data retrieval from Influx with Flux query."""
254 def __init__(self, influx, bucket, range_start, range_stop, query, imports, group):
255 """Initialize the data object."""
267 f
'from(bucket:"{bucket}") |> range(start: {range_start}, stop:'
270 if imports
is not None:
272 self.
query_prefixquery_prefix = f
'import "{i}" {self.query_prefix}'
277 self.
query_postfixquery_postfix = f
'|> {group}(column: "{INFLUX_CONF_VALUE_V2}")'
279 @Throttle(MIN_TIME_BETWEEN_UPDATES)
281 """Get the latest data by querying influx."""
282 _LOGGER.debug(RENDERING_QUERY_MESSAGE, self.
queryquery)
284 rendered_query = self.
queryquery.render(parse_result=
False)
285 except TemplateError
as ex:
286 _LOGGER.error(RENDERING_QUERY_ERROR_MESSAGE, ex)
289 self.
full_queryfull_query = f
"{self.query_prefix} {rendered_query} {self.query_postfix}"
291 _LOGGER.debug(RUNNING_QUERY_MESSAGE, self.
full_queryfull_query)
295 except (ConnectionError, ValueError)
as exc:
297 self.
valuevalue =
None
301 _LOGGER.warning(QUERY_NO_RESULTS_MESSAGE, self.
full_queryfull_query)
302 self.
valuevalue =
None
304 if len(tables) > 1
or len(tables[0].records) > 1:
305 _LOGGER.warning(QUERY_MULTIPLE_RESULTS_MESSAGE, self.
full_queryfull_query)
306 self.
valuevalue = tables[0].records[0].values[INFLUX_CONF_VALUE_V2]
310 """Class for handling the data retrieval with v1 API."""
312 def __init__(self, influx, db_name, group, field, measurement, where):
313 """Initialize the data object."""
323 @Throttle(MIN_TIME_BETWEEN_UPDATES)
325 """Get the latest data with a shell command."""
326 _LOGGER.debug(RENDERING_WHERE_MESSAGE, self.
wherewhere)
328 where_clause = self.
wherewhere.render(parse_result=
False)
329 except TemplateError
as ex:
330 _LOGGER.error(RENDERING_WHERE_ERROR_MESSAGE, ex)
334 f
"select {self.group}({self.field}) as {INFLUX_CONF_VALUE} from"
335 f
" {self.measurement} where {where_clause}"
338 _LOGGER.debug(RUNNING_QUERY_MESSAGE, self.
queryquery)
342 except (ConnectionError, ValueError)
as exc:
344 self.
valuevalue =
None
348 _LOGGER.warning(QUERY_NO_RESULTS_MESSAGE, self.
queryquery)
349 self.
valuevalue =
None
352 _LOGGER.warning(QUERY_MULTIPLE_RESULTS_MESSAGE, self.
queryquery)
353 self.
valuevalue = points[0].
get(INFLUX_CONF_VALUE)
def __init__(self, influx, bucket, range_start, range_stop, query, imports, group)
def __init__(self, influx, db_name, group, field, measurement, where)
def native_unit_of_measurement(self)
def __init__(self, hass, influx, query)
web.Response get(self, web.Request request, str config_key)
def add_entities(account, async_add_entities, tracked)
dict validate_query_format_for_version(dict conf)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
def _merge_connection_config_into_query(conf, query)
InfluxClient get_influx_connection(conf, test_write=False, test_read=False)