1 """Adds config flow for SQL integration."""
3 from __future__
import annotations
9 from sqlalchemy.engine
import Result
10 from sqlalchemy.exc
import MultipleResultsFound, NoSuchColumnError, SQLAlchemyError
11 from sqlalchemy.orm
import Session, scoped_session, sessionmaker
13 from sqlparse.exceptions
import SQLParseError
14 import voluptuous
as vol
31 CONF_UNIT_OF_MEASUREMENT,
37 from .const
import CONF_COLUMN_NAME, CONF_QUERY, DOMAIN
38 from .util
import resolve_db_url
40 _LOGGER = logging.getLogger(__name__)
43 OPTIONS_SCHEMA: vol.Schema = vol.Schema(
47 ): selector.TextSelector(),
50 ): selector.TextSelector(),
53 ): selector.TextSelector(selector.TextSelectorConfig(multiline=
True)),
55 CONF_UNIT_OF_MEASUREMENT,
56 ): selector.TextSelector(),
59 ): selector.TemplateSelector(),
60 vol.Optional(CONF_DEVICE_CLASS): selector.SelectSelector(
61 selector.SelectSelectorConfig(
64 for cls
in SensorDeviceClass
65 if cls != SensorDeviceClass.ENUM
67 mode=selector.SelectSelectorMode.DROPDOWN,
68 translation_key=
"device_class",
72 vol.Optional(CONF_STATE_CLASS): selector.SelectSelector(
73 selector.SelectSelectorConfig(
74 options=[cls.value
for cls
in SensorStateClass],
75 mode=selector.SelectSelectorMode.DROPDOWN,
76 translation_key=
"state_class",
83 CONFIG_SCHEMA: vol.Schema = vol.Schema(
85 vol.Required(CONF_NAME, default=
"Select SQL Query"): selector.TextSelector(),
87 ).extend(OPTIONS_SCHEMA.schema)
91 """Validate that value is a SQL SELECT query."""
92 if len(query := sqlparse.parse(value.lstrip().lstrip(
";"))) > 1:
93 raise MultipleResultsFound
94 if len(query) == 0
or (query_type := query[0].get_type()) ==
"UNKNOWN":
96 if query_type !=
"SELECT":
97 _LOGGER.debug(
"The SQL query %s is of type %s", query, query_type)
103 """Validate SQL query."""
105 engine = sqlalchemy.create_engine(db_url, future=
True)
106 sessmaker = scoped_session(sessionmaker(bind=engine, future=
True))
107 sess: Session = sessmaker()
110 result: Result = sess.execute(sqlalchemy.text(query))
111 except SQLAlchemyError
as error:
112 _LOGGER.debug(
"Execution error %s", error)
116 raise ValueError(error)
from error
118 for res
in result.mappings():
119 if column
not in res:
120 _LOGGER.debug(
"Column `%s` is not returned by the query", column)
124 raise NoSuchColumnError(f
"Column {column} is not returned by the query.")
127 _LOGGER.debug(
"Return value from query: %s", data)
137 """Handle a config flow for SQL integration."""
144 config_entry: ConfigEntry,
145 ) -> SQLOptionsFlowHandler:
146 """Get the options flow for this handler."""
150 self, user_input: dict[str, Any] |
None =
None
151 ) -> ConfigFlowResult:
152 """Handle the user step."""
154 description_placeholders = {}
156 if user_input
is not None:
157 db_url = user_input.get(CONF_DB_URL)
158 query = user_input[CONF_QUERY]
159 column = user_input[CONF_COLUMN_NAME]
160 db_url_for_validation =
None
165 await self.hass.async_add_executor_job(
166 validate_query, db_url_for_validation, query, column
168 except NoSuchColumnError:
169 errors[
"column"] =
"column_invalid"
170 description_placeholders = {
"column": column}
171 except MultipleResultsFound:
172 errors[
"query"] =
"multiple_queries"
173 except SQLAlchemyError:
174 errors[
"db_url"] =
"db_url_invalid"
175 except SQLParseError:
176 errors[
"query"] =
"query_no_read_only"
177 except ValueError
as err:
178 _LOGGER.debug(
"Invalid query: %s", err)
179 errors[
"query"] =
"query_invalid"
183 CONF_COLUMN_NAME: column,
184 CONF_NAME: user_input[CONF_NAME],
186 if uom := user_input.get(CONF_UNIT_OF_MEASUREMENT):
187 options[CONF_UNIT_OF_MEASUREMENT] = uom
188 if value_template := user_input.get(CONF_VALUE_TEMPLATE):
189 options[CONF_VALUE_TEMPLATE] = value_template
190 if device_class := user_input.get(CONF_DEVICE_CLASS):
191 options[CONF_DEVICE_CLASS] = device_class
192 if state_class := user_input.get(CONF_STATE_CLASS):
193 options[CONF_STATE_CLASS] = state_class
194 if db_url_for_validation !=
get_instance(self.hass).db_url:
195 options[CONF_DB_URL] = db_url_for_validation
199 title=user_input[CONF_NAME],
208 description_placeholders=description_placeholders,
213 """Handle SQL options."""
216 self, user_input: dict[str, Any] |
None =
None
217 ) -> ConfigFlowResult:
218 """Manage SQL options."""
220 description_placeholders = {}
222 if user_input
is not None:
223 db_url = user_input.get(CONF_DB_URL)
224 query = user_input[CONF_QUERY]
225 column = user_input[CONF_COLUMN_NAME]
231 await self.hass.async_add_executor_job(
232 validate_query, db_url_for_validation, query, column
234 except NoSuchColumnError:
235 errors[
"column"] =
"column_invalid"
236 description_placeholders = {
"column": column}
237 except MultipleResultsFound:
238 errors[
"query"] =
"multiple_queries"
239 except SQLAlchemyError:
240 errors[
"db_url"] =
"db_url_invalid"
241 except SQLParseError:
242 errors[
"query"] =
"query_no_read_only"
243 except ValueError
as err:
244 _LOGGER.debug(
"Invalid query: %s", err)
245 errors[
"query"] =
"query_invalid"
249 "db_url: %s, resolved db_url: %s, recorder: %s",
251 db_url_for_validation,
257 CONF_COLUMN_NAME: column,
260 if uom := user_input.get(CONF_UNIT_OF_MEASUREMENT):
261 options[CONF_UNIT_OF_MEASUREMENT] = uom
262 if value_template := user_input.get(CONF_VALUE_TEMPLATE):
263 options[CONF_VALUE_TEMPLATE] = value_template
264 if device_class := user_input.get(CONF_DEVICE_CLASS):
265 options[CONF_DEVICE_CLASS] = device_class
266 if state_class := user_input.get(CONF_STATE_CLASS):
267 options[CONF_STATE_CLASS] = state_class
268 if db_url_for_validation !=
get_instance(self.hass).db_url:
269 options[CONF_DB_URL] = db_url_for_validation
281 description_placeholders=description_placeholders,
SQLOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
vol.Schema add_suggested_values_to_schema(self, vol.Schema data_schema, Mapping[str, Any]|None suggested_values)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
str validate_sql_select(str value)
bool validate_query(str db_url, str query, str column)
str resolve_db_url(HomeAssistant hass, str|None db_url)
Recorder get_instance(HomeAssistant hass)