1 """DataUpdateCoordinator for the Bring! integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from bring_api
import (
12 BringRequestException,
14 from bring_api.types
import BringItemsResponse, BringList, BringUserSettingsResponse
22 from .const
import DOMAIN
24 _LOGGER = logging.getLogger(__name__)
28 """Coordinator data class."""
32 """A Bring Data Update Coordinator."""
34 config_entry: ConfigEntry
35 user_settings: BringUserSettingsResponse
37 def __init__(self, hass: HomeAssistant, bring: Bring) ->
None:
38 """Initialize the Bring data coordinator."""
49 lists_response = await self.
bringbring.load_lists()
50 except BringRequestException
as e:
51 raise UpdateFailed(
"Unable to connect and retrieve data from bring")
from e
52 except BringParseException
as e:
53 raise UpdateFailed(
"Unable to parse response from bring")
from e
54 except BringAuthException
as e:
58 await self.
bringbring.retrieve_new_access_token()
59 except (BringRequestException, BringParseException)
as exc:
60 raise UpdateFailed(
"Refreshing authentication token failed")
from exc
61 except BringAuthException
as exc:
63 translation_domain=DOMAIN,
64 translation_key=
"setup_authentication_exception",
65 translation_placeholders={CONF_EMAIL: self.
bringbring.mail},
68 "Authentication failed but re-authentication was successful, trying again later"
71 list_dict: dict[str, BringData] = {}
72 for lst
in lists_response[
"lists"]:
74 items = await self.
bringbring.get_list(lst[
"listUuid"])
75 except BringRequestException
as e:
77 "Unable to connect and retrieve data from bring"
79 except BringParseException
as e:
80 raise UpdateFailed(
"Unable to parse response from bring")
from e
82 list_dict[lst[
"listUuid"]] =
BringData(**lst, **items)
87 """Set up coordinator."""
92 """Refresh user settings."""
95 except (BringAuthException, BringRequestException, BringParseException)
as e:
97 "Unable to connect and retrieve user settings from bring"
None async_refresh_user_settings(self)
None __init__(self, HomeAssistant hass, Bring bring)
dict[str, BringData] _async_update_data(self)