1 """The Blue Current integration."""
3 from __future__
import annotations
6 from contextlib
import suppress
9 from bluecurrent_api
import Client
10 from bluecurrent_api.exceptions
import (
23 from .const
import DOMAIN, EVSE_ID, LOGGER, MODEL_TYPE
25 type BlueCurrentConfigEntry = ConfigEntry[Connector]
27 PLATFORMS = [Platform.SENSOR]
28 CHARGE_POINTS =
"CHARGE_POINTS"
34 VALUE_TYPES = [
"CH_STATUS"]
38 hass: HomeAssistant, config_entry: BlueCurrentConfigEntry
40 """Set up Blue Current as a config entry."""
42 api_token = config_entry.data[CONF_API_TOKEN]
43 connector =
Connector(hass, config_entry, client)
46 await client.validate_api_token(api_token)
47 except InvalidApiToken
as err:
49 except BlueCurrentException
as err:
50 raise ConfigEntryNotReady
from err
51 config_entry.async_create_background_task(
52 hass, connector.run_task(),
"blue_current-websocket"
55 await client.wait_for_charge_points()
56 config_entry.runtime_data = connector
57 await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
63 hass: HomeAssistant, config_entry: BlueCurrentConfigEntry
65 """Unload the Blue Current config entry."""
67 return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
71 """Define a class that connects to the Blue Current websocket API."""
74 self, hass: HomeAssistant, config: BlueCurrentConfigEntry, client: Client
80 self.charge_points: dict[str, dict] = {}
81 self.grid: dict[str, Any] = {}
83 async
def on_data(self, message: dict) ->
None:
84 """Handle received data."""
86 object_name: str = message[OBJECT]
89 if object_name == CHARGE_POINTS:
90 charge_points_data: list = message[DATA]
94 elif object_name
in VALUE_TYPES:
95 value_data: dict = message[DATA]
96 evse_id = value_data.pop(EVSE_ID)
100 elif GRID
in object_name:
101 data: dict = message[DATA]
106 """Handle incoming chargepoint data."""
107 await asyncio.gather(
110 entry[EVSE_ID], entry[MODEL_TYPE], entry[ATTR_NAME]
112 for entry
in charge_points_data
114 self.client.get_grid_status(charge_points_data[0][EVSE_ID]),
118 """Add the charge point and request its data."""
123 """Add a charge point to charge_points."""
124 self.charge_points[evse_id] = {MODEL_TYPE: model, ATTR_NAME: name}
127 """Update the charge point data."""
128 self.charge_points[evse_id].
update(data)
132 """Dispatch a charge point update signal."""
136 """Dispatch a grid update signal."""
140 """Fetch data when connection is established."""
141 await self.client.get_charge_points()
144 """Start the receive loop."""
149 except RequestLimitReached:
151 "Request limit reached. reconnecting at 00:00 (Europe/Amsterdam)"
153 delay = self.client.get_next_reset_delta().seconds
154 except WebsocketError:
155 LOGGER.debug(
"Disconnected, retrying in background")
159 await asyncio.sleep(delay)
164 """Dispatch signals to update entity states."""
165 for evse_id
in self.charge_points:
170 """Disconnect from the websocket."""
171 with suppress(WebsocketError):
172 await self.client.disconnect()
177 """Return the connection status."""
None _on_disconnect(self)
None __init__(self, HomeAssistant hass, BlueCurrentConfigEntry config, Client client)
None dispatch_charge_point_update_signal(self, str evse_id)
None add_charge_point(self, str evse_id, str model, str name)
None handle_charge_point(self, str evse_id, str model, str name)
None on_data(self, dict message)
None update_charge_point(self, str evse_id, dict data)
None handle_charge_point_data(self, list charge_points_data)
None dispatch_grid_update_signal(self)
bool async_setup_entry(HomeAssistant hass, BlueCurrentConfigEntry config_entry)
bool async_unload_entry(HomeAssistant hass, BlueCurrentConfigEntry config_entry)
IssData update(pyiss.ISS iss)
def get_status(hass, host, port)
bool is_connected(HomeAssistant hass)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)