1 """Teslemetry integration."""
4 from collections.abc
import Callable
5 from typing
import Final
7 from tesla_fleet_api
import EnergySpecific, Teslemetry, VehicleSpecific
8 from tesla_fleet_api.const
import Scope
9 from tesla_fleet_api.exceptions
import (
14 from teslemetry_stream
import TeslemetryStream
26 from .const
import DOMAIN, LOGGER, MODELS
27 from .coordinator
import (
28 TeslemetryEnergyHistoryCoordinator,
29 TeslemetryEnergySiteInfoCoordinator,
30 TeslemetryEnergySiteLiveCoordinator,
31 TeslemetryVehicleDataCoordinator,
33 from .helpers
import flatten
34 from .models
import TeslemetryData, TeslemetryEnergyData, TeslemetryVehicleData
35 from .services
import async_register_services
38 Platform.BINARY_SENSOR,
42 Platform.DEVICE_TRACKER,
44 Platform.MEDIA_PLAYER,
52 type TeslemetryConfigEntry = ConfigEntry[TeslemetryData]
54 CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
57 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
58 """Set up the Telemetry integration."""
64 """Set up Teslemetry config."""
66 access_token = entry.data[CONF_ACCESS_TOKEN]
70 teslemetry = Teslemetry(
72 access_token=access_token,
75 calls = await asyncio.gather(
76 teslemetry.metadata(),
77 teslemetry.products(),
79 except InvalidToken
as e:
80 raise ConfigEntryAuthFailed
from e
81 except SubscriptionRequired
as e:
82 raise ConfigEntryAuthFailed
from e
83 except TeslaFleetError
as e:
84 raise ConfigEntryNotReady
from e
86 scopes = calls[0][
"scopes"]
87 region = calls[0][
"region"]
88 products = calls[1][
"response"]
90 device_registry = dr.async_get(hass)
93 vehicles: list[TeslemetryVehicleData] = []
94 energysites: list[TeslemetryEnergyData] = []
97 stream = TeslemetryStream(
100 server=f
"{region.lower()}.teslemetry.com",
101 parse_timestamp=
True,
104 for product
in products:
105 if "vin" in product
and Scope.VEHICLE_DEVICE_DATA
in scopes:
107 product.pop(
"cached_data",
None)
109 api = VehicleSpecific(teslemetry.vehicle, vin)
112 identifiers={(DOMAIN, vin)},
113 manufacturer=
"Tesla",
114 configuration_url=
"https://teslemetry.com/console",
115 name=product[
"display_name"],
116 model=MODELS.get(vin[3]),
120 remove_listener = stream.async_add_listener(
128 coordinator=coordinator,
132 remove_listener=remove_listener,
136 elif "energy_site_id" in product
and Scope.ENERGY_DEVICE_DATA
in scopes:
137 site_id = product[
"energy_site_id"]
139 product[
"components"][
"battery"]
or product[
"components"][
"solar"]
141 wall_connector =
"wall_connectors" in product[
"components"]
142 if not powerwall
and not wall_connector:
144 "Skipping Energy Site %s as it has no components",
149 api = EnergySpecific(teslemetry.energy, site_id)
151 identifiers={(DOMAIN,
str(site_id))},
152 manufacturer=
"Tesla",
153 configuration_url=
"https://teslemetry.com/console",
154 name=product.get(
"site_name",
"Energy Site"),
155 serial_number=
str(site_id),
165 history_coordinator=(
176 await asyncio.gather(
178 vehicle.coordinator.async_config_entry_first_refresh()
179 for vehicle
in vehicles
182 energysite.live_coordinator.async_config_entry_first_refresh()
183 for energysite
in energysites
186 energysite.info_coordinator.async_config_entry_first_refresh()
187 for energysite
in energysites
190 energysite.history_coordinator.async_config_entry_first_refresh()
191 for energysite
in energysites
192 if energysite.history_coordinator
197 for energysite
in energysites:
199 for gateway
in energysite.info_coordinator.data.get(
"components_gateways", []):
200 if gateway.get(
"part_name"):
201 models.add(gateway[
"part_name"])
202 for battery
in energysite.info_coordinator.data.get(
"components_batteries", []):
203 if battery.get(
"part_name"):
204 models.add(battery[
"part_name"])
206 energysite.device[
"model"] =
", ".join(sorted(models))
210 device_registry.async_get_or_create(
211 config_entry_id=entry.entry_id, **energysite.device
215 entry.runtime_data =
TeslemetryData(vehicles, energysites, scopes)
216 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
222 """Unload Teslemetry Config."""
223 return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
227 """Migrate config entry."""
228 if config_entry.version > 1:
231 if config_entry.version == 1
and config_entry.minor_version < 2:
233 teslemetry = Teslemetry(
235 access_token=config_entry.data[CONF_ACCESS_TOKEN],
238 metadata = await teslemetry.metadata()
239 except TeslaFleetError
as e:
240 LOGGER.error(e.message)
243 hass.config_entries.async_update_entry(
244 config_entry, unique_id=metadata[
"uid"], version=1, minor_version=2
250 """Create a handle vehicle stream function."""
252 def handle_vehicle_stream(data: dict) ->
None:
253 """Handle vehicle data from the stream."""
254 if "vehicle_data" in data:
255 LOGGER.debug(
"Streaming received vehicle data from %s", vin)
256 coordinator.updated_once =
True
257 coordinator.async_set_updated_data(
flatten(data[
"vehicle_data"]))
258 elif "state" in data:
259 LOGGER.debug(
"Streaming received state from %s", vin)
260 coordinator.data[
"state"] = data[
"state"]
261 coordinator.async_set_updated_data(coordinator.data)
263 return handle_vehicle_stream
None async_register_services(HomeAssistant hass)
dict[str, Any] flatten(dict[str, Any] data, str|None parent=None)
Callable[[dict], None] create_handle_vehicle_stream(str vin, coordinator)
bool async_setup_entry(HomeAssistant hass, TeslemetryConfigEntry entry)
bool async_setup(HomeAssistant hass, ConfigType config)
bool async_unload_entry(HomeAssistant hass, TeslemetryConfigEntry entry)
bool async_migrate_entry(HomeAssistant hass, ConfigEntry config_entry)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)