Home Assistant Unofficial Reference 2024.12.1
tasks.py
Go to the documentation of this file.
1 """Support for recording details."""
2 
3 from __future__ import annotations
4 
5 import abc
6 import asyncio
7 from collections.abc import Callable, Iterable
8 from dataclasses import dataclass
9 from datetime import datetime
10 import logging
11 import threading
12 from typing import TYPE_CHECKING, Any
13 
14 from homeassistant.helpers.typing import UndefinedType
15 from homeassistant.util.event_type import EventType
16 
17 from . import entity_registry, purge, statistics
18 from .const import DOMAIN
19 from .db_schema import Statistics, StatisticsShortTerm
20 from .models import StatisticData, StatisticMetaData
21 from .util import periodic_db_cleanups, session_scope
22 
23 _LOGGER = logging.getLogger(__name__)
24 
25 
26 if TYPE_CHECKING:
27  from .core import Recorder
28 
29 
30 @dataclass(slots=True)
32  """ABC for recorder tasks."""
33 
34  commit_before = True
35 
36  @abc.abstractmethod
37  def run(self, instance: Recorder) -> None:
38  """Handle the task."""
39 
40 
@dataclass(slots=True)
class ChangeStatisticsUnitTask(RecorderTask):
    """Object to store statistic_id and units to convert the unit of statistics."""

    statistic_id: str
    new_unit_of_measurement: str
    old_unit_of_measurement: str

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        statistics.change_statistics_unit(
            instance,
            self.statistic_id,
            self.new_unit_of_measurement,
            self.old_unit_of_measurement,
        )
57 
58 
@dataclass(slots=True)
class ClearStatisticsTask(RecorderTask):
    """Object to store statistic_ids for which to remove statistics."""

    # Optional callback invoked (on the recorder thread) after the
    # statistics have been cleared.
    on_done: Callable[[], None] | None
    statistic_ids: list[str]

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        statistics.clear_statistics(instance, self.statistic_ids)
        if self.on_done:
            self.on_done()
71 
72 
@dataclass(slots=True)
class UpdateStatisticsMetadataTask(RecorderTask):
    """Object to store statistic_id and unit for update of statistics metadata."""

    # Optional callback invoked (on the recorder thread) after the update.
    on_done: Callable[[], None] | None
    statistic_id: str
    # UndefinedType sentinels mean "leave this field unchanged".
    new_statistic_id: str | None | UndefinedType
    new_unit_of_measurement: str | None | UndefinedType

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        statistics.update_statistics_metadata(
            instance,
            self.statistic_id,
            self.new_statistic_id,
            self.new_unit_of_measurement,
        )
        if self.on_done:
            self.on_done()
92 
93 
@dataclass(slots=True)
class UpdateStatesMetadataTask(RecorderTask):
    """Task to update states metadata when an entity is renamed."""

    entity_id: str
    new_entity_id: str

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        entity_registry.update_states_metadata(
            instance,
            self.entity_id,
            self.new_entity_id,
        )
108 
109 
@dataclass(slots=True)
class PurgeTask(RecorderTask):
    """Object to store information about a purge task."""

    purge_before: datetime
    repack: bool
    apply_filter: bool

    def run(self, instance: Recorder) -> None:
        """Purge the database."""
        if purge.purge_old_data(
            instance, self.purge_before, self.repack, self.apply_filter
        ):
            # We always need to do the db cleanups after a purge
            # is finished to ensure the WAL checkpoint and other
            # tasks happen after a vacuum.
            periodic_db_cleanups(instance)
            return
        # Schedule a new purge task if this one didn't finish
        instance.queue_task(
            PurgeTask(self.purge_before, self.repack, self.apply_filter)
        )
132 
133 
@dataclass(slots=True)
class PurgeEntitiesTask(RecorderTask):
    """Object to store entity information about a purge task."""

    # Predicate returning True for entity_ids whose data should be purged.
    entity_filter: Callable[[str], bool]
    purge_before: datetime

    def run(self, instance: Recorder) -> None:
        """Purge entities from the database."""
        if purge.purge_entity_data(instance, self.entity_filter, self.purge_before):
            return
        # Schedule a new purge task if this one didn't finish
        instance.queue_task(PurgeEntitiesTask(self.entity_filter, self.purge_before))
147 
148 
@dataclass(slots=True)
class PerodicCleanupTask(RecorderTask):
    """An object to insert into the recorder to trigger cleanup tasks.

    Trigger cleanup tasks when auto purge is disabled.
    """

    # NOTE(review): the "Perodic" spelling matches the upstream class name
    # and is kept for backward compatibility with existing callers.

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        periodic_db_cleanups(instance)
159 
160 
@dataclass(slots=True)
class StatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run a statistics task."""

    # Start of the period to compile statistics for.
    start: datetime
    fire_events: bool

    def run(self, instance: Recorder) -> None:
        """Run statistics task."""
        if statistics.compile_statistics(instance, self.start, self.fire_events):
            return
        # Schedule a new statistics task if this one didn't finish
        instance.queue_task(StatisticsTask(self.start, self.fire_events))
174 
175 
@dataclass(slots=True)
class CompileMissingStatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run a compile missing statistics."""

    def run(self, instance: Recorder) -> None:
        """Run statistics task to compile missing statistics."""
        if statistics.compile_missing_statistics(instance):
            return
        # Schedule a new statistics task if this one didn't finish
        instance.queue_task(CompileMissingStatisticsTask())
186 
187 
@dataclass(slots=True)
class ImportStatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run an import statistics task."""

    metadata: StatisticMetaData
    statistics: Iterable[StatisticData]
    # Target table: long-term or short-term statistics.
    table: type[Statistics | StatisticsShortTerm]

    def run(self, instance: Recorder) -> None:
        """Run statistics task."""
        if statistics.import_statistics(
            instance, self.metadata, self.statistics, self.table
        ):
            return
        # Schedule a new statistics task if this one didn't finish
        instance.queue_task(
            ImportStatisticsTask(self.metadata, self.statistics, self.table)
        )
206 
207 
@dataclass(slots=True)
class AdjustStatisticsTask(RecorderTask):
    """An object to insert into the recorder queue to run an adjust statistics task."""

    statistic_id: str
    start_time: datetime
    sum_adjustment: float
    adjustment_unit: str

    def run(self, instance: Recorder) -> None:
        """Run statistics task."""
        if statistics.adjust_statistics(
            instance,
            self.statistic_id,
            self.start_time,
            self.sum_adjustment,
            self.adjustment_unit,
        ):
            return
        # Schedule a new adjust statistics task if this one didn't finish
        instance.queue_task(
            AdjustStatisticsTask(
                self.statistic_id,
                self.start_time,
                self.sum_adjustment,
                self.adjustment_unit,
            )
        )
236 
237 
@dataclass(slots=True)
class WaitTask(RecorderTask):
    """An object to insert into the recorder queue.

    Tell it set the _queue_watch event.
    """

    # No commit needed just to signal the watch event.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance._queue_watch.set()  # noqa: SLF001
250 
251 
@dataclass(slots=True)
class DatabaseLockTask(RecorderTask):
    """An object to insert into the recorder queue to prevent writes to the database."""

    # Set (on the event loop) once the database is locked.
    database_locked: asyncio.Event
    # Set (by any thread) to release the lock and resume writes.
    database_unlock: threading.Event
    # True if the recorder queue overflowed while the database was locked.
    queue_overflow: bool

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance._lock_database(self)  # noqa: SLF001
263 
264 
@dataclass(slots=True)
class StopTask(RecorderTask):
    """An object to insert into the recorder queue to stop the event handler."""

    # Stopping must not trigger a commit first.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance.stop_requested = True
274 
275 
@dataclass(slots=True)
class KeepAliveTask(RecorderTask):
    """A keep alive to be sent."""

    # Keep-alives must not force a commit.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance._send_keep_alive()  # noqa: SLF001
285 
286 
@dataclass(slots=True)
class CommitTask(RecorderTask):
    """Commit the event session."""

    # The task itself performs the commit; no commit beforehand.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        instance._commit_event_session_or_retry()  # noqa: SLF001
296 
297 
@dataclass(slots=True)
class AddRecorderPlatformTask(RecorderTask):
    """Add a recorder platform."""

    domain: str
    platform: Any
    # Registering a platform does not touch the database; skip the commit.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        hass = instance.hass
        domain = self.domain
        platform = self.platform
        platforms: dict[str, Any] = hass.data[DOMAIN].recorder_platforms
        platforms[domain] = platform
313 
314 
@dataclass(slots=True)
class SynchronizeTask(RecorderTask):
    """Ensure all pending data has been committed."""

    # commit_before is the default (True), so pending data is flushed
    # before the event is set.
    event: asyncio.Event

    def run(self, instance: Recorder) -> None:
        """Handle the task."""
        # Does not use a tracked task to avoid
        # blocking shutdown if the recorder is broken
        instance.hass.loop.call_soon_threadsafe(self.event.set)
327 
328 
@dataclass(slots=True)
class AdjustLRUSizeTask(RecorderTask):
    """An object to insert into the recorder queue to adjust the LRU size."""

    # Pure in-memory adjustment; no commit required.
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Handle the task to adjust the size."""
        instance._adjust_lru_size()  # noqa: SLF001
338 
339 
@dataclass(slots=True)
class RefreshEventTypesTask(RecorderTask):
    """An object to insert into the recorder queue to refresh event types."""

    event_types: list[EventType[Any] | str]

    def run(self, instance: Recorder) -> None:
        """Refresh event types."""
        # Read-only session: get_many populates the event type manager's
        # cache from the database without writing.
        with session_scope(session=instance.get_session(), read_only=True) as session:
            instance.event_type_manager.get_many(
                self.event_types, session, from_recorder=True
            )
None run(self, Recorder instance)
Definition: tasks.py:293
None run(self, Recorder instance)
Definition: tasks.py:118
None run(self, Recorder instance)
Definition: tasks.py:37
None run(self, Recorder instance)
Definition: tasks.py:271
None run(self, Recorder instance)
Definition: tasks.py:247
None periodic_db_cleanups(Recorder instance)
Definition: util.py:777
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Definition: recorder.py:86