1 """DataUpdateCoordinator for the Habitica integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from dataclasses
import dataclass
7 from datetime
import timedelta
8 from http
import HTTPStatus
10 from typing
import Any
12 from aiohttp
import ClientResponseError
13 from habitipy.aio
import HabitipyAsync
21 from .const
import DOMAIN
23 _LOGGER = logging.getLogger(__name__)
28 """Coordinator data class."""
35 """Habitica Data Update Coordinator."""
37 config_entry: ConfigEntry
39 def __init__(self, hass: HomeAssistant, habitipy: HabitipyAsync) ->
None:
40 """Initialize the Habitica data coordinator."""
53 self.
apiapi = habitipy
54 self.
contentcontent: dict[str, Any] = {}
58 user_response = await self.
apiapi.user.get()
59 tasks_response = await self.
apiapi.tasks.user.get()
60 tasks_response.extend(await self.
apiapi.tasks.user.get(type=
"completedTodos"))
63 language=user_response[
"preferences"][
"language"]
65 except ClientResponseError
as error:
66 if error.status == HTTPStatus.TOO_MANY_REQUESTS:
67 _LOGGER.debug(
"Rate limit exceeded, will try again later")
69 raise UpdateFailed(f
"Unable to connect to Habitica: {error}")
from error
71 return HabiticaData(user=user_response, tasks=tasks_response)
74 self, func: Callable[[HabiticaDataUpdateCoordinator], Any]
76 """Execute an API call."""
80 except ClientResponseError
as e:
81 if e.status == HTTPStatus.TOO_MANY_REQUESTS:
83 translation_domain=DOMAIN,
84 translation_key=
"setup_rate_limit_exception",
87 translation_domain=DOMAIN,
88 translation_key=
"service_call_exception",
None __init__(self, HomeAssistant hass, HabitipyAsync habitipy)
None execute(self, Callable[[HabiticaDataUpdateCoordinator], Any] func)
HabiticaData _async_update_data(self)
None async_request_refresh(self)