Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for DSMR integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from functools import partial
7 import os
8 from typing import Any
9 
10 from dsmr_parser import obis_references as obis_ref
11 from dsmr_parser.clients.protocol import create_dsmr_reader, create_tcp_dsmr_reader
12 from dsmr_parser.clients.rfxtrx_protocol import (
13  create_rfxtrx_dsmr_reader,
14  create_rfxtrx_tcp_dsmr_reader,
15 )
16 from dsmr_parser.objects import DSMRObject
17 import serial
18 import serial.tools.list_ports
19 import voluptuous as vol
20 
21 from homeassistant.config_entries import (
22  ConfigEntry,
23  ConfigFlow,
24  ConfigFlowResult,
25  OptionsFlow,
26 )
27 from homeassistant.const import CONF_HOST, CONF_PORT, CONF_PROTOCOL, CONF_TYPE
28 from homeassistant.core import HomeAssistant, callback
29 from homeassistant.exceptions import HomeAssistantError
30 
31 from .const import (
32  CONF_DSMR_VERSION,
33  CONF_SERIAL_ID,
34  CONF_SERIAL_ID_GAS,
35  CONF_TIME_BETWEEN_UPDATE,
36  DEFAULT_TIME_BETWEEN_UPDATE,
37  DOMAIN,
38  DSMR_PROTOCOL,
39  DSMR_VERSIONS,
40  LOGGER,
41  RFXTRX_DSMR_PROTOCOL,
42 )
43 
44 CONF_MANUAL_PATH = "Enter Manually"
45 
46 
class DSMRConnection:
    """Test the connection to DSMR and receive telegram to read serial ids."""

    def __init__(
        self, host: str | None, port: int, dsmr_version: str, protocol: str
    ) -> None:
        """Initialize.

        Args:
            host: Network host of the meter, or None to use a serial reader.
            port: Port handed to the reader factory.
                NOTE(review): annotated as int, but the serial setup steps
                pass a device path string here - confirm the annotation.
            dsmr_version: DSMR version key; it selects which OBIS reference
                holds the equipment identifier.
            protocol: Protocol constant; DSMR_PROTOCOL selects the plain DSMR
                readers, anything else selects the RFXtrx readers.
        """
        self._host = host
        self._port = port
        self._dsmr_version = dsmr_version
        self._protocol = protocol
        # Stays empty until a telegram accepted by validate_connect arrives.
        self._telegram: dict[str, DSMRObject] = {}
        # Default identifier reference, overridden for regional variants below.
        self._equipment_identifier = obis_ref.EQUIPMENT_IDENTIFIER
        if dsmr_version == "5B":
            self._equipment_identifier = obis_ref.BELGIUM_EQUIPMENT_IDENTIFIER
        if dsmr_version == "5L":
            self._equipment_identifier = obis_ref.LUXEMBOURG_EQUIPMENT_IDENTIFIER
        if dsmr_version == "Q3D":
            self._equipment_identifier = obis_ref.Q3D_EQUIPMENT_IDENTIFIER

    def equipment_identifier(self) -> str | None:
        """Return the equipment identifier from the stored telegram.

        Returns None when the telegram lacks the identifier entry or the
        entry has no ``value`` attribute.
        """
        if self._equipment_identifier in self._telegram:
            dsmr_object = self._telegram[self._equipment_identifier]
            identifier: str | None = getattr(dsmr_object, "value", None)
            return identifier
        return None

    def equipment_identifier_gas(self) -> str | None:
        """Return the gas equipment identifier from the stored telegram.

        Returns None when the telegram lacks the gas identifier entry or the
        entry has no ``value`` attribute.
        """
        if obis_ref.EQUIPMENT_IDENTIFIER_GAS in self._telegram:
            dsmr_object = self._telegram[obis_ref.EQUIPMENT_IDENTIFIER_GAS]
            identifier: str | None = getattr(dsmr_object, "value", None)
            return identifier
        return None

    async def validate_connect(self, hass: HomeAssistant) -> bool:
        """Test if we can validate connection with the device.

        Creates a reader and waits up to 30 seconds for it to close, which
        the telegram callback triggers once a usable telegram is stored.

        Returns:
            False when creating the reader raises serial.SerialException or
            OSError; True otherwise, including when the wait times out and
            no telegram was stored.
        """

        def update_telegram(telegram: dict[str, DSMRObject]) -> None:
            # NOTE(review): `transport` is only bound further down; this
            # presumably relies on no telegram being delivered before the
            # reader factory has returned - confirm.
            if self._equipment_identifier in telegram:
                self._telegram = telegram
                transport.close()
            # Swedish meters have no equipment identifier
            if (
                self._dsmr_version == "5S"
                and obis_ref.P1_MESSAGE_TIMESTAMP in telegram
            ):
                self._telegram = telegram
                transport.close()

        if self._host is None:
            # No host: build a serial reader.
            if self._protocol == DSMR_PROTOCOL:
                create_reader = create_dsmr_reader
            else:
                create_reader = create_rfxtrx_dsmr_reader
            reader_factory = partial(
                create_reader,
                self._port,
                self._dsmr_version,
                update_telegram,
                loop=hass.loop,
            )
        else:
            # Host given: build a TCP reader.
            if self._protocol == DSMR_PROTOCOL:
                create_reader = create_tcp_dsmr_reader
            else:
                create_reader = create_rfxtrx_tcp_dsmr_reader
            reader_factory = partial(
                create_reader,
                self._host,
                self._port,
                self._dsmr_version,
                update_telegram,
                loop=hass.loop,
            )

        try:
            transport, protocol = await asyncio.create_task(reader_factory())
        except (serial.SerialException, OSError):
            LOGGER.exception("Error connecting to DSMR")
            return False

        if transport:
            try:
                async with asyncio.timeout(30):
                    await protocol.wait_closed()
            except TimeoutError:
                # Timeout (no data received): close the transport and still
                # return True. The telegram stays empty, which the caller
                # turns into a CannotCommunicate error.
                transport.close()
                await protocol.wait_closed()
        return True
136 
137 
async def _validate_dsmr_connection(
    hass: HomeAssistant, data: dict[str, Any], protocol: str
) -> dict[str, str | None]:
    """Validate the user input allows us to connect.

    Args:
        hass: Home Assistant instance, passed to the connection test.
        data: User input; reads CONF_PORT and CONF_DSMR_VERSION, and
            CONF_HOST when present.
        protocol: Protocol constant used to choose the reader.

    Returns:
        A dict with CONF_SERIAL_ID and CONF_SERIAL_ID_GAS; either value may
        be None.

    Raises:
        CannotConnect: The connection test reported failure.
        CannotCommunicate: No equipment identifier was read and the DSMR
            version is not "5S".
    """
    conn = DSMRConnection(
        data.get(CONF_HOST),
        data[CONF_PORT],
        data[CONF_DSMR_VERSION],
        protocol,
    )

    if not await conn.validate_connect(hass):
        raise CannotConnect

    equipment_identifier = conn.equipment_identifier()
    equipment_identifier_gas = conn.equipment_identifier_gas()

    # Check only for equipment identifier in case no gas meter is connected.
    # Version "5S" is exempt from this check.
    if equipment_identifier is None and data[CONF_DSMR_VERSION] != "5S":
        raise CannotCommunicate

    return {
        CONF_SERIAL_ID: equipment_identifier,
        CONF_SERIAL_ID_GAS: equipment_identifier_gas,
    }
163 
164 
class DSMRFlowHandler(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for DSMR."""

    VERSION = 1

    # DSMR version picked in the serial step, reused by the manual-path step.
    _dsmr_version: str | None = None

    @staticmethod
    @callback
    def async_get_options_flow(
        config_entry: ConfigEntry,
    ) -> DSMROptionFlowHandler:
        """Get the options flow for this handler."""
        return DSMROptionFlowHandler()

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Step when user initializes an integration.

        Shows a Serial/Network choice, then forwards to the matching step.
        """
        if user_input is not None:
            user_selection = user_input[CONF_TYPE]
            if user_selection == "Serial":
                return await self.async_step_setup_serial()

            return await self.async_step_setup_network()

        list_of_types = ["Serial", "Network"]

        schema = vol.Schema({vol.Required(CONF_TYPE): vol.In(list_of_types)})
        return self.async_show_form(step_id="user", data_schema=schema)

    async def async_step_setup_network(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Step when setting up network configuration.

        Creates the entry titled "<host>:<port>" when validation succeeds;
        otherwise shows the form again with the collected errors.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            data = await self.async_validate_dsmr(user_input, errors)
            if not errors:
                return self.async_create_entry(
                    title=f"{data[CONF_HOST]}:{data[CONF_PORT]}", data=data
                )

        schema = vol.Schema(
            {
                vol.Required(CONF_HOST): str,
                vol.Required(CONF_PORT): int,
                vol.Required(CONF_DSMR_VERSION): vol.In(DSMR_VERSIONS),
            }
        )
        return self.async_show_form(
            step_id="setup_network",
            data_schema=schema,
            errors=errors,
        )

    async def async_step_setup_serial(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Step when setting up serial configuration.

        Offers the detected serial ports plus an "Enter Manually" choice,
        which forwards to the manual-path step.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            user_selection = user_input[CONF_PORT]
            if user_selection == CONF_MANUAL_PATH:
                # Keep the version; the manual-path form only asks for a path.
                self._dsmr_version = user_input[CONF_DSMR_VERSION]
                return await self.async_step_setup_serial_manual_path()

            # Filesystem lookup is blocking, so run it in the executor.
            dev_path = await self.hass.async_add_executor_job(
                get_serial_by_id, user_selection
            )

            validate_data = {
                CONF_PORT: dev_path,
                CONF_DSMR_VERSION: user_input[CONF_DSMR_VERSION],
            }

            data = await self.async_validate_dsmr(validate_data, errors)
            if not errors:
                return self.async_create_entry(title=data[CONF_PORT], data=data)

        ports = await self.hass.async_add_executor_job(serial.tools.list_ports.comports)
        list_of_ports = {
            port.device: f"{port}, s/n: {port.serial_number or 'n/a'}"
            + (f" - {port.manufacturer}" if port.manufacturer else "")
            for port in ports
        }
        list_of_ports[CONF_MANUAL_PATH] = CONF_MANUAL_PATH

        schema = vol.Schema(
            {
                vol.Required(CONF_PORT): vol.In(list_of_ports),
                vol.Required(CONF_DSMR_VERSION): vol.In(DSMR_VERSIONS),
            }
        )
        return self.async_show_form(
            step_id="setup_serial",
            data_schema=schema,
            errors=errors,
        )

    async def async_step_setup_serial_manual_path(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Select path manually.

        Validates the typed path with the DSMR version stored by the serial
        step. On failure the form is shown again with the errors.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            validate_data = {
                CONF_PORT: user_input[CONF_PORT],
                CONF_DSMR_VERSION: self._dsmr_version,
            }

            data = await self.async_validate_dsmr(validate_data, errors)
            if not errors:
                return self.async_create_entry(title=data[CONF_PORT], data=data)

        schema = vol.Schema({vol.Required(CONF_PORT): str})
        # Pass the errors on, as the other setup steps do, so a failed
        # validation is reported instead of silently re-showing the form.
        return self.async_show_form(
            step_id="setup_serial_manual_path",
            data_schema=schema,
            errors=errors,
        )

    async def async_validate_dsmr(
        self, input_data: dict[str, Any], errors: dict[str, str]
    ) -> dict[str, Any]:
        """Validate dsmr connection and create data.

        Tries DSMR_PROTOCOL first and retries once with RFXTRX_DSMR_PROTOCOL
        when the first attempt raises CannotCommunicate.

        Args:
            input_data: Connection settings to validate.
            errors: Mutated in place; "base" is set to "cannot_connect" or
                "cannot_communicate" on failure.

        Returns:
            On success, the input merged with the serial ids and
            CONF_PROTOCOL; on failure, the input unchanged.
        """
        data = input_data

        try:
            try:
                protocol = DSMR_PROTOCOL
                info = await _validate_dsmr_connection(self.hass, data, protocol)
            except CannotCommunicate:
                protocol = RFXTRX_DSMR_PROTOCOL
                info = await _validate_dsmr_connection(self.hass, data, protocol)

            data = {**data, **info, CONF_PROTOCOL: protocol}

            # Without a serial id no unique id is set for the entry.
            if info[CONF_SERIAL_ID]:
                await self.async_set_unique_id(info[CONF_SERIAL_ID])
                self._abort_if_unique_id_configured()
        except CannotConnect:
            errors["base"] = "cannot_connect"
        except CannotCommunicate:
            errors["base"] = "cannot_communicate"

        return data
311 
312 
class DSMROptionFlowHandler(OptionsFlow):
    """Handle options."""

    async def async_step_init(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Manage the options.

        Shows a form with one optional field, the time between updates,
        coerced to a non-negative int. Submitted input is stored as the
        entry's options.
        """
        if user_input is not None:
            return self.async_create_entry(title="", data=user_input)

        return self.async_show_form(
            step_id="init",
            data_schema=vol.Schema(
                {
                    vol.Optional(
                        CONF_TIME_BETWEEN_UPDATE,
                        # Pre-fill with the stored option, else the default.
                        default=self.config_entry.options.get(
                            CONF_TIME_BETWEEN_UPDATE, DEFAULT_TIME_BETWEEN_UPDATE
                        ),
                    ): vol.All(vol.Coerce(int), vol.Range(min=0)),
                }
            ),
        )
336 
337 
def get_serial_by_id(dev_path: str, *, by_id_dir: str = "/dev/serial/by-id") -> str:
    """Return a /dev/serial/by-id match for given device if available.

    Args:
        dev_path: Device path to look up. It is compared against the
            resolved target of each symlink in ``by_id_dir``.
        by_id_dir: Directory holding the by-id symlinks.

    Returns:
        The path of the first symlink in ``by_id_dir`` that resolves to
        ``dev_path``; ``dev_path`` itself when the directory does not exist
        or no symlink matches.
    """
    if not os.path.isdir(by_id_dir):
        return dev_path

    # The context manager closes the directory iterator even on early return.
    with os.scandir(by_id_dir) as entries:
        for entry in entries:
            if entry.is_symlink() and os.path.realpath(entry.path) == dev_path:
                return entry.path
    return dev_path
348 
349 
class CannotConnect(HomeAssistantError):
    """Error to indicate we cannot connect."""
352 
353 
class CannotCommunicate(HomeAssistantError):
    """Error to indicate we connected but could not read an equipment identifier."""
None __init__(self, str|None host, int port, str dsmr_version, str protocol)
Definition: config_flow.py:52
ConfigFlowResult async_step_setup_serial_manual_path(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:267
DSMROptionFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:176
dict[str, Any] async_validate_dsmr(self, dict[str, Any] input_data, dict[str, str] errors)
Definition: config_flow.py:288
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:182
ConfigFlowResult async_step_setup_network(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:198
ConfigFlowResult async_step_setup_serial(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:223
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:318
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
dict[str, str|None] _validate_dsmr_connection(HomeAssistant hass, dict[str, Any] data, str protocol)
Definition: config_flow.py:140