Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for the Bring! integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from bring_api import (
9  Bring,
10  BringAuthException,
11  BringParseException,
12  BringRequestException,
13 )
14 from bring_api.types import BringItemsResponse, BringList, BringUserSettingsResponse
15 
16 from homeassistant.config_entries import ConfigEntry
17 from homeassistant.const import CONF_EMAIL
18 from homeassistant.core import HomeAssistant
19 from homeassistant.exceptions import ConfigEntryAuthFailed
20 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
21 
22 from .const import DOMAIN
23 
24 _LOGGER = logging.getLogger(__name__)
25 
26 
27 class BringData(BringList, BringItemsResponse):
28  """Coordinator data class."""
29 
30 
32  """A Bring Data Update Coordinator."""
33 
34  config_entry: ConfigEntry
35  user_settings: BringUserSettingsResponse
36 
37  def __init__(self, hass: HomeAssistant, bring: Bring) -> None:
38  """Initialize the Bring data coordinator."""
39  super().__init__(
40  hass,
41  _LOGGER,
42  name=DOMAIN,
43  update_interval=timedelta(seconds=90),
44  )
45  self.bringbring = bring
46 
47  async def _async_update_data(self) -> dict[str, BringData]:
48  try:
49  lists_response = await self.bringbring.load_lists()
50  except BringRequestException as e:
51  raise UpdateFailed("Unable to connect and retrieve data from bring") from e
52  except BringParseException as e:
53  raise UpdateFailed("Unable to parse response from bring") from e
54  except BringAuthException as e:
55  # try to recover by refreshing access token, otherwise
56  # initiate reauth flow
57  try:
58  await self.bringbring.retrieve_new_access_token()
59  except (BringRequestException, BringParseException) as exc:
60  raise UpdateFailed("Refreshing authentication token failed") from exc
61  except BringAuthException as exc:
63  translation_domain=DOMAIN,
64  translation_key="setup_authentication_exception",
65  translation_placeholders={CONF_EMAIL: self.bringbring.mail},
66  ) from exc
67  raise UpdateFailed(
68  "Authentication failed but re-authentication was successful, trying again later"
69  ) from e
70 
71  list_dict: dict[str, BringData] = {}
72  for lst in lists_response["lists"]:
73  try:
74  items = await self.bringbring.get_list(lst["listUuid"])
75  except BringRequestException as e:
76  raise UpdateFailed(
77  "Unable to connect and retrieve data from bring"
78  ) from e
79  except BringParseException as e:
80  raise UpdateFailed("Unable to parse response from bring") from e
81  else:
82  list_dict[lst["listUuid"]] = BringData(**lst, **items)
83 
84  return list_dict
85 
86  async def _async_setup(self) -> None:
87  """Set up coordinator."""
88 
89  await self.async_refresh_user_settingsasync_refresh_user_settings()
90 
91  async def async_refresh_user_settings(self) -> None:
92  """Refresh user settings."""
93  try:
94  self.user_settingsuser_settings = await self.bringbring.get_all_user_settings()
95  except (BringAuthException, BringRequestException, BringParseException) as e:
96  raise UpdateFailed(
97  "Unable to connect and retrieve user settings from bring"
98  ) from e
None __init__(self, HomeAssistant hass, Bring bring)
Definition: coordinator.py:37