1 """The Recorder websocket API."""
3 from __future__
import annotations
6 from datetime
import datetime
as dt
7 from typing
import Any, Literal, cast
9 import voluptuous
as vol
20 BloodGlucoseConcentrationConverter,
21 ConductivityConverter,
25 ElectricCurrentConverter,
26 ElectricPotentialConverter,
34 UnitlessRatioConverter,
36 VolumeFlowRateConverter,
39 from .models
import StatisticPeriod
40 from .statistics
import (
41 STATISTIC_UNIT_TO_UNIT_CONVERTER,
42 async_add_external_statistics,
43 async_change_statistics_unit,
44 async_import_statistics,
45 async_list_statistic_ids,
47 statistic_during_period,
48 statistics_during_period,
49 update_statistics_issues,
52 from .util
import PERIOD_SCHEMA, get_instance, resolve_period
54 CLEAR_STATISTICS_TIME_OUT = 10
55 UPDATE_STATISTICS_METADATA_TIME_OUT = 10
57 UNIT_SCHEMA = vol.Schema(
59 vol.Optional(
"area"): vol.In(AreaConverter.VALID_UNITS),
60 vol.Optional(
"blood_glucose_concentration"): vol.In(
61 BloodGlucoseConcentrationConverter.VALID_UNITS
63 vol.Optional(
"conductivity"): vol.In(ConductivityConverter.VALID_UNITS),
64 vol.Optional(
"data_rate"): vol.In(DataRateConverter.VALID_UNITS),
65 vol.Optional(
"distance"): vol.In(DistanceConverter.VALID_UNITS),
66 vol.Optional(
"duration"): vol.In(DurationConverter.VALID_UNITS),
67 vol.Optional(
"electric_current"): vol.In(ElectricCurrentConverter.VALID_UNITS),
68 vol.Optional(
"voltage"): vol.In(ElectricPotentialConverter.VALID_UNITS),
69 vol.Optional(
"energy"): vol.In(EnergyConverter.VALID_UNITS),
70 vol.Optional(
"information"): vol.In(InformationConverter.VALID_UNITS),
71 vol.Optional(
"mass"): vol.In(MassConverter.VALID_UNITS),
72 vol.Optional(
"power"): vol.In(PowerConverter.VALID_UNITS),
73 vol.Optional(
"pressure"): vol.In(PressureConverter.VALID_UNITS),
74 vol.Optional(
"speed"): vol.In(SpeedConverter.VALID_UNITS),
75 vol.Optional(
"temperature"): vol.In(TemperatureConverter.VALID_UNITS),
76 vol.Optional(
"unitless"): vol.In(UnitlessRatioConverter.VALID_UNITS),
77 vol.Optional(
"volume"): vol.In(VolumeConverter.VALID_UNITS),
78 vol.Optional(
"volume_flow_rate"): vol.In(VolumeFlowRateConverter.VALID_UNITS),
85 """Set up the recorder websocket API."""
86 websocket_api.async_register_command(hass, ws_adjust_sum_statistics)
87 websocket_api.async_register_command(hass, ws_change_statistics_unit)
88 websocket_api.async_register_command(hass, ws_clear_statistics)
89 websocket_api.async_register_command(hass, ws_get_statistic_during_period)
90 websocket_api.async_register_command(hass, ws_get_statistics_during_period)
91 websocket_api.async_register_command(hass, ws_get_statistics_metadata)
92 websocket_api.async_register_command(hass, ws_list_statistic_ids)
93 websocket_api.async_register_command(hass, ws_import_statistics)
94 websocket_api.async_register_command(hass, ws_update_statistics_issues)
95 websocket_api.async_register_command(hass, ws_update_statistics_metadata)
96 websocket_api.async_register_command(hass, ws_validate_statistics)
102 start_time: dt |
None,
105 types: set[Literal[
"max",
"mean",
"min",
"change"]] |
None,
106 units: dict[str, str],
108 """Fetch statistics and convert them to json in the executor."""
110 messages.result_message(
113 hass, start_time, end_time, statistic_id, types, units=units
119 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/statistic_during_period",
120 vol.Required(
"statistic_id"): str,
121 vol.Optional(
"types"): vol.All(
122 [vol.Any(
"max",
"mean",
"min",
"change")], vol.Coerce(set)
124 vol.Optional(
"units"): UNIT_SCHEMA,
125 **PERIOD_SCHEMA.schema,
128 @websocket_api.async_response
132 """Handle statistics websocket command."""
133 if (
"start_time" in msg
or "end_time" in msg)
and "duration" in msg:
134 raise HomeAssistantError
135 if "offset" in msg
and "duration" not in msg:
136 raise HomeAssistantError
140 connection.send_message(
142 _ws_get_statistic_during_period,
159 statistic_ids: set[str] |
None,
160 period: Literal[
"5minute",
"day",
"hour",
"week",
"month"],
161 units: dict[str, str],
162 types: set[Literal[
"change",
"last_reset",
"max",
"mean",
"min",
"state",
"sum"]],
164 """Fetch statistics and convert them to json in the executor."""
174 include_last_reset =
"last_reset" in types
175 for statistic_rows
in result.values():
176 for row
in statistic_rows:
177 row[
"start"] =
int(row[
"start"] * 1000)
178 row[
"end"] =
int(row[
"end"] * 1000)
179 if include_last_reset
and (last_reset := row[
"last_reset"])
is not None:
180 row[
"last_reset"] =
int(last_reset * 1000)
181 return json_bytes(messages.result_message(msg_id, result))
187 """Handle statistics websocket command."""
188 start_time_str = msg[
"start_time"]
189 end_time_str = msg.get(
"end_time")
191 if start_time := dt_util.parse_datetime(start_time_str):
192 start_time = dt_util.as_utc(start_time)
194 connection.send_error(msg[
"id"],
"invalid_start_time",
"Invalid start_time")
198 if end_time := dt_util.parse_datetime(end_time_str):
199 end_time = dt_util.as_utc(end_time)
201 connection.send_error(msg[
"id"],
"invalid_end_time",
"Invalid end_time")
206 if (types := msg.get(
"types"))
is None:
207 types = {
"change",
"last_reset",
"max",
"mean",
"min",
"state",
"sum"}
208 connection.send_message(
210 _ws_get_statistics_during_period,
215 set(msg[
"statistic_ids"]),
223 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/statistics_during_period",
224 vol.Required(
"start_time"): str,
225 vol.Optional(
"end_time"): str,
226 vol.Required(
"statistic_ids"): vol.All([str], vol.Length(min=1)),
227 vol.Required(
"period"): vol.Any(
"5minute",
"hour",
"day",
"week",
"month"),
228 vol.Optional(
"units"): UNIT_SCHEMA,
229 vol.Optional(
"types"): vol.All(
230 [vol.Any(
"change",
"last_reset",
"max",
"mean",
"min",
"state",
"sum")],
235 @websocket_api.async_response
239 """Handle statistics websocket command."""
246 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
248 """Fetch a list of available statistic_id and convert them to JSON.
250 Runs in the executor.
260 """Fetch a list of available statistic_id."""
261 connection.send_message(
263 _ws_get_list_statistic_ids,
266 msg.get(
"statistic_type"),
271 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/list_statistic_ids",
272 vol.Optional(
"statistic_type"): vol.Any(
"sum",
"mean"),
275 @websocket_api.async_response
279 """Fetch a list of available statistic_id."""
283 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/validate_statistics",
286 @websocket_api.async_response
290 """Validate statistics and send the result."""
292 statistic_ids = await instance.async_add_executor_job(
296 connection.send_result(msg[
"id"], statistic_ids)
299 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/update_statistics_issues",
302 @websocket_api.async_response
306 """Update statistics issues."""
308 await instance.async_add_executor_job(
309 update_statistics_issues,
312 connection.send_result(msg[
"id"])
315 @websocket_api.require_admin
316 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/clear_statistics",
317 vol.Required(
"statistic_ids"): [str],
320 @websocket_api.async_response
324 """Clear statistics for a list of statistic_ids.
326 Note: The WS call posts a job to the recorder's queue and waits for it to complete.
327 A timeout error is sent if it takes longer than CLEAR_STATISTICS_TIME_OUT seconds.
329 done_event = asyncio.Event()
331 def clear_statistics_done() -> None:
332 hass.loop.call_soon_threadsafe(done_event.set)
335 msg[
"statistic_ids"], on_done=clear_statistics_done
338 async
with asyncio.timeout(CLEAR_STATISTICS_TIME_OUT):
339 await done_event.wait()
341 connection.send_error(
342 msg[
"id"], websocket_api.ERR_TIMEOUT,
"clear_statistics timed out"
346 connection.send_result(msg[
"id"])
349 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/get_statistics_metadata",
350 vol.Optional(
"statistic_ids"): [str],
353 @websocket_api.async_response
357 """Get metadata for a list of statistic_ids."""
358 statistic_ids = msg.get(
"statistic_ids")
359 statistic_ids_set_or_none = set(statistic_ids)
if statistic_ids
else None
361 connection.send_result(msg[
"id"], metadata)
364 @websocket_api.require_admin
365 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/update_statistics_metadata",
366 vol.Required(
"statistic_id"): str,
367 vol.Required(
"unit_of_measurement"): vol.Any(str,
None),
370 @websocket_api.async_response
374 """Update statistics metadata for a statistic_id.
376 Only the normalized unit of measurement can be updated.
378 done_event = asyncio.Event()
380 def update_statistics_metadata_done() -> None:
381 hass.loop.call_soon_threadsafe(done_event.set)
385 new_unit_of_measurement=msg[
"unit_of_measurement"],
386 on_done=update_statistics_metadata_done,
389 async
with asyncio.timeout(UPDATE_STATISTICS_METADATA_TIME_OUT):
390 await done_event.wait()
392 connection.send_error(
393 msg[
"id"], websocket_api.ERR_TIMEOUT,
"update_statistics_metadata timed out"
397 connection.send_result(msg[
"id"])
400 @websocket_api.require_admin
401 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/change_statistics_unit",
402 vol.Required(
"statistic_id"): str,
403 vol.Required(
"new_unit_of_measurement"): vol.Any(str,
None),
404 vol.Required(
"old_unit_of_measurement"): vol.Any(str,
None),
411 """Change the unit_of_measurement for a statistic_id.
413 All existing statistics will be converted to the new unit.
418 new_unit_of_measurement=msg[
"new_unit_of_measurement"],
419 old_unit_of_measurement=msg[
"old_unit_of_measurement"],
421 connection.send_result(msg[
"id"])
424 @websocket_api.require_admin
425 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/adjust_sum_statistics",
426 vol.Required(
"statistic_id"): str,
427 vol.Required(
"start_time"): str,
428 vol.Required(
"adjustment"): vol.Any(float, int),
429 vol.Required(
"adjustment_unit_of_measurement"): vol.Any(str,
None),
432 @websocket_api.async_response
436 """Adjust sum statistics.
438 If the statistics are stored as NORMALIZED_UNIT,
439 it's allowed to make an adjustment in VALID_UNIT
441 start_time_str = msg[
"start_time"]
443 if start_time := dt_util.parse_datetime(start_time_str):
444 start_time = dt_util.as_utc(start_time)
446 connection.send_error(msg[
"id"],
"invalid_start_time",
"Invalid start time")
450 metadatas = await instance.async_add_executor_job(
451 list_statistic_ids, hass, {msg[
"statistic_id"]}
454 connection.send_error(msg[
"id"],
"unknown_statistic_id",
"Unknown statistic ID")
456 metadata = metadatas[0]
458 def valid_units(statistics_unit: str |
None, adjustment_unit: str |
None) -> bool:
459 if statistics_unit == adjustment_unit:
461 converter = STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistics_unit)
462 if converter
is not None and adjustment_unit
in converter.VALID_UNITS:
466 stat_unit = metadata[
"statistics_unit_of_measurement"]
467 adjustment_unit = msg[
"adjustment_unit_of_measurement"]
468 if not valid_units(stat_unit, adjustment_unit):
469 connection.send_error(
472 f
"Can't convert {stat_unit} to {adjustment_unit}",
477 msg[
"statistic_id"], start_time, msg[
"adjustment"], adjustment_unit
479 connection.send_result(msg[
"id"])
482 @websocket_api.require_admin
483 @websocket_api.websocket_command(
{
vol.Required("type"):
"recorder/import_statistics",
484 vol.Required(
"metadata"): {
485 vol.Required(
"has_mean"): bool,
486 vol.Required(
"has_sum"): bool,
487 vol.Required(
"name"): vol.Any(str,
None),
488 vol.Required(
"source"): str,
489 vol.Required(
"statistic_id"): str,
490 vol.Required(
"unit_of_measurement"): vol.Any(str,
None),
492 vol.Required(
"stats"): [
494 vol.Required(
"start"): cv.datetime,
495 vol.Optional(
"mean"): vol.Any(float, int),
496 vol.Optional(
"min"): vol.Any(float, int),
497 vol.Optional(
"max"): vol.Any(float, int),
498 vol.Optional(
"last_reset"): vol.Any(cv.datetime,
None),
499 vol.Optional(
"state"): vol.Any(float, int),
500 vol.Optional(
"sum"): vol.Any(float, int),
509 """Import statistics."""
510 metadata = msg[
"metadata"]
517 connection.send_result(msg[
"id"])
518
dict[str, list[StatisticsRow]] statistics_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str]|None units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] types)
dict[str, Any] statistic_during_period(HomeAssistant hass, datetime|None start_time, datetime|None end_time, str statistic_id, set[Literal["max", "mean", "min", "change"]]|None types, dict[str, str]|None units)
None async_change_statistics_unit(HomeAssistant hass, str statistic_id, *str new_unit_of_measurement, str old_unit_of_measurement)
list[dict] list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
None async_import_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
None async_add_external_statistics(HomeAssistant hass, StatisticMetaData metadata, Iterable[StatisticData] statistics)
list[dict] async_list_statistic_ids(HomeAssistant hass, set[str]|None statistic_ids=None, Literal["mean", "sum"]|None statistic_type=None)
tuple[datetime|None, datetime|None] resolve_period(StatisticPeriod period_def)
bytes _ws_get_statistics_during_period(HomeAssistant hass, int msg_id, dt start_time, dt|None end_time, set[str]|None statistic_ids, Literal["5minute", "day", "hour", "week", "month"] period, dict[str, str] units, set[Literal["change", "last_reset", "max", "mean", "min", "state", "sum"]] types)
None ws_handle_get_statistics_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_update_statistics_issues(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_clear_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bytes _ws_get_statistic_during_period(HomeAssistant hass, int msg_id, dt|None start_time, dt|None end_time, str statistic_id, set[Literal["max", "mean", "min", "change"]]|None types, dict[str, str] units)
None ws_get_statistic_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_list_statistic_ids(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None async_setup(HomeAssistant hass)
None ws_handle_list_statistic_ids(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_change_statistics_unit(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bytes _ws_get_list_statistic_ids(HomeAssistant hass, int msg_id, Literal["mean", "sum"]|None statistic_type=None)
None ws_adjust_sum_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_import_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_get_statistics_during_period(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_validate_statistics(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_update_statistics_metadata(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None ws_get_statistics_metadata(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bool valid_entity_id(str entity_id)
Recorder get_instance(HomeAssistant hass)