1 """The Energy websocket API."""
3 from __future__
import annotations
6 from collections
import defaultdict
7 from collections.abc
import Callable, Coroutine
8 from datetime
import timedelta
10 from itertools
import chain
11 from typing
import Any, cast
13 import voluptuous
as vol
20 async_process_integration_platforms,
25 from .const
import DOMAIN
27 DEVICE_CONSUMPTION_SCHEMA,
30 EnergyPreferencesUpdate,
33 from .types
import EnergyPlatform, GetSolarForecastType, SolarForecastType
34 from .validate
import async_validate
36 type EnergyWebSocketCommandHandler = Callable[
40 type AsyncEnergyWebSocketCommandHandler = Callable[
42 Coroutine[Any, Any,
None],
48 """Set up the energy websocket API."""
49 websocket_api.async_register_command(hass, ws_get_prefs)
50 websocket_api.async_register_command(hass, ws_save_prefs)
51 websocket_api.async_register_command(hass, ws_info)
52 websocket_api.async_register_command(hass, ws_validate)
53 websocket_api.async_register_command(hass, ws_solar_forecast)
54 websocket_api.async_register_command(hass, ws_get_fossil_energy_consumption)
57 @singleton("energy_platforms")
60 ) -> dict[str, GetSolarForecastType]:
61 """Get energy platforms."""
62 platforms: dict[str, GetSolarForecastType] = {}
65 def _process_energy_platform(
68 platform: EnergyPlatform,
70 """Process energy platforms."""
71 if not hasattr(platform,
"async_get_solar_forecast"):
74 platforms[domain] = platform.async_get_solar_forecast
77 hass, DOMAIN, _process_energy_platform, wait_for_platforms=
True
84 func: AsyncEnergyWebSocketCommandHandler | EnergyWebSocketCommandHandler,
85 ) -> websocket_api.AsyncWebSocketCommandHandler:
86 """Decorate a function to pass in a manager."""
88 @functools.wraps(func)
89 async
def with_manager(
96 result = func(hass, connection, msg, manager)
98 if asyncio.iscoroutine(result):
104 @websocket_api.websocket_command(
{
vol.Required("type"):
"energy/get_prefs",
107 @websocket_api.async_response
114 manager: EnergyManager,
116 """Handle get prefs command."""
117 if manager.data
is None:
118 connection.send_error(msg[
"id"], websocket_api.ERR_NOT_FOUND,
"No prefs")
121 connection.send_result(msg[
"id"], manager.data)
124 @websocket_api.require_admin
125 @websocket_api.websocket_command(
{
vol.Required("type"):
"energy/save_prefs",
126 vol.Optional(
"energy_sources"): ENERGY_SOURCE_SCHEMA,
127 vol.Optional(
"device_consumption"): [DEVICE_CONSUMPTION_SCHEMA],
130 @websocket_api.async_response
136 manager: EnergyManager,
138 """Handle get prefs command."""
139 msg_id = msg.pop(
"id")
141 await manager.async_update(cast(EnergyPreferencesUpdate, msg))
142 connection.send_result(msg_id, manager.data)
145 @websocket_api.websocket_command(
{
vol.Required("type"):
"energy/info",
148 @websocket_api.async_response
154 """Handle get info command."""
156 connection.send_result(
159 "cost_sensors": hass.data[DOMAIN][
"cost_sensors"],
160 "solar_forecast_domains":
list(forecast_platforms),
165 @websocket_api.websocket_command(
{
vol.Required("type"):
"energy/validate",
168 @websocket_api.async_response
174 """Handle validate command."""
175 connection.send_result(msg[
"id"], (await
async_validate(hass)).as_dict())
178 @websocket_api.websocket_command(
{
vol.Required("type"):
"energy/solar_forecast",
181 @websocket_api.async_response
187 manager: EnergyManager,
189 """Handle solar forecast command."""
190 if manager.data
is None:
191 connection.send_result(msg[
"id"], {})
194 config_entries: dict[str, str |
None] = {}
196 for source
in manager.data[
"energy_sources"]:
198 source[
"type"] !=
"solar"
199 or (solar_forecast := source.get(
"config_entry_solar_forecast"))
is None
203 for entry
in solar_forecast:
204 config_entries[entry] =
None
206 if not config_entries:
207 connection.send_result(msg[
"id"], {})
210 forecasts: dict[str, SolarForecastType] = {}
214 for config_entry_id
in config_entries:
215 config_entry = hass.config_entries.async_get_entry(config_entry_id)
218 if config_entry
is None or config_entry.domain
not in forecast_platforms:
221 forecast = await forecast_platforms[config_entry.domain](hass, config_entry_id)
223 if forecast
is not None:
224 forecasts[config_entry_id] = forecast
226 connection.send_result(msg[
"id"], forecasts)
229 @websocket_api.websocket_command(
{
vol.Required("type"):
"energy/fossil_energy_consumption",
230 vol.Required(
"start_time"): str,
231 vol.Required(
"end_time"): str,
232 vol.Required(
"energy_statistic_ids"): [str],
233 vol.Required(
"co2_statistic_id"): str,
234 vol.Required(
"period"): vol.Any(
"5minute",
"hour",
"day",
"month"),
237 @websocket_api.async_response
243 """Calculate amount of fossil based energy."""
244 start_time_str = msg[
"start_time"]
245 end_time_str = msg[
"end_time"]
247 if start_time := dt_util.parse_datetime(start_time_str):
248 start_time = dt_util.as_utc(start_time)
250 connection.send_error(msg[
"id"],
"invalid_start_time",
"Invalid start_time")
253 if end_time := dt_util.parse_datetime(end_time_str):
254 end_time = dt_util.as_utc(end_time)
256 connection.send_error(msg[
"id"],
"invalid_end_time",
"Invalid end_time")
259 statistic_ids = set(msg[
"energy_statistic_ids"])
260 statistic_ids.add(msg[
"co2_statistic_id"])
263 statistics = await recorder.get_instance(hass).async_add_executor_job(
264 recorder.statistics.statistics_during_period,
270 {
"energy": UnitOfEnergy.KILO_WATT_HOUR},
274 def _combine_change_statistics(
275 stats: dict[str, list[StatisticsRow]], statistic_ids: list[str]
276 ) -> dict[float, float]:
277 """Combine multiple statistics, returns a dict indexed by start time."""
278 result: defaultdict[float, float] = defaultdict(float)
280 for statistics_id, stat
in stats.items():
281 if statistics_id
not in statistic_ids:
284 if period[
"change"]
is None:
286 result[period[
"start"]] += period[
"change"]
288 return {key: result[key]
for key
in sorted(result)}
291 stat_list: list[dict[str, Any]],
292 same_period: Callable[[float, float], bool],
293 period_start_end: Callable[[float], tuple[float, float]],
295 ) -> list[dict[str, Any]]:
296 """Reduce hourly deltas to daily or monthly deltas."""
297 result: list[dict[str, Any]] = []
298 deltas: list[float] = []
301 prev_stat: dict[str, Any] = stat_list[0]
302 fake_stat = {
"start": stat_list[-1][
"start"] + period.total_seconds()}
305 for statistic
in chain(stat_list, (fake_stat,)):
306 if not same_period(prev_stat[
"start"], statistic[
"start"]):
307 start, _ = period_start_end(prev_stat[
"start"])
311 "start": dt_util.utc_from_timestamp(start).isoformat(),
312 "delta": sum(deltas),
316 if statistic.get(
"delta")
is not None:
317 deltas.append(statistic[
"delta"])
318 prev_stat = statistic
322 merged_energy_statistics = _combine_change_statistics(
323 statistics, msg[
"energy_statistic_ids"]
325 indexed_co2_statistics = cast(
328 period[
"start"]: period[
"mean"]
329 for period
in statistics.get(msg[
"co2_statistic_id"], {})
335 {
"start": start,
"delta": delta * indexed_co2_statistics.get(start, 100) / 100}
336 for start, delta
in merged_energy_statistics.items()
339 if msg[
"period"] ==
"hour":
340 reduced_fossil_energy = [
342 "start": dt_util.utc_from_timestamp(period[
"start"]).isoformat(),
343 "delta": period[
"delta"],
345 for period
in fossil_energy
348 elif msg[
"period"] ==
"day":
349 _same_day_ts, _day_start_end_ts = recorder.statistics.reduce_day_ts_factory()
350 reduced_fossil_energy = _reduce_deltas(
360 ) = recorder.statistics.reduce_month_ts_factory()
361 reduced_fossil_energy = _reduce_deltas(
368 result = {period[
"start"]: period[
"delta"]
for period
in reduced_fossil_energy}
369 connection.send_result(msg[
"id"], result)
370
EnergyManager async_get_manager(HomeAssistant hass)
EnergyPreferencesValidation async_validate(HomeAssistant hass)
None ws_validate(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None async_setup(HomeAssistant hass)
None ws_solar_forecast(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg, EnergyManager manager)
None ws_info(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_save_prefs(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg, EnergyManager manager)
None ws_get_prefs(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg, EnergyManager manager)
websocket_api.AsyncWebSocketCommandHandler _ws_with_manager(AsyncEnergyWebSocketCommandHandler|EnergyWebSocketCommandHandler func)
None ws_get_fossil_energy_consumption(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
dict[str, GetSolarForecastType] async_get_energy_platforms(HomeAssistant hass)