Home Assistant Unofficial Reference 2024.12.1
websocket_api.py
Go to the documentation of this file.
1 """The Recorder websocket API."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from datetime import datetime as dt
7 from typing import Any, Literal, cast
8 
9 import voluptuous as vol
10 
11 from homeassistant.components import websocket_api
12 from homeassistant.components.websocket_api import messages
13 from homeassistant.core import HomeAssistant, callback, valid_entity_id
14 from homeassistant.exceptions import HomeAssistantError
15 from homeassistant.helpers import config_validation as cv
16 from homeassistant.helpers.json import json_bytes
17 from homeassistant.util import dt as dt_util
19  AreaConverter,
20  BloodGlucoseConcentrationConverter,
21  ConductivityConverter,
22  DataRateConverter,
23  DistanceConverter,
24  DurationConverter,
25  ElectricCurrentConverter,
26  ElectricPotentialConverter,
27  EnergyConverter,
28  InformationConverter,
29  MassConverter,
30  PowerConverter,
31  PressureConverter,
32  SpeedConverter,
33  TemperatureConverter,
34  UnitlessRatioConverter,
35  VolumeConverter,
36  VolumeFlowRateConverter,
37 )
38 
39 from .models import StatisticPeriod
40 from .statistics import (
41  STATISTIC_UNIT_TO_UNIT_CONVERTER,
42  async_add_external_statistics,
43  async_change_statistics_unit,
44  async_import_statistics,
45  async_list_statistic_ids,
46  list_statistic_ids,
47  statistic_during_period,
48  statistics_during_period,
49  update_statistics_issues,
50  validate_statistics,
51 )
52 from .util import PERIOD_SCHEMA, get_instance, resolve_period
53 
# Seconds to wait for the recorder to report that a clear-statistics job is done.
CLEAR_STATISTICS_TIME_OUT = 10
# Seconds to wait for the recorder to report that a metadata update is done.
UPDATE_STATISTICS_METADATA_TIME_OUT = 10

# Schema for the optional "units" mapping of a request: each key names a unit
# class and its value must be one of the units the matching converter accepts.
UNIT_SCHEMA = vol.Schema(
    {
        vol.Optional(unit_class): vol.In(converter.VALID_UNITS)
        for unit_class, converter in (
            ("area", AreaConverter),
            ("blood_glucose_concentration", BloodGlucoseConcentrationConverter),
            ("conductivity", ConductivityConverter),
            ("data_rate", DataRateConverter),
            ("distance", DistanceConverter),
            ("duration", DurationConverter),
            ("electric_current", ElectricCurrentConverter),
            # The electric potential converter is exposed under the "voltage" key.
            ("voltage", ElectricPotentialConverter),
            ("energy", EnergyConverter),
            ("information", InformationConverter),
            ("mass", MassConverter),
            ("power", PowerConverter),
            ("pressure", PressureConverter),
            ("speed", SpeedConverter),
            ("temperature", TemperatureConverter),
            ("unitless", UnitlessRatioConverter),
            ("volume", VolumeConverter),
            ("volume_flow_rate", VolumeFlowRateConverter),
        )
    }
)
81 
82 
@callback
def async_setup(hass: HomeAssistant) -> None:
    """Register all recorder websocket command handlers."""
    # Registration order matches the original explicit call sequence.
    for handler in (
        ws_adjust_sum_statistics,
        ws_change_statistics_unit,
        ws_clear_statistics,
        ws_get_statistic_during_period,
        ws_get_statistics_during_period,
        ws_get_statistics_metadata,
        ws_list_statistic_ids,
        ws_import_statistics,
        ws_update_statistics_issues,
        ws_update_statistics_metadata,
        ws_validate_statistics,
    ):
        websocket_api.async_register_command(hass, handler)
97 
98 
def _ws_get_statistic_during_period(
    hass: HomeAssistant,
    msg_id: int,
    start_time: dt | None,
    end_time: dt | None,
    statistic_id: str,
    types: set[Literal["max", "mean", "min", "change"]] | None,
    units: dict[str, str] | None,
) -> bytes:
    """Fetch statistics and convert them to json in the executor.

    Wraps the result of ``statistic_during_period`` in a websocket result
    message for ``msg_id`` and serializes it to bytes.

    ``units`` may be None: the websocket handler passes ``msg.get("units")``,
    which is absent when the client did not request unit conversion.
    """
    return json_bytes(
        messages.result_message(
            msg_id,
            statistic_during_period(
                hass, start_time, end_time, statistic_id, types, units=units
            ),
        )
    )
117 
118 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/statistic_during_period",
        vol.Required("statistic_id"): str,
        vol.Optional("types"): vol.All(
            [vol.Any("max", "mean", "min", "change")], vol.Coerce(set)
        ),
        vol.Optional("units"): UNIT_SCHEMA,
        **PERIOD_SCHEMA.schema,
    }
)
@websocket_api.async_response
async def ws_get_statistic_during_period(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle statistics websocket command.

    Resolves the requested period, then computes the statistic through the
    recorder instance's executor and sends the pre-serialized result.

    Raises:
        HomeAssistantError: if ``duration`` is combined with ``start_time`` or
            ``end_time``, or if ``offset`` is given without ``duration``.
    """
    # An explicit start/end and a relative duration are mutually exclusive.
    if ("start_time" in msg or "end_time" in msg) and "duration" in msg:
        raise HomeAssistantError
    # An offset only makes sense relative to a duration.
    if "offset" in msg and "duration" not in msg:
        raise HomeAssistantError

    start_time, end_time = resolve_period(cast(StatisticPeriod, msg))

    connection.send_message(
        await get_instance(hass).async_add_executor_job(
            _ws_get_statistic_during_period,
            hass,
            msg["id"],
            start_time,
            end_time,
            msg["statistic_id"],
            msg.get("types"),
            msg.get("units"),
        )
    )
152 
153 
def _ws_get_statistics_during_period(
    hass: HomeAssistant,
    msg_id: int,
    start_time: dt,
    end_time: dt | None,
    statistic_ids: set[str] | None,
    period: Literal["5minute", "day", "hour", "week", "month"],
    units: dict[str, str] | None,
    types: set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]],
) -> bytes:
    """Fetch statistics and convert them to json in the executor.

    Each row's ``start`` and ``end`` (and ``last_reset`` when that type was
    requested and the value is not None) are scaled by 1000 and truncated to
    int before serialization -- presumably epoch seconds to milliseconds.

    ``units`` may be None: the websocket handler passes ``msg.get("units")``.
    """
    result = statistics_during_period(
        hass,
        start_time,
        end_time,
        statistic_ids,
        period,
        units,
        types,
    )
    # Rows only carry a "last_reset" key to convert when it was requested.
    include_last_reset = "last_reset" in types
    for statistic_rows in result.values():
        for row in statistic_rows:
            row["start"] = int(row["start"] * 1000)
            row["end"] = int(row["end"] * 1000)
            if include_last_reset and (last_reset := row["last_reset"]) is not None:
                row["last_reset"] = int(last_reset * 1000)
    return json_bytes(messages.result_message(msg_id, result))
182 
183 
async def ws_handle_get_statistics_during_period(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """Handle statistics websocket command.

    Parses ``start_time`` and the optional ``end_time`` to UTC, replying with
    an ``invalid_start_time`` / ``invalid_end_time`` error if either cannot be
    parsed. When ``types`` is omitted, every statistic type is requested.
    """
    start_time_str = msg["start_time"]
    end_time_str = msg.get("end_time")

    if start_time := dt_util.parse_datetime(start_time_str):
        start_time = dt_util.as_utc(start_time)
    else:
        connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
        return

    if end_time_str:
        if end_time := dt_util.parse_datetime(end_time_str):
            end_time = dt_util.as_utc(end_time)
        else:
            connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
            return
    else:
        # A missing or empty end_time leaves the period open-ended.
        end_time = None

    if (types := msg.get("types")) is None:
        types = {"change", "last_reset", "max", "mean", "min", "state", "sum"}
    connection.send_message(
        await get_instance(hass).async_add_executor_job(
            _ws_get_statistics_during_period,
            hass,
            msg["id"],
            start_time,
            end_time,
            set(msg["statistic_ids"]),
            msg.get("period"),
            msg.get("units"),
            types,
        )
    )
221 
222 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/statistics_during_period",
        vol.Required("start_time"): str,
        vol.Optional("end_time"): str,
        vol.Required("statistic_ids"): vol.All([str], vol.Length(min=1)),
        vol.Required("period"): vol.Any("5minute", "hour", "day", "week", "month"),
        vol.Optional("units"): UNIT_SCHEMA,
        vol.Optional("types"): vol.All(
            [vol.Any("change", "last_reset", "max", "mean", "min", "state", "sum")],
            vol.Coerce(set),
        ),
    }
)
@websocket_api.async_response
async def ws_get_statistics_during_period(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Handle statistics websocket command.

    Validates the message against the schema above and delegates the work to
    ``ws_handle_get_statistics_during_period``.
    """
    await ws_handle_get_statistics_during_period(hass, connection, msg)
241 
242 
def _ws_get_list_statistic_ids(
    hass: HomeAssistant,
    msg_id: int,
    statistic_type: Literal["mean", "sum"] | None = None,
) -> bytes:
    """Fetch a list of available statistic_id and convert them to JSON.

    Runs in the executor.
    """
    # None for statistic_ids means no filtering by id; only by statistic_type.
    return json_bytes(
        messages.result_message(msg_id, list_statistic_ids(hass, None, statistic_type))
    )
255 
256 
async def ws_handle_list_statistic_ids(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """Fetch a list of available statistic_id.

    The listing and JSON serialization run through the recorder instance's
    executor; the resulting bytes are sent to the client as-is.
    """
    connection.send_message(
        await get_instance(hass).async_add_executor_job(
            _ws_get_list_statistic_ids,
            hass,
            msg["id"],
            msg.get("statistic_type"),
        )
    )
269 
270 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/list_statistic_ids",
        vol.Optional("statistic_type"): vol.Any("sum", "mean"),
    }
)
@websocket_api.async_response
async def ws_list_statistic_ids(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """List the available statistic ids, optionally filtered by statistic type.

    Thin websocket entry point; the work is done by
    ``ws_handle_list_statistic_ids``.
    """
    await ws_handle_list_statistic_ids(hass, connection, msg)
281 
282 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/validate_statistics",
    }
)
@websocket_api.async_response
async def ws_validate_statistics(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Run statistics validation and send its outcome to the client.

    ``validate_statistics`` is executed through the recorder instance's
    executor; whatever it returns becomes the result payload.
    """
    validation_result = await get_instance(hass).async_add_executor_job(
        validate_statistics, hass
    )
    connection.send_result(msg["id"], validation_result)
297 
298 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/update_statistics_issues",
    }
)
@websocket_api.async_response
async def ws_update_statistics_issues(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Update statistics issues.

    Runs ``update_statistics_issues`` through the recorder instance's executor
    and replies with an empty result once it has finished.
    """
    instance = get_instance(hass)
    await instance.async_add_executor_job(
        update_statistics_issues,
        hass,
    )
    connection.send_result(msg["id"])
313 
314 
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/clear_statistics",
        vol.Required("statistic_ids"): [str],
    }
)
@websocket_api.async_response
async def ws_clear_statistics(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Clear statistics for a list of statistic_ids.

    The clear request is handed to the recorder together with a completion
    callback. This handler then waits up to CLEAR_STATISTICS_TIME_OUT seconds
    for that callback and replies with a timeout error if it has not fired.
    """
    finished = asyncio.Event()

    def _signal_finished() -> None:
        # Set the event via the loop's thread-safe scheduling hook.
        hass.loop.call_soon_threadsafe(finished.set)

    recorder = get_instance(hass)
    recorder.async_clear_statistics(msg["statistic_ids"], on_done=_signal_finished)

    try:
        async with asyncio.timeout(CLEAR_STATISTICS_TIME_OUT):
            await finished.wait()
    except TimeoutError:
        connection.send_error(
            msg["id"], websocket_api.ERR_TIMEOUT, "clear_statistics timed out"
        )
    else:
        connection.send_result(msg["id"])
347 
348 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/get_statistics_metadata",
        vol.Optional("statistic_ids"): [str],
    }
)
@websocket_api.async_response
async def ws_get_statistics_metadata(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Get metadata for a list of statistic_ids.

    A missing or empty ``statistic_ids`` list is passed on as None, i.e. no
    filtering by id.
    """
    statistic_ids = msg.get("statistic_ids")
    statistic_ids_set_or_none = set(statistic_ids) if statistic_ids else None
    metadata = await async_list_statistic_ids(hass, statistic_ids_set_or_none)
    connection.send_result(msg["id"], metadata)
362 
363 
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/update_statistics_metadata",
        vol.Required("statistic_id"): str,
        vol.Required("unit_of_measurement"): vol.Any(str, None),
    }
)
@websocket_api.async_response
async def ws_update_statistics_metadata(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Update statistics metadata for a statistic_id.

    Only the normalized unit of measurement can be updated.

    Waits up to UPDATE_STATISTICS_METADATA_TIME_OUT seconds for the recorder
    to invoke the completion callback; replies with a timeout error otherwise.
    """
    done_event = asyncio.Event()

    def update_statistics_metadata_done() -> None:
        # Set the event via the loop's thread-safe scheduling hook.
        hass.loop.call_soon_threadsafe(done_event.set)

    get_instance(hass).async_update_statistics_metadata(
        msg["statistic_id"],
        new_unit_of_measurement=msg["unit_of_measurement"],
        on_done=update_statistics_metadata_done,
    )
    try:
        async with asyncio.timeout(UPDATE_STATISTICS_METADATA_TIME_OUT):
            await done_event.wait()
    except TimeoutError:
        connection.send_error(
            msg["id"], websocket_api.ERR_TIMEOUT, "update_statistics_metadata timed out"
        )
        return

    connection.send_result(msg["id"])
398 
399 
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/change_statistics_unit",
        vol.Required("statistic_id"): str,
        vol.Required("new_unit_of_measurement"): vol.Any(str, None),
        vol.Required("old_unit_of_measurement"): vol.Any(str, None),
    }
)
@callback
def ws_change_statistics_unit(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Change the unit_of_measurement for a statistic_id.

    All existing statistics will be converted to the new unit.

    The result is sent right after ``async_change_statistics_unit`` returns;
    this handler does not wait on any completion signal.
    """
    async_change_statistics_unit(
        hass,
        msg["statistic_id"],
        new_unit_of_measurement=msg["new_unit_of_measurement"],
        old_unit_of_measurement=msg["old_unit_of_measurement"],
    )
    connection.send_result(msg["id"])
422 
423 
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/adjust_sum_statistics",
        vol.Required("statistic_id"): str,
        vol.Required("start_time"): str,
        vol.Required("adjustment"): vol.Any(float, int),
        vol.Required("adjustment_unit_of_measurement"): vol.Any(str, None),
    }
)
@websocket_api.async_response
async def ws_adjust_sum_statistics(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Adjust sum statistics.

    The adjustment unit must either equal the statistic's stored unit or be
    one of the valid units of the converter registered for that stored unit.
    Replies with ``invalid_start_time``, ``unknown_statistic_id`` or
    ``invalid_units`` when the request cannot be honoured.
    """
    parsed_start = dt_util.parse_datetime(msg["start_time"])
    if not parsed_start:
        connection.send_error(msg["id"], "invalid_start_time", "Invalid start time")
        return
    start_time = dt_util.as_utc(parsed_start)

    metadatas = await get_instance(hass).async_add_executor_job(
        list_statistic_ids, hass, {msg["statistic_id"]}
    )
    if not metadatas:
        connection.send_error(msg["id"], "unknown_statistic_id", "Unknown statistic ID")
        return

    stat_unit = metadatas[0]["statistics_unit_of_measurement"]
    adjustment_unit = msg["adjustment_unit_of_measurement"]

    # Identical units need no conversion; otherwise a converter must exist for
    # the stored unit and must accept the adjustment unit.
    if stat_unit != adjustment_unit:
        converter = STATISTIC_UNIT_TO_UNIT_CONVERTER.get(stat_unit)
        if converter is None or adjustment_unit not in converter.VALID_UNITS:
            connection.send_error(
                msg["id"],
                "invalid_units",
                f"Can't convert {stat_unit} to {adjustment_unit}",
            )
            return

    get_instance(hass).async_adjust_statistics(
        msg["statistic_id"], start_time, msg["adjustment"], adjustment_unit
    )
    connection.send_result(msg["id"])
480 
481 
@websocket_api.require_admin
@websocket_api.websocket_command(
    {
        vol.Required("type"): "recorder/import_statistics",
        vol.Required("metadata"): {
            vol.Required("has_mean"): bool,
            vol.Required("has_sum"): bool,
            vol.Required("name"): vol.Any(str, None),
            vol.Required("source"): str,
            vol.Required("statistic_id"): str,
            vol.Required("unit_of_measurement"): vol.Any(str, None),
        },
        vol.Required("stats"): [
            {
                vol.Required("start"): cv.datetime,
                vol.Optional("mean"): vol.Any(float, int),
                vol.Optional("min"): vol.Any(float, int),
                vol.Optional("max"): vol.Any(float, int),
                vol.Optional("last_reset"): vol.Any(cv.datetime, None),
                vol.Optional("state"): vol.Any(float, int),
                vol.Optional("sum"): vol.Any(float, int),
            }
        ],
    }
)
@callback
def ws_import_statistics(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Import statistics.

    A ``statistic_id`` that is a valid entity id is imported with
    ``async_import_statistics``; any other id is treated as external and goes
    through ``async_add_external_statistics``.
    """
    metadata = msg["metadata"]
    stats = msg["stats"]

    if valid_entity_id(metadata["statistic_id"]):
        async_import_statistics(hass, metadata, stats)
    else:
        async_add_external_statistics(hass, metadata, stats)
    connection.send_result(msg["id"])
518 
dict[str, list[StatisticsRow]] statistics_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] types)
Definition: statistics.py:1825
dict[str, Any] statistic_during_period(HomeAssistant hass, datetime|None start_time, datetime|None end_time, str statistic_id, set[Literal["max", "mean", "min", "change"]]|None types, dict[str, str]|None units)
Definition: statistics.py:1463
None async_change_statistics_unit(HomeAssistant hass, str statistic_id, *str new_unit_of_measurement, str old_unit_of_measurement)
Definition: statistics.py:2562
list[dict] list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
Definition: statistics.py:858
None async_import_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2298
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
Definition: statistics.py:2318
list[dict] async_list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
Definition: statistics.py:788
tuple[datetime|None, datetime|None] resolve_period(StatisticPeriod period_def)
Definition: util.py:873
bytes _ws_get_statistics_during_period(HomeAssistant hass, int msg_id, dt start_time, dt|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str] units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] types)
None ws_handle_get_statistics_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_update_statistics_issues(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_clear_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bytes _ws_get_statistic_during_period(HomeAssistant hass, int msg_id, dt|None start_time, dt|None end_time, str statistic_id, set[Literal["max", "mean", "min", "change"]]|None types, dict[str, str] units)
None ws_get_statistic_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_list_statistic_ids(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_handle_list_statistic_ids(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_change_statistics_unit(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bytes _ws_get_list_statistic_ids(HomeAssistant hass, int msg_id, Literal["mean", "sum"]|None statistic_type=None)
None ws_adjust_sum_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_import_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_get_statistics_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_validate_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_update_statistics_metadata(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_get_statistics_metadata(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bool valid_entity_id(str entity_id)
Definition: core.py:235
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74