1 """Validate Modbus configuration."""
3 from __future__
import annotations
5 from collections
import namedtuple
10 import voluptuous
as vol
35 CONF_SWING_MODE_VALUES,
38 DEFAULT_SCAN_INTERVAL,
39 MODBUS_DOMAIN
as DOMAIN,
45 _LOGGER = logging.getLogger(__name__)
61 PARM_IS_LEGAL = namedtuple(
71 DEFAULT_STRUCT_FORMAT = {
72 DataType.INT16:
ENTRY(
73 "h", 1,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, ILLEGAL)
75 DataType.UINT16:
ENTRY(
76 "H", 1,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, ILLEGAL)
78 DataType.FLOAT16:
ENTRY(
79 "e", 1,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, ILLEGAL)
81 DataType.INT32:
ENTRY(
82 "i", 2,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
84 DataType.UINT32:
ENTRY(
85 "I", 2,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
87 DataType.FLOAT32:
ENTRY(
88 "f", 2,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
90 DataType.INT64:
ENTRY(
91 "q", 4,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
93 DataType.UINT64:
ENTRY(
94 "Q", 4,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
96 DataType.FLOAT64:
ENTRY(
97 "d", 4,
PARM_IS_LEGAL(ILLEGAL, ILLEGAL, OPTIONAL, OPTIONAL, OPTIONAL)
99 DataType.STRING:
ENTRY(
100 "s", 0,
PARM_IS_LEGAL(DEMANDED, ILLEGAL, ILLEGAL, OPTIONAL, ILLEGAL)
102 DataType.CUSTOM:
ENTRY(
103 "?", 0,
PARM_IS_LEGAL(DEMANDED, DEMANDED, ILLEGAL, ILLEGAL, ILLEGAL)
109 hass: HomeAssistant, key: str, subs: list[str], err: str
111 """Create issue modbus style."""
117 severity=IssueSeverity.WARNING,
119 translation_placeholders={
123 "integration": DOMAIN,
126 learn_more_url=
"https://www.home-assistant.io/integrations/modbus",
132 """Sensor schema validator."""
134 name = config[CONF_NAME]
135 data_type = config[CONF_DATA_TYPE]
136 if data_type ==
"int":
137 data_type = config[CONF_DATA_TYPE] = DataType.INT16
138 count = config.get(CONF_COUNT)
139 structure = config.get(CONF_STRUCTURE)
140 slave_count = config.get(CONF_SLAVE_COUNT, config.get(CONF_VIRTUAL_COUNT))
141 validator = DEFAULT_STRUCT_FORMAT[data_type].validate_parm
142 swap_type = config.get(CONF_SWAP)
144 CONF_SWAP_BYTE: validator.swap_byte,
145 CONF_SWAP_WORD: validator.swap_word,
146 CONF_SWAP_WORD_BYTE: validator.swap_word,
148 swap_type_validator = swap_dict[swap_type]
if swap_type
else OPTIONAL
150 (count, validator.count, CONF_COUNT),
151 (structure, validator.structure, CONF_STRUCTURE),
154 validator.slave_count,
155 f
"{CONF_VIRTUAL_COUNT} / {CONF_SLAVE_COUNT}:",
157 (swap_type, swap_type_validator, f
"{CONF_SWAP}:{swap_type}"),
160 if entry[1] == DEMANDED:
161 error = f
"{name}: `{entry[2]}` missing, demanded with `{CONF_DATA_TYPE}: {data_type}`"
162 raise vol.Invalid(error)
163 elif entry[1] == ILLEGAL:
164 error = f
"{name}: `{entry[2]}` illegal with `{CONF_DATA_TYPE}: {data_type}`"
165 raise vol.Invalid(error)
167 if config[CONF_DATA_TYPE] == DataType.CUSTOM:
168 assert isinstance(structure, str)
169 assert isinstance(count, int)
171 size = struct.calcsize(structure)
172 except struct.error
as err:
173 raise vol.Invalid(f
"{name}: error in structure format --> {err!s}")
from err
174 bytecount = count * 2
175 if bytecount != size:
177 f
"{name}: Size of structure is {size} bytes but `{CONF_COUNT}: {count}` is {bytecount} bytes"
180 if data_type != DataType.STRING:
181 config[CONF_COUNT] = DEFAULT_STRUCT_FORMAT[data_type].register_count
184 f
">{slave_count + 1}{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
187 structure = f
">{DEFAULT_STRUCT_FORMAT[data_type].struct_id}"
190 CONF_STRUCTURE: structure,
191 CONF_SWAP: swap_type,
196 """Check the number of registers for target temp. and coerce it to a list, if valid."""
197 if isinstance(value, int):
198 value = [value] * len(HVACMode)
201 if len(value) == len(HVACMode):
204 if isinstance(svalue, int)
is False:
211 f
"Invalid target temp register. Required type: integer, allowed 1 or list of {len(HVACMode)} registers"
216 """Convert nan string to number (can be hex string or int)."""
217 if isinstance(value, int):
221 except (TypeError, ValueError):
224 return int(value, 16)
225 except (TypeError, ValueError)
as err:
226 raise vol.Invalid(f
"invalid number {value}")
from err
230 """Control modbus climate fan mode values for duplicates."""
231 fan_modes: set[int] = set()
233 for key, value
in config[CONF_FAN_MODE_VALUES].items():
234 if value
in fan_modes:
235 warn = f
"Modbus fan mode {key} has a duplicate value {value}, not loaded, values must be unique!"
236 _LOGGER.warning(warn)
241 for key
in reversed(errors):
242 del config[CONF_FAN_MODE_VALUES][key]
247 """Control modbus climate swing mode values for duplicates."""
248 swing_modes: set[int] = set()
250 for key, value
in config[CONF_SWING_MODE_VALUES].items():
251 if value
in swing_modes:
252 warn = f
"Modbus swing mode {key} has a duplicate value {value}, not loaded, values must be unique!"
253 _LOGGER.warning(warn)
256 swing_modes.add(value)
258 for key
in reversed(errors):
259 del config[CONF_SWING_MODE_VALUES][key]
264 """Check if a register (CONF_ADRESS) is an int or a list having only 1 register."""
265 if isinstance(value, int)
and value >= 0:
268 if isinstance(value, list):
269 if (len(value) == 1)
and isinstance(value[0], int)
and value[0] >= 0:
273 f
"Invalid {CONF_ADDRESS} register for fan/swing mode. Required type: positive integer, allowed 1 or list of 1 register."
284 """Validate modbus entries."""
287 if hub[CONF_TYPE] == SERIAL
288 else f
"{hub[CONF_HOST]}_{hub[CONF_PORT]}"
290 if CONF_NAME
not in hub:
292 DEFAULT_HUB
if not hub_name_inx
else f
"{DEFAULT_HUB}_{hub_name_inx}"
297 "missing_modbus_name",
303 f
"Modbus host/port {host} is missing name, added {hub[CONF_NAME]}!",
305 name = hub[CONF_NAME]
306 if host
in hosts
or name
in hub_names:
309 "duplicate_modbus_entry",
315 f
"Modbus {name} host/port {host} is duplicate, not loaded!",
328 minimum_scan_interval: int,
332 """Validate entity."""
333 name = f
"{component}.{entity[CONF_NAME]}"
334 scan_interval = entity.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
335 if 0 < scan_interval < 5:
337 f
"{hub_name} {name} scan_interval is lower than 5 seconds, "
338 "which may cause Home Assistant stability issues"
341 entity[CONF_SCAN_INTERVAL] = scan_interval
342 minimum_scan_interval =
min(scan_interval, minimum_scan_interval)
343 if name
in ent_names:
346 "duplicate_entity_name",
348 f
"{hub_name}/{name}",
352 f
"Modbus {hub_name}/{name} is duplicate, second entry not loaded!",
360 """Do final config check."""
361 hosts: set[str] = set()
362 hub_names: set[str] = set()
364 minimum_scan_interval = 0
365 ent_names: set[str] = set()
366 ent_addr: set[str] = set()
369 while hub_inx < len(config):
370 hub = config[hub_inx]
374 minimum_scan_interval = 9999
376 for component, conf_key
in PLATFORMS:
377 if conf_key
not in hub:
381 entities = hub[conf_key]
382 while entity_inx < len(entities):
387 entities[entity_inx],
388 minimum_scan_interval,
392 del entities[entity_inx]
404 f
"Modbus {hub[CONF_NAME]} contain no entities, causing instability, entry not loaded",
408 if hub[CONF_TIMEOUT] >= minimum_scan_interval:
409 hub[CONF_TIMEOUT] = minimum_scan_interval - 1
411 "Modbus %s timeout is adjusted(%d) due to scan_interval",
dict check_config(HomeAssistant hass, dict config)
int nan_validator(Any value)
list hvac_fixedsize_reglist_validator(Any value)
bool validate_entity(HomeAssistant hass, str hub_name, str component, dict entity, int minimum_scan_interval, set[str] ent_names, set[str] ent_addr)
dict[str, Any] struct_validator(dict[str, Any] config)
Any register_int_list_validator(Any value)
bool validate_modbus(HomeAssistant hass, set[str] hosts, set[str] hub_names, dict hub, int hub_name_inx)
None modbus_create_issue(HomeAssistant hass, str key, list[str] subs, str err)
dict duplicate_swing_mode_validator(dict[str, Any] config)
dict duplicate_fan_mode_validator(dict[str, Any] config)
None async_create_issue(HomeAssistant hass, str entry_id)