1 """Sensor from an SQL Query."""
3 from __future__
import annotations
5 from datetime
import date
11 from sqlalchemy
import lambda_stmt
12 from sqlalchemy.engine
import Result
13 from sqlalchemy.exc
import SQLAlchemyError
14 from sqlalchemy.orm
import Session, scoped_session, sessionmaker
15 from sqlalchemy.sql.lambdas
import StatementLambdaElement
16 from sqlalchemy.util
import LRUCache
30 CONF_UNIT_OF_MEASUREMENT,
32 EVENT_HOMEASSISTANT_STOP,
44 ManualTriggerSensorEntity,
48 from .const
import CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
49 from .models
import SQLData
50 from .util
import redact_credentials, resolve_db_url
52 _LOGGER = logging.getLogger(__name__)
56 TRIGGER_ENTITY_OPTIONS = (
63 CONF_UNIT_OF_MEASUREMENT,
70 async_add_entities: AddEntitiesCallback,
71 discovery_info: DiscoveryInfoType |
None =
None,
73 """Set up the SQL sensor from yaml."""
74 if (conf := discovery_info)
is None:
77 name: Template = conf[CONF_NAME]
78 query_str: str = conf[CONF_QUERY]
79 value_template: Template |
None = conf.get(CONF_VALUE_TEMPLATE)
80 column_name: str = conf[CONF_COLUMN_NAME]
81 unique_id: str |
None = conf.get(CONF_UNIQUE_ID)
84 trigger_entity_config = {CONF_NAME: name}
85 for key
in TRIGGER_ENTITY_OPTIONS:
88 trigger_entity_config[key] = conf[key]
92 trigger_entity_config,
104 hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
106 """Set up the SQL sensor from config entry."""
108 db_url: str =
resolve_db_url(hass, entry.options.get(CONF_DB_URL))
109 name: str = entry.options[CONF_NAME]
110 query_str: str = entry.options[CONF_QUERY]
111 template: str |
None = entry.options.get(CONF_VALUE_TEMPLATE)
112 column_name: str = entry.options[CONF_COLUMN_NAME]
114 value_template: Template |
None =
None
115 if template
is not None:
117 value_template =
Template(template, hass)
118 value_template.ensure_valid()
119 except TemplateError:
120 value_template =
None
122 name_template =
Template(name, hass)
123 trigger_entity_config = {CONF_NAME: name_template, CONF_UNIQUE_ID: entry.entry_id}
124 for key
in TRIGGER_ENTITY_OPTIONS:
125 if key
not in entry.options:
127 trigger_entity_config[key] = entry.options[key]
131 trigger_entity_config,
144 """Get or initialize domain data."""
145 if DOMAIN
in hass.data:
146 sql_data: SQLData = hass.data[DOMAIN]
149 session_makers_by_db_url: dict[str, scoped_session] = {}
def _shutdown_db_engines(event: Event) -> None:
    """Dispose the engine behind every cached session maker.

    Walks the ``session_makers_by_db_url`` mapping captured from the
    enclosing scope. The ``event`` argument is accepted so the function
    can be used as a bus listener, but it is not read.
    """
    for scoped in session_makers_by_db_url.values():
        # Reach the engine through the session's connection, then
        # dispose of it.
        engine = scoped.connection().engine
        engine.dispose()
163 cancel_shutdown = hass.bus.async_listen_once(
164 EVENT_HOMEASSISTANT_STOP, _shutdown_db_engines
167 sql_data =
SQLData(cancel_shutdown, session_makers_by_db_url)
168 hass.data[DOMAIN] = sql_data
174 trigger_entity_config: ConfigType,
177 value_template: Template |
None,
178 unique_id: str |
None,
181 async_add_entities: AddEntitiesCallback,
183 """Set up the SQL sensor."""
187 uses_recorder_db =
False
189 uses_recorder_db = db_url == instance.db_url
190 sessmaker: scoped_session |
None
192 use_database_executor =
False
193 if uses_recorder_db
and instance.dialect_name == SupportedDialect.SQLITE:
194 use_database_executor =
True
195 assert instance.engine
is not None
196 sessmaker = scoped_session(sessionmaker(bind=instance.engine, future=
True))
203 elif db_url
in sql_data.session_makers_by_db_url:
204 sessmaker = sql_data.session_makers_by_db_url[db_url]
205 elif sessmaker := await hass.async_add_executor_job(
206 _validate_and_get_session_maker_for_db_url, db_url
208 sql_data.session_makers_by_db_url[db_url] = sessmaker
212 upper_query = query_str.upper()
216 issue_key = unique_id
if unique_id
else redacted_query
221 "ENTITY_ID," in upper_query
or "ENTITY_ID " in upper_query
222 )
and "STATES_META" not in upper_query:
224 "The query `%s` contains the keyword `entity_id` but does not "
225 "reference the `states_meta` table. This will cause a full table "
226 "scan and database instability. Please check the documentation and use "
227 "`states_meta.entity_id` instead",
231 ir.async_create_issue(
234 f
"entity_id_query_does_full_table_scan_{issue_key}",
235 translation_key=
"entity_id_query_does_full_table_scan",
236 translation_placeholders={
"query": redacted_query},
238 severity=ir.IssueSeverity.ERROR,
241 "Query contains entity_id but does not reference states_meta"
244 ir.async_delete_issue(
245 hass, DOMAIN, f
"entity_id_query_does_full_table_scan_{issue_key}"
249 if not (
"LIMIT" in upper_query
or "SELECT TOP" in upper_query):
250 if "mssql" in db_url:
251 query_str = upper_query.replace(
"SELECT",
"SELECT TOP 1")
253 query_str = query_str.replace(
";",
"") +
" LIMIT 1;"
258 trigger_entity_config,
264 use_database_executor,
271 """Validate the db_url and return a session maker.
273 This does I/O and should be run in the executor.
275 sess: Session |
None =
None
277 engine = sqlalchemy.create_engine(db_url, future=
True)
278 sessmaker = scoped_session(sessionmaker(bind=engine, future=
True))
281 sess.execute(sqlalchemy.text(
"SELECT 1;"))
283 except SQLAlchemyError
as err:
285 "Couldn't connect using %s DB_URL: %s",
298 """Generate the lambda statement."""
299 text = sqlalchemy.text(query)
300 return lambda_stmt(
lambda: text, lambda_cache=_SQL_LAMBDA_CACHE)
304 """Representation of an SQL sensor."""
306 _unrecorded_attributes = frozenset({MATCH_ALL})
310 trigger_entity_config: ConfigType,
311 sessmaker: scoped_session,
314 value_template: Template |
None,
316 use_database_executor: bool,
318 """Initialize the SQL sensor."""
327 if not yaml
and (unique_id := trigger_entity_config.get(CONF_UNIQUE_ID)):
331 entry_type=DeviceEntryType.SERVICE,
332 identifiers={(DOMAIN, unique_id)},
338 """Call when entity about to be added to hass."""
344 """Return extra attributes."""
348 """Retrieve sensor data from the query using the right executor."""
352 data = await self.
hasshasshass.async_add_executor_job(self.
_update_update)
356 """Retrieve sensor data from the query."""
361 result: Result = sess.execute(self.
_lambda_stmt_lambda_stmt)
362 except SQLAlchemyError
as err:
364 "Error executing query %s: %s",
372 for res
in result.mappings():
373 _LOGGER.debug(
"Query %s result in %s", self.
_query_query, res.items())
375 for key, value
in res.items():
376 if isinstance(value, decimal.Decimal):
378 elif isinstance(value, date):
379 value = value.isoformat()
380 elif isinstance(value, (bytes, bytearray)):
381 value = f
"0x{value.hex()}"
384 if data
is not None and isinstance(data, (bytes, bytearray)):
385 data = f
"0x{data.hex()}"
387 if data
is not None and self.
_template_template
is not None:
389 self.
_template_template.async_render_with_possible_json_value(data,
None)
395 _LOGGER.warning(
"%s returned no results", self.
_query_query)
dict[str, Any]|None extra_state_attributes(self)
None __init__(self, ConfigType trigger_entity_config, scoped_session sessmaker, str query, str column, Template|None value_template, bool yaml, bool use_database_executor)
None async_added_to_hass(self)
_attr_extra_state_attributes
str|UndefinedType|None name(self)
None _process_manual_data(self, Any|None value=None)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
SQLData _async_get_or_init_domain_data(HomeAssistant hass)
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
StatementLambdaElement _generate_lambda_stmt(str query)
scoped_session|None _validate_and_get_session_maker_for_db_url(str db_url)
None async_setup_sensor(HomeAssistant hass, ConfigType trigger_entity_config, str query_str, str column_name, Template|None value_template, str|None unique_id, str db_url, bool yaml, AddEntitiesCallback async_add_entities)
str redact_credentials(str|None data)
str resolve_db_url(HomeAssistant hass, str|None db_url)
Recorder get_instance(HomeAssistant hass)