Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support to manage a shopping list."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from http import HTTPStatus
7 import logging
8 from typing import Any, cast
9 import uuid
10 
11 from aiohttp import web
12 import voluptuous as vol
13 
14 from homeassistant import config_entries
15 from homeassistant.components import http, websocket_api
16 from homeassistant.components.http.data_validator import RequestDataValidator
17 from homeassistant.config_entries import ConfigEntry
18 from homeassistant.const import ATTR_NAME, Platform
19 from homeassistant.core import Context, HomeAssistant, ServiceCall, callback
21 from homeassistant.helpers.json import save_json
22 from homeassistant.helpers.typing import ConfigType
23 from homeassistant.util.json import JsonValueType, load_json_array
24 
25 from .const import (
26  ATTR_REVERSE,
27  DEFAULT_REVERSE,
28  DOMAIN,
29  EVENT_SHOPPING_LIST_UPDATED,
30  SERVICE_ADD_ITEM,
31  SERVICE_CLEAR_COMPLETED_ITEMS,
32  SERVICE_COMPLETE_ALL,
33  SERVICE_COMPLETE_ITEM,
34  SERVICE_INCOMPLETE_ALL,
35  SERVICE_INCOMPLETE_ITEM,
36  SERVICE_REMOVE_ITEM,
37  SERVICE_SORT,
38 )
39 
40 PLATFORMS = [Platform.TODO]
41 
42 ATTR_COMPLETE = "complete"
43 
44 _LOGGER = logging.getLogger(__name__)
45 CONFIG_SCHEMA = vol.Schema({DOMAIN: {}}, extra=vol.ALLOW_EXTRA)
46 ITEM_UPDATE_SCHEMA = vol.Schema({ATTR_COMPLETE: bool, ATTR_NAME: str})
47 PERSISTENCE = ".shopping_list.json"
48 
49 SERVICE_ITEM_SCHEMA = vol.Schema({vol.Required(ATTR_NAME): cv.string})
50 SERVICE_LIST_SCHEMA = vol.Schema({})
51 SERVICE_SORT_SCHEMA = vol.Schema(
52  {vol.Optional(ATTR_REVERSE, default=DEFAULT_REVERSE): bool}
53 )
54 
55 
56 async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
57  """Initialize the shopping list."""
58 
59  if DOMAIN not in config:
60  return True
61 
62  hass.async_create_task(
63  hass.config_entries.flow.async_init(
64  DOMAIN, context={"source": config_entries.SOURCE_IMPORT}
65  )
66  )
67 
68  return True
69 
70 
71 async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
72  """Set up shopping list from config flow."""
73 
74  async def add_item_service(call: ServiceCall) -> None:
75  """Add an item with `name`."""
76  data = hass.data[DOMAIN]
77  await data.async_add(call.data[ATTR_NAME])
78 
79  async def remove_item_service(call: ServiceCall) -> None:
80  """Remove the first item with matching `name`."""
81  data = hass.data[DOMAIN]
82  name = call.data[ATTR_NAME]
83 
84  try:
85  item = [item for item in data.items if item["name"] == name][0]
86  except IndexError:
87  _LOGGER.error("Removing of item failed: %s cannot be found", name)
88  else:
89  await data.async_remove(item["id"])
90 
91  async def complete_item_service(call: ServiceCall) -> None:
92  """Mark the first item with matching `name` as completed."""
93  data = hass.data[DOMAIN]
94  name = call.data[ATTR_NAME]
95 
96  try:
97  item = [item for item in data.items if item["name"] == name][0]
98  except IndexError:
99  _LOGGER.error("Updating of item failed: %s cannot be found", name)
100  else:
101  await data.async_update(item["id"], {"name": name, "complete": True})
102 
103  async def incomplete_item_service(call: ServiceCall) -> None:
104  """Mark the first item with matching `name` as incomplete."""
105  data = hass.data[DOMAIN]
106  name = call.data[ATTR_NAME]
107 
108  try:
109  item = [item for item in data.items if item["name"] == name][0]
110  except IndexError:
111  _LOGGER.error("Restoring of item failed: %s cannot be found", name)
112  else:
113  await data.async_update(item["id"], {"name": name, "complete": False})
114 
115  async def complete_all_service(call: ServiceCall) -> None:
116  """Mark all items in the list as complete."""
117  await data.async_update_list({"complete": True})
118 
119  async def incomplete_all_service(call: ServiceCall) -> None:
120  """Mark all items in the list as incomplete."""
121  await data.async_update_list({"complete": False})
122 
123  async def clear_completed_items_service(call: ServiceCall) -> None:
124  """Clear all completed items from the list."""
125  await data.async_clear_completed()
126 
127  async def sort_list_service(call: ServiceCall) -> None:
128  """Sort all items by name."""
129  await data.async_sort(call.data[ATTR_REVERSE])
130 
131  data = hass.data[DOMAIN] = ShoppingData(hass)
132  await data.async_load()
133 
134  hass.services.async_register(
135  DOMAIN, SERVICE_ADD_ITEM, add_item_service, schema=SERVICE_ITEM_SCHEMA
136  )
137  hass.services.async_register(
138  DOMAIN, SERVICE_REMOVE_ITEM, remove_item_service, schema=SERVICE_ITEM_SCHEMA
139  )
140  hass.services.async_register(
141  DOMAIN, SERVICE_COMPLETE_ITEM, complete_item_service, schema=SERVICE_ITEM_SCHEMA
142  )
143  hass.services.async_register(
144  DOMAIN,
145  SERVICE_INCOMPLETE_ITEM,
146  incomplete_item_service,
147  schema=SERVICE_ITEM_SCHEMA,
148  )
149  hass.services.async_register(
150  DOMAIN,
151  SERVICE_COMPLETE_ALL,
152  complete_all_service,
153  schema=SERVICE_LIST_SCHEMA,
154  )
155  hass.services.async_register(
156  DOMAIN,
157  SERVICE_INCOMPLETE_ALL,
158  incomplete_all_service,
159  schema=SERVICE_LIST_SCHEMA,
160  )
161  hass.services.async_register(
162  DOMAIN,
163  SERVICE_CLEAR_COMPLETED_ITEMS,
164  clear_completed_items_service,
165  schema=SERVICE_LIST_SCHEMA,
166  )
167  hass.services.async_register(
168  DOMAIN,
169  SERVICE_SORT,
170  sort_list_service,
171  schema=SERVICE_SORT_SCHEMA,
172  )
173 
174  hass.http.register_view(ShoppingListView)
175  hass.http.register_view(CreateShoppingListItemView)
176  hass.http.register_view(UpdateShoppingListItemView)
177  hass.http.register_view(ClearCompletedItemsView)
178 
179  websocket_api.async_register_command(hass, websocket_handle_items)
180  websocket_api.async_register_command(hass, websocket_handle_add)
181  websocket_api.async_register_command(hass, websocket_handle_remove)
182  websocket_api.async_register_command(hass, websocket_handle_update)
183  websocket_api.async_register_command(hass, websocket_handle_clear)
184  websocket_api.async_register_command(hass, websocket_handle_reorder)
185 
186  await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
187 
188  return True
189 
190 
191 class NoMatchingShoppingListItem(Exception):
192  """No matching item could be found in the shopping list."""
193 
194 
196  """Class to hold shopping list data."""
197 
198  def __init__(self, hass: HomeAssistant) -> None:
199  """Initialize the shopping list."""
200  self.hasshass = hass
201  self.itemsitems: list[dict[str, JsonValueType]] = []
202  self._listeners: list[Callable[[], None]] = []
203 
204  async def async_add(
205  self, name: str | None, complete: bool = False, context: Context | None = None
206  ) -> dict[str, JsonValueType]:
207  """Add a shopping list item."""
208  item: dict[str, JsonValueType] = {
209  "name": name,
210  "id": uuid.uuid4().hex,
211  "complete": complete,
212  }
213  self.itemsitems.append(item)
214  await self.hasshass.async_add_executor_job(self.savesave)
215  self._async_notify_async_notify()
216  self.hasshass.bus.async_fire(
217  EVENT_SHOPPING_LIST_UPDATED,
218  {"action": "add", "item": item},
219  context=context,
220  )
221  return item
222 
223  async def async_remove(
224  self, item_id: str, context: Context | None = None
225  ) -> dict[str, JsonValueType] | None:
226  """Remove a shopping list item."""
227  removed = await self.async_remove_itemsasync_remove_items(
228  item_ids=set({item_id}), context=context
229  )
230  return next(iter(removed), None)
231 
233  self, item_ids: set[str], context: Context | None = None
234  ) -> list[dict[str, JsonValueType]]:
235  """Remove a shopping list item."""
236  items_dict: dict[str, dict[str, JsonValueType]] = {}
237  for itm in self.itemsitems:
238  item_id = cast(str, itm["id"])
239  items_dict[item_id] = itm
240  removed = []
241  for item_id in item_ids:
242  _LOGGER.debug(
243  "Removing %s",
244  )
245  if not (item := items_dict.pop(item_id, None)):
247  "Item '{item_id}' not found in shopping list"
248  )
249  removed.append(item)
250  self.itemsitems = list(items_dict.values())
251  await self.hasshass.async_add_executor_job(self.savesave)
252  self._async_notify_async_notify()
253  for item in removed:
254  self.hasshass.bus.async_fire(
255  EVENT_SHOPPING_LIST_UPDATED,
256  {"action": "remove", "item": item},
257  context=context,
258  )
259  return removed
260 
261  async def async_update(
262  self, item_id: str | None, info: dict[str, Any], context: Context | None = None
263  ) -> dict[str, JsonValueType]:
264  """Update a shopping list item."""
265  item = next((itm for itm in self.itemsitems if itm["id"] == item_id), None)
266 
267  if item is None:
268  raise NoMatchingShoppingListItem
269 
270  info = ITEM_UPDATE_SCHEMA(info)
271  item.update(info)
272  await self.hasshass.async_add_executor_job(self.savesave)
273  self._async_notify_async_notify()
274  self.hasshass.bus.async_fire(
275  EVENT_SHOPPING_LIST_UPDATED,
276  {"action": "update", "item": item},
277  context=context,
278  )
279  return item
280 
281  async def async_clear_completed(self, context: Context | None = None) -> None:
282  """Clear completed items."""
283  self.itemsitems = [itm for itm in self.itemsitems if not itm["complete"]]
284  await self.hasshass.async_add_executor_job(self.savesave)
285  self._async_notify_async_notify()
286  self.hasshass.bus.async_fire(
287  EVENT_SHOPPING_LIST_UPDATED,
288  {"action": "clear"},
289  context=context,
290  )
291 
292  async def async_update_list(
293  self, info: dict[str, JsonValueType], context: Context | None = None
294  ) -> list[dict[str, JsonValueType]]:
295  """Update all items in the list."""
296  for item in self.itemsitems:
297  item.update(info)
298  await self.hasshass.async_add_executor_job(self.savesave)
299  self._async_notify_async_notify()
300  self.hasshass.bus.async_fire(
301  EVENT_SHOPPING_LIST_UPDATED,
302  {"action": "update_list"},
303  context=context,
304  )
305  return self.itemsitems
306 
307  @callback
309  self, item_ids: list[str], context: Context | None = None
310  ) -> None:
311  """Reorder items."""
312  # The array for sorted items.
313  new_items = []
314  all_items_mapping = {item["id"]: item for item in self.itemsitems}
315  # Append items by the order of passed in array.
316  for item_id in item_ids:
317  if item_id not in all_items_mapping:
318  raise NoMatchingShoppingListItem
319  new_items.append(all_items_mapping[item_id])
320  # Remove the item from mapping after it's appended in the result array.
321  del all_items_mapping[item_id]
322  # Append the rest of the items
323  for value in all_items_mapping.values():
324  # All the unchecked items must be passed in the item_ids array,
325  # so all items left in the mapping should be checked items.
326  if value["complete"] is False:
327  raise vol.Invalid(
328  "The item ids array doesn't contain all the unchecked shopping list"
329  " items."
330  )
331  new_items.append(value)
332  self.itemsitems = new_items
333  self.hasshass.async_add_executor_job(self.savesave)
334  self._async_notify_async_notify()
335  self.hasshass.bus.async_fire(
336  EVENT_SHOPPING_LIST_UPDATED,
337  {"action": "reorder"},
338  context=context,
339  )
340 
341  async def async_move_item(self, uid: str, previous: str | None = None) -> None:
342  """Re-order a shopping list item."""
343  if uid == previous:
344  return
345  item_idx = {cast(str, itm["id"]): idx for idx, itm in enumerate(self.itemsitems)}
346  if uid not in item_idx:
347  raise NoMatchingShoppingListItem(f"Item '{uid}' not found in shopping list")
348  if previous and previous not in item_idx:
350  f"Item '{previous}' not found in shopping list"
351  )
352  dst_idx = item_idx[previous] + 1 if previous else 0
353  src_idx = item_idx[uid]
354  src_item = self.itemsitems.pop(src_idx)
355  if dst_idx > src_idx:
356  dst_idx -= 1
357  self.itemsitems.insert(dst_idx, src_item)
358  await self.hasshass.async_add_executor_job(self.savesave)
359  self._async_notify_async_notify()
360  self.hasshass.bus.async_fire(
361  EVENT_SHOPPING_LIST_UPDATED,
362  {"action": "reorder"},
363  )
364 
365  async def async_sort(
366  self, reverse: bool = False, context: Context | None = None
367  ) -> None:
368  """Sort items by name."""
369  self.itemsitems = sorted(self.itemsitems, key=lambda item: item["name"], reverse=reverse) # type: ignore[arg-type,return-value]
370  self.hasshass.async_add_executor_job(self.savesave)
371  self._async_notify_async_notify()
372  self.hasshass.bus.async_fire(
373  EVENT_SHOPPING_LIST_UPDATED,
374  {"action": "sorted"},
375  context=context,
376  )
377 
378  async def async_load(self) -> None:
379  """Load items."""
380 
381  def load() -> list[dict[str, JsonValueType]]:
382  """Load the items synchronously."""
383  return cast(
384  list[dict[str, JsonValueType]],
385  load_json_array(self.hasshass.config.path(PERSISTENCE)),
386  )
387 
388  self.itemsitems = await self.hasshass.async_add_executor_job(load)
389 
390  def save(self) -> None:
391  """Save the items."""
392  save_json(self.hasshass.config.path(PERSISTENCE), self.itemsitems)
393 
394  def async_add_listener(self, cb: Callable[[], None]) -> Callable[[], None]:
395  """Add a listener to notify when data is updated."""
396 
397  def unsub() -> None:
398  self._listeners.remove(cb)
399 
400  self._listeners.append(cb)
401  return unsub
402 
403  def _async_notify(self) -> None:
404  """Notify all listeners that data has been updated."""
405  for listener in self._listeners:
406  listener()
407 
408 
409 class ShoppingListView(http.HomeAssistantView):
410  """View to retrieve shopping list content."""
411 
412  url = "/api/shopping_list"
413  name = "api:shopping_list"
414 
415  @callback
416  def get(self, request: web.Request) -> web.Response:
417  """Retrieve shopping list items."""
418  return self.json(request.app[http.KEY_HASS].data[DOMAIN].items)
419 
420 
421 class UpdateShoppingListItemView(http.HomeAssistantView):
422  """View to retrieve shopping list content."""
423 
424  url = "/api/shopping_list/item/{item_id}"
425  name = "api:shopping_list:item:id"
426 
427  async def post(self, request: web.Request, item_id: str) -> web.Response:
428  """Update a shopping list item."""
429  data = await request.json()
430  hass = request.app[http.KEY_HASS]
431 
432  try:
433  item = await hass.data[DOMAIN].async_update(item_id, data)
434  return self.json(item)
435  except NoMatchingShoppingListItem:
436  return self.json_message("Item not found", HTTPStatus.NOT_FOUND)
437  except vol.Invalid:
438  return self.json_message("Item not found", HTTPStatus.BAD_REQUEST)
439 
440 
441 class CreateShoppingListItemView(http.HomeAssistantView):
442  """View to retrieve shopping list content."""
443 
444  url = "/api/shopping_list/item"
445  name = "api:shopping_list:item"
446 
447  @RequestDataValidator(vol.Schema({vol.Required("name"): str}))
448  async def post(self, request: web.Request, data: dict[str, str]) -> web.Response:
449  """Create a new shopping list item."""
450  hass = request.app[http.KEY_HASS]
451  item = await hass.data[DOMAIN].async_add(data["name"])
452  return self.json(item)
453 
454 
455 class ClearCompletedItemsView(http.HomeAssistantView):
456  """View to retrieve shopping list content."""
457 
458  url = "/api/shopping_list/clear_completed"
459  name = "api:shopping_list:clear_completed"
460 
461  async def post(self, request: web.Request) -> web.Response:
462  """Retrieve if API is running."""
463  hass = request.app[http.KEY_HASS]
464  await hass.data[DOMAIN].async_clear_completed()
465  return self.json_message("Cleared completed items.")
466 
467 
468 @callback
469 @websocket_api.websocket_command({vol.Required("type"): "shopping_list/items"})
471  hass: HomeAssistant,
472  connection: websocket_api.ActiveConnection,
473  msg: dict[str, Any],
474 ) -> None:
475  """Handle getting shopping_list items."""
476  connection.send_message(
477  websocket_api.result_message(msg["id"], hass.data[DOMAIN].items)
478  )
479 
480 
481 @websocket_api.websocket_command( {vol.Required("type"): "shopping_list/items/add", vol.Required("name"): str}
482 )
483 @websocket_api.async_response
484 async def websocket_handle_add(
485  hass: HomeAssistant,
486  connection: websocket_api.ActiveConnection,
487  msg: dict[str, Any],
488 ) -> None:
489  """Handle adding item to shopping_list."""
490  item = await hass.data[DOMAIN].async_add(
491  msg["name"], context=connection.context(msg)
492  )
493  connection.send_message(websocket_api.result_message(msg["id"], item))
494 
495 
496 @websocket_api.websocket_command( {vol.Required("type"): "shopping_list/items/remove", vol.Required("item_id"): str}
497 )
498 @websocket_api.async_response
499 async def websocket_handle_remove(
500  hass: HomeAssistant,
502  msg: dict[str, Any],
503 ) -> None:
504  """Handle removing shopping_list item."""
505  msg_id = msg.pop("id")
506  item_id = msg.pop("item_id")
507  msg.pop("type")
508 
509  try:
510  item = await hass.data[DOMAIN].async_remove(item_id, connection.context(msg))
511  except NoMatchingShoppingListItem:
512  connection.send_message(
513  websocket_api.error_message(msg_id, "item_not_found", "Item not found")
514  )
515  return
516 
517  connection.send_message(websocket_api.result_message(msg_id, item))
518 
519 
520 @websocket_api.websocket_command( { vol.Required("type"): "shopping_list/items/update",
521  vol.Required("item_id"): str,
522  vol.Optional("name"): str,
523  vol.Optional("complete"): bool,
524  }
525 )
526 @websocket_api.async_response
527 async def websocket_handle_update(
528  hass: HomeAssistant,
529  connection: websocket_api.ActiveConnection,
530  msg: dict[str, Any],
531 ) -> None:
532  """Handle updating shopping_list item."""
533  msg_id = msg.pop("id")
534  item_id = msg.pop("item_id")
535  msg.pop("type")
536  data = msg
537 
538  try:
539  item = await hass.data[DOMAIN].async_update(
540  item_id, data, connection.context(msg)
541  )
542  except NoMatchingShoppingListItem:
543  connection.send_message(
544  websocket_api.error_message(msg_id, "item_not_found", "Item not found")
545  )
546  return
547 
548  connection.send_message(websocket_api.result_message(msg_id, item))
549 
550 
551 @websocket_api.websocket_command({vol.Required("type"): "shopping_list/items/clear"})
552 @websocket_api.async_response
553 async def websocket_handle_clear(
554  hass: HomeAssistant,
555  connection: websocket_api.ActiveConnection,
556  msg: dict[str, Any],
557 ) -> None:
558  """Handle clearing shopping_list items."""
559  await hass.data[DOMAIN].async_clear_completed(connection.context(msg))
560  connection.send_message(websocket_api.result_message(msg["id"]))
561 
562 
563 @websocket_api.websocket_command( { vol.Required("type"): "shopping_list/items/reorder",
564  vol.Required("item_ids"): [str],
565  }
566 )
568  hass: HomeAssistant,
569  connection: websocket_api.ActiveConnection,
570  msg: dict[str, Any],
571 ) -> None:
572  """Handle reordering shopping_list items."""
573  msg_id = msg.pop("id")
574  try:
575  hass.data[DOMAIN].async_reorder(msg.pop("item_ids"), connection.context(msg))
576  except NoMatchingShoppingListItem:
577  connection.send_error(
578  msg_id,
579  websocket_api.ERR_NOT_FOUND,
580  "One or more item id(s) not found.",
581  )
582  return
583  except vol.Invalid as err:
584  connection.send_error(msg_id, websocket_api.ERR_INVALID_FORMAT, f"{err}")
585  return
586 
587  connection.send_result(msg_id)
588 
web.Response post(self, web.Request request)
Definition: __init__.py:461
web.Response post(self, web.Request request, dict[str, str] data)
Definition: __init__.py:448
dict[str, JsonValueType] async_update(self, str|None item_id, dict[str, Any] info, Context|None context=None)
Definition: __init__.py:263
Callable[[], None] async_add_listener(self, Callable[[], None] cb)
Definition: __init__.py:394
None async_reorder(self, list[str] item_ids, Context|None context=None)
Definition: __init__.py:310
None async_sort(self, bool reverse=False, Context|None context=None)
Definition: __init__.py:367
dict[str, JsonValueType] async_add(self, str|None name, bool complete=False, Context|None context=None)
Definition: __init__.py:206
list[dict[str, JsonValueType]] async_remove_items(self, set[str] item_ids, Context|None context=None)
Definition: __init__.py:234
dict[str, JsonValueType]|None async_remove(self, str item_id, Context|None context=None)
Definition: __init__.py:225
list[dict[str, JsonValueType]] async_update_list(self, dict[str, JsonValueType] info, Context|None context=None)
Definition: __init__.py:294
None async_clear_completed(self, Context|None context=None)
Definition: __init__.py:281
None __init__(self, HomeAssistant hass)
Definition: __init__.py:198
None async_move_item(self, str uid, str|None previous=None)
Definition: __init__.py:341
web.Response get(self, web.Request request)
Definition: __init__.py:416
web.Response post(self, web.Request request, str item_id)
Definition: __init__.py:427
bool remove(self, _T matcher)
Definition: match.py:214
None websocket_handle_add(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:489
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:56
None websocket_handle_items(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:474
None websocket_handle_reorder(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:577
None websocket_handle_clear(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:561
bool async_setup_entry(HomeAssistant hass, ConfigEntry config_entry)
Definition: __init__.py:71
None websocket_handle_update(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:535
None websocket_handle_remove(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:505
None async_remove(HomeAssistant hass, str intent_type)
Definition: intent.py:90
None save_json(str filename, list|dict data, bool private=False, *type[json.JSONEncoder]|None encoder=None, bool atomic_writes=False)
Definition: json.py:202
JsonArrayType load_json_array(str|PathLike[str] filename, JsonArrayType default=_SENTINEL)
Definition: json.py:89