1 """Define an object to coordinate fetching Ondilo ICO data."""
3 from dataclasses
import dataclass
4 from datetime
import timedelta
8 from ondilo
import OndiloError
14 from .api
import OndiloClient
16 _LOGGER = logging.getLogger(__name__)
21 """Class for storing the data."""
25 sensors: dict[str, Any]
29 """Class to manage fetching Ondilo ICO data from API."""
31 def __init__(self, hass: HomeAssistant, api: OndiloClient) ->
None:
42 """Fetch data from API endpoint."""
44 return await self.
hasshass.async_add_executor_job(self.
_update_data_update_data)
45 except OndiloError
as err:
46 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
49 """Fetch data from API endpoint."""
51 pools = self.
apiapi.get_pools()
52 _LOGGER.debug(
"Pools: %s", pools)
53 error: OndiloError |
None =
None
57 ico = self.
apiapi.get_ICO_details(pool_id)
60 "The pool id %s does not have any ICO attached", pool_id
63 sensors = self.
apiapi.get_last_pool_measures(pool_id)
64 except OndiloError
as err:
66 _LOGGER.debug(
"Error communicating with API for %s: %s", pool_id, err)
71 sensors={sensor[
"data_type"]: sensor[
"value"]
for sensor
in sensors},
75 raise UpdateFailed(f
"Error communicating with API: {error}")
from error
None __init__(self, HomeAssistant hass, OndiloClient api)
dict[str, OndiloIcoData] _update_data(self)
dict[str, OndiloIcoData] _async_update_data(self)