1 """Proxy to handle account communication with Renault servers."""
3 from __future__
import annotations
6 from datetime
import timedelta
9 from renault_api.gigya.exceptions
import InvalidCredentialsException
10 from renault_api.kamereon.models
import KamereonVehiclesLink
11 from renault_api.renault_account
import RenaultAccount
12 from renault_api.renault_client
import RenaultClient
27 from .const
import CONF_KAMEREON_ACCOUNT_ID, DEFAULT_SCAN_INTERVAL
28 from .renault_vehicle
import RenaultVehicleProxy
30 LOGGER = logging.getLogger(__name__)
34 """Handle account communication with Renault servers."""
36 def __init__(self, hass: HomeAssistant, locale: str) ->
None:
37 """Initialise proxy."""
42 self.
_account_account: RenaultAccount |
None =
None
43 self._vehicles: dict[str, RenaultVehicleProxy] = {}
46 """Attempt login to Renault servers."""
48 await self.
_client_client.session.login(username, password)
49 except InvalidCredentialsException
as ex:
50 LOGGER.error(
"Login to Renault failed: %s", ex.error_details)
57 account_id: str = config_entry.data[CONF_KAMEREON_ACCOUNT_ID]
58 scan_interval =
timedelta(seconds=DEFAULT_SCAN_INTERVAL)
61 vehicles = await self.
_account_account.get_vehicles()
62 if vehicles.vehicleLinks:
64 vehicle_link.vehicleDetails
is None
65 for vehicle_link
in vehicles.vehicleLinks
68 "Failed to retrieve vehicle details from Renault servers"
70 device_registry = dr.async_get(self.
_hass_hass)
80 for vehicle_link
in vehicles.vehicleLinks
86 vehicle_link: KamereonVehiclesLink,
87 renault_account: RenaultAccount,
88 scan_interval: timedelta,
89 config_entry: ConfigEntry,
90 device_registry: dr.DeviceRegistry,
93 assert vehicle_link.vin
is not None
94 assert vehicle_link.vehicleDetails
is not None
98 vehicle=await renault_account.get_api_vehicle(vehicle_link.vin),
99 details=vehicle_link.vehicleDetails,
100 scan_interval=scan_interval,
102 await vehicle.async_initialise()
103 device_registry.async_get_or_create(
104 config_entry_id=config_entry.entry_id,
105 identifiers=vehicle.device_info[ATTR_IDENTIFIERS],
106 manufacturer=vehicle.device_info[ATTR_MANUFACTURER],
107 name=vehicle.device_info[ATTR_NAME],
108 model=vehicle.device_info[ATTR_MODEL],
109 model_id=vehicle.device_info[ATTR_MODEL_ID],
111 self._vehicles[vehicle_link.vin] = vehicle
114 """Get Kamereon account ids."""
116 for account
in await self.
_client_client.get_api_accounts():
117 vehicles = await account.get_vehicles()
120 if vehicles.vehicleLinks:
121 accounts.append(account.account_id)
def vehicles(self) -> dict[str, RenaultVehicleProxy]:
    """Return the vehicle proxies held by this hub.

    The result is the hub's own internal mapping (not a copy); entries
    are stored under the vehicle's VIN when a vehicle is initialised.
    """
    registered_vehicles = self._vehicles
    return registered_vehicles
None async_initialise_vehicle(self, KamereonVehiclesLink vehicle_link, RenaultAccount renault_account, timedelta scan_interval, ConfigEntry config_entry, dr.DeviceRegistry device_registry)
list[str] get_account_ids(self)
bool attempt_login(self, str username, str password)
dict[str, RenaultVehicleProxy] vehicles(self)
None __init__(self, HomeAssistant hass, str locale)
None async_initialise(self, ConfigEntry config_entry)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)