Home Assistant Unofficial Reference 2024.12.1
websocket_api.py
Go to the documentation of this file.
1 """The Energy websocket API."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import defaultdict
7 from collections.abc import Callable, Coroutine
8 from datetime import timedelta
9 import functools
10 from itertools import chain
11 from typing import Any, cast
12 
13 import voluptuous as vol
14 
15 from homeassistant.components import recorder, websocket_api
16 from homeassistant.components.recorder.statistics import StatisticsRow
17 from homeassistant.const import UnitOfEnergy
18 from homeassistant.core import HomeAssistant, callback
20  async_process_integration_platforms,
21 )
22 from homeassistant.helpers.singleton import singleton
23 from homeassistant.util import dt as dt_util
24 
25 from .const import DOMAIN
26 from .data import (
27  DEVICE_CONSUMPTION_SCHEMA,
28  ENERGY_SOURCE_SCHEMA,
29  EnergyManager,
30  EnergyPreferencesUpdate,
31  async_get_manager,
32 )
33 from .types import EnergyPlatform, GetSolarForecastType, SolarForecastType
34 from .validate import async_validate
35 
36 type EnergyWebSocketCommandHandler = Callable[
37  [HomeAssistant, websocket_api.ActiveConnection, dict[str, Any], EnergyManager],
38  None,
39 ]
40 type AsyncEnergyWebSocketCommandHandler = Callable[
41  [HomeAssistant, websocket_api.ActiveConnection, dict[str, Any], EnergyManager],
42  Coroutine[Any, Any, None],
43 ]
44 
45 
@callback
def async_setup(hass: HomeAssistant) -> None:
    """Register all energy websocket command handlers with Home Assistant."""
    handlers = (
        ws_get_prefs,
        ws_save_prefs,
        ws_info,
        ws_validate,
        ws_solar_forecast,
        ws_get_fossil_energy_consumption,
    )
    # Handlers are registered one by one, in the order listed above.
    for handler in handlers:
        websocket_api.async_register_command(hass, handler)
55 
56 
@singleton("energy_platforms")
async def async_get_energy_platforms(
    hass: HomeAssistant,
) -> dict[str, GetSolarForecastType]:
    """Get energy platforms.

    Returns a mapping of integration domain to that platform's
    ``async_get_solar_forecast`` callable. Platforms that do not expose that
    attribute are left out of the mapping.
    """
    platforms: dict[str, GetSolarForecastType] = {}

    @callback
    def _process_energy_platform(
        hass: HomeAssistant,
        domain: str,
        platform: EnergyPlatform,
    ) -> None:
        """Record the platform's solar forecast getter, if it provides one."""
        if hasattr(platform, "async_get_solar_forecast"):
            platforms[domain] = platform.async_get_solar_forecast

    # NOTE(review): wait_for_platforms=True presumably means already-loaded
    # platforms have been processed before this returns - confirm against the
    # integration_platform helper.
    await async_process_integration_platforms(
        hass, DOMAIN, _process_energy_platform, wait_for_platforms=True
    )

    return platforms
81 
82 
def _ws_with_manager(
    func: AsyncEnergyWebSocketCommandHandler | EnergyWebSocketCommandHandler,
) -> websocket_api.AsyncWebSocketCommandHandler:
    """Decorate a function to pass in a manager.

    The returned coroutine function takes the three standard websocket
    handler arguments, fetches the energy manager and calls ``func`` with the
    manager appended as fourth argument. Both plain and coroutine handlers
    are supported.
    """

    @functools.wraps(func)
    async def with_manager(
        hass: HomeAssistant,
        connection: websocket_api.ActiveConnection,
        msg: dict[str, Any],
    ) -> None:
        manager = await async_get_manager(hass)

        result = func(hass, connection, msg, manager)

        # A plain handler has already done its work at this point; a coroutine
        # handler only returned a coroutine object that still has to run.
        if asyncio.iscoroutine(result):
            await result

    return with_manager
102 
103 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "energy/get_prefs",
    }
)
@websocket_api.async_response
@_ws_with_manager
@callback
def ws_get_prefs(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
    manager: EnergyManager,
) -> None:
    """Handle get prefs command.

    Replies with the manager's stored preferences, or with a not-found error
    when the manager holds no data yet.
    """
    if manager.data is None:
        connection.send_error(msg["id"], websocket_api.ERR_NOT_FOUND, "No prefs")
        return

    connection.send_result(msg["id"], manager.data)
122 
123 
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "energy/save_prefs",
        vol.Optional("energy_sources"): ENERGY_SOURCE_SCHEMA,
        vol.Optional("device_consumption"): [DEVICE_CONSUMPTION_SCHEMA],
    }
)
@websocket_api.async_response
@_ws_with_manager
async def ws_save_prefs(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
    manager: EnergyManager,
) -> None:
    """Handle save prefs command.

    Every key of the message other than ``id`` and ``type`` is handed to the
    manager as a preferences update; the reply carries the manager's data
    after the update.
    """
    # Build the update from a filtered copy instead of popping keys off msg.
    # The message belongs to the caller, which presumably still needs
    # msg["id"] to report an error if async_update raises - TODO confirm
    # against the websocket_api error handling.
    update = {key: value for key, value in msg.items() if key not in ("id", "type")}
    await manager.async_update(cast(EnergyPreferencesUpdate, update))
    connection.send_result(msg["id"], manager.data)
143 
144 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "energy/info",
    }
)
@websocket_api.async_response
async def ws_info(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Handle get info command.

    Replies with the registered cost sensors and the domains of all
    integrations that offer a solar forecast.
    """
    platforms = await async_get_energy_platforms(hass)
    msg_id = msg["id"]
    info = {
        "cost_sensors": hass.data[DOMAIN]["cost_sensors"],
        # Iterating the mapping yields its keys, i.e. the platform domains.
        "solar_forecast_domains": list(platforms),
    }
    connection.send_result(msg_id, info)
163 
164 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "energy/validate",
    }
)
@websocket_api.async_response
async def ws_validate(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Handle validate command.

    Runs the energy validation and replies with its dictionary form.
    """
    msg_id = msg["id"]
    validation = await async_validate(hass)
    connection.send_result(msg_id, validation.as_dict())
176 
177 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "energy/solar_forecast",
    }
)
@websocket_api.async_response
@_ws_with_manager
async def ws_solar_forecast(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
    manager: EnergyManager,
) -> None:
    """Handle solar forecast command.

    Replies with a mapping of config entry id to forecast for each forecast
    config entry referenced by a solar energy source. The reply is an empty
    mapping when there are no preferences or no such entries.
    """
    prefs = manager.data
    if prefs is None:
        connection.send_result(msg["id"], {})
        return

    # A dict is used as an insertion-ordered set of config entry ids.
    entry_ids: dict[str, str | None] = {}
    for source in prefs["energy_sources"]:
        if source["type"] == "solar":
            linked_entries = source.get("config_entry_solar_forecast")
            if linked_entries is not None:
                entry_ids.update(dict.fromkeys(linked_entries))

    if not entry_ids:
        connection.send_result(msg["id"], {})
        return

    platforms = await async_get_energy_platforms(hass)
    forecasts: dict[str, SolarForecastType] = {}

    for entry_id in entry_ids:
        entry = hass.config_entries.async_get_entry(entry_id)
        # Only query entries that still exist and whose integration domain
        # provides a forecast getter.
        if entry is not None and entry.domain in platforms:
            forecast = await platforms[entry.domain](hass, entry_id)
            if forecast is not None:
                forecasts[entry_id] = forecast

    connection.send_result(msg["id"], forecasts)
227 
228 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "energy/fossil_energy_consumption",
        vol.Required("start_time"): str,
        vol.Required("end_time"): str,
        vol.Required("energy_statistic_ids"): [str],
        vol.Required("co2_statistic_id"): str,
        vol.Required("period"): vol.Any("5minute", "hour", "day", "month"),
    }
)
@websocket_api.async_response
async def ws_get_fossil_energy_consumption(
    hass: HomeAssistant,
    connection: websocket_api.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Calculate amount of fossil based energy.

    Hourly "change" values of the requested energy statistics are summed per
    hour and multiplied by the hourly "mean" of the CO2 statistic, which is
    treated as a percentage. Hours without a CO2 value count as 100 % fossil.
    The hourly values are then reduced to the requested period and sent as a
    mapping of ISO formatted period start to fossil energy.

    Replies with an ``invalid_start_time`` / ``invalid_end_time`` error when
    the corresponding timestamp cannot be parsed.
    """
    start_time_str = msg["start_time"]
    end_time_str = msg["end_time"]

    if start_time := dt_util.parse_datetime(start_time_str):
        start_time = dt_util.as_utc(start_time)
    else:
        connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
        return

    if end_time := dt_util.parse_datetime(end_time_str):
        end_time = dt_util.as_utc(end_time)
    else:
        connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
        return

    statistic_ids = set(msg["energy_statistic_ids"])
    statistic_ids.add(msg["co2_statistic_id"])

    # Fetch energy + CO2 statistics. The data is always requested per hour,
    # with energy converted to kWh; coarser periods are derived further down.
    statistics = await recorder.get_instance(hass).async_add_executor_job(
        recorder.statistics.statistics_during_period,
        hass,
        start_time,
        end_time,
        statistic_ids,
        "hour",
        {"energy": UnitOfEnergy.KILO_WATT_HOUR},
        {"mean", "change"},
    )

    def _combine_change_statistics(
        stats: dict[str, list[StatisticsRow]], statistic_ids: list[str]
    ) -> dict[float, float]:
        """Combine multiple statistics, returns a dict indexed by start time.

        Only statistics listed in ``statistic_ids`` contribute, and rows
        without a change value are skipped. Keys are sorted ascending.
        """
        result: defaultdict[float, float] = defaultdict(float)

        for statistics_id, stat in stats.items():
            if statistics_id not in statistic_ids:
                continue
            for period in stat:
                if period["change"] is None:
                    continue
                result[period["start"]] += period["change"]

        return {key: result[key] for key in sorted(result)}

    def _reduce_deltas(
        stat_list: list[dict[str, Any]],
        same_period: Callable[[float, float], bool],
        period_start_end: Callable[[float], tuple[float, float]],
        period: timedelta,
    ) -> list[dict[str, Any]]:
        """Reduce hourly deltas to daily or monthly deltas.

        ``period`` is the offset added to the last entry to build a sentinel
        entry. It must be long enough that the sentinel falls outside the
        last entry's period, otherwise that period is never emitted.
        """
        result: list[dict[str, Any]] = []
        deltas: list[float] = []
        if not stat_list:
            return result
        prev_stat: dict[str, Any] = stat_list[0]
        fake_stat = {"start": stat_list[-1]["start"] + period.total_seconds()}

        # Loop over the hourly deltas + a fake entry to end the period
        for statistic in chain(stat_list, (fake_stat,)):
            if not same_period(prev_stat["start"], statistic["start"]):
                start, _ = period_start_end(prev_stat["start"])
                # The previous statistic was the last entry of the period
                result.append(
                    {
                        "start": dt_util.utc_from_timestamp(start).isoformat(),
                        "delta": sum(deltas),
                    }
                )
                deltas = []
            # The fake entry carries no "delta" and therefore adds nothing.
            if statistic.get("delta") is not None:
                deltas.append(statistic["delta"])
            prev_stat = statistic

        return result

    merged_energy_statistics = _combine_change_statistics(
        statistics, msg["energy_statistic_ids"]
    )
    # Rows without a mean are left out so that they fall back to the 100 %
    # default below instead of multiplying by None.
    indexed_co2_statistics = cast(
        dict[float, float],
        {
            period["start"]: period["mean"]
            for period in statistics.get(msg["co2_statistic_id"], {})
            if period["mean"] is not None
        },
    )

    # Calculate amount of fossil based energy, assume 100% fossil if missing
    fossil_energy = [
        {"start": start, "delta": delta * indexed_co2_statistics.get(start, 100) / 100}
        for start, delta in merged_energy_statistics.items()
    ]

    # NOTE(review): the schema also accepts "5minute", which ends up in the
    # monthly branch below because it is neither "hour" nor "day" - confirm
    # whether that is intended.
    if msg["period"] == "hour":
        reduced_fossil_energy = [
            {
                "start": dt_util.utc_from_timestamp(period["start"]).isoformat(),
                "delta": period["delta"],
            }
            for period in fossil_energy
        ]

    elif msg["period"] == "day":
        _same_day_ts, _day_start_end_ts = recorder.statistics.reduce_day_ts_factory()
        reduced_fossil_energy = _reduce_deltas(
            fossil_energy,
            _same_day_ts,
            _day_start_end_ts,
            timedelta(days=1),
        )
    else:
        (
            _same_month_ts,
            _month_start_end_ts,
        ) = recorder.statistics.reduce_month_ts_factory()
        # 31 days always moves the sentinel entry into a later month. With a
        # one day offset the final month was dropped unless the last hourly
        # row happened to fall on the last day of its month.
        reduced_fossil_energy = _reduce_deltas(
            fossil_energy,
            _same_month_ts,
            _month_start_end_ts,
            timedelta(days=31),
        )

    result = {period["start"]: period["delta"] for period in reduced_fossil_energy}
    connection.send_result(msg["id"], result)
370 
EnergyManager async_get_manager(HomeAssistant hass)
Definition: data.py:22
EnergyPreferencesValidation async_validate(HomeAssistant hass)
Definition: validate.py:318
None ws_validate(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_solar_forecast(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg, EnergyManager manager)
None ws_info(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_save_prefs(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg, EnergyManager manager)
None ws_get_prefs(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg, EnergyManager manager)
websocket_api.AsyncWebSocketCommandHandler _ws_with_manager(AsyncEnergyWebSocketCommandHandler|EnergyWebSocketCommandHandler func)
None ws_get_fossil_energy_consumption(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
dict[str, GetSolarForecastType] async_get_energy_platforms(HomeAssistant hass)
None async_process_integration_platforms(HomeAssistant hass, str platform_name, Callable[[HomeAssistant, str, Any], Awaitable[None]|None] process_platform, bool wait_for_platforms=False)