Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Adds config flow for SQL integration."""
2 
3 from __future__ import annotations
4 
5 import logging
6 from typing import Any
7 
8 import sqlalchemy
9 from sqlalchemy.engine import Result
10 from sqlalchemy.exc import MultipleResultsFound, NoSuchColumnError, SQLAlchemyError
11 from sqlalchemy.orm import Session, scoped_session, sessionmaker
12 import sqlparse
13 from sqlparse.exceptions import SQLParseError
14 import voluptuous as vol
15 
16 from homeassistant.components.recorder import CONF_DB_URL, get_instance
18  CONF_STATE_CLASS,
19  SensorDeviceClass,
20  SensorStateClass,
21 )
22 from homeassistant.config_entries import (
23  ConfigEntry,
24  ConfigFlow,
25  ConfigFlowResult,
26  OptionsFlow,
27 )
28 from homeassistant.const import (
29  CONF_DEVICE_CLASS,
30  CONF_NAME,
31  CONF_UNIT_OF_MEASUREMENT,
32  CONF_VALUE_TEMPLATE,
33 )
34 from homeassistant.core import callback
35 from homeassistant.helpers import selector
36 
37 from .const import CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
38 from .util import resolve_db_url
39 
40 _LOGGER = logging.getLogger(__name__)
41 
42 
43 OPTIONS_SCHEMA: vol.Schema = vol.Schema(
44  {
45  vol.Optional(
46  CONF_DB_URL,
47  ): selector.TextSelector(),
48  vol.Required(
49  CONF_COLUMN_NAME,
50  ): selector.TextSelector(),
51  vol.Required(
52  CONF_QUERY,
53  ): selector.TextSelector(selector.TextSelectorConfig(multiline=True)),
54  vol.Optional(
55  CONF_UNIT_OF_MEASUREMENT,
56  ): selector.TextSelector(),
57  vol.Optional(
58  CONF_VALUE_TEMPLATE,
59  ): selector.TemplateSelector(),
60  vol.Optional(CONF_DEVICE_CLASS): selector.SelectSelector(
61  selector.SelectSelectorConfig(
62  options=[
63  cls.value
64  for cls in SensorDeviceClass
65  if cls != SensorDeviceClass.ENUM
66  ],
67  mode=selector.SelectSelectorMode.DROPDOWN,
68  translation_key="device_class",
69  sort=True,
70  )
71  ),
72  vol.Optional(CONF_STATE_CLASS): selector.SelectSelector(
73  selector.SelectSelectorConfig(
74  options=[cls.value for cls in SensorStateClass],
75  mode=selector.SelectSelectorMode.DROPDOWN,
76  translation_key="state_class",
77  sort=True,
78  )
79  ),
80  }
81 )
82 
83 CONFIG_SCHEMA: vol.Schema = vol.Schema(
84  {
85  vol.Required(CONF_NAME, default="Select SQL Query"): selector.TextSelector(),
86  }
87 ).extend(OPTIONS_SCHEMA.schema)
88 
89 
90 def validate_sql_select(value: str) -> str:
91  """Validate that value is a SQL SELECT query."""
92  if len(query := sqlparse.parse(value.lstrip().lstrip(";"))) > 1:
93  raise MultipleResultsFound
94  if len(query) == 0 or (query_type := query[0].get_type()) == "UNKNOWN":
95  raise ValueError
96  if query_type != "SELECT":
97  _LOGGER.debug("The SQL query %s is of type %s", query, query_type)
98  raise SQLParseError
99  return str(query[0])
100 
101 
102 def validate_query(db_url: str, query: str, column: str) -> bool:
103  """Validate SQL query."""
104 
105  engine = sqlalchemy.create_engine(db_url, future=True)
106  sessmaker = scoped_session(sessionmaker(bind=engine, future=True))
107  sess: Session = sessmaker()
108 
109  try:
110  result: Result = sess.execute(sqlalchemy.text(query))
111  except SQLAlchemyError as error:
112  _LOGGER.debug("Execution error %s", error)
113  if sess:
114  sess.close()
115  engine.dispose()
116  raise ValueError(error) from error
117 
118  for res in result.mappings():
119  if column not in res:
120  _LOGGER.debug("Column `%s` is not returned by the query", column)
121  if sess:
122  sess.close()
123  engine.dispose()
124  raise NoSuchColumnError(f"Column {column} is not returned by the query.")
125 
126  data = res[column]
127  _LOGGER.debug("Return value from query: %s", data)
128 
129  if sess:
130  sess.close()
131  engine.dispose()
132 
133  return True
134 
135 
136 class SQLConfigFlow(ConfigFlow, domain=DOMAIN):
137  """Handle a config flow for SQL integration."""
138 
139  VERSION = 1
140 
141  @staticmethod
142  @callback
144  config_entry: ConfigEntry,
145  ) -> SQLOptionsFlowHandler:
146  """Get the options flow for this handler."""
147  return SQLOptionsFlowHandler()
148 
149  async def async_step_user(
150  self, user_input: dict[str, Any] | None = None
151  ) -> ConfigFlowResult:
152  """Handle the user step."""
153  errors = {}
154  description_placeholders = {}
155 
156  if user_input is not None:
157  db_url = user_input.get(CONF_DB_URL)
158  query = user_input[CONF_QUERY]
159  column = user_input[CONF_COLUMN_NAME]
160  db_url_for_validation = None
161 
162  try:
163  query = validate_sql_select(query)
164  db_url_for_validation = resolve_db_url(self.hass, db_url)
165  await self.hass.async_add_executor_job(
166  validate_query, db_url_for_validation, query, column
167  )
168  except NoSuchColumnError:
169  errors["column"] = "column_invalid"
170  description_placeholders = {"column": column}
171  except MultipleResultsFound:
172  errors["query"] = "multiple_queries"
173  except SQLAlchemyError:
174  errors["db_url"] = "db_url_invalid"
175  except SQLParseError:
176  errors["query"] = "query_no_read_only"
177  except ValueError as err:
178  _LOGGER.debug("Invalid query: %s", err)
179  errors["query"] = "query_invalid"
180 
181  options = {
182  CONF_QUERY: query,
183  CONF_COLUMN_NAME: column,
184  CONF_NAME: user_input[CONF_NAME],
185  }
186  if uom := user_input.get(CONF_UNIT_OF_MEASUREMENT):
187  options[CONF_UNIT_OF_MEASUREMENT] = uom
188  if value_template := user_input.get(CONF_VALUE_TEMPLATE):
189  options[CONF_VALUE_TEMPLATE] = value_template
190  if device_class := user_input.get(CONF_DEVICE_CLASS):
191  options[CONF_DEVICE_CLASS] = device_class
192  if state_class := user_input.get(CONF_STATE_CLASS):
193  options[CONF_STATE_CLASS] = state_class
194  if db_url_for_validation != get_instance(self.hass).db_url:
195  options[CONF_DB_URL] = db_url_for_validation
196 
197  if not errors:
198  return self.async_create_entryasync_create_entryasync_create_entry(
199  title=user_input[CONF_NAME],
200  data={},
201  options=options,
202  )
203 
204  return self.async_show_formasync_show_formasync_show_form(
205  step_id="user",
206  data_schema=self.add_suggested_values_to_schemaadd_suggested_values_to_schema(CONFIG_SCHEMA, user_input),
207  errors=errors,
208  description_placeholders=description_placeholders,
209  )
210 
211 
213  """Handle SQL options."""
214 
215  async def async_step_init(
216  self, user_input: dict[str, Any] | None = None
217  ) -> ConfigFlowResult:
218  """Manage SQL options."""
219  errors = {}
220  description_placeholders = {}
221 
222  if user_input is not None:
223  db_url = user_input.get(CONF_DB_URL)
224  query = user_input[CONF_QUERY]
225  column = user_input[CONF_COLUMN_NAME]
226  name = self.config_entryconfig_entryconfig_entry.options.get(CONF_NAME, self.config_entryconfig_entryconfig_entry.title)
227 
228  try:
229  query = validate_sql_select(query)
230  db_url_for_validation = resolve_db_url(self.hass, db_url)
231  await self.hass.async_add_executor_job(
232  validate_query, db_url_for_validation, query, column
233  )
234  except NoSuchColumnError:
235  errors["column"] = "column_invalid"
236  description_placeholders = {"column": column}
237  except MultipleResultsFound:
238  errors["query"] = "multiple_queries"
239  except SQLAlchemyError:
240  errors["db_url"] = "db_url_invalid"
241  except SQLParseError:
242  errors["query"] = "query_no_read_only"
243  except ValueError as err:
244  _LOGGER.debug("Invalid query: %s", err)
245  errors["query"] = "query_invalid"
246  else:
247  recorder_db = get_instance(self.hass).db_url
248  _LOGGER.debug(
249  "db_url: %s, resolved db_url: %s, recorder: %s",
250  db_url,
251  db_url_for_validation,
252  recorder_db,
253  )
254 
255  options = {
256  CONF_QUERY: query,
257  CONF_COLUMN_NAME: column,
258  CONF_NAME: name,
259  }
260  if uom := user_input.get(CONF_UNIT_OF_MEASUREMENT):
261  options[CONF_UNIT_OF_MEASUREMENT] = uom
262  if value_template := user_input.get(CONF_VALUE_TEMPLATE):
263  options[CONF_VALUE_TEMPLATE] = value_template
264  if device_class := user_input.get(CONF_DEVICE_CLASS):
265  options[CONF_DEVICE_CLASS] = device_class
266  if state_class := user_input.get(CONF_STATE_CLASS):
267  options[CONF_STATE_CLASS] = state_class
268  if db_url_for_validation != get_instance(self.hass).db_url:
269  options[CONF_DB_URL] = db_url_for_validation
270 
271  return self.async_create_entryasync_create_entry(
272  data=options,
273  )
274 
275  return self.async_show_formasync_show_form(
276  step_id="init",
277  data_schema=self.add_suggested_values_to_schemaadd_suggested_values_to_schema(
278  OPTIONS_SCHEMA, user_input or self.config_entryconfig_entryconfig_entry.options
279  ),
280  errors=errors,
281  description_placeholders=description_placeholders,
282  )
SQLOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:145
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:151
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:217
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
bool validate_query(str db_url, str query, str column)
Definition: config_flow.py:102
str resolve_db_url(HomeAssistant hass, str|None db_url)
Definition: util.py:22
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74