1 """Provides the data update coordinators for SolarEdge."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
6 from datetime
import date, datetime, timedelta
9 from aiosolaredge
import SolarEdge
10 from stringcase
import snakecase
18 INVENTORY_UPDATE_DELAY,
20 OVERVIEW_UPDATE_DELAY,
21 POWER_FLOW_UPDATE_DELAY,
26 """Get and update the latest data."""
28 coordinator: DataUpdateCoordinator[
None]
30 def __init__(self, hass: HomeAssistant, api: SolarEdge, site_id: str) ->
None:
31 """Initialize the data object."""
35 self.data: dict[str, Any] = {}
36 self.attributes: dict[str, Any] = {}
42 """Coordinator creation."""
54 """Update interval."""
61 class SolarEdgeOverviewDataService(SolarEdgeDataService):
62 """Get and update the latest overview data."""
66 """Update interval."""
67 return OVERVIEW_UPDATE_DELAY
70 """Update the data from the SolarEdge Monitoring API."""
72 data = await self.
apiapi.get_overview(self.
site_idsite_id)
73 overview = data[
"overview"]
74 except KeyError
as ex:
75 raise UpdateFailed(
"Missing overview data, skipping update")
from ex
79 energy_keys = [
"lifeTimeData",
"lastYearData",
"lastMonthData",
"lastDayData"]
80 for key, value
in overview.items():
81 if key
in energy_keys:
82 data = value[
"energy"]
83 elif key
in [
"currentPower"]:
87 self.
datadata[key] = data
92 if set(energy_keys).issubset(self.
datadata.keys()):
93 for index, key
in enumerate(energy_keys, start=1):
95 if any(self.
datadata[k] > self.
datadata[key]
for k
in energy_keys[index:]):
97 "Ignoring invalid energy value %s for %s", self.
datadata[key], key
99 self.
datadata.pop(key)
101 LOGGER.debug(
"Updated SolarEdge overview: %s", self.
datadata)
105 """Get and update the latest details data."""
109 """Update interval."""
110 return DETAILS_UPDATE_DELAY
113 """Update the data from the SolarEdge Monitoring API."""
116 data = await self.
apiapi.get_details(self.
site_idsite_id)
117 details = data[
"details"]
118 except KeyError
as ex:
119 raise UpdateFailed(
"Missing details data, skipping update")
from ex
124 for key, value
in details.items():
127 if key
in [
"primary_module"]:
128 for module_key, module_value
in value.items():
129 self.
attributesattributes[snakecase(module_key)] = module_value
138 elif key ==
"status":
139 self.
datadata[
"status"] = value
142 "Updated SolarEdge details: %s, %s",
149 """Get and update the latest inventory data."""
153 """Update interval."""
154 return INVENTORY_UPDATE_DELAY
157 """Update the data from the SolarEdge Monitoring API."""
159 data = await self.
apiapi.get_inventory(self.
site_idsite_id)
160 inventory = data[
"Inventory"]
161 except KeyError
as ex:
162 raise UpdateFailed(
"Missing inventory data, skipping update")
from ex
167 for key, value
in inventory.items():
168 self.
datadata[key] = len(value)
171 LOGGER.debug(
"Updated SolarEdge inventory: %s, %s", self.
datadata, self.
attributesattributes)
175 """Get and update the latest power flow data."""
177 def __init__(self, hass: HomeAssistant, api: SolarEdge, site_id: str) ->
None:
178 """Initialize the power flow data service."""
179 super().
__init__(hass, api, site_id)
185 """Update interval."""
186 return ENERGY_DETAILS_DELAY
189 """Update the data from the SolarEdge Monitoring API."""
193 midnight = datetime.combine(today, datetime.min.time())
194 data = await self.
apiapi.get_energy_details(
200 energy_details = data[
"energyDetails"]
201 except KeyError
as ex:
202 raise UpdateFailed(
"Missing power flow data, skipping update")
from ex
204 if "meters" not in energy_details:
206 "Missing meters in energy details data. Assuming site does not have any"
212 self.
unitunit = energy_details[
"unit"]
214 for meter
in energy_details[
"meters"]:
215 if "type" not in meter
or "values" not in meter:
217 if meter[
"type"]
not in [
225 if len(meter[
"values"][0]) == 2:
226 self.
datadata[meter[
"type"]] = meter[
"values"][0][
"value"]
227 self.
attributesattributes[meter[
"type"]] = {
"date": meter[
"values"][0][
"date"]}
230 "Updated SolarEdge energy details: %s, %s", self.
datadata, self.
attributesattributes
235 """Get and update the latest power flow data."""
237 def __init__(self, hass: HomeAssistant, api: SolarEdge, site_id: str) ->
None:
238 """Initialize the power flow data service."""
239 super().
__init__(hass, api, site_id)
245 """Update interval."""
246 return POWER_FLOW_UPDATE_DELAY
249 """Update the data from the SolarEdge Monitoring API."""
251 data = await self.
apiapi.get_current_power_flow(self.
site_idsite_id)
252 power_flow = data[
"siteCurrentPowerFlow"]
253 except KeyError
as ex:
254 raise UpdateFailed(
"Missing power flow data, skipping update")
from ex
259 if "connections" not in power_flow:
261 "Missing connections in power flow data. Assuming site does not"
266 for connection
in power_flow[
"connections"]:
267 power_from.append(connection[
"from"].lower())
268 power_to.append(connection[
"to"].lower())
272 self.
unitunit = power_flow[
"unit"]
274 for key, value
in power_flow.items():
275 if key
in [
"LOAD",
"PV",
"GRID",
"STORAGE"]:
276 self.
datadata[key] = value.get(
"currentPower")
277 self.
attributesattributes[key] = {
"status": value[
"status"]}
280 export = key.lower()
in power_to
281 if self.
datadata[key]:
282 self.
datadata[key] *= -1
if export
else 1
283 self.
attributesattributes[key][
"flow"] =
"export" if export
else "import"
285 if key
in [
"STORAGE"]:
286 charge = key.lower()
in power_to
287 if self.
datadata[key]:
288 self.
datadata[key] *= -1
if charge
else 1
289 self.
attributesattributes[key][
"flow"] =
"charge" if charge
else "discharge"
290 self.
attributesattributes[key][
"soc"] = value[
"chargeLevel"]
292 LOGGER.debug(
"Updated SolarEdge power flow: %s, %s", self.
datadata, self.
attributesattributes)
None __init__(self, HomeAssistant hass, SolarEdge api, str site_id)
None async_update_data(self)
timedelta update_interval(self)
timedelta update_interval(self)
None async_update_data(self)
timedelta update_interval(self)
None async_update_data(self)
None __init__(self, HomeAssistant hass, SolarEdge api, str site_id)
timedelta update_interval(self)
None async_update_data(self)
timedelta update_interval(self)
None async_update_data(self)
None __init__(self, HomeAssistant hass, SolarEdge api, str site_id)
None async_update_data(self)
timedelta update_interval(self)
web.Response get(self, web.Request request, str config_key)