1 """Config flow for DSMR integration."""
3 from __future__
import annotations
6 from functools
import partial
10 from dsmr_parser
import obis_references
as obis_ref
11 from dsmr_parser.clients.protocol
import create_dsmr_reader, create_tcp_dsmr_reader
12 from dsmr_parser.clients.rfxtrx_protocol
import (
13 create_rfxtrx_dsmr_reader,
14 create_rfxtrx_tcp_dsmr_reader,
16 from dsmr_parser.objects
import DSMRObject
18 import serial.tools.list_ports
19 import voluptuous
as vol
35 CONF_TIME_BETWEEN_UPDATE,
36 DEFAULT_TIME_BETWEEN_UPDATE,
44 CONF_MANUAL_PATH =
"Enter Manually"
48 """Test the connection to DSMR and receive telegram to read serial ids."""
51 self, host: str |
None, port: int, dsmr_version: str, protocol: str
58 self.
_telegram_telegram: dict[str, DSMRObject] = {}
60 if dsmr_version ==
"5B":
62 if dsmr_version ==
"5L":
64 if dsmr_version ==
"Q3D":
68 """Equipment identifier."""
71 identifier: str |
None = getattr(dsmr_object,
"value",
None)
76 """Equipment identifier gas."""
77 if obis_ref.EQUIPMENT_IDENTIFIER_GAS
in self.
_telegram_telegram:
78 dsmr_object = self.
_telegram_telegram[obis_ref.EQUIPMENT_IDENTIFIER_GAS]
79 identifier: str |
None = getattr(dsmr_object,
"value",
None)
84 """Test if we can validate connection with the device."""
86 def update_telegram(telegram: dict[str, DSMRObject]) ->
None:
91 if self.
_dsmr_version_dsmr_version ==
"5S" and obis_ref.P1_MESSAGE_TIMESTAMP
in telegram:
95 if self.
_host_host
is None:
96 if self.
_protocol_protocol == DSMR_PROTOCOL:
97 create_reader = create_dsmr_reader
99 create_reader = create_rfxtrx_dsmr_reader
100 reader_factory = partial(
108 if self.
_protocol_protocol == DSMR_PROTOCOL:
109 create_reader = create_tcp_dsmr_reader
111 create_reader = create_rfxtrx_tcp_dsmr_reader
112 reader_factory = partial(
122 transport, protocol = await asyncio.create_task(reader_factory())
123 except (serial.SerialException, OSError):
124 LOGGER.exception(
"Error connecting to DSMR")
129 async
with asyncio.timeout(30):
130 await protocol.wait_closed()
134 await protocol.wait_closed()
139 hass: HomeAssistant, data: dict[str, Any], protocol: str
140 ) -> dict[str, str |
None]:
141 """Validate the user input allows us to connect."""
145 data[CONF_DSMR_VERSION],
149 if not await conn.validate_connect(hass):
152 equipment_identifier = conn.equipment_identifier()
153 equipment_identifier_gas = conn.equipment_identifier_gas()
156 if equipment_identifier
is None and data[CONF_DSMR_VERSION] !=
"5S":
157 raise CannotCommunicate
160 CONF_SERIAL_ID: equipment_identifier,
161 CONF_SERIAL_ID_GAS: equipment_identifier_gas,
166 """Handle a config flow for DSMR."""
170 _dsmr_version: str |
None =
None
175 config_entry: ConfigEntry,
176 ) -> DSMROptionFlowHandler:
177 """Get the options flow for this handler."""
181 self, user_input: dict[str, Any] |
None =
None
182 ) -> ConfigFlowResult:
183 """Step when user initializes an integration."""
184 if user_input
is not None:
185 user_selection = user_input[CONF_TYPE]
186 if user_selection ==
"Serial":
191 list_of_types = [
"Serial",
"Network"]
193 schema = vol.Schema({vol.Required(CONF_TYPE): vol.In(list_of_types)})
197 self, user_input: dict[str, Any] |
None =
None
198 ) -> ConfigFlowResult:
199 """Step when setting up network configuration."""
200 errors: dict[str, str] = {}
201 if user_input
is not None:
205 title=f
"{data[CONF_HOST]}:{data[CONF_PORT]}", data=data
210 vol.Required(CONF_HOST): str,
211 vol.Required(CONF_PORT): int,
212 vol.Required(CONF_DSMR_VERSION): vol.In(DSMR_VERSIONS),
216 step_id=
"setup_network",
222 self, user_input: dict[str, Any] |
None =
None
223 ) -> ConfigFlowResult:
224 """Step when setting up serial configuration."""
225 errors: dict[str, str] = {}
226 if user_input
is not None:
227 user_selection = user_input[CONF_PORT]
228 if user_selection == CONF_MANUAL_PATH:
232 dev_path = await self.hass.async_add_executor_job(
233 get_serial_by_id, user_selection
238 CONF_DSMR_VERSION: user_input[CONF_DSMR_VERSION],
245 ports = await self.hass.async_add_executor_job(serial.tools.list_ports.comports)
247 port.device: f
"{port}, s/n: {port.serial_number or 'n/a'}"
248 + (f
" - {port.manufacturer}" if port.manufacturer
else "")
251 list_of_ports[CONF_MANUAL_PATH] = CONF_MANUAL_PATH
255 vol.Required(CONF_PORT): vol.In(list_of_ports),
256 vol.Required(CONF_DSMR_VERSION): vol.In(DSMR_VERSIONS),
260 step_id=
"setup_serial",
266 self, user_input: dict[str, Any] |
None =
None
267 ) -> ConfigFlowResult:
268 """Select path manually."""
269 if user_input
is not None:
271 CONF_PORT: user_input[CONF_PORT],
275 errors: dict[str, str] = {}
280 schema = vol.Schema({vol.Required(CONF_PORT): str})
282 step_id=
"setup_serial_manual_path",
287 self, input_data: dict[str, Any], errors: dict[str, str]
289 """Validate dsmr connection and create data."""
294 protocol = DSMR_PROTOCOL
296 except CannotCommunicate:
297 protocol = RFXTRX_DSMR_PROTOCOL
300 data = {**data, **info, CONF_PROTOCOL: protocol}
302 if info[CONF_SERIAL_ID]:
305 except CannotConnect:
306 errors[
"base"] =
"cannot_connect"
307 except CannotCommunicate:
308 errors[
"base"] =
"cannot_communicate"
314 """Handle options."""
317 self, user_input: dict[str, Any] |
None =
None
318 ) -> ConfigFlowResult:
319 """Manage the options."""
320 if user_input
is not None:
325 data_schema=vol.Schema(
328 CONF_TIME_BETWEEN_UPDATE,
330 CONF_TIME_BETWEEN_UPDATE, DEFAULT_TIME_BETWEEN_UPDATE
332 ): vol.All(vol.Coerce(int), vol.Range(min=0)),
339 """Return a /dev/serial/by-id match for given device if available."""
340 by_id =
"/dev/serial/by-id"
341 if not os.path.isdir(by_id):
344 for path
in (entry.path
for entry
in os.scandir(by_id)
if entry.is_symlink()):
345 if os.path.realpath(path) == dev_path:
351 """Error to indicate we cannot connect."""
354 class CannotCommunicate(HomeAssistantError):
355 """Error to indicate we cannot communicate."""
bool validate_connect(self, HomeAssistant hass)
None __init__(self, str|None host, int port, str dsmr_version, str protocol)
str|None equipment_identifier(self)
str|None equipment_identifier_gas(self)
ConfigFlowResult async_step_setup_serial_manual_path(self, dict[str, Any]|None user_input=None)
DSMROptionFlowHandler async_get_options_flow(ConfigEntry config_entry)
dict[str, Any] async_validate_dsmr(self, dict[str, Any] input_data, dict[str, str] errors)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_setup_network(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_setup_serial(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
str get_serial_by_id(str dev_path)
dict[str, str|None] _validate_dsmr_connection(HomeAssistant hass, dict[str, Any] data, str protocol)