Home Assistant Unofficial Reference 2024.12.1
validators.py
Go to the documentation of this file.
1 """Validate Modbus configuration."""
2 
3 from __future__ import annotations
4 
5 from collections import namedtuple
6 import logging
7 import struct
8 from typing import Any
9 
10 import voluptuous as vol
11 
12 from homeassistant.components.climate import HVACMode
13 from homeassistant.const import (
14  CONF_ADDRESS,
15  CONF_COUNT,
16  CONF_HOST,
17  CONF_NAME,
18  CONF_PORT,
19  CONF_SCAN_INTERVAL,
20  CONF_STRUCTURE,
21  CONF_TIMEOUT,
22  CONF_TYPE,
23 )
24 from homeassistant.core import HomeAssistant
25 from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
26 
27 from .const import (
28  CONF_DATA_TYPE,
29  CONF_FAN_MODE_VALUES,
30  CONF_SLAVE_COUNT,
31  CONF_SWAP,
32  CONF_SWAP_BYTE,
33  CONF_SWAP_WORD,
34  CONF_SWAP_WORD_BYTE,
35  CONF_SWING_MODE_VALUES,
36  CONF_VIRTUAL_COUNT,
37  DEFAULT_HUB,
38  DEFAULT_SCAN_INTERVAL,
39  MODBUS_DOMAIN as DOMAIN,
40  PLATFORMS,
41  SERIAL,
42  DataType,
43 )
44 
45 _LOGGER = logging.getLogger(__name__)
46 
47 ENTRY = namedtuple( # noqa: PYI024
48  "ENTRY",
49  [
50  "struct_id",
51  "register_count",
52  "validate_parm",
53  ],
54 )
55 
56 
57 ILLEGAL = "I"
58 OPTIONAL = "O"
59 DEMANDED = "D"
60 
61 PARM_IS_LEGAL = namedtuple( # noqa: PYI024
62  "PARM_IS_LEGAL",
63  [
64  "count",
65  "structure",
66  "slave_count",
67  "swap_byte",
68  "swap_word",
69  ],
70 )
71 DEFAULT_STRUCT_FORMAT = {
72  DataType.INT16: ENTRY(
73  "h", 1, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, ILLEGAL)
74  ),
75  DataType.UINT16: ENTRY(
76  "H", 1, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, ILLEGAL)
77  ),
78  DataType.FLOAT16: ENTRY(
79  "e", 1, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, ILLEGAL)
80  ),
81  DataType.INT32: ENTRY(
82  "i", 2, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
83  ),
84  DataType.UINT32: ENTRY(
85  "I", 2, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
86  ),
87  DataType.FLOAT32: ENTRY(
88  "f", 2, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
89  ),
90  DataType.INT64: ENTRY(
91  "q", 4, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
92  ),
93  DataType.UINT64: ENTRY(
94  "Q", 4, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
95  ),
96  DataType.FLOAT64: ENTRY(
97  "d", 4, PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
98  ),
99  DataType.STRING: ENTRY(
100  "s", 0, PARM_IS_LEGAL(DEMANDED, ILLEGAL, ILLEGAL, OPTIONAL, ILLEGAL)
101  ),
102  DataType.CUSTOM: ENTRY(
103  "?", 0, PARM_IS_LEGAL(DEMANDED, DEMANDED, ILLEGAL, ILLEGAL, ILLEGAL)
104  ),
105 }
106 
107 
109  hass: HomeAssistant, key: str, subs: list[str], err: str
110 ) -> None:
111  """Create issue modbus style."""
113  hass,
114  DOMAIN,
115  key,
116  is_fixable=False,
117  severity=IssueSeverity.WARNING,
118  translation_key=key,
119  translation_placeholders={
120  "sub_1": subs[0],
121  "sub_2": subs[1],
122  "sub_3": subs[2],
123  "integration": DOMAIN,
124  },
125  issue_domain=DOMAIN,
126  learn_more_url="https://www.home-assistant.io/integrations/modbus",
127  )
128  _LOGGER.warning(err)
129 
130 
131 def struct_validator(config: dict[str, Any]) -> dict[str, Any]:
132  """Sensor schema validator."""
133 
134  name = config[CONF_NAME]
135  data_type = config[CONF_DATA_TYPE]
136  if data_type == "int":
137  data_type = config[CONF_DATA_TYPE] = DataType.INT16
138  count = config.get(CONF_COUNT)
139  structure = config.get(CONF_STRUCTURE)
140  slave_count = config.get(CONF_SLAVE_COUNT, config.get(CONF_VIRTUAL_COUNT))
141  validator = DEFAULT_STRUCT_FORMAT[data_type].validate_parm
142  swap_type = config.get(CONF_SWAP)
143  swap_dict = {
144  CONF_SWAP_BYTE: validator.swap_byte,
145  CONF_SWAP_WORD: validator.swap_word,
146  CONF_SWAP_WORD_BYTE: validator.swap_word,
147  }
148  swap_type_validator = swap_dict[swap_type] if swap_type else OPTIONAL
149  for entry in (
150  (count, validator.count, CONF_COUNT),
151  (structure, validator.structure, CONF_STRUCTURE),
152  (
153  slave_count,
154  validator.slave_count,
155  f"{CONF_VIRTUAL_COUNT} / {CONF_SLAVE_COUNT}:",
156  ),
157  (swap_type, swap_type_validator, f"{CONF_SWAP}:{swap_type}"),
158  ):
159  if entry[0] is None:
160  if entry[1] == DEMANDED:
161  error = f"{name}: `{entry[2]}` missing, demanded with `{CONF_DATA_TYPE}: {data_type}`"
162  raise vol.Invalid(error)
163  elif entry[1] == ILLEGAL:
164  error = f"{name}: `{entry[2]}` illegal with `{CONF_DATA_TYPE}: {data_type}`"
165  raise vol.Invalid(error)
166 
167  if config[CONF_DATA_TYPE] == DataType.CUSTOM:
168  assert isinstance(structure, str)
169  assert isinstance(count, int)
170  try:
171  size = struct.calcsize(structure)
172  except struct.error as err:
173  raise vol.Invalid(f"{name}: error in structure format --> {err!s}") from err
174  bytecount = count * 2
175  if bytecount != size:
176  raise vol.Invalid(
177  f"{name}: Size of structure is {size} bytes but `{CONF_COUNT}: {count}` is {bytecount} bytes"
178  )
179  else:
180  if data_type != DataType.STRING:
181  config[CONF_COUNT] = DEFAULT_STRUCT_FORMAT[data_type].register_count
182  if slave_count:
183  structure = (
184  f">{slave_count + 1}{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
185  )
186  else:
187  structure = f">{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
188  return {
189  **config,
190  CONF_STRUCTURE: structure,
191  CONF_SWAP: swap_type,
192  }
193 
194 
195 def hvac_fixedsize_reglist_validator(value: Any) -> list:
196  """Check the number of registers for target temp. and coerce it to a list, if valid."""
197  if isinstance(value, int):
198  value = [value] * len(HVACMode)
199  return list(value)
200 
201  if len(value) == len(HVACMode):
202  _rv = True
203  for svalue in value:
204  if isinstance(svalue, int) is False:
205  _rv = False
206  break
207  if _rv is True:
208  return list(value)
209 
210  raise vol.Invalid(
211  f"Invalid target temp register. Required type: integer, allowed 1 or list of {len(HVACMode)} registers"
212  )
213 
214 
215 def nan_validator(value: Any) -> int:
216  """Convert nan string to number (can be hex string or int)."""
217  if isinstance(value, int):
218  return value
219  try:
220  return int(value)
221  except (TypeError, ValueError):
222  pass
223  try:
224  return int(value, 16)
225  except (TypeError, ValueError) as err:
226  raise vol.Invalid(f"invalid number {value}") from err
227 
228 
229 def duplicate_fan_mode_validator(config: dict[str, Any]) -> dict:
230  """Control modbus climate fan mode values for duplicates."""
231  fan_modes: set[int] = set()
232  errors = []
233  for key, value in config[CONF_FAN_MODE_VALUES].items():
234  if value in fan_modes:
235  warn = f"Modbus fan mode {key} has a duplicate value {value}, not loaded, values must be unique!"
236  _LOGGER.warning(warn)
237  errors.append(key)
238  else:
239  fan_modes.add(value)
240 
241  for key in reversed(errors):
242  del config[CONF_FAN_MODE_VALUES][key]
243  return config
244 
245 
246 def duplicate_swing_mode_validator(config: dict[str, Any]) -> dict:
247  """Control modbus climate swing mode values for duplicates."""
248  swing_modes: set[int] = set()
249  errors = []
250  for key, value in config[CONF_SWING_MODE_VALUES].items():
251  if value in swing_modes:
252  warn = f"Modbus swing mode {key} has a duplicate value {value}, not loaded, values must be unique!"
253  _LOGGER.warning(warn)
254  errors.append(key)
255  else:
256  swing_modes.add(value)
257 
258  for key in reversed(errors):
259  del config[CONF_SWING_MODE_VALUES][key]
260  return config
261 
262 
263 def register_int_list_validator(value: Any) -> Any:
264  """Check if a register (CONF_ADRESS) is an int or a list having only 1 register."""
265  if isinstance(value, int) and value >= 0:
266  return value
267 
268  if isinstance(value, list):
269  if (len(value) == 1) and isinstance(value[0], int) and value[0] >= 0:
270  return value
271 
272  raise vol.Invalid(
273  f"Invalid {CONF_ADDRESS} register for fan/swing mode. Required type: positive integer, allowed 1 or list of 1 register."
274  )
275 
276 
278  hass: HomeAssistant,
279  hosts: set[str],
280  hub_names: set[str],
281  hub: dict,
282  hub_name_inx: int,
283 ) -> bool:
284  """Validate modbus entries."""
285  host: str = (
286  hub[CONF_PORT]
287  if hub[CONF_TYPE] == SERIAL
288  else f"{hub[CONF_HOST]}_{hub[CONF_PORT]}"
289  )
290  if CONF_NAME not in hub:
291  hub[CONF_NAME] = (
292  DEFAULT_HUB if not hub_name_inx else f"{DEFAULT_HUB}_{hub_name_inx}"
293  )
294  hub_name_inx += 1
296  hass,
297  "missing_modbus_name",
298  [
299  "name",
300  host,
301  hub[CONF_NAME],
302  ],
303  f"Modbus host/port {host} is missing name, added {hub[CONF_NAME]}!",
304  )
305  name = hub[CONF_NAME]
306  if host in hosts or name in hub_names:
308  hass,
309  "duplicate_modbus_entry",
310  [
311  host,
312  hub[CONF_NAME],
313  "",
314  ],
315  f"Modbus {name} host/port {host} is duplicate, not loaded!",
316  )
317  return False
318  hosts.add(host)
319  hub_names.add(name)
320  return True
321 
322 
324  hass: HomeAssistant,
325  hub_name: str,
326  component: str,
327  entity: dict,
328  minimum_scan_interval: int,
329  ent_names: set[str],
330  ent_addr: set[str],
331 ) -> bool:
332  """Validate entity."""
333  name = f"{component}.{entity[CONF_NAME]}"
334  scan_interval = entity.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
335  if 0 < scan_interval < 5:
336  err = (
337  f"{hub_name} {name} scan_interval is lower than 5 seconds, "
338  "which may cause Home Assistant stability issues"
339  )
340  _LOGGER.warning(err)
341  entity[CONF_SCAN_INTERVAL] = scan_interval
342  minimum_scan_interval = min(scan_interval, minimum_scan_interval)
343  if name in ent_names:
345  hass,
346  "duplicate_entity_name",
347  [
348  f"{hub_name}/{name}",
349  "",
350  "",
351  ],
352  f"Modbus {hub_name}/{name} is duplicate, second entry not loaded!",
353  )
354  return False
355  ent_names.add(name)
356  return True
357 
358 
359 def check_config(hass: HomeAssistant, config: dict) -> dict:
360  """Do final config check."""
361  hosts: set[str] = set()
362  hub_names: set[str] = set()
363  hub_name_inx = 0
364  minimum_scan_interval = 0
365  ent_names: set[str] = set()
366  ent_addr: set[str] = set()
367 
368  hub_inx = 0
369  while hub_inx < len(config):
370  hub = config[hub_inx]
371  if not validate_modbus(hass, hosts, hub_names, hub, hub_name_inx):
372  del config[hub_inx]
373  continue
374  minimum_scan_interval = 9999
375  no_entities = True
376  for component, conf_key in PLATFORMS:
377  if conf_key not in hub:
378  continue
379  no_entities = False
380  entity_inx = 0
381  entities = hub[conf_key]
382  while entity_inx < len(entities):
383  if not validate_entity(
384  hass,
385  hub[CONF_NAME],
386  component,
387  entities[entity_inx],
388  minimum_scan_interval,
389  ent_names,
390  ent_addr,
391  ):
392  del entities[entity_inx]
393  else:
394  entity_inx += 1
395  if no_entities:
397  hass,
398  "no_entities",
399  [
400  hub[CONF_NAME],
401  "",
402  "",
403  ],
404  f"Modbus {hub[CONF_NAME]} contain no entities, causing instability, entry not loaded",
405  )
406  del config[hub_inx]
407  continue
408  if hub[CONF_TIMEOUT] >= minimum_scan_interval:
409  hub[CONF_TIMEOUT] = minimum_scan_interval - 1
410  _LOGGER.warning(
411  "Modbus %s timeout is adjusted(%d) due to scan_interval",
412  hub[CONF_NAME],
413  hub[CONF_TIMEOUT],
414  )
415  hub_inx += 1
416  return config
dict check_config(HomeAssistant hass, dict config)
Definition: validators.py:359
bool validate_entity(HomeAssistant hass, str hub_name, str component, dict entity, int minimum_scan_interval, set[str] ent_names, set[str] ent_addr)
Definition: validators.py:331
dict[str, Any] struct_validator(dict[str, Any] config)
Definition: validators.py:131
bool validate_modbus(HomeAssistant hass, set[str] hosts, set[str] hub_names, dict hub, int hub_name_inx)
Definition: validators.py:283
None modbus_create_issue(HomeAssistant hass, str key, list[str] subs, str err)
Definition: validators.py:110
dict duplicate_swing_mode_validator(dict[str, Any] config)
Definition: validators.py:246
dict duplicate_fan_mode_validator(dict[str, Any] config)
Definition: validators.py:229
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69