1 """Teslemetry Data Coordinator."""
3 from datetime
import datetime, timedelta
6 from tesla_fleet_api
import EnergySpecific, VehicleSpecific
7 from tesla_fleet_api.const
import TeslaEnergyPeriod, VehicleDataEndpoint
8 from tesla_fleet_api.exceptions
import (
20 from .const
import ENERGY_HISTORY_FIELDS, LOGGER, TeslemetryState
21 from .helpers
import flatten
30 VehicleDataEndpoint.CHARGE_STATE,
31 VehicleDataEndpoint.CLIMATE_STATE,
32 VehicleDataEndpoint.DRIVE_STATE,
33 VehicleDataEndpoint.LOCATION_DATA,
34 VehicleDataEndpoint.VEHICLE_STATE,
35 VehicleDataEndpoint.VEHICLE_CONFIG,
40 """Class to manage fetching data from the Teslemetry API."""
46 self, hass: HomeAssistant, api: VehicleSpecific, product: dict
48 """Initialize Teslemetry Vehicle Update Coordinator."""
52 name=
"Teslemetry Vehicle",
53 update_interval=VEHICLE_INTERVAL,
61 """Update vehicle data using Teslemetry API."""
66 if self.
datadatadata[
"state"] != TeslemetryState.ONLINE:
67 response = await self.
apiapi.vehicle()
68 self.
datadatadata[
"state"] = response[
"response"][
"state"]
70 if self.
datadatadata[
"state"] != TeslemetryState.ONLINE:
73 response = await self.
apiapi.vehicle_data(endpoints=ENDPOINTS)
74 data = response[
"response"]
76 except VehicleOffline:
77 self.
datadatadata[
"state"] = TeslemetryState.OFFLINE
79 except InvalidToken
as e:
80 raise ConfigEntryAuthFailed
from e
81 except SubscriptionRequired
as e:
82 raise ConfigEntryAuthFailed
from e
83 except TeslaFleetError
as e:
88 if self.
apiapi.pre2021
and data[
"state"] == TeslemetryState.ONLINE:
91 data[
"charge_state"].
get(
"charging_state") ==
"Charging"
92 or data[
"vehicle_state"].
get(
"is_user_present")
93 or data[
"vehicle_state"].
get(
"sentry_mode")
98 elapsed = datetime.now() - self.
last_activelast_active
110 """Class to manage fetching energy site live status from the Teslemetry API."""
114 def __init__(self, hass: HomeAssistant, api: EnergySpecific) ->
None:
115 """Initialize Teslemetry Energy Site Live coordinator."""
119 name=
"Teslemetry Energy Site Live",
120 update_interval=ENERGY_LIVE_INTERVAL,
125 """Update energy site data using Teslemetry API."""
128 data = (await self.
apiapi.live_status())[
"response"]
129 except (InvalidToken, Forbidden, SubscriptionRequired)
as e:
130 raise ConfigEntryAuthFailed
from e
131 except TeslaFleetError
as e:
135 data[
"wall_connectors"] = {
136 wc[
"din"]: wc
for wc
in (data.get(
"wall_connectors")
or [])
143 """Class to manage fetching energy site info from the Teslemetry API."""
147 def __init__(self, hass: HomeAssistant, api: EnergySpecific, product: dict) ->
None:
148 """Initialize Teslemetry Energy Info coordinator."""
152 name=
"Teslemetry Energy Site Info",
153 update_interval=ENERGY_INFO_INTERVAL,
159 """Update energy site data using Teslemetry API."""
162 data = (await self.
apiapi.site_info())[
"response"]
163 except (InvalidToken, Forbidden, SubscriptionRequired)
as e:
164 raise ConfigEntryAuthFailed
from e
165 except TeslaFleetError
as e:
172 """Class to manage fetching energy site info from the Teslemetry API."""
176 def __init__(self, hass: HomeAssistant, api: EnergySpecific) ->
None:
177 """Initialize Teslemetry Energy Info coordinator."""
181 name=f
"Teslemetry Energy History {api.energy_site_id}",
182 update_interval=ENERGY_HISTORY_INTERVAL,
187 """Update energy site data using Teslemetry API."""
190 data = (await self.
apiapi.energy_history(TeslaEnergyPeriod.DAY))[
"response"]
191 except (InvalidToken, Forbidden, SubscriptionRequired)
as e:
192 raise ConfigEntryAuthFailed
from e
193 except TeslaFleetError
as e:
199 output = {key: 0
for key
in ENERGY_HISTORY_FIELDS}
200 for period
in data.get(
"time_series", []):
201 for key
in ENERGY_HISTORY_FIELDS:
202 output[key] += period.get(key, 0)
None __init__(self, HomeAssistant hass, EnergySpecific api)
dict[str, Any] _async_update_data(self)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, EnergySpecific api, dict product)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, EnergySpecific api)
None __init__(self, HomeAssistant hass, VehicleSpecific api, dict product)
dict[str, Any] _async_update_data(self)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)
web.Response get(self, web.Request request, str config_key)
dict[str, Any] flatten(dict[str, Any] data, str|None parent=None)