1 """Electric Kiwi coordinators."""
4 from collections
import OrderedDict
5 from datetime
import timedelta
8 from electrickiwi_api
import ElectricKiwiApi
9 from electrickiwi_api.exceptions
import ApiException, AuthException
10 from electrickiwi_api.model
import AccountBalance, Hop, HopIntervals
16 _LOGGER = logging.getLogger(__name__)
23 """ElectricKiwi Account Data object."""
25 def __init__(self, hass: HomeAssistant, ek_api: ElectricKiwiApi) ->
None:
26 """Initialize ElectricKiwiAccountDataCoordinator."""
30 name=
"Electric Kiwi Account Data",
31 update_interval=ACCOUNT_SCAN_INTERVAL,
36 """Fetch data from Account balance API endpoint."""
38 async
with asyncio.timeout(60):
39 return await self.
_ek_api_ek_api.get_account_balance()
40 except AuthException
as auth_err:
41 raise ConfigEntryAuthFailed
from auth_err
42 except ApiException
as api_err:
44 f
"Error communicating with EK API: {api_err}"
49 """ElectricKiwi HOP Data object."""
51 def __init__(self, hass: HomeAssistant, ek_api: ElectricKiwiApi) ->
None:
52 """Initialize ElectricKiwiAccountDataCoordinator."""
57 name=
"Electric Kiwi HOP Data",
59 update_interval=HOP_SCAN_INTERVAL,
65 """Get the hop interval options for selection."""
68 f
"{v.start_time} - {v.end_time}": k
74 """Update selected hop and data."""
77 except AuthException
as auth_err:
78 raise ConfigEntryAuthFailed
from auth_err
79 except ApiException
as api_err:
81 f
"Error communicating with EK API: {api_err}"
87 """Fetch data from API endpoint.
89 filters the intervals to remove ones that are not active
92 async
with asyncio.timeout(60):
94 hop_intervals: HopIntervals = await self.
_ek_api_ek_api.get_hop_intervals()
95 hop_intervals.intervals = OrderedDict(
97 lambda pair: pair[1].active == 1,
98 hop_intervals.intervals.items(),
103 return await self.
_ek_api_ek_api.get_hop()
104 except AuthException
as auth_err:
105 raise ConfigEntryAuthFailed
from auth_err
106 except ApiException
as api_err:
108 f
"Error communicating with EK API: {api_err}"
None __init__(self, HomeAssistant hass, ElectricKiwiApi ek_api)
AccountBalance _async_update_data(self)
Hop async_update_hop(self, int hop_interval)
dict[str, int] get_hop_options(self)
Hop _async_update_data(self)
None __init__(self, HomeAssistant hass, ElectricKiwiApi ek_api)
None async_set_updated_data(self, _DataT data)