1 """Tessie Data Coordinator."""
3 from datetime
import timedelta
4 from http
import HTTPStatus
8 from aiohttp
import ClientResponseError
9 from tesla_fleet_api
import EnergySpecific
10 from tesla_fleet_api.exceptions
import InvalidToken, MissingToken, TeslaFleetError
11 from tessie_api
import get_state, get_status
18 from .const
import TessieStatus
21 TESSIE_SYNC_INTERVAL = 10
22 TESSIE_FLEET_API_SYNC_INTERVAL =
timedelta(seconds=30)
24 _LOGGER = logging.getLogger(__name__)
27 def flatten(data: dict[str, Any], parent: str |
None =
None) -> dict[str, Any]:
28 """Flatten the data structure."""
30 for key, value
in data.items():
32 key = f
"{parent}_{key}"
33 if isinstance(value, dict):
34 result.update(
flatten(value, key))
41 """Class to manage fetching data from the Tessie API."""
50 """Initialize Tessie Data Update Coordinator."""
55 update_interval=
timedelta(seconds=TESSIE_SYNC_INTERVAL),
63 """Update vehicle data using Tessie API."""
70 if status[
"status"] == TessieStatus.ASLEEP:
72 self.
datadatadata[
"state"] = status[
"status"]
81 except ClientResponseError
as e:
82 if e.status == HTTPStatus.UNAUTHORIZED:
84 raise ConfigEntryAuthFailed
from e
91 """Class to manage fetching energy site live status from the Tessie API."""
93 def __init__(self, hass: HomeAssistant, api: EnergySpecific) ->
None:
94 """Initialize Tessie Energy Site Live coordinator."""
98 name=
"Tessie Energy Site Live",
99 update_interval=TESSIE_FLEET_API_SYNC_INTERVAL,
104 """Update energy site data using Tessie API."""
107 data = (await self.
apiapi.live_status())[
"response"]
108 except (InvalidToken, MissingToken)
as e:
109 raise ConfigEntryAuthFailed
from e
110 except TeslaFleetError
as e:
114 data[
"wall_connectors"] = {
115 wc[
"din"]: wc
for wc
in (data.get(
"wall_connectors")
or [])
122 """Class to manage fetching energy site info from the Tessie API."""
124 def __init__(self, hass: HomeAssistant, api: EnergySpecific) ->
None:
125 """Initialize Tessie Energy Info coordinator."""
129 name=
"Tessie Energy Site Info",
130 update_interval=TESSIE_FLEET_API_SYNC_INTERVAL,
135 """Update energy site data using Tessie API."""
138 data = (await self.
apiapi.site_info())[
"response"]
139 except (InvalidToken, MissingToken)
as e:
140 raise ConfigEntryAuthFailed
from e
141 except TeslaFleetError
as e:
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, EnergySpecific api)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, EnergySpecific api)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, str api_key, str vin, dict[str, Any] data)
str|float get_state(dict[str, float] data, str key)
def get_status(hass, host, port)
dict[str, Any] flatten(dict[str, Any] data, str|None parent=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)