1 """Support for recording details."""
3 from __future__
import annotations
7 from collections.abc
import Callable, Iterable
8 from dataclasses
import dataclass
9 from datetime
import datetime
12 from typing
import TYPE_CHECKING, Any
17 from .
import entity_registry, purge, statistics
18 from .const
import DOMAIN
19 from .db_schema
import Statistics, StatisticsShortTerm
20 from .models
import StatisticData, StatisticMetaData
21 from .util
import periodic_db_cleanups, session_scope
23 _LOGGER = logging.getLogger(__name__)
27 from .core
import Recorder
30 @dataclass(slots=True)
32 """ABC for recorder tasks."""
37 def run(self, instance: Recorder) ->
None:
38 """Handle the task."""
41 @dataclass(slots=True)
43 """Object to store statistics_id and unit to convert unit of statistics."""
46 new_unit_of_measurement: str
47 old_unit_of_measurement: str
49 def run(self, instance: Recorder) ->
None:
50 """Handle the task."""
51 statistics.change_statistics_unit(
54 self.new_unit_of_measurement,
55 self.old_unit_of_measurement,
59 @dataclass(slots=True)
61 """Object to store statistics_ids for which to remove statistics."""
63 on_done: Callable[[],
None] |
None
64 statistic_ids: list[str]
66 def run(self, instance: Recorder) ->
None:
67 """Handle the task."""
68 statistics.clear_statistics(instance, self.statistic_ids)
73 @dataclass(slots=True)
75 """Object to store statistics_id and unit for update of statistics metadata."""
77 on_done: Callable[[],
None] |
None
79 new_statistic_id: str |
None | UndefinedType
80 new_unit_of_measurement: str |
None | UndefinedType
82 def run(self, instance: Recorder) ->
None:
83 """Handle the task."""
84 statistics.update_statistics_metadata(
87 self.new_statistic_id,
88 self.new_unit_of_measurement,
94 @dataclass(slots=True)
96 """Task to update states metadata."""
101 def run(self, instance: Recorder) ->
None:
102 """Handle the task."""
103 entity_registry.update_states_metadata(
110 @dataclass(slots=True)
112 """Object to store information about purge task."""
114 purge_before: datetime
118 def run(self, instance: Recorder) ->
None:
119 """Purge the database."""
120 if purge.purge_old_data(
121 instance, self.purge_before, self.repack, self.apply_filter
130 PurgeTask(self.purge_before, self.repack, self.apply_filter)
134 @dataclass(slots=True)
136 """Object to store entity information about purge task."""
138 entity_filter: Callable[[str], bool]
139 purge_before: datetime
141 def run(self, instance: Recorder) ->
None:
142 """Purge entities from the database."""
143 if purge.purge_entity_data(instance, self.entity_filter, self.purge_before):
149 @dataclass(slots=True)
151 """An object to insert into the recorder to trigger cleanup tasks.
153 Trigger cleanup tasks when auto purge is disabled.
156 def run(self, instance: Recorder) ->
None:
157 """Handle the task."""
161 @dataclass(slots=True)
163 """An object to insert into the recorder queue to run a statistics task."""
168 def run(self, instance: Recorder) ->
None:
169 """Run statistics task."""
170 if statistics.compile_statistics(instance, self.start, self.fire_events):
176 @dataclass(slots=True)
178 """An object to insert into the recorder queue to run a compile missing statistics task."""
180 def run(self, instance: Recorder) ->
None:
181 """Run statistics task to compile missing statistics."""
182 if statistics.compile_missing_statistics(instance):
188 @dataclass(slots=True)
190 """An object to insert into the recorder queue to run an import statistics task."""
192 metadata: StatisticMetaData
193 statistics: Iterable[StatisticData]
194 table: type[Statistics | StatisticsShortTerm]
196 def run(self, instance: Recorder) ->
None:
197 """Run statistics task."""
198 if statistics.import_statistics(
199 instance, self.metadata, self.statistics, self.table
208 @dataclass(slots=True)
210 """An object to insert into the recorder queue to run an adjust statistics task."""
214 sum_adjustment: float
217 def run(self, instance: Recorder) ->
None:
218 """Run statistics task."""
219 if statistics.adjust_statistics(
224 self.adjustment_unit,
233 self.adjustment_unit,
238 @dataclass(slots=True)
240 """An object to insert into the recorder queue.
242 Tell it to set the _queue_watch event.
245 commit_before =
False
247 def run(self, instance: Recorder) ->
None:
248 """Handle the task."""
249 instance._queue_watch.set()
252 @dataclass(slots=True)
254 """An object to insert into the recorder queue to prevent writes to the database."""
256 database_locked: asyncio.Event
257 database_unlock: threading.Event
260 def run(self, instance: Recorder) ->
None:
261 """Handle the task."""
262 instance._lock_database(self)
265 @dataclass(slots=True)
267 """An object to insert into the recorder queue to stop the event handler."""
269 commit_before =
False
271 def run(self, instance: Recorder) ->
None:
272 """Handle the task."""
273 instance.stop_requested =
True
276 @dataclass(slots=True)
278 """A keep alive to be sent."""
280 commit_before =
False
282 def run(self, instance: Recorder) ->
None:
283 """Handle the task."""
284 instance._send_keep_alive()
287 @dataclass(slots=True)
289 """Commit the event session."""
291 commit_before =
False
293 def run(self, instance: Recorder) ->
None:
294 """Handle the task."""
295 instance._commit_event_session_or_retry()
298 @dataclass(slots=True)
300 """Add a recorder platform."""
304 commit_before =
False
306 def run(self, instance: Recorder) ->
None:
307 """Handle the task."""
310 platform = self.platform
311 platforms: dict[str, Any] = hass.data[DOMAIN].recorder_platforms
312 platforms[domain] = platform
315 @dataclass(slots=True)
317 """Ensure all pending data has been committed."""
322 def run(self, instance: Recorder) ->
None:
323 """Handle the task."""
326 instance.hass.loop.call_soon_threadsafe(self.event.set)
329 @dataclass(slots=True)
331 """An object to insert into the recorder queue to adjust the LRU size."""
333 commit_before =
False
335 def run(self, instance: Recorder) ->
None:
336 """Handle the task to adjust the size."""
337 instance._adjust_lru_size()
340 @dataclass(slots=True)
342 """An object to insert into the recorder queue to refresh event types."""
344 event_types: list[EventType[Any] | str]
346 def run(self, instance: Recorder) ->
None:
347 """Refresh event types."""
348 with session_scope(session=instance.get_session(), read_only=
True)
as session:
349 instance.event_type_manager.get_many(
350 self.event_types, session, from_recorder=
True
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None run(self, Recorder instance)
None periodic_db_cleanups(Recorder instance)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)