1 """Number for Shelly."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from dataclasses
import dataclass
7 from typing
import TYPE_CHECKING, Any, Final, cast
9 from aioshelly.block_device
import Block
10 from aioshelly.const
import RPC_GENERATIONS
11 from aioshelly.exceptions
import DeviceConnectionError, InvalidAuthError
14 DOMAIN
as NUMBER_PLATFORM,
16 NumberEntityDescription,
17 NumberExtraStoredData,
27 from .const
import CONF_SLEEP_PERIOD, LOGGER, VIRTUAL_NUMBER_MODE_MAP
28 from .coordinator
import ShellyBlockCoordinator, ShellyConfigEntry, ShellyRpcCoordinator
30 BlockEntityDescription,
32 ShellyRpcAttributeEntity,
33 ShellySleepingBlockAttributeEntity,
34 async_setup_entry_attribute_entities,
35 async_setup_entry_rpc,
38 async_remove_orphaned_entities,
40 get_virtual_component_ids,
44 @dataclass(frozen=True, kw_only=True)
46 """Class to describe a BLOCK sensor."""
52 @dataclass(frozen=True, kw_only=True)
54 """Class to describe a RPC number entity."""
56 max_fn: Callable[[dict], float] |
None =
None
57 min_fn: Callable[[dict], float] |
None =
None
58 step_fn: Callable[[dict], float] |
None =
None
59 mode_fn: Callable[[dict], NumberMode] |
None =
None
62 NUMBERS: dict[tuple[str, str], BlockNumberDescription] = {
64 key=
"device|valvepos",
65 translation_key=
"valve_position",
66 name=
"Valve position",
67 native_unit_of_measurement=PERCENTAGE,
68 available=
lambda block: cast(int, block.valveError) != 1,
69 entity_category=EntityCategory.CONFIG,
73 mode=NumberMode.SLIDER,
74 rest_path=
"thermostat/0",
80 RPC_NUMBERS: Final = {
85 max_fn=
lambda config: config[
"max"],
86 min_fn=
lambda config: config[
"min"],
87 mode_fn=
lambda config: VIRTUAL_NUMBER_MODE_MAP.get(
88 config[
"meta"][
"ui"][
"view"], NumberMode.BOX
90 step_fn=
lambda config: config[
"meta"][
"ui"][
"step"],
92 unit=
lambda config: config[
"meta"][
"ui"][
"unit"]
93 if config[
"meta"][
"ui"][
"unit"]
101 config_entry: ShellyConfigEntry,
102 async_add_entities: AddEntitiesCallback,
104 """Set up numbers for device."""
106 coordinator = config_entry.runtime_data.rpc
110 hass, config_entry, async_add_entities, RPC_NUMBERS, RpcNumber
116 coordinator.device.config, NUMBER_PLATFORM
120 config_entry.entry_id,
128 if config_entry.data[CONF_SLEEP_PERIOD]:
139 """Represent a block sleeping number."""
141 entity_description: BlockNumberDescription
145 coordinator: ShellyBlockCoordinator,
148 description: BlockNumberDescription,
149 entry: RegistryEntry |
None =
None,
151 """Initialize the sleeping sensor."""
152 self.
restored_datarestored_data: NumberExtraStoredData |
None =
None
153 super().
__init__(coordinator, block, attribute, description, entry)
156 """Handle entity which will be added."""
162 """Return value of number."""
169 return cast(float, self.
restored_datarestored_data.native_value)
181 """Set block state (HTTP request)."""
182 LOGGER.debug(
"Setting state for entity %s, state: %s", self.
namename, params)
184 return await self.
coordinatorcoordinator.device.http_request(
"get", path, params)
185 except DeviceConnectionError
as err:
186 self.
coordinatorcoordinator.last_update_success =
False
188 f
"Setting state for entity {self.name} failed, state: {params}, error:"
191 except InvalidAuthError:
196 """Represent a RPC number entity."""
198 entity_description: RpcNumberDescription
202 coordinator: ShellyRpcCoordinator,
205 description: RpcNumberDescription,
207 """Initialize sensor."""
208 super().
__init__(coordinator, key, attribute, description)
210 if description.max_fn
is not None:
212 coordinator.device.config[key]
214 if description.min_fn
is not None:
216 coordinator.device.config[key]
218 if description.step_fn
is not None:
220 if description.mode_fn
is not None:
221 self.
_attr_mode_attr_mode = description.mode_fn(coordinator.device.config[key])
225 """Return value of number."""
232 """Change the value."""
233 await self.
call_rpccall_rpc(
"Number.Set", {
"id": self.
_id_id,
"value": value})
NumberExtraStoredData|None async_get_last_number_data(self)
StateType attribute_value(self)
StateType attribute_value(self)
Any call_rpc(self, str method, Any params)
Any _set_state_full_path(self, str path, Any params)
float|None native_value(self)
None async_set_native_value(self, float value)
None async_added_to_hass(self)
None __init__(self, ShellyBlockCoordinator coordinator, Block|None block, str attribute, BlockNumberDescription description, RegistryEntry|None entry=None)
None async_set_native_value(self, float value)
None __init__(self, ShellyRpcCoordinator coordinator, str key, str attribute, RpcNumberDescription description)
float|None native_value(self)
None async_write_ha_state(self)
str|UndefinedType|None name(self)
None async_shutdown_device_and_start_reauth(self)
None async_setup_entry_rpc(HomeAssistant hass, ShellyConfigEntry config_entry, AddEntitiesCallback async_add_entities, Mapping[str, RpcEntityDescription] sensors, Callable sensor_class)
None async_setup_entry_attribute_entities(HomeAssistant hass, ShellyConfigEntry config_entry, AddEntitiesCallback async_add_entities, Mapping[tuple[str, str], BlockEntityDescription] sensors, Callable sensor_class)
None async_setup_entry(HomeAssistant hass, ShellyConfigEntry config_entry, AddEntitiesCallback async_add_entities)
list[str] get_virtual_component_ids(dict[str, Any] config, str platform)
None async_remove_orphaned_entities(HomeAssistant hass, str config_entry_id, str mac, str platform, Iterable[str] keys, str|None key_suffix=None)
int get_device_entry_gen(ConfigEntry entry)