Home Assistant Unofficial Reference 2024.12.1
storage.py
Go to the documentation of this file.
1 """Helper to help store data."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Iterable, Mapping, Sequence
7 from contextlib import suppress
8 from copy import deepcopy
9 import inspect
10 from json import JSONDecodeError, JSONEncoder
11 import logging
12 import os
13 from pathlib import Path
14 from typing import Any
15 
16 from propcache import cached_property
17 
18 from homeassistant.const import (
19  EVENT_HOMEASSISTANT_FINAL_WRITE,
20  EVENT_HOMEASSISTANT_STARTED,
21  EVENT_HOMEASSISTANT_STOP,
22 )
23 from homeassistant.core import (
24  CALLBACK_TYPE,
25  DOMAIN as HOMEASSISTANT_DOMAIN,
26  CoreState,
27  Event,
28  HomeAssistant,
29  callback,
30 )
31 from homeassistant.exceptions import HomeAssistantError
32 from homeassistant.loader import bind_hass
33 from homeassistant.util import json as json_util
34 import homeassistant.util.dt as dt_util
35 from homeassistant.util.file import WriteError
36 from homeassistant.util.hass_dict import HassKey
37 
38 from . import json as json_helper
39 
40 # mypy: allow-untyped-calls, allow-untyped-defs, no-warn-return-any
41 # mypy: no-check-untyped-defs
42 MAX_LOAD_CONCURRENTLY = 6
43 
44 STORAGE_DIR = ".storage"
45 _LOGGER = logging.getLogger(__name__)
46 
47 STORAGE_SEMAPHORE: HassKey[asyncio.Semaphore] = HassKey("storage_semaphore")
48 STORAGE_MANAGER: HassKey[_StoreManager] = HassKey("storage_manager")
49 
50 MANAGER_CLEANUP_DELAY = 60
51 
52 
53 @bind_hass
54 async def async_migrator[_T: Mapping[str, Any] | Sequence[Any]](
55  hass: HomeAssistant,
56  old_path: str,
57  store: Store[_T],
58  *,
59  old_conf_load_func: Callable | None = None,
60  old_conf_migrate_func: Callable | None = None,
61 ) -> _T | None:
62  """Migrate old data to a store and then load data.
63 
64  async def old_conf_migrate_func(old_data)
65  """
66  # If we already have store data we have already migrated in the past.
67  if (store_data := await store.async_load()) is not None:
68  return store_data
69 
70  def load_old_config():
71  """Load old config."""
72  if not os.path.isfile(old_path):
73  return None
74 
75  if old_conf_load_func is not None:
76  return old_conf_load_func(old_path)
77 
78  return json_util.load_json(old_path)
79 
80  config = await hass.async_add_executor_job(load_old_config)
81 
82  if config is None:
83  return None
84 
85  if old_conf_migrate_func is not None:
86  config = await old_conf_migrate_func(config)
87 
88  await store.async_save(config)
89  await hass.async_add_executor_job(os.remove, old_path)
90  return config
91 
92 
def get_internal_store_manager(hass: HomeAssistant) -> _StoreManager:
    """Get the store manager.

    This function is not part of the API and should only be
    used in the Home Assistant core internals. It is not
    guaranteed to be stable.
    """
    # Lazily create the singleton on first use and cache it in hass.data.
    if (manager := hass.data.get(STORAGE_MANAGER)) is None:
        manager = _StoreManager(hass)
        hass.data[STORAGE_MANAGER] = manager
    return manager
104 
105 
class _StoreManager:
    """Class to help storing data.

    The store manager is used to cache and manage storage files.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize storage manager class."""
        # NOTE(review): several attribute names below look doubled
        # (e.g. `_hass_hass`) — likely an artifact of the doc scraper.
        # They are used consistently under these names in this class,
        # so they are kept unchanged here.
        self._hass_hass = hass
        # Keys whose cached content must no longer be served.
        self._invalidated: set[str] = set()
        # File names present in .storage; None until async_initialize runs.
        self._files_files: set[str] | None = None
        # Preloaded JSON content keyed by storage key.
        self._data_preload: dict[str, json_util.JsonValueType] = {}
        self._storage_path: Path = Path(hass.config.config_dir).joinpath(STORAGE_DIR)
        # Timer handle for the post-startup cache cleanup.
        self._cancel_cleanup_cancel_cleanup: asyncio.TimerHandle | None = None
120 
121  async def async_initialize(self) -> None:
122  """Initialize the storage manager."""
123  hass = self._hass_hass
124  await hass.async_add_executor_job(self._initialize_files_initialize_files)
125  hass.bus.async_listen_once(
126  EVENT_HOMEASSISTANT_STARTED,
127  self._async_schedule_cleanup_async_schedule_cleanup,
128  )
129 
130  @callback
131  def async_invalidate(self, key: str) -> None:
132  """Invalidate cache.
133 
134  Store calls this when its going to save data
135  to ensure that the cache is not used after that.
136  """
137  if "/" not in key:
138  self._invalidated.add(key)
139  self._data_preload.pop(key, None)
140 
141  @callback
143  self, key: str
144  ) -> tuple[bool, json_util.JsonValueType | None] | None:
145  """Fetch data from cache."""
146  #
147  # If the key is invalidated, we don't need to check the cache
148  # If async_initialize has not been called yet, we don't know
149  # if the file exists or not so its a cache miss
150  #
151  # It is very important that we check if self._files is None
152  # because we do not want to incorrectly return a cache miss
153  # because async_initialize has not been called yet as it would
154  # cause the Store to return None when it should not.
155  #
156  # The "/" in key check is to prevent the cache from being used
157  # for subdirs in case we have a key like "hacs/XXX"
158  #
159  if "/" in key or key in self._invalidated or self._files_files is None:
160  _LOGGER.debug("%s: Cache miss", key)
161  return None
162 
163  # If async_initialize has been called and the key is not in self._files
164  # then the file does not exist
165  if key not in self._files_files:
166  _LOGGER.debug("%s: Cache hit, does not exist", key)
167  return (False, None)
168 
169  # If the key is in the preload cache, return it
170  if data := self._data_preload.pop(key, None):
171  _LOGGER.debug("%s: Cache hit data", key)
172  return (True, data)
173 
174  _LOGGER.debug("%s: Cache miss, not preloaded", key)
175  return None
176 
177  @callback
178  def _async_schedule_cleanup(self, _event: Event) -> None:
179  """Schedule the cleanup of old files."""
180  self._cancel_cleanup_cancel_cleanup = self._hass_hass.loop.call_later(
181  MANAGER_CLEANUP_DELAY, self._async_cleanup_async_cleanup
182  )
183  # Handle the case where we stop in the first 60s
184  self._hass_hass.bus.async_listen_once(
185  EVENT_HOMEASSISTANT_STOP,
186  self._async_cancel_and_cleanup_async_cancel_and_cleanup,
187  )
188 
189  @callback
190  def _async_cancel_and_cleanup(self, _event: Event) -> None:
191  """Cancel the cleanup of old files."""
192  self._async_cleanup_async_cleanup()
193  if self._cancel_cleanup_cancel_cleanup:
194  self._cancel_cleanup_cancel_cleanup.cancel()
195  self._cancel_cleanup_cancel_cleanup = None
196 
    @callback
    def _async_cleanup(self) -> None:
        """Cleanup unused cache.

        If nothing consumes the cache 60s after startup or when we
        stop Home Assistant, we'll clear the cache.
        """
        # Drop all preloaded file contents; subsequent fetches fall back
        # to reading from disk.
        self._data_preload.clear()
205 
206  async def async_preload(self, keys: Iterable[str]) -> None:
207  """Cache the keys."""
208  # If async_initialize has not been called yet, we can't preload
209  if self._files_files is not None and (existing := self._files_files.intersection(keys)):
210  await self._hass_hass.async_add_executor_job(self._preload_preload, existing)
211 
212  def _preload(self, keys: Iterable[str]) -> None:
213  """Cache the keys."""
214  storage_path = self._storage_path
215  data_preload = self._data_preload
216  for key in keys:
217  storage_file: Path = storage_path.joinpath(key)
218  try:
219  if storage_file.is_file():
220  data_preload[key] = json_util.load_json(storage_file)
221  except Exception as ex: # noqa: BLE001
222  _LOGGER.debug("Error loading %s: %s", key, ex)
223 
224  def _initialize_files(self) -> None:
225  """Initialize the cache."""
226  if self._storage_path.exists():
227  self._files_files = set(os.listdir(self._storage_path))
228 
229 
230 @bind_hass
231 class Store[_T: Mapping[str, Any] | Sequence[Any]]:
232  """Class to help storing data."""
233 
234  def __init__(
235  self,
236  hass: HomeAssistant,
237  version: int,
238  key: str,
239  private: bool = False,
240  *,
241  atomic_writes: bool = False,
242  encoder: type[JSONEncoder] | None = None,
243  minor_version: int = 1,
244  read_only: bool = False,
245  ) -> None:
246  """Initialize storage class."""
247  self.version = version
248  self.minor_version = minor_version
249  self.key = key
250  self.hass = hass
251  self._private = private
252  self._data: dict[str, Any] | None = None
253  self._delay_handle: asyncio.TimerHandle | None = None
254  self._unsub_final_write_listener: CALLBACK_TYPE | None = None
255  self._write_lock = asyncio.Lock()
256  self._load_future: asyncio.Future[_T | None] | None = None
257  self._encoder = encoder
258  self._atomic_writes = atomic_writes
259  self._read_only = read_only
260  self._next_write_time = 0.0
261  self._manager = get_internal_store_manager(hass)
262 
263  @cached_property
264  def path(self):
265  """Return the config path."""
266  return self.hass.config.path(STORAGE_DIR, self.key)
267 
268  def make_read_only(self) -> None:
269  """Make the store read-only.
270 
271  This method is irreversible.
272  """
273  self._read_only = True
274 
275  async def async_load(self) -> _T | None:
276  """Load data.
277 
278  If the expected version and minor version do not match the given
279  versions, the migrate function will be invoked with
280  migrate_func(version, minor_version, config).
281 
282  Will ensure that when a call comes in while another one is in progress,
283  the second call will wait and return the result of the first call.
284  """
285  if self._load_future:
286  return await self._load_future
287 
288  self._load_future = self.hass.loop.create_future()
289  try:
290  result = await self._async_load()
291  except BaseException as ex:
292  self._load_future.set_exception(ex)
293  # Ensure the future is marked as retrieved
294  # since if there is no concurrent call it
295  # will otherwise never be retrieved.
296  self._load_future.exception()
297  raise
298  else:
299  self._load_future.set_result(result)
300  finally:
301  self._load_future = None
302 
303  return result
304 
305  async def _async_load(self) -> _T | None:
306  """Load the data and ensure the task is removed."""
307  if STORAGE_SEMAPHORE not in self.hass.data:
308  self.hass.data[STORAGE_SEMAPHORE] = asyncio.Semaphore(MAX_LOAD_CONCURRENTLY)
309  async with self.hass.data[STORAGE_SEMAPHORE]:
310  return await self._async_load_data()
311 
312  async def _async_load_data(self):
313  """Load the data."""
314  # Check if we have a pending write
315  if self._data is not None:
316  data = self._data
317 
318  # If we didn't generate data yet, do it now.
319  if "data_func" in data:
320  data["data"] = data.pop("data_func")()
321 
322  # We make a copy because code might assume it's safe to mutate loaded data
323  # and we don't want that to mess with what we're trying to store.
324  data = deepcopy(data)
325  elif cache := self._manager.async_fetch(self.key):
326  exists, data = cache
327  if not exists:
328  return None
329  else:
330  try:
331  data = await self.hass.async_add_executor_job(
332  json_util.load_json, self.path
333  )
334  except HomeAssistantError as err:
335  if isinstance(err.__cause__, JSONDecodeError):
336  # If we have a JSONDecodeError, it means the file is corrupt.
337  # We can't recover from this, so we'll log an error, rename the file and
338  # return None so that we can start with a clean slate which will
339  # allow startup to continue so they can restore from a backup.
340  isotime = dt_util.utcnow().isoformat()
341  corrupt_postfix = f".corrupt.{isotime}"
342  corrupt_path = f"{self.path}{corrupt_postfix}"
343  await self.hass.async_add_executor_job(
344  os.rename, self.path, corrupt_path
345  )
346  storage_key = self.key
347  _LOGGER.error(
348  "Unrecoverable error decoding storage %s at %s; "
349  "This may indicate an unclean shutdown, invalid syntax "
350  "from manual edits, or disk corruption; "
351  "The corrupt file has been saved as %s; "
352  "It is recommended to restore from backup: %s",
353  storage_key,
354  self.path,
355  corrupt_path,
356  err,
357  )
358  from .issue_registry import ( # pylint: disable=import-outside-toplevel
359  IssueSeverity,
360  async_create_issue,
361  )
362 
363  issue_domain = HOMEASSISTANT_DOMAIN
364  if (
365  domain := (storage_key.partition(".")[0])
366  ) and domain in self.hass.config.components:
367  issue_domain = domain
368 
370  self.hass,
371  HOMEASSISTANT_DOMAIN,
372  f"storage_corruption_{storage_key}_{isotime}",
373  is_fixable=True,
374  issue_domain=issue_domain,
375  translation_key="storage_corruption",
376  is_persistent=True,
377  severity=IssueSeverity.CRITICAL,
378  translation_placeholders={
379  "storage_key": storage_key,
380  "original_path": self.path,
381  "corrupt_path": corrupt_path,
382  "error": str(err),
383  },
384  )
385  return None
386  raise
387 
388  if data == {}:
389  return None
390 
391  # Add minor_version if not set
392  if "minor_version" not in data:
393  data["minor_version"] = 1
394 
395  if (
396  data["version"] == self.version
397  and data["minor_version"] == self.minor_version
398  ):
399  stored = data["data"]
400  else:
401  _LOGGER.info(
402  "Migrating %s storage from %s.%s to %s.%s",
403  self.key,
404  data["version"],
405  data["minor_version"],
406  self.version,
407  self.minor_version,
408  )
409  if len(inspect.signature(self._async_migrate_func).parameters) == 2:
410  stored = await self._async_migrate_func(data["version"], data["data"])
411  else:
412  try:
413  stored = await self._async_migrate_func(
414  data["version"], data["minor_version"], data["data"]
415  )
416  except NotImplementedError:
417  if data["version"] != self.version:
418  raise
419  stored = data["data"]
420  await self.async_save(stored)
421 
422  return stored
423 
424  async def async_save(self, data: _T) -> None:
425  """Save data."""
426  self._data = {
427  "version": self.version,
428  "minor_version": self.minor_version,
429  "key": self.key,
430  "data": data,
431  }
432 
433  if self.hass.state is CoreState.stopping:
434  self._async_ensure_final_write_listener()
435  return
436 
437  await self._async_handle_write_data()
438 
439  @callback
441  self,
442  data_func: Callable[[], _T],
443  delay: float = 0,
444  ) -> None:
445  """Save data with an optional delay."""
446  self._data = {
447  "version": self.version,
448  "minor_version": self.minor_version,
449  "key": self.key,
450  "data_func": data_func,
451  }
452 
453  next_when = self.hass.loop.time() + delay
454  if self._delay_handle and self._delay_handle.when() < next_when:
455  self._next_write_time = next_when
456  return
457 
458  self._async_cleanup_delay_listener()
459  self._async_ensure_final_write_listener()
460 
461  if self.hass.state is CoreState.stopping:
462  return
463 
464  # We use call_later directly here to avoid a circular import
465  self._async_reschedule_delayed_write(next_when)
466 
467  @callback
468  def _async_reschedule_delayed_write(self, when: float) -> None:
469  """Reschedule a delayed write."""
470  self._delay_handle = self.hass.loop.call_at(
471  when, self._async_schedule_callback_delayed_write
472  )
473 
474  @callback
476  """Schedule the delayed write in a task."""
477  if self.hass.loop.time() < self._next_write_time:
478  # Timer fired too early because there were multiple
479  # calls to async_delay_save before the first one
480  # wrote. Reschedule the timer to the next write time.
481  self._async_reschedule_delayed_write(self._next_write_time)
482  return
483  self.hass.async_create_task_internal(
484  self._async_callback_delayed_write(), eager_start=True
485  )
486 
487  @callback
489  """Ensure that we write if we quit before delay has passed."""
490  if self._unsub_final_write_listener is None:
491  self._unsub_final_write_listener = self.hass.bus.async_listen_once(
492  EVENT_HOMEASSISTANT_FINAL_WRITE,
493  self._async_callback_final_write,
494  )
495 
496  @callback
498  """Clean up a stop listener."""
499  if self._unsub_final_write_listener is not None:
500  self._unsub_final_write_listener()
501  self._unsub_final_write_listener = None
502 
503  @callback
504  def _async_cleanup_delay_listener(self) -> None:
505  """Clean up a delay listener."""
506  if self._delay_handle is not None:
507  self._delay_handle.cancel()
508  self._delay_handle = None
509 
510  async def _async_callback_delayed_write(self) -> None:
511  """Handle a delayed write callback."""
512  # catch the case where a call is scheduled and then we stop Home Assistant
513  if self.hass.state is CoreState.stopping:
514  self._async_ensure_final_write_listener()
515  return
516  await self._async_handle_write_data()
517 
518  async def _async_callback_final_write(self, _event: Event) -> None:
519  """Handle a write because Home Assistant is in final write state."""
520  self._unsub_final_write_listener = None
521  await self._async_handle_write_data()
522 
523  async def _async_handle_write_data(self, *_args):
524  """Handle writing the config."""
525  async with self._write_lock:
526  self._manager.async_invalidate(self.key)
527  self._async_cleanup_delay_listener()
528  self._async_cleanup_final_write_listener()
529 
530  if self._data is None:
531  # Another write already consumed the data
532  return
533 
534  data = self._data
535  self._data = None
536 
537  if self._read_only:
538  return
539 
540  try:
541  await self._async_write_data(self.path, data)
542  except (json_util.SerializationError, WriteError) as err:
543  _LOGGER.error("Error writing config for %s: %s", self.key, err)
544 
545  async def _async_write_data(self, path: str, data: dict) -> None:
546  await self.hass.async_add_executor_job(self._write_data, self.path, data)
547 
548  def _write_data(self, path: str, data: dict) -> None:
549  """Write the data."""
550  os.makedirs(os.path.dirname(path), exist_ok=True)
551 
552  if "data_func" in data:
553  data["data"] = data.pop("data_func")()
554 
555  _LOGGER.debug("Writing data for %s to %s", self.key, path)
556  json_helper.save_json(
557  path,
558  data,
559  self._private,
560  encoder=self._encoder,
561  atomic_writes=self._atomic_writes,
562  )
563 
564  async def _async_migrate_func(self, old_major_version, old_minor_version, old_data):
565  """Migrate to the new version."""
566  raise NotImplementedError
567 
568  async def async_remove(self) -> None:
569  """Remove all data."""
570  self._manager.async_invalidate(self.key)
571  self._async_cleanup_delay_listener()
572  self._async_cleanup_final_write_listener()
573 
574  with suppress(FileNotFoundError):
575  await self.hass.async_add_executor_job(os.unlink, self.path)
None _preload(self, Iterable[str] keys)
Definition: storage.py:212
None async_preload(self, Iterable[str] keys)
Definition: storage.py:206
tuple[bool, json_util.JsonValueType|None]|None async_fetch(self, str key)
Definition: storage.py:144
None _async_cancel_and_cleanup(self, Event _event)
Definition: storage.py:190
None __init__(self, HomeAssistant hass)
Definition: storage.py:112
None _async_schedule_cleanup(self, Event _event)
Definition: storage.py:178
bool add(self, _T matcher)
Definition: match.py:185
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69
def _async_handle_write_data(self, *_args)
Definition: storage.py:523
_T|None _async_load(self)
Definition: storage.py:305
def _async_migrate_func(self, old_major_version, old_minor_version, old_data)
Definition: storage.py:564
None _async_callback_delayed_write(self)
Definition: storage.py:510
_T|None async_load(self)
Definition: storage.py:275
None _async_cleanup_delay_listener(self)
Definition: storage.py:504
None _async_ensure_final_write_listener(self)
Definition: storage.py:488
None _async_cleanup_final_write_listener(self)
Definition: storage.py:497
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)
Definition: storage.py:444
None _async_write_data(self, str path, dict data)
Definition: storage.py:545
None async_save(self, _T data)
Definition: storage.py:424
None __init__(self, HomeAssistant hass, int version, str key, bool private=False, *bool atomic_writes=False, type[JSONEncoder]|None encoder=None, int minor_version=1, bool read_only=False)
Definition: storage.py:245
_StoreManager get_internal_store_manager(HomeAssistant hass)
Definition: storage.py:93
None _async_callback_final_write(self, Event _event)
Definition: storage.py:518
None _write_data(self, str path, dict data)
Definition: storage.py:548
None _async_reschedule_delayed_write(self, float when)
Definition: storage.py:468
None _async_schedule_callback_delayed_write(self)
Definition: storage.py:475