1 """Base implementation for all modbus platforms."""
3 from __future__
import annotations
5 from abc
import abstractmethod
6 from collections.abc
import Callable
7 from datetime
import datetime, timedelta
10 from typing
import Any, cast
37 CALL_TYPE_REGISTER_HOLDING,
38 CALL_TYPE_REGISTER_INPUT,
40 CALL_TYPE_WRITE_COILS,
41 CALL_TYPE_WRITE_REGISTER,
42 CALL_TYPE_WRITE_REGISTERS,
44 CALL_TYPE_X_REGISTER_HOLDINGS,
68 from .modbus
import ModbusHub
70 _LOGGER = logging.getLogger(__name__)
74 """Base for readonly platforms."""
77 self, hass: HomeAssistant, hub: ModbusHub, entry: dict[str, Any]
79 """Initialize the Modbus binary sensor."""
82 self.
_slave_slave = entry.get(CONF_SLAVE)
or entry.get(CONF_DEVICE_ADDRESS, 0)
85 self._value: str |
None =
None
88 self.
_cancel_timer_cancel_timer: Callable[[],
None] |
None =
None
89 self.
_cancel_call_cancel_call: Callable[[],
None] |
None =
None
98 def get_optional_numeric_config(config_name: str) -> int | float |
None:
99 if (val := entry.get(config_name))
is None:
103 ), f
"Expected float or int but {config_name} was {type(val)}"
106 self.
_min_value_min_value = get_optional_numeric_config(CONF_MIN_VALUE)
107 self.
_max_value_max_value = get_optional_numeric_config(CONF_MAX_VALUE)
109 self.
_zero_suppress_zero_suppress = get_optional_numeric_config(CONF_ZERO_SUPPRESS)
113 """Virtual function to be overwritten."""
117 """Remote start entity."""
131 """Remote stop entity."""
143 """Handle entity which will be added."""
154 """Base class representing a sensor/climate."""
156 def __init__(self, hass: HomeAssistant, hub: ModbusHub, config: dict) ->
None:
157 """Initialize the switch."""
161 self._structure: str = config[CONF_STRUCTURE]
164 self.
_slave_count_slave_count = config.get(CONF_SLAVE_COUNT)
or config.get(
165 CONF_VIRTUAL_COUNT, 0
179 self.
_precision_precision = config.get(CONF_PRECISION, 0)
184 """Do swap as needed."""
190 swapped.extend(self.
_swap_registers_swap_registers(registers[inx:inx2], 0))
192 if self.
_swap_swap
in (CONF_SWAP_BYTE, CONF_SWAP_WORD_BYTE):
194 for i, register
in enumerate(registers):
195 registers[i] = int.from_bytes(
196 register.to_bytes(2, byteorder=
"little"),
200 if self.
_swap_swap
in (CONF_SWAP_WORD, CONF_SWAP_WORD_BYTE):
206 """Process value from sensor with NaN handling, scaling, offset, min/max etc."""
209 if isinstance(entry, bytes):
210 return entry.decode()
214 val: float | int = self.
_scale_scale * entry + self.
_offset_offset
222 return str(round(val))
223 return f
"{float(val):.{self._precision}f}"
226 """Convert registers to proper result."""
230 byte_string = b
"".join([x.to_bytes(2, byteorder=
"big")
for x
in registers])
231 if self.
_data_type_data_type == DataType.STRING:
232 return byte_string.decode()
233 if byte_string == b
"nan\x00":
237 val = struct.unpack(self._structure, byte_string)
238 except struct.error
as err:
239 recv_size = len(registers) * 2
240 msg = f
"Received {recv_size} bytes, unpack error {err}"
251 v_result.append(
str(v_temp))
252 return ",".join(map(str, v_result))
259 """Base class representing a Modbus switch."""
261 def __init__(self, hass: HomeAssistant, hub: ModbusHub, config: dict) ->
None:
262 """Initialize the switch."""
263 config[CONF_INPUT_TYPE] =
""
267 CALL_TYPE_REGISTER_HOLDING: (
268 CALL_TYPE_REGISTER_HOLDING,
269 CALL_TYPE_WRITE_REGISTER,
271 CALL_TYPE_DISCRETE: (
275 CALL_TYPE_REGISTER_INPUT: (
276 CALL_TYPE_REGISTER_INPUT,
279 CALL_TYPE_COIL: (CALL_TYPE_COIL, CALL_TYPE_WRITE_COIL),
280 CALL_TYPE_X_COILS: (CALL_TYPE_COIL, CALL_TYPE_WRITE_COILS),
281 CALL_TYPE_X_REGISTER_HOLDINGS: (
282 CALL_TYPE_REGISTER_HOLDING,
283 CALL_TYPE_WRITE_REGISTERS,
286 self.
_write_type_write_type = cast(str, convert[config[CONF_WRITE_TYPE]][1])
289 if CONF_VERIFY
in config:
290 if config[CONF_VERIFY]
is None:
291 config[CONF_VERIFY] = {}
295 CONF_ADDRESS, config[CONF_ADDRESS]
298 config[CONF_VERIFY].
get(CONF_INPUT_TYPE, config[CONF_WRITE_TYPE])
308 """Handle entity which will be added."""
311 if state.state == STATE_ON:
313 elif state.state == STATE_OFF:
317 """Evaluate switch result."""
318 result = await self.
_hub_hub.async_pb_call(
338 """Set switch off."""
342 """Update the entity state."""
354 result = await self.
_hub_hub.async_pb_call(
364 if self.
_verify_type_verify_type
in (CALL_TYPE_COIL, CALL_TYPE_DISCRETE):
367 value =
int(result.registers[0])
372 elif value
is not None:
375 "Unexpected response from modbus device slave %s register %s,"
None async_added_to_hass(self)
None async_update(self, datetime|None now=None)
None async_turn_off(self, **Any kwargs)
None __init__(self, HomeAssistant hass, ModbusHub hub, dict config)
None async_turn(self, int command)
None async_write_ha_state(self)
None async_on_remove(self, CALLBACK_TYPE func)
State|None async_get_last_state(self)
web.Response get(self, web.Request request, str config_key)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)