1 """The OurGroceries coordinator."""
3 from __future__
import annotations
6 from datetime
import timedelta
9 from ourgroceries
import OurGroceries
14 from .const
import DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 """Class to manage fetching OurGroceries data."""
24 def __init__(self, hass: HomeAssistant, og: OurGroceries) ->
None:
25 """Initialize global OurGroceries data updater."""
27 self.
listslists: list[dict] = []
28 self._cache: dict[str, dict] = {}
29 interval =
timedelta(seconds=SCAN_INTERVAL)
34 update_interval=interval,
37 async
def _update_list(self, list_id: str, version_id: str) ->
None:
38 old_version = self._cache.
get(list_id, {}).
get(
"list", {}).
get(
"versionId",
"")
39 if old_version == version_id:
41 self._cache[list_id] = await self.
ogog.get_list_items(list_id=list_id)
44 """Fetch data from OurGroceries."""
45 self.
listslists = (await self.
ogog.get_my_lists())[
"shoppingLists"]
47 *[self.
_update_list_update_list(sl[
"id"], sl[
"versionId"])
for sl
in self.
listslists]
None _update_list(self, str list_id, str version_id)
None __init__(self, HomeAssistant hass, OurGroceries og)
dict[str, dict] _async_update_data(self)
web.Response get(self, web.Request request, str config_key)