1 """Support to manage a shopping list."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from http
import HTTPStatus
8 from typing
import Any, cast
11 from aiohttp
import web
12 import voluptuous
as vol
14 from homeassistant
import config_entries
29 EVENT_SHOPPING_LIST_UPDATED,
31 SERVICE_CLEAR_COMPLETED_ITEMS,
33 SERVICE_COMPLETE_ITEM,
34 SERVICE_INCOMPLETE_ALL,
35 SERVICE_INCOMPLETE_ITEM,
# Entity platforms the config entry setup is forwarded to.
PLATFORMS = [Platform.TODO]

# Key of an item's completion flag in update payloads.
ATTR_COMPLETE = "complete"

_LOGGER = logging.getLogger(__name__)
# Top-level configuration: the DOMAIN section takes no options; unrelated
# keys are tolerated (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema({DOMAIN: {}}, extra=vol.ALLOW_EXTRA)
# Fields that may be changed on an existing item.
# NOTE(review): presumably applied when an item is updated - confirm at the call site.
ITEM_UPDATE_SCHEMA = vol.Schema({ATTR_COMPLETE: bool, ATTR_NAME: str})
# File name used to persist the list.
# NOTE(review): presumably resolved against the config directory - confirm.
PERSISTENCE = ".shopping_list.json"

# Schema for services that target a single item by name.
SERVICE_ITEM_SCHEMA = vol.Schema({vol.Required(ATTR_NAME): cv.string})
# Schema for services that act on the whole list and take no arguments.
SERVICE_LIST_SCHEMA = vol.Schema({})
51 SERVICE_SORT_SCHEMA = vol.Schema(
52 {vol.Optional(ATTR_REVERSE, default=DEFAULT_REVERSE): bool}
56 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
57 """Initialize the shopping list."""
59 if DOMAIN
not in config:
62 hass.async_create_task(
63 hass.config_entries.flow.async_init(
64 DOMAIN, context={
"source": config_entries.SOURCE_IMPORT}
72 """Set up shopping list from config flow."""
async def add_item_service(call: ServiceCall) -> None:
    """Append a new item to the shopping list.

    The item name is taken from the ATTR_NAME field of the service call data.
    """
    shopping_data = hass.data[DOMAIN]
    item_name = call.data[ATTR_NAME]
    await shopping_data.async_add(item_name)
79 async
def remove_item_service(call: ServiceCall) ->
None:
80 """Remove the first item with matching `name`."""
81 data = hass.data[DOMAIN]
82 name = call.data[ATTR_NAME]
85 item = [item
for item
in data.items
if item[
"name"] == name][0]
87 _LOGGER.error(
"Removing of item failed: %s cannot be found", name)
89 await data.async_remove(item[
"id"])
91 async
def complete_item_service(call: ServiceCall) ->
None:
92 """Mark the first item with matching `name` as completed."""
93 data = hass.data[DOMAIN]
94 name = call.data[ATTR_NAME]
97 item = [item
for item
in data.items
if item[
"name"] == name][0]
99 _LOGGER.error(
"Updating of item failed: %s cannot be found", name)
101 await data.async_update(item[
"id"], {
"name": name,
"complete":
True})
103 async
def incomplete_item_service(call: ServiceCall) ->
None:
104 """Mark the first item with matching `name` as incomplete."""
105 data = hass.data[DOMAIN]
106 name = call.data[ATTR_NAME]
109 item = [item
for item
in data.items
if item[
"name"] == name][0]
111 _LOGGER.error(
"Restoring of item failed: %s cannot be found", name)
113 await data.async_update(item[
"id"], {
"name": name,
"complete":
False})
async def complete_all_service(call: ServiceCall) -> None:
    """Flag every item in the list as completed."""
    completed_state = {"complete": True}
    await data.async_update_list(completed_state)
async def incomplete_all_service(call: ServiceCall) -> None:
    """Flag every item in the list as not completed."""
    pending_state = {"complete": False}
    await data.async_update_list(pending_state)
async def clear_completed_items_service(call: ServiceCall) -> None:
    """Drop every completed item from the list; the call carries no arguments."""
    clear_job = data.async_clear_completed()
    await clear_job
async def sort_list_service(call: ServiceCall) -> None:
    """Sort the shopping list by item name.

    The ATTR_REVERSE value of the service call is passed through as the
    sort-direction flag.
    """
    descending = call.data[ATTR_REVERSE]
    await data.async_sort(descending)
132 await data.async_load()
134 hass.services.async_register(
135 DOMAIN, SERVICE_ADD_ITEM, add_item_service, schema=SERVICE_ITEM_SCHEMA
137 hass.services.async_register(
138 DOMAIN, SERVICE_REMOVE_ITEM, remove_item_service, schema=SERVICE_ITEM_SCHEMA
140 hass.services.async_register(
141 DOMAIN, SERVICE_COMPLETE_ITEM, complete_item_service, schema=SERVICE_ITEM_SCHEMA
143 hass.services.async_register(
145 SERVICE_INCOMPLETE_ITEM,
146 incomplete_item_service,
147 schema=SERVICE_ITEM_SCHEMA,
149 hass.services.async_register(
151 SERVICE_COMPLETE_ALL,
152 complete_all_service,
153 schema=SERVICE_LIST_SCHEMA,
155 hass.services.async_register(
157 SERVICE_INCOMPLETE_ALL,
158 incomplete_all_service,
159 schema=SERVICE_LIST_SCHEMA,
161 hass.services.async_register(
163 SERVICE_CLEAR_COMPLETED_ITEMS,
164 clear_completed_items_service,
165 schema=SERVICE_LIST_SCHEMA,
167 hass.services.async_register(
171 schema=SERVICE_SORT_SCHEMA,
174 hass.http.register_view(ShoppingListView)
175 hass.http.register_view(CreateShoppingListItemView)
176 hass.http.register_view(UpdateShoppingListItemView)
177 hass.http.register_view(ClearCompletedItemsView)
179 websocket_api.async_register_command(hass, websocket_handle_items)
180 websocket_api.async_register_command(hass, websocket_handle_add)
181 websocket_api.async_register_command(hass, websocket_handle_remove)
182 websocket_api.async_register_command(hass, websocket_handle_update)
183 websocket_api.async_register_command(hass, websocket_handle_clear)
184 websocket_api.async_register_command(hass, websocket_handle_reorder)
186 await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
192 """No matching item could be found in the shopping list."""
196 """Class to hold shopping list data."""
199 """Initialize the shopping list."""
201 self.
itemsitems: list[dict[str, JsonValueType]] = []
202 self._listeners: list[Callable[[],
None]] = []
205 self, name: str |
None, complete: bool =
False, context: Context |
None =
None
206 ) -> dict[str, JsonValueType]:
207 """Add a shopping list item."""
208 item: dict[str, JsonValueType] = {
210 "id": uuid.uuid4().hex,
211 "complete": complete,
213 self.
itemsitems.append(item)
214 await self.
hasshass.async_add_executor_job(self.
savesave)
216 self.
hasshass.bus.async_fire(
217 EVENT_SHOPPING_LIST_UPDATED,
218 {
"action":
"add",
"item": item},
224 self, item_id: str, context: Context |
None =
None
225 ) -> dict[str, JsonValueType] |
None:
226 """Remove a shopping list item."""
228 item_ids=set({item_id}), context=context
230 return next(iter(removed),
None)
233 self, item_ids: set[str], context: Context |
None =
None
234 ) -> list[dict[str, JsonValueType]]:
"""Remove the shopping list items with the given ids."""
236 items_dict: dict[str, dict[str, JsonValueType]] = {}
237 for itm
in self.
itemsitems:
238 item_id = cast(str, itm[
"id"])
239 items_dict[item_id] = itm
241 for item_id
in item_ids:
245 if not (item := items_dict.pop(item_id,
None)):
247 "Item '{item_id}' not found in shopping list"
251 await self.
hasshass.async_add_executor_job(self.
savesave)
254 self.
hasshass.bus.async_fire(
255 EVENT_SHOPPING_LIST_UPDATED,
256 {
"action":
"remove",
"item": item},
262 self, item_id: str |
None, info: dict[str, Any], context: Context |
None =
None
263 ) -> dict[str, JsonValueType]:
264 """Update a shopping list item."""
265 item = next((itm
for itm
in self.
itemsitems
if itm[
"id"] == item_id),
None)
268 raise NoMatchingShoppingListItem
272 await self.
hasshass.async_add_executor_job(self.
savesave)
274 self.
hasshass.bus.async_fire(
275 EVENT_SHOPPING_LIST_UPDATED,
276 {
"action":
"update",
"item": item},
282 """Clear completed items."""
283 self.
itemsitems = [itm
for itm
in self.
itemsitems
if not itm[
"complete"]]
284 await self.
hasshass.async_add_executor_job(self.
savesave)
286 self.
hasshass.bus.async_fire(
287 EVENT_SHOPPING_LIST_UPDATED,
293 self, info: dict[str, JsonValueType], context: Context |
None =
None
294 ) -> list[dict[str, JsonValueType]]:
295 """Update all items in the list."""
296 for item
in self.
itemsitems:
298 await self.
hasshass.async_add_executor_job(self.
savesave)
300 self.
hasshass.bus.async_fire(
301 EVENT_SHOPPING_LIST_UPDATED,
302 {
"action":
"update_list"},
305 return self.
itemsitems
309 self, item_ids: list[str], context: Context |
None =
None
314 all_items_mapping = {item[
"id"]: item
for item
in self.
itemsitems}
316 for item_id
in item_ids:
317 if item_id
not in all_items_mapping:
318 raise NoMatchingShoppingListItem
319 new_items.append(all_items_mapping[item_id])
321 del all_items_mapping[item_id]
323 for value
in all_items_mapping.values():
326 if value[
"complete"]
is False:
328 "The item ids array doesn't contain all the unchecked shopping list"
331 new_items.append(value)
332 self.
itemsitems = new_items
333 self.
hasshass.async_add_executor_job(self.
savesave)
335 self.
hasshass.bus.async_fire(
336 EVENT_SHOPPING_LIST_UPDATED,
337 {
"action":
"reorder"},
342 """Re-order a shopping list item."""
345 item_idx = {cast(str, itm[
"id"]): idx
for idx, itm
in enumerate(self.
itemsitems)}
346 if uid
not in item_idx:
348 if previous
and previous
not in item_idx:
350 f
"Item '{previous}' not found in shopping list"
352 dst_idx = item_idx[previous] + 1
if previous
else 0
353 src_idx = item_idx[uid]
354 src_item = self.
itemsitems.pop(src_idx)
355 if dst_idx > src_idx:
357 self.
itemsitems.insert(dst_idx, src_item)
358 await self.
hasshass.async_add_executor_job(self.
savesave)
360 self.
hasshass.bus.async_fire(
361 EVENT_SHOPPING_LIST_UPDATED,
362 {
"action":
"reorder"},
366 self, reverse: bool =
False, context: Context |
None =
None
368 """Sort items by name."""
369 self.
itemsitems = sorted(self.
itemsitems, key=
lambda item: item[
"name"], reverse=reverse)
370 self.
hasshass.async_add_executor_job(self.
savesave)
372 self.
hasshass.bus.async_fire(
373 EVENT_SHOPPING_LIST_UPDATED,
374 {
"action":
"sorted"},
381 def load() -> list[dict[str, JsonValueType]]:
382 """Load the items synchronously."""
384 list[dict[str, JsonValueType]],
388 self.
itemsitems = await self.
hasshass.async_add_executor_job(load)
391 """Save the items."""
395 """Add a listener to notify when data is updated."""
398 self._listeners.
remove(cb)
400 self._listeners.append(cb)
404 """Notify all listeners that data has been updated."""
405 for listener
in self._listeners:
410 """View to retrieve shopping list content."""
412 url =
"/api/shopping_list"
413 name =
"api:shopping_list"
def get(self, request: web.Request) -> web.Response:
    """Return the current shopping list items as a JSON response."""
    hass = request.app[http.KEY_HASS]
    shopping_items = hass.data[DOMAIN].items
    return self.json(shopping_items)
"""View to update a shopping list item."""
424 url =
"/api/shopping_list/item/{item_id}"
425 name =
"api:shopping_list:item:id"
427 async
def post(self, request: web.Request, item_id: str) -> web.Response:
428 """Update a shopping list item."""
429 data = await request.json()
430 hass = request.app[http.KEY_HASS]
433 item = await hass.data[DOMAIN].
async_update(item_id, data)
434 return self.json(item)
435 except NoMatchingShoppingListItem:
436 return self.json_message(
"Item not found", HTTPStatus.NOT_FOUND)
438 return self.json_message(
"Item not found", HTTPStatus.BAD_REQUEST)
"""View to create a shopping list item."""
444 url =
"/api/shopping_list/item"
445 name =
"api:shopping_list:item"
@RequestDataValidator(vol.Schema({vol.Required("name"): str}))
async def post(self, request: web.Request, data: dict[str, str]) -> web.Response:
    """Add an item named by the validated request body and return it as JSON."""
    shopping_data = request.app[http.KEY_HASS].data[DOMAIN]
    created_item = await shopping_data.async_add(data["name"])
    return self.json(created_item)
"""View to clear completed shopping list items."""
458 url =
"/api/shopping_list/clear_completed"
459 name =
"api:shopping_list:clear_completed"
async def post(self, request: web.Request) -> web.Response:
    """Clear all completed items and confirm with a JSON message."""
    shopping_data = request.app[http.KEY_HASS].data[DOMAIN]
    await shopping_data.async_clear_completed()
    return self.json_message("Cleared completed items.")
469 @websocket_api.websocket_command({vol.Required("type"):
"shopping_list/items"})
475 """Handle getting shopping_list items."""
476 connection.send_message(
477 websocket_api.result_message(msg[
"id"], hass.data[DOMAIN].items)
481 @websocket_api.websocket_command(
{vol.Required("type"):
"shopping_list/items/add", vol.Required(
"name"): str}
483 @websocket_api.async_response
489 """Handle adding item to shopping_list."""
490 item = await hass.data[DOMAIN].async_add(
491 msg[
"name"], context=connection.context(msg)
493 connection.send_message(websocket_api.result_message(msg[
"id"], item))
496 @websocket_api.websocket_command(
{vol.Required("type"):
"shopping_list/items/remove", vol.Required(
"item_id"): str}
498 @websocket_api.async_response
504 """Handle removing shopping_list item."""
505 msg_id = msg.pop(
"id")
506 item_id = msg.pop(
"item_id")
510 item = await hass.data[DOMAIN].
async_remove(item_id, connection.context(msg))
511 except NoMatchingShoppingListItem:
512 connection.send_message(
513 websocket_api.error_message(msg_id,
"item_not_found",
"Item not found")
517 connection.send_message(websocket_api.result_message(msg_id, item))
520 @websocket_api.websocket_command(
{
vol.Required("type"):
"shopping_list/items/update",
521 vol.Required(
"item_id"): str,
522 vol.Optional(
"name"): str,
523 vol.Optional(
"complete"): bool,
526 @websocket_api.async_response
532 """Handle updating shopping_list item."""
533 msg_id = msg.pop(
"id")
534 item_id = msg.pop(
"item_id")
540 item_id, data, connection.context(msg)
542 except NoMatchingShoppingListItem:
543 connection.send_message(
544 websocket_api.error_message(msg_id,
"item_not_found",
"Item not found")
548 connection.send_message(websocket_api.result_message(msg_id, item))
551 @websocket_api.websocket_command({vol.Required("type"):
"shopping_list/items/clear"})
552 @websocket_api.async_response
558 """Handle clearing shopping_list items."""
559 await hass.data[DOMAIN].async_clear_completed(connection.context(msg))
560 connection.send_message(websocket_api.result_message(msg[
"id"]))
563 @websocket_api.websocket_command(
{
vol.Required("type"):
"shopping_list/items/reorder",
564 vol.Required(
"item_ids"): [str],
572 """Handle reordering shopping_list items."""
573 msg_id = msg.pop(
"id")
575 hass.data[DOMAIN].async_reorder(msg.pop(
"item_ids"), connection.context(msg))
576 except NoMatchingShoppingListItem:
577 connection.send_error(
579 websocket_api.ERR_NOT_FOUND,
580 "One or more item id(s) not found.",
583 except vol.Invalid
as err:
584 connection.send_error(msg_id, websocket_api.ERR_INVALID_FORMAT, f
"{err}")
587 connection.send_result(msg_id)
588
bool remove(self, _T matcher)
None async_remove(HomeAssistant hass, str intent_type)
None save_json(str filename, list|dict data, bool private=False, *type[json.JSONEncoder]|None encoder=None, bool atomic_writes=False)
JsonArrayType load_json_array(str|PathLike[str] filename, JsonArrayType default=_SENTINEL)