Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support to select an option from a list."""
2 
from __future__ import annotations

import logging
from typing import Any, Self, cast

import voluptuous as vol

from homeassistant.components.select import (
    ATTR_CYCLE,
    ATTR_OPTION,
    ATTR_OPTIONS,
    SERVICE_SELECT_FIRST,
    SERVICE_SELECT_LAST,
    SERVICE_SELECT_NEXT,
    SERVICE_SELECT_OPTION,
    SERVICE_SELECT_PREVIOUS,
    SelectEntity,
)
from homeassistant.const import (
    ATTR_EDITABLE,
    CONF_ICON,
    CONF_ID,
    CONF_NAME,
    SERVICE_RELOAD,
)
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import collection
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.restore_state import RestoreEntity
import homeassistant.helpers.service
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import ConfigType, VolDictType
37 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Integration domain; also reused as the storage key below.
DOMAIN = "input_select"

# Configuration keys read by the schemas and by InputSelect.__init__.
CONF_INITIAL = "initial"
CONF_OPTIONS = "options"

# Name of the entity service that replaces an entity's list of options.
SERVICE_SET_OPTIONS = "set_options"
# Key and versions handed to the store created in async_setup.
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
# Data stored with major version 1 and a minor version below 2 is migrated by
# removing duplicated options.
STORAGE_VERSION_MINOR = 2
49 
50 
def _unique(options: Any) -> Any:
    """Validate that *options* contains no repeated entries.

    Returns whatever ``vol.Unique`` returns for the input. A validation
    failure is re-raised as ``HomeAssistantError`` with the original
    ``vol.Invalid`` chained as its cause.
    """
    try:
        validated = vol.Unique()(options)
    except vol.Invalid as err:
        raise HomeAssistantError("Duplicate options are not allowed") from err
    else:
        return validated
56 
57 
# Field validators for storage-backed input selects. Passed to the websocket
# API helper in async_setup and combined with _cv_input_select to build the
# create/update schema of the storage collection.
STORAGE_FIELDS: VolDictType = {
    # The name must be a non-empty string.
    vol.Required(CONF_NAME): vol.All(str, vol.Length(min=1)),
    # At least one option is required; _unique raises HomeAssistantError when
    # the list contains duplicates.
    vol.Required(CONF_OPTIONS): vol.All(
        cv.ensure_list, vol.Length(min=1), _unique, [cv.string]
    ),
    vol.Optional(CONF_INITIAL): cv.string,
    vol.Optional(CONF_ICON): cv.icon,
}
66 
67 
68 def _remove_duplicates(options: list[str], name: str | None) -> list[str]:
69  """Remove duplicated options."""
70  unique_options = list(dict.fromkeys(options))
71  # This check was added in 2022.3
72  # Reject YAML configured input_select with duplicates from 2022.6
73  if len(unique_options) != len(options):
74  _LOGGER.warning(
75  (
76  "Input select '%s' with options %s had duplicated options, the"
77  " duplicates have been removed"
78  ),
79  name or "<unnamed>",
80  options,
81  )
82  return unique_options
83 
84 
def _cv_input_select(cfg: dict[str, Any]) -> dict[str, Any]:
    """Validate an input select config mapping (voluptuous helper).

    Raises ``vol.Invalid`` when an initial value is given that is not one of
    the options. Otherwise the options stored in *cfg* are replaced by their
    de-duplicated form and the same mapping is returned.
    """
    options = cfg[CONF_OPTIONS]
    initial = cfg.get(CONF_INITIAL)

    # No initial value, or a valid one: clean up the options and finish.
    if initial is None or initial in options:
        cfg[CONF_OPTIONS] = _remove_duplicates(options, cfg.get(CONF_NAME))
        return cfg

    raise vol.Invalid(
        f"initial state {initial} is not part of the options: {','.join(options)}"
    )
95 
96 
# Schema for YAML configuration: every slug key under DOMAIN describes one
# input select. Unlike STORAGE_FIELDS, the options list is not checked with
# _unique here; _cv_input_select validates the initial value and then removes
# duplicated options, logging a warning.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: cv.schema_with_slug_keys(
            vol.All(
                {
                    vol.Optional(CONF_NAME): cv.string,
                    vol.Required(CONF_OPTIONS): vol.All(
                        cv.ensure_list, vol.Length(min=1), [cv.string]
                    ),
                    vol.Optional(CONF_INITIAL): cv.string,
                    vol.Optional(CONF_ICON): cv.icon,
                },
                _cv_input_select,
            )
        )
    },
    extra=vol.ALLOW_EXTRA,
)
# Empty schema used when registering the reload service.
RELOAD_SERVICE_SCHEMA = vol.Schema({})
116 
117 
# NOTE(review): the class header and the ``async def`` line were missing from
# this copy of the file and have been reconstructed; confirm the class name
# against the call site in async_setup.
class InputSelectStore(Store):
    """Store entity registry data."""

    async def _async_migrate_func(
        self, old_major_version: int, old_minor_version: int, old_data: dict[str, Any]
    ) -> dict[str, Any]:
        """Migrate to the new version.

        For data stored with major version 1 and a minor version below 2,
        every entry in ``old_data["items"]`` has its options de-duplicated in
        place. The same ``old_data`` mapping is returned in all cases.
        """
        if old_major_version == 1:
            if old_minor_version < 2:
                for item in old_data["items"]:
                    options = item[ATTR_OPTIONS]
                    item[ATTR_OPTIONS] = _remove_duplicates(
                        options, item.get(CONF_NAME)
                    )
        return old_data
133 
134 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up an input select.

    Creates a YAML-backed and a storage-backed collection, loads both,
    sets up the websocket API for the storage collection, and registers the
    reload service plus the entity services. Always returns ``True``.
    """
    component = EntityComponent[InputSelect](_LOGGER, DOMAIN, hass)

    # One IDManager instance is shared by both collections.
    id_manager = collection.IDManager()

    yaml_collection = collection.YamlCollection(
        logging.getLogger(f"{__name__}.yaml_collection"), id_manager
    )
    collection.sync_entity_lifecycle(
        hass, DOMAIN, DOMAIN, component, yaml_collection, InputSelect
    )

    # NOTE(review): the store constructor name was missing from this copy of
    # the file; confirm it matches the store class defined in this module.
    storage_collection = InputSelectStorageCollection(
        InputSelectStore(
            hass, STORAGE_VERSION, STORAGE_KEY, minor_version=STORAGE_VERSION_MINOR
        ),
        id_manager,
    )
    collection.sync_entity_lifecycle(
        hass, DOMAIN, DOMAIN, component, storage_collection, InputSelect
    )

    # Each YAML entry is keyed by its slug; fold that key into the item as CONF_ID.
    await yaml_collection.async_load(
        [{CONF_ID: id_, **cfg} for id_, cfg in config.get(DOMAIN, {}).items()]
    )
    await storage_collection.async_load()

    collection.DictStorageCollectionWebsocket(
        storage_collection, DOMAIN, DOMAIN, STORAGE_FIELDS, STORAGE_FIELDS
    ).async_setup(hass)

    async def reload_service_handler(service_call: ServiceCall) -> None:
        """Reload yaml entities."""
        conf = await component.async_prepare_reload(skip_reset=True)
        if conf is None:
            # Fall back to an empty domain config so the YAML collection is
            # loaded with an empty list.
            conf = {DOMAIN: {}}
        await yaml_collection.async_load(
            [{CONF_ID: id_, **cfg} for id_, cfg in conf.get(DOMAIN, {}).items()]
        )

    homeassistant.helpers.service.async_register_admin_service(
        hass,
        DOMAIN,
        SERVICE_RELOAD,
        reload_service_handler,
        schema=RELOAD_SERVICE_SCHEMA,
    )

    component.async_register_entity_service(
        SERVICE_SELECT_FIRST,
        None,
        InputSelect.async_first.__name__,
    )

    component.async_register_entity_service(
        SERVICE_SELECT_LAST,
        None,
        InputSelect.async_last.__name__,
    )

    component.async_register_entity_service(
        SERVICE_SELECT_NEXT,
        {vol.Optional(ATTR_CYCLE, default=True): bool},
        InputSelect.async_next.__name__,
    )

    component.async_register_entity_service(
        SERVICE_SELECT_OPTION,
        {vol.Required(ATTR_OPTION): cv.string},
        InputSelect.async_select_option.__name__,
    )

    component.async_register_entity_service(
        SERVICE_SELECT_PREVIOUS,
        {vol.Optional(ATTR_CYCLE, default=True): bool},
        InputSelect.async_previous.__name__,
    )

    # Duplicates are not rejected by this schema; InputSelect.async_set_options
    # raises HomeAssistantError for them instead.
    component.async_register_entity_service(
        SERVICE_SET_OPTIONS,
        {
            vol.Required(ATTR_OPTIONS): vol.All(
                cv.ensure_list, vol.Length(min=1), [cv.string]
            )
        },
        InputSelect.async_set_options.__name__,
    )

    return True
225 
226 
class InputSelectStorageCollection(collection.DictStorageCollection):
    """Input storage based collection."""

    # STORAGE_FIELDS validation followed by the initial-value check and option
    # de-duplication done by _cv_input_select.
    CREATE_UPDATE_SCHEMA = vol.Schema(vol.All(STORAGE_FIELDS, _cv_input_select))

    async def _process_create_data(self, data: dict[str, Any]) -> dict[str, Any]:
        """Validate the config is valid.

        Returns the result of running *data* through ``CREATE_UPDATE_SCHEMA``.
        """
        return cast(dict[str, Any], self.CREATE_UPDATE_SCHEMA(data))

    @callback
    def _get_suggested_id(self, info: dict[str, Any]) -> str:
        """Suggest an ID based on the config (the configured name)."""
        return cast(str, info[CONF_NAME])

    async def _update_data(
        self, item: dict[str, Any], update_data: dict[str, Any]
    ) -> dict[str, Any]:
        """Return a new updated data object.

        The result holds the existing item's ID merged with the validated
        *update_data*; other keys of *item* are not carried over.
        """
        update_data = self.CREATE_UPDATE_SCHEMA(update_data)
        return {CONF_ID: item[CONF_ID]} | update_data
247 
248 
# pylint: disable-next=hass-enforce-class-module
class InputSelect(collection.CollectionEntity, SelectEntity, RestoreEntity):
    """Representation of a select input."""

    # SelectEntity's component-level unrecorded attributes, minus ATTR_OPTIONS.
    _entity_component_unrecorded_attributes = (
        SelectEntity._entity_component_unrecorded_attributes - {ATTR_OPTIONS}  # noqa: SLF001
    )
    _unrecorded_attributes = frozenset({ATTR_EDITABLE})

    _attr_should_poll = False
    # Set by from_storage (True) and from_yaml (False).
    editable: bool

    def __init__(self, config: ConfigType) -> None:
        """Initialize a select input from its config mapping."""
        self._attr_current_option = config.get(CONF_INITIAL)
        self._attr_icon = config.get(CONF_ICON)
        self._attr_name = config.get(CONF_NAME)
        self._attr_options = config[CONF_OPTIONS]
        self._attr_unique_id = config[CONF_ID]

    @classmethod
    def from_storage(cls, config: ConfigType) -> Self:
        """Return entity instance initialized from storage."""
        input_select = cls(config)
        input_select.editable = True
        return input_select

    @classmethod
    def from_yaml(cls, config: ConfigType) -> Self:
        """Return entity instance initialized from yaml."""
        input_select = cls(config)
        # YAML entities get their entity_id from the configured ID.
        input_select.entity_id = f"{DOMAIN}.{config[CONF_ID]}"
        input_select.editable = False
        return input_select

    async def async_added_to_hass(self) -> None:
        """Run when entity about to be added.

        Keeps a configured initial option. Otherwise uses the last known
        state when it is still one of the options, and falls back to the
        first option when it is not.
        """
        await super().async_added_to_hass()
        if self.current_option is not None:
            return

        state = await self.async_get_last_state()
        if not state or state.state not in self.options:
            self._attr_current_option = self.options[0]
        else:
            self._attr_current_option = state.state

    @property
    def extra_state_attributes(self) -> dict[str, bool]:
        """Return the state attributes."""
        return {ATTR_EDITABLE: self.editable}

    async def async_select_option(self, option: str) -> None:
        """Select new option.

        Raises HomeAssistantError when *option* is not one of the options.
        """
        if option not in self.options:
            raise HomeAssistantError(
                f"Invalid option: {option} (possible options: {', '.join(self.options)})"
            )
        self._attr_current_option = option
        self.async_write_ha_state()

    async def async_set_options(self, options: list[str]) -> None:
        """Set options.

        Raises HomeAssistantError when *options* contains duplicates. If the
        current option is not among the new options, a warning is logged and
        the first new option becomes the current one.
        """
        unique_options = list(dict.fromkeys(options))
        if len(unique_options) != len(options):
            raise HomeAssistantError(f"Duplicated options: {options}")

        self._attr_options = options

        if self.current_option not in self.options:
            _LOGGER.warning(
                "Current option: %s no longer valid (possible options: %s)",
                self.current_option,
                ", ".join(self.options),
            )
            self._attr_current_option = options[0]

        self.async_write_ha_state()

    async def async_update_config(self, config: ConfigType) -> None:
        """Handle when the config is updated."""
        self._attr_icon = config.get(CONF_ICON)
        self._attr_name = config.get(CONF_NAME)
        self._attr_options = config[CONF_OPTIONS]
        self.async_write_ha_state()
dict[str, Any] _update_data(self, dict[str, Any] item, dict[str, Any] update_data)
Definition: __init__.py:243
dict[str, Any] _process_create_data(self, dict[str, Any] data)
Definition: __init__.py:232
dict[str, Any] _async_migrate_func(self, int old_major_version, int old_minor_version, dict[str, Any] old_data)
Definition: __init__.py:123
None __init__(self, ConfigType config)
Definition: __init__.py:261
None async_update_config(self, ConfigType config)
Definition: __init__.py:328
Self from_yaml(cls, ConfigType config)
Definition: __init__.py:277
None async_set_options(self, list[str] options)
Definition: __init__.py:310
Self from_storage(cls, ConfigType config)
Definition: __init__.py:270
list[str] _remove_duplicates(list[str] options, str|None name)
Definition: __init__.py:68
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:135
dict[str, Any] _cv_input_select(dict[str, Any] cfg)
Definition: __init__.py:85
None async_register_admin_service(HomeAssistant hass, str domain, str service, Callable[[ServiceCall], Awaitable[None]|None] service_func, VolSchemaType schema=vol.Schema({}, extra=vol.PREVENT_EXTRA))
Definition: service.py:1121