1 """Helper to help store data."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Iterable, Mapping, Sequence
7 from contextlib
import suppress
8 from copy
import deepcopy
10 from json
import JSONDecodeError, JSONEncoder
13 from pathlib
import Path
14 from typing
import Any
16 from propcache
import cached_property
19 EVENT_HOMEASSISTANT_FINAL_WRITE,
20 EVENT_HOMEASSISTANT_STARTED,
21 EVENT_HOMEASSISTANT_STOP,
25 DOMAIN
as HOMEASSISTANT_DOMAIN,
38 from .
import json
as json_helper
42 MAX_LOAD_CONCURRENTLY = 6
44 STORAGE_DIR =
".storage"
45 _LOGGER = logging.getLogger(__name__)
47 STORAGE_SEMAPHORE: HassKey[asyncio.Semaphore] =
HassKey(
"storage_semaphore")
48 STORAGE_MANAGER: HassKey[_StoreManager] =
HassKey(
"storage_manager")
50 MANAGER_CLEANUP_DELAY = 60
54 async
def async_migrator[_T: Mapping[str, Any] | Sequence[Any]](
59 old_conf_load_func: Callable |
None =
None,
60 old_conf_migrate_func: Callable |
None =
None,
62 """Migrate old data to a store and then load data.
64 async def old_conf_migrate_func(old_data)
67 if (store_data := await store.async_load())
is not None:
70 def load_old_config():
71 """Load old config."""
72 if not os.path.isfile(old_path):
75 if old_conf_load_func
is not None:
76 return old_conf_load_func(old_path)
78 return json_util.load_json(old_path)
80 config = await hass.async_add_executor_job(load_old_config)
85 if old_conf_migrate_func
is not None:
86 config = await old_conf_migrate_func(config)
88 await store.async_save(config)
89 await hass.async_add_executor_job(os.remove, old_path)
94 """Get the store manager.
96 This function is not part of the API and should only be
97 used in the Home Assistant core internals. It is not
98 guaranteed to be stable.
100 if STORAGE_MANAGER
not in hass.data:
102 hass.data[STORAGE_MANAGER] = manager
103 return hass.data[STORAGE_MANAGER]
107 """Class to help storing data.
109 The store manager is used to cache and manage storage files.
113 """Initialize storage manager class."""
115 self._invalidated: set[str] = set()
116 self.
_files_files: set[str] |
None =
None
117 self._data_preload: dict[str, json_util.JsonValueType] = {}
118 self._storage_path: Path = Path(hass.config.config_dir).joinpath(STORAGE_DIR)
122 """Initialize the storage manager."""
123 hass = self.
_hass_hass
125 hass.bus.async_listen_once(
126 EVENT_HOMEASSISTANT_STARTED,
134 Store calls this when its going to save data
135 to ensure that the cache is not used after that.
138 self._invalidated.
add(key)
139 self._data_preload.pop(key,
None)
144 ) -> tuple[bool, json_util.JsonValueType |
None] |
None:
145 """Fetch data from cache."""
159 if "/" in key
or key
in self._invalidated
or self.
_files_files
is None:
160 _LOGGER.debug(
"%s: Cache miss", key)
165 if key
not in self.
_files_files:
166 _LOGGER.debug(
"%s: Cache hit, does not exist", key)
170 if data := self._data_preload.pop(key,
None):
171 _LOGGER.debug(
"%s: Cache hit data", key)
174 _LOGGER.debug(
"%s: Cache miss, not preloaded", key)
179 """Schedule the cleanup of old files."""
184 self.
_hass_hass.bus.async_listen_once(
185 EVENT_HOMEASSISTANT_STOP,
191 """Cancel the cleanup of old files."""
199 """Cleanup unused cache.
201 If nothing consumes the cache 60s after startup or when we
202 stop Home Assistant, we'll clear the cache.
204 self._data_preload.clear()
207 """Cache the keys."""
209 if self.
_files_files
is not None and (existing := self.
_files_files.intersection(keys)):
210 await self.
_hass_hass.async_add_executor_job(self.
_preload_preload, existing)
213 """Cache the keys."""
214 storage_path = self._storage_path
215 data_preload = self._data_preload
217 storage_file: Path = storage_path.joinpath(key)
219 if storage_file.is_file():
220 data_preload[key] = json_util.load_json(storage_file)
221 except Exception
as ex:
222 _LOGGER.debug(
"Error loading %s: %s", key, ex)
225 """Initialize the cache."""
226 if self._storage_path.exists():
227 self.
_files_files = set(os.listdir(self._storage_path))
231 class Store[_T: Mapping[str, Any] | Sequence[Any]]:
232 """Class to help storing data."""
239 private: bool =
False,
241 atomic_writes: bool =
False,
242 encoder: type[JSONEncoder] |
None =
None,
243 minor_version: int = 1,
244 read_only: bool =
False,
246 """Initialize storage class."""
247 self.version = version
248 self.minor_version = minor_version
251 self._private = private
252 self._data: dict[str, Any] |
None =
None
253 self._delay_handle: asyncio.TimerHandle |
None =
None
254 self._unsub_final_write_listener: CALLBACK_TYPE |
None =
None
255 self._write_lock = asyncio.Lock()
256 self._load_future: asyncio.Future[_T |
None] |
None =
None
257 self._encoder = encoder
258 self._atomic_writes = atomic_writes
259 self._read_only = read_only
260 self._next_write_time = 0.0
265 """Return the config path."""
266 return self.hass.config.path(STORAGE_DIR, self.key)
269 """Make the store read-only.
271 This method is irreversible.
273 self._read_only =
True
278 If the expected version and minor version do not match the given
279 versions, the migrate function will be invoked with
280 migrate_func(version, minor_version, config).
282 Will ensure that when a call comes in while another one is in progress,
283 the second call will wait and return the result of the first call.
285 if self._load_future:
286 return await self._load_future
288 self._load_future = self.hass.loop.create_future()
290 result = await self._async_load()
291 except BaseException
as ex:
292 self._load_future.set_exception(ex)
296 self._load_future.exception()
299 self._load_future.set_result(result)
301 self._load_future =
None
306 """Load the data and ensure the task is removed."""
307 if STORAGE_SEMAPHORE
not in self.hass.data:
308 self.hass.data[STORAGE_SEMAPHORE] = asyncio.Semaphore(MAX_LOAD_CONCURRENTLY)
309 async
with self.hass.data[STORAGE_SEMAPHORE]:
310 return await self._async_load_data()
315 if self._data
is not None:
319 if "data_func" in data:
320 data[
"data"] = data.pop(
"data_func")()
324 data = deepcopy(data)
325 elif cache := self._manager.async_fetch(self.key):
331 data = await self.hass.async_add_executor_job(
332 json_util.load_json, self.path
334 except HomeAssistantError
as err:
335 if isinstance(err.__cause__, JSONDecodeError):
340 isotime = dt_util.utcnow().isoformat()
341 corrupt_postfix = f
".corrupt.{isotime}"
342 corrupt_path = f
"{self.path}{corrupt_postfix}"
343 await self.hass.async_add_executor_job(
344 os.rename, self.path, corrupt_path
346 storage_key = self.key
348 "Unrecoverable error decoding storage %s at %s; "
349 "This may indicate an unclean shutdown, invalid syntax "
350 "from manual edits, or disk corruption; "
351 "The corrupt file has been saved as %s; "
352 "It is recommended to restore from backup: %s",
358 from .issue_registry
import (
363 issue_domain = HOMEASSISTANT_DOMAIN
365 domain := (storage_key.partition(
".")[0])
366 )
and domain
in self.hass.config.components:
367 issue_domain = domain
371 HOMEASSISTANT_DOMAIN,
372 f
"storage_corruption_{storage_key}_{isotime}",
374 issue_domain=issue_domain,
375 translation_key=
"storage_corruption",
377 severity=IssueSeverity.CRITICAL,
378 translation_placeholders={
379 "storage_key": storage_key,
380 "original_path": self.path,
381 "corrupt_path": corrupt_path,
392 if "minor_version" not in data:
393 data[
"minor_version"] = 1
396 data[
"version"] == self.version
397 and data[
"minor_version"] == self.minor_version
399 stored = data[
"data"]
402 "Migrating %s storage from %s.%s to %s.%s",
405 data[
"minor_version"],
409 if len(inspect.signature(self._async_migrate_func).parameters) == 2:
410 stored = await self._async_migrate_func(data[
"version"], data[
"data"])
413 stored = await self._async_migrate_func(
414 data[
"version"], data[
"minor_version"], data[
"data"]
416 except NotImplementedError:
417 if data[
"version"] != self.version:
419 stored = data[
"data"]
420 await self.async_save(stored)
427 "version": self.version,
428 "minor_version": self.minor_version,
433 if self.hass.state
is CoreState.stopping:
434 self._async_ensure_final_write_listener()
437 await self._async_handle_write_data()
442 data_func: Callable[[], _T],
445 """Save data with an optional delay."""
447 "version": self.version,
448 "minor_version": self.minor_version,
450 "data_func": data_func,
453 next_when = self.hass.loop.time() + delay
454 if self._delay_handle
and self._delay_handle.when() < next_when:
455 self._next_write_time = next_when
458 self._async_cleanup_delay_listener()
459 self._async_ensure_final_write_listener()
461 if self.hass.state
is CoreState.stopping:
465 self._async_reschedule_delayed_write(next_when)
469 """Reschedule a delayed write."""
470 self._delay_handle = self.hass.loop.call_at(
471 when, self._async_schedule_callback_delayed_write
476 """Schedule the delayed write in a task."""
477 if self.hass.loop.time() < self._next_write_time:
481 self._async_reschedule_delayed_write(self._next_write_time)
483 self.hass.async_create_task_internal(
484 self._async_callback_delayed_write(), eager_start=
True
489 """Ensure that we write if we quit before delay has passed."""
490 if self._unsub_final_write_listener
is None:
491 self._unsub_final_write_listener = self.hass.bus.async_listen_once(
492 EVENT_HOMEASSISTANT_FINAL_WRITE,
493 self._async_callback_final_write,
498 """Clean up a stop listener."""
499 if self._unsub_final_write_listener
is not None:
500 self._unsub_final_write_listener()
501 self._unsub_final_write_listener =
None
505 """Clean up a delay listener."""
506 if self._delay_handle
is not None:
507 self._delay_handle.cancel()
508 self._delay_handle =
None
511 """Handle a delayed write callback."""
513 if self.hass.state
is CoreState.stopping:
514 self._async_ensure_final_write_listener()
516 await self._async_handle_write_data()
519 """Handle a write because Home Assistant is in final write state."""
520 self._unsub_final_write_listener =
None
521 await self._async_handle_write_data()
524 """Handle writing the config."""
525 async
with self._write_lock:
526 self._manager.async_invalidate(self.key)
527 self._async_cleanup_delay_listener()
528 self._async_cleanup_final_write_listener()
530 if self._data
is None:
541 await self._async_write_data(self.path, data)
542 except (json_util.SerializationError, WriteError)
as err:
543 _LOGGER.error(
"Error writing config for %s: %s", self.key, err)
546 await self.hass.async_add_executor_job(self._write_data, self.path, data)
549 """Write the data."""
550 os.makedirs(os.path.dirname(path), exist_ok=
True)
552 if "data_func" in data:
553 data[
"data"] = data.pop(
"data_func")()
555 _LOGGER.debug(
"Writing data for %s to %s", self.key, path)
556 json_helper.save_json(
560 encoder=self._encoder,
561 atomic_writes=self._atomic_writes,
565 """Migrate to the new version."""
566 raise NotImplementedError
569 """Remove all data."""
570 self._manager.async_invalidate(self.key)
571 self._async_cleanup_delay_listener()
572 self._async_cleanup_final_write_listener()
574 with suppress(FileNotFoundError):
575 await self.hass.async_add_executor_job(os.unlink, self.path)
None _preload(self, Iterable[str] keys)
None async_preload(self, Iterable[str] keys)
None _async_cleanup(self)
tuple[bool, json_util.JsonValueType|None]|None async_fetch(self, str key)
None async_initialize(self)
None _async_cancel_and_cleanup(self, Event _event)
None _initialize_files(self)
None __init__(self, HomeAssistant hass)
None _async_schedule_cleanup(self, Event _event)
None async_invalidate(self, str key)
bool add(self, _T matcher)
None async_create_issue(HomeAssistant hass, str entry_id)
def _async_handle_write_data(self, *_args)
_T|None _async_load(self)
def _async_migrate_func(self, old_major_version, old_minor_version, old_data)
None _async_callback_delayed_write(self)
None _async_cleanup_delay_listener(self)
None _async_ensure_final_write_listener(self)
None _async_cleanup_final_write_listener(self)
def _async_load_data(self)
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)
None make_read_only(self)
None _async_write_data(self, str path, dict data)
None async_save(self, _T data)
None __init__(self, HomeAssistant hass, int version, str key, bool private=False, *bool atomic_writes=False, type[JSONEncoder]|None encoder=None, int minor_version=1, bool read_only=False)
_StoreManager get_internal_store_manager(HomeAssistant hass)
None _async_callback_final_write(self, Event _event)
None _write_data(self, str path, dict data)
None _async_reschedule_delayed_write(self, float when)
None _async_schedule_callback_delayed_write(self)